From: Jason Dillaman Date: Fri, 19 Feb 2016 01:41:52 +0000 (-0500) Subject: journal: added interface to flush pending commit position updates X-Git-Tag: v10.1.0~350^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=999a319dd9cf62f7411b25bfc6f160298d70e3cb;p=ceph.git journal: added interface to flush pending commit position updates Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index f8d55742892..d13bbed9bc3 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -243,6 +243,21 @@ struct C_GetTags : public Context { } }; +struct C_FlushCommitPosition : public Context { + Context *commit_position_ctx; + Context *on_finish; + + C_FlushCommitPosition(Context *commit_position_ctx, Context *on_finish) + : commit_position_ctx(commit_position_ctx), on_finish(on_finish) { + } + virtual void finish(int r) override { + if (commit_position_ctx != nullptr) { + commit_position_ctx->complete(r); + } + on_finish->complete(r); + } +}; + } // anonymous namespace JournalMetadata::JournalMetadata(librados::IoCtx &ioctx, @@ -434,19 +449,32 @@ void JournalMetadata::set_active_set(uint64_t object_set) { } void JournalMetadata::flush_commit_position() { - ldout(m_cct, 20) << __func__ << dendl; - { - Mutex::Locker timer_locker(m_timer_lock); - Mutex::Locker locker(m_lock); - if (m_commit_position_task_ctx == NULL) { - return; - } + Mutex::Locker timer_locker(m_timer_lock); + Mutex::Locker locker(m_lock); + if (m_commit_position_ctx == nullptr) { + return; + } + + cancel_commit_task(); + handle_commit_position_task(); +} - m_timer->cancel_event(m_commit_position_task_ctx); - m_commit_position_task_ctx = NULL; +void JournalMetadata::flush_commit_position(Context *on_safe) { + ldout(m_cct, 20) << __func__ << dendl; + + Mutex::Locker timer_locker(m_timer_lock); + Mutex::Locker locker(m_lock); + if (m_commit_position_ctx == nullptr) { + // nothing to flush + m_finisher->queue(on_safe, 0); + return; } + + m_commit_position_ctx = new C_FlushCommitPosition( + m_commit_position_ctx, on_safe); + cancel_commit_task(); handle_commit_position_task(); } @@ -554,13 +582,24 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { } } -void JournalMetadata::schedule_commit_task() { - +void JournalMetadata::cancel_commit_task() { ldout(m_cct, 20) << __func__ << dendl; assert(m_timer_lock.is_locked()); assert(m_lock.is_locked()); + assert(m_commit_position_ctx != nullptr); + assert(m_commit_position_task_ctx != nullptr); + + m_timer->cancel_event(m_commit_position_task_ctx); + m_commit_position_task_ctx = NULL; +} +void JournalMetadata::schedule_commit_task() { + ldout(m_cct, 20) << __func__ << dendl; + + assert(m_timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(m_commit_position_ctx != nullptr); if (m_commit_position_task_ctx == NULL) { m_commit_position_task_ctx = new C_CommitPositionTask(this); m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx); @@ -568,11 +607,10 @@ void JournalMetadata::schedule_commit_task() { } void JournalMetadata::handle_commit_position_task() { - + assert(m_timer_lock.is_locked()); + assert(m_lock.is_locked()); ldout(m_cct, 20) << __func__ << dendl; - Mutex::Locker locker(m_lock); - librados::ObjectWriteOperation op; client::client_commit(&op, m_client_id, m_commit_position); diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 450169b61d6..f3898dab132 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -99,6 +99,7 @@ public: } void flush_commit_position(); + void flush_commit_position(Context *on_safe); void set_commit_position(const ObjectSetPosition &commit_position, Context *on_safe); void get_commit_position(ObjectSetPosition *commit_position) const { @@ -185,6 +186,7 @@ private: journal_metadata->m_async_op_tracker.finish_op(); } virtual void finish(int r) { + Mutex::Locker locker(journal_metadata->m_lock); journal_metadata->handle_commit_position_task(); }; }; @@ -309,6 +311,7 @@ private: void refresh(Context *on_finish); void handle_refresh_complete(C_Refresh *refresh, int r); + void cancel_commit_task(); void schedule_commit_task(); void handle_commit_position_task(); diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 1eb7e2c2cb6..d3fb225b1d5 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -159,6 +159,10 @@ int Journaler::remove(bool force) { return 0; } +void Journaler::flush_commit_position(Context *on_safe) { + m_metadata->flush_commit_position(on_safe); +} + int Journaler::register_client(const bufferlist &data) { return m_metadata->register_client(data); } @@ -240,7 +244,7 @@ void Journaler::start_append(int flush_interval, uint64_t flush_bytes, void Journaler::stop_append(Context *on_safe) { assert(m_recorder != NULL); - flush(new C_DeleteRecorder(m_recorder, on_safe)); + flush_append(new C_DeleteRecorder(m_recorder, on_safe)); m_recorder = NULL; } @@ -248,7 +252,7 @@ Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) { return m_recorder->append(tag_tid, payload_bl); } -void Journaler::flush(Context *on_safe) { +void Journaler::flush_append(Context *on_safe) { m_recorder->flush(on_safe); } diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 6702eb4dffb..324dbbc72bb 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -48,6 +48,8 @@ public: int register_client(const bufferlist &data); int unregister_client(); + void flush_commit_position(Context *on_safe); + void allocate_tag(const bufferlist &data, cls::journal::Tag *tag, Context *on_finish); void allocate_tag(uint64_t tag_class, const bufferlist &data, @@ -61,7 +63,7 @@ public: void start_append(int flush_interval, uint64_t flush_bytes, double flush_age); Future append(uint64_t tag_tid, const bufferlist &bl); - void flush(Context *on_safe); + void flush_append(Context *on_safe); void stop_append(Context *on_safe); void committed(const ReplayEntry &replay_entry);