/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
 *
 * Copyright 1998-2020 The OpenLDAP Foundation.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted only as authorized by the OpenLDAP
 * Public License.
 *
 * A copy of this license is available in the file LICENSE in the
 * top-level directory of the distribution or, alternatively, at
 * <http://www.OpenLDAP.org/license.html>.
 */
/* Portions Copyright (c) 1995 Regents of the University of Michigan.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms are permitted
 * provided that this notice is preserved and that due credit is given
 * to the University of Michigan at Ann Arbor. The name of the University
 * may not be used to endorse or promote products derived from this
 * software without specific prior written permission. This software
 * is provided ``as is'' without express or implied warranty.
 */

#include "portable.h"

#include <stdio.h>
#ifdef HAVE_LIMITS_H
#include <limits.h>
#endif

#include <ac/socket.h>
#include <ac/errno.h>
#include <ac/string.h>
#include <ac/time.h>
#include <ac/unistd.h>

#include "lutil.h"
40
#include "lload.h"
Ondřej Kuzník's avatar
Ondřej Kuzník committed
41
42
43
44
45

static ldap_pvt_thread_mutex_t conn_nextid_mutex;
static unsigned long conn_nextid = 0;

static void
46
lload_connection_assign_nextid( LloadConnection *conn )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
47
48
49
50
51
52
{
    ldap_pvt_thread_mutex_lock( &conn_nextid_mutex );
    conn->c_connid = conn_nextid++;
    ldap_pvt_thread_mutex_unlock( &conn_nextid_mutex );
}

53
54
55
56
57
/*
 * We start off with the connection muted and c_currentber holding the pdu we
 * received.
 *
 * We run c->c_pdu_cb for each pdu, stopping once we hit an error, have to wait
58
 * on reading or after we process lload_conn_max_pdus_per_cycle pdus so as to
59
60
61
62
63
64
65
66
67
68
 * maintain fairness and not hog the worker thread forever.
 *
 * If we've run out of pdus immediately available from the stream or hit the
 * budget, we unmute the connection.
 *
 * c->c_pdu_cb might return an 'error' and not free the connection. That can
 * happen when changing the state or when client is blocked on writing and
 * already has a pdu pending on the same operation, it's their job to make sure
 * we're woken up again.
 */
69
void *
70
71
handle_pdus( void *ctx, void *arg )
{
72
    LloadConnection *c = arg;
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
    int pdus_handled = 0;

    CONNECTION_LOCK_DECREF(c);
    for ( ;; ) {
        BerElement *ber;
        ber_tag_t tag;
        ber_len_t len;

        /* handle_one_response may unlock the connection in the process, we
         * need to expect that might be our responsibility to destroy it */
        if ( c->c_pdu_cb( c ) ) {
            /* Error, connection is unlocked and might already have been
             * destroyed */
            return NULL;
        }
        /* Otherwise, handle_one_request leaves the connection locked */

90
        if ( ++pdus_handled >= lload_conn_max_pdus_per_cycle ) {
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
            /* Do not read now, re-enable read event instead */
            break;
        }

        if ( (ber = ber_alloc()) == NULL ) {
            Debug( LDAP_DEBUG_ANY, "handle_pdus: "
                    "connid=%lu, ber_alloc failed\n",
                    c->c_connid );
            CONNECTION_DESTROY(c);
            return NULL;
        }
        c->c_currentber = ber;

        tag = ber_get_next( c->c_sb, &len, ber );
        if ( tag != LDAP_TAG_MESSAGE ) {
            int err = sock_errno();

            if ( err != EWOULDBLOCK && err != EAGAIN ) {
                if ( err || tag == LBER_ERROR ) {
                    char ebuf[128];
                    Debug( LDAP_DEBUG_ANY, "handle_pdus: "
                            "ber_get_next on fd=%d failed errno=%d (%s)\n",
                            c->c_fd, err,
                            sock_errstr( err, ebuf, sizeof(ebuf) ) );
                } else {
                    Debug( LDAP_DEBUG_STATS, "handle_pdus: "
                            "ber_get_next on fd=%d connid=%lu received "
                            "a strange PDU tag=%lx\n",
                            c->c_fd, c->c_connid, tag );
                }

                c->c_currentber = NULL;
                ber_free( ber, 1 );
                CONNECTION_DESTROY(c);
                return NULL;
            }
            break;
        }
    }

131
    event_add( c->c_read_event, c->c_read_timeout );
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
    Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
            "re-enabled read event on connid=%lu\n",
            c->c_connid );
    CONNECTION_UNLOCK_OR_DESTROY(c);
    return NULL;
}

/*
 * Read event callback for a connection (arg is the LloadConnection).
 *
 * Performs the initial read on the connection. If that yields a complete
 * LDAP PDU, processing of this and successive PDUs is submitted to the work
 * queue (handle_pdus) and the read event is suspended in the meantime.
 *
 * If the PDU cannot be submitted (overload), or lload_conn_max_pdus_per_cycle
 * is zero, this one PDU is processed inline and we return to the event loop
 * immediately after.
 *
 * A timeout (EV_TIMEOUT in what) or a read failure destroys the connection.
 */
void
connection_read_cb( evutil_socket_t s, short what, void *arg )
{
    LloadConnection *c = arg;
    BerElement *ber;
    ber_tag_t tag;
    ber_len_t len;

    CONNECTION_LOCK(c);
    if ( !c->c_live ) {
        event_del( c->c_read_event );
        Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                "suspended read event on a dead connid=%lu\n",
                c->c_connid );
        CONNECTION_UNLOCK(c);
        return;
    }

    if ( what & EV_TIMEOUT ) {
        Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                "connid=%lu, timeout reached, destroying\n",
                c->c_connid );
        CONNECTION_DESTROY(c);
        return;
    }

    Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
            "connection connid=%lu ready to read\n",
            c->c_connid );

    /* Continue with a partially read PDU if one was left behind, otherwise
     * start a fresh one */
    ber = c->c_currentber;
    if ( ber == NULL ) {
        ber = ber_alloc();
        if ( ber == NULL ) {
            Debug( LDAP_DEBUG_ANY, "connection_read_cb: "
                    "connid=%lu, ber_alloc failed\n",
                    c->c_connid );
            CONNECTION_DESTROY(c);
            return;
        }
    }
    c->c_currentber = ber;

    tag = ber_get_next( c->c_sb, &len, ber );
    if ( tag != LDAP_TAG_MESSAGE ) {
        int err = sock_errno();

        if ( err == EWOULDBLOCK || err == EAGAIN ) {
            /* PDU not complete yet, wait for more data */
            event_add( c->c_read_event, c->c_read_timeout );
            Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                    "re-enabled read event on connid=%lu\n",
                    c->c_connid );
            CONNECTION_UNLOCK(c);
            return;
        }

        if ( err || tag == LBER_ERROR ) {
            char ebuf[128];
            Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
                    "ber_get_next on fd=%d failed errno=%d (%s)\n",
                    c->c_fd, err,
                    sock_errstr( err, ebuf, sizeof(ebuf) ) );
        } else {
            Debug( LDAP_DEBUG_STATS, "connection_read_cb: "
                    "ber_get_next on fd=%d connid=%lu received "
                    "a strange PDU tag=%lx\n",
                    c->c_fd, c->c_connid, tag );
        }

        c->c_currentber = NULL;
        ber_free( ber, 1 );

        event_del( c->c_read_event );
        Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                "suspended read event on dying connid=%lu\n",
                c->c_connid );
        CONNECTION_DESTROY(c);
        return;
    }

    if ( lload_conn_max_pdus_per_cycle &&
            ldap_pvt_thread_pool_submit(
                    &connection_pool, handle_pdus, c ) == 0 ) {
        event_del( c->c_read_event );
        Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
                "suspended read event on connid=%lu\n",
                c->c_connid );

        /* We have scheduled a call to handle_pdus which takes care of
         * handling further PDUs, just make sure the connection sticks around
         * for that */
        CONNECTION_UNLOCK_INCREF(c);
        return;
    }

    /* If we're overloaded or configured as such, process one and resume in
     * the next cycle.
     *
     * The callback may re-lock the mutex in the process; on anything but
     * LDAP_SUCCESS the connection is not touched again here */
    if ( c->c_pdu_cb( c ) == LDAP_SUCCESS ) {
        CONNECTION_UNLOCK_OR_DESTROY(c);
    }
    return;
}

/*
 * Write event callback for a connection (arg is the LloadConnection).
 *
 * Tries to flush c->c_pendingber to the socket. If the flush would block, the
 * write event is re-armed with lload_write_timeout; a timeout (EV_TIMEOUT in
 * what) or any other flush error destroys the connection. Does nothing on a
 * connection that is no longer live.
 */
void
connection_write_cb( evutil_socket_t s, short what, void *arg )
{
    LloadConnection *c = arg;

    CONNECTION_LOCK(c);
    if ( !c->c_live ) {
        CONNECTION_UNLOCK(c);
        return;
    }

    if ( what & EV_TIMEOUT ) {
        Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
                "connid=%lu, timeout reached, destroying\n",
                c->c_connid );
        CONNECTION_DESTROY(c);
        return;
    }
    /* Hold a reference while we work on the connection without its lock */
    CONNECTION_UNLOCK_INCREF(c);

    /* Before we acquire any locks */
    event_del( c->c_write_event );

    ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
    Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
            "have something to write to connection connid=%lu\n",
            c->c_connid );

    /* We might have been beaten to flushing the data by another thread, in
     * which case there is nothing pending */
    if ( c->c_pendingber == NULL ||
            ber_flush( c->c_sb, c->c_pendingber, 1 ) == 0 ) {
        c->c_pendingber = NULL;
    } else {
        int err = sock_errno();

        if ( err != EWOULDBLOCK && err != EAGAIN ) {
            char ebuf[128];
            ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
            Debug( LDAP_DEBUG_ANY, "connection_write_cb: "
                    "ber_flush on fd=%d failed errno=%d (%s)\n",
                    c->c_fd, err, sock_errstr( err, ebuf, sizeof(ebuf) ) );
            CONNECTION_LOCK_DESTROY(c);
            return;
        }

        /* Could not write everything yet, try again once writable */
        event_add( c->c_write_event, lload_write_timeout );
    }
    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );

    /* Give back the reference taken above */
    CONNECTION_LOCK_DECREF(c);
    CONNECTION_UNLOCK_OR_DESTROY(c);
}

Ondřej Kuzník's avatar
Ondřej Kuzník committed
298
void
299
connection_destroy( LloadConnection *c )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
300
301
{
    assert( c );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
302
    Debug( LDAP_DEBUG_CONNS, "connection_destroy: "
Ondřej Kuzník's avatar
Ondřej Kuzník committed
303
            "destroying connection connid=%lu\n",
Ondřej Kuzník's avatar
Ondřej Kuzník committed
304
305
            c->c_connid );

306
307
    assert( c->c_live == 0 );
    assert( c->c_refcnt == 0 );
308
    assert( c->c_state == LLOAD_C_INVALID );
309

Ondřej Kuzník's avatar
Ondřej Kuzník committed
310
311
312
313
    ber_sockbuf_free( c->c_sb );

    if ( c->c_currentber ) {
        ber_free( c->c_currentber, 1 );
314
        c->c_currentber = NULL;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
315
    }
Ondřej Kuzník's avatar
Ondřej Kuzník committed
316
317
    if ( c->c_pendingber ) {
        ber_free( c->c_pendingber, 1 );
318
        c->c_pendingber = NULL;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
319
    }
Ondřej Kuzník's avatar
Ondřej Kuzník committed
320

321
322
323
324
325
    if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
        ber_memfree( c->c_sasl_bind_mech.bv_val );
        BER_BVZERO( &c->c_sasl_bind_mech );
    }

326
    CONNECTION_UNLOCK(c);
Ondřej Kuzník's avatar
Ondřej Kuzník committed
327
328
329
330
331

    ldap_pvt_thread_mutex_destroy( &c->c_io_mutex );
    ldap_pvt_thread_mutex_destroy( &c->c_mutex );

    ch_free( c );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
332
333

    listeners_reactivate();
Ondřej Kuzník's avatar
Ondřej Kuzník committed
334
335
}

336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
/*
 * Mark the connection as closing and destroy those of its operations whose
 * o_client_msgid is 0; operations with a non-zero o_client_msgid are left
 * alone. The connection itself is dropped at the end if nothing else keeps
 * it around (CONNECTION_UNLOCK_OR_DESTROY).
 *
 * Expected to be run from lload_unpause_server, so there are no other threads
 * running.
 */
void
lload_connection_close( LloadConnection *c )
{
    TAvlnode *node, *next;

    /* We lock so we can use CONNECTION_UNLOCK_OR_DESTROY to drop the
     * connection if we can */
    CONNECTION_LOCK(c);

    /* The first thing we do is make sure we don't get new Operations in */
    c->c_state = LLOAD_C_CLOSING;

    for ( node = tavl_end( c->c_ops, TAVL_DIR_LEFT ); node; node = next ) {
        LloadOperation *op = node->avl_data;

        /* Look up the successor before touching the operation: the destroy
         * helpers presumably unlink the operation from c->c_ops (TODO
         * confirm), after which 'node' could no longer be safely passed to
         * tavl_next() */
        next = tavl_next( node, TAVL_DIR_RIGHT );

        if ( op->o_client_msgid == 0 ) {
            if ( op->o_client == c ) {
                operation_destroy_from_client( op );
            } else {
                assert( op->o_upstream == c );
                operation_destroy_from_upstream( op );
            }
        }
    }
    CONNECTION_UNLOCK_OR_DESTROY(c);
}

368
369
/*
 * Allocate and set up a LloadConnection for an already open socket.
 *
 * s        - the socket; AC_SOCKET_INVALID is rejected
 * peername - name of the peer, used for logging only here; must not be NULL
 * flags    - if CONN_IS_IPC is set (and LDAP_PF_LOCAL is available), the
 *            sockbuf uses the plain fd I/O layer instead of the TCP one
 *
 * Returns NULL if the socket is invalid or the sockbuf cannot be allocated;
 * the socket itself is never closed here. On success the new connection is
 * live, holds one reference, is in the LLOAD_C_ACTIVE state and is returned
 * with CONNECTION_LOCK still applied - the caller has to release it.
 */
LloadConnection *
lload_connection_init( ber_socket_t s, const char *peername, int flags )
{
    LloadConnection *c;

    assert( peername != NULL );

    if ( s == AC_SOCKET_INVALID ) {
        Debug( LDAP_DEBUG_ANY, "lload_connection_init: "
                "init of socket fd=%ld invalid\n",
                (long)s );
        return NULL;
    }

    assert( s >= 0 );

    /* NOTE(review): the result is used unchecked as before, presumably
     * ch_calloc does not return on failure - confirm */
    c = ch_calloc( 1, sizeof(LloadConnection) );

    c->c_fd = s;

    c->c_sb = ber_sockbuf_alloc();
    if ( c->c_sb == NULL ) {
        /* Fail the same way as for an invalid socket rather than passing a
         * NULL sockbuf to the calls below */
        Debug( LDAP_DEBUG_ANY, "lload_connection_init: "
                "ber_sockbuf_alloc failed for socket fd=%ld\n",
                (long)s );
        ch_free( c );
        return NULL;
    }
    ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );

#ifdef LDAP_PF_LOCAL
    if ( flags & CONN_IS_IPC ) {
#ifdef LDAP_DEBUG
        ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
                LBER_SBIOD_LEVEL_PROVIDER, (void *)"ipc_" );
#endif
        ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_fd,
                LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
    } else
#endif /* LDAP_PF_LOCAL */
    {
#ifdef LDAP_DEBUG
        ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_debug,
                LBER_SBIOD_LEVEL_PROVIDER, (void *)"tcp_" );
#endif
        ber_sockbuf_add_io( c->c_sb, &ber_sockbuf_io_tcp,
                LBER_SBIOD_LEVEL_PROVIDER, (void *)&s );
    }

#ifdef LDAP_DEBUG
    ber_sockbuf_add_io(
            c->c_sb, &ber_sockbuf_io_debug, INT_MAX, (void *)"lload_" );
#endif

    c->c_next_msgid = 1;
    c->c_refcnt = c->c_live = 1;
    c->c_destroy = connection_destroy;

    LDAP_CIRCLEQ_ENTRY_INIT( c, c_next );

    ldap_pvt_thread_mutex_init( &c->c_mutex );
    ldap_pvt_thread_mutex_init( &c->c_io_mutex );

    lload_connection_assign_nextid( c );

    Debug( LDAP_DEBUG_CONNS, "lload_connection_init: "
            "connection connid=%lu allocated for socket fd=%d peername=%s\n",
            c->c_connid, s, peername );

    /* Deliberately left locked for the caller */
    CONNECTION_LOCK(c);
    c->c_state = LLOAD_C_ACTIVE;

    return c;
}