Commit f6a61ab7 authored by Howard Chu's avatar Howard Chu Committed by Quanah Gibson-Mount
Browse files

ITS#8958 back-mdb: checkpoint online indexer

Save old/new indexmasks to allow processing to resume if slapd is
stopped and restarted. Save last entryID indexed to allow resume
after restart or pool pause.
parent 5ad6ab35
......@@ -33,7 +33,8 @@ LDAP_BEGIN_DECL
#define MDB_DN2ID 1
#define MDB_ID2ENTRY 2
#define MDB_ID2VAL 3
#define MDB_NDB 4
#define MDB_IDXCKP 4
#define MDB_NDB 5
/* The default search IDL stack cache depth */
#define DEFAULT_SEARCH_STACK_DEPTH 16
......@@ -122,6 +123,7 @@ struct mdb_info {
#define mi_dn2id mi_dbis[MDB_DN2ID]
#define mi_ad2id mi_dbis[MDB_AD2ID]
#define mi_id2val mi_dbis[MDB_ID2VAL]
#define mi_idxckp mi_dbis[MDB_IDXCKP]
typedef struct mdb_op_info {
OpExtra moi_oe;
......
......@@ -210,23 +210,52 @@ mdb_online_index( void *ctx, void *arg )
ID id;
Entry *e;
int rc, getnext = 1;
int i;
int i, first = 1;
int intr = 0;
connection_fake_init( &conn, &opbuf, ctx );
op = &opbuf.ob_op;
op->o_bd = be;
id = 1;
key.mv_size = sizeof(ID);
while ( 1 ) {
if ( slapd_shutdown )
break;
rc = mdb_txn_begin( mdb->mi_dbenv, NULL, 0, &txn );
if ( rc )
break;
/* pick up where we left off */
if ( first ) {
MDB_val k0;
unsigned short s = 0;
first = 0;
k0.mv_size = sizeof(s);
k0.mv_data = &s;
rc = mdb_get( txn, mdb->mi_idxckp, &k0, &data );
if ( rc ) {
mdb_txn_abort( txn );
break;
}
memcpy( &id, data.mv_data, sizeof( id ));
}
/* Save our stopping point */
if ( slapd_shutdown || ldap_pvt_thread_pool_pausequery( &connection_pool )) {
MDB_val k0;
unsigned short s = 0;
k0.mv_size = sizeof(s);
k0.mv_data = &s;
data.mv_data = &id;
data.mv_size = sizeof( id );
mdb_put( txn, mdb->mi_idxckp, &k0, &data, 0 );
mdb_txn_commit( txn );
intr = 1;
break;
}
rc = mdb_cursor_open( txn, mdb->mi_id2entry, &curs );
if ( rc ) {
mdb_txn_abort( txn );
......@@ -276,25 +305,137 @@ mdb_online_index( void *ctx, void *arg )
getnext = 1;
}
for ( i = 0; i < mdb->mi_nattrs; i++ ) {
if ( mdb->mi_attrs[ i ]->ai_indexmask & MDB_INDEX_DELETING
|| mdb->mi_attrs[ i ]->ai_newmask == 0 )
{
continue;
/* all done */
if ( !intr ) {
for ( i = 0; i < mdb->mi_nattrs; i++ ) {
if ( mdb->mi_attrs[ i ]->ai_indexmask & MDB_INDEX_DELETING
|| mdb->mi_attrs[ i ]->ai_newmask == 0 )
{
continue;
}
mdb->mi_attrs[ i ]->ai_indexmask = mdb->mi_attrs[ i ]->ai_newmask;
mdb->mi_attrs[ i ]->ai_newmask = 0;
}
/* zero out checkpoint DB */
rc = mdb_txn_begin( mdb->mi_dbenv, NULL, 0, &txn );
if ( !rc ) {
mdb_drop( txn, mdb->mi_idxckp, 0 );
mdb_txn_commit( txn );
}
mdb->mi_attrs[ i ]->ai_indexmask = mdb->mi_attrs[ i ]->ai_newmask;
mdb->mi_attrs[ i ]->ai_newmask = 0;
}
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
mdb->mi_index_task = NULL;
ldap_pvt_runqueue_remove( &slapd_rq, rtask );
if ( ldap_pvt_runqueue_isrunning( &slapd_rq, rtask ))
ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
if ( intr && !slapd_shutdown ) {
/* on pause, resched to run again immediately */
time_t t = rtask->interval.tv_sec;
rtask->interval.tv_sec = 0;
ldap_pvt_runqueue_resched( &slapd_rq, rtask, 0 );
rtask->interval.tv_sec = t;
} else if ( mdb->mi_index_task ) {
mdb->mi_index_task = NULL;
ldap_pvt_runqueue_remove( &slapd_rq, rtask );
}
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
return NULL;
}
static int
mdb_setup_indexer( struct mdb_info *mdb )
{
MDB_txn *txn;
MDB_cursor *curs;
MDB_val key, data;
int i, rc;
unsigned short s;
rc = mdb_txn_begin( mdb->mi_dbenv, NULL, 0, &txn );
if ( rc )
return rc;
rc = mdb_cursor_open( txn, mdb->mi_idxckp, &curs );
if ( rc ) {
mdb_txn_abort( txn );
return rc;
}
key.mv_size = sizeof( s );
key.mv_data = &s;
/* set indexer task to start at first entry */
{
ID id = 0;
s = 0; /* key 0 records next entryID to index */
data.mv_size = sizeof( ID );
data.mv_data = &id;
rc = mdb_cursor_put( curs, &key, &data, 0 );
if ( rc )
goto done;
}
/* record current and new index masks for all new index definitions */
{
slap_mask_t mask[2];
data.mv_size = sizeof(mask);
data.mv_data = mask;
for ( i = 0; i < mdb->mi_nattrs; i++ ) {
if ( !mdb->mi_attrs[i]->ai_newmask ) continue;
s = mdb->mi_adxs[ mdb->mi_attrs[i]->ai_desc->ad_index ];
mask[0] = mdb->mi_attrs[i]->ai_indexmask;
mask[1] = mdb->mi_attrs[i]->ai_newmask;
rc = mdb_cursor_put( curs, &key, &data, 0 );
if ( rc )
goto done;
}
}
done:
mdb_cursor_close( curs );
if ( !rc )
mdb_txn_commit( txn );
else
mdb_txn_abort( txn );
return rc;
}
void
mdb_resume_index( BackendDB *be, MDB_txn *txn )
{
struct mdb_info *mdb = be->be_private;
MDB_cursor *curs;
MDB_val key, data;
int i, rc;
unsigned short *s;
slap_mask_t *mask;
AttributeDescription *ad;
rc = mdb_cursor_open( txn, mdb->mi_idxckp, &curs );
if ( rc )
return;
while(( rc = mdb_cursor_get( curs, &key, &data, MDB_NEXT )) == 0) {
s = key.mv_data;
if ( !*s )
continue;
ad = mdb->mi_ads[*s];
for ( i=0; i<mdb->mi_nattrs; i++) {
if (mdb->mi_attrs[i]->ai_desc == ad ) {
mask = data.mv_data;
mdb->mi_attrs[i]->ai_indexmask = mask[0];
mdb->mi_attrs[i]->ai_newmask = mask[1];
break;
}
}
}
mdb_cursor_close( curs );
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
mdb->mi_index_task = ldap_pvt_runqueue_insert( &slapd_rq, 36000,
mdb_online_index, be,
LDAP_XSTRING(mdb_online_index), be->be_suffix[0].bv_val );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
}
/* Cleanup loose ends after Modify completes */
static int
mdb_cf_cleanup( ConfigArgs *c )
......@@ -328,6 +469,7 @@ mdb_cf_cleanup( ConfigArgs *c )
rc = mdb_attr_dbs_open( c->be, NULL, &c->reply );
if ( rc )
rc = LDAP_OTHER;
mdb_setup_indexer( mdb );
}
return rc;
}
......
......@@ -32,6 +32,7 @@ static const struct berval mdmi_databases[] = {
BER_BVC("dn2i"),
BER_BVC("id2e"),
BER_BVC("id2v"),
BER_BVC("ixck"),
BER_BVNULL
};
......@@ -286,6 +287,13 @@ mdb_db_open( BackendDB *be, ConfigReply *cr )
}
}
if ( slapMode & SLAP_SERVER_MODE ) {
MDB_stat st;
rc = mdb_stat( txn, mdb->mi_idxckp, &st );
if ( st.ms_entries )
mdb_resume_index( be, txn );
}
rc = mdb_txn_commit(txn);
if ( rc != 0 ) {
Debug( LDAP_DEBUG_ANY,
......@@ -321,11 +329,21 @@ mdb_db_close( BackendDB *be, ConfigReply *cr )
mdb->mi_flags &= ~MDB_IS_OPEN;
if( mdb->mi_dbenv ) {
mdb_reader_flush( mdb->mi_dbenv );
/* remove indexer task */
if ( mdb->mi_index_task ) {
struct re_s *re = mdb->mi_index_task;
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
mdb->mi_index_task = NULL;
/* can never actually be running at this point, but paranoia */
if ( ldap_pvt_runqueue_isrunning( &slapd_rq, re ) )
ldap_pvt_runqueue_stoptask( &slapd_rq, re );
ldap_pvt_runqueue_remove( &slapd_rq, re );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
}
if ( mdb->mi_dbenv ) {
mdb_reader_flush( mdb->mi_dbenv );
if ( mdb->mi_dbis[0] ) {
int i;
......
......@@ -64,6 +64,7 @@ void mdb_ad_unwind( struct mdb_info *mdb, int prev_ads );
*/
int mdb_back_init_cf( BackendInfo *bi );
void mdb_resume_index( BackendDB *be, MDB_txn *txn );
/*
* dn2entry.c
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment