-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprism_dev.py
435 lines (345 loc) · 15 KB
/
prism_dev.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Sistemi Corporation, copyright, all rights reserved, 2019-2023
Martin Guthrie
"""
import os
import sys
import argparse
import jstyleson
import importlib
import time
from public.prism.api import ResultAPI
from public.prism.ResultBaseKeysV1 import ResultBaseKeysV1
from core.shared_state import SharedState
from prism_result_scan import scan_result_file
import logging
import logging.handlers as handlers
logger = logging.getLogger()
class attrdict(dict):
    """
    Attribute Dictionary.

    Enables getting/setting/deleting dictionary keys via attributes.
    Getting/deleting a non-existent key via attribute raises `AttributeError`.
    Objects are passed to `__convert` before `dict.__setitem__` is called;
    this also holds for values added through `update()` and `setdefault()`.

    This class rebinds `__setattr__` to call `dict.__setitem__`. Attributes
    will not be set on the object, but will be added as keys to the dictionary.
    This prevents overwriting access to built-in attributes. Since we defined
    `__getattr__` but left `__getattribute__` alone, built-in attributes will
    be returned before `__getattr__` is called. Be careful::

        >>> a = attrdict()
        >>> a['key'] = 'value'
        >>> a.key
        'value'
        >>> a['keys'] = 'oops'
        >>> a.keys
        <built-in method keys of attrdict object at 0xabcdef123456>

    Use `'key' in a`, not `hasattr(a, 'key')`, as a consequence of the above.
    """

    def __init__(self, *args, **kwargs):
        # We trust the dict to init itself better than we can.
        dict.__init__(self, *args, **kwargs)
        # Because of that, we do duplicate work, but it's worth it: every
        # value is re-stored through __setitem__ so nested dicts get converted.
        # Only existing keys are re-assigned, so the dict does not change size
        # while it is being iterated.
        for k, v in self.items():
            self.__setitem__(k, v)

    def __getattr__(self, k):
        """Return the value stored under key `k`; raise AttributeError if absent."""
        try:
            return dict.__getitem__(self, k)
        except KeyError:
            # Maintain consistent syntactical behaviour; `from None` hides the
            # internal KeyError, which is an implementation detail.
            raise AttributeError(
                "'attrdict' object has no attribute '" + str(k) + "'"
            ) from None

    def __setitem__(self, k, v):
        """Store `v` under key `k` after converting nested dicts to attrdict."""
        dict.__setitem__(self, k, attrdict.__convert(v))

    # Attribute assignment is routed to item assignment (see class docstring).
    __setattr__ = __setitem__

    def __delattr__(self, k):
        """Delete key `k`; raise AttributeError if absent."""
        try:
            dict.__delitem__(self, k)
        except KeyError:
            raise AttributeError(
                "'attrdict' object has no attribute '" + str(k) + "'"
            ) from None

    def update(self, *args, **kwargs):
        """
        Same arguments as `dict.update`, but every value goes through
        `__setitem__` so nested dicts are converted. The built-in
        `dict.update` does not call an overridden `__setitem__`.
        """
        for k, v in dict(*args, **kwargs).items():
            self[k] = v

    def setdefault(self, k, default=None):
        """
        Same contract as `dict.setdefault`, but a newly inserted default is
        converted first; the stored (converted) value is returned.
        """
        if k not in self:
            self[k] = default
        return self[k]

    @staticmethod
    def __convert(o):
        """
        Recursively convert `dict` objects in `dict`, `list`, `set`, and
        `tuple` objects to `attrdict` objects.

        Any other object is returned unchanged.
        """
        if isinstance(o, dict):
            o = attrdict(o)
        elif isinstance(o, list):
            o = list(attrdict.__convert(v) for v in o)
        elif isinstance(o, set):
            o = set(attrdict.__convert(v) for v in o)
        elif isinstance(o, tuple):
            o = tuple(attrdict.__convert(v) for v in o)
        return o
class ChanCon(object):
    """
    Channel controller for console (development) runs of a script.

    Holds the parsed script, the shared driver state and the result record.
    `run()` discovers the hardware drivers listed in the script, executes the
    script's test items and publishes the result record. Test classes are
    constructed with this object as their `controller`; presumably they call
    `item_start()` / `item_end()` / `log_bullet()` back on it (confirm against
    the test base class).
    """

    def __init__(self, num=0, script=None, shared_state=None, script_filename="UNKNOWN", operator="UNKNOWN"):
        """
        :param num: channel number; used in the logger name and passed to
                    test classes as `chan`
        :param script: parsed script dict (must be a dict; `None` is not usable)
        :param shared_state: object that receives discovered drivers via
                             `add_drivers()`
        :param script_filename: passed to the result record constructor
        :param operator: stored on the instance
        """
        self.logger = logging.getLogger("{}.{}".format(__class__.__name__, num))
        self.ch = num
        self.script = script
        self.shared_state = shared_state
        self.operator = operator
        self.num_channels = 0

        self.record = ResultBaseKeysV1(0, "prismdev", script_filename)
        self.record.record_info_set(script.get("info", {}))
        self.record.record_record_meta_init()

    def item_start(self):
        """
        Create a record entry for the current item and return its context.

        :return: attrdict with keys "item", "options" and "record"
        """
        d = {"item": self._item,
             # item dict from the script, ex {"id": "TST000", "enable": true, "args": {"min": 0, "max": 2}}
             "options": self._options,  # options dict from the script, ex { "fail_fast": false }
             "record": self.record,
             # TODO: add more stuff as needed
             }
        self.record.record_item_create(d["item"]["id"])
        return attrdict(d)

    def item_end(self, item_result_state=ResultAPI.RECORD_RESULT_PASS, _next=None):
        """
        Record the result of the current item and close it in the record.

        :param item_result_state: a single result state, or a list of states
                                  in which case the first non-pass state wins
        :param _next: unused here
        """
        self.logger.debug("{}, {}".format(self._item["id"], item_result_state))

        if self.record.record_test_get_result() not in [ResultAPI.RECORD_RESULT_UNKNOWN]:
            # there must have been another early failure, either a timeout or crash...
            # bail on processing, assume its already been done...
            self.logger.warning("record_test_set_result already set... aborting")
            return

        # process a list of results, set the final state to the first non pass state
        if isinstance(item_result_state, list):
            _final = ResultAPI.RECORD_RESULT_PASS
            for result in item_result_state:
                # compare by value: identity (`is not`) only works by accident
                # when the constant happens to be the very same object
                if result != ResultAPI.RECORD_RESULT_PASS:
                    _final = result
                    break
            item_result_state = _final

        self.record.record_test_set_result(item_result_state)
        self.record.record_item_end()

    def log_bullet(self, text, ovrwrite_last_line=False):
        """
        Log a bullet line.

        :param text: text to log
        :param ovrwrite_last_line: accepted for interface compatibility, unused here
        """
        self.logger.info("BULLET: {}".format(text))

    @staticmethod
    def _discover_driver(hwdrv):
        """
        Import one HW driver module, instantiate its `HWDriver` and discover channels.

        :param hwdrv: module path string, or a list [module path, args]
        :return: (short name, driver instance, number of channels, driver type, drivers)
        """
        if isinstance(hwdrv, list):
            module_name, discover_args = hwdrv[0], (hwdrv[1],)
        else:
            module_name, discover_args = hwdrv, ()

        hwdrv_sname = module_name.split(".")[-1]
        hwdriver = getattr(importlib.import_module(module_name), "HWDriver")()
        _num_channels, driver_type, drivers = hwdriver.discover_channels(*discover_args)
        return hwdrv_sname, hwdriver, _num_channels, driver_type, drivers

    def _run_item(self, test_klass, item):
        """
        Make `item` the current item and call the method named `item["id"]`.

        :raises ValueError: when `test_klass` has no such method
        """
        self._item = item
        func = getattr(test_klass, item["id"], False)
        if not func:
            msg = "method {} is not in module {}".format(item["id"], test_klass)
            self.logger.error(msg)
            raise ValueError(msg)
        func()

    def run(self):
        """
        Discover HW drivers, run every test of the script, publish the record.

        :return: the value returned by the record's `record_publish()`
        :raises ValueError: on a driver error, a channel count mismatch between
                            drivers, no channels at all, or a missing item method
        """
        show_pass_fail = None

        # process HW drivers
        num_channels = -1
        for hwdrv in self.script["config"]["drivers"]:
            self.logger.info("HWDRV: {}".format(hwdrv))
            hwdrv_sname, hwdriver, _num_channels, driver_type, drivers = self._discover_driver(hwdrv)

            if _num_channels >= 0:  # add to shared state if all good
                # a driver reporting zero channels is registered as a shared resource
                shared = _num_channels == 0
                self.shared_state.add_drivers(driver_type, drivers, shared)

                # call the player function if exist, ignore result, but see logs
                if drivers:
                    player = drivers[0].get('play', None)
                    if player:
                        play = player()
                        while not play:  # retry once a second until it reports success
                            play = player()
                            self.logger.info("player: {}".format(play))
                            if not play:
                                time.sleep(1)

                    # the first driver offering a pass/fail indicator is used
                    if show_pass_fail is None:
                        show_pass_fail = drivers[0].get("show_pass_fail", None)
                        if show_pass_fail:
                            show_pass_fail(False, False, False)

            self.logger.info("{} - number channels {}".format(hwdrv_sname, _num_channels))
            if _num_channels == 0:
                # this HW DRV does not indicate number of channels, its a shared resource
                pass
            elif _num_channels < 0:
                raise ValueError('Error returned by HWDRV {}'.format(hwdrv_sname))
            elif num_channels == -1:
                num_channels = _num_channels
            elif num_channels != _num_channels:
                self.logger.error(
                    "{} - number channels {} does not match previous HWDRV".format(hwdriver, _num_channels))
                raise ValueError('Mismatch number of channels between HW Drivers')

        self.num_channels = num_channels
        self.logger.info("number channels {}".format(self.num_channels))

        if self.num_channels < 1:
            self.logger.error("Invalid number of channels, must be >0")
            raise ValueError('Invalid number of channels')

        for test in self.script["tests"]:
            fail_fast = self.script["config"].get("fail_fast", True)
            self._options = test["options"]
            if "fail_fast" in self._options:
                fail_fast = self._options["fail_fast"]

            self.logger.info("Module: {}, fail_fast: {}".format(test["module"], fail_fast))
            test_module = importlib.import_module(test["module"])
            # the test class is expected to be named like the last module path element
            klass = test["module"].split(".")[-1]
            self.logger.debug("class: {}".format(klass))
            test_module_klass = getattr(test_module, klass)
            test_klass = test_module_klass(controller=self, chan=self.ch, shared_state=self.shared_state)

            items = test["items"]
            last_index = len(items) - 1
            reached = -1  # index of the last item visited; stays -1 for an empty list
            for reached, item in enumerate(items):
                self.logger.info("ITEM: {}".format(item))
                if not item.get("enable", True):
                    continue

                self._run_item(test_klass, item)

                if fail_fast and self.record.record_meta_get_result() != ResultAPI.RECORD_RESULT_PASS:
                    break

            # run teardown (the last item) if the loop stopped before reaching it
            if reached < last_index:
                item = items[last_index]
                self.logger.info("ITEM: {}".format(item))
                if item.get("enable", True):
                    self._run_item(test_klass, item)
                else:
                    self.logger.error("teardown (last item) script should not be disabled")

            # a failed test stops the remaining tests as well
            if fail_fast and self.record.record_meta_get_result() != ResultAPI.RECORD_RESULT_PASS:
                break

        self.record.record_record_meta_fini()
        result_file = self.record.record_publish()

        if show_pass_fail is not None:
            # exactly one of pass / fail / other is signalled
            p = f = o = False
            final_result = self.record.record_meta_get_result()
            if final_result == ResultAPI.RECORD_RESULT_PASS:
                p = True
            elif final_result == ResultAPI.RECORD_RESULT_FAIL:
                f = True
            else:
                o = True
            show_pass_fail(p, f, o)

        return result_file
def setup_logging(log_file_name_prefix="log", level=logging.INFO, path="./log"):
    """
    Configure the root logger with a rotating file handler and a console handler.

    Rebinds the module-level `logger` to the root logger. Each call adds two
    more handlers, so call it once per process.

    :param log_file_name_prefix: log file name without extension; only the
                                 base name is used, ".log" is appended
    :param level: level set on the root logger (the file handler itself is
                  fixed at INFO)
    :param path: directory for the log file, created if missing
    """
    global logger
    logger = logging.getLogger()
    logger.setLevel(level)

    log_file_name_prefix = os.path.basename(log_file_name_prefix)
    # exist_ok avoids the race between an existence check and the creation
    os.makedirs(path, exist_ok=True)

    # Here we define our formatter
    FORMAT = "%(relativeCreated)5d %(filename)30s:%(lineno)4s - %(name)30s:%(funcName)20s() %(levelname)-5.5s : %(message)s"
    formatter = logging.Formatter(FORMAT)

    allLogHandler_filename = os.path.join(path, "".join([log_file_name_prefix, ".log"]))
    # rotate at 1 MiB, keep 4 backups
    allLogHandler = handlers.RotatingFileHandler(allLogHandler_filename, maxBytes=1024 * 1024, backupCount=4)
    allLogHandler.setLevel(logging.INFO)
    allLogHandler.setFormatter(formatter)

    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(formatter)

    logger.addHandler(allLogHandler)
    logger.addHandler(consoleHandler)
def read_json_file_to_dict(file):
    """
    Read a JSON file (comments allowed, parsed with jstyleson) into a dict.

    :param file: path of the file to read
    :return: (True, parsed result) on success;
             (False, message string) when the file does not exist;
             (False, exception) when reading or parsing fails
    """
    if not os.path.isfile(file):
        msg = "Unable to find json file %s" % file
        logger.error(msg)
        return False, msg

    try:
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        # The read is inside the try so I/O and decode errors are reported
        # through the same (False, error) contract as parse errors.
        with open(file, encoding="utf-8") as f:
            result_dict = jstyleson.loads(f.read())  # OK
    except Exception as e:
        # broad on purpose: any failure is handed back to the caller
        logger.error(e)
        return False, e

    return True, result_dict
def parse_args(argv=None):
    """
    Parse the command line.

    :param argv: list of argument strings to parse; None (the default) makes
                 argparse read sys.argv[1:], as before
    :return: dict with keys "script" (str) and "result_scan" (bool)

    argparse exits the process (SystemExit) on invalid or missing arguments.
    """
    epilog = """
Usage examples:
python3 prism_dev.py --script ./public/prism/scripts/example/prod_v0/prod_0.scr
python3 prism_dev.py --script ./public/prism/scripts/example/pybrd_v0/pybrd_0.scr
Sistemi Corporation, copyright, all rights reserved, 2019
"""
    parser = argparse.ArgumentParser(description='prism_dev',
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog=epilog)

    parser.add_argument("--script",
                        dest="script",
                        action="store",
                        required=True,
                        help="Path to script file to run")

    parser.add_argument("--result-scan",
                        dest="result_scan",
                        action="store_true",
                        help="Scan result file for correctness")

    args = parser.parse_args(argv)
    args_dict = vars(args)
    return args_dict
def script_validated(script):
    """
    Check that a parsed script has the sections this console runner needs.

    Every failed check is logged as an error. Checks, in order: an 'info'
    section with non-empty product/bom/lot/location within their length
    limits, an optional 'info.config' within its limit, a 'config' section
    with non-empty 'drivers', and no 'subs' section.

    :param script: parsed script dict
    :return: True when all checks pass, False at the first failed check
    """
    INFO_PRODUCT_LEN = 32
    INFO_BOM_LEN = 32
    INFO_LOT_LEN = 16
    INFO_LOCATION_LEN = 128
    INFO_CONFIG_LEN = 16

    info = script.get("info", False)
    if not info:
        logger.error("Script is missing 'info' section")
        return False

    # required 'info' fields and the maximum length allowed for each
    required_fields = (
        ("product", INFO_PRODUCT_LEN),
        ("bom", INFO_BOM_LEN),
        ("lot", INFO_LOT_LEN),
        ("location", INFO_LOCATION_LEN),
    )
    for field, max_len in required_fields:
        value = info.get(field, False)
        if not value:
            logger.error(f"Script 'info' section missing {field}")
            return False
        if len(value) > max_len:
            logger.error(f"Script 'info' section {field} exceeds max length {max_len}")
            return False

    # 'config' inside 'info' is optional, but length-limited when given
    info_config = info.get("config", False)
    if info_config and len(info_config) > INFO_CONFIG_LEN:
        logger.error(f"Script 'info' section config exceeds max length {INFO_CONFIG_LEN}")
        return False

    config_section = script.get("config", False)
    if not config_section:
        logger.error("Script is missing 'config' section")
        return False

    if not config_section.get("drivers", False):
        logger.error("Script is missing 'config.drivers' section")
        return False

    if script.get("subs", False):
        logger.error("'subs' are not supported in console development")
        logger.error("Use prism_subs.py to process script substitutions.")
        return False

    # TODO: add more stuff, check imports....

    logger.info("Script passed validation tests")
    return True
def _close_drivers(shared_state):
    """Call the optional "close" callable of every driver held by shared_state."""
    # copy first so the list handed out by shared_state is not mutated
    drivers = list(shared_state.get_drivers(0))
    drivers.extend(shared_state.get_drivers(None))
    for d in drivers:
        if d['obj'].get("close", False):
            d["obj"]["close"]()


def main():
    """
    Console entry point: read, validate and run one script.

    :return: process exit code, 0 on success, 1 when the script cannot be
             read or fails validation
    """
    setup_logging(log_file_name_prefix="dev", path="log")

    args = parse_args()
    logger.info("args: {}".format(args))
    if args is None:
        logger.error("Failed to parse args")
        return 1

    # read script
    success, script = read_json_file_to_dict(args["script"])
    if not success:
        # on failure the second element carries the error, not a script
        logger.error(script)
        return 1

    # validate script
    if not script_validated(script):
        logger.error("Script failed to validate")
        return 1

    shared_state = SharedState()
    con = ChanCon(0, script, shared_state, args["script"])

    try:
        result_file = con.run()
    finally:
        # close any drivers, also when the run raised, so hardware is released
        _close_drivers(shared_state)

    # TODO: publish shutdown

    if args["result_scan"]:
        logger.info(f"Running result record scan on {result_file}")
        scan_result_file(result_file)

    return 0
if __name__ == "__main__":
    # Run as a script: main()'s return value becomes the process exit status.
    sys.exit(main())