From: Jason Dillaman
Date: Wed, 20 Jan 2016 21:33:35 +0000 (-0500)
Subject: journal: avoid race between in-flight notifications and flush
X-Git-Tag: v10.0.4~150^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=144262131ce6edb8ba03a3596d1ec0d84098c0e0;p=ceph.git

journal: avoid race between in-flight notifications and flush

If an async callback for a safely committed event was in-flight,
it could race with the flush of the journal. This would result in
the flush callback completing before the notifications for safe events.

Fixes: #14434
Signed-off-by: Jason Dillaman
---

diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc
index 4fb7765c59f5..e65228bd91ca 100644
--- a/src/journal/JournalRecorder.cc
+++ b/src/journal/JournalRecorder.cc
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "journal/JournalRecorder.h"
+#include "common/Finisher.h"
 #include "journal/Entry.h"
 #include "journal/Utils.h"
 
@@ -11,6 +12,36 @@
 
 namespace journal {
 
+namespace {
+
+struct C_Flush : public Context {
+  JournalMetadataPtr journal_metadata;
+  Context *on_finish;
+  atomic_t pending_flushes;
+  int ret_val;
+
+  C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
+          size_t _pending_flushes)
+    : journal_metadata(_journal_metadata), on_finish(_on_finish),
+      pending_flushes(_pending_flushes), ret_val(0) {
+  }
+
+  virtual void complete(int r) {
+    if (r < 0 && ret_val == 0) {
+      ret_val = r;
+    }
+    if (pending_flushes.dec() == 0) {
+      // ensure all prior callback have been flushed as well
+      journal_metadata->get_finisher().queue(on_finish, ret_val);
+      delete this;
+    }
+  }
+  virtual void finish(int r) {
+  }
+};
+
+} // anonymous namespace
+
 JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
                                  const std::string &object_oid_prefix,
                                  const JournalMetadataPtr& journal_metadata,
@@ -76,14 +107,15 @@ void JournalRecorder::flush(Context *on_safe) {
   {
     Mutex::Locker locker(m_lock);
 
-    ctx = new C_Flush(on_safe, m_object_ptrs.size());
+    ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
     for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
          it != m_object_ptrs.end(); ++it) {
       it->second->flush(ctx);
     }
   }
 
-  ctx->unblock();
+  // avoid holding the lock in case there is nothing to flush
+  ctx->complete(0);
 }
 
 ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h
index 4c3489fcb889..58b988e9f66b 100644
--- a/src/journal/JournalRecorder.h
+++ b/src/journal/JournalRecorder.h
@@ -58,32 +58,6 @@ private:
     }
   };
 
-  struct C_Flush : public Context {
-    Context *on_finish;
-    atomic_t pending_flushes;
-    int ret_val;
-
-    C_Flush(Context *_on_finish, size_t _pending_flushes)
-      : on_finish(_on_finish), pending_flushes(_pending_flushes + 1),
-        ret_val(0) {
-    }
-
-    void unblock() {
-      complete(0);
-    }
-    virtual void complete(int r) {
-      if (r < 0 && ret_val == 0) {
-        ret_val = r;
-      }
-      if (pending_flushes.dec() == 0) {
-        on_finish->complete(ret_val);
-        delete this;
-      }
-    }
-    virtual void finish(int r) {
-    }
-  };
-
   librados::IoCtx m_ioctx;
   CephContext *m_cct;
   std::string m_object_oid_prefix;