Skip to content
Snippets Groups Projects
Commit 6f08468e authored by Howard Chu's avatar Howard Chu
Browse files

Fix concurrency issues

parent 40f818da
No related branches found
No related tags found
No related merge requests found
......@@ -49,6 +49,7 @@ struct query_template_s;
typedef struct Qbase_s {
Avlnode *scopes[4]; /* threaded AVL trees of cached queries */
struct berval base;
int queries;
} Qbase;
/* struct representing a cached query */
......@@ -106,7 +107,7 @@ struct query_manager_s;
* 2) query addition, 3) cache replacement
*/
typedef CachedQuery * (QCfunc)(Operation *op, struct query_manager_s*, Query*, QueryTemplate*);
typedef void (AddQueryfunc)(struct query_manager_s*, Query*, QueryTemplate*, struct berval*);
typedef CachedQuery * (AddQueryfunc)(Operation *op, struct query_manager_s*, Query*, QueryTemplate*, int positive);
typedef void (CRfunc)(struct query_manager_s*, struct berval * );
/* LDAP query cache */
......@@ -233,6 +234,18 @@ static int pcache_dn_cmp( const void *v1, const void *v2 )
return rc;
}
static int lex_bvcmp( struct berval *bv1, struct berval *bv2 )
{
int len, dif;
dif = bv1->bv_len - bv2->bv_len;
len = bv1->bv_len;
if ( dif > 0 ) len -= dif;
len = memcmp( bv1->bv_val, bv2->bv_val, len );
if ( !len )
len = dif;
return len;
}
/* compare the first value in each filter */
static int pcache_filter_cmp( const void *v1, const void *v2 )
{
......@@ -268,15 +281,14 @@ static int pcache_filter_cmp( const void *v1, const void *v2 )
switch( weight1 ) {
case 0: return 0;
case 1:
rc = ber_bvcmp( &q1->first->f_ava->aa_value,
&q2->first->f_ava->aa_value );
rc = lex_bvcmp( &q1->first->f_av_value, &q2->first->f_av_value );
break;
case 2:
if ( q1->first->f_choice == LDAP_FILTER_SUBSTRINGS ) {
rc = 0;
if ( !BER_BVISNULL( &q1->first->f_sub_initial )) {
if ( !BER_BVISNULL( &q2->first->f_sub_initial )) {
rc = ber_bvcmp( &q1->first->f_sub_initial,
rc = lex_bvcmp( &q1->first->f_sub_initial,
&q2->first->f_sub_initial );
} else {
rc = 1;
......@@ -287,7 +299,7 @@ static int pcache_filter_cmp( const void *v1, const void *v2 )
if ( rc ) break;
if ( q1->first->f_sub_any ) {
if ( q2->first->f_sub_any ) {
rc = ber_bvcmp( q1->first->f_sub_any,
rc = lex_bvcmp( q1->first->f_sub_any,
q2->first->f_sub_any );
} else {
rc = 1;
......@@ -298,7 +310,7 @@ static int pcache_filter_cmp( const void *v1, const void *v2 )
if ( rc ) break;
if ( !BER_BVISNULL( &q1->first->f_sub_final )) {
if ( !BER_BVISNULL( &q2->first->f_sub_final )) {
rc = ber_bvcmp( &q1->first->f_sub_final,
rc = lex_bvcmp( &q1->first->f_sub_final,
&q2->first->f_sub_final );
} else {
rc = 1;
......@@ -307,7 +319,7 @@ static int pcache_filter_cmp( const void *v1, const void *v2 )
rc = -1;
}
} else {
rc = ber_bvcmp( &q1->first->f_mr_value,
rc = lex_bvcmp( &q1->first->f_mr_value,
&q2->first->f_mr_value );
}
break;
......@@ -322,7 +334,6 @@ static void
add_query_on_top (query_manager* qm, CachedQuery* qc)
{
CachedQuery* top = qm->lru_top;
Query* q = (Query*)qc;
qm->lru_top = qc;
......@@ -334,7 +345,7 @@ add_query_on_top (query_manager* qm, CachedQuery* qc)
qc->lru_down = top;
qc->lru_up = NULL;
Debug( pcache_debug, "Base of added query = %s\n",
q->base.bv_val, 0, 0 );
qc->qbase->base.bv_val, 0, 0 );
}
/* remove_query from LRU list */
......@@ -801,21 +812,23 @@ free_query (CachedQuery* qc)
/* Add query to query cache */
static void add_query(
static CachedQuery * add_query(
Operation *op,
query_manager* qm,
Query* query,
QueryTemplate *templ,
struct berval* uuid)
int positive)
{
CachedQuery* new_cached_query = (CachedQuery*) ch_malloc(sizeof(CachedQuery));
Qbase *qbase, qb;
Filter *first;
int rc;
new_cached_query->qtemp = templ;
if ( uuid ) {
new_cached_query->q_uuid = *uuid;
BER_BVZERO( &new_cached_query->q_uuid );
if ( positive ) {
new_cached_query->expiry_time = slap_get_time() + templ->ttl;
} else {
BER_BVZERO( &new_cached_query->q_uuid );
new_cached_query->expiry_time = slap_get_time() + templ->negttl;
}
new_cached_query->lru_up = NULL;
......@@ -825,7 +838,7 @@ static void add_query(
new_cached_query->scope = query->scope;
new_cached_query->filter = query->filter;
new_cached_query->first = filter_first( query->filter );
new_cached_query->first = first = filter_first( query->filter );
qb.base = query->base;
......@@ -842,17 +855,25 @@ static void add_query(
qbase->base.bv_val[qbase->base.bv_len] = '\0';
avl_insert( &templ->qbase, qbase, pcache_dn_cmp, avl_dup_error );
}
if (templ->query == NULL)
templ->query_last = new_cached_query;
else
templ->query->prev = new_cached_query;
new_cached_query->next = templ->query;
new_cached_query->prev = NULL;
new_cached_query->qbase = qbase;
tavl_insert( &qbase->scopes[query->scope], new_cached_query,
pcache_filter_cmp, avl_dup_ok );
templ->query = new_cached_query;
templ->no_of_queries++;
rc = tavl_insert( &qbase->scopes[query->scope], new_cached_query,
pcache_filter_cmp, avl_dup_error );
if ( rc == 0 ) {
qbase->queries++;
if (templ->query == NULL)
templ->query_last = new_cached_query;
else
templ->query->prev = new_cached_query;
templ->query = new_cached_query;
templ->no_of_queries++;
} else {
ch_free( new_cached_query );
new_cached_query = find_filter( op, qbase->scopes[query->scope],
query->filter, first );
filter_free( query->filter );
}
Debug( pcache_debug, "TEMPLATE %p QUERIES++ %d\n",
templ, templ->no_of_queries, 0 );
......@@ -861,9 +882,12 @@ static void add_query(
ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);
/* Adding on top of LRU list */
ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
add_query_on_top(qm, new_cached_query);
ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
if ( rc == 0 ) {
ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
add_query_on_top(qm, new_cached_query);
ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
}
return rc == 0 ? new_cached_query : NULL;
}
static void
......@@ -882,10 +906,8 @@ remove_from_template (CachedQuery* qc, QueryTemplate* template)
qc->prev->next = qc->next;
}
tavl_delete( &qc->qbase->scopes[qc->scope], qc, pcache_filter_cmp );
if ( qc->qbase->scopes[0] == NULL &&
qc->qbase->scopes[1] == NULL &&
qc->qbase->scopes[2] == NULL &&
qc->qbase->scopes[3] == NULL ) {
qc->qbase->queries--;
if ( qc->qbase->queries == 0 ) {
avl_delete( &template->qbase, qc->qbase, pcache_dn_cmp );
ch_free( qc->qbase );
qc->qbase = NULL;
......@@ -1272,29 +1294,40 @@ pcache_response(
}
} else if ( rs->sr_type == REP_RESULT ) {
QueryTemplate* templ = si->qtemp;
if (( si->count && cache_entries( op, rs, &uuid ) == 0 ) ||
( templ->negttl && !si->count && !si->over &&
if ( si->count ||
( si->qtemp->negttl && !si->count && !si->over &&
rs->sr_err == LDAP_SUCCESS )) {
qm->addfunc(qm, &si->query, si->qtemp,
si->count ? &uuid : NULL);
ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
cm->num_cached_queries++;
Debug( pcache_debug, "STORED QUERIES = %lu\n",
cm->num_cached_queries, 0, 0 );
ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
CachedQuery *qc = qm->addfunc(op, qm, &si->query, si->qtemp,
si->count);
if ( qc != NULL ) {
if ( si->count )
cache_entries( op, rs, &qc->q_uuid );
ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
cm->num_cached_queries++;
Debug( pcache_debug, "STORED QUERIES = %lu\n",
cm->num_cached_queries, 0, 0 );
ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
/* If the consistency checker suspended itself,
* wake it back up
*/
if ( cm->cc_paused ) {
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
/* If the consistency checker suspended itself,
* wake it back up
*/
if ( cm->cc_paused ) {
cm->cc_paused = 0;
ldap_pvt_runqueue_resched( &slapd_rq, cm->cc_arg, 0 );
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
if ( cm->cc_paused ) {
cm->cc_paused = 0;
ldap_pvt_runqueue_resched( &slapd_rq, cm->cc_arg, 0 );
}
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
}
} else if ( si->count ) {
/* duplicate query, free it */
Entry *e;
for (;si->head; si->head=e) {
e = si->head->e_private;
si->head->e_private = NULL;
entry_free(si->head);
}
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
}
} else {
filter_free( si->query.filter );
......@@ -1453,7 +1486,7 @@ pcache_op_search(
continue;
cacheable = 1;
qtemp = qt;
Debug( LDAP_DEBUG_NONE, "Entering QC, querystr = %s\n",
Debug( pcache_debug, "Entering QC, querystr = %s\n",
op->ors_filterstr.bv_val, 0, 0 );
answerable = (*(qm->qcfunc))(op, qm, &query, qt);
......@@ -1471,7 +1504,6 @@ pcache_op_search(
Debug( pcache_debug, "QUERY ANSWERABLE\n", 0, 0, 0 );
op->o_tmpfree( filter_attrs, op->o_tmpmemctx );
ldap_pvt_thread_rdwr_runlock(&qtemp->t_rwlock);
if ( BER_BVISNULL( &answerable->q_uuid )) {
/* No entries cached, just an empty result set */
i = rs->sr_err = 0;
......@@ -1481,6 +1513,7 @@ pcache_op_search(
op->o_callback = NULL;
i = cm->db.bd_info->bi_op_search( op, rs );
}
ldap_pvt_thread_rdwr_runlock(&qtemp->t_rwlock);
op->o_bd = save_bd;
op->o_callback = save_cb;
return i;
......@@ -1605,7 +1638,7 @@ consistency_check(
Operation *op;
SlapReply rs = {REP_RESULT};
CachedQuery* query, *query_prev;
CachedQuery* query;
int return_val, pause = 1;
QueryTemplate* templ;
......@@ -1623,15 +1656,23 @@ consistency_check(
if ( query ) pause = 0;
op->o_time = slap_get_time();
while (query && (query->expiry_time < op->o_time)) {
int rem = 0;
Debug( pcache_debug, "Lock CR index = %p\n",
templ, 0, 0 );
ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock);
remove_from_template(query, templ);
Debug( pcache_debug, "TEMPLATE %p QUERIES-- %d\n",
templ, templ->no_of_queries, 0 );
Debug( pcache_debug, "Unlock CR index = %p\n",
templ, 0, 0 );
if ( query == templ->query_last ) {
rem = 1;
remove_from_template(query, templ);
Debug( pcache_debug, "TEMPLATE %p QUERIES-- %d\n",
templ, templ->no_of_queries, 0 );
Debug( pcache_debug, "Unlock CR index = %p\n",
templ, 0, 0 );
}
ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);
if ( !rem ) {
query = templ->query_last;
continue;
}
ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
remove_query(qm, query);
ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
......@@ -1651,9 +1692,8 @@ consistency_check(
"STALE QUERY REMOVED, CACHE ="
"%d entries\n",
cm->cur_entries, 0, 0 );
query_prev = query;
query = query->prev;
free_query(query_prev);
free_query(query);
query = templ->query_last;
}
}
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
......@@ -2043,8 +2083,6 @@ pc_cf_gen( ConfigArgs *c )
Debug( pcache_debug, "\t%s\n",
attrarray[i].an_name.bv_val, 0, 0 );
}
temp++;
temp->querystr.bv_val = NULL;
break;
case PC_RESP:
if ( strcasecmp( c->argv[1], "head" ) == 0 ) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment