forked from Kyria/EsiPy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsecurity.py
318 lines (265 loc) · 11.3 KB
/
security.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# -*- encoding: utf-8 -*-
""" EsiPy Security definition where everything related to
SSO auth is defined """
from __future__ import absolute_import
import base64
import logging
import time
import warnings
from requests import Session
from requests.utils import quote
from six.moves.urllib.parse import urlparse
from .events import AFTER_TOKEN_REFRESH
from .exceptions import APIException
LOGGER = logging.getLogger(__name__)
class EsiSecurity(object):
""" Contains all the OAuth2 knowledge for ESI use.
Based on pyswagger Security object, to be used with pyswagger BaseClient
implementation.
"""
def __init__(
self,
redirect_uri,
client_id,
secret_key,
**kwargs):
""" Init the ESI Security Object
:param redirect_uri: the uri to redirect the user after login into SSO
:param client_id: the OAuth2 client ID
:param secret_key: the OAuth2 secret key
:param sso_url: the default sso URL used when no "app" is provided
:param esi_url: the default esi URL used for verify endpoint
:param app: (optionnal) the pyswagger app object
:param security_name: (optionnal) the name of the object holding the
informations in the securityDefinitions, used to check authed endpoint
:param esi_datasource: (optional) The ESI datasource used to validate
SSO authentication. Defaults to tranquility
:param headers: (optional) additional headers to add to the requests
done here
"""
app = kwargs.pop('app', None)
sso_url = kwargs.pop('sso_url', "https://login.eveonline.com")
esi_url = kwargs.pop('esi_url', "https://esi.evetech.net")
esi_datasource = kwargs.pop('esi_datasource', "tranquility")
self.security_name = kwargs.pop('security_name', 'evesso')
self.redirect_uri = redirect_uri
self.client_id = client_id
self.secret_key = secret_key
# we provide app object, so we don't use sso_url
if app is not None:
# check if the security_name exists in the securityDefinition
security = app.root.securityDefinitions.get(
self.security_name,
None
)
if security is None:
raise NameError(
"%s is not defined in the securityDefinitions" %
self.security_name
)
self.oauth_authorize = security.authorizationUrl
# some URL we still need to "manually" define... sadly
# we parse the authUrl so we don't care if it's TQ or SISI.
# https://github.com/ccpgames/esi-issues/issues/92
parsed_uri = urlparse(security.authorizationUrl)
self.oauth_token = '%s://%s/oauth/token' % (
parsed_uri.scheme,
parsed_uri.netloc
)
# no app object is provided, so we use direct URLs
else:
if sso_url is None or sso_url == "":
raise AttributeError("sso_url cannot be None or empty "
"without app parameter")
self.oauth_authorize = '%s/oauth/authorize' % sso_url
self.oauth_token = '%s/oauth/token' % sso_url
# use ESI url for verify, since it's better for caching
if esi_url is None or esi_url == "":
raise AttributeError("esi_url cannot be None or empty")
self.oauth_verify = '%s/verify/?datasource=%s' % (
esi_url,
esi_datasource
)
# session request stuff
self._session = Session()
headers = kwargs.pop('headers', {})
if 'User-Agent' not in headers:
warning_message = (
"Defining a 'User-Agent' header is a"
" good practice, and allows CCP to contact you if required."
" To do this, simply add the following when creating"
" the client: headers={'User-Agent':'something'}."
)
LOGGER.warning(warning_message)
warnings.warn(warning_message)
self._session.headers.update({
'User-Agent': (
'EsiPy/Security/ - '
'https://github.com/Kyria/EsiPy - '
'ClientID: %s' % self.client_id
)
})
self._session.headers.update({"Accept": "application/json"})
self._session.headers.update(headers)
# token data
self.refresh_token = None
self.access_token = None
self.token_expiry = None
def __get_token_auth_header(self):
""" Return the Basic Authorization header required to get the tokens
:return: a dict with the headers
"""
# encode/decode for py2/py3 compatibility
auth_b64 = "%s:%s" % (self.client_id, self.secret_key)
auth_b64 = base64.b64encode(auth_b64.encode('latin-1'))
auth_b64 = auth_b64.decode('latin-1')
return {'Authorization': 'Basic %s' % auth_b64}
def __get_oauth_header(self):
""" Return the Bearer Authorization header required in oauth calls
:return: a dict with the authorization header
"""
return {'Authorization': 'Bearer %s' % self.access_token}
def __make_token_request_parameters(self, params):
""" Return the token uri from the securityDefinition
:param params: the data given to the request
:return: the oauth/token uri
"""
request_params = {
'headers': self.__get_token_auth_header(),
'data': params,
'url': self.oauth_token,
}
return request_params
def get_auth_uri(self, scopes=None, state=None, implicit=False):
""" Constructs the full auth uri and returns it.
:param scopes: The list of scope
:param state: The state to pass through the auth process
:return: the authorizationUrl with the correct parameters.
"""
scopes_list = [] if not scopes else scopes
response_type = 'code' if not implicit else 'token'
return '%s?response_type=%s&redirect_uri=%s&client_id=%s%s%s' % (
self.oauth_authorize,
response_type,
quote(self.redirect_uri, safe=''),
self.client_id,
'&scope=%s' % '+'.join(scopes_list) if scopes else '',
'&state=%s' % state if state else ''
)
def get_access_token_params(self, code):
""" Return the param object for the post() call to get the access_token
from the auth process (using the code)
:param code: the code get from the authentification process
:return: a dict with the url, params and header
"""
return self.__make_token_request_parameters(
{
'grant_type': 'authorization_code',
'code': code,
}
)
def get_refresh_token_params(self):
""" Return the param object for the post() call to get the access_token
from the refresh_token
:param code: the refresh token
:return: a dict with the url, params and header
"""
if self.refresh_token is None:
raise AttributeError('No refresh token is defined.')
return self.__make_token_request_parameters(
{
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
}
)
def update_token(self, response_json):
""" Update access_token, refresh_token and token_expiry from the
response body.
The response must be converted to a json object before being passed as
a parameter
:param response_json: the response body to use.
"""
self.access_token = response_json['access_token']
self.token_expiry = int(time.time()) + response_json['expires_in']
if 'refresh_token' in response_json:
self.refresh_token = response_json['refresh_token']
def is_token_expired(self, offset=0):
""" Return true if the token is expired.
The offset can be used to change the expiry time:
- positive value decrease the time (sooner)
- negative value increase the time (later)
If the expiry is not set, always return True. This case allow the users
to define a security object, only knowing the refresh_token and get
a new access_token / expiry_time without errors.
:param offset: the expiry offset (in seconds) [default: 0]
:return: boolean true if expired, else false.
"""
if self.token_expiry is None:
return True
return int(time.time()) >= (self.token_expiry - offset)
def refresh(self):
""" Update the auth data (tokens) using the refresh token in auth.
"""
request_data = self.get_refresh_token_params()
res = self._session.post(**request_data)
if res.status_code != 200:
raise APIException(
request_data['url'],
res.status_code,
res.json()
)
json_res = res.json()
self.update_token(json_res)
return json_res
def auth(self, code):
""" Request the token to the /oauth/token endpoint.
Update the security tokens.
:param code: the code you get from the auth process
"""
request_data = self.get_access_token_params(code)
res = self._session.post(**request_data)
if res.status_code != 200:
raise APIException(
request_data['url'],
res.status_code,
res.json()
)
json_res = res.json()
self.update_token(json_res)
return json_res
def verify(self):
""" Make a get call to the oauth/verify endpoint to get the user data
:return: the json with the data.
"""
res = self._session.get(
self.oauth_verify,
headers=self.__get_oauth_header()
)
if res.status_code != 200:
raise APIException(
self.oauth_verify,
res.status_code,
res.json()
)
return res.json()
def __call__(self, request):
""" Check if the request need security header and apply them.
Required for pyswagger.core.BaseClient.request().
:param request: the pyswagger request object to check
:return: the updated request.
"""
if not request._security:
return request
if self.is_token_expired():
json_response = self.refresh()
AFTER_TOKEN_REFRESH.send(**json_response)
for security in request._security:
if self.security_name not in security:
LOGGER.warning(
"Missing Securities: [%s]",
", ".join(security.keys())
)
continue
if self.access_token is not None:
request._p['header'].update(self.__get_oauth_header())
return request