git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: avoid race between in-flight notifications and flush
author: Jason Dillaman <dillaman@redhat.com>
Wed, 20 Jan 2016 21:33:35 +0000 (16:33 -0500)
committer: Jason Dillaman <dillaman@redhat.com>
Wed, 20 Jan 2016 21:33:35 +0000 (16:33 -0500)
If an async callback for a safely committed event was in-flight,
it could race with the flush of the journal.  This would result
in the flush callback completing before the notifications for
safe events.

Fixes: #14434
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h

index 4fb7765c59f55c7c2ddb18e82fc7564dc6aa24ad..e65228bd91ca1578f89d0f4ada5876f9603e1647 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "journal/JournalRecorder.h"
+#include "common/Finisher.h"
 #include "journal/Entry.h"
 #include "journal/Utils.h"
 
 
 namespace journal {
 
+namespace {
+
+struct C_Flush : public Context {  // counts down flush completions, then queues on_finish
+  JournalMetadataPtr journal_metadata;  // supplies the finisher used in complete()
+  Context *on_finish;  // queued once, when the counter reaches zero
+  atomic_t pending_flushes;  // number of complete() calls still expected
+  int ret_val;  // first negative r seen (0 if none); plain int, unlike the counter
+
+  C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
+          size_t _pending_flushes)
+    : journal_metadata(_journal_metadata), on_finish(_on_finish),
+      pending_flushes(_pending_flushes), ret_val(0) {
+  }
+
+  virtual void complete(int r) {  // called once per pending flush
+    if (r < 0 && ret_val == 0) {  // keep only the first error reported
+      ret_val = r;  // NOTE(review): unsynchronized write -- confirm callers cannot race
+    }
+    if (pending_flushes.dec() == 0) {  // this was the last expected completion
+      // queue (not call) on_finish so all prior callbacks are flushed as well
+      journal_metadata->get_finisher().queue(on_finish, ret_val);
+      delete this;  // self-destructs after the final completion
+    }
+  }
+  virtual void finish(int r) {  // no-op; the work is done in complete()
+  }
+};
+
+} // anonymous namespace
+
 JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
                                  const std::string &object_oid_prefix,
                                  const JournalMetadataPtr& journal_metadata,
@@ -76,14 +107,15 @@ void JournalRecorder::flush(Context *on_safe) {
   {
     Mutex::Locker locker(m_lock);
 
-    ctx = new C_Flush(on_safe, m_object_ptrs.size());
+    ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
     for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
          it != m_object_ptrs.end(); ++it) {
       it->second->flush(ctx);
     }
   }
 
-  ctx->unblock();
+  // avoid holding the lock in case there is nothing to flush
+  ctx->complete(0);
 }
 
 ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
index 4c3489fcb889261f5a5ffef10ca0e7ab871439d2..58b988e9f66b199929791f13f92be7c30ffbc24b 100644 (file)
@@ -58,32 +58,6 @@ private:
     }
   };
 
-  struct C_Flush : public Context {
-    Context *on_finish;
-    atomic_t pending_flushes;
-    int ret_val;
-
-    C_Flush(Context *_on_finish, size_t _pending_flushes)
-      : on_finish(_on_finish), pending_flushes(_pending_flushes + 1),
-        ret_val(0) {
-    }
-
-    void unblock() {
-      complete(0);
-    }
-    virtual void complete(int r) {
-      if (r < 0 && ret_val == 0) {
-        ret_val = r;
-      }
-      if (pending_flushes.dec() == 0) {
-        on_finish->complete(ret_val);
-        delete this;
-      }
-    }
-    virtual void finish(int r) {
-    }
-  };
-
   librados::IoCtx m_ioctx;
   CephContext *m_cct;
   std::string m_object_oid_prefix;