Commit ece2a4da authored by Howard Chu's avatar Howard Chu
Browse files

Use read-only txn's instead of read lockers. Support BDB 4.4-4.7

parent 0836d387
......@@ -32,11 +32,10 @@ bdb_add(Operation *op, SlapReply *rs )
size_t textlen = sizeof textbuf;
AttributeDescription *children = slap_schema.si_ad_children;
AttributeDescription *entry = slap_schema.si_ad_entry;
DB_TXN *ltid = NULL, *lt2;
DB_TXN *ltid = NULL, *lt2, *rtxn;
ID eid = NOID;
struct bdb_op_info opinfo = {0};
int subentry;
BDB_LOCKER locker = 0, rlocker = 0;
DB_LOCK lock;
int num_retries = 0;
......@@ -115,8 +114,8 @@ txnReturn:
subentry = is_entry_subentry( op->oq_add.rs_e );
/* Get our thread locker ID */
rs->sr_err = LOCK_ID( bdb->bi_dbenv, &rlocker );
/* Get our reader TXN */
rs->sr_err = bdb_reader_get( op, bdb->bi_dbenv, &rtxn );
if( 0 ) {
retry: /* transaction retry */
......@@ -157,8 +156,6 @@ retry: /* transaction retry */
goto return_results;
}
locker = TXN_ID ( ltid );
opinfo.boi_oe.oe_key = bdb;
opinfo.boi_txn = ltid;
opinfo.boi_err = 0;
......@@ -176,7 +173,7 @@ retry: /* transaction retry */
/* get entry or parent */
rs->sr_err = bdb_dn2entry( op, ltid, &op->ora_e->e_nname, &ei,
1, locker, &lock );
1, &lock );
switch( rs->sr_err ) {
case 0:
rs->sr_err = LDAP_ALREADY_EXISTS;
......@@ -428,8 +425,8 @@ retry: /* transaction retry */
nrdn = op->ora_e->e_nname;
}
/* Use the thread locker here, outside the txn */
bdb_cache_add( bdb, ei, op->ora_e, &nrdn, rlocker, &lock );
/* Use the reader txn here, outside the add txn */
bdb_cache_add( bdb, ei, op->ora_e, &nrdn, rtxn, &lock );
if(( rs->sr_err=TXN_COMMIT( ltid, 0 )) != 0 ) {
rs->sr_text = "txn_commit failed";
......
......@@ -58,30 +58,6 @@ LDAP_BEGIN_DECL
#define BDB_PAGESIZE 4096 /* BDB's original default */
#endif
/* 4.6.18 redefines cursor->locker */
#if DB_VERSION_FULL >= 0x04060012
struct __db_locker {
u_int32_t id;
};
typedef struct __db_locker * BDB_LOCKER;
extern int __lock_getlocker(DB_LOCKTAB *lt, u_int32_t locker, int create, DB_LOCKER **ret);
#define CURSOR_SETLOCKER(cursor, id) cursor->locker = id
#define CURSOR_GETLOCKER(cursor) cursor->locker
#define BDB_LOCKID(locker) locker->id
#else
typedef u_int32_t BDB_LOCKER;
#define CURSOR_SETLOCKER(cursor, id) cursor->locker = id
#define CURSOR_GETLOCKER(cursor) cursor->locker
#define BDB_LOCKID(locker) locker
#endif
#define DEFAULT_CACHE_SIZE 1000
/* The default search IDL stack cache depth */
......@@ -160,7 +136,7 @@ typedef struct bdb_cache {
int c_eiused; /* EntryInfo's in use */
int c_leaves; /* EntryInfo leaf nodes */
int c_purging;
BDB_LOCKER c_locker; /* used by lru cleaner */
DB_TXN *c_txn; /* used by lru cleaner */
ldap_pvt_thread_rdwr_t c_rwlock;
ldap_pvt_thread_mutex_t c_lru_mutex;
ldap_pvt_thread_mutex_t c_count_mutex;
......@@ -309,12 +285,6 @@ struct bdb_op_info {
((db)->open)(db, NULL, file, name, type, flags, mode)
#endif
/* BDB 4.6.18 makes locker a struct instead of an int */
#if DB_VERSION_FULL >= 0x04060012
#undef TXN_ID
#define TXN_ID(txn) (txn)->locker
#endif
/* #undef BDB_LOG_DEBUG */
#ifdef BDB_LOG_DEBUG
......@@ -343,8 +313,6 @@ extern int __db_logmsg(const DB_ENV *env, DB_TXN *txn, const char *op, u_int32_t
#define DB_BUFFER_SMALL ENOMEM
#endif
#define BDB_REUSE_LOCKERS
#define BDB_CSN_COMMIT 0
#define BDB_CSN_ABORT 1
#define BDB_CSN_RETRY 2
......
......@@ -32,7 +32,7 @@ bdb_bind( Operation *op, SlapReply *rs )
AttributeDescription *password = slap_schema.si_ad_userPassword;
BDB_LOCKER locker;
DB_TXN *rtxn;
DB_LOCK lock;
Debug( LDAP_DEBUG_ARGS,
......@@ -55,7 +55,7 @@ bdb_bind( Operation *op, SlapReply *rs )
break;
}
rs->sr_err = LOCK_ID(bdb->bi_dbenv, &locker);
rs->sr_err = bdb_reader_get(op, bdb->bi_dbenv, &rtxn);
switch(rs->sr_err) {
case 0:
break;
......@@ -67,8 +67,8 @@ bdb_bind( Operation *op, SlapReply *rs )
dn2entry_retry:
/* get entry with reader lock */
rs->sr_err = bdb_dn2entry( op, NULL, &op->o_req_ndn, &ei, 1,
locker, &lock );
rs->sr_err = bdb_dn2entry( op, rtxn, &op->o_req_ndn, &ei, 1,
&lock );
switch(rs->sr_err) {
case DB_NOTFOUND:
......@@ -76,14 +76,12 @@ dn2entry_retry:
break;
case LDAP_BUSY:
send_ldap_error( op, rs, LDAP_BUSY, "ldap_server_busy" );
LOCK_ID_FREE(bdb->bi_dbenv, locker);
return LDAP_BUSY;
case DB_LOCK_DEADLOCK:
case DB_LOCK_NOTGRANTED:
goto dn2entry_retry;
default:
send_ldap_error( op, rs, LDAP_OTHER, "internal error" );
LOCK_ID_FREE(bdb->bi_dbenv, locker);
return rs->sr_err;
}
......@@ -97,8 +95,6 @@ dn2entry_retry:
rs->sr_err = LDAP_INVALID_CREDENTIALS;
send_ldap_result( op, rs );
LOCK_ID_FREE(bdb->bi_dbenv, locker);
return rs->sr_err;
}
......@@ -158,8 +154,6 @@ done:
bdb_cache_return_entry_r( bdb, e, &lock );
}
LOCK_ID_FREE(bdb->bi_dbenv, locker);
if ( rs->sr_err ) {
send_ldap_result( op, rs );
if ( rs->sr_ref ) {
......
......@@ -159,7 +159,7 @@ bdb_cache_lru_link( struct bdb_info *bdb, EntryInfo *ei )
int
bdb_cache_entry_db_relock(
struct bdb_info *bdb,
BDB_LOCKER locker,
DB_TXN *txn,
EntryInfo *ei,
int rw,
int tryOnly,
......@@ -183,7 +183,7 @@ bdb_cache_entry_db_relock(
list[1].lock = *lock;
list[1].mode = rw ? DB_LOCK_WRITE : DB_LOCK_READ;
list[1].obj = &lockobj;
rc = bdb->bi_dbenv->lock_vec(bdb->bi_dbenv, BDB_LOCKID(locker), tryOnly ? DB_LOCK_NOWAIT : 0,
rc = bdb->bi_dbenv->lock_vec(bdb->bi_dbenv, TXN_ID(txn), tryOnly ? DB_LOCK_NOWAIT : 0,
list, 2, NULL );
if (rc && !tryOnly) {
......@@ -198,7 +198,7 @@ bdb_cache_entry_db_relock(
}
static int
bdb_cache_entry_db_lock( struct bdb_info *bdb, BDB_LOCKER locker, EntryInfo *ei,
bdb_cache_entry_db_lock( struct bdb_info *bdb, DB_TXN *txn, EntryInfo *ei,
int rw, int tryOnly, DB_LOCK *lock )
{
#ifdef NO_DB_LOCK
......@@ -218,7 +218,7 @@ bdb_cache_entry_db_lock( struct bdb_info *bdb, BDB_LOCKER locker, EntryInfo *ei,
lockobj.data = &ei->bei_id;
lockobj.size = sizeof(ei->bei_id) + 1;
rc = LOCK_GET(bdb->bi_dbenv, BDB_LOCKID(locker), tryOnly ? DB_LOCK_NOWAIT : 0,
rc = LOCK_GET(bdb->bi_dbenv, TXN_ID(txn), tryOnly ? DB_LOCK_NOWAIT : 0,
&lockobj, db_rw, lock);
if (rc && !tryOnly) {
Debug( LDAP_DEBUG_TRACE,
......@@ -394,7 +394,7 @@ bdb_entryinfo_add_internal(
int
bdb_cache_find_ndn(
Operation *op,
BDB_LOCKER locker,
DB_TXN *txn,
struct berval *ndn,
EntryInfo **res )
{
......@@ -448,7 +448,7 @@ bdb_cache_find_ndn(
ei.bei_nrdn.bv_val );
lock.mode = DB_LOCK_NG;
rc = bdb_dn2id( op, &ei.bei_nrdn, &ei, locker, &lock );
rc = bdb_dn2id( op, &ei.bei_nrdn, &ei, txn, &lock );
if (rc) {
bdb_cache_entryinfo_lock( eip );
bdb_cache_entry_db_unlock( bdb, &lock );
......@@ -508,7 +508,7 @@ bdb_cache_find_ndn(
int
hdb_cache_find_parent(
Operation *op,
BDB_LOCKER locker,
DB_TXN *txn,
ID id,
EntryInfo **res )
{
......@@ -521,7 +521,7 @@ hdb_cache_find_parent(
ei.bei_ckids = 0;
for (;;) {
rc = hdb_dn2id_parent( op, locker, &ei, &eip.bei_id );
rc = hdb_dn2id_parent( op, txn, &ei, &eip.bei_id );
if ( rc ) break;
/* Save the previous node, if any */
......@@ -668,7 +668,7 @@ bdb_cache_lru_purge( struct bdb_info *bdb )
return;
}
if ( bdb->bi_cache.c_locker ) {
if ( bdb->bi_cache.c_txn ) {
lockp = &lock;
} else {
lockp = NULL;
......@@ -715,7 +715,7 @@ bdb_cache_lru_purge( struct bdb_info *bdb )
* the object is idle.
*/
if ( bdb_cache_entry_db_lock( bdb,
bdb->bi_cache.c_locker, elru, 1, 1, lockp ) == 0 ) {
bdb->bi_cache.c_txn, elru, 1, 1, lockp ) == 0 ) {
/* Free entry for this node if it's present */
if ( elru->bei_e ) {
......@@ -793,7 +793,6 @@ bdb_cache_find_id(
ID id,
EntryInfo **eip,
int flag,
BDB_LOCKER locker,
DB_LOCK *lock )
{
struct bdb_info *bdb = (struct bdb_info *) op->o_bd->be_private;
......@@ -842,9 +841,9 @@ again: ldap_pvt_thread_rdwr_rlock( &bdb->bi_cache.c_rwlock );
/* See if the ID exists in the database; add it to the cache if so */
if ( !*eip ) {
#ifndef BDB_HIER
rc = bdb_id2entry( op->o_bd, tid, locker, id, &ep );
rc = bdb_id2entry( op->o_bd, tid, id, &ep );
if ( rc == 0 ) {
rc = bdb_cache_find_ndn( op, locker,
rc = bdb_cache_find_ndn( op, tid,
&ep->e_nname, eip );
if ( *eip ) flag |= ID_LOCKED;
if ( rc ) {
......@@ -858,7 +857,7 @@ again: ldap_pvt_thread_rdwr_rlock( &bdb->bi_cache.c_rwlock );
}
}
#else
rc = hdb_cache_find_parent(op, locker, id, eip );
rc = hdb_cache_find_parent(op, tid, id, eip );
if ( rc == 0 ) flag |= ID_LOCKED;
#endif
}
......@@ -902,14 +901,14 @@ load1:
bdb_cache_entryinfo_unlock( *eip );
flag ^= ID_LOCKED;
}
rc = bdb_cache_entry_db_lock( bdb, locker, *eip, load, 0, lock );
rc = bdb_cache_entry_db_lock( bdb, tid, *eip, load, 0, lock );
if ( (*eip)->bei_state & CACHE_ENTRY_DELETED ) {
rc = DB_NOTFOUND;
bdb_cache_entry_db_unlock( bdb, lock );
} else if ( rc == 0 ) {
if ( load ) {
if ( !ep) {
rc = bdb_id2entry( op->o_bd, tid, locker, id, &ep );
rc = bdb_id2entry( op->o_bd, tid, id, &ep );
}
if ( rc == 0 ) {
ep->e_private = *eip;
......@@ -934,7 +933,7 @@ load1:
}
if ( rc == 0 ) {
/* If we succeeded, downgrade back to a readlock. */
rc = bdb_cache_entry_db_relock( bdb, locker,
rc = bdb_cache_entry_db_relock( bdb, tid,
*eip, 0, 0, lock );
} else {
/* Otherwise, release the lock. */
......@@ -955,12 +954,12 @@ load1:
rc = bdb_fix_dn( (*eip)->bei_e, 1 );
if ( rc ) {
bdb_cache_entry_db_relock( bdb,
locker, *eip, 1, 0, lock );
tid, *eip, 1, 0, lock );
/* check again in case other modifier did it already */
if ( bdb_fix_dn( (*eip)->bei_e, 1 ) )
rc = bdb_fix_dn( (*eip)->bei_e, 2 );
bdb_cache_entry_db_relock( bdb,
locker, *eip, 0, 0, lock );
tid, *eip, 0, 0, lock );
}
#endif
}
......@@ -1039,7 +1038,7 @@ bdb_cache_add(
EntryInfo *eip,
Entry *e,
struct berval *nrdn,
BDB_LOCKER locker,
DB_TXN *txn,
DB_LOCK *lock )
{
EntryInfo *new, ei;
......@@ -1056,7 +1055,7 @@ bdb_cache_add(
/* Lock this entry so that bdb_add can run to completion.
* It can only fail if BDB has run out of lock resources.
*/
rc = bdb_cache_entry_db_lock( bdb, locker, &ei, 0, 0, lock );
rc = bdb_cache_entry_db_lock( bdb, txn, &ei, 0, 0, lock );
if ( rc ) {
bdb_cache_entryinfo_unlock( eip );
return rc;
......@@ -1114,13 +1113,13 @@ bdb_cache_modify(
struct bdb_info *bdb,
Entry *e,
Attribute *newAttrs,
BDB_LOCKER locker,
DB_TXN *txn,
DB_LOCK *lock )
{
EntryInfo *ei = BEI(e);
int rc;
/* Get write lock on data */
rc = bdb_cache_entry_db_relock( bdb, locker, ei, 1, 0, lock );
rc = bdb_cache_entry_db_relock( bdb, txn, ei, 1, 0, lock );
/* If we've done repeated mods on a cached entry, then e_attrs
* is no longer contiguous with the entry, and must be freed.
......@@ -1144,7 +1143,7 @@ bdb_cache_modrdn(
struct berval *nrdn,
Entry *new,
EntryInfo *ein,
BDB_LOCKER locker,
DB_TXN *txn,
DB_LOCK *lock )
{
EntryInfo *ei = BEI(e), *pei;
......@@ -1154,7 +1153,7 @@ bdb_cache_modrdn(
#endif
/* Get write lock on data */
rc = bdb_cache_entry_db_relock( bdb, locker, ei, 1, 0, lock );
rc = bdb_cache_entry_db_relock( bdb, txn, ei, 1, 0, lock );
if ( rc ) return rc;
/* If we've done repeated mods on a cached entry, then e_attrs
......@@ -1249,7 +1248,7 @@ int
bdb_cache_delete(
struct bdb_info *bdb,
Entry *e,
BDB_LOCKER locker,
DB_TXN *txn,
DB_LOCK *lock )
{
EntryInfo *ei = BEI(e);
......@@ -1266,7 +1265,7 @@ bdb_cache_delete(
bdb_cache_entryinfo_unlock( ei );
/* Get write lock on the data */
rc = bdb_cache_entry_db_relock( bdb, locker, ei, 1, 0, lock );
rc = bdb_cache_entry_db_relock( bdb, txn, ei, 1, 0, lock );
if ( rc ) {
/* couldn't lock, undo and give up */
ei->bei_state ^= CACHE_ENTRY_DELETED;
......@@ -1436,56 +1435,36 @@ bdb_lru_print( Cache *cache )
#endif
#endif
#ifdef BDB_REUSE_LOCKERS
static void
bdb_locker_id_free( void *key, void *data )
bdb_reader_free( void *key, void *data )
{
DB_ENV *env = key;
u_int32_t lockid;
int rc;
/* DB_ENV *env = key; */
DB_TXN *txn = data;
#if DB_VERSION_FULL >= 0x04060012
BDB_LOCKER lptr = data;
lockid = lptr->id;
#else
lockid = (long)data;
#endif
rc = XLOCK_ID_FREE( env, lockid );
if ( rc == EINVAL ) {
DB_LOCKREQ lr;
Debug( LDAP_DEBUG_ANY,
"bdb_locker_id_free: %lu err %s(%d)\n",
(unsigned long) lockid, db_strerror(rc), rc );
/* release all locks held by this locker. */
lr.op = DB_LOCK_PUT_ALL;
lr.obj = NULL;
env->lock_vec( env, lockid, 0, &lr, 1, NULL );
XLOCK_ID_FREE( env, lockid );
}
TXN_ABORT( txn );
}
/* free up any keys used by the main thread */
void
bdb_locker_flush( DB_ENV *env )
bdb_reader_flush( DB_ENV *env )
{
void *data;
void *ctx = ldap_pvt_thread_pool_context();
if ( !ldap_pvt_thread_pool_getkey( ctx, env, &data, NULL ) ) {
ldap_pvt_thread_pool_setkey( ctx, env, NULL, 0, NULL, NULL );
bdb_locker_id_free( env, data );
bdb_reader_free( env, data );
}
}
int
bdb_locker_id( Operation *op, DB_ENV *env, BDB_LOCKER *locker )
bdb_reader_get( Operation *op, DB_ENV *env, DB_TXN **txn )
{
int i, rc;
u_int32_t lockid;
void *data;
void *ctx;
if ( !env || !locker ) return -1;
if ( !env || !txn ) return -1;
/* If no op was provided, try to find the ctx anyway... */
if ( op ) {
......@@ -1496,42 +1475,29 @@ bdb_locker_id( Operation *op, DB_ENV *env, BDB_LOCKER *locker )
/* Shouldn't happen unless we're single-threaded */
if ( !ctx ) {
*locker = 0;
*txn = NULL;
return 0;
}
if ( ldap_pvt_thread_pool_getkey( ctx, env, &data, NULL ) ) {
for ( i=0, rc=1; rc != 0 && i<4; i++ ) {
rc = XLOCK_ID( env, &lockid );
rc = TXN_BEGIN( env, NULL, txn, DB_READ_COMMITTED );
if (rc) ldap_pvt_thread_yield();
}
if ( rc != 0) {
return rc;
}
#if DB_VERSION_FULL >= 0x04060012
{ BDB_LOCKER lptr;
__lock_getlocker( env->lk_handle, lockid, 0, &lptr );
data = lptr;
}
#else
data = (void *)((long)lockid);
#endif
data = *txn;
if ( ( rc = ldap_pvt_thread_pool_setkey( ctx, env,
data, bdb_locker_id_free, NULL, NULL ) ) ) {
XLOCK_ID_FREE( env, lockid );
Debug( LDAP_DEBUG_ANY, "bdb_locker_id: err %s(%d)\n",
data, bdb_reader_free, NULL, NULL ) ) ) {
TXN_ABORT( *txn );
Debug( LDAP_DEBUG_ANY, "bdb_reader_get: err %s(%d)\n",
db_strerror(rc), rc, 0 );
return rc;
}
} else {
lockid = (long)data;
*txn = data;
}
#if DB_VERSION_FULL >= 0x04060012
*locker = data;
#else
*locker = lockid;
#endif
return 0;
}
#endif /* BDB_REUSE_LOCKERS */
......@@ -30,10 +30,10 @@ bdb_compare( Operation *op, SlapReply *rs )
Attribute *a;
int manageDSAit = get_manageDSAit( op );
BDB_LOCKER locker;
DB_TXN *rtxn;
DB_LOCK lock;
rs->sr_err = LOCK_ID(bdb->bi_dbenv, &locker);
rs->sr_err = bdb_reader_get(op, bdb->bi_dbenv, &rtxn);
switch(rs->sr_err) {
case 0:
break;
......@@ -44,8 +44,8 @@ bdb_compare( Operation *op, SlapReply *rs )
dn2entry_retry:
/* get entry */
rs->sr_err = bdb_dn2entry( op, NULL, &op->o_req_ndn, &ei, 1,
locker, &lock );
rs->sr_err = bdb_dn2entry( op, rtxn, &op->o_req_ndn, &ei, 1,
&lock );
switch( rs->sr_err ) {
case DB_NOTFOUND:
......@@ -185,6 +185,5 @@ done:
bdb_cache_return_entry_r( bdb, e, &lock );
}
LOCK_ID_FREE ( bdb->bi_dbenv, locker );
return rs->sr_err;
}
......@@ -202,7 +202,6 @@ bdb_online_index( void *ctx, void *arg )
DBT key, data;
DB_TXN *txn;
DB_LOCK lock;
BDB_LOCKER locker;
ID id, nid;
EntryInfo *ei;
int rc, getnext = 1;
......@@ -231,7 +230,6 @@ bdb_online_index( void *ctx, void *arg )
rc = TXN_BEGIN( bdb->bi_dbenv, NULL, &txn, bdb->bi_db_opflags );
if ( rc )
break;
locker = TXN_ID( txn );
if ( getnext ) {
getnext = 0;
BDB_ID2DISK( id, &nid );
......@@ -257,7 +255,7 @@ bdb_online_index( void *ctx, void *arg )
}
ei = NULL;
rc = bdb_cache_find_id( op, txn, id, &ei, 0, locker, &lock );
rc = bdb_cache_find_id( op, txn, id, &ei, 0, &lock );
if ( rc ) {
TXN_ABORT( txn );
if ( rc == DB_LOCK_DEADLOCK ) {
......
......@@ -38,7 +38,6 @@ bdb_delete( Operation *op, SlapReply *rs )
struct bdb_op_info opinfo = {0};
ID eid;
BDB_LOCKER locker = 0;
DB_LOCK lock, plock;
int num_retries = 0;
......@@ -154,8 +153,6 @@ retry: /* transaction retry */
goto return_results;
}
locker = TXN_ID ( ltid );
opinfo.boi_oe.oe_key = bdb;
opinfo.boi_txn = ltid;
opinfo.boi_err = 0;
......@@ -168,7 +165,7 @@ retry: /* transaction retry */
/* get entry */
rs->sr_err = bdb_dn2entry( op, ltid, &op->o_req_ndn, &ei, 1,
locker, &lock );
&lock );
switch( rs->sr_err ) {
case 0:
......@@ -217,7 +214,7 @@ retry: /* transaction retry */
goto return_results;
}
rc = bdb_cache_find_id( op, ltid, eip->bei_id, &eip, 0, locker, &plock );
rc = bdb_cache_find_id( op, ltid, eip->bei_id, &eip, 0, &plock );
switch( rc ) {
case DB_LOCK_DEADLOCK:
case DB_LOCK_NOTGRANTED:
......@@ -527,7 +524,7 @@ retry: /* transaction retry */
BDB_LOG_PRINTF( bdb->bi_dbenv, ltid, "slapd Cache delete %s(%d)",
e->e_nname.bv_val, e->e_id );
rc = bdb_cache_delete( bdb, e, locker, &lock );
rc = bdb_cache_delete( bdb, e, ltid, &lock );
switch( rc ) {
case DB_LOCK_DEADLOCK:
case DB_LOCK_NOTGRANTED:
......
......@@ -34,7 +34,6 @@ bdb_dn2entry(
struct berval *dn,
EntryInfo **e,
int matched,
BDB_LOCKER locker,
DB_LOCK *lock )
{
EntryInfo *ei = NULL;
......@@ -45,7 +44,7 @@ bdb_dn2entry(