]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rbd: Implement images in consistency groups
authorVictor Denisov <denisovenator@gmail.com>
Sun, 24 Jul 2016 00:31:28 +0000 (17:31 -0700)
committerVictor Denisov <denisovenator@gmail.com>
Sat, 6 Aug 2016 02:09:28 +0000 (19:09 -0700)
Signed-off-by: Victor Denisov <denisovenator@gmail.com>
src/cls/rbd/cls_rbd.cc
src/cls/rbd/cls_rbd_client.cc
src/cls/rbd/cls_rbd_client.h
src/cls/rbd/cls_rbd_types.cc
src/cls/rbd/cls_rbd_types.h
src/test/cls_rbd/test_cls_rbd.cc

index db6ad3345d04b1976d171109dfdec45aebf4264f..f7bf2e33411012acfd111d186f36649402ba1f5e 100644 (file)
@@ -138,6 +138,12 @@ cls_method_handle_t h_group_create;
 cls_method_handle_t h_group_dir_list;
 cls_method_handle_t h_group_dir_add;
 cls_method_handle_t h_group_dir_remove;
+cls_method_handle_t h_group_image_remove;
+cls_method_handle_t h_group_image_list;
+cls_method_handle_t h_group_image_set;
+cls_method_handle_t h_image_add_group;
+cls_method_handle_t h_image_remove_group;
+cls_method_handle_t h_image_get_group;
 
 #define RBD_MAX_KEYS_READ 64
 #define RBD_SNAP_KEY_PREFIX "snapshot_"
@@ -4412,6 +4418,280 @@ int group_dir_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   return 0;
 }
 
+/**
+ * Set state of an image in the consistency group.
+ *
+ * Input:
+ * @param image_status (cls::rbd::GroupImageStatus)
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ */
+int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "group_image_set");
+
+  cls::rbd::GroupImageStatus st;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(st, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  string image_key = st.spec.image_key();
+
+  bufferlist image_val_bl;
+  ::encode(st.state, image_val_bl);
+  int r = cls_cxx_map_set_val(hctx, image_key, &image_val_bl);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Remove reference to an image from the consistency group.
+ *
+ * Input:
+ * @param spec (cls::rbd::GroupImageSpec)
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ */
+int group_image_remove(cls_method_context_t hctx,
+                       bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "group_image_remove");
+  cls::rbd::GroupImageSpec spec;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(spec, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  string image_key = spec.image_key();
+
+  int r = cls_cxx_map_remove_key(hctx, image_key);
+  if (r < 0) {
+    CLS_ERR("error removing image from group: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+/*
+ * List images in the consistency group.
+ *
+ * Input:
+ * @param start_after which name to begin listing after
+ *        (use the empty string to start at the beginning)
+ * @param max_return the maximum number of names to list
+ *
+ * Output:
+ * @param tuples of descriptions of the images: image_id, pool_id, image reference state.
+ * @return 0 on success, negative error code on failure
+ */
+int group_image_list(cls_method_context_t hctx,
+                     bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "group_image_list");
+  cls::rbd::GroupImageSpec start_after;
+  uint64_t max_return;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(start_after, iter);
+    ::decode(max_return, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  int max_read = RBD_MAX_KEYS_READ;
+  std::map<string, bufferlist> vals;
+  string last_read = start_after.image_key();
+  std::vector<cls::rbd::GroupImageStatus> res;
+  int keys_read;
+  do {
+    keys_read = cls_cxx_map_get_vals(hctx, last_read,cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX,
+                                    max_read, &vals);
+    if (keys_read < 0)
+      return keys_read;
+
+    for (map<string, bufferlist>::iterator it = vals.begin();
+        it != vals.end() && res.size() < max_return; ++it) {
+
+      bufferlist::iterator iter = it->second.begin();
+      cls::rbd::GroupImageLinkState state;
+      try {
+       ::decode(state, iter);
+      } catch (const buffer::error &err) {
+       CLS_ERR("error decoding state for image: %s", it->first.c_str());
+       return -EIO;
+      }
+      cls::rbd::GroupImageSpec spec;
+      int r = cls::rbd::GroupImageSpec::from_key(it->first, &spec);
+      if (r < 0)
+       return r;
+
+      CLS_LOG(20, "Discovered image %s %" PRId64 " %d", spec.image_id.c_str(),
+                                                spec.pool_id,
+                                                (int)state);
+      res.push_back(cls::rbd::GroupImageStatus(spec, state));
+    }
+    if (res.size() > 0) {
+      last_read = res.rbegin()->spec.image_key();
+    }
+
+  } while ((keys_read == RBD_MAX_KEYS_READ) && (res.size() < max_return));
+  ::encode(res, *out);
+
+  return 0;
+}
+
+/**
+ * Reference the consistency group this image belongs to.
+ *
+ * Input:
+ * @param group_id (std::string)
+ * @param pool_id (int64_t)
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ */
+int image_add_group(cls_method_context_t hctx,
+                   bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "image_add_group");
+  cls::rbd::GroupSpec new_group;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(new_group, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  bufferlist existing_refbl;
+
+  int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &existing_refbl);
+  if (r == 0) {
+    // If we are trying to link this image to the same group then return success.
+    // If this image already belongs to another group then abort.
+    cls::rbd::GroupSpec old_group;
+    try {
+      bufferlist::iterator iter = existing_refbl.begin();
+      ::decode(old_group, iter);
+    } catch (const buffer::error &err) {
+      return -EINVAL;
+    }
+
+    if ((old_group.group_id != new_group.group_id)
+       || (old_group.pool_id != new_group.pool_id)) {
+      return -EEXIST;
+    } else {
+      return 0; // In this case the values are already correct
+    }
+  } else if (r < 0 && r != -ENOENT) { // No entry means this image is not a member of any consistency group. So, we can use it.
+    return r;
+  }
+
+  bufferlist refbl;
+  ::encode(new_group, refbl);
+  r = cls_cxx_map_set_val(hctx, RBD_GROUP_REF, &refbl);
+
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Remove image's pointer to the consistency group.
+ *
+ * Input:
+ * @param cg_id (std::string)
+ * @param pool_id (int64_t)
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ */
+int image_remove_group(cls_method_context_t hctx,
+                      bufferlist *in,
+                      bufferlist *out)
+{
+  CLS_LOG(20, "image_remove_group");
+  cls::rbd::GroupSpec spec;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(spec, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  bufferlist refbl;
+  int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
+  if (r < 0) {
+    return r;
+  }
+
+  cls::rbd::GroupSpec ref_spec;
+  bufferlist::iterator iter = refbl.begin();
+  try {
+    ::decode(ref_spec, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  if (ref_spec.pool_id != spec.pool_id || ref_spec.group_id != spec.group_id) {
+    return -EBADF;
+  }
+
+  r = cls_cxx_map_remove_key(hctx, RBD_GROUP_REF);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Retrieve the id and pool of the consistency group this image belongs to.
+ *
+ * Input:
+ * none
+ *
+ * Output:
+ * @param GroupSpec
+ * @return 0 on success, negative error code on failure
+ */
+int image_get_group(cls_method_context_t hctx,
+                   bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "image_get_group");
+  bufferlist refbl;
+  int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  }
+
+  cls::rbd::GroupSpec spec;
+
+  if (r != -ENOENT) {
+    bufferlist::iterator iter = refbl.begin();
+    try {
+      ::decode(spec, iter);
+    } catch (const buffer::error &err) {
+      return -EINVAL;
+    }
+  }
+
+  ::encode(spec, *out);
+  return 0;
+}
+
 void __cls_init()
 {
   CLS_LOG(20, "Loaded rbd class!");
@@ -4645,5 +4925,23 @@ void __cls_init()
   cls_register_cxx_method(h_class, "group_dir_remove",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          group_dir_remove, &h_group_dir_remove);
+  cls_register_cxx_method(h_class, "group_image_remove",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         group_image_remove, &h_group_image_remove);
+  cls_register_cxx_method(h_class, "group_image_list",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         group_image_list, &h_group_image_list);
+  cls_register_cxx_method(h_class, "group_image_set",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         group_image_set, &h_group_image_set);
+  cls_register_cxx_method(h_class, "image_add_group",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         image_add_group, &h_image_add_group);
+  cls_register_cxx_method(h_class, "image_remove_group",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         image_remove_group, &h_image_remove_group);
+  cls_register_cxx_method(h_class, "image_get_group",
+                         CLS_METHOD_RD,
+                         image_get_group, &h_image_get_group);
   return;
 }
index a8fd5a1a6670a389c6b21273e1f3e4ac0ff66ee2..89c56818b58323e6637aaefe9c821042b5576e51 100644 (file)
@@ -73,6 +73,8 @@ namespace librbd {
       bufferlist parent_bl;
       ::encode(snap, parent_bl);
       op->exec("rbd", "get_parent", parent_bl);
+
+      op->exec("rbd", "image_get_group", empty_bl);
       rados::cls::lock::get_lock_info_start(op, RBD_LOCK_NAME);
     }
 
@@ -82,7 +84,8 @@ namespace librbd {
                                     std::map<rados::cls::lock::locker_id_t,
                                              rados::cls::lock::locker_info_t> *lockers,
                                     bool *exclusive_lock, std::string *lock_tag,
-                                    ::SnapContext *snapc, parent_info *parent) {
+                                   ::SnapContext *snapc, parent_info *parent,
+                                   cls::rbd::GroupSpec *group_ref) {
       assert(size);
       assert(features);
       assert(incompatible_features);
@@ -90,6 +93,7 @@ namespace librbd {
       assert(exclusive_lock);
       assert(snapc);
       assert(parent);
+      assert(group_ref);
 
       try {
        uint8_t order;
@@ -106,6 +110,8 @@ namespace librbd {
        ::decode(parent->spec.image_id, *it);
        ::decode(parent->spec.snap_id, *it);
        ::decode(parent->overlap, *it);
+       // group_image_get_group
+       ::decode(*group_ref, *it);
 
        // get_lock_info
        ClsLockType lock_type = LOCK_NONE;
@@ -131,7 +137,8 @@ namespace librbd {
                              bool *exclusive_lock,
                             string *lock_tag,
                             ::SnapContext *snapc,
-                            parent_info *parent)
+                            parent_info *parent,
+                            cls::rbd::GroupSpec *group_ref)
     {
       librados::ObjectReadOperation op;
       get_mutable_metadata_start(&op, read_only);
@@ -146,7 +153,7 @@ namespace librbd {
       return get_mutable_metadata_finish(&it, size, features,
                                          incompatible_features, lockers,
                                          exclusive_lock, lock_tag, snapc,
-                                         parent);
+                                         parent, group_ref);
     }
 
     int create_image(librados::IoCtx *ioctx, const std::string &oid,
@@ -1507,5 +1514,83 @@ namespace librbd {
       return ioctx->exec(oid, "rbd", "group_dir_remove", in, out);
     }
 
+    int group_image_remove(librados::IoCtx *ioctx, const std::string &oid,
+                          const cls::rbd::GroupImageSpec &spec)
+    {
+      bufferlist bl, bl2;
+      ::encode(spec, bl);
+
+      return ioctx->exec(oid, "rbd", "group_image_remove", bl, bl2);
+    }
+
+    int group_image_list(librados::IoCtx *ioctx,
+                        const std::string &oid, const cls::rbd::GroupImageSpec &start,
+                        uint64_t max_return,
+                        std::vector<cls::rbd::GroupImageStatus>& images)
+    {
+      bufferlist bl, bl2;
+      ::encode(start, bl);
+      ::encode(max_return, bl);
+
+      int r = ioctx->exec(oid, "rbd", "group_image_list", bl, bl2);
+      if (r < 0)
+       return r;
+
+      bufferlist::iterator iter = bl2.begin();
+      try {
+       ::decode(images, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
+    int group_image_set(librados::IoCtx *ioctx, const std::string &oid,
+                       const cls::rbd::GroupImageStatus &st)
+    {
+      bufferlist bl, bl2;
+      ::encode(st, bl);
+
+      return ioctx->exec(oid, "rbd", "group_image_set", bl, bl2);
+    }
+
+    int image_add_group(librados::IoCtx *ioctx, const std::string &oid,
+                       const cls::rbd::GroupSpec &group_spec)
+    {
+      bufferlist bl, bl2;
+      ::encode(group_spec, bl);
+
+      return ioctx->exec(oid, "rbd", "image_add_group", bl, bl2);
+    }
+
+    int image_remove_group(librados::IoCtx *ioctx, const std::string &oid,
+                          const cls::rbd::GroupSpec &group_spec)
+    {
+      bufferlist bl, bl2;
+      ::encode(group_spec, bl);
+
+      return ioctx->exec(oid, "rbd", "image_remove_group", bl, bl2);
+    }
+
+    int image_get_group(librados::IoCtx *ioctx, const std::string &oid,
+                       cls::rbd::GroupSpec &group_spec)
+    {
+      bufferlist in, out;
+
+      int r = ioctx->exec(oid, "rbd", "image_get_group", in, out);
+      if (r < 0)
+       return r;
+
+      bufferlist::iterator iter = out.begin();
+      try {
+       ::decode(group_spec, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
   } // namespace cls_client
 } // namespace librbd
index e073dfdf659394f7432a68f8dcfd74e125957ed5..e145514565d3871f60cde19a4d6bf0e239321997 100644 (file)
@@ -35,7 +35,8 @@ namespace librbd {
                                     std::map<rados::cls::lock::locker_id_t,
                                              rados::cls::lock::locker_info_t> *lockers,
                                     bool *exclusive_lock, std::string *lock_tag,
-                                    ::SnapContext *snapc, parent_info *parent);
+                                   ::SnapContext *snapc, parent_info *parent,
+                                   cls::rbd::GroupSpec *uplink);
     int get_mutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
                             bool read_only, uint64_t *size, uint64_t *features,
                             uint64_t *incompatible_features,
@@ -44,7 +45,8 @@ namespace librbd {
                             bool *exclusive_lock,
                             std::string *lock_tag,
                             ::SnapContext *snapc,
-                            parent_info *parent);
+                            parent_info *parent,
+                            cls::rbd::GroupSpec *uplink);
 
     // low-level interface (mainly for testing)
     int create_image(librados::IoCtx *ioctx, const std::string &oid,
@@ -301,6 +303,20 @@ namespace librbd {
                   const std::string &name, const std::string &id);
     int group_dir_remove(librados::IoCtx *ioctx, const std::string &oid,
                      const std::string &name, const std::string &id);
+    int group_image_remove(librados::IoCtx *ioctx, const std::string &oid,
+                          const cls::rbd::GroupImageSpec &spec);
+    int group_image_list(librados::IoCtx *ioctx, const std::string &oid,
+                        const cls::rbd::GroupImageSpec &start,
+                        uint64_t max_return,
+                        std::vector<cls::rbd::GroupImageStatus>& images);
+    int group_image_set(librados::IoCtx *ioctx, const std::string &oid,
+                       const cls::rbd::GroupImageStatus &st);
+    int image_add_group(librados::IoCtx *ioctx, const std::string &oid,
+                       const cls::rbd::GroupSpec &group_spec);
+    int image_remove_group(librados::IoCtx *ioctx, const std::string &oid,
+                          const cls::rbd::GroupSpec &group_spec);
+    int image_get_group(librados::IoCtx *ioctx, const std::string &oid,
+                       cls::rbd::GroupSpec &s);
 
   } // namespace cls_client
 } // namespace librbd
index 5891e566a2a5038c2d92a0e18d2d727dcf7d2ee2..efbe47fd92ba60dca39ba5f6f51d3437d785eac2 100644 (file)
@@ -211,5 +211,110 @@ std::ostream& operator<<(std::ostream& os, const MirrorImageStatus& status) {
   return os;
 }
 
+void GroupImageSpec::encode(bufferlist &bl) const {
+  ENCODE_START(1, 1, bl);
+  ::encode(image_id, bl);
+  ::encode(pool_id, bl);
+  ENCODE_FINISH(bl);
+}
+
+void GroupImageSpec::decode(bufferlist::iterator &it) {
+  DECODE_START(1, it);
+  ::decode(image_id, it);
+  ::decode(pool_id, it);
+  DECODE_FINISH(it);
+}
+
+void GroupImageSpec::dump(Formatter *f) const {
+  f->dump_string("image_id", image_id);
+  f->dump_int("pool_id", pool_id);
+}
+
+int GroupImageSpec::from_key(const std::string &image_key,
+                                   GroupImageSpec *spec) {
+  if (nullptr == spec) return -EINVAL;
+  int prefix_len = cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX.size();
+  std::string data_string = image_key.substr(prefix_len,
+                                            image_key.size() - prefix_len);
+  size_t p = data_string.find("_");
+  if (std::string::npos == p) {
+    return -EIO;
+  }
+  data_string[p] = ' ';
+
+  istringstream iss(data_string);
+  uint64_t pool_id;
+  string image_id;
+  iss >> std::hex >> pool_id >> image_id;
+
+  spec->image_id = image_id;
+  spec->pool_id = pool_id;
+  return 0;
+}
+
+std::string GroupImageSpec::image_key() {
+  if (-1 == pool_id)
+    return "";
+  else {
+    ostringstream oss;
+    oss << RBD_GROUP_IMAGE_KEY_PREFIX << std::setw(16)
+       << std::setfill('0') << std::hex << pool_id << "_" << image_id;
+    return oss.str();
+  }
+}
+
+void GroupImageStatus::encode(bufferlist &bl) const {
+  ENCODE_START(1, 1, bl);
+  ::encode(spec, bl);
+  ::encode(state, bl);
+  ENCODE_FINISH(bl);
+}
+
+void GroupImageStatus::decode(bufferlist::iterator &it) {
+  DECODE_START(1, it);
+  ::decode(spec, it);
+  ::decode(state, it);
+  DECODE_FINISH(it);
+}
+
+std::string GroupImageStatus::state_to_string() const {
+  std::stringstream ss;
+  if (state == GROUP_IMAGE_LINK_STATE_INCOMPLETE) {
+    ss << "incomplete";
+  }
+  if (state == GROUP_IMAGE_LINK_STATE_ATTACHED) {
+    ss << "attached";
+  }
+  return ss.str();
+}
+
+void GroupImageStatus::dump(Formatter *f) const {
+  spec.dump(f);
+  f->dump_string("state", state_to_string());
+}
+
+void GroupSpec::encode(bufferlist &bl) const {
+  ENCODE_START(1, 1, bl);
+  ::encode(pool_id, bl);
+  ::encode(group_id, bl);
+  ENCODE_FINISH(bl);
+}
+
+void GroupSpec::decode(bufferlist::iterator &it) {
+  DECODE_START(1, it);
+  ::decode(pool_id, it);
+  ::decode(group_id, it);
+  DECODE_FINISH(it);
+}
+
+void GroupSpec::dump(Formatter *f) const {
+  f->dump_string("group_id", group_id);
+  f->dump_int("pool_id", pool_id);
+}
+
+bool GroupSpec::is_valid() const {
+  return (!group_id.empty()) && (pool_id != -1);
+}
+
 } // namespace rbd
 } // namespace cls
index f98a942e2adc9155e4990046989cbf892eeed3e4..83030ba09a2af3a4d65b3a2aa10a0eac627d1ba6 100644 (file)
@@ -7,16 +7,20 @@
 #include "include/int_types.h"
 #include "include/buffer.h"
 #include "include/encoding.h"
+#include "include/stringify.h"
 #include "include/utime.h"
 #include <iosfwd>
 #include <string>
 
+#define RBD_GROUP_REF "rbd_group_ref"
+
 namespace ceph { class Formatter; }
 
 namespace cls {
 namespace rbd {
 
 static const uint32_t MAX_OBJECT_MAP_OBJECT_COUNT = 256000000;
+static const string RBD_GROUP_IMAGE_KEY_PREFIX = "image_";
 
 enum MirrorMode {
   MIRROR_MODE_DISABLED = 0,
@@ -24,6 +28,24 @@ enum MirrorMode {
   MIRROR_MODE_POOL     = 2
 };
 
+enum GroupImageLinkState {
+  GROUP_IMAGE_LINK_STATE_ATTACHED,
+  GROUP_IMAGE_LINK_STATE_INCOMPLETE
+};
+
+inline void encode(const GroupImageLinkState &state, bufferlist& bl,
+                  uint64_t features=0)
+{
+  ::encode(static_cast<uint8_t>(state), bl);
+}
+
+inline void decode(GroupImageLinkState &state, bufferlist::iterator& it)
+{
+  uint8_t int_state;
+  ::decode(int_state, it);
+  state = static_cast<GroupImageLinkState>(int_state);
+}
+
 struct MirrorPeer {
   MirrorPeer() {
   }
@@ -134,6 +156,66 @@ std::ostream& operator<<(std::ostream& os, const MirrorImageStatusState& state);
 
 WRITE_CLASS_ENCODER(MirrorImageStatus);
 
+struct GroupImageSpec {
+  GroupImageSpec() {}
+
+  GroupImageSpec(const std::string &image_id, int64_t pool_id)
+    : image_id(image_id), pool_id(pool_id) {}
+
+  static int from_key(const std::string &image_key, GroupImageSpec *spec);
+
+  std::string image_id;
+  int64_t pool_id = -1;
+
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+
+  std::string image_key();
+
+};
+
+WRITE_CLASS_ENCODER(GroupImageSpec);
+
+struct GroupImageStatus {
+  GroupImageStatus() {}
+  GroupImageStatus(const std::string &image_id,
+                  int64_t pool_id,
+                  GroupImageLinkState state)
+    : spec(image_id, pool_id), state(state) {}
+
+  GroupImageStatus(GroupImageSpec spec,
+                  GroupImageLinkState state)
+    : spec(spec), state(state) {}
+
+  GroupImageSpec spec;
+  GroupImageLinkState state = GROUP_IMAGE_LINK_STATE_INCOMPLETE;
+
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+
+  std::string state_to_string() const;
+};
+
+WRITE_CLASS_ENCODER(GroupImageStatus);
+
+struct GroupSpec {
+  GroupSpec() {}
+  GroupSpec(const std::string &group_id, int64_t pool_id)
+    : group_id(group_id), pool_id(pool_id) {}
+
+  std::string group_id;
+  int64_t pool_id = -1;
+
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+  bool is_valid() const;
+};
+
+WRITE_CLASS_ENCODER(GroupSpec);
+
 } // namespace rbd
 } // namespace cls
 
index 0022cf5593bf5ab7db754681257a786910014895..2ff5628c854547e810960a88472317ecee1fbe2c 100644 (file)
@@ -8,6 +8,7 @@
 #include "include/types.h"
 #include "include/rados/librados.h"
 #include "include/rbd/object_map_types.h"
+#include "include/rbd_types.h"
 #include "include/stringify.h"
 #include "cls/rbd/cls_rbd.h"
 #include "cls/rbd/cls_rbd_client.h"
@@ -953,18 +954,19 @@ TEST_F(TestClsRbd, get_mutable_metadata_features)
   std::string lock_tag;
   ::SnapContext snapc;
   parent_info parent;
+  cls::rbd::GroupSpec group_spec;
 
   ASSERT_EQ(0, get_mutable_metadata(&ioctx, oid, true, &size, &features,
                                    &incompatible_features, &lockers,
                                    &exclusive_lock, &lock_tag, &snapc,
-                                    &parent));
+                                    &parent, &group_spec));
   ASSERT_EQ(static_cast<uint64_t>(RBD_FEATURE_EXCLUSIVE_LOCK), features);
   ASSERT_EQ(0U, incompatible_features);
 
   ASSERT_EQ(0, get_mutable_metadata(&ioctx, oid, false, &size, &features,
                                     &incompatible_features, &lockers,
                                     &exclusive_lock, &lock_tag, &snapc,
-                                    &parent));
+                                   &parent, &group_spec));
   ASSERT_EQ(static_cast<uint64_t>(RBD_FEATURE_EXCLUSIVE_LOCK), features);
   ASSERT_EQ(static_cast<uint64_t>(RBD_FEATURE_EXCLUSIVE_LOCK),
            incompatible_features);
@@ -1835,3 +1837,181 @@ TEST_F(TestClsRbd, group_dir_remove_missing) {
   ASSERT_EQ(0, ioctx.omap_get_keys(RBD_GROUP_DIRECTORY, "", 10, &keys));
   ASSERT_EQ(0U, keys.size());
 }
+
+void test_image_add(librados::IoCtx &ioctx, const string& group_id,
+                   const string& image_id, int64_t pool_id) {
+
+  cls::rbd::GroupImageStatus st(image_id, pool_id,
+                              cls::rbd::GROUP_IMAGE_LINK_STATE_INCOMPLETE);
+  ASSERT_EQ(0, group_image_set(&ioctx, group_id, st));
+
+  set<string> keys;
+  ASSERT_EQ(0, ioctx.omap_get_keys(group_id, "", 10, &keys));
+
+  auto it = keys.begin();
+  ASSERT_EQ(2U, keys.size());
+
+  string image_key = cls::rbd::GroupImageSpec(image_id, pool_id).image_key();
+  ASSERT_EQ(image_key, *it);
+  ++it;
+  ASSERT_EQ("snap_seq", *it);
+}
+
+TEST_F(TestClsRbd, group_image_add) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  string group_id = "group_id";
+  ASSERT_EQ(0, group_create(&ioctx, group_id));
+
+  int64_t pool_id = ioctx.get_id();
+  string image_id = "image_id";
+  test_image_add(ioctx, group_id, image_id, pool_id);
+}
+
+TEST_F(TestClsRbd, group_image_remove) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  string group_id = "group_id";
+  ASSERT_EQ(0, group_create(&ioctx, group_id));
+
+  int64_t pool_id = ioctx.get_id();
+  string image_id = "image_id";
+  test_image_add(ioctx, group_id, image_id, pool_id);
+
+  cls::rbd::GroupImageSpec spec(image_id, pool_id);
+  ASSERT_EQ(0, group_image_remove(&ioctx, group_id, spec));
+  set<string> keys;
+  ASSERT_EQ(0, ioctx.omap_get_keys(group_id, "", 10, &keys));
+  ASSERT_EQ(1U, keys.size());
+  ASSERT_EQ("snap_seq", *(keys.begin()));
+}
+
+TEST_F(TestClsRbd, group_image_list) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  string group_id = "group_id";
+  ASSERT_EQ(0, group_create(&ioctx, group_id));
+
+  int64_t pool_id = ioctx.get_id();
+  string image_id = "imageid"; // Image id shouldn't contain underscores
+  test_image_add(ioctx, group_id, image_id, pool_id);
+
+  vector<cls::rbd::GroupImageStatus> images;
+  cls::rbd::GroupImageSpec empty_image_spec = cls::rbd::GroupImageSpec();
+  ASSERT_EQ(0, group_image_list(&ioctx, group_id, empty_image_spec, 1024, images));
+  ASSERT_EQ(1U, images.size());
+  ASSERT_EQ(image_id, images[0].spec.image_id);
+  ASSERT_EQ(pool_id, images[0].spec.pool_id);
+  ASSERT_EQ(cls::rbd::GROUP_IMAGE_LINK_STATE_INCOMPLETE, images[0].state);
+
+  cls::rbd::GroupImageStatus last_image = *images.rbegin();
+  ASSERT_EQ(0, group_image_list(&ioctx, group_id, last_image.spec, 1024, images));
+  ASSERT_EQ(0U, images.size());
+}
+
+TEST_F(TestClsRbd, group_image_clean) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  string group_id = "group_id1";
+  ASSERT_EQ(0, group_create(&ioctx, group_id));
+
+  int64_t pool_id = ioctx.get_id();
+  string image_id = "image_id";
+  test_image_add(ioctx, group_id, image_id, pool_id);
+
+  cls::rbd::GroupImageStatus incomplete_st(image_id, pool_id,
+                              cls::rbd::GROUP_IMAGE_LINK_STATE_INCOMPLETE);
+
+  ASSERT_EQ(0, group_image_set(&ioctx, group_id, incomplete_st));
+  // Set to dirty first in order to make sure that group_image_clean
+  // actually does something.
+  cls::rbd::GroupImageStatus attached_st(image_id, pool_id,
+                              cls::rbd::GROUP_IMAGE_LINK_STATE_ATTACHED);
+  ASSERT_EQ(0, group_image_set(&ioctx, group_id, attached_st));
+
+  string image_key = cls::rbd::GroupImageSpec(image_id, pool_id).image_key();
+
+  map<string, bufferlist> vals;
+  ASSERT_EQ(0, ioctx.omap_get_vals(group_id, "", 10, &vals));
+
+  cls::rbd::GroupImageLinkState ref_state;
+  bufferlist::iterator it = vals[image_key].begin();
+  ::decode(ref_state, it);
+  ASSERT_EQ(cls::rbd::GROUP_IMAGE_LINK_STATE_ATTACHED, ref_state);
+}
+
+TEST_F(TestClsRbd, image_add_group) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  int64_t pool_id = ioctx.get_id();
+  string image_id = "imageid";
+
+  ASSERT_EQ(0, create_image(&ioctx, image_id, 2<<20, 0,
+                           RBD_FEATURE_LAYERING, image_id));
+
+  string group_id = "group_id";
+
+  cls::rbd::GroupSpec spec(group_id, pool_id);
+  ASSERT_EQ(0, image_add_group(&ioctx, image_id, spec));
+
+  map<string, bufferlist> vals;
+  ASSERT_EQ(0, ioctx.omap_get_vals(image_id, "", RBD_GROUP_REF, 10, &vals));
+
+  cls::rbd::GroupSpec val_spec;
+  bufferlist::iterator it = vals[RBD_GROUP_REF].begin();
+  ::decode(val_spec, it);
+
+  ASSERT_EQ(group_id, val_spec.group_id);
+  ASSERT_EQ(pool_id, val_spec.pool_id);
+}
+
+TEST_F(TestClsRbd, image_remove_group) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  int64_t pool_id = ioctx.get_id();
+  string image_id = "image_id";
+
+  ASSERT_EQ(0, create_image(&ioctx, image_id, 2<<20, 0,
+                           RBD_FEATURE_LAYERING, image_id));
+
+  string group_id = "group_id";
+
+  cls::rbd::GroupSpec spec(group_id, pool_id);
+  ASSERT_EQ(0, image_add_group(&ioctx, image_id, spec));
+  // Add reference in order to make sure that image_remove_group actually
+  // does something.
+  ASSERT_EQ(0, image_remove_group(&ioctx, image_id, spec));
+
+  map<string, bufferlist> vals;
+  ASSERT_EQ(0, ioctx.omap_get_vals(image_id, "", RBD_GROUP_REF, 10, &vals));
+
+  ASSERT_EQ(0U, vals.size());
+}
+
+TEST_F(TestClsRbd, image_get_group) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  int64_t pool_id = ioctx.get_id();
+  string image_id = "imageidgroupspec";
+
+  ASSERT_EQ(0, create_image(&ioctx, image_id, 2<<20, 0,
+                           RBD_FEATURE_LAYERING, image_id));
+
+  string group_id = "group_id_get_group_spec";
+
+  cls::rbd::GroupSpec spec_add(group_id, pool_id);
+  ASSERT_EQ(0, image_add_group(&ioctx, image_id, spec_add));
+
+  cls::rbd::GroupSpec spec;
+  ASSERT_EQ(0, image_get_group(&ioctx, image_id, spec));
+
+  ASSERT_EQ(group_id, spec.group_id);
+  ASSERT_EQ(pool_id, spec.pool_id);
+}