Commit 8f35dc45 authored by Ondřej Kuzník's avatar Ondřej Kuzník
Browse files

ITS#9720 Manage the purge task properly

parent 2a6b1629
......@@ -68,6 +68,8 @@ typedef struct log_base {
typedef struct log_info {
BackendDB *li_db;
struct berval li_db_suffix;
int li_open;
slap_mask_t li_ops;
int li_age;
int li_cycle;
......@@ -84,8 +86,8 @@ typedef struct log_info {
/*
* Allow partial concurrency, main operation processing serialised with
* li_op_rmutex (there might be multiple such in progress by the same
* thread at a time, think overlays), the actual logging serialised with
* li_log_mutex.
* thread at a time, think overlays), the actual logging and mincsn
* management are serialised with li_log_mutex.
*
* ITS#9538:
* Any CSN assignment should happen while li_op_rmutex is held and
......@@ -652,6 +654,7 @@ log_old_lookup( Operation *op, SlapReply *rs )
/* Find the correct sid */
sid = slap_parse_csn_sid( &a->a_nvals[0] );
ldap_pvt_thread_mutex_lock( &li->li_log_mutex );
for ( i=0; i < li->li_numcsns; i++ ) {
if ( sid <= li->li_sids[i] ) break;
}
......@@ -668,6 +671,7 @@ log_old_lookup( Operation *op, SlapReply *rs )
pd->mincsn_updated = 1;
AC_MEMCPY( li->li_mincsn[i].bv_val, a->a_nvals[0].bv_val, len );
}
ldap_pvt_thread_mutex_unlock( &li->li_log_mutex );
}
if ( pd->used >= pd->slots ) {
pd->slots += PURGE_INCREMENT;
......@@ -744,6 +748,7 @@ accesslog_purge( void *ctx, void *arg )
if ( pd.mincsn_updated ) {
Modifications mod;
/* update context's minCSN to reflect oldest CSN */
ldap_pvt_thread_mutex_lock( &li->li_log_mutex );
mod.sml_numvals = li->li_numcsns;
mod.sml_values = li->li_mincsn;
mod.sml_nvalues = NULL;
......@@ -765,6 +770,7 @@ accesslog_purge( void *ctx, void *arg )
li->li_numcsns );
op->o_bd->be_modify( op, &rs );
}
ldap_pvt_thread_mutex_unlock( &li->li_log_mutex );
}
/* delete the expired entries */
......@@ -995,7 +1001,7 @@ log_cf_gen(ConfigArgs *c)
struct re_s *re = li->li_task;
if ( re )
re->interval.tv_sec = li->li_cycle;
else {
else if ( li->li_open ) {
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
li->li_task = ldap_pvt_runqueue_insert( &slapd_rq,
li->li_cycle, accesslog_purge, li,
......@@ -2381,6 +2387,7 @@ accesslog_db_root(
Entry *e;
int rc;
ldap_pvt_thread_mutex_lock( &li->li_log_mutex );
connection_fake_init( &conn, &opbuf, ctx );
op = &opbuf.ob_op;
op->o_bd = li->li_db;
......@@ -2501,9 +2508,19 @@ accesslog_db_root(
if ( e == op->ora_e )
entry_free( e );
}
li->li_open = 1;
ldap_pvt_thread_mutex_unlock( &li->li_log_mutex );
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
ldap_pvt_runqueue_remove( &slapd_rq, rtask );
if ( li->li_age && li->li_cycle ) {
assert( li->li_task == NULL );
li->li_task = ldap_pvt_runqueue_insert( &slapd_rq,
li->li_cycle, accesslog_purge, li,
"accesslog_purge", li->li_db->be_suffix[0].bv_val );
}
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
return NULL;
......@@ -2551,6 +2568,30 @@ accesslog_db_open(
return 0;
}
static int
accesslog_db_close(
BackendDB *be,
ConfigReply *cr
)
{
slap_overinst *on = (slap_overinst *)be->bd_info;
log_info *li = on->on_bi.bi_private;
struct re_s *re = li->li_task;
li->li_open = 0;
if ( re ) {
li->li_task = NULL;
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
if ( ldap_pvt_runqueue_isrunning( &slapd_rq, re ) )
ldap_pvt_runqueue_stoptask( &slapd_rq, re );
ldap_pvt_runqueue_remove( &slapd_rq, re );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
}
return 0;
}
enum { start = 0 };
static int
......@@ -2717,6 +2758,7 @@ int accesslog_initialize()
accesslog.on_bi.bi_db_init = accesslog_db_init;
accesslog.on_bi.bi_db_destroy = accesslog_db_destroy;
accesslog.on_bi.bi_db_open = accesslog_db_open;
accesslog.on_bi.bi_db_close = accesslog_db_close;
accesslog.on_bi.bi_op_add = accesslog_op_mod;
accesslog.on_bi.bi_op_bind = accesslog_op_misc;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment