From a22cf518f9cc0872f72a8a850ea01558094ebd00 Mon Sep 17 00:00:00 2001 From: Mykola Golub Date: Thu, 28 Apr 2016 09:32:33 +0300 Subject: [PATCH] rbd-mirror: admin socket commands to start/stop/restart mirroring Signed-off-by: Mykola Golub --- qa/workunits/rbd/rbd_mirror.sh | 78 +++++++++++--- src/tools/rbd_mirror/ImageReplayer.cc | 142 ++++++++++++++++++++++---- src/tools/rbd_mirror/ImageReplayer.h | 7 +- src/tools/rbd_mirror/Mirror.cc | 117 ++++++++++++++++++++- src/tools/rbd_mirror/Mirror.h | 4 + src/tools/rbd_mirror/Replayer.cc | 129 ++++++++++++++++++++++- src/tools/rbd_mirror/Replayer.h | 4 + 7 files changed, 444 insertions(+), 37 deletions(-) diff --git a/qa/workunits/rbd/rbd_mirror.sh b/qa/workunits/rbd/rbd_mirror.sh index 93ed76c1d0bf..a9ec82568622 100755 --- a/qa/workunits/rbd/rbd_mirror.sh +++ b/qa/workunits/rbd/rbd_mirror.sh @@ -102,6 +102,11 @@ daemon_pid_file() fi } +testlog() +{ + echo $(date '+%F %T') $@ | tee -a "${TEMPDIR}/rbd-mirror.test.log" +} + setup() { local c @@ -195,6 +200,16 @@ stop_mirror() rm -f $(daemon_pid_file "${cluster}") } +admin_daemon() +{ + local cluster=$1 ; shift + + local asok_file=$(daemon_asok_file "${cluster}" "${cluster}") + test -S "${asok_file}" + + ceph --admin-daemon ${asok_file} $@ +} + status() { local cluster daemon image @@ -285,10 +300,7 @@ flush() cmd="${cmd} ${POOL}/${image}" fi - local asok_file=$(daemon_asok_file "${cluster}" "${cluster}") - test -S "${asok_file}" - - ceph --admin-daemon ${asok_file} ${cmd} + admin_daemon "${cluster}" ${cmd} } test_image_replay_state() @@ -298,12 +310,9 @@ test_image_replay_state() local test_state=$3 local current_state=stopped - local asok_file=$(daemon_asok_file "${cluster}" "${cluster}") - test -S "${asok_file}" - - ceph --admin-daemon ${asok_file} help | + admin_daemon "${cluster}" help | fgrep "\"rbd mirror status ${POOL}/${image}\"" && - ceph --admin-daemon ${asok_file} rbd mirror status ${POOL}/${image} | + admin_daemon "${cluster}" rbd mirror status ${POOL}/${image} | grep -i 'state.*Replaying' && current_state=started @@ -475,7 +484,7 @@ set -xe setup -echo "TEST: add image and test replay" +testlog "TEST: add image and test replay" start_mirror ${CLUSTER1} image=test create_image ${CLUSTER2} ${image} @@ -486,7 +495,7 @@ test_status_in_pool_dir ${CLUSTER1} ${image} 'up+replaying' 'master_position' test_status_in_pool_dir ${CLUSTER2} ${image} 'down+unknown' compare_images ${image} -echo "TEST: stop mirror, add image, start mirror and test replay" +testlog "TEST: stop mirror, add image, start mirror and test replay" stop_mirror ${CLUSTER1} image1=test1 create_image ${CLUSTER2} ${image1} @@ -498,13 +507,56 @@ test_status_in_pool_dir ${CLUSTER1} ${image1} 'up+replaying' 'master_position' test_status_in_pool_dir ${CLUSTER2} ${image1} 'down+unknown' compare_images ${image1} -echo "TEST: test the first image is replaying after restart" +testlog "TEST: test the first image is replaying after restart" write_image ${CLUSTER2} ${image} 100 wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${image} test_status_in_pool_dir ${CLUSTER1} ${image} 'up+replaying' 'master_position' compare_images ${image} -echo "TEST: failover and failback" +testlog "TEST: stop/start/restart mirror via admin socket" +admin_daemon ${CLUSTER1} rbd mirror stop +wait_for_image_replay_stopped ${CLUSTER1} ${image} +wait_for_image_replay_stopped ${CLUSTER1} ${image1} + +admin_daemon ${CLUSTER1} rbd mirror start +wait_for_image_replay_started ${CLUSTER1} ${image} +wait_for_image_replay_started ${CLUSTER1} ${image1} + +admin_daemon ${CLUSTER1} rbd mirror restart +wait_for_image_replay_started ${CLUSTER1} ${image} +wait_for_image_replay_started ${CLUSTER1} ${image1} + +admin_daemon ${CLUSTER1} rbd mirror stop +wait_for_image_replay_stopped ${CLUSTER1} ${image} +wait_for_image_replay_stopped ${CLUSTER1} ${image1} + +admin_daemon ${CLUSTER1} rbd mirror restart +wait_for_image_replay_started ${CLUSTER1} ${image} +wait_for_image_replay_started ${CLUSTER1} ${image1} + +admin_daemon ${CLUSTER1} rbd mirror stop ${CLUSTER2} +wait_for_image_replay_stopped ${CLUSTER1} ${image} +wait_for_image_replay_stopped ${CLUSTER1} ${image1} + +admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image} +wait_for_image_replay_started ${CLUSTER1} ${image} + +admin_daemon ${CLUSTER1} rbd mirror start +wait_for_image_replay_started ${CLUSTER1} ${image1} + +admin_daemon ${CLUSTER1} rbd mirror start ${CLUSTER2} + +admin_daemon ${CLUSTER1} rbd mirror restart ${POOL}/${image} +wait_for_image_replay_started ${CLUSTER1} ${image} + +admin_daemon ${CLUSTER1} rbd mirror restart ${CLUSTER2} +wait_for_image_replay_started ${CLUSTER1} ${image} +wait_for_image_replay_started ${CLUSTER1} ${image1} + +admin_daemon ${CLUSTER1} rbd mirror flush +admin_daemon ${CLUSTER1} rbd mirror status + +testlog "TEST: failover and failback" start_mirror ${CLUSTER2} # failover diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index 9cf445aa24fd..fcf93b48db0f 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -86,6 +86,48 @@ private: ImageReplayer *replayer; }; +template +class StartCommand : public ImageReplayerAdminSocketCommand { +public: + explicit StartCommand(ImageReplayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->start(nullptr, nullptr, true); + return true; + } + +private: + ImageReplayer *replayer; +}; + +template +class StopCommand : public ImageReplayerAdminSocketCommand { +public: + explicit StopCommand(ImageReplayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->stop(nullptr, true); + return true; + } + +private: + ImageReplayer *replayer; +}; + +template +class RestartCommand : public ImageReplayerAdminSocketCommand { +public: + explicit RestartCommand(ImageReplayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->restart(); + return true; + } + +private: + ImageReplayer *replayer; +}; + template class FlushCommand : public ImageReplayerAdminSocketCommand { public: @@ -122,6 +164,27 @@ public: commands[command] = new StatusCommand(replayer); } + command = "rbd mirror start " + name; + r = admin_socket->register_command(command, command, this, + "start rbd mirror " + name); + if (r == 0) { + commands[command] = new StartCommand(replayer); + } + + command = "rbd mirror stop " + name; + r = admin_socket->register_command(command, command, this, + "stop rbd mirror " + name); + if (r == 0) { + commands[command] = new StopCommand(replayer); + } + + command = "rbd mirror restart " + name; + r = admin_socket->register_command(command, command, this, + "restart rbd mirror " + name); + if (r == 0) { + commands[command] = new RestartCommand(replayer); + } + command = "rbd mirror flush " + name; r = admin_socket->register_command(command, command, this, "flush rbd mirror " + name); @@ -239,23 +302,41 @@ void ImageReplayer::set_state_description(int r, const std::string &desc) { template void ImageReplayer::start(Context *on_finish, - const BootstrapParams *bootstrap_params) + const BootstrapParams *bootstrap_params, + bool manual) { assert(m_on_start_finish == nullptr); assert(m_on_stop_finish == nullptr); dout(20) << "on_finish=" << on_finish << dendl; + int r = 0; { Mutex::Locker locker(m_lock); - assert(is_stopped_()); - m_state = STATE_STARTING; - m_last_r = 0; - m_state_desc.clear(); - m_on_start_finish = on_finish; + if (!is_stopped_()) { + derr << "already running" << dendl; + r = -EINVAL; + } else if (m_manual_stop && !manual) { + dout(5) << "stopped manually, ignoring start without manual flag" + << dendl; + r = -EPERM; + } else { + m_state = STATE_STARTING; + m_last_r = 0; + m_state_desc.clear(); + m_on_start_finish = on_finish; + m_manual_stop = false; + } + } + + if (r < 0) { + if (on_finish) { + on_finish->complete(r); + } + return; } - int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx); + r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx); if (r < 0) { derr << "error opening ioctx for remote pool " << m_remote_pool_id << ": " << cpp_strerror(r) << dendl; @@ -508,27 +589,39 @@ bool ImageReplayer::on_start_interrupted() } template -void ImageReplayer::stop(Context *on_finish) +void ImageReplayer::stop(Context *on_finish, bool manual) { dout(20) << "on_finish=" << on_finish << dendl; bool shut_down_replay = false; + bool running = true; { Mutex::Locker locker(m_lock); - assert(is_running_()); + if (!is_running_()) { + running = false; + } else { + if (!is_stopped_()) { + if (m_state == STATE_STARTING) { + dout(20) << "interrupting start" << dendl; + } else { + dout(20) << "interrupting replay" << dendl; + shut_down_replay = true; + } - if (!is_stopped_()) { - if (m_state == STATE_STARTING) { - dout(20) << "interrupting start" << dendl; - } else { - dout(20) << "interrupting replay" << dendl; - shut_down_replay = true; + assert(m_on_stop_finish == nullptr); + std::swap(m_on_stop_finish, on_finish); + m_stop_requested = true; + m_manual_stop = manual; } + } + } - assert(m_on_stop_finish == nullptr); - std::swap(m_on_stop_finish, on_finish); - m_stop_requested = true; + if (!running) { + derr << "not running" << dendl; + if (on_finish) { + on_finish->complete(-EINVAL); } + return; } if (shut_down_replay) { @@ -667,6 +760,19 @@ void ImageReplayer::handle_replay_ready() replay_flush(); } +template +void ImageReplayer::restart(Context *on_finish) +{ + FunctionContext *ctx = new FunctionContext( + [this, on_finish](int r) { + if (r < 0) { + // Try start anyway. + } + start(on_finish, nullptr, true); + }); + stop(ctx); +} + template void ImageReplayer::flush(Context *on_finish) { diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 9cff9bba4463..752eaadf9fd6 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -88,8 +88,10 @@ public: void set_state_description(int r, const std::string &desc); void start(Context *on_finish = nullptr, - const BootstrapParams *bootstrap_params = nullptr); - void stop(Context *on_finish = nullptr); + const BootstrapParams *bootstrap_params = nullptr, + bool manual = false); + void stop(Context *on_finish = nullptr, bool manual = false); + void restart(Context *on_finish = nullptr); void flush(Context *on_finish = nullptr); void print_status(Formatter *f, stringstream *ss); @@ -226,6 +228,7 @@ private: librados::AioCompletion *m_update_status_comp = nullptr; bool m_update_status_pending = false; bool m_stop_requested = false; + bool m_manual_stop = false; AdminSocketHook *m_asok_hook = nullptr; diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index 5dd59bea3232..98787805593a 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -50,6 +50,45 @@ private: Mirror *mirror; }; +class StartCommand : public MirrorAdminSocketCommand { +public: + explicit StartCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) { + mirror->start(); + return true; + } + +private: + Mirror *mirror; +}; + +class StopCommand : public MirrorAdminSocketCommand { +public: + explicit StopCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) { + mirror->stop(); + return true; + } + +private: + Mirror *mirror; +}; + +class RestartCommand : public MirrorAdminSocketCommand { +public: + explicit RestartCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) { + mirror->restart(); + return true; + } + +private: + Mirror *mirror; +}; + class FlushCommand : public MirrorAdminSocketCommand { public: explicit FlushCommand(Mirror *mirror) : mirror(mirror) {} @@ -79,6 +118,27 @@ public: commands[command] = new StatusCommand(mirror); } + command = "rbd mirror start"; + r = admin_socket->register_command(command, command, this, + "start rbd mirror"); + if (r == 0) { + commands[command] = new StartCommand(mirror); + } + + command = "rbd mirror stop"; + r = admin_socket->register_command(command, command, this, + "stop rbd mirror"); + if (r == 0) { + commands[command] = new StopCommand(mirror); + } + + command = "rbd mirror restart"; + r = admin_socket->register_command(command, command, this, + "restart rbd mirror"); + if (r == 0) { + commands[command] = new RestartCommand(mirror); + } + command = "rbd mirror flush"; r = admin_socket->register_command(command, command, this, "flush rbd mirror"); @@ -165,7 +225,9 @@ void Mirror::run() while (!m_stopping.read()) { m_local_cluster_watcher->refresh_pools(); Mutex::Locker l(m_lock); - update_replayers(m_local_cluster_watcher->get_peer_configs()); + if (!m_manual_stop) { + update_replayers(m_local_cluster_watcher->get_peer_configs()); + } // TODO: make interval configurable m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30)); } @@ -199,7 +261,24 @@ void Mirror::print_status(Formatter *f, stringstream *ss) } } -void Mirror::flush() +void Mirror::start() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + m_manual_stop = false; + + for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { + auto &replayer = it->second; + replayer->start(); + } +} + +void Mirror::stop() { dout(20) << "enter" << dendl; Mutex::Locker l(m_lock); @@ -208,6 +287,40 @@ void Mirror::flush() return; } + m_manual_stop = true; + + for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { + auto &replayer = it->second; + replayer->stop(); + } +} + +void Mirror::restart() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + m_manual_stop = false; + + for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { + auto &replayer = it->second; + replayer->restart(); + } +} + +void Mirror::flush() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping.read() || m_manual_stop) { + return; + } + for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) { auto &replayer = it->second; replayer->flush(); diff --git a/src/tools/rbd_mirror/Mirror.h b/src/tools/rbd_mirror/Mirror.h index a23c448e2ba0..298f80519875 100644 --- a/src/tools/rbd_mirror/Mirror.h +++ b/src/tools/rbd_mirror/Mirror.h @@ -40,6 +40,9 @@ public: void handle_signal(int signum); void print_status(Formatter *f, stringstream *ss); + void start(); + void stop(); + void restart(); void flush(); private: @@ -57,6 +60,7 @@ private: std::unique_ptr m_local_cluster_watcher; std::map > m_replayers; atomic_t m_stopping; + bool m_manual_stop = false; MirrorAdminSocketHook *m_asok_hook; }; diff --git a/src/tools/rbd_mirror/Replayer.cc b/src/tools/rbd_mirror/Replayer.cc index ed2da4b2ff11..54a003d7044d 100644 --- a/src/tools/rbd_mirror/Replayer.cc +++ b/src/tools/rbd_mirror/Replayer.cc @@ -50,6 +50,45 @@ private: Replayer *replayer; }; +class StartCommand : public ReplayerAdminSocketCommand { +public: + explicit StartCommand(Replayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->start(); + return true; + } + +private: + Replayer *replayer; +}; + +class StopCommand : public ReplayerAdminSocketCommand { +public: + explicit StopCommand(Replayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->stop(); + return true; + } + +private: + Replayer *replayer; +}; + +class RestartCommand : public ReplayerAdminSocketCommand { +public: + explicit RestartCommand(Replayer *replayer) : replayer(replayer) {} + + bool call(Formatter *f, stringstream *ss) { + replayer->restart(); + return true; + } + +private: + Replayer *replayer; +}; + class FlushCommand : public ReplayerAdminSocketCommand { public: explicit FlushCommand(Replayer *replayer) : replayer(replayer) {} @@ -80,6 +119,27 @@ public: commands[command] = new StatusCommand(replayer); } + command = "rbd mirror start " + name; + r = admin_socket->register_command(command, command, this, + "start rbd mirror " + name); + if (r == 0) { + commands[command] = new StartCommand(replayer); + } + + command = "rbd mirror stop " + name; + r = admin_socket->register_command(command, command, this, + "stop rbd mirror " + name); + if (r == 0) { + commands[command] = new StopCommand(replayer); + } + + command = "rbd mirror restart " + name; + r = admin_socket->register_command(command, command, this, + "restart rbd mirror " + name); + if (r == 0) { + commands[command] = new RestartCommand(replayer); + } + command = "rbd mirror flush " + name; r = admin_socket->register_command(command, command, this, "flush rbd mirror " + name); @@ -275,7 +335,9 @@ void Replayer::run() while (!m_stopping.read()) { Mutex::Locker l(m_lock); - set_sources(m_pool_watcher->get_images()); + if (!m_manual_stop) { + set_sources(m_pool_watcher->get_images()); + } m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30)); } @@ -318,7 +380,28 @@ void Replayer::print_status(Formatter *f, stringstream *ss) } } -void Replayer::flush() +void Replayer::start() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + m_manual_stop = false; + + for (auto it = m_images.begin(); it != m_images.end(); it++) { + auto &pool_images = it->second; + for (auto i = pool_images.begin(); i != pool_images.end(); i++) { + auto &image_replayer = i->second; + image_replayer->start(nullptr, nullptr, true); + } + } +} + +void Replayer::stop() { dout(20) << "enter" << dendl; @@ -328,6 +411,48 @@ void Replayer::flush() return; } + m_manual_stop = true; + + for (auto it = m_images.begin(); it != m_images.end(); it++) { + auto &pool_images = it->second; + for (auto i = pool_images.begin(); i != pool_images.end(); i++) { + auto &image_replayer = i->second; + image_replayer->stop(nullptr, true); + } + } +} + +void Replayer::restart() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read()) { + return; + } + + m_manual_stop = false; + + for (auto it = m_images.begin(); it != m_images.end(); it++) { + auto &pool_images = it->second; + for (auto i = pool_images.begin(); i != pool_images.end(); i++) { + auto &image_replayer = i->second; + image_replayer->restart(); + } + } +} + +void Replayer::flush() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping.read() || m_manual_stop) { + return; + } + for (auto it = m_images.begin(); it != m_images.end(); it++) { auto &pool_images = it->second; for (auto i = pool_images.begin(); i != pool_images.end(); i++) { diff --git a/src/tools/rbd_mirror/Replayer.h b/src/tools/rbd_mirror/Replayer.h index 37b96606a39c..0dcd5ed72dc7 100644 --- a/src/tools/rbd_mirror/Replayer.h +++ b/src/tools/rbd_mirror/Replayer.h @@ -42,6 +42,9 @@ public: void run(); void print_status(Formatter *f, stringstream *ss); + void start(); + void stop(); + void restart(); void flush(); private: @@ -60,6 +63,7 @@ private: Mutex m_lock; Cond m_cond; atomic_t m_stopping; + bool m_manual_stop = false; peer_t m_peer; std::vector m_args; -- 2.47.3