const std::string &description, bool flush)
{
const std::string desc = "bootstrapping, " + description;
-
- FunctionContext *ctx = new FunctionContext(
- [this, desc, flush](int r) {
- replayer->set_state_description(0, desc);
- if (flush) {
- replayer->update_mirror_image_status();
- }
- });
- replayer->m_threads->work_queue->queue(ctx, 0);
+ replayer->set_state_description(0, desc);
+ if (flush) {
+ replayer->update_mirror_image_status(false, boost::none);
+ }
}
template <typename I>
assert(m_replay_handler == nullptr);
assert(m_on_start_finish == nullptr);
assert(m_on_stop_finish == nullptr);
-
+ assert(m_in_flight_status_updates == 0);
delete m_asok_hook;
}
return;
}
- reschedule_update_status_task(10);
-
CephContext *cct = static_cast<CephContext *>(m_local->cct());
double commit_interval = cct->_conf->rbd_journal_commit_age;
m_remote_journaler = new Journaler(m_threads->work_queue,
m_bootstrap_request = request;
}
+ update_mirror_image_status(false, boost::none);
+ reschedule_update_status_task(10);
+
request->send();
- update_mirror_image_status();
}
template <typename I>
}
}
- update_mirror_image_status();
-
+ update_mirror_image_status(false, boost::none);
init_remote_journaler();
}
m_replay_status_formatter =
ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
- update_mirror_image_status();
+
+ update_mirror_image_status(true, boost::none);
reschedule_update_status_task(30);
dout(20) << "start succeeded" << dendl;
{
dout(20) << "r=" << r << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ m_state = STATE_STOPPING;
+ }
+ update_mirror_image_status(false, boost::none);
+
if (m_remote_journaler) {
if (m_remote_journaler->is_initialized()) {
m_remote_journaler->shut_down();
Context *on_stop_finish(nullptr);
{
Mutex::Locker locker(m_lock);
- if (r < 0) {
+ if (r < 0 && r != -EINTR) {
derr << "start failed: " << cpp_strerror(r) << dendl;
} else {
dout(20) << "start interrupted" << dendl;
std::swap(m_on_stop_finish, on_stop_finish);
}
+ update_mirror_image_status(true, STATE_STOPPED);
handle_stop(r, on_start_finish, on_stop_finish);
}
m_local_replay->shut_down(false, ctx);
}
- update_mirror_image_status();
+ set_state_description(0, "");
+ update_mirror_image_status(false, boost::none);
}
template <typename I>
std::swap(m_on_stop_finish, on_finish);
}
+ update_mirror_image_status(true, STATE_STOPPED);
handle_stop(r, nullptr, on_finish);
}
<< cpp_strerror(r) << dendl;
}
- update_mirror_image_status();
+ update_mirror_image_status(false, boost::none);
dout(20) << "flush complete, r=" << r << dendl;
on_flush->complete(r);
}
template <typename I>
-void ImageReplayer<I>::update_mirror_image_status(bool final,
- State expected_state)
-{
- dout(20) << "final=" << final << ", expected_state=" << expected_state
- << dendl;
-
- cls::rbd::MirrorImageStatus status;
-
+bool ImageReplayer<I>::update_mirror_image_status(bool force,
+ const OptionalState &state) {
+ dout(20) << dendl;
{
Mutex::Locker locker(m_lock);
+ if (!start_mirror_image_status_update(force, false)) {
+ return false;
+ }
+ }
- assert(!final || !is_running_());
-
- if (!final) {
- if (expected_state != STATE_UNKNOWN && expected_state != m_state) {
- dout(20) << "state changed" << dendl;
- return;
- }
- if (m_update_status_comp) {
- dout(20) << "already sending update" << dendl;
- m_update_status_pending = true;
- return;
- }
+ queue_mirror_image_status_update(state);
+ return true;
+}
- Context *ctx = new FunctionContext(
- [this](int r) {
- if (r < 0) {
- derr << "error updating mirror image status: " << cpp_strerror(r)
- << dendl;
- }
- bool pending = false;
- librados::AioCompletion *comp = nullptr;
- {
- Mutex::Locker locker(m_lock);
- std::swap(m_update_status_comp, comp);
- std::swap(m_update_status_pending, pending);
- }
- if (comp) {
- comp->release();
- }
- if (pending && r == 0 && is_running_()) {
- update_mirror_image_status();
- }
- });
- m_update_status_comp = create_rados_ack_callback(ctx);
- m_update_status_pending = false;
- }
+template <typename I>
+bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
+ bool restarting) {
+ assert(m_lock.is_locked());
- switch (m_state) {
- case STATE_STARTING:
- if (m_bootstrap_request != nullptr) {
- status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
- status.description = m_state_desc.empty() ? "syncing" : m_state_desc;
- } else {
- status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
- status.description = "starting replay";
- }
- break;
- case STATE_REPLAYING:
- status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
- break;
- case STATE_STOPPING:
- if (m_local_image_ctx) {
- status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
- status.description = "stopping replay";
- break;
- }
- /* FALLTHROUGH */
- case STATE_STOPPED:
- if (m_last_r < 0) {
- status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
- status.description = m_state_desc;
- } else {
- status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
- status.description = m_state_desc.empty() ? "stopped" : m_state_desc;
- }
- break;
- default:
- assert(!"invalid state");
+ if (!force && !is_stopped_()) {
+ if (!is_running_()) {
+ dout(20) << "shut down in-progress: ignoring update" << dendl;
+ return false;
+ } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
+ dout(20) << "already sending update" << dendl;
+ m_update_status_requested = true;
+ return false;
}
}
- if (status.state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING) {
- Context *on_req_finish = new FunctionContext(
- [this](int r) {
- if (r == 0) {
- librados::AioCompletion *comp = nullptr;
- {
- Mutex::Locker locker(m_lock);
- std::swap(m_update_status_comp, comp);
- }
- if (comp) {
- comp->release();
- }
- update_mirror_image_status(false, STATE_REPLAYING);
- }
- });
- std::string desc;
- if (!m_replay_status_formatter->get_or_send_update(&desc, on_req_finish)) {
+ dout(20) << dendl;
+ ++m_in_flight_status_updates;
+ return true;
+}
+
+template <typename I>
+void ImageReplayer<I>::finish_mirror_image_status_update() {
+ Context *on_finish = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_in_flight_status_updates > 0);
+ if (--m_in_flight_status_updates > 0) {
+ dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
+ << "updates" << dendl;
return;
}
- status.description = "replaying, " + desc;
+
+ std::swap(on_finish, m_on_update_status_finish);
}
- dout(20) << "status=" << status << dendl;
+ dout(20) << dendl;
+ if (on_finish != nullptr) {
+ on_finish->complete(0);
+ }
+}
- librados::ObjectWriteOperation op;
- librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
+template <typename I>
+void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
+ dout(20) << dendl;
+ FunctionContext *ctx = new FunctionContext(
+ [this, state](int r) {
+ send_mirror_status_update(state);
+ });
+ m_threads->work_queue->queue(ctx, 0);
+}
- if (final) {
- reschedule_update_status_task(-1);
- m_local_ioctx.aio_flush();
- librados::AioCompletion *comp = nullptr;
+template <typename I>
+void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
+ State state;
+ std::string state_desc;
+ int last_r;
+ bool bootstrapping;
+ bool stopping_replay;
+ {
+ Mutex::Locker locker(m_lock);
+ state = m_state;
+ state_desc = m_state_desc;
+ last_r = m_last_r;
+ bootstrapping = (m_bootstrap_request != nullptr);
+ stopping_replay = (m_local_image_ctx != nullptr);
+ }
+
+ if (opt_state) {
+ state = *opt_state;
+ }
+
+ cls::rbd::MirrorImageStatus status;
+ status.up = true;
+ switch (state) {
+ case STATE_STARTING:
+ if (bootstrapping) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
+ status.description = state_desc.empty() ? "syncing" : state_desc;
+ } else {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
+ status.description = "starting replay";
+ }
+ break;
+ case STATE_REPLAYING:
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
{
- Mutex::Locker locker(m_lock);
- std::swap(m_update_status_comp, comp);
+ Context *on_req_finish = new FunctionContext(
+ [this](int r) {
+ if (r >= 0) {
+ dout(20) << "replay status ready" << dendl;
+ send_mirror_status_update(boost::none);
+ }
+ });
+
+ std::string desc;
+ if (!m_replay_status_formatter->get_or_send_update(&desc,
+ on_req_finish)) {
+ dout(20) << "waiting for replay status" << dendl;
+ return;
+ }
+ status.description = "replaying, " + desc;
}
- if (comp) {
- comp->wait_for_complete();
+ break;
+ case STATE_STOPPING:
+ if (stopping_replay) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
+ status.description = "stopping replay";
+ break;
}
- int r = m_local_ioctx.operate(RBD_MIRRORING, &op);
- if (r < 0) {
- derr << "error updating mirror image status: " << cpp_strerror(r)
- << dendl;
+ // FALLTHROUGH
+ case STATE_STOPPED:
+ if (last_r < 0) {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
+ status.description = state_desc;
+ } else {
+ status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
+ status.description = state_desc.empty() ? "stopped" : state_desc;
}
- return;
+ break;
+ default:
+ assert(!"invalid state");
}
- int r = m_local_ioctx.aio_operate(RBD_MIRRORING, m_update_status_comp, &op);
- assert(r == 0);
+ dout(20) << "status=" << status << dendl;
+ librados::ObjectWriteOperation op;
+ librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
- reschedule_update_status_task();
+ librados::AioCompletion *aio_comp = create_rados_ack_callback<
+ ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
+ int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
+ assert(r == 0);
+ aio_comp->release();
}
template <typename I>
-void ImageReplayer<I>::reschedule_update_status_task(int new_interval)
-{
- Mutex::Locker locker(m_threads->timer_lock);
+void ImageReplayer<I>::handle_mirror_status_update(int r) {
+ dout(20) << "r=" << r << dendl;
- if (m_update_status_task) {
- m_threads->timer->cancel_event(m_update_status_task);
- m_update_status_task = nullptr;
- }
+ bool running = false;
+ bool started = false;
+ {
+ Mutex::Locker locker(m_lock);
+ bool update_status_requested = false;
+ std::swap(update_status_requested, m_update_status_requested);
- if (new_interval > 0) {
- m_update_status_interval = new_interval;
+ running = is_running_();
+ if (running && update_status_requested) {
+ started = start_mirror_image_status_update(false, true);
+ }
}
- if (new_interval < 0) {
- return;
+ // if a deferred update is available, send it -- otherwise reschedule
+ // the timer task
+ if (started) {
+ queue_mirror_image_status_update(boost::none);
+ } else if (running) {
+ reschedule_update_status_task();
}
- m_update_status_task = new FunctionContext(
- [this](int r) {
- start_update_status_task();
- });
-
- m_threads->timer->add_event_after(m_update_status_interval,
- m_update_status_task);
+ // mark committed status update as no longer in-flight
+ finish_mirror_image_status_update();
}
template <typename I>
-void ImageReplayer<I>::start_update_status_task()
-{
- FunctionContext *ctx = new FunctionContext(
- [this](int r) {
- {
- Mutex::Locker locker(m_threads->timer_lock);
- m_update_status_task = nullptr;
- }
- update_mirror_image_status();
- });
- m_threads->work_queue->queue(ctx, 0);
+void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
+ dout(20) << dendl;
+
+ bool canceled_task = false;
+ {
+ Mutex::Locker locker(m_lock);
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+
+ if (m_update_status_task) {
+ canceled_task = m_threads->timer->cancel_event(m_update_status_task);
+ m_update_status_task = nullptr;
+ }
+
+ if (new_interval > 0) {
+ m_update_status_interval = new_interval;
+ }
+
+ bool restarting = (new_interval == 0 || canceled_task);
+ if (new_interval >= 0 && is_running_() &&
+ start_mirror_image_status_update(false, restarting)) {
+ m_update_status_task = new FunctionContext(
+ [this](int r) {
+ assert(m_threads->timer_lock.is_locked());
+ m_update_status_task = nullptr;
+
+ queue_mirror_image_status_update(boost::none);
+ });
+ m_threads->timer->add_event_after(m_update_status_interval,
+ m_update_status_task);
+ }
+ }
+
+ if (canceled_task) {
+ dout(20) << "canceled task" << dendl;
+ finish_mirror_image_status_update();
+ }
}
template <typename I>
void ImageReplayer<I>::handle_stop(int r, Context *on_start, Context *on_stop) {
+ reschedule_update_status_task(-1);
+
{
Mutex::Locker locker(m_lock);
+
+ // if status updates are in-flight, wait for them to complete
+ // before proceeding
+ if (m_in_flight_status_updates > 0) {
+ dout(20) << "waiting for in-flight status update" << dendl;
+ assert(m_on_update_status_finish == nullptr);
+ m_on_update_status_finish = new FunctionContext(
+ [this, r, on_start, on_stop](int r) {
+ handle_stop(r, on_start, on_stop);
+ });
+ return;
+ }
+
m_stop_requested = false;
m_state = STATE_STOPPED;
- m_last_r = r;
- if (r >= 0) {
- m_state_desc.clear();
- }
+ m_state_desc.clear();
+ m_last_r = 0;
}
dout(20) << "stop complete" << dendl;
- update_mirror_image_status(true);
-
m_local_ioctx.close();
m_remote_ioctx.close();
if (on_start != nullptr) {
dout(20) << "on start finish complete, r=" << r << dendl;
on_start->complete(r);
+ r = 0;
}
if (on_stop != nullptr) {
dout(20) << "on stop finish complete, r=" << r << dendl;
- on_stop->complete(0);
+ on_stop->complete(r);
}
}