-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcheck_delegations.py
171 lines (150 loc) · 6.2 KB
/
check_delegations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import json
import boto3
import dns.resolver
import gitlab
from datetime import datetime
# Load configuration from the EC2 Parameter Store
# Transforms: /red-x/gitlab/token
# Into: {'red-x': {'gitlab': {'token': 'value'}}}
def load_config(ssmPath):
ssm = boto3.client('ssm')
resp = ssm.get_parameters_by_path(
Path = ssmPath,
Recursive=True,
WithDecryption=True
)
config = {}
for param in resp['Parameters']:
path = param['Name'].split('/')
current_level = config
for level in path:
if(level == '' or level == 'red-x'):
continue
if(level not in current_level):
current_level[level] = {}
if(level == path[-1]):
current_level[level] = param['Value']
else:
current_level = current_level[level]
return config
# Open or close GitLab issues based on delegation errors discovered by red-x.
# Opens an issue in the configured project for delegation errors and closes
# any open issues when it no longer identifies that error.
def notify_gitlab_issues(config, errors):
# Load up all open issues in the configured project with label 'red-x'.
gl = gitlab.Gitlab(config['gitlab']['endpoint'], config['gitlab']['token'], api_version=4)
project = gl.projects.get(config['gitlab']['project'])
issues = project.issues.list(labels=['red-x', 'delegation'], state='opened')
zones_with_issues = [i.title for i in issues]
for error in errors:
# This error already has an issue
if f"{error} delegation error" in zones_with_issues:
print(f"ALREADY FILED! {error}! Skipping")
zones_with_issues.remove(f"{error} delegation error")
# This error needs a new issue created
else:
error_json = json.dumps(errors[error], indent=1)
print(f"FILING: {error}!")
issue = project.issues.create({'title': f"{error} delegation error",
'description': f"""```
{error_json}
```""",
'labels': ['red-x', 'delegation']})
# These issues no longer have a delegation error associated with them
# and can be closed.
for leftover in zones_with_issues:
print(f"CLOSING ISSUE: {leftover}")
issue = [x for x in issues if x.title == leftover][0]
issue.notes.create({"body": "Subsequent runs of red-x no longer see this delegation as an issue. Automatically closing ticket."})
issue.state_event = "close"
issue.save()
# Send a summary of results to a configured SNS topic
def notify_sns_topic(config, errors):
if len(errors) == 0:
print("No delegation errors, not sending SNS notification...")
return
notification_time = str(datetime.now())
sns = boto3.client('sns')
error_text = json.dumps(errors, indent=2)
sns.publish(
TargetArn=config['sns']['topic'],
Subject=f"Red-X Delegation Errors @ {notification_time}",
Message=json.dumps({'default': f"""
Red-X has run and found the following abandoned or misconfigured delegations. You should take action to prevent zone hijacking!
""" + error_text}),
MessageStructure='json'
)
def handler(event, context):
config = load_config('/red-x/')
r53 = boto3.client('route53')
zone_id = config['route53']['zoneId']
records = []
nextName = None
nextType = None
# Fetch all records in the requested hosted zone
while True:
if nextName and nextType:
response = r53.list_resource_record_sets(
HostedZoneId = zone_id,
StartRecordName = nextName,
StartRecordType = nextType
)
else:
response = r53.list_resource_record_sets(
HostedZoneId = zone_id
)
records = records + response['ResourceRecordSets']
if 'NextRecordName' in response and 'NextRecordType' in response:
nextName = response['NextRecordName']
nextType = response['NextRecordType']
else:
break
# Discard everything except NS records
delegations = [x for x in records if x['Type'] == 'NS']
delegation_errors = {}
resolver = dns.resolver.Resolver(configure=False)
resolver.timeout = 5
# For each delegated zone
for delegation in delegations:
zone = delegation['Name']
nameservers = [d['Value'] for d in delegation["ResourceRecords"]]
# For each nameserver in the delegation
for ns in nameservers:
resolver.nameservers = [ns]
try:
# Query the nameserver for our zone
answer = dns.resolver.query(zone, 'NS')
found = []
for s in answer:
found = found + [s.to_text()]
# If the nameserver we queried didn't return the expected results,
# the domain may have been hijacked (or just misconfigured).
if set(nameservers) != set(found):
if zone not in delegation_errors:
delegation_errors[zone] = []
delegation_errors[zone].append({
"error": "NS Mismatch",
"source": ns,
"found": found,
"expected": nameservers
})
# If the nameserver doesn't know about this zone, the delegation
# may be abandoned.
except dns.resolver.NoNameservers:
if zone not in delegation_errors:
delegation_errors[zone] = []
delegation_errors[zone].append({
"zone": zone,
"source": ns,
"error": "Unreachable nameservers or no delegation"
})
# Open or close GitLab issues for these delegation errors.
if('gitlab' in config):
notify_gitlab_issues(config, delegation_errors)
# Notify an SNS topic of all delegation errors.
if('sns' in config):
notify_sns_topic(config, delegation_errors)
return {
"message": "Completed checking for abandoned delegations.",
"errors": delegation_errors
}