Commit 8d666c3e authored by Quanah Gibson-Mount's avatar Quanah Gibson-Mount
Browse files

ITS#5985

parent a2ad6dd1
......@@ -21,6 +21,7 @@ OpenLDAP 2.4.16 Engineering
Fixed slapo-dynlist conversion to cn=config (ITS#6002)
Fixed slapo-syncprov newCookie sync messages (ITS#5972)
Fixed slapd-syncprov too many MMR messages (ITS#6020)
Fixed slapo-syncprov replica lockout (ITS#5985)
Build Environment
Cleaned up alloc/free functions for Windows (ITS#6005)
Documentation
......
......@@ -71,12 +71,11 @@ typedef struct syncops {
#define PS_WROTE_BASE 0x04
#define PS_FIND_BASE 0x08
#define PS_FIX_FILTER 0x10
#define PS_TASK_QUEUED 0x20
int s_inuse; /* reference count */
struct syncres *s_res;
struct syncres *s_restail;
struct re_s *s_qtask; /* task for playing psearch responses */
#define RUNQ_INTERVAL 36000 /* a long time */
ldap_pvt_thread_mutex_t s_mutex;
} syncops;
......@@ -748,13 +747,6 @@ syncprov_free_syncop( syncops *so )
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return;
}
if ( so->s_qtask ) {
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
if ( ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) )
ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
ldap_pvt_runqueue_remove( &slapd_rq, so->s_qtask );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
if ( so->s_flags & PS_IS_DETACHED ) {
filter_free( so->s_op->ors_filter );
......@@ -861,11 +853,13 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so,
return rs.sr_err;
}
static void
syncprov_qstart( syncops *so );
/* Play back queued responses */
static int
syncprov_qplay( Operation *op, struct re_s *rtask )
syncprov_qplay( Operation *op, syncops *so )
{
syncops *so = rtask->arg;
slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
syncres *sr;
Entry *e;
......@@ -874,7 +868,7 @@ syncprov_qplay( Operation *op, struct re_s *rtask )
opc.son = on;
for (;;) {
do {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
sr = so->s_res;
if ( sr )
......@@ -918,37 +912,31 @@ syncprov_qplay( Operation *op, struct re_s *rtask )
ch_free( sr );
if ( rc ) {
/* Exit loop with mutex held */
ldap_pvt_thread_mutex_lock( &so->s_mutex );
break;
}
}
/* Exit loop with mutex held */
ldap_pvt_thread_mutex_lock( &so->s_mutex );
/* wait until we get explicitly scheduled again */
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
if ( rc == 0 ) {
ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 );
} else {
/* bail out on any error */
ldap_pvt_runqueue_remove( &slapd_rq, rtask );
} while (0);
/* We now only send one change at a time, to prevent one
* psearch from hogging all the CPU. Resubmit this task if
* there are more responses queued and no errors occurred.
*/
/* Prevent duplicate remove */
if ( so->s_qtask == rtask )
so->s_qtask = NULL;
if ( rc == 0 && so->s_res ) {
syncprov_qstart( so );
} else {
so->s_flags ^= PS_TASK_QUEUED;
}
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return rc;
}
/* runqueue task for playing back queued responses */
/* task for playing back queued responses */
static void *
syncprov_qtask( void *ctx, void *arg )
{
struct re_s *rtask = arg;
syncops *so = rtask->arg;
syncops *so = arg;
OperationBuffer opbuf;
Operation *op;
BackendDB be;
......@@ -973,22 +961,11 @@ syncprov_qtask( void *ctx, void *arg )
LDAP_SLIST_FIRST(&op->o_extra) = NULL;
op->o_callback = NULL;
rc = syncprov_qplay( op, rtask );
rc = syncprov_qplay( op, so );
/* decrement use count... */
syncprov_free_syncop( so );
#if 0 /* FIXME: connection_close isn't exported from slapd.
* should it be?
*/
if ( rc ) {
ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
if ( connection_state_closing( op->o_conn )) {
connection_close( op->o_conn );
}
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
}
#endif
return NULL;
}
......@@ -996,27 +973,10 @@ syncprov_qtask( void *ctx, void *arg )
static void
syncprov_qstart( syncops *so )
{
int wake=0;
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
if ( !so->s_qtask ) {
so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
syncprov_qtask, so, "syncprov_qtask",
so->s_op->o_conn->c_peer_name.bv_val );
++so->s_inuse;
wake = 1;
} else {
if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
!so->s_qtask->next_sched.tv_sec ) {
so->s_qtask->interval.tv_sec = 0;
ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
++so->s_inuse;
wake = 1;
}
}
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
if ( wake )
slap_wake_listener();
so->s_flags |= PS_TASK_QUEUED;
so->s_inuse++;
ldap_pvt_thread_pool_submit( &connection_pool,
syncprov_qtask, so );
}
/* Queue a persistent search response */
......@@ -1076,7 +1036,7 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode )
so->s_flags ^= PS_WROTE_BASE;
so->s_flags |= PS_FIND_BASE;
}
if ( so->s_flags & PS_IS_DETACHED ) {
if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) {
syncprov_qstart( so );
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment