Commit 14fb1f59 authored by Howard Chu's avatar Howard Chu
Browse files

Append tweaks, page_split fixes

Append mode now does no key comparisons, input must be in sorted order.
page_split was not updating cursor parents correctly.
parent 7e9a6134
......@@ -843,6 +843,7 @@ struct MDB_cursor {
#define C_SUB 0x04 /**< Cursor is a sub-cursor */
#define C_SHADOW 0x08 /**< Cursor is a dup from a parent txn */
#define C_ALLOCD 0x10 /**< Cursor was malloc'd */
#define C_SPLITTING 0x20 /**< Cursor is in page_split */
/** @} */
unsigned int mc_flags; /**< @ref mdb_cursor */
MDB_page *mc_pg[CURSOR_STACK]; /**< stack of pushed pages */
......@@ -937,6 +938,8 @@ static int mdb_page_search_root(MDB_cursor *mc,
static int mdb_page_search(MDB_cursor *mc,
MDB_val *key, int flags);
static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst);
#define MDB_SPLIT_REPLACE MDB_APPENDDUP /**< newkey is not new */
static int mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
pgno_t newpgno, unsigned int nflags);
......@@ -1055,13 +1058,31 @@ mdb_page_keys(MDB_page *mp)
DKBUF;
nkeys = NUMKEYS(mp);
DPRINTF("numkeys %d", nkeys);
fprintf(stderr, "numkeys %d\n", nkeys);
for (i=0; i<nkeys; i++) {
node = NODEPTR(mp, i);
key.mv_size = node->mn_ksize;
key.mv_data = node->mn_data;
DPRINTF("key %d: %s", i, DKEY(&key));
fprintf(stderr, "key %d: %s\n", i, DKEY(&key));
}
}
void
mdb_cursor_chk(MDB_cursor *mc)
{
unsigned int i;
MDB_node *node;
MDB_page *mp;
if (!mc->mc_snum) return;
for (i=0; i<mc->mc_top; i++) {
mp = mc->mc_pg[i];
node = NODEPTR(mp, mc->mc_ki[i]);
if (NODEPGNO(node) != mc->mc_pg[i+1]->mp_pgno)
printf("oops!\n");
}
if (mc->mc_ki[i] >= NUMKEYS(mc->mc_pg[i]))
printf("ack!\n");
}
#endif
......@@ -3989,23 +4010,24 @@ mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data)
{
int rc;
MDB_node *leaf;
MDB_val lkey;
lkey.mv_size = MAXKEYSIZE+1;
lkey.mv_data = NULL;
if (!(mc->mc_flags & C_EOF)) {
if (!(mc->mc_flags & C_INITIALIZED) || mc->mc_top) {
MDB_val lkey;
lkey.mv_size = MAXKEYSIZE+1;
lkey.mv_data = NULL;
rc = mdb_page_search(mc, &lkey, 0);
if (rc != MDB_SUCCESS)
return rc;
}
assert(IS_LEAF(mc->mc_pg[mc->mc_top]));
leaf = NODEPTR(mc->mc_pg[mc->mc_top], NUMKEYS(mc->mc_pg[mc->mc_top])-1);
mc->mc_flags |= C_INITIALIZED;
mc->mc_flags &= ~C_EOF;
mc->mc_ki[mc->mc_top] = NUMKEYS(mc->mc_pg[mc->mc_top]) - 1;
mc->mc_flags |= C_INITIALIZED|C_EOF;
}
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (IS_LEAF2(mc->mc_pg[mc->mc_top])) {
key->mv_size = mc->mc_db->md_pad;
......@@ -4104,9 +4126,10 @@ fetchm:
case MDB_PREV:
case MDB_PREV_DUP:
case MDB_PREV_NODUP:
if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF))
if (!(mc->mc_flags & C_INITIALIZED) || (mc->mc_flags & C_EOF)) {
rc = mdb_cursor_last(mc, key, data);
else
mc->mc_flags &= ~C_EOF;
} else
rc = mdb_cursor_prev(mc, key, data, op);
break;
case MDB_FIRST:
......@@ -4178,7 +4201,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
MDB_val xdata, *rdata, dkey;
MDB_page *fp;
MDB_db dummy;
int do_sub = 0;
int do_sub = 0, insert = 0;
unsigned int mcount = 0;
size_t nsize;
int rc, rc2;
......@@ -4220,7 +4243,16 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
} else {
int exact = 0;
MDB_val d2;
if (flags & MDB_APPEND) {
MDB_val k2;
rc = mdb_cursor_last(mc, &k2, &d2);
if (rc == 0) {
rc = MDB_NOTFOUND;
mc->mc_ki[mc->mc_top]++;
}
} else {
rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact);
}
if ((flags & MDB_NOOVERWRITE) && rc == 0) {
DPRINTF("duplicate key [%s]", DKEY(key));
*data = d2;
......@@ -4419,6 +4451,7 @@ current:
mc->mc_db->md_entries--;
} else {
DPRINTF("inserting key at index %i", mc->mc_ki[mc->mc_top]);
insert = 1;
}
rdata = data;
......@@ -4429,11 +4462,13 @@ new_sub:
if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA )
nflags &= ~MDB_APPEND;
if (!insert)
nflags |= MDB_SPLIT_REPLACE;
rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
} else {
/* There is room already in this leaf page. */
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
if (rc == 0 && !do_sub) {
if (rc == 0 && !do_sub && insert) {
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
......@@ -4495,7 +4530,8 @@ put_sub:
}
}
}
xflags |= (flags & MDB_APPEND);
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf);
......@@ -5760,6 +5796,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
DPRINTF("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno);
}
mc->mc_flags |= C_SPLITTING;
mdb_cursor_copy(mc, &mn);
mn.mc_pg[mn.mc_top] = rp;
mn.mc_ki[ptop] = mc->mc_ki[ptop]+1;
......@@ -5767,8 +5804,8 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
if (nflags & MDB_APPEND) {
mn.mc_ki[mn.mc_top] = 0;
sepkey = *newkey;
split_indx = newindx;
nkeys = 0;
split_indx = 0;
goto newsep;
}
......@@ -5900,14 +5937,32 @@ newsep:
*/
if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
mc->mc_pg[ptop] = mn.mc_pg[ptop];
mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
/* root split? */
if (mn.mc_snum == mc->mc_snum) {
mc->mc_pg[mc->mc_snum] = mc->mc_pg[mc->mc_top];
mc->mc_ki[mc->mc_snum] = mc->mc_ki[mc->mc_top];
mc->mc_ki[mc->mc_top] = mn.mc_ki[mc->mc_top] - 1;
mc->mc_pg[mc->mc_top] = mn.mc_pg[mc->mc_top];
for (i=0; i<mc->mc_top; i++) {
mc->mc_pg[i] = mn.mc_pg[i];
mc->mc_ki[i] = mn.mc_ki[i];
}
mc->mc_snum++;
mc->mc_top++;
ptop++;
} else {
for (i=0; i<ptop; i++)
mc->mc_ki[i] = mn.mc_ki[i];
mc->mc_pg[ptop] = mn.mc_pg[ptop];
mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
}
}
} else {
mn.mc_top--;
rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
mn.mc_top++;
}
mc->mc_flags ^= C_SPLITTING;
if (rc != MDB_SUCCESS) {
return rc;
}
......@@ -5917,6 +5972,8 @@ newsep:
rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
if (rc)
return rc;
for (i=0; i<mc->mc_top; i++)
mc->mc_ki[i] = mn.mc_ki[i];
goto done;
}
if (IS_LEAF2(rp)) {
......@@ -6001,6 +6058,8 @@ newsep:
if (!(node->mn_flags & F_BIGDATA))
newdata->mv_data = NODEDATA(node);
}
} else {
mc->mc_ki[ptop]++;
}
/* return tmp page to freelist */
......@@ -6024,6 +6083,8 @@ done:
m3 = m2;
if (!(m3->mc_flags & C_INITIALIZED))
continue;
if (m3->mc_flags & C_SPLITTING)
continue;
if (new_root) {
int k;
/* root split */
......@@ -6031,16 +6092,28 @@ done:
m3->mc_ki[k+1] = m3->mc_ki[k];
m3->mc_pg[k+1] = m3->mc_pg[k];
}
m3->mc_ki[0] = mc->mc_ki[0];
if (m3->mc_ki[0] >= split_indx) {
m3->mc_ki[0] = 1;
} else {
m3->mc_ki[0] = 0;
}
m3->mc_pg[0] = mc->mc_pg[0];
m3->mc_snum++;
m3->mc_top++;
}
if (m3->mc_pg[mc->mc_top] == mp) {
if (m3->mc_ki[m3->mc_top] >= split_indx) {
m3->mc_pg[m3->mc_top] = rp;
m3->mc_ki[m3->mc_top] -= split_indx;
if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE))
m3->mc_ki[mc->mc_top]++;
if (m3->mc_ki[mc->mc_top] >= split_indx) {
m3->mc_pg[mc->mc_top] = rp;
m3->mc_ki[mc->mc_top] -= split_indx;
if ((nflags & MDB_SPLIT_REPLACE) && !newpos)
m3->mc_ki[mc->mc_top]--;
m3->mc_ki[ptop] = mn.mc_ki[ptop];
}
} else if (m3->mc_pg[ptop] == mc->mc_pg[ptop] &&
m3->mc_ki[ptop] >= mc->mc_ki[ptop]) {
m3->mc_ki[ptop]++;
}
}
}
......
......@@ -201,8 +201,10 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
#define MDB_RESERVE 0x10000
/** Data is being appended, don't split full pages. */
#define MDB_APPEND 0x20000
/** Duplicate data is being appended, don't split full pages. */
#define MDB_APPENDDUP 0x40000
/** Store multiple data items in one call. */
#define MDB_MULTIPLE 0x40000
#define MDB_MULTIPLE 0x80000
/* @} */
/** @brief Cursor Get operations.
......@@ -806,6 +808,16 @@ int mdb_get(MDB_txn *txn, MDB_dbi dbi, MDB_val *key, MDB_val *data);
* #MDB_KEYEXIST if the key already appears in the database, even if
* the database supports duplicates (#MDB_DUPSORT). The \b data
* parameter will be set to point to the existing item.
* <li>#MDB_RESERVE - reserve space for data of the given size, but
* don't copy the given data. Instead, return a pointer to the
* reserved space, which the caller can fill in later. This saves
* an extra memcpy if the data is being generated later.
* <li>#MDB_APPEND - append the given key/data pair to the end of the
* database. No key comparisons are performed. This option allows
* fast bulk loading when keys are already known to be in the
* correct order. Loading unsorted keys with this flag will cause
* data corruption.
* <li>#MDB_APPENDDUP - as above, but for sorted dup data.
* </ul>
* @return A non-zero error value on failure and 0 on success. Some possible
* errors are:
......@@ -921,6 +933,16 @@ int mdb_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
* does not already appear in the database. The function will return
* #MDB_KEYEXIST if the key already appears in the database, even if
* the database supports duplicates (#MDB_DUPSORT).
* <li>#MDB_RESERVE - reserve space for data of the given size, but
* don't copy the given data. Instead, return a pointer to the
* reserved space, which the caller can fill in later. This saves
* an extra memcpy if the data is being generated later.
* <li>#MDB_APPEND - append the given key/data pair to the end of the
* database. No key comparisons are performed. This option allows
* fast bulk loading when keys are already known to be in the
* correct order. Loading unsorted keys with this flag will cause
* data corruption.
* <li>#MDB_APPENDDUP - as above, but for sorted dup data.
* </ul>
* @return A non-zero error value on failure and 0 on success. Some possible
* errors are:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment