Skip to content
Snippets Groups Projects
Commit e6475734 authored by Howard Chu's avatar Howard Chu
Browse files

ITS#4549, rewritten query_containment etc...

parent 4cc18ee8
No related branches found
No related tags found
No related merge requests found
......@@ -31,6 +31,7 @@
#include "slap.h"
#include "lutil.h"
#include "ldap_rq.h"
#include "avl.h"
#include "config.h"
......@@ -45,9 +46,17 @@ typedef struct Query_s {
struct query_template_s;
typedef struct Qbase_s {
Avlnode *scopes[4]; /* threaded AVL trees of cached queries */
struct berval base;
} Qbase;
/* struct representing a cached query */
typedef struct cached_query_s {
Query query; /* LDAP query */
Filter *filter;
Filter *first;
Qbase *qbase;
int scope;
struct berval q_uuid; /* query identifier */
struct query_template_s *qtemp; /* template of the query */
time_t expiry_time; /* time till the query is considered valid */
......@@ -78,6 +87,7 @@ typedef struct query_template_s {
struct query_template_s *qtnext;
struct query_template_s *qmnext;
Avlnode* qbase;
CachedQuery* query; /* most recent query cached for the template */
CachedQuery* query_last; /* oldest query cached for the template */
ldap_pvt_thread_rdwr_t *t_rwlock; /* Rd/wr lock for accessing queries in the template */
......@@ -212,44 +222,99 @@ merge_entry(
return rc;
}
/* compare base and scope of incoming and cached queries */
static int base_scope_compare(
struct berval* ndn_stored,
struct berval* ndn_incoming,
int scope_stored,
int scope_incoming )
/* Length-ordered sort on normalized DNs */
static int pcache_dn_cmp( const void *v1, const void *v2 )
{
struct berval pdn_incoming = BER_BVNULL;
if (scope_stored < scope_incoming)
return 0;
const Qbase *q1 = v1, *q2 = v2;
if ( !dnIsSuffix(ndn_incoming, ndn_stored))
return 0;
switch(scope_stored) {
case LDAP_SCOPE_BASE:
return (ndn_incoming->bv_len == ndn_stored->bv_len);
case LDAP_SCOPE_ONELEVEL:
switch(scope_incoming){
case LDAP_SCOPE_BASE:
dnParent(ndn_incoming, &pdn_incoming);
return (pdn_incoming.bv_len == ndn_stored->bv_len);
int rc = q1->base.bv_len - q2->base.bv_len;
if ( rc == 0 )
rc = strncmp( q1->base.bv_val, q2->base.bv_val, q1->base.bv_len );
return rc;
}
case LDAP_SCOPE_ONELEVEL:
return (ndn_incoming->bv_len == ndn_stored->bv_len);
/* compare the first value in each filter */
static int pcache_filter_cmp( const void *v1, const void *v2 )
{
const CachedQuery *q1 = v1, *q2 =v2;
int rc, weight1, weight2;
default:
return 0;
}
case LDAP_SCOPE_SUBTREE:
return 1;
switch( q1->first->f_choice ) {
case LDAP_FILTER_PRESENT:
weight1 = 0;
break;
case LDAP_FILTER_EQUALITY:
case LDAP_FILTER_GE:
case LDAP_FILTER_LE:
weight1 = 1;
break;
default:
return 0;
weight1 = 2;
}
switch( q2->first->f_choice ) {
case LDAP_FILTER_PRESENT:
weight2 = 0;
break;
case LDAP_FILTER_EQUALITY:
case LDAP_FILTER_GE:
case LDAP_FILTER_LE:
weight2 = 1;
break;
}
default:
weight2 = 2;
}
rc = weight1 - weight2;
if ( !rc ) {
switch( weight1 ) {
case 0: return 0;
case 1:
rc = ber_bvcmp( &q1->first->f_ava->aa_value,
&q2->first->f_ava->aa_value );
break;
case 2:
if ( q1->first->f_choice == LDAP_FILTER_SUBSTRINGS ) {
rc = 0;
if ( !BER_BVISNULL( &q1->first->f_sub_initial )) {
if ( !BER_BVISNULL( &q2->first->f_sub_initial )) {
rc = ber_bvcmp( &q1->first->f_sub_initial,
&q2->first->f_sub_initial );
} else {
rc = 1;
}
} else if ( !BER_BVISNULL( &q2->first->f_sub_initial )) {
rc = -1;
}
if ( rc ) break;
if ( q1->first->f_sub_any ) {
if ( q2->first->f_sub_any ) {
rc = ber_bvcmp( q1->first->f_sub_any,
q2->first->f_sub_any );
} else {
rc = 1;
}
} else if ( q2->first->f_sub_any ) {
rc = -1;
}
if ( rc ) break;
if ( !BER_BVISNULL( &q1->first->f_sub_final )) {
if ( !BER_BVISNULL( &q2->first->f_sub_final )) {
rc = ber_bvcmp( &q1->first->f_sub_final,
&q2->first->f_sub_final );
} else {
rc = 1;
}
} else if ( !BER_BVISNULL( &q2->first->f_sub_final )) {
rc = -1;
}
} else {
rc = ber_bvcmp( &q1->first->f_mr_value,
&q2->first->f_mr_value );
}
break;
}
}
return rc;
}
/* add query on top of LRU list */
......@@ -474,6 +539,133 @@ final:
return rc;
}
static Filter *
filter_first( Filter *f )
{
while ( f->f_choice == LDAP_FILTER_OR || f->f_choice == LDAP_FILTER_AND )
f = f->f_and;
return f;
}
static CachedQuery *
find_filter( Operation *op, Avlnode *root, Filter *inputf, Filter *first )
{
Filter* fs;
Filter* fi;
const char* text;
MatchingRule* mrule = NULL;
int res=0;
int ret, rc, dir;
Avlnode *ptr;
CachedQuery cq, *qc;
cq.filter = inputf;
cq.first = first;
/* substring matches sort to the end, and we just have to
* walk the entire list.
*/
if ( first->f_choice == LDAP_FILTER_SUBSTRINGS ) {
ptr = tavl_end( root, 1 );
dir = TAVL_DIR_LEFT;
} else {
ptr = tavl_find3( root, &cq, pcache_filter_cmp, &ret );
dir = (first->f_choice == LDAP_FILTER_LE) ? TAVL_DIR_LEFT :
TAVL_DIR_RIGHT;
}
while (ptr) {
qc = ptr->avl_data;
fi = inputf;
fs = qc->filter;
do {
res=0;
switch (fs->f_choice) {
case LDAP_FILTER_EQUALITY:
if (fi->f_choice == LDAP_FILTER_EQUALITY)
mrule = fs->f_ava->aa_desc->ad_type->sat_equality;
else
ret = 1;
break;
case LDAP_FILTER_GE:
case LDAP_FILTER_LE:
mrule = fs->f_ava->aa_desc->ad_type->sat_ordering;
break;
default:
mrule = NULL;
}
if (mrule) {
const char *text;
rc = value_match(&ret, fs->f_ava->aa_desc, mrule,
SLAP_MR_VALUE_OF_ASSERTION_SYNTAX,
&(fi->f_ava->aa_value),
&(fs->f_ava->aa_value), &text);
if (rc != LDAP_SUCCESS) {
return NULL;
}
}
switch (fs->f_choice) {
case LDAP_FILTER_OR:
case LDAP_FILTER_AND:
fs = fs->f_and;
fi = fi->f_and;
res=1;
break;
case LDAP_FILTER_SUBSTRINGS:
/* check if the equality query can be
* answered with cached substring query */
if ((fi->f_choice == LDAP_FILTER_EQUALITY)
&& substr_containment_equality( op,
fs, fi))
res=1;
/* check if the substring query can be
* answered with cached substring query */
if ((fi->f_choice ==LDAP_FILTER_SUBSTRINGS
) && substr_containment_substr( op,
fs, fi))
res= 1;
fs=fs->f_next;
fi=fi->f_next;
break;
case LDAP_FILTER_PRESENT:
res=1;
fs=fs->f_next;
fi=fi->f_next;
break;
case LDAP_FILTER_EQUALITY:
if (ret == 0)
res = 1;
fs=fs->f_next;
fi=fi->f_next;
break;
case LDAP_FILTER_GE:
if (mrule && ret >= 0)
res = 1;
fs=fs->f_next;
fi=fi->f_next;
break;
case LDAP_FILTER_LE:
if (mrule && ret <= 0)
res = 1;
fs=fs->f_next;
fi=fi->f_next;
break;
case LDAP_FILTER_NOT:
res=0;
break;
default:
break;
}
} while((res) && (fi != NULL) && (fs != NULL));
if ( res )
return qc;
ptr = tavl_next( ptr, dir );
}
return NULL;
}
/* check whether query is contained in any of
* the cached queries in template
*/
......@@ -483,120 +675,87 @@ query_containment(Operation *op, query_manager *qm,
QueryTemplate *templa)
{
CachedQuery* qc;
Query* q;
Filter* inputf = query->filter;
struct berval* base = &(query->base);
int scope = query->scope;
int res=0;
Filter* fs;
Filter* fi;
int ret, rc;
const char* text;
int depth = 0, tscope;
Qbase qbase, *qbptr = NULL;
struct berval pdn;
if (query->filter != NULL) {
Filter *first;
MatchingRule* mrule = NULL;
if (inputf != NULL) {
Debug( pcache_debug, "Lock QC index = %p\n",
templa, 0, 0 );
qbase.base = query->base;
first = filter_first( query->filter );
ldap_pvt_thread_rdwr_rlock(templa->t_rwlock);
for(qc=templa->query; qc != NULL; qc= qc->next) {
q = (Query*)qc;
if(base_scope_compare(&(q->base), base, q->scope, scope)) {
fi = inputf;
fs = q->filter;
do {
res=0;
switch (fs->f_choice) {
case LDAP_FILTER_EQUALITY:
if (fi->f_choice == LDAP_FILTER_EQUALITY)
mrule = fs->f_ava->aa_desc->ad_type->sat_equality;
else
ret = 1;
break;
case LDAP_FILTER_GE:
case LDAP_FILTER_LE:
mrule = fs->f_ava->aa_desc->ad_type->sat_ordering;
break;
default:
mrule = NULL;
}
if (mrule) {
rc = value_match(&ret, fs->f_ava->aa_desc, mrule,
SLAP_MR_VALUE_OF_ASSERTION_SYNTAX,
&(fi->f_ava->aa_value),
&(fs->f_ava->aa_value), &text);
if (rc != LDAP_SUCCESS) {
ldap_pvt_thread_rdwr_runlock(templa->t_rwlock);
Debug( pcache_debug,
"Unlock: Exiting QC index=%p\n",
templa, 0, 0 );
return NULL;
}
}
switch (fs->f_choice) {
case LDAP_FILTER_OR:
case LDAP_FILTER_AND:
fs = fs->f_and;
fi = fi->f_and;
res=1;
for( ;; ) {
/* Find the base */
qbptr = avl_find( templa->qbase, &qbase, pcache_dn_cmp );
if ( qbptr ) {
tscope = query->scope;
/* Find a matching scope:
* match at depth 0 OK
* scope is BASE,
* one at depth 1 OK
* subord at depth > 0 OK
* subtree at any depth OK
* scope is ONE,
* subtree or subord at any depth OK
* scope is SUBORD,
* subtree or subord at any depth OK
* scope is SUBTREE,
* subord at depth > 0 OK
* subtree at any depth OK
*/
for ( tscope = 0 ; tscope <= LDAP_SCOPE_CHILDREN; tscope++ ) {
switch ( query->scope ) {
case LDAP_SCOPE_BASE:
if ( tscope == LDAP_SCOPE_BASE && depth ) continue;
if ( tscope == LDAP_SCOPE_ONE && depth != 1) continue;
if ( tscope == LDAP_SCOPE_CHILDREN && !depth ) continue;
break;
case LDAP_FILTER_SUBSTRINGS:
/* check if the equality query can be
* answered with cached substring query */
if ((fi->f_choice == LDAP_FILTER_EQUALITY)
&& substr_containment_equality( op,
fs, fi))
res=1;
/* check if the substring query can be
* answered with cached substring query */
if ((fi->f_choice ==LDAP_FILTER_SUBSTRINGS
) && substr_containment_substr( op,
fs, fi))
res= 1;
fs=fs->f_next;
fi=fi->f_next;
case LDAP_SCOPE_ONE:
if ( tscope == LDAP_SCOPE_BASE )
tscope = LDAP_SCOPE_ONE;
if ( tscope == LDAP_SCOPE_ONE && depth ) continue;
if ( !depth ) break;
if ( tscope < LDAP_SCOPE_SUBTREE )
tscope = LDAP_SCOPE_SUBTREE;
break;
case LDAP_FILTER_PRESENT:
res=1;
fs=fs->f_next;
fi=fi->f_next;
case LDAP_SCOPE_SUBTREE:
if ( tscope < LDAP_SCOPE_SUBTREE )
tscope = LDAP_SCOPE_SUBTREE;
if ( tscope == LDAP_SCOPE_CHILDREN && !depth ) continue;
break;
case LDAP_FILTER_EQUALITY:
if (ret == 0)
res = 1;
fs=fs->f_next;
fi=fi->f_next;
break;
case LDAP_FILTER_GE:
if (mrule && ret >= 0)
res = 1;
fs=fs->f_next;
fi=fi->f_next;
break;
case LDAP_FILTER_LE:
if (mrule && ret <= 0)
res = 1;
fs=fs->f_next;
fi=fi->f_next;
break;
case LDAP_FILTER_NOT:
res=0;
break;
default:
case LDAP_SCOPE_CHILDREN:
if ( tscope < LDAP_SCOPE_SUBTREE )
tscope = LDAP_SCOPE_SUBTREE;
break;
}
} while((res) && (fi != NULL) && (fs != NULL));
if(res) {
ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
if (qm->lru_top != qc) {
remove_query(qm, qc);
add_query_on_top(qm, qc);
if ( !qbptr->scopes[tscope] ) continue;
/* Find filter */
qc = find_filter( op, qbptr->scopes[tscope],
query->filter, first );
if ( qc ) {
ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
if (qm->lru_top != qc) {
remove_query(qm, qc);
add_query_on_top(qm, qc);
}
ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
return qc;
}
ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
return qc;
}
}
if ( be_issuffix( op->o_bd, &qbase.base ))
break;
/* Up a level */
dnParent( &qbase.base, &pdn );
qbase.base = pdn;
}
Debug( pcache_debug,
"Not answerable: Unlock QC index=%p\n",
templa, 0, 0 );
......@@ -608,11 +767,8 @@ query_containment(Operation *op, query_manager *qm,
static void
free_query (CachedQuery* qc)
{
Query* q = (Query*)qc;
free(qc->q_uuid.bv_val);
filter_free(q->filter);
free (q->base.bv_val);
filter_free(qc->filter);
free(qc);
}
......@@ -625,7 +781,8 @@ static void add_query(
struct berval* uuid)
{
CachedQuery* new_cached_query = (CachedQuery*) ch_malloc(sizeof(CachedQuery));
Query* new_query;
Qbase *qbase, qb;
new_cached_query->qtemp = templ;
if ( uuid ) {
new_cached_query->q_uuid = *uuid;
......@@ -638,22 +795,35 @@ static void add_query(
new_cached_query->lru_down = NULL;
Debug( pcache_debug, "Added query expires at %ld\n",
(long) new_cached_query->expiry_time, 0, 0 );
new_query = (Query*)new_cached_query;
ber_dupbv(&new_query->base, &query->base);
new_query->scope = query->scope;
new_query->filter = query->filter;
new_cached_query->scope = query->scope;
new_cached_query->filter = query->filter;
new_cached_query->first = filter_first( query->filter );
qb.base = query->base;
/* Adding a query */
Debug( pcache_debug, "Lock AQ index = %p\n",
templ, 0, 0 );
ldap_pvt_thread_rdwr_wlock(templ->t_rwlock);
qbase = avl_find( templ->qbase, &qb, pcache_dn_cmp );
if ( !qbase ) {
qbase = ch_calloc( 1, sizeof(Qbase) + qb.base.bv_len + 1 );
qbase->base.bv_len =qb.base.bv_len;
qbase->base.bv_val = (char *)(qbase+1);
memcpy( qbase->base.bv_val, qb.base.bv_val, qb.base.bv_len );
qbase->base.bv_val[qbase->base.bv_len] = '\0';
avl_insert( &templ->qbase, qbase, pcache_dn_cmp, avl_dup_error );
}
if (templ->query == NULL)
templ->query_last = new_cached_query;
else
templ->query->prev = new_cached_query;
new_cached_query->next = templ->query;
new_cached_query->prev = NULL;
new_cached_query->qbase = qbase;
tavl_insert( &qbase->scopes[query->scope], new_cached_query,
pcache_filter_cmp, avl_dup_ok );
templ->query = new_cached_query;
templ->no_of_queries++;
Debug( pcache_debug, "TEMPLATE %p QUERIES++ %d\n",
......@@ -684,6 +854,15 @@ remove_from_template (CachedQuery* qc, QueryTemplate* template)
qc->next->prev = qc->prev;
qc->prev->next = qc->next;
}
tavl_delete( &qc->qbase->scopes[qc->scope], qc, pcache_filter_cmp );
if ( qc->qbase->scopes[0] == NULL &&
qc->qbase->scopes[1] == NULL &&
qc->qbase->scopes[2] == NULL &&
qc->qbase->scopes[3] == NULL ) {
avl_delete( &template->qbase, qc->qbase, pcache_dn_cmp );
ch_free( qc->qbase );
qc->qbase = NULL;
}
template->no_of_queries--;
}
......@@ -1999,6 +2178,17 @@ pcache_db_open(
return rc;
}
static void
pcache_free_qbase( void *v )
{
Qbase *qb = v;
int i;
for (i=0; i<3; i++)
tavl_free( qb->scopes[i], NULL );
ch_free( qb );
}
static int
pcache_db_close(
BackendDB *be
......@@ -2024,6 +2214,7 @@ pcache_db_close(
qn = qc->next;
free_query( qc );
}
avl_free( tm->qbase, pcache_free_qbase );
free( tm->querystr.bv_val );
ldap_pvt_thread_rdwr_destroy( tm->t_rwlock );
ch_free( tm->t_rwlock );
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment