Commit bb6a034e authored by Howard Chu's avatar Howard Chu
Browse files

Add mdb_put for sorted dups

parent 51e210c6
......@@ -74,7 +74,7 @@ typedef ULONG pgno_t;
#define MDB_MINKEYS 4
#define MDB_MAGIC 0xBEEFC0DE
#define MDB_VERSION 1
#define MAXKEYSIZE 255
#define MAXKEYSIZE 511
#define P_INVALID (~0UL)
......@@ -2147,6 +2147,56 @@ mdb_del_node(MDB_page *mp, indx_t indx)
mp->mp_upper += sz;
}
static void
mdb_xcursor_init0(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
{
MDB_dbi dbn;
mx->mx_cursor.mc_txn = &mx->mx_txn;
mx->mx_txn = *txn;
mx->mx_txn.mt_dbxs = mx->mx_dbxs;
mx->mx_txn.mt_dbs = mx->mx_dbs;
mx->mx_dbxs[0] = txn->mt_dbxs[0];
mx->mx_dbxs[1] = txn->mt_dbxs[1];
if (dbi > 1) {
mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
dbn = 2;
} else {
dbn = 1;
}
mx->mx_dbxs[dbn+1].md_parent = dbn;
mx->mx_dbxs[dbn+1].md_cmp = mx->mx_dbxs[dbn].md_dcmp;
mx->mx_dbxs[dbn+1].md_rel = mx->mx_dbxs[dbn].md_rel;
mx->mx_dbxs[dbn+1].md_dirty = 0;
mx->mx_txn.mt_numdbs = dbn+2;
}
static void
mdb_xcursor_init1(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx, MDB_db *db)
{
mx->mx_dbs[0] = txn->mt_dbs[0];
mx->mx_dbs[1] = txn->mt_dbs[1];
if (dbi > 1) {
mx->mx_dbs[2] = txn->mt_dbs[dbi];
mx->mx_dbs[3] = *db;
} else {
mx->mx_dbs[2] = *db;
}
}
static void
mdb_xcursor_fini(MDB_txn *txn, MDB_dbi dbi, MDB_xcursor *mx)
{
txn->mt_dbs[0] = mx->mx_dbs[0];
txn->mt_dbs[1] = mx->mx_dbs[1];
txn->mt_dbxs[0].md_dirty = mx->mx_dbxs[0].md_dirty;
txn->mt_dbxs[1].md_dirty = mx->mx_dbxs[1].md_dirty;
if (dbi > 1) {
txn->mt_dbs[dbi] = mx->mx_dbs[2];
txn->mt_dbxs[2].md_dirty = mx->mx_dbxs[2].md_dirty;
}
}
int
mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
{
......@@ -2166,18 +2216,7 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret)
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
MDB_xcursor *mx = (MDB_xcursor *)(cursor + 1);
cursor->mc_xcursor = mx;
mx->mx_cursor.mc_txn = &mx->mx_txn;
mx->mx_txn = *txn;
mx->mx_txn.mt_dbxs = mx->mx_dbxs;
mx->mx_txn.mt_dbs = mx->mx_dbs;
mx->mx_dbxs[0] = txn->mt_dbxs[0];
mx->mx_dbxs[1] = txn->mt_dbxs[1];
if (dbi > 1) {
mx->mx_dbxs[2] = txn->mt_dbxs[dbi];
mx->mx_txn.mt_numdbs = 4;
} else {
mx->mx_txn.mt_numdbs = 3;
}
mdb_xcursor_init0(txn, dbi, mx);
}
} else {
return ENOMEM;
......@@ -2535,6 +2574,10 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi,
if (data && (rc = mdb_read_data(txn, leaf, data)) != MDB_SUCCESS)
return rc;
if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
/* add all the child DB's pages to the free list */
}
return mdb_del0(txn, dbi, ki, &mpp, leaf);
}
......@@ -2703,6 +2746,8 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
unsigned int ki;
MDB_node *leaf;
MDB_pageparent mpp;
MDB_val xdata, *rdata;
MDB_db dummy;
assert(key != NULL);
assert(data != NULL);
......@@ -2731,6 +2776,9 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
if (rc == MDB_SUCCESS) {
leaf = mdb_search_node(txn, dbi, mpp.mp_page, key, &exact, &ki);
if (leaf && exact) {
if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
goto put_sub;
}
if (F_ISSET(flags, MDB_NOOVERWRITE)) {
DPRINTF("duplicate key %.*s",
(int)key->mv_size, (char *)key->mv_data);
......@@ -2766,6 +2814,20 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
DPRINTF("there are %u keys, should insert new key at index %i",
NUMKEYS(mpp.mp_page), ki);
/* For sorted dups, the data item at this level is a DB record
* for a child DB; the actual data elements are stored as keys
* in the child DB.
*/
if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
rdata = &xdata;
xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy;
memset(&dummy, 0, sizeof(dummy));
dummy.md_root = P_INVALID;
} else {
rdata = data;
}
if (SIZELEFT(mpp.mp_page) < mdb_leaf_size(txn->mt_env, key, data)) {
rc = mdb_split(txn, dbi, &mpp.mp_page, &ki, key, data, P_INVALID);
} else {
......@@ -2775,8 +2837,26 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
if (rc != MDB_SUCCESS)
txn->mt_flags |= MDB_TXN_ERROR;
else
else {
txn->mt_dbs[dbi].md_entries++;
/* Now store the actual data in the child DB. Note that we're
* storing the user data in the keys field, so there are strict
* size limits on dupdata. The actual data fields of the child
* DB are all zero size.
*/
if (F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) {
MDB_xcursor mx;
leaf = NODEPTR(mpp.mp_page, ki);
put_sub:
mdb_xcursor_init0(txn, dbi, &mx);
mdb_xcursor_init1(txn, dbi, &mx, NODEDATA(leaf));
xdata.mv_size = 0;
xdata.mv_data = "";
rc = mdb_put(&mx.mx_txn, mx.mx_txn.mt_numdbs-1, data, &xdata, flags);
mdb_xcursor_fini(txn, dbi, &mx);
}
}
done:
return rc;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment