Commit 3783df8d authored by Quanah Gibson-Mount's avatar Quanah Gibson-Mount
Browse files

Use per-thread slap_counters to eliminate lock contention

Unifdef SLAP_LIGHTWEIGHT_DISPATCHER
Use thread-specific data in pool_context()
Added native thread-specific data support
Concurrency tweaks:
  store conn->c_sd, don't use ber_sockbuf_ctrl to retrieve it.
  use per-thread free lists for operations, no mutexes needed.
parent 15da22d8
......@@ -44,6 +44,7 @@ LDAP_BEGIN_DECL
typedef pthread_t ldap_int_thread_t;
typedef pthread_mutex_t ldap_int_thread_mutex_t;
typedef pthread_cond_t ldap_int_thread_cond_t;
typedef pthread_key_t ldap_int_thread_key_t;
#define ldap_int_thread_equal(a, b) pthread_equal((a), (b))
......@@ -88,6 +89,7 @@ LDAP_BEGIN_DECL
typedef cthread_t ldap_int_thread_t;
typedef struct mutex ldap_int_thread_mutex_t;
typedef struct condition ldap_int_thread_cond_t;
typedef cthread_key_t ldap_int_thread_key_t;
LDAP_END_DECL
......@@ -106,6 +108,7 @@ LDAP_BEGIN_DECL
typedef pth_t ldap_int_thread_t;
typedef pth_mutex_t ldap_int_thread_mutex_t;
typedef pth_cond_t ldap_int_thread_cond_t;
typedef pth_key_t ldap_int_thread_key_t;
#if 0
#define LDAP_THREAD_HAVE_RDWR 1
......@@ -129,6 +132,7 @@ LDAP_BEGIN_DECL
typedef thread_t ldap_int_thread_t;
typedef mutex_t ldap_int_thread_mutex_t;
typedef cond_t ldap_int_thread_cond_t;
typedef thread_key_t ldap_int_thread_key_t;
#define HAVE_REENTRANT_FUNCTIONS 1
......@@ -181,6 +185,7 @@ LDAP_BEGIN_DECL
typedef unsigned long ldap_int_thread_t;
typedef HANDLE ldap_int_thread_mutex_t;
typedef HANDLE ldap_int_thread_cond_t;
typedef DWORD ldap_int_thread_key_t;
LDAP_END_DECL
......@@ -201,6 +206,7 @@ LDAP_BEGIN_DECL
typedef int ldap_int_thread_t;
typedef int ldap_int_thread_mutex_t;
typedef int ldap_int_thread_cond_t;
typedef int ldap_int_thread_key_t;
#define LDAP_THREAD_HAVE_TPOOL 1
typedef int ldap_int_thread_pool_t;
......
......@@ -34,6 +34,7 @@ typedef ldap_int_thread_cond_t ldap_pvt_thread_cond_t;
typedef ldap_int_thread_rdwr_t ldap_pvt_thread_rdwr_t;
#endif
typedef ldap_int_thread_rmutex_t ldap_pvt_thread_rmutex_t;
typedef ldap_int_thread_key_t ldap_pvt_thread_key_t;
#endif /* !LDAP_PVT_THREAD_H_DONE */
#define ldap_pvt_thread_equal ldap_int_thread_equal
......@@ -169,6 +170,18 @@ ldap_pvt_thread_rdwr_wtrylock LDAP_P((ldap_pvt_thread_rdwr_t *rdwrp));
LDAP_F( int )
ldap_pvt_thread_rdwr_wunlock LDAP_P((ldap_pvt_thread_rdwr_t *rdwrp));
LDAP_F( int )
ldap_pvt_thread_key_create LDAP_P((ldap_pvt_thread_key_t *keyp));
LDAP_F( int )
ldap_pvt_thread_key_destroy LDAP_P((ldap_pvt_thread_key_t key));
LDAP_F( int )
ldap_pvt_thread_key_setdata LDAP_P((ldap_pvt_thread_key_t key, void *data));
LDAP_F( int )
ldap_pvt_thread_key_getdata LDAP_P((ldap_pvt_thread_key_t key, void **data));
#ifdef LDAP_DEBUG
LDAP_F( int )
ldap_pvt_thread_rdwr_readers LDAP_P((ldap_pvt_thread_rdwr_t *rdwrp));
......
......@@ -153,4 +153,28 @@ ldap_pvt_thread_self( void )
return cthread_self();
}
int
ldap_pvt_thread_key_create( ldap_pvt_thread_key_t *key )
{
return cthread_keycreate( key );
}
int
ldap_pvt_thread_key_destroy( ldap_pvt_thread_key_t key )
{
return( 0 );
}
int
ldap_pvt_thread_key_setdata( ldap_pvt_thread_key_t key, void *data )
{
return cthread_setspecific( key, data );
}
int
ldap_pvt_thread_key_getdata( ldap_pvt_thread_key_t key, void **data )
{
return cthread_getspecific( key, data );
}
#endif /* HAVE_MACH_CTHREADS */
......@@ -198,4 +198,37 @@ ldap_pvt_thread_self( void )
return GetCurrentThreadId();
}
int
ldap_pvt_thread_key_create( ldap_pvt_thread_key_t *keyp )
{
DWORD key = TlsAlloc();
if ( key != TLS_OUT_OF_INDEXES ) {
*keyp = key;
return 0;
} else {
return -1;
}
}
int
ldap_pvt_thread_key_destroy( ldap_pvt_thread_key_t key )
{
/* TlsFree returns 0 on failure */
return( TlsFree( key ) == 0 );
}
int
ldap_pvt_thread_key_setdata( ldap_pvt_thread_key_t key, void *data )
{
return ( TlsSetValue( key, data ) == 0 );
}
int
ldap_pvt_thread_key_getdata( ldap_pvt_thread_key_t key, void **data )
{
void *ptr = TlsGetValue( key );
*data = ptr;
return( ptr ? GetLastError() : 0 );
}
#endif
......@@ -312,6 +312,31 @@ ldap_pvt_thread_t ldap_pvt_thread_self( void )
return pthread_self();
}
int
ldap_pvt_thread_key_create( ldap_pvt_thread_key_t *key )
{
return pthread_key_create( key, NULL );
}
int
ldap_pvt_thread_key_destroy( ldap_pvt_thread_key_t key )
{
return pthread_key_delete( key );
}
int
ldap_pvt_thread_key_setdata( ldap_pvt_thread_key_t key, void *data )
{
return pthread_setspecific( key, data );
}
int
ldap_pvt_thread_key_getdata( ldap_pvt_thread_key_t key, void **data )
{
*data = pthread_getspecific( key );
return 0;
}
#ifdef LDAP_THREAD_HAVE_RDWR
#ifdef HAVE_PTHREAD_RWLOCK_DESTROY
int
......
......@@ -159,6 +159,31 @@ ldap_pvt_thread_self( void )
return pth_self();
}
int
ldap_pvt_thread_key_create( ldap_pvt_thread_key_t *key )
{
return pth_key_create( key, NULL );
}
int
ldap_pvt_thread_key_destroy( ldap_pvt_thread_key_t key )
{
return pth_key_delete( key );
}
int
ldap_pvt_thread_key_setdata( ldap_pvt_thread_key_t key, void *data )
{
return pth_key_setdata( key, data );
}
int
ldap_pvt_thread_key_getdata( ldap_pvt_thread_key_t key, void **data )
{
*data = pth_key_getdata( key );
return 0;
}
#ifdef LDAP_THREAD_HAVE_RDWR
int
ldap_pvt_thread_rdwr_init( ldap_pvt_thread_rdwr_t *rw )
......
......@@ -237,6 +237,30 @@ ldap_pvt_thread_self( void )
return(0);
}
int
ldap_pvt_thread_key_create( ldap_pvt_thread_key_t *key )
{
return(0);
}
int
ldap_pvt_thread_key_destroy( ldap_pvt_thread_key_t key )
{
return(0);
}
int
ldap_pvt_thread_key_setdata( ldap_pvt_thread_key_t key, void *data )
{
return(0);
}
int
ldap_pvt_thread_key_getdata( ldap_pvt_thread_key_t key, void **data )
{
return(0);
}
ldap_pvt_thread_t
ldap_pvt_thread_pool_tid( void *vctx )
{
......
......@@ -159,4 +159,28 @@ ldap_pvt_thread_self( void )
return thr_self();
}
int
ldap_pvt_thread_key_create( ldap_pvt_thread_key_t *key )
{
return thr_keycreate( key, NULL );
}
int
ldap_pvt_thread_key_destroy( ldap_pvt_thread_key_t key )
{
return( 0 );
}
int
ldap_pvt_thread_key_setdata( ldap_pvt_thread_key_t key, void *data )
{
return thr_setspecific( key, data );
}
int
ldap_pvt_thread_key_getdata( ldap_pvt_thread_key_t key, void **data )
{
return thr_getspecific( key, data );
}
#endif /* HAVE_THR */
......@@ -38,11 +38,11 @@ typedef enum ldap_int_thread_pool_state_e {
} ldap_int_thread_pool_state_t;
/* Thread-specific key with data and optional free function */
typedef struct ldap_int_thread_key_s {
typedef struct ldap_int_tpool_key_s {
void *ltk_key;
void *ltk_data;
ldap_pvt_thread_pool_keyfree_t *ltk_free;
} ldap_int_thread_key_t;
} ldap_int_tpool_key_t;
/* Max number of thread-specific keys we store per thread.
* We don't expect to use many...
......@@ -55,7 +55,7 @@ typedef struct ldap_int_thread_key_s {
/* Context: thread ID and thread-specific key/data pairs */
typedef struct ldap_int_thread_userctx_s {
ldap_pvt_thread_t ltu_id;
ldap_int_thread_key_t ltu_key[MAXKEYS];
ldap_int_tpool_key_t ltu_key[MAXKEYS];
} ldap_int_thread_userctx_t;
......@@ -126,6 +126,8 @@ static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
static void *ldap_int_thread_pool_wrapper( void *pool );
static ldap_pvt_thread_key_t ldap_tpool_key;
/* Context of the main thread */
static ldap_int_thread_userctx_t ldap_int_main_thrctx;
......@@ -133,6 +135,7 @@ int
ldap_int_thread_pool_startup ( void )
{
ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
ldap_pvt_thread_key_create( &ldap_tpool_key );
return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
}
......@@ -145,6 +148,7 @@ ldap_int_thread_pool_shutdown ( void )
(ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
}
ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
ldap_pvt_thread_key_destroy( ldap_tpool_key );
return(0);
}
......@@ -552,6 +556,8 @@ ldap_int_thread_pool_wrapper (
ctx.ltu_id = ldap_pvt_thread_self();
TID_HASH(ctx.ltu_id, hash);
ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
/* thread_keys[] is read-only when paused */
......@@ -819,29 +825,10 @@ void ldap_pvt_thread_pool_purgekey( void *key )
*/
void *ldap_pvt_thread_pool_context( )
{
ldap_pvt_thread_t tid;
unsigned i, hash;
ldap_int_thread_userctx_t *ctx;
tid = ldap_pvt_thread_self();
if ( ldap_pvt_thread_equal( tid, ldap_int_main_thrctx.ltu_id ))
return &ldap_int_main_thrctx;
TID_HASH( tid, hash );
i = hash &= (LDAP_MAXTHR-1);
ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
do {
ctx = thread_keys[i].ctx;
if ( ctx == DELETED_THREAD_CTX )
continue;
if ( !ctx || ldap_pvt_thread_equal(thread_keys[i].ctx->ltu_id, tid) )
goto done;
} while ( (i = (i+1) & (LDAP_MAXTHR-1)) != hash );
ctx = NULL;
done:
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
void *ctx = NULL;
return ctx;
ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
return ctx ? ctx : &ldap_int_main_thrctx;
}
/*
......
......@@ -98,7 +98,7 @@ do_abandon( Operation *op, SlapReply *rs )
o, Operation, o_next );
LDAP_STAILQ_NEXT(o, o_next) = NULL;
op->o_conn->c_n_ops_pending--;
slap_op_free( o );
slap_op_free( o, NULL );
break;
}
}
......
......@@ -177,6 +177,7 @@ monitor_subsys_ops_update(
struct berval rdn;
int i;
Attribute *a;
slap_counters_t *sc;
static struct berval bv_ops = BER_BVC( "cn=operations" );
assert( mi != NULL );
......@@ -188,21 +189,35 @@ monitor_subsys_ops_update(
ldap_pvt_mp_init( nInitiated );
ldap_pvt_mp_init( nCompleted );
ldap_pvt_thread_mutex_lock( &slap_counters.sc_ops_mutex );
ldap_pvt_thread_mutex_lock( &slap_counters.sc_mutex );
for ( i = 0; i < SLAP_OP_LAST; i++ ) {
ldap_pvt_mp_add( nInitiated, slap_counters.sc_ops_initiated_[ i ] );
ldap_pvt_mp_add( nCompleted, slap_counters.sc_ops_completed_[ i ] );
}
ldap_pvt_thread_mutex_unlock( &slap_counters.sc_ops_mutex );
for ( sc = slap_counters.sc_next; sc; sc = sc->sc_next ) {
ldap_pvt_thread_mutex_lock( &sc->sc_mutex );
for ( i = 0; i < SLAP_OP_LAST; i++ ) {
ldap_pvt_mp_add( nInitiated, sc->sc_ops_initiated_[ i ] );
ldap_pvt_mp_add( nCompleted, sc->sc_ops_completed_[ i ] );
}
ldap_pvt_thread_mutex_unlock( &sc->sc_mutex );
}
ldap_pvt_thread_mutex_unlock( &slap_counters.sc_mutex );
} else {
for ( i = 0; i < SLAP_OP_LAST; i++ ) {
if ( dn_match( &rdn, &monitor_op[ i ].nrdn ) )
{
ldap_pvt_thread_mutex_lock( &slap_counters.sc_ops_mutex );
ldap_pvt_thread_mutex_lock( &slap_counters.sc_mutex );
ldap_pvt_mp_init_set( nInitiated, slap_counters.sc_ops_initiated_[ i ] );
ldap_pvt_mp_init_set( nCompleted, slap_counters.sc_ops_completed_[ i ] );
ldap_pvt_thread_mutex_unlock( &slap_counters.sc_ops_mutex );
for ( sc = slap_counters.sc_next; sc; sc = sc->sc_next ) {
ldap_pvt_thread_mutex_lock( &sc->sc_mutex );
ldap_pvt_mp_add( nInitiated, sc->sc_ops_initiated_[ i ] );
ldap_pvt_mp_add( nCompleted, sc->sc_ops_completed_[ i ] );
ldap_pvt_thread_mutex_unlock( &sc->sc_mutex );
}
ldap_pvt_thread_mutex_unlock( &slap_counters.sc_mutex );
break;
}
}
......
......@@ -166,6 +166,7 @@ monitor_subsys_sent_update(
struct berval nrdn;
ldap_pvt_mp_t n;
Attribute *a;
slap_counters_t *sc;
int i;
assert( mi != NULL );
......@@ -183,28 +184,48 @@ monitor_subsys_sent_update(
return SLAP_CB_CONTINUE;
}
ldap_pvt_thread_mutex_lock(&slap_counters.sc_sent_mutex);
ldap_pvt_thread_mutex_lock(&slap_counters.sc_mutex);
switch ( i ) {
case MONITOR_SENT_ENTRIES:
ldap_pvt_mp_init_set( n, slap_counters.sc_entries );
for ( sc = slap_counters.sc_next; sc; sc = sc->sc_next ) {
ldap_pvt_thread_mutex_lock( &sc->sc_mutex );
ldap_pvt_mp_add( n, sc->sc_entries );
ldap_pvt_thread_mutex_unlock( &sc->sc_mutex );
}
break;
case MONITOR_SENT_REFERRALS:
ldap_pvt_mp_init_set( n, slap_counters.sc_refs );
for ( sc = slap_counters.sc_next; sc; sc = sc->sc_next ) {
ldap_pvt_thread_mutex_lock( &sc->sc_mutex );
ldap_pvt_mp_add( n, sc->sc_refs );
ldap_pvt_thread_mutex_unlock( &sc->sc_mutex );
}
break;
case MONITOR_SENT_PDU:
ldap_pvt_mp_init_set( n, slap_counters.sc_pdu );
for ( sc = slap_counters.sc_next; sc; sc = sc->sc_next ) {
ldap_pvt_thread_mutex_lock( &sc->sc_mutex );
ldap_pvt_mp_add( n, sc->sc_pdu );
ldap_pvt_thread_mutex_unlock( &sc->sc_mutex );
}
break;
case MONITOR_SENT_BYTES:
ldap_pvt_mp_init_set( n, slap_counters.sc_bytes );
for ( sc = slap_counters.sc_next; sc; sc = sc->sc_next ) {
ldap_pvt_thread_mutex_lock( &sc->sc_mutex );
ldap_pvt_mp_add( n, sc->sc_bytes );
ldap_pvt_thread_mutex_unlock( &sc->sc_mutex );
}
break;
default:
assert(0);
}
ldap_pvt_thread_mutex_unlock(&slap_counters.sc_sent_mutex);
ldap_pvt_thread_mutex_unlock(&slap_counters.sc_mutex);
a = attr_find( e->e_attrs, mi->mi_ad_monitorCounter );
assert( a != NULL );
......
......@@ -70,7 +70,7 @@ int cancel_extop( Operation *op, SlapReply *rs )
LDAP_STAILQ_REMOVE( &op->o_conn->c_pending_ops, o, Operation, o_next );
LDAP_STAILQ_NEXT(o, o_next) = NULL;
op->o_conn->c_n_ops_pending--;
slap_op_free( o );
slap_op_free( o, NULL );
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
return LDAP_SUCCESS;
}
......
......@@ -83,25 +83,19 @@ connection_state2str( int state )
static Connection* connection_get( ber_socket_t s );
#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
typedef struct conn_readinfo {
Operation *op;
ldap_pvt_thread_start_t *func;
void *arg;
void *ctx;
int nullop;
} conn_readinfo;
static int connection_input( Connection *c, conn_readinfo *cri );
#else
static int connection_input( Connection *c );
#endif
static void connection_close( Connection *c );
static int connection_op_activate( Operation *op );
#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
static void connection_op_queue( Operation *op );
#endif
static int connection_resched( Connection *conn );
static void connection_abandon( Connection *conn );
static void connection_destroy( Connection *c );
......@@ -267,8 +261,6 @@ static Connection* connection_get( ber_socket_t s )
#else
c = NULL;
{
ber_socket_t i, sd;
ldap_pvt_thread_mutex_lock( &connections_mutex );
for(i=0; i<dtblsize; i++) {
if( connections[i].c_struct_state == SLAP_C_PENDING )
......@@ -280,12 +272,9 @@ static Connection* connection_get( ber_socket_t s )
break;
}
ber_sockbuf_ctrl( connections[i].c_sb,
LBER_SB_OPT_GET_FD, &sd );
if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
assert( connections[i].c_conn_state == SLAP_C_INVALID );
assert( sd == AC_SOCKET_INVALID );
assert( connections[i].c_sd == AC_SOCKET_INVALID );
continue;
}
......@@ -293,7 +282,7 @@ static Connection* connection_get( ber_socket_t s )
* so don't assert details here.
*/
if( sd == s ) {
if( connections[i].c_sd == s ) {
c = &connections[i];
break;
}
......@@ -303,18 +292,15 @@ static Connection* connection_get( ber_socket_t s )
#endif
if( c != NULL ) {
ber_socket_t sd;
ldap_pvt_thread_mutex_lock( &c->c_mutex );
assert( c->c_struct_state != SLAP_C_UNINITIALIZED );
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd );
#ifdef HAVE_WINSOCK
/* Avoid race condition after releasing
* connections_mutex
*/
if ( sd != s ) {
if ( c->c_sd != s ) {
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return NULL;
}
......@@ -323,7 +309,7 @@ static Connection* connection_get( ber_socket_t s )
/* connection must have been closed due to resched */
assert( c->c_conn_state == SLAP_C_INVALID );
assert( sd == AC_SOCKET_INVALID );
assert( c->c_sd == AC_SOCKET_INVALID );
Debug( LDAP_DEBUG_TRACE,
"connection_get(%d): connection not used\n",
......@@ -341,7 +327,7 @@ static Connection* connection_get( ber_socket_t s )
assert( c->c_struct_state == SLAP_C_USED );
assert( c->c_conn_state != SLAP_C_INVALID );
assert( sd != AC_SOCKET_INVALID );
assert( c->c_sd != AC_SOCKET_INVALID );
#ifndef SLAPD_MONITOR
if ( global_idletimeout > 0 )
......@@ -405,8 +391,6 @@ Connection * connection_init(
ldap_pvt_thread_mutex_lock( &connections_mutex );
for( i=0; i < dtblsize; i++) {
ber_socket_t sd;
if ( connections[i].c_struct_state == SLAP_C_PENDING )
continue;
......@@ -418,14 +402,7 @@ Connection * connection_init(
break;
}
sd = AC_SOCKET_INVALID;
if (connections[i].c_sb != NULL) {
ber_sockbuf_ctrl( connections[i].c_sb,
LBER_SB_OPT_GET_FD, &sd );
}
if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
assert( sd == AC_SOCKET_INVALID );
c = &connections[i];
c->c_struct_state = SLAP_C_PENDING;
break;
......@@ -435,7 +412,6 @@ Connection * connection_init(
assert( connections[i].c_struct_state == SLAP_C_USED );
assert( connections[i].c_conn_state != SLAP_C_INVALID );
assert( sd != AC_SOCKET_INVALID );
}
ldap_pvt_thread_mutex_unlock( &connections_mutex );
......@@ -525,6 +501,7 @@ Connection * connection_init(
assert( c->c_writewaiter == 0);
c->c_listener = listener;
c->c_sd = s;
if ( flags & CONN_IS_CLIENT ) {
c->c_connid = 0;
......@@ -677,7 +654,6 @@ void connection2anonymous( Connection *c )
static void
connection_destroy( Connection *c )
{
ber_socket_t sd;
unsigned long connid;
const char *close_reason;