namespace rbd {
namespace mirror {
+using librbd::util::create_async_context_callback;
using librbd::util::create_context_callback;
using librbd::util::create_rados_callback;
using namespace rbd::mirror::image_replayer;
ceph_assert(m_on_stop_finish == nullptr);
ceph_assert(m_bootstrap_request == nullptr);
ceph_assert(m_in_flight_status_updates == 0);
+ ceph_assert(m_flush_local_replay_task == nullptr);
delete m_journal_listener;
}
// might be invoked multiple times while stopping
return;
}
+
m_stop_requested = true;
m_state = STATE_STOPPING;
+ cancel_flush_local_replay_task();
}
set_state_description(r, desc);
update_mirror_image_status(false, boost::none);
}
+template <typename I>
+void ImageReplayer<I>::schedule_flush_local_replay_task() {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ std::lock_guard timer_locker{m_threads->timer_lock};
+ if (m_state != STATE_REPLAYING || m_flush_local_replay_task != nullptr) {
+ return;
+ }
+
+ dout(15) << dendl;
+ m_flush_local_replay_task = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ ImageReplayer<I>,
+ &ImageReplayer<I>::handle_flush_local_replay_task>(this));
+ m_threads->timer->add_event_after(30, m_flush_local_replay_task);
+}
+
+template <typename I>
+void ImageReplayer<I>::cancel_flush_local_replay_task() {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ std::lock_guard timer_locker{m_threads->timer_lock};
+ if (m_flush_local_replay_task != nullptr) {
+ auto canceled = m_threads->timer->cancel_event(m_flush_local_replay_task);
+ m_flush_local_replay_task = nullptr;
+ ceph_assert(canceled);
+ }
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_flush_local_replay_task(int) {
+ dout(15) << dendl;
+
+ m_in_flight_op_tracker.start_op();
+ auto on_finish = new LambdaContext([this](int) {
+ {
+ std::lock_guard timer_locker{m_threads->timer_lock};
+ m_flush_local_replay_task = nullptr;
+ }
+
+ m_in_flight_op_tracker.finish_op();
+ });
+ flush_local_replay(on_finish);
+}
+
template <typename I>
void ImageReplayer<I>::flush_local_replay(Context* on_flush)
{
auto ctx = new LambdaContext(
[this, bytes, latency](int r) {
std::lock_guard locker{m_lock};
+ schedule_flush_local_replay_task();
+
if (m_perf_counters) {
m_perf_counters->inc(l_rbd_mirror_replay);
m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes);
[this, state](int r) {
send_mirror_status_update(state);
});
-
- // ensure pending IO is flushed and the commit position is updated
- // prior to updating the mirror status
- auto ctx2 = new LambdaContext(
- [this, ctx=std::move(ctx)](int r) {
- flush_local_replay(ctx);
- });
- m_threads->work_queue->queue(ctx2, 0);
+ m_threads->work_queue->queue(ctx, 0);
}
template <typename I>
return;
}
+ if (!m_in_flight_op_tracker.empty()) {
+ dout(15) << "waiting for in-flight operations to complete" << dendl;
+ m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this, r](int) {
+ handle_shut_down(r);
+ }));
+ return;
+ }
+
dout(10) << "stop complete" << dendl;
ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
m_replay_status_formatter = nullptr;