]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librbd: non-pruning parent overlap handling fixes
authorIlya Dryomov <idryomov@gmail.com>
Mon, 17 Oct 2022 13:51:04 +0000 (15:51 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Sun, 4 Dec 2022 17:19:19 +0000 (18:19 +0100)
Apply similar "reduce overlap and respect area" logic to places
that don't use prune_parent_extents().  Changes to FlattenRequest
and TrimRequest here should complete the long tail of encrypted
I/O path and flatten fixes.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
src/librbd/Operations.cc
src/librbd/api/DiffIterate.cc
src/librbd/operation/FlattenRequest.cc
src/librbd/operation/FlattenRequest.h
src/librbd/operation/SparsifyRequest.cc
src/librbd/operation/TrimRequest.cc
src/test/librbd/operation/test_mock_TrimRequest.cc
src/test/librbd/test_librbd.cc

index bab2ef4d18cbade74bcf8e1e5678fddc65e1a1e4..ad6e5bcf6a08a430e93335ebfc7835b32b748502 100644 (file)
@@ -541,19 +541,24 @@ void Operations<I>::execute_flatten(ProgressContext &prog_ctx,
     return;
   }
 
-  uint64_t overlap;
-  int r = m_image_ctx.get_parent_overlap(CEPH_NOSNAP, &overlap);
-  ceph_assert(r == 0);
-  ceph_assert(overlap <= m_image_ctx.size);
+  uint64_t crypto_header_objects = Striper::get_num_objects(
+      m_image_ctx.layout,
+      m_image_ctx.get_area_size(io::ImageArea::CRYPTO_HEADER));
 
-  uint64_t overlap_objects = Striper::get_num_objects(m_image_ctx.layout,
-                                                      overlap);
+  uint64_t raw_overlap;
+  int r = m_image_ctx.get_parent_overlap(CEPH_NOSNAP, &raw_overlap);
+  ceph_assert(r == 0);
+  auto overlap = m_image_ctx.reduce_parent_overlap(raw_overlap, false);
+  uint64_t data_overlap_objects = Striper::get_num_objects(
+      m_image_ctx.layout,
+      (overlap.second == io::ImageArea::DATA ? overlap.first : 0));
 
   m_image_ctx.image_lock.unlock_shared();
 
+  // leave encryption header flattening to format-specific handler
   operation::FlattenRequest<I> *req = new operation::FlattenRequest<I>(
-    m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish), overlap_objects,
-    prog_ctx);
+      m_image_ctx, new C_NotifyUpdate<I>(m_image_ctx, on_finish),
+      crypto_header_objects, data_overlap_objects, prog_ctx);
   req->send();
 }
 
index 1cc08968a2904516f496687a929ee1818e843be9..b400b5d5a406cd0dd072977bec64f47cce37a8cf 100644 (file)
@@ -263,12 +263,14 @@ int DiffIterate<I>::execute() {
       // check parent overlap only if we are comparing to the beginning of time
       if (m_include_parent && from_snap_id == 0) {
         std::shared_lock image_locker{m_image_ctx.image_lock};
-        uint64_t overlap = 0;
-        m_image_ctx.get_parent_overlap(m_image_ctx.snap_id, &overlap);
-        if (m_image_ctx.parent && overlap > 0) {
+        uint64_t raw_overlap = 0;
+        m_image_ctx.get_parent_overlap(m_image_ctx.snap_id, &raw_overlap);
+        auto overlap = m_image_ctx.reduce_parent_overlap(raw_overlap, false);
+        if (overlap.first > 0 && overlap.second == io::ImageArea::DATA) {
           ldout(cct, 10) << " first getting parent diff" << dendl;
-          DiffIterate diff_parent(*m_image_ctx.parent, {}, nullptr, 0, overlap,
-                                  true, true, &simple_diff_cb, &parent_diff);
+          DiffIterate diff_parent(*m_image_ctx.parent, {}, nullptr, 0,
+                                  overlap.first, true, true, &simple_diff_cb,
+                                  &parent_diff);
           r = diff_parent.execute();
           if (r < 0) {
             return r;
index ad958fbcaae904ec412807fdd0681b49400c1471..7bc3468192426ae4745f705f4678ca5b104a8984 100644 (file)
@@ -97,15 +97,6 @@ void FlattenRequest<I>::flatten_objects() {
   CephContext *cct = image_ctx.cct;
   ldout(cct, 5) << dendl;
 
-  auto encryption_format = image_ctx.encryption_format.get();
-  uint64_t start_object_no = 0;
-  if (encryption_format != nullptr) {
-    // leave encryption header flattening to format-specific handler
-    start_object_no = Striper::get_num_objects(
-            image_ctx.layout,
-            encryption_format->get_crypto()->get_data_offset());
-  }
-
   assert(ceph_mutex_is_locked(image_ctx.owner_lock));
   auto ctx = create_context_callback<
     FlattenRequest<I>,
@@ -115,8 +106,8 @@ void FlattenRequest<I>::flatten_objects() {
       boost::lambda::_1, &image_ctx, image_ctx.get_data_io_context(),
       boost::lambda::_2));
   AsyncObjectThrottle<I> *throttle = new AsyncObjectThrottle<I>(
-    this, image_ctx, context_factory, ctx, &m_prog_ctx, start_object_no,
-    m_overlap_objects);
+      this, image_ctx, context_factory, ctx, &m_prog_ctx, m_start_object_no,
+      m_start_object_no + m_overlap_objects);
   throttle->start_ops(
     image_ctx.config.template get_val<uint64_t>("rbd_concurrent_management_ops"));
 }
index 82a05f35be33be44f42488833e5467ad5d9793d9..ec6a38a9d727178acc9f9644f4b35603b7b76487 100644 (file)
@@ -17,10 +17,12 @@ class FlattenRequest : public Request<ImageCtxT>
 {
 public:
   FlattenRequest(ImageCtxT &image_ctx, Context *on_finish,
-                 uint64_t overlap_objects, ProgressContext &prog_ctx)
-    : Request<ImageCtxT>(image_ctx, on_finish),
-      m_overlap_objects(overlap_objects), m_prog_ctx(prog_ctx) {
-  }
+                 uint64_t start_object_no, uint64_t overlap_objects,
+                 ProgressContext& prog_ctx)
+      : Request<ImageCtxT>(image_ctx, on_finish),
+        m_start_object_no(start_object_no),
+        m_overlap_objects(overlap_objects),
+        m_prog_ctx(prog_ctx) {}
 
 protected:
   void send_op() override;
@@ -54,6 +56,7 @@ private:
    * @endverbatim
    */
 
+  uint64_t m_start_object_no;
   uint64_t m_overlap_objects;
   ProgressContext &m_prog_ctx;
 
index 5d9837c3eab25669b42b5e172acb9ab686fc5f1d..ef7fc78f502fa698db18f8f7808aade55d4cc982 100644 (file)
@@ -11,6 +11,7 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/Types.h"
 #include "librbd/io/ObjectRequest.h"
+#include "librbd/io/Utils.h"
 #include "osdc/Striper.h"
 #include <boost/lambda/bind.hpp>
 #include <boost/lambda/construct.hpp>
@@ -143,13 +144,17 @@ public:
         return 1;
       }
 
-      uint64_t overlap_objects = 0;
-      uint64_t overlap;
-      int r = image_ctx.get_parent_overlap(CEPH_NOSNAP, &overlap);
-      if (r == 0 && overlap > 0) {
-        overlap_objects = Striper::get_num_objects(image_ctx.layout, overlap);
+      uint64_t raw_overlap = 0;
+      uint64_t object_overlap = 0;
+      int r = image_ctx.get_parent_overlap(CEPH_NOSNAP, &raw_overlap);
+      ceph_assert(r == 0);
+      if (raw_overlap > 0) {
+        auto [parent_extents, area] = io::util::object_to_area_extents(
+            &image_ctx, m_object_no, {{0, image_ctx.layout.object_size}});
+        object_overlap = image_ctx.prune_parent_extents(parent_extents, area,
+                                                        raw_overlap, false);
       }
-      m_remove_empty = (m_object_no >= overlap_objects);
+      m_remove_empty = object_overlap == 0;
     }
 
     send_sparsify();
index 6fda48aa769f4e99d5855ede2840227a69187eb1..6c6685f2b757f2582c25de06ba3c863fca4a7347 100644 (file)
@@ -223,20 +223,30 @@ void TrimRequest<I>::send_copyup_objects() {
 
   IOContext io_context;
   bool has_snapshots;
-  uint64_t parent_overlap;
+  uint64_t copyup_end;
   {
     std::shared_lock image_locker{image_ctx.image_lock};
 
     io_context = image_ctx.get_data_io_context();
     has_snapshots = !image_ctx.snaps.empty();
-    int r = image_ctx.get_parent_overlap(CEPH_NOSNAP, &parent_overlap);
+
+    uint64_t crypto_header_objects = Striper::get_num_objects(
+        image_ctx.layout,
+        image_ctx.get_area_size(io::ImageArea::CRYPTO_HEADER));
+
+    uint64_t raw_overlap;
+    int r = image_ctx.get_parent_overlap(CEPH_NOSNAP, &raw_overlap);
     ceph_assert(r == 0);
+    auto overlap = image_ctx.reduce_parent_overlap(raw_overlap, false);
+    uint64_t data_overlap_objects = Striper::get_num_objects(
+        image_ctx.layout,
+        (overlap.second == io::ImageArea::DATA ? overlap.first : 0));
+
+    // copyup is only required for portion of image that overlaps parent
+    ceph_assert(m_delete_start >= crypto_header_objects);
+    copyup_end = crypto_header_objects + data_overlap_objects;
   }
 
-  // copyup is only required for portion of image that overlaps parent
-  uint64_t copyup_end = Striper::get_num_objects(image_ctx.layout,
-                                                 parent_overlap);
-
   // TODO: protect against concurrent shrink and snap create?
   // skip to remove if no copyup is required.
   if (copyup_end <= m_delete_start || !has_snapshots) {
index cd18f1306e9656f9a78ba5e0aeda0f029d74c929..1771e741377e963104063db721a86d61de6483de 100644 (file)
@@ -156,6 +156,17 @@ public:
                            })));
   }
 
+  void expect_reduce_parent_overlap(MockTestImageCtx& mock_image_ctx,
+                                    uint64_t overlap) {
+    EXPECT_CALL(mock_image_ctx, reduce_parent_overlap(overlap, false))
+      .WillOnce(Return(std::make_pair(overlap, io::ImageArea::DATA)));
+  }
+
+  void expect_get_area_size(MockTestImageCtx& mock_image_ctx) {
+    EXPECT_CALL(mock_image_ctx, get_area_size(io::ImageArea::CRYPTO_HEADER))
+      .WillOnce(Return(0));
+  }
+
   void expect_object_may_exist(MockTestImageCtx &mock_image_ctx,
                                uint64_t object_no, bool exists) {
     if (mock_image_ctx.object_map != nullptr) {
@@ -221,7 +232,9 @@ TEST_F(TestMockOperationTrimRequest, SuccessRemove) {
                            true, 0);
 
   // copy-up
+  expect_get_area_size(mock_image_ctx);
   expect_get_parent_overlap(mock_image_ctx, 0);
+  expect_reduce_parent_overlap(mock_image_ctx, 0);
 
   // remove
   expect_object_may_exist(mock_image_ctx, 0, true);
@@ -277,7 +290,9 @@ TEST_F(TestMockOperationTrimRequest, SuccessCopyUp) {
 
   // copy-up
   io::MockObjectDispatch mock_io_object_dispatch;
+  expect_get_area_size(mock_image_ctx);
   expect_get_parent_overlap(mock_image_ctx, ictx->get_object_size());
+  expect_reduce_parent_overlap(mock_image_ctx, ictx->get_object_size());
   expect_get_object_name(mock_image_ctx, 0, "object0");
   expect_object_discard(mock_image_ctx, mock_io_object_dispatch, 0,
                         ictx->get_object_size(), false, 0);
@@ -369,7 +384,9 @@ TEST_F(TestMockOperationTrimRequest, RemoveError) {
                            false, 0);
 
   // copy-up
+  expect_get_area_size(mock_image_ctx);
   expect_get_parent_overlap(mock_image_ctx, 0);
+  expect_reduce_parent_overlap(mock_image_ctx, 0);
 
   // remove
   expect_object_may_exist(mock_image_ctx, 0, true);
@@ -421,7 +438,9 @@ TEST_F(TestMockOperationTrimRequest, CopyUpError) {
 
   // copy-up
   io::MockObjectDispatch mock_io_object_dispatch;
+  expect_get_area_size(mock_image_ctx);
   expect_get_parent_overlap(mock_image_ctx, ictx->get_object_size());
+  expect_reduce_parent_overlap(mock_image_ctx, ictx->get_object_size());
   expect_get_object_name(mock_image_ctx, 0, "object0");
   expect_object_discard(mock_image_ctx, mock_io_object_dispatch, 0,
                         ictx->get_object_size(), false, -EINVAL);
index 359ea020321e51dfc71b6954a4b3b7f6fef599c4..38ea142623d5f08150c3b32670b03343f4bd18b7 100644 (file)
@@ -65,6 +65,13 @@ using namespace std;
 
 using std::chrono::seconds;
 
+#define ASSERT_PASSED0(x)         \
+  do {                            \
+    bool passed = false;          \
+    x(&passed);                   \
+    ASSERT_TRUE(passed);          \
+  } while(0)
+
 #define ASSERT_PASSED(x, args...) \
   do {                            \
     bool passed = false;          \
@@ -3047,6 +3054,513 @@ TEST_F(TestLibRBD, EncryptedFlattenSmallData)
   }
 }
 
+struct LUKSOnePassphrase {
+  int load(librbd::Image& clone) {
+    librbd::encryption_luks_format_options_t opts = {m_passphrase};
+    return clone.encryption_load(RBD_ENCRYPTION_FORMAT_LUKS, &opts,
+                                 sizeof(opts));
+  }
+
+  int load_flattened(librbd::Image& clone) {
+    return load(clone);
+  }
+
+  std::string m_passphrase = "some passphrase";
+};
+
+struct LUKSTwoPassphrases {
+  int load(librbd::Image& clone) {
+    librbd::encryption_luks_format_options_t opts1 = {m_parent_passphrase};
+    librbd::encryption_luks_format_options_t opts2 = {m_clone_passphrase};
+    librbd::encryption_spec_t specs[] = {
+        {RBD_ENCRYPTION_FORMAT_LUKS, &opts2, sizeof(opts2)},
+        {RBD_ENCRYPTION_FORMAT_LUKS, &opts1, sizeof(opts1)}};
+    return clone.encryption_load2(specs, std::size(specs));
+  }
+
+  int load_flattened(librbd::Image& clone) {
+    librbd::encryption_luks_format_options_t opts = {m_clone_passphrase};
+    return clone.encryption_load(RBD_ENCRYPTION_FORMAT_LUKS, &opts,
+                                 sizeof(opts));
+  }
+
+  std::string m_parent_passphrase = "parent passphrase";
+  std::string m_clone_passphrase = "clone passphrase";
+};
+
+struct PlaintextUnderLUKS1 : LUKSOnePassphrase {
+protected:
+  void setup_parent(librbd::Image& parent, uint64_t data_size, bool* passed) {
+    ceph::bufferlist bl;
+    bl.append(std::string(data_size, 'a'));
+    ASSERT_EQ(data_size, parent.write(0, data_size, bl));
+
+    // before taking a parent snapshot, (temporarily) add extra space
+    // to the parent to account for upcoming LUKS1 header in the clone,
+    // making the clone able to reach all parent data
+    uint64_t luks1_meta_size = 4 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks1_meta_size));
+    *passed = true;
+  }
+
+  void setup_clone(librbd::Image& clone, uint64_t data_size, bool* passed) {
+    librbd::encryption_luks1_format_options_t fopts =
+        {RBD_ENCRYPTION_ALGORITHM_AES256, m_passphrase};
+    ASSERT_EQ(0, clone.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS1, &fopts,
+                                         sizeof(fopts)));
+    *passed = true;
+  }
+};
+
+struct PlaintextUnderLUKS2 : LUKSOnePassphrase {
+  void setup_parent(librbd::Image& parent, uint64_t data_size, bool* passed) {
+    ceph::bufferlist bl;
+    bl.append(std::string(data_size, 'a'));
+    ASSERT_EQ(data_size, parent.write(0, data_size, bl));
+
+    // before taking a parent snapshot, (temporarily) add extra space
+    // to the parent to account for upcoming LUKS2 header in the clone,
+    // making the clone able to reach all parent data
+    uint64_t luks2_meta_size = 16 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks2_meta_size));
+    *passed = true;
+  }
+
+  void setup_clone(librbd::Image& clone, uint64_t data_size, bool* passed) {
+    librbd::encryption_luks2_format_options_t fopts =
+        {RBD_ENCRYPTION_ALGORITHM_AES256, m_passphrase};
+    ASSERT_EQ(0, clone.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS2, &fopts,
+                                         sizeof(fopts)));
+    *passed = true;
+  }
+};
+
+struct UnformattedLUKS1 : LUKSOnePassphrase {
+  void setup_parent(librbd::Image& parent, uint64_t data_size, bool* passed) {
+    uint64_t luks1_meta_size = 4 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks1_meta_size));
+    librbd::encryption_luks1_format_options_t fopts = {
+        RBD_ENCRYPTION_ALGORITHM_AES256, m_passphrase};
+    ASSERT_EQ(0, parent.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS1, &fopts,
+                                          sizeof(fopts)));
+
+    ceph::bufferlist bl;
+    bl.append(std::string(data_size, 'a'));
+    ASSERT_EQ(data_size, parent.write(0, data_size, bl));
+    *passed = true;
+  }
+
+  void setup_clone(librbd::Image& clone, uint64_t data_size, bool* passed) {
+    *passed = true;
+  }
+};
+
+struct LUKS1UnderLUKS1 : LUKSTwoPassphrases {
+  void setup_parent(librbd::Image& parent, uint64_t data_size, bool* passed) {
+    uint64_t luks1_meta_size = 4 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks1_meta_size));
+    librbd::encryption_luks1_format_options_t fopts = {
+        RBD_ENCRYPTION_ALGORITHM_AES256, m_parent_passphrase};
+    ASSERT_EQ(0, parent.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS1, &fopts,
+                                          sizeof(fopts)));
+
+    ceph::bufferlist bl;
+    bl.append(std::string(data_size, 'a'));
+    ASSERT_EQ(data_size, parent.write(0, data_size, bl));
+    *passed = true;
+  }
+
+  void setup_clone(librbd::Image& clone, uint64_t data_size, bool* passed) {
+    librbd::encryption_luks1_format_options_t fopts =
+        {RBD_ENCRYPTION_ALGORITHM_AES256, m_clone_passphrase};
+    ASSERT_EQ(0, clone.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS1, &fopts,
+                                         sizeof(fopts)));
+    *passed = true;
+  }
+};
+
+struct LUKS1UnderLUKS2 : LUKSTwoPassphrases {
+  void setup_parent(librbd::Image& parent, uint64_t data_size, bool* passed) {
+    uint64_t luks1_meta_size = 4 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks1_meta_size));
+    librbd::encryption_luks1_format_options_t fopts = {
+        RBD_ENCRYPTION_ALGORITHM_AES256, m_parent_passphrase};
+    ASSERT_EQ(0, parent.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS1, &fopts,
+                                          sizeof(fopts)));
+
+    ceph::bufferlist bl;
+    bl.append(std::string(data_size, 'a'));
+    ASSERT_EQ(data_size, parent.write(0, data_size, bl));
+
+    // before taking a parent snapshot, (temporarily) add extra space
+    // to the parent to account for upcoming LUKS2 header in the clone,
+    // making the clone able to reach all parent data
+    // space taken by LUKS1 header in the parent would be reused
+    uint64_t luks2_meta_size = 16 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks2_meta_size - luks1_meta_size));
+    *passed = true;
+  }
+
+  void setup_clone(librbd::Image& clone, uint64_t data_size, bool* passed) {
+    librbd::encryption_luks2_format_options_t fopts =
+        {RBD_ENCRYPTION_ALGORITHM_AES256, m_clone_passphrase};
+    ASSERT_EQ(0, clone.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS2, &fopts,
+                                         sizeof(fopts)));
+    *passed = true;
+  }
+};
+
+struct UnformattedLUKS2 : LUKSOnePassphrase {
+  void setup_parent(librbd::Image& parent, uint64_t data_size, bool* passed) {
+    uint64_t luks2_meta_size = 16 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks2_meta_size));
+    librbd::encryption_luks2_format_options_t fopts = {
+        RBD_ENCRYPTION_ALGORITHM_AES256, m_passphrase};
+    ASSERT_EQ(0, parent.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS2, &fopts,
+                                          sizeof(fopts)));
+
+    ceph::bufferlist bl;
+    bl.append(std::string(data_size, 'a'));
+    ASSERT_EQ(data_size, parent.write(0, data_size, bl));
+    *passed = true;
+  }
+
+  void setup_clone(librbd::Image& clone, uint64_t data_size, bool* passed) {
+    *passed = true;
+  }
+};
+
+struct LUKS2UnderLUKS2 : LUKSTwoPassphrases {
+  void setup_parent(librbd::Image& parent, uint64_t data_size, bool* passed) {
+    uint64_t luks2_meta_size = 16 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks2_meta_size));
+    librbd::encryption_luks2_format_options_t fopts = {
+        RBD_ENCRYPTION_ALGORITHM_AES256, m_parent_passphrase};
+    ASSERT_EQ(0, parent.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS2, &fopts,
+                                          sizeof(fopts)));
+
+    ceph::bufferlist bl;
+    bl.append(std::string(data_size, 'a'));
+    ASSERT_EQ(data_size, parent.write(0, data_size, bl));
+    *passed = true;
+  }
+
+  void setup_clone(librbd::Image& clone, uint64_t data_size, bool* passed) {
+    librbd::encryption_luks2_format_options_t fopts =
+        {RBD_ENCRYPTION_ALGORITHM_AES256, m_clone_passphrase};
+    ASSERT_EQ(0, clone.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS2, &fopts,
+                                         sizeof(fopts)));
+    *passed = true;
+  }
+};
+
+struct LUKS2UnderLUKS1 : LUKSTwoPassphrases {
+  void setup_parent(librbd::Image& parent, uint64_t data_size, bool* passed) {
+    uint64_t luks2_meta_size = 16 << 20;
+    ASSERT_EQ(0, parent.resize(data_size + luks2_meta_size));
+    librbd::encryption_luks2_format_options_t fopts = {
+        RBD_ENCRYPTION_ALGORITHM_AES256, m_parent_passphrase};
+    ASSERT_EQ(0, parent.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS2, &fopts,
+                                          sizeof(fopts)));
+
+    ceph::bufferlist bl;
+    bl.append(std::string(data_size, 'a'));
+    ASSERT_EQ(data_size, parent.write(0, data_size, bl));
+    *passed = true;
+  }
+
+  void setup_clone(librbd::Image& clone, uint64_t data_size, bool* passed) {
+    librbd::encryption_luks1_format_options_t fopts =
+        {RBD_ENCRYPTION_ALGORITHM_AES256, m_clone_passphrase};
+    ASSERT_EQ(0, clone.encryption_format(RBD_ENCRYPTION_FORMAT_LUKS1, &fopts,
+                                         sizeof(fopts)));
+
+    // after loading encryption on the clone, one can get rid of
+    // unneeded space allowance in the clone arising from LUKS2 header
+    // in the parent being bigger than LUKS1 header in the clone
+    ASSERT_EQ(0, load(clone));
+    ASSERT_EQ(0, clone.resize(data_size));
+    *passed = true;
+  }
+};
+
+template <typename FormatPolicy>
+class EncryptedFlattenTest : public TestLibRBD, FormatPolicy {
+protected:
+  void create_and_setup(bool* passed) {
+    ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), m_ioctx));
+
+    int order = 22;
+    ASSERT_EQ(0, create_image_pp(m_rbd, m_ioctx, m_parent_name.c_str(),
+                                 m_data_size, &order));
+    librbd::Image parent;
+    ASSERT_EQ(0, m_rbd.open(m_ioctx, parent, m_parent_name.c_str(), nullptr));
+    ASSERT_PASSED(FormatPolicy::setup_parent, parent, m_data_size);
+
+    ASSERT_EQ(0, parent.snap_create("snap"));
+    ASSERT_EQ(0, parent.snap_protect("snap"));
+    uint64_t features;
+    ASSERT_EQ(0, parent.features(&features));
+    ASSERT_EQ(0, m_rbd.clone(m_ioctx, m_parent_name.c_str(), "snap", m_ioctx,
+                             m_clone_name.c_str(), features, &order));
+    librbd::Image clone;
+    ASSERT_EQ(0, m_rbd.open(m_ioctx, clone, m_clone_name.c_str(), nullptr));
+    ASSERT_PASSED(FormatPolicy::setup_clone, clone, m_data_size);
+
+    *passed = true;
+  }
+
+  void open_and_load(librbd::Image& clone, bool* passed) {
+    ASSERT_EQ(0, m_rbd.open(m_ioctx, clone, m_clone_name.c_str(), nullptr));
+    ASSERT_EQ(0, FormatPolicy::load(clone));
+    *passed = true;
+  }
+
+  void open_and_load_flattened(librbd::Image& clone, bool* passed) {
+    ASSERT_EQ(0, m_rbd.open(m_ioctx, clone, m_clone_name.c_str(), nullptr));
+    ASSERT_EQ(0, FormatPolicy::load_flattened(clone));
+    *passed = true;
+  }
+
+  void verify_size_and_overlap(librbd::Image& image, uint64_t expected_size,
+                               uint64_t expected_overlap) {
+    uint64_t size;
+    ASSERT_EQ(0, image.size(&size));
+    EXPECT_EQ(expected_size, size);
+    uint64_t overlap;
+    ASSERT_EQ(0, image.overlap(&overlap));
+    EXPECT_EQ(expected_overlap, overlap);
+  }
+
+  void verify_data(librbd::Image& image, const ceph::bufferlist& expected_bl) {
+    ceph::bufferlist read_bl;
+    ASSERT_EQ(expected_bl.length(),
+              image.read(0, expected_bl.length(), read_bl));
+    EXPECT_TRUE(expected_bl.contents_equal(read_bl));
+  }
+
+  librados::IoCtx m_ioctx;
+  librbd::RBD m_rbd;
+  std::string m_parent_name = get_temp_image_name();
+  std::string m_clone_name = get_temp_image_name();
+  uint64_t m_data_size = 25 << 20;
+};
+
+using EncryptedFlattenTestTypes =
+    ::testing::Types<PlaintextUnderLUKS1, PlaintextUnderLUKS2,
+                     UnformattedLUKS1, LUKS1UnderLUKS1, LUKS1UnderLUKS2,
+                     UnformattedLUKS2, LUKS2UnderLUKS2, LUKS2UnderLUKS1>;
+TYPED_TEST_SUITE(EncryptedFlattenTest, EncryptedFlattenTestTypes);
+
+TYPED_TEST(EncryptedFlattenTest, Simple)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_STRIPINGV2));
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
+
+  ASSERT_PASSED0(this->create_and_setup);
+
+  ceph::bufferlist expected_bl;
+  expected_bl.append(std::string(this->m_data_size, 'a'));
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load, clone);
+    this->verify_size_and_overlap(clone, this->m_data_size, this->m_data_size);
+    this->verify_data(clone, expected_bl);
+    ASSERT_EQ(0, clone.flatten());
+    this->verify_data(clone, expected_bl);
+  }
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load_flattened, clone);
+    this->verify_size_and_overlap(clone, this->m_data_size, 0);
+    this->verify_data(clone, expected_bl);
+  }
+}
+
+TYPED_TEST(EncryptedFlattenTest, Grow)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_STRIPINGV2));
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
+
+  ASSERT_PASSED0(this->create_and_setup);
+
+  ceph::bufferlist expected_bl;
+  expected_bl.append(std::string(this->m_data_size, 'a'));
+  expected_bl.append_zero(1);
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load, clone);
+    ASSERT_EQ(0, clone.resize(this->m_data_size + 1));
+    this->verify_size_and_overlap(clone, this->m_data_size + 1,
+                                  this->m_data_size);
+    this->verify_data(clone, expected_bl);
+    ASSERT_EQ(0, clone.flatten());
+    this->verify_data(clone, expected_bl);
+  }
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load_flattened, clone);
+    this->verify_size_and_overlap(clone, this->m_data_size + 1, 0);
+    this->verify_data(clone, expected_bl);
+  }
+}
+
+TYPED_TEST(EncryptedFlattenTest, Shrink)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_STRIPINGV2));
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
+
+  ASSERT_PASSED0(this->create_and_setup);
+
+  ceph::bufferlist expected_bl;
+  expected_bl.append(std::string(this->m_data_size - 1, 'a'));
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load, clone);
+    ASSERT_EQ(0, clone.resize(this->m_data_size - 1));
+    this->verify_size_and_overlap(clone, this->m_data_size - 1,
+                                  this->m_data_size - 1);
+    this->verify_data(clone, expected_bl);
+    ASSERT_EQ(0, clone.flatten());
+    this->verify_data(clone, expected_bl);
+  }
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load_flattened, clone);
+    this->verify_size_and_overlap(clone, this->m_data_size - 1, 0);
+    this->verify_data(clone, expected_bl);
+  }
+}
+
+TYPED_TEST(EncryptedFlattenTest, ShrinkToOne)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_STRIPINGV2));
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
+
+  ASSERT_PASSED0(this->create_and_setup);
+
+  ceph::bufferlist expected_bl;
+  expected_bl.append(std::string(1, 'a'));
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load, clone);
+    ASSERT_EQ(0, clone.resize(1));
+    this->verify_size_and_overlap(clone, 1, 1);
+    this->verify_data(clone, expected_bl);
+    ASSERT_EQ(0, clone.flatten());
+    this->verify_data(clone, expected_bl);
+  }
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load_flattened, clone);
+    this->verify_size_and_overlap(clone, 1, 0);
+    this->verify_data(clone, expected_bl);
+  }
+}
+
+TYPED_TEST(EncryptedFlattenTest, ShrinkToOneAfterSnapshot)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_STRIPINGV2));
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
+
+  ASSERT_PASSED0(this->create_and_setup);
+
+  ceph::bufferlist expected_bl;
+  expected_bl.append(std::string(1, 'a'));
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load, clone);
+    ASSERT_EQ(0, clone.snap_create("snap"));
+    ASSERT_EQ(0, clone.resize(1));
+    this->verify_size_and_overlap(clone, 1, 1);
+    this->verify_data(clone, expected_bl);
+    ASSERT_EQ(0, clone.flatten());
+    this->verify_data(clone, expected_bl);
+  }
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load_flattened, clone);
+    this->verify_size_and_overlap(clone, 1, 0);
+    this->verify_data(clone, expected_bl);
+  }
+}
+
+TYPED_TEST(EncryptedFlattenTest, MinOverlap)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_STRIPINGV2));
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
+
+  ASSERT_PASSED0(this->create_and_setup);
+
+  ceph::bufferlist expected_bl;
+  expected_bl.append(std::string(1, 'a'));
+  expected_bl.append_zero(this->m_data_size - 1);
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load, clone);
+    ASSERT_EQ(0, clone.resize(1));
+    ASSERT_EQ(0, clone.resize(this->m_data_size));
+    this->verify_size_and_overlap(clone, this->m_data_size, 1);
+    this->verify_data(clone, expected_bl);
+    ASSERT_EQ(0, clone.flatten());
+    this->verify_data(clone, expected_bl);
+  }
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load_flattened, clone);
+    this->verify_size_and_overlap(clone, this->m_data_size, 0);
+    this->verify_data(clone, expected_bl);
+  }
+}
+
+TYPED_TEST(EncryptedFlattenTest, ZeroOverlap)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_STRIPINGV2));
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
+
+  ASSERT_PASSED0(this->create_and_setup);
+
+  ceph::bufferlist expected_bl;
+  expected_bl.append_zero(this->m_data_size);
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load, clone);
+    ASSERT_EQ(0, clone.resize(0));
+    ASSERT_EQ(0, clone.resize(this->m_data_size));
+    this->verify_size_and_overlap(clone, this->m_data_size, 0);
+    this->verify_data(clone, expected_bl);
+    ASSERT_EQ(0, clone.flatten());
+    this->verify_data(clone, expected_bl);
+  }
+
+  {
+    librbd::Image clone;
+    ASSERT_PASSED(this->open_and_load_flattened, clone);
+    this->verify_size_and_overlap(clone, this->m_data_size, 0);
+    this->verify_data(clone, expected_bl);
+  }
+}
+
 #endif
 
 TEST_F(TestLibRBD, TestIOWithIOHint)