-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgen_token.py
117 lines (85 loc) · 3.98 KB
/
gen_token.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import datetime
import logging
import time
from typing import Union
import jwt
import requests
import ujson
import db
import utils
from constants import github_app_id
from utils import plural
def generate_jwt(min_time: int) -> str:
    """Return a JWT identifying the GitHub App, reusing a cached one when possible.

    A cached JWT (stored under the ``'_jwt'`` key) is returned if it still has
    more than ``min_time`` seconds before expiry. Otherwise a new one is signed
    with the app's private key (RS256), cached, and returned.

    :param min_time: minimum remaining lifetime, in seconds, for a cached JWT
        to be considered reusable.
    :return: the encoded JWT.
    """
    # Taken before the cache lookup; the new token's timestamps are based on it.
    now = time.time()
    cached_entry, time_remaining = get_cached_token('_jwt', min_time)

    if cached_entry:
        log.info(f"Reused JWT with {round(time_remaining / 60, 1)} mins remaining")
        return cached_entry[0]

    with open('celestetas-improvements-tracker.2022-05-01.private-key.pem', 'rb') as key_file:
        private_key = key_file.read()

    # Issued-at is backdated by 60 seconds and expiry is 9.5 minutes ahead.
    # NOTE(review): this matches GitHub's guidance (clock drift allowance,
    # 10 minute maximum lifetime) - confirm against the current docs.
    issued_at = round(now - 60)
    expires_at = round(now + (9.5 * 60))
    claims = {'iat': issued_at,
              'exp': expires_at,
              'iss': github_app_id}

    new_jwt = jwt.encode(claims, private_key, algorithm='RS256')
    log.info("Generated JWT")
    set_token_cache('_jwt', [new_jwt, claims['exp']])
    return new_jwt
def generate_access_token(installation_owner: str, min_jwt_time: int) -> tuple:
    """Request a new installation access token for ``installation_owner``.

    The installation ID is read from ``db.installations``. If it is not stored
    there, the app's installations are listed from the GitHub API and every
    returned installation ID is saved to ``db.installations`` before the lookup
    is retried against the fetched data.

    :param installation_owner: account login that the app is installed on.
    :param min_jwt_time: minimum remaining lifetime, in seconds, required of
        the JWT used to authenticate the requests (passed to ``generate_jwt``).
    :return: ``(token, expiration)`` where ``expiration`` is a POSIX timestamp.
    :raises InstallationOwnerMissingError: if no installation is known for
        ``installation_owner`` after consulting the database and the API.
    """
    headers = {'Authorization': f'Bearer {generate_jwt(min_jwt_time)}', 'Accept': 'application/vnd.github.v3+json'}
    installations_saved = {}

    try:
        installations_saved[installation_owner] = db.installations.get(installation_owner, consistent_read=False)
    except db.DBKeyError:
        log.info(f"Installation ID not cached for owner \"{installation_owner}\"")
        r = requests.get('https://api.github.com/app/installations', headers=headers, timeout=30)
        utils.handle_potential_request_error(r, 200)
        installations = ujson.loads(r.content)
        log.info(f"Found {len(installations)} installation{plural(installations)}: {[(i['id'], i['account']['login'], i['created_at']) for i in installations]}")

        # Persist every installation returned, not just the requested one, so
        # later lookups for other owners can be served from the database.
        for installation in installations:
            login = installation['account']['login']
            installations_saved[login] = installation['id']
            db.installations.set(login, installation['id'])

    if installation_owner not in installations_saved:
        raise InstallationOwnerMissingError(installation_owner)

    installation_id = installations_saved[installation_owner]
    r = requests.post(f'https://api.github.com/app/installations/{installation_id}/access_tokens', headers=headers, timeout=30)
    utils.handle_potential_request_error(r, 201)
    access_token_data = ujson.loads(r.content)

    # Drop the last character of 'expires_at' and append an explicit UTC offset
    # so fromisoformat() yields a timezone-aware datetime.
    # NOTE(review): assumes the value always ends in a single 'Z' - confirm.
    token_expiration_str = access_token_data['expires_at'][:-1]
    token_expiration = datetime.datetime.fromisoformat(f'{token_expiration_str}+00:00')

    # Never write the secret token itself to the log; log only the metadata.
    loggable_data = {field: value for field, value in access_token_data.items() if field != 'token'}
    log.info(f"Generated {installation_owner} access token: {loggable_data}")
    return access_token_data['token'], token_expiration.timestamp()
def access_token(installation_owner: str, min_time: int):
    """Return an installation access token for ``installation_owner``.

    A cached token is reused when it has more than ``min_time`` seconds left;
    otherwise a new one is generated and written to the cache.

    :param installation_owner: account login that the app is installed on.
    :param min_time: minimum remaining lifetime, in seconds, for a cached token
        to be reused. The same value is passed on as the minimum JWT lifetime
        when a new token has to be generated.
    :return: the access token (first element of the cached/generated entry).
    """
    token, time_remaining = get_cached_token(installation_owner, min_time)

    if token:
        log.info(f"Reusing {installation_owner} access token with {round(time_remaining / 60, 1)} mins remaining")
        return token[0]

    # Cache miss (or too close to expiry): mint a new token and remember it.
    fresh_token = generate_access_token(installation_owner, min_time)
    set_token_cache(installation_owner, fresh_token)
    return fresh_token[0]
def get_cached_token(key: str, min_time: int) -> tuple[list, int]:
    """Look up a cached token that still has more than ``min_time`` seconds left.

    The in-process ``tokens_local`` dict is checked first, then ``db.tokens``.
    A usable entry found in the database is copied into ``tokens_local``.

    :param key: cache key of the token.
    :param min_time: minimum remaining lifetime in seconds.
    :return: ``(entry, time_remaining)`` where ``entry[1]`` is the expiry
        timestamp, or ``([], 0)`` when no usable token is cached. On a hit,
        ``time_remaining`` is computed from ``time.time()`` and is a float.
    """
    if key in tokens_local:
        local_entry = tokens_local[key]
        time_remaining = local_entry[1] - time.time()

        if time_remaining > min_time:
            return local_entry, time_remaining

    # Local miss or stale local entry: fall back to the database.
    try:
        stored_entry = db.tokens.get(key, consistent_read=False)
    except db.DBKeyError:
        return [], 0

    # The expiry read from the database is normalised to an int.
    stored_entry[1] = int(stored_entry[1])
    time_remaining = stored_entry[1] - time.time()

    if time_remaining > min_time:
        tokens_local[key] = stored_entry
        return stored_entry, time_remaining

    return [], 0
def set_token_cache(key: str, token: Union[tuple, list]):
    """Store a token entry in both the in-process cache and the database.

    :param key: cache key to store the entry under.
    :param token: sequence whose element at index 1 is the numeric expiry
        timestamp; it is rounded to an integer before being stored. The
        caller's sequence is not modified (a list copy is stored).
    """
    cache_entry = [*token]
    cache_entry[1] = round(cache_entry[1])

    # The same list object goes to the local dict and to the database call.
    tokens_local[key] = cache_entry
    db.tokens.set(key, cache_entry)
class InstallationOwnerMissingError(Exception):
    """Raised when no app installation is known for the requested owner."""
# In-process token cache: maps a cache key to a list whose first element is
# the token and whose second element is its expiry timestamp.
tokens_local: dict[str, list] = {}
# Module logger; starts out as a placeholder object.
# NOTE(review): presumably replaced with a real logging.Logger by whichever
# module sets up logging - verify against the callers.
log: Union[logging.Logger, utils.LogPlaceholder] = utils.LogPlaceholder()
# When run as a script, request an access token for the 'Kataiser' installation
# (requiring at least 30 seconds of JWT lifetime); the result is discarded.
if __name__ == '__main__':
    generate_access_token('Kataiser', 30)