]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Merge pull request #38237 from myoungwon/wip-fix-clone-size-mismatch
authorKefu Chai <kchai@redhat.com>
Wed, 30 Dec 2020 03:19:51 +0000 (11:19 +0800)
committerGitHub <noreply@github.com>
Wed, 30 Dec 2020 03:19:51 +0000 (11:19 +0800)
osd: fix clone size mismatch when deduped object is evicted

Reviewed-by: Samuel Just <sjust@redhat.com>
1  2 
src/osd/PrimaryLogPG.cc
src/test/librados/tier_cxx.cc

Simple merge
index 5dd09f1d2c50d6685227d71cc821a5a0860313c1,79a19c17dc96373ae3f2f47f07c0b0affbe19b72..6495350569e4c22771c235467e6287672143db3c
@@@ -4625,757 -4625,122 +4625,851 @@@ TEST_F(LibRadosTwoPoolsPP, ManifestEvic
  
      stat_op.stat(&size, NULL, NULL);
      ASSERT_EQ(0, ioctx.operate("foo", &stat_op, NULL));
-     ASSERT_EQ(0, size);
+     ASSERT_EQ(strlen("there hiHI"), size);
    }
  
-   ioctx.snap_set_read(librados::SNAP_HEAD);
+ }
+ TEST_F(LibRadosTwoPoolsPP, ManifestSnapSizeMismatch) {
+   // skip test if not yet octopus
+   if (_get_required_osd_release(cluster) < "octopus") {
+     cout << "cluster is not yet octopus, skipping test" << std::endl;
+     return;
+   }
+   // create object
+   {
+     bufferlist bl;
+     bl.append("there hiHI");
+     ObjectWriteOperation op;
+     op.write_full(bl);
+     ASSERT_EQ(0, cache_ioctx.operate("foo", &op));
+   }
+   {
+     bufferlist bl;
+     bl.append("there hiHI");
+     ObjectWriteOperation op;
+     op.write_full(bl);
+     ASSERT_EQ(0, ioctx.operate("chunk1", &op));
+   }
+   {
+     bufferlist bl;
+     bl.append("there HIHI");
+     ObjectWriteOperation op;
+     op.write_full(bl);
+     ASSERT_EQ(0, ioctx.operate("chunk2", &op));
+   }
+   // wait for maps to settle
+   cluster.wait_for_latest_osdmap();
+   // create a snapshot, clone
+   vector<uint64_t> my_snaps(1);
+   ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_create(&my_snaps[0]));
+   ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
+        my_snaps));
+   {
+     bufferlist bl;
+     bl.append("There hiHI");
+     ASSERT_EQ(0, cache_ioctx.write("foo", bl, bl.length(), 0));
+   }
+   my_snaps.resize(2);
+   my_snaps[1] = my_snaps[0];
+   ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_create(&my_snaps[0]));
+   ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
+        my_snaps));
+   {
+     bufferlist bl;
+     bl.append("tHere hiHI");
+     ASSERT_EQ(0, cache_ioctx.write("foo", bl, bl.length(), 0));
+   }
+   // set-chunk 
+   manifest_set_chunk(cluster, ioctx, cache_ioctx, 0, 10, "chunk1", "foo");
+   cache_ioctx.snap_set_read(my_snaps[1]);
+   // set-chunk 
+   manifest_set_chunk(cluster, ioctx, cache_ioctx, 0, 10, "chunk2", "foo");
+   // evict
    {
      ObjectReadOperation op, stat_op;
-     uint64_t size;
      op.tier_evict();
      librados::AioCompletion *completion = cluster.aio_create_completion();
-     ASSERT_EQ(0, ioctx.aio_operate(
-       "foo", completion, &op,
-       librados::OPERATION_IGNORE_OVERLAY, NULL));
+     ASSERT_EQ(0, cache_ioctx.aio_operate(
+        "foo", completion, &op,
+        librados::OPERATION_IGNORE_OVERLAY, NULL));
      completion->wait_for_complete();
      ASSERT_EQ(0, completion->get_return_value());
+   }
  
-     stat_op.stat(&size, NULL, NULL);
-     ASSERT_EQ(0, ioctx.operate("foo", &stat_op, NULL));
-     ASSERT_EQ(strlen("there hiHI"), size);
+   uint32_t hash;
+   ASSERT_EQ(0, cache_ioctx.get_object_pg_hash_position2("foo", &hash));
+   // scrub
+   {
+     for (int tries = 0; tries < 5; ++tries) {
+       bufferlist inbl;
+       ostringstream ss;
+       ss << "{\"prefix\": \"pg deep-scrub\", \"pgid\": \""
+         << cache_ioctx.get_id() << "."
+         << std::hex << hash
+         << "\"}";
+       int r = cluster.mon_command(ss.str(), inbl, NULL, NULL);
+       if (r == -ENOENT ||  
+          r == -EAGAIN) {
+        sleep(5);
+        continue;
+       }
+       ASSERT_EQ(0, r);
+       break;
+     }
+     cout << "waiting for scrubs..." << std::endl;
+     sleep(20);
+     cout << "done waiting" << std::endl;
    }
  
+   {
+     bufferlist bl;
+     ASSERT_EQ(1, cache_ioctx.read("foo", bl, 1, 0));
+     ASSERT_EQ('t', bl[0]);
+   }
  }
  
 +#include <common/CDC.h>
 +TEST_F(LibRadosTwoPoolsPP, DedupFlushRead) {
 +  // skip test if not yet octopus
 +  if (_get_required_osd_release(cluster) < "octopus") {
 +    GTEST_SKIP() << "cluster is not yet octopus, skipping test";
 +  }
 +
 +  bufferlist inbl;
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "fingerprint_algorithm", "sha1"),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_tier", pool_name),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_chunk_algorithm", "fastcdc"),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_cdc_chunk_size", 1024),
 +      inbl, NULL, NULL));
 +
 +  // wait for maps to settle
 +  cluster.wait_for_latest_osdmap();
 +
 +  // create object
 +  bufferlist gbl;
 +  {
 +    generate_buffer(1024*8, &gbl);
 +    ObjectWriteOperation op;
 +    op.write_full(gbl);
 +    ASSERT_EQ(0, cache_ioctx.operate("foo-chunk", &op));
 +  }
 +  {
 +    bufferlist bl;
 +    bl.append("DDse chunk");
 +    ObjectWriteOperation op;
 +    op.write_full(bl);
 +    ASSERT_EQ(0, ioctx.operate("bar-chunk", &op));
 +  }
 +
 +  // set-chunk to set manifest object
 +  {
 +    ObjectReadOperation op;
 +    op.set_chunk(0, 2, ioctx, "bar-chunk", 0,
 +              CEPH_OSD_OP_FLAG_WITH_REFERENCE);
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate("foo-chunk", completion, &op,
 +            librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo-chunk", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  std::unique_ptr<CDC> cdc = CDC::create("fastcdc", cbits(1024)-1);
 +  vector<pair<uint64_t, uint64_t>> chunks;
 +  bufferlist chunk;
 +  cdc->calc_chunks(gbl, &chunks);
 +  chunk.substr_of(gbl, chunks[1].first, chunks[1].second);
 +  string tgt_oid;
 +  {
 +    unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1] = {0};
 +    char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
 +    SHA1 sha1_gen;
 +    int size = chunk.length();
 +    sha1_gen.Update((const unsigned char *)chunk.c_str(), size);
 +    sha1_gen.Final(fingerprint);
 +    buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
 +    tgt_oid = string(p_str);
 +  }
 +
 +  // read and verify the chunked object
 +  {
 +    bufferlist test_bl;
 +    ASSERT_EQ(2, ioctx.read(tgt_oid, test_bl, 2, 0));
 +    ASSERT_EQ(test_bl[1], chunk[1]);
 +  }
 +
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_cdc_chunk_size", 512),
 +      inbl, NULL, NULL));
 +  cluster.wait_for_latest_osdmap();
 +
 +  // make a dirty chunks
 +  {
 +    bufferlist bl;
 +    bl.append("hi");
 +    ASSERT_EQ(0, cache_ioctx.write("foo-chunk", bl, bl.length(), 0));
 +  }
 +
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo-chunk", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  cdc = CDC::create("fastcdc", cbits(512)-1);
 +  chunks.clear();
 +  cdc->calc_chunks(gbl, &chunks);
 +  bufferlist chunk_512;
 +  chunk_512.substr_of(gbl, chunks[3].first, chunks[3].second);
 +  {
 +    unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1] = {0};
 +    char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
 +    SHA1 sha1_gen;
 +    int size = chunk_512.length();
 +    sha1_gen.Update((const unsigned char *)chunk_512.c_str(), size);
 +    sha1_gen.Final(fingerprint);
 +    buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
 +    tgt_oid = string(p_str);
 +  }
 +
 +  // read and verify the chunked object
 +  {
 +    bufferlist test_bl;
 +    ASSERT_EQ(2, ioctx.read(tgt_oid, test_bl, 2, 0));
 +    ASSERT_EQ(test_bl[1], chunk_512[1]);
 +  }
 +
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_cdc_chunk_size", 16384),
 +      inbl, NULL, NULL));
 +  cluster.wait_for_latest_osdmap();
 +
 +  // make a dirty chunks
 +  {
 +    bufferlist bl;
 +    bl.append("hi");
 +    ASSERT_EQ(0, cache_ioctx.write("foo-chunk", bl, bl.length(), 0));
 +    gbl.begin(0).copy_in(bl.length(), bl);
 +  }
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo-chunk", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  cdc = CDC::create("fastcdc", cbits(16384)-1);
 +  chunks.clear();
 +  cdc->calc_chunks(gbl, &chunks);
 +  bufferlist chunk_16384;
 +  chunk_16384.substr_of(gbl, chunks[0].first, chunks[0].second);
 +  {
 +    unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1] = {0};
 +    char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
 +    SHA1 sha1_gen;
 +    int size = chunk_16384.length();
 +    sha1_gen.Update((const unsigned char *)chunk_16384.c_str(), size);
 +    sha1_gen.Final(fingerprint);
 +    buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
 +    tgt_oid = string(p_str);
 +  }
 +  // read and verify the chunked object
 +  {
 +    bufferlist test_bl;
 +    ASSERT_EQ(2, ioctx.read(tgt_oid, test_bl, 2, 0));
 +    ASSERT_EQ(test_bl[0], chunk_16384[0]);
 +  }
 +
 +  // less than object size
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_cdc_chunk_size", 1024),
 +      inbl, NULL, NULL));
 +  cluster.wait_for_latest_osdmap();
 +
 +  // make a dirty chunks
 +  // a chunk_info is deleted by write, which converts the manifest object to non-manifest object
 +  {
 +    bufferlist bl;
 +    bl.append("hi");
 +    ASSERT_EQ(0, cache_ioctx.write("foo-chunk", bl, bl.length(), 0));
 +  }
 +
 +  // reset set-chunk
 +  {
 +    bufferlist bl;
 +    bl.append("DDse chunk");
 +    ObjectWriteOperation op;
 +    op.write_full(bl);
 +    ASSERT_EQ(0, ioctx.operate("bar-chunk", &op));
 +  }
 +  // set-chunk to set manifest object
 +  {
 +    ObjectReadOperation op;
 +    op.set_chunk(0, 2, ioctx, "bar-chunk", 0,
 +              CEPH_OSD_OP_FLAG_WITH_REFERENCE);
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate("foo-chunk", completion, &op,
 +            librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo-chunk", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  cdc = CDC::create("fastcdc", cbits(1024)-1);
 +  chunks.clear();
 +  cdc->calc_chunks(gbl, &chunks);
 +  bufferlist small_chunk;
 +  small_chunk.substr_of(gbl, chunks[1].first, chunks[1].second);
 +  {
 +    unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1] = {0};
 +    char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
 +    SHA1 sha1_gen;
 +    int size = small_chunk.length();
 +    sha1_gen.Update((const unsigned char *)small_chunk.c_str(), size);
 +    sha1_gen.Final(fingerprint);
 +    buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
 +    tgt_oid = string(p_str);
 +  }
 +  // read and verify the chunked object
 +  {
 +    bufferlist test_bl;
 +    ASSERT_EQ(2, ioctx.read(tgt_oid, test_bl, 2, 0));
 +    ASSERT_EQ(test_bl[0], small_chunk[0]);
 +  }
 +
 +}
 +
 +TEST_F(LibRadosTwoPoolsPP, ManifestFlushSnap) {
 +  // skip test if not yet octopus
 +  if (_get_required_osd_release(cluster) < "octopus") {
 +    cout << "cluster is not yet octopus, skipping test" << std::endl;
 +    return;
 +  }
 +
 +  bufferlist inbl;
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "fingerprint_algorithm", "sha1"),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_tier", pool_name),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_chunk_algorithm", "fastcdc"),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_cdc_chunk_size", 1024),
 +      inbl, NULL, NULL));
 +
 +  // wait for maps to settle
 +  cluster.wait_for_latest_osdmap();
 +
 +  // create object
 +  bufferlist gbl;
 +  {
 +    //bufferlist bl;
 +    //bl.append("there hi");
 +    generate_buffer(1024*8, &gbl);
 +    ObjectWriteOperation op;
 +    op.write_full(gbl);
 +    ASSERT_EQ(0, cache_ioctx.operate("foo", &op));
 +  }
 +  {
 +    bufferlist bl;
 +    bl.append("there hi");
 +    ObjectWriteOperation op;
 +    op.write_full(bl);
 +    ASSERT_EQ(0, ioctx.operate("bar", &op));
 +  }
 +
 +  // set-chunk (dedup)
 +  manifest_set_chunk(cluster, ioctx, cache_ioctx, 2, 2, "bar", "foo");
 +
 +  // create a snapshot, clone
 +  vector<uint64_t> my_snaps(1);
 +  ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_create(&my_snaps[0]));
 +  ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
 +      my_snaps));
 +
 +  // make a dirty chunks
 +  {
 +    bufferlist bl;
 +    bl.append("Thbbe");
 +    ASSERT_EQ(0, cache_ioctx.write("foo", bl, bl.length(), 0));
 +  }
 +
 +  // and another
 +  my_snaps.resize(2);
 +  my_snaps[1] = my_snaps[0];
 +  ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_create(&my_snaps[0]));
 +  ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
 +      my_snaps));
 +
 +  // make a dirty chunks
 +  {
 +    bufferlist bl;
 +    bl.append("Thcce");
 +    ASSERT_EQ(0, cache_ioctx.write("foo", bl, bl.length(), 0));
 +  }
 +
 +  // flush on head (should fail)
 +  cache_ioctx.snap_set_read(librados::SNAP_HEAD);
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(-EBUSY, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  // flush on recent snap (should fail)
 +  cache_ioctx.snap_set_read(my_snaps[0]);
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(-EBUSY, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  // flush on oldest snap
 +  cache_ioctx.snap_set_read(my_snaps[1]);
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  // flush on oldest snap
 +  cache_ioctx.snap_set_read(my_snaps[0]);
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  // flush on oldest snap
 +  cache_ioctx.snap_set_read(librados::SNAP_HEAD);
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  // check chunk's refcount
 +  std::unique_ptr<CDC> cdc = CDC::create("fastcdc", cbits(1024)-1);
 +  vector<pair<uint64_t, uint64_t>> chunks;
 +  bufferlist chunk;
 +  cdc->calc_chunks(gbl, &chunks);
 +  chunk.substr_of(gbl, chunks[1].first, chunks[1].second);
 +  string tgt_oid;
 +  {
 +    unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1] = {0};
 +    char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
 +    SHA1 sha1_gen;
 +    int size = chunk.length();
 +    sha1_gen.Update((const unsigned char *)chunk.c_str(), size);
 +    sha1_gen.Final(fingerprint);
 +    buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
 +    tgt_oid = string(p_str);
 +  }
 +  // read and verify the chunked object
 +  {
 +    bufferlist test_bl;
 +    ASSERT_EQ(2, ioctx.read(tgt_oid, test_bl, 2, 0));
 +    ASSERT_EQ(test_bl[1], chunk[1]);
 +  }
 +
 +  cache_ioctx.snap_set_read(librados::SNAP_HEAD);
 +  {
 +    bufferlist bl;
 +    ASSERT_EQ(4, cache_ioctx.read("foo", bl, 4, 0));
 +    ASSERT_EQ('c', bl[2]);
 +  }
 +
 +  cache_ioctx.snap_set_read(my_snaps[0]);
 +  {
 +    bufferlist bl;
 +    ASSERT_EQ(4, cache_ioctx.read("foo", bl, 4, 0));
 +    ASSERT_EQ('b', bl[2]);
 +  }
 +}
 +
 +TEST_F(LibRadosTwoPoolsPP, ManifestFlushDupCount) {
 +  // skip test if not yet octopus
 +  if (_get_required_osd_release(cluster) < "octopus") {
 +    cout << "cluster is not yet octopus, skipping test" << std::endl;
 +    return;
 +  }
 +
 +  bufferlist inbl;
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "fingerprint_algorithm", "sha1"),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_tier", pool_name),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_chunk_algorithm", "fastcdc"),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_cdc_chunk_size", 1024),
 +      inbl, NULL, NULL));
 +
 +  // create object
 +  bufferlist gbl;
 +  {
 +    //bufferlist bl;
 +    generate_buffer(1024*8, &gbl);
 +    ObjectWriteOperation op;
 +    op.write_full(gbl);
 +    ASSERT_EQ(0, cache_ioctx.operate("foo", &op));
 +  }
 +  {
 +    bufferlist bl;
 +    bl.append("there hiHI");
 +    ObjectWriteOperation op;
 +    op.write_full(bl);
 +    ASSERT_EQ(0, ioctx.operate("bar", &op));
 +  }
 +
 +  // wait for maps to settle
 +  cluster.wait_for_latest_osdmap();
 +
 +  // set-chunk to set manifest object
 +  {
 +    ObjectReadOperation op;
 +    op.set_chunk(0, 2, ioctx, "bar", 0,
 +              CEPH_OSD_OP_FLAG_WITH_REFERENCE);
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op,
 +            librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  // create a snapshot, clone
 +  vector<uint64_t> my_snaps(1);
 +  ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_create(&my_snaps[0]));
 +  ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
 +      my_snaps));
 +
 +  // make a dirty chunks
 +  {
 +    bufferlist bl;
 +    bl.append("Thbbe hi");
 +    ASSERT_EQ(0, cache_ioctx.write("foo", bl, bl.length(), 0));
 +  }
 +
 +  // and another
 +  my_snaps.resize(2);
 +  my_snaps[1] = my_snaps[0];
 +  ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_create(&my_snaps[0]));
 +  ASSERT_EQ(0, cache_ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
 +      my_snaps));
 +
 +  // make a dirty chunks
 +  {
 +    bufferlist bl;
 +    bl.append("Thcce hi");
 +    ASSERT_EQ(0, cache_ioctx.write("foo", bl, bl.length(), 0));
 +  }
 +
 +  //flush on oldest snap
 +  cache_ioctx.snap_set_read(my_snaps[1]);
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  // flush on oldest snap
 +  cache_ioctx.snap_set_read(my_snaps[0]);
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  cache_ioctx.snap_set_read(librados::SNAP_HEAD);
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  std::unique_ptr<CDC> cdc = CDC::create("fastcdc", cbits(1024)-1);
 +  vector<pair<uint64_t, uint64_t>> chunks;
 +  bufferlist chunk;
 +  cdc->calc_chunks(gbl, &chunks);
 +  chunk.substr_of(gbl, chunks[1].first, chunks[1].second);
 +  string tgt_oid;
 +  // check chunk's refcount
 +  {
 +    unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1] = {0};
 +    char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
 +    bufferlist t;
 +    SHA1 sha1_gen;
 +    int size = chunk.length();
 +    sha1_gen.Update((const unsigned char *)chunk.c_str(), size);
 +    sha1_gen.Final(fingerprint);
 +    buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
 +    tgt_oid = string(p_str);
 +    ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t);
 +    chunk_refs_t refs;
 +    try {
 +      auto iter = t.cbegin();
 +      decode(refs, iter);
 +    } catch (buffer::error& err) {
 +      ASSERT_TRUE(0);
 +    }
 +    ASSERT_EQ(1u, refs.count());
 +  }
 +
 +  bufferlist chunk2;
 +  chunk2.substr_of(gbl, chunks[0].first, chunks[0].second);
 +  // check chunk's refcount
 +  {
 +    unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1] = {0};
 +    char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
 +    bufferlist t;
 +    SHA1 sha1_gen;
 +    int size = chunk2.length();
 +    sha1_gen.Update((const unsigned char *)chunk2.c_str(), size);
 +    sha1_gen.Final(fingerprint);
 +    buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
 +    tgt_oid = string(p_str);
 +    ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t);
 +    chunk_refs_t refs;
 +    try {
 +      auto iter = t.cbegin();
 +      decode(refs, iter);
 +    } catch (buffer::error& err) {
 +      ASSERT_TRUE(0);
 +    }
 +    ASSERT_EQ(1u, refs.count());
 +  }
 +
 +  // make a dirty chunks
 +  {
 +    bufferlist bl;
 +    bl.append("ThDDe hi");
 +    ASSERT_EQ(0, cache_ioctx.write("foo", bl, bl.length(), 0));
 +  }
 +
 +  // flush
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  bufferlist tmp;
 +  tmp.append("Thcce hi");
 +  gbl.begin(0).copy_in(tmp.length(), tmp);
 +  bufferlist chunk3;
 +  cdc->calc_chunks(gbl, &chunks);
 +  chunk3.substr_of(gbl, chunks[0].first, chunks[0].second);
 +  // check chunk's refcount
 +  {
 +    unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1] = {0};
 +    char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
 +    bufferlist t;
 +    SHA1 sha1_gen;
 +    int size = chunk2.length();
 +    sha1_gen.Update((const unsigned char *)chunk2.c_str(), size);
 +    sha1_gen.Final(fingerprint);
 +    buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
 +    tgt_oid = string(p_str);
 +    ASSERT_EQ(-ENOENT, ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t));
 +  }
 +}
 +
 +TEST_F(LibRadosTwoPoolsPP, TierFlushDuringFlush) {
 +  // skip test if not yet octopus
 +  if (_get_required_osd_release(cluster) < "octopus") {
 +    cout << "cluster is not yet octopus, skipping test" << std::endl;
 +    return;
 +  }
 +
 +  bufferlist inbl;
 +
 +  // create a new pool 
 +  std::string temp_pool_name = get_temp_pool_name() + "-test-flush";
 +  ASSERT_EQ(0, cluster.pool_create(temp_pool_name.c_str()));
 +
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "fingerprint_algorithm", "sha1"),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_tier", temp_pool_name),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_chunk_algorithm", "fastcdc"),
 +      inbl, NULL, NULL));
 +  ASSERT_EQ(0, cluster.mon_command(
 +      set_pool_str(cache_pool_name, "dedup_cdc_chunk_size", 1024),
 +      inbl, NULL, NULL));
 +
 +  // create object
 +  bufferlist gbl;
 +  {
 +    //bufferlist bl;
 +    generate_buffer(1024*8, &gbl);
 +    ObjectWriteOperation op;
 +    op.write_full(gbl);
 +    ASSERT_EQ(0, cache_ioctx.operate("foo", &op));
 +  }
 +  {
 +    bufferlist bl;
 +    bl.append("there hiHI");
 +    ObjectWriteOperation op;
 +    op.write_full(bl);
 +    ASSERT_EQ(0, ioctx.operate("bar", &op));
 +  }
 +
 +  // set-chunk to set manifest object
 +  {
 +    ObjectReadOperation op;
 +    op.set_chunk(0, 2, ioctx, "bar", 0,
 +              CEPH_OSD_OP_FLAG_WITH_REFERENCE);
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate("foo", completion, &op,
 +            librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(0, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +  // delete temp pool, so flushing chunk will fail
 +  ASSERT_EQ(0, s_cluster.pool_delete(temp_pool_name.c_str()));
 +
 +  // flush to check if proper error is returned
 +  {
 +    ObjectReadOperation op;
 +    op.tier_flush();
 +    librados::AioCompletion *completion = cluster.aio_create_completion();
 +    ASSERT_EQ(0, cache_ioctx.aio_operate(
 +      "foo", completion, &op,
 +      librados::OPERATION_IGNORE_CACHE, NULL));
 +    completion->wait_for_complete();
 +    ASSERT_EQ(-ENOENT, completion->get_return_value());
 +    completion->release();
 +  }
 +
 +}
 +
  class LibRadosTwoPoolsECPP : public RadosTestECPP
  {
  public: