Skip to content

Commit

Permalink
Improve robustness for multiple client connections (#803)
Browse files Browse the repository at this point in the history
* Stability improvements for multiple clients - 1

* Improve robustness for multiple client connections.

* Protect callbacks with reentrant lock.

* Do not re-raise WebSocketClosedError.

* Add comment why WebSocketClosedError is not re-raised.

* Simplify conditional assignment.

Co-authored-by: Jacob Bandes-Storch <[email protected]>

Co-authored-by: Steffen Nattke <[email protected]>
Co-authored-by: Jacob Bandes-Storch <[email protected]>
  • Loading branch information
3 people authored Oct 17, 2022
1 parent a093509 commit fada151
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 20 deletions.
41 changes: 22 additions & 19 deletions rosbridge_library/src/rosbridge_library/internal/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from threading import Lock
from threading import Lock, RLock

from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
Expand Down Expand Up @@ -124,7 +124,7 @@ def __init__(self, topic, client_id, callback, node_handle, msg_type=None, raw=F
# Create the subscriber and associated member variables
# Subscriptions is initialized with the current client to start with.
self.subscriptions = {client_id: callback}
self.lock = Lock()
self.rlock = RLock()
self.msg_class = msg_class
self.node_handle = node_handle
self.topic = topic
Expand All @@ -140,8 +140,11 @@ def __init__(self, topic, client_id, callback, node_handle, msg_type=None, raw=F

def unregister(self):
self.node_handle.destroy_subscription(self.subscriber)
with self.lock:
with self.rlock:
self.subscriptions.clear()
if self.new_subscriber:
self.node_handle.destroy_subscription(self.new_subscriber)
self.new_subscriber = None

def verify_type(self, msg_type):
"""Verify that the subscriber subscribes to messages of this type.
Expand All @@ -167,7 +170,7 @@ def subscribe(self, client_id, callback):
messages
"""
with self.lock:
with self.rlock:
# If the topic is latched, adding a new subscriber will immediately invoke
# the given callback.
# In any case, the first message is handled using new_sub_callback,
Expand All @@ -190,16 +193,16 @@ def unsubscribe(self, client_id):
client_id -- the ID of the client to unsubscribe
"""
with self.lock:
with self.rlock:
if client_id in self.new_subscriptions:
del self.new_subscriptions[client_id]
else:
del self.subscriptions[client_id]

def has_subscribers(self):
"""Return true if there are subscribers"""
with self.lock:
return len(self.subscriptions) != 0
with self.rlock:
return len(self.subscriptions) + len(self.new_subscriptions) != 0

def callback(self, msg, callbacks=None):
"""Callback for incoming messages on the rclpy subscription.
Expand All @@ -213,18 +216,18 @@ def callback(self, msg, callbacks=None):
"""
outgoing = OutgoingMessage(msg)

# Get the callbacks to call
if not callbacks:
with self.lock:
callbacks = self.subscriptions.values()
with self.rlock:
callbacks = callbacks or self.subscriptions.values()

# Pass the JSON to each of the callbacks
for callback in callbacks:
try:
callback(outgoing)
except Exception as exc:
# Do nothing if one particular callback fails except log it
self.node_handle.get_logger().error(f"Exception calling subscribe callback: {exc}")
# Pass the JSON to each of the callbacks
for callback in callbacks:
try:
callback(outgoing)
except Exception as exc:
# Do nothing if one particular callback fails except log it
self.node_handle.get_logger().error(
f"Exception calling subscribe callback: {exc}"
)

def _new_sub_callback(self, msg):
"""
Expand All @@ -237,7 +240,7 @@ def _new_sub_callback(self, msg):
the subscriptions dictionary is updated with the newly incorporated
subscriptors.
"""
with self.lock:
with self.rlock:
self.callback(msg, self.new_subscriptions.values())
self.subscriptions.update(self.new_subscriptions)
self.new_subscriptions = {}
Expand Down
3 changes: 2 additions & 1 deletion rosbridge_server/src/rosbridge_server/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ async def prewrite_message(self, message, binary):
"WebSocketClosedError: Tried to write to a closed websocket",
throttle_duration_sec=1.0,
)
raise
# If we end up here, a client has disconnected before its message callback(s) could be removed.
# To avoid log spamming, we only log a warning and do not re-raise the exception here.
except StreamClosedError:
cls.node_handle.get_logger().warn(
"StreamClosedError: Tried to write to a closed stream",
Expand Down

0 comments on commit fada151

Please sign in to comment.