Commit 2d330325 authored by Nadezhda Ivanova's avatar Nadezhda Ivanova Committed by Ondřej Kuzník
Browse files

Lload cn=monitor initial implementation

parent 9baa56ad
......@@ -19,7 +19,7 @@ XSRCS = version.c
NT_SRCS = ../slapd/nt_svc.c
NT_OBJS = ../slapd/nt_svc.o ../../libraries/liblutil/slapdmsg.res
SRCS += module_init.c
SRCS += module_init.c monitor.c
OBJS = $(patsubst %.c,%.lo,$(SRCS)) $(@PLAT@_OBJS)
......
......@@ -245,7 +245,13 @@ backend_select( LloadOperation *op, int *res )
ldap_pvt_thread_mutex_unlock( &backend_mutex );
b->b_n_ops_executing++;
if ( op->o_tag == LDAP_REQ_BIND ) {
b->b_counters[LLOAD_STATS_OPS_BIND].lc_ops_received++;
} else {
b->b_counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
}
c->c_n_ops_executing++;
c->c_counters.lc_ops_received++;
CONNECTION_UNLOCK_INCREF(c);
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
......
......@@ -39,6 +39,8 @@ bind_mech_external(
client->c_state = LLOAD_C_READY;
client->c_type = LLOAD_C_OPEN;
op->o_res = LLOAD_OP_COMPLETED;
/*
* We only support implicit assertion.
*
......@@ -225,6 +227,11 @@ request_bind( LloadConnection *client, LloadOperation *op )
* lose the client lock in operation_destroy_from_client temporarily
*/
pinned_op->o_client_refcnt++;
op->o_res = LLOAD_OP_COMPLETED;
/* We didn't start a new operation, just continuing an existing one */
lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_received--;
operation_destroy_from_client( op );
pinned_op->o_client_refcnt--;
......@@ -380,6 +387,10 @@ request_bind( LloadConnection *client, LloadOperation *op )
}
upstream->c_pendingber = ber;
if ( !pin ) {
lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_forwarded++;
}
CONNECTION_LOCK(upstream);
if ( pin ) {
tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
......
......@@ -34,6 +34,8 @@ request_abandon( LloadConnection *c, LloadOperation *op )
LloadOperation *request, needle = { .o_client_connid = c->c_connid };
int rc = LDAP_SUCCESS;
op->o_res = LLOAD_OP_COMPLETED;
if ( ber_decode_int( &op->o_request, &needle.o_client_msgid ) ) {
Debug( LDAP_DEBUG_STATS, "request_abandon: "
"connid=%lu msgid=%d invalid integer sent in abandon request\n",
......@@ -125,6 +127,8 @@ request_process( LloadConnection *client, LloadOperation *op )
op->o_client_msgid, op->o_upstream_connid, op->o_upstream_msgid );
assert( rc == LDAP_SUCCESS );
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_forwarded++;
if ( (lload_features & LLOAD_FEATURE_PROXYAUTHZ) &&
client->c_type != LLOAD_C_PRIVILEGED ) {
CONNECTION_LOCK_DECREF(client);
......@@ -209,7 +213,9 @@ handle_one_request( LloadConnection *c )
switch ( op->o_tag ) {
case LDAP_REQ_UNBIND:
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
/* There is never a response for this operation */
op->o_res = LLOAD_OP_COMPLETED;
operation_destroy_from_client( op );
Debug( LDAP_DEBUG_STATS, "handle_one_request: "
"received unbind, closing client connid=%lu\n",
......@@ -217,14 +223,17 @@ handle_one_request( LloadConnection *c )
CONNECTION_DESTROY(c);
return -1;
case LDAP_REQ_BIND:
lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_received++;
handler = request_bind;
break;
case LDAP_REQ_ABANDON:
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
/* We can't send a response to abandon requests even if a bind is
* currently in progress */
handler = request_abandon;
break;
case LDAP_REQ_EXTENDED:
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
handler = request_extended;
break;
default:
......@@ -232,6 +241,7 @@ handle_one_request( LloadConnection *c )
return operation_send_reject_locked(
op, LDAP_PROTOCOL_ERROR, "bind in progress", 0 );
}
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_received++;
handler = request_process;
break;
}
......@@ -563,3 +573,20 @@ clients_destroy( void )
}
ldap_pvt_thread_mutex_unlock( &clients_mutex );
}
void
clients_walk( CONNECTION_CLIENT_WALK apply, void *argv )
{
LloadConnection *c;
ldap_pvt_thread_mutex_lock( &clients_mutex );
if ( LDAP_CIRCLEQ_EMPTY( &clients ) ) {
ldap_pvt_thread_mutex_unlock( &clients_mutex );
return;
}
/* Todo is it possible to do this without holding this lock? */
LDAP_CIRCLEQ_FOREACH ( c, &clients, c_next ) {
apply( c, argv );
}
ldap_pvt_thread_mutex_unlock( &clients_mutex );
}
......@@ -85,6 +85,12 @@ struct evdns_base *dnsbase;
struct event *lload_timeout_event;
/*
* global lload statistics. Not mutex protected to preserve performance -
* increment is atomic, at most we risk a bit of inconsistency
*/
lload_global_stats_t lload_stats;
#ifndef SLAPD_LISTEN_BACKLOG
#define SLAPD_LISTEN_BACKLOG 1024
#endif /* ! SLAPD_LISTEN_BACKLOG */
......@@ -1431,3 +1437,10 @@ lload_resume_listeners( void )
evconnlistener_enable( lload_listeners[i]->listener );
}
}
/* we need this in a file that compiles for both module and server */
void
lload_counters_init()
{
memset( &lload_stats, 0, sizeof(lload_global_stats_t) );
}
......@@ -96,6 +96,7 @@ handle_starttls( LloadConnection *c, LloadOperation *op )
/* We already have something to write */
event_add( c->c_write_event, lload_write_timeout );
op->o_res = LLOAD_OP_COMPLETED;
operation_destroy_from_client( op );
CONNECTION_UNLOCK_INCREF(c);
......
......@@ -94,6 +94,8 @@ lload_init( int mode, const char *name )
ldap_pvt_thread_pool_init_q( &connection_pool, connection_pool_max,
0, connection_pool_queues );
lload_counters_init();
ldap_pvt_thread_mutex_init( &slapd_rq.rq_mutex );
LDAP_STAILQ_INIT( &slapd_rq.task_list );
LDAP_STAILQ_INIT( &slapd_rq.run_list );
......
......@@ -43,6 +43,7 @@
#include "avl.h"
#include "../servers/slapd/slap.h"
#include "../slapd/back-monitor/back-monitor.h"
#ifndef ldap_debug
#define ldap_debug slap_debug
......@@ -123,6 +124,26 @@ struct LloadPendingConnection {
LDAP_LIST_ENTRY(LloadPendingConnection) next;
};
typedef struct lload_counters_t {
ldap_pvt_mp_t lc_ops_completed;
ldap_pvt_mp_t lc_ops_received;
ldap_pvt_mp_t lc_ops_forwarded;
ldap_pvt_mp_t lc_ops_rejected;
ldap_pvt_mp_t lc_ops_failed;
} lload_counters_t;
enum {
LLOAD_STATS_OPS_BIND = 0,
LLOAD_STATS_OPS_OTHER,
LLOAD_STATS_OPS_LAST
};
typedef struct lload_global_stats_t {
ldap_pvt_mp_t global_incoming;
ldap_pvt_mp_t global_outgoing;
lload_counters_t counters[LLOAD_STATS_OPS_LAST];
} lload_global_stats_t;
/* Can hold mutex when locking a linked connection */
struct LloadBackend {
ldap_pvt_thread_mutex_t b_mutex;
......@@ -145,6 +166,8 @@ struct LloadBackend {
long b_max_pending, b_max_conn_pending;
long b_n_ops_executing;
lload_counters_t b_counters[LLOAD_STATS_OPS_LAST];
LDAP_CIRCLEQ_ENTRY(LloadBackend) b_next;
};
......@@ -276,8 +299,9 @@ struct LloadConnection {
enum lload_tls_type c_is_tls; /* true if this LDAP over raw TLS */
#endif
long c_n_ops_executing; /* num of ops currently executing */
long c_n_ops_completed; /* num of ops completed */
long c_n_ops_executing; /* num of ops currently executing */
long c_n_ops_completed; /* num of ops completed */
lload_counters_t c_counters; /* per connection operation counters */
/*
* Protected by the CIRCLEQ mutex:
......@@ -296,6 +320,14 @@ enum op_state {
LLOAD_OP_DETACHING_UPSTREAM = 1 << 2,
LLOAD_OP_DETACHING_CLIENT = 1 << 3,
};
/* operation result for monitoring purposes */
enum op_result {
LLOAD_OP_REJECTED, /* operation was not forwarded */
LLOAD_OP_COMPLETED, /* operation sent and response received */
LLOAD_OP_FAILED, /* operation was forwarded, but no response was received */
};
#define LLOAD_OP_FREEING_MASK \
( LLOAD_OP_FREEING_UPSTREAM | LLOAD_OP_FREEING_CLIENT )
#define LLOAD_OP_DETACHING_MASK \
......@@ -326,6 +358,7 @@ struct LloadOperation {
time_t o_start;
unsigned long o_pin_id;
enum op_result o_res;
BerElement *o_ber;
BerValue o_request, o_ctrls;
};
......@@ -354,6 +387,13 @@ struct LloadListener {
#endif
};
typedef int (*CONNECTION_CLIENT_WALK)( LloadConnection *c, void *argv );
struct lload_monitor_conn_arg {
Operation *op;
monitor_subsys_t *ms;
Entry **ep;
};
LDAP_END_DECL
#include "proto-lload.h"
......
......@@ -39,8 +39,10 @@
#include "ldap_rq.h"
int
lload_start_daemon()
ldap_pvt_thread_t lloadd_main_thread;
void *
lload_start_daemon( void *arg )
{
struct event_base *daemon_base = event_base_new();
int rc = 0, i;
......@@ -48,11 +50,11 @@ lload_start_daemon()
Debug( LDAP_DEBUG_ANY, "lload_start_daemon: "
"main event base allocation failed\n" );
rc = 1;
return rc;
return (void *)(uintptr_t)rc;
}
rc = lloadd_daemon( daemon_base );
return rc;
return (void *)(uintptr_t)rc;
}
/* from init.c */
......@@ -71,16 +73,90 @@ lload_conn_pool_init()
return rc;
}
static int
lload_module_incoming_count( LloadConnection *conn, void *argv )
{
lload_global_stats_t *tmp_stats = argv;
tmp_stats->global_incoming++;
return 0;
}
/* update all global statistics other than rejected and received,
* these are updated in real time */
void *
lload_module_update_global_stats( void *ctx, void *arg )
{
struct re_s *rtask = arg;
lload_global_stats_t tmp_stats = {};
LloadBackend *b;
int i;
Debug( LDAP_DEBUG_TRACE, "lload_module_update_global_stats: "
"updating stats\n" );
/* count incoming connections */
clients_walk( lload_module_incoming_count, &tmp_stats );
LDAP_CIRCLEQ_FOREACH ( b, &backend, b_next ) {
LloadConnection *c;
ldap_pvt_thread_mutex_lock( &b->b_mutex );
tmp_stats.global_outgoing += b->b_active + b->b_bindavail;
/* merge completed and failed stats */
for ( i = 0; i < LLOAD_STATS_OPS_LAST; i++ ) {
tmp_stats.counters[i].lc_ops_completed +=
b->b_counters[i].lc_ops_completed;
tmp_stats.counters[i].lc_ops_failed +=
b->b_counters[i].lc_ops_failed;
}
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
/* update lload_stats */
lload_stats.global_outgoing = tmp_stats.global_outgoing;
lload_stats.global_incoming = tmp_stats.global_incoming;
for ( i = 0; i < LLOAD_STATS_OPS_LAST; i++ ) {
lload_stats.counters[i].lc_ops_completed =
tmp_stats.counters[i].lc_ops_completed;
lload_stats.counters[i].lc_ops_failed =
tmp_stats.counters[i].lc_ops_failed;
}
/* reschedule */
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
return NULL;
}
void *
lload_module_start_daemon( void *ctx, void *arg )
{
lload_start_daemon();
lload_counters_init();
lload_monitor_mss_init();
if ( ldap_pvt_thread_create(
&lloadd_main_thread, 0, lload_start_daemon, NULL ) ) {
return NULL;
}
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
ldap_pvt_runqueue_insert( &slapd_rq, 1, lload_module_update_global_stats,
NULL, "lload_module_update_global_stats", "lloadd" );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
return NULL;
}
int
init_module( int argc, char *argv[] )
{
if ( argc != 2 ) {
Debug( LDAP_DEBUG_CONFIG, "lloadd: "
"incorrect number of arguments to module\n" );
return -1;
}
if ( slapMode & SLAP_TOOL_MODE ) {
return 0;
}
......@@ -103,9 +179,10 @@ init_module( int argc, char *argv[] )
return -1;
}
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
ldap_pvt_runqueue_insert( &slapd_rq, 0, lload_module_start_daemon, NULL,
"lload_module_start_daemon", "lloadd" );
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
return 0;
if ( lload_monitor_initialize() != 0 ) {
return -1;
}
return ldap_pvt_thread_pool_submit(
&connection_pool, lload_module_start_daemon, NULL );
}
This diff is collapsed.
......@@ -223,6 +223,9 @@ operation_destroy_from_client( LloadOperation *op )
return;
}
/* it seems we will be destroying the operation,
* so update the global rejected cunter if needed */
operation_update_global_rejected( op );
/* 5. If we raced the upstream side and won, reclaim the token */
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
if ( !(race_state & LLOAD_OP_DETACHING_UPSTREAM) ) {
......@@ -281,6 +284,7 @@ operation_destroy_from_client( LloadOperation *op )
if ( upstream ) {
if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) {
upstream->c_n_ops_executing--;
operation_update_conn_counters( op );
b = (LloadBackend *)upstream->c_private;
}
CONNECTION_UNLOCK_OR_DESTROY(upstream);
......@@ -288,6 +292,7 @@ operation_destroy_from_client( LloadOperation *op )
if ( b ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
}
......@@ -331,9 +336,13 @@ operation_destroy_from_upstream( LloadOperation *op )
return;
}
/* it seems we will be destroying the operation,
* so update the global rejected cunter if needed */
operation_update_global_rejected( op );
/* 2. Remove from the operation map and adjust the pending op count */
if ( tavl_delete( &upstream->c_ops, op, operation_upstream_cmp ) ) {
upstream->c_n_ops_executing--;
operation_update_conn_counters( op );
b = (LloadBackend *)upstream->c_private;
}
......@@ -357,6 +366,7 @@ operation_destroy_from_upstream( LloadOperation *op )
if ( b ) {
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
}
......@@ -590,6 +600,10 @@ operation_abandon( LloadOperation *op )
int rc = LDAP_SUCCESS;
ldap_pvt_thread_mutex_lock( &op->o_link_mutex );
/* for now consider all abandoned operations completed,
* perhaps add a separate counter later */
op->o_res = LLOAD_OP_COMPLETED;
c = op->o_upstream;
if ( !c ) {
ldap_pvt_thread_mutex_unlock( &op->o_link_mutex );
......@@ -620,6 +634,7 @@ operation_abandon( LloadOperation *op )
ldap_pvt_thread_mutex_lock( &b->b_mutex );
b->b_n_ops_executing--;
operation_update_backend_counters( op, b );
ldap_pvt_thread_mutex_unlock( &b->b_mutex );
if ( operation_send_abandon( op ) == LDAP_SUCCESS ) {
......@@ -791,6 +806,7 @@ operation_lost_upstream( LloadOperation *op )
CONNECTION_LOCK_DECREF(c);
op->o_upstream_refcnt--;
op->o_res = LLOAD_OP_FAILED;
operation_destroy_from_upstream( op );
CONNECTION_UNLOCK(c);
}
......@@ -817,6 +833,7 @@ connection_timeout( LloadConnection *upstream, time_t threshold )
}
op->o_upstream_refcnt++;
op->o_res = LLOAD_OP_FAILED;
found_op = tavl_delete( &upstream->c_ops, op, operation_upstream_cmp );
assert( op == found_op );
......@@ -982,3 +999,43 @@ done:
"timeout task finished\n" );
evtimer_add( self, lload_timeout_api );
}
void
operation_update_global_rejected( LloadOperation *op )
{
if ( op->o_res == LLOAD_OP_REJECTED && op->o_upstream_connid == 0 ) {
switch ( op->o_tag ) {
case LDAP_REQ_BIND:
lload_stats.counters[LLOAD_STATS_OPS_BIND].lc_ops_rejected++;
break;
default:
lload_stats.counters[LLOAD_STATS_OPS_OTHER].lc_ops_rejected++;
break;
}
}
}
void
operation_update_conn_counters( LloadOperation *op )
{
assert( op->o_upstream != NULL );
if ( op->o_res == LLOAD_OP_COMPLETED ) {
op->o_upstream->c_counters.lc_ops_completed++;
} else {
op->o_upstream->c_counters.lc_ops_failed++;
}
}
void
operation_update_backend_counters( LloadOperation *op, LloadBackend *b )
{
int stat_type = op->o_tag == LDAP_REQ_BIND ? LLOAD_STATS_OPS_BIND :
LLOAD_STATS_OPS_OTHER;
assert( b != NULL );
if ( op->o_res == LLOAD_OP_COMPLETED ) {
b->b_counters[stat_type].lc_ops_completed++;
} else {
b->b_counters[stat_type].lc_ops_failed++;
}
}
......@@ -65,6 +65,7 @@ LDAP_SLAPD_F (LloadConnection *) client_init( ber_socket_t s, LloadListener *url
LDAP_SLAPD_F (void) client_reset( LloadConnection *c );
LDAP_SLAPD_F (void) client_destroy( LloadConnection *c );
LDAP_SLAPD_F (void) clients_destroy( void );
LDAP_SLAPD_F (void) clients_walk( CONNECTION_CLIENT_WALK apply, void *argv );
/*
* config.c
......@@ -126,6 +127,7 @@ LDAP_SLAPD_F (int) lload_exop_init( void );
*/
LDAP_SLAPD_F (int) lload_init( int mode, const char *name );
LDAP_SLAPD_F (int) lload_destroy( void );
LDAP_SLAPD_F (void) lload_counters_init( void );
/*
* libevent_support.c
......@@ -133,6 +135,13 @@ LDAP_SLAPD_F (int) lload_destroy( void );
LDAP_SLAPD_F (int) lload_libevent_init( void );
LDAP_SLAPD_F (void) lload_libevent_destroy( void );
#ifdef BALANCER_MODULE
/*
* monitor.c
*/
LDAP_SLAPD_F (int) lload_monitor_initialize( void );
#endif /* BALANCER_MODULE */
/*
* operation.c
*/
......@@ -150,7 +159,9 @@ LDAP_SLAPD_F (void) operation_lost_upstream( LloadOperation *op );
LDAP_SLAPD_F (void) operation_destroy_from_client( LloadOperation *op );
LDAP_SLAPD_F (void) operation_destroy_from_upstream( LloadOperation *op );
LDAP_SLAPD_F (void) operations_timeout( evutil_socket_t s, short what, void *arg );
LDAP_SLAPD_F (void) operation_update_conn_counters( LloadOperation *op );
LDAP_SLAPD_F (void) operation_update_backend_counters( LloadOperation *op, LloadBackend *b );
LDAP_SLAPD_F (void) operation_update_global_rejected( LloadOperation *op );
/*
* upstream.c
*/
......@@ -181,6 +192,8 @@ LDAP_SLAPD_V (char *) global_host;
LDAP_SLAPD_V (int) lber_debug;
LDAP_SLAPD_V (int) ldap_syslog;
LDAP_SLAPD_V (lload_global_stats_t) lload_stats;
LDAP_END_DECL
#endif /* PROTO_LLOAD_H */
......@@ -89,6 +89,7 @@ forward_final_response(
op->o_upstream_connid, op->o_upstream_msgid, op->o_client_connid );
rc = forward_response( client, op, ber );
CONNECTION_LOCK(op->o_upstream);
op->o_res = LLOAD_OP_COMPLETED;
if ( !op->o_pin_id || !op->o_upstream_refcnt-- ) {
operation_destroy_from_upstream( op );
}
......
......@@ -1963,6 +1963,7 @@ monitor_back_initialize(
{ "olmGenericAttributes", "olmSubSystemAttributes:0" },
{ "olmDatabaseAttributes", "olmSubSystemAttributes:1" },
{ "olmOverlayAttributes", "olmSubSystemAttributes:2" },
{ "olmModuleAttributes", "olmSubSystemAttributes:3" },
/* for example, back-mdb specific attrs
* are in "olmDatabaseAttributes:12"
......@@ -1975,6 +1976,7 @@ monitor_back_initialize(
{ "olmGenericObjectClasses", "olmSubSystemObjectClasses:0" },
{ "olmDatabaseObjectClasses", "olmSubSystemObjectClasses:1" },
{ "olmOverlayObjectClasses", "olmSubSystemObjectClasses:2" },
{ "olmModuleObjectClasses", "olmSubSystemObjectClasses:3" },
/* for example, back-mdb specific objectClasses
* are in "olmDatabaseObjectClasses:12"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment