]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: asynchronous image creation state machine
author: Venky Shankar <vshankar@redhat.com>
Thu, 2 Jun 2016 17:14:21 +0000 (22:44 +0530)
committer: Venky Shankar <vshankar@redhat.com>
Sun, 7 Aug 2016 11:01:37 +0000 (16:31 +0530)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/cls/rbd/cls_rbd_client.cc
src/cls/rbd/cls_rbd_client.h
src/librbd/CMakeLists.txt
src/librbd/Makefile.am
src/librbd/image/CreateRequest.cc [new file with mode: 0644]
src/librbd/image/CreateRequest.h [new file with mode: 0644]

index a8fd5a1a6670a389c6b21273e1f3e4ac0ff66ee2..b49fe6a70a9cfba80168714f7d31faaeb6f481cb 100644 (file)
@@ -149,6 +149,18 @@ namespace librbd {
                                          parent);
     }
 
+    void create_image(librados::ObjectWriteOperation *op, uint64_t size, uint8_t order,
+                      uint64_t features, const std::string &object_prefix)
+    {
+      bufferlist bl;
+      ::encode(size, bl);
+      ::encode(order, bl);
+      ::encode(features, bl);
+      ::encode(object_prefix, (bl));
+
+      op->exec("rbd", "create", bl);
+    }
+
     int create_image(librados::IoCtx *ioctx, const std::string &oid,
                     uint64_t size, uint8_t order, uint64_t features,
                     const std::string &object_prefix)
@@ -723,6 +735,16 @@ namespace librbd {
       return get_stripe_unit_count_finish(&it, stripe_unit, stripe_count);
     }
 
+    void set_stripe_unit_count(librados::ObjectWriteOperation *op,
+                              uint64_t stripe_unit, uint64_t stripe_count)
+    {
+      bufferlist bl;
+      ::encode(stripe_unit, bl);
+      ::encode(stripe_count, bl);
+
+      op->exec("rbd", "set_stripe_unit_count", bl);
+    }
+
     int set_stripe_unit_count(librados::IoCtx *ioctx, const std::string &oid,
                              uint64_t stripe_unit, uint64_t stripe_count)
     {
@@ -732,7 +754,6 @@ namespace librbd {
       return ioctx->exec(oid, "rbd", "set_stripe_unit_count", in, out);
     }
 
-
     /************************ rbd_id object methods ************************/
 
     void get_id_start(librados::ObjectReadOperation *op) {
@@ -764,6 +785,13 @@ namespace librbd {
       return get_id_finish(&it, id);
     }
 
+    void set_id(librados::ObjectWriteOperation *op, const std::string id)
+    {
+      bufferlist bl;
+      ::encode(id, bl);
+      op->exec("rbd", "set_id", bl);
+    }
+
     int set_id(librados::IoCtx *ioctx, const std::string &oid, std::string id)
     {
       bufferlist in, out;
@@ -844,6 +872,15 @@ namespace librbd {
       return 0;
     }
 
+    void dir_add_image(librados::ObjectWriteOperation *op,
+                      const std::string &name, const std::string &id)
+    {
+      bufferlist bl;
+      ::encode(name, bl);
+      ::encode(id, bl);
+      op->exec("rbd", "dir_add_image", bl);
+    }
+
     int dir_add_image(librados::IoCtx *ioctx, const std::string &oid,
                      const std::string &name, const std::string &id)
     {
@@ -862,6 +899,16 @@ namespace librbd {
       return ioctx->exec(oid, "rbd", "dir_remove_image", in, out);
     }
 
+    void dir_remove_image(librados::ObjectWriteOperation *op,
+                         const std::string &name, const std::string &id)
+    {
+      bufferlist bl;
+      ::encode(name, bl);
+      ::encode(id, bl);
+
+      op->exec("rbd", "dir_remove_image", bl);
+    }
+
     void dir_rename_image(librados::ObjectWriteOperation *op,
                         const std::string &src, const std::string &dest,
                         const std::string &id)
@@ -1061,6 +1108,24 @@ namespace librbd {
       return 0;
     }
 
+    void mirror_mode_get_start(librados::ObjectReadOperation *op) {
+      bufferlist bl;
+      op->exec("rbd", "mirror_mode_get", bl);
+    }
+
+    int mirror_mode_get_finish(bufferlist::iterator *it,
+                              cls::rbd::MirrorMode *mirror_mode) {
+      try {
+       uint32_t mirror_mode_decode;
+       ::decode(mirror_mode_decode, *it);
+       *mirror_mode = static_cast<cls::rbd::MirrorMode>(mirror_mode_decode);
+      } catch (const buffer::error &err) {
+       return -EBADMSG;
+      }
+
+      return 0;
+    }
+
     int mirror_mode_get(librados::IoCtx *ioctx,
                         cls::rbd::MirrorMode *mirror_mode) {
       bufferlist in_bl;
@@ -1274,6 +1339,16 @@ namespace librbd {
       return 0;
     }
 
+    void mirror_image_set(librados::ObjectWriteOperation *op,
+                         const std::string &image_id,
+                         const cls::rbd::MirrorImage &mirror_image) {
+      bufferlist bl;
+      ::encode(image_id, bl);
+      ::encode(mirror_image, bl);
+
+      op->exec("rbd", "mirror_image_set", bl);
+    }
+
     int mirror_image_set(librados::IoCtx *ioctx, const std::string &image_id,
                         const cls::rbd::MirrorImage &mirror_image) {
       bufferlist in_bl;
@@ -1289,6 +1364,14 @@ namespace librbd {
       return 0;
     }
 
+    void mirror_image_remove(librados::ObjectWriteOperation *op,
+                            const std::string &image_id) {
+      bufferlist bl;
+      ::encode(image_id, bl);
+
+      op->exec("rbd", "mirror_image_remove", bl);
+    }
+
     int mirror_image_remove(librados::IoCtx *ioctx, const std::string &image_id) {
       bufferlist in_bl;
       ::encode(image_id, in_bl);
index e073dfdf659394f7432a68f8dcfd74e125957ed5..40d5eb3382a4e405a8ec36897d3f765816bfe0a4 100644 (file)
@@ -47,6 +47,8 @@ namespace librbd {
                             parent_info *parent);
 
     // low-level interface (mainly for testing)
+    void create_image(librados::ObjectWriteOperation *op, uint64_t size, uint8_t order,
+                     uint64_t features, const std::string &object_prefix);
     int create_image(librados::IoCtx *ioctx, const std::string &oid,
                     uint64_t size, uint8_t order, uint64_t features,
                     const std::string &object_prefix);
@@ -136,6 +138,8 @@ namespace librbd {
     int get_stripe_unit_count(librados::IoCtx *ioctx, const std::string &oid,
                              uint64_t *stripe_unit, uint64_t *stripe_count);
 
+    void set_stripe_unit_count(librados::ObjectWriteOperation *op,
+                              uint64_t stripe_unit, uint64_t stripe_count);
     int set_stripe_unit_count(librados::IoCtx *ioctx, const std::string &oid,
                              uint64_t stripe_unit, uint64_t stripe_count);
     int metadata_list(librados::IoCtx *ioctx, const std::string &oid,
@@ -157,6 +161,7 @@ namespace librbd {
     int get_id_finish(bufferlist::iterator *it, std::string *id);
     int get_id(librados::IoCtx *ioctx, const std::string &oid, std::string *id);
 
+    void set_id(librados::ObjectWriteOperation *op, std::string id);
     int set_id(librados::IoCtx *ioctx, const std::string &oid, std::string id);
 
     // operations on rbd_directory objects
@@ -170,10 +175,14 @@ namespace librbd {
     int dir_list(librados::IoCtx *ioctx, const std::string &oid,
                 const std::string &start, uint64_t max_return,
                 map<string, string> *images);
+    void dir_add_image(librados::ObjectWriteOperation *op,
+                      const std::string &name, const std::string &id);
     int dir_add_image(librados::IoCtx *ioctx, const std::string &oid,
                      const std::string &name, const std::string &id);
     int dir_remove_image(librados::IoCtx *ioctx, const std::string &oid,
                         const std::string &name, const std::string &id);
+    void dir_remove_image(librados::ObjectWriteOperation *op,
+                         const std::string &name, const std::string &id);
     // atomic remove and add
     void dir_rename_image(librados::ObjectWriteOperation *op,
                          const std::string &src, const std::string &dest,
@@ -219,6 +228,9 @@ namespace librbd {
     // operations on the rbd_mirroring object
     int mirror_uuid_get(librados::IoCtx *ioctx, std::string *uuid);
     int mirror_uuid_set(librados::IoCtx *ioctx, const std::string &uuid);
+    void mirror_mode_get_start(librados::ObjectReadOperation *op);
+    int mirror_mode_get_finish(bufferlist::iterator *it,
+                              cls::rbd::MirrorMode *mirror_mode);
     int mirror_mode_get(librados::IoCtx *ioctx,
                         cls::rbd::MirrorMode *mirror_mode);
     int mirror_mode_set(librados::IoCtx *ioctx,
@@ -253,8 +265,13 @@ namespace librbd {
                                 const std::string &image_id);
     int mirror_image_get_finish(bufferlist::iterator *iter,
                                cls::rbd::MirrorImage *mirror_image);
+    void mirror_image_set(librados::ObjectWriteOperation *op,
+                         const std::string &image_id,
+                         const cls::rbd::MirrorImage &mirror_image);
     int mirror_image_set(librados::IoCtx *ioctx, const std::string &image_id,
                         const cls::rbd::MirrorImage &mirror_image);
+    void mirror_image_remove(librados::ObjectWriteOperation *op,
+                            const std::string &image_id);
     int mirror_image_remove(librados::IoCtx *ioctx,
                            const std::string &image_id);
     int mirror_image_status_set(librados::IoCtx *ioctx,
index dea8f9fee6f42f5f5aa8256d009e2500fb7e35eb..974dcac94b715a5d3447fe5339bb1b3747eeaf3b 100644 (file)
@@ -30,6 +30,7 @@ set(librbd_internal_srcs
   exclusive_lock/ReleaseRequest.cc
   exclusive_lock/StandardPolicy.cc
   image/CloseRequest.cc
+  image/CreateRequest.cc
   image/OpenRequest.cc
   image/RefreshParentRequest.cc
   image/RefreshRequest.cc
index d1075cca40496ecf8153c53d27ed857d3a0f34d4..7c0981962761b48ec9c4c72b617f4d738ec523b9 100644 (file)
@@ -35,6 +35,7 @@ librbd_internal_la_SOURCES = \
        librbd/exclusive_lock/ReleaseRequest.cc \
        librbd/exclusive_lock/StandardPolicy.cc \
        librbd/image/CloseRequest.cc \
+       librbd/image/CreateRequest.cc \
        librbd/image/OpenRequest.cc \
        librbd/image/RefreshParentRequest.cc \
        librbd/image/RefreshRequest.cc \
@@ -125,6 +126,7 @@ noinst_HEADERS += \
        librbd/exclusive_lock/ReleaseRequest.h \
        librbd/exclusive_lock/StandardPolicy.h \
        librbd/image/CloseRequest.h \
+       librbd/image/CreateRequest.h \
        librbd/image/OpenRequest.h \
        librbd/image/RefreshParentRequest.h \
        librbd/image/RefreshRequest.h \
diff --git a/src/librbd/image/CreateRequest.cc b/src/librbd/image/CreateRequest.cc
new file mode 100644 (file)
index 0000000..7a9daaf
--- /dev/null
@@ -0,0 +1,734 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/image/CreateRequest.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "cls/rbd/cls_rbd_client.h"
+#include "include/assert.h"
+#include "librbd/Utils.h"
+#include "common/ceph_context.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/Journal.h"
+#include "librbd/journal/CreateRequest.h"
+#include "librbd/journal/RemoveRequest.h"
+#include "journal/Journaler.h"
+#include "librbd/MirroringWatcher.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::image::CreateRequest: "
+
+namespace librbd {
+namespace image {
+
+using util::create_rados_ack_callback;
+using util::create_context_callback;
+
+namespace {
+
+bool validate_features(CephContext *cct, uint64_t features, bool force_non_primary) {
+  if ((features & RBD_FEATURE_FAST_DIFF) != 0 &&
+      (features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    lderr(cct) << "cannot use fast diff without object map" << dendl;
+    return false;
+  }
+  if ((features & RBD_FEATURE_OBJECT_MAP) != 0 &&
+      (features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
+    lderr(cct) << "cannot use object map without exclusive lock" << dendl;
+    return false;
+  }
+  if ((features & RBD_FEATURE_JOURNALING) != 0) {
+    if ((features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
+      lderr(cct) << "cannot use journaling without exclusive lock" << dendl;
+      return false;
+    }
+  } else if (force_non_primary) {
+    assert(false);
+  }
+
+  return true;
+}
+
+bool validate_layout(CephContext *cct, uint64_t size, file_layout_t &layout) {
+  if (!librbd::ObjectMap::is_compatible(layout, size)) {
+    lderr(cct) << "image size not compatible with object map" << dendl;
+    return false;
+  }
+
+  return true;
+}
+
+} // anonymous namespace
+
+// TODO: do away with @m_op_work_queue
+// This is used as a temporary measure to execute synchronous calls in
+// worker thread (see callers of ->queue()). Once everything is made
+// fully asynchronous this can be done away with.
+template<typename I>
+CreateRequest<I>::CreateRequest(IoCtx &ioctx, std::string &imgname, std::string &imageid,
+                                uint64_t size, int order, uint64_t features,
+                                uint64_t stripe_unit, uint64_t stripe_count,
+                                uint8_t journal_order, uint8_t journal_splay_width,
+                                const std::string &journal_pool,
+                                const std::string &non_primary_global_image_id,
+                                const std::string &primary_mirror_uuid,
+                                ContextWQ *op_work_queue, Context *on_finish) :
+  m_image_name(imgname), m_image_id(imageid), m_size(size), m_order(order),
+  m_features(features), m_stripe_unit(stripe_unit), m_stripe_count(stripe_count),
+  m_journal_order(journal_order), m_journal_splay_width(journal_splay_width),
+  m_journal_pool(journal_pool), m_non_primary_global_image_id(non_primary_global_image_id),
+  m_primary_mirror_uuid(primary_mirror_uuid),
+  m_op_work_queue(op_work_queue), m_on_finish(on_finish) {
+
+  m_ioctx.dup(ioctx);
+  m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+
+  m_id_obj = util::id_obj_name(m_image_name);
+  m_header_obj = util::header_name(m_image_id);
+  m_objmap_name = ObjectMap::object_map_name(m_image_id, CEPH_NOSNAP);
+
+  m_layout.object_size = 1ull << m_order;
+  if (m_stripe_unit == 0 || m_stripe_count == 0) {
+    m_layout.stripe_unit = m_layout.object_size;
+    m_layout.stripe_count = 1;
+  } else {
+    m_layout.stripe_unit = m_stripe_unit;
+    m_layout.stripe_count = m_stripe_count;
+  }
+
+  m_force_non_primary = !non_primary_global_image_id.empty();
+}
+
+template<typename I>
+void CreateRequest<I>::send() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  if (!validate_features(m_cct, m_features, m_force_non_primary)) {
+    complete(-EINVAL);
+    return;
+  }
+  if (!validate_layout(m_cct, m_size, m_layout)) {
+    complete(-EINVAL);
+    return;
+  }
+
+  validate_pool();
+}
+
+template<typename I>
+void CreateRequest<I>::validate_pool() {
+  if (!m_cct->_conf->rbd_validate_pool) {
+    create_id_object();
+    return;
+  }
+
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_validate_pool>(this);
+
+  librados::ObjectReadOperation op;
+  op.stat(NULL, NULL, NULL);
+
+  int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op, &m_outbl);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context* CreateRequest<I>::handle_validate_pool(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result == 0) {
+    create_id_object();
+    return nullptr;
+  } else if ((*result < 0) && (*result != -ENOENT)) {
+    lderr(m_cct) << "failed to stat RBD directory: " << cpp_strerror(*result) << dendl;
+    return m_on_finish;
+  }
+
+  // allocate a self-managed snapshot id if this is a new pool to force
+  // self-managed snapshot mode.
+  // This call is executed just once per (fresh) pool, hence we do not
+  // try hard to make it asynchronous (and it's pretty safe not to cause
+  // deadlocks).
+
+  uint64_t snap_id;
+  int r = m_ioctx.selfmanaged_snap_create(&snap_id);
+  if (r == -EINVAL) {
+    lderr(m_cct) << "pool not configured for self-managed RBD snapshot support"
+                 << dendl;
+    *result = r;
+    return m_on_finish;
+  } else if (r < 0) {
+    lderr(m_cct) << "failed to allocate self-managed snapshot: "
+                 << cpp_strerror(r) << dendl;
+    *result = r;
+    return m_on_finish;
+  }
+
+  r = m_ioctx.selfmanaged_snap_remove(snap_id);
+  if (r < 0) {
+    // we've already switched to self-managed snapshots -- no need to
+    // error out in case of failure here.
+    ldout(m_cct, 10) << "failed to release self-managed snapshot " << snap_id
+                     << ": " << cpp_strerror(r) << dendl;
+  }
+
+  create_id_object();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::create_id_object() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_client::set_id(&op, m_image_id);
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_create_id_object>(this);
+  int r = m_ioctx.aio_operate(m_id_obj, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_create_id_object(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error creating RBD id object: " << cpp_strerror(*result) << dendl;
+    return m_on_finish;
+  }
+
+  add_image_to_directory();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::add_image_to_directory() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  librados::ObjectWriteOperation op;
+  cls_client::dir_add_image(&op, m_image_name, m_image_id);
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_add_image_to_directory>(this);
+  int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_add_image_to_directory(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error adding image to directory: " << cpp_strerror(*result) << dendl;
+
+    m_r_saved = *result;
+    remove_id_object();
+    return nullptr;
+  }
+
+  create_image();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::create_image() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  ostringstream oss;
+  oss << RBD_DATA_PREFIX << m_image_id;
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_client::create_image(&op, m_size, m_order, m_features, oss.str());
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_create_image>(this);
+  int r = m_ioctx.aio_operate(m_header_obj, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_create_image(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error writing header: " << cpp_strerror(*result) << dendl;
+    m_r_saved = *result;
+    remove_from_dir();
+    return nullptr;
+  }
+
+  set_stripe_unit_count();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::set_stripe_unit_count() {
+  if ((!m_stripe_unit && !m_stripe_count) ||
+      ((m_stripe_count == 1) && (m_stripe_unit == (1ull << m_order)))) {
+    object_map_resize();
+    return;
+  }
+
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  librados::ObjectWriteOperation op;
+  cls_client::set_stripe_unit_count(&op, m_stripe_unit, m_stripe_count);
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_set_stripe_unit_count>(this);
+  int r = m_ioctx.aio_operate(m_header_obj, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_set_stripe_unit_count(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error setting stripe unit/count: " << cpp_strerror(*result) << dendl;
+    m_r_saved = *result;
+    remove_header_object();
+    return nullptr;
+  }
+
+  object_map_resize();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::object_map_resize() {
+  if ((m_features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    fetch_mirror_mode();
+    return;
+  }
+
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  cls_client::object_map_resize(&op, Striper::get_num_objects(m_layout, m_size),
+                                OBJECT_NONEXISTENT);
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_object_map_resize>(this);
+  int r = m_ioctx.aio_operate(m_objmap_name, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_object_map_resize(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error creating initial object map: " << cpp_strerror(*result) << dendl;
+
+    m_r_saved = *result;
+    remove_header_object();
+    return nullptr;
+  }
+
+  fetch_mirror_mode();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::fetch_mirror_mode() {
+  if ((m_features & RBD_FEATURE_JOURNALING) == 0) {
+    complete(0);
+    return;
+  }
+
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  librados::ObjectReadOperation op;
+  cls_client::mirror_mode_get_start(&op);
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_fetch_mirror_mode>(this);
+  m_outbl.clear();
+  int r = m_ioctx.aio_operate(RBD_MIRRORING, comp, &op, &m_outbl);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_fetch_mirror_mode(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if ((*result < 0) && (*result != -ENOENT)) {
+    lderr(m_cct) << "failed to retrieve mirror mode: " << cpp_strerror(*result) << dendl;
+
+    m_r_saved = *result;
+    remove_object_map();
+    return nullptr;
+  }
+
+  cls::rbd::MirrorMode mirror_mode_internal = cls::rbd::MIRROR_MODE_DISABLED;
+  if (*result == 0) {
+    bufferlist::iterator it = m_outbl.begin();
+    *result = cls_client::mirror_mode_get_finish(&it, &mirror_mode_internal);
+    if (*result < 0) {
+      lderr(m_cct) << "Failed to retrieve mirror mode" << dendl;
+
+      m_r_saved = *result;
+      remove_object_map();
+      return nullptr;
+    }
+  }
+
+  // TODO: remove redundant code...
+  switch (mirror_mode_internal) {
+  case cls::rbd::MIRROR_MODE_DISABLED:
+  case cls::rbd::MIRROR_MODE_IMAGE:
+  case cls::rbd::MIRROR_MODE_POOL:
+    m_mirror_mode = static_cast<rbd_mirror_mode_t>(mirror_mode_internal);
+    break;
+  default:
+    lderr(m_cct) << "Unknown mirror mode ("
+               << static_cast<uint32_t>(mirror_mode_internal) << ")"
+               << dendl;
+    *result = -EINVAL;
+    remove_object_map();
+    return nullptr;
+  }
+
+  journal_create();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::journal_create() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  using klass = CreateRequest<I>;
+  Context *ctx = create_context_callback<klass, &klass::handle_journal_create>(this);
+
+  librbd::journal::TagData tag_data;
+  tag_data.mirror_uuid = (m_force_non_primary ? m_primary_mirror_uuid :
+                          librbd::Journal<I>::LOCAL_MIRROR_UUID);
+
+  librbd::journal::CreateRequest<I> *req =
+    librbd::journal::CreateRequest<I>::create(
+      m_ioctx, m_image_id, m_journal_order, m_journal_splay_width, m_journal_pool,
+      cls::journal::Tag::TAG_CLASS_NEW, tag_data, librbd::Journal<I>::IMAGE_CLIENT_ID,
+      m_op_work_queue, ctx);
+  req->send();
+}
+
+template<typename I>
+Context* CreateRequest<I>::handle_journal_create(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error creating journal: " << cpp_strerror(*result) << dendl;
+
+    m_r_saved = *result;
+    remove_object_map();
+    return nullptr;
+  }
+
+  fetch_mirror_image();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::fetch_mirror_image() {
+  if ((m_mirror_mode != RBD_MIRROR_MODE_POOL) && !m_force_non_primary) {
+    complete(0);
+    return;
+  }
+
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  librados::ObjectReadOperation op;
+  cls_client::mirror_image_get_start(&op, m_image_id);
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_fetch_mirror_image>(this);
+  m_outbl.clear();
+  int r = m_ioctx.aio_operate(RBD_MIRRORING, comp, &op, &m_outbl);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_fetch_mirror_image(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if ((*result < 0) && (*result != -ENOENT)) {
+    lderr(m_cct) << "cannot enable mirroring: " << cpp_strerror(*result) << dendl;
+
+    m_r_saved = *result;
+    journal_remove();
+    return nullptr;
+  }
+
+  if (*result == 0) {
+    bufferlist::iterator it = m_outbl.begin();
+    *result = cls_client::mirror_image_get_finish(&it, &m_mirror_image_internal);
+    if (*result < 0) {
+      lderr(m_cct) << "cannot enable mirroring: " << cpp_strerror(*result) << dendl;
+
+      m_r_saved = *result;
+      journal_remove();
+      return nullptr;
+    }
+
+    if (m_mirror_image_internal.state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
+      return m_on_finish;
+    }
+  }
+
+  // enable image mirroring (-ENOENT or disabled earlier)
+  mirror_image_enable();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::mirror_image_enable() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  m_mirror_image_internal.state = cls::rbd::MIRROR_IMAGE_STATE_ENABLED;
+  if (m_non_primary_global_image_id.empty()) {
+    uuid_d uuid_gen;
+    uuid_gen.generate_random();
+    m_mirror_image_internal.global_image_id = uuid_gen.to_string();
+  } else {
+    m_mirror_image_internal.global_image_id = m_non_primary_global_image_id;
+  }
+
+  librados::ObjectWriteOperation op;
+  cls_client::mirror_image_set(&op, m_image_id, m_mirror_image_internal);
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_mirror_image_enable>(this);
+  int r = m_ioctx.aio_operate(RBD_MIRRORING, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_mirror_image_enable(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "cannot enable mirroring: " << cpp_strerror(*result) << dendl;
+
+    m_r_saved = *result;
+    journal_remove();
+    return nullptr;
+  }
+
+  send_watcher_notification();
+  return nullptr;
+}
+
+// TODO: make this *really* async
+template<typename I>
+void CreateRequest<I>::send_watcher_notification() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  // The notify call below is made synchronously, so it is handed to the op
+  // work queue; its return code is forwarded to handle_watcher_notify().
+  Context *notify_ctx = new FunctionContext([this](int /* unused */) {
+      const int notify_r = MirroringWatcher<>::notify_image_updated(
+        m_ioctx, cls::rbd::MIRROR_IMAGE_STATE_ENABLED,
+        m_image_id, m_mirror_image_internal.global_image_id);
+      handle_watcher_notify(notify_r);
+    });
+  m_op_work_queue->queue(notify_ctx, 0);
+}
+
+template<typename I>
+void CreateRequest<I>::handle_watcher_notify(int r) {
+  ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
+
+  if (r < 0) {
+    // non-fatal error -- watchers will cope upon noticing missing
+    // updates. just log and move on...
+    ldout(m_cct, 10) << "failed to send update notification: " << cpp_strerror(r)
+                     << dendl;
+  } else {
+    ldout(m_cct, 20) << "image mirroring is enabled: global_id=" <<
+      m_mirror_image_internal.global_image_id << dendl;
+  }
+
+  complete(0);
+}
+
+template<typename I>
+void CreateRequest<I>::complete(int r) {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  if (r == 0) {
+    ldout(m_cct, 20) << "done." << dendl;
+  }
+
+  m_on_finish->complete(r);
+  delete this;
+}
+
+// cleanup
+template<typename I>
+void CreateRequest<I>::journal_remove() {
+  if ((m_features & RBD_FEATURE_JOURNALING) == 0) {
+    remove_object_map();
+    return;
+  }
+
+  ldout(m_cct, 20) << this << " " <<__func__ << dendl;
+
+  using klass = CreateRequest<I>;
+  Context *ctx = create_context_callback<klass, &klass::handle_journal_remove>(this);
+
+  librbd::journal::RemoveRequest<I> *req =
+    librbd::journal::RemoveRequest<I>::create(
+      m_ioctx, m_image_id, librbd::Journal<I>::IMAGE_CLIENT_ID, m_op_work_queue, ctx);
+  req->send();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_journal_remove(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error cleaning up journal after creation failed: "
+                 << cpp_strerror(*result) << dendl;
+  }
+
+  remove_object_map();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::remove_object_map() {
+  if ((m_features & RBD_FEATURE_OBJECT_MAP) == 0) {
+    remove_header_object();
+    return;
+  }
+
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_remove_object_map>(this);
+  int r = m_ioctx.aio_remove(m_objmap_name, comp);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_remove_object_map(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error cleaning up object map after creation failed: "
+                 << cpp_strerror(*result) << dendl;
+  }
+
+  remove_header_object();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::remove_header_object() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_remove_header_object>(this);
+  int r = m_ioctx.aio_remove(m_header_obj, comp);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_remove_header_object(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error cleaning up image header after creation failed: "
+                 << cpp_strerror(*result) << dendl;
+  }
+
+  remove_from_dir();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::remove_from_dir() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  librados::ObjectWriteOperation op;
+  cls_client::dir_remove_image(&op, m_image_name, m_image_id);
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_remove_from_dir>(this);
+  int r = m_ioctx.aio_operate(RBD_DIRECTORY, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_remove_from_dir(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error cleaning up image from rbd_directory object "
+                 << "after creation failed: " << cpp_strerror(*result) << dendl;
+  }
+
+  remove_id_object();
+  return nullptr;
+}
+
+template<typename I>
+void CreateRequest<I>::remove_id_object() {
+  ldout(m_cct, 20) << this << " " << __func__ << dendl;
+
+  using klass = CreateRequest<I>;
+  librados::AioCompletion *comp =
+    create_rados_ack_callback<klass, &klass::handle_remove_id_object>(this);
+  int r = m_ioctx.aio_remove(m_id_obj, comp);
+  assert(r == 0);
+  comp->release();
+}
+
+template<typename I>
+Context *CreateRequest<I>::handle_remove_id_object(int *result) {
+  ldout(m_cct, 20) << __func__ << ": r=" << *result << dendl;
+
+  if (*result < 0) {
+    lderr(m_cct) << "error cleaning up id object after creation failed: "
+                 << cpp_strerror(*result) << dendl;
+  }
+
+  *result = m_r_saved;
+  return m_on_finish;
+}
+
+} //namespace image
+} //namespace librbd
+
+template class librbd::image::CreateRequest<librbd::ImageCtx>;
diff --git a/src/librbd/image/CreateRequest.h b/src/librbd/image/CreateRequest.h
new file mode 100644 (file)
index 0000000..1628882
--- /dev/null
@@ -0,0 +1,183 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IMAGE_CREATE_REQUEST_H
+#define CEPH_LIBRBD_IMAGE_CREATE_REQUEST_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "common/WorkQueue.h"
+#include "librbd/ObjectMap.h"
+#include "include/rados/librados.hpp"
+#include "include/rbd_types.h"
+#include "cls/rbd/cls_rbd_types.h"
+#include "include/rbd/librbd.hpp"
+#include "librbd/ImageCtx.h"
+#include "common/Timer.h"
+#include "librbd/journal/TypeTraits.h"
+
+class Context;
+
+using librados::IoCtx;
+
+namespace journal {
+  class Journaler;
+}
+
+namespace librbd {
+namespace image {
+
+template <typename ImageCtxT = ImageCtx>
+class CreateRequest {  // asynchronous image creation state machine (diagram below)
+public:  // create() is the only way to build a request: it heap-allocates one, and the constructor is private
+  static CreateRequest *create(IoCtx &ioctx, std::string &imgname, std::string &imageid,
+                               uint64_t size, int order, uint64_t features,
+                               uint64_t stripe_unit, uint64_t stripe_count,
+                               uint8_t journal_order, uint8_t journal_splay_width,
+                               const std::string &journal_pool,
+                               const std::string &non_primary_global_image_id,
+                               const std::string &primary_mirror_uuid,
+                               ContextWQ *op_work_queue, Context *on_finish) {
+    return new CreateRequest(ioctx, imgname, imageid, size, order, features, stripe_unit,
+                             stripe_count, journal_order, journal_splay_width, journal_pool,
+                             non_primary_global_image_id, primary_mirror_uuid,
+                             op_work_queue, on_finish);  // all arguments forwarded unchanged
+  }
+
+  void send();  // NOTE(review): presumably starts the state machine; defined in CreateRequest.cc -- confirm
+
+private:
+  /**
+   * State diagram for image creation. The centre column is the forward
+   * path; dotted edges are skips taken when the named feature is disabled.
+   * The left-hand column is the rollback path: when a forward step fails,
+   * the steps already completed are undone bottom-up before <finish>.
+   *
+   * Each state is declared below as a pair: foo() and handle_foo(). In the
+   * cleanup steps visible in CreateRequest.cc, foo() submits the
+   * asynchronous request and handle_foo() receives its result, returning
+   * nullptr after chaining to the next step or m_on_finish when the machine
+   * is done. NOTE(review): the remaining pairs presumably follow the same
+   * convention -- confirm against CreateRequest.cc.
+   *
+   * @verbatim
+   *
+   *                                  <start> . . . . > . . . . .
+   *                                     |                      .
+   *                                     v                      .
+   *                                VALIDATE POOL               v (pool validation
+   *                                     |                      . disabled)
+   *                                     v                      .
+   * (error: bottom up)           CREATE ID OBJECT. . < . . . . .
+   *  _______<_______                    |
+   * |               |                   v
+   * |               |          ADD IMAGE TO DIRECTORY
+   * |               |               /   |
+   * |      REMOVE ID OBJECT<-------/    v         (stripingv2 disabled)
+   * |               |              CREATE IMAGE. . . . > . . . .
+   * v               |               /   |                      .
+   * |      REMOVE FROM DIR<--------/    v                      .
+   * |               |          SET STRIPE UNIT COUNT           .
+   * |               |               /   |  \ . . . . . > . . . .
+   * |      REMOVE HEADER OBJ<------/    v                     /. (object-map
+   * |               |\           OBJECT MAP RESIZE . . < . . * v  disabled)
+   * |               | \              /  |  \ . . . . . > . . . .
+   * |               |  *<-----------/   v                     /. (journaling
+   * |               |             FETCH MIRROR MODE. . < . . * v  disabled)
+   * |               |                /   |                     .
+   * |     REMOVE OBJECT MAP<--------/    v                     .
+   * |               |\             JOURNAL CREATE              .
+   * |               | \               /  |                     .
+   * v               |  *<------------/   v                     .
+   * |               |           FETCH MIRROR IMAGE             v
+   * |               |                /   |                     .
+   * |        JOURNAL REMOVE<--------/    v                     .
+   * |                \          MIRROR IMAGE ENABLE            .
+   * |                 \               /  |                     .
+   * |                  *<------------/   v                     .
+   * |                              NOTIFY WATCHERS             .
+   * |                                    |                     .
+   * |                                    v                     .
+   * |_____________>___________________<finish> . . . . < . . . .
+   *
+   * @endverbatim
+   */
+
+  CreateRequest(IoCtx &ioctx, std::string &imgname, std::string &imageid,
+                uint64_t size, int order, uint64_t features,
+                uint64_t stripe_unit, uint64_t stripe_count, uint8_t journal_order,
+                uint8_t journal_splay_width, const std::string &journal_pool,
+                const std::string &non_primary_global_image_id,
+                const std::string &primary_mirror_uuid,
+                ContextWQ *op_work_queue, Context *on_finish);  // private: reached only through create()
+
+  IoCtx m_ioctx;  // held by value; target of the aio_operate()/aio_remove() calls
+  std::string m_image_name;  // with m_image_id, passed to cls_client::dir_remove_image() during cleanup
+  std::string m_image_id;
+  uint64_t m_size;
+  int m_order;
+  uint64_t m_features, m_stripe_unit, m_stripe_count;
+  uint8_t m_journal_order, m_journal_splay_width;
+  const std::string m_journal_pool;  // const members: fixed at construction
+  const std::string m_non_primary_global_image_id;
+  const std::string m_primary_mirror_uuid;
+
+  ContextWQ *m_op_work_queue;
+  Context *m_on_finish;  // returned by handle_remove_id_object() once cleanup has finished
+
+  CephContext *m_cct;  // passed to ldout()/lderr() for logging
+  int m_r_saved;  // used to return actual error after cleanup
+  bool m_force_non_primary;
+  file_layout_t m_layout;
+  std::string m_id_obj, m_header_obj, m_objmap_name;  // m_id_obj names the object deleted by remove_id_object()
+
+  bufferlist m_outbl;  // NOTE(review): presumably receives replies of the fetch_* steps -- confirm
+  rbd_mirror_mode_t m_mirror_mode;
+  cls::rbd::MirrorImage m_mirror_image_internal;
+
+  void validate_pool();  // forward path: the pairs below appear in the diagram's top-to-bottom order
+  Context *handle_validate_pool(int *result);
+
+  void create_id_object();
+  Context *handle_create_id_object(int *result);
+
+  void add_image_to_directory();
+  Context *handle_add_image_to_directory(int *result);
+
+  void create_image();
+  Context *handle_create_image(int *result);
+
+  void set_stripe_unit_count();
+  Context *handle_set_stripe_unit_count(int *result);
+
+  void object_map_resize();
+  Context *handle_object_map_resize(int *result);
+
+  void fetch_mirror_mode();
+  Context *handle_fetch_mirror_mode(int *result);
+
+  void journal_create();
+  Context *handle_journal_create(int *result);
+
+  void fetch_mirror_image();
+  Context *handle_fetch_mirror_image(int *result);
+
+  void mirror_image_enable();
+  Context *handle_mirror_image_enable(int *result);
+
+  void send_watcher_notification();
+  void handle_watcher_notify(int r);  // unlike the other handlers: takes r by value and returns void
+
+  void complete(int r);
+
+  // cleanup: rollback steps, the left-hand column of the diagram above
+  void journal_remove();
+  Context *handle_journal_remove(int *result);
+
+  void remove_object_map();
+  Context *handle_remove_object_map(int *result);
+
+  void remove_header_object();
+  Context *handle_remove_header_object(int *result);
+
+  void remove_from_dir();
+  Context *handle_remove_from_dir(int *result);  // logs any error, then calls remove_id_object()
+
+  void remove_id_object();
+  Context *handle_remove_id_object(int *result);  // sets *result = m_r_saved and returns m_on_finish
+};
+
+} //namespace image
+} //namespace librbd
+
+extern template class librbd::image::CreateRequest<librbd::ImageCtx>;  // explicit instantiation declaration: the definition is in CreateRequest.cc, so including files do not instantiate it implicitly
+
+#endif // CEPH_LIBRBD_IMAGE_CREATE_REQUEST_H