]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: new async flush method for whole journal
authorJason Dillaman <dillaman@redhat.com>
Mon, 13 Jul 2015 19:25:37 +0000 (15:25 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:41 +0000 (20:42 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/Journaler.cc
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h

index 9050775c63de704a795ba297342bee85a8c8b7b9..fa4f84b7853839623565bdaa37b8ab9b74e2b44b 100644 (file)
@@ -70,11 +70,13 @@ Future JournalRecorder::append(const std::string &tag,
   return Future(future);
 }
 
-void JournalRecorder::flush() {
+void JournalRecorder::flush(Context *on_safe) {
   Mutex::Locker locker(m_lock);
+
+  C_Flush *ctx = new C_Flush(on_safe, m_object_ptrs.size());
   for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
        it != m_object_ptrs.end(); ++it) {
-    it->second->flush();
+    it->second->flush(ctx);
   }
 }
 
index 0604d555dfb24718cd9ca184383a62ca880bb9fc..186fe05c78ad1c76db4e0b1494762b0c22287507 100644 (file)
@@ -5,6 +5,8 @@
 #define CEPH_JOURNAL_JOURNAL_RECORDER_H
 
 #include "include/int_types.h"
+#include "include/atomic.h"
+#include "include/Context.h"
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
 #include "journal/Future.h"
@@ -27,7 +29,7 @@ public:
   ~JournalRecorder();
 
   Future append(const std::string &tag, const bufferlist &bl);
-  void flush();
+  void flush(Context *on_safe);
 
   ObjectRecorderPtr get_object(uint8_t splay_offset);
 
@@ -56,6 +58,28 @@ private:
     }
   };
 
+  struct C_Flush : public Context {
+    Context *on_finish;
+    atomic_t pending_flushes;
+    int ret_val;
+
+    C_Flush(Context *_on_finish, size_t _pending_flushes)
+      : on_finish(_on_finish), pending_flushes(_pending_flushes), ret_val(0) {
+    }
+
+    virtual void complete(int r) {
+      if (r < 0 && ret_val == 0) {
+        ret_val = r;
+      }
+      if (pending_flushes.dec() == 0) {
+        on_finish->complete(ret_val);
+        delete this;
+      }
+    }
+    virtual void finish(int r) {
+    }
+  };
+
   librados::IoCtx m_ioctx;
   CephContext *m_cct;
   std::string m_object_oid_prefix;
index efca93d688aff3d0f76bdd291b287ecb3d1e3a21..d75461125edacc6a28056b439b5a0d41b449db4a 100644 (file)
@@ -157,8 +157,7 @@ Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) {
 }
 
 void Journaler::flush(Context *on_safe) {
-  // TODO pass ctx
-  m_recorder->flush();
+  m_recorder->flush(on_safe);
 }
 
 void Journaler::create_player(ReplayHandler *replay_handler) {
index b81cd3cb46b87105d1efb063249b7dd17fe7c91a..af16ccdb00a24853d0225fcba6f0fc5cdee6ac9b 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "journal/ObjectRecorder.h"
+#include "journal/Future.h"
 #include "journal/Utils.h"
 #include "include/assert.h"
 #include "common/Timer.h"
@@ -59,13 +60,32 @@ bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
   return (m_size + m_pending_bytes >= m_soft_max_size);
 }
 
-void ObjectRecorder::flush() {
+void ObjectRecorder::flush(Context *on_safe) {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
 
-  Mutex::Locker locker(m_lock);
-  if (flush_appends(true)) {
-    cancel_append_task();
+  Future future;
+  {
+    Mutex::Locker locker(m_lock);
+
+    // attach the flush to the most recent append
+    if (!m_append_buffers.empty()) {
+      future = Future(m_append_buffers.rbegin()->first);
+
+      flush_appends(true);
+      cancel_append_task();
+    } else if (!m_in_flight_appends.empty()) {
+      AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
+      assert(!append_buffers.empty());
+      future = Future(append_buffers.rbegin()->first);
+    }
+  }
+
+  if (future.is_valid()) {
+    future.flush(on_safe);
+  } else {
+    on_safe->complete(0);
   }
+
 }
 
 void ObjectRecorder::flush(const FutureImplPtr &future) {
index d7d2ed10e361e47b668473490aa283435cd002d3..cc6425c23b81d1340a31477c56b25bfa482a4942 100644 (file)
@@ -46,7 +46,7 @@ public:
   }
 
   bool append(const AppendBuffers &append_buffers);
-  void flush();
+  void flush(Context *on_safe);
   void flush(const FutureImplPtr &future);
 
   void claim_append_buffers(AppendBuffers *append_buffers);