This will simplify adding a replay guard to create_collection.
Backport: bobtail
Signed-off-by: Samuel Just <sam.just@inktank.com>
}
break;
case Transaction::OP_SPLIT_COLLECTION:
+ {
+ coll_t cid(i.get_cid());
+ uint32_t bits(i.get_u32());
+ uint32_t rem(i.get_u32());
+ coll_t dest(i.get_cid());
+ r = _split_collection_create(cid, bits, rem, dest, spos);
+ }
+ break;
+ case Transaction::OP_SPLIT_COLLECTION2:
{
coll_t cid(i.get_cid());
uint32_t bits(i.get_u32());
uint32_t rem,
coll_t dest,
const SequencerPosition &spos)
+{
+ dout(15) << __func__ << " " << cid << " bits: " << bits << dendl;
+ int dstcmp = _check_replay_guard(dest, spos);
+ if (dstcmp < 0)
+ return 0;
+ if (dstcmp > 0 && !collection_empty(dest))
+ return -ENOTEMPTY;
+
+ int srccmp = _check_replay_guard(cid, spos);
+ if (srccmp < 0)
+ return 0;
+
+ _set_replay_guard(cid, spos, true);
+ _set_replay_guard(dest, spos, true);
+
+ Index from;
+ int r = get_index(cid, &from);
+
+ Index to;
+ if (!r)
+ r = get_index(dest, &to);
+
+ if (!r)
+ r = from->split(rem, bits, to);
+
+ _close_replay_guard(cid, spos);
+ _close_replay_guard(dest, spos);
+ return r;
+}
+
+// DEPRECATED: remove once we are sure there won't be any such transactions
+// replayed
+int FileStore::_split_collection_create(coll_t cid,
+ uint32_t bits,
+ uint32_t rem,
+ coll_t dest,
+ const SequencerPosition &spos)
{
dout(15) << __func__ << " " << cid << " bits: " << bits << dendl;
int r = _create_collection(dest);
const SequencerPosition &spos);
int _split_collection(coll_t cid, uint32_t bits, uint32_t rem, coll_t dest,
const SequencerPosition &spos);
+ int _split_collection_create(coll_t cid, uint32_t bits, uint32_t rem,
+ coll_t dest,
+ const SequencerPosition &spos);
virtual const char** get_tracked_conf_keys() const;
virtual void handle_conf_change(const struct md_config_t *conf,
break;
case Transaction::OP_SPLIT_COLLECTION:
+ {
+ coll_t cid(i.get_cid());
+ uint32_t bits(i.get_u32());
+ uint32_t rem(i.get_u32());
+ coll_t dest(i.get_cid());
+ f->dump_string("op_name", "op_split_collection_create");
+ f->dump_stream("collection") << cid;
+ f->dump_stream("bits") << bits;
+ f->dump_stream("rem") << rem;
+ f->dump_stream("dest") << dest;
+ }
+
+ case Transaction::OP_SPLIT_COLLECTION2:
{
coll_t cid(i.get_cid());
uint32_t bits(i.get_u32());
OP_OMAP_RMKEYS = 33, // cid, keyset
OP_OMAP_SETHEADER = 34, // cid, header
OP_SPLIT_COLLECTION = 35, // cid, bits, destination
+ OP_SPLIT_COLLECTION2 = 36, /* cid, bits, destination
+ doesn't create the destination */
};
private:
uint32_t bits,
uint32_t rem,
coll_t destination) {
- __u32 op = OP_SPLIT_COLLECTION;
+ __u32 op = OP_SPLIT_COLLECTION2;
::encode(op, tbl);
::encode(cid, tbl);
::encode(bits, tbl);
dout(10) << "m_seed " << i->ps() << dendl;
dout(10) << "split_bits is " << split_bits << dendl;
+ rctx->transaction->create_collection(
+ coll_t(*i));
rctx->transaction->split_collection(
coll_t(parent->info.pgid),
split_bits,
++k) {
for (snapid_t j = k.get_start(); j < k.get_start() + k.get_len();
++j) {
+ rctx->transaction->create_collection(
+ coll_t(*i, j));
rctx->transaction->split_collection(
coll_t(parent->info.pgid, j),
split_bits,