Skip to content

Commit

Permalink
Merge pull request #339 from kormat/zk_get_lock
Browse files Browse the repository at this point in the history
Improve ZK lock handling
  • Loading branch information
pszal committed Aug 24, 2015
2 parents f1214bd + f83fd74 commit ffbe2a3
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 80 deletions.
18 changes: 0 additions & 18 deletions infrastructure/beacon_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,21 +797,12 @@ def handle_pcbs_propagation(self):
"""
Generate a new beacon or get ready to forward the one received.
"""
master = False
while True:
# Wait until we have enough context to be a useful master
# candidate.
self._state_synced.wait()
if not master:
logging.debug("Trying to become master")
if not self.zk.get_lock():
if master:
logging.debug("No longer master")
master = False
continue
if not master:
logging.debug("Became master")
master = True
start_propagation = SCIONTime.get_time()
# Create beacon for downstream ADs.
downstream_pcb = PathSegment()
Expand Down Expand Up @@ -1229,21 +1220,12 @@ def handle_pcbs_propagation(self):
Main loop to propagate received beacons.
"""
# TODO: define function that dispatches the pcbs among the interfaces
master = False
while True:
# Wait until we have enough context to be a useful master
# candidate.
self._state_synced.wait()
if not master:
logging.debug("Trying to become master")
if not self.zk.get_lock():
if master:
logging.debug("No longer master")
master = False
continue
if not master:
logging.debug("Became master")
master = True
start_propagation = SCIONTime.get_time()
best_segments = self.beacons.get_best_segments()
for pcb in best_segments:
Expand Down
48 changes: 35 additions & 13 deletions lib/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,12 @@ def __init__(self, isd_id, ad_id, srv_type, srv_id,
self._lock = threading.Event()
# Used to signal connection state changes
self._state_events = queue.Queue()
self.conn_epoch = 0
# Kazoo parties
self._parties = {}
# Kazoo lock (initialised later)
self._zk_lock = None
self._lock_epoch = 0

self._kazoo_setup(zk_hosts)
self._setup_state_listener()
Expand Down Expand Up @@ -186,8 +188,10 @@ def _state_listener(self, new_state):
"""
Called every time the Kazoo connection state changes.
"""
self.conn_epoch += 1
# Signal a connection state change
logging.debug("Kazoo state changed to %s", new_state)
logging.debug("Kazoo state changed to %s (epoch %d)",
new_state, self.conn_epoch)
self._state_events.put(new_state)
# Tell kazoo not to remove this listener:
return False
Expand Down Expand Up @@ -251,7 +255,6 @@ def _state_suspended(self):
This means that the connection to Zookeeper is down.
"""
self._connected.clear()
self._lock.clear()
logging.info("Connection to Zookeeper suspended")
if self._on_disconnect:
self._on_disconnect()
Expand All @@ -264,7 +267,6 @@ def _state_lost(self):
re-done on connect.
"""
self._connected.clear()
self._lock.clear()
logging.info("Connection to Zookeeper lost")
if self._on_disconnect:
self._on_disconnect()
Expand Down Expand Up @@ -349,15 +351,18 @@ def party_setup(self, prefix=None, autojoin=True):
self._parties[party_path] = party
return party

def get_lock(self, timeout=60.0):
def get_lock(self, lock_timeout=None, conn_timeout=None):
"""
Try to get the lock. Returns immediately if we already have the lock.
:param float timeout: Time (in seconds) to wait for lock acquisition,
or ``None`` to wait forever.
:type timeout: float or None.
:return: ``True`` if we got the lock, or already had it, otherwise
``False``.
:param float lock_timeout:
Time (in seconds) to wait for lock acquisition, or ``None`` to wait
forever (Default).
:param float conn_timeout:
Time (in seconds) to wait for a connection to ZK, or ``None`` to
wait forever (Default).
:return:
``True`` if we got the lock, or already had it, otherwise ``False``.
:rtype: :class:`bool`
"""
if self._zk_lock is None:
Expand All @@ -366,15 +371,24 @@ def get_lock(self, timeout=60.0):
self._zk_lock = self._zk.Lock(lock_path, self._srv_id)
if not self.is_connected():
self.release_lock()
if not self.wait_connected(timeout=conn_timeout):
return False
if self._lock_epoch != self.conn_epoch:
logging.debug("ZK lock state is stale, releasing (epoch %d != %d)",
self._lock_epoch, self.conn_epoch)
self.release_lock()
elif self._lock.is_set():
# We already have the lock
# We already have the lock.
return True
self._lock_epoch = self.conn_epoch
logging.debug("Trying to acquire ZK lock (epoch %d)", self._lock_epoch)
try:
if self._zk_lock.acquire(timeout=timeout):
if self._zk_lock.acquire(timeout=lock_timeout):
logging.info("Successfully acquired ZK lock (epoch %d)",
self._lock_epoch)
self._lock.set()
else:
pass
logging.debug("Failed to acquire ZK lock")
except (LockTimeout, ConnectionLoss, SessionExpiredError):
pass
ret = self.have_lock()
Expand All @@ -385,6 +399,8 @@ def release_lock(self):
Release the lock
"""
self._lock.clear()
if self._zk_lock is None:
return
if self.is_connected():
try:
self._zk_lock.release()
Expand All @@ -397,7 +413,13 @@ def have_lock(self):
"""
Check if we currently hold the lock
"""
return self.is_connected() and self._lock.is_set()
if (self.is_connected() and
self._lock_epoch == self.conn_epoch and
self._lock.is_set()):
return True
else:
self.release_lock()
return False

def wait_lock(self):
"""
Expand Down
131 changes: 82 additions & 49 deletions test/lib_zookeeper_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,12 @@ def test(self, init):
# Setup
inst = self._init_basic_setup()
inst._state_events = create_mock(["put"])
inst.conn_epoch = 47
# Call
ntools.eq_(inst._state_listener("statist"), False)
# Tests
inst._state_events.put.assert_called_once_with("statist")
ntools.eq_(inst.conn_epoch, 48)


class TestZookeeperStateHandler(BaseZookeeper):
Expand Down Expand Up @@ -327,15 +329,13 @@ class TestZookeeperStateDisconnected(BaseZookeeper):
def _check(self, f_name, init, test_callback=False):
inst = self._init_basic_setup()
inst._connected = create_mock(["clear"])
inst._lock = create_mock(["clear"])
inst._on_disconnect = None
if test_callback:
inst._on_disconnect = create_mock()
# Call
getattr(inst, f_name)()
# Tests
inst._connected.clear.assert_called_once_with()
inst._lock.clear.assert_called_once_with()
if test_callback:
inst._on_disconnect.assert_called_once_with()

Expand Down Expand Up @@ -517,74 +517,82 @@ class TestZookeeperGetLock(BaseZookeeper):
"""
Unit tests for lib.zookeeper.Zookeeper.get_lock
"""
@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def test_no_lock(self, init):
def _setup(self, is_conn=True, wait_conn=True, l_epoch=1, c_epoch=1):
inst = self._init_basic_setup()
inst._zk_lock = None
inst._prefix = "/prefix"
inst._zk_lock = create_mock(["acquire"])
inst._zk = create_mock(["Lock"])
inst._srv_id = "srvid"
# Short-circuit the rest of get_lock() by making is_connected raise
# StopIteration.
inst.is_connected = create_mock()
inst.is_connected.side_effect = []
# Call
ntools.assert_raises(StopIteration, inst.get_lock)
# Tests
inst._zk.Lock.assert_called_once_with("/prefix/lock", "srvid")
inst.is_connected.return_value = is_conn
inst.release_lock = create_mock()
inst.wait_connected = create_mock()
inst.wait_connected.return_value = wait_conn
inst._lock_epoch = l_epoch
inst.conn_epoch = c_epoch
inst._lock = create_mock(["is_set", "set"])
inst.have_lock = create_mock()
return inst

@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def test_not_connected(self, init):
inst = self._init_basic_setup()
inst._zk_lock = True
inst.is_connected = create_mock()
inst.is_connected.return_value = False
inst.release_lock = create_mock()
def test_no_lock_not_conn(self, init):
inst = self._setup(is_conn=False, wait_conn=False)
inst._zk_lock = None
inst._prefix = "/prefix"
inst._srv_id = "srvid"
# Call
ntools.assert_false(inst.get_lock())
ntools.assert_false(inst.get_lock(conn_timeout="conn t/o"))
# Tests
inst._zk.Lock.assert_called_once_with("/prefix/lock", "srvid")
inst.is_connected.assert_called_once_with()
inst.release_lock.assert_called_once_with()
inst.wait_connected.assert_called_once_with(timeout="conn t/o")

@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def test_have_lock(self, init):
inst = self._init_basic_setup()
inst._zk_lock = True
inst.is_connected = create_mock()
inst._lock = create_mock(["is_set"])
inst = self._setup()
# Call
ntools.assert_true(inst.get_lock())
# Tests
inst._lock.is_set.assert_called_once_with()

@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def test_acquire(self, init):
inst = self._init_basic_setup()
inst._zk_lock = create_mock(["acquire"])
inst.is_connected = create_mock()
inst._lock = create_mock(["is_set", "set"])
inst._lock.is_set.return_value = False
inst.have_lock = create_mock()
def test_stale_epoch_acquire(self, init):
inst = self._setup(l_epoch=0)
# Call
ntools.eq_(inst.get_lock(), inst.have_lock.return_value)
ntools.eq_(inst.get_lock(lock_timeout="lock t/o"),
inst.have_lock.return_value)
# Tests
inst._zk_lock.acquire.assert_called_once_with(timeout=60.0)
inst.release_lock.assert_called_once_with()
ntools.eq_(inst._lock_epoch, inst.conn_epoch)
inst._zk_lock.acquire.assert_called_once_with(timeout="lock t/o")
inst._lock.set.assert_called_once_with()
inst.have_lock.assert_called_once_with()

@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def test_acquire_fail(self, init):
inst = self._setup(l_epoch=0)
inst._zk_lock.acquire.return_value = False
# Call
ntools.eq_(inst.get_lock(lock_timeout="lock t/o"),
inst.have_lock.return_value)
# Tests
inst._zk_lock.acquire.assert_called_once_with(timeout="lock t/o")
ntools.assert_false(inst._lock.set.called)

@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def _check_exception(self, exception, init):
inst = self._init_basic_setup()
inst._zk_lock = create_mock(["acquire"])
inst._zk_lock.acquire.side_effect = exception
inst.is_connected = create_mock()
inst.wait_connected = create_mock()
inst._lock_epoch = inst.conn_epoch = 22
inst._lock = create_mock(["is_set"])
inst._lock.is_set.return_value = False
inst.have_lock = create_mock()
# Call
ntools.eq_(inst.get_lock(), inst.have_lock.return_value)
# Tests
inst._zk_lock.acquire.assert_called_once_with(timeout=60.0)
inst._zk_lock.acquire.assert_called_once_with(timeout=None)

def test_exceptions(self):
for excp in (LockTimeout, ConnectionLoss,
Expand All @@ -596,6 +604,18 @@ class TestZookeeperReleaseLock(BaseZookeeper):
"""
Unit tests for lib.zookeeper.Zookeeper.release_lock
"""
@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def test_no_lock(self, init):
inst = self._init_basic_setup()
inst._lock = create_mock(["clear"])
inst._zk_lock = None
inst.is_connected = create_mock()
# Call
inst.release_lock()
# Tests
inst._lock.clear.assert_called_once_with()
ntools.assert_false(inst.is_connected.called)

@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def test_not_connected(self, init):
inst = self._init_basic_setup()
Expand Down Expand Up @@ -645,29 +665,42 @@ class TestZookeeperHaveLock(BaseZookeeper):
"""
Unit tests for lib.zookeeper.Zookeeper.have_lock
"""
@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def _check(self, connected, have_lock, init):
def _setup(self, connected, l_epoch, c_epoch, lock_is_set):
inst = self._init_basic_setup()
inst.is_connected = create_mock()
inst.is_connected.return_value = connected
inst._lock_epoch = l_epoch
inst.conn_epoch = c_epoch
inst._lock = create_mock(["is_set"])
inst._lock.is_set.return_value = have_lock
expected = connected and have_lock
inst._lock.is_set.return_value = lock_is_set
inst.release_lock = create_mock()
return inst

@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def test_have(self, init):
inst = self._setup(True, 1, 1, True)
# Call
ntools.eq_(inst.have_lock(), expected)
ntools.ok_(inst.have_lock())
# Tests
inst.is_connected.assert_called_once_with()
if connected:
inst._lock.is_set.assert_called_once_with()
inst._lock.is_set.assert_called_once_with()
ntools.assert_false(inst.release_lock.called)

def test(self):
for connected, have_lock in (
(False, False),
(False, True),
(True, False),
(True, True)
@patch("lib.zookeeper.Zookeeper.__init__", autospec=True, return_value=None)
def _check_not_have(self, connected, l_epoch, c_epoch, lock_is_set, init):
inst = self._setup(connected, l_epoch, c_epoch, lock_is_set)
# Call
ntools.assert_false(inst.have_lock())
# Tests
inst.release_lock.assert_called_once_with()

def test_not_have(self):
for connected, l_epoch, c_epoch, lock_is_set in (
(False, 1, 1, True),
(True, 0, 1, True),
(True, 1, 1, False),
):
yield self._check, connected, have_lock
yield self._check_not_have, connected, l_epoch, c_epoch, lock_is_set


class TestZookeeperWaitLock(BaseZookeeper):
Expand Down

0 comments on commit ffbe2a3

Please sign in to comment.