Skip to content
Snippets Groups Projects
Commit 33f4955c authored by Mark Valence's avatar Mark Valence
Browse files

Add thread pool routines.

parent 4933f894
No related branches found
No related tags found
No related merge requests found
......@@ -148,6 +148,32 @@ ldap_pvt_thread_rdwr_active LDAP_P((ldap_pvt_thread_rdwr_t *rdwrp));
#define LDAP_PVT_THREAD_EINVAL EINVAL
#define LDAP_PVT_THREAD_EBUSY EINVAL
typedef struct t_ldap_pvt_thread_pool *ldap_pvt_thread_pool_t;
LIBLDAP_F( int )
ldap_pvt_thread_pool_initialize LDAP_P((
ldap_pvt_thread_pool_t *pool_out,
int max_concurrency,
int max_pending ));
LIBLDAP_F( int )
ldap_pvt_thread_pool_submit LDAP_P((
ldap_pvt_thread_pool_t pool,
void *(*start_routine)( void * ),
void *arg ));
LIBLDAP_F( int )
ldap_pvt_thread_pool_backload LDAP_P((
ldap_pvt_thread_pool_t pool ));
LIBLDAP_F( int )
ldap_pvt_thread_pool_destroy LDAP_P((
ldap_pvt_thread_pool_t pool,
int run_pending ));
LDAP_END_DECL
#endif /* _LDAP_THREAD_H */
......@@ -11,22 +11,76 @@
#include "portable.h"
#include <ac/stdlib.h>
#include <ac/string.h>
#include "ldap_int_thread.h"
#include "ldap_pvt_thread.h"
LIBLDAP_F( int )
enum {
LDAP_PVT_THREAD_POOL_RUNNING,
LDAP_PVT_THREAD_POOL_FINISHING,
LDAP_PVT_THREAD_POOL_STOPPING
};
typedef struct t_ldap_pvt_thread_listelement {
struct t_ldap_pvt_thread_listelement *next;
} ldap_pvt_thread_listelement, *ldap_pvt_thread_list;
struct t_ldap_pvt_thread_pool {
struct t_ldap_pvt_thread_pool *ltp_next;
ldap_pvt_thread_mutex_t ltp_mutex;
ldap_pvt_thread_cond_t ltp_cond;
ldap_pvt_thread_list ltp_pending_list;
long ltp_state;
long ltp_max_count;
long ltp_max_pending;
long ltp_pending_count;
long ltp_active_count;
long ltp_open_count;
};
typedef struct t_ldap_pvt_thread_ctx {
struct t_ldap_pvt_thread_ctx *ltc_next;
void *(*ltc_start_routine)( void *);
void *ltc_arg;
} ldap_pvt_thread_ctx;
ldap_pvt_thread_list ldap_pvt_thread_pool_list = NULL;
ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
void *ldap_pvt_thread_pool_wrapper( ldap_pvt_thread_pool_t pool );
void *ldap_pvt_thread_enlist( ldap_pvt_thread_list *list, void *elem );
void *ldap_pvt_thread_delist( ldap_pvt_thread_list *list, void *elem );
void *ldap_pvt_thread_onlist( ldap_pvt_thread_list *list, void *elem );
int
ldap_pvt_thread_initialize ( void )
{
return ldap_int_thread_initialize();
int rc;
rc = ldap_int_thread_initialize();
if (rc == 0) {
/* init the mutex that protext the list of pools
*/
ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
}
return rc;
}
LIBLDAP_F( int )
int
ldap_pvt_thread_destroy ( void )
{
while (ldap_pvt_thread_pool_list != NULL) {
ldap_pvt_thread_pool_destroy((ldap_pvt_thread_pool_t)ldap_pvt_thread_pool_list, 0);
}
ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
return ldap_int_thread_destroy();
}
LIBLDAP_F( int )
int
ldap_pvt_thread_get_concurrency ( void )
{
#ifdef HAVE_GETCONCURRENCY
......@@ -36,7 +90,7 @@ ldap_pvt_thread_get_concurrency ( void )
#endif
}
LIBLDAP_F( int )
int
ldap_pvt_thread_set_concurrency ( int concurrency )
{
#ifdef HAVE_SETCONCURRENCY
......@@ -46,7 +100,7 @@ ldap_pvt_thread_set_concurrency ( int concurrency )
#endif
}
LIBLDAP_F( int )
int
ldap_pvt_thread_create (
ldap_pvt_thread_t * thread,
int detach,
......@@ -56,55 +110,55 @@ ldap_pvt_thread_create (
return ldap_int_thread_create(thread, detach, start_routine, arg);
}
LIBLDAP_F( void )
void
ldap_pvt_thread_exit ( void *retval )
{
ldap_int_thread_exit(retval);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_join ( ldap_pvt_thread_t thread, void **status )
{
return ldap_int_thread_join(thread, status);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_kill ( ldap_pvt_thread_t thread, int signo )
{
return ldap_int_thread_kill(thread, signo);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_yield ( void )
{
return ldap_int_thread_yield();
}
LIBLDAP_F( int )
int
ldap_pvt_thread_cond_init ( ldap_pvt_thread_cond_t *cond )
{
return ldap_int_thread_cond_init(cond);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_cond_destroy ( ldap_pvt_thread_cond_t *cond )
{
return ldap_int_thread_cond_destroy(cond);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_cond_signal ( ldap_pvt_thread_cond_t *cond )
{
return ldap_int_thread_cond_signal(cond);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_cond_broadcast ( ldap_pvt_thread_cond_t *cond )
{
return ldap_int_thread_cond_broadcast(cond);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_cond_wait (
ldap_pvt_thread_cond_t *cond,
ldap_pvt_thread_mutex_t *mutex )
......@@ -112,32 +166,374 @@ ldap_pvt_thread_cond_wait (
return ldap_int_thread_cond_wait(cond, mutex);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_mutex_init ( ldap_pvt_thread_mutex_t *mutex )
{
return ldap_int_thread_mutex_init(mutex);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_mutex_destroy ( ldap_pvt_thread_mutex_t *mutex )
{
return ldap_int_thread_mutex_destroy(mutex);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_mutex_lock ( ldap_pvt_thread_mutex_t *mutex )
{
return ldap_int_thread_mutex_lock(mutex);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_mutex_trylock ( ldap_pvt_thread_mutex_t *mutex )
{
return ldap_int_thread_mutex_trylock(mutex);
}
LIBLDAP_F( int )
int
ldap_pvt_thread_mutex_unlock ( ldap_pvt_thread_mutex_t *mutex )
{
return ldap_int_thread_mutex_unlock(mutex);
}
#ifdef NO_THREADS
/* There must be a separate implementation when NO_THREADS is on.
* Since ldap_pvt_thread_pool_wrapper loops, there's no way to
* simply let the underlying (stub) thread implementation take
* care of things (unless there was an #ifdef that removed the
* "while" in ldap_pvt_thread_pool_wrapper, but why do all the
* extra work of init/submit/destroy when all that's needed
* are these stubs?)
*/
int
ldap_pvt_thread_pool_startup ( void )
{
return(0);
}
int
ldap_pvt_thread_pool_shutdown ( void )
{
return(0);
}
int
ldap_pvt_thread_pool_initialize ( ldap_pvt_thread_pool_t *pool_out, int max_concurrency, int max_pending )
{
*pool_out = NULL;
return(0);
}
int
ldap_pvt_thread_pool_submit ( ldap_pvt_thread_pool_t pool, void *(*start_routine)( void * ), void *arg )
{
(start_routine)(arg);
return(0);
}
int
ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t pool )
{
return(0);
}
int
ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t pool, int run_pending )
{
return(0);
}
#else
int
ldap_pvt_thread_pool_startup ( void )
{
return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
}
int
ldap_pvt_thread_pool_shutdown ( void )
{
while (ldap_pvt_thread_pool_list != NULL) {
ldap_pvt_thread_pool_destroy((ldap_pvt_thread_pool_t)ldap_pvt_thread_pool_list, 0);
}
ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
return(0);
}
int
ldap_pvt_thread_pool_initialize ( ldap_pvt_thread_pool_t *pool_out, int max_concurrency, int max_pending )
{
ldap_pvt_thread_pool_t pool;
ldap_pvt_thread_t thr;
*pool_out = NULL;
pool = (ldap_pvt_thread_pool_t)calloc(1, sizeof(struct t_ldap_pvt_thread_pool));
if (pool == NULL)
return(-1);
ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
ldap_pvt_thread_cond_init(&pool->ltp_cond);
pool->ltp_state = LDAP_PVT_THREAD_POOL_RUNNING;
pool->ltp_max_count = max_concurrency;
pool->ltp_max_pending = max_pending;
ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
ldap_pvt_thread_enlist(&ldap_pvt_thread_pool_list, pool);
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
/* start up one thread, just so there is one */
pool->ltp_open_count++;
if (ldap_pvt_thread_create( &thr, 1, (void *)ldap_pvt_thread_pool_wrapper, pool ) != 0) {
/* couldn't start one? then don't start any */
ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
ldap_pvt_thread_delist(&ldap_pvt_thread_pool_list, pool);
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
free(pool);
return(-1);
}
*pool_out = pool;
return(0);
}
int
ldap_pvt_thread_pool_submit ( ldap_pvt_thread_pool_t pool, void *(*start_routine)( void * ), void *arg )
{
ldap_pvt_thread_ctx *ctx;
int need_thread = 0;
ldap_pvt_thread_t thr;
if (pool == NULL)
return(-1);
ctx = (ldap_pvt_thread_ctx *)calloc(1, sizeof(ldap_pvt_thread_ctx));
if (ctx == NULL)
return(-1);
ctx->ltc_start_routine = start_routine;
ctx->ltc_arg = arg;
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
if (pool->ltp_state != LDAP_PVT_THREAD_POOL_RUNNING
|| (pool->ltp_max_pending > 0 && pool->ltp_pending_count >= pool->ltp_max_pending))
{
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
free(ctx);
return(-1);
}
pool->ltp_pending_count++;
ldap_pvt_thread_enlist(&pool->ltp_pending_list, ctx);
if ((pool->ltp_open_count <= 0 || pool->ltp_open_count == pool->ltp_active_count)
&& (pool->ltp_max_count <= 0 || pool->ltp_open_count < pool->ltp_max_count))
{
pool->ltp_open_count++;
need_thread = 1;
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
if (need_thread) {
if (ldap_pvt_thread_create( &thr, 1, (void *)ldap_pvt_thread_pool_wrapper, pool ) != 0) {
/* couldn't create thread. back out of
* ltp_open_count and check for even worse things.
*/
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
pool->ltp_open_count--;
if (pool->ltp_open_count == 0) {
/* no open threads at all?!? this will never happen
* because we always leave at least one thread open.
*/
if (ldap_pvt_thread_delist(&pool->ltp_pending_list, ctx)) {
/* no open threads, context not handled, so
* back out of ltp_pending_count, free the context,
* report the error.
*/
pool->ltp_pending_count++;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
free(ctx);
return(-1);
}
}
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
/* there is another open thread, so this
* context will be handled eventually.
* continue on and signal that the context
* is waiting.
*/
}
}
ldap_pvt_thread_cond_signal(&pool->ltp_cond);
return(0);
}
int
ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t pool )
{
int count;
if (pool == NULL)
return(0);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
count = pool->ltp_pending_count + pool->ltp_active_count;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
return(count);
}
int
ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t pool, int run_pending )
{
long waiting;
ldap_pvt_thread_ctx *ctx;
if (pool == NULL)
return(-1);
ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
pool = ldap_pvt_thread_delist(&ldap_pvt_thread_pool_list, pool);
ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
if (pool == NULL)
return(-1);
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
if (run_pending)
pool->ltp_state = LDAP_PVT_THREAD_POOL_FINISHING;
else
pool->ltp_state = LDAP_PVT_THREAD_POOL_STOPPING;
waiting = pool->ltp_open_count;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
/* broadcast could be used here, but only after
* it is fixed in the NT thread implementation
*/
while (--waiting >= 0)
ldap_pvt_thread_cond_signal(&pool->ltp_cond);
do {
ldap_pvt_thread_yield();
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
waiting = pool->ltp_open_count;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
} while (waiting > 0);
while (ctx = (ldap_pvt_thread_ctx *)ldap_pvt_thread_delist(&pool->ltp_pending_list, NULL))
free(ctx);
ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
free(pool);
return(0);
}
void *
ldap_pvt_thread_pool_wrapper ( ldap_pvt_thread_pool_t pool )
{
ldap_pvt_thread_ctx *ctx;
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
while (pool->ltp_state != LDAP_PVT_THREAD_POOL_STOPPING) {
if (pool->ltp_state == LDAP_PVT_THREAD_POOL_RUNNING) {
ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
if (pool->ltp_state == LDAP_PVT_THREAD_POOL_STOPPING)
break;
}
ctx = ldap_pvt_thread_delist(&pool->ltp_pending_list, NULL);
if (ctx == NULL) {
if (pool->ltp_state == LDAP_PVT_THREAD_POOL_FINISHING)
break;
/* we could check an idle timer here, and let the
* thread die if it has been inactive for a while.
* only die if there are other open threads (i.e.,
* always have at least one thread open).
*/
continue;
}
pool->ltp_pending_count--;
pool->ltp_active_count++;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
(ctx->ltc_start_routine)(ctx->ltc_arg);
free(ctx);
/* if we use an idle timer, here's
* a good place to update it
*/
ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
pool->ltp_active_count--;
}
pool->ltp_open_count--;
ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
ldap_pvt_thread_exit(NULL);
return(NULL);
}
void *
ldap_pvt_thread_enlist( ldap_pvt_thread_list *list, void *elem )
{
ldap_pvt_thread_listelement *prev;
if (elem == NULL)
return(NULL);
((ldap_pvt_thread_listelement *)elem)->next = NULL;
if (*list == NULL) {
*list = elem;
return(elem);
}
for (prev = *list ; prev->next != NULL; prev = prev->next) ;
prev->next = elem;
return(elem);
}
void *
ldap_pvt_thread_delist( ldap_pvt_thread_list *list, void *elem )
{
ldap_pvt_thread_listelement *prev;
if (*list == NULL)
return(NULL);
if (elem == NULL)
elem = *list;
if (*list == elem) {
*list = ((ldap_pvt_thread_listelement *)elem)->next;
return(elem);
}
for (prev = *list ; prev->next != NULL; prev = prev->next) {
if (prev->next == elem) {
prev->next = ((ldap_pvt_thread_listelement *)elem)->next;
return(elem);
}
}
return(NULL);
}
void *
ldap_pvt_thread_onlist( ldap_pvt_thread_list *list, void *elem )
{
ldap_pvt_thread_listelement *prev;
if (elem == NULL || *list == NULL)
return(NULL);
for (prev = *list ; prev != NULL; prev = prev->next) {
if (prev == elem)
return(elem);
}
return(NULL);
}
#endif /* NO_THREADS */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment