]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/SnapMapper: maintain the prefix_itr between calls to SnapMapper::get_next_objects... 53132/head
authorGabriel BenHanokh <gbenhano@redhat.com>
Wed, 14 Dec 2022 12:55:19 +0000 (12:55 +0000)
committerGabriel BenHanokh <gbenhano@redhat.com>
Thu, 2 Nov 2023 19:25:16 +0000 (19:25 +0000)
Maintain the prefix_itr between calls to SnapMapper::get_next_objects_to_trim() to prevent searching depleted prefixes.
We got 8 distinct hash prefixes used for searching objects owned by a given PG.
On each call to SnapMapper::get_next_objects_to_trim() we start from the first prefix even after all objects mapped to it were depleted.
This means that we will be searching for 1 non-existing prefix after the first prefix was depleted, 2 after the first two prefixes were depleted... and so on until we will search 7 non-existing prefixes after the first 7 prefixes were depleted.

This is a performance improvement PR only!
It maintains the existing behavior and does not try to fix/change any of the TRIM logic.
I added an extra step after the last object is trimmed doing a full scan of the DB and only if no object was found it will return ENOENT.
This should make the new code no-worse than existing code which returns ENOENT after a full scan found no object.
It should not impact performance in real life snaps as it should only happen once per-snap.

added snap-mapper tests to rados-test-suite
disabled osd_debug_trim_objects when running (SnapMapperTest, prefix_itr) to prevent asserts(as this code does illegal inserts into DELETED snaps)
Code beautifing

Disabled the assert as there is a corner case when we retrieve the last valid object/s in a snap
The prefix_itr is advanced past the last valid value (as we completed a full scan)
If the OSD will call get_next_objects_to_trim() before the retrieved object/s was processed and removed from the SnapMapper DB it won't be found by the next call (as the prefix_itr is invalid).
The object will be found in the second-pass which will seems as if it was added after the trim was started (which is illegal) and will trigger an ASSERT

Signed-off-by: Gabriel BenHanokh <gbenhano@redhat.com>
qa/suites/rados/singleton-nomsgr/all/ceph-snapmapper.yaml [new file with mode: 0644]
qa/tasks/ceph.conf.template
src/common/options/osd.yaml.in
src/osd/SnapMapper.cc
src/osd/SnapMapper.h
src/test/CMakeLists.txt
src/test/test_snap_mapper.cc

diff --git a/qa/suites/rados/singleton-nomsgr/all/ceph-snapmapper.yaml b/qa/suites/rados/singleton-nomsgr/all/ceph-snapmapper.yaml
new file mode 100644 (file)
index 0000000..cbfd086
--- /dev/null
@@ -0,0 +1,22 @@
+openstack:
+  - volumes: # attached to each instance
+      count: 3
+      size: 10 # GB
+roles:
+- [mon.a, mgr.x, osd.0, osd.1, osd.2, client.0]
+
+overrides:
+  ceph:
+    pre-mgr-commands:
+      - sudo ceph config set mgr mgr_pool false --force
+    log-ignorelist:
+    - but it is still running
+    - overall HEALTH_
+    - \(POOL_APP_NOT_ENABLED\)
+
+tasks:
+- install:
+- ceph:
+- exec:
+    client.0:
+    - ceph_test_snap_mapper
index a9cce29539aa1f7d8c7a2cdc16e5cbca266f07b9..7ae2b83c2951561581449b664976bf4a42254bc3 100644 (file)
@@ -55,8 +55,9 @@
        osd debug shutdown = true
         osd debug op order = true
         osd debug verify stray on activate = true
+        osd debug trim objects = true
 
-       osd open classes on start = true
+        osd open classes on start = true
         osd debug pg log writeout = true
 
        osd deep scrub update digest min age = 30
index 5d8d40cf12d1fc9f45251c876e5dd9e9fc3903bc..f4c60db26b7747bab0b586aa6923effc225eb929 100644 (file)
@@ -194,6 +194,11 @@ options:
     load on busy clusters.
   default: false
   with_legacy: true
+- name: osd_debug_trim_objects
+  type: bool
+  level: advanced
+  desc: Asserts that no clone-objects were added to a snap after we start trimming it
+  default: false
 - name: osd_repair_during_recovery
   type: bool
   level: advanced
index 7893bc08fdcb6f6219f1168f765d1226544ee624..e41cd1df08e3e00ed64be9657430a371c85d6403 100644 (file)
@@ -438,7 +438,7 @@ void SnapMapper::clear_snaps(
   to_remove.insert(to_object_key(oid));
   if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
     for (auto& i : to_remove) {
-      dout(20) << __func__ << " rm " << i << dendl;
+      dout(20) << __func__ << "::rm " << i << dendl;
     }
   }
   backend.remove_keys(to_remove, t);
@@ -457,7 +457,7 @@ void SnapMapper::set_snaps(
   dout(20) << __func__ << " " << oid << " " << in.snaps << dendl;
   if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
     for (auto& i : to_set) {
-      dout(20) << __func__ << " set " << i.first << dendl;
+      dout(20) << __func__ << "::set " << i.first << dendl;
     }
   }
   backend.set_keys(to_set, t);
@@ -540,36 +540,48 @@ void SnapMapper::add_oid(
   backend.set_keys(to_add, t);
 }
 
-int SnapMapper::get_next_objects_to_trim(
+// reset the prefix iterator to the first prefix hash
+void SnapMapper::reset_prefix_itr(snapid_t snap, const char *s)
+{
+  if (prefix_itr_snap == CEPH_NOSNAP) {
+    dout(10) << __func__ << "::from <CEPH_NOSNAP> to <" << snap << "> ::" << s << dendl;
+  }
+  else if (snap == CEPH_NOSNAP) {
+    dout(10) << __func__ << "::from <"<< prefix_itr_snap << "> to <CEPH_NOSNAP> ::" << s << dendl;
+  }
+  else if (prefix_itr_snap == snap) {
+    dout(10) << __func__ << "::with the same snapid <" << snap << "> ::" << s << dendl;
+  }
+  else {
+    // This is unexpected!!
+    dout(10) << __func__ << "::from <"<< prefix_itr_snap << "> to <" << snap << "> ::" << s << dendl;
+  }
+  prefix_itr_snap = snap;
+  prefix_itr      = prefixes.begin();
+}
+
+void SnapMapper::get_objects_by_prefixes(
   snapid_t snap,
   unsigned max,
   vector<hobject_t> *out)
 {
-  ceph_assert(out);
-  ceph_assert(out->empty());
-
-  // if max would be 0, we return ENOENT and the caller would mistakenly
-  // trim the snaptrim queue
-  ceph_assert(max > 0);
-  int r = 0;
-
-  /// \todo cache the prefixes-set in update_bits()
-  for (set<string>::iterator i = prefixes.begin();
-       i != prefixes.end() && out->size() < max && r == 0;
-       ++i) {
-    string prefix(get_prefix(pool, snap) + *i);
+  /// maintain the prefix_itr between calls to avoid searching depleted prefixes
+  for ( ; prefix_itr != prefixes.end(); prefix_itr++) {
+    string prefix(get_prefix(pool, snap) + *prefix_itr);
     string pos = prefix;
     while (out->size() < max) {
       pair<string, ceph::buffer::list> next;
-      r = backend.get_next(pos, &next);
+      // access RocksDB (an expensive operation!)
+      int r = backend.get_next(pos, &next);
       dout(20) << __func__ << " get_next(" << pos << ") returns " << r
               << " " << next << dendl;
       if (r != 0) {
-       break; // Done
+       return; // Done
       }
 
       if (next.first.substr(0, prefix.size()) !=
          prefix) {
+       // TBD: we access the DB twice for the first object of each iterator...
        break; // Done with this prefix
       }
 
@@ -583,7 +595,56 @@ int SnapMapper::get_next_objects_to_trim(
       out->push_back(next_decoded.second);
       pos = next.first;
     }
+
+    if (out->size() >= max) {
+      return;
+    }
   }
+}
+
+int SnapMapper::get_next_objects_to_trim(
+  snapid_t snap,
+  unsigned max,
+  vector<hobject_t> *out)
+{
+  dout(20) << __func__ << "::snapid=" << snap << dendl;
+  ceph_assert(out);
+  ceph_assert(out->empty());
+
+  // if max would be 0, we return ENOENT and the caller would mistakenly
+  // trim the snaptrim queue
+  ceph_assert(max > 0);
+
+  // The prefix_itr is bound to a prefix_itr_snap so if we trim another snap
+  // we must reset the prefix_itr (should not happen normally)
+  if (prefix_itr_snap != snap) {
+    if (prefix_itr_snap == CEPH_NOSNAP) {
+      reset_prefix_itr(snap, "Trim begins");
+    }
+    else {
+      reset_prefix_itr(snap, "Unexpected snap change");
+    }
+  }
+
+  // when reaching the end of the DB reset the prefix_ptr and verify
+  // we didn't miss objects which were added after we started trimming
+  // This should never happen in reality because the snap was logically deleted
+  // before trimming starts (and so no new clone-objects could be added)
+  // For more info see PG::filter_snapc()
+  //
+  // We still like to be extra careful and run one extra loop over all prefixes
+  get_objects_by_prefixes(snap, max, out);
+  if (unlikely(out->size() == 0)) {
+    reset_prefix_itr(snap, "Second pass trim");
+    get_objects_by_prefixes(snap, max, out);
+
+    if (unlikely(out->size() > 0)) {
+      derr << __func__ << "::New Clone-Objects were added to Snap " << snap
+          << " after trimming was started" << dendl;
+    }
+    reset_prefix_itr(CEPH_NOSNAP, "Trim was completed successfully");
+  }
+
   if (out->size() == 0) {
     return -ENOENT;
   } else {
@@ -591,7 +652,6 @@ int SnapMapper::get_next_objects_to_trim(
   }
 }
 
-
 int SnapMapper::remove_oid(
   const hobject_t &oid,
   MapCacher::Transaction<std::string, ceph::buffer::list> *t)
@@ -621,7 +681,7 @@ int SnapMapper::_remove_oid(
   }
   if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
     for (auto& i : to_remove) {
-      dout(20) << __func__ << " rm " << i << dendl;
+      dout(20) << __func__ << "::rm " << i << dendl;
     }
   }
   backend.remove_keys(to_remove, t);
index eb43a23c2b0e0b3373b5d3bba731c2f8eff68ab7..a28b25970fb4709357b61494e7a3989a6aeacc35 100644 (file)
@@ -281,6 +281,20 @@ private:
   tl::expected<object_snaps, SnapMapReaderI::result_t> get_snaps_common(
     const hobject_t &hoid) const;
 
+  /// file @out vector with the first objects with @snap as a snap
+  void get_objects_by_prefixes(
+    snapid_t snap,
+    unsigned max,
+    std::vector<hobject_t> *out);
+
+  std::set<std::string>           prefixes;
+  // maintain a current active prefix
+  std::set<std::string>::iterator prefix_itr;
+  // associate the active prefix with a snap
+  snapid_t                        prefix_itr_snap;
+
+  // reset the prefix iterator to the first prefix hash
+  void reset_prefix_itr(snapid_t snap, const char *s);
  public:
   static std::string make_shard_prefix(shard_id_t shard) {
     if (shard == shard_id_t::NO_SHARD)
@@ -290,6 +304,7 @@ private:
     ceph_assert(r < (int)sizeof(buf));
     return std::string(buf, r) + '_';
   }
+
   uint32_t mask_bits;
   const uint32_t match;
   std::string last_key_checked;
@@ -309,7 +324,6 @@ private:
     update_bits(mask_bits);
   }
 
-  std::set<std::string> prefixes;
   /// Update bits in case of pg split or merge
   void update_bits(
     uint32_t new_bits  ///< [in] new split bits
@@ -323,6 +337,12 @@ private:
     for (auto i = _prefixes.begin(); i != _prefixes.end(); ++i) {
       prefixes.insert(shard_prefix + *i);
     }
+
+    reset_prefix_itr(CEPH_NOSNAP, "update_bits");
+  }
+
+  const std::set<std::string>::iterator get_prefix_itr() {
+    return prefix_itr;
   }
 
   /// Update snaps for oid, empty new_snaps removes the mapping
index f8e2f2a1b0116c097f2dfc4413e484355b0ddddb..ecd7dddda8d36ff2139181cb9f4d43a5ab8a0c6b 100644 (file)
@@ -466,6 +466,10 @@ add_executable(ceph_test_snap_mapper
   $<TARGET_OBJECTS:unit-main>
   )
 target_link_libraries(ceph_test_snap_mapper osd global ${BLKID_LIBRARIES} ${UNITTEST_LIBS})
+
+install(TARGETS
+  ceph_test_snap_mapper
+  DESTINATION ${CMAKE_INSTALL_BINDIR})
 endif(NOT WIN32)
 
 add_executable(ceph_test_stress_watch
index e502892cc42f129916987e1c23d0fa55cfc82181..9fe726afb1f9b1f7c12ff1d255c7bbb09928a702 100644 (file)
@@ -461,7 +461,19 @@ public:
     uint32_t bits)
     : driver(driver),
       mapper(new SnapMapper(g_ceph_context, driver, mask, bits, 0, shard_id_t(1))),
-             mask(mask), bits(bits) {}
+            mask(mask), bits(bits) {}
+
+  hobject_t create_hobject(
+    unsigned           idx,
+    snapid_t           snapid,
+    int64_t            pool,
+    const std::string& nspace) {
+    const object_t    oid("OID" + std::to_string(idx));
+    const std::string key("KEY" + std::to_string(idx));
+    const uint32_t    hash = (idx & ((~0)<<bits)) | (mask & ~((~0)<<bits));
+
+    return hobject_t(oid, key, snapid, hash, pool, nspace);
+  }
 
   hobject_t random_hobject() {
     return hobject_t(
@@ -480,9 +492,28 @@ public:
     }
   }
 
-  void create_snap() {
-    snap_to_hobject[next];
+  snapid_t create_snap() {
+    snapid_t snapid = next;
+    snap_to_hobject[snapid];
     ++next;
+
+    return snapid;
+  }
+
+  // must be called with lock held to protect access to
+  // hobject_to_snap and snap_to_hobject
+  void add_object_to_snaps(const hobject_t & obj, const set<snapid_t> &snaps) {
+    hobject_to_snap[obj] = snaps;
+    for (auto snap : snaps) {
+      map<snapid_t, set<hobject_t> >::iterator j = snap_to_hobject.find(snap);
+      ceph_assert(j != snap_to_hobject.end());
+      j->second.insert(obj);
+    }
+    {
+      PausyAsyncMap::Transaction t;
+      mapper->add_oid(obj, snaps, &t);
+      driver->submit(&t);
+    }
   }
 
   void create_object() {
@@ -494,20 +525,9 @@ public:
       obj = random_hobject();
     } while (hobject_to_snap.count(obj));
 
-    set<snapid_t> &snaps = hobject_to_snap[obj];
+    set<snapid_t> snaps;
     choose_random_snaps(1 + (rand() % 20), &snaps);
-    for (set<snapid_t>::iterator i = snaps.begin();
-        i != snaps.end();
-        ++i) {
-      map<snapid_t, set<hobject_t> >::iterator j = snap_to_hobject.find(*i);
-      ceph_assert(j != snap_to_hobject.end());
-      j->second.insert(obj);
-    }
-    {
-      PausyAsyncMap::Transaction t;
-      mapper->add_oid(obj, snaps, &t);
-      driver->submit(&t);
-    }
+    add_object_to_snaps(obj, snaps);
   }
 
   std::pair<std::string, ceph::buffer::list> to_raw(
@@ -535,27 +555,23 @@ public:
     return mapper->make_purged_snap_key(std::forward<Args>(args)...);
   }
 
-  void trim_snap() {
-    std::lock_guard l{lock};
-    if (snap_to_hobject.empty())
-      return;
-    map<snapid_t, set<hobject_t> >::iterator snap =
-      rand_choose(snap_to_hobject);
-    set<hobject_t> hobjects = snap->second;
-
+  // must be called with lock held to protect access to
+  // snap_to_hobject and hobject_to_snap
+  int trim_snap(snapid_t snapid, unsigned max_count, vector<hobject_t> & out) {
+    set<hobject_t>&   hobjects = snap_to_hobject[snapid];
     vector<hobject_t> hoids;
-    while (mapper->get_next_objects_to_trim(
-            snap->first, rand() % 5 + 1, &hoids) == 0) {
+    int ret = mapper->get_next_objects_to_trim(snapid, max_count, &hoids);
+    if (ret == 0) {
+      out.insert(out.end(), hoids.begin(), hoids.end());
       for (auto &&hoid: hoids) {
        ceph_assert(!hoid.is_max());
        ceph_assert(hobjects.count(hoid));
        hobjects.erase(hoid);
 
-       map<hobject_t, set<snapid_t>>::iterator j =
-         hobject_to_snap.find(hoid);
-       ceph_assert(j->second.count(snap->first));
+       map<hobject_t, set<snapid_t>>::iterator j = hobject_to_snap.find(hoid);
+       ceph_assert(j->second.count(snapid));
        set<snapid_t> old_snaps(j->second);
-       j->second.erase(snap->first);
+       j->second.erase(snapid);
 
        {
          PausyAsyncMap::Transaction t;
@@ -573,6 +589,48 @@ public:
       }
       hoids.clear();
     }
+    return ret;
+  }
+
+  // must be called with lock held to protect access to
+  // snap_to_hobject and hobject_to_snap in trim_snap
+  // will keep trimming until reaching max_count or failing a call to trim_snap()
+  void trim_snap_force(snapid_t           snapid,
+                      unsigned           max_count,
+                      vector<hobject_t>& out) {
+    int               guard = 1000;
+    vector<hobject_t> tmp;
+    unsigned          prev_size = 0;
+    while (tmp.size() < max_count) {
+      unsigned req_size = max_count - tmp.size();
+      // each call adds more objects into the tmp vector
+      trim_snap(snapid, req_size, tmp);
+      if (prev_size < tmp.size()) {
+       prev_size = tmp.size();
+      }
+      else{
+       // the tmp vector size was not increased in the last call
+       // which means we were unable to find anything to trim
+       break;
+      }
+      ceph_assert(--guard > 0);
+    }
+    out.insert(out.end(), tmp.begin(), tmp.end());
+  }
+
+  void trim_snap() {
+    std::lock_guard l{lock};
+    if (snap_to_hobject.empty()) {
+      return;
+    }
+    int ret = 0;
+    map<snapid_t, set<hobject_t> >::iterator snap = rand_choose(snap_to_hobject);
+    do {
+      int max_count = rand() % 5 + 1;
+      vector<hobject_t> out;
+      ret = trim_snap(snap->first, max_count, out);
+    } while(ret == 0);
+    set<hobject_t> hobjects = snap->second;
     ceph_assert(hobjects.empty());
     snap_to_hobject.erase(snap);
   }
@@ -612,6 +670,189 @@ public:
     ceph_assert(r == 0);
     ASSERT_EQ(snaps, obj->second);
   }
+
+  void test_prefix_itr() {
+    // protects access to snap_to_hobject and hobject_to_snap
+    std::lock_guard   l{lock};
+    snapid_t          snapid = create_snap();
+    // we initialize 32 PGS
+    ceph_assert(bits == 5);
+
+    const int64_t     pool(0);
+    const std::string nspace("GBH");
+    set<snapid_t>     snaps = { snapid };
+    set<hobject_t>&   hobjects = snap_to_hobject[snapid];
+    vector<hobject_t> trimmed_objs;
+    vector<hobject_t> stored_objs;
+
+    // add objects 0, 32, 64, 96, 128, 160, 192, 224
+    // which should hit all the prefixes
+    for (unsigned idx = 0; idx < 8; idx++) {
+      hobject_t hobj = create_hobject(idx * 32, snapid, pool, nspace);
+      add_object_to_snaps(hobj, snaps);
+      stored_objs.push_back(hobj);
+    }
+    ceph_assert(hobjects.size() == 8);
+
+    // trim 0, 32, 64, 96
+    trim_snap(snapid, 4, trimmed_objs);
+    ceph_assert(hobjects.size() == 4);
+
+    // add objects (3, 35, 67) before the prefix_itr position
+    // to force an iteartor reset later
+    for (unsigned idx = 0; idx < 3; idx++) {
+      hobject_t hobj = create_hobject(idx * 32 + 3, snapid, pool, nspace);
+      add_object_to_snaps(hobj, snaps);
+      stored_objs.push_back(hobj);
+    }
+    ceph_assert(hobjects.size() == 7);
+
+    // will now trim 128, 160, 192, 224
+    trim_snap(snapid, 4, trimmed_objs);
+    ceph_assert(hobjects.size() == 3);
+
+    // finally, trim 3, 35, 67
+    // This will force a reset to the prefix_itr (which is what we test here)
+    trim_snap(snapid, 3, trimmed_objs);
+    ceph_assert(hobjects.size() == 0);
+
+    ceph_assert(trimmed_objs.size() == 11);
+    // trimmed objs must be in the same order they were inserted
+    // this will prove that the second call to add_object_to_snaps inserted
+    // them before the current prefix_itr
+    ceph_assert(trimmed_objs.size() == stored_objs.size());
+    ceph_assert(std::equal(trimmed_objs.begin(), trimmed_objs.end(),
+                          stored_objs.begin()));
+    snap_to_hobject.erase(snapid);
+  }
+
+  // insert 256 objects which should populate multiple prefixes
+  // trim until we change prefix and then insert an old object
+  // which we know for certain belongs to a prefix before prefix_itr
+  void test_prefix_itr2() {
+    // protects access to snap_to_hobject and hobject_to_snap
+    std::lock_guard   l{lock};
+    snapid_t          snapid = create_snap();
+    // we initialize 32 PGS
+    ceph_assert(bits == 5);
+
+    const int64_t     pool(0);
+    const std::string nspace("GBH");
+    set<snapid_t>     snaps = { snapid };
+    vector<hobject_t> trimmed_objs;
+    vector<hobject_t> stored_objs;
+
+    constexpr unsigned MAX_IDX = 256;
+    for (unsigned idx = 0; idx < MAX_IDX; idx++) {
+      hobject_t hobj = create_hobject(idx, snapid, pool, nspace);
+      add_object_to_snaps(hobj, snaps);
+      stored_objs.push_back(hobj);
+    }
+
+    hobject_t dup_hobj;
+    bool      found = false;
+    trim_snap(snapid, 1, trimmed_objs);
+    const std::set<std::string>::iterator itr = mapper->get_prefix_itr();
+    for (unsigned idx = 1; idx < MAX_IDX + 1; idx++) {
+      trim_snap(snapid, 1, trimmed_objs);
+      if (!found && mapper->get_prefix_itr() != itr) {
+       // we changed prefix -> insert an OBJ belonging to perv prefix
+       dup_hobj = create_hobject(idx - 1, snapid, pool, nspace);
+       add_object_to_snaps(dup_hobj, snaps);
+       stored_objs.push_back(dup_hobj);
+       found = true;
+      }
+    }
+    ceph_assert(found);
+
+    sort(trimmed_objs.begin(), trimmed_objs.end());
+    sort(stored_objs.begin(),  stored_objs.end());
+    ceph_assert(trimmed_objs.size() == MAX_IDX+1);
+    ceph_assert(trimmed_objs.size() == stored_objs.size());
+    ceph_assert(std::equal(trimmed_objs.begin(), trimmed_objs.end(),
+                          stored_objs.begin()));
+    snap_to_hobject.erase(snapid);
+  }
+
+  void add_rand_hobjects(unsigned           count,
+                        snapid_t           snapid,
+                        int64_t            pool,
+                        const std::string& nspace,
+                        vector<hobject_t>& stored_objs) {
+    constexpr unsigned MAX_VAL = 1000;
+    set<snapid_t> snaps = { snapid };
+    for (unsigned i = 0; i < count; i++) {
+      hobject_t hobj;
+      do {
+       unsigned val = rand() % MAX_VAL;
+       hobj = create_hobject(val, snapid, pool, nspace);
+      }while (hobject_to_snap.count(hobj));
+      add_object_to_snaps(hobj, snaps);
+      stored_objs.push_back(hobj);
+    }
+  }
+
+  // Start with a set of random objects then run a partial trim
+  // followed by another random insert
+  // This should cause *some* objects to be added before the prefix_itr
+  // and will verify that we still remove them
+  void test_prefix_itr_rand() {
+    // protects access to snap_to_hobject and hobject_to_snap
+    std::lock_guard   l{lock};
+    snapid_t          snapid = create_snap();
+    // we initialize 32 PGS
+    ceph_assert(bits == 5);
+
+    const int64_t     pool(0);
+    const std::string nspace("GBH");
+    vector<hobject_t> trimmed_objs;
+    vector<hobject_t> stored_objs;
+    set<hobject_t>&   hobjects = snap_to_hobject[snapid];
+    ceph_assert(hobjects.size() == 0);
+
+    // add 100 random objects
+    add_rand_hobjects(100, snapid, pool, nspace, stored_objs);
+    ceph_assert(hobjects.size() == 100);
+
+    // trim the first 75 objects leaving 25 objects
+    trim_snap(snapid, 75, trimmed_objs);
+    ceph_assert(hobjects.size() == 25);
+
+    // add another 25 random objects (now we got 50 objects)
+    add_rand_hobjects(25, snapid, pool, nspace, stored_objs);
+    ceph_assert(hobjects.size() == 50);
+
+    // trim 49 objects leaving a single object
+    // we must use a wrapper function to keep trimming while until -ENOENT
+    trim_snap_force(snapid, 49, trimmed_objs);
+    ceph_assert(hobjects.size() == 1);
+
+    // add another 9 random objects (now we got 10 objects)
+    add_rand_hobjects(9, snapid, pool, nspace, stored_objs);
+    ceph_assert(hobjects.size() == 10);
+
+    // trim 10 objects leaving no object in store
+    trim_snap_force(snapid, 10, trimmed_objs);
+    ceph_assert(hobjects.size() == 0);
+
+    // add 10 random objects (now we got 10 objects)
+    add_rand_hobjects(10, snapid, pool, nspace, stored_objs);
+    ceph_assert(hobjects.size() == 10);
+
+    // trim 10 objects leaving no object in store
+    trim_snap_force(snapid, 10, trimmed_objs);
+    ceph_assert(hobjects.size() == 0);
+
+    sort(trimmed_objs.begin(), trimmed_objs.end());
+    sort(stored_objs.begin(),  stored_objs.end());
+    ceph_assert(trimmed_objs.size() == 144);
+    ceph_assert(trimmed_objs.size() == stored_objs.size());
+
+    bool are_equal = std::equal(trimmed_objs.begin(), trimmed_objs.end(),
+                               stored_objs.begin());
+    ceph_assert(are_equal);
+    snap_to_hobject.erase(snapid);
+  }
 };
 
 class SnapMapperTest : public ::testing::Test {
@@ -675,6 +916,27 @@ protected:
   }
 };
 
+// This test creates scenarios which are impossible to get with normal code.
+// The normal code deletes the snap before calling TRIM and so no new clones
+// can be added to that snap.
+// Our test calls get_next_objects_to_trim() *without* deleting the snap first.
+// This allows us to add objects to the (non-deleted) snap after trimming began.
+// We test that SnapTrim will find them even when added into positions before the prefix_itr.
+// Since those tests are doing illegal inserts we must disable osd_debug_trim_objects
+// during those tests as otherwise the code will assert.
+TEST_F(SnapMapperTest, prefix_itr) {
+  bool orig_val = g_ceph_context->_conf.get_val<bool>("osd_debug_trim_objects");
+  std::cout << "osd_debug_trim_objects = " << orig_val << std::endl;
+  g_ceph_context->_conf.set_val("osd_debug_trim_objects", std::to_string(false));
+  init(32);
+  get_tester().test_prefix_itr();
+  get_tester().test_prefix_itr2();
+  get_tester().test_prefix_itr_rand();
+  g_ceph_context->_conf.set_val("osd_debug_trim_objects", std::to_string(orig_val));
+  bool curr_val = g_ceph_context->_conf.get_val<bool>("osd_debug_trim_objects");
+  ceph_assert(curr_val == orig_val);
+}
+
 TEST_F(SnapMapperTest, Simple) {
   init(1);
   get_tester().create_snap();
@@ -806,7 +1068,7 @@ public:
   DirectMapper(
     uint32_t mask,
     uint32_t bits)
-   : mapper(new SnapMapper(g_ceph_context, driver.get(), mask, bits, 0, shard_id_t(1))), 
+    : mapper(new SnapMapper(g_ceph_context, driver.get(), mask, bits, 0, shard_id_t(1))),
              mask(mask), bits(bits) {}
 
   hobject_t random_hobject() {