Commit f1db84d3 authored by Hallvard Furuseth's avatar Hallvard Furuseth Committed by Howard Chu
Browse files

mp_txnid = page state, avoids searching dirty_list

In nested txns, check mp_txnid to see which txn dirtied the page.
This change will also let us remove the P_DIRTY flag, and keep
some flags in (dirty page).mp_txnid should we need them.
parent c83434e1
......@@ -962,7 +962,26 @@ typedef struct MDB_page_header {
pgno_t p_pgno; /**< page number */
struct MDB_page *p_next; /**< for in-memory list of freed pages */
} mh_p;
txnid_t mh_txnid; /**< txnid that committed this page, unused in meta pages */
/** If page is clean: snapshot txnid, dirty: txn workid, metapage: unused.
*
* The value indicates which snapshot/#MDB_txn the page belongs to.
* Tested with #IS_MUTABLE(), #IS_WRITABLE(), #IS_DIRTY_NW().
*
* (clean page).mp_txnid == txnid of creator < txnid of later txns.
* (dirty page).mp_txnid >= mt_workid of creator txn.
 * (dirty page).mp_txnid < mt_workid of children of creator txn.
*
* Thus an #MDB_txn can write to pages with mp_txnid >= txn.mt_workid.
* A page with smaller mp_txnid is dirty in an ancestor txn or clean.
*
* txn.mt_workid > txn.mt_txnid, to tell apart spilled and dirty pages.
*
* Finally, ((dirty page).mp_txnid & #MDB_PGTXNID_FLAGMASK) can be used
* for flags with non-WRITEMAP; it keeps low bits in workid = 0.
*/
txnid_t mh_txnid;
uint16_t mh_pad; /**< key size if this is a LEAF2 page */
/** @defgroup mdb_page Page Flags
* @ingroup internal
......@@ -1040,7 +1059,9 @@ typedef struct MDB_page {
#define IS_SUBP(p) F_ISSET((p)->mp_flags, P_SUBP)
/** Test if this non-sub page belongs to the current snapshot */
#define IS_MUTABLE(txn, p) ((p)->mp_txnid == (txn)->mt_txnid)
#define IS_MUTABLE(txn, p) ((p)->mp_txnid >= (txn)->mt_txnid)
/** Test if this non-sub page is writable in this txn (not an ancestor) */
#define IS_WRITABLE(txn, p) ((p)->mp_txnid >= (txn)->mt_workid)
/** Info about overflow page, stored in an F_BIGDATA node */
typedef struct MDB_ovpage {
......@@ -1065,6 +1086,27 @@ typedef struct MDB_dovpage {
*/
#define NEXT_LOOSE_PAGE(p) (*(MDB_page **)((p) + 2))
/** Mark the page as writable by this txn */
#ifndef MDB_TEST
#define SET_PGTXNID(txn, mp) ((mp)->mp_txnid = (txn)->mt_workid)
#else
#define SET_PGTXNID(txn, mp) \
((mp)->mp_txnid = (txn)->mt_workid \
/* random unused "flags" added when not WRITEMAP for debugging */ \
| (((txn)->mt_flags & MDB_TXN_WRITEMAP) ? 0 : \
(MDB_RAND((size_t)(txn)) >> (32-MDB_PGTXNID_FLAGBITS))))
#define MDB_RAND(x) (mdb_rnd = (mdb_rnd + (unsigned)(x)) * 987654321 + 54321)
static volatile unsigned mdb_rnd;
#endif
/** mp_txnid bits reserved in dirty pages for flags.
* TODO: For future code with header-free ovpages, if we omit mp_flags
* from the "header" kept elsewhere. Otherwise, drop this code.
*/
#define MDB_PGTXNID_FLAGBITS 4
#define MDB_PGTXNID_STEP ((txnid_t)1 << MDB_PGTXNID_FLAGBITS)
#define MDB_PGTXNID_FLAGMASK (MDB_PGTXNID_STEP-1)
/** Header for a single key/data pair within a page.
* Used in pages of type #P_BRANCH and #P_LEAF without #P_LEAF2.
* We guarantee 2-byte alignment for 'MDB_node's.
......@@ -1286,6 +1328,23 @@ struct MDB_txn {
* aborts, the ID may be re-used by the next writer.
*/
txnid_t mt_txnid;
/** Written to mp_txnid of dirty pages, to be fixed by #mdb_page_flush().
*
* Value >= 1 + (parent ? parent.last_workid : txnid).
* See #MDB_page.%mp_txnid.
*
* An MDB_txn can write to a page when page.mp_txnid >= txn.mt_workid.
* New children get bigger workid than pages dirty in their parent
* (i.e. bigger than parent.mt_last_workid). When children commit,
* they copy #mt_last_workid to the parent to match their pages.
*/
txnid_t mt_workid;
/** Current max mp_txnid of the MDB_txn's dirty pages: Starts as
* #mt_workid, then grows as it is copied from children who commit.
*/
txnid_t mt_last_workid;
MDB_env *mt_env; /**< the DB environment */
/** The list of pages that became unused during this transaction.
*/
......@@ -2185,46 +2244,21 @@ mdb_cursor_unref(MDB_cursor *mc)
static int
mdb_page_loose(MDB_cursor *mc, MDB_page *mp)
{
int loose = 0;
pgno_t pgno = mp->mp_pgno;
int rc = MDB_SUCCESS;
MDB_txn *txn = mc->mc_txn;
if ((mp->mp_flags & P_DIRTY) && mc->mc_dbi != FREE_DBI) {
if (txn->mt_parent) {
MDB_ID2 *dl = txn->mt_u.dirty_list;
/* If txn has a parent, make sure the page is in our
* dirty list.
*/
if (dl[0].mid) {
unsigned x = mdb_mid2l_search(dl, pgno);
if (x <= dl[0].mid && dl[x].mid == pgno) {
if (mp != dl[x].mptr) { /* bad cursor? */
mc->mc_flags &= ~(C_INITIALIZED|C_EOF);
txn->mt_flags |= MDB_TXN_ERROR;
return MDB_PROBLEM;
}
/* ok, it's ours */
loose = 1;
}
}
} else {
/* no parent txn, so it's just ours */
loose = 1;
}
}
if (loose) {
if (IS_WRITABLE(txn, mp) && mc->mc_dbi != FREE_DBI) {
/* Page is dirty in this txn, and is not in freeDB */
DPRINTF(("loosen db %d page %"Yu, DDBI(mc), mp->mp_pgno));
NEXT_LOOSE_PAGE(mp) = txn->mt_loose_pgs;
txn->mt_loose_pgs = mp;
txn->mt_loose_count++;
mp->mp_flags |= P_LOOSE;
} else {
int rc = mdb_midl_append(&txn->mt_free_pgs, pgno);
if (rc)
return rc;
rc = mdb_midl_append(&txn->mt_free_pgs, mp->mp_pgno);
}
return MDB_SUCCESS;
return rc;
}
/** Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn.
......@@ -2696,7 +2730,7 @@ search_done:
}
#endif
np->mp_pgno = pgno;
np->mp_txnid = txn->mt_txnid;
SET_PGTXNID(txn, np);
np->mp_flags = 0;
#if OVERFLOW_NOTYET
mdb_page_dirty(txn, np, ov);
......@@ -2797,6 +2831,7 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret)
*/
mdb_page_dirty(txn, np);
SET_PGTXNID(txn, np);
*ret = np;
return MDB_SUCCESS;
}
......@@ -2817,13 +2852,11 @@ mdb_page_touch(MDB_cursor *mc)
pgno_t pgno;
int rc;
if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
if (IS_MUTABLE(txn, mp)) {
rc = mdb_page_unspill(txn, mp, &np);
if (rc)
goto fail;
goto done;
}
if (IS_SUBP(mp) || IS_WRITABLE(txn, mp))
return MDB_SUCCESS;
if (!IS_MUTABLE(txn, mp)) {
/* Page from an older snapshot */
if ((rc = mdb_midl_need(&txn->mt_free_pgs, 1)) ||
(rc = mdb_page_alloc(mc, 1, &np)))
goto fail;
......@@ -2840,25 +2873,20 @@ mdb_page_touch(MDB_cursor *mc)
} else {
mc->mc_db->md_root = pgno;
}
} else if (txn->mt_parent && !IS_SUBP(mp)) {
} else if (!F_ISSET(mp->mp_flags, P_DIRTY)) {
rc = mdb_page_unspill(txn, mp, &np);
if (rc)
goto fail;
goto done;
} else {
/* Writable in an ancestor txn */
MDB_ID2 mid, *dl = txn->mt_u.dirty_list;
pgno = mp->mp_pgno;
/* If txn has a parent, make sure the page is in our
* dirty list.
*/
if (dl[0].mid) {
unsigned x = mdb_mid2l_search(dl, pgno);
if (x <= dl[0].mid && dl[x].mid == pgno) {
if (mp != dl[x].mptr) { /* bad cursor? */
mc->mc_flags &= ~(C_INITIALIZED|C_EOF);
txn->mt_flags |= MDB_TXN_ERROR;
return MDB_PROBLEM;
}
return 0;
}
if (!txn->mt_parent) {
rc = MDB_PROBLEM;
goto fail;
}
mdb_cassert(mc, dl[0].mid < MDB_IDL_UM_MAX);
/* No - copy it */
np = mdb_page_malloc(txn, 1, 1);
if (!np) {
rc = ENOMEM;
......@@ -2869,17 +2897,15 @@ mdb_page_touch(MDB_cursor *mc)
rc = mdb_mid2l_insert(dl, &mid);
mdb_cassert(mc, rc == 0);
np->mp_flags |= P_DIRTY;
} else {
return 0;
}
np_flags = np->mp_flags; /* P_ADM_FLAGS */
mdb_page_copy(np, mp, txn->mt_env->me_psize);
np->mp_flags |= np_flags;
np->mp_pgno = pgno;
SET_PGTXNID(txn, np);
done:
np->mp_txnid = txn->mt_txnid;
/* Adjust cursors pointing to mp */
mc->mc_pg[mc->mc_top] = np;
m2 = txn->mt_cursors[mc->mc_dbi];
......@@ -3175,6 +3201,8 @@ mdb_txn_renew0(MDB_txn *txn)
txn->mt_child = NULL;
txn->mt_loose_pgs = NULL;
txn->mt_loose_count = 0;
txn->mt_workid = (txn->mt_txnid | MDB_PGTXNID_FLAGMASK) + 1;
txn->mt_last_workid = txn->mt_workid;
txn->mt_dirty_room = MDB_IDL_UM_MAX;
txn->mt_u.dirty_list = env->me_dirty_list;
txn->mt_u.dirty_list[0].mid = 0;
......@@ -3239,6 +3267,33 @@ mdb_txn_renew(MDB_txn *txn)
return rc;
}
/** Used up all workids. Rewind it and update dirty pages to match.
 *
 * Walks from the root write txn down through the chain of nested child
 * txns, assigning each txn the next work-ID above the root's snapshot
 * txnid and re-stamping the pages in its dirty list to match.
 *
 * @param[in] txn any txn in the nested-txn chain.
 * @return The work-ID assigned to the innermost (youngest) txn.
 */
static txnid_t ESECT
mdb_workid_rewind(MDB_txn *txn)
{
	txnid_t workid, diff;
	/* Start at the outermost (root) write txn */
	while (txn->mt_parent)
		txn = txn->mt_parent;
	/* Base: the snapshot txnid rounded down to a multiple of the step */
	workid = txn->mt_txnid & ~MDB_PGTXNID_FLAGMASK;
	do {
		workid += MDB_PGTXNID_STEP;
		/* Distance this txn's page stamps must move down.  Work-IDs
		 * are multiples of MDB_PGTXNID_STEP, so diff should be too;
		 * subtracting it then preserves flag bits held in mp_txnid.
		 */
		diff = txn->mt_last_workid - workid;
		if (diff) {
			MDB_ID2L dl = txn->mt_u.dirty_list;
			int i;
			for (i = dl[0].mid; i; i--) {
				if (MDB_PGTXNID_FLAGBITS)
					/* Keep flag bits: slide the workid part down */
					((MDB_page *)dl[i].mptr)->mp_txnid -= diff;
				else
					/* No flag bits in use: just overwrite */
					((MDB_page *)dl[i].mptr)->mp_txnid = workid;
			}
			txn->mt_workid = txn->mt_last_workid = workid;
		}
	} while ((txn = txn->mt_child) != NULL);
	return workid;
}
int
mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
{
......@@ -3294,6 +3349,10 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret)
if (parent) {
unsigned int i;
txnid_t workid = parent->mt_last_workid + MDB_PGTXNID_STEP;
if (!workid) /* wraparound after lots of previous children */
workid = mdb_workid_rewind(parent) + MDB_PGTXNID_STEP;
txn->mt_workid = txn->mt_last_workid = workid;
txn->mt_cursors = (MDB_cursor **)(txn->mt_dbs + env->me_maxdbs);
txn->mt_dbiseqs = parent->mt_dbiseqs;
txn->mt_u.dirty_list = malloc(sizeof(MDB_ID2)*MDB_IDL_UM_SIZE);
......@@ -3786,7 +3845,7 @@ mdb_page_flush(MDB_txn *txn, int keep)
j = i = keep;
if (env->me_flags & MDB_WRITEMAP) {
/* Clear dirty flags */
/* Mark the pages as clean */
while (++i <= pagecount) {
dp = dl[i].mptr;
/* Don't flush this page yet */
......@@ -3795,6 +3854,7 @@ mdb_page_flush(MDB_txn *txn, int keep)
dl[++j] = dl[i];
continue;
}
dp->mp_txnid = txn->mt_txnid;
dp->mp_flags &= ~P_DIRTY;
}
goto done;
......@@ -3811,7 +3871,8 @@ mdb_page_flush(MDB_txn *txn, int keep)
continue;
}
pgno = dl[i].mid;
/* clear dirty flag */
/* Mark the page as clean */
dp->mp_txnid = txn->mt_txnid;
dp->mp_flags &= ~P_DIRTY;
pos = pgno * psize;
size = psize;
......@@ -4128,6 +4189,7 @@ mdb_txn_commit(MDB_txn *txn)
*lp = txn->mt_loose_pgs;
parent->mt_loose_count += txn->mt_loose_count;
parent->mt_last_workid = txn->mt_last_workid;
parent->mt_child = NULL;
mdb_midl_free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pghead);
free(txn);
......@@ -8077,10 +8139,10 @@ current:
if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
MDB_page *omp;
MDB_ovpage ovp;
int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize);
int ovpages, dpages = OVPAGES(data->mv_size, env->me_psize);
memcpy(&ovp, olddata.mv_data, sizeof(ovp));
if ((rc2 = MDB_PAGE_GET(mc, ovp.op_pgno, ovp.op_pages, &omp, &level)) != 0)
if ((rc2 = MDB_PAGE_GET(mc, ovp.op_pgno, ovp.op_pages, &omp, NULL)) != 0)
return rc2;
ovpages = ovp.op_pages;
......@@ -8090,13 +8152,12 @@ current:
* bother to try shrinking the page if the new data
* is smaller than the overflow threshold.
*/
if (!(omp->mp_flags & P_DIRTY)) {
if (!IS_WRITABLE(mc->mc_txn, omp)) {
if (!(omp->mp_flags & P_DIRTY)) {
rc = mdb_page_unspill(mc->mc_txn, omp, &omp);
if (rc)
return rc;
level = 0; /* dirty in this txn */
}
if (level > 1) {
} else {
/* It is writable only in a parent txn */
size_t sz = (size_t) env->me_psize * ovpages, off;
MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages, 1);
......@@ -8123,7 +8184,9 @@ current:
sz = PAGEHDRSZ;
}
memcpy(np, omp, sz); /* Copy beginning of page */
SET_PGTXNID(mc->mc_txn, np);
omp = np;
}
}
SETDSZ(leaf, data->mv_size);
if (F_ISSET(flags, MDB_RESERVE))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment