environment.py 9.17 KB
Newer Older
Ondřej Kuzník's avatar
Ondřej Kuzník committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# -*- coding: utf-8 -*-
# This work is part of OpenLDAP Software <http://www.openldap.org/>.
#
# Copyright 2018-2020 The OpenLDAP Foundation.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted only as authorized by the OpenLDAP
# Public License.
#
# A copy of this license is available in the file LICENSE in the
# top-level directory of the distribution or, alternatively, at
# <http://www.OpenLDAP.org/license.html>.
#
# ACKNOWLEDGEMENTS:
# This work was initially developed by Ondřej Kuzník
# for inclusion in OpenLDAP Software.
"""
OpenLDAP multiprovider replication monitor
"""

import asyncio
import enum
import logging
Ondřej Kuzník's avatar
Ondřej Kuzník committed
25
from tenacity import retry, stop_after_delay, wait_exponential
Ondřej Kuzník's avatar
Ondřej Kuzník committed
26
27
28
29
30
31

import ldap0
import ldap0.controls
import ldap0.controls.syncrepl

from .cookie import SyncreplCookie
32
from .connection import connect_and_setup
Ondřej Kuzník's avatar
Ondřej Kuzník committed
33
34
35
36
37
38
39
40
41
42
43
44
45
from .signals import Signal
from .syncrepl_observer import SyncreplObserver
from .watchdog import Watchdog


logger = logging.getLogger(__name__)


class ReplicaState(enum.Enum):
    """Replication states a provider can be in.

    The values are the human-readable labels for each state.
    """

    # Connection and syncrepl search are being (re)established.
    CONNECTING = "Connecting"
    # The provider's cookie covers the environment cookie.
    UPTODATE = "In-sync"
    # The provider is behind the environment and still within its grace
    # period for catching up.
    CATCHING_UP = "Replicating"
    # The catch-up period expired, but progress was seen on every sid the
    # provider was behind on.
    BEHIND = "Replicating slowly"
    # The catch-up period expired and at least one sid made no progress.
    PARTIAL = "Replicating partially"
    # No progress was observed before the "behind" watchdog fired.
    BROKEN = "Not replicating"
    # Monitoring of the provider was stopped explicitly.
    STOPPED = "Stopped"


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class CookieProgress:
    """Bookkeeping for progress from an initial cookie towards a target one.

    NOTE(review): this class declares no base class, yet ``__init__`` and
    ``trip`` delegate to ``super()``.  With the implicit ``object`` base,
    ``object.__init__`` rejects the extra arguments and ``object`` has no
    ``trip`` method, so the class cannot be instantiated as written.  It
    was presumably meant to derive from ``Watchdog`` -- confirm before use.
    """

    def __init__(self, timeout=30):
        # NOTE(review): zero-argument super() already binds ``self``;
        # passing it again hands the instance over twice.
        super().__init__(self, timeout)
        self.reset()

    def start(self, initial, target):
        """Record the starting cookie, the target cookie and their difference."""
        self.initial = initial
        self.target = target
        self._diff = initial - target

    def reset(self):
        """Forget any recorded initial/target cookies."""
        self.initial = None
        self.target = None
        self._diff = None

    def update(self, cookie):
        """Check *cookie* against the target; does nothing if no target is set."""
        if not self.target:
            return

        if self.target in cookie:
            # NOTE(review): ``caught_up`` is never assigned in this class;
            # presumably expected from a base class -- verify.
            self.caught_up.clear()

    def trip(self, rearm=True):
        """Drop the recorded cookies, then delegate to the parent's ``trip``."""
        self.reset()
        # NOTE(review): same duplicated ``self`` as in ``__init__``.
        super().trip(self, rearm)


Ondřej Kuzník's avatar
Ondřej Kuzník committed
78
79
80
81
82
83
class Provider:
    """Follow one provider via a syncrepl search and track how it compares
    to the rest of the environment.

    The provider's own cookie is updated from the search it runs, while
    ``environment_update`` is fed the environment-wide cookie.  Comparing
    the two drives the ``state`` attribute and the ``up_to_date`` event.
    """

    # Server ID read from this provider's cookie; None until one is seen.
    sid = None
    state: ReplicaState = ReplicaState.CONNECTING

    # Emitted with the new cookie on each update coming from this provider,
    # and with None when the cookie had to be thrown away.
    cookie_updated = Signal()
    # Emitted with the current ReplicaState.
    state_changed = Signal()
    # Emitted with the sid whenever it differs from the one known so far.
    sid_discovered = Signal()

    def __init__(self, uri, config, searchbase, scope=ldap0.SCOPE_BASE,
                 cookie=None, mode='refreshAndPersist'):
        """Prepare the provider; no connection is made until ``set_up``.

        uri and config are handed to ``connect_and_setup``; searchbase and
        scope describe the syncrepl search; cookie seeds the provider's
        ``SyncreplCookie``; mode is passed to the sync request control.
        """
        # A base-scope search with an empty base is widened to a subtree one.
        if scope == ldap0.SCOPE_BASE and not searchbase:
            scope = ldap0.SCOPE_SUBTREE

        self.uri = uri
        self.config = config
        self.base = searchbase
        self.scope = scope
        self.cookie = SyncreplCookie(cookie)
        self.mode = mode

        self.up_to_date = asyncio.Event()

        # Fires when we have been behind for too long without catching up.
        # NOTE(review): the argument is presumably a timeout in seconds.
        no_progress = Watchdog(30)
        no_progress.triggered.connect(self._no_progress)
        self.behind = no_progress

        # Fires when the grace period for catching up has run out.
        catch_up = Watchdog(30)
        catch_up.triggered.connect(self._catch_up_event)
        self.catching_up = catch_up

        self._forget_catch_up()

    def _forget_catch_up(self):
        """Drop the cookies recorded for the current catch-up attempt."""
        self.catching_up_from = None
        self.catching_up_to = None
        self.catching_up_set = None

    # Retried with exponential backoff (capped at 5s), giving up after 30s.
    @retry(wait=wait_exponential(multiplier=0.1, max=5),
           stop=stop_after_delay(30))
    async def set_up(self):
        """Connect to the provider and start the syncrepl search."""
        self.state = ReplicaState.CONNECTING
        self.state_changed(self.state)
        self.up_to_date.clear()

        self.client = await connect_and_setup(self.uri, self.config)

        sync_request = ldap0.controls.syncrepl.SyncRequestControl(
            cookie=self.cookie, mode=self.mode)

        # NOTE(review): "(|)" looks like the LDAP absolute-false filter, so
        # presumably only sync information (cookies) is of interest here.
        self.search = self.client.search(
            self.base,
            scope=self.scope,
            filterstr="(|)",
            req_ctrls=[sync_request],
        )

        observer = SyncreplObserver(self.search, self.cookie)
        self.observer = observer
        observer.cookie_updated.connect(self._update_cookie)
        observer.finished.connect(self.search_finished)

    def stop(self):
        """Mark the provider stopped, detach from the observer and unbind."""
        self.state = ReplicaState.STOPPED

        observer = self.observer
        observer.finished.disconnect(self.search_finished)
        observer.cookie_updated.disconnect(self._update_cookie)

        self.client.unbind()

    def _update_cookie(self, new_cookie):
        """Merge a cookie received from our own search and announce it."""
        logger.debug("Update from ourselves, cookie %s", new_cookie)

        if self.sid != new_cookie.sid:
            self.sid = new_cookie.sid
            self.sid_discovered(self.sid)

        self.cookie.update(new_cookie)
        self.cookie_updated(new_cookie)

    def _no_progress(self, value):
        """Handler for the ``behind`` watchdog: replication looks broken."""
        self.state = ReplicaState.BROKEN
        self.state_changed(self.state)

    def _catch_up_event(self, caught_up):
        """Handler for the ``catching_up`` watchdog.

        Compares what has changed since the catch-up started with the set
        we were behind on, and settles on BEHIND or PARTIAL accordingly.
        """
        assert self.catching_up_set

        progressed = self.cookie - self.catching_up_from
        # Progress on all relevant sids means we are merely slow; otherwise
        # at least one sid we're behind on hasn't moved - the environment
        # may be split?
        seen_everywhere = (self.catching_up_set & progressed) == progressed
        self.state = (ReplicaState.BEHIND if seen_everywhere
                      else ReplicaState.PARTIAL)

        self._forget_catch_up()
        self.state_changed(self.state)

    def environment_update(self, cookie):
        """React to a new environment-wide cookie.

        Updates ``state`` by comparing *cookie* with our own, emits
        ``state_changed`` when the state differs from before, and keeps the
        ``up_to_date`` event in step with it.
        """
        previous = self.state
        logger.debug("Update from environment, behind/catching_up: %s/%s", bool(self.behind), bool(self.catching_up))
        logger.debug("Update from environment, cookie/self.cookie: %s vs. %s", cookie, self.cookie)

        if cookie in self.cookie:
            # Nothing in the environment that we don't already have.
            self.state = ReplicaState.UPTODATE
            self.behind.clear()
            self.catching_up.clear()
            self._forget_catch_up()
        elif self.behind:
            # Already known to be behind: see whether the recorded target
            # has been reached in the meantime.
            if self.catching_up_to in self.cookie:
                self.state = ReplicaState.CATCHING_UP
                self.catching_up.clear()
                self._forget_catch_up()
        elif not self.behind and cookie not in self.cookie:
            # Newly behind: remember where we started and where we need to
            # get to, unless a catch-up is already being timed.
            if not self.catching_up:
                self.state = ReplicaState.CATCHING_UP
                self.catching_up_from = self.cookie.copy()
                self.catching_up_to = cookie.copy()
                self.catching_up_set = (self.catching_up_to
                                        - self.catching_up_from)
                assert self.catching_up_set
            self.catching_up.rearm()
            self.behind.rearm()

        if previous == self.state:
            return

        self.state_changed(self.state)
        if self.state == ReplicaState.UPTODATE:
            self.up_to_date.set()
        else:
            self.up_to_date.clear()

    async def search_finished(self):
        """Handle the end of the syncrepl search and set it up again."""
        # NOTE(review): 0x1000 (4096) appears to be the syncrepl
        # e-syncRefreshRequired result code - confirm.
        refresh_required = 0x1000

        try:
            result = await self.search
            self.state = ReplicaState.CONNECTING
            self.up_to_date.clear()
            if self.mode == 'refreshOnly':
                # Pause between consecutive refreshOnly searches.
                await asyncio.sleep(1)
        except ldap0.LDAPError as e:
            result = e.args[0]['result']
            quiet_results = (
                ldap0.TIMELIMIT_EXCEEDED,
                ldap0.SIZELIMIT_EXCEEDED,
                refresh_required,
            )
            if result not in quiet_results:
                logger.warning("Search to %r finished with %r, "
                               "waiting before reconnection", self.uri, e)
                await asyncio.sleep(0.1)

        if result == refresh_required:
            # Our cookie is no longer usable: start from scratch and let
            # listeners know by signalling None.
            self.cookie = SyncreplCookie()
            self.cookie_updated(None)

        await self.set_up()
Ondřej Kuzník's avatar
Ondřej Kuzník committed
216
217
218
219
220


class SyncreplEnvironment:
    """A set of providers together with the cookie merged from all of them.

    Cookie updates from any provider are merged into the environment
    cookie, which is then broadcast back to every provider.
    """

    # Emitted with the environment cookie whenever it has been updated.
    cookie_updated = Signal()

    def __init__(self, uris, searchbase, scope=ldap0.SCOPE_BASE,
                 cookie=None, config=None):
        """Create (but do not connect) one provider per entry in *uris*.

        config, when given, is looked up per URI with ``config.get(uri,
        config)``, i.e. it falls back to the whole object when there is no
        entry for the URI.
        """
        self.base = searchbase
        self.scope = scope
        self.cookie = SyncreplCookie(cookie)
        self.config = config
        self.providers = {}

        for uri in uris:
            self._attach_provider(uri, config, cookie)

    @staticmethod
    def _config_for(uri, config):
        """Return the configuration to use for *uri*, or None without one."""
        return config.get(uri, config) if config else None

    def _attach_provider(self, uri, config, cookie):
        """Create a provider for *uri*, wire up its signals and register it."""
        provider = Provider(uri, self._config_for(uri, config), self.base,
                            self.scope, cookie, mode='refreshAndPersist')

        # Provider -> environment: merge its cookie updates into ours.
        provider.cookie_updated.connect(self.update_cookie)
        # Environment -> provider: let it compare itself with the others.
        self.cookie_updated.connect(provider.environment_update)

        self.providers[uri] = provider
        return provider

    async def set_up(self):
        """Set up all registered providers concurrently."""
        pending = (provider.set_up() for provider in self.providers.values())
        await asyncio.gather(*pending)

    async def add_server(self, uri, config=None):
        """Register a new provider at *uri* and set it up.

        Raises ValueError if a provider for that URI is already registered.
        """
        if uri in self.providers:
            raise ValueError(f"Provider at {uri!r} already set up")

        provider = self._attach_provider(uri, config, self.cookie)
        await provider.set_up()

    def stop(self):
        """Disconnect from every provider's signals and stop it."""
        for provider in self.providers.values():
            provider.cookie_updated.disconnect(self.update_cookie)
            self.cookie_updated.disconnect(provider.environment_update)
            provider.stop()

    def update_cookie(self, new_cookie):
        """Fold a provider's cookie update into the environment cookie.

        A falsy *new_cookie* means a cookie reset happened; the environment
        cookie is then recreated from the providers' current cookies.
        """
        if new_cookie:
            self.cookie.update(new_cookie)
        else:
            self.cookie = SyncreplCookie()
            for provider in self.providers.values():
                self.cookie.update(provider.cookie)

        self.cookie_updated(self.cookie)