From 209cb6fece25f86e6bace152d64b721c294153fa Mon Sep 17 00:00:00 2001 From: ignormies Date: Thu, 5 Aug 2021 12:30:44 -0700 Subject: [PATCH] Add streaming pause/unpause queue to support many simultaneous streams (#178) --- brainframe_qt/__init__.py | 2 +- brainframe_qt/api_utils/__init__.py | 39 +- brainframe_qt/api_utils/streaming/__init__.py | 3 +- .../api_utils/streaming/stream_listener.py | 28 -- .../api_utils/streaming/stream_manager.py | 276 +++++++------ .../api_utils/streaming/synced_reader.py | 391 ++++++++++++------ .../api_utils/streaming/zone_status_frame.py | 13 +- brainframe_qt/ui/brainframe_app.py | 13 +- .../task_configuration/task_configuration.py | 32 +- .../video_task_config/video_task_config.py | 2 +- .../activities/stream_activity/stream_view.py | 2 +- .../stream_configuration.py | 17 +- brainframe_qt/ui/main_window/main_window.py | 5 +- .../alert_log/alert_log.py | 6 +- .../video_expanded_view.py | 12 +- .../stream_overlay/stream_widget_overlay.py | 22 +- .../video_large/video_large.py | 17 +- .../video_small/video_small.py | 140 ------- .../video_thumbnail_view.py | 85 ++-- .../video_thumbnail_view_ui.py | 7 +- .../widgets/thumbnail_grid_layout/__init__.py | 1 + .../thumbnail_grid_layout.py | 26 +- .../thumbnail_grid_layout.ui | 0 .../widgets/video_small/__init__.py | 1 + .../widgets/video_small/video_small.py | 98 +++++ .../ui/resources/i18n/brainframe_zh.ts | 70 ++-- .../ui/resources/images/stream_paused.png | 3 + .../images/stream_paused_inkscape.svg | 3 + .../ui/resources/paths/qt_ui_paths.py | 3 +- brainframe_qt/ui/resources/qt_resources.qrc | 2 + .../streams/stream_event_manager.py | 251 +++++++++++ .../streams/stream_graphics_scene.py | 26 +- .../streams/stream_listener_widget.py | 148 ------- .../video_items/streams/stream_widget.py | 45 +- pyproject.toml | 2 +- 35 files changed, 1029 insertions(+), 762 deletions(-) delete mode 100644 brainframe_qt/api_utils/streaming/stream_listener.py delete mode 100644 brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/video_small/video_small.py create mode 100644 brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/__init__.py rename brainframe_qt/ui/main_window/video_thumbnail_view/{ => widgets}/thumbnail_grid_layout/thumbnail_grid_layout.py (90%) rename brainframe_qt/ui/main_window/video_thumbnail_view/{ => widgets}/thumbnail_grid_layout/thumbnail_grid_layout.ui (100%) create mode 100644 brainframe_qt/ui/main_window/video_thumbnail_view/widgets/video_small/__init__.py create mode 100644 brainframe_qt/ui/main_window/video_thumbnail_view/widgets/video_small/video_small.py create mode 100644 brainframe_qt/ui/resources/images/stream_paused.png create mode 100644 brainframe_qt/ui/resources/images/stream_paused_inkscape.svg create mode 100644 brainframe_qt/ui/resources/video_items/streams/stream_event_manager.py delete mode 100644 brainframe_qt/ui/resources/video_items/streams/stream_listener_widget.py diff --git a/brainframe_qt/__init__.py b/brainframe_qt/__init__.py index df7db8648..2e378084f 100644 --- a/brainframe_qt/__init__.py +++ b/brainframe_qt/__init__.py @@ -1 +1 @@ -__version__ = "0.28.0" +__version__ = "0.28.4" diff --git a/brainframe_qt/api_utils/__init__.py b/brainframe_qt/api_utils/__init__.py index aa618b8b2..a06d00a30 100644 --- a/brainframe_qt/api_utils/__init__.py +++ b/brainframe_qt/api_utils/__init__.py @@ -1,5 +1,38 @@ +import typing + +from PyQt5.QtCore import QObject from brainframe.api import BrainFrameAPI -from .streaming import StreamManager, StreamManagerAPI -# API instance that is later monkeypatched to be a singleton -api: BrainFrameAPI = StreamManagerAPI() +# Singleton API instance +api = BrainFrameAPI() + +# Must come after api import +from .streaming import StreamManager + +# Set using init_stream_manager before use +# +# Note: Creating singleton QObjects is a bit of a hassle because they need a parent +# object. Flutter has a really cool system called InheritedWidget/Provider that makes +# passing objects like this around really easily. I'd like to implement something +# similar, but for now this is the only singleton object we have so I think this +# should be begrudgingly "ok". +_stream_manager: StreamManager = typing.cast(StreamManager, None) + + +def init_stream_manager(*, parent: QObject) -> None: + global _stream_manager + + if _stream_manager is not None: + raise RuntimeError("StreamManager has already been initialized") + + _stream_manager = StreamManager(parent=parent) + + +def get_stream_manager() -> StreamManager: + if _stream_manager is None: + raise RuntimeError( + f"StreamManager has not been initialized yet. Call " + f"{init_stream_manager.__name__} first." + ) + + return _stream_manager diff --git a/brainframe_qt/api_utils/streaming/__init__.py b/brainframe_qt/api_utils/streaming/__init__.py index 8485f736d..1b0f56c2b 100644 --- a/brainframe_qt/api_utils/streaming/__init__.py +++ b/brainframe_qt/api_utils/streaming/__init__.py @@ -1,3 +1,2 @@ -from .stream_manager import StreamManager, StreamManagerAPI +from .stream_manager import StreamManager from .synced_reader import SyncedStreamReader -from .stream_listener import StreamListener diff --git a/brainframe_qt/api_utils/streaming/stream_listener.py b/brainframe_qt/api_utils/streaming/stream_listener.py deleted file mode 100644 index c977042f1..000000000 --- a/brainframe_qt/api_utils/streaming/stream_listener.py +++ /dev/null @@ -1,28 +0,0 @@ -from threading import Event - - -class StreamListener: - """This is used by SyncedStreamReader to pass events to the UI""" - - def __init__(self): - self.frame_event = Event() - """Called when a new ZoneStatusFrame is ready""" - - self.stream_initializing_event = Event() - """Called when the stream starts initializing""" - - self.stream_halted_event = Event() - """Called when the stream has halted""" - - self.stream_closed_event = Event() - """Called when the stream connection has closed""" - - self.stream_error_event = Event() - """Called upon serious error (this shouldn't happen?)""" - - def clear_all_events(self): - self.frame_event.clear() - self.stream_initializing_event.clear() - self.stream_halted_event.clear() - self.stream_closed_event.clear() - self.stream_error_event.clear() diff --git a/brainframe_qt/api_utils/streaming/stream_manager.py b/brainframe_qt/api_utils/streaming/stream_manager.py index 5a75731ca..1d5663bee 100644 --- a/brainframe_qt/api_utils/streaming/stream_manager.py +++ b/brainframe_qt/api_utils/streaming/stream_manager.py @@ -1,139 +1,175 @@ import logging +import typing +from threading import RLock +from typing import Dict, List, Optional -from brainframe.api import BrainFrameAPI, StatusReceiver -from brainframe.api.bf_codecs import StreamConfiguration -from gstly import gobject_init -from gstly.stream_reader import GstStreamReader, StreamReader +from PyQt5.QtCore import QObject +from brainframe.api.bf_codecs import StreamConfiguration +from brainframe_qt.api_utils import api from .synced_reader import SyncedStreamReader -class StreamManager: - """Keeps track of existing Stream objects, and creates new ones as - necessary. - """ +class StreamManager(QObject): + """Keeps track of existing Stream objects, and creates new ones as necessary""" - REHOSTED_VIDEO_TYPES = [StreamConfiguration.ConnType.WEBCAM, - StreamConfiguration.ConnType.FILE] - """These video types are re-hosted by the server.""" + _MAX_ACTIVE_STREAMS = 5 + """Number of streams to run concurrently""" - def __init__(self, status_receiver: StatusReceiver): - self._stream_readers = {} - self._status_receiver = status_receiver - self._async_closing_streams = [] - """A list of StreamReader objects that are closing or may have finished - closing""" + def __init__(self, *, parent: QObject): + super().__init__(parent=parent) - def start_streaming(self, - stream_config: StreamConfiguration, - url: str) -> SyncedStreamReader: - """Starts reading from the stream using the given information, or - returns an existing reader if we're already reading this stream. + self._stream_lock = RLock() - :param stream_config: The stream to connect to - :param url: The URL to stream on - :return: A Stream object + self._running_streams: List[int] = [] + """Currently running streams. Does not include stay-alive streams. + + Max length should be _NUM_ACTIVE_STREAMS """ - if not self.is_streaming(stream_config.id): - # pipeline will be None if not in the options - pipeline: str = stream_config.connection_options.get("pipeline") - - latency = StreamReader.DEFAULT_LATENCY - if stream_config.connection_type in self.REHOSTED_VIDEO_TYPES: - latency = StreamReader.REHOSTED_LATENCY - gobject_init.start() - - # Streams created with a premises are always proxied from that - # premises - proxied = stream_config.premises_id is not None - - stream_reader = GstStreamReader( - url, - latency=latency, - runtime_options=stream_config.runtime_options, - pipeline_str=pipeline, - proxied=proxied) - synced_stream_reader = SyncedStreamReader( - stream_config.id, - stream_reader, - self._status_receiver) - self._stream_readers[stream_config.id] = synced_stream_reader - - return self._stream_readers[stream_config.id] - - def is_streaming(self, stream_id: int) -> bool: - """Checks if the the manager has a stream reader for the given stream - id. - - :param stream_id: The stream ID to check - :return: True if the stream manager has a stream reader, false - otherwise - """ - return stream_id in self._stream_readers + self._paused_streams: List[int] = [] + """Currently paused streams""" - def close_stream(self, stream_id: int) -> None: - """Close a specific stream and remove the reference. + self.stream_readers: Dict[int, SyncedStreamReader] = {} + """All StreamReaders currently instantiated, paused or unpaused""" - :param stream_id: The ID of the stream to delete - """ - stream = self.close_stream_async(stream_id) - stream.wait_until_closed() + self._init_signals() + + def _init_signals(self) -> None: + self.destroyed.connect(self.close) def close(self) -> None: - """Close all streams and remove references""" - for stream_id in self._stream_readers.copy().keys(): - self.close_stream_async(stream_id) - self._stream_readers = {} - - for stream in self._async_closing_streams: - stream.wait_until_closed() - self._async_closing_streams.remove(stream) - - def close_stream_async(self, stream_id: int) -> SyncedStreamReader: - stream = self._stream_readers.pop(stream_id) - self._async_closing_streams.append(stream) - stream.close() - return stream - - -class StreamManagerAPI(BrainFrameAPI): - """Augments the API class to manage and provide a StreamManager.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._stream_manager = None - - def delete_stream_configuration(self, stream_id, - timeout=120): - super().delete_stream_configuration(stream_id, timeout) - if self._stream_manager is not None \ - and self._stream_manager.is_streaming(stream_id): - self._stream_manager.close_stream_async(stream_id) - - def get_stream_manager(self): - """Returns a singleton StreamManager object""" - # Lazily import streaming code to avoid OpenCV dependencies unless - # necessary - from brainframe_qt.api_utils.streaming import StreamManager - - if self._stream_manager is None: - self._stream_manager = StreamManager(self.get_status_receiver()) - return self._stream_manager - - def get_stream_reader(self, stream_config: StreamConfiguration): - """Get the SyncedStreamReader for the given stream_configuration. - - :param stream_config: The stream configuration to open. - :return: A SyncedStreamReader object + """Request and wait for all streams to close""" + logging.info("Initiating StreamManager close") + self._close() + logging.info("StreamManager close finished") + + def delete_stream(self, stream_id: int, timeout: int = 120) -> None: + """[blocking API] Delete a stream through the API and initiate the closing of + its corresponding StreamReader + """ + api.delete_stream_configuration(stream_id, timeout=timeout) + self.stop_streaming(stream_id) + + def pause_streaming(self, stream_id) -> None: + self._set_stream_paused(stream_id, True) + self._ensure_running_streams() + + def resume_streaming(self, stream_id) -> None: + self._set_stream_paused(stream_id, False) + self._ensure_running_streams() + + def start_streaming( + self, + stream_conf: StreamConfiguration, + url: str, + ) -> SyncedStreamReader: + """Starts reading from the stream using the given information, or returns an + existing reader if we're already reading this stream. + + :param stream_conf: The stream to connect to + :param url: The URL to stream on + :return: A SyncedStreamReader for the stream """ - url = self.get_stream_url(stream_config.id) - logging.info("API: Opening stream on url " + url) + return self._start_stream(stream_conf, url) - return self.get_stream_manager().start_streaming(stream_config, url) + def stop_streaming(self, stream_id: int) -> None: + """Requests a stream to close asynchronously - def close(self): - super().close() - if self._stream_manager is not None: - self._stream_manager.close() - self._stream_manager = None + :param stream_id: The ID of the stream to delete + """ + self._stop_stream(stream_id) + self._ensure_running_streams() + + def _close(self) -> None: + with self._stream_lock: + streams = self.stream_readers.copy() + for stream_id, stream_reader in streams.items(): + self._stop_stream(stream_id) + stream_reader.wait_until_closed() + + def _create_synced_reader( + self, stream_conf: StreamConfiguration, url: str + ) -> SyncedStreamReader: + + synced_stream_reader = SyncedStreamReader( + stream_conf, + url, + # No parent if moving to a different thread + parent=typing.cast(QObject, None), + ) + + return synced_stream_reader + + def _ensure_running_streams(self) -> None: + """Pause/unpause streams if over/under max""" + with self._stream_lock: + while ( + len(self._running_streams) < self._MAX_ACTIVE_STREAMS + and self._paused_streams + ): + self._set_stream_paused(self._paused_streams[0], False) + + for _running_stream_id in self._running_streams[self._MAX_ACTIVE_STREAMS:]: + self._set_stream_paused(_running_stream_id, paused=True) + + def _forget_stream(self, stream_id: int) -> None: + with self._stream_lock: + self._set_stream_paused(stream_id, paused=True) + + self._paused_streams.remove(stream_id) + self.stream_readers.pop(stream_id) + + def _get_stream_reader( + self, + stream_conf: StreamConfiguration, + url: str, + ) -> Optional[SyncedStreamReader]: + with self._stream_lock: + if stream_conf.id in self.stream_readers: + # If it's paused, then run self._unpause_stream, otherwise, return it + stream_reader = self.stream_readers[stream_conf.id] + else: + stream_reader = self._create_synced_reader(stream_conf, url) + self.stream_readers[stream_conf.id] = stream_reader + + return stream_reader + + def _set_stream_paused(self, stream_id: int, paused: bool) -> None: + with self._stream_lock: + stream_reader = self.stream_readers[stream_id] + + if paused: + stream_reader.pause_streaming() + destination_list = self._paused_streams + else: + stream_reader.resume_streaming() + destination_list = self._running_streams + + if stream_id in self._paused_streams: + self._paused_streams.remove(stream_id) + if stream_id in self._running_streams: + self._running_streams.remove(stream_id) + + destination_list.insert(0, stream_id) + + def _start_stream( + self, + stream_conf: StreamConfiguration, + url: str, + ) -> SyncedStreamReader: + with self._stream_lock: + stream_reader = self._get_stream_reader(stream_conf, url) + + if stream_conf.id not in self._running_streams: + self._set_stream_paused(stream_conf.id, False) + + self._ensure_running_streams() + + return stream_reader + + def _stop_stream(self, stream_id: int) -> None: + with self._stream_lock: + stream_reader = self.stream_readers[stream_id] + self._forget_stream(stream_id) + stream_reader.close() diff --git a/brainframe_qt/api_utils/streaming/synced_reader.py b/brainframe_qt/api_utils/streaming/synced_reader.py index 4c501b81b..5f855b3e2 100644 --- a/brainframe_qt/api_utils/streaming/synced_reader.py +++ b/brainframe_qt/api_utils/streaming/synced_reader.py @@ -1,176 +1,303 @@ import logging -from threading import RLock, Thread -from time import sleep -from typing import Optional, Set +from enum import Enum, auto +from threading import Event, Thread +from typing import Optional -from brainframe.api import StatusReceiver +from PyQt5.QtCore import QObject, pyqtSignal + +from brainframe.api.bf_codecs import StreamConfiguration +from gstly import gobject_init from gstly.stream_reader import GstStreamReader, StreamReader, StreamStatus +from brainframe_qt.api_utils import api from brainframe_qt.util.events import or_events + from .frame_syncer import FrameSyncer -from .stream_listener import StreamListener from .zone_status_frame import ZoneStatusFrame -class SyncedStreamReader(StreamReader): +class SyncedStatus(Enum): + """SyncedStreamReader wrapper of gstly's StreamStatus. + + Adds PAUSED and FINISHED states. + """ + INITIALIZING = auto() + STREAMING = auto() + HALTED = auto() + CLOSED = auto() + PAUSED = auto() + """Streaming is paused client-side to save bandwidth/resources""" + FINISHED = auto() + """The SyncedStreamReader is terminating and its thread will close soon""" + + @classmethod + def from_stream_status(cls, status: StreamStatus) -> "SyncedStatus": + if status is StreamStatus.INITIALIZING: + return cls.INITIALIZING + elif status is StreamStatus.STREAMING: + return cls.STREAMING + elif status is StreamStatus.HALTED: + return cls.HALTED + elif status is StreamStatus.CLOSED: + return cls.CLOSED + else: + raise ValueError(f"Unknown StreamStatus {status}") + + +class SyncedStreamReader(QObject): """Reads frames from a stream and syncs them up with zone statuses.""" - def __init__(self, - stream_id: int, - stream_reader: GstStreamReader, - status_receiver: StatusReceiver): + frame_received = pyqtSignal() + stream_state_changed = pyqtSignal(SyncedStatus) + + finished = pyqtSignal() + + REHOSTED_VIDEO_TYPES = [ + StreamConfiguration.ConnType.WEBCAM, + StreamConfiguration.ConnType.FILE, + ] + """Video types that are re-hosted by the server""" + + def __init__( + self, + stream_conf: StreamConfiguration, + stream_url: str, + *, + parent: QObject + ): """Creates a new SyncedStreamReader. - :param stream_id: The stream ID that this synced stream reader is for - :param stream_reader: The stream reader to get frames from - :param status_receiver: The StatusReceiver currently in use + :param stream_conf: The stream that this synced stream reader is for + :param stream_url: The url of the stream """ - self.stream_id = stream_id - self._stream_reader = stream_reader - self.status_receiver = status_receiver + super().__init__(parent=parent) + + self.stream_conf = stream_conf + self.stream_url = stream_url + + self._stream_reader: Optional[GstStreamReader] = None self.latest_processed_frame: Optional[ZoneStatusFrame] = None - """Latest frame synced with results. None if no frames have been synced - yet""" + """Latest frame synced with results. None if no frames have been synced yet""" + + self.frame_syncer = FrameSyncer() - self.stream_listeners: Set[StreamListener] = set() - self._stream_listeners_lock = RLock() + self._stream_status = SyncedStatus.INITIALIZING - # Start threads, now that the object is all set up - self._thread = Thread( - name=f"SyncedStreamReader thread for stream ID {stream_reader}", - target=self._sync_detections_with_stream, + self._start_streaming_event = Event() + """Used to request the thread to start streaming""" + self._pause_streaming_event = Event() + """Used to request the thread to (temporarily) pause streaming""" + + self._start_streaming_event.set() + + self._interrupt_requested = False + """Used to signal thread to stop""" + + # Start thread, now that the object is all set up + self._thread = self._init_thread() + + def _init_thread(self) -> Thread: + thread = Thread( + name=f"SyncedStreamReader thread for stream ID {self.stream_conf.id}", + target=self.run, daemon=True ) - self._thread.start() - - def alert_frame_listeners(self): - with self._stream_listeners_lock: - if self.status is StreamStatus.STREAMING: - for listener in self.stream_listeners: - listener.frame_event.set() - - def alert_status_listeners(self, status: StreamStatus): - """This should be called whenever the StreamStatus has changed""" - with self._stream_listeners_lock: - if status is StreamStatus.INITIALIZING: - for listener in self.stream_listeners: - listener.stream_initializing_event.set() - elif status is StreamStatus.HALTED: - for listener in self.stream_listeners: - listener.stream_halted_event.set() - elif status is StreamStatus.CLOSED: - for listener in self.stream_listeners: - listener.stream_closed_event.set() - else: - logging.critical("SyncedStreamReader: An event was called, but" - " was not actually necessary!") - for listener in self.stream_listeners: - listener.stream_error_event.set() - - def add_listener(self, listener: StreamListener): - with self._stream_listeners_lock: - self.stream_listeners.add(listener) - if self.status is not StreamStatus.STREAMING: - self.alert_status_listeners(self.status) - elif self.latest_processed_frame is not None: - self.alert_frame_listeners() - else: - listener.stream_initializing_event.set() - def remove_listener(self, listener: StreamListener): - with self._stream_listeners_lock: - self.stream_listeners.remove(listener) - listener.clear_all_events() + thread.start() - @property - def status(self) -> StreamStatus: - return self._stream_reader.status + return thread @property - def latest_frame(self): - return self._stream_reader.latest_frame + def is_streaming_paused(self) -> bool: + """Whether the stream is paused or has been requested to do so""" + return ( + self._pause_streaming_event.is_set() + or self.stream_status is SyncedStatus.PAUSED + ) @property - def new_frame_event(self): - return self._stream_reader.new_frame_event + def stream_status(self) -> SyncedStatus: + """The current status of the stream""" + return self._stream_status - @property - def new_status_event(self): - return self._stream_reader.new_status_event + @stream_status.setter + def stream_status(self, stream_status: SyncedStatus) -> None: + """[private] Setter method for the stream status - def set_runtime_option_vals(self, runtime_options: dict): - self._stream_reader.set_runtime_option_vals(runtime_options) + The stream_state_changed signal is emitted if the status changes. + """ + if stream_status is not self._stream_status: + self._stream_status = stream_status + self.stream_state_changed.emit(stream_status) - def _sync_detections_with_stream(self): - while self.status != StreamStatus.INITIALIZING: - sleep(0.01) + def close(self) -> None: + """Sends a request to close the SyncedStreamReader""" + logging.debug(f"SyncedStreamReader for stream {self.stream_conf.id} closing") - # Create the FrameSyncer - frame_syncer = FrameSyncer() + self._interrupt_requested = True - frame_or_status_event = or_events(self._stream_reader.new_frame_event, - self._stream_reader.new_status_event) + def pause_streaming(self) -> None: + """Pause streaming. - while True: - frame_or_status_event.wait() + Streaming is not immediately paused, but will be handled in the main loop in + _process_stream_events + """ + self._pause_streaming_event.set() - if self._stream_reader.new_status_event.is_set(): - self._stream_reader.new_status_event.clear() - if self.status is StreamStatus.CLOSED: - break - if self.status is not StreamStatus.STREAMING: - self.alert_status_listeners(self.status) - continue - - # If streaming is the new event we need to process the frame - if not self._stream_reader.new_frame_event.is_set(): - continue + def resume_streaming(self) -> None: + """Resume (more accurately, re-start) streaming. - # new_frame_event must have been triggered - self._stream_reader.new_frame_event.clear() + Streaming is not immediately paused, but will be handled in the main loop. This + function does nothing (except print a warning) if the stream is not already + paused. + """ + if not self.is_streaming_paused: + logging.warning( + "Attempted to unpause streaming on SyncedStreamReader for stream " + f"{self.stream_conf.id}, but it is not paused." + ) + return - # Get the new frame + timestamp - frame_tstamp, frame_bgr = self._stream_reader.latest_frame - frame_rgb = frame_bgr[..., ::-1].copy() - del frame_bgr + self._start_streaming_event.set() - # Get the latest zone statuses from thread status receiver thread - statuses = self.status_receiver.latest_statuses(self.stream_id) + def run(self) -> None: + """Main loop for the created thread""" + while not self._interrupt_requested: + if self._start_streaming_event.wait(0.2): + self._start_streaming() + self._process_stream_events() - # Run the syncing algorithm - new_processed_frame = frame_syncer.sync( - latest_frame=ZoneStatusFrame( - frame=frame_rgb, - tstamp=frame_tstamp, - ), - latest_zone_statuses=statuses - ) + if self._stream_reader is not None: + self._stop_streaming() + + self._finish() + + def wait_until_closed(self) -> None: + """Hangs until the SyncedStreamReader has been closed. Must be called from + another QThread""" + if self._stream_reader is not None: + self._stream_reader.wait_until_closed() - if new_processed_frame is not None: - if self.latest_processed_frame is None: - is_new = True - else: - previous_tstamp = self.latest_processed_frame.tstamp - new_tstamp = new_processed_frame.tstamp - is_new = new_tstamp > previous_tstamp + self.thread().wait() + + def _finish(self) -> None: + """Final clean-up for the SyncedStreamReader. + + To be called during complete stream shutdown, not simple pauses. + Sets the status to FINISHED and emits the `finished` signal + """ + logging.debug(f"SyncedStreamReader for stream {self.stream_conf.id} closed") - # This value must be set before alerting frame listeners. This - # prevents a race condition where latest_processed_frame is - # None - self.latest_processed_frame = new_processed_frame + self.stream_status = SyncedStatus.FINISHED + self.finished.emit() - # Alert frame listeners if this a new frame - if is_new: - self.alert_frame_listeners() + def _handle_frame_event(self) -> None: + self._stream_reader.new_frame_event.clear() - logging.info("SyncedStreamReader: Closing") + # Get the new frame + timestamp + frame_tstamp, frame_bgr = self._stream_reader.latest_frame + frame_rgb = frame_bgr[..., ::-1].copy() + del frame_bgr - def close(self): - """Sends a request to close the SyncedStreamReader.""" - self._stream_reader.close() + # Get the latest zone statuses from status receiver thread + statuses = api.get_status_receiver().latest_statuses(self.stream_conf.id) - def wait_until_closed(self): - """Hangs until the SyncedStreamReader has been closed.""" + # Convert the numpy frame to a QPixmap + frame = ZoneStatusFrame.pixmap_from_numpy_frame(frame_rgb) + + # Run the syncing algorithm + new_processed_frame = self.frame_syncer.sync( + latest_frame=ZoneStatusFrame( + frame=frame, + tstamp=frame_tstamp, + ), + latest_zone_statuses=statuses + ) + + if new_processed_frame is not None: + if self.latest_processed_frame is None: + is_new = True + else: + previous_tstamp = self.latest_processed_frame.tstamp + new_tstamp = new_processed_frame.tstamp + is_new = new_tstamp > previous_tstamp + + # This value must be set before alerting frame listeners. This prevents a + # race condition where latest_processed_frame is None + self.latest_processed_frame = new_processed_frame + + # Alert frame listeners if this a new frame + if is_new: + self.frame_received.emit() + + def _handle_status_event(self) -> None: + self._stream_reader.new_status_event.clear() + + self.stream_status = SyncedStatus.from_stream_status(self._stream_reader.status) + + def _start_streaming(self) -> None: + """Create a new GstStreamReader. Gstreamer streaming begins immediately""" + self._start_streaming_event.clear() + + pipeline: Optional[str] = self.stream_conf.connection_options.get("pipeline") + + latency = StreamReader.DEFAULT_LATENCY + if self.stream_conf.connection_type in self.REHOSTED_VIDEO_TYPES: + latency = StreamReader.REHOSTED_LATENCY + + # Streams created with a premises are always proxied from that premises + is_proxied = self.stream_conf.premises_id is not None + + gobject_init.start() + + self._stream_reader = GstStreamReader( + url=self.stream_url, + latency=latency, + runtime_options=self.stream_conf.runtime_options, + pipeline_str=pipeline, + proxied=is_proxied + ) + + # Ensure that the status is sent out (esp. if we're resuming a stream) + self.stream_status = SyncedStatus.INITIALIZING + + def _stop_streaming(self) -> None: + """Stop the current stream. Blocking function. + + Tells the GstStreamReader to close and wait for the thread to join. Then + discards the reference to the GstStreamReader + """ self._stream_reader.wait_until_closed() - self._thread.join() + self._stream_reader = None + + def _process_stream_events(self) -> None: + """Handle posted events in current object and within the GstStreamReader""" + if self._stream_reader is None: + logging.warning( + f"Attempted to process stream events on SyncedStreamReader for stream " + f"{self.stream_conf.id} without a GstStreamReader set" + ) + return + + frame_or_status_event = or_events(self._stream_reader.new_frame_event, + self._stream_reader.new_status_event) + + while not self._interrupt_requested: + + if self._pause_streaming_event.is_set(): + # Streaming paused. Stop loop for now + self.stream_status = SyncedStatus.PAUSED + self._pause_streaming_event.clear() + break + + if not frame_or_status_event.wait(0.2): + continue + + if self._stream_reader.new_status_event.is_set(): + self._handle_status_event() + if self._stream_reader.new_frame_event.is_set(): + self._handle_frame_event() + + if self._stream_reader is not None: + self._stop_streaming() diff --git a/brainframe_qt/api_utils/streaming/zone_status_frame.py b/brainframe_qt/api_utils/streaming/zone_status_frame.py index cae6555a5..83b902489 100644 --- a/brainframe_qt/api_utils/streaming/zone_status_frame.py +++ b/brainframe_qt/api_utils/streaming/zone_status_frame.py @@ -3,6 +3,8 @@ from typing import Dict, List, Optional import numpy as np +from PyQt5.QtGui import QImage, QPixmap + from brainframe.api.bf_codecs import ZoneStatus from brainframe_qt.api_utils.detection_tracks import DetectionTrack @@ -12,8 +14,8 @@ class ZoneStatusFrame: """A frame that may or may not have undergone processing on the server.""" - frame: np.ndarray - """RGB data on the frame""" + frame: QPixmap + """Frame as a QPixmap""" tstamp: float """The timestamp of the frame""" @@ -38,6 +40,13 @@ class ZoneStatusFrame: 'frame_metadata': 'ZoneStatusFrameMeta', } + @staticmethod + def pixmap_from_numpy_frame(frame: np.ndarray) -> QPixmap: + height, width, channel = frame.shape + bytes_per_line = width * 3 + image = QImage(frame.data, width, height, bytes_per_line, QImage.Format_RGB888) + return QPixmap.fromImage(image) + @dataclass class ZoneStatusFrameMeta: diff --git a/brainframe_qt/ui/brainframe_app.py b/brainframe_qt/ui/brainframe_app.py index 91ebcdece..33ca8f703 100644 --- a/brainframe_qt/ui/brainframe_app.py +++ b/brainframe_qt/ui/brainframe_app.py @@ -12,7 +12,7 @@ from gstly import gobject_init import brainframe_qt -from brainframe_qt.api_utils import api +from brainframe_qt.api_utils import api, init_stream_manager, get_stream_manager from brainframe_qt.api_utils.connection_manager import ConnectionManager from brainframe_qt.extensions.loader import ExtensionLoader from brainframe_qt.ui import EULADialog, MainWindow, SplashScreen @@ -193,6 +193,13 @@ def _shutdown(self): api.close() gobject_init.close() + try: + stream_manager = get_stream_manager() + except RuntimeError: + pass + else: + stream_manager.close() + self.connection_manager.requestInterruption() self.connection_manager.wait(5000) # milliseconds self.connection_manager.terminate() @@ -203,6 +210,10 @@ def _start_ui(self): self._check_server_version() + # Initialize the StreamManager + # TODO: Find a better way of doing this + init_stream_manager(parent=self) + ExtensionLoader().load_extensions() self.main_window = MainWindow() diff --git a/brainframe_qt/ui/dialogs/task_configuration/task_configuration.py b/brainframe_qt/ui/dialogs/task_configuration/task_configuration.py index ce96d0dd7..e120bfcb2 100644 --- a/brainframe_qt/ui/dialogs/task_configuration/task_configuration.py +++ b/brainframe_qt/ui/dialogs/task_configuration/task_configuration.py @@ -1,6 +1,6 @@ from typing import Optional -from PyQt5.QtCore import pyqtSlot +from PyQt5.QtCore import pyqtSlot, QObject from PyQt5.QtWidgets import QDialog, QDialogButtonBox, QInputDialog from PyQt5.uic import loadUi @@ -15,21 +15,17 @@ class TaskConfiguration(QDialog): - def __init__(self, parent=None, stream_conf=None): - """There should _always_ be a stream_configuration passed here. The - reason it is None is to allow compatibility with QTDesigner.""" - super().__init__(parent) + def __init__(self, stream_conf: StreamConfiguration, *, parent: QObject): + super().__init__(parent=parent) loadUi(qt_ui_paths.task_configuration_ui, self) - # If not running within QTDesigner, run logic - self.stream_conf: StreamConfiguration = stream_conf - if stream_conf: - self.video_task_config.change_stream(stream_conf) - self.stream_name_label.setText(stream_conf.name) + self.stream_conf = stream_conf + self.video_task_config.change_stream(stream_conf) + self.stream_name_label.setText(stream_conf.name) - # Create TaskAndZone widgets in ZoneList for zones in database - self.zone_list.init_zones(stream_conf.id) + # Create TaskAndZone widgets in ZoneList for zones in database + self.zone_list.init_zones(stream_conf.id) self.unconfirmed_zone: Zone = None @@ -39,7 +35,7 @@ def __init__(self, parent=None, stream_conf=None): @classmethod def open_configuration(cls, stream_conf, parent): - dialog = cls(parent=parent, stream_conf=stream_conf) + dialog = cls(stream_conf=stream_conf, parent=parent) result = dialog.exec_() return result @@ -232,13 +228,3 @@ def _hide_operation_widgets(self, hidden): self.confirm_op_button.setHidden(hidden) self.cancel_op_button.setHidden(hidden) self.instruction_label.setHidden(hidden) - - -if __name__ == '__main__': - from PyQt5.QtWidgets import QApplication - - app = QApplication([]) - window = TaskConfiguration(None) - window.show() - - app.exec_() diff --git a/brainframe_qt/ui/dialogs/task_configuration/video_task_config/video_task_config.py b/brainframe_qt/ui/dialogs/task_configuration/video_task_config/video_task_config.py index a5b435a99..50420ffe5 100644 --- a/brainframe_qt/ui/dialogs/task_configuration/video_task_config/video_task_config.py +++ b/brainframe_qt/ui/dialogs/task_configuration/video_task_config/video_task_config.py @@ -94,7 +94,7 @@ def on_frame(self, frame: ZoneStatusFrame): if self.in_progress_zone is None: super().on_frame(frame) else: - self.scene().set_frame(frame=frame.frame) + self.scene().set_frame(pixmap=frame.frame) def start_new_zone(self, zone_type: InProgressZoneType) -> None: # Temporarily disable region and line drawing diff --git a/brainframe_qt/ui/main_window/activities/stream_activity/stream_view.py b/brainframe_qt/ui/main_window/activities/stream_activity/stream_view.py index bb02bcc73..843e43e5c 100644 --- a/brainframe_qt/ui/main_window/activities/stream_activity/stream_view.py +++ b/brainframe_qt/ui/main_window/activities/stream_activity/stream_view.py @@ -19,7 +19,7 @@ def _init_signals(self): lambda: self.display_expanded_video(False)) self.video_expanded_view.stream_delete_signal.connect( - self.video_thumbnail_view.delete_stream_conf) + self.video_thumbnail_view.remove_stream) self.video_expanded_view.stream_delete_signal.connect( lambda: self.display_expanded_video(False)) diff --git a/brainframe_qt/ui/main_window/activities/stream_configuration/stream_configuration.py b/brainframe_qt/ui/main_window/activities/stream_configuration/stream_configuration.py index 71c3a3def..de1ad448d 100644 --- a/brainframe_qt/ui/main_window/activities/stream_configuration/stream_configuration.py +++ b/brainframe_qt/ui/main_window/activities/stream_configuration/stream_configuration.py @@ -6,19 +6,16 @@ from PyQt5.QtCore import Qt, pyqtSignal from PyQt5.QtGui import QKeyEvent from PyQt5.QtWidgets import QDialogButtonBox, QWidget + from brainframe.api import bf_codecs, bf_errors -from brainframe_qt.api_utils import api +from brainframe_qt.api_utils import api, get_stream_manager from brainframe_qt.ui.main_window.activities.stream_configuration \ .stream_configuration_ui import StreamConfigurationUI -from brainframe_qt.ui.resources import CanceledError, ProgressFileReader, \ - QTAsyncWorker -from brainframe_qt.ui.resources.links.documentation \ - import IP_CAMERA_DOCS_LINK -from brainframe_qt.ui.resources.ui_elements.widgets import \ - FileUploadProgressDialog -from brainframe_qt.ui.resources.ui_elements.widgets.dialogs import \ - BrainFrameMessage +from brainframe_qt.ui.resources import CanceledError, ProgressFileReader, QTAsyncWorker +from brainframe_qt.ui.resources.links.documentation import IP_CAMERA_DOCS_LINK +from brainframe_qt.ui.resources.ui_elements.widgets import FileUploadProgressDialog +from brainframe_qt.ui.resources.ui_elements.widgets.dialogs import BrainFrameMessage class StreamConfiguration(StreamConfigurationUI): @@ -697,7 +694,7 @@ def _handle_start_analysis_error( if isinstance(exc, bf_errors.AnalysisLimitExceededError): # Delete the stream configuration, since you almost never want to # have a stream that can't have analysis running - QTAsyncWorker(self, api.delete_stream_configuration, + QTAsyncWorker(self, get_stream_manager().delete_stream, f_args=(stream_conf.id,)) \ .start() diff --git a/brainframe_qt/ui/main_window/main_window.py b/brainframe_qt/ui/main_window/main_window.py index fd0c90b19..6e43bad06 100644 --- a/brainframe_qt/ui/main_window/main_window.py +++ b/brainframe_qt/ui/main_window/main_window.py @@ -129,7 +129,8 @@ def close_stream_configuration(): self.close_sidebar_widget() def change_stream_configuration( - stream_conf: Optional[bf_codecs.StreamConfiguration]): + stream_conf: bf_codecs.StreamConfiguration + ) -> None: sidebar_widget = self.sidebar_dock_widget.widget() if not isinstance(sidebar_widget, StreamConfiguration): return @@ -185,7 +186,7 @@ def _connect_sidebar_widget_signals(self) -> None: if isinstance(sidebar_widget, StreamConfiguration): stream_view = self._activity_widget_map[self._stream_activity] sidebar_widget.stream_conf_modified.connect( - stream_view.video_thumbnail_view.add_stream_conf) + stream_view.video_thumbnail_view.add_stream) sidebar_widget.stream_conf_modified.connect( self._handle_stream_config_modification) diff --git a/brainframe_qt/ui/main_window/video_expanded_view/alert_log/alert_log.py b/brainframe_qt/ui/main_window/video_expanded_view/alert_log/alert_log.py index 44a9a3a0a..af326f4cb 100644 --- a/brainframe_qt/ui/main_window/video_expanded_view/alert_log/alert_log.py +++ b/brainframe_qt/ui/main_window/video_expanded_view/alert_log/alert_log.py @@ -33,11 +33,15 @@ def __init__(self, parent=None): self.alert_log.setLayout(QVBoxLayout()) - def change_stream(self, stream_id): + def change_stream(self, stream_id: int) -> None: self.stream_id = stream_id self._delete_alerts_by_id(self.alert_widgets.keys()) self.sync_alerts_with_server() + def stop_streaming(self) -> None: + self.stream_id = None + self._delete_alerts_by_id(self.alert_widgets.keys()) + def sync_alerts_with_server(self): # Important. Used in set_alerts_checked diff --git a/brainframe_qt/ui/main_window/video_expanded_view/video_expanded_view.py b/brainframe_qt/ui/main_window/video_expanded_view/video_expanded_view.py index dbb24d65a..3908aa8fa 100644 --- a/brainframe_qt/ui/main_window/video_expanded_view/video_expanded_view.py +++ b/brainframe_qt/ui/main_window/video_expanded_view/video_expanded_view.py @@ -4,16 +4,16 @@ from PyQt5.QtCore import QEvent, QTimerEvent, pyqtSignal, pyqtSlot from PyQt5.QtWidgets import QWidget from PyQt5.uic import loadUi + from brainframe.api.bf_codecs import StreamConfiguration from requests.exceptions import RequestException -from brainframe_qt.api_utils import api +from brainframe_qt.api_utils import api, get_stream_manager from brainframe_qt.ui.dialogs import CapsuleConfigDialog, TaskConfiguration from brainframe_qt.ui.resources import QTAsyncWorker, stylesheet_watcher from brainframe_qt.ui.resources.paths import qt_qss_paths, qt_ui_paths from brainframe_qt.ui.resources.ui_elements.buttons import FloatingXButton -from brainframe_qt.ui.resources.ui_elements.widgets.dialogs import \ - BrainFrameMessage +from brainframe_qt.ui.resources.ui_elements.widgets.dialogs import BrainFrameMessage class VideoExpandedView(QWidget): @@ -119,10 +119,10 @@ def expanded_stream_closed_slot(self): """Signaled by close button""" # Stop attempting to display a stream - self.expanded_video.change_stream(None) + self.expanded_video.stop_streaming() # Stop alert log from asking for alerts from stream - self.alert_log.change_stream(None) + self.alert_log.stop_streaming() # No more stream_conf associate with self.stream_conf = None @@ -146,7 +146,7 @@ def delete_stream_button_clicked(self): # Delete stream from database QTAsyncWorker( - self, api.delete_stream_configuration, + self, get_stream_manager().delete_stream, f_args=(self.stream_conf.id, 600) ).start() diff --git a/brainframe_qt/ui/main_window/video_expanded_view/video_large/stream_overlay/stream_widget_overlay.py b/brainframe_qt/ui/main_window/video_expanded_view/video_large/stream_overlay/stream_widget_overlay.py index 295b7eb29..35f2daa0f 100644 --- a/brainframe_qt/ui/main_window/video_expanded_view/video_large/stream_overlay/stream_widget_overlay.py +++ b/brainframe_qt/ui/main_window/video_expanded_view/video_large/stream_overlay/stream_widget_overlay.py @@ -1,11 +1,11 @@ from datetime import timedelta -from typing import List, Optional +from typing import List from PyQt5.QtWidgets import QWidget from brainframe.api import bf_codecs -from brainframe_qt.api_utils.streaming.zone_status_frame import \ - ZoneStatusFrameMeta +from brainframe_qt.api_utils.streaming.zone_status_frame import ZoneStatusFrameMeta + from . import alerts as stream_alerts from .stream_widget_overlay_ui import StreamWidgetOverlayUI @@ -16,21 +16,17 @@ class StreamWidgetOverlay(StreamWidgetOverlayUI): def __init__(self, *, parent: QWidget): super().__init__(parent=parent) - def change_stream(self, - stream_conf: Optional[bf_codecs.StreamConfiguration]) \ - -> None: - - if stream_conf is None: - self.titlebar.set_stream_name(None) - else: - self.titlebar.set_stream_name(stream_conf.name) + def change_stream(self, stream_conf: bf_codecs.StreamConfiguration) -> None: + self.titlebar.set_stream_name(stream_conf.name) - def handle_frame_metadata(self, frame_metadata: ZoneStatusFrameMeta) \ - -> None: + def handle_frame_metadata(self, frame_metadata: ZoneStatusFrameMeta) -> None: alerts = self._metadata_to_alerts(frame_metadata) self.body.handle_alerts(alerts) + def stop_streaming(self) -> None: + self.titlebar.set_stream_name(None) + def _metadata_to_alerts(self, frame_metadata: ZoneStatusFrameMeta) \ -> List[stream_alerts.AbstractOverlayAlert]: alerts: List[stream_alerts.AbstractOverlayAlert] = [] diff --git a/brainframe_qt/ui/main_window/video_expanded_view/video_large/video_large.py b/brainframe_qt/ui/main_window/video_expanded_view/video_large/video_large.py index b975948f2..77fcad227 100644 --- a/brainframe_qt/ui/main_window/video_expanded_view/video_large/video_large.py +++ b/brainframe_qt/ui/main_window/video_expanded_view/video_large/video_large.py @@ -1,11 +1,10 @@ -from typing import Optional - from PyQt5.QtWidgets import QSizePolicy, QVBoxLayout, QWidget + from brainframe.api import bf_codecs -from brainframe_qt.api_utils.streaming.zone_status_frame import \ - ZoneStatusFrame +from brainframe_qt.api_utils.streaming.zone_status_frame import ZoneStatusFrame from brainframe_qt.ui.resources.video_items.streams import StreamWidget + from .stream_overlay import StreamWidgetOverlay @@ -38,14 +37,16 @@ def _init_style(self): # of the layout when the VideoLarge widget is very narrow self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Preferred) - def change_stream(self, - stream_conf: Optional[bf_codecs.StreamConfiguration]) \ - -> None: - + def change_stream(self, stream_conf: bf_codecs.StreamConfiguration) -> None: super().change_stream(stream_conf) self.stream_overlay.change_stream(stream_conf) + def stop_streaming(self) -> None: + super().stop_streaming() + + self.stream_overlay.stop_streaming() + def on_frame(self, frame: ZoneStatusFrame): super().on_frame(frame) self.stream_overlay.handle_frame_metadata(frame.frame_metadata) diff --git a/brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/video_small/video_small.py b/brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/video_small/video_small.py deleted file mode 100644 index 4dd65fd23..000000000 --- a/brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/video_small/video_small.py +++ /dev/null @@ -1,140 +0,0 @@ -from PyQt5.QtCore import QRectF, Qt, pyqtSignal -from PyQt5.QtGui import QColor, QFontMetricsF, QImage, QPainter -from PyQt5.QtWidgets import QWidget - -from brainframe_qt.api_utils.streaming.zone_status_frame import \ - ZoneStatusFrame -from brainframe_qt.ui.resources.video_items.streams import StreamWidget - - -class VideoSmall(StreamWidget): - """Video for ThumbnailView""" - - stream_clicked = pyqtSignal(object) - """Alerts the layout view that a thumbnail has been clicked - - Connected to: - - ThumbnailGridLayout -- Dynamic - [parent].thumbnail_stream_clicked_slot - """ - ongoing_alerts_signal = pyqtSignal(bool) - """Alerts the layout view that the stream has a change in the state of its - ongoing alert - - Connected to: - - ThumbnailGridLayout -- Dynamic - [parent].ongoing_alerts_slot - """ - - def __init__(self, parent: QWidget): - - self.alerts_ongoing: bool = False - - super().__init__(parent=parent) - - def on_frame(self, frame: ZoneStatusFrame): - super().on_frame(frame) - - # zone_statuses can be None if the server has never once returned a - # result for this stream. - if frame.zone_statuses is not None: - self.manage_alert_indication(frame.zone_statuses) - - def manage_alert_indication(self, zone_statuses): - - # Any active alerts? - alerts = any(zone_status.alerts - for zone_status in zone_statuses.values()) - - # self.ongoing_alerts is used during every paint in drawForeground - if alerts and not self.alerts_ongoing: - self.alerts_ongoing = True - # noinspection PyUnresolvedReferences - self.ongoing_alerts_signal.emit(True) - elif not alerts and self.alerts_ongoing: - self.alerts_ongoing = False - # noinspection PyUnresolvedReferences - self.ongoing_alerts_signal.emit(False) - - def drawForeground(self, painter: QPainter, rect: QRectF): - """Draw the alert UI if there are ongoing alerts - - Alert is at the bottom of the QGraphics view, full width and 1/3 height. - Text is positioned manually such that it looks nice within alert. - Icon is positioned manually such that it looks nice within alert. - - Overrides the super().drawForeground method which by default does pretty - much nothing - """ - # Gray rectangle - brush = painter.brush() - brush.setColor(QColor(0, 0, 0, 127)) # Half transparent black - brush.setStyle(Qt.SolidPattern) - painter.setBrush(brush) - - # No border - painter.setPen(Qt.NoPen) - - width = self.scene().width() - height = self.scene().height() - bar_percent = .1 - - bar_height = width * bar_percent - - # Draw alert background - painter.fillRect(0, int(height - bar_height), - int(width), int(bar_height), - painter.brush()) - - image_width_with_margins = 0 - if self.alerts_ongoing: - # Draw icon - image_percent = 0.8 - - image = QImage(":/icons/alert") - image = image.scaled(int(self.sceneRect().width()), - int(bar_height * image_percent), - Qt.KeepAspectRatio) - - image_margin = bar_height / 4 - painter.drawImage( - int(width - image.width() - image_margin), - int(height - image.height() - - (bar_height * (1 - image_percent) / 2)), - image) - - image_width_with_margins = image.width() + (2.5 * image_margin) - - # Draw text - font = painter.font() - point_size = bar_height / 2 - font.setPointSizeF(point_size if point_size > 0 else 1) - painter.setFont(font) - painter.setPen(Qt.white) - - font_metric = QFontMetricsF(font) - stream_name_text = font_metric.elidedText( - self.stream_conf.name, - Qt.ElideRight, - width - image_width_with_margins) - - painter.drawText(int(point_size / 2), - int(height - (point_size / 2)), - stream_name_text) - - def mouseReleaseEvent(self, event): - # Add border around stream to indicate its selection - self.add_selection_border() - - # noinspection PyUnresolvedReferences - self.stream_clicked.emit(self.stream_conf) - - super().mousePressEvent(event) - - def add_selection_border(self): - """Add border around stream""" - pass - - def remove_selection_border(self): - """Remove border around stream""" - pass diff --git a/brainframe_qt/ui/main_window/video_thumbnail_view/video_thumbnail_view.py b/brainframe_qt/ui/main_window/video_thumbnail_view/video_thumbnail_view.py index f0b42a896..581326679 100644 --- a/brainframe_qt/ui/main_window/video_thumbnail_view/video_thumbnail_view.py +++ b/brainframe_qt/ui/main_window/video_thumbnail_view/video_thumbnail_view.py @@ -1,53 +1,44 @@ from typing import Dict, List -from PyQt5.QtCore import QMetaObject, QThread, Q_ARG, Qt, pyqtSignal, \ - pyqtSlot +from PyQt5.QtCore import QMetaObject, QThread, Q_ARG, Qt, pyqtSignal, pyqtSlot from PyQt5.QtWidgets import QWidget -from brainframe.api import bf_codecs -from brainframe_qt.api_utils import api +from brainframe.api.bf_codecs import Alert, StreamConfiguration + +from brainframe_qt.api_utils import api, get_stream_manager from brainframe_qt.api_utils.zss_pubsub import zss_publisher -from brainframe_qt.ui.main_window.video_thumbnail_view \ - .thumbnail_grid_layout.video_small.video_small import VideoSmall -from brainframe_qt.ui.main_window.video_thumbnail_view \ - .video_thumbnail_view_ui import _VideoThumbnailViewUI from brainframe_qt.ui.resources import QTAsyncWorker +from .widgets.video_small import VideoSmall +from .video_thumbnail_view_ui import _VideoThumbnailViewUI + class VideoThumbnailView(_VideoThumbnailViewUI): - stream_clicked = pyqtSignal(bf_codecs.StreamConfiguration) + stream_clicked = pyqtSignal(StreamConfiguration) def __init__(self, parent: QWidget): super().__init__(parent) - self.update_streams() - self._init_signals() - def _init_signals(self): + self._retrieve_remote_streams() + + def _init_signals(self) -> None: self.alert_stream_layout.thumbnail_stream_clicked_signal.connect( self.stream_clicked) self.alertless_stream_layout.thumbnail_stream_clicked_signal.connect( self.stream_clicked) + self.alert_stream_layout.thumbnail_stream_clicked_signal.connect( + self._refresh_active_streams) + self.alertless_stream_layout.thumbnail_stream_clicked_signal.connect( + self._refresh_active_streams) def _init_alert_pubsub(self): """Called after streams are initially populated""" stream_sub = zss_publisher.subscribe_alerts(self._handle_alerts) self.destroyed.connect(lambda: zss_publisher.unsubscribe(stream_sub)) - def update_streams(self): - - def on_success(stream_confs: List[bf_codecs.StreamConfiguration]): - for stream_conf in stream_confs: - self.add_stream_conf(stream_conf) - - self._init_alert_pubsub() - - QTAsyncWorker(self, api.get_stream_configurations, - on_success=on_success) \ - .start() - @property def streams(self) -> Dict[int, VideoSmall]: alert_streams = self.alert_stream_layout.stream_widgets @@ -57,16 +48,20 @@ def streams(self) -> Dict[int, VideoSmall]: return all_streams - def show_background_image(self, show_background: bool): + def show_background_image(self, show_background: bool) -> None: self.scroll_area.setHidden(show_background) self.background_widget.setVisible(show_background) - def add_stream_conf(self, stream_conf: bf_codecs.StreamConfiguration): + def add_stream(self, stream_conf: StreamConfiguration) -> None: self.alertless_stream_layout.new_stream_widget(stream_conf) self.show_background_image(False) - def delete_stream_conf(self, stream_conf: bf_codecs.StreamConfiguration): + def expand_video_grids(self, expand) -> None: + self.alertless_stream_layout.expand_grid(expand) + self.alert_stream_layout.expand_grid(expand) + + def remove_stream(self, stream_conf: StreamConfiguration) -> None: # Figure out which layout the stream is in, and remove it for layout in [self.alert_stream_layout, self.alertless_stream_layout]: for stream_id, stream_widget in layout.stream_widgets.items(): @@ -82,12 +77,8 @@ def delete_stream_conf(self, stream_conf: bf_codecs.StreamConfiguration): if len(self.streams) == 0: self.show_background_image(True) - def expand_video_grids(self, expand): - self.alertless_stream_layout.expand_grid(expand) - self.alert_stream_layout.expand_grid(expand) - - @pyqtSlot(object) - def _handle_alerts(self, alerts: List[bf_codecs.Alert]): + @pyqtSlot(object) # This is here for QMetaObject.invokeMethod + def _handle_alerts(self, alerts: List[Alert]) -> None: if QThread.currentThread() != self.thread(): # Move to the UI Thread @@ -105,17 +96,16 @@ def _handle_alerts(self, alerts: List[bf_codecs.Alert]): # Currently unsupported continue - if alert.end_time is not None \ - and stream_id not in alertless_streams: + if alert.end_time is not None and stream_id not in alertless_streams: + video_widget = self.alert_stream_layout.pop_stream_widget(stream_id) - video_widget = self.alert_stream_layout.pop_stream_widget( - stream_id) + video_widget.alerts_ongoing = False self.alertless_stream_layout.add_video(video_widget) elif alert.end_time is None and stream_id not in alert_streams: + video_widget = self.alertless_stream_layout.pop_stream_widget(stream_id) - video_widget = self.alertless_stream_layout.pop_stream_widget( - stream_id) + video_widget.alerts_ongoing = True self.alert_stream_layout.add_video(video_widget) streams_with_alerts = len(self.alert_stream_layout.stream_widgets) > 0 @@ -127,6 +117,19 @@ def _handle_alerts(self, alerts: List[bf_codecs.Alert]): abandoned_stream_ids = alert_streams.keys() - stream_ids_with_alerts for stream_id in abandoned_stream_ids: - video_widget = self.alert_stream_layout \ - .pop_stream_widget(stream_id) + video_widget = self.alert_stream_layout.pop_stream_widget(stream_id) self.alertless_stream_layout.add_video(video_widget) + + def _refresh_active_streams(self, stream_conf: StreamConfiguration) -> None: + get_stream_manager().resume_streaming(stream_conf.id) + + def _retrieve_remote_streams(self) -> None: + + def on_success(stream_confs: List[StreamConfiguration]) -> None: + for stream_conf in stream_confs: + self.add_stream(stream_conf) + + self._init_alert_pubsub() + + QTAsyncWorker(self, api.get_stream_configurations, on_success=on_success) \ + .start() diff --git a/brainframe_qt/ui/main_window/video_thumbnail_view/video_thumbnail_view_ui.py b/brainframe_qt/ui/main_window/video_thumbnail_view/video_thumbnail_view_ui.py index aaa275936..4c1b74e63 100644 --- a/brainframe_qt/ui/main_window/video_thumbnail_view/video_thumbnail_view_ui.py +++ b/brainframe_qt/ui/main_window/video_thumbnail_view/video_thumbnail_view_ui.py @@ -2,13 +2,12 @@ from PyQt5.QtGui import QResizeEvent from PyQt5.QtWidgets import QScrollArea, QVBoxLayout, QWidget -from brainframe_qt.ui.main_window.video_thumbnail_view.thumbnail_grid_layout.thumbnail_grid_layout import \ - ThumbnailGridLayout from brainframe_qt.ui.resources import stylesheet_watcher from brainframe_qt.ui.resources.mixins.style import TransientScrollbarMI from brainframe_qt.ui.resources.paths import qt_qss_paths -from brainframe_qt.ui.resources.ui_elements.widgets import \ - BackgroundImageText +from brainframe_qt.ui.resources.ui_elements.widgets import BackgroundImageText + +from .widgets.thumbnail_grid_layout import ThumbnailGridLayout class _VideoThumbnailViewUI(QWidget): diff --git a/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/__init__.py b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/__init__.py new file mode 100644 index 000000000..f30b1d8c9 --- /dev/null +++ b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/__init__.py @@ -0,0 +1 @@ +from .thumbnail_grid_layout import ThumbnailGridLayout diff --git a/brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/thumbnail_grid_layout.py b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/thumbnail_grid_layout.py similarity index 90% rename from brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/thumbnail_grid_layout.py rename to brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/thumbnail_grid_layout.py index f42a0065d..47b26a2f8 100644 --- a/brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/thumbnail_grid_layout.py +++ b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/thumbnail_grid_layout.py @@ -1,12 +1,15 @@ +import typing from typing import Dict from PyQt5.QtCore import Qt, pyqtProperty, pyqtSignal, pyqtSlot from PyQt5.QtWidgets import QWidget from PyQt5.uic import loadUi + from brainframe.api.bf_codecs import StreamConfiguration from brainframe_qt.ui.resources.paths import qt_ui_paths -from .video_small.video_small import VideoSmall + +from ..video_small import VideoSmall class ThumbnailGridLayout(QWidget): @@ -46,17 +49,18 @@ def __init__(self, parent=None, grid_num_columns=3): self._init_style() - def new_stream_widget(self, stream_conf: StreamConfiguration): + def new_stream_widget(self, stream_conf: StreamConfiguration) -> None: video = VideoSmall(parent=self) video.change_stream(stream_conf) - self.add_video(video) + video.stream_clicked.connect(self.thumbnail_stream_clicked_slot) + video.alert_status_changed.connect(self.ongoing_alerts_slot) - self._connect_widget_signals(video) + self.add_video(video) def add_video(self, video: VideoSmall) -> None: - stream_id = video.stream_conf.id + stream_id = video.stream_event_manager.stream_conf.id self.stream_widgets[stream_id] = video self._add_widget_to_layout(video) @@ -71,15 +75,6 @@ def _add_widget_to_layout(self, widget): # (+1 is for indexing at 1 for a count) self.grid_num_rows = row + 1 - def _connect_widget_signals(self, widget: VideoSmall): - """Connect the stream widget's signal(s) to the grid and to the parent - view - """ - # Because the widgets are added dynamically, we can't connect slots - # and signals using QtDesigner and have to do it manually - widget.stream_clicked.connect(self.thumbnail_stream_clicked_slot) - widget.ongoing_alerts_signal.connect(self.ongoing_alerts_slot) - @pyqtSlot(object) def thumbnail_stream_clicked_slot(self, stream_conf): """Signaled by child VideoWidget and then passed upwards @@ -105,7 +100,8 @@ def ongoing_alerts_slot(self, alerts_ongoing: bool): - VideoSmall -- Dynamic [child].ongoing_alerts_signal """ - stream_conf = self.sender().stream_conf + stream_widget = typing.cast(VideoSmall, self.sender()) + stream_conf = stream_widget.stream_event_manager.stream_conf # noinspection PyUnresolvedReferences self.ongoing_alerts_signal.emit(stream_conf, alerts_ongoing) diff --git a/brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/thumbnail_grid_layout.ui b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/thumbnail_grid_layout.ui similarity index 100% rename from brainframe_qt/ui/main_window/video_thumbnail_view/thumbnail_grid_layout/thumbnail_grid_layout.ui rename to brainframe_qt/ui/main_window/video_thumbnail_view/widgets/thumbnail_grid_layout/thumbnail_grid_layout.ui diff --git a/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/video_small/__init__.py b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/video_small/__init__.py new file mode 100644 index 000000000..a0362d9d2 --- /dev/null +++ b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/video_small/__init__.py @@ -0,0 +1 @@ +from .video_small import VideoSmall diff --git a/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/video_small/video_small.py b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/video_small/video_small.py new file mode 100644 index 000000000..12eddb4e2 --- /dev/null +++ b/brainframe_qt/ui/main_window/video_thumbnail_view/widgets/video_small/video_small.py @@ -0,0 +1,98 @@ +from PyQt5.QtCore import QRectF, Qt, pyqtSignal +from PyQt5.QtGui import QColor, QFontMetricsF, QImage, QPainter, QMouseEvent +from PyQt5.QtWidgets import QWidget +from brainframe.api.bf_codecs import StreamConfiguration + +from brainframe_qt.api_utils import get_stream_manager +from brainframe_qt.ui.resources.video_items.streams import StreamWidget + + +class VideoSmall(StreamWidget): + """Video for ThumbnailView""" + + stream_clicked = pyqtSignal(StreamConfiguration) + """A thumbnail has been clicked""" + + alert_status_changed = pyqtSignal(bool) + """The stream's ongoing alert state has changed""" + + def __init__(self, parent: QWidget): + super().__init__(parent=parent) + + self.alerts_ongoing: bool = False + + def drawForeground(self, painter: QPainter, rect: QRectF): + """Draw the alert UI if there are ongoing alerts + + Alert is at the bottom of the QGraphics view, full width and 1/3 height. + Text is positioned manually such that it looks nice within alert. + Icon is positioned manually such that it looks nice within alert. + + Overrides the super().drawForeground method which by default does pretty much + nothing + """ + # Gray rectangle + brush = painter.brush() + brush.setColor(QColor(0, 0, 0, 127)) # Half transparent black + brush.setStyle(Qt.SolidPattern) + painter.setBrush(brush) + + # No border + painter.setPen(Qt.NoPen) + + width = self.scene().width() + height = self.scene().height() + bar_percent = .1 + + bar_height = width * bar_percent + + # Draw alert background + painter.fillRect(0, int(height - bar_height), + int(width), int(bar_height), + painter.brush()) + + image_width_with_margins = 0 + if self.alerts_ongoing: + # Draw icon + image_percent = 0.8 + + image = QImage(":/icons/alert") + image = image.scaled(int(self.sceneRect().width()), + int(bar_height * image_percent), + Qt.KeepAspectRatio) + + image_margin = bar_height / 4 + painter.drawImage( + int(width - image.width() - image_margin), + int(height - image.height() + - (bar_height * (1 - image_percent) / 2)), + image) + + image_width_with_margins = image.width() + (2.5 * image_margin) + + if self.stream_event_manager.stream_conf is not None: + # Draw text + font = painter.font() + point_size = bar_height / 2 + font.setPointSizeF(point_size if point_size > 0 else 1) + painter.setFont(font) + painter.setPen(Qt.white) + + font_metric = QFontMetricsF(font) + stream_name_text = font_metric.elidedText( + self.stream_event_manager.stream_conf.name, + Qt.ElideRight, + width - image_width_with_margins) + + painter.drawText(int(point_size / 2), + int(height - (point_size / 2)), + stream_name_text) + + def mouseReleaseEvent(self, event: QMouseEvent) -> None: + super().mousePressEvent(event) + + if event.button() == Qt.LeftButton: + self.stream_clicked.emit(self.stream_event_manager.stream_conf) + + def manage_alert_state(self, alerts_active: bool) -> None: + self.alerts_ongoing = alerts_active diff --git a/brainframe_qt/ui/resources/i18n/brainframe_zh.ts b/brainframe_qt/ui/resources/i18n/brainframe_zh.ts index 9cf73fd73..9d5b0659e 100644 --- a/brainframe_qt/ui/resources/i18n/brainframe_zh.ts +++ b/brainframe_qt/ui/resources/i18n/brainframe_zh.ts @@ -282,7 +282,7 @@ Read the manual to learn about the required directory structure.<br><br 连接服务器成功 - + Program Closing: License Not Accepted 程序正在关闭:授权不符 @@ -332,7 +332,7 @@ Read the manual to learn about the required directory structure.<br><br 。客户端必须关闭。 - + Version Mismatch 版本不匹配 @@ -342,22 +342,22 @@ Read the manual to learn about the required directory structure.<br><br 正在收集服务器身份验证配置 - + The server is using version {server_version} but this client is on version {client_version}. 服务器版本为{server_version},但此客户端版为{client_version}。 - + For a stable experience, please <a href='{client_download_url}'>download</a> the latest {outdated} version. 为了获得更稳定的体验,请<a href='{client_download_url}'>下载</a>最新的{outdated}版本。 - + client 客户端 - + server 服务器端 @@ -1163,87 +1163,87 @@ Please recheck the entered server address. StreamConfiguration - + Error Opening Stream 打开视频流时出错 - + Error encountered while uploading video file 上传视频文件过程中发生了一个错误 - + Stream source already open 视频流源已经打开 - + You already have the stream source open. 您已经打开了视频流源。 - + Error: 错误: - + Error encountered while opening stream 打开视频流时出错 - + Is stream already open? 视频流是否已打开? - + Is this a valid stream source? 这是一个有效的视频流源吗? - + Active Stream Limit Exceeded 视频流数量超过上限 - + You have exceeded the number of active streams available to you under the terms of your license. Consider deleting another stream or contacting Aotu to increase your active stream limit. 您已经超出了许可条款下可供使用的最大视频流数量,请考虑删除其他视频流或与Aotu联系以增加最大视频流限制。 - + Error uploading file 上传文件时发生错误 - + No such file: {filepath} 没有这样的文件:{filepath} - + Adding a webcam 添加一个网络摄像头 - + Webcams and other video devices must be attached to the computer running the BrainFrame server.<br><br>To add a webcam, open a terminal on the server machine and run <pre>ls /dev/video*</pre>If you get a message about "No such file or directory", you do not have any webcams attached to theserver computer.<br><br>Otherwise, select the digit at the end of the results and provide it to BrainFrame. For example, if the command returns "/dev/video0", input "0" (without the quotes). 网络摄像头和其他视频设备必须连接到运行BrainFrame的计算机。如果在启动BrainFrame后插入设备,请重新启动BrainFrame。<br><br>要添加网络摄像头,请在服务器上打开终端并运行<pre>ls /dev/video*</pre>如果收到类似“No such file or directory“的消息,则说明运行BrainFrame的计算机没有连接任何网络摄像头。<br><br>否则,请选择结果末尾的数字并将其提供给BrainFrame。例如,如果命令返回“/dev/video0“,则输入“0“(不带引号)。 - + Adding an IP Camera 添加IP摄像头 - + Please see the <a href='{ip_camera_docs_link}'>documentation</a> for more information on adding IP Cameras. 有关添加IP摄像机的详细信息,请参见<a href='{ip_camera_docs_link}'>文档</a>。 - + Standard RTSP format:<br>{rtsp_format} 标准RTSP格式:<br>{rtsp_format} @@ -1284,27 +1284,27 @@ Please recheck the entered server address. TaskConfiguration - + New zone cannot have fewer than 2 points 新检测区域不能由于少于两个点组成 - + Add points until done, then press "Confirm" button 添加点直到完成,然后按“确认”按钮 - + Item Name Already Exists 名称已存在 - + Item {} already exists in Stream 项目{}已存在于视频流中 - + Please use another name. 请使用另一个名称。 @@ -1349,22 +1349,22 @@ Please recheck the entered server address. 取消 - + New Line 新检测线段 - + Name for new line: 新检测线段名称: - + New Region 新检测区域 - + Name for new region: 新检测区域名称: @@ -1539,17 +1539,17 @@ Please recheck the entered server address. _VideoThumbnailViewUI - + Streams with ongoing alerts: 正在警报的视频流: - + Streams without alerts: 无警报的视频流: - + Click the âž• button to add a new stream 点击 ➕ 按钮以添加新的视频流 diff --git a/brainframe_qt/ui/resources/images/stream_paused.png b/brainframe_qt/ui/resources/images/stream_paused.png new file mode 100644 index 000000000..8d3aedaff --- /dev/null +++ b/brainframe_qt/ui/resources/images/stream_paused.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a37918105a2ff2d3e8b8a8b555bca2762a86ce48150647485834508724196e08 +size 43653 diff --git a/brainframe_qt/ui/resources/images/stream_paused_inkscape.svg b/brainframe_qt/ui/resources/images/stream_paused_inkscape.svg new file mode 100644 index 000000000..0d3f584d2 --- /dev/null +++ b/brainframe_qt/ui/resources/images/stream_paused_inkscape.svg @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ddf620b6d4110b5d5070e63c526fe570e2a03da0c117d450de4e65457393a59c +size 4030 diff --git a/brainframe_qt/ui/resources/paths/qt_ui_paths.py b/brainframe_qt/ui/resources/paths/qt_ui_paths.py index 7fe3983e4..614e97da6 100644 --- a/brainframe_qt/ui/resources/paths/qt_ui_paths.py +++ b/brainframe_qt/ui/resources/paths/qt_ui_paths.py @@ -12,8 +12,9 @@ alert_log_ui = register_path(alert_log_dir , "alert_log.ui" ) video_thumbnail_view_dir = register_path(main_window_dir , "video_thumbnail_view" ) +thumbnail_widget_dir = register_path(video_thumbnail_view_dir , "widgets" ) -thumbnail_grid_layout_dir = register_path(video_thumbnail_view_dir , "thumbnail_grid_layout" ) +thumbnail_grid_layout_dir = register_path(thumbnail_widget_dir , "thumbnail_grid_layout" ) thumbnail_grid_layout_ui = register_path(thumbnail_grid_layout_dir , "thumbnail_grid_layout.ui" ) # Activities diff --git a/brainframe_qt/ui/resources/qt_resources.qrc b/brainframe_qt/ui/resources/qt_resources.qrc index 4c53019bd..88639547a 100644 --- a/brainframe_qt/ui/resources/qt_resources.qrc +++ b/brainframe_qt/ui/resources/qt_resources.qrc @@ -6,6 +6,8 @@ images/loading_image.svg images/no_image_available.svg images/splash_screen.png + images/stream_paused.png + images/streaming_stopped.png images/icons/alarm.svg diff --git a/brainframe_qt/ui/resources/video_items/streams/stream_event_manager.py b/brainframe_qt/ui/resources/video_items/streams/stream_event_manager.py new file mode 100644 index 000000000..4271c8d96 --- /dev/null +++ b/brainframe_qt/ui/resources/video_items/streams/stream_event_manager.py @@ -0,0 +1,251 @@ +import logging +from threading import Event +from typing import Optional + +from PyQt5.QtCore import QObject, pyqtSignal, QTimer + +from brainframe.api.bf_codecs import StreamConfiguration +from brainframe.api.bf_errors import StreamConfigNotFoundError, StreamNotOpenedError + +from brainframe_qt.api_utils import api, get_stream_manager +from brainframe_qt.api_utils.streaming import SyncedStreamReader +from brainframe_qt.api_utils.streaming.synced_reader import SyncedStatus +from brainframe_qt.api_utils.streaming.zone_status_frame import ZoneStatusFrame +from brainframe_qt.ui.resources import QTAsyncWorker + + +class StreamEventManager(QObject): + + stream_initializing = pyqtSignal() + stream_halted = pyqtSignal() + stream_closed = pyqtSignal() + stream_paused = pyqtSignal() + stream_finished = pyqtSignal() + + stream_error = pyqtSignal() + + frame_received = pyqtSignal(ZoneStatusFrame) + + VIDEO_FRAME_RATE = 30 + """Frame rate of polling loop in FPS""" + + def __init__(self, *, parent: QObject): + """Manages events from the stream's SyncedStreamReader""" + super().__init__(parent=parent) + + self._frame_event = Event() + self._status_event = Event() + + self.stream_conf: Optional[StreamConfiguration] = None + self.stream_reader: Optional[SyncedStreamReader] = None + + self._event_timer = self._init_event_timer() + + self._init_signals() + + def _init_event_timer(self) -> QTimer: + timer = QTimer(parent=self) + + timer.setInterval(1000 // self.VIDEO_FRAME_RATE) + timer.start() + + return timer + + def _init_signals(self) -> None: + self._event_timer.timeout.connect(self._process_events) + + @property + def is_streaming_paused(self) -> bool: + if self.stream_conf is None: + return False + if self.stream_reader is None: + return False + + return self.stream_reader.is_streaming_paused + + def change_stream(self, stream_conf: StreamConfiguration) -> None: + if self.stream_reader is not None: + self.stop_streaming() + + self.stream_conf = stream_conf + self.start_streaming() + + def pause_streaming(self) -> None: + if self.stream_reader is None: + logging.warning( + f"Attempted to pause StreamEventManager, but it had no " + f"SyncedStreamReader" + ) + return + + self.stream_reader.pause_streaming() + + def resume_streaming(self) -> None: + if self.stream_reader is None: + logging.warning( + f"Attempted to resume StreamEventManager, but it had no " + f"SyncedStreamReader" + ) + return + + if self.stream_reader.stream_status is not SyncedStatus.PAUSED: + logging.warning( + f"Attempted to resume StreamEventManager streaming for stream " + f"{self.stream_conf.id}, but it was not paused" + ) + return + + self.stream_reader.resume_streaming() + + def start_streaming(self) -> None: + + # Store the current stream_conf before async code is run to check to see if we + # should abort after async + stream_conf = self.stream_conf + + def handle_stream_url(stream_url: Optional[str]) -> None: + # Occurs when the get_stream_url() call fails due to + # the stream having been deleted + if stream_url is None: + return + + # User must have already changed stream again by the time this callback is + # called. Just forget about this current change request. + if stream_conf is not self.stream_conf: + return + + self._subscribe_to_stream(stream_conf, stream_url) + + QTAsyncWorker(self, self._get_stream_url, f_args=(stream_conf,), + on_success=handle_stream_url) \ + .start() + + def stop_streaming(self) -> None: + if self.stream_reader is None: + logging.warning( + f"Attempted to stop StreamEventManager, but it had no " + f"SyncedStreamReader" + ) + return + + self._unsubscribe_from_stream() + + self.stream_conf = None + + def _handle_frame_signal(self) -> None: + """Connected to the SyncedStreamReader""" + self._frame_event.set() + + def _handle_status_signal(self) -> None: + self._status_event.set() + + def _on_frame(self): + self._frame_event.clear() + + if self.stream_reader is None: + logging.info( + f"StreamEventManager for stream {self.stream_conf.id} received " + f"frame event, but SyncedStreamReader is None") + return + + frame = self.stream_reader.latest_processed_frame + + if frame is None: + logging.info( + f"StreamEventManager for stream {self.stream_conf.id} received " + f"frame event, but frame is None") + return + + self.frame_received.emit(frame) + + def _on_state_change(self) -> None: + self._status_event.clear() + + if self.stream_reader is None: + logging.info( + f"StreamEventManager for stream {self.stream_conf.id} received " + f"status event, but SyncedStreamReader is None") + return + + state = self.stream_reader.stream_status + + if state is SyncedStatus.INITIALIZING: + self.stream_initializing.emit() + elif state is SyncedStatus.HALTED: + self.stream_halted.emit() + elif state is SyncedStatus.CLOSED: + self.stream_closed.emit() + elif state is SyncedStatus.PAUSED: + self.stream_paused.emit() + elif state is SyncedStatus.FINISHED: + self.stream_finished.emit() + elif state is SyncedStatus.STREAMING: + # Streaming, but no frame received yet + self.stream_initializing.emit() + else: + self.stream_error.emit() + + def _process_events(self) -> None: + if self._frame_event.is_set(): + self._on_frame() + if self._status_event.is_set(): + self._on_state_change() + + def _unsubscribe_from_stream(self) -> None: + """Remove the StreamEventManager's reference to the SyncedStreamReader after + disconnecting the connected signals/slots. + + Note that this does not directly delete/remove/stop the SyncedStreamReader. We + rely on garbage collection to do this. Multiple StreamWidgets (each with their + own StreamEventManager) could be using the same SyncedStreamReader. + """ + if self.stream_reader is None: + logging.warning( + "Attempted to unsubscribe StreamEventManager from stream when it was " + "already disconnected" + ) + return + + self.stream_reader.frame_received.disconnect(self._handle_frame_signal) + self.stream_reader.stream_state_changed.disconnect(self._handle_status_signal) + + self._frame_event.clear() + self._status_event.clear() + + self.stream_reader = None + + def _subscribe_to_stream( + self, + stream_conf: StreamConfiguration, + stream_url: str + ) -> None: + + # Create the stream reader + stream_manager = get_stream_manager() + stream_reader = stream_manager.start_streaming(stream_conf, stream_url) + + if stream_reader is None: + # This will happen if we try to get a StreamReader for a stream that no + # longer exists, for example if a user clicks to expand a stream the very + # instant before it's deleted from the server. We don't want to do anything + return + + # Connect new signals + stream_reader.frame_received.connect(self._handle_frame_signal) + stream_reader.stream_state_changed.connect(self._handle_status_signal) + + self.stream_reader = stream_reader + + # Don't wait for the first event to start displaying + latest_frame = self.stream_reader.latest_processed_frame + if latest_frame is not None: + self._on_frame() + else: + self._on_state_change() + + @staticmethod + def _get_stream_url(stream_conf: StreamConfiguration) -> Optional[str]: + try: + return api.get_stream_url(stream_conf.id) + except (StreamConfigNotFoundError, StreamNotOpenedError): + return None diff --git a/brainframe_qt/ui/resources/video_items/streams/stream_graphics_scene.py b/brainframe_qt/ui/resources/video_items/streams/stream_graphics_scene.py index b72325c1b..a0d44b9f5 100644 --- a/brainframe_qt/ui/resources/video_items/streams/stream_graphics_scene.py +++ b/brainframe_qt/ui/resources/video_items/streams/stream_graphics_scene.py @@ -1,6 +1,7 @@ -from typing import List +from typing import List, overload -from PyQt5.QtGui import QImage, QPixmap +import numpy as np +from PyQt5.QtGui import QPixmap from PyQt5.QtWidgets import QGraphicsScene, QWidget from brainframe.api import bf_codecs @@ -20,12 +21,17 @@ def __init__(self, *, render_config: RenderSettings, parent: QWidget): self.current_frame = None - def set_frame(self, *, pixmap=None, frame=None, path=None): + @overload + def set_frame(self, pixmap: QPixmap) -> None: + ... - if frame is not None: - pixmap = self._get_pixmap_from_numpy_frame(frame) + @overload + def set_frame(self, path: str) -> None: + ... - elif path is not None: + def set_frame(self, *, pixmap=None, path=None) -> None: + + if path is not None: pixmap = QPixmap(str(path)) """Set the current frame to the given pixmap""" @@ -97,14 +103,6 @@ def condition(item): self.remove_items(self.items(), condition) - @staticmethod - def _get_pixmap_from_numpy_frame(frame): - height, width, channel = frame.shape - bytes_per_line = width * 3 - image = QImage(frame.data, width, height, bytes_per_line, - QImage.Format_RGB888) - return QPixmap.fromImage(image) - def _new_zone_status_polygon(self, zone_status): zone_status_item = ZoneStatusItem( zone_status, diff --git a/brainframe_qt/ui/resources/video_items/streams/stream_listener_widget.py b/brainframe_qt/ui/resources/video_items/streams/stream_listener_widget.py deleted file mode 100644 index 384636a2f..000000000 --- a/brainframe_qt/ui/resources/video_items/streams/stream_listener_widget.py +++ /dev/null @@ -1,148 +0,0 @@ -from typing import Optional - -from PyQt5.QtCore import QCoreApplication, QTimer -from PyQt5.QtWidgets import QWidget -from brainframe.api import bf_codecs, bf_errors - -from brainframe_qt.api_utils import api -from brainframe_qt.api_utils.streaming import StreamListener, \ - SyncedStreamReader -from brainframe_qt.api_utils.streaming.zone_status_frame import \ - ZoneStatusFrame -from brainframe_qt.ui.resources import QTAsyncWorker - - -class StreamListenerWidget(QWidget, StreamListener): - - def __init__(self, *, parent: QWidget): - QWidget.__init__(self, parent=parent) - StreamListener.__init__(self) - - self.stream_conf: Optional[bf_codecs.StreamConfiguration] = None - """Current stream configuration used by the StreamReader""" - - self.stream_reader: Optional[SyncedStreamReader] = None - - self._frame_event_timer = QTimer() - self._frame_event_timer.timeout.connect(self.check_for_frame_events) - self._frame_event_timer.start(1000 // 30) # ~30 FPS - - def on_frame(self, frame: ZoneStatusFrame) -> None: - pass - - def on_stream_init(self) -> None: - pass - - def on_stream_closed(self) -> None: - pass - - def on_stream_error(self) -> None: - pass - - def on_stream_halted(self) -> None: - pass - - def check_for_frame_events(self) -> None: - if self.frame_event.is_set(): - self.frame_event.clear() - - frame = self.stream_reader.latest_processed_frame - self.on_frame(frame) - - if self.stream_initializing_event.is_set(): - self.stream_initializing_event.clear() - self.on_stream_init() - - if self.stream_halted_event.is_set(): - self.stream_halted_event.clear() - self.on_stream_halted() - - if self.stream_closed_event.is_set(): - self.stream_closed_event.clear() - self.on_stream_closed() - - if self.stream_error_event.is_set(): - self.stream_error_event.clear() - self.on_stream_error() - - def change_stream(self, - stream_conf: Optional[bf_codecs.StreamConfiguration]) \ - -> None: - - # Clear the existing stream reader to get ready for a new one - self._clear_current_stream_reader() - self.stream_conf = stream_conf - - # When we no longer want to use a StreamListenerWidget for an active - # stream - if not stream_conf: - # Typically a user shouldn't see this, but sometimes the client is - # laggy in closing the widget, so we don't use the error message - self.on_stream_closed() - - return - - def handle_stream_url(stream_url: Optional[str]) -> None: - # Occurs when the get_stream_url() call fails due to - # the stream having been deleted - if stream_url is None: - return - - self._subscribe_to_stream(stream_conf, stream_url) - - QTAsyncWorker(self, self._get_stream_url, f_args=(stream_conf,), - on_success=handle_stream_url) \ - .start() - - def _clear_current_stream_reader(self): - """If we currently have a stream reader, unsubscribe its listener - and clear any posted events""" - - # Ensure that we're not storing a stream_conf - self.stream_conf = None - - if not self.stream_reader: - return - - self.destroyed.disconnect() - - self.stream_reader.remove_listener(listener=self) - - # Make sure no more events are sent to this listener - QCoreApplication.removePostedEvents(self) - - self.stream_reader = None - - def _subscribe_to_stream(self, stream_conf: bf_codecs.StreamConfiguration, - stream_url: str) \ - -> None: - - # Create the stream reader - stream_reader = api.get_stream_manager() \ - .start_streaming(stream_conf, stream_url) - - if stream_reader is None: - # This will happen if we try to get a StreamReader for a stream - # that no longer exists, for example if a user clicks to expand - # a stream the very instant before it's deleted from the server - # We don't want to do anything - return - - # Subscribe to the StreamReader - self.stream_reader = stream_reader - self.stream_reader.add_listener(listener=self) - - # Make sure video is unsubscribed before it is GCed - self.destroyed.connect( - lambda: self.stream_reader.remove_listener(listener=self)) - - @staticmethod - def _get_stream_url(stream_conf: bf_codecs.StreamConfiguration) \ - -> Optional[str]: - try: - return api.get_stream_url(stream_conf.id) - except ( - bf_errors.StreamConfigNotFoundError, - bf_errors.StreamNotOpenedError - ): - return None diff --git a/brainframe_qt/ui/resources/video_items/streams/stream_widget.py b/brainframe_qt/ui/resources/video_items/streams/stream_widget.py index a41456a5d..a6c9dafdb 100644 --- a/brainframe_qt/ui/resources/video_items/streams/stream_widget.py +++ b/brainframe_qt/ui/resources/video_items/streams/stream_widget.py @@ -4,13 +4,15 @@ from PyQt5.QtGui import QResizeEvent from PyQt5.QtWidgets import QWidget -from brainframe_qt.api_utils.streaming.zone_status_frame import \ - ZoneStatusFrame -from .stream_listener_widget import StreamListenerWidget +from brainframe.api.bf_codecs import StreamConfiguration + +from brainframe_qt.api_utils.streaming.zone_status_frame import ZoneStatusFrame + +from .stream_event_manager import StreamEventManager from .stream_widget_ui import StreamWidgetUI -class StreamWidget(StreamWidgetUI, StreamListenerWidget): +class StreamWidget(StreamWidgetUI): """Base widget that uses Stream object to get frames. Makes use of a QTimer to get frames @@ -19,10 +21,23 @@ class StreamWidget(StreamWidgetUI, StreamListenerWidget): def __init__(self, *, parent: QWidget): super().__init__(parent=parent) + self.stream_event_manager = StreamEventManager(parent=self) + self._draw_lines: Optional[bool] = None self._draw_regions: Optional[bool] = None self._draw_detections: Optional[bool] = None + self._init_signals() + + def _init_signals(self) -> None: + self.stream_event_manager.frame_received.connect(self.on_frame) + + self.stream_event_manager.stream_initializing.connect(self.on_stream_init) + self.stream_event_manager.stream_halted.connect(self.on_stream_halted) + self.stream_event_manager.stream_closed.connect(self.on_stream_halted) + self.stream_event_manager.stream_paused.connect(self.on_stream_paused) + self.stream_event_manager.stream_error.connect(self.on_stream_error) + def resizeEvent(self, _event: Optional[QResizeEvent] = None) -> None: """Take up entire width using aspect ratio of scene""" @@ -67,10 +82,21 @@ def draw_detections(self) -> bool: def draw_detections(self, draw_detections: bool): self._draw_detections = draw_detections - def on_frame(self, frame: ZoneStatusFrame) -> None: + def change_stream(self, stream_conf: StreamConfiguration) -> None: + self.stream_event_manager.change_stream(stream_conf) + + def pause_streaming(self) -> None: + self.stream_event_manager.pause_streaming() + def stop_streaming(self) -> None: + self.stream_event_manager.stop_streaming() + + self.scene().remove_all_items() + self.scene().set_frame(path=":/images/streaming_stopped_png") + + def on_frame(self, frame: ZoneStatusFrame) -> None: self.scene().remove_all_items() - self.scene().set_frame(frame=frame.frame) + self.scene().set_frame(pixmap=frame.frame) # This frame has never been paired with ZoneStatuses from the server # so nothing should be rendered. This occurs when the server has @@ -100,9 +126,10 @@ def on_stream_halted(self) -> None: self.scene().remove_all_items() self.scene().set_frame(path=":/images/connection_lost_png") - def on_stream_closed(self) -> None: - self.on_stream_halted() - def on_stream_error(self) -> None: self.scene().remove_all_items() self.scene().set_frame(path=":/images/error_message_png") + + def on_stream_paused(self) -> None: + self.scene().remove_all_items() + self.scene().set_frame(path=":/images/stream_paused_png") diff --git a/pyproject.toml b/pyproject.toml index 5e6da1f25..9647c12e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "brainframe-qt" -version = "0.28.2" +version = "0.28.4" description = "Official BrainFrame client" authors = ["Aotu"] license = "Proprietary"