Commit 0af878ac authored by Jong Hyuk Choi's avatar Jong Hyuk Choi
Browse files

syncrepl consistency patch: utilize BDB txn rollback

parent 38c7e86d
......@@ -369,7 +369,10 @@ retry: /* transaction retry */
ldap_pvt_thread_rdwr_wlock( &bdb->bi_pslist_rwlock );
LDAP_LIST_FOREACH( ps_list, &bdb->bi_psearch_list, o_ps_link ) {
rc = bdb_psearch( op, rs, ps_list, e, LDAP_PSEARCH_BY_PREDELETE );
if ( rc ) {
if ( rc == LDAP_BUSY && op->o_ps_send_wait ) {
ldap_pvt_thread_rdwr_wunlock( &bdb->bi_pslist_rwlock );
goto retry;
} else if ( rc ) {
Debug( LDAP_DEBUG_TRACE,
"bdb_delete: persistent search failed (%d,%d)\n",
rc, rs->sr_err, 0 );
......
......@@ -450,7 +450,10 @@ retry: /* transaction retry */
ldap_pvt_thread_rdwr_wlock( &bdb->bi_pslist_rwlock );
LDAP_LIST_FOREACH ( ps_list, &bdb->bi_psearch_list, o_ps_link ) {
rc = bdb_psearch(op, rs, ps_list, e, LDAP_PSEARCH_BY_PREMODIFY );
if ( rc ) {
if ( rc == LDAP_BUSY && op->o_ps_send_wait ) {
ldap_pvt_thread_rdwr_wunlock( &bdb->bi_pslist_rwlock );
goto retry;
} else if ( rc ) {
Debug( LDAP_DEBUG_TRACE,
"bdb_modify: persistent search failed (%d,%d)\n",
rc, rs->sr_err, 0 );
......
......@@ -428,12 +428,14 @@ int bdb_search( Operation *op, SlapReply *rs )
return rc;
}
#define BDB_PSEARCH_MAX_WAIT 3
int bdb_psearch( Operation *op, SlapReply *rs, Operation *sop,
Entry *ps_e, int ps_type )
{
int rc;
struct pc_entry *pce = NULL;
struct pc_entry *p = NULL;
int num_retries = 0;
op->ors_post_search_id = NOID;
......@@ -441,41 +443,55 @@ int bdb_psearch( Operation *op, SlapReply *rs, Operation *sop,
case LDAP_PSEARCH_BY_PREMODIFY:
case LDAP_PSEARCH_BY_PREDELETE:
if ( sop->o_refresh_in_progress ) {
pce = (struct pc_entry *) ch_calloc( 1, sizeof( struct pc_entry ));
pce->pc_id = ps_e->e_id;
ldap_pvt_thread_mutex_lock( &sop->o_pcmutex );
if ( LDAP_TAILQ_EMPTY( &sop->o_ps_pre_candidates )) {
LDAP_TAILQ_INSERT_HEAD( &sop->o_ps_pre_candidates, pce, pc_link );
} else {
LDAP_TAILQ_FOREACH( p, &sop->o_ps_pre_candidates, pc_link ) {
if ( p->pc_id > pce->pc_id )
break;
}
if ( p ) {
LDAP_TAILQ_INSERT_BEFORE( p, pce, pc_link );
if ( !op->o_ps_send_wait ) {
if ( sop->o_refresh_in_progress ) {
pce = (struct pc_entry *) ch_calloc(
1, sizeof( struct pc_entry ));
pce->pc_id = ps_e->e_id;
ldap_pvt_thread_mutex_lock( &sop->o_pcmutex );
if ( LDAP_TAILQ_EMPTY( &sop->o_ps_pre_candidates )) {
LDAP_TAILQ_INSERT_HEAD(
&sop->o_ps_pre_candidates, pce, pc_link );
} else {
LDAP_TAILQ_INSERT_TAIL(
&sop->o_ps_pre_candidates,
pce, pc_link );
LDAP_TAILQ_FOREACH( p,
&sop->o_ps_pre_candidates, pc_link ) {
if ( p->pc_id > pce->pc_id )
break;
}
if ( p ) {
LDAP_TAILQ_INSERT_BEFORE( p, pce, pc_link );
} else {
LDAP_TAILQ_INSERT_TAIL(
&sop->o_ps_pre_candidates,
pce, pc_link );
}
}
ldap_pvt_thread_mutex_unlock( &sop->o_pcmutex );
} else {
rc = bdb_do_search( op, rs, sop, ps_e, ps_type );
return rc;
}
ldap_pvt_thread_mutex_unlock( &sop->o_pcmutex );
} else {
rc = bdb_do_search( op, rs, sop, ps_e, ps_type );
return rc;
pce = op->o_ps_send_wait;
}
/* Wait until refresh search send the entry */
while ( !pce->pc_sent ) {
if ( sop->o_refresh_in_progress ) {
if ( num_retries == BDB_PSEARCH_MAX_WAIT ) {
op->o_ps_send_wait = pce;
return LDAP_BUSY;
}
ldap_pvt_thread_yield();
bdb_trans_backoff( ++num_retries );
} else {
break;
}
}
op->o_ps_send_wait = NULL;
if ( !sop->o_refresh_in_progress && !pce->pc_sent ) {
/* refresh ended without processing pce */
/* need to perform psearch for ps_e */
......@@ -511,7 +527,6 @@ int bdb_psearch( Operation *op, SlapReply *rs, Operation *sop,
!LDAP_TAILQ_EMPTY( &sop->o_ps_post_candidates )) {
pce = (struct pc_entry *) ch_calloc( 1, sizeof( struct pc_entry ));
pce->pc_id = ps_e->e_id;
// pce->ps_type = ps_type;
ber_dupbv( &pce->pc_csn, &op->o_sync_csn );
if ( ps_type == LDAP_PSEARCH_BY_DELETE ) {
Attribute *a;
......
......@@ -2522,7 +2522,7 @@ parse_syncrepl_line(
#else /* HAVE_CYRUS_SASL */
fprintf( stderr, "Error: parse_syncrepl_line: "
"not compiled with SASL support\n" );
return 1;
return -1;
#endif /* HAVE_CYRUS_SASL */
} else {
si->si_bindmethod = -1;
......@@ -2599,7 +2599,7 @@ parse_syncrepl_line(
ber_str2bv( val, 0, 0, &bv );
if ( dnNormalize( 0, NULL, NULL, &bv, &si->si_base, NULL )) {
fprintf( stderr, "Invalid base DN \"%s\"\n", val );
return 1;
return -1;
}
} else if ( !strncasecmp( cargv[ i ], SCOPESTR "=",
STRLENOF( SCOPESTR "=" ) ) )
......@@ -2620,7 +2620,7 @@ parse_syncrepl_line(
} else {
fprintf( stderr, "Error: parse_syncrepl_line: "
"unknown scope \"%s\"\n", val);
return 1;
return -1;
}
} else if ( !strncasecmp( cargv[ i ], ATTRSONLYSTR "=",
STRLENOF( ATTRSONLYSTR "=" ) ) )
......@@ -2699,7 +2699,7 @@ parse_syncrepl_line(
} else {
fprintf( stderr, "Error: parse_syncrepl_line: "
"unknown sync type \"%s\"\n", val);
return 1;
return -1;
}
} else if ( !strncasecmp( cargv[ i ], INTERVALSTR "=",
STRLENOF( INTERVALSTR "=" ) ) )
......@@ -2718,21 +2718,21 @@ parse_syncrepl_line(
if ( hstr == NULL ) {
fprintf( stderr, "Error: parse_syncrepl_line: "
"invalid interval \"%s\"\n", val );
return 1;
return -1;
}
*hstr++ = '\0';
mstr = strchr( hstr, ':' );
if ( mstr == NULL ) {
fprintf( stderr, "Error: parse_syncrepl_line: "
"invalid interval \"%s\"\n", val );
return 1;
return -1;
}
*mstr++ = '\0';
sstr = strchr( mstr, ':' );
if ( sstr == NULL ) {
fprintf( stderr, "Error: parse_syncrepl_line: "
"invalid interval \"%s\"\n", val );
return 1;
return -1;
}
*sstr++ = '\0';
......@@ -2745,7 +2745,7 @@ parse_syncrepl_line(
( ss > 60 ) || ( ss < 0 ) || ( dd < 0 )) {
fprintf( stderr, "Error: parse_syncrepl_line: "
"invalid interval \"%s\"\n", val );
return 1;
return -1;
}
si->si_interval = (( dd * 24 + hh ) * 60 + mm ) * 60 + ss;
}
......@@ -2753,7 +2753,7 @@ parse_syncrepl_line(
fprintf( stderr, "Error: parse_syncrepl_line: "
"invalid interval \"%ld\"\n",
(long) si->si_interval);
return 1;
return -1;
}
} else if ( !strncasecmp( cargv[ i ], RETRYSTR "=",
STRLENOF( RETRYSTR "=" ) ) )
......@@ -2820,6 +2820,7 @@ parse_syncrepl_line(
} else {
fprintf( stderr, "Error: parse_syncrepl_line: "
"unknown keyword \"%s\"\n", cargv[ i ] );
return -1;
}
}
......
......@@ -2219,6 +2219,7 @@ typedef struct slap_op {
LDAP_TAILQ_HEAD(pc_pre, pc_entry) o_ps_pre_candidates;
LDAP_TAILQ_HEAD(pc_post, pc_entry) o_ps_post_candidates;
Avlnode *o_psearch_finished;
struct pc_entry *o_ps_send_wait;
ldap_pvt_thread_mutex_t o_pcmutex;
AuthorizationInformation o_authz;
......
......@@ -1195,34 +1195,31 @@ syncrepl_entry(
struct berval org_ndn = BER_BVNULL;
int org_managedsait;
Debug( LDAP_DEBUG_SYNC, "%s: %s",
"syncrepl_entry",
"LDAP_RES_SEARCH_ENTRY", 0 );
switch( syncstate ) {
case LDAP_SYNC_PRESENT:
Debug( LDAP_DEBUG_SYNC, "%s: %s",
Debug( LDAP_DEBUG_SYNC, "%s: %s\n",
"syncrepl_entry",
"LDAP_SYNC_PRESENT", "\n" );
"LDAP_RES_SEARCH_ENTRY(LDAP_SYNC_PRESENT)", 0 );
break;
case LDAP_SYNC_ADD:
Debug( LDAP_DEBUG_SYNC, "%s: %s",
Debug( LDAP_DEBUG_SYNC, "%s: %s\n",
"syncrepl_entry",
"LDAP_SYNC_ADD", "\n" );
"LDAP_RES_SEARCH_ENTRY(LDAP_SYNC_ADD)", 0 );
break;
case LDAP_SYNC_DELETE:
Debug( LDAP_DEBUG_SYNC, "%s: %s",
Debug( LDAP_DEBUG_SYNC, "%s: %s\n",
"syncrepl_entry",
"LDAP_SYNC_DELETE", "\n" );
"LDAP_RES_SEARCH_ENTRY(LDAP_SYNC_DELETE)", 0 );
break;
case LDAP_SYNC_MODIFY:
Debug( LDAP_DEBUG_SYNC, "%s: %s",
Debug( LDAP_DEBUG_SYNC, "%s: %s\n",
"syncrepl_entry",
"LDAP_SYNC_MODIFY", "\n" );
"LDAP_RES_SEARCH_ENTRY(LDAP_SYNC_MODIFY)", 0 );
break;
default:
Debug( LDAP_DEBUG_ANY, "%s: %s",
Debug( LDAP_DEBUG_ANY, "%s: %s\n",
"syncrepl_entry",
"UNKNONW syncstate", "\n" );
"LDAP_RES_SEARCH_ENTRY(UNKNOWN syncstate)", 0 );
}
if (( syncstate == LDAP_SYNC_PRESENT || syncstate == LDAP_SYNC_ADD )) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment