Commit 721a91f2 authored by Ondřej Kuzník's avatar Ondřej Kuzník
Browse files

Add code

parent af5835f6
build
dist
.coverage
.eggs
.tox
.venv
*.egg
*.egg-info
*.py[cod]
*.swp
*~
__pycache__
.cache
.pytest_cache/
stages:
- build
build:
stage: build
script:
- apt update && apt install -y python3-wheel python3-setuptools python3-pip python3-dev libldap2-dev libsasl2-dev build-essential
- python3 ./setup.py bdist_wheel
artifacts:
paths:
- dist/
The OpenLDAP Public License
Version 2.8, 17 August 2003
Redistribution and use of this software and associated documentation
("Software"), with or without modification, are permitted provided
that the following conditions are met:
1. Redistributions in source form must retain copyright statements
and notices,
2. Redistributions in binary form must reproduce applicable copyright
statements and notices, this list of conditions, and the following
disclaimer in the documentation and/or other materials provided
with the distribution, and
3. Redistributions must contain a verbatim copy of this document.
The OpenLDAP Foundation may revise this license from time to time.
Each revision is distinguished by a version number. You may use
this Software under terms of this license revision or under the
terms of any subsequent revision of the license.
THIS SOFTWARE IS PROVIDED BY THE OPENLDAP FOUNDATION AND ITS
CONTRIBUTORS ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
SHALL THE OPENLDAP FOUNDATION, ITS CONTRIBUTORS, OR THE AUTHOR(S)
OR OWNER(S) OF THE SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
The names of the authors and copyright holders must not be used in
advertising or otherwise to promote the sale, use or other dealing
in this Software without specific, written prior permission. Title
to copyright in this Software shall at all times remain with copyright
holders.
OpenLDAP is a registered trademark of the OpenLDAP Foundation.
Copyright 1999-2003 The OpenLDAP Foundation, Redwood City,
California, USA. All Rights Reserved. Permission to copy and
distribute verbatim copies of this document is granted.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""OpenLDAP Syncrepl monitoring tool"""
from setuptools import setup, find_packages
with open('LICENSE') as f:
LICENSE = f.read()
setup(
name='syncmonitor',
version='0.0.1',
description=__doc__,
author='Ondřej Kuzník',
author_email='ondra@openldap.org',
url='https://git.openldap.org/openldap/syncmonitor',
license=LICENSE,
packages=find_packages(exclude=('tests', 'docs')),
python_requires=">= 3.7", # Modern asyncio
entry_points={
'console_scripts': ['syncmonitor = syncmonitor.ui:main'],
},
install_requires=[
'ldap0 >= 1.1.0',
'urwid',
],
)
# -*- coding: utf-8 -*-
# This work is part of OpenLDAP Software <http://www.openldap.org/>.
#
# Copyright 2018-2020 The OpenLDAP Foundation.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# ACKNOWLEDGEMENTS:
# This work was initially developed by Ondřej Kuzník
# for inclusion in OpenLDAP Software.
"""
OpenLDAP multiprovider syncrepl compatible cookie implementation.
"""
import logging
logger = logging.getLogger(__name__)
class SyncreplCookie:
"""
Represents an OpenLDAP's MMR cookie.
Cookies are of the form:
rid=[0-9]{3},sid=[0-9a-f]{3}(,csn=<csnlist>)
where csnlist is a semicolon (';') delimited list of CSNs:
<UTC timestamp>#<counter>#<server id>#<reserved>
"""
def __init__(self, cookie=None, rid=None, sid=None):
self.rid = rid
self.sid = sid
self._csnset = {}
if cookie:
self.update(cookie)
if self.rid is None:
self.rid = 0
if self.sid is None:
self.sid = 0
def _parse_csn(self, csn):
return csn.split('#', 3) # time, order, sid, other
def _parse_cookie(self, cookie):
if isinstance(cookie, bytes):
cookie = cookie.decode()
result = {}
parts = cookie.split(',')
for part in parts:
if part.startswith('rid='):
result['rid'] = part[4:]
elif part.startswith('sid='):
result['sid'] = part[4:]
elif part.startswith('csn='):
result['csn'] = part[4:].split(';')
elif part.startswith('delcsn='):
result['delcsn'] = part[7:]
else:
logger.warning("Did not recognise cookie part=%r", part)
return result
def copy(self):
return SyncreplCookie(self)
def update(self, cookie):
"Merge the two cookies, incorporating all CSNs into this one"
updated = False
if isinstance(cookie, __class__):
for sid, csn in cookie._csnset.items():
if sid not in self._csnset or self._csnset[sid] < csn:
updated = True
self._csnset[sid] = csn
if self.rid is None:
self.rid = cookie.rid
if self.sid is None:
self.sid = cookie.sid
return updated
components = self._parse_cookie(cookie)
for csn in components.get('csn', []):
_, _, sid, _ = self._parse_csn(csn)
if sid not in self._csnset or self._csnset[sid] < csn:
updated = True
self._csnset[sid] = csn
if self.rid is None:
self.rid = int(components['rid'])
if self.sid is None:
self.sid = int(components['sid'], 16)
return updated
def unparse(self):
"Return the cookie as a string"
cookie = 'rid={:03},sid={:03x}'.format(self.rid, self.sid)
if self._csnset:
cookie += ',csn='
cookie += ';'.join(sorted(self._csnset.values()))
return cookie
def __str__(self):
return self.unparse()
def __len__(self):
return len(self._csnset)
def __iter__(self):
return self._csnset.values().__iter__()
def __contains__(self, item):
if not isinstance(item, __class__):
raise NotImplementedError
for sid, csn in item._csnset.items():
my_csn = self._csnset.get(sid)
if not my_csn or my_csn < csn:
return False
return True
def __eq__(self, other):
if not isinstance(other, __class__):
return False
if self.rid != other.rid:
return False
if self.sid != other.sid:
return False
if set(self._csnset) != set(other._csnset):
return False
return True
# -*- coding: utf-8 -*-
# This work is part of OpenLDAP Software <http://www.openldap.org/>.
#
# Copyright 2018-2020 The OpenLDAP Foundation.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# ACKNOWLEDGEMENTS:
# This work was initially developed by Ondřej Kuzník
# for inclusion in OpenLDAP Software.
"""
OpenLDAP multiprovider replication monitor
"""
import asyncio
import enum
import logging
import ldap0
import ldap0.controls
import ldap0.controls.syncrepl
from .cookie import SyncreplCookie
from .ldap_wrapper import AsyncClient
from .signals import Signal
from .syncrepl_observer import SyncreplObserver
from .watchdog import Watchdog
logger = logging.getLogger(__name__)
class ReplicaState(enum.Enum):
CONNECTING = "Connecting"
UPTODATE = "In-sync"
CATCHING_UP = "Replicating"
BEHIND = "Replicating slowly"
BROKEN = "Not replicating"
STOPPED = "Stopped"
class Provider:
sid = None
state: ReplicaState = ReplicaState.CONNECTING
cookie_updated = Signal()
state_changed = Signal()
def __init__(self, uri, searchbase, scope=ldap0.SCOPE_BASE, cookie=None, mode='refreshAndPersist'):
self.uri = uri
self.base = searchbase
self.scope = scope
self.cookie = SyncreplCookie(cookie)
self.mode = mode
self.behind = Watchdog(30)
self.behind.triggered.connect(self._no_progress)
self.catching_up = Watchdog(30)
self.catching_up.triggered.connect(self._catch_up_event)
self.catching_up_to = None
self.set_up()
def set_up(self):
self.state = ReplicaState.CONNECTING
self.state_changed(self.state)
self.client = AsyncClient(self.uri)
control = ldap0.controls.syncrepl.SyncRequestControl(
cookie=self.cookie, mode=self.mode)
self.search = self.client.search(self.base, scope=self.scope,
req_ctrls=[control])
self.observer = SyncreplObserver(self.search, self.cookie)
self.observer.cookie_updated.connect(self._update_cookie)
self.observer.finished.connect(self.search_finished)
def stop(self):
self.state = ReplicaState.STOPPED
self.observer.finished.disconnect(self.search_finished)
self.observer.cookie_updated.disconnect(self._update_cookie)
def _update_cookie(self, new_cookie):
logger.debug("Update from ourselves, cookie %s", new_cookie)
if self.sid is None:
self.sid = new_cookie.sid
self.cookie.update(new_cookie)
self.cookie_updated(new_cookie)
def _no_progress(self, value):
self.state = ReplicaState.BROKEN
self.state_changed(self.state)
def _catch_up_event(self, caught_up):
self.catching_up_to = None
self.state = ReplicaState.BEHIND
self.state_changed(self.state)
def environment_update(self, cookie):
old_state = self.state
logger.debug("Update from environment, behind/catching_up: %s/%s", bool(self.behind), bool(self.catching_up))
logger.debug("Update from environment, cookie/self.cookie: %s vs. %s", cookie, self.cookie)
if cookie in self.cookie:
self.state = ReplicaState.UPTODATE
self.behind.clear()
self.catching_up.clear()
self.catching_up_to = None
elif self.behind:
if self.catching_up_to in self.cookie:
self.state = ReplicaState.CATCHING_UP
self.catching_up.clear()
self.catching_up_to = None
elif not self.behind and cookie not in self.cookie:
if not self.catching_up_to:
self.catching_up_to = cookie.copy()
self.catching_up.rearm()
self.behind.rearm()
if old_state != self.state:
self.state_changed(self.state)
async def search_finished(self):
try:
result = await self.search
if self.mode == 'refreshOnly':
await asyncio.sleep(1)
except ldap0.LDAPError as e:
if e.args[0]['result'] not in (
ldap0.TIMELIMIT_EXCEEDED,
ldap0.SIZELIMIT_EXCEEDED,
0x1000):
logger.error("Search to %r finished with %r, waiting before reconnection", self.uri, e)
await asyncio.sleep(1)
self.set_up()
class SyncreplEnvironment:
cookie_updated = Signal()
def __init__(self, uris, searchbase, scope=ldap0.SCOPE_BASE, cookie=None):
self.base = searchbase
self.scope = scope
self.cookie = SyncreplCookie(cookie)
self.providers = {}
for uri in uris:
provider = Provider(uri, searchbase, scope, cookie, mode='refreshAndPersist')
provider.cookie_updated.connect(self.update_cookie)
self.cookie_updated.connect(provider.environment_update)
self.providers[uri] = provider
def stop(self):
for uri, provider in self.providers.items():
provider.cookie_updated.disconnect(self.update_cookie)
self.cookie_updated.disconnect(provider.environment_update)
def update_cookie(self, new_cookie):
self.cookie.update(new_cookie)
self.cookie_updated(self.cookie)
# -*- coding: utf-8 -*-
# This work is part of OpenLDAP Software <http://www.openldap.org/>.
#
# Copyright 2018-2020 The OpenLDAP Foundation.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# ACKNOWLEDGEMENTS:
# This work was initially developed by Ondřej Kuzník
# for inclusion in OpenLDAP Software.
"""
python-ldap0 asyncio wrapper
"""
import asyncio
import collections
import collections.abc
import logging
import ldap0 as ldap
import ldap0.ldapobject
logger = logging.getLogger(__name__)
INTERMEDIATE_RESULTS = (
ldap.RES_SEARCH_REFERENCE,
ldap.RES_SEARCH_ENTRY,
ldap.RES_INTERMEDIATE,
)
class LDAPRequest(asyncio.Future):
"Awaitable/iterable object that corresponds to the request"
def __init__(self, client, msgid):
super().__init__()
self.full_result = None
self._client = client
self._msgid = msgid
self._queue = asyncio.Queue()
def on_message(self, message):
"Message callback to enqueue the result"
if isinstance(message, ldap.LDAPError):
self.full_result = message
self.set_exception(message)
elif message.rtype in INTERMEDIATE_RESULTS:
self._queue.put_nowait(message)
else:
self.full_result = message
# unfortunately, we have to assume LDAP_SUCCESS
self.set_result(ldap0.SUCCESS)
def cancel(self, abandon=False):
if not abandon:
self._client.abandon(self._msgid)
return super().cancel()
async def __aiter__(self):
next_item = asyncio.create_task(self._queue.get())
while not self.done():
done, _ = await asyncio.wait({self, next_item}, return_when=asyncio.FIRST_COMPLETED)
if next_item in done:
yield next_item.result()
next_item = asyncio.create_task(self._queue.get())
if next_item.done():
yield next_item.result()
else:
next_item.cancel()
while not self._queue.empty():
yield self._queue.get_nowait()
class AsyncClient(ldap.ldapobject.LDAPObject):
"Asyncio friendly class"
def __init__(self, *args, loop=None, **kwargs):
super().__init__(*args, **kwargs)
self._in_progress = {}
self._loop = loop or asyncio.get_event_loop()
self._have_reader = False
def _shutdown(self):
self._loop.remove_reader(self)
self._have_reader = False
while self._in_progress:
self._in_progress.popitem()[1].cancel()
def _read(self):
while True:
try:
message = self.result(msgid=ldap.RES_ANY,
all_results=ldap.MSG_ONE, timeout=0, add_intermediates=1)
except ldap.SERVER_DOWN as e:
self._shutdown()
break
except ldap.LDAPError as e:
if not e.args[0]['msgid']:
print("Can't process a result", e)
self._shutdown()
break
request = self._in_progress.get(e.args[0]['msgid'])
if request:
request.on_message(e)
continue
if message is None:
break
if message.msgid == ldap.RES_UNSOLICITED:
self._shutdown()
else:
request = self._in_progress.get(message.msgid)
if request:
request.on_message(message)
def abandon(self, msgid, *args, **kwargs):
"Abandon the request and cancel any tasks waiting on it"
request = self._in_progress.pop(msgid, None)
if request:
request.cancel(abandon=True)
return super().abandon(msgid, *args, **kwargs)
def unbind(self, *args, **kwargs):
"Send unbind and cancel all tasks"
self._shutdown()
return super().unbind_ext(*args, **kwargs)
def __send_request(self, name, *args, **kwargs):
"Send a request and return the awaitable+iterable object"
method_name = name #+ '_ext'
method = getattr(super(), method_name)
msgid = method(*args, **kwargs)
request = LDAPRequest(self, msgid)
self._in_progress[msgid] = request
if not self._have_reader:
self._loop.add_reader(self, self._read)
self._have_reader = True
return request
def add(self, *args, **kwargs):
"Add operation"
return self.__send_request('add', *args, **kwargs)
def bind(self, *args, **kwargs):
"Bind operation"
return self.__send_request('bind', *args, **kwargs)
def delete(self, *args, **kwargs):
"Delete operation"
return self.__send_request('delete', *args, **kwargs)
def extended(self, *args, **kwargs):
"Extended operation"
return self.__send_request('extop', *args, **kwargs)
def modify(self, *args, **kwargs):
"Modify operation"
return self.__send_request('modify', *args, **kwargs)
def rename(self, *args, **kwargs):
"ModRDN operation"
return self.__send_request('rename', *args, **kwargs)
def search(self, *args, **kwargs):
"Search operation"
return self.__send_request('search', *args, **kwargs)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This work is part of OpenLDAP Software <http://www.openldap.org/>.
#
# Copyright 2020 The OpenLDAP Foundation.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at