Commit 0190f18b authored by Ondřej Kuzník's avatar Ondřej Kuzník
Browse files

ITS#9598 Introduce backend-restricted selection

parent 7173e472
......@@ -341,6 +341,16 @@ Specify the number of milliseconds to wait before forcibly closing
a connection with an outstanding write. This allows faster recovery from
various network hang conditions. An iotimeout of 0 disables this feature.
The default is 10000.
.TP
.B write_coherence <integer>
Specify the number of seconds after a write operation is finished that
.B lloadd
will direct operations exclusively to the last selected backend. A write
operation is anything not handled internally (certain exops, abandon),
excepting search, compare and bind operations. Bind operations also reset this
restriction. The default is 0, write operations do not restrict selection. When
negative, the restriction is not time limited and will persist until the next
bind.
.SH TLS OPTIONS
If
......
......@@ -335,6 +335,12 @@ request_bind( LloadConnection *client, LloadOperation *op )
rc = ldap_tavl_insert( &client->c_ops, op, operation_client_cmp, ldap_avl_dup_error );
assert( rc == LDAP_SUCCESS );
client->c_n_ops_executing++;
if ( client->c_backend ) {
assert( client->c_restricted_inflight == 0 );
client->c_backend = NULL;
client->c_restricted_at = 0;
}
CONNECTION_UNLOCK(client);
if ( pin ) {
......
......@@ -90,11 +90,28 @@ request_process( LloadConnection *client, LloadOperation *op )
{
BerElement *output;
LloadConnection *upstream = NULL;
LloadBackend *b = NULL;
ber_int_t msgid;
int res = LDAP_UNAVAILABLE, rc = LDAP_SUCCESS;
char *message = "no connections available";
upstream_select( op, &upstream, &res, &message );
if ( lload_write_coherence ) {
CONNECTION_LOCK(client);
if ( client->c_restricted_inflight || client->c_restricted_at < 0 ||
client->c_restricted_at + lload_write_coherence >= op->o_start ) {
b = client->c_backend;
} else {
client->c_backend = NULL;
}
CONNECTION_UNLOCK(client);
}
if ( b ) {
backend_select( b, op, &upstream, &res, &message );
} else {
upstream_select( op, &upstream, &res, &message );
}
if ( !upstream ) {
Debug( LDAP_DEBUG_STATS, "request_process: "
"connid=%lu, msgid=%d no available connection found\n",
......@@ -166,6 +183,21 @@ request_process( LloadConnection *client, LloadOperation *op )
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++;
if ( lload_write_coherence && !b &&
op->o_tag != LDAP_REQ_SEARCH &&
op->o_tag != LDAP_REQ_COMPARE ) {
/*
* TODO: There can't be more than one thread receiving a new request,
* so we could drop the lock. We'd still need some atomics for the
* counters.
*/
CONNECTION_LOCK(client);
client->c_backend = upstream->c_backend;
client->c_restricted_inflight++;
op->o_restricted = 1;
CONNECTION_UNLOCK(client);
}
if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) &&
client->c_type != LLOAD_C_PRIVILEGED ) {
CONNECTION_LOCK(client);
......@@ -199,6 +231,8 @@ fail:
if ( upstream ) {
CONNECTION_LOCK_DESTROY(upstream);
/* We have not committed any restrictions in the end */
op->o_restricted = LLOAD_OP_NOT_RESTRICTED;
operation_send_reject( op, LDAP_OTHER, "internal error", 0 );
}
......
......@@ -80,6 +80,7 @@ static struct timeval timeout_api_tv, timeout_net_tv,
timeout_write_tv = { 10, 0 };
lload_features_t lload_features;
int lload_write_coherence = 0;
ber_len_t sockbuf_max_incoming_client = LLOAD_SB_MAX_INCOMING_CLIENT;
ber_len_t sockbuf_max_incoming_upstream = LLOAD_SB_MAX_INCOMING_UPSTREAM;
......@@ -631,6 +632,17 @@ static ConfigTable config_back_cf_table[] = {
NULL,
{ .v_uint = 0 }
},
{ "write_coherence", "seconds", 2, 2, 0,
ARG_INT,
&lload_write_coherence,
"( OLcfgBkAt:13.38 "
"NAME 'olcBkLloadWriteCoherence' "
"DESC 'Keep operations to the same backend after a write' "
"EQUALITY integerMatch "
"SYNTAX OMsInteger "
"SINGLE-VALUE )",
NULL, NULL
},
/* cn=config only options */
#ifdef BALANCER_MODULE
......
......@@ -295,6 +295,14 @@ enum sc_io_state {
* sent */
};
/* Tracking whether an operation might cause a client to restrict which
* upstreams are eligible */
enum op_restriction {
LLOAD_OP_NOT_RESTRICTED, /* operation didn't trigger any restriction */
LLOAD_OP_RESTRICTED_BACKEND, /* operation restricts a client to a certain backend */
LLOAD_OP_RESTRICTED_UPSTREAM, /* operation restricts a client to a certain upstream */
};
/*
* represents a connection from an ldap client/to ldap server
*/
......@@ -403,6 +411,8 @@ struct LloadConnection {
lload_counters_t c_counters; /* per connection operation counters */
LloadBackend *c_backend;
uintptr_t c_restricted_inflight;
time_t c_restricted_at;
/*
* Protected by the CIRCLEQ mutex:
......@@ -441,6 +451,7 @@ struct LloadOperation {
unsigned long o_client_connid;
ber_int_t o_client_msgid;
ber_int_t o_saved_msgid;
enum op_restriction o_restricted;
LloadConnection *o_upstream;
unsigned long o_upstream_connid;
......
......@@ -276,6 +276,20 @@ operation_unlink_client( LloadOperation *op, LloadConnection *client )
assert( op == removed );
client->c_n_ops_executing--;
if ( op->o_restricted ) {
if ( !--client->c_restricted_inflight && client->c_restricted_at >= 0 ) {
if ( lload_write_coherence < 0 ) {
client->c_restricted_at = -1;
} else if ( op->o_last_response ) {
client->c_restricted_at = op->o_last_response;
} else {
/* We have to default to o_start just in case we abandoned an
* operation that the backend actually processed */
client->c_restricted_at = op->o_start;
}
}
}
if ( op->o_tag == LDAP_REQ_BIND &&
client->c_state == LLOAD_C_BINDING ) {
client->c_state = LLOAD_C_READY;
......
......@@ -202,6 +202,8 @@ LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_client;
LDAP_SLAPD_V (ber_len_t) sockbuf_max_incoming_upstream;
LDAP_SLAPD_V (int) lload_conn_max_pdus_per_cycle;
LDAP_SLAPD_V (int) lload_write_coherence;
LDAP_SLAPD_V (lload_features_t) lload_features;
LDAP_SLAPD_V (slap_mask_t) global_allows;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment