diff --git a/infrastructure/beacon_server.py b/infrastructure/beacon_server.py index 5c5c8d04a7..ecaa097150 100644 --- a/infrastructure/beacon_server.py +++ b/infrastructure/beacon_server.py @@ -224,14 +224,11 @@ def __init__(self, server_id, topo_file, config_file, path_policy_file, # Add more IPs here if we support dual-stack name_addrs = "\0".join([self.id, str(SCION_UDP_PORT), str(self.addr.host_addr)]) - # Set when we have connected and read the existing recent and - # incoming PCBs - self._state_synced = threading.Event() self.zk = Zookeeper( self.topology.isd_id, self.topology.ad_id, BEACON_SERVICE, name_addrs, self.topology.zookeepers, - handle_paths=[(self.ZK_PCB_CACHE_PATH, self.process_pcbs, - self._state_synced)]) + handle_paths=[(self.ZK_PCB_CACHE_PATH, self.process_pcbs)] + ) self.zk.retry("Joining party", self.zk.party_setup) def _get_if_rev_token(self, if_id): @@ -443,18 +440,57 @@ def run(self): Run an instance of the Beacon Server. """ threading.Thread( - target=thread_safety_net, args=(self.handle_pcbs_propagation,), - name="BS.handle_pcbs_propagation", daemon=True).start() - threading.Thread( - target=thread_safety_net, args=(self.register_segments,), - name="BS.register_segments", daemon=True).start() + target=thread_safety_net, args=(self.worker,), + name="BS.worker", daemon=True).start() + # https://github.com/netsec-ethz/scion/issues/308: # threading.Thread( # target=thread_safety_net, args=(self._handle_if_timeouts,), # name="BS._handle_if_timeouts", daemon=True).start() - # Run shared paths handling. - self.zk.run_shared_cache_handling() SCIONElement.run(self) + def worker(self): + """ + Worker thread that takes care of reading shared PCBs from ZK, and + propagating PCBS/registering paths when master. + """ + last_propagation = last_registration = 0 + worker_cycle = 0.5 + while True: + if not self.zk.wait_connected(): + continue + start = SCIONTime.get_time() + # Read shared PCBs from ZK + if not self.zk.run_shared_cache_handling(): + continue + if not self.zk.get_lock(lock_timeout=0, conn_timeout=0): + continue + now = SCIONTime.get_time() + if now - last_propagation >= self.config.propagation_time: + self.handle_pcbs_propagation() + last_propagation = now + if not self._expire_shared_pcbs(): + continue + if (self.config.registers_paths and + now - last_registration >= self.config.registration_time): + self.register_segments() + last_registration = now + sleep_interval(start, worker_cycle, "BS.worker cycle") + + def _expire_shared_pcbs(self): + """ + Remove old PCBs from shared cache + """ + exp_count = None + try: + exp_count = self.zk.expire_shared_items( + self.ZK_PCB_CACHE_PATH, + SCIONTime.get_time() - self.config.propagation_time * 10) + except ZkConnectionLoss: + return False + if exp_count: + logging.debug("Expired %d old PCBs from shared cache", exp_count) + return True + def _try_to_verify_beacon(self, pcb): """ Try to verify a beacon. @@ -797,63 +833,32 @@ def handle_pcbs_propagation(self): """ Generate a new beacon or gets ready to forward the one received. """ - while True: - # Wait until we have enough context to be a useful master - # candidate. - self._state_synced.wait() - if not self.zk.get_lock(): - continue - start_propagation = SCIONTime.get_time() - # Create beacon for downstream ADs. - downstream_pcb = PathSegment() - timestamp = int(SCIONTime.get_time()) - downstream_pcb.iof = InfoOpaqueField.from_values( - OFT.CORE, False, timestamp, self.topology.isd_id) - self.propagate_downstream_pcb(downstream_pcb) - # Create beacon for core ADs. - core_pcb = PathSegment() - core_pcb.iof = InfoOpaqueField.from_values( - OFT.CORE, False, timestamp, self.topology.isd_id) - count = self.propagate_core_pcb(core_pcb) - # Propagate received beacons. A core beacon server can only receive - # beacons from other core beacon servers. - beacons = [] - for ps in self.beacons.values(): - beacons.extend(ps.get_best_segments()) - for pcb in beacons: - count += self.propagate_core_pcb(pcb) - logging.info("Propagated %d Core PCBs", count) - try: - count = self.zk.expire_shared_items( - self.ZK_PCB_CACHE_PATH, - start_propagation - self.config.propagation_time * 10) - except ZkConnectionLoss: - continue - if count: - logging.debug("Expired %d old PCBs from shared cache", count) - sleep_interval(start_propagation, self.config.propagation_time, - "PCB propagation") + # Create beacon for downstream ADs. + downstream_pcb = PathSegment() + timestamp = int(SCIONTime.get_time()) + downstream_pcb.iof = InfoOpaqueField.from_values( + OFT.CORE, False, timestamp, self.topology.isd_id) + self.propagate_downstream_pcb(downstream_pcb) + # Create beacon for core ADs. + core_pcb = PathSegment() + core_pcb.iof = InfoOpaqueField.from_values( + OFT.CORE, False, timestamp, self.topology.isd_id) + core_count = self.propagate_core_pcb(core_pcb) + # Propagate received beacons. A core beacon server can only receive + # beacons from other core beacon servers. + beacons = [] + for ps in self.beacons.values(): + beacons.extend(ps.get_best_segments()) + for pcb in beacons: + core_count += self.propagate_core_pcb(pcb) + if core_count: + logging.info("Propagated %d Core PCBs", core_count) def register_segments(self): """ """ - if not self.config.registers_paths: - logging.info("Path registration unwanted, leaving" - "register_segments") - return - while True: - lock = self.zk.have_lock() - if not lock: - logging.debug("register_segments: waiting for lock") - self.zk.wait_lock() - if not lock: - logging.debug("register_segments: have lock") - lock = True - start_registration = SCIONTime.get_time() - self.register_core_segments() - sleep_interval(start_registration, self.config.registration_time, - "Path registration") + self.register_core_segments() def register_core_segment(self, pcb): """ @@ -1107,23 +1112,8 @@ def register_segments(self): """ Register paths according to the received beacons. """ - if not self.config.registers_paths: - logging.info("Path registration unwanted, " - "leaving register_segments") - return - while True: - lock = self.zk.have_lock() - if not lock: - logging.debug("register_segements: waiting for lock") - self.zk.wait_lock() - if not lock: - logging.debug("register_segments: have lock") - lock = True - start_registration = SCIONTime.get_time() - self.register_up_segments() - self.register_down_segments() - sleep_interval(start_registration, self.config.registration_time, - "Path registration") + self.register_up_segments() + self.register_down_segments() def process_pcbs(self, pcbs): """ @@ -1220,26 +1210,9 @@ def handle_pcbs_propagation(self): Main loop to propagate received beacons. """ # TODO: define function that dispatches the pcbs among the interfaces - while True: - # Wait until we have enough context to be a useful master - # candidate. - self._state_synced.wait() - if not self.zk.get_lock(): - continue - start_propagation = SCIONTime.get_time() - best_segments = self.beacons.get_best_segments() - for pcb in best_segments: - self.propagate_downstream_pcb(pcb) - try: - count = self.zk.expire_shared_items( - self.ZK_PCB_CACHE_PATH, - start_propagation - self.config.propagation_time * 10) - except ZkConnectionLoss: - continue - if count: - logging.debug("Expired %d old PCBs from shared cache", count) - sleep_interval(start_propagation, self.config.propagation_time, - "PCB propagation") + best_segments = self.beacons.get_best_segments() + for pcb in best_segments: + self.propagate_downstream_pcb(pcb) def register_up_segments(self): """ diff --git a/lib/zookeeper.py b/lib/zookeeper.py index 75a9d51fae..7b208a0b76 100644 --- a/lib/zookeeper.py +++ b/lib/zookeeper.py @@ -20,7 +20,6 @@ import os.path import queue import threading -import time # External packages from kazoo.client import KazooClient, KazooRetry, KazooState @@ -95,20 +94,20 @@ def __init__(self, isd_id, ad_id, srv_type, srv_id, :param int isd_id: The ID of the current ISD. :param int ad_id: The ID of the current AD. - :param str srv_type: a service type from - :const:`lib.defines.SERVICE_TYPES` + :param str srv_type: + a service type from :const:`lib.defines.SERVICE_TYPES` :param str srv_id: Service instance identifier. - :param list zk_hosts: List of Zookeeper instances to connect to, in the - form of ``["host:port"..]``. + :param list zk_hosts: + List of Zookeeper instances to connect to, in the form of + ``["host:port"..]``. :param float timeout: Zookeeper session timeout length (in seconds). - :param on_connect: A function called everytime a connection is made to - Zookeeper. - :param on_disconnect: A function called everytime a connection is lost - to Zookeeper. - :param tuple handle_paths: A list of tuples of ZK paths, their - corresponding handler functions, and sync - states. It is ensured that paths exist on - connect. + :param on_connect: + A function called everytime a connection is made to Zookeeper. + :param on_disconnect: + A function called everytime a connection is lost to Zookeeper. + :param tuple handle_paths: + A list of tuples of ZK paths and their corresponding handler + functions. It is ensured that paths exist on connect. """ self._isd_id = isd_id self._ad_id = ad_id @@ -118,8 +117,8 @@ def __init__(self, isd_id, ad_id, srv_type, srv_id, self._on_disconnect = on_disconnect self._shared_caches = [] if handle_paths: - for path, handler, state_synced in handle_paths: - shared_cache = ZkSharedCache(self, path, handler, state_synced) + for path, handler in handle_paths: + shared_cache = ZkSharedCache(self, path, handler) self._shared_caches.append(shared_cache) self._prefix = "/ISD%d-AD%d/%s" % ( self._isd_id, self._ad_id, srv_type) @@ -569,8 +568,14 @@ def retry(self, desc, f, *args, _retries=4, _timeout=10.0, **kwargs): (desc, 1+_retries)) def run_shared_cache_handling(self): - for shared_cache in self._shared_caches: - shared_cache.run() + for cache in self._shared_caches: + try: + cache.handle_shared_entries() + except ZkConnectionLoss: + logging.warning("Reading from %s: Connection to ZK dropped", + cache.path) + return False + return True class ZkParty(object): @@ -624,18 +629,17 @@ class ZkSharedCache(object): """ Class for handling ZK's shared path. """ - def __init__(self, zk, path, handler, state_synced): + def __init__(self, zk, path, handler): """ :param Zookeeper zk: A Zookeeper instance :param str path: The absolute path of the cache :param function handler: Handler for a list of cached objects - :param threading.Event state_synced: state for synchronization """ self.zk = zk self.path = path self.handler = handler - self._state_synced = state_synced self._latest_entry = 0 + self._epoch = 0 def handle_shared_entries(self): """ @@ -648,25 +652,15 @@ def handle_shared_entries(self): While connected, it calls _read_cached_entries() to read updated entries from the cache. """ - while True: - if not self.zk.is_connected(): - self._state_synced.clear() - self.zk.wait_connected() - else: - time.sleep(0.5) - if not self._state_synced.is_set(): - # Make sure we re-read the entire cache - self._latest_entry = 0 - count = None - try: - count = self._read_cached_entries() - except ZkConnectionLoss: - self._state_synced.clear() - continue - if count: - logging.debug("Processed %d new/updated entries from %s", - count, self.path) - self._state_synced.set() + if not self.zk.wait_connected(): + return + if self._epoch != self.zk.conn_epoch: + # Make sure we re-read the entire cache + self._latest_entry = 0 + count = self._read_cached_entries() + if count: + logging.debug("Processed %d new/updated entries from %s", + count, self.path) def _read_cached_entries(self): """ @@ -717,11 +711,3 @@ def _process_cached_entries(self, entries): new_entries.append(raw) self.handler(new_entries) return len(new_entries) - - def run(self): - """ - Run thread that handles shared path. - """ - threading.Thread( - target=thread_safety_net, args=(self.handle_shared_entries,), - name="handle_shared_entries(%s)" % self.path, daemon=True).start() diff --git a/test/lib_zookeeper_test.py b/test/lib_zookeeper_test.py index a59862eca6..b929a359f1 100644 --- a/test/lib_zookeeper_test.py +++ b/test/lib_zookeeper_test.py @@ -73,7 +73,7 @@ def test_full(self, event, queue, ksetup, listener, kstart, cache): event.side_effect = ["event0", "event1"] inst = self._init_basic_setup( timeout=4.5, on_connect="on_conn", on_disconnect="on_dis", - handle_paths=[("path0", "handler0", "state0")]) + handle_paths=[("path0", "handler0")]) # Tests ntools.eq_(inst._isd_id, 1) ntools.eq_(inst._ad_id, 2) @@ -1031,12 +1031,24 @@ class TestZookeeperRunSharedCacheHandling(BaseZookeeper): Unit tests for lib.zookeeper.Zookeeper.run_shared_cache_handling """ @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) - def test(self, init): + def test_basic(self, init): inst = self._init_basic_setup() - inst._shared_caches = [create_mock(['run']), create_mock(['run'])] - inst.run_shared_cache_handling() - inst._shared_caches[0].run.assert_called_once_with() - inst._shared_caches[1].run.assert_called_once_with() + inst._shared_caches = [ + create_mock(['handle_shared_entries']), + create_mock(['handle_shared_entries']) + ] + ntools.ok_(inst.run_shared_cache_handling()) + inst._shared_caches[0].handle_shared_entries.assert_called_once_with() + inst._shared_caches[1].handle_shared_entries.assert_called_once_with() + + @patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None) + def test_conn_loss(self, init): + inst = self._init_basic_setup() + cache = create_mock(['handle_shared_entries', 'path']) + cache.handle_shared_entries.side_effect = ZkConnectionLoss + inst._shared_caches = [cache] + # Call + ntools.assert_false(inst.run_shared_cache_handling()) class TestZkPartyInit(object): @@ -1154,12 +1166,12 @@ class TestZkSharedCacheInit(object): Unit tests for lib.zookeeper.ZkSharedCache.__init__ """ def test(self): - inst = ZkSharedCache("zk", "path", "handler", "state_synced") + inst = ZkSharedCache("zk", "path", "handler") ntools.eq_(inst.zk, "zk") ntools.eq_(inst.path, "path") ntools.eq_(inst.handler, "handler") - ntools.eq_(inst._state_synced, "state_synced") ntools.eq_(inst._latest_entry, 0) + ntools.eq_(inst._epoch, 0) class TestZkSharedCacheReadCachedEntries(object): @@ -1167,7 +1179,7 @@ class TestZkSharedCacheReadCachedEntries(object): Unit test for lib.zookeeper.ZkSharedCache._read_cached_entries """ def test_no_entries(self): - inst = ZkSharedCache("zk", "path", "handler", "state_synced") + inst = ZkSharedCache("zk", "path", "handler") inst.zk = create_mock(["get_shared_metadata"]) inst.zk.get_shared_metadata.return_value = 0 ntools.eq_(inst._read_cached_entries(), 0) @@ -1176,7 +1188,7 @@ def test_no_entries(self): "path") def test_entries(self): - inst = ZkSharedCache("zk", "path", "handler", "state_synced") + inst = ZkSharedCache("zk", "path", "handler") inst.zk = create_mock(["get_shared_metadata"]) inst._process_cached_entries = create_mock() inst._latest_entry = 1