]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: avoid reentrant callbacks on Future
authorJason Dillaman <dillaman@redhat.com>
Mon, 13 Jul 2015 21:59:33 +0000 (17:59 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:41 +0000 (20:42 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/Future.cc
src/journal/FutureImpl.cc
src/journal/FutureImpl.h
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/JournalRecorder.cc

index 5b0510651278fa4e3ae69c8f290846106fec9e24..144f32052277878856db969f8c95cdca18537bb4 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "journal/Future.h"
 #include "journal/FutureImpl.h"
+#include "include/assert.h"
 
 namespace journal {
 
@@ -11,6 +12,7 @@ void Future::flush(Context *on_safe) {
 }
 
 void Future::wait(Context *on_safe) {
+  assert(on_safe != NULL);
   m_future_impl->wait(on_safe);
 }
 
index 3ca8688dfabaa76ac2cb6b885224df257f18fb39..4b966f0a8a82f2f35cf778030fab79916b700544 100644 (file)
@@ -2,12 +2,13 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "journal/FutureImpl.h"
+#include "common/Finisher.h"
 #include "journal/Utils.h"
 
 namespace journal {
 
-FutureImpl::FutureImpl(const std::string &tag, uint64_t tid)
-  : RefCountedObject(NULL, 0), m_tag(tag), m_tid(tid),
+FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid)
+  : RefCountedObject(NULL, 0), m_finisher(finisher), m_tag(tag), m_tid(tid),
     m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
     m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
     m_consistent_ack(this) {
@@ -48,7 +49,7 @@ void FutureImpl::flush(Context *on_safe) {
   }
 
   if (complete && on_safe != NULL) {
-    on_safe->complete(m_return_value);
+    m_finisher.queue(on_safe, m_return_value);
   } else if (flush_handler) {
     // attached to journal object -- instruct it to flush all entries through
     // this one.  possible to become detached while lock is released, so flush
@@ -66,7 +67,7 @@ void FutureImpl::wait(Context *on_safe) {
       return;
     }
   }
-  on_safe->complete(m_return_value);
+  m_finisher.queue(on_safe, m_return_value);
 }
 
 bool FutureImpl::is_complete() const {
index eaf5febfcc04cf79363c6e74bdf806b240f582c5..69ea4461e149f47b28869d80b3a35d7e6de546c3 100644 (file)
@@ -14,6 +14,7 @@
 #include "include/assert.h"
 
 class Context;
+class Finisher;
 
 namespace journal {
 
@@ -30,7 +31,7 @@ public:
   };
   typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
 
-  FutureImpl(const std::string &tag, uint64_t tid);
+  FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid);
 
   void init(const FutureImplPtr &prev_future);
 
@@ -90,6 +91,7 @@ private:
     virtual void finish(int r) {}
   };
 
+  Finisher &m_finisher;
   std::string m_tag;
   uint64_t m_tid;
 
index 499100e356e007fcb0851afc952e161145215302..51c4cef803191178543ec563943a46cdb88ae1f0 100644 (file)
@@ -4,6 +4,7 @@
 #include "journal/JournalMetadata.h"
 #include "journal/Utils.h"
 #include "common/errno.h"
+#include "common/Finisher.h"
 #include "common/Timer.h"
 #include "cls/journal/cls_journal_client.h"
 
@@ -21,7 +22,7 @@ JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
                                  double commit_interval)
     : m_cct(NULL), m_oid(oid), m_client_id(client_id),
       m_commit_interval(commit_interval), m_order(0), m_splay_width(0),
-      m_initialized(false), m_timer(NULL),
+      m_initialized(false), m_finisher(NULL), m_timer(NULL),
       m_timer_lock("JournalMetadata::m_timer_lock"),
       m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
       m_update_notifications(0), m_commit_position_pending(false),
@@ -37,6 +38,11 @@ JournalMetadata::~JournalMetadata() {
     delete m_timer;
     m_timer = NULL;
   }
+  if (m_finisher != NULL) {
+    m_finisher->stop();
+    delete m_finisher;
+    m_finisher = NULL;
+  }
 
   m_ioctx.unwatch2(m_watch_handle);
   librados::Rados rados(m_ioctx);
@@ -47,6 +53,9 @@ void JournalMetadata::init(Context *on_init) {
   assert(!m_initialized);
   m_initialized = true;
 
+  m_finisher = new Finisher(m_cct);
+  m_finisher->start();
+
   m_timer = new SafeTimer(m_cct, m_timer_lock, false);
   m_timer->init();
 
index 8763edd3edb00237a130e84cdc7c78588c72ab44..ce621c5e15794e3e15991a6b35513880be17ec20 100644 (file)
@@ -17,6 +17,7 @@
 #include <string>
 #include "include/assert.h"
 
+class Finisher;
 class SafeTimer;
 
 namespace journal {
@@ -60,6 +61,10 @@ public:
     return m_splay_width;
   }
 
+  inline Finisher &get_finisher() {
+    return *m_finisher;
+  }
+
   inline SafeTimer &get_timer() {
     return *m_timer;
   }
@@ -198,6 +203,7 @@ private:
   uint8_t m_splay_width;
   bool m_initialized;
 
+  Finisher *m_finisher;
   SafeTimer *m_timer;
   Mutex m_timer_lock;
 
index fa4f84b7853839623565bdaa37b8ab9b74e2b44b..938cf70729342ba7435f33746fdba6432f9f4614 100644 (file)
@@ -48,7 +48,8 @@ Future JournalRecorder::append(const std::string &tag,
   uint8_t splay_offset = tid % splay_width;
 
   ObjectRecorderPtr object_ptr = get_object(splay_offset);
-  FutureImplPtr future(new FutureImpl(tag, tid));
+  FutureImplPtr future(new FutureImpl(m_journal_metadata->get_finisher(),
+                                      tag, tid));
   future->init(m_prev_future);
   m_prev_future = future;