From: Jason Dillaman Date: Mon, 13 Jul 2015 19:25:37 +0000 (-0400) Subject: journal: new async flush method for whole journal X-Git-Tag: v10.0.1~102^2~22 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c1dd24ed2444f115153330fe30d5df07bf3b31eb;p=ceph.git journal: new async flush method for whole journal Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index 9050775c63de..fa4f84b78538 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -70,11 +70,13 @@ Future JournalRecorder::append(const std::string &tag, return Future(future); } -void JournalRecorder::flush() { +void JournalRecorder::flush(Context *on_safe) { Mutex::Locker locker(m_lock); + + C_Flush *ctx = new C_Flush(on_safe, m_object_ptrs.size()); for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); it != m_object_ptrs.end(); ++it) { - it->second->flush(); + it->second->flush(ctx); } } diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index 0604d555dfb2..186fe05c78ad 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -5,6 +5,8 @@ #define CEPH_JOURNAL_JOURNAL_RECORDER_H #include "include/int_types.h" +#include "include/atomic.h" +#include "include/Context.h" #include "include/rados/librados.hpp" #include "common/Mutex.h" #include "journal/Future.h" @@ -27,7 +29,7 @@ public: ~JournalRecorder(); Future append(const std::string &tag, const bufferlist &bl); - void flush(); + void flush(Context *on_safe); ObjectRecorderPtr get_object(uint8_t splay_offset); @@ -56,6 +58,28 @@ private: } }; + struct C_Flush : public Context { + Context *on_finish; + atomic_t pending_flushes; + int ret_val; + + C_Flush(Context *_on_finish, size_t _pending_flushes) + : on_finish(_on_finish), pending_flushes(_pending_flushes), ret_val(0) { + } + + virtual void complete(int r) { + if (r < 0 && ret_val == 0) { + ret_val = r; + } + if (pending_flushes.dec() == 0) { + on_finish->complete(ret_val); + delete this; + } + } + virtual void finish(int r) { + } + }; + librados::IoCtx m_ioctx; CephContext *m_cct; std::string m_object_oid_prefix; diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index efca93d688af..d75461125eda 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -157,8 +157,7 @@ Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) { } void Journaler::flush(Context *on_safe) { - // TODO pass ctx - m_recorder->flush(); + m_recorder->flush(on_safe); } void Journaler::create_player(ReplayHandler *replay_handler) { diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index b81cd3cb46b8..af16ccdb00a2 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "journal/ObjectRecorder.h" +#include "journal/Future.h" #include "journal/Utils.h" #include "include/assert.h" #include "common/Timer.h" @@ -59,13 +60,32 @@ bool ObjectRecorder::append(const AppendBuffers &append_buffers) { return (m_size + m_pending_bytes >= m_soft_max_size); } -void ObjectRecorder::flush() { +void ObjectRecorder::flush(Context *on_safe) { ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; - Mutex::Locker locker(m_lock); - if (flush_appends(true)) { - cancel_append_task(); + Future future; + { + Mutex::Locker locker(m_lock); + + // attach the flush to the most recent append + if (!m_append_buffers.empty()) { + future = Future(m_append_buffers.rbegin()->first); + + flush_appends(true); + cancel_append_task(); + } else if (!m_in_flight_appends.empty()) { + AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second; + assert(!append_buffers.empty()); + future = Future(append_buffers.rbegin()->first); + } + } + + if (future.is_valid()) { + future.flush(on_safe); + } else { + on_safe->complete(0); } + } void ObjectRecorder::flush(const FutureImplPtr &future) { diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index d7d2ed10e361..cc6425c23b81 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -46,7 +46,7 @@ public: } bool append(const AppendBuffers &append_buffers); - void flush(); + void flush(Context *on_safe); void flush(const FutureImplPtr &future); void claim_append_buffers(AppendBuffers *append_buffers);