This repository has been archived by the owner on Nov 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy path: generator.py
executable file
·435 lines (381 loc) · 15 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
#!/usr/bin/env python3
from __future__ import annotations
import json
import logging
import os
import signal
import subprocess
from multiprocessing import Process, Queue
from os.path import join as pjoin
from pathlib import Path
from random import randint
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Generator, Optional, Union
from ccbuilder import Builder, PatchDB, get_compiler_info
from dead_instrumenter.instrumenter import instrument_program
import checker
import parsers
import utils
def run_csmith(csmith: str) -> str:
    """Generate random code with csmith.

    A random subset of csmith's optional features is toggled on each
    attempt; csmith is retried on non-zero exit, up to 10 times.

    Args:
        csmith (str): Path to executable or name in $PATH to csmith.

    Returns:
        str: csmith generated program.

    Raises:
        Exception: If csmith fails 10 times in a row.
    """
    # Feature toggles decided per attempt via a coin flip (--X vs --no-X).
    # The list itself is loop-invariant, so build it once.
    options = [
        "arrays",
        "bitfields",
        "checksum",
        "comma-operators",
        "compound-assignment",
        "consts",
        "divs",
        "embedded-assigns",
        "jumps",
        "longlong",
        "force-non-uniform-arrays",
        "math64",
        "muls",
        "packed-struct",
        "paranoid",
        "pointers",
        "structs",
        "inline-function",
        "return-structs",
        "arg-structs",
        "dangling-global-pointers",
    ]

    tries = 0
    while True:
        cmd = [
            csmith,
            "--no-unions",
            "--safe-math",
            "--no-argc",
            "--no-volatiles",
            "--no-volatile-pointers",
        ]
        for option in options:
            if randint(0, 1):
                cmd.append(f"--{option}")
            else:
                cmd.append(f"--no-{option}")

        # Merge stderr into stdout so diagnostics don't clutter the terminal.
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        if result.returncode == 0:
            return result.stdout.decode("utf-8")
        tries += 1
        if tries > 10:
            raise Exception("CSmith failed 10 times in a row!")
def generate_file(
    config: utils.NestedNamespace, additional_flags: str
) -> tuple[str, str]:
    """Generate an instrumented csmith program.

    Loops until a candidate is within the configured size bounds and
    passes the sanity check, then instruments it.

    Args:
        config (utils.NestedNamespace): THE config
        additional_flags (str): Additional flags to use when
            compiling the program when checking.

    Returns:
        tuple[str, str]: Marker prefix and instrumented code.
    """
    additional_flags += f" -I {config.csmith.include_path}"
    while True:
        try:
            logging.debug("Generating new candidate...")
            candidate = run_csmith(config.csmith.executable)
            # Reject candidates outside the configured size window.
            if not (config.csmith.min_size <= len(candidate) <= config.csmith.max_size):
                continue
            with NamedTemporaryFile(suffix=".c") as ntf:
                with open(ntf.name, "w") as f:
                    print(candidate, file=f)
                logging.debug("Checking if program is sane...")
                if not checker.sanitize(
                    config.gcc.sane_version,
                    config.llvm.sane_version,
                    config.ccomp,
                    Path(ntf.name),
                    additional_flags,
                ):
                    continue
                logging.debug("Instrumenting candidate...")
                marker_prefix = instrument_program(
                    Path(ntf.name), [f"-I{config.csmith.include_path}"]
                )
                # instrument_program rewrites the temp file in place, so read
                # back the *instrumented* code rather than returning the
                # original candidate.
                with open(ntf.name, "r") as f:
                    return marker_prefix, f.read()
        except subprocess.TimeoutExpired:
            # A tool timed out on this candidate; just try a fresh one.
            pass
class CSmithCaseGenerator:
    """Finds "interesting" compiler test cases by generating random csmith
    programs and comparing which instrumentation markers each compiler
    setting eliminates.
    """

    def __init__(
        self,
        config: utils.NestedNamespace,
        patchdb: PatchDB,
        cores: Optional[int] = None,
    ):
        """
        Args:
            config (utils.NestedNamespace): THE config.
            patchdb (PatchDB): Patch database used by the compiler builder.
            cores (Optional[int]): How many cores the builder may use.
        """
        self.config: utils.NestedNamespace = config
        _, llvm_repo = get_compiler_info("llvm", Path(config.repodir))
        _, gcc_repo = get_compiler_info("gcc", Path(config.repodir))
        self.builder: Builder = Builder(
            Path(config.cachedir),
            gcc_repo,
            llvm_repo,
            patchdb,
            cores,
            logdir=Path(config.logdir),
        )
        self.chkr: checker.Checker = checker.Checker(config, self.builder)
        # Worker processes spawned by parallel_interesting_case; cleaned up
        # via terminate_processes.
        self.procs: list[Process] = []
        # Number of candidates tried during the last generate_interesting_case.
        self.try_counter: int = 0

    def generate_interesting_case(self, scenario: utils.Scenario) -> utils.Case:
        """Generate a case which is interesting i.e. has one compiler which does
        not eliminate a marker (from the target settings) and at least one from
        the attacker settings which does.

        Args:
            scenario (utils.Scenario): Which compilers to compare.

        Returns:
            utils.Case: Interesting case.
        """
        # Because the resulting code will be of csmith origin, we have to add
        # the csmith include path to all settings
        csmith_include_flag = f"-I{self.config.csmith.include_path}"
        scenario.add_flags([csmith_include_flag])

        self.try_counter = 0
        while True:
            self.try_counter += 1
            logging.debug("Generating new candidate...")
            marker_prefix, candidate_code = generate_file(self.config, "")

            # Find alive markers
            logging.debug("Getting alive markers...")
            try:
                target_alive_marker_list = [
                    (
                        tt,
                        utils.find_alive_markers(
                            candidate_code, tt, marker_prefix, self.builder
                        ),
                    )
                    for tt in scenario.target_settings
                ]

                tester_alive_marker_list = [
                    (
                        tt,
                        utils.find_alive_markers(
                            candidate_code, tt, marker_prefix, self.builder
                        ),
                    )
                    for tt in scenario.attacker_settings
                ]
            except utils.CompileError:
                # Candidate doesn't compile for some setting; try a new one.
                continue

            # Union of markers alive in at least one target setting.
            target_alive_markers = set()
            for _, marker_set in target_alive_marker_list:
                target_alive_markers.update(marker_set)

            # Extract reduce cases
            logging.debug("Extracting reduce cases...")
            for marker in target_alive_markers:
                good: list[utils.CompilerSetting] = []
                for good_setting, good_alive_markers in tester_alive_marker_list:
                    if (
                        marker not in good_alive_markers
                    ):  # i.e. the setting eliminated the call
                        good.append(good_setting)

                # Find bad cases
                if len(good) > 0:
                    good_opt_levels = [gs.opt_level for gs in good]
                    for bad_setting, bad_alive_markers in target_alive_marker_list:
                        # XXX: Here you can enable inter-opt_level comparison!
                        if (
                            marker in bad_alive_markers
                            and bad_setting.opt_level in good_opt_levels
                        ):  # i.e. the setting didn't eliminate the call
                            # Create reduce case
                            case = utils.Case(
                                code=candidate_code,
                                marker=marker,
                                bad_setting=bad_setting,
                                good_settings=good,
                                scenario=scenario,
                                reduced_code=None,
                                bisection=None,
                                path=None,
                            )
                            # TODO: Optimize interestingness test and document behaviour
                            try:
                                if self.chkr.is_interesting(case):
                                    logging.info(
                                        f"Try {self.try_counter}: Found case! LENGTH: {len(candidate_code)}"
                                    )
                                    return case
                            except utils.CompileError:
                                continue
            else:
                # for/else: no case was returned for any marker of this
                # candidate; move on to the next candidate.
                logging.debug(
                    f"Try {self.try_counter}: Found no case. Onto the next one!"
                )

    def _wrapper_interesting(self, queue: Queue[str], scenario: utils.Scenario) -> None:
        """Wrapper for generate_interesting_case for easier use
        with python multiprocessing.

        Args:
            queue (Queue): The multiprocessing queue to do IPC with.
            scenario (utils.Scenario): Scenario.
        """
        logging.info("Starting worker...")
        while True:
            case = self.generate_interesting_case(scenario)
            # Cases are serialized to JSON so they can cross the process
            # boundary through the queue.
            queue.put(json.dumps(case.to_jsonable_dict()))

    def parallel_interesting_case_file(
        self,
        config: utils.NestedNamespace,
        scenario: utils.Scenario,
        processes: int,
        output_dir: os.PathLike[str],
        start_stop: Optional[bool] = False,
    ) -> Generator[Path, None, None]:
        """Generate interesting cases in parallel and save each to a file.

        WARNING: If you use this method, you have to call `terminate_processes`

        Args:
            config (utils.NestedNamespace): THE config.
            scenario (utils.Scenario): Scenario.
            processes (int): Amount of jobs.
            output_dir (os.PathLike): Directory where to output the found cases.
            start_stop (Optional[bool]): Whether or not stop the processes when
                finding a case. This is useful when running a pipeline and thus
                the processing power is needed somewhere else.

        Returns:
            Generator[Path, None, None]: Interesting case generator giving paths.
        """
        gen = self.parallel_interesting_case(config, scenario, processes, start_stop)

        counter = 0
        while True:
            case = next(gen)
            # Absolute value keeps the hash filename-safe (no leading '-').
            h = abs(hash(str(case)))
            path = Path(pjoin(output_dir, f"case_{counter:08}-{h:019}.tar"))
            # NOTE: was a plain string missing the f-prefix, which logged the
            # literal text "{path}".
            logging.debug(f"Writing case to {path}...")
            case.to_file(path)
            yield path
            counter += 1

    def parallel_interesting_case(
        self,
        config: utils.NestedNamespace,
        scenario: utils.Scenario,
        processes: int,
        start_stop: Optional[bool] = False,
    ) -> Generator[utils.Case, None, None]:
        """Generate interesting cases in parallel

        WARNING: If you use this method, you have to call `terminate_processes`

        Args:
            config (utils.NestedNamespace): THE config.
            scenario (utils.Scenario): Scenario.
            processes (int): Amount of jobs.
            start_stop (Optional[bool]): Whether or not stop the processes when
                finding a case. This is useful when running a pipeline and thus
                the processing power is needed somewhere else.

        Returns:
            Generator[utils.Case, None, None]: Interesting case generator giving Cases.
        """
        queue: Queue[str] = Queue()

        # Create processes
        self.procs = [
            Process(
                target=self._wrapper_interesting,
                args=(queue, scenario),
            )
            for _ in range(processes)
        ]

        # Start processes
        for p in self.procs:
            # Daemonized so workers die with the parent if it exits uncleanly.
            p.daemon = True
            p.start()

        # read queue
        while True:
            # TODO: handle process failure
            case_str: str = queue.get()
            case = utils.Case.from_jsonable_dict(config, json.loads(case_str))
            if start_stop:
                # Send processes to "sleep"
                logging.debug("Stopping workers...")
                for p in self.procs:
                    if p.pid is None:
                        continue
                    os.kill(p.pid, signal.SIGSTOP)
            yield case
            if start_stop:
                logging.debug("Restarting workers...")
                # Awake processes again for further search
                for p in self.procs:
                    if p.pid is None:
                        continue
                    os.kill(p.pid, signal.SIGCONT)

    def terminate_processes(self) -> None:
        """Wake up (in case they were SIGSTOPped) and terminate all workers."""
        for p in self.procs:
            if p.pid is None:
                continue
            # SIGCONT first: a SIGSTOPped process cannot act on terminate().
            os.kill(p.pid, signal.SIGCONT)
            p.terminate()
if __name__ == "__main__":
    config, args = utils.get_config_and_parser(parsers.generator_parser())

    cores = args.cores

    patchdb = PatchDB(Path(config.patchdb))
    case_generator = CSmithCaseGenerator(config, patchdb, cores)

    if args.interesting:
        # A scenario either comes from a file or is assembled from the
        # --targets / --additional-compilers flags.
        scenario = utils.Scenario([], [])
        if args.scenario:
            scenario = utils.Scenario.from_file(config, Path(args.scenario))

        if not args.scenario and args.targets is None:
            print(
                "--targets is required for --interesting if you don't specify a scenario"
            )
            exit(1)
        elif args.targets:
            target_settings = utils.get_compiler_settings(
                config, args.targets, default_opt_levels=args.targets_default_opt_levels
            )
            scenario.target_settings = target_settings

        if not args.scenario and args.additional_compilers is None:
            print(
                "--additional-compilers is required for --interesting if you don't specify a scenario"
            )
            exit(1)
        elif args.additional_compilers:
            additional_compilers = utils.get_compiler_settings(
                config,
                args.additional_compilers,
                default_opt_levels=args.additional_compilers_default_opt_levels,
            )
            scenario.attacker_settings = additional_compilers

        if args.output_directory is None:
            print("Missing output directory!")
            exit(1)
        else:
            output_dir = os.path.abspath(args.output_directory)
            os.makedirs(output_dir, exist_ok=True)

        if args.parallel is not None:
            # amount_cases == 0 means "run forever".
            amount_cases = args.amount if args.amount is not None else 0
            amount_processes = max(1, args.parallel)
            gen = case_generator.parallel_interesting_case_file(
                config=config,
                scenario=scenario,
                processes=amount_processes,
                output_dir=output_dir,
                start_stop=False,
            )
            if amount_cases == 0:
                while True:
                    print(next(gen))
            else:
                for _ in range(amount_cases):
                    print(next(gen))
        else:
            print(case_generator.generate_interesting_case(scenario))
    else:
        # TODO
        print("Not implemented yet")

    # Clean up any spawned worker processes (a no-op when none were started).
    case_generator.terminate_processes()