]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: store replay status in mirroring object
authorMykola Golub <mgolub@mirantis.com>
Tue, 5 Apr 2016 11:40:50 +0000 (14:40 +0300)
committerAbhishek Varshney <abhishek.varshney@flipkart.com>
Mon, 2 May 2016 06:35:58 +0000 (12:05 +0530)
Fixes: #14420
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
(cherry picked from commit 52b2fe14cfc932b2aa53ce76a3150ce87ce4d377)

14 files changed:
src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/test/rbd_mirror/test_mock_ImageSync.cc
src/tools/Makefile-client.am
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/ImageSync.cc
src/tools/rbd_mirror/ImageSync.h
src/tools/rbd_mirror/ProgressContext.h [new file with mode: 0644]
src/tools/rbd_mirror/Replayer.cc
src/tools/rbd_mirror/Replayer.h
src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc
src/tools/rbd_mirror/image_replayer/BootstrapRequest.h
src/tools/rbd_mirror/image_sync/ImageCopyRequest.cc
src/tools/rbd_mirror/image_sync/ImageCopyRequest.h

index ac4e8bcf354db6086531ae4670463fec2558ae24..cf19c6747d47341ec58a171f80b1d3134c83f789 100644 (file)
@@ -65,7 +65,8 @@ struct BootstrapRequest<librbd::MockImageReplayerImageCtx> {
                                   const std::string &remote_mirror_uuid,
                                   ::journal::MockJournalerProxy *journaler,
                                   librbd::journal::MirrorPeerClientMeta *client_meta,
-                                  Context *on_finish) {
+                                  Context *on_finish,
+                                  rbd::mirror::ProgressContext *progress_ctx = nullptr) {
     assert(s_instance != nullptr);
     s_instance->on_finish = on_finish;
     return s_instance;
index 726fa95c3232efd36e2e61d7c068f98718f6d35f..8dd0870ad13700d89673cd3c7727719e9b7252e6 100644 (file)
@@ -36,7 +36,8 @@ public:
                                   journal::MockJournaler *journaler,
                                   librbd::journal::MirrorPeerClientMeta *client_meta,
                                   librbd::journal::MirrorPeerSyncPoint *sync_point,
-                                  Context *on_finish) {
+                                  Context *on_finish,
+                                  rbd::mirror::ProgressContext *progress_ctx = nullptr) {
     assert(s_instance != nullptr);
     s_instance->on_finish = on_finish;
     return s_instance;
index 0b8154982f0a1cdaa81c6e0b89c1991b144003b8..68614e2232959e87d5f45663bd0032de200c3d66 100644 (file)
@@ -112,6 +112,7 @@ noinst_HEADERS += \
        tools/rbd_mirror/ImageSync.h \
        tools/rbd_mirror/Mirror.h \
        tools/rbd_mirror/PoolWatcher.h \
+       tools/rbd_mirror/ProgressContext.h \
        tools/rbd_mirror/Replayer.h \
        tools/rbd_mirror/Threads.h \
        tools/rbd_mirror/types.h \
index df297b0805933fa99bfce78b4d362a62e10af0fa..698eefb7fb65a0933a7aeaee186cef7939f2b9ad 100644 (file)
@@ -37,6 +37,7 @@ namespace rbd {
 namespace mirror {
 
 using librbd::util::create_context_callback;
+using librbd::util::create_rados_ack_callback;
 using namespace rbd::mirror::image_replayer;
 
 template <typename I>
@@ -57,7 +58,11 @@ struct ReplayHandler : public ::journal::ReplayHandler {
     replayer->handle_replay_ready();
   }
   virtual void handle_complete(int r) {
-    replayer->handle_replay_complete(r);
+    std::stringstream ss;
+    if (r < 0) {
+      ss << "replay completed with error: " << cpp_strerror(r);
+    }
+    replayer->handle_replay_complete(r, ss.str());
   }
 };
 
@@ -154,6 +159,22 @@ private:
 
 } // anonymous namespace
 
+template <typename I>
+void ImageReplayer<I>::BootstrapProgressContext::update_progress(
+  const std::string &description, bool flush)
+{
+  const std::string desc = "bootstrapping, " + description;
+
+  FunctionContext *ctx = new FunctionContext(
+    [this, desc, flush](int r) {
+      replayer->set_state_description(0, desc);
+      if (flush) {
+       replayer->update_mirror_image_status();
+      }
+    });
+  replayer->m_threads->work_queue->queue(ctx, 0);
+}
+
 template <typename I>
 ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
                             const std::string &local_mirror_uuid,
@@ -174,12 +195,7 @@ ImageReplayer<I>::ImageReplayer(Threads *threads, RadosRef local, RadosRef remot
   m_name(stringify(remote_pool_id) + "/" + remote_image_id),
   m_lock("rbd::mirror::ImageReplayer " + stringify(remote_pool_id) + " " +
         remote_image_id),
-  m_state(STATE_UNINITIALIZED),
-  m_local_image_ctx(nullptr),
-  m_local_replay(nullptr),
-  m_remote_journaler(nullptr),
-  m_replay_handler(nullptr),
-  m_asok_hook(nullptr)
+  m_progress_cxt(this)
 {
 }
 
@@ -192,9 +208,19 @@ ImageReplayer<I>::~ImageReplayer()
   assert(m_replay_handler == nullptr);
   assert(m_on_start_finish == nullptr);
   assert(m_on_stop_finish == nullptr);
+
   delete m_asok_hook;
 }
 
+template <typename I>
+void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
+  dout(20) << r << " " << desc << dendl;
+
+  Mutex::Locker l(m_lock);
+  m_last_r = r;
+  m_state_desc = desc;
+}
+
 template <typename I>
 void ImageReplayer<I>::start(Context *on_finish,
                             const BootstrapParams *bootstrap_params)
@@ -208,6 +234,8 @@ void ImageReplayer<I>::start(Context *on_finish,
     assert(is_stopped_());
 
     m_state = STATE_STARTING;
+    m_last_r = 0;
+    m_state_desc.clear();
     m_on_start_finish = on_finish;
   }
 
@@ -215,7 +243,7 @@ void ImageReplayer<I>::start(Context *on_finish,
   if (r < 0) {
     derr << "error opening ioctx for remote pool " << m_remote_pool_id
         << ": " << cpp_strerror(r) << dendl;
-    on_start_fail_start(r);
+    on_start_fail_start(r, "error opening remote pool");
     return;
   }
 
@@ -227,10 +255,12 @@ void ImageReplayer<I>::start(Context *on_finish,
   if (r < 0) {
     derr << "error opening ioctx for local pool " << m_local_pool_id
          << ": " << cpp_strerror(r) << dendl;
-    on_start_fail_start(r);
+    on_start_fail_start(r, "error opening local pool");
     return;
   }
 
+  reschedule_update_status_task(10);
+
   CephContext *cct = static_cast<CephContext *>(m_local->cct());
   double commit_interval = cct->_conf->rbd_journal_commit_age;
   m_remote_journaler = new Journaler(m_threads->work_queue,
@@ -247,15 +277,18 @@ void ImageReplayer<I>::bootstrap() {
   dout(20) << "bootstrap params: "
           << "local_image_name=" << m_local_image_name << dendl;
 
+  update_mirror_image_status();
+
   // TODO: add a new bootstrap state and support canceling
   Context *ctx = create_context_callback<
     ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
+
   BootstrapRequest<I> *request = BootstrapRequest<I>::create(
     m_local_ioctx, m_remote_ioctx, &m_local_image_ctx,
     m_local_image_name, m_remote_image_id, m_global_image_id,
     m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
     m_local_mirror_uuid, m_remote_mirror_uuid, m_remote_journaler,
-    &m_client_meta, ctx);
+    &m_client_meta, ctx, &m_progress_cxt);
   request->send();
 }
 
@@ -265,10 +298,10 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
 
   if (r == -EREMOTEIO) {
     dout(5) << "remote image is non-primary or local image is primary" << dendl;
-    on_start_fail_start(0);
+    on_start_fail_start(0, "remote image is non-primary or local image is primary");
     return;
   } else if (r < 0) {
-    on_start_fail_start(r);
+    on_start_fail_start(r, "error bootstrapping replay");
     return;
   } else if (on_start_interrupted()) {
     return;
@@ -283,6 +316,8 @@ void ImageReplayer<I>::handle_bootstrap(int r) {
     m_asok_hook = new ImageReplayerAdminSocketHook<I>(cct, m_name, this);
   }
 
+  update_mirror_image_status();
+
   init_remote_journaler();
 }
 
@@ -301,7 +336,7 @@ void ImageReplayer<I>::handle_init_remote_journaler(int r) {
 
   if (r < 0) {
     derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
-    on_start_fail_start(r);
+    on_start_fail_start(r, "error initializing remote journal");
     return;
   } else if (on_start_interrupted()) {
     return;
@@ -318,7 +353,7 @@ void ImageReplayer<I>::start_replay() {
   if (r < 0) {
     derr << "error starting external replay on local image "
         <<  m_local_image_id << ": " << cpp_strerror(r) << dendl;
-    on_start_fail_start(r);
+    on_start_fail_start(r, "error starting replay on local image");
     return;
   }
 
@@ -334,15 +369,19 @@ void ImageReplayer<I>::start_replay() {
   {
     Mutex::Locker locker(m_lock);
     if (m_stop_requested) {
-      on_start_fail_start(-EINTR);
+      on_start_fail_start(-EINTR, "start interrupted");
       return;
     }
 
     assert(m_state == STATE_STARTING);
     m_state = STATE_REPLAYING;
+    m_state_desc.clear();
     std::swap(m_on_start_finish, on_finish);
   }
 
+  update_mirror_image_status();
+  reschedule_update_status_task(30);
+
   dout(20) << "start succeeded" << dendl;
   if (on_finish != nullptr) {
     dout(20) << "on finish complete, r=" << r << dendl;
@@ -351,13 +390,14 @@ void ImageReplayer<I>::start_replay() {
 }
 
 template <typename I>
-void ImageReplayer<I>::on_start_fail_start(int r)
+void ImageReplayer<I>::on_start_fail_start(int r, const std::string &desc)
 {
   dout(20) << "r=" << r << dendl;
 
   FunctionContext *ctx = new FunctionContext(
-    [this, r](int r1) {
+    [this, r, desc](int r1) {
       assert(r1 == 0);
+      set_state_description(r, desc);
       on_start_fail_finish(r);
     });
 
@@ -394,12 +434,6 @@ void ImageReplayer<I>::on_start_fail_finish(int r)
     m_local_image_ctx = nullptr;
   }
 
-  m_local_ioctx.close();
-  m_remote_ioctx.close();
-
-  delete m_asok_hook;
-  m_asok_hook = nullptr;
-
   Context *on_start_finish(nullptr);
   Context *on_stop_finish(nullptr);
   {
@@ -412,12 +446,20 @@ void ImageReplayer<I>::on_start_fail_finish(int r)
     } else {
       assert(m_state == STATE_STARTING);
       dout(20) << "start failed" << dendl;
-      m_state = STATE_UNINITIALIZED;
+      m_state = (r < 0) ? STATE_UNINITIALIZED : STATE_STOPPED;
     }
     std::swap(m_on_start_finish, on_start_finish);
     std::swap(m_on_stop_finish, on_stop_finish);
   }
 
+  update_mirror_image_status(true);
+
+  m_local_ioctx.close();
+  m_remote_ioctx.close();
+
+  delete m_asok_hook;
+  m_asok_hook = nullptr;
+
   if (on_start_finish != nullptr) {
     dout(20) << "on start finish complete, r=" << r << dendl;
     on_start_finish->complete(r);
@@ -492,6 +534,8 @@ void ImageReplayer<I>::on_stop_journal_replay_shut_down_start()
     m_state = STATE_STOPPING;
     m_local_replay->shut_down(false, ctx);
   }
+
+  update_mirror_image_status();
 }
 
 template <typename I>
@@ -536,6 +580,8 @@ void ImageReplayer<I>::on_stop_local_image_close_finish(int r)
     derr << "error closing local image: " << cpp_strerror(r) << dendl;
   }
 
+  update_mirror_image_status(true);
+
   m_local_ioctx.close();
 
   m_remote_journaler->stop_replay();
@@ -558,6 +604,7 @@ void ImageReplayer<I>::on_stop_local_image_close_finish(int r)
     assert(m_state == STATE_STOPPING);
 
     m_state = STATE_STOPPED;
+    m_state_desc.clear();
     m_stop_requested = false;
     std::swap(m_on_stop_finish, on_finish);
   }
@@ -668,6 +715,8 @@ void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
         << cpp_strerror(r) << dendl;
   }
 
+  update_mirror_image_status();
+
   dout(20) << "flush complete, r=" << r << dendl;
   on_flush->complete(r);
 }
@@ -706,11 +755,12 @@ void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
 }
 
 template <typename I>
-void ImageReplayer<I>::handle_replay_complete(int r)
+void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
 {
   dout(20) << "r=" << r << dendl;
   if (r < 0) {
     derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
+    set_state_description(r, error_desc);
   }
 
   {
@@ -735,7 +785,7 @@ void ImageReplayer<I>::handle_replay_flush(int r) {
 
   if (r < 0) {
     derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
-    handle_replay_complete(r);
+    handle_replay_complete(r, "replay flush encountered an error");
     return;
   }
 
@@ -767,7 +817,7 @@ void ImageReplayer<I>::handle_get_remote_tag(int r) {
   if (r < 0) {
     derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
          << cpp_strerror(r) << dendl;
-    handle_replay_complete(r);
+    handle_replay_complete(r, "failed to retrieve remote tag");
     return;
   }
 
@@ -816,7 +866,7 @@ void ImageReplayer<I>::handle_allocate_local_tag(int r) {
 
   if (r < 0) {
     derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
-    handle_replay_complete(r);
+    handle_replay_complete(r, "failed to allocate journal tag");
     return;
   }
 
@@ -855,7 +905,7 @@ void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry
   if (r < 0) {
     derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
 
-    handle_replay_complete(r);
+    handle_replay_complete(r, "failed to commit journal event");
     return;
   }
 
@@ -873,6 +923,204 @@ void ImageReplayer<I>::shut_down_journal_replay(bool cancel_ops)
   }
 }
 
+template <typename I>
+void ImageReplayer<I>::update_mirror_image_status(bool final)
+{
+  dout(20) << dendl;
+
+  cls::rbd::MirrorImageStatus status;
+
+  {
+    Mutex::Locker locker(m_lock);
+
+    if (!final) {
+      if (m_update_status_comp) {
+       dout(20) << "already sending update" << dendl;
+       m_update_status_pending = true;
+       return;
+      }
+
+      Context *ctx = new FunctionContext(
+       [this](int r) {
+         if (r < 0) {
+           derr << "error updating mirror image status: " << cpp_strerror(r)
+           << dendl;
+         }
+         bool pending = false;
+         librados::AioCompletion *comp = nullptr;
+         {
+           Mutex::Locker locker(m_lock);
+           std::swap(m_update_status_comp, comp);
+           std::swap(m_update_status_pending, pending);
+         }
+         if (comp) {
+           comp->release();
+         }
+         if (pending && r == 0) {
+           update_mirror_image_status();
+         }
+       });
+      m_update_status_comp = create_rados_ack_callback(ctx);
+      m_update_status_pending = false;
+    }
+
+    switch (m_state) {
+    case STATE_UNINITIALIZED:
+      if (m_last_r < 0) {
+       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
+       status.description = m_state_desc;
+      } else {
+       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
+       status.description = m_state_desc.empty() ? "not started yet" :
+         m_state_desc;
+      }
+      break;
+    case STATE_STARTING:
+      // TODO: a better way to detect syncing state.
+      if (!m_asok_hook) {
+       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
+       status.description = m_state_desc.empty() ? "syncing" : m_state_desc;
+      } else {
+       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
+       status.description = "starting replay";
+      }
+      break;
+    case STATE_REPLAYING:
+      status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
+      status.description = get_replay_status_description();
+      break;
+    case STATE_STOPPING:
+      if (m_local_image_ctx) {
+       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
+       status.description = "stopping replay";
+       break;
+      }
+      /* FALLTHROUGH */
+    case STATE_STOPPED:
+      if (m_last_r < 0) {
+       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
+       status.description = m_state_desc;
+      } else {
+       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
+       status.description = m_state_desc.empty() ? "stopped" : m_state_desc;
+      }
+      break;
+    default:
+      assert(!"invalid state");
+    }
+  }
+
+  dout(20) << "status=" << status << dendl;
+
+  librados::ObjectWriteOperation op;
+  librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
+
+  if (final) {
+    reschedule_update_status_task(-1);
+    m_local_ioctx.aio_flush();
+    librados::AioCompletion *comp = nullptr;
+    {
+      Mutex::Locker locker(m_lock);
+      std::swap(m_update_status_comp, comp);
+    }
+    if (comp) {
+      comp->wait_for_complete();
+    }
+    int r = m_local_ioctx.operate(RBD_MIRRORING, &op);
+    if (r < 0) {
+      derr << "error updating mirror image status: " << cpp_strerror(r)
+          << dendl;
+    }
+    return;
+  }
+
+  int r = m_local_ioctx.aio_operate(RBD_MIRRORING, m_update_status_comp, &op);
+  assert(r == 0);
+
+  reschedule_update_status_task();
+}
+
+template <typename I>
+void ImageReplayer<I>::reschedule_update_status_task(int new_interval)
+{
+  Mutex::Locker locker(m_threads->timer_lock);
+
+  if (m_update_status_task) {
+    m_threads->timer->cancel_event(m_update_status_task);
+    m_update_status_task = nullptr;
+  }
+
+  if (new_interval > 0) {
+    m_update_status_interval = new_interval;
+  }
+
+  if (new_interval < 0) {
+    return;
+  }
+
+  m_update_status_task = new FunctionContext(
+    [this](int r) {
+      start_update_status_task();
+    });
+
+  m_threads->timer->add_event_after(m_update_status_interval,
+                                   m_update_status_task);
+}
+
+template <typename I>
+void ImageReplayer<I>::start_update_status_task()
+{
+  FunctionContext *ctx = new FunctionContext(
+    [this](int r) {
+      {
+       Mutex::Locker locker(m_threads->timer_lock);
+       m_update_status_task = nullptr;
+      }
+      update_mirror_image_status();
+    });
+  m_threads->work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+std::string ImageReplayer<I>::get_replay_status_description() {
+  assert(m_lock.is_locked());
+  assert(m_state == STATE_REPLAYING);
+
+  std::stringstream ss;
+  ss << "replaying";
+
+  cls::journal::Client master;
+  int r = m_remote_journaler->get_cached_client(
+    librbd::Journal<>::IMAGE_CLIENT_ID, &master);
+  if (r == 0) {
+    ss << ", master_position=";
+    cls::journal::ObjectPositions &object_positions =
+      master.commit_position.object_positions;
+
+    if (object_positions.begin() != object_positions.end()) {
+      ss << *(object_positions.begin());
+    } else {
+      ss << "[]";
+    }
+  }
+
+  cls::journal::Client mirror;
+  r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &mirror);
+  if (r == 0) {
+    ss << ", mirror_position=";
+    cls::journal::ObjectPositions &object_positions =
+      mirror.commit_position.object_positions;
+
+    if (object_positions.begin() != object_positions.end()) {
+      ss << *(object_positions.begin());
+    } else {
+      ss << "[]";
+    }
+  }
+
+  return ss.str();
+}
+
 template <typename I>
 std::string ImageReplayer<I>::to_string(const State state) {
   switch (state) {
index 17315add7dce3a3f3974d374cd8cf76e642f488c..01cbf85bdbad03e88af6c2344a46173a0d9b9158 100644 (file)
 #include "common/WorkQueue.h"
 #include "include/rados/librados.hpp"
 #include "cls/journal/cls_journal_types.h"
+#include "cls/rbd/cls_rbd_types.h"
 #include "journal/ReplayEntry.h"
 #include "librbd/journal/Types.h"
 #include "librbd/journal/TypeTraits.h"
+#include "ProgressContext.h"
 #include "types.h"
 
 class AdminSocketHook;
@@ -80,6 +82,7 @@ public:
   bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
 
   std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
+  void set_state_description(int r, const std::string &desc);
 
   void start(Context *on_finish = nullptr,
             const BootstrapParams *bootstrap_params = nullptr);
@@ -89,7 +92,7 @@ public:
   void print_status(Formatter *f, stringstream *ss);
 
   virtual void handle_replay_ready();
-  virtual void handle_replay_complete(int r);
+  virtual void handle_replay_complete(int r, const std::string &error_desc);
 
   inline int64_t get_remote_pool_id() const {
     return m_remote_pool_id;
@@ -159,7 +162,7 @@ protected:
    * @endverbatim
    */
 
-  virtual void on_start_fail_start(int r);
+  virtual void on_start_fail_start(int r, const std::string &desc = "");
   virtual void on_start_fail_finish(int r);
   virtual bool on_start_interrupted();
 
@@ -180,6 +183,18 @@ protected:
 private:
   typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
 
+  class BootstrapProgressContext : public ProgressContext {
+  public:
+    BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
+      replayer(replayer) {
+    }
+
+    virtual void update_progress(const std::string &description,
+                                bool flush = true);
+  private:
+    ImageReplayer<ImageCtxT> *replayer;
+  };
+
   Threads *m_threads;
   RadosRef m_local, m_remote;
   std::string m_local_mirror_uuid;
@@ -189,18 +204,25 @@ private:
   std::string m_local_image_name;
   std::string m_name;
   Mutex m_lock;
-  State m_state;
+  State m_state = STATE_UNINITIALIZED;
+  int m_last_r = 0;
+  std::string m_state_desc;
+  BootstrapProgressContext m_progress_cxt;
   librados::IoCtx m_local_ioctx, m_remote_ioctx;
-  ImageCtxT *m_local_image_ctx;
-  librbd::journal::Replay<ImageCtxT> *m_local_replay;
-  Journaler* m_remote_journaler;
-  ::journal::ReplayHandler *m_replay_handler;
+  ImageCtxT *m_local_image_ctx = nullptr;
+  librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
+  Journaler* m_remote_journaler = nullptr;
+  ::journal::ReplayHandler *m_replay_handler = nullptr;
 
   Context *m_on_start_finish = nullptr;
   Context *m_on_stop_finish = nullptr;
+  Context *m_update_status_task = nullptr;
+  int m_update_status_interval = 0;
+  librados::AioCompletion *m_update_status_comp = nullptr;
+  bool m_update_status_pending = false;
   bool m_stop_requested = false;
 
-  AdminSocketHook *m_asok_hook;
+  AdminSocketHook *m_asok_hook = nullptr;
 
   librbd::journal::MirrorPeerClientMeta m_client_meta;
 
@@ -232,6 +254,12 @@ private:
 
   void shut_down_journal_replay(bool cancel_ops);
 
+  void update_mirror_image_status(bool final = false);
+  void reschedule_update_status_task(int new_interval = 0);
+  void start_update_status_task();
+
+  std::string get_replay_status_description();
+
   void bootstrap();
   void handle_bootstrap(int r);
 
index 5bb145779472da8c6b480b580261f962a97f2137..f2f6784b84cdd62a06ff347e36c62106fde25ccd 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "ImageSync.h"
+#include "ProgressContext.h"
 #include "common/errno.h"
 #include "journal/Journaler.h"
 #include "librbd/ImageCtx.h"
@@ -29,10 +30,12 @@ template <typename I>
 ImageSync<I>::ImageSync(I *local_image_ctx, I *remote_image_ctx,
                         SafeTimer *timer, Mutex *timer_lock,
                         const std::string &mirror_uuid, Journaler *journaler,
-                        MirrorPeerClientMeta *client_meta, Context *on_finish)
+                        MirrorPeerClientMeta *client_meta, Context *on_finish,
+                       ProgressContext *progress_ctx)
   : m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
     m_timer(timer), m_timer_lock(timer_lock), m_mirror_uuid(mirror_uuid),
     m_journaler(journaler), m_client_meta(client_meta), m_on_finish(on_finish),
+    m_progress_ctx(progress_ctx),
     m_lock(unique_lock_name("ImageSync::m_lock", this)) {
 }
 
@@ -56,6 +59,8 @@ void ImageSync<I>::cancel() {
 
 template <typename I>
 void ImageSync<I>::send_prune_catch_up_sync_point() {
+  update_progress("PRUNE_CATCH_UP_SYNC_POINT");
+
   if (m_client_meta->sync_points.size() <= 1) {
     send_create_sync_point();
     return;
@@ -88,6 +93,8 @@ void ImageSync<I>::handle_prune_catch_up_sync_point(int r) {
 
 template <typename I>
 void ImageSync<I>::send_create_sync_point() {
+  update_progress("CREATE_SYNC_POINT");
+
   // TODO: when support for disconnecting laggy clients is added,
   //       re-connect and create catch-up sync point
   if (m_client_meta->sync_points.size() > 0) {
@@ -122,6 +129,8 @@ void ImageSync<I>::handle_create_sync_point(int r) {
 
 template <typename I>
 void ImageSync<I>::send_copy_snapshots() {
+  update_progress("COPY_SNAPSHOTS");
+
   CephContext *cct = m_local_image_ctx->cct;
   ldout(cct, 20) << dendl;
 
@@ -157,6 +166,8 @@ void ImageSync<I>::send_copy_image() {
     return;
   }
 
+  update_progress("COPY_IMAGE");
+
   CephContext *cct = m_local_image_ctx->cct;
   ldout(cct, 20) << dendl;
 
@@ -165,7 +176,7 @@ void ImageSync<I>::send_copy_image() {
   m_image_copy_request = ImageCopyRequest<I>::create(
     m_local_image_ctx, m_remote_image_ctx, m_timer, m_timer_lock,
     m_journaler, m_client_meta, &m_client_meta->sync_points.front(),
-    ctx);
+    ctx, m_progress_ctx);
   m_lock.Unlock();
 
   m_image_copy_request->send();
@@ -207,6 +218,8 @@ void ImageSync<I>::send_copy_object_map() {
     return;
   }
 
+  update_progress("COPY_OBJECT_MAP");
+
   assert(m_local_image_ctx->object_map != nullptr);
 
   assert(!m_client_meta->sync_points.empty());
@@ -242,6 +255,8 @@ void ImageSync<I>::send_refresh_object_map() {
   CephContext *cct = m_local_image_ctx->cct;
   ldout(cct, 20) << dendl;
 
+  update_progress("REFRESH_OBJECT_MAP");
+
   Context *ctx = create_context_callback<
     ImageSync<I>, &ImageSync<I>::handle_refresh_object_map>(this);
   m_object_map = m_local_image_ctx->create_object_map(CEPH_NOSNAP);
@@ -268,6 +283,8 @@ void ImageSync<I>::send_prune_sync_points() {
   CephContext *cct = m_local_image_ctx->cct;
   ldout(cct, 20) << dendl;
 
+  update_progress("PRUNE_SYNC_POINTS");
+
   Context *ctx = create_context_callback<
     ImageSync<I>, &ImageSync<I>::handle_prune_sync_points>(this);
   SyncPointPruneRequest<I> *request = SyncPointPruneRequest<I>::create(
@@ -304,6 +321,15 @@ void ImageSync<I>::finish(int r) {
   delete this;
 }
 
+template <typename I>
+void ImageSync<I>::update_progress(const std::string &description) {
+  dout(20) << ": " << description << dendl;
+
+  if (m_progress_ctx) {
+    m_progress_ctx->update_progress("IMAGE_SYNC/" + description);
+  }
+}
+
 } // namespace mirror
 } // namespace rbd
 
index 1ed22568f52b70bdcfbc78bb08d378fccea0ab96..95809ab0fb95ebda63932480103ca6978ddd05eb 100644 (file)
@@ -20,6 +20,8 @@ namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
 namespace rbd {
 namespace mirror {
 
+class ProgressContext;
+
 namespace image_sync { template <typename> class ImageCopyRequest; }
 
 template <typename ImageCtxT = librbd::ImageCtx>
@@ -34,15 +36,17 @@ public:
                            Mutex *timer_lock, const std::string &mirror_uuid,
                            Journaler *journaler,
                            MirrorPeerClientMeta *client_meta,
-                           Context *on_finish) {
+                           Context *on_finish,
+                          ProgressContext *progress_ctx = nullptr) {
     return new ImageSync(local_image_ctx, remote_image_ctx, timer, timer_lock,
-                         mirror_uuid, journaler, client_meta, on_finish);
+                         mirror_uuid, journaler, client_meta, on_finish,
+                        progress_ctx);
   }
 
   ImageSync(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx,
             SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid,
             Journaler *journaler, MirrorPeerClientMeta *client_meta,
-            Context *on_finish);
+            Context *on_finish, ProgressContext *progress_ctx = nullptr);
 
   void start();
   void cancel();
@@ -91,6 +95,7 @@ private:
   Journaler *m_journaler;
   MirrorPeerClientMeta *m_client_meta;
   Context *m_on_finish;
+  ProgressContext *m_progress_ctx;
 
   SnapMap m_snap_map;
 
@@ -122,6 +127,8 @@ private:
   void handle_prune_sync_points(int r);
 
   void finish(int r);
+
+  void update_progress(const std::string &description);
 };
 
 } // namespace mirror
diff --git a/src/tools/rbd_mirror/ProgressContext.h b/src/tools/rbd_mirror/ProgressContext.h
new file mode 100644 (file)
index 0000000..e4430ee
--- /dev/null
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RBD_MIRROR_PROGRESS_CONTEXT_H
+#define RBD_MIRROR_PROGRESS_CONTEXT_H
+
+namespace rbd {
+namespace mirror {
+
+class ProgressContext
+{
+public:
+  virtual ~ProgressContext() {}
+  virtual void update_progress(const std::string &description,
+                              bool flush = true) = 0;
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // RBD_MIRROR_PROGRESS_CONTEXT_H
index d73bc103048de5531a7ba19385f1ba78a94aba36..ed2da4b2ff1159b8cfac430c66c5b28771b92b0b 100644 (file)
@@ -12,7 +12,9 @@
 #include "common/errno.h"
 #include "include/stringify.h"
 #include "cls/rbd/cls_rbd_client.h"
+#include "librbd/ObjectWatcher.h"
 #include "Replayer.h"
+#include "Threads.h"
 
 #define dout_subsys ceph_subsys_rbd_mirror
 #undef dout_prefix
@@ -113,6 +115,55 @@ private:
   Commands commands;
 };
 
+class MirrorStatusWatchCtx {
+public:
+  MirrorStatusWatchCtx(librados::IoCtx &ioctx, ContextWQ *work_queue) {
+    m_ioctx.dup(ioctx);
+    m_watcher = new Watcher(m_ioctx, work_queue);
+  }
+
+  ~MirrorStatusWatchCtx() {
+    delete m_watcher;
+  }
+
+  int register_watch() {
+    C_SaferCond cond;
+    m_watcher->register_watch(&cond);
+    return cond.wait();
+  }
+
+  int unregister_watch() {
+    C_SaferCond cond;
+    m_watcher->unregister_watch(&cond);
+    return cond.wait();
+  }
+
+  std::string get_oid() const {
+    return m_watcher->get_oid();
+  }
+
+private:
+  class Watcher : public librbd::ObjectWatcher<> {
+  public:
+    Watcher(librados::IoCtx &ioctx, ContextWQ *work_queue) :
+      ObjectWatcher<>(ioctx, work_queue) {
+    }
+
+    virtual std::string get_oid() const {
+      return RBD_MIRRORING;
+    }
+
+    virtual void handle_notify(uint64_t notify_id, uint64_t handle,
+                              bufferlist &bl) {
+      bufferlist out;
+      acknowledge_notify(notify_id, handle, out);
+    }
+  };
+
+  librados::IoCtx m_ioctx;
+  Watcher *m_watcher;
+};
+
 Replayer::Replayer(Threads *threads, RadosRef local_cluster,
                    const peer_t &peer, const std::vector<const char*> &args) :
   m_threads(threads),
@@ -306,6 +357,7 @@ void Replayer::set_sources(const PoolImageIds &pool_image_ids)
         }
       }
       if (pool_images.empty()) {
+       mirror_image_status_shut_down(pool_id);
        it = m_images.erase(it);
       } else {
         ++it;
@@ -369,6 +421,14 @@ void Replayer::set_sources(const PoolImageIds &pool_image_ids)
 
     // create entry for pool if it doesn't exist
     auto &pool_replayers = m_images[pool_id];
+
+    if (pool_replayers.empty()) {
+      r = mirror_image_status_init(pool_id, local_ioctx);
+      if (r < 0) {
+       continue;
+      }
+    }
+
     for (const auto &image_id : kv.second) {
       auto it = pool_replayers.find(image_id.id);
       if (it == pool_replayers.end()) {
@@ -383,6 +443,51 @@ void Replayer::set_sources(const PoolImageIds &pool_image_ids)
   }
 }
 
+int Replayer::mirror_image_status_init(int64_t pool_id,
+                                      librados::IoCtx& ioctx) {
+  assert(m_status_watchers.find(pool_id) == m_status_watchers.end());
+
+  uint64_t instance_id = librados::Rados(ioctx).get_instance_id();
+
+  dout(20) << "pool_id=" << pool_id << ", instance_id=" << instance_id << dendl;
+
+  librados::ObjectWriteOperation op;
+  librbd::cls_client::mirror_image_status_remove_down(&op);
+  int r = ioctx.operate(RBD_MIRRORING, &op);
+  if (r < 0) {
+    derr << "error initializing " << RBD_MIRRORING << "object: "
+        << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  unique_ptr<MirrorStatusWatchCtx>
+    watch_ctx(new MirrorStatusWatchCtx(ioctx, m_threads->work_queue));
+
+  r = watch_ctx->register_watch();
+  if (r < 0) {
+    derr << "error registering watcher for " << watch_ctx->get_oid()
+        << " object: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  m_status_watchers.insert(std::make_pair(pool_id, std::move(watch_ctx)));
+
+  return 0;
+}
+
+void Replayer::mirror_image_status_shut_down(int64_t pool_id) {
+  auto watcher_it = m_status_watchers.find(pool_id);
+  assert(watcher_it != m_status_watchers.end());
+
+  int r = watcher_it->second->unregister_watch();
+  if (r < 0) {
+    derr << "error unregistering watcher for " << watcher_it->second->get_oid()
+        << " object: " << cpp_strerror(r) << dendl;
+  }
+
+  m_status_watchers.erase(watcher_it);
+}
+
 void Replayer::start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer)
 {
   if (!image_replayer->is_stopped()) {
index f7c623b592f29c69df3ee665f6b45b9ae7a7ec25..37b96606a39c06ef7930138d81ff4244870bac59 100644 (file)
@@ -25,6 +25,7 @@ namespace mirror {
 
 struct Threads;
 class ReplayerAdminSocketHook;
+class MirrorStatusWatchCtx;
 
 /**
  * Controls mirroring for a single remote cluster.
@@ -52,6 +53,9 @@ private:
   void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
   bool stop_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer);
 
+  int mirror_image_status_init(int64_t pool_id, librados::IoCtx& ioctx);
+  void mirror_image_status_shut_down(int64_t pool_id);
+
   Threads *m_threads;
   Mutex m_lock;
   Cond m_cond;
@@ -65,6 +69,7 @@ private:
   // when a pool's configuration changes
   std::map<int64_t, std::map<std::string,
                             std::unique_ptr<ImageReplayer<> > > > m_images;
+  std::map<int64_t, std::unique_ptr<MirrorStatusWatchCtx> > m_status_watchers;
   ReplayerAdminSocketHook *m_asok_hook;
 
   class ReplayerThread : public Thread {
index 92f98c0d1be018a3e9ea596cf65e02b089f679fe..760992de95d613d7e111dfee8129c5f895088fce 100644 (file)
@@ -17,6 +17,7 @@
 #include "librbd/Utils.h"
 #include "librbd/journal/Types.h"
 #include "tools/rbd_mirror/ImageSync.h"
+#include "tools/rbd_mirror/ProgressContext.h"
 
 #define dout_subsys ceph_subsys_rbd_mirror
 #undef dout_prefix
@@ -95,14 +96,16 @@ BootstrapRequest<I>::BootstrapRequest(librados::IoCtx &local_io_ctx,
                                       const std::string &remote_mirror_uuid,
                                       Journaler *journaler,
                                       MirrorPeerClientMeta *client_meta,
-                                      Context *on_finish)
+                                      Context *on_finish,
+                                     rbd::mirror::ProgressContext *progress_ctx)
   : m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx),
     m_local_image_ctx(local_image_ctx), m_local_image_name(local_image_name),
     m_remote_image_id(remote_image_id), m_global_image_id(global_image_id),
     m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
     m_local_mirror_uuid(local_mirror_uuid),
     m_remote_mirror_uuid(remote_mirror_uuid), m_journaler(journaler),
-    m_client_meta(client_meta), m_on_finish(on_finish) {
+    m_client_meta(client_meta), m_on_finish(on_finish),
+    m_progress_ctx(progress_ctx) {
 }
 
 template <typename I>
@@ -119,6 +122,8 @@ template <typename I>
 void BootstrapRequest<I>::get_local_image_id() {
   dout(20) << dendl;
 
+  update_progress("GET_LOCAL_IMAGE_ID");
+
   // attempt to cross-reference a local image by the global image id
   librados::ObjectReadOperation op;
   librbd::cls_client::mirror_image_get_image_id_start(&op, m_global_image_id);
@@ -156,6 +161,8 @@ template <typename I>
 void BootstrapRequest<I>::get_remote_tag_class() {
   dout(20) << dendl;
 
+  update_progress("GET_REMOTE_TAG_CLASS");
+
   Context *ctx = create_context_callback<
     BootstrapRequest<I>, &BootstrapRequest<I>::handle_get_remote_tag_class>(
       this);
@@ -201,6 +208,8 @@ template <typename I>
 void BootstrapRequest<I>::get_client() {
   dout(20) << dendl;
 
+  update_progress("GET_CLIENT");
+
   Context *ctx = create_context_callback<
     BootstrapRequest<I>, &BootstrapRequest<I>::handle_get_client>(
       this);
@@ -230,6 +239,8 @@ template <typename I>
 void BootstrapRequest<I>::register_client() {
   dout(20) << dendl;
 
+  update_progress("REGISTER_CLIENT");
+
   // record an place-holder record
   librbd::journal::ClientData client_data{
     librbd::journal::MirrorPeerClientMeta{m_local_image_id}};
@@ -261,6 +272,8 @@ template <typename I>
 void BootstrapRequest<I>::open_remote_image() {
   dout(20) << dendl;
 
+  update_progress("OPEN_REMOTE_IMAGE");
+
   m_remote_image_ctx = I::create("", m_remote_image_id, nullptr,
                                  m_remote_io_ctx, false);
   Context *ctx = create_context_callback<
@@ -319,6 +332,8 @@ template <typename I>
 void BootstrapRequest<I>::open_local_image() {
   dout(20) << dendl;
 
+  update_progress("OPEN_LOCAL_IMAGE");
+
   Context *ctx = create_context_callback<
     BootstrapRequest<I>, &BootstrapRequest<I>::handle_open_local_image>(
       this);
@@ -359,6 +374,8 @@ template <typename I>
 void BootstrapRequest<I>::remove_local_image() {
   dout(20) << dendl;
 
+  update_progress("REMOVE_LOCAL_IMAGE");
+
   // TODO
 }
 
@@ -373,6 +390,8 @@ template <typename I>
 void BootstrapRequest<I>::create_local_image() {
   dout(20) << dendl;
 
+  update_progress("CREATE_LOCAL_IMAGE");
+
   // TODO: librbd should provide an AIO image creation method -- this is
   //       blocking so we execute in our worker thread
   Context *ctx = create_context_callback<
@@ -401,6 +420,10 @@ void BootstrapRequest<I>::handle_create_local_image(int r) {
 
 template <typename I>
 void BootstrapRequest<I>::update_client() {
+  dout(20) << dendl;
+
+  update_progress("UPDATE_CLIENT");
+
   if (m_client_meta->image_id == (*m_local_image_ctx)->id) {
     // already registered local image with remote journal
     get_remote_tags();
@@ -440,6 +463,10 @@ void BootstrapRequest<I>::handle_update_client(int r) {
 
 template <typename I>
 void BootstrapRequest<I>::get_remote_tags() {
+  dout(20) << dendl;
+
+  update_progress("GET_REMOTE_TAGS");
+
   if (m_created_local_image) {
     // optimization -- no need to compare remote tags if we just created
     // the image locally
@@ -520,6 +547,10 @@ void BootstrapRequest<I>::handle_get_remote_tags(int r) {
 
 template <typename I>
 void BootstrapRequest<I>::image_sync() {
+  dout(20) << dendl;
+
+  update_progress("IMAGE_SYNC");
+
   if (m_client_meta->state == librbd::journal::MIRROR_PEER_STATE_REPLAYING) {
     // clean replay state -- no image sync required
     close_remote_image();
@@ -535,7 +566,8 @@ void BootstrapRequest<I>::image_sync() {
                                                m_remote_image_ctx, m_timer,
                                                m_timer_lock,
                                                m_local_mirror_uuid, m_journaler,
-                                               m_client_meta, ctx);
+                                               m_client_meta, ctx,
+                                              m_progress_ctx);
   request->start();
 }
 
@@ -555,6 +587,8 @@ template <typename I>
 void BootstrapRequest<I>::close_local_image() {
   dout(20) << dendl;
 
+  update_progress("CLOSE_LOCAL_IMAGE");
+
   Context *ctx = create_context_callback<
     BootstrapRequest<I>, &BootstrapRequest<I>::handle_close_local_image>(
       this);
@@ -579,6 +613,8 @@ template <typename I>
 void BootstrapRequest<I>::close_remote_image() {
   dout(20) << dendl;
 
+  update_progress("CLOSE_REMOTE_IMAGE");
+
   Context *ctx = create_context_callback<
     BootstrapRequest<I>, &BootstrapRequest<I>::handle_close_remote_image>(
       this);
@@ -636,6 +672,15 @@ bool BootstrapRequest<I>::decode_client_meta() {
   return true;
 }
 
+template <typename I>
+void BootstrapRequest<I>::update_progress(const std::string &description) {
+  dout(20) << ": " << description << dendl;
+
+  if (m_progress_ctx) {
+    m_progress_ctx->update_progress(description);
+  }
+}
+
 } // namespace image_replayer
 } // namespace mirror
 } // namespace rbd
index bf9629ff589454af1c921621d80c09d1d7f53277..0f6a79e5e661c36cf85609c72a6b04456b390edc 100644 (file)
@@ -21,6 +21,9 @@ namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
 
 namespace rbd {
 namespace mirror {
+
+class ProgressContext;
+
 namespace image_replayer {
 
 template <typename ImageCtxT = librbd::ImageCtx>
@@ -29,6 +32,7 @@ public:
   typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
   typedef typename TypeTraits::Journaler Journaler;
   typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
+  typedef rbd::mirror::ProgressContext ProgressContext;
 
   static BootstrapRequest* create(librados::IoCtx &local_io_ctx,
                                   librados::IoCtx &remote_io_ctx,
@@ -42,12 +46,14 @@ public:
                                   const std::string &remote_mirror_uuid,
                                   Journaler *journaler,
                                   MirrorPeerClientMeta *client_meta,
-                                  Context *on_finish) {
+                                  Context *on_finish,
+                                 ProgressContext *progress_ctx = nullptr) {
     return new BootstrapRequest(local_io_ctx, remote_io_ctx, local_image_ctx,
                                 local_image_name, remote_image_id,
                                 global_image_id, work_queue, timer, timer_lock,
                                 local_mirror_uuid, remote_mirror_uuid,
-                                journaler, client_meta, on_finish);
+                                journaler, client_meta, on_finish,
+                               progress_ctx);
   }
 
   BootstrapRequest(librados::IoCtx &local_io_ctx,
@@ -59,7 +65,8 @@ public:
                    SafeTimer *timer, Mutex *timer_lock,
                    const std::string &local_mirror_uuid,
                    const std::string &remote_mirror_uuid, Journaler *journaler,
-                   MirrorPeerClientMeta *client_meta, Context *on_finish);
+                   MirrorPeerClientMeta *client_meta, Context *on_finish,
+                  ProgressContext *progress_ctx = nullptr);
   ~BootstrapRequest();
 
   void send();
@@ -136,6 +143,7 @@ private:
   Journaler *m_journaler;
   MirrorPeerClientMeta *m_client_meta;
   Context *m_on_finish;
+  ProgressContext *m_progress_ctx;
 
   Tags m_remote_tags;
   cls::journal::Client m_client;
@@ -188,6 +196,8 @@ private:
   void finish(int r);
 
   bool decode_client_meta();
+
+  void update_progress(const std::string &description);
 };
 
 } // namespace image_replayer
index f627f17cca24bcb82713adc652492647d2532659..df41d8181de608ae05493def0bb9376edad0d02e 100644 (file)
@@ -3,9 +3,11 @@
 
 #include "ImageCopyRequest.h"
 #include "ObjectCopyRequest.h"
+#include "include/stringify.h"
 #include "common/errno.h"
 #include "journal/Journaler.h"
 #include "librbd/Utils.h"
+#include "tools/rbd_mirror/ProgressContext.h"
 
 #define dout_subsys ceph_subsys_rbd_mirror
 #undef dout_prefix
@@ -25,11 +27,12 @@ ImageCopyRequest<I>::ImageCopyRequest(I *local_image_ctx, I *remote_image_ctx,
                                       Journaler *journaler,
                                       MirrorPeerClientMeta *client_meta,
                                       MirrorPeerSyncPoint *sync_point,
-                                      Context *on_finish)
+                                      Context *on_finish,
+                                     ProgressContext *progress_ctx)
   : m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
     m_timer(timer), m_timer_lock(timer_lock), m_journaler(journaler),
     m_client_meta(client_meta), m_sync_point(sync_point),
-    m_on_finish(on_finish),
+    m_on_finish(on_finish), m_progress_ctx(progress_ctx),
     m_lock(unique_lock_name("ImageCopyRequest::m_lock", this)),
     m_client_meta_copy(*client_meta) {
   assert(!m_client_meta_copy.sync_points.empty());
@@ -74,6 +77,8 @@ void ImageCopyRequest<I>::send_update_max_object_count() {
     return;
   }
 
+  update_progress("UPDATE_MAX_OBJECT_COUNT");
+
   CephContext *cct = m_local_image_ctx->cct;
   ldout(cct, 20) << ": sync_object_count=" << max_objects << dendl;
 
@@ -121,6 +126,8 @@ void ImageCopyRequest<I>::send_object_copies() {
   dout(20) << ": start_object=" << m_object_no << ", "
            << "end_object=" << m_end_object_no << dendl;
 
+  update_progress("COPY_OBJECT");
+
   bool complete;
   {
     Mutex::Locker locker(m_lock);
@@ -165,11 +172,13 @@ void ImageCopyRequest<I>::handle_object_copy(int r) {
   CephContext *cct = m_local_image_ctx->cct;
   ldout(cct, 20) << ": r=" << r << dendl;
 
+  int percent;
   bool complete;
   {
     Mutex::Locker locker(m_lock);
     assert(m_current_ops > 0);
     --m_current_ops;
+    percent = 100 * m_object_no / m_end_object_no;
 
     if (r < 0) {
       lderr(cct) << ": object copy failed: " << cpp_strerror(r) << dendl;
@@ -182,6 +191,8 @@ void ImageCopyRequest<I>::handle_object_copy(int r) {
     complete = (m_current_ops == 0);
   }
 
+  update_progress("COPY_OBJECT " + stringify(percent) + "%", false);
+
   if (complete) {
     send_flush_sync_point();
   }
@@ -194,6 +205,8 @@ void ImageCopyRequest<I>::send_flush_sync_point() {
     return;
   }
 
+  update_progress("FLUSH_SYNC_POINT");
+
   m_client_meta_copy = *m_client_meta;
   if (m_object_no > 0) {
     m_sync_point->object_number = m_object_no - 1;
@@ -287,6 +300,16 @@ int ImageCopyRequest<I>::compute_snap_map() {
   return 0;
 }
 
+template <typename I>
+void ImageCopyRequest<I>::update_progress(const std::string &description,
+                                         bool flush) {
+  dout(20) << ": " << description << dendl;
+
+  if (m_progress_ctx) {
+    m_progress_ctx->update_progress("IMAGE_COPY/" + description, flush);
+  }
+}
+
 } // namespace image_sync
 } // namespace mirror
 } // namespace rbd
index 0d1f5e3170aedfed3abd48eff4c793356ebe62b4..7198aaefac55ab8efd004376725c2b447cafbda6 100644 (file)
@@ -19,6 +19,9 @@ namespace librbd { struct ImageCtx; }
 
 namespace rbd {
 namespace mirror {
+
+class ProgressContext;
+
 namespace image_sync {
 
 template <typename ImageCtxT = librbd::ImageCtx>
@@ -30,6 +33,7 @@ public:
   typedef typename TypeTraits::Journaler Journaler;
   typedef librbd::journal::MirrorPeerSyncPoint MirrorPeerSyncPoint;
   typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
+  typedef rbd::mirror::ProgressContext ProgressContext;
 
   static ImageCopyRequest* create(ImageCtxT *local_image_ctx,
                                   ImageCtxT *remote_image_ctx,
@@ -37,16 +41,18 @@ public:
                                   Journaler *journaler,
                                   MirrorPeerClientMeta *client_meta,
                                   MirrorPeerSyncPoint *sync_point,
-                                  Context *on_finish) {
+                                  Context *on_finish,
+                                 ProgressContext *progress_ctx = nullptr) {
     return new ImageCopyRequest(local_image_ctx, remote_image_ctx, timer,
                                 timer_lock, journaler, client_meta, sync_point,
-                                on_finish);
+                                on_finish, progress_ctx);
   }
 
   ImageCopyRequest(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx,
                    SafeTimer *timer, Mutex *timer_lock, Journaler *journaler,
                    MirrorPeerClientMeta *client_meta,
-                   MirrorPeerSyncPoint *sync_point, Context *on_finish);
+                   MirrorPeerSyncPoint *sync_point, Context *on_finish,
+                  ProgressContext *progress_ctx = nullptr);
 
   void send();
   void cancel();
@@ -82,6 +88,7 @@ private:
   MirrorPeerClientMeta *m_client_meta;
   MirrorPeerSyncPoint *m_sync_point;
   Context *m_on_finish;
+  ProgressContext *m_progress_ctx;
 
   SnapMap m_snap_map;
 
@@ -109,6 +116,7 @@ private:
 
   int compute_snap_map();
 
+  void update_progress(const std::string &description, bool flush = true);
 };
 
 } // namespace image_sync