]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: close, advance, and open object set ordering
authorJason Dillaman <dillaman@redhat.com>
Sat, 14 May 2016 22:58:41 +0000 (18:58 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 20 May 2016 00:29:00 +0000 (20:29 -0400)
Flush in-flight appends to open objects before advancing the
active object set.  Additionally, don't start recording to the
new objects until after advancing the active set.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit de830057d0f7286914e019540c6263423cb60428)

src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h

index 02836b87c861907c1731a47344b9643259564b05..ce66a1314437322078528161548e9bfcc567bfb6 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "journal/JournalRecorder.h"
+#include "common/errno.h"
 #include "journal/Entry.h"
 #include "journal/Utils.h"
 
@@ -67,6 +68,10 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
 
 JournalRecorder::~JournalRecorder() {
   m_journal_metadata->remove_listener(&m_listener);
+
+  Mutex::Locker locker(m_lock);
+  assert(m_in_flight_advance_sets == 0);
+  assert(m_in_flight_object_closes == 0);
 }
 
 Future JournalRecorder::append(uint64_t tag_tid,
@@ -97,9 +102,8 @@ Future JournalRecorder::append(uint64_t tag_tid,
   if (object_full) {
     ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
                      << dendl;
-    close_object_set(object_ptr->get_object_number() / splay_width);
+    close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
   }
-
   return Future(future);
 }
 
@@ -127,36 +131,100 @@ ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
   return object_recoder;
 }
 
-void JournalRecorder::close_object_set(uint64_t object_set) {
+void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
   assert(m_lock.is_locked());
 
-  uint8_t splay_width = m_journal_metadata->get_splay_width();
-  if (object_set != m_current_set) {
+  // entry overflow from open object
+  if (m_current_set != object_set) {
+    ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
     return;
   }
 
+  // we shouldn't overflow upon append if already closed and we
+  // shouldn't receive an overflowed callback if already closed
+  assert(m_in_flight_advance_sets == 0);
+  assert(m_in_flight_object_closes == 0);
+
   uint64_t active_set = m_journal_metadata->get_active_set();
-  if (active_set < m_current_set + 1) {
-    m_journal_metadata->set_active_set(m_current_set + 1);
+  assert(m_current_set == active_set);
+  ++m_current_set;
+  ++m_in_flight_advance_sets;
+
+  ldout(m_cct, 20) << __func__ << ": closing active object set "
+                   << object_set << dendl;
+  if (close_object_set(m_current_set)) {
+    advance_object_set();
+  }
+}
+
+void JournalRecorder::advance_object_set() {
+  assert(m_lock.is_locked());
+
+  assert(m_in_flight_object_closes == 0);
+  ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
+                   << dendl;
+  m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
+    this));
+}
+
+void JournalRecorder::handle_advance_object_set(int r) {
+  Mutex::Locker locker(m_lock);
+  ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
+
+  assert(m_in_flight_advance_sets > 0);
+  --m_in_flight_advance_sets;
+
+  if (r < 0 && r != -ESTALE) {
+    lderr(m_cct) << __func__ << ": failed to advance object set: "
+                 << cpp_strerror(r) << dendl;
+  }
+
+  if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
+    open_object_set();
+  }
+}
+
+void JournalRecorder::open_object_set() {
+  assert(m_lock.is_locked());
+
+  ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
+                   << dendl;
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
+       it != m_object_ptrs.end(); ++it) {
+    ObjectRecorderPtr object_recorder = it->second;
+    if (object_recorder->get_object_number() / splay_width != m_current_set) {
+      assert(object_recorder->is_closed());
+
+      // ready to close object and open object in active set
+      create_next_object_recorder(object_recorder);
+    }
   }
-  m_current_set = m_journal_metadata->get_active_set();
+}
 
-  ldout(m_cct, 10) << __func__ << ": advancing to object set "
-                   << m_current_set << dendl;
+bool JournalRecorder::close_object_set(uint64_t active_set) {
+  assert(m_lock.is_locked());
 
   // object recorders will invoke overflow handler as they complete
   // closing the object to ensure correct order of future appends
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
   for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
        it != m_object_ptrs.end(); ++it) {
     ObjectRecorderPtr object_recorder = it->second;
-    if (object_recorder != NULL &&
-        object_recorder->get_object_number() / splay_width == m_current_set) {
-      if (object_recorder->close()) {
-        // no in-flight ops, immediately create new recorder
-        create_next_object_recorder(object_recorder);
+    if (object_recorder->get_object_number() / splay_width != active_set) {
+      ldout(m_cct, 10) << __func__ << ": closing object "
+                       << object_recorder->get_oid() << dendl;
+      // flush out all queued appends and hold future appends
+      if (!object_recorder->close()) {
+        ++m_in_flight_object_closes;
+      } else {
+        ldout(m_cct, 20) << __func__ << ": object "
+                         << object_recorder->get_oid() << " closed" << dendl;
       }
     }
   }
+  return (m_in_flight_object_closes == 0);
 }
 
 ObjectRecorderPtr JournalRecorder::create_object_recorder(
@@ -181,6 +249,9 @@ void JournalRecorder::create_next_object_recorder(
   ObjectRecorderPtr new_object_recorder = create_object_recorder(
      (m_current_set * splay_width) + splay_offset);
 
+  ldout(m_cct, 10) << __func__ << ": "
+                   << "old oid=" << object_recorder->get_oid() << ", "
+                   << "new oid=" << new_object_recorder->get_oid() << dendl;
   AppendBuffers append_buffers;
   object_recorder->claim_append_buffers(&append_buffers);
   new_object_recorder->append(append_buffers);
@@ -192,13 +263,50 @@ void JournalRecorder::handle_update() {
   Mutex::Locker locker(m_lock);
 
   uint64_t active_set = m_journal_metadata->get_active_set();
-  if (active_set > m_current_set) {
-    close_object_set(m_current_set);
+  if (m_current_set < active_set) {
+    // peer journal client advanced the active set
+    ldout(m_cct, 20) << __func__ << ": "
+                     << "current_set=" << m_current_set << ", "
+                     << "active_set=" << active_set << dendl;
+
+    uint64_t current_set = m_current_set;
+    m_current_set = active_set;
+    if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
+      ldout(m_cct, 20) << __func__ << ": closing current object set "
+                       << current_set << dendl;
+      if (close_object_set(active_set)) {
+        open_object_set();
+      }
+    }
   }
 }
 
 void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
-  // TODO
+  ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  uint64_t object_number = object_recorder->get_object_number();
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  uint8_t splay_offset = object_number % splay_width;
+  ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
+  assert(active_object_recorder->get_object_number() == object_number);
+
+  assert(m_in_flight_object_closes > 0);
+  --m_in_flight_object_closes;
+
+  // object closed after advance active set committed
+  ldout(m_cct, 20) << __func__ << ": object "
+                   << active_object_recorder->get_oid() << " closed" << dendl;
+  if (m_in_flight_object_closes == 0) {
+    if (m_in_flight_advance_sets == 0) {
+      // peer forced closing of object set
+      open_object_set();
+    } else {
+      // local overflow advanced object set
+      advance_object_set();
+    }
+  }
 }
 
 void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
@@ -212,8 +320,10 @@ void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
   ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
   assert(active_object_recorder->get_object_number() == object_number);
 
-  close_object_set(object_number / splay_width);
-  create_next_object_recorder(active_object_recorder);
+  ldout(m_cct, 20) << __func__ << ": object "
+                   << active_object_recorder->get_oid() << " overflowed"
+                   << dendl;
+  close_and_advance_object_set(object_number / splay_width);
 }
 
 } // namespace journal
index dbd289883d46c0f1fcd56ba04cd0cec5d9977f64..68a1d8f50e38ea12e72a1b16ae40852dadcb511d 100644 (file)
@@ -62,6 +62,17 @@ private:
     }
   };
 
+  struct C_AdvanceObjectSet : public Context {
+    JournalRecorder *journal_recorder;
+
+    C_AdvanceObjectSet(JournalRecorder *_journal_recorder)
+      : journal_recorder(_journal_recorder) {
+    }
+    virtual void finish(int r) {
+      journal_recorder->handle_advance_object_set(r);
+    }
+  };
+
   librados::IoCtx m_ioctx;
   CephContext *m_cct;
   std::string m_object_oid_prefix;
@@ -77,12 +88,21 @@ private:
 
   Mutex m_lock;
 
+  uint32_t m_in_flight_advance_sets = 0;
+  uint32_t m_in_flight_object_closes = 0;
   uint64_t m_current_set;
   ObjectRecorderPtrs m_object_ptrs;
 
   FutureImplPtr m_prev_future;
 
-  void close_object_set(uint64_t object_set);
+  void open_object_set();
+  bool close_object_set(uint64_t active_set);
+
+  void advance_object_set();
+  void handle_advance_object_set(int r);
+
+  void close_and_advance_object_set(uint64_t object_set);
+
   ObjectRecorderPtr create_object_recorder(uint64_t object_number);
   void create_next_object_recorder(ObjectRecorderPtr object_recorder);