Commit 1cf58aba authored by Howard Chu's avatar Howard Chu
Browse files

Revert prev commit, spoke too soon, close race condition came back.

parent d74a2302
...@@ -43,10 +43,33 @@ ...@@ -43,10 +43,33 @@
#include "slapi/slapi.h" #include "slapi/slapi.h"
#endif #endif
#ifdef SLAP_MULTI_CONN_ARRAY
/* for Multiple Connection Arrary (MCA) Support */
static ldap_pvt_thread_mutex_t* connections_mutex;
static Connection **connections = NULL;
/* set to the number of processors (round up to a power of 2) */
# define NUM_CONNECTION_ARRAY 4
/* partition the array in a modulo manner */
# define MCA_conn_array_id(fd) ((int)(fd)%NUM_CONNECTION_ARRAY)
# define MCA_conn_array_element_id(fd) ((int)(fd)/NUM_CONNECTION_ARRAY)
# define MCA_ARRAY_SIZE ((int)(MCA_conn_array_element_id(dtblsize) + (MCA_conn_array_id(dtblsize) ? 1 : 0)))
# define MCA_conn_check(fd) (dtblsize > 0 && (fd) >= 0 && (fd) < (MCA_ARRAY_SIZE*NUM_CONNECTION_ARRAY))
# define MCA_GET_CONNECTION(fd) (&(connections[MCA_conn_array_id(fd)]) \
[MCA_conn_array_element_id(fd)])
# define MCA_GET_CONN_MUTEX(fd) (&connections_mutex[MCA_conn_array_id(fd)])
#else
/* protected by connections_mutex */ /* protected by connections_mutex */
static ldap_pvt_thread_mutex_t connections_mutex; static ldap_pvt_thread_mutex_t connections_mutex;
static Connection *connections = NULL; static Connection *connections = NULL;
# define MCA_conn_check(fd) (dtblsize > 0 && (fd) < dtblsize)
# define MCA_GET_CONNECTION(fd) (&connections[s])
# define MCA_GET_CONN_MUTEX(fd) (&connections_mutex)
#endif
static ldap_pvt_thread_mutex_t conn_nextid_mutex; static ldap_pvt_thread_mutex_t conn_nextid_mutex;
static unsigned long conn_nextid = 0; static unsigned long conn_nextid = 0;
...@@ -56,7 +79,6 @@ static const char conn_lost_str[] = "connection lost"; ...@@ -56,7 +79,6 @@ static const char conn_lost_str[] = "connection lost";
#define SLAP_C_UNINITIALIZED 0x00 /* MUST BE ZERO (0) */ #define SLAP_C_UNINITIALIZED 0x00 /* MUST BE ZERO (0) */
#define SLAP_C_UNUSED 0x01 #define SLAP_C_UNUSED 0x01
#define SLAP_C_USED 0x02 #define SLAP_C_USED 0x02
#define SLAP_C_PENDING 0x03
/* connection state (protected by c_mutex ) */ /* connection state (protected by c_mutex ) */
#define SLAP_C_INVALID 0x00 /* MUST BE ZERO (0) */ #define SLAP_C_INVALID 0x00 /* MUST BE ZERO (0) */
...@@ -112,6 +134,69 @@ static ldap_pvt_thread_start_t connection_operation; ...@@ -112,6 +134,69 @@ static ldap_pvt_thread_start_t connection_operation;
* Initialize connection management infrastructure. * Initialize connection management infrastructure.
*/ */
int connections_init(void) int connections_init(void)
#ifdef SLAP_MULTI_CONN_ARRAY
{
int i, j;
Connection* conn;
assert( connections == NULL );
if( connections != NULL) {
Debug( LDAP_DEBUG_ANY, "connections_init: already initialized.\n",
0, 0, 0 );
return -1;
}
connections_mutex = (ldap_pvt_thread_mutex_t*) ch_calloc(
NUM_CONNECTION_ARRAY, sizeof(ldap_pvt_thread_mutex_t) );
if( connections_mutex == NULL ) {
Debug( LDAP_DEBUG_ANY, "connections_init: "
"allocation of connection mutexes failed\n", 0, 0, 0 );
return -1;
}
connections = (Connection**) ch_calloc(
NUM_CONNECTION_ARRAY, sizeof(Connection*));
if( connections == NULL ) {
Debug( LDAP_DEBUG_ANY, "connections_init: "
"allocation of connection[%d] failed\n", 0, 0, 0 );
return -1;
}
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
ldap_pvt_thread_mutex_init( connections_mutex+i );
connections[i] = (Connection*) ch_calloc(
MCA_ARRAY_SIZE, sizeof(Connection) );
if( connections[i] == NULL ) {
Debug( LDAP_DEBUG_ANY, "connections_init: "
"allocation (%d*%ld) of connection array[%d] failed\n",
dtblsize, (long) sizeof(Connection), i );
return -1;
}
}
/* should check return of every call */
ldap_pvt_thread_mutex_init( &conn_nextid_mutex );
assert( connections[0]->c_struct_state == SLAP_C_UNINITIALIZED );
assert( connections[NUM_CONNECTION_ARRAY-1]->c_struct_state ==
SLAP_C_UNINITIALIZED );
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
conn = connections[i];
for ( j = 0; j < MCA_ARRAY_SIZE; j++ ) {
conn[j].c_conn_idx = j;
}
}
/*
* per entry initialization of the Connection array initialization
* will be done by connection_init()
*/
return 0;
}
#else
{ {
int i; int i;
...@@ -148,12 +233,57 @@ int connections_init(void) ...@@ -148,12 +233,57 @@ int connections_init(void)
return 0; return 0;
} }
#endif
/* /*
* Destroy connection management infrastructure. * Destroy connection management infrastructure.
*/ */
int connections_destroy(void) int connections_destroy(void)
#ifdef SLAP_MULTI_CONN_ARRAY
{
int i;
ber_socket_t j;
if( connections == NULL) {
Debug( LDAP_DEBUG_ANY, "connections_destroy: nothing to destroy.\n",
0, 0, 0 );
return -1;
}
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
Connection* conn = connections[i];
for ( j = 0; j < MCA_ARRAY_SIZE; j++ ) {
if( conn[j].c_struct_state != SLAP_C_UNINITIALIZED ) {
ber_sockbuf_free( conn[j].c_sb );
ldap_pvt_thread_mutex_destroy( &conn[j].c_mutex );
ldap_pvt_thread_mutex_destroy( &conn[j].c_write_mutex );
ldap_pvt_thread_cond_destroy( &conn[j].c_write_cv );
#ifdef LDAP_SLAPI
if ( slapi_plugins_used ) {
slapi_int_free_object_extensions( SLAPI_X_EXT_CONNECTION,
&conn[j] );
}
#endif
}
}
}
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
free( connections[i] );
connections[i] = NULL;
ldap_pvt_thread_mutex_destroy( &connections_mutex[i] );
}
free( connections );
free( connections_mutex );
ldap_pvt_thread_mutex_destroy( &conn_nextid_mutex );
return 0;
}
#else
{ {
ber_socket_t i; ber_socket_t i;
...@@ -187,14 +317,50 @@ int connections_destroy(void) ...@@ -187,14 +317,50 @@ int connections_destroy(void)
ldap_pvt_thread_mutex_destroy( &conn_nextid_mutex ); ldap_pvt_thread_mutex_destroy( &conn_nextid_mutex );
return 0; return 0;
} }
#endif
/* /*
* shutdown all connections * shutdown all connections
*/ */
int connections_shutdown(void) int connections_shutdown(void)
#ifdef SLAP_MULTI_CONN_ARRAY
{
int i;
ber_socket_t j;
for ( i = 0; i < NUM_CONNECTION_ARRAY; i++ ) {
Connection* conn = connections[i];
ldap_pvt_thread_mutex_lock( &connections_mutex[i] );
for ( j = 0; j < MCA_ARRAY_SIZE; j++ ) {
if( conn[j].c_struct_state != SLAP_C_USED ) {
continue;
}
/* give persistent clients a chance to cleanup */
if( conn[j].c_conn_state == SLAP_C_CLIENT ) {
ldap_pvt_thread_pool_submit( &connection_pool,
conn[j].c_clientfunc, conn[j].c_clientarg );
continue;
}
ldap_pvt_thread_mutex_lock( &conn[j].c_mutex );
/* connections_mutex and c_mutex are locked */
connection_closing( &conn[j], "connection shutdown" );
connection_close( &conn[j] );
ldap_pvt_thread_mutex_unlock( &conn[j].c_mutex );
}
ldap_pvt_thread_mutex_unlock( &connections_mutex[i] );
}
return 0;
}
#else
{ {
ber_socket_t i; ber_socket_t i;
ldap_pvt_thread_mutex_lock( &connections_mutex );
for ( i = 0; i < dtblsize; i++ ) { for ( i = 0; i < dtblsize; i++ ) {
if( connections[i].c_struct_state != SLAP_C_USED ) { if( connections[i].c_struct_state != SLAP_C_USED ) {
continue; continue;
...@@ -208,15 +374,18 @@ int connections_shutdown(void) ...@@ -208,15 +374,18 @@ int connections_shutdown(void)
ldap_pvt_thread_mutex_lock( &connections[i].c_mutex ); ldap_pvt_thread_mutex_lock( &connections[i].c_mutex );
/* c_mutex is locked */ /* connections_mutex and c_mutex are locked */
connection_closing( &connections[i], "slapd shutdown" ); connection_closing( &connections[i], "slapd shutdown" );
connection_close( &connections[i] ); connection_close( &connections[i] );
ldap_pvt_thread_mutex_unlock( &connections[i].c_mutex ); ldap_pvt_thread_mutex_unlock( &connections[i].c_mutex );
} }
ldap_pvt_thread_mutex_unlock( &connections_mutex );
return 0; return 0;
} }
#endif
/* /*
* Timeout idle connections. * Timeout idle connections.
...@@ -251,6 +420,8 @@ int connections_timeout_idle(time_t now) ...@@ -251,6 +420,8 @@ int connections_timeout_idle(time_t now)
static Connection* connection_get( ber_socket_t s ) static Connection* connection_get( ber_socket_t s )
{ {
/* connections_mutex should be locked by caller */
Connection *c; Connection *c;
Debug( LDAP_DEBUG_ARGS, Debug( LDAP_DEBUG_ARGS,
...@@ -262,19 +433,15 @@ static Connection* connection_get( ber_socket_t s ) ...@@ -262,19 +433,15 @@ static Connection* connection_get( ber_socket_t s )
if(s == AC_SOCKET_INVALID) return NULL; if(s == AC_SOCKET_INVALID) return NULL;
#ifndef HAVE_WINSOCK #ifndef HAVE_WINSOCK
assert( s < dtblsize ); assert( MCA_conn_check( s ) );
c = &connections[s]; c = MCA_GET_CONNECTION(s);
#else #else
c = NULL; c = NULL;
{ {
ber_socket_t i, sd; ber_socket_t i, sd;
ldap_pvt_thread_mutex_lock( &connections_mutex );
for(i=0; i<dtblsize; i++) { for(i=0; i<dtblsize; i++) {
if( connections[i].c_struct_state == SLAP_C_PENDING )
continue;
if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) { if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
assert( connections[i].c_conn_state == SLAP_C_INVALID ); assert( connections[i].c_conn_state == SLAP_C_INVALID );
assert( connections[i].c_sb == 0 ); assert( connections[i].c_sb == 0 );
...@@ -299,7 +466,6 @@ static Connection* connection_get( ber_socket_t s ) ...@@ -299,7 +466,6 @@ static Connection* connection_get( ber_socket_t s )
break; break;
} }
} }
ldap_pvt_thread_mutex_unlock( &connections_mutex );
} }
#endif #endif
...@@ -362,7 +528,6 @@ long connection_init( ...@@ -362,7 +528,6 @@ long connection_init(
{ {
unsigned long id; unsigned long id;
Connection *c; Connection *c;
int doinit = 0;
assert( connections != NULL ); assert( connections != NULL );
...@@ -383,29 +548,24 @@ long connection_init( ...@@ -383,29 +548,24 @@ long connection_init(
assert( s >= 0 ); assert( s >= 0 );
#ifndef HAVE_WINSOCK #ifndef HAVE_WINSOCK
assert( s < dtblsize ); assert( s < dtblsize );
c = &connections[s]; #endif
if( c->c_struct_state == SLAP_C_UNINITIALIZED ) {
doinit = 1; ldap_pvt_thread_mutex_lock( MCA_GET_CONN_MUTEX(s) );
} else {
assert( c->c_struct_state == SLAP_C_UNUSED ); #ifndef HAVE_WINSOCK
} assert( MCA_conn_check( s ) );
c = MCA_GET_CONNECTION(s);
#else #else
{ {
ber_socket_t i; ber_socket_t i;
c = NULL; c = NULL;
ldap_pvt_thread_mutex_lock( &connections_mutex );
for( i=0; i < dtblsize; i++) { for( i=0; i < dtblsize; i++) {
ber_socket_t sd; ber_socket_t sd;
if ( connections[i].c_struct_state == SLAP_C_PENDING )
continue;
if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) { if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
assert( connections[i].c_sb == 0 ); assert( connections[i].c_sb == 0 );
c = &connections[i]; c = &connections[i];
c->c_struct_state = SLAP_C_PENDING;
doinit = 1;
break; break;
} }
...@@ -418,7 +578,6 @@ long connection_init( ...@@ -418,7 +578,6 @@ long connection_init(
if( connections[i].c_struct_state == SLAP_C_UNUSED ) { if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
assert( sd == AC_SOCKET_INVALID ); assert( sd == AC_SOCKET_INVALID );
c = &connections[i]; c = &connections[i];
c->c_struct_state = SLAP_C_PENDING;
break; break;
} }
...@@ -428,18 +587,20 @@ long connection_init( ...@@ -428,18 +587,20 @@ long connection_init(
assert( connections[i].c_conn_state != SLAP_C_INVALID ); assert( connections[i].c_conn_state != SLAP_C_INVALID );
assert( sd != AC_SOCKET_INVALID ); assert( sd != AC_SOCKET_INVALID );
} }
ldap_pvt_thread_mutex_unlock( &connections_mutex );
if( c == NULL ) { if( c == NULL ) {
Debug( LDAP_DEBUG_ANY, Debug( LDAP_DEBUG_ANY,
"connection_init(%d): connection table full " "connection_init(%d): connection table full "
"(%d/%d)\n", s, i, dtblsize); "(%d/%d)\n", s, i, dtblsize);
ldap_pvt_thread_mutex_unlock( &connections_mutex );
return -1; return -1;
} }
} }
#endif #endif
if( doinit ) { assert( c != NULL );
if( c->c_struct_state == SLAP_C_UNINITIALIZED ) {
c->c_send_ldap_result = slap_send_ldap_result; c->c_send_ldap_result = slap_send_ldap_result;
c->c_send_search_entry = slap_send_search_entry; c->c_send_search_entry = slap_send_search_entry;
c->c_send_search_reference = slap_send_search_reference; c->c_send_search_reference = slap_send_search_reference;
...@@ -489,10 +650,13 @@ long connection_init( ...@@ -489,10 +650,13 @@ long connection_init(
slapi_int_create_object_extensions( SLAPI_X_EXT_CONNECTION, c ); slapi_int_create_object_extensions( SLAPI_X_EXT_CONNECTION, c );
} }
#endif #endif
c->c_struct_state = SLAP_C_UNUSED;
} }
ldap_pvt_thread_mutex_lock( &c->c_mutex ); ldap_pvt_thread_mutex_lock( &c->c_mutex );
assert( c->c_struct_state == SLAP_C_UNUSED );
assert( BER_BVISNULL( &c->c_authmech ) ); assert( BER_BVISNULL( &c->c_authmech ) );
assert( BER_BVISNULL( &c->c_dn ) ); assert( BER_BVISNULL( &c->c_dn ) );
assert( BER_BVISNULL( &c->c_ndn ) ); assert( BER_BVISNULL( &c->c_ndn ) );
...@@ -523,6 +687,7 @@ long connection_init( ...@@ -523,6 +687,7 @@ long connection_init(
c->c_close_reason = "?"; /* should never be needed */ c->c_close_reason = "?"; /* should never be needed */
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s ); ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_FD, &s );
ldap_pvt_thread_mutex_unlock( &c->c_mutex ); ldap_pvt_thread_mutex_unlock( &c->c_mutex );
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
return 0; return 0;
} }
...@@ -611,6 +776,7 @@ long connection_init( ...@@ -611,6 +776,7 @@ long connection_init(
slapd_add_internal( s, 1 ); slapd_add_internal( s, 1 );
ldap_pvt_thread_mutex_unlock( &c->c_mutex ); ldap_pvt_thread_mutex_unlock( &c->c_mutex );
ldap_pvt_thread_mutex_unlock( MCA_GET_CONN_MUTEX(s) );
backend_connection_init(c); backend_connection_init(c);
...@@ -653,6 +819,7 @@ void connection2anonymous( Connection *c ) ...@@ -653,6 +819,7 @@ void connection2anonymous( Connection *c )
static void static void
connection_destroy( Connection *c ) connection_destroy( Connection *c )
{ {
/* note: connections_mutex should be locked by caller */
ber_socket_t sd; ber_socket_t sd;
unsigned long connid; unsigned long connid;
const char *close_reason; const char *close_reason;
...@@ -674,10 +841,6 @@ connection_destroy( Connection *c ) ...@@ -674,10 +841,6 @@ connection_destroy( Connection *c )
connid = c->c_connid; connid = c->c_connid;
close_reason = c->c_close_reason; close_reason = c->c_close_reason;
ldap_pvt_thread_mutex_lock( &connections_mutex );
c->c_struct_state = SLAP_C_PENDING;
ldap_pvt_thread_mutex_unlock( &connections_mutex );
backend_connection_destroy(c); backend_connection_destroy(c);
c->c_protocol = 0; c->c_protocol = 0;
...@@ -850,7 +1013,7 @@ static void connection_close( Connection *c ) ...@@ -850,7 +1013,7 @@ static void connection_close( Connection *c )
assert( c->c_struct_state == SLAP_C_USED ); assert( c->c_struct_state == SLAP_C_USED );
assert( c->c_conn_state == SLAP_C_CLOSING ); assert( c->c_conn_state == SLAP_C_CLOSING );
/* note: c_mutex should be locked by caller */ /* note: connections_mutex and c_mutex should be locked by caller */
ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd ); ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_GET_FD, &sd );
if( !LDAP_STAILQ_EMPTY(&c->c_ops) ) { if( !LDAP_STAILQ_EMPTY(&c->c_ops) ) {
...@@ -881,21 +1044,71 @@ unsigned long connections_nextid(void) ...@@ -881,21 +1044,71 @@ unsigned long connections_nextid(void)
Connection* connection_first( ber_socket_t *index ) Connection* connection_first( ber_socket_t *index )
{ {
#ifdef SLAP_MULTI_CONN_ARRAY
int conn_array_id;
#endif
assert( connections != NULL ); assert( connections != NULL );
assert( index != NULL ); assert( index != NULL );
ldap_pvt_thread_mutex_lock( &connections_mutex ); #ifdef SLAP_MULTI_CONN_ARRAY
for( *index = 0; *index < dtblsize; (*index)++) { for ( conn_array_id = 0;
if( connections[*index].c_struct_state != SLAP_C_UNINITIALIZED ) { conn_array_id < NUM_CONNECTION_ARRAY;
break; conn_array_id++ )
} {
ldap_pvt_thread_mutex_lock( &connections_mutex[ conn_array_id ] );
} }
ldap_pvt_thread_mutex_unlock( &connections_mutex ); #else
ldap_pvt_thread_mutex_lock( &connections_mutex );
#endif
*index = 0;
return connection_next(NULL, index); return connection_next(NULL, index);
} }
Connection* connection_next( Connection *c, ber_socket_t *index ) Connection* connection_next( Connection *c, ber_socket_t *index )
#ifdef SLAP_MULTI_CONN_ARRAY
{
Connection* conn;
assert( connections != NULL );
assert( index != NULL );
assert( *index >= 0 && *index <= dtblsize );
if( c != NULL ) ldap_pvt_thread_mutex_unlock( &c->c_mutex );
c = NULL;
for(; *index < dtblsize; (*index)++) {
assert( MCA_conn_check( *index ) );
conn = MCA_GET_CONNECTION(*index);
if( conn->c_struct_state == SLAP_C_UNINITIALIZED ) {
assert( conn->c_conn_state == SLAP_C_INVALID );
#ifndef HAVE_WINSOCK
continue;
#else
break;
#endif
}
if( conn->c_struct_state == SLAP_C_USED ) {
assert( conn->c_conn_state != SLAP_C_INVALID );
c = conn;
(*index)++;
break;
}
assert( conn->c_struct_state == SLAP_C_UNUSED );
assert( conn->c_conn_state == SLAP_C_INVALID );
}
if( c != NULL ) ldap_pvt_thread_mutex_lock( &c->c_mutex );
return c;
}
#else
{ {
assert( connections != NULL ); assert( connections != NULL );
assert( index != NULL ); assert( index != NULL );