template <typename I>
bool Replayer<I>::is_replay_interrupted(std::unique_lock<ceph::mutex>* locker) {
- if (m_state == STATE_COMPLETE || m_stop_requested) {
+ if (m_state == STATE_COMPLETE) {
locker->unlock();
return true;
}
return;
}
- m_stop_requested = true;
m_state = STATE_COMPLETE;
notify_group_listener();
}
dout(10) << "m_local_group_id=" << m_local_group_id << dendl;
std::unique_lock locker{m_lock};
- if (is_replay_interrupted(&locker) || m_stop_requested) {
+ if (is_replay_interrupted(&locker)) {
return;
}
-
- if (m_state != STATE_COMPLETE) {
- m_state = STATE_REPLAYING;
- }
+ m_state = STATE_REPLAYING;
// early exit if no snapshots to process
if (m_local_group_snaps.empty()) {
} else { // empty local cluster, started mirroring freshly
try_create_group_snapshot("", locker);
}
-
- if (m_stop_requested) {
- // stop group replayer
- handle_replay_complete(locker, 0, "");
- return;
- }
locker->unlock();
+
schedule_load_group_snapshots();
}
{
std::unique_lock locker{m_lock};
- m_stop_requested = false;
ceph_assert(m_on_shutdown == nullptr);
std::swap(m_on_shutdown, on_finish);
+ m_error_code = 0;
+ m_error_description = "";
+ ceph_assert(m_state != STATE_INIT);
auto state = STATE_COMPLETE;
std::swap(m_state, state);
}
cancel_load_group_snapshots();
- if (!m_in_flight_op_tracker.empty()) {
- m_in_flight_op_tracker.wait_for_ops(new LambdaContext([this](int) {
- finish_shut_down();
- }));
- return;
- }
-
- finish_shut_down();
- return;
+ wait_for_in_flight_ops();
}
template <typename I>
-void Replayer<I>::finish_shut_down() {
+void Replayer<I>::wait_for_in_flight_ops() {
dout(10) << dendl;
- Context *on_finish = nullptr;
+ auto ctx = create_async_context_callback(
+ m_threads->work_queue, create_context_callback<
+ Replayer<I>, &Replayer<I>::handle_wait_for_in_flight_ops>(this));
+ m_in_flight_op_tracker.wait_for_ops(ctx);
+}
+
+template <typename I>
+void Replayer<I>::handle_wait_for_in_flight_ops(int r) {
+ dout(10) << "r=" << r << dendl;
+ Context *on_finish = nullptr;
{
std::unique_lock locker{m_lock};
ceph_assert(m_on_shutdown != nullptr);
std::swap(m_on_shutdown, on_finish);
}
- if (on_finish) {
- on_finish->complete(0);
- }
+ on_finish->complete(m_error_code);
}
template <typename I>
}
void init(Context* on_finish);
void shut_down(Context* on_finish);
- void finish_shut_down();
bool is_replaying() const {
std::unique_lock locker{m_lock};
int m_error_code = 0;
std::string m_error_description;
- bool m_stop_requested = false;
bool m_retry_validate_snap = false;
utime_t m_snapshot_start;
void set_image_replayer_limits(const std::string &image_id,
const cls::rbd::GroupSnapshot *remote_snap,
std::unique_lock<ceph::mutex>* locker);
+ void wait_for_in_flight_ops();
+ void handle_wait_for_in_flight_ops(int r);
};
} // namespace group_replayer