Skip to content

Commit

Permalink
Test(s) added
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Feb 5, 2025
1 parent 8cfb79a commit 68ef614
Showing 1 changed file with 91 additions and 4 deletions.
95 changes: 91 additions & 4 deletions tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,31 @@ async def harness(mod_flow, mod_scheduler, mod_start):
yield schd, data


@pytest.fixture(scope='module')
async def harness_prune(mod_flow, mod_scheduler, mod_start):
flow_def = {
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'graph': {
'R1': 'foo => bar'
}
},
'runtime': {
'BAR': {},
'bar': {
'inherit': 'BAR'
}
}
}
id_: str = mod_flow(flow_def)
schd: 'Scheduler' = mod_scheduler(id_)
async with mod_start(schd):
await schd.update_data_structure()
data = schd.data_store_mgr.data[schd.data_store_mgr.workflow_id]
yield schd, data

def collect_states(data, node_type):
return [
t.state
Expand Down Expand Up @@ -313,12 +338,74 @@ async def test_update_data_structure(harness):
assert TASK_STATUS_FAILED in set(collect_states(data, FAMILY_PROXIES))
# state totals changed
assert TASK_STATUS_FAILED in data[WORKFLOW].state_totals
# Shows pruning worked
# TODO: fixme
# https://github.com/cylc/cylc-flow/issues/4175#issuecomment-1025666413
# assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 1


async def test_prune_data_store(harness_prune):
"""Test prune_data_store. This method will expand and reduce the data-store
to invoke pruning."""
schd, data = harness_prune
w_id = schd.data_store_mgr.workflow_id
schd.data_store_mgr.data[w_id] = data
schd.pool.hold_tasks(['*'])
await schd.update_data_structure()
assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 2
# Window size reduction to invoke pruning
schd.data_store_mgr.set_graph_window_extent(0)
schd.data_store_mgr.update_data_structure()
assert len({
t.is_held
for t in data[TASK_PROXIES].values()
if t.graph_depth == 0
}) == 1

# Test rapid addition and removal
assert len({
f.name
for f in schd.data_store_mgr.added[FAMILY_PROXIES].values()
if f.name == 'BAR'
}) == 0
for itask in schd.pool.get_tasks():
schd.pool.spawn_on_output(itask, TASK_OUTPUT_SUCCEEDED)
assert len({
f.name
for f in schd.data_store_mgr.added[FAMILY_PROXIES].values()
if f.name == 'BAR'
}) == 1
for itask in schd.pool.get_tasks():
if itask.tdef.name == 'bar':
schd.pool.remove(itask, 'Test removal')
schd.data_store_mgr.update_data_structure()
assert len({
f.name
for f in schd.data_store_mgr.added[FAMILY_PROXIES].values()
if f.name == 'BAR'
}) == 0
assert len({
f.name
for f in data[FAMILY_PROXIES].values()
if f.name == 'BAR'
}) == 0


async def test_update_data_structure(harness):
"""Test update_data_structure. This method will generate and
apply adeltas/updates given."""
schd, data = harness
w_id = schd.data_store_mgr.workflow_id
schd.data_store_mgr.data[w_id] = data
schd.pool.hold_tasks(['*'])
await schd.update_data_structure()
assert TASK_STATUS_FAILED not in set(collect_states(data, TASK_PROXIES))
assert TASK_STATUS_FAILED not in set(collect_states(data, FAMILY_PROXIES))
assert TASK_STATUS_FAILED not in data[WORKFLOW].state_totals
assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 2
for itask in schd.pool.get_tasks():
itask.state.reset(TASK_STATUS_FAILED)
schd.data_store_mgr.delta_task_state(itask)
schd.data_store_mgr.update_data_structure()
# State change applied
assert TASK_STATUS_FAILED in set(collect_states(data, TASK_PROXIES))

def test_delta_task_prerequisite(harness):
"""Test delta_task_prerequisites."""
schd: Scheduler
Expand Down

0 comments on commit 68ef614

Please sign in to comment.