Commit 485a1697 authored by Ondřej Kuzník's avatar Ondřej Kuzník
Browse files

Implement pause handlers

parent db5966f6
......@@ -74,6 +74,10 @@ volatile sig_atomic_t slapd_abrupt_shutdown = 0;
static int emfile;
ldap_pvt_thread_mutex_t lload_wait_mutex;
ldap_pvt_thread_cond_t lload_wait_cond;
ldap_pvt_thread_cond_t lload_pause_cond;
#ifndef SLAPD_MAX_DAEMON_THREADS
#define SLAPD_MAX_DAEMON_THREADS 16
#endif
......@@ -139,6 +143,43 @@ lloadd_close( ber_socket_t s )
tcp_close( s );
}
static int
lload_base_dispatch( struct event_base *base )
{
int rc;
while ( (rc = event_base_dispatch( base )) == 0 ) {
if ( event_base_got_exit( base ) ) {
break;
}
Debug( LDAP_DEBUG_TRACE, "lload_base_dispatch: "
"handling pause\n" );
/*
* We are pausing, signal the pausing thread we've finished and
* wait until the thread pool resumes operation.
*
* Do this in lockstep with the pausing thread.
*/
ldap_pvt_thread_mutex_lock( &lload_wait_mutex );
ldap_pvt_thread_cond_signal( &lload_wait_cond );
/* Now wait until we resume */
ldap_pvt_thread_cond_wait( &lload_pause_cond, &lload_wait_mutex );
ldap_pvt_thread_mutex_unlock( &lload_wait_mutex );
Debug( LDAP_DEBUG_TRACE, "lload_base_dispatch: "
"resuming\n" );
}
if ( rc ) {
Debug( LDAP_DEBUG_ANY, "lload_base_dispatch: "
"event_base_dispatch() returned an error rc=%d\n",
rc );
}
return rc;
}
static void
lload_free_listener_addresses( struct sockaddr **sal )
{
......@@ -946,7 +987,7 @@ lload_listener(
static void *
lload_listener_thread( void *ctx )
{
int rc = event_base_dispatch( listener_base );
int rc = lload_base_dispatch( listener_base );
Debug( LDAP_DEBUG_ANY, "lload_listener_thread: "
"event loop finished: rc=%d\n",
rc );
......@@ -1214,7 +1255,7 @@ lloadd_io_task( void *ptr )
lload_daemon[tid].wakeup_event = event;
/* run */
rc = event_base_dispatch( base );
rc = lload_base_dispatch( base );
Debug( LDAP_DEBUG_ANY, "lloadd_io_task: "
"Daemon %d, event loop finished: rc=%d\n",
tid, rc );
......@@ -1305,7 +1346,7 @@ lloadd_daemon( struct event_base *daemon_base )
}
lloadd_inited = 1;
rc = event_base_dispatch( daemon_base );
rc = lload_base_dispatch( daemon_base );
Debug( LDAP_DEBUG_ANY, "lloadd shutdown: "
"Main event loop finished: rc=%d\n",
rc );
......@@ -1352,6 +1393,55 @@ daemon_wakeup_cb( evutil_socket_t sig, short what, void *arg )
}
}
#ifdef BALANCER_MODULE
/*
* Signal the event base to terminate processing as soon as it can and wait for
* lload_base_dispatch to notify us this has happened.
*/
static int
lload_pause_base( struct event_base *base )
{
int rc;
ldap_pvt_thread_mutex_lock( &lload_wait_mutex );
event_base_loopbreak( base );
rc = ldap_pvt_thread_cond_wait( &lload_wait_cond, &lload_wait_mutex );
ldap_pvt_thread_mutex_unlock( &lload_wait_mutex );
return rc;
}
void
lload_pause_server( void )
{
int i;
lload_pause_base( listener_base );
lload_pause_base( daemon_base );
for ( i = 0; i < lload_daemon_threads; i++ ) {
lload_pause_base( lload_daemon[i].base );
}
}
void
lload_unpause_server( void )
{
/*
* Make sure lloadd is completely ready to unpause by now:
*
* After the broadcast, we handle I/O and begin filling the thread pool, in
* high load conditions, we might hit the pool limits and start processing
* operations in the I/O threads (one PDU per socket at a time for fairness
* sake) even before a pause has finished from slapd's point of view!
*
* When (max_pdus_per_cycle == 0) we don't use the pool for these at all and
* most lload processing starts immediately making this even more prominent.
*/
ldap_pvt_thread_cond_broadcast( &lload_pause_cond );
}
#endif /* BALANCER_MODULE */
void
lload_sig_shutdown( evutil_socket_t sig, short what, void *arg )
{
......
......@@ -101,6 +101,10 @@ lload_init( int mode, const char *name )
LDAP_STAILQ_INIT( &slapd_rq.task_list );
LDAP_STAILQ_INIT( &slapd_rq.run_list );
ldap_pvt_thread_mutex_init( &lload_wait_mutex );
ldap_pvt_thread_cond_init( &lload_wait_cond );
ldap_pvt_thread_cond_init( &lload_pause_cond );
ldap_pvt_thread_mutex_init( &backend_mutex );
ldap_pvt_thread_mutex_init( &clients_mutex );
ldap_pvt_thread_mutex_init( &lload_pin_mutex );
......
......@@ -92,6 +92,11 @@ LDAP_SLAPD_V (LloadBackend *) current_backend;
LDAP_SLAPD_V (struct slap_bindconf) bindconf;
LDAP_SLAPD_V (struct berval) lloadd_identity;
/* Used to coordinate server (un)pause, shutdown */
LDAP_SLAPD_V (ldap_pvt_thread_mutex_t) lload_wait_mutex;
LDAP_SLAPD_V (ldap_pvt_thread_cond_t) lload_pause_cond;
LDAP_SLAPD_V (ldap_pvt_thread_cond_t) lload_wait_cond;
typedef int lload_cf_aux_table_parse_x( struct berval *val,
void *bc,
slap_cf_aux_table *tab0,
......
......@@ -64,6 +64,10 @@ lload_conn_pool_init()
{
int rc = 0;
ldap_pvt_thread_mutex_init( &lload_wait_mutex );
ldap_pvt_thread_cond_init( &lload_pause_cond );
ldap_pvt_thread_cond_init( &lload_wait_cond );
ldap_pvt_thread_mutex_init( &backend_mutex );
ldap_pvt_thread_mutex_init( &clients_mutex );
ldap_pvt_thread_mutex_init( &lload_pin_mutex );
......@@ -74,6 +78,20 @@ lload_conn_pool_init()
return rc;
}
static int
lload_pause_cb( BackendInfo *bi )
{
lload_pause_server();
return 0;
}
static int
lload_unpause_cb( BackendInfo *bi )
{
lload_unpause_server();
return 0;
}
int
lload_back_open( BackendInfo *bi )
{
......@@ -110,7 +128,10 @@ lload_back_close( BackendInfo *bi )
return 0;
}
ldap_pvt_thread_mutex_lock( &lload_wait_mutex );
event_base_loopexit( daemon_base, NULL );
ldap_pvt_thread_cond_wait( &lload_wait_cond, &lload_wait_mutex );
ldap_pvt_thread_mutex_unlock( &lload_wait_mutex );
ldap_pvt_thread_join( lloadd_main_thread, (void *)NULL );
return 0;
......@@ -122,6 +143,8 @@ lload_back_initialize( BackendInfo *bi )
bi->bi_flags = SLAP_BFLAG_STANDALONE;
bi->bi_open = lload_back_open;
bi->bi_config = config_generic_wrapper;
bi->bi_pause = lload_pause_cb;
bi->bi_unpause = lload_unpause_cb;
bi->bi_close = lload_back_close;
bi->bi_destroy = 0;
......
......@@ -104,6 +104,9 @@ LDAP_SLAPD_V (int) lload_daemon_mask;
LDAP_SLAPD_F (void) lload_sig_shutdown( evutil_socket_t sig, short what, void *arg );
LDAP_SLAPD_F (void) lload_pause_server( void );
LDAP_SLAPD_F (void) lload_unpause_server( void );
LDAP_SLAPD_V (struct event_base *) daemon_base;
LDAP_SLAPD_V (struct evdns_base *) dnsbase;
LDAP_SLAPD_V (volatile sig_atomic_t) slapd_shutdown;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment