From 310b656a2e1b81c039baf8ee4eeece7468c2b41e Mon Sep 17 00:00:00 2001
From: Howard Chu <hyc@symas.com>
Date: Sat, 12 Oct 2013 09:34:40 -0700
Subject: [PATCH] ITS#7589 avoid wasting space in mdb_page_split

Also, check the split point on branch pages as well as leaf pages.
---
 libraries/liblmdb/mdb.c | 425 ++++++++++++++++++++--------------------
 1 file changed, 209 insertions(+), 216 deletions(-)

diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c
index 91a84f091f..39635117cf 100644
--- a/libraries/liblmdb/mdb.c
+++ b/libraries/liblmdb/mdb.c
@@ -7376,10 +7376,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
 	unsigned int nflags)
 {
 	unsigned int flags;
-	int		 rc = MDB_SUCCESS, ins_new = 0, new_root = 0, newpos = 1, did_split = 0;
+	int		 rc = MDB_SUCCESS, new_root = 0, did_split = 0;
 	indx_t		 newindx;
 	pgno_t		 pgno = 0;
 	unsigned int	 i, j, split_indx, nkeys, pmax;
+	MDB_env 	*env = mc->mc_txn->mt_env;
 	MDB_node	*node;
 	MDB_val	 sepkey, rkey, xdata, *rdata = &xdata;
 	MDB_page	*copy;
@@ -7390,10 +7391,11 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
 
 	mp = mc->mc_pg[mc->mc_top];
 	newindx = mc->mc_ki[mc->mc_top];
+	nkeys = NUMKEYS(mp);
 
-	DPRINTF(("-----> splitting %s page %"Z"u and adding [%s] at index %i",
+	DPRINTF(("-----> splitting %s page %"Z"u and adding [%s] at index %i/%i",
 	    IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno,
-	    DKEY(newkey), mc->mc_ki[mc->mc_top]));
+	    DKEY(newkey), mc->mc_ki[mc->mc_top], nkeys));
 
 	/* Create a right sibling. */
 	if ((rc = mdb_page_new(mc, mp->mp_flags, 1, &rp)))
@@ -7440,141 +7442,148 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
 		sepkey = *newkey;
 		split_indx = newindx;
 		nkeys = 0;
-		goto newsep;
-	}
+	} else {
 
-	nkeys = NUMKEYS(mp);
-	split_indx = nkeys / 2;
-	if (newindx < split_indx)
-		newpos = 0;
-
-	if (IS_LEAF2(rp)) {
-		char *split, *ins;
-		int x;
-		unsigned int lsize, rsize, ksize;
-		/* Move half of the keys to the right sibling */
-		copy = NULL;
-		x = mc->mc_ki[mc->mc_top] - split_indx;
-		ksize = mc->mc_db->md_pad;
-		split = LEAF2KEY(mp, split_indx, ksize);
-		rsize = (nkeys - split_indx) * ksize;
-		lsize = (nkeys - split_indx) * sizeof(indx_t);
-		mp->mp_lower -= lsize;
-		rp->mp_lower += lsize;
-		mp->mp_upper += rsize - lsize;
-		rp->mp_upper -= rsize - lsize;
-		sepkey.mv_size = ksize;
-		if (newindx == split_indx) {
-			sepkey.mv_data = newkey->mv_data;
-		} else {
-			sepkey.mv_data = split;
-		}
-		if (x<0) {
-			ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize);
-			memcpy(rp->mp_ptrs, split, rsize);
-			sepkey.mv_data = rp->mp_ptrs;
-			memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize);
-			memcpy(ins, newkey->mv_data, ksize);
-			mp->mp_lower += sizeof(indx_t);
-			mp->mp_upper -= ksize - sizeof(indx_t);
+		split_indx = (nkeys+1) / 2;
+
+		if (IS_LEAF2(rp)) {
+			char *split, *ins;
+			int x;
+			unsigned int lsize, rsize, ksize;
+			/* Move half of the keys to the right sibling */
+			copy = NULL;
+			x = mc->mc_ki[mc->mc_top] - split_indx;
+			ksize = mc->mc_db->md_pad;
+			split = LEAF2KEY(mp, split_indx, ksize);
+			rsize = (nkeys - split_indx) * ksize;
+			lsize = (nkeys - split_indx) * sizeof(indx_t);
+			mp->mp_lower -= lsize;
+			rp->mp_lower += lsize;
+			mp->mp_upper += rsize - lsize;
+			rp->mp_upper -= rsize - lsize;
+			sepkey.mv_size = ksize;
+			if (newindx == split_indx) {
+				sepkey.mv_data = newkey->mv_data;
+			} else {
+				sepkey.mv_data = split;
+			}
+			if (x<0) {
+				ins = LEAF2KEY(mp, mc->mc_ki[mc->mc_top], ksize);
+				memcpy(rp->mp_ptrs, split, rsize);
+				sepkey.mv_data = rp->mp_ptrs;
+				memmove(ins+ksize, ins, (split_indx - mc->mc_ki[mc->mc_top]) * ksize);
+				memcpy(ins, newkey->mv_data, ksize);
+				mp->mp_lower += sizeof(indx_t);
+				mp->mp_upper -= ksize - sizeof(indx_t);
+			} else {
+				if (x)
+					memcpy(rp->mp_ptrs, split, x * ksize);
+				ins = LEAF2KEY(rp, x, ksize);
+				memcpy(ins, newkey->mv_data, ksize);
+				memcpy(ins+ksize, split + x * ksize, rsize - x * ksize);
+				rp->mp_lower += sizeof(indx_t);
+				rp->mp_upper -= ksize - sizeof(indx_t);
+				mc->mc_ki[mc->mc_top] = x;
+				mc->mc_pg[mc->mc_top] = rp;
+			}
 		} else {
-			if (x)
-				memcpy(rp->mp_ptrs, split, x * ksize);
-			ins = LEAF2KEY(rp, x, ksize);
-			memcpy(ins, newkey->mv_data, ksize);
-			memcpy(ins+ksize, split + x * ksize, rsize - x * ksize);
-			rp->mp_lower += sizeof(indx_t);
-			rp->mp_upper -= ksize - sizeof(indx_t);
-			mc->mc_ki[mc->mc_top] = x;
-			mc->mc_pg[mc->mc_top] = rp;
-		}
-		goto newsep;
-	}
+			unsigned int psize, nsize, tsize;
+			int k;
+			/* Maximum free space in an empty page */
+			pmax = env->me_psize - PAGEHDRSZ;
+			if (IS_LEAF(mp))
+				nsize = mdb_leaf_size(env, newkey, newdata);
+			else
+				nsize = mdb_branch_size(env, newkey);
+			nsize += nsize & 1;
 
-	/* For leaf pages, check the split point based on what
-	 * fits where, since otherwise mdb_node_add can fail.
-	 *
-	 * This check is only needed when the data items are
-	 * relatively large, such that being off by one will
-	 * make the difference between success or failure.
-	 *
-	 * It's also relevant if a page happens to be laid out
-	 * such that one half of its nodes are all "small" and
-	 * the other half of its nodes are "large." If the new
-	 * item is also "large" and falls on the half with
-	 * "large" nodes, it also may not fit.
-	 */
-	if (IS_LEAF(mp)) {
-		unsigned int psize, nsize;
-		/* Maximum free space in an empty page */
-		pmax = mc->mc_txn->mt_env->me_psize - PAGEHDRSZ;
-		nsize = mdb_leaf_size(mc->mc_txn->mt_env, newkey, newdata);
-		if ((nkeys < 20) || (nsize > pmax/16)) {
-			if (newindx <= split_indx) {
-				psize = nsize;
-				newpos = 0;
-				for (i=0; i<split_indx; i++) {
-					node = NODEPTR(mp, i);
-					psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
-					if (F_ISSET(node->mn_flags, F_BIGDATA))
-						psize += sizeof(pgno_t);
-					else
-						psize += NODEDSZ(node);
-					psize += psize & 1;
-					if (psize > pmax) {
-						if (i <= newindx) {
-							split_indx = newindx;
-							if (i < newindx)
-								newpos = 1;
+			/* grab a page to hold a temporary copy */
+			copy = mdb_page_malloc(mc->mc_txn, 1);
+			if (copy == NULL)
+				return ENOMEM;
+			copy->mp_pgno  = mp->mp_pgno;
+			copy->mp_flags = mp->mp_flags;
+			copy->mp_lower = PAGEHDRSZ;
+			copy->mp_upper = env->me_psize;
+
+			/* prepare to insert */
+			for (i=0, j=0; i<nkeys; i++) {
+				if (i == newindx) {
+					copy->mp_ptrs[j++] = 0;
+				}
+				copy->mp_ptrs[j++] = mp->mp_ptrs[i];
+			}
+
+			/* When items are relatively large the split point needs
+			 * to be checked, because being off-by-one will make the
+			 * difference between success or failure in mdb_node_add.
+			 *
+			 * It's also relevant if a page happens to be laid out
+			 * such that one half of its nodes are all "small" and
+			 * the other half of its nodes are "large." If the new
+			 * item is also "large" and falls on the half with
+			 * "large" nodes, it also may not fit.
+			 *
+			 * As a final tweak, if the new item goes on the last
+			 * spot on the page (and thus, onto the new page), bias
+			 * the split so the new page is emptier than the old page.
+			 * This yields better packing during sequential inserts.
+			 */
+			if (nkeys < 20 || nsize > pmax/16 || newindx >= nkeys) {
+				/* Find split point */
+				psize = 0;
+				if (newindx <= split_indx || newindx >= nkeys) {
+					i = 0; j = 1;
+					k = newindx >= nkeys ? nkeys : split_indx+1;
+				} else {
+					i = nkeys; j = -1;
+					k = split_indx-1;
+				}
+				for (; i!=k; i+=j) {
+					if (i == newindx) {
+						tsize = nsize;
+						node = NULL;
+					} else {
+						node = (MDB_node *)((char *)mp + copy->mp_ptrs[i]);
+						tsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t);
+						if (IS_LEAF(mp)) {
+							if (F_ISSET(node->mn_flags, F_BIGDATA))
+								tsize += sizeof(pgno_t);
+							else
+								tsize += NODEDSZ(node);
 						}
-						else
-							split_indx = i;
-						break;
+						tsize += tsize & 1;
 					}
-				}
-			} else {
-				psize = nsize;
-				for (i=nkeys-1; i>=split_indx; i--) {
-					node = NODEPTR(mp, i);
-					psize += NODESIZE + NODEKSZ(node) + sizeof(indx_t);
-					if (F_ISSET(node->mn_flags, F_BIGDATA))
-						psize += sizeof(pgno_t);
-					else
-						psize += NODEDSZ(node);
-					psize += psize & 1;
-					if (psize > pmax) {
-						if (i >= newindx) {
-							split_indx = newindx;
-							newpos = 0;
-						} else
-							split_indx = i+1;
+					if (psize + tsize > pmax) {
+						split_indx = i + (j<0);
 						break;
 					}
+					psize += tsize;
 				}
+				/* special case: when the new node was on the last
+				 * slot we may not have tripped the break inside the loop.
+				 * In all other cases we either hit the break condition,
+				 * or the original split_indx was already safe.
+				 */
+				if (newindx >= nkeys && i == k)
+					split_indx = nkeys-1;
+			}
+			if (split_indx == newindx) {
+				sepkey.mv_size = newkey->mv_size;
+				sepkey.mv_data = newkey->mv_data;
+			} else {
+				node = (MDB_node *)((char *)mp + copy->mp_ptrs[split_indx]);
+				sepkey.mv_size = node->mn_ksize;
+				sepkey.mv_data = NODEKEY(node);
 			}
 		}
 	}
 
-	/* First find the separating key between the split pages.
-	 * The case where newindx == split_indx is ambiguous; the
-	 * new item could go to the new page or stay on the original
-	 * page. If newpos == 1 it goes to the new page.
-	 */
-	if (newindx == split_indx && newpos) {
-		sepkey.mv_size = newkey->mv_size;
-		sepkey.mv_data = newkey->mv_data;
-	} else {
-		node = NODEPTR(mp, split_indx);
-		sepkey.mv_size = node->mn_ksize;
-		sepkey.mv_data = NODEKEY(node);
-	}
-
-newsep:
-	DPRINTF(("separator is [%s]", DKEY(&sepkey)));
+	DPRINTF(("separator is %d [%s]", split_indx, DKEY(&sepkey)));
 
 	/* Copy separator key to the parent.
 	 */
-	if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(mc->mc_txn->mt_env, &sepkey)) {
+	if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(env, &sepkey)) {
 		mn.mc_snum--;
 		mn.mc_top--;
 		did_split = 1;
@@ -7619,108 +7628,91 @@ newsep:
 			return rc;
 		for (i=0; i<mc->mc_top; i++)
 			mc->mc_ki[i] = mn.mc_ki[i];
-		goto done;
-	}
-	if (IS_LEAF2(rp)) {
-		goto done;
-	}
-
-	/* Move half of the keys to the right sibling. */
+	} else if (!IS_LEAF2(mp)) {
+		/* Move nodes */
+		mc->mc_pg[mc->mc_top] = rp;
+		i = split_indx;
+		j = 0;
+		do {
+			if (i == newindx) {
+				rkey.mv_data = newkey->mv_data;
+				rkey.mv_size = newkey->mv_size;
+				if (IS_LEAF(mp)) {
+					rdata = newdata;
+				} else
+					pgno = newpgno;
+				flags = nflags;
+				/* Update index for the new key. */
+				mc->mc_ki[mc->mc_top] = j;
+			} else {
+				node = (MDB_node *)((char *)mp + copy->mp_ptrs[i]);
+				rkey.mv_data = NODEKEY(node);
+				rkey.mv_size = node->mn_ksize;
+				if (IS_LEAF(mp)) {
+					xdata.mv_data = NODEDATA(node);
+					xdata.mv_size = NODEDSZ(node);
+					rdata = &xdata;
+				} else
+					pgno = NODEPGNO(node);
+				flags = node->mn_flags;
+			}
 
-	/* grab a page to hold a temporary copy */
-	copy = mdb_page_malloc(mc->mc_txn, 1);
-	if (copy == NULL)
-		return ENOMEM;
+			if (!IS_LEAF(mp) && j == 0) {
+				/* First branch index doesn't need key data. */
+				rkey.mv_size = 0;
+			}
 
-	copy->mp_pgno  = mp->mp_pgno;
-	copy->mp_flags = mp->mp_flags;
-	copy->mp_lower = PAGEHDRSZ;
-	copy->mp_upper = mc->mc_txn->mt_env->me_psize;
-	mc->mc_pg[mc->mc_top] = copy;
-	for (i = j = 0; i <= nkeys; j++) {
-		if (i == split_indx) {
-		/* Insert in right sibling. */
-		/* Reset insert index for right sibling. */
-			if (i != newindx || (newpos ^ ins_new)) {
+			rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags);
+			if (rc) {
+				/* return tmp page to freelist */
+				mdb_page_free(env, copy);
+				return rc;
+			}
+			if (i == nkeys) {
+				i = 0;
 				j = 0;
-				mc->mc_pg[mc->mc_top] = rp;
+				mc->mc_pg[mc->mc_top] = copy;
+			} else {
+				i++;
+				j++;
+			}
+		} while (i != split_indx);
+
+		nkeys = NUMKEYS(copy);
+		for (i=0; i<nkeys; i++)
+			mp->mp_ptrs[i] = copy->mp_ptrs[i];
+		mp->mp_lower = copy->mp_lower;
+		mp->mp_upper = copy->mp_upper;
+		memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1),
+			env->me_psize - copy->mp_upper);
+
+		/* reset back to original page */
+		if (newindx < split_indx) {
+			mc->mc_pg[mc->mc_top] = mp;
+			if (nflags & MDB_RESERVE) {
+				node = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
+				if (!(node->mn_flags & F_BIGDATA))
+					newdata->mv_data = NODEDATA(node);
 			}
-		}
-
-		if (i == newindx && !ins_new) {
-			/* Insert the original entry that caused the split. */
-			rkey.mv_data = newkey->mv_data;
-			rkey.mv_size = newkey->mv_size;
-			if (IS_LEAF(mp)) {
-				rdata = newdata;
-			} else
-				pgno = newpgno;
-			flags = nflags;
-
-			ins_new = 1;
-
-			/* Update index for the new key. */
-			mc->mc_ki[mc->mc_top] = j;
-		} else if (i == nkeys) {
-			break;
 		} else {
-			node = NODEPTR(mp, i);
-			rkey.mv_data = NODEKEY(node);
-			rkey.mv_size = node->mn_ksize;
-			if (IS_LEAF(mp)) {
-				xdata.mv_data = NODEDATA(node);
-				xdata.mv_size = NODEDSZ(node);
-				rdata = &xdata;
-			} else
-				pgno = NODEPGNO(node);
-			flags = node->mn_flags;
-
-			i++;
-		}
-
-		if (!IS_LEAF(mp) && j == 0) {
-			/* First branch index doesn't need key data. */
-			rkey.mv_size = 0;
-		}
-
-		rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags);
-		if (rc) break;
-	}
-
-	nkeys = NUMKEYS(copy);
-	for (i=0; i<nkeys; i++)
-		mp->mp_ptrs[i] = copy->mp_ptrs[i];
-	mp->mp_lower = copy->mp_lower;
-	mp->mp_upper = copy->mp_upper;
-	memcpy(NODEPTR(mp, nkeys-1), NODEPTR(copy, nkeys-1),
-		mc->mc_txn->mt_env->me_psize - copy->mp_upper);
-
-	/* reset back to original page */
-	if (newindx < split_indx || (!newpos && newindx == split_indx)) {
-		mc->mc_pg[mc->mc_top] = mp;
-		if (nflags & MDB_RESERVE) {
-			node = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
-			if (!(node->mn_flags & F_BIGDATA))
-				newdata->mv_data = NODEDATA(node);
-		}
-	} else {
-		mc->mc_ki[ptop]++;
-		/* Make sure mc_ki is still valid.
-		 */
-		if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
-		    mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
-			for (i=0; i<ptop; i++) {
-				mc->mc_pg[i] = mn.mc_pg[i];
-				mc->mc_ki[i] = mn.mc_ki[i];
+			mc->mc_pg[mc->mc_top] = rp;
+			mc->mc_ki[ptop]++;
+			/* Make sure mc_ki is still valid.
+			 */
+			if (mn.mc_pg[ptop] != mc->mc_pg[ptop] &&
+				mc->mc_ki[ptop] >= NUMKEYS(mc->mc_pg[ptop])) {
+				for (i=0; i<ptop; i++) {
+					mc->mc_pg[i] = mn.mc_pg[i];
+					mc->mc_ki[i] = mn.mc_ki[i];
+				}
+				mc->mc_pg[ptop] = mn.mc_pg[ptop];
+				mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
 			}
-			mc->mc_pg[ptop] = mn.mc_pg[ptop];
-			mc->mc_ki[ptop] = mn.mc_ki[ptop] - 1;
 		}
+		/* return tmp page to freelist */
+		mdb_page_free(env, copy);
 	}
 
-	/* return tmp page to freelist */
-	mdb_page_free(mc->mc_txn->mt_env, copy);
-done:
 	{
 		/* Adjust other cursors pointing to mp */
 		MDB_cursor *m2, *m3;
@@ -7768,6 +7760,7 @@ done:
 			}
 		}
 	}
+	DPRINTF(("mp left: %d, rp left: %d", SIZELEFT(mp), SIZELEFT(rp)));
 	return rc;
 }
 
-- 
GitLab