git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: ImageReplayer async start/stop
author Mykola Golub <mgolub@mirantis.com>
Tue, 23 Feb 2016 07:02:00 +0000 (09:02 +0200)
committer Mykola Golub <mgolub@mirantis.com>
Fri, 11 Mar 2016 14:12:00 +0000 (16:12 +0200)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/test/rbd_mirror/image_replay.cc
src/test/rbd_mirror/test_ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h

index 66fcd610856cecff8c46fbfdf20d6145ceef1c4f..87d449717a96ef2015a84a8e93ab55ebb8a02fe3 100644 (file)
@@ -132,6 +132,8 @@ int main(int argc, const char **argv)
   rbd::mirror::RadosRef remote(new librados::Rados());
   rbd::mirror::Threads *threads = nullptr;
 
+  C_SaferCond start_cond, stop_cond;
+
   int r = local->init_with_context(g_ceph_context);
   if (r < 0) {
     derr << "could not initialize rados handle" << dendl;
@@ -187,7 +189,8 @@ int main(int argc, const char **argv)
                                            local_pool_id, remote_pool_id,
                                            remote_image_id);
 
-  r = replayer->start(&bootstap_params);
+  replayer->start(&start_cond, &bootstap_params);
+  r = start_cond.wait();
   if (r < 0) {
     derr << "failed to start: " << cpp_strerror(r) << dendl;
     goto cleanup;
@@ -201,7 +204,9 @@ int main(int argc, const char **argv)
 
   dout(1) << "termination signal received, stopping replay" << dendl;
 
-  replayer->stop();
+  replayer->stop(&stop_cond);
+  r = stop_cond.wait();
+  assert(r == 0);
 
   dout(1) << "shutdown" << dendl;
 
index 3ab535c55e392100396c4aa179a2270486bcf6d8..c0bdef561956586700821786966160d91888015e 100644 (file)
@@ -101,7 +101,6 @@ public:
 
     m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
       m_local_ioctx.cct()));
-
     m_replayer = new rbd::mirror::ImageReplayer(
       m_threads,
       rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
@@ -123,7 +122,9 @@ public:
   void start(rbd::mirror::ImageReplayer::BootstrapParams *bootstap_params =
             nullptr)
   {
-    ASSERT_EQ(0, m_replayer->start(bootstap_params));
+    C_SaferCond cond;
+    m_replayer->start(&cond, bootstap_params);
+    ASSERT_EQ(0, cond.wait());
 
     ASSERT_EQ(0U, m_watch_handle);
     std::string oid = ::journal::Journaler::header_oid(m_remote_image_id);
@@ -140,7 +141,9 @@ public:
       m_watch_handle = 0;
     }
 
-    m_replayer->stop();
+    C_SaferCond cond;
+    m_replayer->stop(&cond);
+    ASSERT_EQ(0, cond.wait());
   }
 
   void bootstrap()
index 0afea699fd958c251f941b20168aad7901f16940..3148b6dc05e7918db6b6cc2d4c5bad4a3c092ed5 100644 (file)
@@ -65,6 +65,30 @@ struct C_ReplayCommitted : public Context {
   }
 };
 
+class BootstrapThread : public Thread {
+public:
+  explicit BootstrapThread(ImageReplayer *replayer,
+                          const ImageReplayer::BootstrapParams &params,
+                          Context *on_finish)
+    : replayer(replayer), params(params), on_finish(on_finish) {}
+
+  virtual ~BootstrapThread() {}
+
+protected:
+  void *entry()
+  {
+    int r = replayer->bootstrap(params);
+    on_finish->complete(r);
+    delete this;
+    return NULL;
+  }
+
+private:
+  ImageReplayer *replayer;
+  ImageReplayer::BootstrapParams params;
+  Context *on_finish;
+};
+
 class ImageReplayerAdminSocketCommand {
 public:
   virtual ~ImageReplayerAdminSocketCommand() {}
@@ -179,7 +203,8 @@ ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
   m_local_image_ctx(nullptr),
   m_local_replay(nullptr),
   m_remote_journaler(nullptr),
-  m_replay_handler(nullptr)
+  m_replay_handler(nullptr),
+  m_on_finish(nullptr)
 {
   CephContext *cct = static_cast<CephContext *>(m_local->cct());
 
@@ -198,110 +223,298 @@ ImageReplayer::~ImageReplayer()
   delete m_asok_hook;
 }
 
-int ImageReplayer::start(const BootstrapParams *bootstrap_params)
+void ImageReplayer::start(Context *on_finish,
+                         const BootstrapParams *bootstrap_params)
 {
-  // TODO: make async
-
-  dout(20) << "enter" << dendl;
+  dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
+          << dendl;
 
   {
     Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPED);
+    assert(is_stopped_());
 
     m_state = STATE_STARTING;
-  }
 
-  std::string remote_journal_id = m_remote_image_id;
-  std::string image_name = "";
-  C_SaferCond cond, lock_ctx;
-  double commit_interval;
-  bool registered;
-  int r = 0;
+    assert(m_on_finish == nullptr);
+    m_on_finish = on_finish;
+  }
 
-  r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
+  int r = m_remote->ioctx_create2(m_remote_pool_id, m_remote_ioctx);
   if (r < 0) {
     derr << "error opening ioctx for remote pool " << m_remote_pool_id
         << ": " << cpp_strerror(r) << dendl;
-    return r;
+    on_start_fail_start(r);
+    return;
   }
 
   CephContext *cct = static_cast<CephContext *>(m_local->cct());
 
-  commit_interval = cct->_conf->rbd_journal_commit_age;
-  bool remote_journaler_initialized = false;
+  double commit_interval = cct->_conf->rbd_journal_commit_age;
   m_remote_journaler = new ::journal::Journaler(m_threads->work_queue,
-                                                m_threads->timer,
-                                                &m_threads->timer_lock,
-                                                m_remote_ioctx,
-                                               remote_journal_id,
-                                               m_client_id, commit_interval);
-  r = get_registered_client_status(&registered);
+                                               m_threads->timer,
+                                               &m_threads->timer_lock,
+                                               m_remote_ioctx,
+                                               m_remote_image_id, m_client_id,
+                                               commit_interval);
+
+  on_start_get_registered_client_status_start(bootstrap_params);
+}
+
+void ImageReplayer::on_start_get_registered_client_status_start(
+  const BootstrapParams *bootstrap_params)
+{
+  dout(20) << "enter" << dendl;
+
+  struct Metadata {
+    uint64_t minimum_set;
+    uint64_t active_set;
+    std::set<cls::journal::Client> registered_clients;
+    BootstrapParams bootstrap_params;
+  } *m = new Metadata();
+
+  if (bootstrap_params) {
+    m->bootstrap_params = *bootstrap_params;
+  }
+
+  FunctionContext *ctx = new FunctionContext(
+    [this, m, bootstrap_params](int r) {
+      on_start_get_registered_client_status_finish(r, m->registered_clients,
+                                                  m->bootstrap_params);
+      delete m;
+    });
+
+  m_remote_journaler->get_mutable_metadata(&m->minimum_set, &m->active_set,
+                                          &m->registered_clients, ctx);
+}
+
+void ImageReplayer::on_start_get_registered_client_status_finish(int r,
+  const std::set<cls::journal::Client> &registered_clients,
+  const BootstrapParams &bootstrap_params)
+{
+  dout(20) << "r=" << r << dendl;
+
   if (r < 0) {
     derr << "error obtaining registered client status: "
         << cpp_strerror(r) << dendl;
-    goto fail;
+    on_start_fail_start(r);
+    return;
+  }
+  if (on_start_interrupted()) {
+    return;
   }
 
-  if (registered) {
-    if (bootstrap_params) {
-      dout(0) << "ignoring bootsrap params: client already registered" << dendl;
-    }
-  } else {
-    r = bootstrap(bootstrap_params);
-    if (r < 0) {
-      derr << "bootstrap failed: " << cpp_strerror(r) << dendl;
-      goto fail;
+  for (auto c : registered_clients) {
+    if (c.id == m_client_id) {
+      librbd::journal::ClientData client_data;
+      bufferlist::iterator bl = c.data.begin();
+      try {
+       ::decode(client_data, bl);
+      } catch (const buffer::error &err) {
+       derr << "failed to decode client meta data: " << err.what() << dendl;
+       on_start_fail_start(-EINVAL);
+       return;
+      }
+      librbd::journal::MirrorPeerClientMeta &cm =
+       boost::get<librbd::journal::MirrorPeerClientMeta>(client_data.client_meta);
+      m_local_image_id = cm.image_id;
+
+      // TODO: snap name should be transient
+      if (cm.sync_points.empty()) {
+       derr << "sync points not found" << dendl;
+       on_start_fail_start(-ENOENT);
+       return;
+      }
+      m_snap_name = cm.sync_points.front().snap_name;
+
+      dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id="
+              << m_local_image_id << ", snap_name=" << m_snap_name << dendl;
+
+      if (!bootstrap_params.empty()) {
+       dout(0) << "ignoring bootsrap params: client already registered" << dendl;
+      }
+
+      on_start_bootstrap_finish(0);
+      return;
     }
   }
 
-  m_remote_journaler->init(&cond);
-  r = cond.wait();
+  dout(20) << "client not found" << dendl;
+
+  on_start_bootstrap_start(bootstrap_params);
+}
+
+void ImageReplayer::on_start_bootstrap_start(const BootstrapParams &params)
+{
+  dout(20) << "enter" << dendl;
+
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      on_start_bootstrap_finish(r);
+    });
+
+  BootstrapThread *thread = new BootstrapThread(this, params, ctx);
+
+  thread->create("bootstrap");
+  thread->detach();
+  // TODO: As the bootstrap might take long time it needs some control
+  // to get current status, interrupt, etc...
+}
+
+void ImageReplayer::on_start_bootstrap_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    on_start_fail_start(r);
+    return;
+  }
+  if (on_start_interrupted()) {
+    return;
+  }
+
+  on_start_remote_journaler_init_start();
+}
+
+void ImageReplayer::on_start_remote_journaler_init_start()
+{
+  dout(20) << "enter" << dendl;
+
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      on_start_remote_journaler_init_finish(r);
+    });
+
+  m_remote_journaler->init(ctx);
+}
+
+void ImageReplayer::on_start_remote_journaler_init_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
+
   if (r < 0) {
     derr << "error initializing journal: " << cpp_strerror(r) << dendl;
-    goto fail;
+    on_start_fail_start(r);
+    return;
+  }
+  if (on_start_interrupted()) {
+    return;
   }
-  remote_journaler_initialized = true;
 
   r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
   if (r < 0) {
     derr << "error opening ioctx for local pool " << m_local_pool_id
         << ": " << cpp_strerror(r) << dendl;
-    goto fail;
+    on_start_fail_start(r);
+    return;
   }
 
+  on_start_local_image_open_start();
+}
+
+void ImageReplayer::on_start_local_image_open_start()
+{
+  dout(20) << "enter" << dendl;
+
   m_local_image_ctx = new librbd::ImageCtx("", m_local_image_id, NULL,
                                           m_local_ioctx, false);
-  r = m_local_image_ctx->state->open();
+
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      on_start_local_image_open_finish(r);
+    });
+  m_local_image_ctx->state->open(ctx);
+}
+
+void ImageReplayer::on_start_local_image_open_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
+
   if (r < 0) {
     derr << "error opening local image " <<  m_local_image_id
         << ": " << cpp_strerror(r) << dendl;
-    delete m_local_image_ctx;
-    m_local_image_ctx = nullptr;
-    goto fail;
-  }
 
-  {
-    RWLock::WLocker owner_locker(m_local_image_ctx->owner_lock);
-    m_local_image_ctx->exclusive_lock->request_lock(&lock_ctx);
+    FunctionContext *ctx = new FunctionContext(
+      [this, r](int r1) {
+       assert(r1 == 0);
+       delete m_local_image_ctx;
+       m_local_image_ctx = nullptr;
+       on_start_fail_start(r);
+      });
+
+    m_threads->work_queue->queue(ctx, 0);
+    return;
   }
-  r = lock_ctx.wait();
+  if (on_start_interrupted()) {
+    return;
+  }
+
+  on_start_local_image_lock_start();
+}
+
+void ImageReplayer::on_start_local_image_lock_start()
+{
+  dout(20) << "enter" << dendl;
+
+  RWLock::WLocker owner_locker(m_local_image_ctx->owner_lock);
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      on_start_local_image_lock_finish(r);
+    });
+  m_local_image_ctx->exclusive_lock->request_lock(ctx);
+}
+
+void ImageReplayer::on_start_local_image_lock_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
+
   if (r < 0) {
     derr << "error to lock exclusively local image " <<  m_local_image_id
         << ": " << cpp_strerror(r) << dendl;
-    goto fail;
+    on_start_fail_start(r);
+    return;
   }
-
   if (m_local_image_ctx->journal == nullptr) {
-    derr << "journaling is not enabled on local image " <<  m_local_image_id
-        << ": " << cpp_strerror(r) << dendl;
-    goto fail;
+    on_start_fail_start(-EINVAL);
+    return;
+  }
+  if (on_start_interrupted()) {
+    return;
+  }
+
+  on_start_wait_for_local_journal_ready_start();
+}
+
+void ImageReplayer::on_start_wait_for_local_journal_ready_start()
+{
+  dout(20) << "enter" << dendl;
+
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      on_start_wait_for_local_journal_ready_finish(r);
+    });
+  m_local_image_ctx->journal->wait_for_journal_ready(ctx);
+}
+
+void ImageReplayer::on_start_wait_for_local_journal_ready_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "error when waiting for local journal ready: " << cpp_strerror(r)
+        << dendl;
+    on_start_fail_start(r);
+    return;
+  }
+  if (on_start_interrupted()) {
+    return;
   }
 
   r = m_local_image_ctx->journal->start_external_replay(&m_local_replay);
   if (r < 0) {
     derr << "error starting external replay on local image "
         <<  m_local_image_id << ": " << cpp_strerror(r) << dendl;
-    goto fail;
+    on_start_fail_start(r);
+    return;
   }
 
   m_replay_handler = new ReplayHandler(this);
@@ -311,21 +524,51 @@ int ImageReplayer::start(const BootstrapParams *bootstrap_params)
 
   dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
 
+  assert(r == 0);
+
+  Context *on_finish(nullptr);
+
   {
     Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_STARTING);
 
+    if (m_state == STATE_STOPPING) {
+      on_start_fail_start(-EINTR);
+      return;
+    }
+
+    assert(m_state == STATE_STARTING);
     m_state = STATE_REPLAYING;
+
+    std::swap(m_on_finish, on_finish);
   }
 
-  return 0;
+  dout(20) << "start succeeded" << dendl;
+
+  if (on_finish) {
+    dout(20) << "on finish complete, r=" << r << dendl;
+    on_finish->complete(r);
+  }
+}
 
-fail:
-  dout(20) << "fail, r=" << r << dendl;
+void ImageReplayer::on_start_fail_start(int r)
+{
+  dout(20) << "r=" << r << dendl;
+
+  FunctionContext *ctx = new FunctionContext(
+    [this, r](int r1) {
+      assert(r1 == 0);
+      on_start_fail_finish(r);
+    });
+
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+void ImageReplayer::on_start_fail_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
 
   if (m_remote_journaler) {
-    if (remote_journaler_initialized) {
-      m_remote_journaler->stop_replay();
+    if (m_remote_journaler->is_initialized()) {
       m_remote_journaler->shut_down();
     }
     delete m_remote_journaler;
@@ -333,7 +576,6 @@ fail:
   }
 
   if (m_local_replay) {
-    Mutex::Locker locker(m_lock);
     shut_down_journal_replay(true);
     m_local_image_ctx->journal->stop_external_replay();
     m_local_replay = nullptr;
@@ -357,34 +599,137 @@ fail:
   m_local_ioctx.close();
   m_remote_ioctx.close();
 
+  Context *on_finish(nullptr);
+
   {
     Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_STARTING);
+    if (m_state == STATE_STOPPING) {
+      assert(r == -EINTR);
+      dout(20) << "start interrupted" << dendl;
+      m_state = STATE_STOPPED;
+    } else {
+      assert(m_state == STATE_STARTING);
+      dout(20) << "start failed" << dendl;
+      m_state = STATE_UNINITIALIZED;
+    }
+    std::swap(m_on_finish, on_finish);
+  }
 
-    m_state = STATE_UNINITIALIZED;
+  if (on_finish) {
+    dout(20) << "on finish complete, r=" << r << dendl;
+    on_finish->complete(r);
   }
+}
 
-  return r;
+bool ImageReplayer::on_start_interrupted()
+{
+  Mutex::Locker locker(m_lock);
+
+  if (m_state == STATE_STARTING) {
+    return false;
+  }
+
+  assert(m_state == STATE_STOPPING);
+
+  on_start_fail_start(-EINTR);
+  return true;
 }
 
-void ImageReplayer::stop()
+void ImageReplayer::stop(Context *on_finish)
 {
-  dout(20) << "enter" << dendl;
+  dout(20) << "on_finish=" << on_finish << ", m_on_finish=" << m_on_finish
+          << dendl;
 
-  {
-    Mutex::Locker locker(m_lock);
-    assert(m_state == STATE_REPLAYING);
+  Mutex::Locker locker(m_lock);
+  assert(is_running_());
+
+  if (m_state == STATE_STARTING) {
+    dout(20) << "interrupting start" << dendl;
 
-    m_state = STATE_STOPPING;
+    if (on_finish) {
+      Context *on_start_finish = m_on_finish;
+      FunctionContext *ctx = new FunctionContext(
+       [this, on_start_finish, on_finish](int r) {
+         if (on_start_finish) {
+           on_start_finish->complete(r);
+         }
+         on_finish->complete(0);
+       });
+
+      m_on_finish = ctx;
+    }
+  } else {
+    assert(m_on_finish == nullptr);
+    m_on_finish = on_finish;
+    on_stop_journal_replay_shut_down_start();
   }
+  m_state = STATE_STOPPING;
+}
 
-  shut_down_journal_replay(false);
+void ImageReplayer::on_stop_journal_replay_shut_down_start()
+{
+  dout(20) << "enter" << dendl;
+
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      on_stop_journal_replay_shut_down_finish(r);
+    });
+
+  m_local_replay->shut_down(false, ctx);
+}
+
+void ImageReplayer::on_stop_journal_replay_shut_down_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "error flushing journal replay: " << cpp_strerror(r) << dendl;
+  }
 
   m_local_image_ctx->journal->stop_external_replay();
   m_local_replay = nullptr;
 
-  m_local_image_ctx->state->close();
-  m_local_image_ctx = nullptr;
+  on_stop_local_image_close_start();
+}
+
+void ImageReplayer::on_stop_local_image_close_start()
+{
+  dout(20) << "enter" << dendl;
+
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      on_stop_local_image_close_finish(r);
+    });
+
+  m_local_image_ctx->state->close(ctx);
+}
+
+void ImageReplayer::on_stop_local_image_close_finish(int r)
+{
+  dout(20) << "r=" << r << dendl;
+
+  if (r < 0) {
+    derr << "error closing local image: " << cpp_strerror(r) << dendl;
+  }
+
+  on_stop_local_image_delete_start();
+}
+
+void ImageReplayer::on_stop_local_image_delete_start()
+{
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      delete m_local_image_ctx;
+      m_local_image_ctx = nullptr;
+      on_stop_local_image_delete_finish(r);
+    });
+
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+void ImageReplayer::on_stop_local_image_delete_finish(int r)
+{
+  assert(r == 0);
 
   m_local_ioctx.close();
 
@@ -398,14 +743,47 @@ void ImageReplayer::stop()
 
   m_remote_ioctx.close();
 
+  Context *on_finish(nullptr);
+
   {
     Mutex::Locker locker(m_lock);
     assert(m_state == STATE_STOPPING);
 
     m_state = STATE_STOPPED;
+
+    std::swap(m_on_finish, on_finish);
   }
 
-  dout(20) << "done" << dendl;
+  dout(20) << "stop complete" << dendl;
+
+  if (on_finish) {
+    dout(20) << "on finish complete, r=" << r << dendl;
+    on_finish->complete(r);
+  }
+}
+
+void ImageReplayer::close_local_image(Context *on_finish)
+{
+  m_local_image_ctx->state->close(on_finish);
+}
+
+void ImageReplayer::handle_replay_ready()
+{
+  dout(20) << "enter" << dendl;
+
+  ::journal::ReplayEntry replay_entry;
+  if (!m_remote_journaler->try_pop_front(&replay_entry)) {
+    return;
+  }
+
+  dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl;
+
+  bufferlist data = replay_entry.get_data();
+  bufferlist::iterator it = data.begin();
+  Context *on_ready = create_context_callback<
+    ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this);
+  Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry));
+  m_local_replay->process(&it, on_ready, on_commit);
 }
 
 int ImageReplayer::flush()
@@ -451,25 +829,6 @@ int ImageReplayer::flush()
   return r < 0 ? r : r1;
 }
 
-void ImageReplayer::handle_replay_ready()
-{
-  dout(20) << "enter" << dendl;
-
-  ::journal::ReplayEntry replay_entry;
-  if (!m_remote_journaler->try_pop_front(&replay_entry)) {
-    return;
-  }
-
-  dout(20) << "processing entry tid=" << replay_entry.get_commit_tid() << dendl;
-
-  bufferlist data = replay_entry.get_data();
-  bufferlist::iterator it = data.begin();
-  Context *on_ready = create_context_callback<
-    ImageReplayer, &ImageReplayer::handle_replay_process_ready>(this);
-  Context *on_commit = new C_ReplayCommitted(this, std::move(replay_entry));
-  m_local_replay->process(&it, on_ready, on_commit);
-}
-
 void ImageReplayer::handle_replay_process_ready(int r)
 {
   // journal::Replay is ready for more events -- attempt to pop another
@@ -502,60 +861,10 @@ void ImageReplayer::handle_replay_committed(
   m_remote_journaler->committed(*replay_entry);
 }
 
-int ImageReplayer::get_registered_client_status(bool *registered)
-{
-  dout(20) << "enter" << dendl;
-
-  uint64_t minimum_set;
-  uint64_t active_set;
-  std::set<cls::journal::Client> registered_clients;
-  C_SaferCond cond;
-  m_remote_journaler->get_mutable_metadata(&minimum_set, &active_set,
-                                          &registered_clients, &cond);
-  int r = cond.wait();
-  if (r < 0) {
-    derr << "error retrieving remote journal registered clients: "
-        << cpp_strerror(r) << dendl;
-    return r;
-  }
-
-  for (auto c : registered_clients) {
-    if (c.id == m_client_id) {
-      *registered = true;
-      librbd::journal::ClientData client_data;
-      bufferlist::iterator bl = c.data.begin();
-      try {
-       ::decode(client_data, bl);
-      } catch (const buffer::error &err) {
-       derr << "failed to decode client meta data: " << err.what() << dendl;
-       return -EINVAL;
-      }
-      librbd::journal::MirrorPeerClientMeta &cm =
-       boost::get<librbd::journal::MirrorPeerClientMeta>(client_data.client_meta);
-      m_local_image_id = cm.image_id;
-
-      // TODO: snap name should be transient
-      if (cm.sync_points.empty()) {
-        return -ENOENT;
-      }
-      m_snap_name = cm.sync_points.front().snap_name;
-
-      dout(20) << "client found, pool_id=" << m_local_pool_id << ", image_id="
-              << m_local_image_id << ", snap_name=" << m_snap_name << dendl;
-      return 0;
-    }
-  }
-
-  dout(20) << "client not found" << dendl;
-
-  *registered = false;
-  return 0;
-}
-
 int ImageReplayer::register_client()
 {
   // TODO allocate snap as part of sync process
-  std::string m_snap_name = ".rbd-mirror." + m_client_id;
+  m_snap_name = ".rbd-mirror." + m_client_id;
 
   dout(20) << "mirror_uuid=" << m_client_id << ", "
            << "image_id=" << m_local_image_id << ", "
@@ -589,7 +898,7 @@ int ImageReplayer::get_bootrstap_params(BootstrapParams *params)
   return 0;
 }
 
-int ImageReplayer::bootstrap(const BootstrapParams *bootstrap_params)
+int ImageReplayer::bootstrap(const BootstrapParams &bootstrap_params)
 {
   // Register client and sync images
 
@@ -598,13 +907,13 @@ int ImageReplayer::bootstrap(const BootstrapParams *bootstrap_params)
   int r;
   BootstrapParams params;
 
-  if (bootstrap_params) {
+  if (!bootstrap_params.empty()) {
     dout(20) << "using external bootstrap params" << dendl;
-    params = *bootstrap_params;
+    params = bootstrap_params;
   } else {
     r = get_bootrstap_params(&params);
     if (r < 0) {
-      derr << "error obtaining bootrstap parameters: "
+      derr << "error obtaining bootstrap parameters: "
           << cpp_strerror(r) << dendl;
       return r;
     }
index 7e9a7e3b2168ae8b2c98dd5dea15907010154ef2..f3c91768b774d8a2e1bb1bda7ac2eb40e551327a 100644 (file)
 #include "common/Mutex.h"
 #include "common/WorkQueue.h"
 #include "include/rados/librados.hpp"
+#include "cls/journal/cls_journal_types.h"
 #include "types.h"
 
-class ContextWQ;
-
 namespace journal {
 
 class Journaler;
@@ -64,6 +63,10 @@ public:
                    const std::string local_image_name) :
       local_pool_name(local_pool_name),
       local_image_name(local_image_name) {}
+
+    bool empty() const {
+      return local_pool_name.empty() && local_image_name.empty();
+    }
   };
 
 public:
@@ -74,24 +77,107 @@ public:
   ImageReplayer(const ImageReplayer&) = delete;
   ImageReplayer& operator=(const ImageReplayer&) = delete;
 
-  State get_state() { return m_state; }
-
-  int start(const BootstrapParams *bootstrap_params = nullptr);
-  void stop();
+  State get_state() { Mutex::Locker l(m_lock); return get_state_(); }
+  bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
+  bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
 
+  void start(Context *on_finish = nullptr,
+            const BootstrapParams *bootstrap_params = nullptr);
+  void stop(Context *on_finish = nullptr);
   int flush();
 
+  int bootstrap(const BootstrapParams &bootstrap_params);
+
   virtual void handle_replay_ready();
   virtual void handle_replay_process_ready(int r);
   virtual void handle_replay_complete(int r);
 
   virtual void handle_replay_committed(::journal::ReplayEntry* replay_entry, int r);
 
+protected:
+  /**
+   * @verbatim
+   *                   (error)
+   * <uninitialized> <------------------- FAIL
+   *    |                                  ^
+   *    v                                  |
+   * <starting>                            |
+   *    |                                  |
+   *    v                         (error)  |
+   * GET_REGISTERED_CLIENT_STATUS -------->|
+   *    |                                  |
+   *    v                           (error)|
+   * BOOTSTRAP (skip if not needed) ------>|
+   *    |                                  |
+   *    v                  (error)         |
+   * REMOTE_JOURNALER_INIT --------------->|
+   *    |                                  |
+   *    v             (error)              |
+   * LOCAL_IMAGE_OPEN -------------------->|
+   *    |                                  |
+   *    v             (error)              |
+   * LOCAL_IMAGE_LOCK -------------------->|
+   *    |                                  |
+   *    v                         (error)  |
+   * WAIT_FOR_LOCAL_JOURNAL_READY -------->/
+   *    |
+   *    v
+   * <replaying>
+   *    |
+   *    v
+   * <stopping>
+   *    |
+   *    v
+   * JOURNAL_REPLAY_SHUT_DOWN
+   *    |
+   *    v
+   * LOCAL_IMAGE_CLOSE
+   *    |
+   *    v
+   * LOCAL_IMAGE_DELETE
+   *    |
+   *    v
+   * <stopped>
+   *
+   * @endverbatim
+   */
+
+  virtual void on_start_get_registered_client_status_start(
+    const BootstrapParams *bootstrap_params);
+  virtual void on_start_get_registered_client_status_finish(int r,
+    const std::set<cls::journal::Client> &registered_clients,
+    const BootstrapParams &bootstrap_params);
+  virtual void on_start_bootstrap_start(const BootstrapParams &params);
+  virtual void on_start_bootstrap_finish(int r);
+  virtual void on_start_remote_journaler_init_start();
+  virtual void on_start_remote_journaler_init_finish(int r);
+  virtual void on_start_local_image_open_start();
+  virtual void on_start_local_image_open_finish(int r);
+  virtual void on_start_local_image_lock_start();
+  virtual void on_start_local_image_lock_finish(int r);
+  virtual void on_start_wait_for_local_journal_ready_start();
+  virtual void on_start_wait_for_local_journal_ready_finish(int r);
+  virtual void on_start_fail_start(int r);
+  virtual void on_start_fail_finish(int r);
+  virtual bool on_start_interrupted();
+
+  virtual void on_stop_journal_replay_shut_down_start();
+  virtual void on_stop_journal_replay_shut_down_finish(int r);
+  virtual void on_stop_local_image_close_start();
+  virtual void on_stop_local_image_close_finish(int r);
+  virtual void on_stop_local_image_delete_start();
+  virtual void on_stop_local_image_delete_finish(int r);
+
+  void close_local_image(Context *on_finish); // for tests
+
 private:
-  int get_registered_client_status(bool *registered);
-  int register_client();
+  State get_state_() const { return m_state; }
+  bool is_stopped_() const { return m_state == STATE_UNINITIALIZED ||
+                                    m_state == STATE_STOPPED; }
+  bool is_running_() const { return !is_stopped_() && m_state != STATE_STOPPING; }
+
   int get_bootrstap_params(BootstrapParams *params);
-  int bootstrap(const BootstrapParams *bootstrap_params);
+  int register_client();
   int create_local_image(const BootstrapParams &bootstrap_params);
   int get_image_id(librados::IoCtx &ioctx, const std::string &image_name,
                   std::string *image_id);
@@ -108,6 +194,7 @@ private:
   int64_t m_remote_pool_id, m_local_pool_id;
   std::string m_remote_image_id, m_local_image_id;
   std::string m_snap_name;
+  ContextWQ *m_work_queue;
   Mutex m_lock;
   State m_state;
   std::string m_local_pool_name, m_remote_pool_name;
@@ -116,6 +203,7 @@ private:
   librbd::journal::Replay<librbd::ImageCtx> *m_local_replay;
   ::journal::Journaler *m_remote_journaler;
   ::journal::ReplayHandler *m_replay_handler;
+  Context *m_on_finish;
   ImageReplayerAdminSocketHook *m_asok_hook;
 };
 
index ba043b37ec6d31bf025d9aa4df27d6968474da55..f40127b6e8932435578c5ee6b33aeb2fd179b058 100644 (file)
@@ -99,6 +99,17 @@ void Replayer::run()
     set_sources(m_pool_watcher->get_images());
     m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30));
   }
+
+  // Stopping
+  map<int64_t, set<string> > empty_sources;
+  while (true) {
+    Mutex::Locker l(m_lock);
+    set_sources(empty_sources);
+    if (m_images.empty()) {
+      break;
+    }
+    m_cond.WaitInterval(g_ceph_context, m_lock, seconds(1));
+  }
 }
 
 void Replayer::set_sources(const map<int64_t, set<string> > &images)
@@ -106,19 +117,28 @@ void Replayer::set_sources(const map<int64_t, set<string> > &images)
   dout(20) << "enter" << dendl;
 
   assert(m_lock.is_locked());
-  // TODO: make stopping and starting ImageReplayers async
   for (auto it = m_images.begin(); it != m_images.end();) {
     int64_t pool_id = it->first;
     auto &pool_images = it->second;
     if (images.find(pool_id) == images.end()) {
-      m_images.erase(it++);
+      for (auto images_it = pool_images.begin();
+          images_it != pool_images.end();) {
+       if (stop_image_replayer(images_it->second)) {
+         pool_images.erase(images_it++);
+       }
+      }
+      if (pool_images.empty()) {
+       m_images.erase(it++);
+      }
       continue;
     }
     for (auto images_it = pool_images.begin();
         images_it != pool_images.end();) {
       if (images.at(pool_id).find(images_it->first) ==
          images.at(pool_id).end()) {
-       pool_images.erase(images_it++);
+       if (stop_image_replayer(images_it->second)) {
+         pool_images.erase(images_it++);
+       }
       } else {
        ++images_it;
       }
@@ -149,23 +169,46 @@ void Replayer::set_sources(const map<int64_t, set<string> > &images)
     // create entry for pool if it doesn't exist
     auto &pool_replayers = m_images[pool_id];
     for (const auto &image_id : kv.second) {
-      if (pool_replayers.find(image_id) == pool_replayers.end()) {
+      auto it = pool_replayers.find(image_id);
+      if (it == pool_replayers.end()) {
        unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_threads,
-                                                                   m_local,
+                                                                  m_local,
                                                                   m_remote,
                                                                   m_client_id,
-                                                                   local_ioctx.get_id(),
+                                                                  local_ioctx.get_id(),
                                                                   pool_id,
                                                                   image_id));
-       int r = image_replayer->start();
-       if (r < 0) {
-         continue;
-       }
-       pool_replayers.insert(std::make_pair(image_id, std::move(image_replayer)));
+       it = pool_replayers.insert(
+         std::make_pair(image_id, std::move(image_replayer))).first;
       }
+      start_image_replayer(it->second);
     }
   }
 }
 
+void Replayer::start_image_replayer(unique_ptr<ImageReplayer> &image_replayer)
+{
+  if (!image_replayer->is_stopped()) {
+    return;
+  }
+
+  image_replayer->start();
+}
+
+bool Replayer::stop_image_replayer(unique_ptr<ImageReplayer> &image_replayer)
+{
+  if (image_replayer->is_stopped()) {
+    return true;
+  }
+
+  if (image_replayer->is_running()) {
+    image_replayer->stop();
+  } else {
+    // TODO: check how long it is stopping and alert if it is too long.
+  }
+
+  return false;
+}
+
 } // namespace mirror
 } // namespace rbd
index 03199b6c8fb1c79f1615ce206da096d9a42e5aaa..44d699e1cbe4112daa2b495004b12642d0bfd163 100644 (file)
@@ -42,6 +42,9 @@ public:
 private:
   void set_sources(const std::map<int64_t, std::set<std::string> > &images);
 
+  void start_image_replayer(unique_ptr<ImageReplayer> &image_replayer);
+  bool stop_image_replayer(unique_ptr<ImageReplayer> &image_replayer);
+
   Threads *m_threads;
   Mutex m_lock;
   Cond m_cond;