git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: update JournalMetadata to support new commit handling
author Jason Dillaman <dillaman@redhat.com>
Tue, 23 Feb 2016 20:32:26 +0000 (15:32 -0500)
committer Jason Dillaman <dillaman@redhat.com>
Fri, 26 Feb 2016 16:54:52 +0000 (11:54 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/JournalTrimmer.cc
src/journal/JournalTrimmer.h
src/test/journal/test_JournalMetadata.cc

index 99d6871cef40dfc65ef3d87dbe89776d3e48d32e..19bd8b0c33c9dcf4443af054961daa0da1edf5d6 100644 (file)
@@ -20,33 +20,6 @@ using namespace cls::journal;
 
 namespace {
 
-// does not compare object number
-inline bool object_positions_less_equal(const ObjectSetPosition &lhs,
-                                       const ObjectSetPosition &rhs) {
-  if (lhs.object_positions == rhs.object_positions) {
-    return true;
-  }
-
-  if (lhs.object_positions.size() != rhs.object_positions.size()) {
-    return lhs.object_positions.size() < rhs.object_positions.size();
-  }
-
-  std::map<uint64_t, uint64_t> rhs_tids;
-  for (ObjectPositions::const_iterator it = rhs.object_positions.begin();
-       it != rhs.object_positions.end(); ++it) {
-    rhs_tids[it->tag_tid] = it->entry_tid;
-  }
-
-  for (ObjectPositions::const_iterator it = lhs.object_positions.begin();
-       it != lhs.object_positions.end(); ++it) {
-    const ObjectPosition &object_position = *it;
-    if (object_position.entry_tid < rhs_tids[object_position.tag_tid]) {
-      return true;
-    }
-  }
-  return false;
-}
-
 struct C_AllocateTag : public Context {
   CephContext *cct;
   librados::IoCtx &ioctx;
@@ -491,34 +464,6 @@ void JournalMetadata::flush_commit_position(Context *on_safe) {
   handle_commit_position_task();
 }
 
-void JournalMetadata::set_commit_position(
-    const ObjectSetPosition &commit_position, Context *on_safe) {
-  assert(on_safe != NULL);
-
-  Context *stale_ctx = nullptr;
-  {
-    Mutex::Locker timer_locker(m_timer_lock);
-    Mutex::Locker locker(m_lock);
-    ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
-                     << ", new=" << commit_position << dendl;
-    if (object_positions_less_equal(commit_position, m_client.commit_position) ||
-        object_positions_less_equal(commit_position, m_commit_position)) {
-      stale_ctx = on_safe;
-    } else {
-      stale_ctx = m_commit_position_ctx;
-
-      m_client.commit_position = commit_position;
-      m_commit_position = commit_position;
-      m_commit_position_ctx = on_safe;
-      schedule_commit_task();
-    }
-  }
-
-  if (stale_ctx != nullptr) {
-    stale_ctx->complete(-ESTALE);
-  }
-}
-
 void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
   Mutex::Locker locker(m_lock);
   uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
@@ -696,58 +641,84 @@ uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
 
   ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
                    << "object_num=" << object_num << ", "
-                   << "tag_tid=" << tag_tid << ", entry_tid=" << entry_tid << "]"
+                   << "tag_tid=" << tag_tid << ", "
+                   << "entry_tid=" << entry_tid << "]"
                    << dendl;
   return commit_tid;
 }
 
-bool JournalMetadata::committed(uint64_t commit_tid,
-                                ObjectSetPosition *object_set_position) {
+void JournalMetadata::committed(uint64_t commit_tid,
+                                const CreateContext &create_context) {
   ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
 
-  Mutex::Locker locker(m_lock);
+  ObjectSetPosition commit_position;
+  Context *stale_ctx = nullptr;
   {
+    Mutex::Locker timer_locker(m_timer_lock);
+    Mutex::Locker locker(m_lock);
+    assert(commit_tid > m_commit_position_tid);
+
+    if (!m_commit_position.object_positions.empty()) {
+      // in-flight commit position update
+      commit_position = m_commit_position;
+    } else {
+      // safe commit position
+      commit_position = m_client.commit_position;
+    }
+
     CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
     assert(it != m_pending_commit_tids.end());
 
     CommitEntry &commit_entry = it->second;
     commit_entry.committed = true;
-  }
 
-  if (!m_commit_position.object_positions.empty()) {
-    *object_set_position = m_commit_position;
-  } else {
-    *object_set_position = m_client.commit_position;
-  }
+    bool update_commit_position = false;
+    while (!m_pending_commit_tids.empty()) {
+      CommitTids::iterator it = m_pending_commit_tids.begin();
+      CommitEntry &commit_entry = it->second;
+      if (!commit_entry.committed) {
+        break;
+      }
 
-  bool update_commit_position = false;
-  while (!m_pending_commit_tids.empty()) {
-    CommitTids::iterator it = m_pending_commit_tids.begin();
-    CommitEntry &commit_entry = it->second;
-    if (!commit_entry.committed) {
-      break;
+      commit_position.object_positions.emplace_front(
+        commit_entry.object_num, commit_entry.tag_tid,
+        commit_entry.entry_tid);
+      m_pending_commit_tids.erase(it);
+      update_commit_position = true;
     }
 
-    // TODO
-    update_commit_position = true;
-  }
+    if (!update_commit_position) {
+      return;
+    }
 
-  if (update_commit_position) {
-    // prune the position to have unique tags in commit-order
-    std::set<uint64_t> in_use_tag_tids;
-    ObjectPositions::iterator it = object_set_position->object_positions.begin();
-    while (it != object_set_position->object_positions.end()) {
-      if (!in_use_tag_tids.insert(it->tag_tid).second) {
-        it = object_set_position->object_positions.erase(it);
+    // prune the position to have one position per splay offset
+    std::set<uint8_t> in_use_splay_offsets;
+    ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
+    while (ob_it != commit_position.object_positions.end()) {
+      uint8_t splay_offset = ob_it->object_number % m_splay_width;
+      if (!in_use_splay_offsets.insert(splay_offset).second) {
+        ob_it = commit_position.object_positions.erase(ob_it);
       } else {
-        ++it;
+        ++ob_it;
       }
     }
 
-    ldout(m_cct, 20) << "updated object set position: " << *object_set_position
+    stale_ctx = m_commit_position_ctx;
+    m_commit_position_ctx = create_context();
+    m_commit_position = commit_position;
+    m_commit_position_tid = commit_tid;
+
+    ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
+                     << "on_safe=" << m_commit_position_ctx << dendl;
+    schedule_commit_task();
+  }
+
+
+  if (stale_ctx != nullptr) {
+    ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
                      << dendl;
+    stale_ctx->complete(-ESTALE);
   }
-  return update_commit_position;
 }
 
 void JournalMetadata::notify_update() {
index 049329b409c87150ba329b3a216a951f8e646a89..5d21d71556e7dac37e61e494b1248497445e8b50 100644 (file)
@@ -15,6 +15,7 @@
 #include <boost/intrusive_ptr.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/optional.hpp>
+#include <functional>
 #include <list>
 #include <map>
 #include <string>
@@ -30,6 +31,7 @@ typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr;
 
 class JournalMetadata : public RefCountedObject, boost::noncopyable {
 public:
+  typedef std::function<Context*()> CreateContext;
   typedef cls::journal::ObjectPosition ObjectPosition;
   typedef cls::journal::ObjectPositions ObjectPositions;
   typedef cls::journal::ObjectSetPosition ObjectSetPosition;
@@ -106,8 +108,6 @@ public:
 
   void flush_commit_position();
   void flush_commit_position(Context *on_safe);
-  void set_commit_position(const ObjectSetPosition &commit_position,
-                           Context *on_safe);
   void get_commit_position(ObjectSetPosition *commit_position) const {
     Mutex::Locker locker(m_lock);
     *commit_position = m_client.commit_position;
@@ -127,7 +127,7 @@ public:
 
   uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid,
                                uint64_t entry_tid);
-  bool committed(uint64_t commit_tid, ObjectSetPosition *object_set_position);
+  void committed(uint64_t commit_tid, const CreateContext &create_context);
 
   void notify_update();
   void async_notify_update();
@@ -306,6 +306,7 @@ private:
   size_t m_update_notifications;
   Cond m_update_cond;
 
+  uint64_t m_commit_position_tid = 0;
   ObjectSetPosition m_commit_position;
   Context *m_commit_position_ctx;
   Context *m_commit_position_task_ctx;
index 05a767285167693847511267ccf514b42c409852..0998c259fb0c261b86579723df1e39504a60d455 100644 (file)
@@ -63,19 +63,8 @@ int JournalTrimmer::remove_objects(bool force) {
 
 void JournalTrimmer::committed(uint64_t commit_tid) {
   ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl;
-
-  ObjectSetPosition object_set_position;
-  if (!m_journal_metadata->committed(commit_tid, &object_set_position)) {
-    return;
-  }
-
-  {
-    Mutex::Locker locker(m_lock);
-    m_async_op_tracker.start_op();
-  }
-
-  Context *ctx = new C_CommitPositionSafe(this, object_set_position);
-  m_journal_metadata->set_commit_position(object_set_position, ctx);
+  m_journal_metadata->committed(commit_tid,
+                                m_create_commit_position_safe_context);
 }
 
 void JournalTrimmer::trim_objects(uint64_t minimum_set) {
@@ -121,10 +110,8 @@ void JournalTrimmer::remove_set(uint64_t object_set) {
   }
 }
 
-void JournalTrimmer::handle_commit_position_safe(
-    int r, const ObjectSetPosition &object_set_position) {
-  ldout(m_cct, 20) << __func__ << ": r=" << r << ", pos="
-                   << object_set_position << dendl;
+void JournalTrimmer::handle_commit_position_safe(int r) {
+  ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
 
   Mutex::Locker locker(m_lock);
   if (r == 0) {
index 46db1c51bab43529c92d26ff879b84e32808a040..349592b36e42512b0506327a829c3bc80210a9e7 100644 (file)
@@ -11,6 +11,7 @@
 #include "journal/AsyncOpTracker.h"
 #include "journal/JournalMetadata.h"
 #include "cls/journal/cls_journal_types.h"
+#include <functional>
 
 namespace journal {
 
@@ -26,18 +27,22 @@ public:
   void committed(uint64_t commit_tid);
 
 private:
+  typedef std::function<Context*()> CreateContext;
+
   struct C_CommitPositionSafe : public Context {
     JournalTrimmer *journal_trimmer;
-    ObjectSetPosition object_set_position;
 
-    C_CommitPositionSafe(JournalTrimmer *_journal_trimmer,
-                         const ObjectSetPosition &_object_set_position)
-      : journal_trimmer(_journal_trimmer),
-        object_set_position(_object_set_position) {}
+    C_CommitPositionSafe(JournalTrimmer *_journal_trimmer)
+      : journal_trimmer(_journal_trimmer) {
+      Mutex::Locker locker(journal_trimmer->m_lock);
+      journal_trimmer->m_async_op_tracker.start_op();
+    }
+    virtual ~C_CommitPositionSafe() {
+      journal_trimmer->m_async_op_tracker.finish_op();
+    }
 
     virtual void finish(int r) {
-      journal_trimmer->handle_commit_position_safe(r, object_set_position);
-      journal_trimmer->m_async_op_tracker.finish_op();
+      journal_trimmer->handle_commit_position_safe(r);
     }
   };
   struct C_RemoveSet : public Context {
@@ -70,10 +75,14 @@ private:
   uint64_t m_remove_set;
   Context *m_remove_set_ctx;
 
+  CreateContext m_create_commit_position_safe_context = [this]() {
+      return new C_CommitPositionSafe(this);
+    };
+
   void trim_objects(uint64_t minimum_set);
   void remove_set(uint64_t object_set);
 
-  void handle_commit_position_safe(int r, const ObjectSetPosition &position);
+  void handle_commit_position_safe(int r);
 
   void handle_set_removed(int r, uint64_t object_set);
 };
index be9628e2941cd2da397ee2cb5c8fccd4335de053..91ed2d7ca19f69307ceba6f929b104ec7ac687a3 100644 (file)
@@ -18,9 +18,10 @@ public:
   }
 
   journal::JournalMetadataPtr create_metadata(const std::string &oid,
-                                              const std::string &client_id) {
+                                              const std::string &client_id,
+                                              double commit_internal = 0.1) {
     journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
-      m_ioctx, oid, client_id, 0.1));
+      m_ioctx, oid, client_id, commit_internal));
     m_metadata_list.push_back(metadata);
     metadata->add_listener(&m_listener);
     return metadata;
@@ -50,36 +51,49 @@ TEST_F(TestJournalMetadata, ClientDNE) {
   ASSERT_EQ(-ENOENT, init_metadata(metadata2));
 }
 
-TEST_F(TestJournalMetadata, SetCommitPositions) {
+TEST_F(TestJournalMetadata, Committed) {
   std::string oid = get_temp_oid();
 
   ASSERT_EQ(0, create(oid, 14, 2));
   ASSERT_EQ(0, client_register(oid, "client1", ""));
 
-  journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+  journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1", 600);
   ASSERT_EQ(0, init_metadata(metadata1));
 
   journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1");
   ASSERT_EQ(0, init_metadata(metadata2));
   ASSERT_TRUE(wait_for_update(metadata2));
 
-  journal::JournalMetadata::ObjectSetPosition commit_position;
+  journal::JournalMetadata::ObjectSetPosition expect_commit_position;
   journal::JournalMetadata::ObjectSetPosition read_commit_position;
   metadata1->get_commit_position(&read_commit_position);
-  ASSERT_EQ(commit_position, read_commit_position);
+  ASSERT_EQ(expect_commit_position, read_commit_position);
 
-  journal::JournalMetadata::ObjectPositions object_positions;
-  object_positions = {
-    cls::journal::ObjectPosition(1, 123, 122)};
-  commit_position = journal::JournalMetadata::ObjectSetPosition(object_positions);
+  uint64_t commit_tid1 = metadata1->allocate_commit_tid(0, 0, 0);
+  uint64_t commit_tid2 = metadata1->allocate_commit_tid(0, 1, 0);
+  uint64_t commit_tid3 = metadata1->allocate_commit_tid(1, 0, 1);
+  uint64_t commit_tid4 = metadata1->allocate_commit_tid(0, 0, 2);
 
-  C_SaferCond cond;
-  metadata1->set_commit_position(commit_position, &cond);
-  ASSERT_EQ(0, cond.wait());
-  ASSERT_TRUE(wait_for_update(metadata2));
+  // cannot commit until tid1 + 2 committed
+  metadata1->committed(commit_tid2, []() { return nullptr; });
+  metadata1->committed(commit_tid3, []() { return nullptr; });
+
+  C_SaferCond cond1;
+  metadata1->committed(commit_tid1, [&cond1]() { return &cond1; });
 
+  // given our 10 minute commit interval, this should override the
+  // in-flight commit
+  C_SaferCond cond2;
+  metadata1->committed(commit_tid4, [&cond2]() { return &cond2; });
+
+  ASSERT_EQ(-ESTALE, cond1.wait());
+  metadata1->flush_commit_position();
+  ASSERT_EQ(0, cond2.wait());
+
+  ASSERT_TRUE(wait_for_update(metadata2));
   metadata2->get_commit_position(&read_commit_position);
-  ASSERT_EQ(commit_position, read_commit_position);
+  expect_commit_position = {{{0, 0, 2}, {1, 0, 1}}};
+  ASSERT_EQ(expect_commit_position, read_commit_position);
 }
 
 TEST_F(TestJournalMetadata, UpdateActiveObject) {