cls_method_handle_t h_journal_client_unregister;
cls_method_handle_t h_journal_client_commit;
cls_method_handle_t h_journal_client_list;
+cls_method_handle_t h_journal_get_next_tag_tid;
+cls_method_handle_t h_journal_get_tag;
+cls_method_handle_t h_journal_tag_create;
+cls_method_handle_t h_journal_tag_list;
cls_method_handle_t h_journal_object_guard_append;
namespace {
static const uint64_t MAX_KEYS_READ = 64;
-static const std::string HEADER_KEY_ORDER = "order";
-static const std::string HEADER_KEY_SPLAY_WIDTH = "splay_width";
-static const std::string HEADER_KEY_POOL_ID = "pool_id";
-static const std::string HEADER_KEY_MINIMUM_SET = "minimum_set";
-static const std::string HEADER_KEY_ACTIVE_SET = "active_set";
-static const std::string HEADER_KEY_CLIENT_PREFIX = "client_";
+static const std::string HEADER_KEY_ORDER = "order";
+static const std::string HEADER_KEY_SPLAY_WIDTH = "splay_width";
+static const std::string HEADER_KEY_POOL_ID = "pool_id";
+static const std::string HEADER_KEY_MINIMUM_SET = "minimum_set";
+static const std::string HEADER_KEY_ACTIVE_SET = "active_set";
+static const std::string HEADER_KEY_NEXT_TAG_TID = "next_tag_tid";
+static const std::string HEADER_KEY_NEXT_TAG_CLASS = "next_tag_class";
+static const std::string HEADER_KEY_CLIENT_PREFIX = "client_";
+static const std::string HEADER_KEY_TAG_PREFIX = "tag_";
+
+std::string to_hex(uint64_t value) {
+ std::ostringstream oss;
+ oss << std::setw(16) << std::setfill('0') << std::hex << value;
+ return oss.str();
+}
+
+std::string key_from_client_id(const std::string &client_id) {
+ return HEADER_KEY_CLIENT_PREFIX + client_id;
+}
+
+std::string key_from_tag_tid(uint64_t tag_tid) {
+ return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
+}
-static void key_from_client_id(const std::string &client_id, string *key) {
- *key = HEADER_KEY_CLIENT_PREFIX + client_id;
+uint64_t tag_tid_from_key(const std::string &key) {
+ std::istringstream iss(key);
+ uint64_t id;
+ iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
+ return id;
}
template <typename T>
-int read_key(cls_method_context_t hctx, const string &key, T *t) {
+int read_key(cls_method_context_t hctx, const string &key, T *t,
+ bool ignore_enoent = false) {
bufferlist bl;
int r = cls_cxx_map_get_val(hctx, key, &bl);
- if (r < 0) {
+ if (r == -ENOENT && ignore_enoent) {
+ return 0;
+ } else if (r < 0) {
CLS_ERR("failed to get omap key: %s", key.c_str());
return r;
}
return 0;
}
+int remove_key(cls_method_context_t hctx, const string &key) {
+ int r = cls_cxx_map_remove_key(hctx, key);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("failed to remove key: %s", key.c_str());
+ return r;
+ }
+ return 0;
+}
+
+int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
+
+ std::string skip_client_key;
+ if (skip_client_id != nullptr) {
+ skip_client_key = key_from_client_id(*skip_client_id);
+ }
+
+ int r;
+ uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
+ std::string last_read = HEADER_KEY_CLIENT_PREFIX;
+ do {
+ std::map<std::string, bufferlist> vals;
+ r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
+ MAX_KEYS_READ, &vals);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("failed to retrieve registered clients: %s",
+ cpp_strerror(r).c_str());
+ return r;
+ }
+
+ for (auto &val : vals) {
+ // if we are removing a client, skip its commit positions
+ if (val.first == skip_client_key) {
+ continue;
+ }
+
+ cls::journal::Client client;
+ bufferlist::iterator iter = val.second.begin();
+ try {
+ ::decode(client, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("error decoding registered client: %s",
+ val.first.c_str());
+ return -EIO;
+ }
+
+ // cannot expire tags if a client hasn't committed yet
+ if (client.commit_position.entry_positions.empty()) {
+ return 0;
+ }
+
+ for (auto entry_position : client.commit_position.entry_positions) {
+ minimum_tag_tid = MIN(minimum_tag_tid, entry_position.tag_tid);
+ }
+ }
+ if (!vals.empty()) {
+ last_read = vals.rbegin()->first;
+ }
+ } while (r == MAX_KEYS_READ);
+
+ // compute the minimum in-use tag for each class
+ std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
+ typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
+ TAG_PASS_SCRUB,
+ TAG_PASS_DONE } TagPass;
+ int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
+ last_read = HEADER_KEY_TAG_PREFIX;
+ do {
+ std::map<std::string, bufferlist> vals;
+ r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
+ MAX_KEYS_READ, &vals);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ for (auto &val : vals) {
+ cls::journal::Tag tag;
+ bufferlist::iterator iter = val.second.begin();
+ try {
+ ::decode(tag, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("error decoding tag: %s", val.first.c_str());
+ return -EIO;
+ }
+
+ if (tag.tid != tag_tid_from_key(val.first)) {
+ CLS_ERR("tag tid mismatched: %s", val.first.c_str());
+ return -EINVAL;
+ }
+
+ if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
+ minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
+ } else if (tag_pass == TAG_PASS_SCRUB &&
+ tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
+ r = remove_key(hctx, val.first);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ if (tag.tid >= minimum_tag_tid) {
+ // no need to check for tag classes beyond this point
+ vals.clear();
+ break;
+ }
+ }
+
+ if (tag_pass != TAG_PASS_DONE && vals.size() < MAX_KEYS_READ) {
+ last_read = HEADER_KEY_TAG_PREFIX;
+ ++tag_pass;
+ } else if (!vals.empty()) {
+ last_read = vals.rbegin()->first;
+ }
+ } while (tag_pass != TAG_PASS_DONE);
+ return 0;
+}
+
} // anonymous namespace
/**
if (r < 0) {
return r;
}
+
+ uint64_t tag_id = 0;
+ r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
+ if (r < 0) {
+ return r;
+ }
+
+ r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
+ if (r < 0) {
+ return r;
+ }
return 0;
}
return -EINVAL;
}
- std::string key;
- key_from_client_id(id, &key);
-
+ std::string key(key_from_client_id(id));
bufferlist stored_clientbl;
int r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
if (r != -ENOENT) {
}
cls::journal::Client client(id, description);
+ key = key_from_client_id(id);
r = write_key(hctx, key, client);
if (r < 0) {
return r;
return -EINVAL;
}
- std::string key;
- key_from_client_id(id, &key);
-
+ std::string key(key_from_client_id(id));
bufferlist bl;
int r = cls_cxx_map_get_val(hctx, key, &bl);
if (r < 0) {
CLS_ERR("failed to remove omap key: %s", key.c_str());
return r;
}
+
+ // prune expired tags
+ r = expire_tags(hctx, &id);
+ if (r < 0) {
+ return r;
+ }
return 0;
}
return -EINVAL;
}
- std::string key;
- key_from_client_id(id, &key);
-
+ std::string key(key_from_client_id(id));
cls::journal::Client client;
r = read_key(hctx, key, &client);
if (r < 0) {
std::string last_read;
if (!start_after.empty()) {
- key_from_client_id(start_after, &last_read);
+ last_read = key_from_client_id(start_after);
}
std::map<std::string, bufferlist> vals;
return 0;
}
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t tag_tid;
+ int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
+ if (r < 0) {
+ return r;
+ }
+
+ ::encode(tag_tid, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * @param tag_tid (uint64_t)
+ *
+ * Output:
+ * cls::journal::Tag
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t tag_tid;
+ try {
+ bufferlist::iterator iter = in->begin();
+ ::decode(tag_tid, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_tag_tid(tag_tid));
+ cls::journal::Tag tag;
+ int r = read_key(hctx, key, &tag);
+ if (r < 0) {
+ return r;
+ }
+
+ ::encode(tag, *out);
+ return 0;
+}
+
+/**
+ * Input:
+ * @param tag_tid (uint64_t)
+ * @param tag_class (uint64_t)
+ * @param data (bufferlist)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t tag_tid;
+ uint64_t tag_class;
+ bufferlist data;
+ try {
+ bufferlist::iterator iter = in->begin();
+ ::decode(tag_tid, iter);
+ ::decode(tag_class, iter);
+ ::decode(data, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ std::string key(key_from_tag_tid(tag_tid));
+ bufferlist stored_tag_bl;
+ int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
+ if (r != -ENOENT) {
+ CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
+ return -EEXIST;
+ }
+
+ // verify tag tid ordering
+ uint64_t next_tag_tid;
+ r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
+ if (r < 0) {
+ return r;
+ }
+ if (tag_tid != next_tag_tid) {
+ CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
+ return -ESTALE;
+ }
+
+ uint64_t next_tag_class;
+ r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
+ if (r < 0) {
+ return r;
+ }
+
+ if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
+ // allocate a new tag class
+ tag_class = next_tag_class;
+ r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
+ if (r < 0) {
+ return r;
+ }
+ } else {
+ // verify tag class range
+ if (tag_class >= next_tag_class) {
+ CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
+ return -EINVAL;
+ }
+ }
+
+ // prune expired tags
+ r = expire_tags(hctx, nullptr);
+ if (r < 0) {
+ return r;
+ }
+
+ // update tag tid sequence
+ r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
+ if (r < 0) {
+ return r;
+ }
+
+ // write tag structure
+ cls::journal::Tag tag(tag_tid, tag_class, data);
+ key = key_from_tag_tid(tag_tid);
+ r = write_key(hctx, key, tag);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+/**
+ * Input:
+ * @param start_after_tag_tid (uint64_t) - first tag tid
+ * @param max_return (uint64_t) - max tags to return
+ * @param client_id (std::string) - client id filter
+ * @param tag_class (boost::optional<uint64_t> - optional tag class filter
+ *
+ * Output:
+ * std::set<cls::journal::Tag> - collection of tags
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
+ bufferlist *out) {
+ uint64_t start_after_tag_tid;
+ uint64_t max_return;
+ std::string client_id;
+ boost::optional<uint64_t> tag_class(0);
+
+ // handle compiler false positive about use-before-init
+ tag_class = boost::none;
+ try {
+ bufferlist::iterator iter = in->begin();
+ ::decode(start_after_tag_tid, iter);
+ ::decode(max_return, iter);
+ ::decode(client_id, iter);
+ ::decode(tag_class, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode input parameters: %s", err.what());
+ return -EINVAL;
+ }
+
+ // calculate the minimum tag within client's commit position
+ uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
+ cls::journal::Client client;
+ int r = read_key(hctx, key_from_client_id(client_id), &client);
+ if (r < 0) {
+ return r;
+ }
+
+ for (auto entry_position : client.commit_position.entry_positions) {
+ minimum_tag_tid = MIN(minimum_tag_tid, entry_position.tag_tid);
+ }
+
+ // compute minimum tags in use per-class
+ std::set<cls::journal::Tag> tags;
+ std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
+ typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
+ TAG_PASS_LIST,
+ TAG_PASS_DONE } TagPass;
+ int tag_pass = (client.commit_position.entry_positions.empty() ?
+ TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
+ std::string last_read = HEADER_KEY_TAG_PREFIX;
+ do {
+ std::map<std::string, bufferlist> vals;
+ r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
+ MAX_KEYS_READ, &vals);
+ if (r < 0 && r != -ENOENT) {
+ CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
+ return r;
+ }
+
+ for (auto &val : vals) {
+ cls::journal::Tag tag;
+ bufferlist::iterator iter = val.second.begin();
+ try {
+ ::decode(tag, iter);
+ } catch (const buffer::error &err) {
+ CLS_ERR("error decoding tag: %s", val.first.c_str());
+ return -EIO;
+ }
+
+ if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
+ minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
+
+ // completed calculation of tag class minimums
+ if (tag.tid >= minimum_tag_tid) {
+ vals.clear();
+ break;
+ }
+ } else if (tag_pass == TAG_PASS_LIST) {
+ if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
+ continue;
+ }
+
+ if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
+ (!tag_class || *tag_class == tag.tag_class)) {
+ tags.insert(tag);
+ }
+ if (tags.size() >= max_return) {
+ tag_pass = TAG_PASS_DONE;
+ }
+ }
+ }
+
+ if (tag_pass != TAG_PASS_DONE && vals.size() < MAX_KEYS_READ) {
+ last_read = HEADER_KEY_TAG_PREFIX;
+ ++tag_pass;
+ } else if (!vals.empty()) {
+ last_read = vals.rbegin()->first;
+ }
+ } while (tag_pass != TAG_PASS_DONE);
+
+ ::encode(tags, *out);
+ return 0;
+}
+
/**
* Input:
* @param soft_max_size (uint64_t)
cls_register_cxx_method(h_class, "client_list",
CLS_METHOD_RD,
journal_client_list, &h_journal_client_list);
+ cls_register_cxx_method(h_class, "get_next_tag_tid",
+ CLS_METHOD_RD,
+ journal_get_next_tag_tid,
+ &h_journal_get_next_tag_tid);
+ cls_register_cxx_method(h_class, "get_tag",
+ CLS_METHOD_RD,
+ journal_get_tag, &h_journal_get_tag);
+ cls_register_cxx_method(h_class, "tag_create",
+ CLS_METHOD_RD | CLS_METHOD_WR,
+ journal_tag_create, &h_journal_tag_create);
+ cls_register_cxx_method(h_class, "tag_list",
+ CLS_METHOD_RD,
+ journal_tag_list, &h_journal_tag_list);
/// methods for journal_data.$journal_id.$object_id objects
cls_register_cxx_method(h_class, "guard_append",
return cond.wait();
}
+int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t *tag_tid) {
+ librados::ObjectReadOperation op;
+ get_next_tag_tid_start(&op);
+
+ bufferlist out_bl;
+ int r = ioctx.operate(oid, &op, &out_bl);
+ if (r < 0) {
+ return r;
+ }
+
+ bufferlist::iterator iter = out_bl.begin();
+ r = get_next_tag_tid_finish(&iter, tag_tid);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+void get_next_tag_tid_start(librados::ObjectReadOperation *op) {
+ bufferlist bl;
+ op->exec("journal", "get_next_tag_tid", bl);
+}
+
+int get_next_tag_tid_finish(bufferlist::iterator *iter,
+ uint64_t *tag_tid) {
+ try {
+ ::decode(*tag_tid, *iter);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+int get_tag(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t tag_tid, cls::journal::Tag *tag) {
+ librados::ObjectReadOperation op;
+ get_tag_start(&op, tag_tid);
+
+ bufferlist out_bl;
+ int r = ioctx.operate(oid, &op, &out_bl);
+ if (r < 0) {
+ return r;
+ }
+
+ bufferlist::iterator iter = out_bl.begin();
+ r = get_tag_finish(&iter, tag);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+void get_tag_start(librados::ObjectReadOperation *op,
+ uint64_t tag_tid) {
+ bufferlist bl;
+ ::encode(tag_tid, bl);
+ op->exec("journal", "get_tag", bl);
+}
+
+int get_tag_finish(bufferlist::iterator *iter, cls::journal::Tag *tag) {
+ try {
+ ::decode(*tag, *iter);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
+int tag_create(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t tag_tid, uint64_t tag_class,
+ const bufferlist &data) {
+ librados::ObjectWriteOperation op;
+ tag_create(&op, tag_tid, tag_class, data);
+ return ioctx.operate(oid, &op);
+}
+
+void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid,
+ uint64_t tag_class, const bufferlist &data) {
+ bufferlist bl;
+ ::encode(tag_tid, bl);
+ ::encode(tag_class, bl);
+ ::encode(data, bl);
+ op->exec("journal", "tag_create", bl);
+}
+
+int tag_list(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, boost::optional<uint64_t> tag_class,
+ std::set<cls::journal::Tag> *tags) {
+ tags->clear();
+ uint64_t start_after_tag_tid = 0;
+ while (true) {
+ librados::ObjectReadOperation op;
+ tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id,
+ tag_class);
+
+ bufferlist out_bl;
+ int r = ioctx.operate(oid, &op, &out_bl);
+ if (r < 0) {
+ return r;
+ }
+
+ bufferlist::iterator iter = out_bl.begin();
+ std::set<cls::journal::Tag> decode_tags;
+ r = tag_list_finish(&iter, &decode_tags);
+ if (r < 0) {
+ return r;
+ }
+
+ tags->insert(decode_tags.begin(), decode_tags.end());
+ if (decode_tags.size() < JOURNAL_MAX_RETURN) {
+ break;
+ }
+ }
+ return 0;
+}
+
+void tag_list_start(librados::ObjectReadOperation *op,
+ uint64_t start_after_tag_tid, uint64_t max_return,
+ const std::string &client_id,
+ boost::optional<uint64_t> tag_class) {
+ bufferlist bl;
+ ::encode(start_after_tag_tid, bl);
+ ::encode(max_return, bl);
+ ::encode(client_id, bl);
+ ::encode(tag_class, bl);
+ op->exec("journal", "tag_list", bl);
+}
+
+int tag_list_finish(bufferlist::iterator *iter,
+ std::set<cls::journal::Tag> *tags) {
+ try {
+ ::decode(*tags, *iter);
+ } catch (const buffer::error &err) {
+ return -EBADMSG;
+ }
+ return 0;
+}
+
void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) {
bufferlist bl;
::encode(soft_max_size, bl);
#include <map>
#include <set>
#include <string>
+#include <boost/optional.hpp>
class Context;
void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set);
void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set);
+// journal client helpers
void client_register(librados::ObjectWriteOperation *op,
const std::string &id, const std::string &description);
int client_register(librados::IoCtx &ioctx, const std::string &oid,
int client_list(librados::IoCtx &ioctx, const std::string &oid,
std::set<cls::journal::Client> *clients);
+// journal tag helpers
+int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t *tag_tid);
+void get_next_tag_tid_start(librados::ObjectReadOperation *op);
+int get_next_tag_tid_finish(bufferlist::iterator *iter,
+ uint64_t *tag_tid);
+
+int get_tag(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t tag_tid, cls::journal::Tag *tag);
+void get_tag_start(librados::ObjectReadOperation *op,
+ uint64_t tag_tid);
+int get_tag_finish(bufferlist::iterator *iter, cls::journal::Tag *tag);
+
+int tag_create(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t tag_tid, uint64_t tag_class,
+ const bufferlist &data);
+void tag_create(librados::ObjectWriteOperation *op,
+ uint64_t tag_tid, uint64_t tag_class,
+ const bufferlist &data);
+
+int tag_list(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id, boost::optional<uint64_t> tag_class,
+ std::set<cls::journal::Tag> *tags);
+void tag_list_start(librados::ObjectReadOperation *op,
+ uint64_t start_after_tag_tid, uint64_t max_return,
+ const std::string &client_id,
+ boost::optional<uint64_t> tag_class);
+int tag_list_finish(bufferlist::iterator *iter,
+ std::set<cls::journal::Tag> *tags);
+
+// journal entry helpers
void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size);
} // namespace client
void ObjectSetPosition::generate_test_instances(
std::list<ObjectSetPosition *> &o) {
o.push_back(new ObjectSetPosition());
-
- EntryPositions entry_positions;
- entry_positions.push_back(EntryPosition(1, 120));
- entry_positions.push_back(EntryPosition(2, 121));
- o.push_back(new ObjectSetPosition(1, entry_positions));
+ o.push_back(new ObjectSetPosition(1, {{1, 120}, {2, 121}}));
}
void Client::encode(bufferlist& bl) const {
void Client::dump(Formatter *f) const {
f->dump_string("id", id);
f->dump_string("description", description);
+
f->open_object_section("commit_position");
commit_position.dump(f);
f->close_section();
void Client::generate_test_instances(std::list<Client *> &o) {
o.push_back(new Client());
o.push_back(new Client("id", "desc"));
+ o.push_back(new Client("id", "desc", {1, {{1, 120}, {2, 121}}}));
+}
+
+void Tag::encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(tid, bl);
+ ::encode(tag_class, bl);
+ ::encode(data, bl);
+ ENCODE_FINISH(bl);
+}
+
+void Tag::decode(bufferlist::iterator& iter) {
+ DECODE_START(1, iter);
+ ::decode(tid, iter);
+ ::decode(tag_class, iter);
+ ::decode(data, iter);
+ DECODE_FINISH(iter);
+}
+
+void Tag::dump(Formatter *f) const {
+ f->dump_unsigned("tid", tid);
+ f->dump_unsigned("tag_class", tag_class);
- EntryPositions entry_positions;
- entry_positions.push_back(EntryPosition(1, 120));
- entry_positions.push_back(EntryPosition(2, 121));
- o.push_back(new Client("id", "desc", ObjectSetPosition(1, entry_positions)));
+ std::stringstream data_ss;
+ data.hexdump(data_ss);
+ f->dump_string("data", data_ss.str());
+}
+
+void Tag::generate_test_instances(std::list<Tag *> &o) {
+ o.push_back(new Tag());
+
+ bufferlist data;
+ data.append(std::string('1', 128));
+ o.push_back(new Tag(123, 234, data));
}
std::ostream &operator<<(std::ostream &os,
const ObjectSetPosition &object_set_position) {
os << "[object_number=" << object_set_position.object_number << ", "
<< "positions=[";
- for (EntryPositions::const_iterator it =
- object_set_position.entry_positions.begin();
- it != object_set_position.entry_positions.end(); ++it) {
- os << *it;
+ std::string delim;
+ for (auto &entry_position : object_set_position.entry_positions) {
+ os << entry_position << delim;
+ delim = ", ";
}
os << "]]";
return os;
}
std::ostream &operator<<(std::ostream &os, const Client &client) {
- os << "[id=" << client.id << ", description=" << client.description
- << ", commit_position=" << client.commit_position << "]";
+ os << "[id=" << client.id << ", "
+ << "description=" << client.description << ", "
+ << "commit_position=" << client.commit_position << "]";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os, const Tag &tag) {
+ os << "[tid=" << tag.tid << ", "
+ << "tag_class=" << tag.tag_class << ", "
+ << "data=";
+ tag.data.hexdump(os);
+ os << "]";
return os;
}
#include "include/encoding.h"
#include <iosfwd>
#include <list>
+#include <set>
#include <string>
namespace ceph {
: id(_id), description(_description), commit_position(_commit_position) {}
inline bool operator==(const Client &rhs) const {
- return (id == rhs.id && description == rhs.description &&
+ return (id == rhs.id &&
+ description == rhs.description &&
commit_position == rhs.commit_position);
}
inline bool operator<(const Client &rhs) const {
static void generate_test_instances(std::list<Client *> &o);
};
+struct Tag {
+ static const uint64_t TAG_CLASS_NEW = static_cast<uint64_t>(-1);
+
+ uint64_t tid;
+ uint64_t tag_class;
+ bufferlist data;
+
+ Tag() : tid(0), tag_class(0) {}
+ Tag(uint64_t tid, uint64_t tag_class, const bufferlist &data)
+ : tid(tid), tag_class(tag_class), data(data) {}
+
+ inline bool operator==(const Tag &rhs) const {
+ return (tid == rhs.tid &&
+ tag_class == rhs.tag_class &&
+ data.contents_equal(rhs.data));
+ }
+ inline bool operator<(const Tag &rhs) const {
+ return (tid < rhs.tid);
+ }
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& iter);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<Tag *> &o);
+};
+
WRITE_CLASS_ENCODER(EntryPosition);
WRITE_CLASS_ENCODER(ObjectSetPosition);
WRITE_CLASS_ENCODER(Client);
+WRITE_CLASS_ENCODER(Tag);
std::ostream &operator<<(std::ostream &os,
const EntryPosition &entry_position);
const ObjectSetPosition &object_set_position);
std::ostream &operator<<(std::ostream &os,
const Client &client);
+std::ostream &operator<<(std::ostream &os, const Tag &tag);
} // namespace journal
} // namespace cls
ASSERT_EQ(-ENOENT, client::client_unregister(ioctx, oid, "id1"));
}
+TEST_F(TestClsJournal, ClientUnregisterPruneTags) {
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+ std::string oid = get_temp_image_name();
+
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+ ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+ ASSERT_EQ(0, client::client_register(ioctx, oid, "id2", "desc2"));
+
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 1, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 2, 1, bufferlist()));
+
+ librados::ObjectWriteOperation op1;
+ client::client_commit(&op1, "id1", {1, {{2, 120}}});
+ ASSERT_EQ(0, ioctx.operate(oid, &op1));
+
+ ASSERT_EQ(0, client::client_unregister(ioctx, oid, "id2"));
+
+ std::set<Tag> expected_tags = {{0, 0, {}}, {2, 1, {}}};
+ std::set<Tag> tags;
+ ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1",
+ boost::optional<uint64_t>(), &tags));
+ ASSERT_EQ(expected_tags, tags);
+}
+
TEST_F(TestClsJournal, ClientCommit) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
ASSERT_EQ(expected_clients, read_clients);
}
+TEST_F(TestClsJournal, GetNextTagTid) {
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+ std::string oid = get_temp_image_name();
+
+ uint64_t tag_tid;
+ ASSERT_EQ(-ENOENT, client::get_next_tag_tid(ioctx, oid, &tag_tid));
+
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+ ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+
+ ASSERT_EQ(0, client::get_next_tag_tid(ioctx, oid, &tag_tid));
+ ASSERT_EQ(0U, tag_tid);
+
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(0, client::get_next_tag_tid(ioctx, oid, &tag_tid));
+ ASSERT_EQ(1U, tag_tid);
+}
+
+TEST_F(TestClsJournal, TagCreate) {
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+ std::string oid = get_temp_image_name();
+
+ ASSERT_EQ(-ENOENT, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+ ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+
+ ASSERT_EQ(-ESTALE, client::tag_create(ioctx, oid, 1, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(-EINVAL, client::tag_create(ioctx, oid, 0, 1, bufferlist()));
+
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(-EEXIST, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 1, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 2, 1, bufferlist()));
+
+ std::set<Tag> expected_tags = {
+ {0, 0, {}}, {1, 1, {}}, {2, 1, {}}};
+ std::set<Tag> tags;
+ ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1",
+ boost::optional<uint64_t>(), &tags));
+ ASSERT_EQ(expected_tags, tags);
+}
+
+TEST_F(TestClsJournal, TagCreatePrunesTags) {
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+ std::string oid = get_temp_image_name();
+
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+ ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 0, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 1, Tag::TAG_CLASS_NEW,
+ bufferlist()));
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 2, 1, bufferlist()));
+
+ librados::ObjectWriteOperation op1;
+ client::client_commit(&op1, "id1", {1, {{2, 120}}});
+ ASSERT_EQ(0, ioctx.operate(oid, &op1));
+
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, 3, 0, bufferlist()));
+
+ std::set<Tag> expected_tags = {
+ {0, 0, {}}, {2, 1, {}}, {3, 0, {}}};
+ std::set<Tag> tags;
+ ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1",
+ boost::optional<uint64_t>(), &tags));
+ ASSERT_EQ(expected_tags, tags);
+}
+
+TEST_F(TestClsJournal, TagList) {
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+ std::string oid = get_temp_image_name();
+
+ ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
+ ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", "desc1"));
+
+ std::set<Tag> expected_all_tags;
+ std::set<Tag> expected_filtered_tags;
+ for (uint32_t i = 0; i < 96; ++i) {
+ uint64_t tag_class = Tag::TAG_CLASS_NEW;
+ if (i > 1) {
+ tag_class = i % 2 == 0 ? 0 : 1;
+ }
+
+ Tag tag(i, i % 2 == 0 ? 0 : 1, bufferlist());
+ expected_all_tags.insert(tag);
+ if (i % 2 == 0) {
+ expected_filtered_tags.insert(tag);
+ }
+ ASSERT_EQ(0, client::tag_create(ioctx, oid, i, tag_class,
+ bufferlist()));
+ }
+
+ std::set<Tag> tags;
+ ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1", boost::optional<uint64_t>(),
+ &tags));
+ ASSERT_EQ(expected_all_tags, tags);
+
+ ASSERT_EQ(0, client::tag_list(ioctx, oid, "id1", boost::optional<uint64_t>(0),
+ &tags));
+ ASSERT_EQ(expected_filtered_tags, tags);
+}
+
TEST_F(TestClsJournal, GuardAppend) {
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));