]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: async methods to enable/disable mirroring
authorMykola Golub <mgolub@mirantis.com>
Wed, 31 Aug 2016 19:44:24 +0000 (22:44 +0300)
committerMykola Golub <mgolub@mirantis.com>
Wed, 28 Sep 2016 12:17:19 +0000 (15:17 +0300)
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
src/cls/rbd/cls_rbd_types.h
src/librbd/CMakeLists.txt
src/librbd/mirror/DisableRequest.cc [new file with mode: 0644]
src/librbd/mirror/DisableRequest.h [new file with mode: 0644]
src/librbd/mirror/EnableRequest.cc [new file with mode: 0644]
src/librbd/mirror/EnableRequest.h [new file with mode: 0644]

index 83030ba09a2af3a4d65b3a2aa10a0eac627d1ba6..547e9963250546f7a81cb3eb5967f24165f92225 100644 (file)
@@ -80,7 +80,8 @@ WRITE_CLASS_ENCODER(MirrorPeer);
 
 enum MirrorImageState {
   MIRROR_IMAGE_STATE_DISABLING = 0,
-  MIRROR_IMAGE_STATE_ENABLED   = 1
+  MIRROR_IMAGE_STATE_ENABLED   = 1,
+  MIRROR_IMAGE_STATE_DISABLED  = 2,
 };
 
 struct MirrorImage {
index 81d2d6490702ebd8bf76f340722fb9471501b0ab..840677361da94ef2049d839c7525233c18adc7bc 100644 (file)
@@ -48,6 +48,8 @@ set(librbd_internal_srcs
   journal/CreateRequest.cc
   journal/Replay.cc
   journal/StandardPolicy.cc
+  mirror/DisableRequest.cc
+  mirror/EnableRequest.cc
   object_map/CreateRequest.cc
   object_map/InvalidateRequest.cc
   object_map/LockRequest.cc
diff --git a/src/librbd/mirror/DisableRequest.cc b/src/librbd/mirror/DisableRequest.cc
new file mode 100644 (file)
index 0000000..49d58fe
--- /dev/null
@@ -0,0 +1,458 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/mirror/DisableRequest.h"
+#include "common/WorkQueue.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "cls/journal/cls_journal_client.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "journal/Journaler.h"
+#include "librbd/ImageState.h"
+#include "librbd/Journal.h"
+#include "librbd/MirroringWatcher.h"
+#include "librbd/Operations.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::mirror::DisableRequest: "
+
+namespace librbd {
+namespace mirror {
+
+using util::create_rados_ack_callback;
+
+template <typename I>
+DisableRequest<I>::DisableRequest(I *image_ctx, bool force,
+                                              bool remove,  Context *on_finish)
+  : m_image_ctx(image_ctx), m_force(force), m_remove(remove),
+    m_on_finish(on_finish), m_lock("mirror::DisableRequest::m_lock") {
+}
+
+template <typename I>
+void DisableRequest<I>::send() {
+  send_get_mirror_image();
+}
+
+template <typename I>
+void DisableRequest<I>::send_get_mirror_image() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  librados::ObjectReadOperation op;
+  cls_client::mirror_image_get_start(&op, m_image_ctx->id);
+
+  using klass = DisableRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_get_mirror_image>(this);
+  m_out_bl.clear();
+  int r = m_image_ctx->md_ctx.aio_operate(RBD_MIRRORING, comp, &op, &m_out_bl);
+  assert(r == 0);
+  comp->release();
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_get_mirror_image(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result == 0) {
+    bufferlist::iterator iter = m_out_bl.begin();
+    *result = cls_client::mirror_image_get_finish(&iter, &m_mirror_image);
+  }
+
+  if (*result < 0) {
+    if (*result == -ENOENT) {
+      ldout(cct, 20) << this << " " << __func__
+                     << ": mirroring is not enabled for this image" << dendl;
+      *result = 0;
+    } else if (*result == -EOPNOTSUPP) {
+      ldout(cct, 5) << this << " " << __func__
+                    << ": mirroring is not supported by OSD" << dendl;
+    } else {
+      lderr(cct) << "failed to retreive mirror image: " << cpp_strerror(*result)
+                 << dendl;
+    }
+    return m_on_finish;
+  }
+
+  send_get_tag_owner();
+  return nullptr;
+}
+
+template <typename I>
+void DisableRequest<I>::send_get_tag_owner() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  using klass = DisableRequest<I>;
+  Context *ctx = util::create_context_callback<
+      klass, &klass::handle_get_tag_owner>(this);
+
+  Journal<>::is_tag_owner(m_image_ctx, &m_is_primary, ctx);
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_get_tag_owner(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(cct) << "failed to check tag ownership: " << cpp_strerror(*result)
+               << dendl;
+    return m_on_finish;
+  }
+
+  if (!m_is_primary && !m_force) {
+    lderr(cct) << "mirrored image is not primary, "
+               << "add force option to disable mirroring" << dendl;
+    *result = -EINVAL;
+    return m_on_finish;
+  }
+
+  m_mirror_image.state = cls::rbd::MIRROR_IMAGE_STATE_DISABLING;
+  send_set_mirror_image();
+  return nullptr;
+}
+
+template <typename I>
+void DisableRequest<I>::send_set_mirror_image() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  librados::ObjectWriteOperation op;
+  cls_client::mirror_image_set(&op, m_image_ctx->id, m_mirror_image);
+
+  using klass = DisableRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_set_mirror_image>(this);
+  m_out_bl.clear();
+  int r = m_image_ctx->md_ctx.aio_operate(RBD_MIRRORING, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_set_mirror_image(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(cct) << "failed to disable mirroring: " << cpp_strerror(*result)
+               << dendl;
+    return m_on_finish;
+  }
+
+  send_notify_mirroring_watcher();
+  return nullptr;
+}
+
+template <typename I>
+void DisableRequest<I>::send_notify_mirroring_watcher() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  using klass = DisableRequest<I>;
+  Context *ctx = util::create_context_callback<
+    klass, &klass::handle_notify_mirroring_watcher>(this);
+
+  MirroringWatcher<>::notify_image_updated(m_image_ctx->md_ctx,
+                                           cls::rbd::MIRROR_IMAGE_STATE_DISABLING,
+                                           m_image_ctx->id,
+                                           m_mirror_image.global_image_id, ctx);
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_notify_mirroring_watcher(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(cct) << "failed to send update notification: "
+               << cpp_strerror(*result) << dendl;
+    *result = 0;
+  }
+
+  send_get_clients();
+  return nullptr;
+}
+
+template <typename I>
+void DisableRequest<I>::send_get_clients() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  using klass = DisableRequest<I>;
+  Context *ctx = util::create_context_callback<
+    klass, &klass::handle_get_clients>(this);
+
+  std::string header_oid = ::journal::Journaler::header_oid(m_image_ctx->id);
+  m_clients.clear();
+  cls::journal::client::client_list(m_image_ctx->md_ctx, header_oid, &m_clients,
+                                    ctx);
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_get_clients(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(cct) << "failed to get registered clients: " << cpp_strerror(*result)
+               << dendl;
+    return m_on_finish;
+  }
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_current_ops.empty());
+
+  for (auto client : m_clients) {
+    journal::ClientData client_data;
+    bufferlist::iterator bl_it = client.data.begin();
+    try {
+      ::decode(client_data, bl_it);
+    } catch (const buffer::error &err) {
+      lderr(cct) << "failed to decode client data" << dendl;
+      m_error_result = -EBADMSG;
+      continue;
+    }
+
+    journal::ClientMetaType type = client_data.get_client_meta_type();
+    if (type != journal::ClientMetaType::MIRROR_PEER_CLIENT_META_TYPE) {
+      continue;
+    }
+
+    if (m_current_ops.find(client.id) != m_current_ops.end()) {
+      // Should not happen.
+      lderr(cct) << this << " " << __func__ << ": clients with the same id "
+                 << client.id << dendl;
+      continue;
+    }
+
+    m_current_ops[client.id] = 0;
+    m_ret[client.id] = 0;
+
+    journal::MirrorPeerClientMeta client_meta =
+      boost::get<journal::MirrorPeerClientMeta>(client_data.client_meta);
+
+    for (const auto& sync : client_meta.sync_points) {
+      send_remove_snap(client.id, sync.snap_name);
+    }
+
+    if (m_current_ops[client.id] == 0) {
+      // no snaps to remove
+      send_unregister_client(client.id);
+    }
+  }
+
+  if (m_current_ops.empty()) {
+    if (m_error_result < 0) {
+      *result = m_error_result;
+      return m_on_finish;
+    }
+    // no mirror clients to unregister
+    send_remove_mirror_image();
+  }
+
+  return nullptr;
+}
+
+template <typename I>
+void DisableRequest<I>::send_remove_snap(const std::string &client_id,
+                                               const std::string &snap_name) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": client_id=" << client_id
+                 << ", snap_name=" << snap_name << dendl;
+
+  assert(m_lock.is_locked());
+
+  m_current_ops[client_id]++;
+
+  Context *ctx = create_context_callback(
+    &DisableRequest<I>::handle_remove_snap, client_id);
+
+  ctx = new FunctionContext([this, snap_name, ctx](int r) {
+      RWLock::WLocker owner_locker(m_image_ctx->owner_lock);
+      m_image_ctx->operations->execute_snap_remove(snap_name.c_str(), ctx);
+    });
+
+  m_image_ctx->op_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_remove_snap(int *result,
+    const std::string &client_id) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  assert(m_current_ops[client_id] > 0);
+  m_current_ops[client_id]--;
+
+  if (*result < 0 && *result != -ENOENT) {
+    lderr(cct) <<
+      "failed to remove temporary snapshot created by remote peer: "
+               << cpp_strerror(*result) << dendl;
+    m_ret[client_id] = *result;
+  }
+
+  if (m_current_ops[client_id] == 0) {
+    send_unregister_client(client_id);
+  }
+
+  return nullptr;
+}
+
+template <typename I>
+void DisableRequest<I>::send_unregister_client(
+  const std::string &client_id) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  assert(m_lock.is_locked());
+  assert(m_current_ops[client_id] == 0);
+
+  Context *ctx = create_context_callback(
+    &DisableRequest<I>::handle_unregister_client, client_id);
+
+  if (m_ret[client_id] < 0) {
+    m_image_ctx->op_work_queue->queue(ctx, m_ret[client_id]);
+    return;
+  }
+
+  librados::ObjectWriteOperation op;
+  cls::journal::client::client_unregister(&op, client_id);
+  std::string header_oid = ::journal::Journaler::header_oid(m_image_ctx->id);
+  librados::AioCompletion *comp = create_rados_ack_callback(ctx);
+
+  int r = m_image_ctx->md_ctx.aio_operate(header_oid, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_unregister_client(
+  int *result, const std::string &client_id) {
+
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_current_ops[client_id] == 0);
+  m_current_ops.erase(client_id);
+
+  if (*result < 0 && *result != -ENOENT) {
+    lderr(cct) << "failed to unregister remote journal client: "
+               << cpp_strerror(*result) << dendl;
+    m_error_result = *result;
+  }
+
+  if (!m_current_ops.empty()) {
+    return nullptr;
+  }
+
+  if (m_error_result < 0) {
+    *result = m_error_result;
+    return m_on_finish;
+  }
+
+  if (!m_remove) {
+    return m_on_finish;
+  }
+
+  send_get_clients();
+  return nullptr;
+}
+
+template <typename I>
+void DisableRequest<I>::send_remove_mirror_image() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  librados::ObjectWriteOperation op;
+  cls_client::mirror_image_remove(&op, m_image_ctx->id);
+
+  using klass = DisableRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_remove_mirror_image>(this);
+  m_out_bl.clear();
+  int r = m_image_ctx->md_ctx.aio_operate(RBD_MIRRORING, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_remove_mirror_image(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result == -ENOENT) {
+    *result = 0;
+  }
+
+  if (*result < 0) {
+    lderr(cct) << "failed to remove mirror image: " << cpp_strerror(*result)
+               << dendl;
+  }
+
+  ldout(cct, 20) << this << " " << __func__
+                 <<  ": removed image state from rbd_mirroring object" << dendl;
+
+  if (m_is_primary) {
+    send_notify_mirroring_watcher_removed();
+    return nullptr;
+  }
+
+  return m_on_finish;
+}
+
+template <typename I>
+void DisableRequest<I>::send_notify_mirroring_watcher_removed() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  using klass = DisableRequest<I>;
+  Context *ctx = util::create_context_callback<
+    klass, &klass::handle_notify_mirroring_watcher_removed>(this);
+
+  MirroringWatcher<>::notify_image_updated(
+    m_image_ctx->md_ctx, cls::rbd::MIRROR_IMAGE_STATE_DISABLED, m_image_ctx->id,
+    m_mirror_image.global_image_id, ctx);
+}
+
+template <typename I>
+Context *DisableRequest<I>::handle_notify_mirroring_watcher_removed(
+  int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(cct) << "failed to send update notification: "
+               << cpp_strerror(*result) << dendl;
+    *result = 0;
+  }
+
+  return m_on_finish;
+}
+
+template <typename I>
+Context *DisableRequest<I>::create_context_callback(
+  Context*(DisableRequest<I>::*handle)(int*, const std::string &client_id),
+  const std::string &client_id) {
+
+  return new FunctionContext([this, handle, client_id](int r) {
+      Context *on_finish = (this->*handle)(&r, client_id);
+      if (on_finish != nullptr) {
+        on_finish->complete(r);
+        delete this;
+      }
+    });
+}
+
+} // namespace mirror
+} // namespace librbd
+
+template class librbd::mirror::DisableRequest<librbd::ImageCtx>;
diff --git a/src/librbd/mirror/DisableRequest.h b/src/librbd/mirror/DisableRequest.h
new file mode 100644 (file)
index 0000000..0b1ed12
--- /dev/null
@@ -0,0 +1,133 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIRROR_DISABLE_REQUEST_H
+#define CEPH_LIBRBD_MIRROR_DISABLE_REQUEST_H
+
+#include "include/buffer.h"
+#include "common/Mutex.h"
+#include "cls/journal/cls_journal_types.h"
+#include "cls/rbd/cls_rbd_types.h"
+#include <map>
+#include <string>
+
+class Context;
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace mirror {
+
+template <typename ImageCtxT = ImageCtx>
+class DisableRequest {
+public:
+  static DisableRequest *create(ImageCtxT *image_ctx, bool force,
+                                bool remove, Context *on_finish) {
+    return new DisableRequest(image_ctx, force, remove, on_finish);
+  }
+
+  void send();
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   * GET_MIRROR_IMAGE * * * * * * * * * * * * * * * * * * * * * * *
+   *    |                                                         *
+   *    v                                                         *
+   * GET_TAG_OWNER  * * * * * * * * * * * * * * * * * * * * * * * *
+   *    |                                                         *
+   *    v                                                         *
+   * SET_MIRROR_IMAGE * * * * * * * * * * * * * * * * * * * * * * *
+   *    |                                                         *
+   *    v                                                         *
+   * NOTIFY_MIRRORING_WATCHER                                     *
+   *    |                                                         *
+   *    v                                                         *
+   * GET_CLIENTS <----------------------------------------\ * * * *
+   *    |     | (unregister clients)                      |       *  (on error)
+   *    |     |/----------------------------\             |       *
+   *    |     |                             |             |       *
+   *    |     |   /-----------\ (repeat     | (repeat     | (repeat
+   *    |     |   |           |  as needed) |  as needed) |  as needed)
+   *    |     v   v           |             |             |       *
+   *    |  REMOVE_SYNC_SNAP --/ * * * * * * | * * * * * * | * * * *
+   *    |     |                             |             |       *
+   *    |     v                             |             |       *
+   *    |  UNREGISTER_CLIENT ---------------/-------------/ * * * *
+   *    |                                                         *
+   *    | (no more clients                                        *
+   *    |  to unregister)                                         *
+   *    v                                                         *
+   * REMOVE_MIRROR_IMAGE  * * * * * * * * * * * * * * * * * * * * *
+   *    |         (skip if no remove)                             *
+   *    v                                                         *
+   * NOTIFY_MIRRORING_WATCHER_REMOVED                             *
+   *    |         (skip if not primary or no remove)              *
+   *    v                                                         *
+   * <finish> < * * * * * * * * * * * * * * * * * * * * * * * * * *
+   *
+   * @endverbatim
+   */
+
+  DisableRequest(ImageCtxT *image_ctx, bool force, bool remove,
+                 Context *on_finish);
+
+  ImageCtxT *m_image_ctx;
+  bool m_force;
+  bool m_remove;
+  Context *m_on_finish;
+
+  bool m_is_primary = false;
+  bufferlist m_out_bl;
+  cls::rbd::MirrorImage m_mirror_image;
+  std::set<cls::journal::Client> m_clients;
+  std::map<std::string, int> m_ret;
+  std::map<std::string, int> m_current_ops;
+  int m_error_result = 0;
+  mutable Mutex m_lock;
+
+  void send_get_mirror_image();
+  Context *handle_get_mirror_image(int *result);
+
+  void send_get_tag_owner();
+  Context *handle_get_tag_owner(int *result);
+
+  void send_set_mirror_image();
+  Context *handle_set_mirror_image(int *result);
+
+  void send_notify_mirroring_watcher();
+  Context *handle_notify_mirroring_watcher(int *result);
+
+  void send_get_clients();
+  Context *handle_get_clients(int *result);
+
+  void send_remove_snap(const std::string &client_id,
+                        const std::string &snap_name);
+  Context *handle_remove_snap(int *result, const std::string &client_id);
+
+  void send_unregister_client(const std::string &client_id);
+  Context *handle_unregister_client(int *result, const std::string &client_id);
+
+  void send_remove_mirror_image();
+  Context *handle_remove_mirror_image(int *result);
+
+  void send_notify_mirroring_watcher_removed();
+  Context *handle_notify_mirroring_watcher_removed(int *result);
+
+  Context *create_context_callback(
+    Context*(DisableRequest<ImageCtxT>::*handle)(
+      int*, const std::string &client_id),
+    const std::string &client_id);
+};
+
+} // namespace mirror
+} // namespace librbd
+
+extern template class librbd::mirror::DisableRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIRROR_DISABLE_REQUEST_H
diff --git a/src/librbd/mirror/EnableRequest.cc b/src/librbd/mirror/EnableRequest.cc
new file mode 100644 (file)
index 0000000..a9b03a2
--- /dev/null
@@ -0,0 +1,185 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/mirror/EnableRequest.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/ImageState.h"
+#include "librbd/Journal.h"
+#include "librbd/MirroringWatcher.h"
+#include "librbd/Utils.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::mirror::EnableRequest: "
+
+namespace librbd {
+namespace mirror {
+
+using util::create_context_callback;
+using util::create_rados_ack_callback;
+
+template <typename I>
+EnableRequest<I>::EnableRequest(I *image_ctx, Context *on_finish)
+  : m_io_ctx(&image_ctx->md_ctx), m_image_ctx(image_ctx),
+    m_on_finish(on_finish) {
+}
+
+template <typename I>
+void EnableRequest<I>::send() {
+  send_get_tag_owner();
+}
+
+template <typename I>
+void EnableRequest<I>::send_get_tag_owner() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  using klass = EnableRequest<I>;
+  Context *ctx = create_context_callback<
+      klass, &klass::handle_get_tag_owner>(this);
+
+  librbd::Journal<>::is_tag_owner(m_image_ctx, &m_is_primary, ctx);
+}
+
+template <typename I>
+Context *EnableRequest<I>::handle_get_tag_owner(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(cct) << "failed to check tag ownership: " << cpp_strerror(*result)
+               << dendl;
+    return m_on_finish;
+  }
+
+  if (!m_is_primary) {
+    lderr(cct) << "last journal tag not owned by local cluster" << dendl;
+    *result = -EINVAL;
+    return m_on_finish;
+  }
+
+  send_get_mirror_image();
+  return nullptr;
+}
+
+template <typename I>
+void EnableRequest<I>::send_get_mirror_image() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  librados::ObjectReadOperation op;
+  cls_client::mirror_image_get_start(&op, m_image_ctx->id);
+
+  using klass = EnableRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_get_mirror_image>(this);
+  m_out_bl.clear();
+  int r = m_image_ctx->md_ctx.aio_operate(RBD_MIRRORING, comp, &op, &m_out_bl);
+  assert(r == 0);
+  comp->release();
+}
+
+template <typename I>
+Context *EnableRequest<I>::handle_get_mirror_image(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result == 0) {
+    bufferlist::iterator iter = m_out_bl.begin();
+    *result = cls_client::mirror_image_get_finish(&iter, &m_mirror_image);
+  }
+
+  if (*result == 0) {
+    if (m_mirror_image.state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
+      ldout(cct, 10) << this << " " << __func__
+                     << ": mirroring is already enabled" << dendl;
+    } else {
+      lderr(cct) << "currently disabling" << dendl;
+      *result = -EINVAL;
+    }
+    return m_on_finish;
+  }
+
+  if (*result != -ENOENT) {
+    lderr(cct) << "failed to retreive mirror image: " << cpp_strerror(*result)
+               << dendl;
+    return m_on_finish;
+  }
+
+  *result = 0;
+  m_mirror_image.state = cls::rbd::MIRROR_IMAGE_STATE_ENABLED;
+  uuid_d uuid_gen;
+  uuid_gen.generate_random();
+  m_mirror_image.global_image_id = uuid_gen.to_string();
+
+  send_set_mirror_image();
+  return nullptr;
+}
+
+template <typename I>
+void EnableRequest<I>::send_set_mirror_image() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  librados::ObjectWriteOperation op;
+  cls_client::mirror_image_set(&op, m_image_ctx->id, m_mirror_image);
+
+  using klass = EnableRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_set_mirror_image>(this);
+  m_out_bl.clear();
+  int r = m_image_ctx->md_ctx.aio_operate(RBD_MIRRORING, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template <typename I>
+Context *EnableRequest<I>::handle_set_mirror_image(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(cct) << "failed to enable mirroring: " << cpp_strerror(*result)
+               << dendl;
+    return m_on_finish;
+  }
+
+  send_notify_mirroring_watcher();
+  return nullptr;
+}
+
+template <typename I>
+void EnableRequest<I>::send_notify_mirroring_watcher() {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << dendl;
+
+  using klass = EnableRequest<I>;
+  Context *ctx = create_context_callback<
+    klass, &klass::handle_notify_mirroring_watcher>(this);
+
+  MirroringWatcher<>::notify_image_updated(m_image_ctx->md_ctx,
+                                           cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
+                                           m_image_ctx->id,
+                                           m_mirror_image.global_image_id, ctx);
+}
+
+template <typename I>
+Context *EnableRequest<I>::handle_notify_mirroring_watcher(int *result) {
+  CephContext *cct = m_image_ctx->cct;
+  ldout(cct, 10) << this << " " << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(cct) << "failed to send update notification: "
+               << cpp_strerror(*result) << dendl;
+    *result = 0;
+  }
+
+  return m_on_finish;
+}
+
+} // namespace mirror
+} // namespace librbd
+
+template class librbd::mirror::EnableRequest<librbd::ImageCtx>;
diff --git a/src/librbd/mirror/EnableRequest.h b/src/librbd/mirror/EnableRequest.h
new file mode 100644 (file)
index 0000000..8860365
--- /dev/null
@@ -0,0 +1,84 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_MIRROR_ENABLE_REQUEST_H
+#define CEPH_LIBRBD_MIRROR_ENABLE_REQUEST_H
+
+#include "include/buffer.h"
+#include "cls/rbd/cls_rbd_types.h"
+#include <map>
+#include <string>
+
+class Context;
+
+namespace librados { class IoCtx; }
+
+namespace librbd {
+
+class ImageCtx;
+
+namespace mirror {
+
+template <typename ImageCtxT = ImageCtx>
+class EnableRequest {
+public:
+  static EnableRequest *create(ImageCtxT *image_ctx, Context *on_finish) {
+    return new EnableRequest(image_ctx, on_finish);
+  }
+
+  void send();
+
+private:
+  /**
+   * @verbatim
+   *
+   * <start>
+   *    |
+   *    v
+   * GET_TAG_OWNER  * * * * * * * *
+   *    |                         *
+   *    v                         *
+   * GET_MIRROR_IMAGE * * * * * * *
+   *    |                         * (on error)
+   *    v                         *
+   * SET_MIRROR_IMAGE * * * * * * *
+   *    |                         *
+   *    v                         *
+   * NOTIFY_MIRRORING_WATCHER * * *
+   *    |                         *
+   *    v                         *
+   * <finish>   < * * * * * * * * *
+   *
+   * @endverbatim
+   */
+
+  EnableRequest(ImageCtxT *image_ctx, Context *on_finish);
+
+  librados::IoCtx *m_io_ctx = nullptr;
+  std::string m_image_id;
+  ImageCtxT *m_image_ctx = nullptr;
+  Context *m_on_finish;
+
+  bool m_is_primary = false;
+  bufferlist m_out_bl;
+  cls::rbd::MirrorImage m_mirror_image;
+
+  void send_get_tag_owner();
+  Context *handle_get_tag_owner(int *result);
+
+  void send_get_mirror_image();
+  Context *handle_get_mirror_image(int *result);
+
+  void send_set_mirror_image();
+  Context *handle_set_mirror_image(int *result);
+
+  void send_notify_mirroring_watcher();
+  Context *handle_notify_mirroring_watcher(int *result);
+};
+
+} // namespace mirror
+} // namespace librbd
+
+extern template class librbd::mirror::EnableRequest<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_MIRROR_ENABLE_REQUEST_H