Commit c4ac5b31 authored by Ondřej Kuzník's avatar Ondřej Kuzník Committed by Quanah Gibson-Mount
Browse files

ITS#9538 Serialise CSN assignment in slapo-accesslog

parent f6a61ab7
......@@ -28,6 +28,8 @@
#include <ac/string.h>
#include <ac/ctype.h>
#include <assert.h>
#include "slap.h"
#include "slap-config.h"
#include "lutil.h"
......@@ -78,6 +80,17 @@ typedef struct log_info {
log_base *li_bases;
BerVarray li_mincsn;
int *li_sids, li_numcsns;
/*
* Allow partial concurrency, main operation processing serialised with
* li_op_rmutex (there might be multiple such in progress by the same
* thread at a time, think overlays), the actual logging serialised with
* li_log_mutex.
*
* ITS#9538:
* Any CSN assignment should happen while li_op_rmutex is held and
* li_log_mutex should be acquired before the former has been released.
*/
ldap_pvt_thread_mutex_t li_op_rmutex;
ldap_pvt_thread_mutex_t li_log_mutex;
} log_info;
......@@ -1493,8 +1506,11 @@ accesslog_op2logop( Operation *op )
return LOG_EN_UNKNOWN;
}
static int accesslog_response(Operation *op, SlapReply *rs) {
slap_overinst *on = (slap_overinst *)op->o_callback->sc_private;
static int
accesslog_response(Operation *op, SlapReply *rs)
{
slap_callback *sc = op->o_callback;
slap_overinst *on = (slap_overinst *)sc->sc_private;
log_info *li = on->on_bi.bi_private;
Attribute *a, *last_attr;
Modifications *m;
......@@ -1504,30 +1520,33 @@ static int accesslog_response(Operation *op, SlapReply *rs) {
slap_verbmasks *lo;
Entry *e = NULL, *old = NULL, *e_uuid = NULL;
char timebuf[LDAP_LUTIL_GENTIME_BUFSIZE+8];
struct berval bv, bv2 = BER_BVNULL;
struct berval bv;
char *ptr;
BerVarray vals;
Operation op2 = {0};
SlapReply rs2 = {REP_RESULT};
/* ITS#9051 Make sure we only remove the callback on a final response */
if ( rs->sr_type == REP_RESULT || rs->sr_type == REP_EXTENDED ||
rs->sr_type == REP_SASL ) {
slap_callback *sc = op->o_callback;
op->o_callback = sc->sc_next;
op->o_tmpfree(sc, op->o_tmpmemctx );
}
if ( rs->sr_type != REP_RESULT && rs->sr_type != REP_EXTENDED )
if ( rs->sr_type != REP_RESULT && rs->sr_type != REP_EXTENDED &&
rs->sr_type != REP_SASL )
return SLAP_CB_CONTINUE;
/* can't do anything if logDB isn't open */
if ( !SLAP_DBOPEN( li->li_db ))
return SLAP_CB_CONTINUE;
op->o_callback = sc->sc_next;
op->o_tmpfree( sc, op->o_tmpmemctx );
logop = accesslog_op2logop( op );
lo = logops+logop+EN_OFFSET;
if ( !( li->li_ops & lo->mask )) {
/* can't do anything if logDB isn't open */
if ( !SLAP_DBOPEN( li->li_db ) ) {
goto skip;
}
/* These internal ops are not logged */
if ( op->o_dont_replicate )
goto skip;
if ( !( li->li_ops & lo->mask ) ) {
log_base *lb;
i = 0;
......@@ -1537,36 +1556,38 @@ static int accesslog_response(Operation *op, SlapReply *rs) {
break;
}
if ( !i )
return SLAP_CB_CONTINUE;
goto skip;
}
/* mutex and so were only set for write operations;
* if we got here, the operation must be logged */
if ( lo->mask & LOG_OP_WRITES ) {
slap_callback *cb;
/* These internal ops are not logged */
if ( op->o_dont_replicate )
return SLAP_CB_CONTINUE;
ldap_pvt_thread_mutex_lock( &li->li_log_mutex );
old = li->li_old;
uuid = li->li_uuid;
li->li_old = NULL;
BER_BVZERO( &li->li_uuid );
#ifdef RMUTEX_DEBUG
Debug( LDAP_DEBUG_SYNC,
"accesslog_response: unlocking rmutex for tid %x\n",
op->o_tid );
#endif
ldap_pvt_thread_mutex_unlock( &li->li_op_rmutex );
if ( !( lo->mask & LOG_OP_WRITES ) ) {
ldap_pvt_thread_mutex_lock( &li->li_op_rmutex );
}
/* ignore these internal reads */
if (( lo->mask & LOG_OP_READS ) && op->o_do_not_cache ) {
return SLAP_CB_CONTINUE;
if ( SLAP_LASTMOD( li->li_db ) ) {
/*
* Make sure we have a CSN before we release li_op_rmutex to preserve
* ordering
*/
if ( BER_BVISEMPTY( &op->o_csn ) ) {
slap_get_csn( &op2, &op2.o_csn, 1 );
} else {
if ( !( lo->mask & LOG_OP_WRITES ) ) {
Debug( LDAP_DEBUG_ANY, "%s accesslog_response: "
"the op had a CSN assigned, if you're replicating the "
"accesslog at %s, you might lose changes\n",
op->o_log_prefix, li->li_db_suffix.bv_val );
assert(0);
}
op2.o_csn = op->o_csn;
}
}
ldap_pvt_thread_mutex_lock( &li->li_log_mutex );
old = li->li_old;
uuid = li->li_uuid;
li->li_old = NULL;
BER_BVZERO( &li->li_uuid );
ldap_pvt_thread_mutex_unlock( &li->li_op_rmutex );
/*
* ITS#9051 Technically LDAP_REFERRAL and LDAP_SASL_BIND_IN_PROGRESS
* are not errors, but they aren't really success either
......@@ -1906,11 +1927,10 @@ static int accesslog_response(Operation *op, SlapReply *rs) {
op2.o_req_ndn = e->e_nname;
op2.ora_e = e;
op2.o_callback = &nullsc;
op2.o_csn = op->o_csn;
/* contextCSN updates may still reach here */
op2.o_dont_replicate = op->o_dont_replicate;
if (( lo->mask & LOG_OP_WRITES ) && !BER_BVISEMPTY( &op->o_csn )) {
if ( lo->mask & LOG_OP_WRITES ) {
struct berval maxcsn;
char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
int foundit;
......@@ -1939,8 +1959,7 @@ static int accesslog_response(Operation *op, SlapReply *rs) {
if ( e == op2.ora_e ) entry_free( e );
e = NULL;
/* TODO: What to do about minCSN when we have an op without a CSN? */
if ( !BER_BVISEMPTY( &op->o_csn ) ) {
if ( ( lo->mask & LOG_OP_WRITES ) && !BER_BVISEMPTY( &op->o_csn ) ) {
Modifications mod;
int i, sid = slap_parse_csn_sid( &op->o_csn );
......@@ -1993,10 +2012,16 @@ static int accesslog_response(Operation *op, SlapReply *rs) {
}
done:
if ( lo->mask & LOG_OP_WRITES )
ldap_pvt_thread_mutex_unlock( &li->li_log_mutex );
ldap_pvt_thread_mutex_unlock( &li->li_log_mutex );
if ( old ) entry_free( old );
return SLAP_CB_CONTINUE;
skip:
if ( lo->mask & LOG_OP_WRITES ) {
/* We haven't transitioned to li_log_mutex yet */
ldap_pvt_thread_mutex_unlock( &li->li_op_rmutex );
}
return SLAP_CB_CONTINUE;
}
static int
......@@ -2023,8 +2048,8 @@ accesslog_op_mod( Operation *op, SlapReply *rs )
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
log_info *li = on->on_bi.bi_private;
slap_verbmasks *lo;
slap_callback *cb;
int logop;
int doit = 0;
/* These internal ops are not logged */
if ( op->o_dont_replicate )
......@@ -2037,67 +2062,63 @@ accesslog_op_mod( Operation *op, SlapReply *rs )
logop = accesslog_op2logop( op );
lo = logops+logop+EN_OFFSET;
if ( li->li_ops & lo->mask ) {
doit = 1;
} else {
/* Ignore these internal reads */
if (( lo->mask & LOG_OP_READS ) && op->o_do_not_cache ) {
return SLAP_CB_CONTINUE;
}
if ( !( li->li_ops & lo->mask )) {
log_base *lb;
int i = 0;
for ( lb = li->li_bases; lb; lb = lb->lb_next )
if (( lb->lb_ops & lo->mask ) && dnIsSuffix( &op->o_req_ndn, &lb->lb_base )) {
doit = 1;
i = 1;
break;
}
if ( !i )
return SLAP_CB_CONTINUE;
}
if ( doit ) {
slap_callback *cb = op->o_tmpcalloc( 1, sizeof( slap_callback ), op->o_tmpmemctx );
cb->sc_cleanup = accesslog_response;
cb->sc_response = accesslog_response;
cb->sc_private = on;
cb->sc_next = op->o_callback;
op->o_callback = cb;
#ifdef RMUTEX_DEBUG
Debug( LDAP_DEBUG_SYNC,
"accesslog_op_mod: locking rmutex for tid %x\n",
op->o_tid );
#endif
ldap_pvt_thread_mutex_lock( &li->li_op_rmutex );
#ifdef RMUTEX_DEBUG
Debug( LDAP_DEBUG_STATS,
"accesslog_op_mod: locked rmutex for tid %x\n",
op->o_tid );
#endif
if ( li->li_oldf && ( op->o_tag == LDAP_REQ_DELETE ||
op->o_tag == LDAP_REQ_MODIFY ||
( op->o_tag == LDAP_REQ_MODRDN && li->li_oldattrs )))
{
int rc;
Entry *e;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
rc = be_entry_get_rw( op, &op->o_req_ndn, NULL, NULL, 0, &e );
if ( e ) {
if ( test_filter( op, e, li->li_oldf ) == LDAP_COMPARE_TRUE )
li->li_old = entry_dup( e );
be_entry_release_rw( op, e, 0 );
}
op->o_bd->bd_info = (BackendInfo *)on;
} else {
int rc;
Entry *e;
cb = op->o_tmpcalloc( 1, sizeof( slap_callback ), op->o_tmpmemctx );
cb->sc_cleanup = accesslog_response;
cb->sc_response = accesslog_response;
cb->sc_private = on;
cb->sc_next = op->o_callback;
op->o_callback = cb;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
rc = be_entry_get_rw( op, &op->o_req_ndn, NULL, NULL, 0, &e );
if ( e ) {
Attribute *a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
if ( a ) {
ber_dupbv( &li->li_uuid, &a->a_vals[0] );
}
be_entry_release_rw( op, e, 0 );
ldap_pvt_thread_mutex_lock( &li->li_op_rmutex );
if ( li->li_oldf && ( op->o_tag == LDAP_REQ_DELETE ||
op->o_tag == LDAP_REQ_MODIFY ||
( op->o_tag == LDAP_REQ_MODRDN && li->li_oldattrs )))
{
int rc;
Entry *e;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
rc = be_entry_get_rw( op, &op->o_req_ndn, NULL, NULL, 0, &e );
if ( e ) {
if ( test_filter( op, e, li->li_oldf ) == LDAP_COMPARE_TRUE )
li->li_old = entry_dup( e );
be_entry_release_rw( op, e, 0 );
}
op->o_bd->bd_info = (BackendInfo *)on;
} else {
int rc;
Entry *e;
op->o_bd->bd_info = (BackendInfo *)on->on_info;
rc = be_entry_get_rw( op, &op->o_req_ndn, NULL, NULL, 0, &e );
if ( e ) {
Attribute *a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
if ( a ) {
ber_dupbv( &li->li_uuid, &a->a_vals[0] );
}
op->o_bd->bd_info = (BackendInfo *)on;
be_entry_release_rw( op, e, 0 );
}
op->o_bd->bd_info = (BackendInfo *)on;
}
return SLAP_CB_CONTINUE;
}
......@@ -2109,43 +2130,76 @@ static int
accesslog_unbind( Operation *op, SlapReply *rs )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
if ( op->o_conn->c_authz_backend == on->on_info->oi_origdb ) {
log_info *li = on->on_bi.bi_private;
Operation op2 = {0};
void *cids[SLAP_MAX_CIDS];
SlapReply rs2 = {REP_RESULT};
Entry *e;
log_info *li = on->on_bi.bi_private;
Operation op2 = {};
char csnbuf[LDAP_PVT_CSNSTR_BUFSIZE];
void *cids[SLAP_MAX_CIDS];
SlapReply rs2 = {REP_RESULT};
Entry *e;
if ( !( li->li_ops & LOG_OP_UNBIND )) {
log_base *lb;
int i = 0;
if ( op->o_conn->c_authz_backend != on->on_info->oi_origdb )
return SLAP_CB_CONTINUE;
for ( lb = li->li_bases; lb; lb=lb->lb_next )
if (( lb->lb_ops & LOG_OP_UNBIND ) && dnIsSuffix( &op->o_ndn, &lb->lb_base )) {
i = 1;
break;
}
if ( !i )
return SLAP_CB_CONTINUE;
}
e = accesslog_entry( op, rs, li, LOG_EN_UNBIND, &op2 );
op2.o_hdr = op->o_hdr;
op2.o_tag = LDAP_REQ_ADD;
op2.o_bd = li->li_db;
op2.o_dn = li->li_db->be_rootdn;
op2.o_ndn = li->li_db->be_rootndn;
op2.o_req_dn = e->e_name;
op2.o_req_ndn = e->e_nname;
op2.ora_e = e;
op2.o_callback = &nullsc;
op2.o_controls = cids;
memset(cids, 0, sizeof( cids ));
op2.o_bd->be_add( &op2, &rs2 );
if ( e == op2.ora_e )
entry_free( e );
if ( !( li->li_ops & LOG_OP_UNBIND ) ) {
log_base *lb;
int i = 0;
for ( lb = li->li_bases; lb; lb=lb->lb_next )
if (( lb->lb_ops & LOG_OP_UNBIND ) && dnIsSuffix( &op->o_ndn, &lb->lb_base )) {
i = 1;
break;
}
if ( !i )
return SLAP_CB_CONTINUE;
}
ldap_pvt_thread_mutex_lock( &li->li_op_rmutex );
if ( SLAP_LASTMOD( li->li_db ) ) {
/*
* Make sure we have a CSN before we release li_op_rmutex to preserve
* ordering
*/
if ( BER_BVISEMPTY( &op->o_csn ) ) {
op2.o_csn.bv_val = csnbuf;
op2.o_csn.bv_len = sizeof(csnbuf);
slap_get_csn( &op2, &op2.o_csn, 1 );
} else {
Debug( LDAP_DEBUG_ANY, "%s accesslog_unbind: "
"the op had a CSN assigned, if you're replicating the "
"accesslog at %s, you might lose changes\n",
op->o_log_prefix, li->li_db_suffix.bv_val );
assert(0);
op2.o_csn = op->o_csn;
}
}
ldap_pvt_thread_mutex_lock( &li->li_log_mutex );
ldap_pvt_thread_mutex_unlock( &li->li_op_rmutex );
e = accesslog_entry( op, rs, li, LOG_EN_UNBIND, &op2 );
op2.o_hdr = op->o_hdr;
op2.o_tag = LDAP_REQ_ADD;
op2.o_bd = li->li_db;
op2.o_dn = li->li_db->be_rootdn;
op2.o_ndn = li->li_db->be_rootndn;
op2.o_req_dn = e->e_name;
op2.o_req_ndn = e->e_nname;
op2.ora_e = e;
op2.o_callback = &nullsc;
op2.o_controls = cids;
memset(cids, 0, sizeof( cids ));
op2.o_bd->be_add( &op2, &rs2 );
if ( rs2.sr_err != LDAP_SUCCESS ) {
Debug( LDAP_DEBUG_SYNC, "%s accesslog_unbind: "
"got result 0x%x adding log entry %s\n",
op->o_log_prefix, rs2.sr_err, op2.o_req_dn.bv_val );
}
ldap_pvt_thread_mutex_unlock( &li->li_log_mutex );
if ( e == op2.ora_e )
entry_free( e );
return SLAP_CB_CONTINUE;
}
......@@ -2158,6 +2212,7 @@ accesslog_abandon( Operation *op, SlapReply *rs )
void *cids[SLAP_MAX_CIDS];
SlapReply rs2 = {REP_RESULT};
Entry *e;
char csnbuf[LDAP_PVT_CSNSTR_BUFSIZE];
char buf[64];
struct berval bv;
......@@ -2177,6 +2232,28 @@ accesslog_abandon( Operation *op, SlapReply *rs )
return SLAP_CB_CONTINUE;
}
ldap_pvt_thread_mutex_lock( &li->li_op_rmutex );
if ( SLAP_LASTMOD( li->li_db ) ) {
/*
* Make sure we have a CSN before we release li_op_rmutex to preserve
* ordering
*/
if ( BER_BVISEMPTY( &op->o_csn ) ) {
op2.o_csn.bv_val = csnbuf;
op2.o_csn.bv_len = sizeof(csnbuf);
slap_get_csn( &op2, &op2.o_csn, 1 );
} else {
Debug( LDAP_DEBUG_ANY, "%s accesslog_abandon: "
"the op had a CSN assigned, if you're replicating the "
"accesslog at %s, you might lose changes\n",
op->o_log_prefix, li->li_db_suffix.bv_val );
assert(0);
op2.o_csn = op->o_csn;
}
}
ldap_pvt_thread_mutex_lock( &li->li_log_mutex );
ldap_pvt_thread_mutex_unlock( &li->li_op_rmutex );
e = accesslog_entry( op, rs, li, LOG_EN_ABANDON, &op2 );
bv.bv_val = buf;
bv.bv_len = snprintf( buf, sizeof( buf ), "%d", op->orn_msgid );
......@@ -2197,6 +2274,12 @@ accesslog_abandon( Operation *op, SlapReply *rs )
memset(cids, 0, sizeof( cids ));
op2.o_bd->be_add( &op2, &rs2 );
if ( rs2.sr_err != LDAP_SUCCESS ) {
Debug( LDAP_DEBUG_SYNC, "%s accesslog_abandon: "
"got result 0x%x adding log entry %s\n",
op->o_log_prefix, rs2.sr_err, op2.o_req_dn.bv_val );
}
ldap_pvt_thread_mutex_unlock( &li->li_log_mutex );
if ( e == op2.ora_e )
entry_free( e );
......@@ -2405,6 +2488,11 @@ accesslog_db_root(
op->o_callback = &nullsc;
SLAP_DBFLAGS( op->o_bd ) |= SLAP_DBFLAG_NOLASTMOD;
rc = op->o_bd->be_add( op, &rs );
if ( rs.sr_err != LDAP_SUCCESS ) {
Debug( LDAP_DEBUG_SYNC, "%s accesslog_db_root: "
"got result 0x%x adding log root entry %s\n",
op->o_log_prefix, rs.sr_err, op->o_req_dn.bv_val );
}
if ( e == op->ora_e )
entry_free( e );
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment