diff --git a/infrastructure/beacon_server.py b/infrastructure/beacon_server.py index 2f4545c3e7..5c5c8d04a7 100644 --- a/infrastructure/beacon_server.py +++ b/infrastructure/beacon_server.py @@ -797,21 +797,12 @@ def handle_pcbs_propagation(self): """ Generate a new beacon or gets ready to forward the one received. """ - master = False while True: # Wait until we have enough context to be a useful master # candidate. self._state_synced.wait() - if not master: - logging.debug("Trying to become master") if not self.zk.get_lock(): - if master: - logging.debug("No longer master") - master = False continue - if not master: - logging.debug("Became master") - master = True start_propagation = SCIONTime.get_time() # Create beacon for downstream ADs. downstream_pcb = PathSegment() @@ -1229,21 +1220,12 @@ def handle_pcbs_propagation(self): Main loop to propagate received beacons. """ # TODO: define function that dispatches the pcbs among the interfaces - master = False while True: # Wait until we have enough context to be a useful master # candidate. self._state_synced.wait() - if not master: - logging.debug("Trying to become master") if not self.zk.get_lock(): - if master: - logging.debug("No longer master") - master = False continue - if not master: - logging.debug("Became master") - master = True start_propagation = SCIONTime.get_time() best_segments = self.beacons.get_best_segments() for pcb in best_segments: diff --git a/lib/zookeeper.py b/lib/zookeeper.py index 95b3afd5e3..75a9d51fae 100644 --- a/lib/zookeeper.py +++ b/lib/zookeeper.py @@ -129,10 +129,12 @@ def __init__(self, isd_id, ad_id, srv_type, srv_id, self._lock = threading.Event() # Used to signal connection state changes self._state_events = queue.Queue() + self.conn_epoch = 0 # Kazoo parties self._parties = {} # Kazoo lock (initialised later) self._zk_lock = None + self._lock_epoch = 0 self._kazoo_setup(zk_hosts) self._setup_state_listener() @@ -186,8 +188,10 @@ def _state_listener(self, new_state): """ Called everytime the Kazoo connection state changes. """ + self.conn_epoch += 1 # Signal a connection state change - logging.debug("Kazoo state changed to %s", new_state) + logging.debug("Kazoo state changed to %s (epoch %d)", + new_state, self.conn_epoch) self._state_events.put(new_state) # Tell kazoo not to remove this listener: return False @@ -251,7 +255,6 @@ def _state_suspended(self): This means that the connection to Zookeeper is down. """ self._connected.clear() - self._lock.clear() logging.info("Connection to Zookeeper suspended") if self._on_disconnect: self._on_disconnect() @@ -264,7 +267,6 @@ def _state_lost(self): re-done on connect. """ self._connected.clear() - self._lock.clear() logging.info("Connection to Zookeeper lost") if self._on_disconnect: self._on_disconnect() @@ -349,15 +351,18 @@ def party_setup(self, prefix=None, autojoin=True): self._parties[party_path] = party return party - def get_lock(self, timeout=60.0): + def get_lock(self, lock_timeout=None, conn_timeout=None): """ Try to get the lock. Returns immediately if we already have the lock. - :param float timeout: Time (in seconds) to wait for lock acquisition, - or ``None`` to wait forever. - :type timeout: float or None. - :return: ``True`` if we got the lock, or already had it, otherwise - ``False``. + :param float lock_timeout: + Time (in seconds) to wait for lock acquisition, or ``None`` to wait + forever (Default). + :param float conn_timeout: + Time (in seconds) to wait for a connection to ZK, or ``None`` to + wait forever (Default). + :return: + ``True`` if we got the lock, or already had it, otherwise ``False``. :rtype: :class:`bool` """ if self._zk_lock is None: @@ -366,15 +371,24 @@ def get_lock(self, timeout=60.0): self._zk_lock = self._zk.Lock(lock_path, self._srv_id) if not self.is_connected(): self.release_lock() + if not self.wait_connected(timeout=conn_timeout): return False + if self._lock_epoch != self.conn_epoch: + logging.debug("ZK lock state is stale, releasing (epoch %d != %d)", + self._lock_epoch, self.conn_epoch) + self.release_lock() elif self._lock.is_set(): - # We already have the lock + # We already have the lock. return True + self._lock_epoch = self.conn_epoch + logging.debug("Trying to acquire ZK lock (epoch %d)", self._lock_epoch) try: - if self._zk_lock.acquire(timeout=timeout): + if self._zk_lock.acquire(timeout=lock_timeout): + logging.info("Successfully acquired ZK lock (epoch %d)", + self._lock_epoch) self._lock.set() else: - pass + logging.debug("Failed to acquire ZK lock") except (LockTimeout, ConnectionLoss, SessionExpiredError): pass ret = self.have_lock() @@ -385,6 +399,8 @@ def release_lock(self): Release the lock """ self._lock.clear() + if self._zk_lock is None: + return if self.is_connected(): try: self._zk_lock.release() @@ -397,7 +413,13 @@ def have_lock(self): """ Check if we currently hold the lock """ - return self.is_connected() and self._lock.is_set() + if (self.is_connected() and + self._lock_epoch == self.conn_epoch and + self._lock.is_set()): + return True + else: + self.release_lock() + return False def wait_lock(self): """ diff --git a/test/lib_zookeeper_test.py b/test/lib_zookeeper_test.py index 7f5f15e22d..a59862eca6 100644 --- a/test/lib_zookeeper_test.py +++ b/test/lib_zookeeper_test.py @@ -193,10 +193,12 @@ def test(self, init): # Setup inst = self._init_basic_setup() inst._state_events = create_mock(["put"]) + inst.conn_epoch = 47 # Call ntools.eq_(inst._state_listener("statist"), False) # Tests inst._state_events.put.assert_called_once_with("statist") + ntools.eq_(inst.conn_epoch, 48) class TestZookeeperStateHandler(BaseZookeeper): @@ -327,7 +329,6 @@ class TestZookeeperStateDisconnected(BaseZookeeper): def _check(self, f_name, init, test_callback=False): inst = self._init_basic_setup() inst._connected = create_mock(["clear"]) - inst._lock = create_mock(["clear"]) inst._on_disconnect = None if test_callback: inst._on_disconnect = create_mock() @@ -335,7 +336,6 @@ def _check(self, f_name, init, test_callback=False): getattr(inst, f_name)() # Tests inst._connected.clear.assert_called_once_with() - inst._lock.clear.assert_called_once_with() if test_callback: inst._on_disconnect.assert_called_once_with() @@ -517,74 +517,82 @@ class TestZookeeperGetLock(BaseZookeeper): """ Unit tests for lib.zookeeper.Zookeeper.get_lock """ - @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) - def test_no_lock(self, init): + def _setup(self, is_conn=True, wait_conn=True, l_epoch=1, c_epoch=1): inst = self._init_basic_setup() - inst._zk_lock = None - inst._prefix = "/prefix" + inst._zk_lock = create_mock(["acquire"]) inst._zk = create_mock(["Lock"]) - inst._srv_id = "srvid" - # Short-circuit the rest of get_lock() by making is_connected raise - # StopIteration. inst.is_connected = create_mock() - inst.is_connected.side_effect = [] - # Call - ntools.assert_raises(StopIteration, inst.get_lock) - # Tests - inst._zk.Lock.assert_called_once_with("/prefix/lock", "srvid") + inst.is_connected.return_value = is_conn + inst.release_lock = create_mock() + inst.wait_connected = create_mock() + inst.wait_connected.return_value = wait_conn + inst._lock_epoch = l_epoch + inst.conn_epoch = c_epoch + inst._lock = create_mock(["is_set", "set"]) + inst.have_lock = create_mock() + return inst @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) - def test_not_connected(self, init): - inst = self._init_basic_setup() - inst._zk_lock = True - inst.is_connected = create_mock() - inst.is_connected.return_value = False - inst.release_lock = create_mock() + def test_no_lock_not_conn(self, init): + inst = self._setup(is_conn=False, wait_conn=False) + inst._zk_lock = None + inst._prefix = "/prefix" + inst._srv_id = "srvid" # Call - ntools.assert_false(inst.get_lock()) + ntools.assert_false(inst.get_lock(conn_timeout="conn t/o")) # Tests + inst._zk.Lock.assert_called_once_with("/prefix/lock", "srvid") inst.is_connected.assert_called_once_with() inst.release_lock.assert_called_once_with() + inst.wait_connected.assert_called_once_with(timeout="conn t/o") @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) def test_have_lock(self, init): - inst = self._init_basic_setup() - inst._zk_lock = True - inst.is_connected = create_mock() - inst._lock = create_mock(["is_set"]) + inst = self._setup() # Call ntools.assert_true(inst.get_lock()) # Tests inst._lock.is_set.assert_called_once_with() @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) - def test_acquire(self, init): - inst = self._init_basic_setup() - inst._zk_lock = create_mock(["acquire"]) - inst.is_connected = create_mock() - inst._lock = create_mock(["is_set", "set"]) - inst._lock.is_set.return_value = False - inst.have_lock = create_mock() + def test_stale_epoch_acquire(self, init): + inst = self._setup(l_epoch=0) # Call - ntools.eq_(inst.get_lock(), inst.have_lock.return_value) + ntools.eq_(inst.get_lock(lock_timeout="lock t/o"), + inst.have_lock.return_value) # Tests - inst._zk_lock.acquire.assert_called_once_with(timeout=60.0) + inst.release_lock.assert_called_once_with() + ntools.eq_(inst._lock_epoch, inst.conn_epoch) + inst._zk_lock.acquire.assert_called_once_with(timeout="lock t/o") inst._lock.set.assert_called_once_with() inst.have_lock.assert_called_once_with() + @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) + def test_acquire_fail(self, init): + inst = self._setup(l_epoch=0) + inst._zk_lock.acquire.return_value = False + # Call + ntools.eq_(inst.get_lock(lock_timeout="lock t/o"), + inst.have_lock.return_value) + # Tests + inst._zk_lock.acquire.assert_called_once_with(timeout="lock t/o") + ntools.assert_false(inst._lock.set.called) + @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) def _check_exception(self, exception, init): inst = self._init_basic_setup() inst._zk_lock = create_mock(["acquire"]) inst._zk_lock.acquire.side_effect = exception inst.is_connected = create_mock() + inst.wait_connected = create_mock() + inst._lock_epoch = inst.conn_epoch = 22 inst._lock = create_mock(["is_set"]) inst._lock.is_set.return_value = False inst.have_lock = create_mock() # Call ntools.eq_(inst.get_lock(), inst.have_lock.return_value) # Tests - inst._zk_lock.acquire.assert_called_once_with(timeout=60.0) + inst._zk_lock.acquire.assert_called_once_with(timeout=None) def test_exceptions(self): for excp in (LockTimeout, ConnectionLoss, @@ -596,6 +604,18 @@ class TestZookeeperReleaseLock(BaseZookeeper): """ Unit tests for lib.zookeeper.Zookeeper.release_lock """ + @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) + def test_no_lock(self, init): + inst = self._init_basic_setup() + inst._lock = create_mock(["clear"]) + inst._zk_lock = None + inst.is_connected = create_mock() + # Call + inst.release_lock() + # Tests + inst._lock.clear.assert_called_once_with() + ntools.assert_false(inst.is_connected.called) + @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) def test_not_connected(self, init): inst = self._init_basic_setup() @@ -645,29 +665,42 @@ class TestZookeeperHaveLock(BaseZookeeper): """ Unit tests for lib.zookeeper.Zookeeper.have_lock """ - @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) - def _check(self, connected, have_lock, init): + def _setup(self, connected, l_epoch, c_epoch, lock_is_set): inst = self._init_basic_setup() inst.is_connected = create_mock() inst.is_connected.return_value = connected + inst._lock_epoch = l_epoch + inst.conn_epoch = c_epoch inst._lock = create_mock(["is_set"]) - inst._lock.is_set.return_value = have_lock - expected = connected and have_lock + inst._lock.is_set.return_value = lock_is_set + inst.release_lock = create_mock() + return inst + + @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) + def test_have(self, init): + inst = self._setup(True, 1, 1, True) # Call - ntools.eq_(inst.have_lock(), expected) + ntools.ok_(inst.have_lock()) # Tests inst.is_connected.assert_called_once_with() - if connected: - inst._lock.is_set.assert_called_once_with() + inst._lock.is_set.assert_called_once_with() + ntools.assert_false(inst.release_lock.called) - def test(self): - for connected, have_lock in ( - (False, False), - (False, True), - (True, False), - (True, True) + @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) + def _check_not_have(self, connected, l_epoch, c_epoch, lock_is_set, init): + inst = self._setup(connected, l_epoch, c_epoch, lock_is_set) + # Call + ntools.assert_false(inst.have_lock()) + # Tests + inst.release_lock.assert_called_once_with() + + def test_not_have(self): + for connected, l_epoch, c_epoch, lock_is_set in ( + (False, 1, 1, True), + (True, 0, 1, True), + (True, 1, 1, False), ): - yield self._check, connected, have_lock + yield self._check_not_have, connected, l_epoch, c_epoch, lock_is_set class TestZookeeperWaitLock(BaseZookeeper):