]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: allow to trim journal for "laggy" clients
authorMykola Golub <mgolub@mirantis.com>
Wed, 13 Jul 2016 12:49:40 +0000 (15:49 +0300)
committerMykola Golub <mgolub@mirantis.com>
Mon, 5 Sep 2016 05:51:54 +0000 (08:51 +0300)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/JournalTrimmer.cc
src/journal/Settings.h
src/test/journal/RadosTestFixture.cc
src/test/journal/RadosTestFixture.h
src/test/journal/test_JournalMetadata.cc

index de46bd7ff0af1624e7fa299b19f728dd498e2dde..08a7e9307aa45bb24fed64caca6c3f59e71e14c2 100644 (file)
@@ -749,6 +749,10 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
     Client client(m_client_id, bufferlist());
     RegisteredClients::iterator it = refresh->registered_clients.find(client);
     if (it != refresh->registered_clients.end()) {
+      if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+       ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
+                       << dendl;
+      }
       m_minimum_set = MAX(m_minimum_set, refresh->minimum_set);
       m_active_set = MAX(m_active_set, refresh->active_set);
       m_registered_clients = refresh->registered_clients;
@@ -810,9 +814,11 @@ void JournalMetadata::handle_commit_position_task() {
   librados::ObjectWriteOperation op;
   client::client_commit(&op, m_client_id, m_commit_position);
 
-  C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
+  Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
   m_commit_position_ctx = NULL;
 
+  ctx = schedule_laggy_clients_disconnect(ctx);
+
   librados::AioCompletion *comp =
     librados::Rados::aio_create_completion(ctx, NULL,
                                            utils::rados_ctx_callback);
@@ -839,7 +845,7 @@ void JournalMetadata::handle_watch_reset() {
     if (r == -ENOENT) {
       ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
     } else {
-      lderr(m_cct) << __func__ << ": failed to watch journal"
+      lderr(m_cct) << __func__ << ": failed to watch journal"
                    << cpp_strerror(r) << dendl;
     }
     schedule_watch_reset();
@@ -1023,6 +1029,59 @@ void JournalMetadata::handle_notified(int r) {
   ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
 }
 
+Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
+  assert(m_lock.is_locked());
+
+  ldout(m_cct, 20) << __func__ << dendl;
+
+  if (m_settings.max_concurrent_object_sets <= 0) {
+    return on_finish;
+  }
+
+  Context *ctx = on_finish;
+
+  for (auto &c : m_registered_clients) {
+    if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
+       c.id == m_client_id ||
+       m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
+      continue;
+    }
+    const std::string &client_id = c.id;
+    uint64_t object_set = 0;
+    if (!c.commit_position.object_positions.empty()) {
+      auto &position = *(c.commit_position.object_positions.begin());
+      object_set = position.object_number / m_splay_width;
+    }
+
+    if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
+      ldout(m_cct, 1) << __func__ << ": " << client_id
+                     << ": scheduling disconnect" << dendl;
+
+      ctx = new FunctionContext([this, client_id, ctx](int r1) {
+          ldout(m_cct, 10) << __func__ << ": " << client_id
+                           << ": flagging disconnected" << dendl;
+
+          librados::ObjectWriteOperation op;
+          client::client_update_state(&op, client_id,
+                                      cls::journal::CLIENT_STATE_DISCONNECTED);
+
+          librados::AioCompletion *comp =
+              librados::Rados::aio_create_completion(ctx, nullptr,
+                                                     utils::rados_ctx_callback);
+          int r = m_ioctx.aio_operate(m_oid, comp, &op);
+          assert(r == 0);
+          comp->release();
+       });
+    }
+  }
+
+  if (ctx == on_finish) {
+    ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
+  }
+
+  return ctx;
+}
+
 std::ostream &operator<<(std::ostream &os,
                         const JournalMetadata::RegisteredClients &clients) {
   os << "[";
index 01116d7d1e016f5a4e983dfc8f1fb3d68d46ed62..4055f9937047f587b64ef3fb4b54ef8a30687d79 100644 (file)
@@ -344,6 +344,8 @@ private:
   void handle_watch_error(int err);
   void handle_notified(int r);
 
+  Context *schedule_laggy_clients_disconnect(Context *on_finish);
+
   friend std::ostream &operator<<(std::ostream &os,
                                  const JournalMetadata &journal_metadata);
 };
index 5e68349513cfa6c0ae390218bffbeea4199d2970..283fe764ba028747880b2d3b799d7f96fde3d37d 100644 (file)
@@ -141,8 +141,11 @@ void JournalTrimmer::handle_metadata_updated() {
   uint64_t minimum_commit_set = active_set;
   std::string minimum_client_id;
 
-  // TODO: add support for trimming past "laggy" clients
   for (auto &client : registered_clients) {
+    if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+      continue;
+    }
+
     if (client.commit_position.object_positions.empty()) {
       // client hasn't recorded any commits
       minimum_commit_set = minimum_set;
index 603770cbaf65c2d5e3362c7bc28da4ab7a2f74e6..ca57125a859459b6c2cc209c2022854e3f7de052 100644 (file)
@@ -12,6 +12,9 @@ struct Settings {
   double commit_interval = 5;         ///< commit position throttle (in secs)
   uint64_t max_fetch_bytes = 0;       ///< 0 implies no limit
   uint64_t max_payload_bytes = 0;     ///< 0 implies object size limit
+  int max_concurrent_object_sets = 0; ///< 0 implies no limit
+  std::set<std::string> whitelisted_laggy_clients;
+                                      ///< clients that mustn't be disconnected
 };
 
 } // namespace journal
index dba3ec6a244a19329fca64370996749cce2505a9..a066559c92330ab792b8b618a54dfe086afc1426 100644 (file)
@@ -68,10 +68,12 @@ int RadosTestFixture::create(const std::string &oid, uint8_t order,
 
 journal::JournalMetadataPtr RadosTestFixture::create_metadata(
     const std::string &oid, const std::string &client_id,
-    double commit_interval, uint64_t max_fetch_bytes) {
+    double commit_interval, uint64_t max_fetch_bytes,
+    int max_concurrent_object_sets) {
   journal::Settings settings;
   settings.commit_interval = commit_interval;
   settings.max_fetch_bytes = max_fetch_bytes;
+  settings.max_concurrent_object_sets = max_concurrent_object_sets;
 
   journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
     m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id, settings));
index 4ea22e7cefcf96acd74bdd146d9e3c5e3567e8c1..64d3011a69f0e70ca000dfd04bf2b29fa7fb1d40 100644 (file)
@@ -26,7 +26,8 @@ public:
   journal::JournalMetadataPtr create_metadata(const std::string &oid,
                                               const std::string &client_id = "client",
                                               double commit_internal = 0.1,
-                                              uint64_t max_fetch_bytes = 0);
+                                              uint64_t max_fetch_bytes = 0,
+                                              int max_concurrent_object_sets = 0);
   int append(const std::string &oid, const bufferlist &bl);
 
   int client_register(const std::string &oid, const std::string &id = "client",
index b8f559365f81186bd609c8d84007afefdb516c46..560ee4e569f70ecfb2f024ab9d736bb9ef8f67d3 100644 (file)
@@ -21,9 +21,12 @@ public:
 
   journal::JournalMetadataPtr create_metadata(const std::string &oid,
                                               const std::string &client_id,
-                                              double commit_internal = 0.1) {
+                                              double commit_interval = 0.1,
+                                             uint64_t max_fetch_bytes = 0,
+                                              int max_concurrent_object_sets = 0) {
     journal::JournalMetadataPtr metadata = RadosTestFixture::create_metadata(
-      oid, client_id, commit_internal);
+      oid, client_id, commit_interval, max_fetch_bytes,
+      max_concurrent_object_sets);
     m_metadata_list.push_back(metadata);
     metadata->add_listener(&m_listener);
     return metadata;
@@ -116,6 +119,70 @@ TEST_F(TestJournalMetadata, UpdateActiveObject) {
   ASSERT_EQ(123U, metadata1->get_active_set());
 }
 
+TEST_F(TestJournalMetadata, DisconnectLaggyClient) {
+  std::string oid = get_temp_oid();
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid, "client1", ""));
+  ASSERT_EQ(0, client_register(oid, "client2", "laggy"));
+
+  int max_concurrent_object_sets = 100;
+  journal::JournalMetadataPtr metadata =
+    create_metadata(oid, "client1", 0.1, 0, max_concurrent_object_sets);
+  ASSERT_EQ(0, init_metadata(metadata));
+  ASSERT_TRUE(wait_for_update(metadata));
+
+  ASSERT_EQ(0U, metadata->get_active_set());
+
+  journal::JournalMetadata::RegisteredClients clients;
+
+#define ASSERT_CLIENT_STATES(s1, s2)   \
+  ASSERT_EQ(2U, clients.size());       \
+  for (auto &c : clients) {            \
+    if (c.id == "client1") {           \
+      ASSERT_EQ(c.state, s1);          \
+    } else if (c.id == "client2") {    \
+      ASSERT_EQ(c.state, s2);          \
+    } else {                           \
+      ASSERT_TRUE(false);              \
+    }                                  \
+  }
+
+  metadata->get_registered_clients(&clients);
+  ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+                      cls::journal::CLIENT_STATE_CONNECTED);
+
+  // client2 is connected when active set <= max_concurrent_object_sets
+  ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets));
+  ASSERT_TRUE(wait_for_update(metadata));
+  uint64_t commit_tid = metadata->allocate_commit_tid(0, 0, 0);
+  C_SaferCond cond1;
+  metadata->committed(commit_tid, [&cond1]() { return &cond1; });
+  ASSERT_EQ(0, cond1.wait());
+  metadata->flush_commit_position();
+  ASSERT_TRUE(wait_for_update(metadata));
+  ASSERT_EQ(100U, metadata->get_active_set());
+  clients.clear();
+  metadata->get_registered_clients(&clients);
+  ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+                      cls::journal::CLIENT_STATE_CONNECTED);
+
+  // client2 is disconnected when active set > max_concurrent_object_sets
+  ASSERT_EQ(0, metadata->set_active_set(max_concurrent_object_sets + 1));
+  ASSERT_TRUE(wait_for_update(metadata));
+  commit_tid = metadata->allocate_commit_tid(0, 0, 1);
+  C_SaferCond cond2;
+  metadata->committed(commit_tid, [&cond2]() { return &cond2; });
+  ASSERT_EQ(0, cond2.wait());
+  metadata->flush_commit_position();
+  ASSERT_TRUE(wait_for_update(metadata));
+  ASSERT_EQ(101U, metadata->get_active_set());
+  clients.clear();
+  metadata->get_registered_clients(&clients);
+  ASSERT_CLIENT_STATES(cls::journal::CLIENT_STATE_CONNECTED,
+                      cls::journal::CLIENT_STATE_DISCONNECTED);
+}
+
 TEST_F(TestJournalMetadata, AssertActiveTag) {
   std::string oid = get_temp_oid();