Commit c60ef739 authored by Ondřej Kuzník's avatar Ondřej Kuzník Committed by Ondřej Kuzník
Browse files

Rework upstream conn setup

parent 0b353106
......@@ -32,14 +32,13 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
{
PendingConnection *conn = arg;
Backend *b = conn->backend;
int rc = -1;
int error = 0, rc = -1;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
Debug( LDAP_DEBUG_CONNS, "upstream_connect_cb: "
"fd=%d connection callback for backend uri='%s'\n",
s, b->b_uri.bv_val );
if ( what == EV_WRITE ) {
int error;
socklen_t optlen = sizeof(error);
if ( getsockopt( conn->fd, SOL_SOCKET, SO_ERROR, (void *)&error,
......@@ -59,22 +58,26 @@ upstream_connect_cb( evutil_socket_t s, short what, void *arg )
done:
if ( rc ) {
char ebuf[128];
evutil_closesocket( conn->fd );
b->b_opening--;
b->b_failed++;
Debug( LDAP_DEBUG_ANY, "upstream_connect_cb: "
"fd=%d connection set up failed\n",
s );
"fd=%d connection set up failed%s%s\n",
s, error ? ": " : "",
error ? sock_errstr( error, ebuf, sizeof(ebuf) ) : "" );
} else {
b->b_failed = 0;
}
b->b_opening--;
LDAP_LIST_REMOVE( conn, next );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
LDAP_LIST_REMOVE( conn, next );
event_free( conn->event );
ch_free( conn );
backend_retry( b );
if ( rc ) {
backend_retry( b );
}
}
static void
......@@ -147,20 +150,12 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
Debug( LDAP_DEBUG_CONNS, "upstream_name_cb: "
"connection to backend uri=%s in progress\n",
b->b_uri.bv_val );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
free( res );
return;
}
if ( !upstream_init( s, b ) ) {
} else if ( !upstream_init( s, b ) ) {
goto fail;
}
b->b_opening--;
b->b_failed = 0;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
free( res );
evutil_freeaddrinfo( res );
return;
fail:
......@@ -171,7 +166,9 @@ fail:
b->b_failed++;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
free( res );
if ( res ) {
evutil_freeaddrinfo( res );
}
}
Connection *
......@@ -386,17 +383,11 @@ backend_connect( evutil_socket_t s, short what, void *arg )
Debug( LDAP_DEBUG_CONNS, "backend_connect: "
"connection to backend uri=%s in progress\n",
b->b_uri.bv_val );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return;
} else if ( !upstream_init( s, b ) ) {
goto fail;
}
b->b_opening--;
b->b_failed = 0;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
return;
}
#endif /* LDAP_PF_LOCAL */
......@@ -448,6 +439,17 @@ backends_destroy( void )
LDAP_LIST_REMOVE( pending, next );
ch_free( pending );
}
while ( !LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) ) {
Connection *c = LDAP_CIRCLEQ_FIRST( &b->b_preparing );
CONNECTION_LOCK(c);
Debug( LDAP_DEBUG_CONNS, "backends_destroy: "
"destroying connection being set up connid=%lu\n",
c->c_connid );
assert( c->c_live );
CONNECTION_DESTROY(c);
}
while ( !LDAP_CIRCLEQ_EMPTY( &b->b_bindconns ) ) {
Connection *c = LDAP_CIRCLEQ_FIRST( &b->b_bindconns );
......
......@@ -476,6 +476,7 @@ config_backend( ConfigArgs *c )
LDAP_CIRCLEQ_INIT( &b->b_conns );
LDAP_CIRCLEQ_INIT( &b->b_bindconns );
LDAP_CIRCLEQ_INIT( &b->b_preparing );
b->b_numconns = 1;
b->b_numbindconns = 1;
......
......@@ -271,7 +271,7 @@ struct Backend {
int b_numconns, b_numbindconns;
int b_bindavail, b_active, b_opening;
LDAP_CIRCLEQ_HEAD(ConnSt, Connection) b_conns, b_bindconns;
LDAP_CIRCLEQ_HEAD(ConnSt, Connection) b_conns, b_bindconns, b_preparing;
LDAP_LIST_HEAD(ConnectingSt, PendingConnection) b_connecting;
long b_max_pending, b_max_conn_pending;
......@@ -295,7 +295,8 @@ enum sc_state {
SLAP_C_BINDING, /* binding */
};
enum sc_type {
SLAP_C_OPEN = 0, /* regular connection */
SLAP_C_OPEN = 0, /* regular connection */
SLAP_C_PREPARING, /* upstream connection not assigned yet */
SLAP_C_BIND, /* connection used to handle bind client requests if VC not enabled */
SLAP_C_PRIVILEGED, /* connection can override proxyauthz control */
};
......
......@@ -296,7 +296,9 @@ done:
static int
handle_unsolicited( Connection *c, BerElement *ber )
{
c->c_state = SLAP_C_CLOSING;
if ( c->c_state == SLAP_C_READY ) {
c->c_state = SLAP_C_CLOSING;
}
Debug( LDAP_DEBUG_CONNS, "handle_unsolicited: "
"teardown for upstream connection connid=%lu\n",
......@@ -465,84 +467,14 @@ fail:
}
int
upstream_finish( Connection *c )
upstream_bind_cb( Connection *c )
{
struct event_base *base;
struct event *event;
evutil_socket_t s = c->c_fd;
Debug( LDAP_DEBUG_CONNS, "upstream_finish: "
"connection connid=%lu is ready for use\n",
c->c_connid );
base = slap_get_base( s );
event = event_new( base, s, EV_READ|EV_PERSIST, connection_read_cb, c );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "upstream_finish: "
"Read event could not be allocated\n" );
return -1;
}
event_add( event, NULL );
if ( c->c_read_event ) {
event_del( c->c_read_event );
event_free( c->c_read_event );
}
c->c_read_event = event;
c->c_state = SLAP_C_READY;
return 0;
}
void
upstream_bind_cb( evutil_socket_t s, short what, void *arg )
{
Connection *c = arg;
BerElement *ber;
BerElement *ber = c->c_currentber;
Backend *b = c->c_private;
BerValue matcheddn, message;
ber_tag_t tag;
ber_len_t len;
ber_int_t msgid, result;
CONNECTION_LOCK(c);
Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: "
"connection connid=%lu ready to read\n",
c->c_connid );
ber = c->c_currentber;
if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
"ber_alloc failed\n" );
CONNECTION_UNLOCK(c);
return;
}
c->c_currentber = ber;
tag = ber_get_next( c->c_sb, &len, ber );
if ( tag != LDAP_TAG_MESSAGE ) {
int err = sock_errno();
if ( err != EWOULDBLOCK && err != EAGAIN ) {
if ( err || tag == LBER_ERROR ) {
char ebuf[128];
Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
"ber_get_next on fd=%d failed errno=%d (%s)\n",
c->c_fd, err,
sock_errstr( err, ebuf, sizeof(ebuf) ) );
} else {
Debug( LDAP_DEBUG_STATS, "upstream_bind_cb: "
"ber_get_next on fd=%d connid=%lu received "
"a strange PDU tag=%lx\n",
c->c_fd, c->c_connid, tag );
}
c->c_currentber = NULL;
goto fail;
}
CONNECTION_UNLOCK(c);
return;
}
c->c_currentber = NULL;
if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) {
......@@ -566,11 +498,21 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg )
}
switch ( result ) {
case LDAP_SUCCESS:
if ( upstream_finish( c ) ) {
goto fail;
}
break;
case LDAP_SUCCESS: {
c->c_pdu_cb = handle_one_response;
c->c_state = SLAP_C_READY;
c->c_type = SLAP_C_OPEN;
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next );
b->b_active++;
b->b_opening--;
b->b_failed = 0;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
CONNECTION_LOCK_DECREF(c);
} break;
#ifdef HAVE_CYRUS_SASL
case LDAP_SASL_BIND_IN_PROGRESS:
/* TODO: fallthrough until we implement SASL */
......@@ -582,57 +524,36 @@ upstream_bind_cb( evutil_socket_t s, short what, void *arg )
goto fail;
}
CONNECTION_UNLOCK(c);
ber_free( ber, 1 );
return;
return LDAP_SUCCESS;
fail:
event_del( c->c_read_event );
Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: "
"suspended read event on dying connid=%lu\n",
c->c_connid );
ber_free( ber, 1 );
CONNECTION_DESTROY(c);
return -1;
}
void *
upstream_bind( void *ctx, void *arg )
{
Connection *c = arg;
Backend *b;
BerElement *ber = ber_alloc();
struct event_base *base;
struct event *event;
BerElement *ber;
ber_int_t msgid;
evutil_socket_t s;
assert( ber );
CONNECTION_LOCK(c);
b = c->c_private;
s = c->c_fd;
base = slap_get_base( s );
c->c_pdu_cb = upstream_bind_cb;
CONNECTION_UNLOCK_INCREF(c);
event = event_new( base, s, EV_READ|EV_PERSIST, upstream_bind_cb, c );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "upstream_bind: "
"Read event could not be allocated\n" );
CONNECTION_DESTROY(c);
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
ber = c->c_pendingber;
if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
CONNECTION_LOCK_DESTROY(c);
return NULL;
}
event_add( event, NULL );
if ( c->c_read_event ) {
event_del( c->c_read_event );
event_free( c->c_read_event );
}
c->c_read_event = event;
c->c_pendingber = ber;
msgid = c->c_next_msgid++;
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( bindconf.sb_method == LDAP_AUTH_SIMPLE ) {
/* simple bind */
ber_printf( ber, "{it{iOtON}}",
......@@ -649,10 +570,6 @@ upstream_bind( void *ctx, void *arg )
&bindconf.sb_saslmech, BER_BV_OPTIONAL( &cred ) );
#endif /* HAVE_CYRUS_SASL */
}
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
c->c_pendingber = ber;
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
connection_write_cb( -1, 0, c );
......@@ -663,6 +580,62 @@ upstream_bind( void *ctx, void *arg )
return NULL;
}
/*
* The backend is already locked when entering the function.
*/
static int
upstream_finish( Connection *c )
{
Backend *b = c->c_private;
int is_bindconn = 0, rc = 0;
c->c_pdu_cb = handle_one_response;
/* Unless we are configured to use the VC exop, consider allocating the
* connection into the bind conn pool. Start off by allocating one for
* general use, then one for binds, then we start filling up the general
* connection pool, finally the bind pool */
if (
#ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
!(lload_features & LLOAD_FEATURE_VC) &&
#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
b->b_active && b->b_numbindconns ) {
if ( !b->b_bindavail ) {
is_bindconn = 1;
} else if ( b->b_active >= b->b_numconns &&
b->b_bindavail < b->b_numbindconns ) {
is_bindconn = 1;
}
}
if ( is_bindconn ) {
LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
LDAP_CIRCLEQ_INSERT_HEAD( &b->b_bindconns, c, c_next );
c->c_state = SLAP_C_READY;
c->c_type = SLAP_C_BIND;
b->b_bindavail++;
b->b_opening--;
b->b_failed = 0;
} else if ( bindconf.sb_method == LDAP_AUTH_NONE ) {
LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next );
c->c_state = SLAP_C_READY;
c->c_type = SLAP_C_OPEN;
b->b_active++;
b->b_opening--;
b->b_failed = 0;
} else {
rc = 1;
ldap_pvt_thread_pool_submit( &connection_pool, upstream_bind, c );
}
Debug( LDAP_DEBUG_CONNS, "upstream_finish: "
"%sconnection connid=%lu is%s ready for use\n",
is_bindconn ? "bind " : "", c->c_connid, rc ? " almost" : "" );
return rc;
}
/*
* We must already hold b->b_mutex when called.
*/
......@@ -672,7 +645,7 @@ upstream_init( ber_socket_t s, Backend *b )
Connection *c;
struct event_base *base = slap_get_base( s );
struct event *event;
int flags, is_bindconn = 0;
int flags, rc = -1;
assert( b != NULL );
......@@ -685,6 +658,9 @@ upstream_init( ber_socket_t s, Backend *b )
c->c_is_tls = b->b_tls;
c->c_pdu_cb = handle_one_response;
LDAP_CIRCLEQ_INSERT_HEAD( &b->b_preparing, c, c_next );
c->c_type = SLAP_C_PREPARING;
{
ber_len_t max = sockbuf_max_incoming_upstream;
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max );
......@@ -707,42 +683,23 @@ upstream_init( ber_socket_t s, Backend *b )
/* We only add the write event when we have data pending */
c->c_write_event = event;
/* Unless we are configured to use the VC exop, consider allocating the
* connection into the bind conn pool. Start off by allocating one for
* general use, then one for binds, then we start filling up the general
* connection pool, finally the bind pool */
if (
#ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
!(lload_features & LLOAD_FEATURE_VC) &&
#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
b->b_active && b->b_numbindconns ) {
if ( !b->b_bindavail ) {
is_bindconn = 1;
} else if ( b->b_active >= b->b_numconns &&
b->b_bindavail < b->b_numbindconns ) {
is_bindconn = 1;
}
rc = upstream_finish( c );
if ( rc < 0 ) {
goto fail;
}
if ( is_bindconn || bindconf.sb_method == LDAP_AUTH_NONE ) {
if ( upstream_finish( c ) ) {
goto fail;
}
} else {
ldap_pvt_thread_pool_submit( &connection_pool, upstream_bind, c );
}
event_add( c->c_read_event, NULL );
if ( is_bindconn ) {
LDAP_CIRCLEQ_INSERT_HEAD( &b->b_bindconns, c, c_next );
c->c_type = SLAP_C_BIND;
b->b_bindavail++;
} else {
LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next );
b->b_active++;
c->c_destroy = upstream_destroy;
CONNECTION_UNLOCK_OR_DESTROY(c);
/* has upstream_finish() finished? */
if ( rc == LDAP_SUCCESS ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
ldap_pvt_thread_mutex_lock( &b->b_mutex );
}
c->c_destroy = upstream_destroy;
CONNECTION_UNLOCK(c);
return c;
fail:
......@@ -808,7 +765,11 @@ upstream_destroy( Connection *c )
/* Remove from the backend on first pass */
if ( state != SLAP_C_CLOSING ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( c->c_type == SLAP_C_BIND ) {
if ( c->c_type == SLAP_C_PREPARING ) {
LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
b->b_opening--;
b->b_failed++;
} else if ( c->c_type == SLAP_C_BIND ) {
LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
b->b_bindavail--;
} else {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment