Skip to content

Commit

Permalink
fix: wrong event count in logs
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Nov 14, 2023
1 parent b488be9 commit 365df96
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
0.13.1
- fix: wrong event count in logs
- ref: np.product is deprecated in numpy 2.0
0.13.0
- feat: support writing file-based HDF5 basins (#11)
Expand Down
18 changes: 10 additions & 8 deletions dcnum/write/queue_collector_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ def run(self):
# our queue thread if we are told to stop.
self.event_queue.cancel_join_thread()
# Indexes the current frame in `self.data`.
cur_frame = 0
last_idx = 0
self.logger.debug("Started collector thread.")
while True:
# Slice of the shared nevents array. If it contains -1 values,
# this means that some of the frames have not yet been processed.
cur_nevents = self.feat_nevents[
cur_frame:cur_frame + self.write_threshold]
last_idx:last_idx + self.write_threshold]
if np.any(np.array(cur_nevents) < 0):
# We are not yet ready to write any new data to the queue.
time.sleep(.01)
Expand All @@ -185,24 +185,26 @@ def run(self):
if len(cur_nevents) == 0:
self.logger.info(
"Reached the end of the current dataset (frame "
f"{cur_frame + 1} of {len(self.feat_nevents)}).")
# `last_idx` is the size of the dataset in the end,
# because `len(cur_nevents)` is always added to it.
f"{last_idx} of {len(self.feat_nevents)}).")
break

# We have reached the writer threshold. This means the extractor
# has analyzed at least `write_threshold` frames (not events).
self.logger.debug(f"Current frame: {cur_frame}")
self.logger.debug(f"Current frame: {last_idx}")

# Create an event stash
stash = EventStash(
index_offset=cur_frame,
index_offset=last_idx,
feat_nevents=cur_nevents
)

# First check whether there is a matching event from the buffer
# that we possibly populated earlier.
for ii in range(len(self.buffer_dq)):
idx, events = self.buffer_dq.popleft()
if cur_frame <= idx < cur_frame + self.write_threshold:
if last_idx <= idx < last_idx + self.write_threshold:
stash.add_events(index=idx, events=events)
else:
# Put it back into the buffer (this should not happen
Expand All @@ -221,7 +223,7 @@ def run(self):
# No time.sleep here, because we are already using
# a timeout in event_queue.get.
continue
if cur_frame <= idx < cur_frame + self.write_threshold:
if last_idx <= idx < last_idx + self.write_threshold:
stash.add_events(index=idx, events=events)
else:
# Goes onto the buffer stack (might happen if a
Expand Down Expand Up @@ -270,4 +272,4 @@ def run(self):
self.written_frames += stash.num_frames

# Increment current frame index.
cur_frame += len(cur_nevents)
last_idx += len(cur_nevents)

0 comments on commit 365df96

Please sign in to comment.