From: Jason Dillaman Date: Mon, 9 Sep 2019 14:15:24 +0000 (-0400) Subject: rbd-mirror: separate image replayer periodic flushing from status updates X-Git-Tag: v15.1.0~1245^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=aa62b99ec018f33448d9586509bbbecec9df8ab7;p=ceph.git rbd-mirror: separate image replayer periodic flushing from status updates This is in preparation for pulling the periodic status update logic out of the image replayer. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/Utils.h b/src/librbd/Utils.h index 9aa01edb2df..4695a10e608 100644 --- a/src/librbd/Utils.h +++ b/src/librbd/Utils.h @@ -89,8 +89,12 @@ struct C_AsyncCallback : public Context { C_AsyncCallback(WQ *op_work_queue, Context *on_finish) : op_work_queue(op_work_queue), on_finish(on_finish) { } + ~C_AsyncCallback() override { + delete on_finish; + } void finish(int r) override { op_work_queue->queue(on_finish, r); + on_finish = nullptr; } }; diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index f11cf783783..3663949fcff 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -48,6 +48,7 @@ extern PerfCounters *g_perf_counters; namespace rbd { namespace mirror { +using librbd::util::create_async_context_callback; using librbd::util::create_context_callback; using librbd::util::create_rados_callback; using namespace rbd::mirror::image_replayer; @@ -288,6 +289,7 @@ ImageReplayer::~ImageReplayer() ceph_assert(m_on_stop_finish == nullptr); ceph_assert(m_bootstrap_request == nullptr); ceph_assert(m_in_flight_status_updates == 0); + ceph_assert(m_flush_local_replay_task == nullptr); delete m_journal_listener; } @@ -770,8 +772,10 @@ void ImageReplayer::on_stop_journal_replay(int r, const std::string &desc) // might be invoked multiple times while stopping return; } + m_stop_requested = true; m_state = STATE_STOPPING; + cancel_flush_local_replay_task(); } set_state_description(r, desc); @@ -836,6 +840,50 @@ void ImageReplayer::flush() update_mirror_image_status(false, boost::none); } +template +void ImageReplayer::schedule_flush_local_replay_task() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + + std::lock_guard timer_locker{m_threads->timer_lock}; + if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) { + return; + } + + dout(15) << dendl; + m_flush_local_replay_task = create_async_context_callback( + m_threads->work_queue, create_context_callback< + ImageReplayer, + &ImageReplayer::handle_flush_local_replay_task>(this)); + m_threads->timer->add_event_after(30, m_flush_local_replay_task); +} + +template +void ImageReplayer::cancel_flush_local_replay_task() { + ceph_assert(ceph_mutex_is_locked(m_lock)); + std::lock_guard timer_locker{m_threads->timer_lock}; + if (m_flush_local_replay_task != nullptr) { + auto canceled = m_threads->timer->cancel_event(m_flush_local_replay_task); + m_flush_local_replay_task = nullptr; + ceph_assert(canceled); + } +} + +template +void ImageReplayer::handle_flush_local_replay_task(int) { + dout(15) << dendl; + + m_in_flight_op_tracker.start_op(); + auto on_finish = new LambdaContext([this](int) { + { + std::lock_guard timer_locker{m_threads->timer_lock}; + m_flush_local_replay_task = nullptr; + } + + m_in_flight_op_tracker.finish_op(); + }); + flush_local_replay(on_finish); +} + template void ImageReplayer::flush_local_replay(Context* on_flush) { @@ -1238,6 +1286,8 @@ void ImageReplayer::handle_process_entry_safe(const ReplayEntry &replay_entry auto ctx = new LambdaContext( [this, bytes, latency](int r) { std::lock_guard locker{m_lock}; + schedule_flush_local_replay_task(); + if (m_perf_counters) { m_perf_counters->inc(l_rbd_mirror_replay); m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes); @@ -1315,14 +1365,7 @@ void ImageReplayer::queue_mirror_image_status_update(const OptionalState &sta [this, state](int r) { send_mirror_status_update(state); }); - - // ensure pending IO is flushed and the commit position is updated - // prior to updating the mirror status - auto ctx2 = new LambdaContext( - [this, ctx=std::move(ctx)](int r) { - flush_local_replay(ctx); - }); - m_threads->work_queue->queue(ctx2, 0); + m_threads->work_queue->queue(ctx, 0); } template @@ -1700,6 +1743,14 @@ void ImageReplayer::handle_shut_down(int r) { return; } + if (!m_in_flight_op_tracker.empty()) { + dout(15) << "waiting for in-flight operations to complete" << dendl; + m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) { + handle_shut_down(r); + })); + return; + } + dout(10) << "stop complete" << dendl; ReplayStatusFormatter::destroy(m_replay_status_formatter); m_replay_status_formatter = nullptr; diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 33bd8e811ee..22507de162f 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -341,6 +341,9 @@ private: AsyncOpTracker m_event_replay_tracker; Context *m_delayed_preprocess_task = nullptr; + AsyncOpTracker m_in_flight_op_tracker; + Context *m_flush_local_replay_task = nullptr; + struct RemoteJournalerListener : public ::journal::JournalMetadataListener { ImageReplayer *replayer; @@ -378,6 +381,10 @@ private: m_state == STATE_REPLAY_FLUSHING); } + void schedule_flush_local_replay_task(); + void cancel_flush_local_replay_task(); + void handle_flush_local_replay_task(int r); + void flush_local_replay(Context* on_flush); void handle_flush_local_replay(Context* on_flush, int r);