Skip to content
Snippets Groups Projects
Commit 865f0db1 authored by Howard Chu's avatar Howard Chu
Browse files

Add checkpointing to save contextCSN periodically. Read contextCSN

on startup, save on shutdown.
parent 30333b98
Branches
Tags
No related merge requests found
......@@ -94,7 +94,10 @@ typedef struct syncmatches {
typedef struct syncprov_info_t {
syncops *si_ops;
struct berval si_ctxcsn; /* ldapsync context */
int si_gotcsn; /* is our ctxcsn up to date? */
int si_chkops; /* checkpointing info */
int si_chktime;
int si_numops; /* number of ops since last checkpoint */
time_t si_chklast; /* time of last checkpoint */
Avlnode *si_mods; /* entries being modified */
ldap_pvt_thread_mutex_t si_csn_mutex;
ldap_pvt_thread_mutex_t si_ops_mutex;
......@@ -471,61 +474,26 @@ syncprov_findbase( Operation *op, fbase_cookie *fc )
}
/* syncprov_findcsn:
* This function has three different purposes, but they all use a search
* This function has two different purposes, but they both use a search
* that filters on entryCSN so they're combined here.
* 1: when the current contextCSN is unknown (i.e., at server start time)
* and a syncrepl search has arrived with a cookie, we search for all entries
* with CSN >= the cookie CSN, and store the maximum as our contextCSN. Also,
* we expect to find the cookie CSN in the search results, and note if we did
* or not. If not, we assume the cookie is stale. (This may be too restrictive,
* notice case 2.)
*
* 2: when the current contextCSN is known and we have a sync cookie, we search
* 1: when the current contextCSN is known and we have a sync cookie, we search
* for one entry with CSN <= the cookie CSN. (Used to search for =.) If an
* entry is found, the cookie CSN is valid, otherwise it is stale. Case 1 is
* considered a special case of case 2, and both are generally called the
* "find CSN" task.
* entry is found, the cookie CSN is valid, otherwise it is stale.
*
* 3: during a refresh phase, we search for all entries with CSN <= the cookie
* 2: during a refresh phase, we search for all entries with CSN <= the cookie
* CSN, and generate Present records for them. We always collect this result
* in SyncID sets, even if there's only one match.
*/
#define FIND_CSN 1
#define FIND_PRESENT 2
typedef struct fcsn_cookie {
struct berval maxcsn;
int gotmatch;
} fcsn_cookie;
static int
findcsn_cb( Operation *op, SlapReply *rs )
{
slap_callback *sc = op->o_callback;
if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
/* If the private pointer is set, it points to an fcsn_cookie
* and we want to record the maxcsn and match state.
*/
if ( sc->sc_private ) {
int i;
fcsn_cookie *fc = sc->sc_private;
sync_control *srs = op->o_controls[sync_cid];
Attribute *a = attr_find(rs->sr_entry->e_attrs,
slap_schema.si_ad_entryCSN );
i = ber_bvcmp( &a->a_vals[0], srs->sr_state.ctxcsn );
if ( i == 0 ) fc->gotmatch = 1;
i = ber_bvcmp( &a->a_vals[0], &fc->maxcsn );
if ( i > 0 ) {
fc->maxcsn.bv_len = a->a_vals[0].bv_len;
strcpy(fc->maxcsn.bv_val, a->a_vals[0].bv_val );
}
} else {
/* Otherwise, if the private pointer is not set, we just
* want to know if any entry matched the filter.
*/
sc->sc_private = (void *)1;
}
sc->sc_private = (void *)1;
}
return LDAP_SUCCESS;
}
......@@ -588,7 +556,6 @@ syncprov_findcsn( Operation *op, int mode )
Filter cf;
AttributeAssertion eq;
int rc;
fcsn_cookie fcookie;
fpres_cookie pcookie;
int locked = 0;
sync_control *srs = op->o_controls[sync_cid];
......@@ -602,37 +569,13 @@ syncprov_findcsn( Operation *op, int mode )
fbuf.bv_val = buf;
if ( mode == FIND_CSN ) {
if ( !si->si_gotcsn ) {
/* If we don't know the current ctxcsn, find it */
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
locked = 1;
}
if ( !si->si_gotcsn ) {
cf.f_choice = LDAP_FILTER_GE;
fop.ors_attrsonly = 0;
fop.ors_attrs = csn_anlist;
fop.ors_slimit = SLAP_NO_LIMIT;
cb.sc_private = &fcookie;
fcookie.maxcsn.bv_val = cbuf;
fcookie.maxcsn.bv_len = 0;
fcookie.gotmatch = 0;
fbuf.bv_len = sprintf( buf, "(entryCSN>=%s)", srs->sr_state.ctxcsn->bv_val );
} else {
if ( locked ) {
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
locked = 0;
}
cf.f_choice = LDAP_FILTER_LE;
fop.ors_attrsonly = 1;
fop.ors_attrs = slap_anlist_no_attrs;
fop.ors_slimit = 1;
cb.sc_private = NULL;
fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val );
}
fop.ors_attrsonly = 1;
fop.ors_attrs = slap_anlist_no_attrs;
fop.ors_slimit = 1;
cb.sc_private = NULL;
cb.sc_response = findcsn_cb;
} else if ( mode == FIND_PRESENT ) {
cf.f_choice = LDAP_FILTER_LE;
fop.ors_attrsonly = 0;
fop.ors_attrs = uuid_anlist;
fop.ors_slimit = SLAP_NO_LIMIT;
......@@ -642,12 +585,14 @@ syncprov_findcsn( Operation *op, int mode )
cb.sc_response = findpres_cb;
pcookie.num = 0;
pcookie.uuids = NULL;
fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val );
}
cf.f_choice = LDAP_FILTER_LE;
cf.f_ava = &eq;
cf.f_av_desc = slap_schema.si_ad_entryCSN;
cf.f_av_value = *srs->sr_state.ctxcsn;
cf.f_next = NULL;
fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)",
srs->sr_state.ctxcsn->bv_val );
fop.o_callback = &cb;
fop.ors_tlimit = SLAP_NO_LIMIT;
......@@ -659,16 +604,7 @@ syncprov_findcsn( Operation *op, int mode )
fop.o_bd->bd_info = (BackendInfo *)on;
if ( mode == FIND_CSN ) {
if ( !si->si_gotcsn ) {
strcpy(si->si_ctxcsnbuf, fcookie.maxcsn.bv_val);
si->si_ctxcsn.bv_len = fcookie.maxcsn.bv_len;
si->si_gotcsn = 1;
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
if ( fcookie.gotmatch ) return LDAP_SUCCESS;
} else {
if ( cb.sc_private ) return LDAP_SUCCESS;
}
if ( cb.sc_private ) return LDAP_SUCCESS;
} else if ( mode == FIND_PRESENT ) {
return LDAP_SUCCESS;
}
......@@ -994,6 +930,36 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs )
op->o_tmpfree(cb, op->o_tmpmemctx);
}
static void
syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
{
syncprov_info_t *si = on->on_bi.bi_private;
Modifications mod;
Operation opm;
struct berval bv[2];
BackendInfo *orig;
slap_callback cb = {0};
mod.sml_values = bv;
bv[1].bv_val = NULL;
bv[0] = si->si_ctxcsn;
mod.sml_nvalues = NULL;
mod.sml_desc = slap_schema.si_ad_contextCSN;
mod.sml_op = LDAP_MOD_REPLACE;
mod.sml_next = NULL;
cb.sc_response = slap_null_cb;
opm = *op;
opm.o_tag = LDAP_REQ_MODIFY;
opm.o_callback = &cb;
opm.orm_modlist = &mod;
opm.o_req_dn = op->o_bd->be_suffix[0];
opm.o_req_ndn = op->o_bd->be_nsuffix[0];
orig = opm.o_bd->bd_info;
opm.o_bd->bd_info = on->on_info->oi_orig;
opm.o_bd->be_modify( &opm, rs );
}
static int
syncprov_op_response( Operation *op, SlapReply *rs )
{
......@@ -1017,7 +983,23 @@ syncprov_op_response( Operation *op, SlapReply *rs )
strcpy( si->si_ctxcsnbuf, cbuf );
si->si_ctxcsn.bv_len = maxcsn.bv_len;
}
si->si_gotcsn = 1;
}
si->si_numops++;
if ( si->si_chkops || si->si_chktime ) {
int do_check=0;
if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
do_check = 1;
si->si_numops = 0;
}
if ( si->si_chktime &&
(op->o_time - si->si_chklast >= si->si_chktime )) {
do_check = 1;
si->si_chklast = op->o_time;
}
if ( do_check ) {
syncprov_checkpoint( op, rs, on );
}
}
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
......@@ -1467,18 +1449,6 @@ syncprov_op_search( Operation *op, SlapReply *rs )
}
}
/* If we didn't get a cookie and we don't know our contextcsn, try to
* find it anyway.
*/
if ( !gotstate && !si->si_gotcsn ) {
struct berval bv = BER_BVC("1"), *old;
old = srs->sr_state.ctxcsn;
srs->sr_state.ctxcsn = &bv;
syncprov_findcsn( op, FIND_CSN );
srs->sr_state.ctxcsn = old;
}
/* Append CSN range to search filter, save original filter
* for persistent search evaluation
*/
......@@ -1547,49 +1517,36 @@ syncprov_operational(
if ( rs->sr_entry &&
dn_match( &rs->sr_entry->e_nname, op->o_bd->be_nsuffix )) {
Attribute **ap;
for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next ) ;
if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {
Attribute *a, **ap = NULL;
Attribute *a = ch_malloc( sizeof(Attribute));
a->a_desc = slap_schema.si_ad_contextCSN;
a->a_vals = ch_malloc( 2 * sizeof(struct berval));
#if 0 /* causes a deadlock */
if ( !si->si_gotcsn ) {
sync_control sc, *old;
void *ctrls[SLAP_MAX_CIDS];
struct berval bv = BER_BVC("1");
if ( !op->o_controls ) {
memset(ctrls, 0, sizeof(ctrls));
op->o_controls = ctrls;
} else {
old = op->o_controls[sync_cid];
}
op->o_controls[sync_cid] = &sc;
sc.sr_state.ctxcsn = &bv;
syncprov_findcsn( op, FIND_CSN );
if ( op->o_controls == ctrls ) {
op->o_controls = NULL;
} else {
op->o_controls[sync_cid] = old;
}
for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) {
if ( a->a_desc == slap_schema.si_ad_contextCSN )
break;
}
#endif
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
a->a_vals[1].bv_val = NULL;
a->a_nvals = a->a_vals;
a->a_next = NULL;
a->a_flags = 0;
if ( !a ) {
for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next );
a = ch_malloc( sizeof(Attribute));
a->a_desc = slap_schema.si_ad_contextCSN;
a->a_vals = ch_malloc( 2 * sizeof(struct berval));
a->a_vals[1].bv_val = NULL;
a->a_nvals = a->a_vals;
a->a_next = NULL;
a->a_flags = 0;
*ap = a;
}
*ap = a;
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
if ( !ap ) {
strcpy( a->a_vals[0].bv_val, si->si_ctxcsnbuf );
} else {
ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
}
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
}
}
return LDAP_SUCCESS;
......@@ -1625,6 +1582,95 @@ syncprov_db_config(
return SLAP_CONF_UNKNOWN;
}
/* Cheating - we have no thread pool context for these functions,
* so make one.
*/
typedef struct thread_keys {
void *key;
void *data;
ldap_pvt_thread_pool_keyfree_t *free;
} thread_keys;
/* A fake thread context */
static thread_keys thrctx[8];
/* Read any existing contextCSN from the underlying db.
* Then search for any entries newer than that. If no value exists,
* just generate it. Cache whatever result.
*/
static int
syncprov_db_open(
BackendDB *be
)
{
slap_overinst *on = (slap_overinst *) be->bd_info;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
char opbuf[OPERATION_BUFFER_SIZE];
Operation *op = (Operation *)opbuf;
Entry *e;
Attribute *a;
int rc;
memset(opbuf, 0, sizeof(opbuf));
op->o_hdr = (Opheader *)(op+1);
op->o_bd = be;
op->o_dn = be->be_rootdn;
op->o_ndn = be->be_rootndn;
op->o_threadctx = thrctx;
op->o_tmpmfuncs = &ch_mfuncs;
op->o_bd->bd_info = on->on_info->oi_orig;
rc = be_entry_get_rw( op, be->be_nsuffix, NULL,
slap_schema.si_ad_contextCSN, 0, &e );
if ( e ) {
a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN );
if ( a ) {
si->si_ctxcsn.bv_len = a->a_nvals[0].bv_len;
if ( si->si_ctxcsn.bv_len >= sizeof(si->si_ctxcsnbuf ))
si->si_ctxcsn.bv_len = sizeof(si->si_ctxcsnbuf)-1;
strncpy( si->si_ctxcsnbuf, a->a_nvals[0].bv_val,
si->si_ctxcsn.bv_len );
si->si_ctxcsnbuf[si->si_ctxcsn.bv_len] = '\0';
}
be_entry_release_r( op, e );
}
op->o_bd->bd_info = (BackendInfo *)on;
return 0;
}
/* Write the current contextCSN into the underlying db.
*/
static int
syncprov_db_close(
BackendDB *be
)
{
slap_overinst *on = (slap_overinst *) be->bd_info;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
int i;
if ( si->si_numops ) {
Connection conn;
char opbuf[OPERATION_BUFFER_SIZE];
Operation *op = (Operation *)opbuf;
SlapReply rs = {REP_RESULT};
connection_fake_init( &conn, op, thrctx );
op->o_bd = be;
op->o_dn = be->be_rootdn;
op->o_ndn = be->be_rootndn;
syncprov_checkpoint( op, &rs, on );
}
for ( i=0; thrctx[i].key; i++) {
if ( thrctx[i].free )
thrctx[i].free( thrctx[i].key, thrctx[i].data );
}
return 0;
}
static int
syncprov_db_init(
BackendDB *be
......@@ -1637,6 +1683,7 @@ syncprov_db_init(
on->on_bi.bi_private = si;
ldap_pvt_thread_mutex_init( &si->si_csn_mutex );
ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
si->si_ctxcsn.bv_val = si->si_ctxcsnbuf;
csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
......@@ -1657,6 +1704,7 @@ syncprov_db_destroy(
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
if ( si ) {
ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex );
ch_free( si );
......@@ -1790,6 +1838,8 @@ syncprov_init()
syncprov.on_bi.bi_db_init = syncprov_db_init;
syncprov.on_bi.bi_db_config = syncprov_db_config;
syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
syncprov.on_bi.bi_db_open = syncprov_db_open;
syncprov.on_bi.bi_db_close = syncprov_db_close;
syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;
syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment