Commit 9d2f1530 authored by Howard Chu's avatar Howard Chu
Browse files

ITS#7926 dynamic changes to olcListenerThreads

Reallocates sockets from old to new listener threads
parent 2f94318f
......@@ -282,6 +282,10 @@ LDAP_F( int )
ldap_pvt_thread_pool_pausecheck LDAP_P((
ldap_pvt_thread_pool_t *pool ));
LDAP_F( int )
ldap_pvt_thread_pool_pausecheck_native LDAP_P((
ldap_pvt_thread_pool_t *pool ));
LDAP_F( int )
ldap_pvt_thread_pool_pause LDAP_P((
ldap_pvt_thread_pool_t *pool ));
......
......@@ -1239,6 +1239,32 @@ ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
}
/*
* Wait for a pause, from a non-pooled thread.
*/
int
ldap_pvt_thread_pool_pausecheck_native( ldap_pvt_thread_pool_t *tpool )
{
struct ldap_int_thread_pool_s *pool;
if (tpool == NULL)
return(-1);
pool = *tpool;
if (pool == NULL)
return(0);
if (!pool->ltp_pause)
return(0);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
while (pool->ltp_pause)
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
return 1;
}
/*
* Pause the pool. The calling task must be active, not idle.
* Return when all other tasks are paused or idle.
......
......@@ -1043,6 +1043,14 @@ typedef struct ADlist {
static ADlist *sortVals;
static int new_daemon_threads;
static int
config_resize_lthreads(ConfigArgs *c)
{
return slapd_daemon_resize( new_daemon_threads );
}
static int
config_generic(ConfigArgs *c) {
int i;
......@@ -1806,7 +1814,7 @@ config_generic(ConfigArgs *c) {
case CFG_THREADQS:
if ( c->value_int < 1 ) {
snprintf( c->cr_msg, sizeof( c->cr_msg ),
"threadqueuess=%d smaller than minimum value 1",
"threadqueues=%d smaller than minimum value 1",
c->value_int );
Debug(LDAP_DEBUG_ANY, "%s: %s.\n",
c->log, c->cr_msg );
......@@ -1824,6 +1832,14 @@ config_generic(ConfigArgs *c) {
break;
case CFG_LTHREADS:
if ( c->value_uint < 1 ) {
snprintf( c->cr_msg, sizeof( c->cr_msg ),
"listenerthreads=%u smaller than minimum value 1",
c->value_uint );
Debug(LDAP_DEBUG_ANY, "%s: %s.\n",
c->log, c->cr_msg );
return 1;
}
{ int mask = 0;
/* use a power of two */
while (c->value_uint > 1) {
......@@ -1831,8 +1847,8 @@ config_generic(ConfigArgs *c) {
mask <<= 1;
mask |= 1;
}
slapd_daemon_mask = mask;
slapd_daemon_threads = mask+1;
new_daemon_threads = mask+1;
config_push_cleanup( c, config_resize_lthreads );
}
break;
......@@ -4195,11 +4211,11 @@ config_tls_option(ConfigArgs *c) {
if (c->op == SLAP_CONFIG_EMIT) {
return ldap_pvt_tls_get_option( ld, flag, berval ? (void *)&c->value_bv : (void *)&c->value_string );
} else if ( c->op == LDAP_MOD_DELETE ) {
c->cleanup = config_tls_cleanup;
config_push_cleanup( c, config_tls_cleanup );
return ldap_pvt_tls_set_option( ld, flag, NULL );
}
if ( !berval ) ch_free(c->value_string);
c->cleanup = config_tls_cleanup;
config_push_cleanup( c, config_tls_cleanup );
rc = ldap_pvt_tls_set_option(ld, flag, berval ? (void *)&c->value_bv : (void *)c->argv[1]);
if ( berval ) ch_free(c->value_bv.bv_val);
return rc;
......@@ -4223,11 +4239,11 @@ config_tls_config(ConfigArgs *c) {
return slap_tls_get_config( slap_tls_ld, flag, &c->value_string );
} else if ( c->op == LDAP_MOD_DELETE ) {
int i = 0;
c->cleanup = config_tls_cleanup;
config_push_cleanup( c, config_tls_cleanup );
return ldap_pvt_tls_set_option( slap_tls_ld, flag, &i );
}
ch_free( c->value_string );
c->cleanup = config_tls_cleanup;
config_push_cleanup( c, config_tls_cleanup );
if ( isdigit( (unsigned char)c->argv[1][0] ) && c->type != CFG_TLS_PROTOCOL_MIN ) {
if ( lutil_atoi( &i, c->argv[1] ) != 0 ) {
Debug(LDAP_DEBUG_ANY, "%s: "
......@@ -5613,8 +5629,8 @@ ok:
rc = ca->bi->bi_db_open( ca->be, &ca->reply );
ca->be->bd_info = bi_orig;
}
} else if ( ca->cleanup ) {
rc = ca->cleanup( ca );
} else if ( ca->num_cleanups ) {
rc = config_run_cleanup( ca );
}
if ( rc ) {
if (ca->cr_msg[0] == '\0')
......@@ -5684,8 +5700,8 @@ done:
overlay_destroy_one( ca->be, (slap_overinst *)ca->bi );
} else if ( coptr->co_type == Cft_Schema ) {
schema_destroy_one( ca, colst, nocs, last );
} else if ( ca->cleanup ) {
ca->cleanup( ca );
} else if ( ca->num_cleanups ) {
config_run_cleanup( ca );
}
}
done_noop:
......@@ -6224,8 +6240,8 @@ out:
ca->reply = msg;
}
if ( ca->cleanup ) {
i = ca->cleanup( ca );
if ( ca->num_cleanups ) {
i = config_run_cleanup( ca );
if (rc == LDAP_SUCCESS)
rc = i;
}
......
......@@ -706,6 +706,17 @@ connection_destroy( Connection *c )
}
}
int connection_is_active( ber_socket_t s )
{
Connection *c;
assert( s < dtblsize );
c = &connections[s];
return c->c_conn_state == SLAP_C_CLOSING ||
c->c_conn_state == SLAP_C_BINDING ||
c->c_conn_state == SLAP_C_ACTIVE ;
}
int connection_valid( Connection *c )
{
/* c_mutex must be locked by caller */
......
......@@ -81,9 +81,6 @@ ber_socket_t dtblsize;
slap_ssf_t local_ssf = LDAP_PVT_SASL_LOCAL_SSF;
struct runqueue_s slapd_rq;
#ifndef SLAPD_MAX_DAEMON_THREADS
#define SLAPD_MAX_DAEMON_THREADS 16
#endif
int slapd_daemon_threads = 1;
int slapd_daemon_mask;
......@@ -94,7 +91,6 @@ int slapd_tcp_wmem;
Listener **slap_listeners = NULL;
static volatile sig_atomic_t listening = 1; /* 0 when slap_listeners closed */
static ldap_pvt_thread_t *listener_tid;
#ifndef SLAPD_LISTEN_BACKLOG
#define SLAPD_LISTEN_BACKLOG 2048
......@@ -102,7 +98,9 @@ static ldap_pvt_thread_t *listener_tid;
#define DAEMON_ID(fd) (fd & slapd_daemon_mask)
static ber_socket_t wake_sds[SLAPD_MAX_DAEMON_THREADS][2];
typedef ber_socket_t sdpair[2];
static sdpair *wake_sds;
static ldap_pvt_thread_mutex_t emfile_mutex;
static int emfile;
......@@ -136,6 +134,7 @@ typedef struct slap_daemon_st {
ber_socket_t sd_nactives;
int sd_nwriters;
int sd_nfds;
ldap_pvt_thread_t sd_tid;
#if defined(HAVE_KQUEUE)
uint8_t* sd_fdmodes; /* indexed by fd */
......@@ -173,7 +172,7 @@ typedef struct slap_daemon_st {
#endif /* ! kqueue && ! epoll && ! /dev/poll */
} slap_daemon_st;
static slap_daemon_st slap_daemon[SLAPD_MAX_DAEMON_THREADS];
static slap_daemon_st *slap_daemon;
/*
* NOTE: naming convention for macros:
......@@ -1881,11 +1880,13 @@ slapd_daemon_init( const char *urls )
Debug( LDAP_DEBUG_ARGS, "daemon_init: %s\n",
urls ? urls : "<null>" );
for ( i=0; i<SLAPD_MAX_DAEMON_THREADS; i++ ) {
wake_sds = ch_malloc( slapd_daemon_threads * sizeof( sdpair ));
for ( i=0; i<slapd_daemon_threads; i++ ) {
wake_sds[i][0] = AC_SOCKET_INVALID;
wake_sds[i][1] = AC_SOCKET_INVALID;
}
slap_daemon = ch_calloc( slapd_daemon_threads, sizeof( slap_daemon_st ));
ldap_pvt_thread_mutex_init( &slap_daemon[0].sd_mutex );
#ifdef HAVE_TCPD
ldap_pvt_thread_mutex_init( &sd_tcpd_mutex );
......@@ -1972,6 +1973,61 @@ slapd_daemon_init( const char *urls )
return !i;
}
/* transfer control of active sockets from old to new listener threads */
static void
slapd_socket_realloc( int newnum )
{
int i, j, oldid, newid;
int newmask = newnum - 1;
Listener *sl;
int num_listeners;
for ( i=0; slap_listeners[i] != NULL; i++ ) ;
num_listeners = i;
for ( i=0; i<dtblsize; i++ ) {
int skip = 0;
/* don't bother with wake_sds, they're assigned independent of mask */
for (j=0; j<slapd_daemon_threads; j++) {
if ( i == wake_sds[j][0] || i == wake_sds[j][1] ) {
skip = 1;
break;
}
}
if ( skip ) continue;
oldid = DAEMON_ID(i);
newid = i & newmask;
if ( oldid == newid ) continue;
if ( !SLAP_SOCK_IS_ACTIVE( oldid, i )) continue;
sl = NULL;
if ( num_listeners ) {
for ( j=0; slap_listeners[j] != NULL; j++ ) {
if ( slap_listeners[j]->sl_sd == i ) {
sl = slap_listeners[j];
num_listeners--;
break;
}
}
}
SLAP_SOCK_ADD( newid, i, sl );
if ( SLAP_SOCK_IS_READ( oldid, i )) {
SLAP_SOCK_SET_READ( newid, i );
}
if ( SLAP_SOCK_IS_WRITE( oldid, i )) {
SLAP_SOCK_SET_WRITE( newid, i );
slap_daemon[oldid].sd_nwriters--;
slap_daemon[newid].sd_nwriters++;
}
if ( connection_is_active( i )) {
slap_daemon[oldid].sd_nactives--;
slap_daemon[newid].sd_nactives++;
}
SLAP_SOCK_DEL( oldid, i );
}
}
int
slapd_daemon_destroy( void )
......@@ -2409,7 +2465,8 @@ slapd_daemon_task(
int l;
time_t last_idle_check = 0;
int ebadf = 0;
int tid = (ldap_pvt_thread_t *) ptr - listener_tid;
int tid = (slap_daemon_st *) ptr - slap_daemon;
int old_threads = slapd_daemon_threads;
#define SLAPD_IDLE_CHECK_LIMIT 4
......@@ -2783,6 +2840,8 @@ loop:
continue;
}
if ( DAEMON_ID( lr->sl_sd ) != tid ) continue;
if ( lr->sl_mute ) {
Debug( LDAP_DEBUG_CONNS,
"daemon: " SLAP_EVENT_FNAME ": "
......@@ -2870,6 +2929,7 @@ loop:
if ( ns <= 0 ) break;
if ( slap_listeners[l]->sl_sd == AC_SOCKET_INVALID ) continue;
if ( DAEMON_ID( slap_listeners[l]->sl_sd ) != tid ) continue;
#ifdef LDAP_CONNECTIONLESS
if ( slap_listeners[l]->sl_is_udp ) continue;
#endif /* LDAP_CONNECTIONLESS */
......@@ -3088,6 +3148,13 @@ loop:
}
#endif /* SLAP_EVENTS_ARE_INDEXED */
/* Was number of listener threads decreased? */
if ( ldap_pvt_thread_pool_pausecheck_native( &connection_pool )) {
/* decreased, let this thread finish */
if ( tid >= slapd_daemon_threads )
break;
}
#ifndef HAVE_YIELDING_SELECT
ldap_pvt_thread_yield();
#endif /* ! HAVE_YIELDING_SELECT */
......@@ -3136,6 +3203,107 @@ loop:
return NULL;
}
typedef struct slap_tid_waiter {
int num_tids;
ldap_pvt_thread_t tids[0];
} slap_tid_waiter;
static void *
slapd_daemon_tid_cleanup(
void *ctx,
void *ptr )
{
slap_tid_waiter *tids = ptr;
int i;
for ( i=0; i<tids->num_tids; i++ )
ldap_pvt_thread_join( tids->tids[i], (void *)NULL );
ch_free( ptr );
return NULL;
}
int
slapd_daemon_resize( int newnum )
{
int i, rc;
if ( newnum == slapd_daemon_threads )
return 0;
/* wake up all current listener threads */
for ( i=0; i<slapd_daemon_threads; i++ )
WAKE_LISTENER(i,1);
/* mutexes may not survive realloc, so destroy & recreate later */
for ( i=0; i<slapd_daemon_threads; i++ )
ldap_pvt_thread_mutex_destroy( &slap_daemon[i].sd_mutex );
if ( newnum > slapd_daemon_threads ) {
wake_sds = ch_realloc( wake_sds, newnum * sizeof( sdpair ));
slap_daemon = ch_realloc( slap_daemon, newnum * sizeof( slap_daemon_st ));
for ( i=slapd_daemon_threads; i<newnum; i++ )
{
memset( &slap_daemon[i], 0, sizeof( slap_daemon_st ));
if( (rc = lutil_pair( wake_sds[i] )) < 0 ) {
Debug( LDAP_DEBUG_ANY,
"daemon: lutil_pair() failed rc=%d\n", rc );
return rc;
}
ber_pvt_socket_set_nonblock( wake_sds[i][1], 1 );
SLAP_SOCK_INIT(i);
}
for ( i=0; i<newnum; i++ )
ldap_pvt_thread_mutex_init( &slap_daemon[i].sd_mutex );
slapd_socket_realloc( newnum );
for ( i=slapd_daemon_threads; i<newnum; i++ )
{
/* listener as a separate THREAD */
rc = ldap_pvt_thread_create( &slap_daemon[i].sd_tid,
0, slapd_daemon_task, &slap_daemon[i] );
if ( rc != 0 ) {
Debug( LDAP_DEBUG_ANY,
"listener ldap_pvt_thread_create failed (%d)\n", rc );
return rc;
}
}
} else {
int j;
slap_tid_waiter *tids = ch_malloc( sizeof(slap_tid_waiter) +
((slapd_daemon_threads - newnum) * sizeof(ldap_pvt_thread_t )));
slapd_socket_realloc( newnum );
tids->num_tids = slapd_daemon_threads - newnum;
for ( i=newnum, j=0; i<slapd_daemon_threads; i++, j++ ) {
tids->tids[j] = slap_daemon[i].sd_tid;
#ifdef HAVE_WINSOCK
if ( wake_sds[i][1] != INVALID_SOCKET &&
SLAP_FD2SOCK( wake_sds[i][1] ) != SLAP_FD2SOCK( wake_sds[i][0] ))
#endif /* HAVE_WINSOCK */
tcp_close( SLAP_FD2SOCK(wake_sds[i][1]) );
#ifdef HAVE_WINSOCK
if ( wake_sds[i][0] != INVALID_SOCKET )
#endif /* HAVE_WINSOCK */
tcp_close( SLAP_FD2SOCK(wake_sds[i][0]) );
SLAP_SOCK_DESTROY( i );
}
wake_sds = ch_realloc( wake_sds, newnum * sizeof( sdpair ));
slap_daemon = ch_realloc( slap_daemon, newnum * sizeof( slap_daemon_st ));
for ( i=0; i<newnum; i++ )
ldap_pvt_thread_mutex_init( &slap_daemon[i].sd_mutex );
ldap_pvt_thread_pool_submit( &connection_pool,
slapd_daemon_tid_cleanup, (void *) tids );
}
slapd_daemon_threads = newnum;
slapd_daemon_mask = newnum - 1;
return 0;
}
#ifdef LDAP_CONNECTIONLESS
static int
......@@ -3177,11 +3345,6 @@ slapd_daemon( void )
connectionless_init();
#endif /* LDAP_CONNECTIONLESS */
if ( slapd_daemon_threads > SLAPD_MAX_DAEMON_THREADS )
slapd_daemon_threads = SLAPD_MAX_DAEMON_THREADS;
listener_tid = ch_malloc(slapd_daemon_threads * sizeof(ldap_pvt_thread_t));
SLAP_SOCK_INIT2();
/* daemon_init only inits element 0 */
......@@ -3202,8 +3365,8 @@ slapd_daemon( void )
for ( i=0; i<slapd_daemon_threads; i++ )
{
/* listener as a separate THREAD */
rc = ldap_pvt_thread_create( &listener_tid[i],
0, slapd_daemon_task, &listener_tid[i] );
rc = ldap_pvt_thread_create( &slap_daemon[i].sd_tid,
0, slapd_daemon_task, &slap_daemon[i] );
if ( rc != 0 ) {
Debug( LDAP_DEBUG_ANY,
......@@ -3214,11 +3377,9 @@ slapd_daemon( void )
/* wait for the listener threads to complete */
for ( i=0; i<slapd_daemon_threads; i++ )
ldap_pvt_thread_join( listener_tid[i], (void *)NULL );
ldap_pvt_thread_join( slap_daemon[i].sd_tid, (void *)NULL );
destroy_listeners();
ch_free( listener_tid );
listener_tid = NULL;
return 0;
}
......
......@@ -802,6 +802,7 @@ LDAP_SLAPD_F (Connection *) connection_init LDAP_P((
LDAP_SLAPD_F (void) connection_closing LDAP_P((
Connection *c, const char *why ));
LDAP_SLAPD_F (int) connection_is_active LDAP_P(( ber_socket_t s ));
LDAP_SLAPD_F (int) connection_valid LDAP_P(( Connection *c ));
LDAP_SLAPD_F (const char *) connection_state2str LDAP_P(( int state ))
LDAP_GCCATTR((const));
......@@ -876,6 +877,7 @@ LDAP_SLAPD_F (void) slap_queue_csn LDAP_P(( Operation *, struct berval * ));
*/
LDAP_SLAPD_F (void) slapd_add_internal(ber_socket_t s, int isactive);
LDAP_SLAPD_F (int) slapd_daemon_init( const char *urls );
LDAP_SLAPD_F (int) slapd_daemon_resize( int newnum );
LDAP_SLAPD_F (int) slapd_daemon_destroy(void);
LDAP_SLAPD_F (int) slapd_daemon(void);
LDAP_SLAPD_F (Listener **) slapd_get_listeners LDAP_P((void));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment