Commit 673513a0 authored by Ondřej Kuzník's avatar Ondřej Kuzník Committed by Ondřej Kuzník
Browse files

Maintain the configured amount of connections per backend

parent 798e215e
......@@ -32,25 +32,26 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
{
Backend *b = arg;
Connection *c;
ber_socket_t s;
ber_socket_t s = AC_SOCKET_INVALID;
int rc;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( result || !res ) {
Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
"name resolution failed for backend '%s': %s\n",
b->b_bindconf.sb_uri.bv_val, evutil_gai_strerror( result ) );
return;
goto fail;
}
s = socket( res->ai_family, SOCK_STREAM, 0 );
if ( s == AC_SOCKET_INVALID ) {
return;
/* TODO: if we get failures, try the other addrinfos */
if ( (s = socket( res->ai_family, SOCK_STREAM, 0 )) ==
AC_SOCKET_INVALID ) {
goto fail;
}
rc = ber_pvt_socket_set_nonblock( s, 1 );
if ( rc ) {
evutil_closesocket( s );
return;
if ( ber_pvt_socket_set_nonblock( s, 1 ) ) {
goto fail;
}
if ( res->ai_family == PF_INET ) {
......@@ -66,14 +67,21 @@ upstream_name_cb( int result, struct evutil_addrinfo *res, void *arg )
Debug( LDAP_DEBUG_ANY, "upstream_name_cb: "
"failed to connect to server '%s'\n",
b->b_bindconf.sb_uri.bv_val );
evutil_closesocket( s );
return;
goto fail;
}
c = upstream_init( s, b );
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_conns = c;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
return;
fail:
if ( s != AC_SOCKET_INVALID ) {
evutil_closesocket( s );
}
b->b_opening--;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
}
Connection *
......@@ -81,48 +89,95 @@ backend_select( Operation *op )
{
Backend *b;
/* TODO: Two runs, one with trylock, then one actually locked if we don't
* find anything? */
LDAP_STAILQ_FOREACH ( b, &backend, b_next ) {
struct ConnSt *head;
Connection *c;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
c = b->b_conns;
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return b->b_conns;
if ( op->o_tag == LDAP_REQ_BIND &&
!(lload_features & LLOAD_FEATURE_VC) ) {
head = &b->b_bindconns;
} else {
head = &b->b_conns;
}
/* TODO: Use CIRCLEQ so that we can do a natural round robin over the
* backend's connections? */
LDAP_LIST_FOREACH( c, head, c_next )
{
ldap_pvt_thread_mutex_lock( &c->c_io_mutex );
if ( c->c_state == SLAP_C_READY && !c->c_pendingber ) {
Debug( LDAP_DEBUG_CONNS, "backend_select: "
"selected connection %lu for client %lu msgid=%d\n",
c->c_connid, op->o_client->c_connid,
op->o_client_msgid );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return c;
}
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
}
ldap_pvt_thread_mutex_unlock( &c->c_io_mutex );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
return NULL;
}
void
backend_retry( Backend *b )
{
int rc, requested;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
/* TODO: timeout regime */
requested = b->b_numconns;
if ( !(lload_features & LLOAD_FEATURE_VC) ) {
requested += b->b_numbindconns;
}
if ( b->b_active + b->b_bindavail + b->b_opening < requested ) {
b->b_opening++;
rc = ldap_pvt_thread_pool_submit(
&connection_pool, backend_connect, b );
/* TODO check we're not shutting down */
if ( rc ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_connect( NULL, b );
return;
}
}
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
void *
backend_connect( void *ctx, void *arg )
{
struct evutil_addrinfo hints = {};
Backend *b = arg;
char *hostname;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
#ifdef LDAP_PF_LOCAL
if ( b->b_proto == LDAP_PROTO_IPC ) {
struct sockaddr_un addr;
Connection *c;
ber_socket_t s = socket( PF_LOCAL, SOCK_STREAM, 0 );
int rc;
if ( s == AC_SOCKET_INVALID ) {
return (void *)-1;
goto fail;
}
rc = ber_pvt_socket_set_nonblock( s, 1 );
if ( rc ) {
evutil_closesocket( s );
return (void *)-1;
goto fail;
}
if ( strlen( b->b_host ) > ( sizeof(addr.sun_path) - 1 ) ) {
evutil_closesocket( s );
return (void *)-1;
goto fail;
}
memset( &addr, '\0', sizeof(addr) );
addr.sun_family = AF_LOCAL;
......@@ -132,10 +187,12 @@ backend_connect( void *ctx, void *arg )
s, (struct sockaddr *)&addr, sizeof(struct sockaddr_un) );
if ( rc && errno != EINPROGRESS && errno != EWOULDBLOCK ) {
evutil_closesocket( s );
return (void *)-1;
goto fail;
}
b->b_conns = upstream_init( s, b );
c = upstream_init( s, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
return NULL;
}
#endif /* LDAP_PF_LOCAL */
......@@ -145,6 +202,15 @@ backend_connect( void *ctx, void *arg )
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
evdns_getaddrinfo( dnsbase, b->b_host, NULL, &hints, upstream_name_cb, b );
hostname = b->b_host;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
evdns_getaddrinfo( dnsbase, hostname, NULL, &hints, upstream_name_cb, b );
return NULL;
fail:
b->b_opening--;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
backend_retry( b );
return (void *)-1;
}
......@@ -462,6 +462,9 @@ config_backend( ConfigArgs *c )
b = ch_calloc( 1, sizeof(Backend) );
LDAP_LIST_INIT( &b->b_conns );
LDAP_LIST_INIT( &b->b_bindconns );
b->b_numconns = 1;
b->b_numbindconns = 1;
......
......@@ -1300,6 +1300,7 @@ slapd_daemon( struct event_base *daemon_base )
}
LDAP_STAILQ_FOREACH ( b, &backend, b_next ) {
b->b_opening++;
ldap_pvt_thread_pool_submit( &connection_pool, backend_connect, b );
}
......
......@@ -41,6 +41,7 @@ struct config_reply_s; /* config.h */
*/
LDAP_SLAPD_F (void *) backend_connect( void *ctx, void *arg );
LDAP_SLAPD_F (void) backend_retry( Backend *b );
LDAP_SLAPD_F (Connection *) backend_select( Operation *op );
/*
......
......@@ -248,7 +248,8 @@ struct Backend {
char *b_host;
int b_numconns, b_numbindconns;
Connection *b_conns, *b_bindconns;
int b_bindavail, b_active, b_opening;
LDAP_LIST_HEAD(ConnSt, Connection) b_conns, b_bindconns;
LDAP_STAILQ_ENTRY(Backend) b_next;
};
......@@ -263,11 +264,16 @@ enum sc_state {
SLAP_C_ACTIVE, /* exclusive operation (tls setup, ...) in progress */
SLAP_C_BINDING, /* binding */
};
enum sc_type {
SLAP_C_OPEN = 0, /* regular connection */
SLAP_C_BIND, /* connection used to handle bind client requests if VC not enabled */
};
/*
* represents a connection from an ldap client/to ldap server
*/
struct Connection {
enum sc_state c_state; /* connection state */
enum sc_type c_type;
ber_socket_t c_fd;
ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */
......@@ -301,7 +307,7 @@ struct Connection {
TAvlnode *c_ops; /* Operations pending on the connection */
#define CONN_IS_TLS 1
#define CONN_IS_CLIENT 4
#define CONN_IS_BIND 4
#define CONN_IS_IPC 8
#ifdef HAVE_TLS
......@@ -312,6 +318,9 @@ struct Connection {
long c_n_ops_executing; /* num of ops currently executing */
long c_n_ops_completed; /* num of ops completed */
/* Upstream: Protected by its backend's mutex */
LDAP_LIST_ENTRY( Connection ) c_next;
void *c_private;
};
......
......@@ -735,16 +735,22 @@ upstream_bind( void *ctx, void *arg )
return NULL;
}
/*
* We must already hold b->b_mutex when called.
*/
Connection *
upstream_init( ber_socket_t s, Backend *b )
{
Connection *c;
struct event_base *base = slap_get_base( s );
struct event *event;
int flags = (b->b_tls == LLOAD_LDAPS) ? CONN_IS_TLS : 0;
int flags, is_bindconn = 0;
assert( b != NULL );
b->b_opening--;
flags = (b->b_tls == LLOAD_LDAPS) ? CONN_IS_TLS : 0;
c = connection_init( s, b->b_host, flags );
c->c_private = b;
......@@ -757,15 +763,38 @@ upstream_init( ber_socket_t s, Backend *b )
/* We only register the write event when we have data pending */
c->c_write_event = event;
if ( b->b_bindconf.sb_method == LDAP_AUTH_NONE ) {
/* Unless we are configured to use the VC exop, consider allocating the
* connection into the bind conn pool. Start off by allocating one for
* general use, then one for binds, then we start filling up the general
* connection pool, finally the bind pool */
if ( !(lload_features & LLOAD_FEATURE_VC) && b->b_active &&
b->b_numbindconns ) {
if ( !b->b_bindavail ) {
is_bindconn = 1;
} else if ( b->b_active >= b->b_numconns &&
b->b_bindavail < b->b_numbindconns ) {
is_bindconn = 1;
}
}
if ( is_bindconn || b->b_bindconf.sb_method == LDAP_AUTH_NONE ) {
upstream_finish( c );
} else {
ldap_pvt_thread_pool_submit( &connection_pool, upstream_bind, c );
}
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
if ( is_bindconn ) {
LDAP_LIST_INSERT_HEAD( &b->b_bindconns, c, c_next );
c->c_type = SLAP_C_BIND;
b->b_bindavail++;
} else {
LDAP_LIST_INSERT_HEAD( &b->b_conns, c, c_next );
b->b_active++;
}
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return c;
fail:
if ( c->c_write_event ) {
event_del( c->c_write_event );
......@@ -784,18 +813,22 @@ upstream_destroy( Connection *c )
{
Backend *b = c->c_private;
Debug( LDAP_DEBUG_CONNS, "upstream_destroy: freeing connection %lu\n",
c->c_connid );
assert( c->c_state != SLAP_C_INVALID );
c->c_state = SLAP_C_INVALID;
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( !(b->b_conns == c) ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return;
LDAP_LIST_REMOVE( c, c_next );
if ( c->c_type == SLAP_C_BIND ) {
b->b_bindavail--;
} else {
b->b_active--;
}
b->b_conns = NULL;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
ldap_pvt_thread_pool_submit( &connection_pool, backend_connect, b );
backend_retry( b );
ldap_pvt_thread_mutex_lock( &c->c_mutex );
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment