Commit 5a1da417 authored by Ondřej Kuzník's avatar Ondřej Kuzník Committed by Quanah Gibson-Mount
Browse files

ITS#9620 Fix deadlocks between searches and register_entry

The cache is again protected by a simple mutex and mp_children/mp_next
is protected by mp_mutex. Each monitor_send_children is called holding
the corresponding mp_mutex meaning the whole path from search base to
the current entry is off limits - cn=monitor searches cannot run in
parallel right now.
parent b096aed4
......@@ -56,6 +56,7 @@ typedef struct monitor_entry_t {
ldap_pvt_thread_mutex_t mp_mutex; /* entry mutex */
Entry *mp_next; /* pointer to next sibling */
Entry *mp_children; /* pointer to first child */
Entry *mp_last; /* pointer to last child */
struct monitor_subsys_t *mp_info; /* subsystem info */
#define mp_type mp_info->mss_type
unsigned long mp_flags; /* flags */
......@@ -81,9 +82,13 @@ typedef struct monitor_info_t {
/*
* Internal data
*
* Lock order:
* - cache first, then entry
* - DIT in preorder DFS
*/
Avlnode *mi_cache;
ldap_pvt_thread_rdwr_t mi_cache_rwlock;
ldap_pvt_thread_mutex_t mi_cache_lock;
/*
* Config parameters
......
......@@ -38,7 +38,7 @@ monitor_subsys_backend_init(
)
{
monitor_info_t *mi;
Entry *e_backend, **ep;
Entry *e_backend;
int i;
monitor_entry_t *mp;
monitor_subsys_t *ms_database;
......@@ -64,10 +64,6 @@ monitor_subsys_backend_init(
return( -1 );
}
mp = ( monitor_entry_t * )e_backend->e_private;
mp->mp_children = NULL;
ep = &mp->mp_children;
i = -1;
LDAP_STAILQ_FOREACH( bi, &backendInfo, bi_next ) {
char buf[ BACKMONITOR_BUFSIZE ];
......@@ -139,7 +135,7 @@ monitor_subsys_backend_init(
mp->mp_info = ms;
mp->mp_flags = ms->mss_flags | MONITOR_F_SUB;
if ( monitor_cache_add( mi, e ) ) {
if ( monitor_cache_add( mi, e, e_backend ) ) {
Debug( LDAP_DEBUG_ANY,
"monitor_subsys_backend_init: "
"unable to add entry \"cn=Backend %d,%s\"\n",
......@@ -147,9 +143,6 @@ monitor_subsys_backend_init(
ms->mss_ndn.bv_val );
return( -1 );
}
*ep = e;
ep = &mp->mp_next;
}
monitor_cache_release( mi, e_backend );
......
......@@ -79,22 +79,74 @@ monitor_cache_dup(
int
monitor_cache_add(
monitor_info_t *mi,
Entry *e )
Entry *e,
Entry *parent )
{
monitor_cache_t *mc;
int rc;
monitor_cache_t tmp_mc, *mc, *pmc = NULL;
Entry **ep = NULL, *prev = NULL;
int rc = -1;
assert( mi != NULL );
assert( e != NULL );
dnParent( &e->e_nname, &tmp_mc.mc_ndn );
mc = ( monitor_cache_t * )ch_malloc( sizeof( monitor_cache_t ) );
mc->mc_ndn = e->e_nname;
mc->mc_e = e;
ldap_pvt_thread_rdwr_wlock( &mi->mi_cache_rwlock );
rc = ldap_avl_insert( &mi->mi_cache, ( caddr_t )mc,
if ( parent ) {
/* Shortcut, but follow lock order as a fallback */
if ( ldap_pvt_thread_mutex_trylock( &mi->mi_cache_lock ) ) {
monitor_cache_release( mi, parent );
ldap_pvt_thread_mutex_lock( &mi->mi_cache_lock );
monitor_cache_lock( parent );
}
} else {
ldap_pvt_thread_mutex_lock( &mi->mi_cache_lock );
}
/* Allow database root be added */
if ( parent == NULL && mi->mi_cache != NULL ) {
pmc = ldap_avl_find( mi->mi_cache, &tmp_mc, monitor_cache_cmp );
if ( pmc == NULL ) {
goto done;
}
parent = pmc->mc_e;
monitor_cache_lock( parent );
}
rc = ldap_avl_insert( &mi->mi_cache, mc,
monitor_cache_cmp, monitor_cache_dup );
ldap_pvt_thread_rdwr_wunlock( &mi->mi_cache_rwlock );
if ( rc != LDAP_SUCCESS ) {
goto done;
}
if ( parent != NULL ) {
monitor_entry_t *mp = parent->e_private;
if ( mp->mp_children ) {
monitor_entry_t *tail;
monitor_cache_lock( mp->mp_last );
tail = mp->mp_last->e_private;
tail->mp_next = e;
monitor_cache_release( mi, mp->mp_last );
mp->mp_last = e;
} else {
mp->mp_children = mp->mp_last = e;
}
}
done:
if ( pmc != NULL ) {
monitor_cache_release( mi, parent );
}
ldap_pvt_thread_mutex_unlock( &mi->mi_cache_lock );
if ( rc != LDAP_SUCCESS ) {
ch_free( mc );
}
return rc;
}
......@@ -151,22 +203,18 @@ monitor_cache_get(
*ep = NULL;
tmp_mc.mc_ndn = *ndn;
retry:;
ldap_pvt_thread_rdwr_rlock( &mi->mi_cache_rwlock );
ldap_pvt_thread_mutex_lock( &mi->mi_cache_lock );
mc = ( monitor_cache_t * )ldap_avl_find( mi->mi_cache,
( caddr_t )&tmp_mc, monitor_cache_cmp );
if ( mc != NULL ) {
/* entry is returned with mutex locked */
if ( monitor_cache_trylock( mc->mc_e ) ) {
ldap_pvt_thread_rdwr_runlock( &mi->mi_cache_rwlock );
ldap_pvt_thread_yield();
goto retry;
}
monitor_cache_lock( mc->mc_e );
*ep = mc->mc_e;
}
ldap_pvt_thread_rdwr_runlock( &mi->mi_cache_rwlock );
ldap_pvt_thread_mutex_unlock( &mi->mi_cache_lock );
return ( *ep == NULL ? -1 : 0 );
}
......@@ -193,7 +241,7 @@ monitor_cache_remove(
dnParent( ndn, &pndn );
retry:;
ldap_pvt_thread_rdwr_wlock( &mi->mi_cache_rwlock );
ldap_pvt_thread_mutex_lock( &mi->mi_cache_lock );
tmp_mc.mc_ndn = *ndn;
mc = ( monitor_cache_t * )ldap_avl_find( mi->mi_cache,
......@@ -202,35 +250,38 @@ retry:;
if ( mc != NULL ) {
monitor_cache_t *pmc;
if ( monitor_cache_trylock( mc->mc_e ) ) {
ldap_pvt_thread_rdwr_wunlock( &mi->mi_cache_rwlock );
goto retry;
}
tmp_mc.mc_ndn = pndn;
pmc = ( monitor_cache_t * )ldap_avl_find( mi->mi_cache,
( caddr_t )&tmp_mc, monitor_cache_cmp );
if ( pmc != NULL ) {
monitor_entry_t *mp = (monitor_entry_t *)mc->mc_e->e_private,
*pmp = (monitor_entry_t *)pmc->mc_e->e_private;
Entry **entryp;
Entry **entryp, *prev = NULL;
if ( monitor_cache_trylock( pmc->mc_e ) ) {
monitor_cache_release( mi, mc->mc_e );
ldap_pvt_thread_rdwr_wunlock( &mi->mi_cache_rwlock );
goto retry;
}
monitor_cache_lock( pmc->mc_e );
for ( entryp = &pmp->mp_children; *entryp != NULL; ) {
monitor_entry_t *next = (monitor_entry_t *)(*entryp)->e_private;
monitor_cache_lock( *entryp );
if ( next == mp ) {
if ( mc->mc_e == pmp->mp_last ) {
pmp->mp_last = prev;
}
*entryp = next->mp_next;
entryp = NULL;
break;
}
if ( prev != NULL ) {
monitor_cache_release( mi, prev );
}
prev = *entryp;
entryp = &next->mp_next;
}
if ( prev ) {
monitor_cache_release( mi, prev );
}
if ( entryp != NULL ) {
Debug( LDAP_DEBUG_ANY,
......@@ -263,6 +314,7 @@ retry:;
ldap_pvt_thread_mutex_destroy( &mp->mp_mutex );
mp->mp_next = NULL;
mp->mp_children = NULL;
mp->mp_last = NULL;
}
}
......@@ -272,7 +324,7 @@ retry:;
}
}
ldap_pvt_thread_rdwr_wunlock( &mi->mi_cache_rwlock );
ldap_pvt_thread_mutex_unlock( &mi->mi_cache_lock );
return ( *ep == NULL ? -1 : 0 );
}
......
......@@ -48,7 +48,7 @@ monitor_subsys_conn_init(
monitor_subsys_t *ms )
{
monitor_info_t *mi;
Entry *e, **ep, *e_conn;
Entry *e, *e_conn;
monitor_entry_t *mp;
char buf[ BACKMONITOR_BUFSIZE ];
struct berval bv;
......@@ -69,8 +69,6 @@ monitor_subsys_conn_init(
}
mp = ( monitor_entry_t * )e_conn->e_private;
mp->mp_children = NULL;
ep = &mp->mp_children;
/*
* Max file descriptors
......@@ -106,7 +104,7 @@ monitor_subsys_conn_init(
| MONITOR_F_SUB | MONITOR_F_PERSISTENT;
mp->mp_flags &= ~MONITOR_F_VOLATILE_CH;
if ( monitor_cache_add( mi, e ) ) {
if ( monitor_cache_add( mi, e, e_conn ) ) {
Debug( LDAP_DEBUG_ANY,
"monitor_subsys_conn_init: "
"unable to add entry \"cn=Total,%s\"\n",
......@@ -114,9 +112,6 @@ monitor_subsys_conn_init(
return( -1 );
}
*ep = e;
ep = &mp->mp_next;
/*
* Total conns
*/
......@@ -145,7 +140,7 @@ monitor_subsys_conn_init(
| MONITOR_F_SUB | MONITOR_F_PERSISTENT;
mp->mp_flags &= ~MONITOR_F_VOLATILE_CH;
if ( monitor_cache_add( mi, e ) ) {
if ( monitor_cache_add( mi, e, e_conn ) ) {
Debug( LDAP_DEBUG_ANY,
"monitor_subsys_conn_init: "
"unable to add entry \"cn=Total,%s\"\n",
......@@ -153,9 +148,6 @@ monitor_subsys_conn_init(
return( -1 );
}
*ep = e;
ep = &mp->mp_next;
/*
* Current conns
*/
......@@ -184,7 +176,7 @@ monitor_subsys_conn_init(
| MONITOR_F_SUB | MONITOR_F_PERSISTENT;
mp->mp_flags &= ~MONITOR_F_VOLATILE_CH;
if ( monitor_cache_add( mi, e ) ) {
if ( monitor_cache_add( mi, e, e_conn ) ) {
Debug( LDAP_DEBUG_ANY,
"monitor_subsys_conn_init: "
"unable to add entry \"cn=Current,%s\"\n",
......@@ -192,9 +184,6 @@ monitor_subsys_conn_init(
return( -1 );
}
*ep = e;
ep = &mp->mp_next;
monitor_cache_release( mi, e_conn );
return( 0 );
......
......@@ -179,7 +179,7 @@ monitor_subsys_overlay_init_one(
mp_overlay->mp_info = ms;
mp_overlay->mp_flags = ms->mss_flags | MONITOR_F_SUB;
if ( monitor_cache_add( mi, e_overlay ) ) {
if ( monitor_cache_add( mi, e_overlay, e_database ) ) {
Debug( LDAP_DEBUG_ANY,
"monitor_subsys_overlay_init_one: "
"unable to add entry "
......@@ -189,8 +189,6 @@ monitor_subsys_overlay_init_one(
}
*ep_overlay = e_overlay;
ep_overlay = &mp_overlay->mp_next;
return 0;
}
......@@ -203,7 +201,8 @@ monitor_subsys_database_init_one(
monitor_subsys_t *ms_overlay,
struct berval *rdn,
Entry *e_database,
Entry ***epp )
struct slap_overinst *overlay,
Entry **ep )
{
char buf[ BACKMONITOR_BUFSIZE ];
int j;
......@@ -348,7 +347,7 @@ monitor_subsys_database_init_one(
| MONITOR_F_SUB;
mp->mp_private = be;
if ( monitor_cache_add( mi, e ) ) {
if ( monitor_cache_add( mi, e, e_database ) ) {
Debug( LDAP_DEBUG_ANY,
"monitor_subsys_database_init_one: "
"unable to add entry \"%s,%s\"\n",
......@@ -361,17 +360,20 @@ monitor_subsys_database_init_one(
#endif /* defined(LDAP_SLAPI) */
if ( oi != NULL ) {
Entry **ep_overlay = &mp->mp_children;
Entry *e_overlay;
slap_overinst *on = oi->oi_list;
for ( ; on; on = on->on_next ) {
monitor_subsys_overlay_init_one( mi, be,
ms, ms_overlay, on, e, ep_overlay );
ms, ms_overlay, on, e, &e_overlay );
if ( overlay == on ) {
*ep = e_overlay;
}
}
}
**epp = e;
*epp = &mp->mp_next;
if ( overlay == NULL ) {
*ep = e;
}
return 0;
}
......@@ -383,7 +385,7 @@ monitor_back_register_database_and_overlay(
struct berval *ndn_out )
{
monitor_info_t *mi;
Entry *e_database, **ep;
Entry *e_database, *e = NULL;
int i, rc;
monitor_entry_t *mp;
monitor_subsys_t *ms_backend,
......@@ -443,16 +445,17 @@ monitor_back_register_database_and_overlay(
return( -1 );
}
/* FIXME: It's only safe since we're paused */
mp = ( monitor_entry_t * )e_database->e_private;
for ( i = -1, ep = &mp->mp_children; *ep; i++ ) {
mp = ( monitor_entry_t * )(*ep)->e_private;
for ( i = -1, e = mp->mp_children; e; i++ ) {
mp = ( monitor_entry_t * )e->e_private;
assert( mp != NULL );
if ( mp->mp_private == be->bd_self ) {
rc = 0;
goto done;
}
ep = &mp->mp_next;
e = mp->mp_next;
}
bv.bv_val = buf;
......@@ -463,40 +466,15 @@ monitor_back_register_database_and_overlay(
}
rc = monitor_subsys_database_init_one( mi, be,
ms_database, ms_backend, ms_overlay, &bv, e_database, &ep );
ms_database, ms_backend, ms_overlay, &bv, e_database, on, &e );
if ( rc != 0 ) {
goto done;
}
/* database_init_one advanced ep past where we want.
* But it stored the entry we want in mp->mp_next.
*/
ep = &mp->mp_next;
done:;
monitor_cache_release( mi, e_database );
if ( rc == 0 && ndn_out && ep && *ep ) {
if ( on ) {
Entry *e_ov;
struct berval ov_type;
ber_str2bv( on->on_bi.bi_type, 0, 0, &ov_type );
mp = ( monitor_entry_t * ) (*ep)->e_private;
for ( e_ov = mp->mp_children; e_ov; ) {
Attribute *a = attr_find( e_ov->e_attrs, mi->mi_ad_monitoredInfo );
if ( a != NULL && bvmatch( &a->a_nvals[ 0 ], &ov_type ) ) {
*ndn_out = e_ov->e_nname;
break;
}
mp = ( monitor_entry_t * ) e_ov->e_private;
e_ov = mp->mp_next;
}
} else {
*ndn_out = (*ep)->e_nname;
}
if ( rc == 0 && ndn_out && e ) {
*ndn_out = e->e_nname;
}
return rc;
......@@ -525,9 +503,8 @@ monitor_subsys_database_init(
monitor_subsys_t *ms )
{
monitor_info_t *mi;
Entry *e_database, **ep;
Entry *e_database, *e;
int i, rc;
monitor_entry_t *mp;
monitor_subsys_t *ms_backend,
*ms_overlay;
struct berval bv;
......@@ -569,13 +546,9 @@ monitor_subsys_database_init(
(void)init_readOnly( mi, e_database, frontendDB->be_restrictops );
(void)init_restrictedOperation( mi, e_database, frontendDB->be_restrictops );
mp = ( monitor_entry_t * )e_database->e_private;
mp->mp_children = NULL;
ep = &mp->mp_children;
BER_BVSTR( &bv, "cn=Frontend" );
rc = monitor_subsys_database_init_one( mi, frontendDB,
ms, ms_backend, ms_overlay, &bv, e_database, &ep );
ms, ms_backend, ms_overlay, &bv, e_database, NULL, &e );
if ( rc != 0 ) {
return rc;
}
......@@ -591,7 +564,7 @@ monitor_subsys_database_init(
}
rc = monitor_subsys_database_init_one( mi, be,
ms, ms_backend, ms_overlay, &bv, e_database, &ep );
ms, ms_backend, ms_overlay, &bv, e_database, NULL, &e );
if ( rc != 0 ) {
return rc;
}
......
......@@ -479,9 +479,7 @@ monitor_back_register_entry(
assert( e->e_private == NULL );
if ( monitor_subsys_is_opened() ) {
Entry *e_parent = NULL,
*e_new = NULL,
**ep = NULL;
Entry *e_parent = NULL, *e_new = NULL;
struct berval pdn = BER_BVNULL;
monitor_entry_t *mp = NULL,
*mp_parent = NULL;
......@@ -550,14 +548,7 @@ monitor_back_register_entry(
}
mp->mp_cb = cb;
ep = &mp_parent->mp_children;
for ( ; *ep; ) {
mp_parent = ( monitor_entry_t * )(*ep)->e_private;
ep = &mp_parent->mp_next;
}
*ep = e_new;
if ( monitor_cache_add( mi, e_new ) ) {
if ( monitor_cache_add( mi, e_new, e_parent ) ) {
Debug( LDAP_DEBUG_ANY,
"monitor_back_register_entry(\"%s\"): "
"unable to add entry\n",
......@@ -656,9 +647,7 @@ monitor_back_register_entry_parent(
}
if ( monitor_subsys_is_opened() ) {
Entry *e_parent = NULL,
*e_new = NULL,
**ep = NULL;
Entry *e_parent = NULL, *e_new = NULL;
struct berval e_name = BER_BVNULL,
e_nname = BER_BVNULL;
monitor_entry_t *mp = NULL,
......@@ -750,14 +739,7 @@ monitor_back_register_entry_parent(
}
mp->mp_cb = cb;
ep = &mp_parent->mp_children;
for ( ; *ep; ) {
mp_parent = ( monitor_entry_t * )(*ep)->e_private;
ep = &mp_parent->mp_next;
}
*ep = e_new;
if ( monitor_cache_add( mi, e_new ) ) {
if ( monitor_cache_add( mi, e_new, e_parent ) ) {
Debug( LDAP_DEBUG_ANY,
"monitor_back_register_entry(\"%s\"): "
"unable to add entry\n",
......@@ -2162,7 +2144,7 @@ monitor_back_db_init(
/* NOTE: only one monitor database is allowed,
* so we use static storage */
ldap_pvt_thread_rdwr_init( &monitor_info.mi_cache_rwlock );
ldap_pvt_thread_mutex_init( &monitor_info.mi_cache_lock );
be->be_private = &monitor_info;
......@@ -2229,7 +2211,7 @@ monitor_back_db_open(
{
monitor_info_t *mi = (monitor_info_t *)be->be_private;
struct monitor_subsys_t **ms;
Entry *e, **ep, *root;
Entry *e, *root;
monitor_entry_t *mp;
int i;
struct berval bv, rdn = BER_BVC(SLAPD_MONITOR_DN);
......@@ -2318,9 +2300,8 @@ monitor_back_db_open(
return -1;
}
e->e_private = ( void * )mp;
ep = &mp->mp_children;
if ( monitor_cache_add( mi, e ) ) {
if ( monitor_cache_add( mi, e, NULL ) ) {
Debug( LDAP_DEBUG_ANY,
"unable to add entry \"%s\" to cache\n",
SLAPD_MONITOR_DN );
......@@ -2375,15 +2356,12 @@ monitor_back_db_open(
mp->mp_info = monitor_subsys[ i ];
mp->mp_flags = monitor_subsys[ i ]->mss_flags;
if ( monitor_cache_add( mi, e ) ) {
if ( monitor_cache_add( mi, e, root ) ) {
Debug( LDAP_DEBUG_ANY,
"unable to add entry \"%s\" to cache\n",
monitor_subsys[ i ]->mss_dn.bv_val );
return -1;
}
*ep = e;
ep = &mp->mp_next;
}
assert( be != NULL );
......@@ -2565,7 +2543,7 @@ monitor_back_db_destroy(
}
}
ldap_pvt_thread_rdwr_destroy( &monitor_info.mi_cache_rwlock );
ldap_pvt_thread_mutex_destroy( &monitor_info.mi_cache_lock );
be->be_private = NULL;
......
......@@ -34,7 +34,7 @@ monitor_subsys_listener_init(
)
{
monitor_info_t *mi;
Entry *e_listener, **ep;
Entry *e_listener;
int i;
monitor_entry_t *mp;
Listener **l;
......@@ -62,10 +62,6 @@ monitor_subsys_listener_init(
return( -1 );
}
mp = ( monitor_entry_t * )e_listener->e_private;
mp->mp_children = NULL;
ep = &mp->mp_children;
for ( i = 0; l[ i ]; i++ ) {
char buf[ BACKMONITOR_BUFSIZE ];
Entry *e;
......@@ -119,16 +115,13 @@ monitor_subsys_listener_init(
mp->mp_flags = ms->mss_flags
| MONITOR_F_SUB;
if ( monitor_cache_add( mi, e ) ) {
if ( monitor_cache_add( mi, e, e_listener ) ) {
Debug( LDAP_DEBUG_ANY,