upstream.c 25.3 KB
Newer Older
Ondřej Kuzník's avatar
Ondřej Kuzník committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
 *
 * Copyright 1998-2020 The OpenLDAP Foundation.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted only as authorized by the OpenLDAP
 * Public License.
 *
 * A copy of this license is available in the file LICENSE in the
 * top-level directory of the distribution or, alternatively, at
 * <http://www.OpenLDAP.org/license.html>.
 */

#include "portable.h"

#include <ac/socket.h>
#include <ac/errno.h>
#include <ac/string.h>
#include <ac/time.h>
#include <ac/unistd.h>

#include "lutil.h"
25
#include "lload.h"
Ondřej Kuzník's avatar
Ondřej Kuzník committed
26

Ondřej Kuzník's avatar
Ondřej Kuzník committed
27
int
28
forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
29
30
31
32
33
34
35
36
37
38
39
40
41
{
    BerElement *output;
    BerValue response, controls = BER_BVNULL;
    ber_tag_t tag, response_tag;
    ber_len_t len;

    response_tag = ber_skip_element( ber, &response );

    tag = ber_peek_tag( ber, &len );
    if ( tag == LDAP_TAG_CONTROLS ) {
        ber_skip_element( ber, &controls );
    }

Ondřej Kuzník's avatar
Ondřej Kuzník committed
42
    Debug( LDAP_DEBUG_TRACE, "forward_response: "
Ondřej Kuzník's avatar
Ondřej Kuzník committed
43
            "%s to client connid=%lu request msgid=%d\n",
44
            lload_msgtype2str( response_tag ), op->o_client_connid,
45
            op->o_client_msgid );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
46

47
48
    ldap_pvt_thread_mutex_lock( &client->c_io_mutex );
    output = client->c_pendingber;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
49
50
    if ( output == NULL && (output = ber_alloc()) == NULL ) {
        ber_free( ber, 1 );
51
        ldap_pvt_thread_mutex_unlock( &client->c_io_mutex );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
52
53
        return -1;
    }
54
    client->c_pendingber = output;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
55
56
57
58
59
60

    ber_printf( output, "t{titOtO}", LDAP_TAG_MESSAGE,
            LDAP_TAG_MSGID, op->o_client_msgid,
            response_tag, &response,
            LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &controls ) );

61
    ldap_pvt_thread_mutex_unlock( &client->c_io_mutex );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
62
63

    ber_free( ber, 1 );
64
    connection_write_cb( -1, 0, client );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
65
66
67
    return 0;
}

Ondřej Kuzník's avatar
Ondřej Kuzník committed
68
int
69
70
71
72
forward_final_response(
        LloadConnection *client,
        LloadOperation *op,
        BerElement *ber )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
73
74
75
{
    int rc;

Ondřej Kuzník's avatar
Ondřej Kuzník committed
76
    Debug( LDAP_DEBUG_STATS, "forward_final_response: "
Ondřej Kuzník's avatar
Ondřej Kuzník committed
77
78
79
            "connid=%lu msgid=%d finishing up with a request for "
            "client connid=%lu\n",
            op->o_upstream_connid, op->o_upstream_msgid, op->o_client_connid );
80
    rc = forward_response( client, op, ber );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
81
82
83
84
85
    CONNECTION_LOCK(op->o_upstream);
    if ( !op->o_pin_id || !op->o_upstream_refcnt-- ) {
        operation_destroy_from_upstream( op );
    }
    CONNECTION_UNLOCK(op->o_upstream);
Ondřej Kuzník's avatar
Ondřej Kuzník committed
86
87
88
89
90

    return rc;
}

static int
91
handle_unsolicited( LloadConnection *c, BerElement *ber )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
92
{
93
94
    if ( c->c_state == LLOAD_C_READY ) {
        c->c_state = LLOAD_C_CLOSING;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
95
    }
96

Ondřej Kuzník's avatar
Ondřej Kuzník committed
97
    Debug( LDAP_DEBUG_CONNS, "handle_unsolicited: "
Ondřej Kuzník's avatar
Ondřej Kuzník committed
98
            "teardown for upstream connection connid=%lu\n",
Ondřej Kuzník's avatar
Ondřej Kuzník committed
99
100
            c->c_connid );

101
    CONNECTION_DESTROY(c);
Ondřej Kuzník's avatar
Ondřej Kuzník committed
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
    ber_free( ber, 1 );

    return -1;
}

/*
 * Pull c->c_currentber from the connection and try to look up the operation on
 * the upstream.
 *
 * If it's a notice of disconnection, we won't find it and need to tear down
 * the connection and tell the clients, if we can't find the operation, ignore
 * the message (either client already disconnected/abandoned it or the upstream
 * is pulling our leg).
 *
 * Some responses need special handling:
 * - Bind response
 * - VC response where the client requested a Bind (both need to update the
 *   client's bind status)
 * - search entries/referrals and intermediate responses (will not trigger
 *   operation to be removed)
 *
 * If the worker pool is overloaded, we might be called directly from
124
 * the read callback, at that point, the connection hasn't been muted.
Ondřej Kuzník's avatar
Ondřej Kuzník committed
125
126
127
128
 *
 * TODO: when the client already has data pending on write, we should mute the
 * upstream.
 * - should record the BerElement on the Op and the Op on the client
Ondřej Kuzník's avatar
Ondřej Kuzník committed
129
130
131
132
133
 *
 * The following hold on entering any of the handlers:
 * - op->o_upstream_refcnt > 0
 * - op->o_upstream->c_refcnt > 0
 * - op->o_client->c_refcnt > 0
Ondřej Kuzník's avatar
Ondřej Kuzník committed
134
135
 */
static int
136
handle_one_response( LloadConnection *c )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
137
138
{
    BerElement *ber;
139
140
    LloadOperation *op = NULL, needle = { .o_upstream_connid = c->c_connid };
    LloadOperationHandler handler = NULL;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
141
142
    ber_tag_t tag;
    ber_len_t len;
143
    int rc = LDAP_SUCCESS;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159

    ber = c->c_currentber;
    c->c_currentber = NULL;

    tag = ber_get_int( ber, &needle.o_upstream_msgid );
    if ( tag != LDAP_TAG_MSGID ) {
        rc = -1;
        ber_free( ber, 1 );
        goto fail;
    }

    if ( needle.o_upstream_msgid == 0 ) {
        return handle_unsolicited( c, ber );
    } else if ( !( op = tavl_find(
                           c->c_ops, &needle, operation_upstream_cmp ) ) ) {
        /* Already abandoned, do nothing */
160
161
        ber_free( ber, 1 );
        return rc;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
        /*
    } else if ( op->o_response_pending ) {
        c->c_pendingop = op;
        event_del( c->c_read_event );
    */
    } else {
        /*
        op->o_response_pending = ber;
        */

        tag = ber_peek_tag( ber, &len );
        switch ( tag ) {
            case LDAP_RES_SEARCH_ENTRY:
            case LDAP_RES_SEARCH_REFERENCE:
            case LDAP_RES_INTERMEDIATE:
                handler = forward_response;
                break;
            case LDAP_RES_BIND:
                handler = handle_bind_response;
                break;
            case LDAP_RES_EXTENDED:
183
#ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
Ondřej Kuzník's avatar
Ondřej Kuzník committed
184
185
186
                if ( op->o_tag == LDAP_REQ_BIND ) {
                    handler = handle_vc_bind_response;
                }
187
#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
Ondřej Kuzník's avatar
Ondřej Kuzník committed
188
189
190
191
192
193
194
                break;
        }
        if ( !handler ) {
            handler = forward_final_response;
        }
    }
    if ( op ) {
195
        op->o_last_response = slap_get_time();
Ondřej Kuzník's avatar
Ondřej Kuzník committed
196
        Debug( LDAP_DEBUG_STATS2, "handle_one_response: "
Ondřej Kuzník's avatar
Ondřej Kuzník committed
197
198
                "upstream connid=%lu, processing response for "
                "client connid=%lu, msgid=%d\n",
199
                c->c_connid, op->o_client_connid, op->o_client_msgid );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
200
201
    } else {
        tag = ber_peek_tag( ber, &len );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
202
203
204
        Debug( LDAP_DEBUG_STATS2, "handle_one_response: "
                "upstream connid=%lu, %s, msgid=%d not for a pending "
                "operation\n",
205
206
                c->c_connid, lload_msgtype2str( tag ),
                needle.o_upstream_msgid );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
207
208
209
    }

    if ( handler ) {
210
        LloadConnection *client;
211

Ondřej Kuzník's avatar
Ondřej Kuzník committed
212
213
        op->o_upstream_refcnt++;
        CONNECTION_UNLOCK_INCREF(c);
214

Ondřej Kuzník's avatar
Ondřej Kuzník committed
215
        ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
216
217
218
        client = op->o_client;
        if ( client ) {
            CONNECTION_LOCK(client);
219
220
221
222
223
224
225
            if ( client->c_live ) {
                op->o_client_refcnt++;
                CONNECTION_UNLOCK_INCREF(client);
            } else {
                CONNECTION_UNLOCK(client);
                client = NULL;
            }
226
        }
Ondřej Kuzník's avatar
Ondřej Kuzník committed
227
        ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
228
229

        if ( client ) {
230
            rc = handler( client, op, ber );
231
            CONNECTION_LOCK_DECREF(client);
232
233
234
235
            op->o_client_refcnt--;
            if ( !op->o_client_refcnt ) {
                operation_destroy_from_client( op );
            }
236
            CONNECTION_UNLOCK_OR_DESTROY(client);
237
238
239
240
        } else {
            ber_free( ber, 1 );
        }

Ondřej Kuzník's avatar
Ondřej Kuzník committed
241
242
        CONNECTION_LOCK_DECREF(c);
        op->o_upstream_refcnt--;
243
        if ( !client || !op->o_upstream_refcnt ) {
244
245
            if ( c->c_state == LLOAD_C_BINDING ) {
                c->c_state = LLOAD_C_READY;
246
            }
Ondřej Kuzník's avatar
Ondřej Kuzník committed
247
248
            operation_destroy_from_upstream( op );
        }
249
    } else {
250
        assert(0);
251
        ber_free( ber, 1 );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
252
253
254
255
    }

fail:
    if ( rc ) {
Ondřej Kuzník's avatar
Ondřej Kuzník committed
256
257
258
        Debug( LDAP_DEBUG_STATS, "handle_one_response: "
                "error on processing a response (%s) on upstream connection "
                "connid=%ld, tag=%lx\n",
259
                lload_msgtype2str( tag ), c->c_connid, tag );
260
        CONNECTION_DESTROY(c);
Ondřej Kuzník's avatar
Ondřej Kuzník committed
261
    }
262
    /* We leave the connection locked */
Ondřej Kuzník's avatar
Ondřej Kuzník committed
263
264
265
    return rc;
}

266
int
267
upstream_bind_cb( LloadConnection *c )
268
{
Ondřej Kuzník's avatar
Ondřej Kuzník committed
269
    BerElement *ber = c->c_currentber;
270
    LloadBackend *b = c->c_private;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
271
    BerValue matcheddn, message;
272
273
274
275
276
277
    ber_tag_t tag;
    ber_int_t msgid, result;

    c->c_currentber = NULL;

    if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) {
Ondřej Kuzník's avatar
Ondřej Kuzník committed
278
279
        Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
                "protocol violation from server\n" );
280
281
282
283
        goto fail;
    }

    if ( msgid != ( c->c_next_msgid - 1 ) || tag != LDAP_RES_BIND ) {
Ondřej Kuzník's avatar
Ondřej Kuzník committed
284
285
        Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
                "unexpected %s from server, msgid=%d\n",
286
                lload_msgtype2str( tag ), msgid );
287
288
289
        goto fail;
    }

Ondřej Kuzník's avatar
Ondřej Kuzník committed
290
    if ( ber_scanf( ber, "{emm" /* "}" */, &result, &matcheddn, &message ) ==
291
                 LBER_ERROR ) {
Ondřej Kuzník's avatar
Ondřej Kuzník committed
292
293
        Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
                "response does not conform with a bind response\n" );
294
295
296
297
        goto fail;
    }

    switch ( result ) {
Ondřej Kuzník's avatar
Ondřej Kuzník committed
298
299
        case LDAP_SUCCESS: {
            c->c_pdu_cb = handle_one_response;
300
301
            c->c_state = LLOAD_C_READY;
            c->c_type = LLOAD_C_OPEN;
302
303
304
305
306
            c->c_read_timeout = NULL;
            event_add( c->c_read_event, c->c_read_timeout );
            Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: "
                    "connid=%lu finished binding, now active\n",
                    c->c_connid );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
307
308
309
310
311
312
            CONNECTION_UNLOCK_INCREF(c);
            ldap_pvt_thread_mutex_lock( &b->b_mutex );
            LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
            b->b_active++;
            b->b_opening--;
            b->b_failed = 0;
313
314
315
316
317
318
319
            if ( b->b_last_conn ) {
                LDAP_CIRCLEQ_INSERT_AFTER(
                        &b->b_conns, b->b_last_conn, c, c_next );
            } else {
                LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next );
            }
            b->b_last_conn = c;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
320
321
322
323
            ldap_pvt_thread_mutex_unlock( &b->b_mutex );
            backend_retry( b );
            CONNECTION_LOCK_DECREF(c);
        } break;
324
325
326
327
328
329
330
#ifdef HAVE_CYRUS_SASL
        case LDAP_SASL_BIND_IN_PROGRESS:
            /* TODO: fallthrough until we implement SASL */
#endif /* HAVE_CYRUS_SASL */
        default:
            Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
                    "upstream bind failed, rc=%d, message='%s'\n",
Ondřej Kuzník's avatar
Ondřej Kuzník committed
331
                    result, message.bv_val );
332
333
334
            goto fail;
    }

Ondřej Kuzník's avatar
Ondřej Kuzník committed
335
    ber_free( ber, 1 );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
336
    return LDAP_SUCCESS;
337

Ondřej Kuzník's avatar
Ondřej Kuzník committed
338
fail:
339
    ber_free( ber, 1 );
340
    CONNECTION_DESTROY(c);
Ondřej Kuzník's avatar
Ondřej Kuzník committed
341
    return -1;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
342
343
}

344
345
void *
upstream_bind( void *ctx, void *arg )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
346
{
347
    LloadConnection *c = arg;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
348
    BerElement *ber;
349
    ber_int_t msgid;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
350

351
    CONNECTION_LOCK(c);
Ondřej Kuzník's avatar
Ondřej Kuzník committed
352
353
    c->c_pdu_cb = upstream_bind_cb;
    CONNECTION_UNLOCK_INCREF(c);
Ondřej Kuzník's avatar
Ondřej Kuzník committed
354

Ondřej Kuzník's avatar
Ondřej Kuzník committed
355
356
357
358
359
    ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
    ber = c->c_pendingber;
    if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
        ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
        CONNECTION_LOCK_DESTROY(c);
360
        return NULL;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
361
    }
Ondřej Kuzník's avatar
Ondřej Kuzník committed
362
    c->c_pendingber = ber;
363
364
    msgid = c->c_next_msgid++;

365
    if ( bindconf.sb_method == LDAP_AUTH_SIMPLE ) {
366
        /* simple bind */
367
        ber_printf( ber, "{it{iOtON}}",
368
                msgid, LDAP_REQ_BIND, LDAP_VERSION3,
369
370
                &bindconf.sb_binddn, LDAP_AUTH_SIMPLE,
                &bindconf.sb_cred );
371
372
373

#ifdef HAVE_CYRUS_SASL
    } else {
374
375
376
        BerValue cred = BER_BVNULL;
        ber_printf( ber, "{it{iOt{OON}N}}",
                msgid, LDAP_REQ_BIND, LDAP_VERSION3,
377
378
                &bindconf.sb_binddn, LDAP_AUTH_SASL,
                &bindconf.sb_saslmech, BER_BV_OPTIONAL( &cred ) );
379
380
381
382
#endif /* HAVE_CYRUS_SASL */
    }
    ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );

383
    connection_write_cb( -1, 0, c );
384

385
    CONNECTION_LOCK_DECREF(c);
386
387
    c->c_read_timeout = lload_timeout_net;
    event_add( c->c_read_event, c->c_read_timeout );
388
    CONNECTION_UNLOCK_OR_DESTROY(c);
389

390
391
392
    return NULL;
}

Ondřej Kuzník's avatar
Ondřej Kuzník committed
393
394
395
396
/*
 * The backend is already locked when entering the function.
 */
static int
397
upstream_finish( LloadConnection *c )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
398
{
399
    LloadBackend *b = c->c_private;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
    int is_bindconn = 0, rc = 0;

    c->c_pdu_cb = handle_one_response;

    /* Unless we are configured to use the VC exop, consider allocating the
     * connection into the bind conn pool. Start off by allocating one for
     * general use, then one for binds, then we start filling up the general
     * connection pool, finally the bind pool */
    if (
#ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
            !(lload_features & LLOAD_FEATURE_VC) &&
#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
            b->b_active && b->b_numbindconns ) {
        if ( !b->b_bindavail ) {
            is_bindconn = 1;
        } else if ( b->b_active >= b->b_numconns &&
                b->b_bindavail < b->b_numbindconns ) {
            is_bindconn = 1;
        }
    }

    if ( is_bindconn ) {
        LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
423
424
        c->c_state = LLOAD_C_READY;
        c->c_type = LLOAD_C_BIND;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
425
426
427
        b->b_bindavail++;
        b->b_opening--;
        b->b_failed = 0;
428
429
430
431
432
433
434
        if ( b->b_last_bindconn ) {
            LDAP_CIRCLEQ_INSERT_AFTER(
                    &b->b_bindconns, b->b_last_bindconn, c, c_next );
        } else {
            LDAP_CIRCLEQ_INSERT_HEAD( &b->b_bindconns, c, c_next );
        }
        b->b_last_bindconn = c;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
435
436
    } else if ( bindconf.sb_method == LDAP_AUTH_NONE ) {
        LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
437
438
        c->c_state = LLOAD_C_READY;
        c->c_type = LLOAD_C_OPEN;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
439
440
441
        b->b_active++;
        b->b_opening--;
        b->b_failed = 0;
442
443
444
445
446
447
        if ( b->b_last_conn ) {
            LDAP_CIRCLEQ_INSERT_AFTER( &b->b_conns, b->b_last_conn, c, c_next );
        } else {
            LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next );
        }
        b->b_last_conn = c;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
448
449
450
451
452
453
454
455
456
457
458
459
    } else {
        rc = 1;
        ldap_pvt_thread_pool_submit( &connection_pool, upstream_bind, c );
    }

    Debug( LDAP_DEBUG_CONNS, "upstream_finish: "
            "%sconnection connid=%lu is%s ready for use\n",
            is_bindconn ? "bind " : "", c->c_connid, rc ? " almost" : "" );

    return rc;
}

Ondřej Kuzník's avatar
Ondřej Kuzník committed
460
461
462
static void
upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
{
463
464
    LloadConnection *c = arg;
    LloadBackend *b;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
465
466
467
468
469
470
471
472
473
474
475
    int rc = LDAP_SUCCESS;

    CONNECTION_LOCK(c);
    if ( what & EV_TIMEOUT ) {
        Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
                "connid=%lu, timeout reached, destroying\n",
                c->c_connid );
        goto fail;
    }
    b = c->c_private;

476
    rc = ldap_pvt_tls_connect( lload_tls_backend_ld, c->c_sb, b->b_host );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
    if ( rc < 0 ) {
        goto fail;
    }

    if ( rc == 0 ) {
        struct event_base *base = event_get_base( c->c_read_event );

        /*
         * We're finished, replace the callbacks
         *
         * This is deadlock-safe, since both share the same base - the one
         * that's just running us.
         */
        event_del( c->c_read_event );
        event_del( c->c_write_event );

493
        c->c_read_timeout = NULL;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
494
495
        event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST,
                connection_read_cb, c );
496
        event_add( c->c_read_event, c->c_read_timeout );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532

        event_assign( c->c_write_event, base, c->c_fd, EV_WRITE,
                connection_write_cb, c );
        Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
                "connid=%lu finished\n",
                c->c_connid );
        c->c_is_tls = LLOAD_TLS_ESTABLISHED;

        CONNECTION_UNLOCK_INCREF(c);
        ldap_pvt_thread_mutex_lock( &b->b_mutex );
        CONNECTION_LOCK_DECREF(c);

        rc = upstream_finish( c );

        ldap_pvt_thread_mutex_unlock( &b->b_mutex );

        if ( rc == LDAP_SUCCESS ) {
            backend_retry( b );
        }
    } else if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
        event_add( c->c_write_event, lload_write_timeout );
        Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
                "connid=%lu need write rc=%d\n",
                c->c_connid, rc );
    }
    CONNECTION_UNLOCK_OR_DESTROY(c);
    return;

fail:
    Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
            "connid=%lu failed rc=%d\n",
            c->c_connid, rc );
    CONNECTION_DESTROY(c);
}

static int
533
upstream_starttls( LloadConnection *c )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
{
    BerValue matcheddn, message, responseOid,
             startTLSOid = BER_BVC(LDAP_EXOP_START_TLS);
    BerElement *ber = c->c_currentber;
    struct event_base *base;
    ber_int_t msgid, result;
    ber_tag_t tag;

    c->c_currentber = NULL;

    if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) {
        Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
                "protocol violation from server\n" );
        goto fail;
    }

    if ( msgid != ( c->c_next_msgid - 1 ) || tag != LDAP_RES_EXTENDED ) {
        Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
                "unexpected %s from server, msgid=%d\n",
553
                lload_msgtype2str( tag ), msgid );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
        goto fail;
    }

    if ( ber_scanf( ber, "{emm}", &result, &matcheddn, &message ) ==
                 LBER_ERROR ) {
        Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
                "protocol violation on StartTLS response\n" );
        goto fail;
    }

    if ( (tag = ber_get_tag( ber )) != LBER_DEFAULT ) {
        if ( tag != LDAP_TAG_EXOP_RES_OID ||
                ber_scanf( ber, "{m}", &responseOid ) == LBER_DEFAULT ) {
            Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
                    "protocol violation on StartTLS response\n" );
            goto fail;
        }

        if ( ber_bvcmp( &responseOid, &startTLSOid ) ) {
            Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
                    "oid=%s not a StartTLS response\n",
                    responseOid.bv_val );
            goto fail;
        }
    }

    if ( result != LDAP_SUCCESS ) {
581
        LloadBackend *b = c->c_private;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
        int rc;

        Debug( LDAP_DEBUG_STATS, "upstream_starttls: "
                "server doesn't support StartTLS rc=%d message='%s'%s\n",
                result, message.bv_val,
                (c->c_is_tls == LLOAD_STARTTLS_OPTIONAL) ? ", ignored" : "" );
        if ( c->c_is_tls != LLOAD_STARTTLS_OPTIONAL ) {
            goto fail;
        }
        c->c_is_tls = LLOAD_CLEARTEXT;

        ber_free( ber, 1 );

        CONNECTION_UNLOCK_INCREF(c);
        ldap_pvt_thread_mutex_lock( &b->b_mutex );
        CONNECTION_LOCK_DECREF(c);

        rc = upstream_finish( c );

        ldap_pvt_thread_mutex_unlock( &b->b_mutex );

        if ( rc == LDAP_SUCCESS ) {
            backend_retry( b );
        }

        CONNECTION_UNLOCK_OR_DESTROY(c);
        return rc;
    }

    base = event_get_base( c->c_read_event );

    event_del( c->c_read_event );
    event_del( c->c_write_event );

616
    c->c_read_timeout = lload_timeout_net;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
617
618
619
620
621
    event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST,
            upstream_tls_handshake_cb, c );
    event_assign( c->c_write_event, base, c->c_fd, EV_WRITE,
            upstream_tls_handshake_cb, c );

622
    event_add( c->c_read_event, c->c_read_timeout );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
623
624
625
626
627
628
629
630
631
632
633
634
635
    event_add( c->c_write_event, lload_write_timeout );

    CONNECTION_UNLOCK(c);

    ber_free( ber, 1 );
    return -1;

fail:
    ber_free( ber, 1 );
    CONNECTION_DESTROY(c);
    return -1;
}

636
637
638
/*
 * We must already hold b->b_mutex when called.
 */
639
640
LloadConnection *
upstream_init( ber_socket_t s, LloadBackend *b )
641
{
642
643
    LloadConnection *c;
    struct event_base *base = lload_get_base( s );
644
    struct event *event;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
645
    int flags, rc = -1;
646
647
648

    assert( b != NULL );

Ondřej Kuzník's avatar
Ondřej Kuzník committed
649
    flags = (b->b_proto == LDAP_PROTO_IPC) ? CONN_IS_IPC : 0;
650
    if ( (c = lload_connection_init( s, b->b_host, flags )) == NULL ) {
651
652
653
        return NULL;
    }

654
    c->c_private = b;
655
656
    c->c_is_tls = b->b_tls;
    c->c_pdu_cb = handle_one_response;
657

Ondřej Kuzník's avatar
Ondřej Kuzník committed
658
    LDAP_CIRCLEQ_INSERT_HEAD( &b->b_preparing, c, c_next );
659
    c->c_type = LLOAD_C_PREPARING;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
660

661
662
663
664
665
    {
        ber_len_t max = sockbuf_max_incoming_upstream;
        ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max );
    }

666
667
668
669
670
671
672
673
674
    event = event_new( base, s, EV_READ|EV_PERSIST, connection_read_cb, c );
    if ( !event ) {
        Debug( LDAP_DEBUG_ANY, "upstream_init: "
                "Read event could not be allocated\n" );
        goto fail;
    }
    c->c_read_event = event;

    event = event_new( base, s, EV_WRITE, connection_write_cb, c );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
675
    if ( !event ) {
Ondřej Kuzník's avatar
Ondřej Kuzník committed
676
677
        Debug( LDAP_DEBUG_ANY, "upstream_init: "
                "Write event could not be allocated\n" );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
678
679
        goto fail;
    }
680
    /* We only add the write event when we have data pending */
Ondřej Kuzník's avatar
Ondřej Kuzník committed
681
682
    c->c_write_event = event;

Ondřej Kuzník's avatar
Ondřej Kuzník committed
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
    if ( c->c_is_tls == LLOAD_CLEARTEXT ) {
        rc = upstream_finish( c );
        if ( rc < 0 ) {
            goto fail;
        }
    } else if ( c->c_is_tls == LLOAD_LDAPS ) {
        event_assign( c->c_read_event, base, s, EV_READ|EV_PERSIST,
                upstream_tls_handshake_cb, c );
        event_assign( c->c_write_event, base, s, EV_WRITE,
                upstream_tls_handshake_cb, c );
        event_add( c->c_write_event, lload_write_timeout );
    } else if ( c->c_is_tls == LLOAD_STARTTLS ||
            c->c_is_tls == LLOAD_STARTTLS_OPTIONAL ) {
        BerElement *output;

        ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
        if ( (output = c->c_pendingber = ber_alloc()) == NULL ) {
            ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
            goto fail;
        }
        ber_printf( output, "t{tit{ts}}", LDAP_TAG_MESSAGE,
                LDAP_TAG_MSGID, c->c_next_msgid++,
                LDAP_REQ_EXTENDED,
                LDAP_TAG_EXOP_REQ_OID, LDAP_EXOP_START_TLS );
        ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
708

Ondřej Kuzník's avatar
Ondřej Kuzník committed
709
710
711
712
713
        c->c_pdu_cb = upstream_starttls;
        CONNECTION_UNLOCK_INCREF(c);
        connection_write_cb( s, 0, c );
        CONNECTION_LOCK_DECREF(c);
    }
714
    event_add( c->c_read_event, c->c_read_timeout );
715

Ondřej Kuzník's avatar
Ondřej Kuzník committed
716
717
718
719
720
721
722
723
    c->c_destroy = upstream_destroy;
    CONNECTION_UNLOCK_OR_DESTROY(c);

    /* has upstream_finish() finished? */
    if ( rc == LDAP_SUCCESS ) {
        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
        backend_retry( b );
        ldap_pvt_thread_mutex_lock( &b->b_mutex );
724
    }
Ondřej Kuzník's avatar
Ondřej Kuzník committed
725
726

    return c;
727

Ondřej Kuzník's avatar
Ondřej Kuzník committed
728
729
730
731
732
733
734
735
736
fail:
    if ( c->c_write_event ) {
        event_del( c->c_write_event );
        event_free( c->c_write_event );
    }
    if ( c->c_read_event ) {
        event_del( c->c_read_event );
        event_free( c->c_read_event );
    }
737

738
    c->c_state = LLOAD_C_INVALID;
739
740
741
    CONNECTION_DESTROY(c);
    assert( c == NULL );

Ondřej Kuzník's avatar
Ondřej Kuzník committed
742
743
744
    return NULL;
}

Ondřej Kuzník's avatar
Ondřej Kuzník committed
745
void
746
upstream_destroy( LloadConnection *c )
Ondřej Kuzník's avatar
Ondřej Kuzník committed
747
{
748
    LloadBackend *b = c->c_private;
749
    struct event *read_event, *write_event;
750
    TAvlnode *root;
751
    long freed, executing;
752
    enum sc_state state;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
753

754
    Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
Ondřej Kuzník's avatar
Ondřej Kuzník committed
755
            "freeing connection connid=%lu\n",
756
757
            c->c_connid );

758
    assert( c->c_state != LLOAD_C_INVALID );
759
    state = c->c_state;
760
    c->c_state = LLOAD_C_INVALID;
761

762
763
    root = c->c_ops;
    c->c_ops = NULL;
764
    executing = c->c_n_ops_executing;
765
    c->c_n_ops_executing = 0;
766

767
768
    read_event = c->c_read_event;
    write_event = c->c_write_event;
769

770
771
    CONNECTION_UNLOCK_INCREF(c);

772
    freed = tavl_free( root, (AVL_FREE)operation_lost_upstream );
773
    assert( freed == executing );
774

775
776
777
778
779
    /*
     * Avoid a deadlock:
     * event_del will block if the event is currently executing its callback,
     * that callback might be waiting to lock c->c_mutex
     */
Ondřej Kuzník's avatar
Ondřej Kuzník committed
780
781
782
    if ( read_event ) {
        event_del( read_event );
    }
783

Ondřej Kuzník's avatar
Ondřej Kuzník committed
784
785
786
    if ( write_event ) {
        event_del( write_event );
    }
Ondřej Kuzník's avatar
Ondřej Kuzník committed
787

788
    /* Remove from the backend on first pass */
789
    if ( state != LLOAD_C_CLOSING ) {
790
        ldap_pvt_thread_mutex_lock( &b->b_mutex );
791
        if ( c->c_type == LLOAD_C_PREPARING ) {
Ondřej Kuzník's avatar
Ondřej Kuzník committed
792
793
794
            LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
            b->b_opening--;
            b->b_failed++;
795
        } else if ( c->c_type == LLOAD_C_BIND ) {
796
            if ( c == b->b_last_bindconn ) {
797
                LloadConnection *prev =
798
799
800
801
802
803
804
                        LDAP_CIRCLEQ_LOOP_PREV( &b->b_bindconns, c, c_next );
                if ( prev == c ) {
                    b->b_last_bindconn = NULL;
                } else {
                    b->b_last_bindconn = prev;
                }
            }
805
806
807
            LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
            b->b_bindavail--;
        } else {
808
            if ( c == b->b_last_conn ) {
809
                LloadConnection *prev =
810
811
812
813
814
815
816
                        LDAP_CIRCLEQ_LOOP_PREV( &b->b_conns, c, c_next );
                if ( prev == c ) {
                    b->b_last_conn = NULL;
                } else {
                    b->b_last_conn = prev;
                }
            }
817
818
819
820
821
822
            LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next );
            b->b_active--;
        }
        b->b_n_ops_executing -= executing;
        ldap_pvt_thread_mutex_unlock( &b->b_mutex );
        backend_retry( b );
Ondřej Kuzník's avatar
Ondřej Kuzník committed
823
    }
824

825
    CONNECTION_LOCK_DECREF(c);
Ondřej Kuzník's avatar
Ondřej Kuzník committed
826

Ondřej Kuzník's avatar
Ondřej Kuzník committed
827
828
829
830
831
832
833
834
835
836
    if ( c->c_read_event ) {
        event_free( c->c_read_event );
        c->c_read_event = NULL;
    }

    if ( c->c_write_event ) {
        event_free( c->c_write_event );
        c->c_write_event = NULL;
    }

Ondřej Kuzník's avatar
Ondřej Kuzník committed
837
838
839
840
841
842
843
    /*
     * If we attempted to destroy any operations, we might have lent a new
     * refcnt token for a thread that raced us to that, let them call us again
     * later
     */
    assert( c->c_refcnt >= 0 );
    if ( c->c_refcnt ) {
844
        c->c_state = LLOAD_C_CLOSING;
Ondřej Kuzník's avatar
Ondřej Kuzník committed
845
846
847
848
849
850
        Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
                "connid=%lu aborting with refcnt=%d\n",
                c->c_connid, c->c_refcnt );
        CONNECTION_UNLOCK(c);
        return;
    }
Ondřej Kuzník's avatar
Ondřej Kuzník committed
851
852
    connection_destroy( c );
}