From 989c9f208d4fc29d89ead18beb7b036b69f7381e Mon Sep 17 00:00:00 2001 From: Howard Chu <hyc@openldap.org> Date: Fri, 21 Mar 2008 01:46:03 +0000 Subject: [PATCH] ITS#5364, #5407 from HEAD --- include/ldap_pvt_thread.h | 8 + libraries/libldap_r/tpool.c | 303 ++++++++++++++++++++++-------------- 2 files changed, 198 insertions(+), 113 deletions(-) diff --git a/include/ldap_pvt_thread.h b/include/ldap_pvt_thread.h index 3236b89755..1325e77226 100644 --- a/include/ldap_pvt_thread.h +++ b/include/ldap_pvt_thread.h @@ -243,10 +243,18 @@ ldap_pvt_thread_pool_query LDAP_P(( ldap_pvt_thread_pool_t *pool, ldap_pvt_thread_pool_param_t param, void *value )); +LDAP_F( int ) +ldap_pvt_thread_pool_pausing LDAP_P(( + ldap_pvt_thread_pool_t *pool )); + LDAP_F( int ) ldap_pvt_thread_pool_backload LDAP_P(( ldap_pvt_thread_pool_t *pool )); +LDAP_F( int ) +ldap_pvt_thread_pool_pausecheck LDAP_P(( + ldap_pvt_thread_pool_t *pool )); + LDAP_F( int ) ldap_pvt_thread_pool_pause LDAP_P(( ldap_pvt_thread_pool_t *pool )); diff --git a/libraries/libldap_r/tpool.c b/libraries/libldap_r/tpool.c index 0824a11ba5..fa22d7d9a4 100644 --- a/libraries/libldap_r/tpool.c +++ b/libraries/libldap_r/tpool.c @@ -17,6 +17,7 @@ #include <stdio.h> +#include <ac/signal.h> #include <ac/stdarg.h> #include <ac/stdlib.h> #include <ac/string.h> @@ -31,12 +32,6 @@ #ifndef LDAP_THREAD_HAVE_TPOOL -typedef enum ldap_int_thread_pool_state_e { - LDAP_INT_THREAD_POOL_RUNNING, - LDAP_INT_THREAD_POOL_FINISHING, - LDAP_INT_THREAD_POOL_STOPPING -} ldap_int_thread_pool_state_t; - /* Thread-specific key with data and optional free function */ typedef struct ldap_int_tpool_key_s { void *ltk_key; @@ -52,6 +47,9 @@ typedef struct ldap_int_tpool_key_s { /* Max number of threads */ #define LDAP_MAXTHR 1024 /* must be a power of 2 */ +/* (Theoretical) max number of pending requests */ +#define MAX_PENDING (INT_MAX/2) /* INT_MAX - (room to avoid overflow) */ + /* Context: thread ID and thread-specific key/data pairs */ typedef struct ldap_int_thread_userctx_s { ldap_pvt_thread_t ltu_id; @@ -88,6 +86,8 @@ typedef struct ldap_int_thread_task_s { void *ltt_arg; } ldap_int_thread_task_t; +typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t; + struct ldap_int_thread_pool_s { LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next; @@ -100,23 +100,56 @@ struct ldap_int_thread_pool_s { /* ltp_active_count <= 1 && ltp_pause */ ldap_pvt_thread_cond_t ltp_pcond; + /* ltp_pause == 0 ? <p_pending_list : &empty_pending_list, + * maintaned to reduce work for pool_wrapper() + */ + ldap_int_tpool_plist_t *ltp_work_list; + /* pending tasks, and unused task objects */ - LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ltp_pending_list; + ldap_int_tpool_plist_t ltp_pending_list; LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list; - ldap_int_thread_pool_state_t ltp_state; + /* The pool is finishing, waiting for its threads to close. + * They close when ltp_pending_list is done. pool_submit() + * rejects new tasks. ltp_max_pending = -(its old value). + */ + int ltp_finishing; + + /* Some active task needs to be the sole active task. + * Atomic variable so ldap_pvt_thread_pool_pausing() can read it. + * Note: Pauses adjust ltp_<open_count/vary_open_count/work_list>, + * so pool_<submit/wrapper>() mostly can avoid testing ltp_pause. + */ + volatile sig_atomic_t ltp_pause; + + /* Max number of threads in pool, or 0 for default (LDAP_MAXTHR) */ + int ltp_max_count; - /* some active request needs to be the sole active request */ - int ltp_pause; + /* Max number of pending + paused requests, negated when ltp_finishing */ + int ltp_max_pending; - long ltp_max_count; /* max number of threads in pool, or 0 */ - long ltp_max_pending; /* max pending or paused requests, or 0 */ - long ltp_pending_count; /* pending or paused requests */ - long ltp_active_count; /* active, not paused requests */ - long ltp_open_count; /* number of threads */ - long ltp_starting; /* currenlty starting threads */ + int ltp_pending_count; /* Pending or paused requests */ + int ltp_active_count; /* Active, not paused requests */ + int ltp_open_count; /* Number of threads, negated when ltp_pause */ + int ltp_starting; /* Currenlty starting threads */ + + /* >0 if paused or we may open a thread, <0 if we should close a thread. + * Updated when ltp_<finishing/pause/max_count/open_count> change. + * Maintained to reduce the time ltp_mutex must be locked in + * ldap_pvt_thread_pool_<submit/wrapper>(). + */ + int ltp_vary_open_count; +# define SET_VARY_OPEN_COUNT(pool) \ + ((pool)->ltp_vary_open_count = \ + (pool)->ltp_pause ? 1 : \ + (pool)->ltp_finishing ? -1 : \ + ((pool)->ltp_max_count ? (pool)->ltp_max_count : LDAP_MAXTHR) \ + - (pool)->ltp_open_count) }; +static ldap_int_tpool_plist_t empty_pending_list = + LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list); + static int ldap_int_has_thread_pool = 0; static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s) ldap_int_thread_pool_list = @@ -168,8 +201,8 @@ ldap_pvt_thread_pool_init ( if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR)) max_threads = 0; - if (max_pending < 0) - max_pending = 0; + if (! (1 <= max_pending && max_pending <= MAX_PENDING)) + max_pending = MAX_PENDING; *tpool = NULL; pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1, @@ -188,11 +221,15 @@ ldap_pvt_thread_pool_init ( return(rc); ldap_int_has_thread_pool = 1; - pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING; + pool->ltp_max_count = max_threads; + SET_VARY_OPEN_COUNT(pool); pool->ltp_max_pending = max_pending; + LDAP_STAILQ_INIT(&pool->ltp_pending_list); + pool->ltp_work_list = &pool->ltp_pending_list; LDAP_SLIST_INIT(&pool->ltp_free_list); + ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex); LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next); ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex); @@ -213,6 +250,7 @@ ldap_pvt_thread_pool_init ( * lock the mutex right now, since no threads are running. */ pool->ltp_open_count++; + SET_VARY_OPEN_COUNT(pool); ldap_pvt_thread_t thr; rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool ); @@ -256,23 +294,17 @@ ldap_pvt_thread_pool_submit ( return(-1); ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); - if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING - || (pool->ltp_max_pending - && pool->ltp_pending_count >= pool->ltp_max_pending)) - { - ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); - return(-1); - } + + if (pool->ltp_pending_count >= pool->ltp_max_pending) + goto failed; task = LDAP_SLIST_FIRST(&pool->ltp_free_list); if (task) { LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltt_next.l); } else { task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task)); - if (task == NULL) { - ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); - return(-1); - } + if (task == NULL) + goto failed; } task->ltt_start_routine = start_routine; @@ -280,17 +312,18 @@ ldap_pvt_thread_pool_submit ( pool->ltp_pending_count++; LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, task, ltt_next.q); - if (pool->ltp_pause) { - ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); - return(0); - } - ldap_pvt_thread_cond_signal(&pool->ltp_cond); - if (pool->ltp_open_count < pool->ltp_active_count + pool->ltp_pending_count - && (pool->ltp_open_count < - (pool->ltp_max_count ? pool->ltp_max_count : LDAP_MAXTHR))) + + /* true if ltp_pause != 0 or we should open (create) a thread */ + if (pool->ltp_vary_open_count > 0 && + pool->ltp_open_count < pool->ltp_active_count+pool->ltp_pending_count) { - pool->ltp_open_count++; + if (pool->ltp_pause) + goto done; + pool->ltp_starting++; + pool->ltp_open_count++; + SET_VARY_OPEN_COUNT(pool); + if (0 != ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool)) { @@ -299,6 +332,8 @@ ldap_pvt_thread_pool_submit ( */ pool->ltp_starting--; pool->ltp_open_count--; + SET_VARY_OPEN_COUNT(pool); + if (pool->ltp_open_count == 0) { /* no open threads at all?!? */ @@ -314,24 +349,28 @@ ldap_pvt_thread_pool_submit ( * back out of ltp_pending_count, free the task, * report the error. */ + pool->ltp_pending_count--; LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, task, ldap_int_thread_task_s, ltt_next.q); - pool->ltp_pending_count--; - ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); - LDAP_FREE(task); - return(-1); + LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, task, + ltt_next.l); + goto failed; } } /* there is another open thread, so this * task will be handled eventually. - * continue on, we have signalled that - * the task is waiting. */ } } + ldap_pvt_thread_cond_signal(&pool->ltp_cond); + done: ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); return(0); + + failed: + ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); + return(-1); } /* Set max #threads. value <= 0 means max supported #threads (LDAP_MAXTHR) */ @@ -354,7 +393,10 @@ ldap_pvt_thread_pool_maxthreads( return(-1); ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); + pool->ltp_max_count = max_threads; + SET_VARY_OPEN_COUNT(pool); + ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); return(0); } @@ -387,10 +429,16 @@ ldap_pvt_thread_pool_query( case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING: count = pool->ltp_max_pending; + if (count < 0) + count = -count; + if (count == MAX_PENDING) + count = 0; break; case LDAP_PVT_THREAD_POOL_PARAM_OPEN: count = pool->ltp_open_count; + if (count < 0) + count = -count; break; case LDAP_PVT_THREAD_POOL_PARAM_STARTING: @@ -422,32 +470,12 @@ ldap_pvt_thread_pool_query( case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX: break; - case LDAP_PVT_THREAD_POOL_PARAM_STATE: { - static struct { - char *name; - ldap_int_thread_pool_state_t state; - } str2state[] = { - { "running", LDAP_INT_THREAD_POOL_RUNNING }, - { "finishing", LDAP_INT_THREAD_POOL_FINISHING }, - { "stopping", LDAP_INT_THREAD_POOL_STOPPING }, - { NULL } - }; - int i; - - if ( pool->ltp_pause ) { - *((char **)value) = "pausing"; - } else { - for ( i = 0; str2state[ i ].name != NULL; i++ ) { - if ( str2state[ i ].state == pool->ltp_state ) { - break; - } - } - *((char **)value) = str2state[ i ].name; - } - if ( *((char **)value) != NULL ) { - count = -2; - } - } break; + case LDAP_PVT_THREAD_POOL_PARAM_STATE: + *((char **)value) = + pool->ltp_pause ? "pausing" : + !pool->ltp_finishing ? "running" : + pool->ltp_pending_count ? "finishing" : "stopping"; + break; case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN: break; @@ -461,6 +489,23 @@ ldap_pvt_thread_pool_query( return ( count == -1 ? -1 : 0 ); } +/* + * true if pool is pausing; does not lock any mutex to check. + * 0 if not pause, 1 if pause, -1 if error or no pool. + */ +int +ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool ) +{ + int rc = -1; + struct ldap_int_thread_pool_s *pool; + + if ( tpool != NULL && (pool = *tpool) != NULL ) { + rc = pool->ltp_pause; + } + + return rc; +} + /* * wrapper for ldap_pvt_thread_pool_query(), left around * for backwards compatibility @@ -505,22 +550,25 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending ) if (pool != pptr) return(-1); ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); - pool->ltp_state = run_pending - ? LDAP_INT_THREAD_POOL_FINISHING - : LDAP_INT_THREAD_POOL_STOPPING; + + pool->ltp_finishing = 1; + SET_VARY_OPEN_COUNT(pool); + if (pool->ltp_max_pending > 0) + pool->ltp_max_pending = -pool->ltp_max_pending; + + if (!run_pending) { + while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) { + LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q); + LDAP_FREE(task); + } + pool->ltp_pending_count = 0; + } while (pool->ltp_open_count) { if (!pool->ltp_pause) ldap_pvt_thread_cond_broadcast(&pool->ltp_cond); ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); } - ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); - - while ((task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL) - { - LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q); - LDAP_FREE(task); - } while ((task = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL) { @@ -528,6 +576,7 @@ ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending ) LDAP_FREE(task); } + ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); ldap_pvt_thread_cond_destroy(&pool->ltp_pcond); ldap_pvt_thread_cond_destroy(&pool->ltp_cond); ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex); @@ -544,6 +593,7 @@ ldap_int_thread_pool_wrapper ( { struct ldap_int_thread_pool_s *pool = xpool; ldap_int_thread_task_t *task; + ldap_int_tpool_plist_t *work_list; ldap_int_thread_userctx_t ctx, *kctx; unsigned i, keyslot, hash; @@ -578,24 +628,13 @@ ldap_int_thread_pool_wrapper ( pool->ltp_starting--; for (;;) { - while (pool->ltp_pause) - ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); - - if (pool->ltp_state == LDAP_INT_THREAD_POOL_STOPPING) - break; - - task = LDAP_STAILQ_FIRST(&pool->ltp_pending_list); - if (task == NULL) { - if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING) - break; - - if (pool->ltp_open_count > - (pool->ltp_max_count ? pool->ltp_max_count : LDAP_MAXTHR)) - { - /* too many threads running (can happen if the - * maximum threads value is set during ongoing - * operation using ldap_pvt_thread_pool_maxthreads) - * so let this thread die. + work_list = pool->ltp_work_list; /* help the compiler a bit */ + task = LDAP_STAILQ_FIRST(work_list); + if (task == NULL) { /* paused or no pending tasks */ + if (pool->ltp_vary_open_count < 0) { + /* not paused, and either finishing or too many + * threads running (can happen if ltp_max_count + * was reduced) so let this thread die. */ break; } @@ -612,12 +651,11 @@ ldap_int_thread_pool_wrapper ( * check idle time. */ - assert(pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING); ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex); continue; } - LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltt_next.q); + LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q); pool->ltp_pending_count--; pool->ltp_active_count++; ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); @@ -632,6 +670,8 @@ ldap_int_thread_pool_wrapper ( ldap_pvt_thread_cond_signal(&pool->ltp_pcond); } + assert(!pool->ltp_pause); /* thread_keys writable, ltp_open_count >= 0 */ + /* The ltp_mutex lock protects ctx->ltu_key from pool_purgekey() * during this call, since it prevents new pauses. */ ldap_pvt_thread_pool_context_reset(&ctx); @@ -641,8 +681,9 @@ ldap_int_thread_pool_wrapper ( ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex); pool->ltp_open_count--; + SET_VARY_OPEN_COUNT(pool); /* let pool_destroy know we're all done */ - if (pool->ltp_open_count < 1) + if (pool->ltp_open_count == 0) ldap_pvt_thread_cond_signal(&pool->ltp_cond); ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); @@ -651,10 +692,8 @@ ldap_int_thread_pool_wrapper ( return(NULL); } -/* Pause the pool. Return when all other threads are paused. */ -int -ldap_pvt_thread_pool_pause ( - ldap_pvt_thread_pool_t *tpool ) +static int +handle_pause( ldap_pvt_thread_pool_t *tpool, int do_pause ) { struct ldap_int_thread_pool_s *pool; @@ -666,6 +705,9 @@ ldap_pvt_thread_pool_pause ( if (pool == NULL) return(0); + if (! (do_pause || pool->ltp_pause)) + return(0); + ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); /* If someone else has already requested a pause, we have to wait */ @@ -682,14 +724,41 @@ ldap_pvt_thread_pool_pause ( pool->ltp_active_count++; } - /* Wait for everyone else to pause or finish */ - pool->ltp_pause = 1; - while (pool->ltp_active_count > 1) { - ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex); + if (do_pause) { + /* Wait for everyone else to pause or finish */ + pool->ltp_pause = 1; + /* Let ldap_pvt_thread_pool_submit() through to its ltp_pause test, + * and do not finish threads in ldap_pvt_thread_pool_wrapper() */ + pool->ltp_open_count = -pool->ltp_open_count; + SET_VARY_OPEN_COUNT(pool); + /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */ + pool->ltp_work_list = &empty_pending_list; + + while (pool->ltp_active_count > 1) { + ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex); + } } ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); - return(0); + return(!do_pause); +} + +/* + * If a pause was requested, wait for it. If several threads + * are waiting to pause, let through one or more pauses. + * Return 1 if we waited, 0 if not, -1 at parameter error. + */ +int +ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool ) +{ + return handle_pause( tpool, 0 ); +} + +/* Pause the pool. Return when all other threads are paused. */ +int +ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool ) +{ + return handle_pause( tpool, 1 ); } /* End a pause */ @@ -708,9 +777,17 @@ ldap_pvt_thread_pool_resume ( return(0); ldap_pvt_thread_mutex_lock(&pool->ltp_mutex); + + assert(pool->ltp_pause); pool->ltp_pause = 0; - if (pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING) + if (pool->ltp_open_count <= 0) /* true when paused, but be paranoid */ + pool->ltp_open_count = -pool->ltp_open_count; + SET_VARY_OPEN_COUNT(pool); + pool->ltp_work_list = &pool->ltp_pending_list; + + if (!pool->ltp_finishing) ldap_pvt_thread_cond_broadcast(&pool->ltp_cond); + ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex); return(0); } @@ -846,7 +923,7 @@ void *ldap_pvt_thread_pool_context( ) void *ctx = NULL; ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx ); - return ctx ? ctx : &ldap_int_main_thrctx; + return ctx ? ctx : (void *) &ldap_int_main_thrctx; } /* -- GitLab