}
static const string RBD_GROUP_SNAP_KEY_PREFIX = "snapshot_";
+static const string RBD_GROUP_SNAP_ORDER_KEY_PREFIX = "snap_order_";
std::string snap_key(const std::string &snap_id) {
ostringstream oss;
return oss.str();
}
+std::string snap_order_key(const std::string &snap_id) {
+ ostringstream oss;
+ oss << RBD_GROUP_SNAP_ORDER_KEY_PREFIX << snap_id;
+ return oss.str();
+}
+
+std::string snap_id_from_order_key(const string &key) {
+ return key.substr(RBD_GROUP_SNAP_ORDER_KEY_PREFIX.size());
+}
+
int snap_list(cls_method_context_t hctx, cls::rbd::GroupSnapshot start_after,
uint64_t max_return,
- std::vector<cls::rbd::GroupSnapshot> *group_snaps)
-{
+ std::vector<cls::rbd::GroupSnapshot> *group_snaps) {
int max_read = RBD_MAX_KEYS_READ;
std::map<string, bufferlist> vals;
string last_read = snap_key(start_after.id);
if (!vals.empty()) {
last_read = vals.rbegin()->first;
+ } else {
+ ceph_assert(!more);
}
} while (more && (group_snaps->size() < max_return));
if (r < 0 && r != -ENOENT) {
return r;
} else if (r >= 0) {
+ CLS_ERR("snap key already exists : %s", key.c_str());
+ return -EEXIST;
+ }
+
+ std::string order_key = group::snap_order_key(group_snap.id);
+ r = cls_cxx_map_get_val(hctx, order_key, &snap_bl);
+ if (r < 0 && r != -ENOENT) {
+ return r;
+ } else if (r >= 0) {
+ CLS_ERR("order key already exists : %s", order_key.c_str());
return -EEXIST;
}
+
+ std::string last_read = group::snap_order_key("");
+ bool more;
+ uint64_t max_order = 0;
+ std::map<string, bufferlist> vals;
+ do {
+ int r = cls_cxx_map_get_vals(hctx, last_read,
+ group::RBD_GROUP_SNAP_ORDER_KEY_PREFIX,
+ RBD_MAX_KEYS_READ, &vals, &more);
+ if (r < 0) {
+ return r;
+ }
+ for (auto &[key, bl] : vals) {
+ auto iter = bl.cbegin();
+ uint64_t order;
+ try {
+ decode(order, iter);
+ } catch (const ceph::buffer::error &err) {
+ CLS_ERR("error decoding snapshot order: %s", key.c_str());
+ return -EIO;
+ }
+ max_order = std::max(max_order, order);
+ }
+ if (!vals.empty()) {
+ last_read = vals.rbegin()->first;
+ } else {
+ ceph_assert(!more);
+ }
+ } while (more);
+
+ bufferlist bl;
+ encode(++max_order, bl);
+ r = cls_cxx_map_set_val(hctx, order_key, &bl);
+ if (r < 0) {
+ CLS_ERR("error setting key: %s : %s", order_key.c_str(),
+ cpp_strerror(r).c_str());
+ return r;
+ }
}
bufferlist obl;
encode(group_snap, obl);
r = cls_cxx_map_set_val(hctx, key, &obl);
- return r;
+ if (r < 0) {
+ CLS_ERR("error setting key: %s : %s", key.c_str(), cpp_strerror(r).c_str());
+ return r;
+ }
+ return 0;
}
/**
CLS_LOG(20, "removing snapshot with key %s", snap_key.c_str());
int r = cls_cxx_map_remove_key(hctx, snap_key);
- return r;
+ if (r < 0) {
+ CLS_ERR("error removing snapshot with key %s : %s", snap_key.c_str(),
+ cpp_strerror(r).c_str());
+ return r;
+ }
+
+ std::string snap_order_key = group::snap_order_key(snap_id);
+ r = cls_cxx_map_remove_key(hctx, snap_order_key);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("error removing snapshot order key %s : %s", snap_order_key.c_str(),
+ cpp_strerror(r).c_str());
+ return r;
+ }
+
+ return 0;
}
/**
ASSERT_EQ(0, ioctx.omap_get_keys(group_id, "", 10, &keys));
auto it = keys.begin();
- ASSERT_EQ(1U, keys.size());
+ ASSERT_EQ(2U, keys.size());
- string snap_key = "snapshot_" + stringify(snap.id);
- ASSERT_EQ(snap_key, *it);
+ ASSERT_EQ("snap_order_" + snap.id, *it++);
+ ASSERT_EQ("snapshot_" + snap.id, *it);
}
TEST_F(TestClsRbd, group_snap_list) {
}
cls::rbd::GroupSnapshot last_snap = *snapshots.rbegin();
-
ASSERT_EQ(0, group_snap_list(&ioctx, group_id, last_snap, 10, &snapshots));
ASSERT_EQ(5U, snapshots.size());
for (int i = 10; i < 15; ++i) {
TEST_F(TestClsRbd, group_snap_remove) {
librados::IoCtx ioctx;
-
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
string group_id = "group_id_snap_remove";
ASSERT_EQ(0, ioctx.omap_get_keys(group_id, "", 10, &keys));
auto it = keys.begin();
- ASSERT_EQ(1U, keys.size());
+ ASSERT_EQ(2U, keys.size());
- string snap_key = "snapshot_" + stringify(snap.id);
- ASSERT_EQ(snap_key, *it);
+ ASSERT_EQ("snap_order_" + snap.id, *it++);
+ ASSERT_EQ("snapshot_" + snap.id, *it);
// Remove the snapshot
ASSERT_EQ(0U, keys.size());
}
+TEST_F(TestClsRbd, group_snap_remove_without_order) {
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+ string group_id = "group_id_snap_remove_without_order";
+ ASSERT_EQ(0, ioctx.create(group_id, true));
+
+ string snap_id = "snap_id";
+ cls::rbd::GroupSnapshot snap = {snap_id, "test_snapshot",
+ cls::rbd::GROUP_SNAPSHOT_STATE_INCOMPLETE};
+ ASSERT_EQ(0, group_snap_set(&ioctx, group_id, snap));
+
+ // Simulate an older snapshot by removing the order key
+ set<string> keys = {"snap_order_" + snap.id};
+ ASSERT_EQ(0, ioctx.omap_rm_keys(group_id, keys));
+
+ ASSERT_EQ(0, ioctx.omap_get_keys(group_id, "", 10, &keys));
+ ASSERT_EQ(1U, keys.size());
+
+ auto it = keys.begin();
+ ASSERT_EQ("snapshot_" + snap.id, *it);
+
+ // Remove the snapshot
+ ASSERT_EQ(0, group_snap_remove(&ioctx, group_id, snap_id));
+}
+
TEST_F(TestClsRbd, group_snap_get_by_id) {
librados::IoCtx ioctx;