Merge pull request #342 from kormat/bs_worker
Refactor BS to merge various threads into one.
kormat committed Aug 25, 2015
2 parents ffbe2a3 + 0453b8f commit 080b470
Showing 3 changed files with 129 additions and 158 deletions.
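
For orientation, a minimal sketch (hypothetical names, not code from this commit) of the two shapes this refactor moves between: several per-task daemon threads before, one timestamp-gated worker loop after.

import threading
import time

def _loop(fn, interval):
    # Old shape: each periodic task spins in its own daemon thread.
    while True:
        fn()
        time.sleep(interval)

def run_before(tasks):
    for name, fn, interval in tasks:
        threading.Thread(target=_loop, args=(fn, interval),
                         name=name, daemon=True).start()

def run_after(tasks, cycle=0.5):
    # New shape: a single worker fires each task once its interval elapses.
    last = {name: 0 for name, _, _ in tasks}
    while True:
        start = time.time()
        for name, fn, interval in tasks:
            if start - last[name] >= interval:
                fn()
                last[name] = start
        time.sleep(max(0, cycle - (time.time() - start)))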
175 changes: 74 additions & 101 deletions infrastructure/beacon_server.py
@@ -224,14 +224,11 @@ def __init__(self, server_id, topo_file, config_file, path_policy_file,
# Add more IPs here if we support dual-stack
name_addrs = "\0".join([self.id, str(SCION_UDP_PORT),
str(self.addr.host_addr)])
# Set when we have connected and read the existing recent and
# incoming PCBs
self._state_synced = threading.Event()
self.zk = Zookeeper(
self.topology.isd_id, self.topology.ad_id, BEACON_SERVICE,
name_addrs, self.topology.zookeepers,
handle_paths=[(self.ZK_PCB_CACHE_PATH, self.process_pcbs,
self._state_synced)])
handle_paths=[(self.ZK_PCB_CACHE_PATH, self.process_pcbs)]
)
self.zk.retry("Joining party", self.zk.party_setup)

def _get_if_rev_token(self, if_id):
@@ -443,18 +440,57 @@ def run(self):
Run an instance of the Beacon Server.
"""
threading.Thread(
target=thread_safety_net, args=(self.handle_pcbs_propagation,),
name="BS.handle_pcbs_propagation", daemon=True).start()
threading.Thread(
target=thread_safety_net, args=(self.register_segments,),
name="BS.register_segments", daemon=True).start()
target=thread_safety_net, args=(self.worker,),
name="BS.worker", daemon=True).start()
# https://github.com/netsec-ethz/scion/issues/308:
# threading.Thread(
# target=thread_safety_net, args=(self._handle_if_timeouts,),
# name="BS._handle_if_timeouts", daemon=True).start()
# Run shared paths handling.
self.zk.run_shared_cache_handling()
SCIONElement.run(self)

def worker(self):
"""
Worker thread that takes care of reading shared PCBs from ZK, and
propagating PCBs/registering paths when master.
"""
last_propagation = last_registration = 0
worker_cycle = 0.5
while True:
if not self.zk.wait_connected():
continue
start = SCIONTime.get_time()
# Read shared PCBs from ZK
if not self.zk.run_shared_cache_handling():
continue
if not self.zk.get_lock(lock_timeout=0, conn_timeout=0):
continue
now = SCIONTime.get_time()
if now - last_propagation >= self.config.propagation_time:
self.handle_pcbs_propagation()
last_propagation = now
if not self._expire_shared_pcbs():
continue
if (self.config.registers_paths and
now - last_registration >= self.config.registration_time):
self.register_segments()
last_registration = now
sleep_interval(start, worker_cycle, "BS.worker cycle")
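
sleep_interval() is defined elsewhere in the repo and is not part of this diff; a plausible sketch of its contract, assuming it sleeps off whatever remains of the cycle and warns on overrun:

import logging
import time

def sleep_interval(start, interval, desc):
    # Sleep for the remainder of `interval`, measured from `start`;
    # warn if the cycle has already overrun its budget.
    remaining = start + interval - time.time()
    if remaining < 0:
        logging.warning("%s overran by %.2fs", desc, -remaining)
        remaining = 0
    time.sleep(remaining)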

def _expire_shared_pcbs(self):
"""
Remove old PCBs from shared cache
"""
exp_count = None
try:
exp_count = self.zk.expire_shared_items(
self.ZK_PCB_CACHE_PATH,
SCIONTime.get_time() - self.config.propagation_time * 10)
except ZkConnectionLoss:
return False
if exp_count:
logging.debug("Expired %d old PCBs from shared cache", exp_count)
return True
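
The expire_shared_items() call above bounds the cache by dropping entries older than ten propagation periods. A hedged sketch of such a method's semantics on top of a raw kazoo client (function name reused for illustration; the error handling and stat fields are assumptions, not taken from this diff):

from kazoo.exceptions import NoNodeError

def expire_shared_items(kazoo_client, path, cutoff_epoch_secs):
    # Delete cache entries whose last-modified time predates the cutoff,
    # returning how many entries were removed.
    count = 0
    for entry in kazoo_client.get_children(path):
        full_path = "%s/%s" % (path, entry)
        try:
            _, stat = kazoo_client.get(full_path)
        except NoNodeError:
            continue  # already removed by another instance
        if stat.last_modified < cutoff_epoch_secs:
            kazoo_client.delete(full_path)
            count += 1
    return count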

def _try_to_verify_beacon(self, pcb):
"""
Try to verify a beacon.
@@ -797,63 +833,32 @@ def handle_pcbs_propagation(self):
"""
Generate a new beacon or get ready to forward the one received.
"""
while True:
# Wait until we have enough context to be a useful master
# candidate.
self._state_synced.wait()
if not self.zk.get_lock():
continue
start_propagation = SCIONTime.get_time()
# Create beacon for downstream ADs.
downstream_pcb = PathSegment()
timestamp = int(SCIONTime.get_time())
downstream_pcb.iof = InfoOpaqueField.from_values(
OFT.CORE, False, timestamp, self.topology.isd_id)
self.propagate_downstream_pcb(downstream_pcb)
# Create beacon for core ADs.
core_pcb = PathSegment()
core_pcb.iof = InfoOpaqueField.from_values(
OFT.CORE, False, timestamp, self.topology.isd_id)
count = self.propagate_core_pcb(core_pcb)
# Propagate received beacons. A core beacon server can only receive
# beacons from other core beacon servers.
beacons = []
for ps in self.beacons.values():
beacons.extend(ps.get_best_segments())
for pcb in beacons:
count += self.propagate_core_pcb(pcb)
logging.info("Propagated %d Core PCBs", count)
try:
count = self.zk.expire_shared_items(
self.ZK_PCB_CACHE_PATH,
start_propagation - self.config.propagation_time * 10)
except ZkConnectionLoss:
continue
if count:
logging.debug("Expired %d old PCBs from shared cache", count)
sleep_interval(start_propagation, self.config.propagation_time,
"PCB propagation")
# Create beacon for downstream ADs.
downstream_pcb = PathSegment()
timestamp = int(SCIONTime.get_time())
downstream_pcb.iof = InfoOpaqueField.from_values(
OFT.CORE, False, timestamp, self.topology.isd_id)
self.propagate_downstream_pcb(downstream_pcb)
# Create beacon for core ADs.
core_pcb = PathSegment()
core_pcb.iof = InfoOpaqueField.from_values(
OFT.CORE, False, timestamp, self.topology.isd_id)
core_count = self.propagate_core_pcb(core_pcb)
# Propagate received beacons. A core beacon server can only receive
# beacons from other core beacon servers.
beacons = []
for ps in self.beacons.values():
beacons.extend(ps.get_best_segments())
for pcb in beacons:
core_count += self.propagate_core_pcb(pcb)
if core_count:
logging.info("Propagated %d Core PCBs", core_count)

def register_segments(self):
"""
"""
if not self.config.registers_paths:
logging.info("Path registration unwanted, leaving"
"register_segments")
return
while True:
lock = self.zk.have_lock()
if not lock:
logging.debug("register_segments: waiting for lock")
self.zk.wait_lock()
if not lock:
logging.debug("register_segments: have lock")
lock = True
start_registration = SCIONTime.get_time()
self.register_core_segments()
sleep_interval(start_registration, self.config.registration_time,
"Path registration")
self.register_core_segments()

def register_core_segment(self, pcb):
"""
@@ -1107,23 +1112,8 @@ def register_segments(self):
"""
Register paths according to the received beacons.
"""
if not self.config.registers_paths:
logging.info("Path registration unwanted, "
"leaving register_segments")
return
while True:
lock = self.zk.have_lock()
if not lock:
logging.debug("register_segements: waiting for lock")
self.zk.wait_lock()
if not lock:
logging.debug("register_segments: have lock")
lock = True
start_registration = SCIONTime.get_time()
self.register_up_segments()
self.register_down_segments()
sleep_interval(start_registration, self.config.registration_time,
"Path registration")
self.register_up_segments()
self.register_down_segments()

def process_pcbs(self, pcbs):
"""
@@ -1220,26 +1210,9 @@ def handle_pcbs_propagation(self):
Main loop to propagate received beacons.
"""
# TODO: define function that dispatches the pcbs among the interfaces
while True:
# Wait until we have enough context to be a useful master
# candidate.
self._state_synced.wait()
if not self.zk.get_lock():
continue
start_propagation = SCIONTime.get_time()
best_segments = self.beacons.get_best_segments()
for pcb in best_segments:
self.propagate_downstream_pcb(pcb)
try:
count = self.zk.expire_shared_items(
self.ZK_PCB_CACHE_PATH,
start_propagation - self.config.propagation_time * 10)
except ZkConnectionLoss:
continue
if count:
logging.debug("Expired %d old PCBs from shared cache", count)
sleep_interval(start_propagation, self.config.propagation_time,
"PCB propagation")
best_segments = self.beacons.get_best_segments()
for pcb in best_segments:
self.propagate_downstream_pcb(pcb)

def register_up_segments(self):
"""
80 changes: 33 additions & 47 deletions lib/zookeeper.py
@@ -20,7 +20,6 @@
import os.path
import queue
import threading
import time

# External packages
from kazoo.client import KazooClient, KazooRetry, KazooState
@@ -95,20 +94,20 @@ def __init__(self, isd_id, ad_id, srv_type, srv_id,
:param int isd_id: The ID of the current ISD.
:param int ad_id: The ID of the current AD.
:param str srv_type: a service type from
:const:`lib.defines.SERVICE_TYPES`
:param str srv_type:
a service type from :const:`lib.defines.SERVICE_TYPES`
:param str srv_id: Service instance identifier.
:param list zk_hosts: List of Zookeeper instances to connect to, in the
form of ``["host:port"..]``.
:param list zk_hosts:
List of Zookeeper instances to connect to, in the form of
``["host:port"..]``.
:param float timeout: Zookeeper session timeout length (in seconds).
:param on_connect: A function called everytime a connection is made to
Zookeeper.
:param on_disconnect: A function called everytime a connection is lost
to Zookeeper.
:param tuple handle_paths: A list of tuples of ZK paths, their
corresponding handler functions, and sync
states. It is ensured that paths exist on
connect.
:param on_connect:
A function called every time a connection is made to Zookeeper.
:param on_disconnect:
A function called every time a connection is lost to Zookeeper.
:param tuple handle_paths:
A list of tuples of ZK paths and their corresponding handler
functions. It is ensured that paths exist on connect.
"""
self._isd_id = isd_id
self._ad_id = ad_id
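
With the sync Event gone, each handle_paths entry is now a plain (path, handler) two-tuple. An illustrative construction, with placeholder values and a hypothetical handler:

def process_pcbs(raw_entries):
    # Invoked with any new/updated raw entries read from the shared cache.
    for raw in raw_entries:
        pass  # parse and store each PCB here

zk = Zookeeper(1, 11, "bs", "bs1-11-1", ["zk1:2181", "zk2:2181"],
               handle_paths=[("pcb_cache", process_pcbs)])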
@@ -118,8 +117,8 @@ def __init__(self, isd_id, ad_id, srv_type, srv_id,
self._on_disconnect = on_disconnect
self._shared_caches = []
if handle_paths:
for path, handler, state_synced in handle_paths:
shared_cache = ZkSharedCache(self, path, handler, state_synced)
for path, handler in handle_paths:
shared_cache = ZkSharedCache(self, path, handler)
self._shared_caches.append(shared_cache)
self._prefix = "/ISD%d-AD%d/%s" % (
self._isd_id, self._ad_id, srv_type)
@@ -569,8 +568,14 @@ def retry(self, desc, f, *args, _retries=4, _timeout=10.0, **kwargs):
(desc, 1+_retries))

def run_shared_cache_handling(self):
for shared_cache in self._shared_caches:
shared_cache.run()
for cache in self._shared_caches:
try:
cache.handle_shared_entries()
except ZkConnectionLoss:
logging.warning("Reading from %s: Connection to ZK dropped",
cache.path)
return False
return True
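
With the handler threads removed, the method now reports success to its caller. A hedged sketch of the new contract from the caller's side:

def one_worker_cycle(zk):
    # False means the ZK connection dropped mid-read: skip the rest of
    # this cycle and retry on the next one.
    if not zk.run_shared_cache_handling():
        return False
    # ... master-only work continues here ...
    return True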


class ZkParty(object):
@@ -624,18 +629,17 @@ class ZkSharedCache(object):
"""
Class for handling ZK's shared path.
"""
def __init__(self, zk, path, handler, state_synced):
def __init__(self, zk, path, handler):
"""
:param Zookeeper zk: A Zookeeper instance
:param str path: The absolute path of the cache
:param function handler: Handler for a list of cached objects
:param threading.Event state_synced: state for synchronization
"""
self.zk = zk
self.path = path
self.handler = handler
self._state_synced = state_synced
self._latest_entry = 0
self._epoch = 0

def handle_shared_entries(self):
"""
@@ -648,25 +652,15 @@ def handle_shared_entries(self):
While connected, it calls _read_cached_entries() to read updated entries
from the cache.
"""
while True:
if not self.zk.is_connected():
self._state_synced.clear()
self.zk.wait_connected()
else:
time.sleep(0.5)
if not self._state_synced.is_set():
# Make sure we re-read the entire cache
self._latest_entry = 0
count = None
try:
count = self._read_cached_entries()
except ZkConnectionLoss:
self._state_synced.clear()
continue
if count:
logging.debug("Processed %d new/updated entries from %s",
count, self.path)
self._state_synced.set()
if not self.zk.wait_connected():
return
if self._epoch != self.zk.conn_epoch:
# Make sure we re-read the entire cache
self._latest_entry = 0
count = self._read_cached_entries()
if count:
logging.debug("Processed %d new/updated entries from %s",
count, self.path)
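
The _epoch field replaces the shared Event: rather than a flag another thread clears, the cache compares its saved connection epoch with the client's, and a mismatch forces a full re-read. A generic sketch of the idea (it assumes the epoch is bumped on each reconnect and saved after the comparison; neither detail is visible in this hunk):

class Connection:
    def __init__(self):
        self.conn_epoch = 0

    def on_reconnect(self):
        # Bump the epoch whenever a session is (re)established.
        self.conn_epoch += 1

class CacheReader:
    def __init__(self, conn):
        self.conn = conn
        self._epoch = 0
        self._latest_entry = 0

    def poll(self):
        if self._epoch != self.conn.conn_epoch:
            # A reconnect happened since our last read; start from scratch.
            self._latest_entry = 0
            self._epoch = self.conn.conn_epoch
        # ... read entries newer than self._latest_entry ...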

def _read_cached_entries(self):
"""
@@ -717,11 +711,3 @@ def _process_cached_entries(self, entries):
new_entries.append(raw)
self.handler(new_entries)
return len(new_entries)

def run(self):
"""
Run thread that handles shared path.
"""
threading.Thread(
target=thread_safety_net, args=(self.handle_shared_entries,),
name="handle_shared_entries(%s)" % self.path, daemon=True).start()
