Commit 059b357d authored by Howard Chu's avatar Howard Chu
Browse files

More tweaks to copyfd2

Make sure the writer thread starts and stops when we expect it to.
parent 03f0ecb0
......@@ -177,6 +177,7 @@
#define THREAD_RET DWORD
#define pthread_t HANDLE
#define pthread_mutex_t HANDLE
#define pthread_cond_t HANDLE
#define pthread_key_t DWORD
#define pthread_self() GetCurrentThreadId()
#define pthread_key_create(x,y) \
......@@ -186,6 +187,8 @@
#define pthread_setspecific(x,y) (TlsSetValue(x,y) ? 0 : ErrCode())
#define pthread_mutex_unlock(x) ReleaseMutex(x)
#define pthread_mutex_lock(x) WaitForSingleObject(x, INFINITE)
#define pthread_cond_signal(x) SetEvent(*x)
#define pthread_cond_wait(cond,mutex) SignalObjectAndWait(*mutex, *cond, INFINITE, FALSE); WaitForSingleObject(*mutex, INFINITE)
#define THREAD_CREATE(thr,start,arg) thr=CreateThread(NULL,0,start,arg,0,NULL)
#define THREAD_FINISH(thr) WaitForSingleObject(thr, INFINITE)
#define LOCK_MUTEX_R(env) pthread_mutex_lock((env)->me_rmutex)
......@@ -8034,7 +8037,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
#endif
typedef struct mdb_copy {
pthread_mutex_t mc_mutex[2];
pthread_mutex_t mc_mutex;
pthread_cond_t mc_cond;
char *mc_wbuf[2];
char *mc_over[2];
MDB_env *mc_env;
......@@ -8044,6 +8048,7 @@ typedef struct mdb_copy {
pgno_t mc_next_pgno;
HANDLE mc_fd;
int mc_status;
volatile int mc_new;
int mc_toggle;
} mdb_copy;
......@@ -8061,14 +8066,17 @@ mdb_env_copythr(void *arg)
#define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
#endif
pthread_mutex_lock(&my->mc_mutex[toggle^1]);
pthread_mutex_lock(&my->mc_mutex);
my->mc_new = 0;
pthread_cond_signal(&my->mc_cond);
for(;;) {
pthread_mutex_lock(&my->mc_mutex[toggle]);
pthread_mutex_unlock(&my->mc_mutex[toggle^1]);
if (!my->mc_wlen[toggle]) {
pthread_mutex_unlock(&my->mc_mutex[toggle]);
while (!my->mc_new)
pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
if (my->mc_new < 0) {
my->mc_new = 0;
break;
}
my->mc_new = 0;
wsize = my->mc_wlen[toggle];
ptr = my->mc_wbuf[toggle];
again:
......@@ -8089,7 +8097,6 @@ again:
}
if (rc) {
my->mc_status = rc;
pthread_mutex_unlock(&my->mc_mutex[toggle]);
break;
}
/* If there's an overflow page tail, write it too */
......@@ -8101,23 +8108,29 @@ again:
}
my->mc_wlen[toggle] = 0;
toggle ^= 1;
pthread_cond_signal(&my->mc_cond);
}
pthread_cond_signal(&my->mc_cond);
pthread_mutex_unlock(&my->mc_mutex);
return (THREAD_RET)0;
#undef DO_WRITE
}
static int
mdb_env_cthr_toggle(mdb_copy *my)
mdb_env_cthr_toggle(mdb_copy *my, int st)
{
int toggle = my->mc_toggle ^ 1;
pthread_mutex_unlock(&my->mc_mutex[my->mc_toggle]);
pthread_mutex_lock(&my->mc_mutex[toggle]);
pthread_mutex_lock(&my->mc_mutex);
if (my->mc_status) {
pthread_mutex_unlock(&my->mc_mutex[toggle]);
pthread_mutex_unlock(&my->mc_mutex);
return my->mc_status;
}
while (my->mc_new == 1)
pthread_cond_wait(&my->mc_cond, &my->mc_mutex);
my->mc_new = st;
my->mc_toggle = toggle;
pthread_cond_signal(&my->mc_cond);
pthread_mutex_unlock(&my->mc_mutex);
return 0;
}
......@@ -8188,10 +8201,10 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
if (rc)
goto done;
if (my->mc_wlen[toggle] >= MDB_WBUF) {
rc = mdb_env_cthr_toggle(my);
rc = mdb_env_cthr_toggle(my, 1);
if (rc)
goto done;
toggle ^= 1;
toggle = my->mc_toggle;
}
mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
memcpy(mo, omp, my->mc_env->me_psize);
......@@ -8201,10 +8214,10 @@ mdb_env_cwalk(mdb_copy *my, pgno_t *pg, int flags)
if (omp->mp_pages > 1) {
my->mc_olen[toggle] = my->mc_env->me_psize * (omp->mp_pages - 1);
my->mc_over[toggle] = (char *)omp + my->mc_env->me_psize;
rc = mdb_env_cthr_toggle(my);
rc = mdb_env_cthr_toggle(my, 1);
if (rc)
goto done;
toggle ^= 1;
toggle = my->mc_toggle;
}
memcpy(NODEDATA(ni), &mo->mp_pgno, sizeof(pgno_t));
} else if (ni->mn_flags & F_SUBDATA) {
......@@ -8250,10 +8263,10 @@ again:
}
}
if (my->mc_wlen[toggle] >= MDB_WBUF) {
rc = mdb_env_cthr_toggle(my);
rc = mdb_env_cthr_toggle(my, 1);
if (rc)
goto done;
toggle ^= 1;
toggle = my->mc_toggle;
}
mo = (MDB_page *)(my->mc_wbuf[toggle] + my->mc_wlen[toggle]);
mdb_page_copy(mo, mp, my->mc_env->me_psize);
......@@ -8286,14 +8299,14 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
int rc;
#ifdef _WIN32
my.mc_mutex[0] = CreateMutex(NULL, FALSE, NULL);
my.mc_mutex[1] = CreateMutex(NULL, FALSE, NULL);
my.mc_mutex = CreateMutex(NULL, FALSE, NULL);
my.mc_cond = CreateEvent(NULL, FALSE, FALSE, NULL);
my.mc_wbuf[0] = _aligned_malloc(MDB_WBUF*2, env->me_psize);
if (my.mc_wbuf[0] == NULL)
return errno;
#else
pthread_mutex_init(&my.mc_mutex[0], NULL);
pthread_mutex_init(&my.mc_mutex[1], NULL);
pthread_mutex_init(&my.mc_mutex, NULL);
pthread_cond_init(&my.mc_cond, NULL);
rc = posix_memalign((void **)&my.mc_wbuf[0], env->me_psize, MDB_WBUF*2);
if (rc)
return rc;
......@@ -8305,11 +8318,10 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
my.mc_olen[1] = 0;
my.mc_next_pgno = 2;
my.mc_status = 0;
my.mc_new = 1;
my.mc_toggle = 0;
my.mc_env = env;
my.mc_fd = fd;
pthread_mutex_lock(&my.mc_mutex[0]);
THREAD_CREATE(thr, mdb_env_copythr, &my);
/* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers.
......@@ -8332,6 +8344,7 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
}
}
THREAD_CREATE(thr, mdb_env_copythr, &my);
mp = (MDB_page *)my.mc_wbuf[0];
memset(mp, 0, 2*env->me_psize);
mp->mp_pgno = 0;
......@@ -8368,21 +8381,28 @@ mdb_env_copyfd2(MDB_env *env, HANDLE fd)
}
my.mc_wlen[0] = env->me_psize * 2;
my.mc_txn = txn;
pthread_mutex_lock(&my.mc_mutex);
while(my.mc_new)
pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
pthread_mutex_unlock(&my.mc_mutex);
rc = mdb_env_cwalk(&my, &txn->mt_dbs[1].md_root, 0);
if (rc == MDB_SUCCESS && my.mc_wlen[my.mc_toggle])
rc = mdb_env_cthr_toggle(&my);
my.mc_wlen[my.mc_toggle] = 0;
pthread_mutex_unlock(&my.mc_mutex[my.mc_toggle]);
rc = mdb_env_cthr_toggle(&my, 1);
mdb_env_cthr_toggle(&my, -1);
pthread_mutex_lock(&my.mc_mutex);
while(my.mc_new)
pthread_cond_wait(&my.mc_cond, &my.mc_mutex);
pthread_mutex_unlock(&my.mc_mutex);
THREAD_FINISH(thr);
leave:
mdb_txn_abort(txn);
#ifdef _WIN32
CloseHandle(my.mc_mutex[1]);
CloseHandle(my.mc_mutex[0]);
CloseHandle(my.mc_cond);
CloseHandle(my.mc_mutex);
_aligned_free(my.mc_wbuf[0]);
#else
pthread_mutex_destroy(&my.mc_mutex[1]);
pthread_mutex_destroy(&my.mc_mutex[0]);
pthread_cond_destroy(&my.mc_cond);
pthread_mutex_destroy(&my.mc_mutex);
free(my.mc_wbuf[0]);
#endif
return rc;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment