]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_journal: new tag management methods and handling
authorJason Dillaman <dillaman@redhat.com>
Wed, 3 Feb 2016 22:33:24 +0000 (17:33 -0500)
committerJason Dillaman <dillaman@redhat.com>
Fri, 5 Feb 2016 20:21:26 +0000 (15:21 -0500)
In the case of librbd, a new tag will be allocated when the
exclusive lock is acquired.  All tags for the same dataset
(e.g. librbd image) will belong to the same class.  Tags are
automatically pruned on tag create / client unregister
if no other clients' commit position would require the tags.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/cls/journal/cls_journal.cc
src/cls/journal/cls_journal_client.cc
src/cls/journal/cls_journal_client.h
src/cls/journal/cls_journal_types.cc
src/cls/journal/cls_journal_types.h
src/test/cls_journal/test_cls_journal.cc

index 0f2f3e48846cbfdaea979d412f5c58fd8eb061f3..08599a38bbf563d7f790b806b723124a0399ed74 100644 (file)
@@ -28,28 +28,55 @@ cls_method_handle_t h_journal_client_register;
 cls_method_handle_t h_journal_client_unregister;
 cls_method_handle_t h_journal_client_commit;
 cls_method_handle_t h_journal_client_list;
+cls_method_handle_t h_journal_get_next_tag_tid;
+cls_method_handle_t h_journal_get_tag;
+cls_method_handle_t h_journal_tag_create;
+cls_method_handle_t h_journal_tag_list;
 cls_method_handle_t h_journal_object_guard_append;
 
 namespace {
 
 static const uint64_t MAX_KEYS_READ = 64;
 
-static const std::string HEADER_KEY_ORDER         = "order";
-static const std::string HEADER_KEY_SPLAY_WIDTH   = "splay_width";
-static const std::string HEADER_KEY_POOL_ID       = "pool_id";
-static const std::string HEADER_KEY_MINIMUM_SET   = "minimum_set";
-static const std::string HEADER_KEY_ACTIVE_SET    = "active_set";
-static const std::string HEADER_KEY_CLIENT_PREFIX = "client_";
+static const std::string HEADER_KEY_ORDER          = "order";
+static const std::string HEADER_KEY_SPLAY_WIDTH    = "splay_width";
+static const std::string HEADER_KEY_POOL_ID        = "pool_id";
+static const std::string HEADER_KEY_MINIMUM_SET    = "minimum_set";
+static const std::string HEADER_KEY_ACTIVE_SET     = "active_set";
+static const std::string HEADER_KEY_NEXT_TAG_TID   = "next_tag_tid";
+static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class";
+static const std::string HEADER_KEY_CLIENT_PREFIX  = "client_";
+static const std::string HEADER_KEY_TAG_PREFIX     = "tag_";
+
+std::string to_hex(uint64_t value) {
+  std::ostringstream oss;
+  oss << std::setw(16) << std::setfill('0') << std::hex << value;
+  return oss.str();
+}
+
+std::string key_from_client_id(const std::string &client_id) {
+  return HEADER_KEY_CLIENT_PREFIX + client_id;
+}
+
+std::string key_from_tag_tid(uint64_t tag_tid) {
+  return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
+}
 
-static void key_from_client_id(const std::string &client_id, string *key) {
-  *key = HEADER_KEY_CLIENT_PREFIX + client_id;
+uint64_t tag_tid_from_key(const std::string &key) {
+  std::istringstream iss(key);
+  uint64_t id;
+  iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
+  return id;
 }
 
 template <typename T>
-int read_key(cls_method_context_t hctx, const string &key, T *t) {
+int read_key(cls_method_context_t hctx, const string &key, T *t,
+             bool ignore_enoent = false) {
   bufferlist bl;
   int r = cls_cxx_map_get_val(hctx, key, &bl);
-  if (r < 0) {
+  if (r == -ENOENT && ignore_enoent) {
+    return 0;
+  } else if (r < 0) {
     CLS_ERR("failed to get omap key: %s", key.c_str());
     return r;
   }
@@ -77,6 +104,123 @@ int write_key(cls_method_context_t hctx, const string &key, const T &t) {
   return 0;
 }
 
+int remove_key(cls_method_context_t hctx, const string &key) {
+  int r = cls_cxx_map_remove_key(hctx, key);
+  if (r < 0 && r != -ENOENT) {
+      CLS_ERR("failed to remove key: %s", key.c_str());
+      return r;
+  }
+  return 0;
+}
+
+int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
+
+  std::string skip_client_key;
+  if (skip_client_id != nullptr) {
+    skip_client_key = key_from_client_id(*skip_client_id);
+  }
+
+  int r;
+  uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
+  std::string last_read = HEADER_KEY_CLIENT_PREFIX;
+  do {
+    std::map<std::string, bufferlist> vals;
+    r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
+                             MAX_KEYS_READ, &vals);
+    if (r < 0 && r != -ENOENT) {
+      CLS_ERR("failed to retrieve registered clients: %s",
+              cpp_strerror(r).c_str());
+      return r;
+    }
+
+    for (auto &val : vals) {
+      // if we are removing a client, skip its commit positions
+      if (val.first == skip_client_key) {
+        continue;
+      }
+
+      cls::journal::Client client;
+      bufferlist::iterator iter = val.second.begin();
+      try {
+        ::decode(client, iter);
+      } catch (const buffer::error &err) {
+        CLS_ERR("error decoding registered client: %s",
+                val.first.c_str());
+        return -EIO;
+      }
+
+      // cannot expire tags if a client hasn't committed yet
+      if (client.commit_position.entry_positions.empty()) {
+        return 0;
+      }
+
+      for (auto entry_position : client.commit_position.entry_positions) {
+        minimum_tag_tid = MIN(minimum_tag_tid, entry_position.tag_tid);
+      }
+    }
+    if (!vals.empty()) {
+      last_read = vals.rbegin()->first;
+    }
+  } while (r == MAX_KEYS_READ);
+
+  // compute the minimum in-use tag for each class
+  std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
+  typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
+                 TAG_PASS_SCRUB,
+                 TAG_PASS_DONE } TagPass;
+  int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
+  last_read = HEADER_KEY_TAG_PREFIX;
+  do {
+    std::map<std::string, bufferlist> vals;
+    r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
+                             MAX_KEYS_READ, &vals);
+    if (r < 0 && r != -ENOENT) {
+      CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+
+    for (auto &val : vals) {
+      cls::journal::Tag tag;
+      bufferlist::iterator iter = val.second.begin();
+      try {
+        ::decode(tag, iter);
+      } catch (const buffer::error &err) {
+        CLS_ERR("error decoding tag: %s", val.first.c_str());
+        return -EIO;
+      }
+
+      if (tag.tid != tag_tid_from_key(val.first)) {
+        CLS_ERR("tag tid mismatched: %s", val.first.c_str());
+        return -EINVAL;
+      }
+
+      if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
+        minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
+      } else if (tag_pass == TAG_PASS_SCRUB &&
+                 tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
+        r = remove_key(hctx, val.first);
+        if (r < 0) {
+          return r;
+        }
+      }
+
+      if (tag.tid >= minimum_tag_tid) {
+        // no need to check for tag classes beyond this point
+        vals.clear();
+        break;
+      }
+    }
+
+    if (tag_pass != TAG_PASS_DONE && vals.size() < MAX_KEYS_READ) {
+      last_read = HEADER_KEY_TAG_PREFIX;
+      ++tag_pass;
+    } else if (!vals.empty()) {
+      last_read = vals.rbegin()->first;
+    }
+  } while (tag_pass != TAG_PASS_DONE);
+  return 0;
+}
+
 } // anonymous namespace
 
 /**
@@ -133,6 +277,17 @@ int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
   if (r < 0) {
     return r;
   }
+
+  uint64_t tag_id = 0;
+  r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
+  if (r < 0) {
+    return r;
+  }
+
+  r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
+  if (r < 0) {
+    return r;
+  }
   return 0;
 }
 
@@ -359,9 +514,7 @@ int journal_client_register(cls_method_context_t hctx, bufferlist *in,
     return -EINVAL;
   }
 
-  std::string key;
-  key_from_client_id(id, &key);
-
+  std::string key(key_from_client_id(id));
   bufferlist stored_clientbl;
   int r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
   if (r != -ENOENT) {
@@ -370,6 +523,7 @@ int journal_client_register(cls_method_context_t hctx, bufferlist *in,
   }
 
   cls::journal::Client client(id, description);
+  key = key_from_client_id(id);
   r = write_key(hctx, key, client);
   if (r < 0) {
     return r;
@@ -395,9 +549,7 @@ int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
     return -EINVAL;
   }
 
-  std::string key;
-  key_from_client_id(id, &key);
-
+  std::string key(key_from_client_id(id));
   bufferlist bl;
   int r = cls_cxx_map_get_val(hctx, key, &bl);
   if (r < 0) {
@@ -410,6 +562,12 @@ int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
     CLS_ERR("failed to remove omap key: %s", key.c_str());
     return r;
   }
+
+  // prune expired tags
+  r = expire_tags(hctx, &id);
+  if (r < 0) {
+    return r;
+  }
   return 0;
 }
 
@@ -444,9 +602,7 @@ int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
     return -EINVAL;
   }
 
-  std::string key;
-  key_from_client_id(id, &key);
-
+  std::string key(key_from_client_id(id));
   cls::journal::Client client;
   r = read_key(hctx, key, &client);
   if (r < 0) {
@@ -489,7 +645,7 @@ int journal_client_list(cls_method_context_t hctx, bufferlist *in,
 
   std::string last_read;
   if (!start_after.empty()) {
-    key_from_client_id(start_after, &last_read);
+    last_read = key_from_client_id(start_after);
   }
 
   std::map<std::string, bufferlist> vals;
@@ -520,6 +676,247 @@ int journal_client_list(cls_method_context_t hctx, bufferlist *in,
   return 0;
 }
 
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
+                             bufferlist *out) {
+  uint64_t tag_tid;
+  int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
+  if (r < 0) {
+    return r;
+  }
+
+  ::encode(tag_tid, *out);
+  return 0;
+}
+
+/**
+ * Input:
+ * @param tag_tid (uint64_t)
+ *
+ * Output:
+ * cls::journal::Tag
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
+                    bufferlist *out) {
+  uint64_t tag_tid;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(tag_tid, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode input parameters: %s", err.what());
+    return -EINVAL;
+  }
+
+  std::string key(key_from_tag_tid(tag_tid));
+  cls::journal::Tag tag;
+  int r = read_key(hctx, key, &tag);
+  if (r < 0) {
+    return r;
+  }
+
+  ::encode(tag, *out);
+  return 0;
+}
+
+/**
+ * Input:
+ * @param tag_tid (uint64_t)
+ * @param tag_class (uint64_t)
+ * @param data (bufferlist)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
+                       bufferlist *out) {
+  uint64_t tag_tid;
+  uint64_t tag_class;
+  bufferlist data;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(tag_tid, iter);
+    ::decode(tag_class, iter);
+    ::decode(data, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode input parameters: %s", err.what());
+    return -EINVAL;
+  }
+
+  std::string key(key_from_tag_tid(tag_tid));
+  bufferlist stored_tag_bl;
+  int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
+  if (r != -ENOENT) {
+    CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
+    return -EEXIST;
+  }
+
+  // verify tag tid ordering
+  uint64_t next_tag_tid;
+  r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
+  if (r < 0) {
+    return r;
+  }
+  if (tag_tid != next_tag_tid) {
+    CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
+    return -ESTALE;
+  }
+
+  uint64_t next_tag_class;
+  r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
+  if (r < 0) {
+    return r;
+  }
+
+  if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
+    // allocate a new tag class
+    tag_class = next_tag_class;
+    r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
+    if (r < 0) {
+      return r;
+    }
+  } else {
+    // verify tag class range
+    if (tag_class >= next_tag_class) {
+      CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
+      return -EINVAL;
+    }
+  }
+
+  // prune expired tags
+  r = expire_tags(hctx, nullptr);
+  if (r < 0) {
+    return r;
+  }
+
+  // update tag tid sequence
+  r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
+  if (r < 0) {
+    return r;
+  }
+
+  // write tag structure
+  cls::journal::Tag tag(tag_tid, tag_class, data);
+  key = key_from_tag_tid(tag_tid);
+  r = write_key(hctx, key, tag);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
+/**
+ * Input:
+ * @param start_after_tag_tid (uint64_t) - first tag tid
+ * @param max_return (uint64_t) - max tags to return
+ * @param client_id (std::string) - client id filter
+ * @param tag_class (boost::optional<uint64_t> - optional tag class filter
+ *
+ * Output:
+ * std::set<cls::journal::Tag> - collection of tags
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
+                     bufferlist *out) {
+  uint64_t start_after_tag_tid;
+  uint64_t max_return;
+  std::string client_id;
+  boost::optional<uint64_t> tag_class(0);
+
+  // handle compiler false positive about use-before-init
+  tag_class = boost::none;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(start_after_tag_tid, iter);
+    ::decode(max_return, iter);
+    ::decode(client_id, iter);
+    ::decode(tag_class, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode input parameters: %s", err.what());
+    return -EINVAL;
+  }
+
+  // calculate the minimum tag within client's commit position
+  uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
+  cls::journal::Client client;
+  int r = read_key(hctx, key_from_client_id(client_id), &client);
+  if (r < 0) {
+    return r;
+  }
+
+  for (auto entry_position : client.commit_position.entry_positions) {
+    minimum_tag_tid = MIN(minimum_tag_tid, entry_position.tag_tid);
+  }
+
+  // compute minimum tags in use per-class
+  std::set<cls::journal::Tag> tags;
+  std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
+  typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
+                 TAG_PASS_LIST,
+                 TAG_PASS_DONE } TagPass;
+  int tag_pass = (client.commit_position.entry_positions.empty() ?
+    TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
+  std::string last_read = HEADER_KEY_TAG_PREFIX;
+  do {
+    std::map<std::string, bufferlist> vals;
+    r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
+                             MAX_KEYS_READ, &vals);
+    if (r < 0 && r != -ENOENT) {
+      CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+
+    for (auto &val : vals) {
+      cls::journal::Tag tag;
+      bufferlist::iterator iter = val.second.begin();
+      try {
+        ::decode(tag, iter);
+      } catch (const buffer::error &err) {
+        CLS_ERR("error decoding tag: %s", val.first.c_str());
+        return -EIO;
+      }
+
+      if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
+        minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
+
+        // completed calculation of tag class minimums
+        if (tag.tid >= minimum_tag_tid) {
+          vals.clear();
+          break;
+        }
+      } else if (tag_pass == TAG_PASS_LIST) {
+        if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
+          continue;
+        }
+
+        if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
+            (!tag_class || *tag_class == tag.tag_class)) {
+          tags.insert(tag);
+        }
+        if (tags.size() >= max_return) {
+          tag_pass = TAG_PASS_DONE;
+        }
+      }
+    }
+
+    if (tag_pass != TAG_PASS_DONE && vals.size() < MAX_KEYS_READ) {
+      last_read = HEADER_KEY_TAG_PREFIX;
+      ++tag_pass;
+    } else if (!vals.empty()) {
+      last_read = vals.rbegin()->first;
+    }
+  } while (tag_pass != TAG_PASS_DONE);
+
+  ::encode(tags, *out);
+  return 0;
+}
+
 /**
  * Input:
  * @param soft_max_size (uint64_t)
@@ -610,6 +1007,19 @@ void CEPH_CLS_API __cls_init()
   cls_register_cxx_method(h_class, "client_list",
                           CLS_METHOD_RD,
                           journal_client_list, &h_journal_client_list);
+  cls_register_cxx_method(h_class, "get_next_tag_tid",
+                          CLS_METHOD_RD,
+                          journal_get_next_tag_tid,
+                          &h_journal_get_next_tag_tid);
+  cls_register_cxx_method(h_class, "get_tag",
+                          CLS_METHOD_RD,
+                          journal_get_tag, &h_journal_get_tag);
+  cls_register_cxx_method(h_class, "tag_create",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          journal_tag_create, &h_journal_tag_create);
+  cls_register_cxx_method(h_class, "tag_list",
+                          CLS_METHOD_RD,
+                          journal_tag_list, &h_journal_tag_list);
 
   /// methods for journal_data.$journal_id.$object_id objects
   cls_register_cxx_method(h_class, "guard_append",
index a4a268d8d46b3ad1de71fd093af94f36a54594a7..7f8af73af3cff809889c47ecb4dc1a8f9565e70a 100644 (file)
@@ -263,6 +263,145 @@ int client_list(librados::IoCtx &ioctx, const std::string &oid,
   return cond.wait();
 }
 
+int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
+                     uint64_t *tag_tid) {
+  librados::ObjectReadOperation op;
+  get_next_tag_tid_start(&op);
+
+  bufferlist out_bl;
+  int r = ioctx.operate(oid, &op, &out_bl);
+  if (r < 0) {
+    return r;
+  }
+
+  bufferlist::iterator iter = out_bl.begin();
+  r = get_next_tag_tid_finish(&iter, tag_tid);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
+void get_next_tag_tid_start(librados::ObjectReadOperation *op) {
+  bufferlist bl;
+  op->exec("journal", "get_next_tag_tid", bl);
+}
+
+int get_next_tag_tid_finish(bufferlist::iterator *iter,
+                            uint64_t *tag_tid) {
+  try {
+    ::decode(*tag_tid, *iter);
+  } catch (const buffer::error &err) {
+    return -EBADMSG;
+  }
+  return 0;
+}
+
+int get_tag(librados::IoCtx &ioctx, const std::string &oid,
+            uint64_t tag_tid, cls::journal::Tag *tag) {
+  librados::ObjectReadOperation op;
+  get_tag_start(&op, tag_tid);
+
+  bufferlist out_bl;
+  int r = ioctx.operate(oid, &op, &out_bl);
+  if (r < 0) {
+    return r;
+  }
+
+  bufferlist::iterator iter = out_bl.begin();
+  r = get_tag_finish(&iter, tag);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
+void get_tag_start(librados::ObjectReadOperation *op,
+                   uint64_t tag_tid) {
+  bufferlist bl;
+  ::encode(tag_tid, bl);
+  op->exec("journal", "get_tag", bl);
+}
+
+int get_tag_finish(bufferlist::iterator *iter, cls::journal::Tag *tag) {
+  try {
+    ::decode(*tag, *iter);
+  } catch (const buffer::error &err) {
+    return -EBADMSG;
+  }
+  return 0;
+}
+
+int tag_create(librados::IoCtx &ioctx, const std::string &oid,
+               uint64_t tag_tid, uint64_t tag_class,
+               const bufferlist &data) {
+  librados::ObjectWriteOperation op;
+  tag_create(&op, tag_tid, tag_class, data);
+  return ioctx.operate(oid, &op);
+}
+
+void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid,
+                uint64_t tag_class, const bufferlist &data) {
+  bufferlist bl;
+  ::encode(tag_tid, bl);
+  ::encode(tag_class, bl);
+  ::encode(data, bl);
+  op->exec("journal", "tag_create", bl);
+}
+
+int tag_list(librados::IoCtx &ioctx, const std::string &oid,
+             const std::string &client_id, boost::optional<uint64_t> tag_class,
+             std::set<cls::journal::Tag> *tags) {
+  tags->clear();
+  uint64_t start_after_tag_tid = 0;
+  while (true) {
+    librados::ObjectReadOperation op;
+    tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id,
+                   tag_class);
+
+    bufferlist out_bl;
+    int r = ioctx.operate(oid, &op, &out_bl);
+    if (r < 0) {
+      return r;
+    }
+
+    bufferlist::iterator iter = out_bl.begin();
+    std::set<cls::journal::Tag> decode_tags;
+    r = tag_list_finish(&iter, &decode_tags);
+    if (r < 0) {
+      return r;
+    }
+
+    tags->insert(decode_tags.begin(), decode_tags.end());
+    if (decode_tags.size() < JOURNAL_MAX_RETURN) {
+      break;
+    }
+  }
+  return 0;
+}
+
+void tag_list_start(librados::ObjectReadOperation *op,
+                    uint64_t start_after_tag_tid, uint64_t max_return,
+                    const std::string &client_id,
+                    boost::optional<uint64_t> tag_class) {
+  bufferlist bl;
+  ::encode(start_after_tag_tid, bl);
+  ::encode(max_return, bl);
+  ::encode(client_id, bl);
+  ::encode(tag_class, bl);
+  op->exec("journal", "tag_list", bl);
+}
+
+int tag_list_finish(bufferlist::iterator *iter,
+                    std::set<cls::journal::Tag> *tags) {
+  try {
+    ::decode(*tags, *iter);
+  } catch (const buffer::error &err) {
+    return -EBADMSG;
+  }
+  return 0;
+}
+
 void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) {
   bufferlist bl;
   ::encode(soft_max_size, bl);
index 18ccf7bc2286df708a2b602fecb987926bab1b7a..acb4ae5410274b1d6dcc8d32508bce608427f76a 100644 (file)
@@ -10,6 +10,7 @@
 #include <map>
 #include <set>
 #include <string>
+#include <boost/optional.hpp>
 
 class Context;
 
@@ -31,6 +32,7 @@ void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
 void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set);
 void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set);
 
+// journal client helpers
 void client_register(librados::ObjectWriteOperation *op,
                      const std::string &id, const std::string &description);
 int client_register(librados::IoCtx &ioctx, const std::string &oid,
@@ -42,6 +44,37 @@ void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
 int client_list(librados::IoCtx &ioctx, const std::string &oid,
                 std::set<cls::journal::Client> *clients);
 
+// journal tag helpers
+int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
+                     uint64_t *tag_tid);
+void get_next_tag_tid_start(librados::ObjectReadOperation *op);
+int get_next_tag_tid_finish(bufferlist::iterator *iter,
+                            uint64_t *tag_tid);
+
+int get_tag(librados::IoCtx &ioctx, const std::string &oid,
+            uint64_t tag_tid, cls::journal::Tag *tag);
+void get_tag_start(librados::ObjectReadOperation *op,
+                   uint64_t tag_tid);
+int get_tag_finish(bufferlist::iterator *iter, cls::journal::Tag *tag);
+
+int tag_create(librados::IoCtx &ioctx, const std::string &oid,
+               uint64_t tag_tid, uint64_t tag_class,
+               const bufferlist &data);
+void tag_create(librados::ObjectWriteOperation *op,
+                uint64_t tag_tid, uint64_t tag_class,
+                const bufferlist &data);
+
+int tag_list(librados::IoCtx &ioctx, const std::string &oid,
+             const std::string &client_id, boost::optional<uint64_t> tag_class,
+             std::set<cls::journal::Tag> *tags);
+void tag_list_start(librados::ObjectReadOperation *op,
+                    uint64_t start_after_tag_tid, uint64_t max_return,
+                    const std::string &client_id,
+                    boost::optional<uint64_t> tag_class);
+int tag_list_finish(bufferlist::iterator *iter,
+                    std::set<cls::journal::Tag> *tags);
+
+// journal entry helpers
 void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size);
 
 } // namespace client
index f66a5a480334a3336ede307438f4cd309be4fb74..437b380d9cf2821e4c632d746df48943f85b6d4e 100644 (file)
@@ -60,11 +60,7 @@ void ObjectSetPosition::dump(Formatter *f) const {
 void ObjectSetPosition::generate_test_instances(
     std::list<ObjectSetPosition *> &o) {
   o.push_back(new ObjectSetPosition());
-
-  EntryPositions entry_positions;
-  entry_positions.push_back(EntryPosition(1, 120));
-  entry_positions.push_back(EntryPosition(2, 121));
-  o.push_back(new ObjectSetPosition(1, entry_positions));
+  o.push_back(new ObjectSetPosition(1, {{1, 120}, {2, 121}}));
 }
 
 void Client::encode(bufferlist& bl) const {
@@ -86,6 +82,7 @@ void Client::decode(bufferlist::iterator& iter) {
 void Client::dump(Formatter *f) const {
   f->dump_string("id", id);
   f->dump_string("description", description);
+
   f->open_object_section("commit_position");
   commit_position.dump(f);
   f->close_section();
@@ -94,11 +91,40 @@ void Client::dump(Formatter *f) const {
 void Client::generate_test_instances(std::list<Client *> &o) {
   o.push_back(new Client());
   o.push_back(new Client("id", "desc"));
+  o.push_back(new Client("id", "desc", {1, {{1, 120}, {2, 121}}}));
+}
+
+void Tag::encode(bufferlist& bl) const {
+  ENCODE_START(1, 1, bl);
+  ::encode(tid, bl);
+  ::encode(tag_class, bl);
+  ::encode(data, bl);
+  ENCODE_FINISH(bl);
+}
+
+void Tag::decode(bufferlist::iterator& iter) {
+  DECODE_START(1, iter);
+  ::decode(tid, iter);
+  ::decode(tag_class, iter);
+  ::decode(data, iter);
+  DECODE_FINISH(iter);
+}
+
+void Tag::dump(Formatter *f) const {
+  f->dump_unsigned("tid", tid);
+  f->dump_unsigned("tag_class", tag_class);
 
-  EntryPositions entry_positions;
-  entry_positions.push_back(EntryPosition(1, 120));
-  entry_positions.push_back(EntryPosition(2, 121));
-  o.push_back(new Client("id", "desc", ObjectSetPosition(1, entry_positions)));
+  std::stringstream data_ss;
+  data.hexdump(data_ss);
+  f->dump_string("data", data_ss.str());
+}
+
+void Tag::generate_test_instances(std::list<Tag *> &o) {
+  o.push_back(new Tag());
+
+  bufferlist data;
+  data.append(std::string('1', 128));
+  o.push_back(new Tag(123, 234, data));
 }
 
 std::ostream &operator<<(std::ostream &os,
@@ -112,18 +138,28 @@ std::ostream &operator<<(std::ostream &os,
                          const ObjectSetPosition &object_set_position) {
   os << "[object_number=" << object_set_position.object_number << ", "
      << "positions=[";
-  for (EntryPositions::const_iterator it =
-         object_set_position.entry_positions.begin();
-       it != object_set_position.entry_positions.end(); ++it) {
-    os << *it;
+  std::string delim;
+  for (auto &entry_position : object_set_position.entry_positions) {
+    os << entry_position << delim;
+    delim = ", ";
   }
   os << "]]";
   return os;
 }
 
 std::ostream &operator<<(std::ostream &os, const Client &client) {
-  os << "[id=" << client.id << ", description=" << client.description
-     << ", commit_position=" << client.commit_position << "]";
+  os << "[id=" << client.id << ", "
+     << "description=" << client.description << ", "
+     << "commit_position=" << client.commit_position << "]";
+  return os;
+}
+
+std::ostream &operator<<(std::ostream &os, const Tag &tag) {
+  os << "[tid=" << tag.tid << ", "
+     << "tag_class=" << tag.tag_class << ", "
+     << "data=";
+  tag.data.hexdump(os);
+  os << "]";
   return os;
 }
 
index da23680914cee991252fccdffbe96b60e2c863d9..3e614c20b661cfad536f91d4c0c427298552d8a6 100644 (file)
@@ -9,6 +9,7 @@
 #include "include/encoding.h"
 #include <iosfwd>
 #include <list>
+#include <set>
 #include <string>
 
 namespace ceph {
@@ -78,7 +79,8 @@ struct Client {
     : id(_id), description(_description), commit_position(_commit_position) {}
 
   inline bool operator==(const Client &rhs) const {
-    return (id == rhs.id && description == rhs.description &&
+    return (id == rhs.id &&
+            description == rhs.description &&
             commit_position == rhs.commit_position);
   }
   inline bool operator<(const Client &rhs) const {
@@ -92,9 +94,37 @@ struct Client {
   static void generate_test_instances(std::list<Client *> &o);
 };
 
+struct Tag {
+  static const uint64_t TAG_CLASS_NEW = static_cast<uint64_t>(-1);
+
+  uint64_t tid;
+  uint64_t tag_class;
+  bufferlist data;
+
+  Tag() : tid(0), tag_class(0) {}
+  Tag(uint64_t tid, uint64_t tag_class, const bufferlist &data)
+    : tid(tid), tag_class(tag_class), data(data) {}
+
+  inline bool operator==(const Tag &rhs) const {
+    return (tid == rhs.tid &&
+            tag_class == rhs.tag_class &&
+            data.contents_equal(rhs.data));
+  }
+  inline bool operator<(const Tag &rhs) const {
+    return (tid < rhs.tid);
+  }
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator& iter);
+  void dump(Formatter *f) const;
+
+  static void generate_test_instances(std::list<Tag *> &o);
+};
+
 WRITE_CLASS_ENCODER(EntryPosition);
 WRITE_CLASS_ENCODER(ObjectSetPosition);
 WRITE_CLASS_ENCODER(Client);
+WRITE_CLASS_ENCODER(Tag);
 
 std::ostream &operator<<(std::ostream &os,
                          const EntryPosition &entry_position);
@@ -102,6 +132,7 @@ std::ostream &operator<<(std::ostream &os,
                          const ObjectSetPosition &object_set_position);
 std::ostream &operator<<(std::ostream &os,
                         const Client &client);
+std::ostream &operator<<(std::ostream &os, const Tag &tag);
 
 } // namespace journal
 } // namespace cls
index 4187ec206441f5ca327418223252cab9f38daa67..c10ed3891ea570697a5e74b6e322423018b9373d 100644 (file)
@@ -238,6 +238,35 @@ TEST_F(TestClsJournal, ClientUnregisterDNE) {
   ASSERT_EQ(-ENOENT, client::client_unregister(ioctx, oid, "id1"));
 }
 
+TEST_F(TestClsJournal, ClientUnregisterPruneTags) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  std::string oid = get_temp_image_name();
+
+  ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+  ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+  ASSERT_EQ(0, client::client_register(ioctx, oid, "id2", "desc2"));
+
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+                                  bufferlist()));
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 1, Tag::TAG_CLASS_NEW,
+                                  bufferlist()));
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 2, 1, bufferlist()));
+
+  librados::ObjectWriteOperation op1;
+  client::client_commit(&op1, "id1", {1, {{2, 120}}});
+  ASSERT_EQ(0, ioctx.operate(oid, &op1));
+
+  ASSERT_EQ(0, client::client_unregister(ioctx, oid, "id2"));
+
+  std::set<Tag> expected_tags = {{0, 0, {}}, {2, 1, {}}};
+  std::set<Tag> tags;
+  ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1",
+                                boost::optional<uint64_t>(), &tags));
+  ASSERT_EQ(expected_tags, tags);
+}
+
 TEST_F(TestClsJournal, ClientCommit) {
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
@@ -332,6 +361,124 @@ TEST_F(TestClsJournal, ClientList) {
   ASSERT_EQ(expected_clients, read_clients);
 }
 
+TEST_F(TestClsJournal, GetNextTagTid) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  std::string oid = get_temp_image_name();
+
+  uint64_t tag_tid;
+  ASSERT_EQ(-ENOENT, client::get_next_tag_tid(ioctx, oid, &tag_tid));
+
+  ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+  ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+
+  ASSERT_EQ(0, client::get_next_tag_tid(ioctx, oid, &tag_tid));
+  ASSERT_EQ(0U, tag_tid);
+
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+                                  bufferlist()));
+  ASSERT_EQ(0, client::get_next_tag_tid(ioctx, oid, &tag_tid));
+  ASSERT_EQ(1U, tag_tid);
+}
+
+TEST_F(TestClsJournal, TagCreate) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  std::string oid = get_temp_image_name();
+
+  ASSERT_EQ(-ENOENT, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+                                        bufferlist()));
+
+  ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+  ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+
+  ASSERT_EQ(-ESTALE, client::tag_create(ioctx, oid, 1, Tag::TAG_CLASS_NEW,
+                                        bufferlist()));
+  ASSERT_EQ(-EINVAL, client::tag_create(ioctx, oid, 0, 1, bufferlist()));
+
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+                                  bufferlist()));
+  ASSERT_EQ(-EEXIST, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+                                        bufferlist()));
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 1, Tag::TAG_CLASS_NEW,
+                                  bufferlist()));
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 2, 1, bufferlist()));
+
+  std::set<Tag> expected_tags = {
+    {0, 0, {}}, {1, 1, {}}, {2, 1, {}}};
+  std::set<Tag> tags;
+  ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1",
+                                boost::optional<uint64_t>(), &tags));
+  ASSERT_EQ(expected_tags, tags);
+}
+
+TEST_F(TestClsJournal, TagCreatePrunesTags) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  std::string oid = get_temp_image_name();
+
+  ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+  ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+                                  bufferlist()));
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 1, Tag::TAG_CLASS_NEW,
+                                  bufferlist()));
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 2, 1, bufferlist()));
+
+  librados::ObjectWriteOperation op1;
+  client::client_commit(&op1, "id1", {1, {{2, 120}}});
+  ASSERT_EQ(0, ioctx.operate(oid, &op1));
+
+  ASSERT_EQ(0, client::tag_create(ioctx, oid, 3, 0, bufferlist()));
+
+  std::set<Tag> expected_tags = {
+    {0, 0, {}}, {2, 1, {}}, {3, 0, {}}};
+  std::set<Tag> tags;
+  ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1",
+                                boost::optional<uint64_t>(), &tags));
+  ASSERT_EQ(expected_tags, tags);
+}
+
+TEST_F(TestClsJournal, TagList) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  std::string oid = get_temp_image_name();
+
+  ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+  ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+
+  std::set<Tag> expected_all_tags;
+  std::set<Tag> expected_filtered_tags;
+  for (uint32_t i = 0; i < 96; ++i) {
+    uint64_t tag_class = Tag::TAG_CLASS_NEW;
+    if (i > 1) {
+      tag_class = i % 2 == 0 ? 0 : 1;
+    }
+
+    Tag tag(i, i % 2 == 0 ? 0 : 1, bufferlist());
+    expected_all_tags.insert(tag);
+    if (i % 2 == 0) {
+      expected_filtered_tags.insert(tag);
+    }
+    ASSERT_EQ(0, client::tag_create(ioctx, oid, i, tag_class,
+                                    bufferlist()));
+  }
+
+  std::set<Tag> tags;
+  ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1", boost::optional<uint64_t>(),
+                                &tags));
+  ASSERT_EQ(expected_all_tags, tags);
+
+  ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1", boost::optional<uint64_t>(0),
+                                &tags));
+  ASSERT_EQ(expected_filtered_tags, tags);
+}
+
 TEST_F(TestClsJournal, GuardAppend) {
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));