rbd-mirror: stop replay when client is disconnected
author    Mykola Golub <mgolub@mirantis.com>
          Wed, 27 Jul 2016 10:45:32 +0000 (13:45 +0300)
committer Jason Dillaman <dillaman@redhat.com>
          Tue, 11 Oct 2016 16:40:53 +0000 (12:40 -0400)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
(cherry picked from commit 330dba00ba3153ba2862eef52714e0dceae05192)

qa/workunits/rbd/rbd_mirror.sh
qa/workunits/rbd/rbd_mirror_helpers.sh
src/test/journal/mock/MockJournaler.h
src/test/rbd_mirror/test_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc
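
With this change rbd-mirror checks the state of its registered client in the
remote (primary) image's journal: if that client has been flagged disconnected,
replay is not started (or is stopped if already running) and the image is
reported as up+error with the description "disconnected". A rough operator-level
sketch of the behaviour the new tests exercise, assuming a two-cluster setup
with the primary image on cluster2 and the rbd-mirror daemon on cluster1;
cluster, pool and image names are placeholders, and the mirror status
subcommand is just the usual way to observe the result, not something added by
this commit:

  # primary cluster: flag the mirror peer's journal client as disconnected
  rbd --cluster cluster2 -p mypool journal client disconnect --image myimage

  # secondary cluster: rbd-mirror stops replay and reports
  #   state: up+error, description: disconnected
  rbd --cluster cluster1 mirror image status mypool/myimage

The qa workunit and the new ImageReplayer unit test below exercise exactly this
flow, plus recovery via resync.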

diff --git a/qa/workunits/rbd/rbd_mirror.sh b/qa/workunits/rbd/rbd_mirror.sh
index 7e01eeabd55f10aa17099e4f8f83e166fcc77f09..d553e7b9fc3d316774c423c6253838c614ef6cc5 100755
@@ -232,4 +232,51 @@ wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
 test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+replaying' 'master_position'
 compare_images ${POOL} ${image}
 
+testlog "TEST: client disconnect"
+image=laggy
+create_image ${CLUSTER2} ${POOL} ${image} 128 --journal-object-size 64K
+write_image ${CLUSTER2} ${POOL} ${image} 10
+
+testlog " - replay stopped after disconnect"
+wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
+wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
+test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+disconnect_image ${CLUSTER2} ${POOL} ${image}
+test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
+test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'
+
+testlog " - replay started after resync requested"
+request_resync_image ${CLUSTER1} ${POOL} ${image}
+wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
+wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
+wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
+test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+compare_images ${POOL} ${image}
+
+testlog " - disconnected after max_concurrent_object_sets reached"
+admin_daemon ${CLUSTER1} rbd mirror stop ${POOL}/${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
+test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+set_image_meta ${CLUSTER2} ${POOL} ${image} \
+              conf_rbd_journal_max_concurrent_object_sets 1
+write_image ${CLUSTER2} ${POOL} ${image} 20 16384
+write_image ${CLUSTER2} ${POOL} ${image} 20 16384
+test -z "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+set_image_meta ${CLUSTER2} ${POOL} ${image} \
+              conf_rbd_journal_max_concurrent_object_sets 0
+
+testlog " - replay is still stopped (disconnected) after restart"
+admin_daemon ${CLUSTER1} rbd mirror start ${POOL}/${image}
+wait_for_image_replay_stopped ${CLUSTER1} ${POOL} ${image}
+test_status_in_pool_dir ${CLUSTER1} ${POOL} ${image} 'up+error' 'disconnected'
+
+testlog " - replay started after resync requested"
+request_resync_image ${CLUSTER1} ${POOL} ${image}
+wait_for_image_present ${CLUSTER1} ${POOL} ${image} 'deleted'
+wait_for_image_replay_started ${CLUSTER1} ${POOL} ${image}
+wait_for_replay_complete ${CLUSTER1} ${CLUSTER2} ${POOL} ${image}
+test -n "$(get_mirror_position ${CLUSTER2} ${POOL} ${image})"
+compare_images ${POOL} ${image}
+
 echo OK
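
The "max_concurrent_object_sets" block above relies on journal trimming: once
rbd_journal_max_concurrent_object_sets is exceeded, the journal may flag a
lagging registered client as disconnected so that trimming can continue. The
test lowers the limit through a per-image config override while replay for the
image is stopped. Roughly the same can be done by hand on the primary cluster;
names are placeholders and the sizes are simply the ones the test uses:

  # lower the per-image override so a lagging peer is disconnected quickly
  rbd --cluster cluster2 -p mypool image-meta set myimage \
      conf_rbd_journal_max_concurrent_object_sets 1

  # generate enough journal data to roll over several object sets
  rbd --cluster cluster2 -p mypool bench-write myimage \
      --io-size 16384 --io-threads 1 --io-total $((16384 * 20)) --io-pattern rand

  # the peer's commit position disappears from the output once it is disconnected
  rbd --cluster cluster2 -p mypool journal status --image myimage

  # restore the default behaviour
  rbd --cluster cluster2 -p mypool image-meta set myimage \
      conf_rbd_journal_max_concurrent_object_sets 0
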
diff --git a/qa/workunits/rbd/rbd_mirror_helpers.sh b/qa/workunits/rbd/rbd_mirror_helpers.sh
index b6f6d28d02b4117bc7efe32d761d64c388f9586e..f8f2ccaece1a9d0479209d4c80c6ba84659f8926 100755
@@ -381,7 +381,7 @@ get_position()
     local status_log=${TEMPDIR}/${CLUSTER2}-${pool}-${image}.status
     rbd --cluster ${cluster} -p ${pool} journal status --image ${image} |
        tee ${status_log} >&2
-    sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*$/\1/p' \
+    sed -nEe 's/^.*\[id='"${id_regexp}"',.*positions=\[\[([^]]*)\],.*state=connected.*$/\1/p' \
        ${status_log}
 }
 
@@ -451,13 +451,30 @@ test_status_in_pool_dir()
 }
 
 create_image()
+{
+    local cluster=$1 ; shift
+    local pool=$1 ; shift
+    local image=$1 ; shift
+    local size=128
+
+    if [ -n "$1" ]; then
+       size=$1
+       shift
+    fi
+
+    rbd --cluster ${cluster} -p ${pool} create --size ${size} \
+       --image-feature layering,exclusive-lock,journaling $@ ${image}
+}
+
+set_image_meta()
 {
     local cluster=$1
     local pool=$2
     local image=$3
+    local key=$4
+    local val=$5
 
-    rbd --cluster ${cluster} -p ${pool} create --size 128 \
-       --image-feature layering,exclusive-lock,journaling ${image}
+    rbd --cluster ${cluster} -p ${pool} image-meta set ${image} $key $val
 }
 
 remove_image()
@@ -495,6 +512,16 @@ clone_image()
        ${clone_pool}/${clone_image} --image-feature layering,exclusive-lock,journaling
 }
 
+disconnect_image()
+{
+    local cluster=$1
+    local pool=$2
+    local image=$3
+
+    rbd --cluster ${cluster} -p ${pool} journal client disconnect \
+       --image ${image}
+}
+
 create_snapshot()
 {
     local cluster=$1
@@ -577,9 +604,12 @@ write_image()
     local pool=$2
     local image=$3
     local count=$4
+    local size=$5
+
+    test -n "${size}" || size=4096
 
     rbd --cluster ${cluster} -p ${pool} bench-write ${image} \
-       --io-size 4096 --io-threads 1 --io-total $((4096 * count)) \
+       --io-size ${size} --io-threads 1 --io-total $((size * count)) \
        --io-pattern rand
 }
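
For reference, the updated helpers above are thin wrappers around the rbd CLI.
With placeholder cluster/pool/image names, the new workunit calls expand to
roughly the following commands:

  # create_image cluster2 mypool myimage 128 --journal-object-size 64K
  rbd --cluster cluster2 -p mypool create --size 128 \
      --image-feature layering,exclusive-lock,journaling \
      --journal-object-size 64K myimage

  # set_image_meta cluster2 mypool myimage conf_rbd_journal_max_concurrent_object_sets 1
  rbd --cluster cluster2 -p mypool image-meta set myimage \
      conf_rbd_journal_max_concurrent_object_sets 1

  # disconnect_image cluster2 mypool myimage
  rbd --cluster cluster2 -p mypool journal client disconnect --image myimage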
 
diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h
index e1998ff0e42b6fbeb8c08bc947f003024740f0ad..a40f234403cbf1245f7a58a524733147079b4ef8 100644
@@ -102,6 +102,7 @@ struct MockJournaler {
                                           Context*));
 
   MOCK_METHOD2(register_client, void(const bufferlist &, Context *));
+  MOCK_METHOD1(unregister_client, void(Context *));
   MOCK_METHOD3(get_client, void(const std::string &, cls::journal::Client *,
                                 Context *));
   MOCK_METHOD2(get_cached_client, int(const std::string&, cls::journal::Client*));
@@ -159,9 +160,6 @@ struct MockJournalerProxy {
   int register_client(const bufferlist &data) {
     return -EINVAL;
   }
-  void unregister_client(Context *ctx) {
-    ctx->complete(-EINVAL);
-  }
 
   void allocate_tag(uint64_t, const bufferlist &,
                     cls::journal::Tag*, Context *on_finish) {
@@ -196,6 +194,10 @@ struct MockJournalerProxy {
     MockJournaler::get_instance().register_client(data, on_finish);
   }
 
+  void unregister_client(Context *on_finish) {
+    MockJournaler::get_instance().unregister_client(on_finish);
+  }
+
   void get_client(const std::string &client_id, cls::journal::Client *client,
                   Context *on_finish) {
     MockJournaler::get_instance().get_client(client_id, client, on_finish);
diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc
index d3002b052c53949cac940fe2ed6a4ec05cf3459e..bb95dff819d9456a1819056ea2543a743c06bc71 100644
@@ -233,6 +233,9 @@ public:
     std::set<cls::journal::Client>::const_iterator c;
     for (c = registered_clients.begin(); c != registered_clients.end(); c++) {
       std::cout << __func__ << ": client: " << *c << std::endl;
+      if (c->state != cls::journal::CLIENT_STATE_CONNECTED) {
+       continue;
+      }
       cls::journal::ObjectPositions object_positions =
        c->commit_position.object_positions;
       cls::journal::ObjectPositions::const_iterator p =
@@ -822,3 +825,68 @@ TEST_F(TestImageReplayer, MultipleReplayFailures_MultiEpoch) {
   close_image(ictx);
 }
 
+TEST_F(TestImageReplayer, Disconnect)
+{
+  bootstrap();
+
+  // Test start fails if disconnected
+
+  librbd::ImageCtx *ictx;
+
+  generate_test_data();
+  open_remote_image(&ictx);
+  for (int i = 0; i < TEST_IO_COUNT; ++i) {
+    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
+  }
+  flush(ictx);
+  close_image(ictx);
+
+  std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
+  ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
+       m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));
+
+  C_SaferCond cond1;
+  m_replayer->start(&cond1);
+  ASSERT_EQ(-ENOTCONN, cond1.wait());
+
+  // Test start succeeds after resync
+
+  open_local_image(&ictx);
+  librbd::Journal<>::request_resync(ictx);
+  close_image(ictx);
+  C_SaferCond cond2;
+  m_replayer->start(&cond2);
+  ASSERT_EQ(-ENOTCONN, cond2.wait());
+  C_SaferCond delete_cond;
+  m_image_deleter->wait_for_scheduled_deletion(
+    m_local_ioctx.get_id(), m_replayer->get_global_image_id(), &delete_cond);
+  EXPECT_EQ(0, delete_cond.wait());
+
+  start();
+  wait_for_replay_complete();
+
+  // Test replay stopped after disconnect
+
+  open_remote_image(&ictx);
+  for (int i = TEST_IO_COUNT; i < 2 * TEST_IO_COUNT; ++i) {
+    write_test_data(ictx, m_test_data, TEST_IO_SIZE * i, TEST_IO_SIZE);
+  }
+  flush(ictx);
+  close_image(ictx);
+
+  ASSERT_EQ(0, cls::journal::client::client_update_state(m_remote_ioctx, oid,
+       m_local_mirror_uuid, cls::journal::CLIENT_STATE_DISCONNECTED));
+  bufferlist bl;
+  ASSERT_EQ(0, m_remote_ioctx.notify2(oid, bl, 5000, NULL));
+
+  wait_for_stopped();
+
+  // Test start fails after disconnect
+
+  C_SaferCond cond3;
+  m_replayer->start(&cond3);
+  ASSERT_EQ(-ENOTCONN, cond3.wait());
+  C_SaferCond cond4;
+  m_replayer->start(&cond4);
+  ASSERT_EQ(-ENOTCONN, cond4.wait());
+}
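
The unit test flips the remote client's state directly with
cls::journal::client::client_update_state plus a watch notify; outside the test
the same transition comes from the rbd CLI (as in the workunit) or from journal
trimming. The registered clients, their commit positions and their
connected/disconnected state can be inspected on the primary with the journal
status command (placeholder names):

  # list registered journal clients, their commit positions and state
  rbd --cluster cluster2 -p mypool journal status --image myimage

  # flag the rbd-mirror peer disconnected (CLI equivalent of the
  # client_update_state call in the test above)
  rbd --cluster cluster2 -p mypool journal client disconnect --image myimage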
diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc
index 83c30a29c2e9d0873421f69cfc7fec183469d129..58df5d081b11173ddc86eb7b326c210756b6a1af 100644
@@ -250,6 +250,15 @@ void ImageReplayer<I>::BootstrapProgressContext::update_progress(
   }
 }
 
+template <typename I>
+void ImageReplayer<I>::RemoteJournalerListener::handle_update(
+  ::journal::JournalMetadata *) {
+  FunctionContext *ctx = new FunctionContext([this](int r) {
+      replayer->handle_remote_journal_metadata_updated();
+    });
+  replayer->m_threads->work_queue->queue(ctx, 0);
+}
+
 template <typename I>
 ImageReplayer<I>::ImageReplayer(Threads *threads,
                              shared_ptr<ImageDeleter> image_deleter,
@@ -276,7 +285,8 @@ ImageReplayer<I>::ImageReplayer(Threads *threads,
   m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
         remote_image_id),
   m_progress_cxt(this),
-  m_resync_listener(new ResyncListener<I>(this))
+  m_resync_listener(new ResyncListener<I>(this)),
+  m_remote_listener(this)
 {
   // Register asok commands using a temporary "remote_pool_name/global_image_id"
   // name.  When the image name becomes known on start the asok commands will be
@@ -508,6 +518,23 @@ void ImageReplayer<I>::handle_init_remote_journaler(int r) {
     return;
   }
 
+  m_remote_journaler->add_listener(&m_remote_listener);
+
+  cls::journal::Client client;
+  r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
+  if (r < 0) {
+    derr << "error retrieving remote journal client: " << cpp_strerror(r)
+        << dendl;
+    on_start_fail(r, "error retrieving remote journal client");
+    return;
+  }
+
+  if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+    dout(5) << "client flagged disconnected, stopping image replay" << dendl;
+    on_start_fail(-ENOTCONN, "disconnected");
+    return;
+  }
+
   start_replay();
 }
 
@@ -637,15 +664,18 @@ bool ImageReplayer<I>::on_start_interrupted()
 }
 
 template <typename I>
-void ImageReplayer<I>::stop(Context *on_finish, bool manual)
+void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
+                           const std::string& desc)
 {
-  dout(20) << "on_finish=" << on_finish << dendl;
+  dout(20) << "on_finish=" << on_finish << ", manual=" << manual
+          << ", desc=" << desc << dendl;
 
   image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
   bool shut_down_replay = false;
   bool running = true;
   {
     Mutex::Locker locker(m_lock);
+
     if (!is_running_()) {
       running = false;
     } else {
@@ -684,14 +714,14 @@ void ImageReplayer<I>::stop(Context *on_finish, bool manual)
   }
 
   if (shut_down_replay) {
-    on_stop_journal_replay();
+    on_stop_journal_replay(r, desc);
   } else if (on_finish != nullptr) {
     on_finish->complete(0);
   }
 }
 
 template <typename I>
-void ImageReplayer<I>::on_stop_journal_replay()
+void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
 {
   dout(20) << "enter" << dendl;
 
@@ -705,7 +735,7 @@ void ImageReplayer<I>::on_stop_journal_replay()
     m_state = STATE_STOPPING;
   }
 
-  set_state_description(0, "");
+  set_state_description(r, desc);
   update_mirror_image_status(false, boost::none);
   reschedule_update_status_task(-1);
   shut_down(0);
@@ -1351,6 +1381,7 @@ void ImageReplayer<I>::shut_down(int r) {
         ctx->complete(0);
       });
     ctx = new FunctionContext([this, ctx](int r) {
+       m_remote_journaler->remove_listener(&m_remote_listener);
         m_remote_journaler->shut_down(ctx);
       });
     if (m_stopping_for_resync) {
@@ -1451,6 +1482,30 @@ void ImageReplayer<I>::handle_shut_down(int r) {
   }
 }
 
+template <typename I>
+void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
+  dout(20) << dendl;
+
+  cls::journal::Client client;
+  {
+    Mutex::Locker locker(m_lock);
+    if (!is_running_()) {
+      return;
+    }
+
+    int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
+    if (r < 0) {
+      derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+      return;
+    }
+  }
+
+  if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
+    dout(0) << "client flagged disconnected, stopping image replay" << dendl;
+    stop(nullptr, false, -ENOTCONN, "disconnected");
+  }
+}
+
 template <typename I>
 std::string ImageReplayer<I>::to_string(const State state) {
   switch (state) {
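
The workunit drives per-image start/stop through the rbd-mirror admin socket
(the asok commands the ImageReplayer constructor registers). The socket path
below is only an example and depends on the deployment; the command names match
the admin_daemon calls in the test:

  ceph --admin-daemon /var/run/ceph/rbd-mirror.cluster1.asok help
  ceph --admin-daemon /var/run/ceph/rbd-mirror.cluster1.asok \
      rbd mirror stop mypool/myimage
  ceph --admin-daemon /var/run/ceph/rbd-mirror.cluster1.asok \
      rbd mirror start mypool/myimage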
diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h
index 1b1a742ff64dab4d6e3b90ef2fe8183cdb672098..36ca97d66ea57bd6c45da336d4fddf1640c94ca0 100644
@@ -14,6 +14,7 @@
 #include "include/rados/librados.hpp"
 #include "cls/journal/cls_journal_types.h"
 #include "cls/rbd/cls_rbd_types.h"
+#include "journal/JournalMetadataListener.h"
 #include "journal/ReplayEntry.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/journal/Types.h"
@@ -111,7 +112,8 @@ public:
   }
 
   void start(Context *on_finish = nullptr, bool manual = false);
-  void stop(Context *on_finish = nullptr, bool manual = false);
+  void stop(Context *on_finish = nullptr, bool manual = false,
+           int r = 0, const std::string& desc = "");
   void restart(Context *on_finish = nullptr);
   void flush(Context *on_finish = nullptr);
 
@@ -190,7 +192,7 @@ protected:
   virtual void on_start_fail(int r, const std::string &desc = "");
   virtual bool on_start_interrupted();
 
-  virtual void on_stop_journal_replay();
+  virtual void on_stop_journal_replay(int r = 0, const std::string &desc = "");
 
   virtual void on_flush_local_replay_flush_start(Context *on_flush);
   virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
@@ -270,6 +272,14 @@ private:
   librbd::journal::TagData m_replay_tag_data;
   librbd::journal::EventEntry m_event_entry;
 
+  struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
+    ImageReplayer *replayer;
+
+    RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
+
+    void handle_update(::journal::JournalMetadata *);
+  } m_remote_listener;
+
   struct C_ReplayCommitted : public Context {
     ImageReplayer *replayer;
     ReplayEntry replay_entry;
@@ -309,6 +319,7 @@ private:
 
   void shut_down(int r);
   void handle_shut_down(int r);
+  void handle_remote_journal_metadata_updated();
 
   void bootstrap();
   void handle_bootstrap(int r);
diff --git a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc
index 9fd57339f46342d9cc2cf1c99d581307848ab920..db83a4a068a437d4b8cf0dcc3408112ec4d52fb4 100644
@@ -333,6 +333,12 @@ void BootstrapRequest<I>::handle_open_local_image(int r) {
     m_ret_val = r;
     close_remote_image();
     return;
+  } else if (m_client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+    dout(10) << ": client flagged disconnected -- skipping bootstrap" << dendl;
+    // The caller is expected to detect the disconnect when initializing the remote journal.
+    m_ret_val = 0;
+    close_remote_image();
+    return;
   }
 
   update_client();
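
With BootstrapRequest skipping bootstrap for a disconnected client and
ImageReplayer failing start with -ENOTCONN / "disconnected", the disconnected
state is sticky across daemon restarts; as both tests assert, the way back to
replication is an image resync. A hedged sketch of the recovery on the
secondary cluster (placeholder names; the resync subcommand is the standard rbd
CLI, not part of this diff):

  # the image stays up+error / "disconnected" across rbd-mirror restarts
  rbd --cluster cluster1 mirror image status mypool/myimage

  # request a resync: rbd-mirror deletes the local copy and bootstraps it again
  rbd --cluster cluster1 mirror image resync mypool/myimage

  # once bootstrap has re-registered the journal client, replay resumes
  rbd --cluster cluster1 mirror image status mypool/myimage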