Commit 50039818 authored by Nadezhda Ivanova's avatar Nadezhda Ivanova Committed by Howard Chu
Browse files

ITS#8303 Track pending ops per a_metasingleconn_t

so that unused target connections can be properly reset.
parent 26f88817
......@@ -360,8 +360,9 @@ asyncmeta_back_add( Operation *op, SlapReply *rs )
assert( 0 );
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
asyncmeta_start_one_listener(mc, candidates, candidate);
asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
......
......@@ -192,7 +192,7 @@ struct a_metatarget_t;
typedef struct bm_context_t {
LDAP_SLIST_ENTRY(bm_context_t) bc_next;
time_t timeout;
time_t stoptime;
time_t stoptime;
ldap_back_send_t sendok;
ldap_back_send_t retrying;
int candidate_match;
......@@ -202,7 +202,8 @@ typedef struct bm_context_t {
int is_ok;
SlapReply rs;
Operation *op;
LDAPControl **ctrls;
LDAPControl **ctrls;
int *msgids;
SlapReply *candidates;
} bm_context_t;
......@@ -231,7 +232,7 @@ typedef struct a_metasingleconn_t {
/* NOTE: lc_lcflags is redefined to msc_mscflags to reuse the macros
* defined for back-ldap */
#define lc_lcflags msc_mscflags
int msc_pending_ops;
int msc_timeout_ops;
/* Connection for the select */
Connection *conn;
......@@ -773,8 +774,8 @@ void
asyncmeta_send_result(bm_context_t* bc, int error, char *text);
int asyncmeta_new_bm_context(Operation *op, SlapReply *rs, bm_context_t **new_bc, int ntargets);
int asyncmeta_start_listeners(a_metaconn_t *mc, SlapReply *candidates);
int asyncmeta_start_one_listener(a_metaconn_t *mc, SlapReply *candidates, int candidate);
int asyncmeta_start_listeners(a_metaconn_t *mc, SlapReply *candidates, bm_context_t *bc);
int asyncmeta_start_one_listener(a_metaconn_t *mc, SlapReply *candidates, bm_context_t *bc, int candidate);
meta_search_candidate_t
asyncmeta_back_search_start(
......
......@@ -285,5 +285,6 @@ asyncmeta_clear_one_msc(
msc->msc_time = 0;
msc->msc_mscflags = 0;
msc->msc_timeout_ops = 0;
msc->msc_pending_ops = 0;
return 0;
}
......@@ -285,7 +285,7 @@ asyncmeta_back_compare( Operation *op, SlapReply *rs )
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
asyncmeta_start_one_listener(mc, candidates, candidate);
asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
......
......@@ -1295,16 +1295,19 @@ asyncmeta_get_next_mc( a_metainfo_t *mi )
return mc;
}
int asyncmeta_start_listeners(a_metaconn_t *mc, SlapReply *candidates)
int asyncmeta_start_listeners(a_metaconn_t *mc, SlapReply *candidates, bm_context_t *bc)
{
int i;
for (i = 0; i < mc->mc_info->mi_ntargets; i++) {
asyncmeta_start_one_listener(mc, candidates, i);
asyncmeta_start_one_listener(mc, candidates, bc, i);
}
return LDAP_SUCCESS;
}
int asyncmeta_start_one_listener(a_metaconn_t *mc, SlapReply *candidates, int candidate)
int asyncmeta_start_one_listener(a_metaconn_t *mc,
SlapReply *candidates,
bm_context_t *bc,
int candidate)
{
a_metasingleconn_t *msc;
ber_socket_t s;
......@@ -1314,6 +1317,8 @@ int asyncmeta_start_one_listener(a_metaconn_t *mc, SlapReply *candidates, int ca
if (msc->msc_ld == NULL || !META_IS_CANDIDATE( &candidates[ candidate ] )) {
return LDAP_SUCCESS;
}
bc->msgids[candidate] = candidates[candidate].sr_msgid;
msc->msc_pending_ops++;
if ( msc->conn == NULL) {
ldap_get_option( msc->msc_ld, LDAP_OPT_DESC, &s );
if (s < 0) {
......
......@@ -233,7 +233,7 @@ asyncmeta_back_delete( Operation *op, SlapReply *rs )
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
asyncmeta_start_one_listener(mc, candidates, candidate);
asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
......
......@@ -281,7 +281,7 @@ static void asyncmeta_memctx_put(void *threadctx, void *memctx)
int asyncmeta_new_bm_context(Operation *op, SlapReply *rs, bm_context_t **new_bc, int ntargets)
{
void *oldctx = op->o_tmpmemctx;
int i;
/* prevent old memctx from being destroyed */
slap_sl_mem_setctx(op->o_threadctx, NULL);
/* create new memctx */
......@@ -290,6 +290,10 @@ int asyncmeta_new_bm_context(Operation *op, SlapReply *rs, bm_context_t **new_bc
(*new_bc)->op = asyncmeta_copy_op(op);
(*new_bc)->candidates = op->o_tmpcalloc(ntargets, sizeof(SlapReply),op->o_tmpmemctx);
(*new_bc)->msgids = op->o_tmpcalloc(ntargets, sizeof(int),op->o_tmpmemctx);
for (i = 0; i < ntargets; i++) {
(*new_bc)->msgids[i] = META_MSGID_UNDEFINED;
}
/* restore original memctx */
slap_sl_mem_setctx(op->o_threadctx, oldctx);
op->o_tmpmemctx = oldctx;
......@@ -505,8 +509,15 @@ void
asyncmeta_drop_bc(a_metaconn_t *mc, bm_context_t *bc)
{
bm_context_t *om;
int i;
LDAP_SLIST_FOREACH( om, &mc->mc_om_list, bc_next ) {
if (om == bc) {
for (i = 0; i < mc->mc_info->mi_ntargets; i++)
{
if (bc->msgids[i] >= 0) {
mc->mc_conns[i].msc_pending_ops--;
}
}
LDAP_SLIST_REMOVE(&mc->mc_om_list, om, bm_context_t, bc_next);
mc->pending_ops--;
break;
......
......@@ -1574,7 +1574,7 @@ again:
for (j=0; j<ntargets; j++) {
i++;
if (i >= ntargets) i = 0;
if (!mc->mc_conns[i].msc_ldr) continue;
if (!mc->mc_conns[i].msc_ldr || mc->mc_conns[i].msc_pending_ops <= 0) continue;
rc = ldap_result( mc->mc_conns[i].msc_ldr, LDAP_RES_ANY, LDAP_MSG_RECEIVED, &tv, &msg );
msc = &mc->mc_conns[i];
if (rc < 1) {
......@@ -1715,6 +1715,9 @@ void* asyncmeta_timeout_loop(void *ctx, void *arg)
a_metasingleconn_t *msc = &mc->mc_conns[j];
a_metatarget_t *mt = mi->mi_targets[j];
msc->msc_timeout_ops++;
if (bc->msgids[j] >= 0) {
msc->msc_pending_ops--;
}
asyncmeta_back_cancel( mc, op,
bc->candidates[ j ].sr_msgid, j );
if (!META_BACK_TGT_QUARANTINE( mt ) ||
......@@ -1741,13 +1744,14 @@ void* asyncmeta_timeout_loop(void *ctx, void *arg)
}
}
if (!mc->pending_ops && mi->mi_idle_timeout) {
if (mi->mi_idle_timeout) {
for (j=0; j<mi->mi_ntargets; j++) {
a_metasingleconn_t *msc = &mc->mc_conns[j];
if (msc->msc_pending_ops > 0) {
continue;
}
if (msc->msc_ld && msc->msc_time > 0 && msc->msc_time + mi->mi_idle_timeout <= current_time) {
if (mc->mc_active < 1) {
asyncmeta_clear_one_msc(NULL, mc, j);
}
asyncmeta_clear_one_msc(NULL, mc, j);
}
}
}
......
......@@ -362,7 +362,7 @@ asyncmeta_back_modify( Operation *op, SlapReply *rs )
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
asyncmeta_start_one_listener(mc, candidates, candidate);
asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
......
......@@ -308,7 +308,7 @@ asyncmeta_back_modrdn( Operation *op, SlapReply *rs )
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
asyncmeta_start_one_listener(mc, candidates, candidate);
asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
......
......@@ -442,7 +442,7 @@ asyncmeta_back_search( Operation *op, SlapReply *rs )
initial_candidates = 0, candidate_match = 0,
needbind = 0;
ldap_back_send_t sendok = LDAP_BACK_SENDERR;
long i;
long i,j;
int is_ok = 0;
void *savepriv;
SlapReply *candidates = NULL;
......@@ -484,8 +484,7 @@ asyncmeta_back_search( Operation *op, SlapReply *rs )
for ( i = 0; i < mi->mi_ntargets; i++ ) {
/* reset sr_msgid; it is used in most loops
* to check if that target is still to be considered */
candidates[ i ].sr_msgid = META_MSGID_UNDEFINED;
candidates[i].sr_msgid = META_MSGID_UNDEFINED;
/* a target is marked as candidate by asyncmeta_getconn();
* if for any reason (an error, it's over or so) it is
* no longer active, sr_msgid is set to META_MSGID_IGNORE
......@@ -673,7 +672,7 @@ asyncmeta_back_search( Operation *op, SlapReply *rs )
goto finish;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
asyncmeta_start_listeners(mc, candidates);
asyncmeta_start_listeners(mc, candidates, bc);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment