]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: add a 'delete' flag to missing items and related functions
authorJosh Durgin <jdurgin@redhat.com>
Mon, 26 Jun 2017 22:09:27 +0000 (18:09 -0400)
committerJosh Durgin <jdurgin@redhat.com>
Mon, 17 Jul 2017 06:00:35 +0000 (02:00 -0400)
This will track deletes that were in the pg log and still need to be
performed during recovery. Note that with these deleted objects we may
not have an accurate 'have' version, since the object may have already
been deleted locally, so tolerate this when examining divergent entries.

Signed-off-by: Josh Durgin <jdurgin@redhat.com>
src/osd/PGLog.cc
src/osd/PGLog.h
src/osd/osd_types.h
src/test/osd/TestPGLog.cc
src/test/osd/types.cc

index e85c6cc3333a327ecf9b86ea8b7c71bb84c99bea..9830e5b05b19b15db508932d89d5cc2d08378003 100644 (file)
@@ -671,7 +671,8 @@ void PGLog::_write_log_and_missing(
       if (!missing.is_missing(obj, &item)) {
        to_remove.insert(key);
       } else {
-       ::encode(make_pair(obj, item), (*km)[key]);
+       ::encode(obj, (*km)[key]);
+       item.encode_with_flags((*km)[key]);
       }
     });
   if (require_rollback) {
index 05ff7faf428407fd21f8f0b6b7c64b061bd89af1..8a21516d4fcb909e3bd004cac14d8621c7f84fd8 100644 (file)
@@ -576,12 +576,8 @@ public:
     missing.revise_have(oid, have);
   }
 
-  void revise_need(hobject_t oid, eversion_t need) {
-    missing.revise_need(oid, need);
-  }
-
   void missing_add(const hobject_t& oid, eversion_t need, eversion_t have) {
-    missing.add(oid, need, have);
+    missing.add(oid, need, have, false);
   }
 
   //////////////////// get or set log ////////////////////
@@ -857,7 +853,7 @@ protected:
        ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
                           << " missing.have is " << missing.get_items().at(hoid).have
                           << ", adjusting" << dendl;
-       missing.revise_need(hoid, prior_version);
+       missing.revise_need(hoid, prior_version, false);
        if (prior_version <= info.log_tail) {
          ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
                             << " prior_version " << prior_version
@@ -915,7 +911,7 @@ protected:
          rollbacker->trim(i);
        }
       }
-      missing.add(hoid, prior_version, eversion_t());
+      missing.add(hoid, prior_version, eversion_t(), false);
       if (prior_version <= info.log_tail) {
        ldpp_dout(dpp, 10) << __func__ << ": hoid " << hoid
                           << " prior_version " << prior_version
@@ -1016,7 +1012,7 @@ public:
          // hack to match PG::mark_all_unfound_lost
          if (maintain_rollback && p->is_lost_delete() && p->can_rollback()) {
            rollbacker->try_stash(p->soid, p->version.version);
-         } else if (p->is_delete()) {
+         } else if (p->is_lost_delete()) {
            rollbacker->remove(p->soid);
          }
        }
@@ -1163,9 +1159,11 @@ public:
        } else if (p->key() == "rollback_info_trimmed_to") {
          ::decode(on_disk_rollback_info_trimmed_to, bp);
        } else if (p->key().substr(0, 7) == string("missing")) {
-         pair<hobject_t, pg_missing_item> p;
-         ::decode(p, bp);
-         missing.add(p.first, p.second.need, p.second.have);
+         hobject_t oid;
+         pg_missing_item item;
+         ::decode(oid, bp);
+         item.decode_with_flags(bp);
+         missing.add(oid, item.need, item.have, item.is_delete());
        } else {
          pg_log_entry_t e;
          e.decode_with_checksum(bp);
@@ -1209,7 +1207,8 @@ public:
          if (did.count(i->soid)) continue;
          did.insert(i->soid);
 
-         if (i->is_delete()) continue;
+         // TODO: enable only if we aren't tracking deletes in the log
+         if (i->is_lost_delete()) continue;
 
          bufferlist bv;
          int r = store->getattr(
@@ -1229,19 +1228,25 @@ public:
                assert(miter->second.have == oi.version);
                checked.insert(i->soid);
              } else {
-               missing.add(i->soid, i->version, oi.version);
+               missing.add(i->soid, i->version, oi.version, i->is_delete());
              }
            }
          } else {
            ldpp_dout(dpp, 15) << "read_log_and_missing  missing " << *i << dendl;
            if (debug_verify_stored_missing) {
              auto miter = missing.get_items().find(i->soid);
-             assert(miter != missing.get_items().end());
-             assert(miter->second.need == i->version);
-             assert(miter->second.have == eversion_t());
+             if (i->is_delete()) {
+               assert(miter == missing.get_items().end() ||
+                      (miter->second.need == i->version &&
+                       miter->second.have == eversion_t()));
+             } else {
+               assert(miter != missing.get_items().end());
+               assert(miter->second.need == i->version);
+               assert(miter->second.have == eversion_t());
+             }
              checked.insert(i->soid);
            } else {
-             missing.add(i->soid, i->version, eversion_t());
+             missing.add(i->soid, i->version, eversion_t(), i->is_delete());
            }
          }
        }
@@ -1252,7 +1257,8 @@ public:
            if (i.second.need > log.tail ||
              i.first > info.last_backfill) {
              ldpp_dout(dpp, -1) << __func__ << ": invalid missing set entry found "
-                                << i.first
+                                << i.first << " " << i.second << " log tail = "
+                                << log.tail << " last_backfill = " << info.last_backfill
                                 << dendl;
              assert(0 == "invalid missing set entry found");
            }
@@ -1266,7 +1272,7 @@ public:
              object_info_t oi(bv);
              assert(oi.version == i.second.have);
            } else {
-             assert(eversion_t() == i.second.have);
+             assert(i.second.is_delete() || eversion_t() == i.second.have);
            }
          }
        } else {
@@ -1315,7 +1321,7 @@ public:
              }
            } else {
              ldpp_dout(dpp, 15) << "read_log_and_missing  missing " << *i << dendl;
-             missing.add(i->second, i->first, eversion_t());
+             missing.add(i->second, i->first, eversion_t(), false);
            }
          }
        }
index 2507e64b86c11da6624ac5b4cf9fee60583f7e62..18e75e686a8f421cc10dfac04ca557f2bb10d428 100644 (file)
@@ -3582,9 +3582,15 @@ inline ostream& operator<<(ostream& out, const pg_log_t& log)
  */
 struct pg_missing_item {
   eversion_t need, have;
-  pg_missing_item() {}
-  explicit pg_missing_item(eversion_t n) : need(n) {}  // have no old version
-  pg_missing_item(eversion_t n, eversion_t h) : need(n), have(h) {}
+  enum missing_flags_t {
+    FLAG_NONE = 0,
+    FLAG_DELETE = 1,
+  } flags;
+  pg_missing_item() : flags(FLAG_NONE) {}
+  explicit pg_missing_item(eversion_t n) : need(n), flags(FLAG_NONE) {}  // have no old version
+  pg_missing_item(eversion_t n, eversion_t h, bool is_delete=false) : need(n), have(h) {
+    set_delete(is_delete);
+  }
 
   void encode(bufferlist& bl) const {
     ::encode(need, bl);
@@ -3594,18 +3600,48 @@ struct pg_missing_item {
     ::decode(need, bl);
     ::decode(have, bl);
   }
+
+  void set_delete(bool is_delete) {
+    flags = is_delete ? FLAG_DELETE : FLAG_NONE;
+  }
+
+  bool is_delete() const {
+    return (flags & FLAG_DELETE) == FLAG_DELETE;
+  }
+
+  void encode_with_flags(bufferlist& bl) const {
+    encode(bl);
+    ::encode(static_cast<uint8_t>(flags), bl);
+  }
+
+  void decode_with_flags(bufferlist::iterator& bl) {
+    decode(bl);
+    // no versioning on this, but it's stored in a single omap value,
+    // so just check for the end of the bufferlist
+    if (!bl.end()) {
+      uint8_t f;
+      ::decode(f, bl);
+      flags = static_cast<missing_flags_t>(f);
+    }
+  }
+
   void dump(Formatter *f) const {
     f->dump_stream("need") << need;
     f->dump_stream("have") << have;
+    f->dump_stream("flags") << (flags == FLAG_NONE ? "none" : "delete");
   }
   static void generate_test_instances(list<pg_missing_item*>& o) {
     o.push_back(new pg_missing_item);
     o.push_back(new pg_missing_item);
     o.back()->need = eversion_t(1, 2);
     o.back()->have = eversion_t(1, 1);
+    o.push_back(new pg_missing_item);
+    o.back()->need = eversion_t(3, 5);
+    o.back()->have = eversion_t(3, 4);
+    o.back()->flags = FLAG_DELETE;
   }
   bool operator==(const pg_missing_item &rhs) const {
-    return need == rhs.need && have == rhs.have;
+    return need == rhs.need && have == rhs.have && flags == rhs.flags;
   }
   bool operator!=(const pg_missing_item &rhs) const {
     return !(*this == rhs);
@@ -3728,43 +3764,40 @@ public:
    * assumes missing is accurate up through the previous log entry.
    */
   void add_next_event(const pg_log_entry_t& e) {
-    if (e.is_update()) {
-      map<hobject_t, item>::iterator missing_it;
-      missing_it = missing.find(e.soid);
-      bool is_missing_divergent_item = missing_it != missing.end();
-      if (e.prior_version == eversion_t() || e.is_clone()) {
-       // new object.
-       if (is_missing_divergent_item) {  // use iterator
-         rmissing.erase((missing_it->second).need.version);
-         missing_it->second = item(e.version, eversion_t());  // .have = nil
-       } else  // create new element in missing map
-         missing[e.soid] = item(e.version, eversion_t());     // .have = nil
-      } else if (is_missing_divergent_item) {
-       // already missing (prior).
+    map<hobject_t, item>::iterator missing_it;
+    missing_it = missing.find(e.soid);
+    bool is_missing_divergent_item = missing_it != missing.end();
+    if (e.prior_version == eversion_t() || e.is_clone()) {
+      // new object.
+      if (is_missing_divergent_item) {  // use iterator
        rmissing.erase((missing_it->second).need.version);
-       (missing_it->second).need = e.version;  // leave .have unchanged.
-      } else if (e.is_backlog()) {
-       // May not have prior version
-       assert(0 == "these don't exist anymore");
-      } else {
-       // not missing, we must have prior_version (if any)
-       assert(!is_missing_divergent_item);
-       missing[e.soid] = item(e.version, e.prior_version);
-      }
-      rmissing[e.version.version] = e.soid;
-    } else if (e.is_delete()) {
-      rm(e.soid, e.version);
+       missing_it->second = item(e.version, eversion_t(), e.is_delete());  // .have = nil
+      } else  // create new element in missing map
+       missing[e.soid] = item(e.version, eversion_t(), e.is_delete());     // .have = nil
+    } else if (is_missing_divergent_item) {
+      // already missing (prior).
+      rmissing.erase((missing_it->second).need.version);
+      (missing_it->second).need = e.version;  // leave .have unchanged.
+      missing_it->second.set_delete(e.is_delete());
+    } else if (e.is_backlog()) {
+      // May not have prior version
+      assert(0 == "these don't exist anymore");
+    } else {
+      // not missing, we must have prior_version (if any)
+      assert(!is_missing_divergent_item);
+      missing[e.soid] = item(e.version, e.prior_version, e.is_delete());
     }
-
+    rmissing[e.version.version] = e.soid;
     tracker.changed(e.soid);
   }
 
-  void revise_need(hobject_t oid, eversion_t need) {
+  void revise_need(hobject_t oid, eversion_t need, bool is_delete) {
     if (missing.count(oid)) {
       rmissing.erase(missing[oid].need.version);
       missing[oid].need = need;            // no not adjust .have
+      missing[oid].set_delete(is_delete);
     } else {
-      missing[oid] = item(need, eversion_t());
+      missing[oid] = item(need, eversion_t(), is_delete);
     }
     rmissing[need.version] = oid;
 
@@ -3778,8 +3811,9 @@ public:
     }
   }
 
-  void add(const hobject_t& oid, eversion_t need, eversion_t have) {
-    missing[oid] = item(need, have);
+  void add(const hobject_t& oid, eversion_t need, eversion_t have,
+          bool is_delete) {
+    missing[oid] = item(need, have, is_delete);
     rmissing[need.version] = oid;
     tracker.changed(oid);
   }
@@ -3799,7 +3833,7 @@ public:
   void got(const hobject_t& oid, eversion_t v) {
     std::map<hobject_t, item>::iterator p = missing.find(oid);
     assert(p != missing.end());
-    assert(p->second.need <= v);
+    assert(p->second.need <= v || p->second.is_delete());
     got(p);
   }
 
@@ -3818,7 +3852,8 @@ public:
         i != missing.end();
       ) {
       if ((i->first.get_hash() & mask) == child_pgid.m_seed) {
-       omissing->add(i->first, i->second.need, i->second.have);
+       omissing->add(i->first, i->second.need, i->second.have,
+                     i->second.is_delete());
        rm(i++);
       } else {
        ++i;
@@ -3834,15 +3869,33 @@ public:
   }
 
   void encode(bufferlist &bl) const {
-    ENCODE_START(3, 2, bl);
+    ENCODE_START(4, 2, bl);
     ::encode(missing, bl);
+    // since pg_missing_item was not versioned, we encode the new flags
+    // field here explicitly
+    map<hobject_t, uint8_t> missing_flags;
+    for (const auto &p : missing) {
+      if (p.second.flags != pg_missing_item::FLAG_NONE) {
+       missing_flags.insert(make_pair(p.first,
+                                      static_cast<uint8_t>(p.second.flags)));
+      }
+    }
+    ::encode(missing_flags, bl);
     ENCODE_FINISH(bl);
   }
   void decode(bufferlist::iterator &bl, int64_t pool = -1) {
     for (auto const &i: missing)
       tracker.changed(i.first);
-    DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
+    DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
     ::decode(missing, bl);
+    if (struct_v >= 4) {
+      map<hobject_t, uint8_t> missing_flags;
+      ::decode(missing_flags, bl);
+      for (const auto &p : missing_flags) {
+       assert(missing.find(p.first) != missing.end());
+       missing[p.first].flags = static_cast<pg_missing_item::missing_flags_t>(p.second);
+      }
+    }
     DECODE_FINISH(bl);
 
     if (struct_v < 3) {
@@ -3898,7 +3951,7 @@ public:
     o.push_back(new pg_missing_set);
     o.back()->add(
       hobject_t(object_t("foo"), "foo", 123, 456, 0, ""),
-      eversion_t(5, 6), eversion_t(5, 1));
+      eversion_t(5, 6), eversion_t(5, 1), true);
   }
   template <typename F>
   void get_changed(F &&f) const {
index 62364e66992620a783a4fdf9cfef526fa323b84a..b2fa2955cc6db385a5c4dad34318d5e2e677f761 100644 (file)
@@ -769,7 +769,7 @@ TEST_F(PGLogTest, merge_old_entry) {
     oe.op = pg_log_entry_t::MODIFY;
     oe.prior_version = eversion_t();
 
-    missing.add(oe.soid, eversion_t(1,1), eversion_t());
+    missing.add(oe.soid, eversion_t(1,1), eversion_t(), false);
 
     missing.flush();
     EXPECT_FALSE(is_dirty());
@@ -1096,9 +1096,10 @@ TEST_F(PGLogTest, merge_log) {
     EXPECT_EQ(1U, log.objects.count(divergent_object));
     EXPECT_EQ(4U, log.log.size());
     /* DELETE entries from olog that are appended to the hed of the
-       log are also added to remove_snap.
+       log, and the divergent version of the object is removed (added
+       to remove_snap)
      */
-    EXPECT_EQ(0x7U, remove_snap.front().get_hash());
+    EXPECT_EQ(0x9U, remove_snap.front().get_hash());
     EXPECT_EQ(log.head, info.last_update);
     EXPECT_TRUE(info.purged_snaps.contains(purged_snap));
     EXPECT_TRUE(is_dirty());
@@ -1572,7 +1573,7 @@ TEST_F(PGLogTest, proc_replica_log) {
       e.prior_version = eversion_t(1, 1);
       e.soid = divergent_object;
       divergent_object = e.soid;
-      omissing.add(divergent_object, e.version, eversion_t());
+      omissing.add(divergent_object, e.version, eversion_t(), false);
       e.op = pg_log_entry_t::MODIFY;
       olog.log.push_back(e);
       olog.head = e.version;
@@ -1665,7 +1666,7 @@ TEST_F(PGLogTest, proc_replica_log) {
       e.prior_version = eversion_t(1, 1);
       e.soid.set_hash(0x9);
       divergent_object = e.soid;
-      omissing.add(divergent_object, e.version, eversion_t());
+      omissing.add(divergent_object, e.version, eversion_t(), false);
       e.op = pg_log_entry_t::MODIFY;
       olog.log.push_back(e);
       olog.head = e.version;
@@ -1696,7 +1697,7 @@ TEST_F(PGLogTest, merge_log_1) {
 
   t.div.push_back(mk_ple_mod(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100)));
 
-  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0));
+  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0), false);
 
   t.toremove.insert(mk_obj(1));
 
@@ -1725,7 +1726,7 @@ TEST_F(PGLogTest, merge_log_3) {
   t.div.push_back(mk_ple_mod(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100)));
   t.div.push_back(mk_ple_mod_rb(mk_obj(1), mk_evt(10, 102), mk_evt(10, 101)));
 
-  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0));
+  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0), false);
 
   t.toremove.insert(mk_obj(1));
 
@@ -1740,8 +1741,8 @@ TEST_F(PGLogTest, merge_log_4) {
   t.div.push_back(mk_ple_mod_rb(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100)));
   t.div.push_back(mk_ple_mod_rb(mk_obj(1), mk_evt(10, 102), mk_evt(10, 101)));
 
-  t.init.add(mk_obj(1), mk_evt(10, 102), mk_evt(0, 0));
-  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0));
+  t.init.add(mk_obj(1), mk_evt(10, 102), mk_evt(0, 0), false);
+  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0), false);
 
   t.setup();
   run_test_case(t);
@@ -1756,7 +1757,7 @@ TEST_F(PGLogTest, merge_log_5) {
 
   t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
 
-  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(0, 0));
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(0, 0), false);
 
   t.toremove.insert(mk_obj(1));
 
@@ -1770,7 +1771,7 @@ TEST_F(PGLogTest, merge_log_6) {
 
   t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
 
-  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100));
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100), false);
 
   t.setup();
   run_test_case(t);
@@ -1782,8 +1783,8 @@ TEST_F(PGLogTest, merge_log_7) {
 
   t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
 
-  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80));
-  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(8, 80));
+  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80), false);
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(8, 80), false);
 
   t.setup();
   run_test_case(t);
@@ -1795,9 +1796,8 @@ TEST_F(PGLogTest, merge_log_8) {
 
   t.auth.push_back(mk_ple_dt(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
 
-  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80));
-
-  t.toremove.insert(mk_obj(1));
+  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80), false);
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(8, 80), true);
 
   t.setup();
   run_test_case(t);
@@ -1809,7 +1809,7 @@ TEST_F(PGLogTest, merge_log_prior_version_have) {
 
   t.div.push_back(mk_ple_mod(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100)));
 
-  t.init.add(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100));
+  t.init.add(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100), false);
 
   t.setup();
   run_test_case(t);
@@ -1825,7 +1825,7 @@ TEST_F(PGLogTest, merge_log_split_missing_entries_at_head) {
   t.setup();
   t.set_div_bounds(mk_evt(9, 79), mk_evt(8, 69));
   t.set_auth_bounds(mk_evt(15, 160), mk_evt(9, 77));
-  t.final.add(mk_obj(1), mk_evt(15, 150), mk_evt(8, 70));
+  t.final.add(mk_obj(1), mk_evt(15, 150), mk_evt(8, 70), false);
   run_test_case(t);
 }
 
@@ -1838,7 +1838,7 @@ TEST_F(PGLogTest, olog_tail_gt_log_tail_split) {
   t.setup();
   t.set_div_bounds(mk_evt(15, 153), mk_evt(15, 151));
   t.set_auth_bounds(mk_evt(15, 156), mk_evt(10, 99));
-  t.final.add(mk_obj(1), mk_evt(15, 155), mk_evt(15, 150));
+  t.final.add(mk_obj(1), mk_evt(15, 155), mk_evt(15, 150), false);
   run_test_case(t);
 }
 
@@ -1852,7 +1852,7 @@ TEST_F(PGLogTest, olog_tail_gt_log_tail_split2) {
   t.setup();
   t.set_div_bounds(mk_evt(15, 153), mk_evt(15, 151));
   t.set_auth_bounds(mk_evt(16, 156), mk_evt(10, 99));
-  t.final.add(mk_obj(1), mk_evt(16, 155), mk_evt(0, 0));
+  t.final.add(mk_obj(1), mk_evt(16, 155), mk_evt(0, 0), false);
   t.toremove.insert(mk_obj(1));
   run_test_case(t);
 }
index 13159a5d073bcbf568bc91f2abf056fb2c37d6f6..5a1020a441c37a6ab242b03e972df6ae24f4dabc 100644 (file)
@@ -682,7 +682,7 @@ TEST(pg_missing_t, have_missing)
   hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   EXPECT_FALSE(missing.have_missing());
-  missing.add(oid, eversion_t(), eversion_t());
+  missing.add(oid, eversion_t(), eversion_t(), false);
   EXPECT_TRUE(missing.have_missing());
 }
 
@@ -691,7 +691,7 @@ TEST(pg_missing_t, claim)
   hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   EXPECT_FALSE(missing.have_missing());
-  missing.add(oid, eversion_t(), eversion_t());
+  missing.add(oid, eversion_t(), eversion_t(), false);
   EXPECT_TRUE(missing.have_missing());
 
   pg_missing_t other;
@@ -708,7 +708,7 @@ TEST(pg_missing_t, is_missing)
     hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     EXPECT_FALSE(missing.is_missing(oid));
-    missing.add(oid, eversion_t(), eversion_t());
+    missing.add(oid, eversion_t(), eversion_t(), false);
     EXPECT_TRUE(missing.is_missing(oid));
   }
 
@@ -718,7 +718,7 @@ TEST(pg_missing_t, is_missing)
     pg_missing_t missing;
     eversion_t need(10,5);
     EXPECT_FALSE(missing.is_missing(oid, eversion_t()));
-    missing.add(oid, need, eversion_t());
+    missing.add(oid, need, eversion_t(), false);
     EXPECT_TRUE(missing.is_missing(oid));
     EXPECT_FALSE(missing.is_missing(oid, eversion_t()));
     EXPECT_TRUE(missing.is_missing(oid, need));
@@ -730,7 +730,7 @@ TEST(pg_missing_t, have_old)
   hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   EXPECT_EQ(eversion_t(), missing.have_old(oid));
-  missing.add(oid, eversion_t(), eversion_t());
+  missing.add(oid, eversion_t(), eversion_t(), false);
   EXPECT_EQ(eversion_t(), missing.have_old(oid));
   eversion_t have(1,1);
   missing.revise_have(oid, have);
@@ -873,44 +873,13 @@ TEST(pg_missing_t, add_next_event)
     e.op = pg_log_entry_t::DELETE;
     EXPECT_TRUE(e.is_delete());
     missing.add_next_event(e);
-    EXPECT_FALSE(missing.have_missing());
-  }
-
-  // ERROR op should only be used for dup detection
-  {
-    pg_missing_t missing;
-    pg_log_entry_t e = sample_e;
-
-    e.op = pg_log_entry_t::ERROR;
-    e.return_code = -ENOENT;
-    EXPECT_FALSE(e.is_update());
-    EXPECT_FALSE(e.object_is_indexed());
-    EXPECT_TRUE(e.reqid_is_indexed());
-    EXPECT_FALSE(missing.is_missing(oid));
-    missing.add_next_event(e);
-    EXPECT_FALSE(missing.is_missing(oid));
-    EXPECT_FALSE(e.object_is_indexed());
-    EXPECT_TRUE(e.reqid_is_indexed());
-  }
-
-  // ERROR op should not affect previous entries
-  {
-    pg_missing_t missing;
-    pg_log_entry_t modify = sample_e;
-
-    modify.op = pg_log_entry_t::MODIFY;
-    EXPECT_FALSE(missing.is_missing(oid));
-    missing.add_next_event(modify);
-    EXPECT_TRUE(missing.is_missing(oid));
-    EXPECT_EQ(missing.get_items().at(oid).need, version);
-
-    pg_log_entry_t error = sample_e;
-    error.op = pg_log_entry_t::ERROR;
-    error.return_code = -ENOENT;
-    error.version = eversion_t(11, 5);
-    missing.add_next_event(error);
     EXPECT_TRUE(missing.is_missing(oid));
-    EXPECT_EQ(missing.get_items().at(oid).need, version);
+    EXPECT_EQ(prior_version, missing.get_items().at(oid).have);
+    EXPECT_EQ(version, missing.get_items().at(oid).need);
+    EXPECT_TRUE(missing.get_items().at(oid).is_delete());
+    EXPECT_EQ(oid, missing.get_rmissing().at(e.version.version));
+    EXPECT_EQ(1U, missing.num_missing());
+    EXPECT_EQ(1U, missing.get_rmissing().size());
   }
 }
 
@@ -921,7 +890,7 @@ TEST(pg_missing_t, revise_need)
   // create a new entry
   EXPECT_FALSE(missing.is_missing(oid));
   eversion_t need(10,10);
-  missing.revise_need(oid, need);
+  missing.revise_need(oid, need, false);
   EXPECT_TRUE(missing.is_missing(oid));
   EXPECT_EQ(eversion_t(), missing.get_items().at(oid).have);
   EXPECT_EQ(need, missing.get_items().at(oid).need);
@@ -930,7 +899,7 @@ TEST(pg_missing_t, revise_need)
   missing.revise_have(oid, have);
   eversion_t new_need(10,12);
   EXPECT_EQ(have, missing.get_items().at(oid).have);
-  missing.revise_need(oid, new_need);
+  missing.revise_need(oid, new_need, false);
   EXPECT_EQ(have, missing.get_items().at(oid).have);
   EXPECT_EQ(new_need, missing.get_items().at(oid).need);
 }
@@ -946,7 +915,7 @@ TEST(pg_missing_t, revise_have)
   EXPECT_FALSE(missing.is_missing(oid));
   // update an existing entry
   eversion_t need(10,12);
-  missing.add(oid, need, have);
+  missing.add(oid, need, have, false);
   EXPECT_TRUE(missing.is_missing(oid));
   eversion_t new_have(2,2);
   EXPECT_EQ(have, missing.get_items().at(oid).have);
@@ -962,7 +931,7 @@ TEST(pg_missing_t, add)
   EXPECT_FALSE(missing.is_missing(oid));
   eversion_t have(1,1);
   eversion_t need(10,10);
-  missing.add(oid, need, have);
+  missing.add(oid, need, have, false);
   EXPECT_TRUE(missing.is_missing(oid));
   EXPECT_EQ(have, missing.get_items().at(oid).have);
   EXPECT_EQ(need, missing.get_items().at(oid).need);
@@ -977,7 +946,7 @@ TEST(pg_missing_t, rm)
     EXPECT_FALSE(missing.is_missing(oid));
     epoch_t epoch = 10;
     eversion_t need(epoch,10);
-    missing.add(oid, need, eversion_t());
+    missing.add(oid, need, eversion_t(), false);
     EXPECT_TRUE(missing.is_missing(oid));
     // rm of an older version is a noop
     missing.rm(oid, eversion_t(epoch / 2,20));
@@ -991,7 +960,7 @@ TEST(pg_missing_t, rm)
     hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     EXPECT_FALSE(missing.is_missing(oid));
-    missing.add(oid, eversion_t(), eversion_t());
+    missing.add(oid, eversion_t(), eversion_t(), false);
     EXPECT_TRUE(missing.is_missing(oid));
     auto m = missing.get_items().find(oid);
     missing.rm(m);
@@ -1013,7 +982,7 @@ TEST(pg_missing_t, got)
     EXPECT_FALSE(missing.is_missing(oid));
     epoch_t epoch = 10;
     eversion_t need(epoch,10);
-    missing.add(oid, need, eversion_t());
+    missing.add(oid, need, eversion_t(), false);
     EXPECT_TRUE(missing.is_missing(oid));
     // assert if that the version to be removed is lower than the version of the object
     {
@@ -1029,7 +998,7 @@ TEST(pg_missing_t, got)
     hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     EXPECT_FALSE(missing.is_missing(oid));
-    missing.add(oid, eversion_t(), eversion_t());
+    missing.add(oid, eversion_t(), eversion_t(), false);
     EXPECT_TRUE(missing.is_missing(oid));
     auto m = missing.get_items().find(oid);
     missing.got(m);
@@ -1044,8 +1013,8 @@ TEST(pg_missing_t, split_into)
   uint32_t hash2 = 2;
   hobject_t oid2(object_t("objname"), "key2", 123, hash2, 0, "");
   pg_missing_t missing;
-  missing.add(oid1, eversion_t(), eversion_t());
-  missing.add(oid2, eversion_t(), eversion_t());
+  missing.add(oid1, eversion_t(), eversion_t(), false);
+  missing.add(oid2, eversion_t(), eversion_t(), false);
   pg_t child_pgid;
   child_pgid.m_seed = 1;
   pg_missing_t child;