Commit 0c07c7dc authored by Ondřej Kuzník's avatar Ondřej Kuzník

Handle more cases of broken sync

parent 756fc0fc
......@@ -70,7 +70,7 @@ class SyncreplCookie:
return result
def copy(self):
return SyncreplCookie(self)
return __class__(self)
def update(self, cookie):
"Merge the two cookies, incorporating all CSNs into this one"
......@@ -116,6 +116,42 @@ class SyncreplCookie:
def __iter__(self):
return self._csnset.values().__iter__()
def __getitem__(self, key):
return self._csnset[key]
def __sub__(self, cookie):
if not isinstance(cookie, __class__):
raise NotImplementedError
result = self.copy()
for sid, csn in self._csnset.items():
other = cookie._csnset.get(sid)
if other and csn <= other:
result._csnset.pop(sid)
return result
def __and__(self, cookie):
"""The highest cookie contained in both"""
if not isinstance(item, __class__):
raise NotImplementedError
result = __class__()
for sid, csn in self._csnset.items():
other = cookie._csnset.get(sid)
if not other:
continue
if csn > other:
csn = other
result._csnset[sid] = csn
return result
def keys(self):
return self._csnset.keys()
def __contains__(self, item):
if not isinstance(item, __class__):
raise NotImplementedError
......
......@@ -42,10 +42,38 @@ class ReplicaState(enum.Enum):
UPTODATE = "In-sync"
CATCHING_UP = "Replicating"
BEHIND = "Replicating slowly"
PARTIAL = "Replicating partially"
BROKEN = "Not replicating"
STOPPED = "Stopped"
class CookieProgress:
def __init__(self, timeout=30):
super().__init__(self, timeout)
self.reset()
def start(self, initial, target):
self.initial = initial
self.target = target
self._diff = initial - target
def reset(self):
self.initial = None
self.target = None
self._diff = None
def update(self, cookie):
if not self.target:
return
if self.target in cookie:
self.caught_up.clear()
def trip(self, rearm=True):
self.reset()
super().trip(self, rearm)
class Provider:
sid = None
state: ReplicaState = ReplicaState.CONNECTING
......@@ -63,9 +91,12 @@ class Provider:
self.behind = Watchdog(30)
self.behind.triggered.connect(self._no_progress)
self.catching_up = Watchdog(30)
self.catching_up.triggered.connect(self._catch_up_event)
self.catching_up_from = None
self.catching_up_to = None
self.catching_up_set = None
self.set_up()
......@@ -101,8 +132,17 @@ class Provider:
self.state_changed(self.state)
def _catch_up_event(self, caught_up):
diff = self.cookie - self.catching_up_from
if self.catching_up_set & diff == diff:
# we have seen progress on all relevant sids
self.state = ReplicaState.BEHIND
else:
# there's at least one sid we're behind on that hasn't made
# any progress - the environment may be split?
self.state = ReplicaState.PARTIAL
self.catching_up_from = None
self.catching_up_to = None
self.state = ReplicaState.BEHIND
self.catching_up_set = None
self.state_changed(self.state)
def environment_update(self, cookie):
......@@ -114,15 +154,21 @@ class Provider:
self.state = ReplicaState.UPTODATE
self.behind.clear()
self.catching_up.clear()
self.catching_up_from = None
self.catching_up_to = None
self.catching_up_set = None
elif self.behind:
if self.catching_up_to in self.cookie:
self.state = ReplicaState.CATCHING_UP
self.catching_up.clear()
self.catching_up_from = None
self.catching_up_to = None
self.catching_up_set = None
elif not self.behind and cookie not in self.cookie:
if not self.catching_up_to:
if not self.catching_up:
self.catching_up_from = self.cookie.copy()
self.catching_up_to = cookie.copy()
self.catching_up_set = self.catching_up_to - self.catching_up_from
self.catching_up.rearm()
self.behind.rearm()
......@@ -132,15 +178,20 @@ class Provider:
async def search_finished(self):
try:
result = await self.search
self.state = ReplicaState.CONNECTING
if self.mode == 'refreshOnly':
await asyncio.sleep(1)
except ldap0.LDAPError as e:
if e.args[0]['result'] not in (
result = e.args[0]['result']
if result not in (
ldap0.TIMELIMIT_EXCEEDED,
ldap0.SIZELIMIT_EXCEEDED,
0x1000):
logger.error("Search to %r finished with %r, waiting before reconnection", self.uri, e)
await asyncio.sleep(1)
if result == 0x1000:
self.cookie = SyncreplCookie()
self.cookie_updated(None)
self.set_up()
......@@ -171,5 +222,11 @@ class SyncreplEnvironment:
self.cookie_updated.disconnect(provider.environment_update)
def update_cookie(self, new_cookie):
self.cookie.update(new_cookie)
self.cookie_updated(self.cookie)
if new_cookie:
self.cookie.update(new_cookie)
self.cookie_updated(self.cookie)
else: # a cookie reset happened, recreate environment cookie
self.cookie = SyncreplCookie()
for uri, provider in self.providers.items():
cookie.update(provider.cookie)
self.cookie_updated(self.cookie)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment