]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: add placeholder for marking clients are disconnected
authorJason Dillaman <dillaman@redhat.com>
Tue, 15 Mar 2016 19:50:31 +0000 (15:50 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 16 Mar 2016 00:38:34 +0000 (20:38 -0400)
When a client is disconnected from the journal, entries can be
pruned regardless of the affected client's commit position.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/cls/journal/cls_journal.cc
src/cls/journal/cls_journal_client.cc
src/cls/journal/cls_journal_client.h
src/cls/journal/cls_journal_types.cc
src/cls/journal/cls_journal_types.h
src/journal/JournalMetadata.cc
src/test/cls_journal/test_cls_journal.cc

index 632359b677502e438569f8df2f131d9aefec2d43..472b1003219deab4bc59176e5608b9860539df5f 100644 (file)
@@ -26,7 +26,8 @@ cls_method_handle_t h_journal_get_active_set;
 cls_method_handle_t h_journal_set_active_set;
 cls_method_handle_t h_journal_get_client;
 cls_method_handle_t h_journal_client_register;
-cls_method_handle_t h_journal_client_update;
+cls_method_handle_t h_journal_client_update_data;
+cls_method_handle_t h_journal_client_update_state;
 cls_method_handle_t h_journal_client_unregister;
 cls_method_handle_t h_journal_client_commit;
 cls_method_handle_t h_journal_client_list;
@@ -571,8 +572,8 @@ int journal_client_register(cls_method_context_t hctx, bufferlist *in,
  * Output:
  * @returns 0 on success, negative error code on failure
  */
-int journal_client_update(cls_method_context_t hctx, bufferlist *in,
-                          bufferlist *out) {
+int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
+                               bufferlist *out) {
   std::string id;
   bufferlist data;
   try {
@@ -599,6 +600,45 @@ int journal_client_update(cls_method_context_t hctx, bufferlist *in,
   return 0;
 }
 
+/**
+ * Input:
+ * @param id (string) - unique client id
+ * @param state (uint8_t) - client state
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
+                                bufferlist *out) {
+  std::string id;
+  cls::journal::ClientState state;
+  bufferlist data;
+  try {
+    bufferlist::iterator iter = in->begin();
+    ::decode(id, iter);
+    uint8_t state_raw;
+    ::decode(state_raw, iter);
+    state = static_cast<cls::journal::ClientState>(state_raw);
+  } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode input parameters: %s", err.what());
+    return -EINVAL;
+  }
+
+  std::string key(key_from_client_id(id));
+  cls::journal::Client client;
+  int r = read_key(hctx, key, &client);
+  if (r < 0) {
+    return r;
+  }
+
+  client.state = state;
+  r = write_key(hctx, key, client);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
 /**
  * Input:
  * @param id (string) - unique client id
@@ -1069,9 +1109,14 @@ void CEPH_CLS_API __cls_init()
   cls_register_cxx_method(h_class, "client_register",
                           CLS_METHOD_RD | CLS_METHOD_WR,
                           journal_client_register, &h_journal_client_register);
-  cls_register_cxx_method(h_class, "client_update",
+  cls_register_cxx_method(h_class, "client_update_data",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          journal_client_update_data,
+                          &h_journal_client_update_data);
+  cls_register_cxx_method(h_class, "client_update_state",
                           CLS_METHOD_RD | CLS_METHOD_WR,
-                          journal_client_update, &h_journal_client_update);
+                          journal_client_update_state,
+                          &h_journal_client_update_state);
   cls_register_cxx_method(h_class, "client_unregister",
                           CLS_METHOD_RD | CLS_METHOD_WR,
                           journal_client_unregister,
index 2566800611a64e0333a8b8b12af40356415cfb94..7fbfb517a69a91549013c1d29652d66cb8461005 100644 (file)
@@ -274,19 +274,30 @@ void client_register(librados::ObjectWriteOperation *op,
   op->exec("journal", "client_register", bl);
 }
 
-int client_update(librados::IoCtx &ioctx, const std::string &oid,
-                  const std::string &id, const bufferlist &data) {
+int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
+                       const std::string &id, const bufferlist &data) {
   librados::ObjectWriteOperation op;
-  client_update(&op, id, data);
+  client_update_data(&op, id, data);
   return ioctx.operate(oid, &op);
 }
 
-void client_update(librados::ObjectWriteOperation *op,
-                   const std::string &id, const bufferlist &data) {
+void client_update_data(librados::ObjectWriteOperation *op,
+                        const std::string &id, const bufferlist &data) {
   bufferlist bl;
   ::encode(id, bl);
   ::encode(data, bl);
-  op->exec("journal", "client_update", bl);
+  op->exec("journal", "client_update_data", bl);
+}
+
+int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
+                        const std::string &id, cls::journal::ClientState state) {
+  bufferlist bl;
+  ::encode(id, bl);
+  ::encode(static_cast<uint8_t>(state), bl);
+
+  librados::ObjectWriteOperation op;
+  op.exec("journal", "client_update_state", bl);
+  return ioctx.operate(oid, &op);
 }
 
 int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
index 9aa00af4514031da71f56bbbb1d41478cc6e7c5b..94ba4b2600b8c05586ad88b86adc1efca4a115c8 100644 (file)
@@ -45,10 +45,12 @@ int client_register(librados::IoCtx &ioctx, const std::string &oid,
 void client_register(librados::ObjectWriteOperation *op,
                      const std::string &id, const bufferlist &data);
 
-int client_update(librados::IoCtx &ioctx, const std::string &oid,
-                  const std::string &id, const bufferlist &data);
-void client_update(librados::ObjectWriteOperation *op,
-                   const std::string &id, const bufferlist &data);
+int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
+                       const std::string &id, const bufferlist &data);
+void client_update_data(librados::ObjectWriteOperation *op,
+                        const std::string &id, const bufferlist &data);
+int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
+                        const std::string &id, cls::journal::ClientState state);
 
 int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
                       const std::string &id);
index 595ee133225b06152f9dd9883127cd8ae2fb3092..6dd5664888cfc6074bd6185c26ff14ccc010be57 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "cls/journal/cls_journal_types.h"
+#include "include/stringify.h"
 #include "common/Formatter.h"
 
 namespace cls {
@@ -67,6 +68,7 @@ void Client::encode(bufferlist& bl) const {
   ::encode(id, bl);
   ::encode(data, bl);
   ::encode(commit_position, bl);
+  ::encode(static_cast<uint8_t>(state), bl);
   ENCODE_FINISH(bl);
 }
 
@@ -75,6 +77,10 @@ void Client::decode(bufferlist::iterator& iter) {
   ::decode(id, iter);
   ::decode(data, iter);
   ::decode(commit_position, iter);
+
+  uint8_t state_raw;
+  ::decode(state_raw, iter);
+  state = static_cast<ClientState>(state_raw);
   DECODE_FINISH(iter);
 }
 
@@ -88,6 +94,8 @@ void Client::dump(Formatter *f) const {
   f->open_object_section("commit_position");
   commit_position.dump(f);
   f->close_section();
+
+  f->dump_string("state", stringify(state));
 }
 
 void Client::generate_test_instances(std::list<Client *> &o) {
@@ -132,6 +140,21 @@ void Tag::generate_test_instances(std::list<Tag *> &o) {
   o.push_back(new Tag(123, 234, data));
 }
 
+std::ostream &operator<<(std::ostream &os, const ClientState &state) {
+  switch (state) {
+  case CLIENT_STATE_CONNECTED:
+    os << "connected";
+    break;
+  case CLIENT_STATE_DISCONNECTED:
+    os << "disconnected";
+    break;
+  default:
+    os << "unknown (" << static_cast<uint32_t>(state) << ")";
+    break;
+  }
+  return os;
+}
+
 std::ostream &operator<<(std::ostream &os,
                          const ObjectPosition &object_position) {
   os << "["
@@ -155,7 +178,8 @@ std::ostream &operator<<(std::ostream &os,
 
 std::ostream &operator<<(std::ostream &os, const Client &client) {
   os << "[id=" << client.id << ", "
-     << "commit_position=" << client.commit_position << "]";
+     << "commit_position=" << client.commit_position << ", "
+     << "state=" << client.state << "]";
   return os;
 }
 
index 2b1d90030f5e6fc1191fb94056828c0db29a7325..0381aff93c6b01223f9da6dc8ea500b41a557fbb 100644 (file)
@@ -72,20 +72,28 @@ struct ObjectSetPosition {
   static void generate_test_instances(std::list<ObjectSetPosition *> &o);
 };
 
+enum ClientState {
+  CLIENT_STATE_CONNECTED = 0,
+  CLIENT_STATE_DISCONNECTED = 1
+};
+
 struct Client {
   std::string id;
   bufferlist data;
   ObjectSetPosition commit_position;
+  ClientState state;
 
-  Client() {}
+  Client() : state(CLIENT_STATE_CONNECTED) {}
   Client(const std::string& _id, const bufferlist &_data,
-         const ObjectSetPosition &_commit_position = ObjectSetPosition())
-    : id(_id), data(_data), commit_position(_commit_position) {}
+         const ObjectSetPosition &_commit_position = ObjectSetPosition(),
+         ClientState _state = CLIENT_STATE_CONNECTED)
+    : id(_id), data(_data), commit_position(_commit_position), state(_state) {}
 
   inline bool operator==(const Client &rhs) const {
     return (id == rhs.id &&
             data.contents_equal(rhs.data) &&
-            commit_position == rhs.commit_position);
+            commit_position == rhs.commit_position &&
+            state == rhs.state);
   }
   inline bool operator<(const Client &rhs) const {
     return (id < rhs.id);
@@ -130,6 +138,7 @@ WRITE_CLASS_ENCODER(ObjectSetPosition);
 WRITE_CLASS_ENCODER(Client);
 WRITE_CLASS_ENCODER(Tag);
 
+std::ostream &operator<<(std::ostream &os, const ClientState &state);
 std::ostream &operator<<(std::ostream &os,
                          const ObjectPosition &object_position);
 std::ostream &operator<<(std::ostream &os,
index b93f0379065d83ac3359796c58143f53a25eebd7..830b4caaa7d0c090f1983350153b71f3a684444f 100644 (file)
@@ -328,7 +328,7 @@ void JournalMetadata::update_client(const bufferlist &data,
                                    Context *on_finish) {
   ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
   librados::ObjectWriteOperation op;
-  client::client_update(&op, m_client_id, data);
+  client::client_update_data(&op, m_client_id, data);
 
   C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);
 
index 1eb7663da763f402dfc77bc77e2b63ee945a928b..9c6000dfd46e7cd67df45267bc1fa54a36b22421 100644 (file)
@@ -235,19 +235,20 @@ TEST_F(TestClsJournal, ClientRegisterDuplicate) {
   ASSERT_EQ(-EEXIST, client::client_register(ioctx, oid, "id1", bufferlist()));
 }
 
-TEST_F(TestClsJournal, ClientUpdate) {
+TEST_F(TestClsJournal, ClientUpdateData) {
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
 
   std::string oid = get_temp_image_name();
 
-  ASSERT_EQ(-ENOENT, client::client_update(ioctx, oid, "id1", bufferlist()));
+  ASSERT_EQ(-ENOENT, client::client_update_data(ioctx, oid, "id1",
+                                                bufferlist()));
 
   ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", bufferlist()));
 
   bufferlist data;
   data.append(std::string('1', 128));
-  ASSERT_EQ(0, client::client_update(ioctx, oid, "id1", data));
+  ASSERT_EQ(0, client::client_update_data(ioctx, oid, "id1", data));
 
   Client client;
   ASSERT_EQ(0, client::get_client(ioctx, oid, "id1", &client));
@@ -255,6 +256,30 @@ TEST_F(TestClsJournal, ClientUpdate) {
   ASSERT_EQ(expected_client, client);
 }
 
+TEST_F(TestClsJournal, ClientUpdateState) {
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));
+
+  std::string oid = get_temp_image_name();
+
+  ASSERT_EQ(-ENOENT, client::client_update_state(ioctx, oid, "id1",
+                                                 CLIENT_STATE_DISCONNECTED));
+
+  ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", bufferlist()));
+
+  bufferlist data;
+  data.append(std::string('1', 128));
+  ASSERT_EQ(0, client::client_update_state(ioctx, oid, "id1",
+                                           CLIENT_STATE_DISCONNECTED));
+
+  Client client;
+  ASSERT_EQ(0, client::get_client(ioctx, oid, "id1", &client));
+  Client expected_client;
+  expected_client.id = "id1";
+  expected_client.state = CLIENT_STATE_DISCONNECTED;
+  ASSERT_EQ(expected_client, client);
+}
+
 TEST_F(TestClsJournal, ClientUnregister) {
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), ioctx));