-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecording.py
99 lines (79 loc) · 2.83 KB
/
recording.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import sys
import io
from contextlib import redirect_stdout, contextmanager
from collections import namedtuple, defaultdict
import re
import numpy as np
import pandas as pd
# One parsed progress record. ``step`` is the step number reported on the line
# (``None`` for records that carry no step), ``name`` labels the quantity and
# ``value`` holds its parsed number.
# The typename matches the bound name so that ``repr`` output is accurate and
# instances can be located by name when pickled.
Event = namedtuple("Event", ("step", "name", "value"))
class OperatorScoreParser:
    """Parse progress lines that report an operator score for a step.

    A matching line starts with whitespace, followed by ``Step <n> |``, an
    operator label, a numeric score and trailing whitespace.
    """

    # Raw strings: the regex escapes (``\s``, ``\d``, ``\|``) are not valid
    # Python string escapes. The hyphen inside the score class is escaped so
    # that it is a literal minus sign instead of the range ``+`` .. ``e``,
    # which would also accept letters and punctuation.
    # NOTE(review): the operator class contains ``-`` as well and is greedy,
    # so a minus sign directly in front of the score is consumed by the
    # operator label rather than the score -- confirm scores are never negative.
    pattern = re.compile(
        r"\s+Step\s+(?P<step>\d+)\s+\|"
        r"(?P<operator>[\sa-zA-Z\-_()]+)"
        r"(?P<score>[\d.+\-eE]+)\s+"
    )

    def parse(self, line):
        """Extract the step, operator label and score from ``line``.

        Args:
            line: Text to match against ``pattern`` (anchored at its start).

        Returns:
            A one-element list ``[Event(step, name, value)]`` with ``step`` as
            ``int``, ``name`` stripped of surrounding whitespace and ``value``
            as ``float``; ``None`` if the line does not match or the score
            text is not a valid number.
        """
        match = self.pattern.match(line)
        if match is None:
            return None

        # The score class admits strings such as "e" or "1.2.3" that are not
        # numbers; treat those lines as unrecognized instead of raising.
        try:
            value = float(match.group("score"))
        except ValueError:
            return None

        step = int(match.group("step"))
        name = match.group("operator").strip()
        return [Event(step, name, value)]
class PyramidLevelParser:
    """Parse progress lines that announce a pyramid level and its size.

    A matching line starts with whitespace, followed by
    ``Pyramid level <n> (<width> x <height>)`` and trailing whitespace.
    """

    # Raw strings: the regex escapes (``\s``, ``\d``, ``\(``) are not valid
    # Python string escapes and trigger a warning in a normal string literal.
    pattern = re.compile(
        r"\s+Pyramid level\s+(?P<level>\d+)"
        r"\s+\((?P<width>\d+)\s+x\s+(?P<height>\d+)\)\s+"
    )

    # Names of the regex groups, also used as the names of the emitted events.
    _names = ("level", "width", "height")

    def parse(self, line):
        """Extract level, width and height from ``line``.

        Args:
            line: Text to match against ``pattern`` (anchored at its start).

        Returns:
            A list of three events named ``"level"``, ``"width"`` and
            ``"height"`` with integer values and ``step`` set to ``None``, or
            ``None`` if the line does not match.
        """
        match = self.pattern.match(line)
        if match is None:
            return None

        # These records carry no step number of their own.
        step = None
        return [Event(step, name, int(match.group(name))) for name in self._names]
class ImageOptimizerProgressRecorder(io.StringIO):
    """File-like object that records progress events from text written to it.

    Every string passed to ``write`` is offered to the progress parsers and
    recognized lines are stored as events. Unless ``quiet`` is set, the text
    is also echoed to the interpreter's original standard output. The text
    itself is not stored in the underlying ``StringIO`` buffer.
    """

    def __init__(self, *args, quiet=False, **kwargs):
        """Create a recorder.

        Args:
            *args: Forwarded to ``io.StringIO``.
            quiet: If true, written text is parsed but not echoed.
            **kwargs: Forwarded to ``io.StringIO``.
        """
        super().__init__(*args, **kwargs)
        self.quiet = quiet
        self._parsers = (OperatorScoreParser(), PyramidLevelParser())
        self._events = []

    def write(self, s):
        """Parse ``s``, echo it unless quiet, and return ``len(s)``."""
        self.parse(s)
        # Echo to ``sys.__stdout__`` rather than ``sys.stdout``: the latter may
        # currently be redirected to this very object. ``sys.__stdout__`` can
        # be ``None`` when no console is attached, in which case nothing is
        # echoed.
        if not self.quiet and sys.__stdout__ is not None:
            sys.__stdout__.write(s)
        # File-like ``write`` methods report the number of characters handled.
        return len(s)

    def parse(self, s):
        """Store the events of the first parser that recognizes ``s``."""
        for parser in self._parsers:
            events = parser.parse(s)
            if events is not None:
                self._events.extend(events)
                break

    def extract(self, filler=np.nan, dropna=False):
        """Return the recorded events as a ``pandas.DataFrame``.

        Each distinct event name becomes a column and the index, named
        ``"step"``, runs from 0 to the largest row used. An event with a step
        number is placed at that step plus the current offset; an event
        without one reuses the most recent step. Each ``"level"`` event adds
        the last step seen to the offset and restarts the step at 0, so steps
        that restart per level are laid out one level after another.

        Args:
            filler: Value used for cells without a recorded event.
            dropna: If true, drop rows in which every column is missing.

        Returns:
            The assembled frame; an empty frame with a ``"step"`` index if no
            events were recorded.
        """
        if not self._events:
            return pd.DataFrame(index=pd.Index([], dtype="int64", name="step"))

        step_offset = 0
        step = 0
        data = defaultdict(list)
        for event in self._events:
            if event.name == "level":
                # Accumulate rather than overwrite, otherwise the rows of a
                # third and later level overlap those of earlier levels.
                step_offset += step
                step = 0
            if event.step is not None:
                step = event.step
            data[event.name].append((step + step_offset, event.value))

        # Use the true maximum over all rows instead of assuming that the
        # last entry of each column holds the largest row.
        num_rows = max(row for entries in data.values() for row, _ in entries) + 1

        def fill(rows, values):
            # Column of ``filler`` with the recorded values put at their rows.
            column = np.full((num_rows,), filler)
            np.put(column, rows, values)
            return column

        columns = {name: fill(*zip(*entries)) for name, entries in data.items()}
        columns["step"] = range(num_rows)
        df = pd.DataFrame.from_dict(columns).set_index("step")
        if dropna:
            df = df.dropna(axis="index", how="all")
        return df
@contextmanager
def record_nst(quiet=False):
    """Redirect standard output to a fresh progress recorder and yield it.

    Args:
        quiet: Passed to ``ImageOptimizerProgressRecorder``.

    Yields:
        The recorder that receives everything written to standard output
        while the context is active.
    """
    progress = ImageOptimizerProgressRecorder(quiet=quiet)
    redirection = redirect_stdout(progress)
    with redirection:
        yield progress