Commit 5d61e28b authored by Howard Chu's avatar Howard Chu Committed by Quanah Gibson-Mount
Browse files

ITS#7473 Setup for subtree entry counts

parent abbdfd67
......@@ -153,7 +153,7 @@ txnReturn:
}
/* get entry or parent */
rs->sr_err = mdb_dn2entry( op, txn, mcd, &op->ora_e->e_nname, &p, 1 );
rs->sr_err = mdb_dn2entry( op, txn, mcd, &op->ora_e->e_nname, &p, NULL, 1 );
switch( rs->sr_err ) {
case 0:
rs->sr_err = LDAP_ALREADY_EXISTS;
......@@ -338,7 +338,7 @@ txnReturn:
op->ora_e->e_id = eid;
/* dn2id index */
rs->sr_err = mdb_dn2id_add( op, mcd, mcd, pid, op->ora_e );
rs->sr_err = mdb_dn2id_add( op, mcd, mcd, pid, 1, op->ora_e );
mdb_cursor_close( mcd );
if ( rs->sr_err != 0 ) {
Debug( LDAP_DEBUG_TRACE,
......
......@@ -67,7 +67,7 @@ mdb_bind( Operation *op, SlapReply *rs )
rtxn = moi->moi_txn;
/* get entry with reader lock */
rs->sr_err = mdb_dn2entry( op, rtxn, NULL, &op->o_req_ndn, &e, 0 );
rs->sr_err = mdb_dn2entry( op, rtxn, NULL, &op->o_req_ndn, &e, NULL, 0 );
switch(rs->sr_err) {
case MDB_NOTFOUND:
......
......@@ -43,7 +43,7 @@ mdb_compare( Operation *op, SlapReply *rs )
rtxn = moi->moi_txn;
/* get entry */
rs->sr_err = mdb_dn2entry( op, rtxn, NULL, &op->o_req_ndn, &e, 1 );
rs->sr_err = mdb_dn2entry( op, rtxn, NULL, &op->o_req_ndn, &e, NULL, 1 );
switch( rs->sr_err ) {
case MDB_NOTFOUND:
case 0:
......
......@@ -124,7 +124,7 @@ txnReturn:
goto return_results;
}
/* get parent */
rs->sr_err = mdb_dn2entry( op, txn, mc, &pdn, &p, 1 );
rs->sr_err = mdb_dn2entry( op, txn, mc, &pdn, &p, NULL, 1 );
switch( rs->sr_err ) {
case 0:
case MDB_NOTFOUND:
......@@ -167,7 +167,7 @@ txnReturn:
}
/* get entry */
rs->sr_err = mdb_dn2entry( op, txn, mc, &op->o_req_ndn, &e, 0 );
rs->sr_err = mdb_dn2entry( op, txn, mc, &op->o_req_ndn, &e, NULL, 0 );
switch( rs->sr_err ) {
case MDB_NOTFOUND:
e = p;
......@@ -332,7 +332,7 @@ txnReturn:
}
/* delete from dn2id */
rs->sr_err = mdb_dn2id_delete( op, mc, e->e_id );
rs->sr_err = mdb_dn2id_delete( op, mc, e->e_id, 1 );
mdb_cursor_close( mc );
if ( rs->sr_err != 0 ) {
Debug(LDAP_DEBUG_TRACE,
......
......@@ -34,6 +34,7 @@ mdb_dn2entry(
MDB_cursor *m2,
struct berval *dn,
Entry **e,
ID *nsubs,
int matched )
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
......@@ -47,7 +48,7 @@ mdb_dn2entry(
*e = NULL;
rc = mdb_dn2id( op, tid, m2, dn, &id, &mbv, &nmbv );
rc = mdb_dn2id( op, tid, m2, dn, &id, nsubs, &mbv, &nmbv );
if ( rc ) {
if ( matched ) {
rc2 = mdb_cursor_open( tid, mdb->mi_id2entry, &mc );
......
......@@ -36,6 +36,9 @@
* RDNs up to length 32767, but that's fine since full DNs are already
* restricted to 8192.
*
* Also each child node contains a count of the number of entries in
* its subtree, appended after its entryID.
*
* The diskNode is a variable length structure. This definition is not
* directly usable for in-memory manipulation.
*/
......@@ -44,6 +47,7 @@ typedef struct diskNode {
char nrdn[1];
char rdn[1]; /* variable placement */
unsigned char entryID[sizeof(ID)]; /* variable placement */
/* unsigned char nsubs[sizeof(ID)]; in child nodes only */
} diskNode;
/* Sort function for the sorted duplicate data items of a dn2id key.
......@@ -67,7 +71,7 @@ mdb_dup_compare(
rc = un->nrdnlen[1] - cn->nrdnlen[1];
if ( rc ) return rc;
nrlen = (un->nrdnlen[0] << 8) | un->nrdnlen[1];
nrlen = ((un->nrdnlen[0] & 0x7f) << 8) | un->nrdnlen[1];
return strncmp( un->nrdn, cn->nrdn, nrlen );
}
......@@ -81,6 +85,7 @@ mdb_dn2id_add(
MDB_cursor *mcp,
MDB_cursor *mcd,
ID pid,
ID nsubs,
Entry *e )
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
......@@ -101,7 +106,7 @@ mdb_dn2id_add(
rlen = e->e_name.bv_len;
}
d = op->o_tmpalloc(sizeof(diskNode) + rlen + nrlen, op->o_tmpmemctx);
d = op->o_tmpalloc(sizeof(diskNode) + rlen + nrlen + sizeof(ID), op->o_tmpmemctx);
d->nrdnlen[1] = nrlen & 0xff;
d->nrdnlen[0] = (nrlen >> 8) | 0x80;
ptr = lutil_strncopy( d->nrdn, e->e_nname.bv_val, nrlen );
......@@ -109,6 +114,8 @@ mdb_dn2id_add(
ptr = lutil_strncopy( ptr, e->e_name.bv_val, rlen );
*ptr++ = '\0';
memcpy( ptr, &e->e_id, sizeof( ID ));
ptr += sizeof( ID );
memcpy( ptr, &nsubs, sizeof( ID ));
key.mv_size = sizeof(ID);
key.mv_data = &nid;
......@@ -127,13 +134,18 @@ mdb_dn2id_add(
}
data.mv_data = d;
data.mv_size = sizeof(diskNode) + rlen + nrlen;
data.mv_size = sizeof(diskNode) + rlen + nrlen + sizeof( ID );
/* Add our child node under parent's key */
rc = mdb_cursor_put( mcp, &key, &data, MDB_NODUPDATA );
/* Add our own node */
if (rc == 0) {
int flag = MDB_NODUPDATA;
nid = e->e_id;
/* drop subtree count */
data.mv_size -= sizeof( ID );
ptr -= sizeof( ID );
memcpy( ptr, &pid, sizeof( ID ));
d->nrdnlen[0] ^= 0x80;
......@@ -141,9 +153,46 @@ mdb_dn2id_add(
flag |= MDB_APPEND;
rc = mdb_cursor_put( mcd, &key, &data, flag );
}
fail:
op->o_tmpfree( d, op->o_tmpmemctx );
/* Add our subtree count to all superiors */
if ( rc == 0 && nsubs && pid ) {
ID subs;
nid = pid;
do {
/* Get parent's RDN */
rc = mdb_cursor_get( mcp, &key, &data, MDB_SET );
if ( !rc ) {
char *p2;
ptr = data.mv_data + data.mv_size - sizeof( ID );
memcpy( &nid, ptr, sizeof( ID ));
/* Get parent's node under grandparent */
d = data.mv_data;
rlen = ( d->nrdnlen[0] << 8 ) | d->nrdnlen[1];
p2 = op->o_tmpalloc( rlen + 2, op->o_tmpmemctx );
memcpy( p2, data.mv_data, rlen+2 );
*p2 ^= 0x80;
data.mv_data = p2;
rc = mdb_cursor_get( mcp, &key, &data, MDB_GET_BOTH );
op->o_tmpfree( p2, op->o_tmpmemctx );
if ( !rc ) {
/* Get parent's subtree count */
ptr = data.mv_data + data.mv_size - sizeof( ID );
memcpy( &subs, ptr, sizeof( ID ));
subs += nsubs;
p2 = op->o_tmpalloc( data.mv_size, op->o_tmpmemctx );
memcpy( p2, data.mv_data, data.mv_size - sizeof( ID ));
memcpy( p2+data.mv_size - sizeof( ID ), &subs, sizeof( ID ));
data.mv_data = p2;
rc = mdb_cursor_put( mcp, &key, &data, MDB_CURRENT );
op->o_tmpfree( p2, op->o_tmpmemctx );
}
}
if ( rc )
break;
} while ( nid );
}
Debug( LDAP_DEBUG_TRACE, "<= mdb_dn2id_add 0x%lx: %d\n", e->e_id, rc, 0 );
return rc;
......@@ -154,8 +203,11 @@ int
mdb_dn2id_delete(
Operation *op,
MDB_cursor *mc,
ID id )
ID id,
ID nsubs )
{
ID nid;
char *ptr;
int rc;
Debug( LDAP_DEBUG_TRACE, "=> mdb_dn2id_delete 0x%lx\n",
......@@ -170,6 +222,10 @@ mdb_dn2id_delete(
*/
if ( rc == 0 ) {
MDB_val key, data;
if ( nsubs ) {
mdb_cursor_get( mc, &key, NULL, MDB_GET_CURRENT );
memcpy( &nid, key.mv_data, sizeof( ID ));
}
key.mv_size = sizeof(ID);
key.mv_data = &id;
rc = mdb_cursor_get( mc, &key, &data, MDB_SET );
......@@ -177,13 +233,56 @@ mdb_dn2id_delete(
rc = mdb_cursor_del( mc, 0 );
}
/* Delete our subtree count from all superiors */
if ( rc == 0 && nsubs && nid ) {
MDB_val key, data;
ID subs;
key.mv_data = &nid;
key.mv_size = sizeof( ID );
do {
rc = mdb_cursor_get( mc, &key, &data, MDB_SET );
if ( !rc ) {
char *p2;
diskNode *d;
int rlen;
ptr = data.mv_data + data.mv_size - sizeof( ID );
memcpy( &nid, ptr, sizeof( ID ));
/* Get parent's node under grandparent */
d = data.mv_data;
rlen = ( d->nrdnlen[0] << 8 ) | d->nrdnlen[1];
p2 = op->o_tmpalloc( rlen + 2, op->o_tmpmemctx );
memcpy( p2, data.mv_data, rlen+2 );
*p2 ^= 0x80;
data.mv_data = p2;
rc = mdb_cursor_get( mc, &key, &data, MDB_GET_BOTH );
op->o_tmpfree( p2, op->o_tmpmemctx );
if ( !rc ) {
/* Get parent's subtree count */
ptr = data.mv_data + data.mv_size - sizeof( ID );
memcpy( &subs, ptr, sizeof( ID ));
subs -= nsubs;
p2 = op->o_tmpalloc( data.mv_size, op->o_tmpmemctx );
memcpy( p2, data.mv_data, data.mv_size - sizeof( ID ));
memcpy( p2+data.mv_size - sizeof( ID ), &subs, sizeof( ID ));
data.mv_data = p2;
rc = mdb_cursor_put( mc, &key, &data, MDB_CURRENT );
op->o_tmpfree( p2, op->o_tmpmemctx );
}
}
if ( rc )
break;
} while ( nid );
}
Debug( LDAP_DEBUG_TRACE, "<= mdb_dn2id_delete 0x%lx: %d\n", id, rc, 0 );
return rc;
}
/* return last found ID in *id if no match
* If mc is provided, it will be left pointing to the RDN's
* record under the parent's ID.
* record under the parent's ID. If nsubs is provided, return
* the number of entries in this entry's subtree.
*/
int
mdb_dn2id(
......@@ -192,6 +291,7 @@ mdb_dn2id(
MDB_cursor *mc,
struct berval *in,
ID *id,
ID *nsubs,
struct berval *matched,
struct berval *nmatched )
{
......@@ -263,14 +363,14 @@ mdb_dn2id(
op->o_tmpfree( d, op->o_tmpmemctx );
if ( rc )
break;
ptr = (char *) data.mv_data + data.mv_size - sizeof(ID);
ptr = (char *) data.mv_data + data.mv_size - 2*sizeof(ID);
memcpy( &nid, ptr, sizeof(ID));
/* grab the non-normalized RDN */
if ( matched ) {
int rlen;
d = data.mv_data;
rlen = data.mv_size - sizeof(diskNode) - tmp.bv_len;
rlen = data.mv_size - sizeof(diskNode) - tmp.bv_len - sizeof(ID);
matched->bv_len += rlen;
matched->bv_val -= rlen + 1;
ptr = lutil_strcopy( matched->bv_val, d->rdn + tmp.bv_len );
......@@ -296,6 +396,11 @@ mdb_dn2id(
}
}
*id = nid;
/* return subtree count if requested */
if ( !rc && nsubs ) {
ptr = data.mv_data + data.mv_size - sizeof(ID);
memcpy( nsubs, ptr, sizeof( ID ));
}
if ( !mc )
mdb_cursor_close( cursor );
done:
......@@ -383,7 +488,7 @@ mdb_dn2sups(
mdb_cursor_close( cursor );
break;
}
ptr = (char *) data.mv_data + data.mv_size - sizeof(ID);
ptr = (char *) data.mv_data + data.mv_size - 2*sizeof(ID);
memcpy( &nid, ptr, sizeof(ID));
if ( pid )
......
......@@ -489,7 +489,7 @@ ext_candidates(
MDB_IDL_ZERO( ids );
if ( mra->ma_rule == slap_schema.si_mr_distinguishedNameMatch ) {
base:
rc = mdb_dn2id( op, rtxn, NULL, &mra->ma_value, &id, NULL, NULL );
rc = mdb_dn2id( op, rtxn, NULL, &mra->ma_value, &id, NULL, NULL, NULL );
if ( rc == MDB_SUCCESS ) {
mdb_idl_insert( ids, id );
}
......@@ -690,7 +690,7 @@ equality_candidates(
if ( ava->aa_desc == slap_schema.si_ad_entryDN ) {
ID id;
rc = mdb_dn2id( op, rtxn, NULL, &ava->aa_value, &id, NULL, NULL );
rc = mdb_dn2id( op, rtxn, NULL, &ava->aa_value, &id, NULL, NULL, NULL );
if ( rc == LDAP_SUCCESS ) {
/* exactly one ID can match */
ids[0] = 1;
......
......@@ -323,7 +323,7 @@ int mdb_entry_get(
txn = moi->moi_txn;
/* can we find entry */
rc = mdb_dn2entry( op, txn, NULL, ndn, &e, 0 );
rc = mdb_dn2entry( op, txn, NULL, ndn, &e, NULL, 0 );
switch( rc ) {
case MDB_NOTFOUND:
case 0:
......
......@@ -479,7 +479,7 @@ txnReturn:
}
/* get entry or ancestor */
rs->sr_err = mdb_dn2entry( op, txn, NULL, &op->o_req_ndn, &e, 1 );
rs->sr_err = mdb_dn2entry( op, txn, NULL, &op->o_req_ndn, &e, NULL, 1 );
if ( rs->sr_err != 0 ) {
Debug( LDAP_DEBUG_TRACE,
......
......@@ -46,7 +46,7 @@ mdb_modrdn( Operation *op, SlapReply *rs )
int manageDSAit = get_manageDSAit( op );
ID nid;
ID nid, nsubs;
LDAPControl **preread_ctrl = NULL;
LDAPControl **postread_ctrl = NULL;
LDAPControl *ctrls[SLAP_MAX_RESPONSE_CONTROLS];
......@@ -145,7 +145,7 @@ txnReturn:
rs->sr_text = "DN cursor_open failed";
goto return_results;
}
rs->sr_err = mdb_dn2entry( op, txn, mc, &p_ndn, &p, 0 );
rs->sr_err = mdb_dn2entry( op, txn, mc, &p_ndn, &p, NULL, 0 );
switch( rs->sr_err ) {
case MDB_NOTFOUND:
Debug( LDAP_DEBUG_TRACE, LDAP_XSTRING(mdb_modrdn)
......@@ -199,7 +199,7 @@ txnReturn:
p_dn.bv_val, 0, 0 );
/* get entry */
rs->sr_err = mdb_dn2entry( op, txn, mc, &op->o_req_ndn, &e, 0 );
rs->sr_err = mdb_dn2entry( op, txn, mc, &op->o_req_ndn, &e, &nsubs, 0 );
switch( rs->sr_err ) {
case MDB_NOTFOUND:
e = p;
......@@ -321,7 +321,7 @@ txnReturn:
goto return_results;
}
/* Get Entry with dn=newSuperior. Does newSuperior exist? */
rs->sr_err = mdb_dn2entry( op, txn, NULL, np_ndn, &np, 0 );
rs->sr_err = mdb_dn2entry( op, txn, NULL, np_ndn, &np, NULL, 0 );
switch( rs->sr_err ) {
case 0:
......@@ -432,7 +432,7 @@ txnReturn:
new_ndn.bv_val, 0, 0 );
/* Shortcut the search */
rs->sr_err = mdb_dn2id ( op, txn, NULL, &new_ndn, &nid, NULL, NULL );
rs->sr_err = mdb_dn2id ( op, txn, NULL, &new_ndn, &nid, NULL, NULL, NULL );
switch( rs->sr_err ) {
case MDB_NOTFOUND:
break;
......@@ -469,8 +469,11 @@ txnReturn:
}
}
/* delete old DN */
rs->sr_err = mdb_dn2id_delete( op, mc, e->e_id );
/* delete old DN
* If moving to a new parent, must delete current subtree count,
* otherwise leave it unchanged since we'll be adding it right back.
*/
rs->sr_err = mdb_dn2id_delete( op, mc, e->e_id, np ? nsubs : 0 );
if ( rs->sr_err != 0 ) {
Debug(LDAP_DEBUG_TRACE,
"<=- " LDAP_XSTRING(mdb_modrdn)
......@@ -488,7 +491,8 @@ txnReturn:
dummy.e_attrs = NULL;
/* add new DN */
rs->sr_err = mdb_dn2id_add( op, mc, mc, np ? np->e_id : p->e_id, &dummy );
rs->sr_err = mdb_dn2id_add( op, mc, mc, np ? np->e_id : p->e_id,
np ? nsubs : 0, &dummy );
if ( rs->sr_err != 0 ) {
Debug(LDAP_DEBUG_TRACE,
"<=- " LDAP_XSTRING(mdb_modrdn)
......@@ -526,23 +530,24 @@ txnReturn:
}
if ( p_ndn.bv_len != 0 ) {
parent_is_glue = is_entry_glue(p);
rs->sr_err = mdb_dn2id_children( op, txn, p );
if ( rs->sr_err != MDB_NOTFOUND ) {
switch( rs->sr_err ) {
case 0:
break;
default:
Debug(LDAP_DEBUG_ARGS,
"<=- " LDAP_XSTRING(mdb_modrdn)
": has_children failed: %s (%d)\n",
mdb_strerror(rs->sr_err), rs->sr_err, 0 );
rs->sr_err = LDAP_OTHER;
rs->sr_text = "internal error";
goto return_results;
if ((parent_is_glue = is_entry_glue(p))) {
rs->sr_err = mdb_dn2id_children( op, txn, p );
if ( rs->sr_err != MDB_NOTFOUND ) {
switch( rs->sr_err ) {
case 0:
break;
default:
Debug(LDAP_DEBUG_ARGS,
"<=- " LDAP_XSTRING(mdb_modrdn)
": has_children failed: %s (%d)\n",
mdb_strerror(rs->sr_err), rs->sr_err, 0 );
rs->sr_err = LDAP_OTHER;
rs->sr_text = "internal error";
goto return_results;
}
} else {
parent_is_leaf = 1;
}
} else {
parent_is_leaf = 1;
}
mdb_entry_return( op, p );
p = NULL;
......
......@@ -60,7 +60,7 @@ int mdb_back_init_cf( BackendInfo *bi );
*/
int mdb_dn2entry LDAP_P(( Operation *op, MDB_txn *tid, MDB_cursor *mc,
struct berval *dn, Entry **e, int matched ));
struct berval *dn, Entry **e, ID *nsubs, int matched ));
/*
* dn2id.c
......@@ -72,6 +72,7 @@ int mdb_dn2id(
MDB_cursor *mc,
struct berval *ndn,
ID *id,
ID *nsubs,
struct berval *matched,
struct berval *nmatched );
......@@ -80,12 +81,14 @@ int mdb_dn2id_add(
MDB_cursor *mcp,
MDB_cursor *mcd,
ID pid,
ID nsubs,
Entry *e );
int mdb_dn2id_delete(
Operation *op,
MDB_cursor *mc,
ID id );
ID id,
ID nsubs );
int mdb_dn2id_children(
Operation *op,
......
......@@ -98,7 +98,7 @@ static Entry * deref_base (
break;
}
rs->sr_err = mdb_dn2entry( op, txn, NULL, &ndn, &e, 0 );
rs->sr_err = mdb_dn2entry( op, txn, NULL, &ndn, &e, NULL, 0 );
if (rs->sr_err) {
rs->sr_err = LDAP_ALIAS_PROBLEM;
rs->sr_text = "aliasedObject not found";
......@@ -316,7 +316,7 @@ int
mdb_search( Operation *op, SlapReply *rs )
{
struct mdb_info *mdb = (struct mdb_info *) op->o_bd->be_private;
ID id, cursor;
ID id, cursor, nsubs;
ID lastid = NOID;
ID candidates[MDB_IDL_UM_SIZE];
ID2 *scopes;
......@@ -365,7 +365,7 @@ mdb_search( Operation *op, SlapReply *rs )
}
dn2entry_retry:
/* get entry with reader lock */
rs->sr_err = mdb_dn2entry( op, ltid, NULL, &op->o_req_ndn, &e, 1 );
rs->sr_err = mdb_dn2entry( op, ltid, NULL, &op->o_req_ndn, &e, &nsubs, 1 );
switch(rs->sr_err) {
case MDB_NOTFOUND:
......
......@@ -296,7 +296,7 @@ ID mdb_tool_dn2id_get(
op.o_tmpmemctx = NULL;
op.o_tmpmfuncs = &ch_mfuncs;
rc = mdb_dn2id( &op, txn, NULL, dn, &id, NULL, NULL );
rc = mdb_dn2id( &op, txn, NULL, dn, &id, NULL, NULL, NULL );
if ( rc == MDB_NOTFOUND )
return NOID;
......@@ -422,7 +422,7 @@ static int mdb_tool_next_id(
return 0;
}
rc = mdb_dn2id( op, tid, mcp, &ndn, &id, NULL, &nmatched );
rc = mdb_dn2id( op, tid, mcp, &ndn, &id, NULL, NULL, &nmatched );
if ( rc == MDB_NOTFOUND ) {
if ( !be_issuffix( op->o_bd, &ndn ) ) {
ID eid = e->e_id;
......@@ -457,7 +457,7 @@ static int mdb_tool_next_id(
"=> mdb_tool_next_id: %s\n", text->bv_val, 0, 0 );
return rc;
}
rc = mdb_dn2id_add( op, mcp, mcd, pid, e );
rc = mdb_dn2id_add( op, mcp, mcd, pid, 1, e );
if ( rc ) {
snprintf( text->bv_val, text->bv_len,
"dn2id_add failed: %s (%d)",
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment