stop();
}
-
-class ImageReplayer : public rbd::mirror::ImageReplayer<> {
-public:
- ImageReplayer(rbd::mirror::Threads *threads,
- rbd::mirror::RadosRef local, rbd::mirror::RadosRef remote,
- const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
- int64_t local_pool_id,
- int64_t remote_pool_id, const std::string &remote_image_id,
- const std::string &global_image_id)
- : rbd::mirror::ImageReplayer<>(threads, local, remote, local_mirror_uuid,
- remote_mirror_uuid, local_pool_id,
- remote_pool_id, remote_image_id,
- global_image_id)
- {}
-
- void set_error(const std::string &state, int r) {
- m_errors[state] = r;
- }
-
- int get_error(const std::string &state) const {
- std::map<std::string, int>::const_iterator i = m_errors.find(state);
- return i == m_errors.end() ? 0 : i->second;
- }
-
-protected:
- virtual void on_stop_journal_replay_shut_down_finish(int r) {
- ASSERT_EQ(0, r);
- rbd::mirror::ImageReplayer<>::on_stop_journal_replay_shut_down_finish(
- get_error("on_stop_journal_replay_shut_down"));
- }
-
- virtual void on_stop_local_image_close_finish(int r) {
- ASSERT_EQ(0, r);
- rbd::mirror::ImageReplayer<>::on_stop_local_image_close_finish(
- get_error("on_stop_local_image_close"));
- }
-
-private:
- std::map<std::string, int> m_errors;
-};
-
-#define TEST_ON_START_ERROR(state) \
-TEST_F(TestImageReplayer, Error_on_start_##state) \
-{ \
- create_replayer<ImageReplayer>(); \
- reinterpret_cast<ImageReplayer *>(m_replayer)-> \
- set_error("on_start_" #state, -1); \
- rbd::mirror::ImageReplayer<>::BootstrapParams \
- bootstap_params(m_image_name); \
- C_SaferCond cond; \
- m_replayer->start(&cond, &bootstap_params); \
- ASSERT_EQ(-1, cond.wait()); \
-}
-
-#define TEST_ON_STOP_ERROR(state) \
-TEST_F(TestImageReplayer, Error_on_stop_##state) \
-{ \
- create_replayer<ImageReplayer>(); \
- reinterpret_cast<ImageReplayer *>(m_replayer)-> \
- set_error("on_stop_" #state, -1); \
- rbd::mirror::ImageReplayer<>::BootstrapParams \
- bootstap_params(m_image_name); \
- start(&bootstap_params); \
- /* TODO: investigate: without wait below I observe: */ \
- /* librbd/journal/Replay.cc: 70: FAILED assert(m_op_events.empty()) */\
- wait_for_replay_complete(); \
- C_SaferCond cond; \
- m_replayer->stop(&cond); \
- ASSERT_EQ(0, cond.wait()); \
-}
-
-TEST_ON_STOP_ERROR(journal_replay_shut_down);
-TEST_ON_STOP_ERROR(no_error);
-
if (r < 0) {
derr << "error opening ioctx for remote pool " << m_remote_pool_id
<< ": " << cpp_strerror(r) << dendl;
- on_start_fail_start(r, "error opening remote pool");
+ on_start_fail(r, "error opening remote pool");
return;
}
if (r < 0) {
derr << "error opening ioctx for local pool " << m_local_pool_id
<< ": " << cpp_strerror(r) << dendl;
- on_start_fail_start(r, "error opening local pool");
+ on_start_fail(r, "error opening local pool");
return;
}
if (r == -EREMOTEIO) {
dout(5) << "remote image is non-primary or local image is primary" << dendl;
- on_start_fail_start(0, "remote image is non-primary or local image is primary");
+ on_start_fail(0, "remote image is non-primary or local image is primary");
return;
} else if (r < 0) {
- on_start_fail_start(r, "error bootstrapping replay");
+ on_start_fail(r, "error bootstrapping replay");
return;
} else if (on_start_interrupted()) {
return;
if (r < 0) {
derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
- on_start_fail_start(r, "error initializing remote journal");
+ on_start_fail(r, "error initializing remote journal");
return;
} else if (on_start_interrupted()) {
return;
if (r < 0) {
derr << "error starting external replay on local image "
<< m_local_image_id << ": " << cpp_strerror(r) << dendl;
- on_start_fail_start(r, "error starting replay on local image");
+ on_start_fail(r, "error starting replay on local image");
return;
}
}
template <typename I>
-void ImageReplayer<I>::on_start_fail_start(int r, const std::string &desc)
+void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
{
dout(20) << "r=" << r << dendl;
+ Context *ctx = new FunctionContext([this, r, desc](int _r) {
+ Context *on_start_finish(nullptr);
+ {
+ Mutex::Locker locker(m_lock);
+ m_state = STATE_STOPPING;
+ if (r < 0 && r != -EINTR) {
+ derr << "start failed: " << cpp_strerror(r) << dendl;
+ } else {
+ dout(20) << "start interrupted" << dendl;
+ }
+ std::swap(m_on_start_finish, on_start_finish);
+ }
- FunctionContext *ctx = new FunctionContext(
- [this, r, desc](int r1) {
- assert(r1 == 0);
set_state_description(r, desc);
- on_start_fail_finish(r);
+ update_mirror_image_status(false, boost::none);
+ shut_down(r, on_start_finish);
});
-
m_threads->work_queue->queue(ctx, 0);
}
-template <typename I>
-void ImageReplayer<I>::on_start_fail_finish(int r)
-{
- dout(20) << "r=" << r << dendl;
-
- {
- Mutex::Locker locker(m_lock);
- m_state = STATE_STOPPING;
- }
- update_mirror_image_status(false, boost::none);
-
- if (m_remote_journaler) {
- if (m_remote_journaler->is_initialized()) {
- m_remote_journaler->shut_down();
- }
- delete m_remote_journaler;
- m_remote_journaler = nullptr;
- }
-
- if (m_local_replay) {
- shut_down_journal_replay(true);
- m_local_image_ctx->journal->stop_external_replay();
- m_local_replay = nullptr;
- }
-
- if (m_replay_handler) {
- delete m_replay_handler;
- m_replay_handler = nullptr;
- }
-
- if (m_local_image_ctx) {
- // TODO: switch to async close via CloseImageRequest
- m_local_image_ctx->state->close();
- m_local_image_ctx = nullptr;
- }
-
- Context *on_start_finish(nullptr);
- Context *on_stop_finish(nullptr);
- {
- Mutex::Locker locker(m_lock);
- if (r < 0 && r != -EINTR) {
- derr << "start failed: " << cpp_strerror(r) << dendl;
- } else {
- dout(20) << "start interrupted" << dendl;
- }
-
- std::swap(m_on_start_finish, on_start_finish);
- std::swap(m_on_stop_finish, on_stop_finish);
- }
-
- update_mirror_image_status(true, STATE_STOPPED);
- handle_stop(r, on_start_finish, on_stop_finish);
-}
-
template <typename I>
bool ImageReplayer<I>::on_start_interrupted()
{
return false;
}
- on_start_fail_start(-EINTR);
+ on_start_fail(-EINTR);
return true;
}
}
if (shut_down_replay) {
- on_stop_journal_replay_shut_down_start();
+ on_stop_journal_replay();
} else if (on_finish != nullptr) {
on_finish->complete(0);
}
}
template <typename I>
-void ImageReplayer<I>::on_stop_journal_replay_shut_down_start()
+void ImageReplayer<I>::on_stop_journal_replay()
{
dout(20) << "enter" << dendl;
- FunctionContext *ctx = new FunctionContext(
- [this](int r) {
- on_stop_journal_replay_shut_down_finish(r);
- });
-
{
Mutex::Locker locker(m_lock);
-
- // as we complete in-flight records, we might receive multiple stop requests
if (m_state != STATE_REPLAYING) {
+ // might be invoked multiple times while stopping
return;
}
m_state = STATE_STOPPING;
- m_local_replay->shut_down(false, ctx);
}
set_state_description(0, "");
update_mirror_image_status(false, boost::none);
-}
-
-template <typename I>
-void ImageReplayer<I>::on_stop_journal_replay_shut_down_finish(int r)
-{
- dout(20) << "r=" << r << dendl;
- if (r < 0) {
- derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
- }
-
- {
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STOPPING);
- m_local_image_ctx->journal->stop_external_replay();
- m_local_replay = nullptr;
- m_replay_entry = ReplayEntry();
- m_replay_tag_valid = false;
- }
-
- on_stop_local_image_close_start();
-}
-
-template <typename I>
-void ImageReplayer<I>::on_stop_local_image_close_start()
-{
- dout(20) << "enter" << dendl;
-
- // close and delete the image (from outside the image's thread context)
- Context *ctx = create_context_callback<
- ImageReplayer, &ImageReplayer<I>::on_stop_local_image_close_finish>(this);
- CloseImageRequest<I> *request = CloseImageRequest<I>::create(
- &m_local_image_ctx, m_threads->work_queue, false, ctx);
- request->send();
-}
-
-template <typename I>
-void ImageReplayer<I>::on_stop_local_image_close_finish(int r)
-{
- dout(20) << "r=" << r << dendl;
- if (r < 0) {
- derr << "error closing local image: " << cpp_strerror(r) << dendl;
- }
-
- delete m_replay_status_formatter;
- m_replay_status_formatter = nullptr;
-
- m_remote_journaler->stop_replay();
- m_remote_journaler->shut_down();
- delete m_remote_journaler;
- m_remote_journaler = nullptr;
-
- delete m_replay_handler;
- m_replay_handler = nullptr;
-
- Context *on_finish(nullptr);
- {
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STOPPING);
- std::swap(m_on_stop_finish, on_finish);
- }
-
- update_mirror_image_status(true, STATE_STOPPED);
- handle_stop(r, nullptr, on_finish);
+ shut_down(0, nullptr);
}
template <typename I>
}
if (shut_down) {
- on_stop_journal_replay_shut_down_start();
+ on_stop_journal_replay();
}
return shut_down;
}
m_remote_journaler->committed(replay_entry);
}
-template <typename I>
-void ImageReplayer<I>::shut_down_journal_replay(bool cancel_ops)
-{
- C_SaferCond cond;
- m_local_replay->shut_down(cancel_ops, &cond);
- int r = cond.wait();
- if (r < 0) {
- derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
- }
-}
-
template <typename I>
bool ImageReplayer<I>::update_mirror_image_status(bool force,
const OptionalState &state) {
}
template <typename I>
-void ImageReplayer<I>::handle_stop(int r, Context *on_start, Context *on_stop) {
+void ImageReplayer<I>::shut_down(int r, Context *on_start) {
+ dout(20) << "r=" << r << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_STOPPING);
+ }
+
+ // chain the shut down sequence (reverse order)
+ Context *ctx = new FunctionContext(
+ [this, r, on_start](int _r) {
+ update_mirror_image_status(true, STATE_STOPPED);
+ handle_shut_down(r, on_start);
+ });
+ if (m_local_image_ctx) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ CloseImageRequest<I> *request = CloseImageRequest<I>::create(
+ &m_local_image_ctx, m_threads->work_queue, false, ctx);
+ request->send();
+ });
+ }
+ if (m_local_replay != nullptr) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ if (r < 0) {
+ derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
+ }
+ m_local_image_ctx->journal->stop_external_replay();
+ m_local_replay = nullptr;
+ ctx->complete(0);
+ });
+ ctx = new FunctionContext([this, ctx](int r) {
+ m_local_replay->shut_down(true, ctx);
+ });
+ }
+ if (m_remote_journaler != nullptr) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ delete m_remote_journaler;
+ m_remote_journaler = nullptr;
+ ctx->complete(0);
+ });
+ ctx = new FunctionContext([this, ctx](int r) {
+ m_remote_journaler->shut_down(ctx);
+ });
+ }
+ if (m_replay_handler != nullptr) {
+ ctx = new FunctionContext([this, ctx](int r) {
+ delete m_replay_handler;
+ m_replay_handler = nullptr;
+ ctx->complete(0);
+ });
+ ctx = new FunctionContext([this, ctx](int r) {
+ m_remote_journaler->stop_replay(ctx);
+ });
+ }
+ m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void ImageReplayer<I>::handle_shut_down(int r, Context *on_start) {
reschedule_update_status_task(-1);
+ Context *on_stop = nullptr;
{
Mutex::Locker locker(m_lock);
dout(20) << "waiting for in-flight status update" << dendl;
assert(m_on_update_status_finish == nullptr);
m_on_update_status_finish = new FunctionContext(
- [this, r, on_start, on_stop](int r) {
- handle_stop(r, on_start, on_stop);
+ [this, r, on_start](int r) {
+ handle_shut_down(r, on_start);
});
return;
}
+ std::swap(on_stop, m_on_stop_finish);
m_stop_requested = false;
+ assert(m_state == STATE_STOPPING);
m_state = STATE_STOPPED;
m_state_desc.clear();
m_last_r = 0;
m_local_ioctx.close();
m_remote_ioctx.close();
+ delete m_replay_status_formatter;
+ m_replay_status_formatter = nullptr;
+
if (on_start != nullptr) {
dout(20) << "on start finish complete, r=" << r << dendl;
on_start->complete(r);