Commit 68ebee47 authored by Howard Chu's avatar Howard Chu
Browse files

Concurrency tweaks:

  store conn->c_sd, don't use ber_sockbuf_ctrl to retrieve it.
  use per-thread free lists for operations, no mutexes needed.
parent 41e62ddb
......@@ -98,7 +98,7 @@ do_abandon( Operation *op, SlapReply *rs )
o, Operation, o_next );
LDAP_STAILQ_NEXT(o, o_next) = NULL;
op->o_conn->c_n_ops_pending--;
slap_op_free( o );
slap_op_free( o, NULL );
break;
}
}
......
......@@ -70,7 +70,7 @@ int cancel_extop( Operation *op, SlapReply *rs )
LDAP_STAILQ_REMOVE( &op->o_conn->c_pending_ops, o, Operation, o_next );
LDAP_STAILQ_NEXT(o, o_next) = NULL;
op->o_conn->c_n_ops_pending--;
slap_op_free( o );
slap_op_free( o, NULL );
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
return LDAP_SUCCESS;
}
......
......@@ -89,6 +89,7 @@ typedef struct conn_readinfo {
Operation *op;
ldap_pvt_thread_start_t *func;
void *arg;
void *ctx;
int nullop;
} conn_readinfo;
......@@ -267,8 +268,6 @@ static Connection* connection_get( ber_socket_t s )
#else
c = NULL;
{
ber_socket_t i, sd;
ldap_pvt_thread_mutex_lock( &connections_mutex );
for(i=0; i<dtblsize; i++) {
if( connections[i].c_struct_state == SLAP_C_PENDING )
......@@ -280,12 +279,9 @@ static Connection* connection_get( ber_socket_t s )
break;
}
ber_sockbuf_ctrl( connections[i].c_sb,
LBER_SB_OPT_GET_FD, &sd );
if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
assert( connections[i].c_conn_state == SLAP_C_INVALID );
assert( sd == AC_SOCKET_INVALID );
assert( connections[i].c_sd == AC_SOCKET_INVALID );
continue;
}
......@@ -293,7 +289,7 @@ static Connection* connection_get( ber_socket_t s )
* so don't assert details here.
*/
if( sd == s ) {
if( connections[i].c_sd == s ) {
c = &connections[i];
break;
}
......@@ -303,18 +299,15 @@ static Connection* connection_get( ber_socket_t s )
#endif
if( c != NULL ) {
ber_socket_t sd;
ldap_pvt_thread_mutex_lock( &c->c_mutex );
assert( c->c_struct_state != SLAP_C_UNINITIALIZED );
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd );
#ifdef HAVE_WINSOCK
/* Avoid race condition after releasing
* connections_mutex
*/
if ( sd != s ) {
if ( c->c_sd != s ) {
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
return NULL;
}
......@@ -323,7 +316,7 @@ static Connection* connection_get( ber_socket_t s )
/* connection must have been closed due to resched */
assert( c->c_conn_state == SLAP_C_INVALID );
assert( sd == AC_SOCKET_INVALID );
assert( c->c_sd == AC_SOCKET_INVALID );
Debug( LDAP_DEBUG_TRACE,
"connection_get(%d): connection not used\n",
......@@ -341,7 +334,7 @@ static Connection* connection_get( ber_socket_t s )
assert( c->c_struct_state == SLAP_C_USED );
assert( c->c_conn_state != SLAP_C_INVALID );
assert( sd != AC_SOCKET_INVALID );
assert( c->c_sd != AC_SOCKET_INVALID );
#ifndef SLAPD_MONITOR
if ( global_idletimeout > 0 )
......@@ -405,8 +398,6 @@ Connection * connection_init(
ldap_pvt_thread_mutex_lock( &connections_mutex );
for( i=0; i < dtblsize; i++) {
ber_socket_t sd;
if ( connections[i].c_struct_state == SLAP_C_PENDING )
continue;
......@@ -418,14 +409,7 @@ Connection * connection_init(
break;
}
sd = AC_SOCKET_INVALID;
if (connections[i].c_sb != NULL) {
ber_sockbuf_ctrl( connections[i].c_sb,
LBER_SB_OPT_GET_FD, &sd );
}
if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
assert( sd == AC_SOCKET_INVALID );
c = &connections[i];
c->c_struct_state = SLAP_C_PENDING;
break;
......@@ -435,7 +419,6 @@ Connection * connection_init(
assert( connections[i].c_struct_state == SLAP_C_USED );
assert( connections[i].c_conn_state != SLAP_C_INVALID );
assert( sd != AC_SOCKET_INVALID );
}
ldap_pvt_thread_mutex_unlock( &connections_mutex );
......@@ -525,6 +508,7 @@ Connection * connection_init(
assert( c->c_writewaiter == 0);
c->c_listener = listener;
c->c_sd = s;
if ( flags & CONN_IS_CLIENT ) {
c->c_connid = 0;
......@@ -677,7 +661,6 @@ void connection2anonymous( Connection *c )
static void
connection_destroy( Connection *c )
{
ber_socket_t sd;
unsigned long connid;
const char *close_reason;
Sockbuf *sb;
......@@ -754,21 +737,20 @@ connection_destroy( Connection *c )
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max );
}
ber_sockbuf_ctrl( sb, LBER_SB_OPT_GET_FD, &sd );
/* c must be fully reset by this point; when we call slapd_remove
* it may get immediately reused by a new connection.
*/
if ( sd != AC_SOCKET_INVALID ) {
slapd_remove( sd, sb, 1, 0, 0 );
if ( c->c_sd != AC_SOCKET_INVALID ) {
slapd_remove( c->c_sd, sb, 1, 0, 0 );
if ( close_reason == NULL ) {
Statslog( LDAP_DEBUG_STATS, "conn=%lu fd=%ld closed\n",
connid, (long) sd, 0, 0, 0 );
connid, (long) c->c_sd, 0, 0, 0 );
} else {
Statslog( LDAP_DEBUG_STATS, "conn=%lu fd=%ld closed (%s)\n",
connid, (long) sd, close_reason, 0, 0 );
connid, (long) c->c_sd, close_reason, 0, 0 );
}
c->c_sd = AC_SOCKET_INVALID;
}
}
......@@ -813,7 +795,7 @@ static void connection_abandon( Connection *c )
while ( (o = LDAP_STAILQ_FIRST( &c->c_txn_ops )) != NULL) {
LDAP_STAILQ_REMOVE_HEAD( &c->c_txn_ops, o_next );
LDAP_STAILQ_NEXT(o, o_next) = NULL;
slap_op_free( o );
slap_op_free( o, NULL );
}
/* clear transaction */
......@@ -825,7 +807,7 @@ static void connection_abandon( Connection *c )
while ( (o = LDAP_STAILQ_FIRST( &c->c_pending_ops )) != NULL) {
LDAP_STAILQ_REMOVE_HEAD( &c->c_pending_ops, o_next );
LDAP_STAILQ_NEXT(o, o_next) = NULL;
slap_op_free( o );
slap_op_free( o, NULL );
}
}
......@@ -839,18 +821,15 @@ void connection_closing( Connection *c, const char *why )
/* c_mutex must be locked by caller */
if( c->c_conn_state != SLAP_C_CLOSING ) {
ber_socket_t sd;
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd );
Debug( LDAP_DEBUG_TRACE,
"connection_closing: readying conn=%lu sd=%d for close\n",
c->c_connid, sd, 0 );
c->c_connid, c->c_sd, 0 );
/* update state to closing */
c->c_conn_state = SLAP_C_CLOSING;
c->c_close_reason = why;
/* don't listen on this port anymore */
slapd_clr_read( sd, 0 );
slapd_clr_read( c->c_sd, 0 );
/* abandon active operations */
connection_abandon( c );
......@@ -862,13 +841,13 @@ void connection_closing( Connection *c, const char *why )
* connection_resched / connection_close before we
* finish, but that's OK.
*/
slapd_clr_write( sd, 1 );
slapd_clr_write( c->c_sd, 1 );
ldap_pvt_thread_mutex_unlock( &c->c_mutex );
ldap_pvt_thread_mutex_lock( &c->c_write_mutex );
ldap_pvt_thread_mutex_lock( &c->c_mutex );
ldap_pvt_thread_mutex_unlock( &c->c_write_mutex );
} else {
slapd_clr_write( sd, 1 );
slapd_clr_write( c->c_sd, 1 );
}
} else if( why == NULL && c->c_close_reason == conn_lost_str ) {
......@@ -880,8 +859,6 @@ void connection_closing( Connection *c, const char *why )
static void
connection_close( Connection *c )
{
ber_socket_t sd = AC_SOCKET_INVALID;
assert( connections != NULL );
assert( c != NULL );
......@@ -894,22 +871,17 @@ connection_close( Connection *c )
/* NOTE: c_mutex should be locked by caller */
/* NOTE: don't get the file descriptor if not needed */
if ( LogTest( LDAP_DEBUG_TRACE ) ) {
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd );
}
if ( !LDAP_STAILQ_EMPTY(&c->c_ops) ||
!LDAP_STAILQ_EMPTY(&c->c_pending_ops) )
{
Debug( LDAP_DEBUG_TRACE,
"connection_close: deferring conn=%lu sd=%d\n",
c->c_connid, sd, 0 );
c->c_connid, c->c_sd, 0 );
return;
}
Debug( LDAP_DEBUG_TRACE, "connection_close: conn=%lu sd=%d\n",
c->c_connid, sd, 0 );
c->c_connid, c->c_sd, 0 );
connection_destroy( c );
}
......@@ -1175,7 +1147,6 @@ operations_error:
LDAP_STAILQ_REMOVE( &conn->c_ops, op, Operation, o_next);
LDAP_STAILQ_NEXT(op, o_next) = NULL;
slap_op_free( op );
conn->c_n_ops_executing--;
conn->c_n_ops_completed++;
......@@ -1190,6 +1161,7 @@ operations_error:
connection_resched( conn );
ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
slap_op_free( op, ctx );
return NULL;
}
......@@ -1261,6 +1233,7 @@ static void* connection_read_thread( void* ctx, void* argv )
* read incoming LDAP requests. If there is more than one,
* the first one is returned with new_op
*/
cri.ctx = ctx;
if( ( rc = connection_read( s, &cri ) ) < 0 ) {
Debug( LDAP_DEBUG_CONNS, "connection_read(%d) error\n", s, 0, 0 );
return (void*)(long)rc;
......@@ -1509,6 +1482,7 @@ connection_input( Connection *conn )
char *cdn = NULL;
#endif
char *defer = NULL;
void *ctx;
if ( conn->c_currentber == NULL &&
( conn->c_currentber = ber_alloc()) == NULL )
......@@ -1538,15 +1512,12 @@ connection_input( Connection *conn )
tag = ber_get_next( conn->c_sb, &len, conn->c_currentber );
if ( tag != LDAP_TAG_MESSAGE ) {
int err = sock_errno();
ber_socket_t sd;
ber_sockbuf_ctrl( conn->c_sb, LBER_SB_OPT_GET_FD, &sd );
if ( err != EWOULDBLOCK && err != EAGAIN ) {
/* log, close and send error */
Debug( LDAP_DEBUG_TRACE,
"ber_get_next on fd %d failed errno=%d (%s)\n",
sd, err, sock_errstr(err) );
conn->c_sd, err, sock_errstr(err) );
ber_free( conn->c_currentber, 1 );
conn->c_currentber = NULL;
......@@ -1592,7 +1563,12 @@ connection_input( Connection *conn )
connection_abandon( conn );
}
op = slap_op_alloc( ber, msgid, tag, conn->c_n_ops_received++ );
#ifdef SLAP_LIGHTWEIGHT_DISPATCHER
ctx = cri->ctx;
#else
ctx = NULL;
#endif
op = slap_op_alloc( ber, msgid, tag, conn->c_n_ops_received++, ctx );
op->o_conn = conn;
/* clear state if the connection is being reused from inactive */
......@@ -1723,12 +1699,9 @@ connection_resched( Connection *conn )
Operation *op;
if( conn->c_conn_state == SLAP_C_CLOSING ) {
ber_socket_t sd;
ber_sockbuf_ctrl( conn->c_sb, LBER_SB_OPT_GET_FD, &sd );
Debug( LDAP_DEBUG_TRACE, "connection_resched: "
"attempting closing conn=%lu sd=%d\n",
conn->c_connid, sd, 0 );
conn->c_connid, conn->c_sd, 0 );
connection_close( conn );
return 0;
}
......@@ -2010,6 +1983,7 @@ connection_fake_init2(
Operation *op = (Operation *) opbuf;
conn->c_connid = -1;
conn->c_conn_idx = -1;
conn->c_send_ldap_result = slap_send_ldap_result;
conn->c_send_search_entry = slap_send_search_entry;
conn->c_send_search_reference = slap_send_search_reference;
......
......@@ -38,26 +38,27 @@
#endif
static ldap_pvt_thread_mutex_t slap_op_mutex;
static LDAP_STAILQ_HEAD(s_o, Operation) slap_free_ops;
static time_t last_time;
static int last_incr;
void slap_op_init(void)
{
ldap_pvt_thread_mutex_init( &slap_op_mutex );
LDAP_STAILQ_INIT(&slap_free_ops);
}
void slap_op_destroy(void)
{
Operation *o;
ldap_pvt_thread_mutex_destroy( &slap_op_mutex );
}
while ( (o = LDAP_STAILQ_FIRST( &slap_free_ops )) != NULL) {
LDAP_STAILQ_REMOVE_HEAD( &slap_free_ops, o_next );
LDAP_STAILQ_NEXT(o, o_next) = NULL;
ch_free( o );
static void
slap_op_q_destroy( void *key, void *data )
{
Operation *op, *op2;
for ( op = data; op; op = op2 ) {
op2 = LDAP_STAILQ_NEXT( op, o_next );
ber_memfree_x( op, NULL );
}
ldap_pvt_thread_mutex_destroy( &slap_op_mutex );
}
void
......@@ -72,7 +73,7 @@ slap_op_groups_free( Operation *op )
}
void
slap_op_free( Operation *op )
slap_op_free( Operation *op, void *ctx )
{
OperationBuffer *opbuf;
......@@ -110,15 +111,22 @@ slap_op_free( Operation *op )
}
#endif /* defined( LDAP_SLAPI ) */
opbuf = (OperationBuffer *) op;
memset( opbuf, 0, sizeof(*opbuf) );
op->o_hdr = &opbuf->ob_hdr;
op->o_controls = opbuf->ob_controls;
ldap_pvt_thread_mutex_lock( &slap_op_mutex );
LDAP_STAILQ_INSERT_HEAD( &slap_free_ops, op, o_next );
ldap_pvt_thread_mutex_unlock( &slap_op_mutex );
if ( ctx ) {
Operation *op2;
void *otmp = NULL;
ldap_pvt_thread_pool_getkey( ctx, (void *)slap_op_free, &otmp, NULL );
op2 = otmp;
LDAP_STAILQ_NEXT( op, o_next ) = op2;
ldap_pvt_thread_pool_setkey( ctx, (void *)slap_op_free, (void *)op,
slap_op_q_destroy );
} else {
ber_memfree_x( op, NULL );
}
}
void
......@@ -141,16 +149,21 @@ slap_op_alloc(
BerElement *ber,
ber_int_t msgid,
ber_tag_t tag,
ber_int_t id )
ber_int_t id,
void *ctx )
{
Operation *op;
ldap_pvt_thread_mutex_lock( &slap_op_mutex );
if ((op = LDAP_STAILQ_FIRST( &slap_free_ops ))) {
LDAP_STAILQ_REMOVE_HEAD( &slap_free_ops, o_next );
Operation *op = NULL;
if ( ctx ) {
void *otmp = NULL;
ldap_pvt_thread_pool_getkey( ctx, (void *)slap_op_free, &otmp, NULL );
if ( otmp ) {
op = otmp;
otmp = LDAP_STAILQ_NEXT( op, o_next );
ldap_pvt_thread_pool_setkey( ctx, (void *)slap_op_free, otmp,
slap_op_q_destroy );
}
}
ldap_pvt_thread_mutex_unlock( &slap_op_mutex );
if (!op) {
op = (Operation *) ch_calloc( 1, sizeof(OperationBuffer) );
op->o_hdr = &((OperationBuffer *) op)->ob_hdr;
......
......@@ -1372,11 +1372,11 @@ LDAP_SLAPD_F (int) parse_oidm LDAP_P((
LDAP_SLAPD_F (void) slap_op_init LDAP_P(( void ));
LDAP_SLAPD_F (void) slap_op_destroy LDAP_P(( void ));
LDAP_SLAPD_F (void) slap_op_groups_free LDAP_P(( Operation *op ));
LDAP_SLAPD_F (void) slap_op_free LDAP_P(( Operation *op ));
LDAP_SLAPD_F (void) slap_op_free LDAP_P(( Operation *op, void *ctx ));
LDAP_SLAPD_F (void) slap_op_time LDAP_P(( time_t *t, int *n ));
LDAP_SLAPD_F (Operation *) slap_op_alloc LDAP_P((
BerElement *ber, ber_int_t msgid,
ber_tag_t tag, ber_int_t id ));
ber_tag_t tag, ber_int_t id, void *ctx ));
LDAP_SLAPD_F (int) slap_op_add LDAP_P(( Operation **olist, Operation *op ));
LDAP_SLAPD_F (int) slap_op_remove LDAP_P(( Operation **olist, Operation *op ));
......
......@@ -149,7 +149,6 @@ static long send_ldap_ber(
/* write the pdu */
while( 1 ) {
int err;
ber_socket_t sd;
if ( connection_state_closing( conn ) ) {
ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
......@@ -184,8 +183,7 @@ static long send_ldap_ber(
/* wait for socket to be write-ready */
conn->c_writewaiter = 1;
ber_sockbuf_ctrl( conn->c_sb, LBER_SB_OPT_GET_FD, &sd );
slapd_set_write( sd, 1 );
slapd_set_write( conn->c_sd, 1 );
ldap_pvt_thread_cond_wait( &conn->c_write_cv, &conn->c_mutex );
conn->c_writewaiter = 0;
......
......@@ -2672,6 +2672,7 @@ struct Connection {
ldap_pvt_thread_mutex_t c_mutex; /* protect the connection */
Sockbuf *c_sb; /* ber connection stuff */
ber_socket_t c_sd;
/* only can be changed by connect_init */
time_t c_starttime; /* when the connection was opened */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment