Commit aecc62c0 authored by Ondřej Kuzník's avatar Ondřej Kuzník Committed by Ondřej Kuzník
Browse files

Introduce operation timeout machinery

parent 8ba44630
......@@ -69,7 +69,8 @@ char *global_host = NULL;
static FILE *logfile;
static char *logfileName;
static struct timeval timeout_net_tv, timeout_write_tv = { 10, 0 };
static struct timeval timeout_api_tv, timeout_net_tv,
timeout_write_tv = { 10, 0 };
lload_features_t lload_features;
......@@ -78,6 +79,7 @@ ber_len_t sockbuf_max_incoming_upstream = LLOAD_SB_MAX_INCOMING_UPSTREAM;
int slap_conn_max_pdus_per_cycle = LLOAD_CONN_MAX_PDUS_PER_CYCLE_DEFAULT;
struct timeval *lload_timeout_api = NULL;
struct timeval *lload_timeout_net = NULL;
struct timeval *lload_write_timeout = &timeout_write_tv;
......@@ -665,6 +667,19 @@ config_bindconf( ConfigArgs *c )
*ptr = '\0';
}
if ( bindconf.sb_timeout_api ) {
timeout_api_tv.tv_sec = bindconf.sb_timeout_api;
lload_timeout_api = &timeout_api_tv;
if ( lload_timeout_event ) {
event_add( lload_timeout_event, lload_timeout_api );
}
} else {
lload_timeout_api = NULL;
if ( lload_timeout_event ) {
event_del( lload_timeout_event );
}
}
if ( bindconf.sb_timeout_net ) {
timeout_net_tv.tv_sec = bindconf.sb_timeout_net;
lload_timeout_net = &timeout_net_tv;
......
......@@ -83,6 +83,8 @@ static ldap_pvt_thread_t listener_tid, *daemon_tid;
struct evdns_base *dnsbase;
struct event *lload_timeout_event;
#ifndef SLAPD_LISTEN_BACKLOG
#define SLAPD_LISTEN_BACKLOG 1024
#endif /* ! SLAPD_LISTEN_BACKLOG */
......@@ -1234,6 +1236,7 @@ slapd_daemon( struct event_base *daemon_base )
int i, rc;
Backend *b;
struct event_base *base;
struct event *event;
assert( daemon_base != NULL );
......@@ -1280,19 +1283,31 @@ slapd_daemon( struct event_base *daemon_base )
current_backend = LDAP_CIRCLEQ_FIRST( &backend );
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
struct event *retry_event =
evtimer_new( daemon_base, backend_connect, b );
if ( !retry_event ) {
event = evtimer_new( daemon_base, backend_connect, b );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "lloadd: "
"failed to allocate retry event\n" );
return -1;
}
b->b_retry_event = retry_event;
b->b_retry_event = event;
backend_retry( b );
}
event = evtimer_new( daemon_base, operations_timeout, event_self_cbarg() );
if ( !event ) {
Debug( LDAP_DEBUG_ANY, "lloadd: "
"failed to allocate timeout event\n" );
return -1;
}
lload_timeout_event = event;
/* TODO: should we just add it with any timeout and re-add when the timeout
* changes? */
if ( lload_timeout_api ) {
event_add( event, lload_timeout_api );
}
lloadd_inited = 1;
rc = event_base_dispatch( daemon_base );
Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
......
......@@ -739,3 +739,190 @@ operation_lost_upstream( Operation *op )
operation_destroy_from_upstream( op );
CONNECTION_UNLOCK(c);
}
void
connection_timeout( Connection *upstream, time_t threshold )
{
Operation *op;
TAvlnode *ops = NULL, *node;
Backend *b = upstream->c_private;
int rc, nops = 0;
for ( node = tavl_end( upstream->c_ops, TAVL_DIR_LEFT ); node &&
((Operation *)node->avl_data)->o_start < threshold; /* shortcut */
node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
Operation *found_op;
op = node->avl_data;
/* Have we received another response since? */
if ( op->o_last_response && op->o_last_response >= threshold ) {
continue;
}
op->o_upstream_refcnt++;
found_op = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
assert( op == found_op );
rc = tavl_insert( &ops, op, operation_upstream_cmp, avl_dup_error );
assert( rc == LDAP_SUCCESS );
Debug( LDAP_DEBUG_STATS2, "connection_timeout: "
"timing out %s from connid=%lu msgid=%d sent to connid=%lu as "
"msgid=%d\n",
slap_msgtype2str( op->o_tag ), op->o_client_connid,
op->o_client_msgid, op->o_upstream_connid,
op->o_upstream_msgid );
nops++;
}
if ( nops == 0 ) {
return;
}
upstream->c_n_ops_executing -= nops;
Debug( LDAP_DEBUG_STATS, "connection_timeout: "
"timing out %d operations for connid=%lu\n",
nops, upstream->c_connid );
CONNECTION_UNLOCK_INCREF(upstream);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing -= nops;
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
for ( node = tavl_end( ops, TAVL_DIR_LEFT ); node;
node = tavl_next( node, TAVL_DIR_RIGHT ) ) {
Connection *client;
op = node->avl_data;
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
client = op->o_client;
if ( !client ) {
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
continue;
}
CONNECTION_LOCK(client);
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
/* operation_send_reject_locked unlocks and destroys client on
* failure */
if ( operation_send_reject_locked( op,
op->o_tag == LDAP_REQ_SEARCH ? LDAP_TIMELIMIT_EXCEEDED :
LDAP_ADMINLIMIT_EXCEEDED,
"upstream did not respond in time", 0 ) == LDAP_SUCCESS ) {
CONNECTION_UNLOCK_OR_DESTROY(client);
}
if ( rc == LDAP_SUCCESS ) {
rc = operation_send_abandon( op );
}
CONNECTION_LOCK(upstream);
op->o_upstream_refcnt--;
operation_destroy_from_upstream( op );
CONNECTION_UNLOCK(upstream);
}
/* TODO: if operation_send_abandon failed, we need to kill the upstream */
if ( rc == LDAP_SUCCESS ) {
connection_write_cb( -1, 0, upstream );
}
CONNECTION_LOCK_DECREF(upstream);
/* just dispose of the AVL, most operations should already be gone */
tavl_free( ops, NULL );
}
static void
backend_timeout(
Backend *b,
struct ConnSt *cq,
Connection **lastp,
time_t threshold )
{
Connection *c, *old;
unsigned long last_connid;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
if ( !*lastp ) {
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
return;
}
last_connid = (*lastp)->c_connid;
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, *lastp, c_next );
CONNECTION_LOCK(c);
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
/*
* Ugh... concurrency is annoying:
* - we maintain the connections in the cq CIRCLEQ_ in ascending c_connid
* order
* - the connection with the highest c_connid is maintained at *lastp
* - we can only use cq when we hold b->b_mutex
* - connections might be added to or removed from cq while we're busy
* processing connections
* - connection_destroy touches cq
* - we can't even hold locks of two different connections
* - we need a way to detect we've finished looping around cq for some
* definition of looping around
*
* So as a result, 90% of the code below is spent navigating that...
*/
while ( c->c_connid <= last_connid ) {
Debug( LDAP_DEBUG_TRACE, "backend_timeout: "
"timing out operations for connid=%lu which has %ld "
"pending ops\n",
c->c_connid, c->c_n_ops_executing );
connection_timeout( c, threshold );
if ( c->c_connid == last_connid ) {
break;
}
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_lock( &b->b_mutex );
old = c;
c = LDAP_CIRCLEQ_LOOP_NEXT( cq, c, c_next );
CONNECTION_LOCK(c);
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
CONNECTION_LOCK_DECREF(old);
CONNECTION_UNLOCK_OR_DESTROY(old);
CONNECTION_LOCK_DECREF(c);
}
CONNECTION_UNLOCK_OR_DESTROY(c);
}
void
operations_timeout( evutil_socket_t s, short what, void *arg )
{
struct event *self = arg;
Backend *b;
time_t threshold;
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"running timeout task\n" );
if ( !lload_timeout_api ) goto done;
threshold = slap_get_time() - lload_timeout_api->tv_sec;
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
if ( b->b_n_ops_executing == 0 ) continue;
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"timing out binds for backend uri=%s\n",
b->b_uri.bv_val );
backend_timeout( b, &b->b_bindconns, &b->b_last_bindconn, threshold );
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"timing out other operations for backend uri=%s\n",
b->b_uri.bv_val );
backend_timeout( b, &b->b_conns, &b->b_last_conn, threshold );
}
done:
Debug( LDAP_DEBUG_TRACE, "operations_timeout: "
"timeout task finished\n" );
evtimer_add( self, lload_timeout_api );
}
......@@ -122,6 +122,8 @@ LDAP_SLAPD_V (int) slapd_tcp_rmem;
LDAP_SLAPD_V (int) slapd_tcp_wmem;
#endif /* LDAP_TCP_BUFFER */
LDAP_SLAPD_V (struct event *) lload_timeout_event;
#define bvmatch( bv1, bv2 ) \
( ( (bv1)->bv_len == (bv2)->bv_len ) && \
( memcmp( (bv1)->bv_val, (bv2)->bv_val, (bv1)->bv_len ) == 0 ) )
......@@ -178,6 +180,7 @@ LDAP_SLAPD_F (int) operation_send_reject_locked( Operation *op, int result, cons
LDAP_SLAPD_F (void) operation_lost_upstream( Operation *op );
LDAP_SLAPD_F (void) operation_destroy_from_client( Operation *op );
LDAP_SLAPD_F (void) operation_destroy_from_upstream( Operation *op );
LDAP_SLAPD_F (void) operations_timeout( evutil_socket_t s, short what, void *arg );
/*
* sl_malloc.c
......@@ -255,6 +258,7 @@ LDAP_SLAPD_V (const char) Versionstr[];
LDAP_SLAPD_V (int) global_gentlehup;
LDAP_SLAPD_V (int) global_idletimeout;
LDAP_SLAPD_V (struct timeval *) lload_timeout_api;
LDAP_SLAPD_V (struct timeval *) lload_timeout_net;
LDAP_SLAPD_V (struct timeval *) lload_write_timeout;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment