]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rbd: add methods for interacting with the new header format
authorJosh Durgin <josh.durgin@inktank.com>
Fri, 8 Jun 2012 05:54:45 +0000 (22:54 -0700)
committerJosh Durgin <josh.durgin@inktank.com>
Fri, 8 Jun 2012 23:38:19 +0000 (16:38 -0700)
Add the client side (cls_rbd_client) for testing as well.
librbd will use the functions in cls_rbd_client to interact with cls_rbd.

Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
src/Makefile.am
src/cls_rbd.cc
src/include/rbd_types.h
src/librbd/cls_rbd_client.cc [new file with mode: 0644]
src/librbd/cls_rbd_client.h [new file with mode: 0644]
src/test/rbd/test_cls_rbd.cc [new file with mode: 0644]

index 74df14d741dd7b784b97f3a3554ecbb6dd6e91c5..0de11b6bda50ba804660edc5f7d41369a2264c04 100644 (file)
@@ -701,6 +701,13 @@ test_librbd_fsx_LDADD =  librbd.la librados.la
 test_librbd_fsx_CFLAGS = ${AM_CFLAGS} -Wno-format
 bin_DEBUGPROGRAMS += test_librbd_fsx
 
+test_cls_rbd_SOURCES = test/rbd/test_cls_rbd.cc \
+       test/rados-api/test.cc \
+       librbd/cls_rbd_client.cc
+test_cls_rbd_LDADD = librados.la ${UNITTEST_STATIC_LDADD}
+test_cls_rbd_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
+bin_DEBUGPROGRAMS += test_cls_rbd
+
 test_rados_api_io_SOURCES = test/rados-api/io.cc test/rados-api/test.cc
 test_rados_api_io_LDFLAGS = ${AM_LDFLAGS}
 test_rados_api_io_LDADD =  librados.la ${UNITTEST_STATIC_LDADD}
@@ -1365,6 +1372,7 @@ noinst_HEADERS = \
        librados/IoCtxImpl.h\
        librados/PoolAsyncCompletionImpl.h\
        librados/RadosClient.h\
+       librbd/cls_rbd_client.h\
        librbd/LibrbdWriteback.h\
        logrotate.conf\
        json_spirit/json_spirit.h\
index c74b514dc26c4c2a0cc543c144a0f0183122437e..46c78fcb6be682de54b23548e97c0dd1f0184aa5 100644 (file)
@@ -1,25 +1,73 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
-#include <iostream>
-#include <string.h>
-#include <stdlib.h>
-#include <errno.h>
+/** \file
+ *
+ * This is an OSD class that implements methods for
+ * use with rbd.
+ *
+ * Most of these deal with the rbd header object. Methods prefixed
+ * with old_ deal with the original rbd design, in which clients read
+ * and interpreted the header object directly.
+ *
+ * The new format is meant to be opaque to clients - all their
+ * interactions with non-data objects should go through this
+ * class. The OSD class interface leaves the class to implement its
+ * own argument and payload serialization/deserialization, so for ease
+ * of implementation we use the existing ceph encoding/decoding
+ * methods. Something like json might be preferable, but the rbd
+ * kernel module has to be able understand format as well. The
+ * datatypes exposed to the clients are strings, unsigned integers,
+ * and vectors of those types. The on-wire format can be found in
+ * src/include/encoding.h.
+ *
+ * The methods for interacting with the new format document their
+ * parameters as the client sees them - it would be silly to mention
+ * in each one that they take an input and an output bufferlist.
+ */
 
 #include "include/types.h"
 #include "objclass/objclass.h"
 
 #include "include/rbd_types.h"
 
-CLS_VER(1,3)
+#include <algorithm>
+#include <cstring>
+#include <cstdlib>
+#include <errno.h>
+#include <iostream>
+#include <map>
+#include <sstream>
+#include <vector>
+
+CLS_VER(2,0)
 CLS_NAME(rbd)
 
 cls_handle_t h_class;
-cls_method_handle_t h_snapshots_list;
+cls_method_handle_t h_create;
+cls_method_handle_t h_get_features;
+cls_method_handle_t h_get_size;
+cls_method_handle_t h_set_size;
+cls_method_handle_t h_get_snapcontext;
+cls_method_handle_t h_get_object_prefix;
+cls_method_handle_t h_get_snapshot_name;
 cls_method_handle_t h_snapshot_add;
 cls_method_handle_t h_snapshot_remove;
+cls_method_handle_t h_old_snapshots_list;
+cls_method_handle_t h_old_snapshot_add;
+cls_method_handle_t h_old_snapshot_remove;
 cls_method_handle_t h_assign_bid;
 
+#define RBD_MAX_KEYS_READ 64
+#define RBD_SNAP_KEY_PREFIX "snapshot_"
+
+typedef struct cls_rbd_snap {
+  snapid_t id;
+  string name;
+  uint64_t image_size;
+  uint64_t features;
+} cls_rbd_snap;
+
 static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
 {
   unsigned snap_count = 0;
@@ -53,7 +101,524 @@ static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
   return 0;
 }
 
-int snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static void key_from_snap_id(snapid_t snap_id, string *out)
+{
+  ostringstream oss;
+  oss << RBD_SNAP_KEY_PREFIX
+      << std::setw(16) << std::setfill('0') << std::hex << snap_id;
+  *out = oss.str();
+}
+
+static snapid_t snap_id_from_key(const string &key)
+{
+  istringstream iss(key);
+  uint64_t id;
+  iss.ignore(strlen(RBD_SNAP_KEY_PREFIX)) >> std::hex >> id;
+  return id;
+}
+
+static int decode_snapshot_metadata(uint64_t snap_id, bufferlist *in,
+                                   cls_rbd_snap *snap)
+{
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(snap->name, iter);
+    ::decode(snap->image_size, iter);
+    ::decode(snap->features, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("error decoding snapshot metadata for snap_id: %llu", snap_id);
+    return -EIO;
+  }
+
+  snap->id = snap_id;
+
+  return 0;
+}
+
+static int read_snapshot_metadata(cls_method_context_t hctx, uint64_t snap_id,
+                                 cls_rbd_snap *snap_meta)
+{
+  bufferlist snapbl;
+  string snapshot_key;
+  key_from_snap_id(snap_id, &snapshot_key);
+  int r = cls_cxx_map_get_val(hctx, snapshot_key, &snapbl);
+  if (r < 0) {
+    CLS_ERR("error reading snapshot metadata: %d", r);
+    return r;
+  }
+
+  return decode_snapshot_metadata(snap_id, &snapbl, snap_meta);
+}
+
+template<typename T>
+static int read_key(cls_method_context_t hctx, const string &key, T *out)
+{
+  bufferlist bl;
+  int r = cls_cxx_map_get_val(hctx, key, &bl);
+  if (r < 0) {
+    CLS_ERR("error reading omap key %s: %d", key.c_str(), r);
+    return r;
+  }
+
+  try {
+    bufferlist::iterator it = bl.begin();
+    ::decode(*out, it);
+  } catch (const buffer::error &err) {
+    CLS_ERR("error decoding %s", key.c_str());
+    return -EIO;
+  }
+
+  return 0;
+}
+
+/**
+ * Initialize the header with basic metadata.
+ * Extra features may initialize more fields in the future.
+ * Everything is stored as key/value pairs as omaps in the header object.
+ *
+ * If features the OSD does not understand are requested, -ENOSYS is
+ * returned.
+ *
+ * Input:
+ * @param size number of bytes in the image (uint64_t)
+ * @param order bits to shift to determine the size of data objects (uint8_t)
+ * @param features what optional things this image will use (uint64_t)
+ * @param object_prefix a prefix for all the data objects
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ */
+int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string object_prefix;
+  uint64_t features, size;
+  uint8_t order;
+
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(size, iter);
+    ::decode(order, iter);
+    ::decode(features, iter);
+    ::decode(object_prefix, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "create object_prefix=%s size=%llu order=%u features=%llu",
+         object_prefix.c_str(), size, order, features);
+
+  if (features & ~RBD_FEATURES_ALL) {
+    return -ENOSYS;
+  }
+
+  if (!object_prefix.size()) {
+    return -EINVAL;
+  }
+
+  bufferlist stored_prefixbl;
+  int r = cls_cxx_map_get_val(hctx, "object_prefix", &stored_prefixbl);
+  if (r != -ENOENT) {
+    CLS_ERR("reading object_prefix returned %d", r);
+    return -EEXIST;
+  }
+
+  bufferlist sizebl;
+  ::encode(size, sizebl);
+  r = cls_cxx_map_set_val(hctx, "size", &sizebl);
+  if (r < 0)
+    return r;
+
+  bufferlist orderbl;
+  ::encode(order, orderbl);
+  r = cls_cxx_map_set_val(hctx, "order", &orderbl);
+  if (r < 0)
+    return r;
+
+  bufferlist featuresbl;
+  ::encode(features, featuresbl);
+  r = cls_cxx_map_set_val(hctx, "features", &featuresbl);
+  if (r < 0)
+    return r;
+
+  bufferlist object_prefixbl;
+  ::encode(object_prefix, object_prefixbl);
+  r = cls_cxx_map_set_val(hctx, "object_prefix", &object_prefixbl);
+  if (r < 0)
+    return r;
+
+  bufferlist snap_seqbl;
+  uint64_t snap_seq = 0;
+  ::encode(snap_seq, snap_seqbl);
+  r = cls_cxx_map_set_val(hctx, "snap_seq", &snap_seqbl);
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
+ *
+ * Output:
+ * @param features list of enabled features for the given snapshot (uint64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t features, snap_id;
+
+  bufferlist::iterator iter = in->begin();
+  try {
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "get_features snap_id=%llu", snap_id);
+
+  if (snap_id == CEPH_NOSNAP) {
+    int r = read_key(hctx, "features", &features);
+    if (r < 0)
+      return r;
+  } else {
+    cls_rbd_snap snap;
+    int r = read_snapshot_metadata(hctx, snap_id, &snap);
+    if (r < 0)
+      return r;
+
+    features = snap.features;
+  }
+
+  uint64_t incompatible = features & RBD_FEATURES_INCOMPATIBLE;
+  ::encode(features, *out);
+  ::encode(incompatible, *out);
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
+ *
+ * Output:
+ * @param order bits to shift to get the size of data objects (uint8_t)
+ * @param size size of the image in bytes for the given snapshot (uint64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t snap_id, size;
+  uint8_t order;
+
+  bufferlist::iterator iter = in->begin();
+  try {
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "get_size snap_id=%llu", snap_id);
+
+  int r = read_key(hctx, "order", &order);
+  if (r < 0)
+    return r;
+
+  if (snap_id == CEPH_NOSNAP) {
+    r = read_key(hctx, "size", &size);
+    if (r < 0)
+      return r;
+  } else {
+    cls_rbd_snap snap;
+    int r = read_snapshot_metadata(hctx, snap_id, &snap);
+    if (r < 0)
+      return r;
+
+    size = snap.image_size;
+  }
+
+  ::encode(order, *out);
+  ::encode(size, *out);
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param size new capacity of the image in bytes (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t size;
+
+  bufferlist::iterator iter = in->begin();
+  try {
+    ::decode(size, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "set_size size=%llu", size);
+
+  // check that size exists to make sure this is a header object
+  // that was created correctly
+  uint64_t orig_size;
+  int r = read_key(hctx, "size", &orig_size);
+  if (r < 0)
+    return r;
+
+  bufferlist sizebl;
+  ::encode(size, sizebl);
+
+  r = cls_cxx_map_set_val(hctx, "size", &sizebl);
+  if (r < 0) {
+    CLS_ERR("error writing snapshot metadata: %d", r);
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Get the information needed to create a rados snap context for doing
+ * I/O to the data objects. This must include all snapshots.
+ *
+ * Output:
+ * @param snap_seq the highest snapshot id ever associated with the image (uint64_t)
+ * @param snap_ids existing snapshot ids in descending order (vector<uint64_t>)
+ * @returns 0 on success, negative error code on failure
+ */
+int get_snapcontext(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "get_snapcontext");
+
+  int r;
+  uint64_t max_read = RBD_MAX_KEYS_READ;
+  vector<snapid_t> snap_ids;
+  string last_read = RBD_SNAP_KEY_PREFIX;
+
+  do {
+    set<string> keys;
+    r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys);
+    if (r < 0)
+      return r;
+
+    for (set<string>::const_iterator it = keys.begin();
+        it != keys.end(); ++it) {
+      snapid_t snap_id = snap_id_from_key(*it);
+      snap_ids.push_back(snap_id);
+    }
+    if (keys.size() > 0)
+      last_read = *(keys.rbegin());
+  } while (r == RBD_MAX_KEYS_READ);
+
+  uint64_t snap_seq;
+  r = read_key(hctx, "snap_seq", &snap_seq);
+  if (r < 0)
+    return r;
+
+  // snap_ids must be descending in a snap context
+  std::reverse(snap_ids.begin(), snap_ids.end());
+
+  ::encode(snap_seq, *out);
+  ::encode(snap_ids, *out);
+
+  return 0;
+}
+
+/**
+ * Output:
+ * @param object_prefix prefix for data object names (string)
+ * @returns 0 on success, negative error code on failure
+ */
+int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "get_object_prefix");
+
+  string object_prefix;
+  int r = read_key(hctx, "object_prefix", &object_prefix);
+  if (r < 0)
+    return r;
+
+  ::encode(object_prefix, *out);
+
+  return 0;
+}
+
+int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t snap_id;
+
+  bufferlist::iterator iter = in->begin();
+  try {
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "get_snapshot_name snap_id=%llu", snap_id);
+
+  if (snap_id == CEPH_NOSNAP)
+    return -EINVAL;
+
+  cls_rbd_snap snap;
+  int r = read_snapshot_metadata(hctx, snap_id, &snap);
+  if (r < 0)
+    return r;
+
+  ::encode(snap.name, *out);
+
+  return 0;
+}
+
+/**
+ * Adds a snapshot to an rbd header. Ensures the id and name are unique.
+ *
+ * Input:
+ * @param snap_name name of the snapshot (string)
+ * @param snap_id id of the snapshot (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure.
+ * @returns -ESTALE if the input snap_id is less than the image's snap_seq
+ * @returns -EEXIST if the id or name are already used by another snapshot
+ */
+int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  bufferlist snap_namebl, snap_idbl;
+  snapid_t snap_id;
+  string snap_name;
+
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(snap_name, iter);
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "snapshot_add name=%s id=%llu", snap_name.c_str(), snap_id.val);
+
+  if (snap_id > CEPH_MAXSNAP)
+    return -EINVAL;
+
+  uint64_t cur_snap_seq;
+  int r = read_key(hctx, "snap_seq", &cur_snap_seq);
+  if (r < 0)
+    return r;
+
+  // client lost a race with another snapshot creation.
+  // snap_seq must be monotonically increasing.
+  if (snap_id < cur_snap_seq)
+    return -ESTALE;
+
+  uint64_t size;
+  r = read_key(hctx, "size", &size);
+  if (r < 0)
+    return r;
+  uint64_t features;
+  r = read_key(hctx, "features", &features);
+  if (r < 0)
+    return r;
+
+  int max_read = RBD_MAX_KEYS_READ;
+  string last_read = RBD_SNAP_KEY_PREFIX;
+  do {
+    map<string, bufferlist> vals;
+    r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
+                             max_read, &vals);
+    if (r < 0)
+      return r;
+
+    for (map<string, bufferlist>::iterator it = vals.begin();
+        it != vals.end(); ++it) {
+      snapid_t cur_snap_id = snap_id_from_key(it->first);
+      cls_rbd_snap snap_meta;
+      r = decode_snapshot_metadata(cur_snap_id, &it->second, &snap_meta);
+      if (r < 0)
+       return r;
+
+      if (snap_meta.name == snap_name || snap_meta.id == snap_id) {
+       CLS_LOG(20, "snap_name %s or snap_id %llu matches existing snap %s %llu",
+               snap_name.c_str(), snap_id.val,
+               snap_meta.name.c_str(), snap_meta.id.val);
+       return -EEXIST;
+      }
+    }
+
+    if (vals.size() > 0)
+      last_read = vals.rbegin()->first;
+  } while (r == RBD_MAX_KEYS_READ);
+
+  bufferlist snap_metabl, snap_seqbl;
+  ::encode(snap_name, snap_metabl);
+  ::encode(size, snap_metabl);
+  ::encode(features, snap_metabl);
+
+  ::encode(snap_id, snap_seqbl);
+
+  string snapshot_key;
+  key_from_snap_id(snap_id, &snapshot_key);
+  map<string, bufferlist> vals;
+  vals["snap_seq"] = snap_seqbl;
+  vals[snapshot_key] = snap_metabl;
+  r = cls_cxx_map_set_vals(hctx, &vals);
+  if (r < 0) {
+    CLS_ERR("error writing snapshot metadata: %d", r);
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Removes a snapshot from an rbd header.
+ *
+ * Input:
+ * @param snap_id the id of the snapshot to remove (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  snapid_t snap_id;
+
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "snapshot_remove id=%llu", snap_id.val);
+
+  // check if the key exists. we can rely on remove_key doing this for
+  // us, since OMAPRMKEYS returns success if the key is not there.
+  // bug or feature? sounds like a bug, since tmap did not have this
+  // behavior, but cls_rgw may rely on it...
+  string snapshot_key;
+  bufferlist snapbl;
+  key_from_snap_id(snap_id, &snapshot_key);
+  int r = cls_cxx_map_get_val(hctx, snapshot_key, &snapbl);
+  if (r == -ENOENT)
+    return -ENOENT;
+
+  r = cls_cxx_map_remove_key(hctx, snapshot_key);
+  if (r < 0) {
+    CLS_ERR("error writing snapshot metadata: %d", r);
+    return r;
+  }
+
+  return 0;
+}
+
+/****************************** Old format *******************************/
+
+int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   bufferlist bl;
   struct rbd_obj_header_ondisk *header;
@@ -87,7 +652,7 @@ int snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   return 0;
 }
 
-int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int old_snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   bufferlist bl;
   struct rbd_obj_header_ondisk *header;
@@ -119,7 +684,6 @@ int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   }
   snap_name = s.c_str();
 
-
   const char *cur_snap_name;
   for (cur_snap_name = snap_names; cur_snap_name < end; cur_snap_name += strlen(cur_snap_name) + 1) {
     if (strncmp(cur_snap_name, snap_name, end - cur_snap_name) == 0)
@@ -164,7 +728,7 @@ int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   return 0;
 }
 
-int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int old_snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   bufferlist bl;
   struct rbd_obj_header_ondisk *header;
@@ -296,13 +860,49 @@ void __cls_init()
   CLS_LOG(20, "Loaded rbd class!");
 
   cls_register("rbd", &h_class);
-  cls_register_cxx_method(h_class, "snap_list", CLS_METHOD_RD | CLS_METHOD_PUBLIC, snapshots_list, &h_snapshots_list);
-  cls_register_cxx_method(h_class, "snap_add", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, snapshot_add, &h_snapshot_add);
-  cls_register_cxx_method(h_class, "snap_remove", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, snapshot_remove, &h_snapshot_remove);
+  cls_register_cxx_method(h_class, "create",
+                         CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+                         create, &h_create);
+  cls_register_cxx_method(h_class, "get_features",
+                         CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+                         get_features, &h_get_features);
+  cls_register_cxx_method(h_class, "get_size",
+                         CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+                         get_size, &h_get_size);
+  cls_register_cxx_method(h_class, "set_size",
+                         CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+                         set_size, &h_set_size);
+  cls_register_cxx_method(h_class, "get_snapcontext",
+                         CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+                         get_snapcontext, &h_get_snapcontext);
+  cls_register_cxx_method(h_class, "get_object_prefix",
+                         CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+                         get_object_prefix, &h_get_object_prefix);
+  cls_register_cxx_method(h_class, "get_snapshot_name",
+                         CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+                         get_snapshot_name, &h_get_snapshot_name);
+  cls_register_cxx_method(h_class, "snapshot_add",
+                         CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+                         snapshot_add, &h_snapshot_add);
+  cls_register_cxx_method(h_class, "snapshot_remove",
+                         CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+                         snapshot_remove, &h_snapshot_remove);
+
+  /* methods for the old format */
+  cls_register_cxx_method(h_class, "snap_list",
+                         CLS_METHOD_RD | CLS_METHOD_PUBLIC,
+                         old_snapshots_list, &h_old_snapshots_list);
+  cls_register_cxx_method(h_class, "snap_add",
+                         CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+                         old_snapshot_add, &h_old_snapshot_add);
+  cls_register_cxx_method(h_class, "snap_remove",
+                         CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+                         old_snapshot_remove, &h_old_snapshot_remove);
 
   /* assign a unique block id for rbd blocks */
-  cls_register_cxx_method(h_class, "assign_bid", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rbd_assign_bid, &h_assign_bid);
+  cls_register_cxx_method(h_class, "assign_bid",
+                         CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC,
+                         rbd_assign_bid, &h_assign_bid);
 
   return;
 }
-
index f165c40e3613124243533294f0f075234658c090..a91ca84a3074d82630374697235838ad243f4b8a 100644 (file)
 #include <sys/types.h>
 #endif
 
+
+
+/* New-style rbd image 'foo' consists of objects
+ *   rbd_header.foo         - image metadata
+ *   rbd_data.<id>.00000000
+ *   rbd_data.<id>.00000001
+ *   ...                    - data
+ */
+
+#define RBD_HEADER_PREFIX      "rbd_header."
+#define RBD_DATA_PREFIX        "rbd_data."
+
+#define RBD_FEATURES_INCOMPATIBLE 0
+#define RBD_FEATURES_ALL 0
+
 /*
- * rbd image 'foo' consists of objects
+ * old-style rbd image 'foo' consists of objects
  *   foo.rbd      - image metadata
- *   foo.00000000
- *   foo.00000001
+ *   rb.<idhi>.<idlo>.00000000
+ *   rb.<idhi>.<idlo>.00000001
  *   ...          - data
  */
 
diff --git a/src/librbd/cls_rbd_client.cc b/src/librbd/cls_rbd_client.cc
new file mode 100644 (file)
index 0000000..cb42eb7
--- /dev/null
@@ -0,0 +1,318 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+
+#include "cls_rbd_client.h"
+
+#include <errno.h>
+
+namespace librbd {
+  namespace cls_client {
+    int get_immutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
+                              std::string *object_prefix, uint8_t *order)
+    {
+      assert(object_prefix);
+      assert(order);
+
+      librados::ObjectReadOperation op;
+      bufferlist bl, empty;
+      snapid_t snap = CEPH_NOSNAP;
+      ::encode(snap, bl);
+      op.exec("rbd", "get_size", bl);
+      op.exec("rbd", "get_object_prefix", empty);
+
+      bufferlist outbl;
+      ioctx->operate(oid, &op, &outbl);
+
+      try {
+       bufferlist::iterator iter = outbl.begin();
+       uint64_t size;
+       ::decode(*order, iter);
+       ::decode(size, iter);
+       ::decode(*object_prefix, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
+    int get_mutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
+                            uint64_t *size, uint64_t *features,
+                            uint64_t *incompatible_features,
+                            ::SnapContext *snapc)
+    {
+      assert(size);
+      assert(features);
+      assert(incompatible_features);
+      assert(snapc);
+
+      librados::ObjectReadOperation op;
+      bufferlist sizebl, featuresbl, empty;
+      snapid_t snap = CEPH_NOSNAP;
+      ::encode(snap, sizebl);
+      ::encode(snap, featuresbl);
+      op.exec("rbd", "get_size", sizebl);
+      op.exec("rbd", "get_features", featuresbl);
+      op.exec("rbd", "get_snapcontext", empty);
+
+      bufferlist outbl;
+      ioctx->operate(oid, &op, &outbl);
+
+      try {
+       bufferlist::iterator iter = outbl.begin();
+       uint8_t order;
+       ::decode(order, iter);
+       ::decode(*size, iter);
+       ::decode(*features, iter);
+       ::decode(*incompatible_features, iter);
+       ::decode(*snapc, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
+    int create_image(librados::IoCtx *ioctx, const std::string &oid,
+                    uint64_t size, uint8_t order, uint64_t features,
+                    const std::string &object_prefix)
+    {
+      bufferlist bl, bl2;
+      ::encode(size, bl);
+      ::encode(order, bl);
+      ::encode(features, bl);
+      ::encode(object_prefix, (bl));
+
+      return ioctx->exec(oid, "rbd", "create", bl, bl2);
+    }
+
+    int get_features(librados::IoCtx *ioctx, const std::string &oid,
+                    uint64_t snap_id, uint64_t *features)
+    {
+      bufferlist inbl, outbl;
+      ::encode(snap_id, inbl);
+
+      int r = ioctx->exec(oid, "rbd", "get_features", inbl, outbl);
+      if (r < 0)
+       return r;
+
+      try {
+       bufferlist::iterator iter = outbl.begin();
+       ::decode(*features, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
+    int get_object_prefix(librados::IoCtx *ioctx, const std::string &oid,
+                         std::string *object_prefix)
+    {
+      bufferlist inbl, outbl;
+      int r = ioctx->exec(oid, "rbd", "get_object_prefix", inbl, outbl);
+      if (r < 0)
+       return r;
+
+      try {
+       bufferlist::iterator iter = outbl.begin();
+       ::decode(*object_prefix, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
+    int get_size(librados::IoCtx *ioctx, const std::string &oid,
+                uint64_t snap_id, uint64_t *size, uint8_t *order)
+    {
+      bufferlist inbl, outbl;
+      ::encode(snap_id, inbl);
+
+      int r = ioctx->exec(oid, "rbd", "get_size", inbl, outbl);
+      if (r < 0)
+       return r;
+
+      try {
+       bufferlist::iterator iter = outbl.begin();
+       ::decode(*order, iter);
+       ::decode(*size, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
+    int set_size(librados::IoCtx *ioctx, const std::string &oid,
+                uint64_t size)
+    {
+      bufferlist bl, bl2;
+      ::encode(size, bl);
+
+      return ioctx->exec(oid, "rbd", "set_size", bl, bl2);
+    }
+
+    int snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
+                    uint64_t snap_id, const std::string &snap_name)
+    {
+      bufferlist bl, bl2;
+      ::encode(snap_name, bl);
+      ::encode(snap_id, bl);
+
+      return ioctx->exec(oid, "rbd", "snapshot_add", bl, bl2);
+    }
+
+    int snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,
+                       uint64_t snap_id)
+    {
+      bufferlist bl, bl2;
+      ::encode(snap_id, bl);
+
+      return ioctx->exec(oid, "rbd", "snapshot_remove", bl, bl2);
+    }
+
+    int get_snapcontext(librados::IoCtx *ioctx, const std::string &oid,
+                       ::SnapContext *snapc)
+    {
+      bufferlist inbl, outbl;
+
+      int r = ioctx->exec(oid, "rbd", "get_snapcontext", inbl, outbl);
+      if (r < 0)
+       return r;
+
+      try {
+       bufferlist::iterator iter = outbl.begin();
+       ::decode(*snapc, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      if (!snapc->is_valid())
+       return -EBADMSG;
+
+      return 0;
+    }
+
+    int snapshot_list(librados::IoCtx *ioctx, const std::string &oid,
+                     const std::vector<snapid_t> &ids,
+                     std::vector<string> *names,
+                     std::vector<uint64_t> *sizes,
+                     std::vector<uint64_t> *features)
+    {
+      names->clear();
+      names->resize(ids.size());
+      sizes->clear();
+      sizes->resize(ids.size());
+      features->clear();
+      features->resize(ids.size());
+      librados::ObjectReadOperation op;
+      for (vector<snapid_t>::const_iterator it = ids.begin();
+          it != ids.end(); ++it) {
+       bufferlist bl1, bl2, bl3;
+       uint64_t snap_id = it->val;
+       ::encode(snap_id, bl1);
+       op.exec("rbd", "get_snapshot_name", bl1);
+       ::encode(snap_id, bl2);
+       op.exec("rbd", "get_size", bl2);
+       ::encode(snap_id, bl3);
+       op.exec("rbd", "get_features", bl3);
+      }
+
+      bufferlist outbl;
+      int r = ioctx->operate(oid, &op, &outbl);
+      if (r < 0)
+       return r;
+
+      try {
+       bufferlist::iterator iter = outbl.begin();
+       for (size_t i = 0; i < ids.size(); ++i) {
+         uint8_t order;
+         uint64_t incompat_features;
+         ::decode((*names)[i], iter);
+         ::decode(order, iter);
+         ::decode((*sizes)[i], iter);
+         ::decode((*features)[i], iter);
+         ::decode(incompat_features, iter);
+       }
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
+    int assign_bid(librados::IoCtx *ioctx, const std::string &oid,
+                  uint64_t *id)
+    {
+      bufferlist bl, out;
+      int r = ioctx->exec(oid, "rbd", "assign_bid", bl, out);
+      if (r < 0)
+       return r;
+
+      try {
+       bufferlist::iterator iter = out.begin();
+       ::decode(*id, iter);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
+    int old_snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
+                        uint64_t snap_id, const std::string &snap_name)
+    {
+      bufferlist bl, bl2;
+      ::encode(snap_name, bl);
+      ::encode(snap_id, bl);
+
+      return ioctx->exec(oid, "rbd", "snap_add", bl, bl2);
+    }
+
+    int old_snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,
+                           const std::string &snap_name)
+    {
+      bufferlist bl, bl2;
+      ::encode(snap_name, bl);
+
+      return ioctx->exec(oid, "rbd", "snap_remove", bl, bl2);
+    }
+
+    int old_snapshot_list(librados::IoCtx *ioctx, const std::string &oid,
+                         std::vector<string> *names,
+                         std::vector<uint64_t> *sizes,
+                         ::SnapContext *snapc)
+    {
+      bufferlist bl, outbl;
+      int r = ioctx->exec(oid, "rbd", "snap_list", bl, outbl);
+      if (r < 0)
+       return r;
+
+      bufferlist::iterator iter = outbl.begin();
+      uint32_t num_snaps;
+      try {
+       ::decode(snapc->seq, iter);
+       ::decode(num_snaps, iter);
+
+       names->resize(num_snaps);
+       sizes->resize(num_snaps);
+       snapc->snaps.resize(num_snaps);
+
+       for (uint32_t i = 0; i < num_snaps; ++i) {
+         ::decode(snapc->snaps[i], iter);
+         ::decode((*sizes)[i], iter);
+         ::decode((*names)[i], iter);
+       }
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+  } // namespace cls_client
+} // namespace librbd
diff --git a/src/librbd/cls_rbd_client.h b/src/librbd/cls_rbd_client.h
new file mode 100644 (file)
index 0000000..7965448
--- /dev/null
@@ -0,0 +1,65 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CLS_RBD_CLIENT_H
+#define CEPH_LIBRBD_CLS_RBD_CLIENT_H
+
+#include "common/snap_types.h"
+#include "include/rados.h"
+#include "include/rados/librados.hpp"
+#include "include/types.h"
+
+#include <string>
+#include <vector>
+
+namespace librbd {
+  namespace cls_client {
+
+    // high-level interface to the header
+    int get_immutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
+                              std::string *object_prefix, uint8_t *order);
+    int get_mutable_metadata(librados::IoCtx *ioctx, const std::string &oid,
+                            uint64_t *size, uint64_t *features,
+                            uint64_t *incompatible_features,
+                            ::SnapContext *snapc);
+
+    // low-level interface (mainly for testing)
+    int create_image(librados::IoCtx *ioctx, const std::string &oid,
+                    uint64_t size, uint8_t order, uint64_t features,
+                    const std::string &object_prefix);
+    int get_features(librados::IoCtx *ioctx, const std::string &oid,
+                    uint64_t snap_id, uint64_t *features);
+    int get_object_prefix(librados::IoCtx *ioctx, const std::string &oid,
+                         std::string *object_prefix);
+    int get_size(librados::IoCtx *ioctx, const std::string &oid,
+                uint64_t snap_id, uint64_t *size, uint8_t *order);
+    int set_size(librados::IoCtx *ioctx, const std::string &oid,
+                uint64_t size);
+    int snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
+                    uint64_t snap_id, const std::string &snap_name);
+    int snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,
+                       uint64_t snap_id);
+    int get_snapcontext(librados::IoCtx *ioctx, const std::string &oid,
+                       ::SnapContext *snapc);
+    int snapshot_list(librados::IoCtx *ioctx, const std::string &oid,
+                     const std::vector<snapid_t> &ids,
+                     std::vector<string> *names,
+                     std::vector<uint64_t> *sizes,
+                     std::vector<uint64_t> *features);
+    int assign_bid(librados::IoCtx *ioctx, const std::string &oid,
+                  uint64_t *id);
+
+
+    // class operations on the old format, kept for
+    // backwards compatability
+    int old_snapshot_add(librados::IoCtx *ioctx, const std::string &oid,
+                        uint64_t snap_id, const std::string &snap_name);
+    int old_snapshot_remove(librados::IoCtx *ioctx, const std::string &oid,
+                           const std::string &snap_name);
+    int old_snapshot_list(librados::IoCtx *ioctx, const std::string &oid,
+                         std::vector<string> *names,
+                         std::vector<uint64_t> *sizes,
+                         ::SnapContext *snapc);
+  } // namespace cls_client
+} // namespace librbd
+#endif // CEPH_LIBRBD_CLS_RBD_CLIENT_H
diff --git a/src/test/rbd/test_cls_rbd.cc b/src/test/rbd/test_cls_rbd.cc
new file mode 100644 (file)
index 0000000..a99051c
--- /dev/null
@@ -0,0 +1,337 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/snap_types.h"
+#include "include/encoding.h"
+#include "include/rados.h"
+#include "include/rados/librados.h"
+#include "include/types.h"
+#include "librbd/cls_rbd_client.h"
+
+#include "gtest/gtest.h"
+#include "test/rados-api/test.h"
+
+#include <errno.h>
+#include <string>
+#include <vector>
+
+using namespace std;
+using ::librbd::cls_client::create_image;
+using ::librbd::cls_client::get_features;
+using ::librbd::cls_client::get_size;
+using ::librbd::cls_client::get_object_prefix;
+using ::librbd::cls_client::set_size;
+using ::librbd::cls_client::snapshot_add;
+using ::librbd::cls_client::snapshot_remove;
+using ::librbd::cls_client::get_snapcontext;
+using ::librbd::cls_client::snapshot_list;
+
+TEST(cls_rbd, create)
+{
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  string pool_name = get_temp_pool_name();
+
+  ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+  ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+  string oid = "testobj";
+  uint64_t size = 20 << 30;
+  uint64_t features = 0;
+  uint8_t order = 22;
+  string object_prefix = "foo";
+
+  ASSERT_EQ(0, create_image(&ioctx, oid, size, order,
+                           features, object_prefix));
+  ASSERT_EQ(-EEXIST, create_image(&ioctx, oid, size, order,
+                                 features, object_prefix));
+  ASSERT_EQ(0, ioctx.remove(oid));
+
+  ASSERT_EQ(-EINVAL, create_image(&ioctx, oid, size, order,
+                                 features, ""));
+  ASSERT_EQ(-ENOENT, ioctx.remove(oid));
+
+  ASSERT_EQ(0, create_image(&ioctx, oid, 0, order,
+                           features, object_prefix));
+  ASSERT_EQ(0, ioctx.remove(oid));
+
+  ASSERT_EQ(-ENOSYS, create_image(&ioctx, oid, size, order,
+                                 -1, object_prefix));
+  ASSERT_EQ(-ENOENT, ioctx.remove(oid));
+
+  bufferlist inbl, outbl;
+  ASSERT_EQ(-EINVAL, ioctx.exec(oid, "rbd", "create", inbl, outbl));
+
+  ioctx.close();
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+TEST(cls_rbd, get_features)
+{
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  string pool_name = get_temp_pool_name();
+
+  ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+  ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+  uint64_t features;
+  ASSERT_EQ(-ENOENT, get_features(&ioctx, "foo", CEPH_NOSNAP, &features));
+
+  ASSERT_EQ(0, create_image(&ioctx, "foo", 0, 22, 0, "foo"));
+  ASSERT_EQ(0, get_features(&ioctx, "foo", CEPH_NOSNAP, &features));
+  ASSERT_EQ(0u, features);
+
+  ASSERT_EQ(-ENOENT, get_features(&ioctx, "foo", 1, &features));
+
+  ioctx.close();
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+TEST(cls_rbd, get_object_prefix)
+{
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  string pool_name = get_temp_pool_name();
+
+  ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+  ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+  string object_prefix;
+  ASSERT_EQ(-ENOENT, get_object_prefix(&ioctx, "foo", &object_prefix));
+
+  ASSERT_EQ(0, create_image(&ioctx, "foo", 0, 22, 0, "foo"));
+  ASSERT_EQ(0, get_object_prefix(&ioctx, "foo", &object_prefix));
+  ASSERT_EQ("foo", object_prefix);
+
+  ioctx.close();
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+TEST(cls_rbd, get_size)
+{
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  string pool_name = get_temp_pool_name();
+
+  ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+  ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+  uint64_t size;
+  uint8_t order;
+  ASSERT_EQ(-ENOENT, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+
+  ASSERT_EQ(0, create_image(&ioctx, "foo", 0, 22, 0, "foo"));
+  ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+  ASSERT_EQ(0u, size);
+  ASSERT_EQ(22, order);
+  ASSERT_EQ(0, ioctx.remove("foo"));
+
+  ASSERT_EQ(0, create_image(&ioctx, "foo", 2 << 22, 0, 0, "foo"));
+  ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+  ASSERT_EQ(2u << 22, size);
+  ASSERT_EQ(0, order);
+
+  ASSERT_EQ(-ENOENT, get_size(&ioctx, "foo", 1, &size, &order));
+
+  ioctx.close();
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+TEST(cls_rbd, set_size)
+{
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  string pool_name = get_temp_pool_name();
+
+  ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+  ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+  ASSERT_EQ(-ENOENT, set_size(&ioctx, "foo", 5));
+
+  uint64_t size;
+  uint8_t order;
+  ASSERT_EQ(0, create_image(&ioctx, "foo", 0, 22, 0, "foo"));
+  ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+  ASSERT_EQ(0u, size);
+  ASSERT_EQ(22, order);
+
+  ASSERT_EQ(0, set_size(&ioctx, "foo", 0));
+  ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+  ASSERT_EQ(0u, size);
+  ASSERT_EQ(22, order);
+
+  ASSERT_EQ(0, set_size(&ioctx, "foo", 3 << 22));
+  ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+  ASSERT_EQ(3u << 22, size);
+  ASSERT_EQ(22, order);
+
+  ioctx.close();
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}
+
+TEST(cls_rbd, snapshots)
+{
+  librados::Rados rados;
+  librados::IoCtx ioctx;
+  string pool_name = get_temp_pool_name();
+
+  ASSERT_EQ("", create_one_pool_pp(pool_name, rados));
+  ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx));
+
+  ASSERT_EQ(-ENOENT, snapshot_add(&ioctx, "foo", 0, "snap1"));
+
+  ASSERT_EQ(0, create_image(&ioctx, "foo", 10, 22, 0, "foo"));
+
+  vector<string> snap_names;
+  vector<uint64_t> snap_sizes;
+  vector<uint64_t> snap_features;
+  SnapContext snapc;
+  
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(0u, snapc.snaps.size());
+  ASSERT_EQ(0u, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(0u, snap_names.size());
+  ASSERT_EQ(0u, snap_sizes.size());
+  ASSERT_EQ(0u, snap_features.size());
+
+  ASSERT_EQ(0, snapshot_add(&ioctx, "foo", 0, "snap1"));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(1u, snapc.snaps.size());
+  ASSERT_EQ(0u, snapc.snaps[0]);
+  ASSERT_EQ(0u, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(1u, snap_names.size());
+  ASSERT_EQ("snap1", snap_names[0]);
+  ASSERT_EQ(10u, snap_sizes[0]);
+  ASSERT_EQ(0u, snap_features[0]);
+
+  // snap with same id and name
+  ASSERT_EQ(-EEXIST, snapshot_add(&ioctx, "foo", 0, "snap1"));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(1u, snapc.snaps.size());
+  ASSERT_EQ(0u, snapc.snaps[0]);
+  ASSERT_EQ(0u, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(1u, snap_names.size());
+  ASSERT_EQ("snap1", snap_names[0]);
+  ASSERT_EQ(10u, snap_sizes[0]);
+  ASSERT_EQ(0u, snap_features[0]);
+
+  // snap with same id, different name
+  ASSERT_EQ(-EEXIST, snapshot_add(&ioctx, "foo", 0, "snap2"));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(1u, snapc.snaps.size());
+  ASSERT_EQ(0u, snapc.snaps[0]);
+  ASSERT_EQ(0u, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(1u, snap_names.size());
+  ASSERT_EQ("snap1", snap_names[0]);
+  ASSERT_EQ(10u, snap_sizes[0]);
+  ASSERT_EQ(0u, snap_features[0]);
+
+  // snap with different id, same name
+  ASSERT_EQ(-EEXIST, snapshot_add(&ioctx, "foo", 1, "snap1"));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(1u, snapc.snaps.size());
+  ASSERT_EQ(0u, snapc.snaps[0]);
+  ASSERT_EQ(0u, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(snap_names.size(), 1u);
+  ASSERT_EQ(snap_names[0], "snap1");
+  ASSERT_EQ(snap_sizes[0], 10u);
+  ASSERT_EQ(snap_features[0], 0u);
+
+  // snap with different id, different name
+  ASSERT_EQ(0, snapshot_add(&ioctx, "foo", 1, "snap2"));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(2u, snapc.snaps.size());
+  ASSERT_EQ(1u, snapc.snaps[0]);
+  ASSERT_EQ(0u, snapc.snaps[1]);
+  ASSERT_EQ(1u, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(2u, snap_names.size());
+  ASSERT_EQ("snap2", snap_names[0]);
+  ASSERT_EQ(10u, snap_sizes[0]);
+  ASSERT_EQ(0u, snap_features[0]);
+  ASSERT_EQ("snap1", snap_names[1]);
+  ASSERT_EQ(10u, snap_sizes[1]);
+  ASSERT_EQ(0u, snap_features[1]);
+
+  ASSERT_EQ(0, snapshot_remove(&ioctx, "foo", 0));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(1u, snapc.snaps.size());
+  ASSERT_EQ(1u, snapc.snaps[0]);
+  ASSERT_EQ(1u, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(1u, snap_names.size());
+  ASSERT_EQ("snap2", snap_names[0]);
+  ASSERT_EQ(10u, snap_sizes[0]);
+  ASSERT_EQ(0u, snap_features[0]);
+
+  uint64_t size;
+  uint8_t order;
+  ASSERT_EQ(0, set_size(&ioctx, "foo", 0));
+  ASSERT_EQ(0, get_size(&ioctx, "foo", CEPH_NOSNAP, &size, &order));
+  ASSERT_EQ(0u, size);
+  ASSERT_EQ(22u, order);
+
+  uint64_t large_snap_id = 1ull << 63;
+  ASSERT_EQ(0, snapshot_add(&ioctx, "foo", large_snap_id, "snap3"));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(2u, snapc.snaps.size());
+  ASSERT_EQ(large_snap_id, snapc.snaps[0]);
+  ASSERT_EQ(1u, snapc.snaps[1]);
+  ASSERT_EQ(large_snap_id, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(2u, snap_names.size());
+  ASSERT_EQ("snap3", snap_names[0]);
+  ASSERT_EQ(0u, snap_sizes[0]);
+  ASSERT_EQ(0u, snap_features[0]);
+  ASSERT_EQ("snap2", snap_names[1]);
+  ASSERT_EQ(10u, snap_sizes[1]);
+  ASSERT_EQ(0u, snap_features[1]);
+
+  ASSERT_EQ(0, get_size(&ioctx, "foo", large_snap_id, &size, &order));
+  ASSERT_EQ(0u, size);
+  ASSERT_EQ(22u, order);
+
+  ASSERT_EQ(0, get_size(&ioctx, "foo", 1, &size, &order));
+  ASSERT_EQ(10u, size);
+  ASSERT_EQ(22u, order);
+
+  ASSERT_EQ(0, snapshot_remove(&ioctx, "foo", large_snap_id));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(1u, snapc.snaps.size());
+  ASSERT_EQ(1u, snapc.snaps[0]);
+  ASSERT_EQ(large_snap_id, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(1u, snap_names.size());
+  ASSERT_EQ("snap2", snap_names[0]);
+  ASSERT_EQ(10u, snap_sizes[0]);
+  ASSERT_EQ(0u, snap_features[0]);
+
+  ASSERT_EQ(-ENOENT, snapshot_remove(&ioctx, "foo", large_snap_id));
+  ASSERT_EQ(0, snapshot_remove(&ioctx, "foo", 1));
+  ASSERT_EQ(0, get_snapcontext(&ioctx, "foo", &snapc));
+  ASSERT_EQ(0u, snapc.snaps.size());
+  ASSERT_EQ(large_snap_id, snapc.seq);
+  ASSERT_EQ(0, snapshot_list(&ioctx, "foo", snapc.snaps, &snap_names,
+                            &snap_sizes, &snap_features));
+  ASSERT_EQ(0u, snap_names.size());
+  ASSERT_EQ(0u, snap_sizes.size());
+  ASSERT_EQ(0u, snap_features.size());
+
+  ioctx.close();
+  ASSERT_EQ(0, destroy_one_pool_pp(pool_name, rados));
+}