Commit c6b75e8b authored by Ondřej Kuzník's avatar Ondřej Kuzník
Browse files

Implement catchup monitoring

parent 9d3465d2
Pipeline #4188 passed with stage
in 48 seconds
......@@ -22,9 +22,11 @@ OpenLDAP multiprovider replication monitor
import argparse
import asyncio
import json
import logging
import sys
from .cookie import SyncreplCookie
from .environment import SyncreplEnvironment, ReplicaState
......@@ -36,18 +38,118 @@ parser = argparse.ArgumentParser()
parser.add_argument('-f', '--config', type=argparse.FileType('r'),
help='Authentication and connection config')
parser.add_argument('-b', '--base', required=True, help='Search base')
parser.add_argument('-c', '--cookie', action='store', type=SyncreplCookie,
help='Initial cookie')
parser.add_argument('-p', '--persist', action='store_const',
const='refreshAndPersist', default='refreshOnly',
help='Use refresh and persist mode')
parser.add_argument('-c', '--cookie', action='store', help='Initial cookie')
parser.add_argument('-q', '--quiet', action='store_true',
help="Do not print statistics")
parser.add_argument('-s', '--stabilise', action='store_true',
help="Wait for the whole environment to stabilise instead")
parser.add_argument('-t', '--timeout', type=int, default=30,
help=("How long to wait for servers to sync, "
"-1 to wait forever"))
parser.add_argument('-v', '--verbose', action='store_true',
help="Set log level to 'debug'")
parser.add_argument('-t', '--timeout', type=int, default=30,
help="How long to wait for servers to sync")
parser.add_argument('args', nargs='+', help='URLs for servers to contact')
async def stabilise(options, environment):
finished = asyncio.Event()
loop = asyncio.get_running_loop()
def state_changed(new_state):
"""
Wait until all replicas are up to date with each other
at the same time
"""
if all(p.state == ReplicaState.UPTODATE
for uri, p in environment.providers.items()):
finished.set()
start_time = loop.time()
for uri, provider in environment.providers.items():
provider.state_changed.connect(state_changed)
done, pending = await asyncio.wait([finished.wait()],
timeout=options.timeout)
if pending:
for task in pending:
task.cancel()
raise SystemExit("Timed out waiting for systems to resync")
return {'time_elapsed': loop.time() - start_time}
async def catch_up(options, environment):
queue = asyncio.Queue()
initial_cookie = SyncreplCookie(options.cookie)
loop = asyncio.get_running_loop()
class CookieObserver:
def __init__(self, cookie, provider):
self.cookie = cookie
self.provider = provider
async def new_cookie(self, cookie):
await queue.get()
self.cookie.update(cookie)
self.provider.cookie_updated.disconnect(self.new_cookie)
queue.task_done()
for uri, provider in environment.providers.items():
observer = CookieObserver(initial_cookie, provider)
queue.put_nowait(None)
provider.cookie_updated.connect(observer.new_cookie)
# TODO: timeout?
await queue.join()
logger.debug("snapshotted a cookie %s", initial_cookie)
# Allow for an early out if already stabilised
if all(p.state == ReplicaState.UPTODATE
for uri, p in environment.providers.items()):
return {
'time_elapsed': 0,
'servers': {uri: 0 for uri in environment.providers},
}
class CookieUpdateObserver:
def __init__(self, cookie, provider):
self.cookie = cookie
self.provider = provider
async def new_cookie(self, cookie):
if self.cookie not in provider.cookie:
return
await queue.get()
self.provider.cookie_updated.disconnect(self.new_cookie)
timings[self.provider.uri] = loop.time() - start_time
queue.task_done()
timings = {}
start_time = loop.time()
for uri, provider in environment.providers.items():
observer = CookieUpdateObserver(initial_cookie, provider)
queue.put_nowait(None)
provider.cookie_updated.connect(observer.new_cookie)
done, pending = await asyncio.wait([queue.join()],
timeout=options.timeout)
if pending:
for task in pending:
task.cancel()
raise SystemExit("Timed out waiting for systems to catch up")
return {
'time_elapsed': loop.time() - start_time,
'servers': timings,
}
async def run(args=None):
logging.basicConfig()
......@@ -57,6 +159,9 @@ async def run(args=None):
import yaml
options.config = yaml.full_load(options.config.read())
if options.timeout < 0:
options.timeout = None
level = None
if options.verbose:
level = logging.DEBUG
......@@ -74,27 +179,16 @@ async def run(args=None):
await environment.set_up()
logger.error("started")
finished = asyncio.Event()
def state_changed(new_state):
if all(p.state == ReplicaState.UPTODATE
for uri, p in environment.providers.items()):
finished.set()
for uri, provider in environment.providers.items():
provider.state_changed.connect(state_changed)
done, pending = await asyncio.wait([finished.wait()], timeout=30)
if pending:
for task in pending:
task.cancel()
raise SystemExit("Timed out waiting for systems to resync")
logger.info("starting check")
if options.stabilise:
result = await stabilise(options, environment)
else:
result = await catch_up(options, environment)
for uri, p in environment.providers.items():
print("%s cookie %s" % (p, p.cookie), file=sys.stderr)
environment.stop()
logger.info("Finished sync")
logger.info("Systems passed check")
if not options.quiet:
json.dump(result, sys.stdout)
def main(args=None):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment