From 24df022e0b34788953734c2e622c336077958e78 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Tue, 13 Feb 2018 10:05:01 -0500 Subject: [PATCH] journal: flush commit positions should wait for refresh Fixes: http://tracker.ceph.com/issues/22945 Signed-off-by: Jason Dillaman --- src/journal/JournalMetadata.cc | 150 ++++++++++++++++++++------------- src/journal/JournalMetadata.h | 6 +- 2 files changed, 96 insertions(+), 60 deletions(-) diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index e469d6c6c10..f9c856c185c 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -673,14 +673,9 @@ void JournalMetadata::assert_active_tag(uint64_t tag_tid, Context *on_finish) { void JournalMetadata::flush_commit_position() { ldout(m_cct, 20) << __func__ << dendl; - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); - if (m_commit_position_ctx == nullptr) { - return; - } - - cancel_commit_task(); - handle_commit_position_task(); + C_SaferCond ctx; + flush_commit_position(&ctx); + ctx.wait(); } void JournalMetadata::flush_commit_position(Context *on_safe) { @@ -688,7 +683,7 @@ void JournalMetadata::flush_commit_position(Context *on_safe) { Mutex::Locker timer_locker(*m_timer_lock); Mutex::Locker locker(m_lock); - if (m_commit_position_ctx == nullptr) { + if (m_commit_position_ctx == nullptr && m_flush_commits_in_progress == 0) { // nothing to flush if (on_safe != nullptr) { m_work_queue->queue(on_safe, 0); @@ -697,9 +692,12 @@ void JournalMetadata::flush_commit_position(Context *on_safe) { } if (on_safe != nullptr) { - m_commit_position_ctx = new C_FlushCommitPosition( - m_commit_position_ctx, on_safe); + m_flush_commit_position_ctxs.push_back(on_safe); } + if (m_commit_position_ctx == nullptr) { + return; + } + cancel_commit_task(); handle_commit_position_task(); } @@ -807,7 +805,6 @@ void JournalMetadata::cancel_commit_task() { assert(m_lock.is_locked()); assert(m_commit_position_ctx != nullptr); assert(m_commit_position_task_ctx != nullptr); - m_timer->cancel_event(m_commit_position_task_ctx); m_commit_position_task_ctx = NULL; } @@ -818,7 +815,7 @@ void JournalMetadata::schedule_commit_task() { assert(m_timer_lock->is_locked()); assert(m_lock.is_locked()); assert(m_commit_position_ctx != nullptr); - if (m_commit_position_task_ctx == NULL) { + if (m_commit_position_task_ctx == nullptr) { m_commit_position_task_ctx = m_timer->add_event_after(m_settings.commit_interval, new C_CommitPositionTask(this)); @@ -832,22 +829,51 @@ void JournalMetadata::handle_commit_position_task() { << "client_id=" << m_client_id << ", " << "commit_position=" << m_commit_position << dendl; - librados::ObjectWriteOperation op; - client::client_commit(&op, m_client_id, m_commit_position); + m_commit_position_task_ctx = nullptr; + Context* commit_position_ctx = nullptr; + std::swap(commit_position_ctx, m_commit_position_ctx); - Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx); - m_commit_position_ctx = NULL; + m_async_op_tracker.start_op(); + ++m_flush_commits_in_progress; - ctx = schedule_laggy_clients_disconnect(ctx); + Context* ctx = new FunctionContext([this, commit_position_ctx](int r) { + Contexts flush_commit_position_ctxs; + m_lock.Lock(); + assert(m_flush_commits_in_progress > 0); + --m_flush_commits_in_progress; + if (m_flush_commits_in_progress == 0) { + std::swap(flush_commit_position_ctxs, m_flush_commit_position_ctxs); + } + m_lock.Unlock(); - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, NULL, - utils::rados_ctx_callback); + commit_position_ctx->complete(0); + for (auto ctx : flush_commit_position_ctxs) { + ctx->complete(0); + } + m_async_op_tracker.finish_op(); + }); + ctx = new C_NotifyUpdate(this, ctx); + ctx = new FunctionContext([this, ctx](int r) { + // manually kick of a refresh in case the notification is missed + // and ignore the next notification that we are about to send + m_lock.Lock(); + ++m_ignore_watch_notifies; + m_lock.Unlock(); + + refresh(ctx); + }); + ctx = new FunctionContext([this, ctx](int r) { + schedule_laggy_clients_disconnect(ctx); + }); + + librados::ObjectWriteOperation op; + client::client_commit(&op, m_client_id, m_commit_position); + + auto comp = librados::Rados::aio_create_completion(ctx, nullptr, + utils::rados_ctx_callback); int r = m_ioctx.aio_operate(m_oid, comp, &op); assert(r == 0); comp->release(); - - m_commit_position_task_ctx = NULL; } void JournalMetadata::schedule_watch_reset() { @@ -884,6 +910,14 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) { bufferlist bl; m_ioctx.notify_ack(m_oid, notify_id, cookie, bl); + { + Mutex::Locker locker(m_lock); + if (m_ignore_watch_notifies > 0) { + --m_ignore_watch_notifies; + return; + } + } + refresh(NULL); } @@ -1060,57 +1094,55 @@ void JournalMetadata::handle_notified(int r) { ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl; } -Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) { - assert(m_lock.is_locked()); - +void JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) { ldout(m_cct, 20) << __func__ << dendl; - if (m_settings.max_concurrent_object_sets <= 0) { - return on_finish; + on_finish->complete(0); + return; } Context *ctx = on_finish; + { + Mutex::Locker locker(m_lock); + for (auto &c : m_registered_clients) { + if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED || + c.id == m_client_id || + m_settings.whitelisted_laggy_clients.count(c.id) > 0) { + continue; + } + const std::string &client_id = c.id; + uint64_t object_set = 0; + if (!c.commit_position.object_positions.empty()) { + auto &position = *(c.commit_position.object_positions.begin()); + object_set = position.object_number / m_splay_width; + } - for (auto &c : m_registered_clients) { - if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED || - c.id == m_client_id || - m_settings.whitelisted_laggy_clients.count(c.id) > 0) { - continue; - } - const std::string &client_id = c.id; - uint64_t object_set = 0; - if (!c.commit_position.object_positions.empty()) { - auto &position = *(c.commit_position.object_positions.begin()); - object_set = position.object_number / m_splay_width; - } - - if (m_active_set > object_set + m_settings.max_concurrent_object_sets) { - ldout(m_cct, 1) << __func__ << ": " << client_id - << ": scheduling disconnect" << dendl; + if (m_active_set > object_set + m_settings.max_concurrent_object_sets) { + ldout(m_cct, 1) << __func__ << ": " << client_id + << ": scheduling disconnect" << dendl; - ctx = new FunctionContext([this, client_id, ctx](int r1) { - ldout(m_cct, 10) << __func__ << ": " << client_id - << ": flagging disconnected" << dendl; + ctx = new FunctionContext([this, client_id, ctx](int r1) { + ldout(m_cct, 10) << __func__ << ": " << client_id + << ": flagging disconnected" << dendl; - librados::ObjectWriteOperation op; - client::client_update_state(&op, client_id, - cls::journal::CLIENT_STATE_DISCONNECTED); + librados::ObjectWriteOperation op; + client::client_update_state( + &op, client_id, cls::journal::CLIENT_STATE_DISCONNECTED); - librados::AioCompletion *comp = - librados::Rados::aio_create_completion(ctx, nullptr, - utils::rados_ctx_callback); - int r = m_ioctx.aio_operate(m_oid, comp, &op); - assert(r == 0); - comp->release(); - }); + auto comp = librados::Rados::aio_create_completion( + ctx, nullptr, utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + assert(r == 0); + comp->release(); + }); + } } } if (ctx == on_finish) { ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl; } - - return ctx; + ctx->complete(0); } std::ostream &operator<<(std::ostream &os, diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 66f92e0cab1..ed063b0237f 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -333,6 +333,7 @@ private: size_t m_update_notifications; Cond m_update_cond; + size_t m_ignore_watch_notifies = 0; size_t m_refreshes_in_progress = 0; Contexts m_refresh_ctxs; @@ -341,6 +342,9 @@ private: Context *m_commit_position_ctx; Context *m_commit_position_task_ctx; + size_t m_flush_commits_in_progress = 0; + Contexts m_flush_commit_position_ctxs; + AsyncOpTracker m_async_op_tracker; void handle_immutable_metadata(int r, Context *on_init); @@ -358,7 +362,7 @@ private: void handle_watch_error(int err); void handle_notified(int r); - Context *schedule_laggy_clients_disconnect(Context *on_finish); + void schedule_laggy_clients_disconnect(Context *on_finish); friend std::ostream &operator<<(std::ostream &os, const JournalMetadata &journal_metadata); -- 2.39.5