-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcheck_abandoned_records.py
161 lines (138 loc) · 6.02 KB
/
check_abandoned_records.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import json
import boto3
import dns.resolver
import gitlab
from datetime import datetime
# Load configuration from the EC2 (SSM) Parameter Store
# Transforms: /red-x/gitlab/token
# Into: {'gitlab': {'token': 'value'}}
def load_config(ssmPath):
    """Build a nested config dict from the SSM parameters under ``ssmPath``.

    Every parameter name is split on ``/``. Empty components and the
    ``red-x`` namespace component are dropped from the branch part of the
    path, the remaining components become nested dict keys, and the final
    component becomes the key that holds the parameter value.

    Args:
        ssmPath: Parameter Store path prefix to read recursively
            (the handler passes ``'/red-x/'``).

    Returns:
        A nested ``dict`` of decrypted parameter values.
    """
    ssm = boto3.client('ssm')

    # GetParametersByPath is a paginated API; a single call only returns the
    # first page, so walk every page or the config may be silently truncated.
    paginator = ssm.get_paginator('get_parameters_by_path')
    pages = paginator.paginate(
        Path=ssmPath,
        Recursive=True,
        WithDecryption=True,
    )

    config = {}
    for page in pages:
        for param in page['Parameters']:
            parts = param['Name'].split('/')
            leaf = parts[-1]
            if not leaf:
                # A name ending in '/' has no key to store the value under.
                continue
            branches = [p for p in parts[:-1] if p and p != 'red-x']

            node = config
            for key in branches:
                node = node.setdefault(key, {})
            # The leaf is identified by position, not by comparing names, so
            # a path such as /red-x/sns/sns nests correctly.
            node[leaf] = param['Value']
    return config
# Open or close GitLab issues based on the abandoned records discovered by
# red-x. Opens an issue in the configured project for each record with
# violations and closes any open issue whose record is no longer reported.
def notify_gitlab_issues(config, errors):
    """Synchronise open GitLab issues with the current set of violations.

    Issues are matched by title, ``"<record name> abandoned record"``, among
    the open issues labelled ``red-x`` and ``record``.

    Args:
        config: Nested config dict; reads ``config['gitlab']['endpoint']``,
            ``['token']`` and ``['project']``.
        errors: Mapping of record name -> list of violation strings.

    Returns:
        None.
    """
    gl = gitlab.Gitlab(config['gitlab']['endpoint'], config['gitlab']['token'], api_version=4)
    project = gl.projects.get(config['gitlab']['project'])

    # list() only returns the first page by default; ask for every page so
    # that existing issues beyond the first page are not filed again.
    issues = project.issues.list(labels=['red-x', 'record'], state='opened', all=True)

    # Group open issues by title so lookups are O(1) and duplicate titles
    # are all handled, not only the first match.
    open_by_title = {}
    for issue in issues:
        open_by_title.setdefault(issue.title, []).append(issue)

    expected_titles = set()
    for error in errors:
        title = f"{error} abandoned record"
        expected_titles.add(title)

        # This error already has an issue
        if title in open_by_title:
            print(f"ALREADY FILED! {error}! Skipping")
            continue

        # This error needs a new issue created
        error_json = json.dumps(errors[error], indent=1)
        print(f"FILING: {error}!")
        project.issues.create({
            'title': title,
            'description': f"```\n{error_json}\n```",
            'labels': ['red-x', 'record'],
        })

    # These issues no longer have a violation associated with them
    # and can be closed.
    for title, stale_issues in open_by_title.items():
        if title in expected_titles:
            continue
        for issue in stale_issues:
            print(f"CLOSING ISSUE: {title}")
            issue.notes.create({"body": "Subsequent runs of red-x no longer see this domain as an issue. Automatically closing ticket."})
            issue.state_event = "close"
            issue.save()
def eligible_cname(record):
    """Return True if the record's first value targets a watched service.

    A record qualifies when it has a non-empty ``ResourceRecords`` list and
    the ``Value`` of its first entry contains ``elasticbeanstalk.com`` or
    ``cloudfront.net``. The comparison is case-insensitive because DNS names
    are case-insensitive.

    Args:
        record: A resource record set dict.

    Returns:
        bool: True when the record should be checked, False otherwise
        (including when ``ResourceRecords`` is missing or empty).
    """
    resource_records = record.get('ResourceRecords')
    if not resource_records:
        return False
    target = (resource_records[0].get('Value') or '').lower()
    watched_domains = ('elasticbeanstalk.com', 'cloudfront.net')
    return any(domain in target for domain in watched_domains)
def eligible_alias(record):
    """Return True if the record is an alias to a watched service.

    A record qualifies when it has an ``AliasTarget`` whose ``DNSName``
    contains ``elasticbeanstalk.com`` or ``cloudfront.net``. The comparison
    is case-insensitive because DNS names are case-insensitive.

    Args:
        record: A resource record set dict.

    Returns:
        bool: True when the record should be checked, False otherwise
        (including when ``AliasTarget`` or its ``DNSName`` is missing).
    """
    alias_target = record.get('AliasTarget')
    if not alias_target:
        return False
    target = (alias_target.get('DNSName') or '').lower()
    watched_domains = ('elasticbeanstalk.com', 'cloudfront.net')
    return any(domain in target for domain in watched_domains)
# Publish a summary of the findings to the configured SNS topic
def notify_sns_topic(config, errors):
    """Send the collected record violations to ``config['sns']['topic']``.

    Nothing is published when ``errors`` is empty. Otherwise the violations
    are rendered as indented JSON, appended to a fixed introduction, and
    published as the ``default`` entry of a JSON-structured SNS message.

    Args:
        config: Nested config dict; reads ``config['sns']['topic']``.
        errors: Mapping of record name -> list of violation strings.

    Returns:
        None.
    """
    if not len(errors):
        print("No record errors, not sending SNS notification...")
        return

    intro = (
        "\n"
        "Red-X has run and found the following DNS records pointing to inactive elasticbeanstalk or cloudfront domains. You should take action to prevent domain hijacking!\n"
    )
    timestamp = str(datetime.now())
    client = boto3.client('sns')
    payload = {'default': intro + json.dumps(errors, indent=2)}

    client.publish(
        TargetArn=config['sns']['topic'],
        Subject=f"Red-X Record Errors @ {timestamp}",
        Message=json.dumps(payload),
        MessageStructure='json',
    )
def _list_zone_records(r53, zone_id):
    """Return every resource record set in the hosted zone ``zone_id``."""
    records = []
    # The paginator carries the full continuation state between pages
    # (name, type and set identifier), which hand-rolled paging on
    # name/type alone can get wrong for record sets sharing a name and type.
    paginator = r53.get_paginator('list_resource_record_sets')
    for page in paginator.paginate(HostedZoneId=zone_id):
        records.extend(page['ResourceRecordSets'])
    return records


def _check_record(resolver, record):
    """Return the list of violation strings for one eligible record."""
    violations = []
    if record['type'] == 'CNAME':
        violations.append(f"WARN: You should prefer A ALIAS over CNAME for {record['name']}")
    try:
        answer = resolver.query(record['value'])
    except dns.resolver.NXDOMAIN:
        violations.append(f"CRIT: {record['name']} points to non-existent beanstalk name: {record['value']}")
    except dns.exception.DNSException as exc:
        # Timeouts, empty answers, unreachable name servers, etc. Report the
        # record as unverified instead of aborting the whole run, which would
        # also suppress notifications for records already confirmed bad.
        violations.append(
            f"WARN: Could not verify {record['name']} -> {record['value']}: {type(exc).__name__}"
        )
    else:
        print(f"OK: {record['name']}: {', '.join(str(x) for x in answer)}")
    return violations


def handler(event, context):
    """Lambda entry point: find DNS records pointing at abandoned targets.

    Loads configuration from the Parameter Store, lists every record in the
    configured Route 53 hosted zone, keeps those that point at
    elasticbeanstalk.com or cloudfront.net names, resolves each target, and
    reports the violations to GitLab and/or SNS when those sections are
    present in the configuration.

    Args:
        event: Lambda event payload (unused).
        context: Lambda context object (unused).

    Returns:
        dict with a ``message`` string and ``errors``, a mapping of record
        name -> list of violation strings.
    """
    config = load_config('/red-x/')
    r53 = boto3.client('route53')
    zone_id = config['route53']['zoneId']

    # Fetch all records in the requested hosted zone
    records = _list_zone_records(r53, zone_id)

    # Discard everything except beanstalk/cloudfront-related records
    eligible_cnames = [
        {'name': x['Name'], 'value': x['ResourceRecords'][0]['Value'], 'type': x['Type']}
        for x in records if eligible_cname(x)
    ]
    eligible_aliases = [
        {'name': x['Name'], 'value': x['AliasTarget']['DNSName'], 'type': x['Type']}
        for x in records if eligible_alias(x)
    ]
    eligible_records = eligible_cnames + eligible_aliases

    # Use a resolver built from the system configuration and perform the
    # lookups through this instance so the 5 second timeout actually applies.
    resolver = dns.resolver.Resolver()
    resolver.timeout = 5

    violating_records = {}
    # For each record pointing to beanstalk/cloudfront
    for record in eligible_records:
        violations = _check_record(resolver, record)
        if not violations:
            continue
        # Several record sets may share one name (e.g. routing policies);
        # merge their findings rather than letting the last one overwrite.
        merged = violating_records.setdefault(record['name'], [])
        for violation in violations:
            if violation not in merged:
                merged.append(violation)

    # Open or close GitLab issues for these abandoned records.
    if 'gitlab' in config:
        notify_gitlab_issues(config, violating_records)

    # Notify an SNS topic of all abandoned records.
    if 'sns' in config:
        notify_sns_topic(config, violating_records)

    return {
        "message": "Completed checking for abandoned records.",
        "errors": violating_records
    }