return 0;
}
-int DBObjectMap::merge_new_complete(
- DBObjectMapIterator &iter,
- string *begin,
- const string &end,
- set<string> *complete_keys_to_remove) {
- assert(begin);
- assert(complete_keys_to_remove);
-
- string _begin = *begin;
- iter->in_complete_region(_begin, begin, nullptr);
-
- // see in_complete_region post condition
- auto &citer = iter->complete_iter;
- while (citer->valid() && (end.empty() || (citer->key() <= end))) {
- /* rm_keys takes end from a parent pointer, can't be the start
- * of a complete region */
- assert(citer->key() != end);
- /* same as above, parent pointers can't be in a current complete region
- */
- assert(citer->value().length() >= 1);
- assert(end.empty() ||
- string(citer->value().c_str(), citer->value().length() - 1) <= end);
- complete_keys_to_remove->insert(citer->key());
- citer->next();
- }
- return 0;
-}
-
int DBObjectMap::copy_up_header(Header header,
KeyValueDB::Transaction t)
{
return db->submit_transaction(t);
}
- // Copy up keys from parent around to_clear
- map<string, bufferlist> to_write;
- map<string, bufferlist> complete_to_write;
- set<string> complete_keys_to_remove;
- list<pair<string, string>> new_complete;
- bool keep_parent = true;
- auto cleared_to = to_clear.begin();
- {
- DBObjectMapIterator iter = _get_iterator(header);
- while (cleared_to != to_clear.end()) {
- string begin = *cleared_to;
- string end;
- unsigned count = 0;
- iter->lower_bound_parent(*cleared_to);
- while (true) {
- if (!iter->valid()) {
- cleared_to = to_clear.end();
- end = string();
- break;
- }
-
- if (count >= 20) {
- cleared_to = to_clear.lower_bound(iter->key());
- if (cleared_to == to_clear.end() ||
- *cleared_to != iter->key()) {
- end = iter->key();
- break;
- } else {
- // We don't want to create [a, c), [c, f), keep this entry going
- }
- }
-
- if (!to_clear.count(iter->key()))
- to_write[iter->key()] = iter->value();
-
- ++count;
- iter->next_parent();
- }
-
- merge_new_complete(iter, &begin, end, &complete_keys_to_remove);
+ assert(state.v < 3);
- bufferlist bl;
- bl.append(bufferptr(end.c_str(), end.size() + 1));
- complete_to_write.insert(make_pair(begin, bl));
+ {
+ // We only get here for legacy (v2) stores
+ // Copy up all keys from parent excluding to_clear
+ // and remove parent
+ // This eliminates a v2 format use of complete for this oid only
+ map<string, bufferlist> to_write;
+ ObjectMapIterator iter = _get_iterator(header);
+ for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
+ if (iter->status())
+ return iter->status();
+ if (!to_clear.count(iter->key()))
+ to_write[iter->key()] = iter->value();
}
+ t->set(user_prefix(header), to_write);
+ } // destruct iter which has parent in_use
-
- if (complete_to_write.size() == 1 &&
- complete_to_write.begin()->second.length() == 1) {
- iter->seek_to_first();
- if (iter->key() >= complete_to_write.begin()->first)
- keep_parent = false;
- }
- }
- t->set(user_prefix(header), to_write);
- if (keep_parent) {
- t->rmkeys(complete_prefix(header), complete_keys_to_remove);
- t->set(complete_prefix(header), complete_to_write);
- } else {
- copy_up_header(header, t);
- Header parent = lookup_parent(header);
- if (!parent)
- return -EINVAL;
- parent->num_children--;
- _clear(parent, t);
- header->parent = 0;
- set_map_header(hl, oid, *header, t);
- t->rmkeys_by_prefix(complete_prefix(header));
- }
+ copy_up_header(header, t);
+ Header parent = lookup_parent(header);
+ if (!parent)
+ return -EINVAL;
+ parent->num_children--;
+ _clear(parent, t);
+ header->parent = 0;
+ set_map_header(hl, oid, *header, t);
+ t->rmkeys_by_prefix(complete_prefix(header));
return db->submit_transaction(t);
}
return db->submit_transaction(t);
}
-int DBObjectMap::clone(const ghobject_t &oid,
+// ONLY USED FOR TESTING
+// Set version to 2 to avoid asserts
+int DBObjectMap::legacy_clone(const ghobject_t &oid,
const ghobject_t &target,
const SequencerPosition *spos)
{
+ state.v = 2;
+
if (oid == target)
return 0;
return db->submit_transaction(t);
}
+int DBObjectMap::clone(const ghobject_t &oid,
+ const ghobject_t &target,
+ const SequencerPosition *spos)
+{
+ if (oid == target)
+ return 0;
+
+ MapHeaderLock _l1(this, MIN_GHOBJ(oid, target, true));
+ MapHeaderLock _l2(this, MAX_GHOBJ(oid, target, true));
+ MapHeaderLock *lsource, *ltarget;
+ if (cmp_bitwise(oid, target) > 0) {
+ lsource = &_l2;
+ ltarget= &_l1;
+ } else {
+ lsource = &_l1;
+ ltarget= &_l2;
+ }
+
+ KeyValueDB::Transaction t = db->get_transaction();
+ {
+ Header destination = lookup_map_header(*ltarget, target);
+ if (destination) {
+ if (check_spos(target, destination, spos))
+ return 0;
+ destination->num_children--;
+ remove_map_header(*ltarget, target, destination, t);
+ _clear(destination, t);
+ }
+ }
+
+ Header source = lookup_map_header(*lsource, oid);
+ if (!source)
+ return db->submit_transaction(t);
+
+ Header destination = generate_new_header(target, Header());
+ if (spos)
+ destination->spos = *spos;
+
+ set_map_header(*ltarget, target, *destination, t);
+
+ bufferlist bl;
+ int r = _get_header(source, &bl);
+ if (r < 0)
+ return r;
+ _set_header(destination, bl, t);
+
+ map<string, bufferlist> to_set;
+ KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(source));
+ for (xattr_iter->seek_to_first();
+ xattr_iter->valid();
+ xattr_iter->next())
+ to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
+ t->set(xattr_prefix(destination), to_set);
+
+ map<string, bufferlist> to_write;
+ ObjectMapIterator iter = _get_iterator(source);
+ for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
+ if (iter->status())
+ return iter->status();
+ to_write[iter->key()] = iter->value();
+ }
+ t->set(user_prefix(destination), to_write);
+
+ return db->submit_transaction(t);
+}
+
int DBObjectMap::upgrade_to_v2()
{
dout(1) << __func__ << " start" << dendl;
}
} else {
// New store
- state.v = 2;
+ // Version 3 means that complete regions never used
+ state.v = 3;
state.seq = 1;
}
ostringstream ss;
dout(20) << "clear_header: clearing seq " << header->seq << dendl;
t->rmkeys_by_prefix(user_prefix(header));
t->rmkeys_by_prefix(sys_prefix(header));
- t->rmkeys_by_prefix(complete_prefix(header));
+ if (state.v < 3)
+ t->rmkeys_by_prefix(complete_prefix(header)); // Needed when header.parent != 0
t->rmkeys_by_prefix(xattr_prefix(header));
set<string> keys;
keys.insert(header_key(header->seq));
<< " ghobject=" << h.oid;
return out;
}
+
+int DBObjectMap::rename(const ghobject_t &from,
+ const ghobject_t &to,
+ const SequencerPosition *spos)
+{
+ if (from == to)
+ return 0;
+
+ MapHeaderLock _l1(this, MIN_GHOBJ(from, to, true));
+ MapHeaderLock _l2(this, MAX_GHOBJ(from, to, true));
+ MapHeaderLock *lsource, *ltarget;
+ if (cmp_bitwise(from, to) > 0) {
+ lsource = &_l2;
+ ltarget= &_l1;
+ } else {
+ lsource = &_l1;
+ ltarget= &_l2;
+ }
+
+ KeyValueDB::Transaction t = db->get_transaction();
+ {
+ Header destination = lookup_map_header(*ltarget, to);
+ if (destination) {
+ if (check_spos(to, destination, spos))
+ return 0;
+ destination->num_children--;
+ remove_map_header(*ltarget, to, destination, t);
+ _clear(destination, t);
+ }
+ }
+
+ Header hdr = lookup_map_header(*lsource, from);
+ if (!hdr)
+ return db->submit_transaction(t);
+
+ remove_map_header(*lsource, from, hdr, t);
+ hdr->oid = to;
+ set_map_header(*ltarget, to, *hdr, t);
+
+ return db->submit_transaction(t);
+}
db->clone(hoid, hoid2);
}
+ void rename(const string &objname, const string &target) {
+ rename(ghobject_t(hobject_t(sobject_t(objname, CEPH_NOSNAP))),
+ ghobject_t(hobject_t(sobject_t(target, CEPH_NOSNAP))));
+ }
+
+ void rename(ghobject_t hoid,
+ ghobject_t hoid2) {
+ db->rename(hoid, hoid2);
+ }
+
void clear(const string &objname) {
clear(ghobject_t(hobject_t(sobject_t(objname, CEPH_NOSNAP))));
}
+ void legacy_clone(const string &objname, const string &target) {
+ legacy_clone(ghobject_t(hobject_t(sobject_t(objname, CEPH_NOSNAP))),
+ ghobject_t(hobject_t(sobject_t(target, CEPH_NOSNAP))));
+ }
+
+ void legacy_clone(ghobject_t hoid,
+ ghobject_t hoid2) {
+ db->legacy_clone(hoid, hoid2);
+ }
+
void clear(ghobject_t hoid) {
db->clear(hoid);
}
db->clear(hoid2);
}
+TEST_F(ObjectMapTest, Rename) {
+ ghobject_t hoid(hobject_t(sobject_t("foo", CEPH_NOSNAP)));
+ ghobject_t hoid2(hobject_t(sobject_t("foo2", CEPH_NOSNAP)));
+
+ for (unsigned i = 0; i < 1000; ++i) {
+ tester.set_key(hoid, "foo" + num_str(i), "bar" + num_str(i));
+ }
+
+ db->rename(hoid, hoid2);
+ // Verify rename where target exists
+ db->clone(hoid2, hoid);
+ db->rename(hoid, hoid2);
+
+ int r = 0;
+ for (unsigned i = 0; i < 1000; ++i) {
+ string result;
+ r = tester.get_key(hoid2, "foo" + num_str(i), &result);
+ ASSERT_EQ(1, r);
+ ASSERT_EQ("bar" + num_str(i), result);
+
+ if (i % 2) {
+ tester.remove_key(hoid2, "foo" + num_str(i));
+ }
+ }
+
+ for (unsigned i = 0; i < 1000; ++i) {
+ string result;
+ r = tester.get_key(hoid2, "foo" + num_str(i), &result);
+ if (i % 2) {
+ ASSERT_EQ(0, r);
+ } else {
+ ASSERT_EQ(1, r);
+ ASSERT_EQ("bar" + num_str(i), result);
+ }
+ }
+
+ {
+ ObjectMap::ObjectMapIterator iter = db->get_iterator(hoid2);
+ iter->seek_to_first();
+ for (unsigned i = 0; i < 1000; ++i) {
+ if (!(i % 2)) {
+ ASSERT_TRUE(iter->valid());
+ ASSERT_EQ("foo" + num_str(i), iter->key());
+ iter->next();
+ }
+ }
+ }
+
+ db->clear(hoid2);
+}
+
+TEST_F(ObjectMapTest, OddEvenOldClone) {
+ ghobject_t hoid(hobject_t(sobject_t("foo", CEPH_NOSNAP)));
+ ghobject_t hoid2(hobject_t(sobject_t("foo2", CEPH_NOSNAP)));
+
+ for (unsigned i = 0; i < 1000; ++i) {
+ tester.set_key(hoid, "foo" + num_str(i), "bar" + num_str(i));
+ }
+
+ db->legacy_clone(hoid, hoid2);
+
+ int r = 0;
+ for (unsigned i = 0; i < 1000; ++i) {
+ string result;
+ r = tester.get_key(hoid, "foo" + num_str(i), &result);
+ ASSERT_EQ(1, r);
+ ASSERT_EQ("bar" + num_str(i), result);
+ r = tester.get_key(hoid2, "foo" + num_str(i), &result);
+ ASSERT_EQ(1, r);
+ ASSERT_EQ("bar" + num_str(i), result);
+
+ if (i % 2) {
+ tester.remove_key(hoid, "foo" + num_str(i));
+ } else {
+ tester.remove_key(hoid2, "foo" + num_str(i));
+ }
+ }
+
+ for (unsigned i = 0; i < 1000; ++i) {
+ string result;
+ string result2;
+ r = tester.get_key(hoid, "foo" + num_str(i), &result);
+ int r2 = tester.get_key(hoid2, "foo" + num_str(i), &result2);
+ if (i % 2) {
+ ASSERT_EQ(0, r);
+ ASSERT_EQ(1, r2);
+ ASSERT_EQ("bar" + num_str(i), result2);
+ } else {
+ ASSERT_EQ(0, r2);
+ ASSERT_EQ(1, r);
+ ASSERT_EQ("bar" + num_str(i), result);
+ }
+ }
+
+ {
+ ObjectMap::ObjectMapIterator iter = db->get_iterator(hoid);
+ iter->seek_to_first();
+ for (unsigned i = 0; i < 1000; ++i) {
+ if (!(i % 2)) {
+ ASSERT_TRUE(iter->valid());
+ ASSERT_EQ("foo" + num_str(i), iter->key());
+ iter->next();
+ }
+ }
+ }
+
+ {
+ ObjectMap::ObjectMapIterator iter2 = db->get_iterator(hoid2);
+ iter2->seek_to_first();
+ for (unsigned i = 0; i < 1000; ++i) {
+ if (i % 2) {
+ ASSERT_TRUE(iter2->valid());
+ ASSERT_EQ("foo" + num_str(i), iter2->key());
+ iter2->next();
+ }
+ }
+ }
+
+ db->clear(hoid);
+ db->clear(hoid2);
+}
+
TEST_F(ObjectMapTest, RandomTest) {
tester.def_init();
for (unsigned i = 0; i < 5000; ++i) {