]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: admin socket commands to start/stop/restart mirroring 8809/head
authorMykola Golub <mgolub@mirantis.com>
Thu, 28 Apr 2016 06:32:33 +0000 (09:32 +0300)
committerMykola Golub <mgolub@mirantis.com>
Mon, 2 May 2016 19:52:02 +0000 (22:52 +0300)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
qa/workunits/rbd/rbd_mirror.sh
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h

index 93ed76c1d0bf9039e8ca07c82723ea42a4ff9458..a9ec825686220890289d980b24ec8f44965de0ad 100755 (executable)
@@ -102,6 +102,11 @@ daemon_pid_file()
     fi
 }
 
+testlog()
+{
+    echo $(date '+%F %T') $@ | tee -a "${TEMPDIR}/rbd-mirror.test.log"
+}
+
 setup()
 {
     local c
@@ -195,6 +200,16 @@ stop_mirror()
     rm -f $(daemon_pid_file "${cluster}")
 }
 
+admin_daemon()
+{
+    local cluster=$1 ; shift
+
+    local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")
+    test -S "${asok_file}"
+
+    ceph --admin-daemon ${asok_file} $@
+}
+
 status()
 {
     local cluster daemon image
@@ -285,10 +300,7 @@ flush()
        cmd="${cmd} ${POOL}/${image}"
     fi
 
-    local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")
-    test -S "${asok_file}"
-
-    ceph --admin-daemon ${asok_file} ${cmd}
+    admin_daemon "${cluster}" ${cmd}
 }
 
 test_image_replay_state()
@@ -298,12 +310,9 @@ test_image_replay_state()
     local test_state=$3
     local current_state=stopped
 
-    local asok_file=$(daemon_asok_file "${cluster}" "${cluster}")
-    test -S "${asok_file}"
-
-    ceph --admin-daemon ${asok_file} help |
+    admin_daemon "${cluster}" help |
        fgrep "\"rbd mirror status ${POOL}/${image}\"" &&
-    ceph --admin-daemon ${asok_file} rbd mirror status ${POOL}/${image} |
+    admin_daemon "${cluster}" rbd mirror status ${POOL}/${image} |
        grep -i 'state.*Replaying' &&
     current_state=started
 
@@ -475,7 +484,7 @@ set -xe
 
 setup
 
-echo "TEST: add image and test replay"
+testlog "TEST: add image and test replay"
 start_mirror ${CLUSTER1}
 image=test
 create_image ${CLUSTER2} ${image}
@@ -486,7 +495,7 @@ test_status_in_pool_dir ${CLUSTER1} ${image} 'up+replaying' 'master_position'
 test_status_in_pool_dir ${CLUSTER2} ${image} 'down+unknown'
 compare_images ${image}
 
-echo "TEST: stop mirror, add image, start mirror and test replay"
+testlog "TEST: stop mirror, add image, start mirror and test replay"
 stop_mirror ${CLUSTER1}
 image1=test1
 create_image ${CLUSTER2} ${image1}
@@ -498,13 +507,56 @@ test_status_in_pool_dir ${CLUSTER1} ${image1} 'up+replaying' 'master_position'
 test_status_in_pool_dir ${CLUSTER2} ${image1} 'down+unknown'
 compare_images ${image1}
 
-echo "TEST: test the first image is replaying after restart"
+testlog "TEST: test the first image is replaying after restart"
 write_image ${CLUSTER2} ${image} 100
 wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${image}
 test_status_in_pool_dir ${CLUSTER1} ${image} 'up+replaying' 'master_position'
 compare_images ${image}
 
-echo "TEST: failover and failback"
+testlog "TEST: stop/start/restart mirror via admin socket"
+admin_daemon ${CLUSTER1} rbd mirror stop
+wait_for_image_replay_stopped ${CLUSTER1} ${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror start
+wait_for_image_replay_started ${CLUSTER1} ${image}
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror restart
+wait_for_image_replay_started ${CLUSTER1} ${image}
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror stop
+wait_for_image_replay_stopped ${CLUSTER1} ${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror restart
+wait_for_image_replay_started ${CLUSTER1} ${image}
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror stop ${CLUSTER2}
+wait_for_image_replay_stopped ${CLUSTER1} ${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image}
+wait_for_image_replay_started ${CLUSTER1} ${image}
+
+admin_daemon ${CLUSTER1} rbd mirror start
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror start ${CLUSTER2}
+
+admin_daemon ${CLUSTER1} rbd mirror restart ${POOL}/${image}
+wait_for_image_replay_started ${CLUSTER1} ${image}
+
+admin_daemon ${CLUSTER1} rbd mirror restart ${CLUSTER2}
+wait_for_image_replay_started ${CLUSTER1} ${image}
+wait_for_image_replay_started ${CLUSTER1} ${image1}
+
+admin_daemon ${CLUSTER1} rbd mirror flush
+admin_daemon ${CLUSTER1} rbd mirror status
+
+testlog "TEST: failover and failback"
 start_mirror ${CLUSTER2}
 
 # failover
index 9cf445aa24fd57ddec3bd0109ba60c8295f472d9..fcf93b48db0f8bd13bc6c02cf01d98bd700b0803 100644 (file)
@@ -86,6 +86,48 @@ private:
   ImageReplayer<I> *replayer;
 };
 
+template <typename I>
+class StartCommand : public ImageReplayerAdminSocketCommand {
+public:
+  explicit StartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->start(nullptr, nullptr, true);
+    return true;
+  }
+
+private:
+  ImageReplayer<I> *replayer;
+};
+
+template <typename I>
+class StopCommand : public ImageReplayerAdminSocketCommand {
+public:
+  explicit StopCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->stop(nullptr, true);
+    return true;
+  }
+
+private:
+  ImageReplayer<I> *replayer;
+};
+
+template <typename I>
+class RestartCommand : public ImageReplayerAdminSocketCommand {
+public:
+  explicit RestartCommand(ImageReplayer<I> *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->restart();
+    return true;
+  }
+
+private:
+  ImageReplayer<I> *replayer;
+};
+
 template <typename I>
 class FlushCommand : public ImageReplayerAdminSocketCommand {
 public:
@@ -122,6 +164,27 @@ public:
       commands[command] = new StatusCommand<I>(replayer);
     }
 
+    command = "rbd mirror start " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "start rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new StartCommand<I>(replayer);
+    }
+
+    command = "rbd mirror stop " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "stop rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new StopCommand<I>(replayer);
+    }
+
+    command = "rbd mirror restart " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "restart rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new RestartCommand<I>(replayer);
+    }
+
     command = "rbd mirror flush " + name;
     r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
@@ -239,23 +302,41 @@ void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
 
 template <typename I>
 void ImageReplayer<I>::start(Context *on_finish,
-                            const BootstrapParams *bootstrap_params)
+                            const BootstrapParams *bootstrap_params,
+                            bool manual)
 {
   assert(m_on_start_finish == nullptr);
   assert(m_on_stop_finish == nullptr);
   dout(20) << "on_finish=" << on_finish << dendl;
 
+  int r = 0;
   {
     Mutex::Locker locker(m_lock);
-    assert(is_stopped_());
 
-    m_state = STATE_STARTING;
-    m_last_r = 0;
-    m_state_desc.clear();
-    m_on_start_finish = on_finish;
+    if (!is_stopped_()) {
+      derr << "already running" << dendl;
+      r = -EINVAL;
+    } else if (m_manual_stop && !manual) {
+      dout(5) << "stopped manually, ignoring start without manual flag"
+             << dendl;
+      r = -EPERM;
+    } else {
+      m_state = STATE_STARTING;
+      m_last_r = 0;
+      m_state_desc.clear();
+      m_on_start_finish = on_finish;
+      m_manual_stop = false;
+    }
+  }
+
+  if (r < 0) {
+    if (on_finish) {
+      on_finish->complete(r);
+    }
+    return;
   }
 
-  int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
+  r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
   if (r < 0) {
     derr << "error opening ioctx for remote pool " << m_remote_pool_id
         << ": " << cpp_strerror(r) << dendl;
@@ -508,27 +589,39 @@ bool ImageReplayer<I>::on_start_interrupted()
 }
 
 template <typename I>
-void ImageReplayer<I>::stop(Context *on_finish)
+void ImageReplayer<I>::stop(Context *on_finish, bool manual)
 {
   dout(20) << "on_finish=" << on_finish << dendl;
 
   bool shut_down_replay = false;
+  bool running = true;
   {
     Mutex::Locker locker(m_lock);
-    assert(is_running_());
+    if (!is_running_()) {
+      running = false;
+    } else {
+      if (!is_stopped_()) {
+       if (m_state == STATE_STARTING) {
+         dout(20) << "interrupting start" << dendl;
+       } else {
+         dout(20) << "interrupting replay" << dendl;
+         shut_down_replay = true;
+       }
 
-    if (!is_stopped_()) {
-      if (m_state == STATE_STARTING) {
-        dout(20) << "interrupting start" << dendl;
-      } else {
-        dout(20) << "interrupting replay" << dendl;
-        shut_down_replay = true;
+       assert(m_on_stop_finish == nullptr);
+       std::swap(m_on_stop_finish, on_finish);
+       m_stop_requested = true;
+       m_manual_stop = manual;
       }
+    }
+  }
 
-      assert(m_on_stop_finish == nullptr);
-      std::swap(m_on_stop_finish, on_finish);
-      m_stop_requested = true;
+  if (!running) {
+    derr << "not running" << dendl;
+    if (on_finish) {
+      on_finish->complete(-EINVAL);
     }
+    return;
   }
 
   if (shut_down_replay) {
@@ -667,6 +760,19 @@ void ImageReplayer<I>::handle_replay_ready()
   replay_flush();
 }
 
+template <typename I>
+void ImageReplayer<I>::restart(Context *on_finish)
+{
+  FunctionContext *ctx = new FunctionContext(
+    [this, on_finish](int r) {
+      if (r < 0) {
+       // Try start anyway.
+      }
+      start(on_finish, nullptr, true);
+    });
+  stop(ctx);
+}
+
 template <typename I>
 void ImageReplayer<I>::flush(Context *on_finish)
 {
index 9cff9bba44638ec2c05010bfa6010cfecf8e5d70..752eaadf9fd66f7ffa84ae0b55e827c72e40928a 100644 (file)
@@ -88,8 +88,10 @@ public:
   void set_state_description(int r, const std::string &desc);
 
   void start(Context *on_finish = nullptr,
-            const BootstrapParams *bootstrap_params = nullptr);
-  void stop(Context *on_finish = nullptr);
+            const BootstrapParams *bootstrap_params = nullptr,
+            bool manual = false);
+  void stop(Context *on_finish = nullptr, bool manual = false);
+  void restart(Context *on_finish = nullptr);
   void flush(Context *on_finish = nullptr);
 
   void print_status(Formatter *f, stringstream *ss);
@@ -226,6 +228,7 @@ private:
   librados::AioCompletion *m_update_status_comp = nullptr;
   bool m_update_status_pending = false;
   bool m_stop_requested = false;
+  bool m_manual_stop = false;
 
   AdminSocketHook *m_asok_hook = nullptr;
 
index 5dd59bea3232417a88b666e25bfacb954bb71ef6..98787805593a61fe49c1d5980cdab5c674c7cf8d 100644 (file)
@@ -50,6 +50,45 @@ private:
   Mirror *mirror;
 };
 
+class StartCommand : public MirrorAdminSocketCommand {
+public:
+  explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->start();
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
+class StopCommand : public MirrorAdminSocketCommand {
+public:
+  explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->stop();
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
+class RestartCommand : public MirrorAdminSocketCommand {
+public:
+  explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    mirror->restart();
+    return true;
+  }
+
+private:
+  Mirror *mirror;
+};
+
 class FlushCommand : public MirrorAdminSocketCommand {
 public:
   explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
@@ -79,6 +118,27 @@ public:
       commands[command] = new StatusCommand(mirror);
     }
 
+    command = "rbd mirror start";
+    r = admin_socket->register_command(command, command, this,
+                                      "start rbd mirror");
+    if (r == 0) {
+      commands[command] = new StartCommand(mirror);
+    }
+
+    command = "rbd mirror stop";
+    r = admin_socket->register_command(command, command, this,
+                                      "stop rbd mirror");
+    if (r == 0) {
+      commands[command] = new StopCommand(mirror);
+    }
+
+    command = "rbd mirror restart";
+    r = admin_socket->register_command(command, command, this,
+                                      "restart rbd mirror");
+    if (r == 0) {
+      commands[command] = new RestartCommand(mirror);
+    }
+
     command = "rbd mirror flush";
     r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror");
@@ -165,7 +225,9 @@ void Mirror::run()
   while (!m_stopping.read()) {
     m_local_cluster_watcher->refresh_pools();
     Mutex::Locker l(m_lock);
-    update_replayers(m_local_cluster_watcher->get_peer_configs());
+    if (!m_manual_stop) {
+      update_replayers(m_local_cluster_watcher->get_peer_configs());
+    }
     // TODO: make interval configurable
     m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30));
   }
@@ -199,7 +261,24 @@ void Mirror::print_status(Formatter *f, stringstream *ss)
   }
 }
 
-void Mirror::flush()
+void Mirror::start()
+{
+  dout(20) << "enter" << dendl;
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  m_manual_stop = false;
+
+  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+    auto &replayer = it->second;
+    replayer->start();
+  }
+}
+
+void Mirror::stop()
 {
   dout(20) << "enter" << dendl;
   Mutex::Locker l(m_lock);
@@ -208,6 +287,40 @@ void Mirror::flush()
     return;
   }
 
+  m_manual_stop = true;
+
+  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+    auto &replayer = it->second;
+    replayer->stop();
+  }
+}
+
+void Mirror::restart()
+{
+  dout(20) << "enter" << dendl;
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  m_manual_stop = false;
+
+  for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
+    auto &replayer = it->second;
+    replayer->restart();
+  }
+}
+
+void Mirror::flush()
+{
+  dout(20) << "enter" << dendl;
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read() || m_manual_stop) {
+    return;
+  }
+
   for (auto it = m_replayers.begin(); it != m_replayers.end(); it++) {
     auto &replayer = it->second;
     replayer->flush();
index a23c448e2ba0294b52d91836e9c236a8b9a870c8..298f805198750b59995f74ff8b7ff53b2c2a3255 100644 (file)
@@ -40,6 +40,9 @@ public:
   void handle_signal(int signum);
 
   void print_status(Formatter *f, stringstream *ss);
+  void start();
+  void stop();
+  void restart();
   void flush();
 
 private:
@@ -57,6 +60,7 @@ private:
   std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
   std::map<peer_t, std::unique_ptr<Replayer> > m_replayers;
   atomic_t m_stopping;
+  bool m_manual_stop = false;
   MirrorAdminSocketHook *m_asok_hook;
 };
 
index ed2da4b2ff1159b8cfac430c66c5b28771b92b0b..54a003d7044d7240fa9c515abc5d814552707d40 100644 (file)
@@ -50,6 +50,45 @@ private:
   Replayer *replayer;
 };
 
+class StartCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit StartCommand(Replayer *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->start();
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
+class StopCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit StopCommand(Replayer *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->stop();
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
+class RestartCommand : public ReplayerAdminSocketCommand {
+public:
+  explicit RestartCommand(Replayer *replayer) : replayer(replayer) {}
+
+  bool call(Formatter *f, stringstream *ss) {
+    replayer->restart();
+    return true;
+  }
+
+private:
+  Replayer *replayer;
+};
+
 class FlushCommand : public ReplayerAdminSocketCommand {
 public:
   explicit FlushCommand(Replayer *replayer) : replayer(replayer) {}
@@ -80,6 +119,27 @@ public:
       commands[command] = new StatusCommand(replayer);
     }
 
+    command = "rbd mirror start " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "start rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new StartCommand(replayer);
+    }
+
+    command = "rbd mirror stop " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "stop rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new StopCommand(replayer);
+    }
+
+    command = "rbd mirror restart " + name;
+    r = admin_socket->register_command(command, command, this,
+                                      "restart rbd mirror " + name);
+    if (r == 0) {
+      commands[command] = new RestartCommand(replayer);
+    }
+
     command = "rbd mirror flush " + name;
     r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
@@ -275,7 +335,9 @@ void Replayer::run()
 
   while (!m_stopping.read()) {
     Mutex::Locker l(m_lock);
-    set_sources(m_pool_watcher->get_images());
+    if (!m_manual_stop) {
+      set_sources(m_pool_watcher->get_images());
+    }
     m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30));
   }
 
@@ -318,7 +380,28 @@ void Replayer::print_status(Formatter *f, stringstream *ss)
   }
 }
 
-void Replayer::flush()
+void Replayer::start()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  m_manual_stop = false;
+
+  for (auto it = m_images.begin(); it != m_images.end(); it++) {
+    auto &pool_images = it->second;
+    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
+      auto &image_replayer = i->second;
+      image_replayer->start(nullptr, nullptr, true);
+    }
+  }
+}
+
+void Replayer::stop()
 {
   dout(20) << "enter" << dendl;
 
@@ -328,6 +411,48 @@ void Replayer::flush()
     return;
   }
 
+  m_manual_stop = true;
+
+  for (auto it = m_images.begin(); it != m_images.end(); it++) {
+    auto &pool_images = it->second;
+    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
+      auto &image_replayer = i->second;
+      image_replayer->stop(nullptr, true);
+    }
+  }
+}
+
+void Replayer::restart()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read()) {
+    return;
+  }
+
+  m_manual_stop = false;
+
+  for (auto it = m_images.begin(); it != m_images.end(); it++) {
+    auto &pool_images = it->second;
+    for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
+      auto &image_replayer = i->second;
+      image_replayer->restart();
+    }
+  }
+}
+
+void Replayer::flush()
+{
+  dout(20) << "enter" << dendl;
+
+  Mutex::Locker l(m_lock);
+
+  if (m_stopping.read() || m_manual_stop) {
+    return;
+  }
+
   for (auto it = m_images.begin(); it != m_images.end(); it++) {
     auto &pool_images = it->second;
     for (auto i = pool_images.begin(); i != pool_images.end(); i++) {
index 37b96606a39c06ef7930138d81ff4244870bac59..0dcd5ed72dc75d7be6162a2ca1cf343b5c717273 100644 (file)
@@ -42,6 +42,9 @@ public:
   void run();
 
   void print_status(Formatter *f, stringstream *ss);
+  void start();
+  void stop();
+  void restart();
   void flush();
 
 private:
@@ -60,6 +63,7 @@ private:
   Mutex m_lock;
   Cond m_cond;
   atomic_t m_stopping;
+  bool m_manual_stop = false;
 
   peer_t m_peer;
   std::vector<const char*> m_args;