CollectionIndex* dest //< [in] destination index
) { ceph_abort(); return 0; }
+ virtual int merge(
+ uint32_t bits, //< [in] common (target) bits
+ CollectionIndex* dest //< [in] destination index
+ ) { ceph_abort(); return 0; }
+
/// List contents of collection by hash
virtual int collection_list_partial(
if (!collection_exists(cid)) {
dout(2) << __FUNC__ << ": " << cid << " DNE" << dendl;
- assert(replaying);
+ ceph_assert(replaying);
return 0;
}
if (!collection_exists(dest)) {
dout(2) << __FUNC__ << ": " << dest << " DNE" << dendl;
- assert(replaying);
+ ceph_assert(replaying);
return 0;
}
if (_check_replay_guard(cid, spos) > 0)
_collection_set_bits(dest, bits);
- // move everything
spg_t pgid;
bool is_pg = dest.is_pg(&pgid);
- assert(is_pg);
- r = _split_collection(cid, bits, pgid.pgid.ps(), dest, spos);
- if (r < 0)
- return r;
+ ceph_assert(is_pg);
- // temp too!
- r = _split_collection(cid.get_temp(), bits, pgid.pgid.ps(), dest.get_temp(),
- spos);
- if (r < 0)
- return r;
+ {
+ int dstcmp = _check_replay_guard(dest, spos);
+ if (dstcmp < 0)
+ return 0;
+
+ int srccmp = _check_replay_guard(cid, spos);
+ if (srccmp < 0)
+ return 0;
+
+ _set_global_replay_guard(cid, spos);
+ _set_replay_guard(cid, spos, true);
+ _set_replay_guard(dest, spos, true);
+
+ Index from;
+ r = get_index(cid, &from);
+
+ Index to;
+ if (!r)
+ r = get_index(dest, &to);
+
+ if (!r) {
+ ceph_assert(from.index);
+ RWLock::WLocker l1((from.index)->access_lock);
+
+ ceph_assert(to.index);
+ RWLock::WLocker l2((to.index)->access_lock);
+
+ r = from->merge(bits, to.index);
+ }
+
+ _close_replay_guard(cid, spos);
+ _close_replay_guard(dest, spos);
+ }
+
+ // temp too
+ {
+ int dstcmp = _check_replay_guard(dest.get_temp(), spos);
+ if (dstcmp < 0)
+ return 0;
+
+ int srccmp = _check_replay_guard(cid.get_temp(), spos);
+ if (srccmp < 0)
+ return 0;
+
+ _set_global_replay_guard(cid.get_temp(), spos);
+ _set_replay_guard(cid.get_temp(), spos, true);
+ _set_replay_guard(dest.get_temp(), spos, true);
+
+ Index from;
+ r = get_index(cid.get_temp(), &from);
+
+ Index to;
+ if (!r)
+ r = get_index(dest.get_temp(), &to);
+
+ if (!r) {
+ ceph_assert(from.index);
+ RWLock::WLocker l1((from.index)->access_lock);
+
+ ceph_assert(to.index);
+ RWLock::WLocker l2((to.index)->access_lock);
+
+ r = from->merge(bits, to.index);
+ }
+
+ _close_replay_guard(cid.get_temp(), spos);
+ _close_replay_guard(dest.get_temp(), spos);
+
+ }
// remove source
if (_check_replay_guard(cid, spos) > 0)
if (!i->match(bits, pgid.pgid.ps())) {
dout(20) << __FUNC__ << ": " << *i << " does not belong in "
<< cid << dendl;
- assert(i->match(bits, pgid.pgid.ps()));
+ ceph_assert(i->match(bits, pgid.pgid.ps()));
}
}
objects.clear();
return 0;
}
+int HashIndex::_merge(
+ uint32_t bits,
+ CollectionIndex* dest) {
+ dout(20) << __func__ << " bits " << bits << dendl;
+ ceph_assert(collection_version() == dest->collection_version());
+
+ vector<string> emptypath;
+
+ // pre-split to common/target level so that any shared prefix DIR_?
+ // directories already exist at the destination. Since each
+ // directory is a nibble (4 bits),
+ unsigned shared = bits / 4;
+ dout(20) << __func__ << " pre-splitting to shared level " << shared << dendl;
+ if (shared) {
+ split_dirs(emptypath, shared);
+ ((HashIndex*)dest)->split_dirs(emptypath, shared);
+ }
+
+ // now merge the contents
+ _merge_dirs(*this, *(HashIndex*)dest, emptypath);
+
+ return 0;
+}
+
+int HashIndex::_merge_dirs(
+ HashIndex& from,
+ HashIndex& to,
+ const vector<string>& path)
+{
+ dout(20) << __func__ << " path " << path << dendl;
+ int r;
+
+ vector<string> src_subs, dst_subs;
+ r = from.list_subdirs(path, &src_subs);
+ if (r < 0) {
+ lgeneric_subdout(g_ceph_context,filestore,20) << __func__
+ << " r " << r << " from "
+ << "from.list_subdirs"
+ << dendl;
+ return r;
+ }
+ r = to.list_subdirs(path, &dst_subs);
+ if (r < 0) {
+ lgeneric_subdout(g_ceph_context,filestore,20) << __func__
+ << " r " << r << " from "
+ << "to.list_subdirs"
+ << dendl;
+ return r;
+ }
+
+ for (auto& i : src_subs) {
+ if (std::find(dst_subs.begin(), dst_subs.end(), i) == dst_subs.end()) {
+ // move it
+ r = move_subdir(from, to, path, i);
+ if (r < 0) {
+ lgeneric_subdout(g_ceph_context,filestore,20) << __func__
+ << " r " << r << " from "
+ << "move_subdir(...,"
+ << path << "," << i << ")"
+ << dendl;
+ return r;
+ }
+ } else {
+ // common, recurse!
+ vector<string> nested = path;
+ nested.push_back(i);
+ r = _merge_dirs(from, to, nested);
+ if (r < 0) {
+ lgeneric_subdout(g_ceph_context,filestore,20) << __func__
+ << " r " << r << " from "
+ << "rec _merge_dirs"
+ << dendl;
+ return r;
+ }
+
+ // now remove it
+ r = remove_path(nested);
+ if (r < 0) {
+ lgeneric_subdout(g_ceph_context,filestore,20) << __func__
+ << " r " << r << " from "
+ << "remove_path "
+ << nested
+ << dendl;
+ return r;
+ }
+ }
+ }
+
+ // objects
+ map<string, ghobject_t> objects;
+ r = from.list_objects(path, 0, 0, &objects);
+ if (r < 0) {
+ lgeneric_subdout(g_ceph_context,filestore,20) << __func__
+ << " r " << r << " from "
+ << "from.list_objects"
+ << dendl;
+ return r;
+ }
+
+ for (auto& i : objects) {
+ r = move_object(from, to, path, i);
+ if (r < 0) {
+ lgeneric_subdout(g_ceph_context,filestore,20) << __func__
+ << " r " << r << " from "
+ << "move_object(...,"
+ << path << "," << i << ")"
+ << dendl;
+ return r;
+ }
+ }
+
+ return 0;
+}
+
+
int HashIndex::_split(
uint32_t match,
uint32_t bits,
CollectionIndex* dest) {
ceph_assert(collection_version() == dest->collection_version());
unsigned mkdirred = 0;
+
return col_split_level(
*this,
*static_cast<HashIndex*>(dest),
CollectionIndex* dest
) override;
+ /// @see CollectionIndex
+ int _merge(
+ uint32_t bits,
+ CollectionIndex* dest
+ ) override;
+
+ int _merge_dirs(
+ HashIndex& from,
+ HashIndex& to,
+ const vector<string>& path);
+
/// @see CollectionIndex
int apply_layout_settings(int target_level) override;
uint32_t bits, //< [in] bits to check
CollectionIndex* dest //< [in] destination index
) = 0;
+ virtual int _merge(
+ uint32_t bits, //< [in] bits for target
+ CollectionIndex* dest //< [in] destination index
+ ) = 0;
/// @see CollectionIndex
int split(
);
}
+ /// @see CollectionIndex
+ int merge(
+ uint32_t bits,
+ CollectionIndex* dest
+ ) override {
+ WRAP_RETRY(
+ r = _merge(bits, dest);
+ goto out;
+ );
+ }
+
/**
* Returns the length of the longest escaped name which could result
* from any clone, shard, or rollback object of this object
}
#endif
+void test_merge_skewed(ObjectStore *store,
+ unsigned base, unsigned bits,
+ unsigned anum, unsigned bnum)
+{
+ cout << __func__ << " 0x" << std::hex << base << std::dec
+ << " bits " << bits
+ << " anum " << anum << " bnum " << bnum << std::endl;
+ /*
+ make merge source pgs have radically different # of objects in them,
+ which should trigger different splitting in filestore, and verify that
+ post-merge all objects are accessible.
+ */
+ int r;
+ coll_t a(spg_t(pg_t(base, 0), shard_id_t::NO_SHARD));
+ coll_t b(spg_t(pg_t(base | (1<<bits), 0), shard_id_t::NO_SHARD));
+
+ auto cha = store->create_new_collection(a);
+ auto chb = store->create_new_collection(b);
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(a, bits + 1);
+ r = queue_transaction(store, cha, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(b, bits + 1);
+ r = queue_transaction(store, chb, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+
+ bufferlist small;
+ small.append("small");
+ string suffix = "ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooaaaaaaaaaa";
+ set<ghobject_t> aobjects, bobjects;
+ {
+ // fill a
+ ObjectStore::Transaction t;
+ for (unsigned i = 0; i < 1000; ++i) {
+ string objname = "a" + stringify(i) + suffix;
+ ghobject_t o(hobject_t(
+ objname,
+ "",
+ CEPH_NOSNAP,
+ i<<(bits+1) | base,
+ 52, ""));
+ aobjects.insert(o);
+ t.write(a, o, 0, small.length(), small, 0);
+ if (i % 100) {
+ r = queue_transaction(store, cha, std::move(t));
+ ASSERT_EQ(r, 0);
+ t = ObjectStore::Transaction();
+ }
+ }
+ r = queue_transaction(store, cha, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+ {
+ // fill b
+ ObjectStore::Transaction t;
+ for (unsigned i = 0; i < 10; ++i) {
+ string objname = "b" + stringify(i) + suffix;
+ ghobject_t o(hobject_t(
+ objname,
+ "",
+ CEPH_NOSNAP,
+ (i<<(base+1)) | base | (1<<bits),
+ 52, ""));
+ bobjects.insert(o);
+ t.write(b, o, 0, small.length(), small, 0);
+ if (i % 100) {
+ r = queue_transaction(store, chb, std::move(t));
+ ASSERT_EQ(r, 0);
+ t = ObjectStore::Transaction();
+ }
+ }
+ r = queue_transaction(store, chb, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+
+ // merge b->a
+ {
+ ObjectStore::Transaction t;
+ t.merge_collection(b, a, bits);
+ r = queue_transaction(store, cha, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+
+ // verify
+ {
+ vector<ghobject_t> got;
+ store->collection_list(cha, ghobject_t(), ghobject_t::get_max(), INT_MAX,
+ &got, 0);
+ set<ghobject_t> gotset;
+ for (auto& o : got) {
+ ASSERT_TRUE(aobjects.count(o) || bobjects.count(o));
+ gotset.insert(o);
+ }
+ // check both listing and stat-ability (different code paths!)
+ struct stat st;
+ for (auto& o : aobjects) {
+ ASSERT_TRUE(gotset.count(o));
+ int r = store->stat(cha, o, &st, false);
+ ASSERT_EQ(r, 0);
+ }
+ for (auto& o : bobjects) {
+ ASSERT_TRUE(gotset.count(o));
+ int r = store->stat(cha, o, &st, false);
+ ASSERT_EQ(r, 0);
+ }
+ }
+
+ // clean up
+ {
+ ObjectStore::Transaction t;
+ for (auto &o : aobjects) {
+ t.remove(a, o);
+ }
+ r = queue_transaction(store, cha, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+ {
+ ObjectStore::Transaction t;
+ for (auto &o : bobjects) {
+ t.remove(a, o);
+ }
+ t.remove_collection(a);
+ r = queue_transaction(store, cha, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+}
+
+TEST_P(StoreTest, MergeSkewed) {
+ if (string(GetParam()) != "filestore")
+ return;
+
+ // this is sufficient to exercise merges with different hashing levels
+ test_merge_skewed(store.get(), 0xf, 4, 10, 10000);
+ test_merge_skewed(store.get(), 0xf, 4, 10000, 10);
+
+ /*
+ // this covers a zillion variations that all boil down to the same thing
+ for (unsigned base = 3; base < 0x1000; base *= 5) {
+ unsigned bits;
+ unsigned t = base;
+ for (bits = 0; t; t >>= 1) {
+ ++bits;
+ }
+ for (unsigned b = bits; b < bits + 10; b += 3) {
+ for (auto anum : { 10, 1000, 10000 }) {
+ for (auto bnum : { 10, 1000, 10000 }) {
+ if (anum == bnum) {
+ continue;
+ }
+ test_merge_skewed(store.get(), base, b, anum, bnum);
+ }
+ }
+ }
+ }
+ */
+}
+
+
/**
* This test tests adding two different groups
* of objects, each with 1 common prefix and 1
uint32_t bits,
CollectionIndex* dest
) override { return 0; }
+ int _merge(
+ uint32_t bits,
+ CollectionIndex* dest
+ ) override { return 0; }
void test_generate_and_parse(const ghobject_t &hoid, const std::string &mangled_expected) {
const std::string mangled_name = lfn_generate_object_name(hoid);