Commit b4f43ed8 authored by Ondřej Kuzník's avatar Ondřej Kuzník
Browse files

Refactor backend reset

Reuse the connection walking facility in timeout management.
parent 638f8a2c
......@@ -488,7 +488,7 @@ backend_connect_task( void *ctx, void *arg )
* Needs exclusive access to the backend.
*/
void
backend_reset( LloadBackend *b )
backend_reset( LloadBackend *b, int gentle )
{
if ( b->b_cookie ) {
int rc;
......@@ -497,7 +497,8 @@ backend_reset( LloadBackend *b )
b->b_cookie = NULL;
b->b_opening--;
}
if ( event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
if ( b->b_retry_event &&
event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
assert( b->b_failed );
event_del( b->b_retry_event );
b->b_opening--;
......@@ -520,64 +521,19 @@ backend_reset( LloadBackend *b )
ch_free( pending );
b->b_opening--;
}
while ( !LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) ) {
LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_preparing );
CONNECTION_LOCK(c);
Debug( LDAP_DEBUG_CONNS, "backend_reset: "
"destroying connection being set up connid=%lu\n",
c->c_connid );
assert( c->c_live );
CONNECTION_DESTROY(c);
assert( !c );
}
while ( !LDAP_CIRCLEQ_EMPTY( &b->b_bindconns ) ) {
LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_bindconns );
CONNECTION_LOCK(c);
Debug( LDAP_DEBUG_CONNS, "backend_reset: "
"destroying bind connection connid=%lu, pending ops=%ld\n",
c->c_connid, c->c_n_ops_executing );
assert( c->c_live );
CONNECTION_DESTROY(c);
assert( !c );
}
while ( !LDAP_CIRCLEQ_EMPTY( &b->b_conns ) ) {
LloadConnection *c = LDAP_CIRCLEQ_FIRST( &b->b_conns );
CONNECTION_LOCK(c);
Debug( LDAP_DEBUG_CONNS, "backend_reset: "
"destroying regular connection connid=%lu, pending ops=%ld\n",
c->c_connid, c->c_n_ops_executing );
assert( c->c_live );
CONNECTION_DESTROY(c);
assert( !c );
}
if ( b->b_dns_req ) {
evdns_getaddrinfo_cancel( b->b_dns_req );
b->b_dns_req = NULL;
b->b_opening--;
}
if ( b->b_cookie ) {
int rc;
rc = ldap_pvt_thread_pool_retract( b->b_cookie );
assert( rc == 1 );
b->b_cookie = NULL;
b->b_opening--;
}
if ( b->b_retry_event &&
event_pending( b->b_retry_event, EV_TIMEOUT, NULL ) ) {
assert( b->b_failed );
event_del( b->b_retry_event );
b->b_opening--;
}
connections_walk(
&b->b_mutex, &b->b_preparing, lload_connection_close, &gentle );
assert( LDAP_CIRCLEQ_EMPTY( &b->b_preparing ) );
assert( b->b_opening == 0 );
assert( b->b_active == 0 );
assert( b->b_bindavail == 0 );
b->b_failed = 0;
connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn,
lload_connection_close, &gentle );
assert( gentle || b->b_bindavail == 0 );
connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn,
lload_connection_close, &gentle );
assert( gentle || b->b_active == 0 );
}
void
......@@ -589,8 +545,9 @@ lload_backend_destroy( LloadBackend *b )
"destroying backend uri='%s', numconns=%d, numbindconns=%d\n",
b->b_uri.bv_val, b->b_numconns, b->b_numbindconns );
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_numconns = b->b_numbindconns = 0;
backend_reset( b );
backend_reset( b, 0 );
LDAP_CIRCLEQ_REMOVE( &backend, b, b_next );
if ( b == next ) {
......@@ -613,6 +570,7 @@ lload_backend_destroy( LloadBackend *b )
assert( rc == LDAP_SUCCESS );
}
#endif /* BALANCER_MODULE */
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
ldap_pvt_thread_mutex_destroy( &b->b_mutex );
if ( b->b_retry_event ) {
......
......@@ -575,7 +575,7 @@ clients_destroy( void )
}
void
clients_walk( CONNECTION_CLIENT_WALK apply, void *argv )
clients_walk( CONNCB apply, void *argv )
{
LloadConnection *c;
ldap_pvt_thread_mutex_lock( &clients_mutex );
......
......@@ -334,17 +334,130 @@ connection_destroy( LloadConnection *c )
}
/*
* Expected to be run from lload_unpause_server, so there are no other threads
* running.
* Called holding mutex, will walk cq calling cb on all connections whose
* c_connid <= cq_last->c_connid that still exist at the time we get to them.
*/
void
lload_connection_close( LloadConnection *c )
connections_walk_last(
ldap_pvt_thread_mutex_t *cq_mutex,
lload_c_head *cq,
LloadConnection *cq_last,
CONNCB cb,
void *arg )
{
TAvlnode *node;
LloadConnection *c, *old;
unsigned long last_connid;
if ( LDAP_CIRCLEQ_EMPTY( cq ) ) {
return;
}
last_connid = cq_last->c_connid;
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, cq_last, c_next );
assert( c->c_connid <= last_connid );
/* We lock so we can use CONNECTION_UNLOCK_OR_DESTROY to drop the
* connection if we can */
CONNECTION_LOCK(c);
ldap_pvt_thread_mutex_unlock( cq_mutex );
/*
* Ugh... concurrency is annoying:
* - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
* order
* - the connection with the highest c_connid is maintained at cq_last
* - we can only use cq when we hold cq_mutex
* - connections might be added to or removed from cq while we're busy
* processing connections
* - connection_destroy touches cq
* - we can't even hold locks of two different connections
* - we need a way to detect we've finished looping around cq for some
* definition of looping around
*
* So as a result, 90% of the code below is spent navigating that...
*/
while ( c->c_connid <= last_connid ) {
/* Do not permit the callback to actually free the connection even if
* it wants to, we need it to traverse cq */
c->c_refcnt++;
if ( cb( c, arg ) ) {
c->c_refcnt--;
break;
}
c->c_refcnt--;
if ( c->c_connid == last_connid ) {
break;
}
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( cq_mutex );
old = c;
retry:
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
if ( c->c_connid <= old->c_connid ) {
ldap_pvt_thread_mutex_unlock( cq_mutex );
CONNECTION_LOCK_DECREF(old);
CONNECTION_UNLOCK_OR_DESTROY(old);
ldap_pvt_thread_mutex_lock( cq_mutex );
return;
}
CONNECTION_LOCK(c);
assert( c->c_state != LLOAD_C_DYING );
if ( c->c_state == LLOAD_C_INVALID ) {
/* This dying connection will be unlinked once we release cq_mutex
* and it wouldn't be safe to iterate further, skip over it */
CONNECTION_UNLOCK(c);
goto retry;
}
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_unlock( cq_mutex );
CONNECTION_LOCK_DECREF(old);
CONNECTION_UNLOCK_OR_DESTROY(old);
CONNECTION_LOCK_DECREF(c);
assert( c->c_state != LLOAD_C_DYING );
assert( c->c_state != LLOAD_C_INVALID );
}
CONNECTION_UNLOCK_OR_DESTROY(c);
ldap_pvt_thread_mutex_lock( cq_mutex );
}
void
connections_walk(
ldap_pvt_thread_mutex_t *cq_mutex,
lload_c_head *cq,
CONNCB cb,
void *arg )
{
LloadConnection *cq_last = LDAP_CIRCLEQ_LAST( cq );
return connections_walk_last( cq_mutex, cq, cq_last, cb, arg );
}
/*
* Caller is expected to hold the lock.
*/
int
lload_connection_close( LloadConnection *c, void *arg )
{
TAvlnode *node;
int gentle = *(int *)arg;
if ( !c->c_live ) {
return LDAP_SUCCESS;
}
if ( !gentle ) {
/* Caller has a reference on this connection,
* it doesn't actually die here */
CONNECTION_DESTROY(c);
assert( c );
CONNECTION_LOCK(c);
return LDAP_SUCCESS;
}
/* The first thing we do is make sure we don't get new Operations in */
c->c_state = LLOAD_C_CLOSING;
......@@ -362,7 +475,7 @@ lload_connection_close( LloadConnection *c )
}
}
}
CONNECTION_UNLOCK_OR_DESTROY(c);
return LDAP_SUCCESS;
}
LloadConnection *
......
......@@ -1388,11 +1388,11 @@ lloadd_daemon( struct event_base *daemon_base )
/* wait for the listener threads to complete */
destroy_listeners();
/* TODO: Mark upstream connections closing */
/* Mark upstream connections closing and prevent from opening new ones */
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_numconns = b->b_numbindconns = 0;
backend_reset( b );
backend_reset( b, 1 );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
......@@ -1496,7 +1496,9 @@ lload_handle_backend_invalidation( LloadChange *change )
if ( !current_backend ) {
current_backend = b;
}
ldap_pvt_thread_mutex_lock( &b->b_mutex );
backend_retry( b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return;
} else if ( change->type == LLOAD_CHANGE_DEL ) {
ldap_pvt_thread_pool_walk(
......@@ -1517,8 +1519,10 @@ lload_handle_backend_invalidation( LloadChange *change )
&connection_pool, handle_pdus, backend_conn_cb, b );
ldap_pvt_thread_pool_walk(
&connection_pool, upstream_bind, backend_conn_cb, b );
backend_reset( b );
ldap_pvt_thread_mutex_lock( &b->b_mutex );
backend_reset( b, 0 );
backend_retry( b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return;
}
......@@ -1599,7 +1603,9 @@ lload_handle_backend_invalidation( LloadChange *change )
assert( need_close >= diff );
LDAP_CIRCLEQ_FOREACH ( c, &b->b_bindconns, c_next ) {
lload_connection_close( c );
int gentle = 1;
lload_connection_close( c, &gentle );
need_close--;
diff--;
if ( !diff ) {
......@@ -1615,7 +1621,9 @@ lload_handle_backend_invalidation( LloadChange *change )
assert( need_close >= diff );
LDAP_CIRCLEQ_FOREACH ( c, &b->b_conns, c_next ) {
lload_connection_close( c );
int gentle = 1;
lload_connection_close( c, &gentle );
need_close--;
diff--;
if ( !diff ) {
......@@ -1627,7 +1635,9 @@ lload_handle_backend_invalidation( LloadChange *change )
assert( need_close == 0 );
if ( need_open ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
backend_retry( b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
}
}
......@@ -1725,7 +1735,7 @@ lload_handle_global_invalidation( LloadChange *change )
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
backend_reset( b );
backend_reset( b, 0 );
backend_retry( b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
......
......@@ -446,7 +446,7 @@ struct LloadListener {
#endif
};
typedef int (*CONNECTION_CLIENT_WALK)( LloadConnection *c, void *argv );
typedef int (*CONNCB)( LloadConnection *c, void *arg );
struct lload_monitor_conn_arg {
Operation *op;
......
......@@ -814,12 +814,13 @@ operation_lost_upstream( LloadOperation *op )
CONNECTION_UNLOCK(c);
}
void
connection_timeout( LloadConnection *upstream, time_t threshold )
int
connection_timeout( LloadConnection *upstream, void *arg )
{
LloadOperation *op;
TAvlnode *ops = NULL, *node;
LloadBackend *b = upstream->c_private;
time_t threshold = *(time_t *)arg;
int rc, nops = 0;
for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node &&
......@@ -862,7 +863,7 @@ connection_timeout( LloadConnection *upstream, time_t threshold )
}
if ( nops == 0 ) {
return;
return LDAP_SUCCESS;
}
upstream->c_n_ops_executing -= nops;
Debug( LDAP_DEBUG_STATS, "connection_timeout: "
......@@ -916,68 +917,7 @@ connection_timeout( LloadConnection *upstream, time_t threshold )
CONNECTION_LOCK_DECREF(upstream);
/* just dispose of the AVL, most operations should already be gone */
tavl_free( ops, NULL );
}
static void
backend_timeout(
LloadBackend *b,
lload_c_head *cq,
LloadConnection **lastp,
time_t threshold )
{
LloadConnection *c, *old;
unsigned long last_connid;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( !*lastp ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return;
}
last_connid = (*lastp)->c_connid;
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, *lastp, c_next );
CONNECTION_LOCK(c);
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
/*
* Ugh... concurrency is annoying:
* - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
* order
* - the connection with the highest c_connid is maintained at *lastp
* - we can only use cq when we hold b->b_mutex
* - connections might be added to or removed from cq while we're busy
* processing connections
* - connection_destroy touches cq
* - we can't even hold locks of two different connections
* - we need a way to detect we've finished looping around cq for some
* definition of looping around
*
* So as a result, 90% of the code below is spent navigating that...
*/
while ( c->c_connid <= last_connid ) {
Debug( LDAP_DEBUG_TRACE, "backend_timeout: "
"timing out operations for connid=%lu which has %ld "
"pending ops\n",
c->c_connid, c->c_n_ops_executing );
connection_timeout( c, threshold );
if ( c->c_connid == last_connid ) {
break;
}
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
old = c;
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
CONNECTION_LOCK(c);
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
CONNECTION_LOCK_DECREF(old);
CONNECTION_UNLOCK_OR_DESTROY(old);
CONNECTION_LOCK_DECREF(c);
}
CONNECTION_UNLOCK_OR_DESTROY(c);
return LDAP_SUCCESS;
}
void
......@@ -993,17 +933,25 @@ operations_timeout( evutil_socket_t s, short what, void *arg )
threshold = slap_get_time() - lload_timeout_api->tv_sec;
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
if ( b->b_n_ops_executing == 0 ) continue;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( b->b_n_ops_executing == 0 ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
continue;
}
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"timing out binds for backend uri=%s\n",
b->b_uri.bv_val );
backend_timeout( b, &b->b_bindconns, &b->b_last_bindconn, threshold );
connections_walk_last( &b->b_mutex, &b->b_bindconns, b->b_last_bindconn,
connection_timeout, &threshold );
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"timing out other operations for backend uri=%s\n",
b->b_uri.bv_val );
backend_timeout( b, &b->b_conns, &b->b_last_conn, threshold );
connections_walk_last( &b->b_mutex, &b->b_conns, b->b_last_conn,
connection_timeout, &threshold );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
done:
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
......
......@@ -41,7 +41,7 @@ LDAP_SLAPD_F (void) backend_connect( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void *) backend_connect_task( void *ctx, void *arg );
LDAP_SLAPD_F (void) backend_retry( LloadBackend *b );
LDAP_SLAPD_F (LloadConnection *) backend_select( LloadOperation *op, int *res );
LDAP_SLAPD_F (void) backend_reset( LloadBackend *b );
LDAP_SLAPD_F (void) backend_reset( LloadBackend *b, int gentle );
LDAP_SLAPD_F (void) lload_backend_destroy( LloadBackend *b );
LDAP_SLAPD_F (void) lload_backends_destroy( void );
......@@ -64,7 +64,7 @@ LDAP_SLAPD_F (LloadConnection *) client_init( ber_socket_t s, LloadListener *url
LDAP_SLAPD_F (void) client_reset( LloadConnection *c );
LDAP_SLAPD_F (void) client_destroy( LloadConnection *c );
LDAP_SLAPD_F (void) clients_destroy( void );
LDAP_SLAPD_F (void) clients_walk( CONNECTION_CLIENT_WALK apply, void *argv );
LDAP_SLAPD_F (void) clients_walk( CONNCB apply, void *argv );
/*
* config.c
......@@ -90,9 +90,15 @@ LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) clients_mutex;
LDAP_SLAPD_F (void *) handle_pdus( void *ctx, void *arg );
LDAP_SLAPD_F (void) connection_write_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void) connection_read_cb( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void) lload_connection_close( LloadConnection *c );
LDAP_SLAPD_F (int) lload_connection_close( LloadConnection *c, void *arg );
LDAP_SLAPD_F (LloadConnection *) lload_connection_init( ber_socket_t s, const char *peername, int use_tls );
LDAP_SLAPD_F (void) connection_destroy( LloadConnection *c );
LDAP_SLAPD_F (void) connections_walk_last( ldap_pvt_thread_mutex_t *cq_mutex,
lload_c_head *cq,
LloadConnection *cq_last,
CONNCB cb,
void *arg );
LDAP_SLAPD_F (void) connections_walk( ldap_pvt_thread_mutex_t *cq_mutex, lload_c_head *cq, CONNCB cb, void *arg );
/*
* daemon.c
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment