forked from watson-developer-cloud/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathiam_token_manager.py
More file actions
148 lines (134 loc) · 5.37 KB
/
iam_token_manager.py
File metadata and controls
148 lines (134 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import requests
import time
DEFAULT_IAM_URL = 'https://iam.bluemix.net/identity/token'
CONTENT_TYPE = 'application/x-www-form-urlencoded'
ACCEPT = 'application/json'
DEFAULT_AUTHORIZATION = 'Basic Yng6Yng='
REQUEST_TOKEN_GRANT_TYPE = 'urn:ibm:params:oauth:grant-type:apikey'
REQUEST_TOKEN_RESPONSE_TYPE = 'cloud_iam'
REFRESH_TOKEN_GRANT_TYPE = 'refresh_token'
class IAMTokenManager(object):
def __init__(self, iam_api_key=None, iam_access_token=None, iam_url=None):
self.iam_api_key = iam_api_key
self.user_access_token = iam_access_token
self.iam_url = iam_url if iam_url else DEFAULT_IAM_URL
self.token_info = {
'access_token': None,
'refresh_token': None,
'token_type': None,
'expires_in': None,
'expiration': None,
}
def request(self, method, url, headers=None, params=None, data=None, **kwargs):
response = requests.request(method=method, url=url,
headers=headers, params=params,
data=data, **kwargs)
if 200 <= response.status_code <= 299:
return response.json()
else:
from .watson_service import WatsonApiException, get_error_message
error_message = get_error_message(response)
raise WatsonApiException(response.status_code, message=error_message, httpResponse=response)
def get_token(self):
"""
The source of the token is determined by the following logic:
1. If user provides their own managed access token, assume it is valid and send it
2. If this class is managing tokens and does not yet have one, make a request for one
3. If this class is managing tokens and the token has expired refresh it. In case the refresh token is expired, get a new one
If this class is managing tokens and has a valid token stored, send it
"""
if self.user_access_token:
return self.user_access_token
elif not self.token_info.get('access_token'):
token_info = self._request_token()
self._save_token_info(token_info)
return self.token_info.get('access_token')
elif self._is_token_expired():
if self._is_refresh_token_expired():
token_info = self._request_token()
else:
token_info = self._refresh_token()
self._save_token_info(token_info)
return self.token_info.get('access_token')
else:
return self.token_info.get('access_token')
def _request_token(self):
"""
Request an IAM token using an API key
"""
headers = {
'Content-type': CONTENT_TYPE,
'Authorization': DEFAULT_AUTHORIZATION,
'accept': ACCEPT
}
data = {
'grant_type': REQUEST_TOKEN_GRANT_TYPE,
'apikey': self.iam_api_key,
'response_type': REQUEST_TOKEN_RESPONSE_TYPE
}
response = self.request(
method='POST',
url=self.iam_url,
headers=headers,
data=data)
return response
def _refresh_token(self):
"""
Refresh an IAM token using a refresh token
"""
headers = {
'Content-type': CONTENT_TYPE,
'Authorization': DEFAULT_AUTHORIZATION,
'accept': ACCEPT
}
data = {
'grant_type': REFRESH_TOKEN_GRANT_TYPE,
'refresh_token': self.token_info.get('refresh_token')
}
response = self.request(
method='POST',
url=self.iam_url,
headers=headers,
data=data)
return response
def set_access_token(self, iam_access_token):
"""
Set a self-managed IAM access token.
The access token should be valid and not yet expired.
"""
self.user_access_token = iam_access_token
def set_iam_api_key(self, iam_api_key):
"""
Set the IAM api key
"""
self.iam_api_key = iam_api_key
def _is_token_expired(self):
"""
Check if currently stored token is expired.
Using a buffer to prevent the edge case of the
oken expiring before the request could be made.
The buffer will be a fraction of the total TTL. Using 80%.
"""
fraction_of_ttl = 0.8
time_to_live = self.token_info.get('expires_in')
expire_time = self.token_info.get('expiration')
refresh_time = expire_time - (time_to_live * (1.0 - fraction_of_ttl))
current_time = int(time.time())
return refresh_time < current_time
def _is_refresh_token_expired(self):
"""
Used as a fail-safe to prevent the condition of a refresh token expiring,
which could happen after around 30 days. This function will return true
if it has been at least 7 days and 1 hour since the last token was set
"""
if self.token_info.get('expiration') is None:
return True
seven_days = 7 * 24 * 3600
current_time = int(time.time())
new_token_time = self.token_info.get('expiration') + seven_days
return new_token_time < current_time
def _save_token_info(self, token_info):
"""
Save the response from the IAM service request to the object's state.
"""
self.token_info = token_info