Commit 1f6d8611 authored by Ondřej Kuzník's avatar Ondřej Kuzník
Browse files

Implement read throttling when writes backlog

Reject operations in such a case with LDAP_BUSY. If read_event feature
is on, just stop reading from the connection. However this could still
result in deadlocks in reasonable situations. Need to figure out better
ways to make it safe and still protect ourselves.
parent 68b163fc
......@@ -266,6 +266,11 @@ handle_one_request( LloadConnection *c )
0 );
return LDAP_SUCCESS;
}
if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
operation_send_reject( op, LDAP_BUSY,
"writing side backlogged, please keep reading", 0 );
return LDAP_SUCCESS;
}
if ( op->o_tag == LDAP_REQ_EXTENDED ) {
handler = request_extended;
} else {
......
......@@ -1856,6 +1856,7 @@ config_feature( ConfigArgs *c )
{ BER_BVC("vc"), LLOAD_FEATURE_VC },
#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
{ BER_BVC("proxyauthz"), LLOAD_FEATURE_PROXYAUTHZ },
{ BER_BVC("read_pause"), LLOAD_FEATURE_PAUSE },
{ BER_BVNULL, 0 }
};
slap_mask_t mask = 0;
......
......@@ -102,6 +102,10 @@ handle_pdus( void *ctx, void *arg )
c->c_currentber = ber;
checked_lock( &c->c_io_mutex );
if ( (lload_features & LLOAD_FEATURE_PAUSE) &&
(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
goto pause;
}
tag = ber_get_next( c->c_sb, &len, ber );
checked_unlock( &c->c_io_mutex );
if ( tag != LDAP_TAG_MESSAGE ) {
......@@ -135,10 +139,18 @@ handle_pdus( void *ctx, void *arg )
assert( IS_ALIVE( c, c_refcnt ) );
}
event_add( c->c_read_event, c->c_read_timeout );
Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
"re-enabled read event on connid=%lu\n",
c->c_connid );
checked_lock( &c->c_io_mutex );
if ( !(lload_features & LLOAD_FEATURE_PAUSE) ||
!(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
event_add( c->c_read_event, c->c_read_timeout );
Debug( LDAP_DEBUG_CONNS, "handle_pdus: "
"re-enabled read event on connid=%lu\n",
c->c_connid );
}
pause:
c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
checked_unlock( &c->c_io_mutex );
done:
RELEASE_REF( c, c_refcnt, c->c_destroy );
epoch_leave( epoch );
......@@ -160,6 +172,7 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
ber_tag_t tag;
ber_len_t len;
epoch_t epoch;
int pause;
if ( !IS_ALIVE( c, c_live ) ) {
event_del( c->c_read_event );
......@@ -199,7 +212,9 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
c->c_currentber = ber;
checked_lock( &c->c_io_mutex );
assert( !(c->c_io_state & LLOAD_C_READ_HANDOVER) );
tag = ber_get_next( c->c_sb, &len, ber );
pause = c->c_io_state & LLOAD_C_READ_PAUSE;
checked_unlock( &c->c_io_mutex );
if ( tag != LDAP_TAG_MESSAGE ) {
......@@ -229,20 +244,34 @@ connection_read_cb( evutil_socket_t s, short what, void *arg )
CONNECTION_LOCK_DESTROY(c);
goto out;
}
event_add( c->c_read_event, c->c_read_timeout );
Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
"re-enabled read event on connid=%lu\n",
c->c_connid );
if ( !(lload_features & LLOAD_FEATURE_PAUSE) || !pause ) {
event_add( c->c_read_event, c->c_read_timeout );
Debug( LDAP_DEBUG_CONNS, "connection_read_cb: "
"re-enabled read event on connid=%lu\n",
c->c_connid );
}
goto out;
}
checked_lock( &c->c_io_mutex );
c->c_io_state |= LLOAD_C_READ_HANDOVER;
checked_unlock( &c->c_io_mutex );
event_del( c->c_read_event );
if ( !lload_conn_max_pdus_per_cycle ||
ldap_pvt_thread_pool_submit( &connection_pool, handle_pdus, c ) ) {
/* If we're overloaded or configured as such, process one and resume in
* the next cycle. */
event_add( c->c_read_event, c->c_read_timeout );
c->c_pdu_cb( c );
int rc = c->c_pdu_cb( c );
checked_lock( &c->c_io_mutex );
c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
if ( rc == LDAP_SUCCESS &&
( !(lload_features & LLOAD_FEATURE_PAUSE) ||
!(c->c_io_state & LLOAD_C_READ_PAUSE) ) ) {
event_add( c->c_read_event, c->c_read_timeout );
}
checked_unlock( &c->c_io_mutex );
goto out;
}
......@@ -313,9 +342,28 @@ connection_write_cb( evutil_socket_t s, short what, void *arg )
CONNECTION_LOCK_DESTROY(c);
goto done;
}
if ( !(c->c_io_state & LLOAD_C_READ_PAUSE) ) {
Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
"connection connid=%lu blocked on writing, marking "
"paused\n",
c->c_connid );
}
c->c_io_state |= LLOAD_C_READ_PAUSE;
/* TODO: Do not reset write timeout unless we wrote something */
event_add( c->c_write_event, lload_write_timeout );
} else {
c->c_pendingber = NULL;
if ( c->c_io_state & LLOAD_C_READ_PAUSE ) {
c->c_io_state ^= LLOAD_C_READ_PAUSE;
Debug( LDAP_DEBUG_CONNS, "connection_write_cb: "
"Unpausing connection connid=%lu\n",
c->c_connid );
if ( !(c->c_io_state & LLOAD_C_READ_HANDOVER) ) {
event_add( c->c_read_event, c->c_read_timeout );
}
}
}
checked_unlock( &c->c_io_mutex );
......
......@@ -1637,6 +1637,8 @@ lload_handle_global_invalidation( LloadChange *change )
* - ProxyAuthz:
* - on: nothing needed
* - off: clear c_auth/privileged on each client
* - read pause (WIP):
* - nothing needed?
*/
assert( change->target );
......@@ -1644,6 +1646,9 @@ lload_handle_global_invalidation( LloadChange *change )
assert(0);
feature_diff &= ~LLOAD_FEATURE_VC;
}
if ( feature_diff & LLOAD_FEATURE_PAUSE ) {
feature_diff &= ~LLOAD_FEATURE_PAUSE;
}
if ( feature_diff & LLOAD_FEATURE_PROXYAUTHZ ) {
if ( !(lload_features & LLOAD_FEATURE_PROXYAUTHZ) ) {
LloadConnection *c;
......
......@@ -89,6 +89,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
ber_printf( output, "t{tit{ess}}", LDAP_TAG_MESSAGE,
LDAP_TAG_MSGID, op->o_client_msgid,
LDAP_RES_EXTENDED, LDAP_SUCCESS, "", "" );
c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
checked_unlock( &c->c_io_mutex );
CONNECTION_LOCK(c);
......
......@@ -174,6 +174,7 @@ typedef enum {
LLOAD_FEATURE_VC = 1 << 0,
#endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
LLOAD_FEATURE_PROXYAUTHZ = 1 << 1,
LLOAD_FEATURE_PAUSE = 1 << 2,
} lload_features_t;
#ifdef BALANCER_MODULE
......@@ -272,7 +273,8 @@ enum sc_state {
LLOAD_C_CLOSING, /* closing */
LLOAD_C_ACTIVE, /* exclusive operation (tls setup, ...) in progress */
LLOAD_C_BINDING, /* binding */
LLOAD_C_DYING, /* part-processed dead but someone still holds a reference */
LLOAD_C_DYING, /* part-processed dead waiting to be freed, someone
* might still be observing it */
};
enum sc_type {
LLOAD_C_OPEN = 0, /* regular connection */
......@@ -280,12 +282,22 @@ enum sc_type {
LLOAD_C_BIND, /* connection used to handle bind client requests if VC not enabled */
LLOAD_C_PRIVILEGED, /* connection can override proxyauthz control */
};
enum sc_io_state {
LLOAD_C_OPERATIONAL = 0, /* all is good */
LLOAD_C_READ_HANDOVER = 1 << 0, /* A task to process PDUs is scheduled or
* running, do not re-enable c_read_event */
LLOAD_C_READ_PAUSE = 1 << 1, /* We want to pause reading until the client
* has sufficiently caught up with what we
* sent */
};
/*
* represents a connection from an ldap client/to ldap server
*/
struct LloadConnection {
enum sc_state c_state; /* connection state */
enum sc_type c_type;
enum sc_io_state c_io_state;
ber_socket_t c_fd;
/*
......
......@@ -513,6 +513,9 @@ upstream_bind_cb( LloadConnection *c )
goto fail;
}
checked_lock( &c->c_io_mutex );
c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
checked_unlock( &c->c_io_mutex );
event_add( c->c_read_event, c->c_read_timeout );
ber_free( ber, 1 );
return -1;
......@@ -578,6 +581,9 @@ upstream_bind( void *ctx, void *arg )
}
#endif /* HAVE_CYRUS_SASL */
}
/* TODO: can we be paused at this point? Then we'd have to move this line
* after connection_write_cb */
c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
checked_unlock( &c->c_io_mutex );
connection_write_cb( -1, 0, c );
......@@ -832,11 +838,16 @@ upstream_starttls( LloadConnection *c )
ber_free( ber, 1 );
CONNECTION_UNLOCK(c);
checked_lock( &c->c_io_mutex );
c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
checked_unlock( &c->c_io_mutex );
return rc;
}
base = event_get_base( c->c_read_event );
c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
event_del( c->c_read_event );
event_del( c->c_write_event );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment