]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: update JournalTrimmer to support new commit tracking
authorJason Dillaman <dillaman@redhat.com>
Wed, 24 Feb 2016 03:10:20 +0000 (22:10 -0500)
committerJason Dillaman <dillaman@redhat.com>
Fri, 26 Feb 2016 16:54:52 +0000 (11:54 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalTrimmer.cc
src/journal/JournalTrimmer.h
src/test/journal/test_JournalTrimmer.cc

index 0998c259fb0c261b86579723df1e39504a60d455..f00925303c01d77a38a884056cd44394a96a41de 100644 (file)
@@ -18,13 +18,18 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
                                const std::string &object_oid_prefix,
                                const JournalMetadataPtr &journal_metadata)
     : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
-      m_journal_metadata(journal_metadata), m_lock("JournalTrimmer::m_lock"),
-      m_remove_set_pending(false), m_remove_set(0), m_remove_set_ctx(NULL) {
+      m_journal_metadata(journal_metadata), m_metadata_listener(this),
+      m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false),
+      m_remove_set(0), m_remove_set_ctx(NULL) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+
+  m_journal_metadata->add_listener(&m_metadata_listener);
 }
 
 JournalTrimmer::~JournalTrimmer() {
+  m_journal_metadata->remove_listener(&m_metadata_listener);
+
   m_journal_metadata->flush_commit_position();
   m_async_op_tracker.wait_for_ops();
 }
@@ -110,39 +115,43 @@ void JournalTrimmer::remove_set(uint64_t object_set) {
   }
 }
 
-void JournalTrimmer::handle_commit_position_safe(int r) {
-  ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
+void JournalTrimmer::handle_metadata_updated() {
+  ldout(m_cct, 20) << __func__ << dendl;
 
   Mutex::Locker locker(m_lock);
-  if (r == 0) {
-    // TODO
-    /*
-    uint8_t splay_width = m_journal_metadata->get_splay_width();
-    uint64_t object_set = object_set_position.object_number / splay_width;
-
-    JournalMetadata::RegisteredClients registered_clients;
-    m_journal_metadata->get_registered_clients(&registered_clients);
-
-    bool trim_permitted = true;
-    for (JournalMetadata::RegisteredClients::iterator it =
-           registered_clients.begin();
-         it != registered_clients.end(); ++it) {
-      const JournalMetadata::Client &client = *it;
-      uint64_t client_object_set = client.commit_position.object_number /
-                                   splay_width;
-      if (client.id != m_journal_metadata->get_client_id() &&
-          client_object_set < object_set) {
-        ldout(m_cct, 20) << "object set " << client_object_set << " still "
-                         << "in-use by client " << client.id << dendl;
-        trim_permitted = false;
-        break;
-      }
+
+  JournalMetadata::RegisteredClients registered_clients;
+  m_journal_metadata->get_registered_clients(&registered_clients);
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  uint64_t minimum_set = m_journal_metadata->get_minimum_set();
+  uint64_t active_set = m_journal_metadata->get_active_set();
+  uint64_t minimum_commit_set = active_set;
+  std::string minimum_client_id;
+
+  // TODO: add support for trimming past "laggy" clients
+  for (auto &client : registered_clients) {
+    if (client.commit_position.object_positions.empty()) {
+      // client hasn't recorded any commits
+      minimum_commit_set = minimum_set;
+      minimum_client_id = client.id;
+      break;
     }
 
-    if (trim_permitted) {
-      trim_objects(object_set_position.object_number / splay_width);
+    for (auto &position : client.commit_position.object_positions) {
+      uint64_t object_set = position.object_number / splay_width;
+      if (object_set < minimum_commit_set) {
+        minimum_client_id = client.id;
+        minimum_commit_set = object_set;
+      }
     }
-    */
+  }
+
+  if (minimum_commit_set > minimum_set) {
+    trim_objects(minimum_commit_set);
+  } else {
+    ldout(m_cct, 20) << "object set " << minimum_commit_set << " still "
+                     << "in-use by client " << minimum_client_id << dendl;
   }
 }
 
@@ -153,23 +162,26 @@ void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
   Mutex::Locker locker(m_lock);
   m_remove_set_pending = false;
 
-  if (r == 0 || (r == -ENOENT && m_remove_set_ctx == NULL)) {
+  if (r == -ENOENT) {
+    // no objects within the set existed
+    r = 0;
+  }
+  if (r == 0) {
     // advance the minimum set to the next set
     m_journal_metadata->set_minimum_set(object_set + 1);
+    uint64_t active_set = m_journal_metadata->get_active_set();
     uint64_t minimum_set = m_journal_metadata->get_minimum_set();
 
-    if (m_remove_set > minimum_set) {
+    if (m_remove_set > minimum_set && minimum_set <= active_set) {
       m_remove_set_pending = true;
       remove_set(minimum_set);
     }
-  } else if (r == -ENOENT) {
-    // no objects within the set existed
-    r = 0;
   }
 
-  if (m_remove_set_ctx != NULL && !m_remove_set_pending) {
+  if (m_remove_set_ctx != nullptr && !m_remove_set_pending) {
     ldout(m_cct, 20) << "completing remove set context" << dendl;
     m_remove_set_ctx->complete(r);
+    m_remove_set_ctx = nullptr;
   }
 }
 
index 349592b36e42512b0506327a829c3bc80210a9e7..8c577767d1f626d6a2b3283bc81ffc83ab2490c8 100644 (file)
@@ -29,12 +29,22 @@ public:
 private:
   typedef std::function<Context*()> CreateContext;
 
+  struct MetadataListener : public JournalMetadata::Listener {
+    JournalTrimmer *journal_trimmmer;
+
+    MetadataListener(JournalTrimmer *journal_trimmmer)
+      : journal_trimmmer(journal_trimmmer) {
+    }
+    void handle_update(JournalMetadata *) {
+      journal_trimmmer->handle_metadata_updated();
+    }
+  };
+
   struct C_CommitPositionSafe : public Context {
     JournalTrimmer *journal_trimmer;
 
     C_CommitPositionSafe(JournalTrimmer *_journal_trimmer)
       : journal_trimmer(_journal_trimmer) {
-      Mutex::Locker locker(journal_trimmer->m_lock);
       journal_trimmer->m_async_op_tracker.start_op();
     }
     virtual ~C_CommitPositionSafe() {
@@ -42,7 +52,6 @@ private:
     }
 
     virtual void finish(int r) {
-      journal_trimmer->handle_commit_position_safe(r);
     }
   };
   struct C_RemoveSet : public Context {
@@ -66,6 +75,7 @@ private:
   std::string m_object_oid_prefix;
 
   JournalMetadataPtr m_journal_metadata;
+  MetadataListener m_metadata_listener;
 
   AsyncOpTracker m_async_op_tracker;
 
@@ -82,8 +92,7 @@ private:
   void trim_objects(uint64_t minimum_set);
   void remove_set(uint64_t object_set);
 
-  void handle_commit_position_safe(int r);
-
+  void handle_metadata_updated();
   void handle_set_removed(int r, uint64_t object_set);
 };
 
index 896f80c88faec46430ae8ecd811c89501f3c8df3..62a6a2751b3e480c6bf658f53941818f1de310a0 100644 (file)
@@ -85,10 +85,10 @@ TEST_F(TestJournalTrimmer, Committed) {
   uint64_t commit_tid5;
   uint64_t commit_tid6;
   ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid1));
-  ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid2));
+  ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid2));
   ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid3));
   ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid4));
-  ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid5));
+  ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid5));
   ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid6));
 
   journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);