tpool.c 24 KB
Newer Older
1
/* $OpenLDAP$ */
2
3
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
 *
Kurt Zeilenga's avatar
Kurt Zeilenga committed
4
 * Copyright 1998-2007 The OpenLDAP Foundation.
5
6
 * All rights reserved.
 *
7
8
9
10
11
12
13
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted only as authorized by the OpenLDAP
 * Public License.
 *
 * A copy of this license is available in file LICENSE in the
 * top-level directory of the distribution or, alternatively, at
 * <http://www.OpenLDAP.org/license.html>.
14
15
16
17
18
19
 */

#include "portable.h"

#include <stdio.h>

20
#include <ac/stdarg.h>
21
22
23
#include <ac/stdlib.h>
#include <ac/string.h>
#include <ac/time.h>
24
#include <ac/errno.h>
25
26

#include "ldap-int.h"
27
#include "ldap_pvt_thread.h" /* Get the thread interface */
28
#include "ldap_queue.h"
29
30
#define LDAP_THREAD_POOL_IMPLEMENTATION
#include "ldap_thr_debug.h"  /* May rename symbols defined below */
31
32
33

#ifndef LDAP_THREAD_HAVE_TPOOL

34
typedef enum ldap_int_thread_pool_state_e {
35
36
	LDAP_INT_THREAD_POOL_RUNNING,
	LDAP_INT_THREAD_POOL_FINISHING,
37
	LDAP_INT_THREAD_POOL_STOPPING
38
} ldap_int_thread_pool_state_t;
39

40
41
42
43
44
45
46
47
48
49
typedef struct ldap_int_thread_key_s {
	void *ltk_key;
	void *ltk_data;
	ldap_pvt_thread_pool_keyfree_t *ltk_free;
} ldap_int_thread_key_t;

/* Max number of thread-specific keys we store per thread.
 * We don't expect to use many...
 */
#define	MAXKEYS	32
50
#define	LDAP_MAXTHR	1024	/* must be a power of 2 */
51

Howard Chu's avatar
Howard Chu committed
52
53
54
55
56
typedef struct ldap_int_thread_userctx_s {
	ldap_pvt_thread_t ltu_id;
	ldap_int_thread_key_t ltu_key[MAXKEYS];
} ldap_int_thread_userctx_t;

57
58
static ldap_pvt_thread_t tid_zero;

59
/* Thread ID -> context mapping (poor open-addressed hash table).
60
61
62
 * Protected by ldap_pvt_thread_pool_mutex except during pauses,
 * when it is reserved for ldap_pvt_thread_pool_purgekey().
 */
63
64
static struct {
	ldap_pvt_thread_t id;
65
66
	ldap_int_thread_userctx_t *ctx;		/* set when id != tid_zero */
#	define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
67
} thread_keys[LDAP_MAXTHR];
68
69
70
71
72
73
74
75

#define	TID_HASH(tid, hash) do { \
	unsigned const char *ptr_ = (unsigned const char *)&(tid); \
	unsigned i_; \
	for (i_ = 0, (hash) = ptr_[0]; ++i_ < sizeof(tid);) \
		(hash) += ((hash) << 5) ^ ptr_[i_]; \
} while(0)

76

77
78
typedef struct ldap_int_thread_ctx_s {
	union {
79
80
	LDAP_STAILQ_ENTRY(ldap_int_thread_ctx_s) q;
	LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) l;
81
	LDAP_SLIST_ENTRY(ldap_int_thread_ctx_s) al;
82
	} ltc_next;
83
	ldap_pvt_thread_start_t *ltc_start_routine;
84
85
	void *ltc_arg;
} ldap_int_thread_ctx_t;
86
87

struct ldap_int_thread_pool_s {
88
	LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
89
90
	ldap_pvt_thread_mutex_t ltp_mutex;
	ldap_pvt_thread_cond_t ltp_cond;
91
	ldap_pvt_thread_cond_t ltp_pcond;
92
93
	LDAP_STAILQ_HEAD(tcq, ldap_int_thread_ctx_s) ltp_pending_list;
	LDAP_SLIST_HEAD(tcl, ldap_int_thread_ctx_s) ltp_free_list;
94
	LDAP_SLIST_HEAD(tclq, ldap_int_thread_ctx_s) ltp_active_list;
95
	ldap_int_thread_pool_state_t ltp_state;
96
	int ltp_pause;
97
98
99
100
101
	long ltp_max_count;
	long ltp_max_pending;
	long ltp_pending_count;
	long ltp_active_count;
	long ltp_open_count;
102
	long ltp_starting;
103
104
};

105
static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
106
	ldap_int_thread_pool_list =
107
	LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
108

109
110
static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;

Pierangelo Masarati's avatar
   
Pierangelo Masarati committed
111
static void *ldap_int_thread_pool_wrapper( void *pool );
112

113
114
static ldap_pvt_thread_t ldap_int_main_tid;

Howard Chu's avatar
Howard Chu committed
115
static ldap_int_thread_userctx_t ldap_int_main_thrctx;
116

117
118
119
int
ldap_int_thread_pool_startup ( void )
{
120
	ldap_int_main_tid = ldap_pvt_thread_self();
Howard Chu's avatar
Howard Chu committed
121
	ldap_int_main_thrctx.ltu_id = ldap_int_main_tid;
122

123
	return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
124
125
126
127
128
}

int
ldap_int_thread_pool_shutdown ( void )
{
129
	struct ldap_int_thread_pool_s *pool;
130

131
	while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
132
		(ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
133
134
135
136
137
	}
	ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
	return(0);
}

138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
typedef struct ldap_lazy_sem_t {
	ldap_pvt_thread_mutex_t ls_mutex;
	ldap_pvt_thread_cond_t	ls_cond;
	int ls_sem_value;
	/*
	 * when more than ls_lazy_count number of resources
	 * becmoes available, the thread wating for the resources will
	 * be waken up in order to prevent frequent blocking/waking-up
	 */
	unsigned int ls_lazy_count;
	/*
	 * only one thread(listener) will wait on this semaphore
	 * using a flag instead of a list
	 */
	int ls_wait;
} ldap_lazy_sem_t;

ldap_lazy_sem_t* thread_pool_sem = NULL;

int
158
159
ldap_lazy_sem_init( unsigned int value, unsigned int lazyness )
{
160
161
162
163
164
165
166
167
168
169
170
171
172
173
	thread_pool_sem = (ldap_lazy_sem_t*) LDAP_CALLOC(1,
		sizeof( ldap_lazy_sem_t ));

	if( thread_pool_sem == NULL ) return -1;

	ldap_pvt_thread_mutex_init( &thread_pool_sem->ls_mutex );
	ldap_pvt_thread_cond_init( &thread_pool_sem->ls_cond );
	thread_pool_sem->ls_sem_value = value;
	thread_pool_sem->ls_lazy_count = lazyness;
	thread_pool_sem->ls_wait = 0;

	return 0;
}

174
175
176
177
/* FIXME: move to some approprite header */
int ldap_lazy_sem_dec( ldap_lazy_sem_t* ls );
int ldap_lazy_sem_wait ( ldap_lazy_sem_t* ls );

178
179
180
181
182
/*
 * ldap_lazy_sem_wait is used if a caller is blockable(listener).
 * Otherwise use ldap_lazy_sem_dec (worker)
 */
int
183
184
ldap_lazy_sem_op_submit( ldap_lazy_sem_t* ls )
{
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
	if ( ls == NULL ) return -1;

	/* only worker thread has its thread ctx */
	if ( ldap_pvt_thread_pool_context() ) {
		/* worker thread */
		return ldap_lazy_sem_dec( ls );
	} else {
		/* listener */
		return ldap_lazy_sem_wait( ls );
	}
}

/*
 * test if given semaphore's count is zero.
 * If 0, the caller is blocked 
 * If not, the count is decremented.
 */
int
203
204
ldap_lazy_sem_wait ( ldap_lazy_sem_t* ls )
{
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
	ldap_pvt_thread_mutex_lock( &ls->ls_mutex );

lazy_sem_retry:
	if ( ls->ls_sem_value <= 0 ) {
		/* no more avaliable resources */
		ls->ls_wait = 1;
		ldap_pvt_thread_cond_wait( &ls->ls_cond, &ls->ls_mutex );
		goto lazy_sem_retry;
	} else {
		/* avaliable resources */
		ls->ls_sem_value--;
	}

	ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );

	return 0;
}

/*
 * decrement the count without blocking
 * even when the count becomes less than or equal to 0
 */
int
228
229
ldap_lazy_sem_dec( ldap_lazy_sem_t* ls )
{
230
231
232
233
234
235
236
237
238
239
240
241
242
243
	ldap_pvt_thread_mutex_lock( &ls->ls_mutex );

	ls->ls_sem_value--;

	ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );

	return 0;
}

/*
 * Increment the count by one and test if it is greater or
 * equal to lazyness. If it is, wake up a blocked thread.
 */
int
244
245
ldap_lazy_sem_post( ldap_lazy_sem_t* ls )
{
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
	if( ls == NULL ) return (-1);

	ldap_pvt_thread_mutex_lock( &ls->ls_mutex );

	ls->ls_sem_value++;
	if ( ls->ls_wait ) {
		if ( ls->ls_sem_value >= ls->ls_lazy_count ) {
			ls->ls_wait = 0;
			ldap_pvt_thread_cond_signal( &ls->ls_cond );
		}
	}

	ldap_pvt_thread_mutex_unlock( &ls->ls_mutex );

	return 0;
}

263
264
265
int
ldap_pvt_thread_pool_init (
	ldap_pvt_thread_pool_t *tpool,
266
	int max_threads,
267
268
269
	int max_pending )
{
	ldap_pvt_thread_pool_t pool;
270
	int rc;
271

272
273
274
	if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
		max_threads = 0;

275
276
277
278
279
280
	*tpool = NULL;
	pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
		sizeof(struct ldap_int_thread_pool_s));

	if (pool == NULL) return(-1);

281
282
283
284
	rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
	if (rc != 0)
		return(rc);
	rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
285
286
287
	if (rc != 0)
		return(rc);
	rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
288
289
	if (rc != 0)
		return(rc);
290
	pool->ltp_state = LDAP_INT_THREAD_POOL_RUNNING;
291
	pool->ltp_max_count = max_threads;
292
	pool->ltp_max_pending = max_pending;
293
294
	LDAP_STAILQ_INIT(&pool->ltp_pending_list);
	LDAP_SLIST_INIT(&pool->ltp_free_list);
295
	LDAP_SLIST_INIT(&pool->ltp_active_list);
296
	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
297
	LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
298
299
	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);

300
301
302
303
304
305
306
307
308
309
#if 0
	/* THIS WILL NOT WORK on some systems.  If the process
	 * forks after starting a thread, there is no guarantee
	 * that the thread will survive the fork.  For example,
	 * slapd forks in order to daemonize, and does so after
	 * calling ldap_pvt_thread_pool_init.  On some systems,
	 * this initial thread does not run in the child process,
	 * but ltp_open_count == 1, so two things happen: 
	 * 1) the first client connection fails, and 2) when
	 * slapd is kill'ed, it never terminates since it waits
310
	 * for all worker threads to exit. */
311
312
313
314

	/* start up one thread, just so there is one. no need to
	 * lock the mutex right now, since no threads are running.
	 */
315
316
	pool->ltp_open_count++;

317
	ldap_pvt_thread_t thr;
Pierangelo Masarati's avatar
   
Pierangelo Masarati committed
318
	rc = ldap_pvt_thread_create( &thr, 1, ldap_int_thread_pool_wrapper, pool );
319
320
321
322

	if( rc != 0) {
		/* couldn't start one?  then don't start any */
		ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
323
		LDAP_STAILQ_REMOVE(ldap_int_thread_pool_list, pool, 
Howard Chu's avatar
Howard Chu committed
324
			ldap_int_thread_pool_s, ltp_next);
325
		ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
326
		ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
327
328
		ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
		ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
Howard Chu's avatar
Howard Chu committed
329
		LDAP_FREE(pool);
330
331
		return(-1);
	}
332
#endif
333
334
335
336
337

	*tpool = pool;
	return(0);
}

338

339
340
341
int
ldap_pvt_thread_pool_submit (
	ldap_pvt_thread_pool_t *tpool,
342
	ldap_pvt_thread_start_t *start_routine, void *arg )
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
{
	struct ldap_int_thread_pool_s *pool;
	ldap_int_thread_ctx_t *ctx;
	int need_thread = 0;
	ldap_pvt_thread_t thr;

	if (tpool == NULL)
		return(-1);

	pool = *tpool;

	if (pool == NULL)
		return(-1);

	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
358
	if (pool->ltp_state != LDAP_INT_THREAD_POOL_RUNNING
359
360
361
362
363
364
		|| (pool->ltp_max_pending > 0
			&& pool->ltp_pending_count >= pool->ltp_max_pending))
	{
		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
		return(-1);
	}
365

366
	ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list);
367
	if (ctx) {
368
		LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
369
370
371
372
373
374
375
376
377
378
379
380
	} else {
		ctx = (ldap_int_thread_ctx_t *) LDAP_MALLOC(
			sizeof(ldap_int_thread_ctx_t));
		if (ctx == NULL) {
			ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
			return(-1);
		}
	}

	ctx->ltc_start_routine = start_routine;
	ctx->ltc_arg = arg;

381
	pool->ltp_pending_count++;
382
	LDAP_STAILQ_INSERT_TAIL(&pool->ltp_pending_list, ctx, ltc_next.q);
383
	if (pool->ltp_pause) {
384
385
386
		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
		return(0);
	}
387
	ldap_pvt_thread_cond_signal(&pool->ltp_cond);
388
	if (pool->ltp_open_count < pool->ltp_active_count + pool->ltp_pending_count
389
390
		&& (pool->ltp_open_count <
			(pool->ltp_max_count ? pool->ltp_max_count : LDAP_MAXTHR)))
391
392
	{
		pool->ltp_open_count++;
393
		pool->ltp_starting++;
394
395
396
397
		need_thread = 1;
	}
	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);

398
#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
399
400
401
	ldap_lazy_sem_op_submit( thread_pool_sem );
#endif

402
	if (need_thread) {
403
404
		int rc;

405
		ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
406
407
408

		rc = ldap_pvt_thread_create( &thr, 1,
			ldap_int_thread_pool_wrapper, pool );
409
410
		pool->ltp_starting--;
		if (rc != 0) {
411
412
413
414
415
			/* couldn't create thread.  back out of
			 * ltp_open_count and check for even worse things.
			 */
			pool->ltp_open_count--;
			if (pool->ltp_open_count == 0) {
416
				/* no open threads at all?!?
417
				 */
418
				ldap_int_thread_ctx_t *ptr;
Hallvard Furuseth's avatar
Hallvard Furuseth committed
419
420
421
422

				/* let pool_destroy know there are no more threads */
				ldap_pvt_thread_cond_signal(&pool->ltp_cond);

423
				LDAP_STAILQ_FOREACH(ptr, &pool->ltp_pending_list, ltc_next.q)
424
425
					if (ptr == ctx) break;
				if (ptr == ctx) {
426
427
428
429
					/* no open threads, context not handled, so
					 * back out of ltp_pending_count, free the context,
					 * report the error.
					 */
430
					LDAP_STAILQ_REMOVE(&pool->ltp_pending_list, ctx, 
431
						ldap_int_thread_ctx_s, ltc_next.q);
Hallvard Furuseth's avatar
Hallvard Furuseth committed
432
					pool->ltp_pending_count--;
433
					ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
Howard Chu's avatar
Howard Chu committed
434
					LDAP_FREE(ctx);
435
436
437
438
439
440
441
442
443
					return(-1);
				}
			}
			/* there is another open thread, so this
			 * context will be handled eventually.
			 * continue on and signal that the context
			 * is waiting.
			 */
		}
444
		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
445
446
447
448
449
	}

	return(0);
}

450
451
452
453
454
int
ldap_pvt_thread_pool_maxthreads ( ldap_pvt_thread_pool_t *tpool, int max_threads )
{
	struct ldap_int_thread_pool_s *pool;

455
456
457
	if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
		max_threads = 0;

458
459
460
461
462
463
464
465
466
467
468
469
470
471
	if (tpool == NULL)
		return(-1);

	pool = *tpool;

	if (pool == NULL)
		return(-1);

	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
	pool->ltp_max_count = max_threads;
	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
	return(0);
}

472
int
473
ldap_pvt_thread_pool_query ( ldap_pvt_thread_pool_t *tpool, ldap_pvt_thread_pool_param_t param, void *value )
474
{
475
476
	struct ldap_int_thread_pool_s	*pool;
	int				count = -1;
477

478
479
480
	if ( tpool == NULL || value == NULL ) {
		return -1;
	}
481
482
483

	pool = *tpool;

484
485
486
	if ( pool == NULL ) {
		return 0;
	}
487
488

	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
	switch ( param ) {
	case LDAP_PVT_THREAD_POOL_PARAM_MAX:
		count = pool->ltp_max_count;
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
		count = pool->ltp_max_pending;
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
		count = pool->ltp_open_count;
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
		count = pool->ltp_starting;
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
		count = pool->ltp_active_count;
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
		count = pool->ltp_pending_count;
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
		count = pool->ltp_pending_count + pool->ltp_active_count;
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
		break;

	case LDAP_PVT_THREAD_POOL_PARAM_STATE: {
		static struct {
			char				*name;
			ldap_int_thread_pool_state_t	state;
		}		str2state[] = {
			{ "running",	LDAP_INT_THREAD_POOL_RUNNING },
			{ "finishing",	LDAP_INT_THREAD_POOL_FINISHING },
			{ "stopping",	LDAP_INT_THREAD_POOL_STOPPING },
			{ NULL }
		};
		int		i;

539
540
541
542
543
544
545
		if ( pool->ltp_pause ) {
			*((char **)value) = "pausing";
		} else {
			for ( i = 0; str2state[ i ].name != NULL; i++ ) {
				if ( str2state[ i ].state == pool->ltp_state ) {
					break;
				}
546
			}
547
			*((char **)value) = str2state[ i ].name;
548
		}
549
		if ( *((char **)value) != NULL ) {
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
			count = -2;
		}
		} break;
	}
	ldap_pvt_thread_mutex_unlock( &pool->ltp_mutex );

	if ( count > -1 ) {
		*((int *)value) = count;
	}

	return ( count == -1 ? -1 : 0 );
}

/*
 * wrapper for ldap_pvt_thread_pool_query(), left around
 * for backwards compatibility
 */
int
ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
{
	int	rc, count;

	rc = ldap_pvt_thread_pool_query( tpool,
		LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );

	if ( rc == 0 ) {
		return count;
	}

	return rc;
580
581
582
583
584
}

int
ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
{
585
	struct ldap_int_thread_pool_s *pool, *pptr;
586
587
588
589
590
591
592
593
594
595
	ldap_int_thread_ctx_t *ctx;

	if (tpool == NULL)
		return(-1);

	pool = *tpool;

	if (pool == NULL) return(-1);

	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
596
	LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
597
598
		if (pptr == pool) break;
	if (pptr == pool)
599
		LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
600
			ldap_int_thread_pool_s, ltp_next);
601
602
	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);

603
	if (pool != pptr) return(-1);
604
605
606
607
608
609

	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
	pool->ltp_state = run_pending
		? LDAP_INT_THREAD_POOL_FINISHING
		: LDAP_INT_THREAD_POOL_STOPPING;

610
611
612
	while (pool->ltp_open_count) {
		if (!pool->ltp_pause)
			ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
613
614
		ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
	}
615
	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
616

617
	while ((ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list)) != NULL)
618
	{
619
		LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
Howard Chu's avatar
Howard Chu committed
620
		LDAP_FREE(ctx);
621
622
	}

623
	while ((ctx = LDAP_SLIST_FIRST(&pool->ltp_free_list)) != NULL)
624
	{
625
		LDAP_SLIST_REMOVE_HEAD(&pool->ltp_free_list, ltc_next.l);
Howard Chu's avatar
Howard Chu committed
626
		LDAP_FREE(ctx);
627
628
	}

629
	ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
630
631
	ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
	ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
Howard Chu's avatar
Howard Chu committed
632
	LDAP_FREE(pool);
633
#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
634
635
636
637
	if ( thread_pool_sem ) {
		LDAP_FREE( thread_pool_sem );
	}
#endif
638
639
640
641
642
	return(0);
}

static void *
ldap_int_thread_pool_wrapper ( 
Pierangelo Masarati's avatar
   
Pierangelo Masarati committed
643
	void *xpool )
644
{
Pierangelo Masarati's avatar
   
Pierangelo Masarati committed
645
	struct ldap_int_thread_pool_s *pool = xpool;
646
	ldap_int_thread_ctx_t *ctx;
Howard Chu's avatar
Howard Chu committed
647
	ldap_int_thread_userctx_t uctx;
648
	unsigned i, keyslot, hash;
649
650
651
652

	if (pool == NULL)
		return NULL;

653
	for ( i=0; i<MAXKEYS; i++ ) {
Howard Chu's avatar
Howard Chu committed
654
		uctx.ltu_key[i].ltk_key = NULL;
655
656
	}

Howard Chu's avatar
Howard Chu committed
657
	uctx.ltu_id = ldap_pvt_thread_self();
658
	TID_HASH(uctx.ltu_id, hash);
659

660
661
	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);

662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
	/* when paused, thread_keys[] is reserved for pool_purgekey() */
	while (pool->ltp_pause)
		ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);

	/* find a key slot to give this thread ID and store a
	 * pointer to our keys there; start at the thread ID
	 * itself (mod LDAP_MAXTHR) and look for an empty slot.
	 */
	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
	for (keyslot = hash & (LDAP_MAXTHR-1);
		!ldap_pvt_thread_equal(thread_keys[keyslot].id, tid_zero);
		keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
	thread_keys[keyslot].id = uctx.ltu_id;
	thread_keys[keyslot].ctx = &uctx;
	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
677

678
679
680
681
682
683
684
	for (;;) {
		while (pool->ltp_pause)
			ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);

		if (pool->ltp_state == LDAP_INT_THREAD_POOL_STOPPING)
			break;

685
		ctx = LDAP_STAILQ_FIRST(&pool->ltp_pending_list);
686
		if (ctx == NULL) {
687
688
			if (pool->ltp_state == LDAP_INT_THREAD_POOL_FINISHING)
				break;
689

690
691
			if (pool->ltp_open_count >
				(pool->ltp_max_count ? pool->ltp_max_count : LDAP_MAXTHR))
692
693
694
695
696
697
698
699
700
			{
				/* too many threads running (can happen if the
				 * maximum threads value is set during ongoing
				 * operation using ldap_pvt_thread_pool_maxthreads)
				 * so let this thread die.
				 */
				break;
			}

701
702
703
			/* we could check an idle timer here, and let the
			 * thread die if it has been inactive for a while.
			 * only die if there are other open threads (i.e.,
704
705
706
			 * always have at least one thread open).  the check
			 * should be like this:
			 *   if (pool->ltp_open_count > 1 && pool->ltp_starting == 0)
707
			 *       check timer, wait if ltp_pause, leave thread (break;)
708
709
710
			 *
			 * Just use pthread_cond_timedwait if we want to
			 * check idle time.
711
712
			 */

713
			assert(pool->ltp_state == LDAP_INT_THREAD_POOL_RUNNING);
Hallvard Furuseth's avatar
Hallvard Furuseth committed
714
			ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
715
716
717
			continue;
		}

718
		LDAP_STAILQ_REMOVE_HEAD(&pool->ltp_pending_list, ltc_next.q);
719
		pool->ltp_pending_count--;
720

721
		LDAP_SLIST_INSERT_HEAD(&pool->ltp_active_list, ctx, ltc_next.al);
722
723
724
		pool->ltp_active_count++;
		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);

Howard Chu's avatar
Howard Chu committed
725
		ctx->ltc_start_routine(&uctx, ctx->ltc_arg);
726

727
#ifdef LDAP_PVT_THREAD_POOL_SEM_LOAD_CONTROL
728
729
		ldap_lazy_sem_post( thread_pool_sem );
#endif
730
		ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
731
732
		LDAP_SLIST_REMOVE(&pool->ltp_active_list, ctx,
			ldap_int_thread_ctx_s, ltc_next.al);
733
		LDAP_SLIST_INSERT_HEAD(&pool->ltp_free_list, ctx, ltc_next.l);
734
		pool->ltp_active_count--;
735

736
737
738
		/* let pool_pause know when it is the sole active thread */
		if (pool->ltp_active_count < 2)
			ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
739
740
	}

741
	ldap_pvt_thread_pool_context_reset(&uctx);
742

743
744
745
746
747
	/* Needed if context_reset can let another thread request a pause */
	while (pool->ltp_pause)
		ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);

	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
748
	thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
749
	thread_keys[keyslot].id = tid_zero;
750
	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
751

752
	pool->ltp_open_count--;
753
754
755
756
757

	/* let pool_destroy know we're all done */
	if (pool->ltp_open_count < 1)
		ldap_pvt_thread_cond_signal(&pool->ltp_cond);

758
759
760
761
762
	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);

	ldap_pvt_thread_exit(NULL);
	return(NULL);
}
763

764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
int
ldap_pvt_thread_pool_pause ( 
	ldap_pvt_thread_pool_t *tpool )
{
	struct ldap_int_thread_pool_s *pool;

	if (tpool == NULL)
		return(-1);

	pool = *tpool;

	if (pool == NULL)
		return(0);

	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);

	/* If someone else has already requested a pause, we have to wait */
781
	while (pool->ltp_pause) {
782
783
		pool->ltp_pending_count++;
		pool->ltp_active_count--;
784
785
786
		/* let the other pool_pause() know when it can proceed */
		if (pool->ltp_active_count < 2)
			ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
787
788
789
790
		ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
		pool->ltp_pending_count--;
		pool->ltp_active_count++;
	}
791
792
793

	/* Wait for everyone else to pause or finish */
	pool->ltp_pause = 1;
794
795
796
	while (pool->ltp_active_count > 1) {
		ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
	}
797

798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
	return(0);
}

int
ldap_pvt_thread_pool_resume ( 
	ldap_pvt_thread_pool_t *tpool )
{
	struct ldap_int_thread_pool_s *pool;

	if (tpool == NULL)
		return(-1);

	pool = *tpool;

	if (pool == NULL)
		return(0);

	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
817
	pool->ltp_pause = 0;
818
819
820
821
822
	ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
	return(0);
}

823
824
825
826
827
828
int ldap_pvt_thread_pool_getkey(
	void *xctx,
	void *key,
	void **data,
	ldap_pvt_thread_pool_keyfree_t **kfree )
{
Howard Chu's avatar
Howard Chu committed
829
	ldap_int_thread_userctx_t *ctx = xctx;
830
831
	int i;

832
	if ( !ctx || !key || !data ) return EINVAL;
833

Howard Chu's avatar
Howard Chu committed
834
835
836
837
	for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
		if ( ctx->ltu_key[i].ltk_key == key ) {
			*data = ctx->ltu_key[i].ltk_data;
			if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
838
839
840
841
842
843
			return 0;
		}
	}
	return ENOENT;
}

844
845
846
847
848
849
850
851
852
853
854
855
static void
clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
{
	int j = i;
	while ( ++j < MAXKEYS && ctx->ltu_key[j].ltk_key );
	if ( --j != i ) {
		ctx->ltu_key[i] = ctx->ltu_key[j];
		i = j;
	}
	ctx->ltu_key[i].ltk_key = NULL;
}

856
857
858
859
860
861
int ldap_pvt_thread_pool_setkey(
	void *xctx,
	void *key,
	void *data,
	ldap_pvt_thread_pool_keyfree_t *kfree )
{
Howard Chu's avatar
Howard Chu committed
862
	ldap_int_thread_userctx_t *ctx = xctx;
863
	int i, found;
864

865
	if ( !ctx || !key ) return EINVAL;
866

867
868
869
870
871
872
	for ( i=found=0; i<MAXKEYS; i++ ) {
		if ( ctx->ltu_key[i].ltk_key == key ) {
			found = 1;
			break;
		} else if ( !ctx->ltu_key[i].ltk_key ) {
			break;
873
874
		}
	}
875
876
877
878
879
880
881
882
883
884
885
886

	if ( data || kfree ) {
		if ( i>=MAXKEYS )
			return ENOMEM;
		ctx->ltu_key[i].ltk_key = key;
		ctx->ltu_key[i].ltk_data = data;
		ctx->ltu_key[i].ltk_free = kfree;
	} else if ( found ) {
		clear_key_idx( ctx, i );
	}

	return 0;
887
}
888

889
890
891
892
893
894
/* Free all elements with this key, no matter which thread they're in.
 * May only be called while the pool is paused.
 */
void ldap_pvt_thread_pool_purgekey( void *key )
{
	int i, j;
Howard Chu's avatar
Howard Chu committed
895
	ldap_int_thread_userctx_t *ctx;
896

897
898
	assert ( key != NULL );

899
	for ( i=0; i<LDAP_MAXTHR; i++ ) {
900
901
		ctx = thread_keys[i].ctx;
		if ( ctx && ctx != DELETED_THREAD_CTX ) {
902
			for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
Howard Chu's avatar
Howard Chu committed
903
904
905
906
				if ( ctx->ltu_key[j].ltk_key == key ) {
					if (ctx->ltu_key[j].ltk_free)
						ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
						ctx->ltu_key[j].ltk_data );
907
					clear_key_idx( ctx, j );
908
909
910
911
912
913
914
					break;
				}
			}
		}
	}
}

915
916
917
/*
 * This is necessary if the caller does not have access to the
 * thread context handle (for example, a slapd plugin calling
918
 * slapi_search_internal()). No doubt it is more efficient
919
920
921
 * for the application to keep track of the thread context
 * handles itself.
 */
922
void *ldap_pvt_thread_pool_context( )
923
924
{
	ldap_pvt_thread_t tid;
925
	unsigned i, hash;
926
	ldap_int_thread_userctx_t *ctx;
927
928

	tid = ldap_pvt_thread_self();
929
	if ( ldap_pvt_thread_equal( tid, ldap_int_main_tid ))
Howard Chu's avatar
Howard Chu committed
930
		return &ldap_int_main_thrctx;
931

932
	TID_HASH( tid, hash );
933
	i = hash &= (LDAP_MAXTHR-1);
934
	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
935
936
937
938
939
940
941
942
	do {
		ctx = thread_keys[i].ctx;
		if ( ctx != DELETED_THREAD_CTX )
			if ( ldap_pvt_thread_equal(thread_keys[i].id, tid) || !ctx )
				goto done;
	} while ( (i = (i+1) & (LDAP_MAXTHR-1)) != hash );
	ctx = NULL;
 done:
943
	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
944

945
	return ctx;
946
947
}

948
void ldap_pvt_thread_pool_context_reset( void *vctx )
949
{
Howard Chu's avatar
Howard Chu committed
950
	ldap_int_thread_userctx_t *ctx = vctx;
951
952
	int i;

953
954
955
	for ( i=MAXKEYS-1; i>=0; i--) {
		if ( !ctx->ltu_key[i].ltk_key )
			continue;
Howard Chu's avatar
Howard Chu committed
956
957
958
959
		if ( ctx->ltu_key[i].ltk_free )
			ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
			ctx->ltu_key[i].ltk_data );
		ctx->ltu_key[i].ltk_key = NULL;
960
961
	}
}
Howard Chu's avatar
Howard Chu committed
962
963
964
965
966
967
968

ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
{
	ldap_int_thread_userctx_t *ctx = vctx;

	return ctx->ltu_id;
}
969
#endif /* LDAP_THREAD_HAVE_TPOOL */