Skip to content
Snippets Groups Projects
Commit 4e530bf5 authored by Howard Chu's avatar Howard Chu
Browse files

First pass at persist, not working

parent 02e171e8
No related branches found
No related tags found
No related merge requests found
......@@ -33,6 +33,7 @@ typedef struct syncops {
struct berval s_base; /* ndn of search base */
ID s_eid; /* entryID of search base */
Operation *s_op; /* search op */
Filter *s_filter;
int s_flags; /* search status */
} syncops;
......@@ -55,15 +56,19 @@ typedef struct syncprov_info_t {
typedef struct opcookie {
slap_overinst *son;
syncmatches *smatches;
struct berval suuid;
struct berval sdn; /* DN of entry, for deletes */
struct berval sndn;
struct berval suuid; /* UUID of entry */
struct berval sctxcsn;
int sreference; /* Is the entry a reference? */
} opcookie;
typedef struct findcookie {
typedef struct fbase_cookie {
struct berval *fdn;
syncops *fss;
int fbase;
int fsuffix;
} findcookie;
} fbase_cookie;
static AttributeName csn_anlist[2];
static AttributeName uuid_anlist[2];
......@@ -85,8 +90,16 @@ findbase_cb( Operation *op, SlapReply *rs )
slap_callback *sc = op->o_callback;
if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
findcookie *fc = sc->sc_private;
if ( rs->sr_entry->e_id == fc->fss->s_eid &&
fbase_cookie *fc = sc->sc_private;
/* If no entryID, we're looking for the first time.
* Just store whatever we got.
*/
if ( fc->fss->s_eid == NOID ) {
fc->fbase = 1;
fc->fss->s_eid = rs->sr_entry->e_id;
ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname );
} else if ( rs->sr_entry->e_id == fc->fss->s_eid &&
dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) {
fc->fbase = 1;
fc->fsuffix = dnIsSuffix( fc->fdn, &rs->sr_entry->e_nname );
......@@ -96,9 +109,10 @@ findbase_cb( Operation *op, SlapReply *rs )
}
static int
syncprov_findbase( Operation *op, syncops *ss, findcookie *fc )
syncprov_findbase( Operation *op, syncops *ss, fbase_cookie *fc )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
opcookie *opc = op->o_callback->sc_private;
slap_overinst *on = opc->son;
syncprov_info_t *si = on->on_bi.bi_private;
slap_callback cb = {0};
......@@ -111,6 +125,7 @@ syncprov_findbase( Operation *op, syncops *ss, findcookie *fc )
cb.sc_response = findbase_cb;
cb.sc_private = fc;
fop.o_sync_mode = 0;
fop.o_callback = &cb;
fop.o_tag = LDAP_REQ_SEARCH;
fop.ors_scope = LDAP_SCOPE_BASE;
......@@ -124,7 +139,9 @@ syncprov_findbase( Operation *op, syncops *ss, findcookie *fc )
fop.o_req_ndn = ss->s_op->o_req_ndn;
fop.o_bd->bd_info = on->on_info->oi_orig;
rc = fop.o_bd->be_search( &fop, &frs );
fop.o_bd->bd_info = (BackendInfo *)on;
if ( fc->fbase ) return LDAP_SUCCESS;
......@@ -291,7 +308,7 @@ syncprov_findcsn( Operation *op, int mode )
fop.o_bd->bd_info = on->on_info->oi_orig;
rc = fop.o_bd->be_search( &fop, &frs );
fop.o_bd->bd_info = on;
fop.o_bd->bd_info = (BackendInfo *)on;
if ( mode == FIND_CSN ) {
if ( !si->si_gotcsn ) {
......@@ -311,23 +328,93 @@ syncprov_findcsn( Operation *op, int mode )
return LDAP_NO_SUCH_OBJECT;
}
static int
syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry *e, int mode )
{
slap_overinst *on = opc->son;
syncprov_info_t *si = on->on_bi.bi_private;
SlapReply rs = { REP_SEARCH };
LDAPControl *ctrls[2];
struct berval cookie;
Entry e_uuid = {0};
Attribute a_uuid = {0};
Operation sop = *so->s_op;
sop.o_tmpmemctx = op->o_tmpmemctx;
ctrls[1] = NULL;
slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn,
sop.o_sync_state.sid, sop.o_sync_state.rid );
e_uuid.e_attrs = &a_uuid;
a_uuid.a_desc = slap_schema.si_ad_entryUUID;
a_uuid.a_nvals = &opc->suuid;
rs.sr_err = slap_build_sync_state_ctrl( &sop, &rs, &e_uuid,
mode, ctrls, 0, 1, &cookie );
rs.sr_entry = e;
rs.sr_ctrls = ctrls;
switch( mode ) {
case LDAP_SYNC_ADD:
if ( opc->sreference ) {
rs.sr_ref = get_entry_referrals( &sop, e );
send_search_reference( &sop, &rs );
ber_bvarray_free( rs.sr_ref );
break;
}
/* fallthru */
case LDAP_SYNC_MODIFY:
rs.sr_attrs = sop.ors_attrs;
send_search_entry( &sop, &rs );
break;
case LDAP_SYNC_DELETE:
e_uuid.e_attrs = NULL;
e_uuid.e_name = opc->sdn;
e_uuid.e_nname = opc->sndn;
rs.sr_entry = &e_uuid;
if ( opc->sreference ) {
send_search_entry( &sop, &rs );
} else {
struct berval bv;
bv.bv_val = NULL;
bv.bv_len = 0;
rs.sr_ref = &bv;
send_search_reference( &sop, &rs );
}
break;
default:
assert(0);
}
free( rs.sr_ctrls[0] );
return rs.sr_err;
}
static void
syncprov_matchops( Operation *op, opcookie *opc, int saveit )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
slap_overinst *on = opc->son;
syncprov_info_t *si = on->on_bi.bi_private;
findcookie fc;
fbase_cookie fc;
syncops *ss;
Entry *e;
Attribute *a;
int rc;
fc.fdn = &op->o_req_ndn;
rc = be_entry_get_rw( op, fc.fdn, NULL, NULL, 0, &e );
if ( rc ) return;
if ( op->o_tag != LDAP_REQ_ADD ) {
rc = be_entry_get_rw( op, fc.fdn, NULL, NULL, 0, &e );
if ( rc ) return;
} else {
e = op->ora_e;
}
if ( saveit ) {
ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
opc->sreference = is_entry_referral( e );
}
if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
if ( a )
ber_dupbv_x( &opc->suuid, &a->a_vals[0], op->o_tmpmemctx );
......@@ -361,7 +448,7 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
}
/* check if current o_req_dn is in scope and matches filter */
if ( fc.fsuffix && test_filter( op, e, ss->s_op->ors_filter ) ==
if ( fc.fsuffix && test_filter( op, e, ss->s_filter ) ==
LDAP_COMPARE_TRUE ) {
if ( saveit ) {
sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
......@@ -370,23 +457,27 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
opc->smatches = sm;
} else {
/* if found send UPDATE else send ADD */
if ( found ) {
} else {
}
syncprov_sendresp( op, opc, ss, e,
found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
}
} else if ( !saveit && found ) {
/* send DELETE */
syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE );
}
}
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
be_entry_release_r( op, e );
if ( op->o_tag != LDAP_REQ_ADD ) {
op->o_bd->bd_info = (BackendInfo *)on->on_info;
be_entry_release_r( op, e );
op->o_bd->bd_info = (BackendInfo *)on;
}
}
static int
syncprov_op_cleanup( Operation *op, SlapReply *rs )
{
slap_callback *cb = op->o_callback;
opcookie *opc = (opcookie *)(cb+1);
opcookie *opc = cb->sc_private;
syncmatches *sm, *snext;
for (sm = opc->smatches; sm; sm=snext) {
......@@ -400,8 +491,7 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs )
static int
syncprov_op_response( Operation *op, SlapReply *rs )
{
slap_callback *cb = op->o_callback;
opcookie *opc = (opcookie *)(cb+1);
opcookie *opc = op->o_callback->sc_private;
slap_overinst *on = opc->son;
syncprov_info_t *si = on->on_bi.bi_private;
syncmatches *sm;
......@@ -409,19 +499,25 @@ syncprov_op_response( Operation *op, SlapReply *rs )
if ( rs->sr_err == LDAP_SUCCESS )
{
struct berval maxcsn;
char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
void *memctx = op->o_tmpmemctx;
cbuf[0] = '\0';
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
op->o_tmpmemctx = NULL;
slap_get_commit_csn( op, &maxcsn );
op->o_tmpmemctx = memctx;
if ( maxcsn.bv_val ) {
strcpy( cbuf, maxcsn.bv_val );
free( si->si_ctxcsn.bv_val );
si->si_ctxcsn = maxcsn;
si->si_gotcsn = 1;
}
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
opc->sctxcsn.bv_len = maxcsn.bv_len;
opc->sctxcsn.bv_val = cbuf;
if ( si->si_ops ) {
switch(op->o_tag) {
case LDAP_REQ_ADD:
......@@ -435,6 +531,8 @@ syncprov_op_response( Operation *op, SlapReply *rs )
* send DELETE msg
*/
for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
syncprov_sendresp( op, opc, sm->sm_op, NULL,
LDAP_SYNC_DELETE );
}
break;
}
......@@ -536,24 +634,33 @@ syncprov_op_extended( Operation *op, SlapReply *rs )
return SLAP_CB_CONTINUE;
}
typedef struct searchstate {
slap_overinst *ss_on;
syncops *ss_so;
int ss_done;
} searchstate;
static int
syncprov_search_cleanup( Operation *op, SlapReply *rs )
{
searchstate *ss = op->o_callback->sc_private;
if ( rs->sr_ctrls ) {
free( rs->sr_ctrls[0] );
op->o_tmpfree( rs->sr_ctrls, op->o_tmpmemctx );
}
if ( ss->ss_done )
op->o_sync_mode = SLAP_SYNC_REFRESH_AND_PERSIST;
return 0;
}
static int
syncprov_search_response( Operation *op, SlapReply *rs )
{
slap_callback *cb = op->o_callback;
slap_overinst *on = cb->sc_private;
searchstate *ss = op->o_callback->sc_private;
slap_overinst *on = ss->ss_on;
syncprov_info_t *si = on->on_bi.bi_private;
if ( rs->sr_type == REP_SEARCH ) {
if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) {
int i;
if ( op->o_sync_state.ctxcsn ) {
Attribute *a = attr_find( rs->sr_entry->e_attrs,
......@@ -567,16 +674,38 @@ syncprov_search_response( Operation *op, SlapReply *rs )
rs->sr_ctrls[1] = NULL;
rs->sr_err = slap_build_sync_state_ctrl( op, rs, rs->sr_entry,
LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL );
} else if (rs->sr_type == REP_RESULT ) {
} else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) {
struct berval cookie;
rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
op->o_tmpmemctx );
rs->sr_ctrls[1] = NULL;
slap_compose_sync_cookie( op, &cookie,
&op->ors_filter->f_and->f_ava->aa_value,
op->o_sync_state.sid, op->o_sync_state.rid );
rs->sr_err = slap_build_sync_done_ctrl( op, rs, rs->sr_ctrls,
0, 1, &cookie, LDAP_SYNC_REFRESH_PRESENTS );
/* Is this a regular refresh? */
if ( !ss->ss_so ) {
rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
op->o_tmpmemctx );
rs->sr_ctrls[1] = NULL;
rs->sr_err = slap_build_sync_done_ctrl( op, rs, rs->sr_ctrls,
0, 1, &cookie, LDAP_SYNC_REFRESH_PRESENTS );
} else {
/* It's RefreshAndPersist, transition to Persist phase */
rs->sr_rspoid = LDAP_SYNC_INFO;
slap_send_syncinfo( op, rs, rs->sr_nentries ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
/* Flush any queued persist messages */
;
/* Turn off the refreshing flag */
ss->ss_so->s_flags ^= PS_IS_REFRESHING;
/* Detach this Op from frontend control */
ss->ss_done = 1;
;
return LDAP_SUCCESS;
}
}
return SLAP_CB_CONTINUE;
......@@ -588,8 +717,10 @@ syncprov_op_search( Operation *op, SlapReply *rs )
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
slap_callback *cb;
int gotstate = 0;
int gotstate = 0, nochange = 0;
Filter *fand, *fava;
syncops *sop = NULL;
searchstate *ss;
if ( !op->o_sync_mode ) return SLAP_CB_CONTINUE;
......@@ -598,6 +729,38 @@ syncprov_op_search( Operation *op, SlapReply *rs )
return rs->sr_err;
}
/* If this is a persistent search, set it up right away */
if ( op->o_sync_mode == SLAP_SYNC_REFRESH_AND_PERSIST ) {
syncops so;
fbase_cookie fc;
opcookie opc;
slap_callback sc;
fc.fss = &so;
fc.fbase = 0;
so.s_eid = NOID;
so.s_op = op;
so.s_flags = PS_IS_REFRESHING;
/* syncprov_findbase expects to be called as a callback... */
sc.sc_private = &opc;
opc.son = on;
cb = op->o_callback;
op->o_callback = ≻
rs->sr_err = syncprov_findbase( op, &so, &fc );
op->o_callback = cb;
if ( rs->sr_err != LDAP_SUCCESS ) {
send_ldap_result( op, rs );
return rs->sr_err;
}
sop = ch_malloc( sizeof( syncops ));
*sop = so;
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
sop->s_next = si->si_ops;
si->si_ops = sop;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
}
/* If we have a cookie, handle the PRESENT lookups
*/
if ( op->o_sync_state.ctxcsn ) {
......@@ -616,20 +779,24 @@ syncprov_op_search( Operation *op, SlapReply *rs )
}
#endif
} else {
/* Does it match the current ctxCSN? */
if ( bvmatch( op->o_sync_state.ctxcsn, &si->si_ctxcsn )) {
LDAPControl *ctrls[2];
ctrls[0] = NULL;
ctrls[1] = NULL;
slap_build_sync_done_ctrl( op, rs, ctrls, 0, 0,
NULL, LDAP_SYNC_REFRESH_DELETES );
rs->sr_err = LDAP_SUCCESS;
send_ldap_result( op, rs );
return rs->sr_err;
}
gotstate = 1;
/* OK, let's send all the Present UUIDs */
/* If just Refreshing and nothing has changed, shortcut it */
if ( bvmatch( op->o_sync_state.ctxcsn, &si->si_ctxcsn )) {
nochange = 1;
if ( op->o_sync_mode == SLAP_SYNC_REFRESH ) {
LDAPControl *ctrls[2];
ctrls[0] = NULL;
ctrls[1] = NULL;
slap_build_sync_done_ctrl( op, rs, ctrls, 0, 0,
NULL, LDAP_SYNC_REFRESH_DELETES );
rs->sr_err = LDAP_SUCCESS;
send_ldap_result( op, rs );
return rs->sr_err;
}
goto shortcut;
} else
/* If context has changed, check for Present UUIDs */
if ( syncprov_findcsn( op, FIND_PRESENT ) != LDAP_SUCCESS ) {
send_ldap_result( op, rs );
return rs->sr_err;
......@@ -647,7 +814,9 @@ syncprov_op_search( Operation *op, SlapReply *rs )
}
/* Append CSN range to search filter */
op->o_tmpfree( op->ors_filterstr.bv_val, op->o_tmpmemctx );
if ( sop ) {
sop->s_filter = op->ors_filter;
}
fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
fand->f_choice = LDAP_FILTER_AND;
......@@ -670,14 +839,31 @@ syncprov_op_search( Operation *op, SlapReply *rs )
op->ors_filter = fand;
filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
shortcut:
/* Let our callback add needed info to returned entries */
cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(opcookie), op->o_tmpmemctx);
cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx);
ss = (searchstate *)(cb+1);
ss->ss_on = on;
ss->ss_so = sop;
ss->ss_done = 0;
cb->sc_response = syncprov_search_response;
cb->sc_cleanup = syncprov_search_cleanup;
cb->sc_private = on;
cb->sc_private = ss;
cb->sc_next = op->o_callback;
op->o_callback = cb;
op->o_sync_mode = 0; /* Don't let back-bdb see this */
/* If this is a persistent search and no changes were reported during
* the refresh phase, just invoke the response callback to transition
* us into persist phase
*/
if ( nochange ) {
rs->sr_err = LDAP_SUCCESS;
rs->sr_nentries = 0;
send_ldap_result( op, rs );
return rs->sr_err;
}
return SLAP_CB_CONTINUE;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment