]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
PGLog: create interface allowing interface user to cleanup/rollback
authorSamuel Just <sam.just@inktank.com>
Mon, 9 Dec 2013 03:36:51 +0000 (19:36 -0800)
committerSamuel Just <sam.just@inktank.com>
Wed, 22 Jan 2014 22:39:16 +0000 (14:39 -0800)
We need to be able to allow the PGLog interface user to provide
logic for rolling back and trimming log entries.  To that end,
serveral PGLog methods now take a LogEntryHander.

In PGLog::merge_old_entry, if prior_version > info.log_tail and
the object is not missing, we must have rolled back the prior
log entry.  Thus, we don't skip the entry.

To simplify the code, _merge_old_entry has been split out as
a const helper.  This way, proc_replica_log can be reexpressed
as merging the divergent replica log entries with the fully
merged authoritative log.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/PGBackend.h
src/osd/PGLog.cc
src/osd/PGLog.h
src/osd/ReplicatedBackend.h
src/osd/osd_types.h
src/test/osd/TestPGLog.cc

index 5bcf0687672d72ed6aad355243f6a24175e2d541..dc528d69fd372747c0576abca85ae1875e73c637 100644 (file)
@@ -6379,7 +6379,9 @@ void OSD::handle_pg_trim(OpRequestRef op)
     } else {
       // primary is instructing us to trim
       ObjectStore::Transaction *t = new ObjectStore::Transaction;
-      pg->pg_log.trim(m->trim_to, pg->info);
+      PG::PGLogEntryHandler handler;
+      pg->pg_log.trim(&handler, m->trim_to, pg->info);
+      handler.apply(pg, t);
       pg->dirty_info = true;
       pg->write_if_dirty(*t);
       int tr = store->queue_transaction(
index 8395e509ccf6c3837498270f53baf828e3ffa4d4..c504aa2f2de2224110f00f15cbb6e0b72b4fa462 100644 (file)
@@ -301,10 +301,16 @@ bool PG::proc_replica_info(int from, const pg_info_t &oinfo)
 }
 
 void PG::remove_snap_mapped_object(
-  ObjectStore::Transaction& t, const hobject_t& soid)
+  ObjectStore::Transaction &t, const hobject_t &soid)
 {
   t.remove(coll, soid);
-  OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
+  clear_object_snap_mapping(&t, soid);
+}
+
+void PG::clear_object_snap_mapping(
+  ObjectStore::Transaction *t, const hobject_t &soid)
+{
+  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
   if (soid.snap < CEPH_MAXSNAP) {
     int r = snap_mapper.remove_oid(
       soid,
@@ -316,24 +322,39 @@ void PG::remove_snap_mapped_object(
   }
 }
 
-void PG::merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from)
+void PG::update_object_snap_mapping(
+  ObjectStore::Transaction *t, const hobject_t &soid, const set<snapid_t> &snaps)
 {
-  list<hobject_t> to_remove;
-  pg_log.merge_log(t, oinfo, olog, from, info, to_remove, dirty_info, dirty_big_info);
-  for(list<hobject_t>::iterator i = to_remove.begin();
-      i != to_remove.end();
-      ++i)
-    remove_snap_mapped_object(t, *i);
+  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+  assert(soid.snap < CEPH_MAXSNAP);
+  int r = snap_mapper.remove_oid(
+    soid,
+    &_t);
+  if (!(r == 0 || r == -ENOENT)) {
+    derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
+    assert(0);
+  }
+  snap_mapper.add_oid(
+    soid,
+    snaps,
+    &_t);
+}
+
+void PG::merge_log(
+  ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from)
+{
+  PGLogEntryHandler rollbacker;
+  pg_log.merge_log(
+    t, oinfo, olog, from, info, &rollbacker, dirty_info, dirty_big_info);
+  rollbacker.apply(this, &t);
 }
 
 void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead)
 {
-  list<hobject_t> to_remove;
-  pg_log.rewind_divergent_log(t, newhead, info, to_remove, dirty_info, dirty_big_info);
-  for(list<hobject_t>::iterator i = to_remove.begin();
-      i != to_remove.end();
-      ++i)
-    remove_snap_mapped_object(t, *i);
+  PGLogEntryHandler rollbacker;
+  pg_log.rewind_divergent_log(
+    t, newhead, info, &rollbacker, dirty_info, dirty_big_info);
+  rollbacker.apply(this, &t);
 }
 
 /*
@@ -2332,8 +2353,9 @@ void PG::append_log(
 
   dout(10) << "append_log  adding " << keys.size() << " keys" << dendl;
   t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
-
-  pg_log.trim(trim_to, info);
+  PGLogEntryHandler handler;
+  pg_log.trim(&handler, trim_to, info);
+  handler.apply(this, &t);
 
   // update the local pg, pg log
   dirty_info = true;
index 1c143d2d300dd6cd4435f4b0208d86c88ba22523..b397802e38f46ae8e1adb0ebf0b070dc1e946691 100644 (file)
@@ -621,6 +621,97 @@ public:
   void proc_master_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
                       pg_missing_t& omissing, int from);
   bool proc_replica_info(int from, const pg_info_t &info);
+
+
+  struct LogEntryTrimmer : public ObjectModDesc::Visitor {
+    const hobject_t &soid;
+    PG *pg;
+    ObjectStore::Transaction *t;
+    LogEntryTrimmer(const hobject_t &soid, PG *pg, ObjectStore::Transaction *t)
+      : soid(soid), pg(pg), t(t) {}
+    void rmobject(version_t old_version) {
+      pg->get_pgbackend()->trim_stashed_object(
+       soid,
+       old_version,
+       t);
+    }
+  };
+
+  struct SnapRollBacker : public ObjectModDesc::Visitor {
+    const hobject_t &soid;
+    PG *pg;
+    ObjectStore::Transaction *t;
+    SnapRollBacker(const hobject_t &soid, PG *pg, ObjectStore::Transaction *t)
+      : soid(soid), pg(pg), t(t) {}
+    void update_snaps(set<snapid_t> &snaps) {
+      pg->update_object_snap_mapping(t, soid, snaps);
+    }
+    void create() {
+      pg->clear_object_snap_mapping(
+       t,
+       soid);
+    }
+  };
+
+  struct PGLogEntryHandler : public PGLog::LogEntryHandler {
+    map<hobject_t, list<pg_log_entry_t> > to_rollback;
+    set<hobject_t> cannot_rollback;
+    set<hobject_t> to_remove;
+    list<pg_log_entry_t> to_trim;
+    
+    // LogEntryHandler
+    void remove(const hobject_t &hoid) {
+      to_remove.insert(hoid);
+    }
+    void rollback(const pg_log_entry_t &entry) {
+      assert(!cannot_rollback.count(entry.soid));
+      to_rollback[entry.soid].push_back(entry);
+    }
+    void cant_rollback(const pg_log_entry_t &entry) {
+      to_rollback.erase(entry.soid);
+      cannot_rollback.insert(entry.soid);
+    }
+    void trim(const pg_log_entry_t &entry) {
+      to_trim.push_back(entry);
+    }
+
+    void apply(PG *pg, ObjectStore::Transaction *t) {
+      for (map<hobject_t, list<pg_log_entry_t> >::iterator i =
+            to_rollback.begin();
+          i != to_rollback.end();
+          ++i) {
+       for (list<pg_log_entry_t>::reverse_iterator j = i->second.rbegin();
+            j != i->second.rend();
+            ++j) {
+         assert(j->mod_desc.can_rollback());
+         pg->get_pgbackend()->rollback(j->soid, j->mod_desc, t);
+         SnapRollBacker rollbacker(j->soid, pg, t);
+         j->mod_desc.visit(&rollbacker);
+       }
+      }
+      for (set<hobject_t>::iterator i = to_remove.begin();
+          i != to_remove.end();
+          ++i) {
+       pg->get_pgbackend()->rollback_create(*i, t);
+       pg->remove_snap_mapped_object(*t, *i);
+      }
+      for (list<pg_log_entry_t>::reverse_iterator i = to_trim.rbegin();
+          i != to_trim.rend();
+          ++i) {
+       LogEntryTrimmer trimmer(i->soid, pg, t);
+       i->mod_desc.visit(&trimmer);
+      }
+    }
+  };
+  
+  friend struct SnapRollBacker;
+  friend struct PGLogEntryHandler;
+  friend struct LogEntryTrimmer;
+  void update_object_snap_mapping(
+    ObjectStore::Transaction *t, const hobject_t &soid,
+    const set<snapid_t> &snaps);
+  void clear_object_snap_mapping(
+    ObjectStore::Transaction *t, const hobject_t &soid);
   void remove_snap_mapped_object(
     ObjectStore::Transaction& t, const hobject_t& soid);
   void merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from);
index 26c9bd7356c8d09e10307f20897300604674cee9..43c1312d818e05b191758eed284ec0fa191a06ff 100644 (file)
    virtual void rollback_setattrs(
      const hobject_t &hoid,
      map<string, boost::optional<bufferlist> > &old_attrs,
-     ObjectStore::Transaction *t) { assert(0); }
+     ObjectStore::Transaction *t) = 0;
 
    /// Rollback truncate
    virtual void rollback_append(
      const hobject_t &hoid,
      uint64_t old_size,
-     ObjectStore::Transaction *t) { assert(0); }
+     ObjectStore::Transaction *t) = 0;
 
    /// Rollback unstash
    virtual void rollback_unstash(
      const hobject_t &hoid,
      version_t old_version,
-     ObjectStore::Transaction *t) { assert(0); }
+     ObjectStore::Transaction *t) = 0;
 
    /// Rollback create
    virtual void rollback_create(
      const hobject_t &hoid,
-     ObjectStore::Transaction *t) { assert(0); }
+     ObjectStore::Transaction *t) = 0;
 
    /// Trim object stashed at stashed_version
    virtual void trim_stashed_object(
      const hobject_t &hoid,
-     version_t stashed_version) { assert(0); }
+     version_t stashed_version,
+     ObjectStore::Transaction *t) = 0;
 
    /// List objects in collection
    virtual int objects_list_partial(
index 68f5552d83712087e489da738c222f9287460e5e..06795dcb414d5d79b6648f223965809c8269c533 100644 (file)
@@ -51,7 +51,10 @@ void PGLog::IndexedLog::split_into(
   olog->can_rollback_to = can_rollback_to;
 }
 
-void PGLog::IndexedLog::trim(eversion_t s, set<eversion_t> *trimmed)
+void PGLog::IndexedLog::trim(
+  LogEntryHandler *handler,
+  eversion_t s,
+  set<eversion_t> *trimmed)
 {
   if (complete_to != log.end() &&
       complete_to->version <= s) {
@@ -66,6 +69,7 @@ void PGLog::IndexedLog::trim(eversion_t s, set<eversion_t> *trimmed)
     generic_dout(20) << "trim " << e << dendl;
     if (trimmed)
       trimmed->insert(e.version);
+    handler->trim(e);
     unindex(e);         // remove from index,
     log.pop_front();    // from log
   }
@@ -119,7 +123,10 @@ void PGLog::clear_info_log(
   t->omap_rmkeys(coll_t::META_COLL, infos_oid, keys_to_remove);
 }
 
-void PGLog::trim(eversion_t trim_to, pg_info_t &info)
+void PGLog::trim(
+  LogEntryHandler *handler,
+  eversion_t trim_to,
+  pg_info_t &info)
 {
   // trim?
   if (trim_to > log.tail) {
@@ -131,7 +138,7 @@ void PGLog::trim(eversion_t trim_to, pg_info_t &info)
     assert(trim_to <= info.last_complete);
 
     dout(10) << "trim " << log << " to " << trim_to << dendl;
-    log.trim(trim_to, &trimmed);
+    log.trim(handler, trim_to, &trimmed);
     info.log_tail = log.tail;
   }
 }
@@ -159,68 +166,48 @@ void PGLog::proc_replica_log(ObjectStore::Transaction& t,
             << " have " << i->second.have << dendl;
   }
 
-  list<pg_log_entry_t>::const_reverse_iterator pp = olog.log.rbegin();
+  list<pg_log_entry_t> divergent;
+  list<pg_log_entry_t>::const_iterator pp = olog.log.end();
   eversion_t lu(oinfo.last_update);
   while (true) {
-    if (pp == olog.log.rend()) {
-      if (pp != olog.log.rbegin())   // no last_update adjustment if we discard nothing!
+    if (pp == olog.log.begin()) {
+      if (pp != olog.log.end())   // no last_update adjustment if we discard nothing!
        lu = olog.tail;
       break;
     }
+    --pp;
     const pg_log_entry_t& oe = *pp;
 
     // don't continue past the tail of our log.
-    if (oe.version <= log.tail)
+    if (oe.version <= log.tail) {
       break;
-
-    if (!log.objects.count(oe.soid)) {
-      dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl;
-      ++pp;
-      continue;
     }
-      
-    const pg_log_entry_t& ne = *(log.objects.find(oe.soid)->second);
-    if (ne.version == oe.version) {
-      dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl;
+
+    hash_map<hobject_t, pg_log_entry_t*>::const_iterator i =
+      log.objects.find(oe.soid);
+    if (i != log.objects.end() && i->second->version == oe.version) {
+      dout(10) << " had " << oe << " new " << *(i->second)
+              << " : match, stopping" << dendl;
       lu = pp->version;
       break;
     }
 
-    if (oe.soid > oinfo.last_backfill) {
-      // past backfill line, don't care
-      dout(10) << " had " << oe << " beyond last_backfill : skipping" << dendl;
-      ++pp;
-      continue;
-    }
-
-    if (ne.version > oe.version) {
-      dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
-    } else {
-      if (oe.is_delete()) {
-       if (ne.is_delete()) {
-         // old and new are delete
-         dout(10) << " had " << oe << " new " << ne << " : both deletes" << dendl;
-       } else {
-         // old delete, new update.
-         dout(10) << " had " << oe << " new " << ne << " : missing" << dendl;
-         omissing.add(ne.soid, ne.version, eversion_t());
-       }
-      } else {
-       if (ne.is_delete()) {
-         // old update, new delete
-         dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
-         omissing.rm(oe.soid, oe.version);
-       } else {
-         // old update, new update
-         dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
-         omissing.revise_need(ne.soid, ne.version);
-       }
-      }
-    }
-
-    ++pp;
+    divergent.push_front(oe);
   }    
 
+  for (list<pg_log_entry_t>::iterator i = divergent.begin();
+       i != divergent.end();
+       ++i) {
+    _merge_old_entry(
+      t,
+      *i,
+      oinfo,
+      omissing,
+      olog.can_rollback_to,
+      0,
+      0);
+  }
+
   if (lu < oinfo.last_update) {
     dout(10) << " peer osd." << from << " last_update now " << lu << dendl;
     oinfo.last_update = lu;
@@ -252,42 +239,73 @@ void PGLog::proc_replica_log(ObjectStore::Transaction& t,
  *
  * return true if entry is not divergent.
  */
-bool PGLog::merge_old_entry(ObjectStore::Transaction& t, const pg_log_entry_t& oe, const pg_info_t& info, list<hobject_t>& remove_snap)
+bool PGLog::_merge_old_entry(
+  ObjectStore::Transaction& t,
+  const pg_log_entry_t &oe,
+  const pg_info_t& info,
+  pg_missing_t &missing,
+  eversion_t olog_can_rollback_to,
+  boost::optional<pair<eversion_t, hobject_t> > *new_divergent_prior,
+  LogEntryHandler *rollbacker) const
 {
   if (oe.soid > info.last_backfill) {
-    dout(20) << "merge_old_entry  had " << oe << " : beyond last_backfill" << dendl;
+    dout(20) << "merge_old_entry  had " << oe
+            << " : beyond last_backfill" << dendl;
     return false;
   }
-  if (log.objects.count(oe.soid)) {
-    pg_log_entry_t &ne = *log.objects[oe.soid];  // new(er?) entry
+  hash_map<hobject_t, pg_log_entry_t*>::const_iterator objiter =
+    log.objects.find(oe.soid);
+  if (objiter != log.objects.end()) {
+    pg_log_entry_t &ne = *(objiter->second); // new(er?) entry
     
     if (ne.version > oe.version) {
-      dout(20) << "merge_old_entry  had " << oe << " new " << ne << " : older, missing" << dendl;
-      assert(ne.is_delete() || missing.is_missing(ne.soid));
+      dout(20) << "merge_old_entry  had " << oe
+              << " new " << ne << " : older, missing" << dendl;
       return false;
     }
     if (ne.version == oe.version) {
-      dout(20) << "merge_old_entry  had " << oe << " new " << ne << " : same" << dendl;
+      dout(20) << "merge_old_entry  had " << oe
+              << " new " << ne << " : same" << dendl;
       return true;
     }
     if (oe.is_delete()) {
       if (ne.is_delete()) {
        // old and new are delete
-       dout(20) << "merge_old_entry  had " << oe << " new " << ne << " : both deletes" << dendl;
+       dout(20) << "merge_old_entry  had " << oe
+                << " new " << ne << " : both deletes" << dendl;
       } else {
        // old delete, new update.
-       dout(20) << "merge_old_entry  had " << oe << " new " << ne << " : missing" << dendl;
+       dout(20) << "merge_old_entry  had " << oe
+                << " new " << ne << " : missing" << dendl;
        missing.revise_need(ne.soid, ne.version);
+       if (rollbacker)
+         rollbacker->cant_rollback(oe);
       }
     } else {
       if (ne.is_delete()) {
        // old update, new delete
-       dout(20) << "merge_old_entry  had " << oe << " new " << ne << " : new delete supercedes" << dendl;
-       missing.rm(oe.soid, oe.version);
+       dout(20) << "merge_old_entry  had " << oe
+                << " new " << ne << " : new delete supercedes" << dendl;
+       if (rollbacker) {
+         rollbacker->remove(oe.soid);
+         rollbacker->cant_rollback(oe);
+       }
+       if (missing.is_missing(oe.soid))
+         missing.rm(oe.soid, oe.version);
       } else {
        // old update, new update
-       dout(20) << "merge_old_entry  had " << oe << " new " << ne << " : new item supercedes" << dendl;
-       missing.revise_need(ne.soid, ne.version);
+       dout(20) << "merge_old_entry  had " << oe
+                << " new " << ne << " : new item supercedes" << dendl;
+       if (oe.mod_desc.can_rollback() && oe.version > olog_can_rollback_to) {
+         dout(20) << __func__ << ": ne.version < oe.version && can rollback, "
+                  << "rolling back " << oe << dendl;
+         if (rollbacker)
+           rollbacker->rollback(oe);
+       } else {
+         missing.revise_need(ne.soid, ne.version);
+         if (rollbacker)
+           rollbacker->cant_rollback(oe);
+       }
       }
     }
   } else if (oe.op == pg_log_entry_t::CLONE) {
@@ -295,10 +313,20 @@ bool PGLog::merge_old_entry(ObjectStore::Transaction& t, const pg_log_entry_t& o
     dout(20) << "merge_old_entry  had " << oe
             << ", clone with no non-divergent log entries, "
             << "deleting" << dendl;
-    remove_snap.push_back(oe.soid);
-    if (missing.is_missing(oe.soid))
+    if (missing.is_missing(oe.soid)) {
       missing.rm(oe.soid, missing.missing[oe.soid].need);
-  } else if (oe.prior_version > info.log_tail) {
+    }
+
+    if (oe.mod_desc.can_rollback() && oe.version > olog_can_rollback_to) {
+      dout(20) << __func__ << ": rolling back " << oe << dendl;
+      if (rollbacker)
+       rollbacker->rollback(oe);
+    } else {
+      dout(20) << __func__ << ": had " << oe << " deleting" << dendl;
+      if (rollbacker)
+       rollbacker->remove(oe.soid);
+    }
+  } else if (oe.prior_version > info.log_tail && missing.is_missing(oe.soid)) {
     /**
      * oe.prior_version is a previously divergent log entry
      * oe.soid must have already been handled and the missing
@@ -309,17 +337,27 @@ bool PGLog::merge_old_entry(ObjectStore::Transaction& t, const pg_log_entry_t& o
             << " oe.soid " << oe.soid
             << " must already have been merged" << dendl;
   } else {
-    if (!oe.is_delete()) {
-      dout(20) << "merge_old_entry  had " << oe << " deleting" << dendl;
-      remove_snap.push_back(oe.soid);
-    }
-    dout(20) << "merge_old_entry  had " << oe << " updating missing to "
-            << oe.prior_version << dendl;
-    if (oe.prior_version > eversion_t()) {
-      add_divergent_prior(oe.prior_version, oe.soid);
-      missing.revise_need(oe.soid, oe.prior_version);
-    } else if (missing.is_missing(oe.soid)) {
-      missing.rm(oe.soid, missing.missing[oe.soid].need);
+    if (oe.mod_desc.can_rollback() && oe.version > olog_can_rollback_to) {
+      dout(20) << __func__ << ": rolling back " << oe << dendl;
+      if (rollbacker)
+       rollbacker->rollback(oe);
+    } else {
+      if (!oe.is_delete()) {
+       if (rollbacker)
+         rollbacker->remove(oe.soid);
+       dout(20) << __func__ << ": had " << oe << " deleting" << dendl;
+      }
+      dout(20) << "merge_old_entry  had " << oe << " updating missing to "
+              << oe.prior_version << dendl;
+      if (oe.prior_version > eversion_t()) {
+       if (new_divergent_prior)
+         *new_divergent_prior = make_pair(oe.prior_version, oe.soid);
+       missing.revise_need(oe.soid, oe.prior_version);
+       if (rollbacker)
+         rollbacker->cant_rollback(oe);
+      } else if (missing.is_missing(oe.soid)) {
+       missing.rm(oe.soid, missing.missing[oe.soid].need);
+      }
     }
   }
   return false;
@@ -335,8 +373,8 @@ bool PGLog::merge_old_entry(ObjectStore::Transaction& t, const pg_log_entry_t& o
  * @param newhead new head to rewind to
  */
 void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead,
-                      pg_info_t &info, list<hobject_t>& remove_snap,
-                      bool &dirty_info, bool &dirty_big_info)
+                                pg_info_t &info, LogEntryHandler *rollbacker,
+                                bool &dirty_info, bool &dirty_big_info)
 {
   dout(10) << "rewind_divergent_log truncate divergent future " << newhead << dendl;
   assert(newhead >= log.tail);
@@ -366,8 +404,15 @@ void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead
     info.last_complete = newhead;
 
   log.index();
-  for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d)
-    merge_old_entry(t, *d, info, remove_snap);
+  for (list<pg_log_entry_t>::iterator d = divergent.begin();
+       d != divergent.end();
+       ++d) {
+    merge_old_entry(t, *d, info, rollbacker);
+    rollbacker->trim(*d);
+  }
+
+  if (info.last_update < log.can_rollback_to)
+    log.can_rollback_to = info.last_update;
 
   dirty_info = true;
   dirty_big_info = true;
@@ -375,7 +420,7 @@ void PGLog::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead
 
 void PGLog::merge_log(ObjectStore::Transaction& t,
                       pg_info_t &oinfo, pg_log_t &olog, int fromosd,
-                      pg_info_t &info, list<hobject_t>& remove_snap,
+                      pg_info_t &info, LogEntryHandler *rollbacker,
                       bool &dirty_info, bool &dirty_big_info)
 {
   dout(10) << "merge_log " << olog << " from osd." << fromosd
@@ -432,7 +477,7 @@ void PGLog::merge_log(ObjectStore::Transaction& t,
 
   // do we have divergent entries to throw out?
   if (olog.head < log.head) {
-    rewind_divergent_log(t, olog.head, info, remove_snap, dirty_info, dirty_big_info);
+    rewind_divergent_log(t, olog.head, info, rollbacker, dirty_info, dirty_big_info);
     changed = true;
   }
 
@@ -466,7 +511,7 @@ void PGLog::merge_log(ObjectStore::Transaction& t,
       if (ne.soid <= info.last_backfill) {
        missing.add_next_event(ne);
        if (ne.is_delete())
-         remove_snap.push_back(ne.soid);
+         rollbacker->remove(ne.soid);
       }
     }
       
@@ -495,15 +540,21 @@ void PGLog::merge_log(ObjectStore::Transaction& t,
     log.index();   
 
     info.last_update = log.head = olog.head;
+
     info.last_user_version = oinfo.last_user_version;
     info.purged_snaps = oinfo.purged_snaps;
 
     // process divergent items
-    if (!divergent.empty()) {
-      for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d)
-       merge_old_entry(t, *d, info, remove_snap);
+    for (list<pg_log_entry_t>::iterator d = divergent.begin();
+        d != divergent.end();
+        ++d) {
+      merge_old_entry(t, *d, info, rollbacker);
+      rollbacker->trim(*d);
     }
 
+    // We cannot rollback into the new log entries
+    log.can_rollback_to = log.head;
+
     changed = true;
   }
   
index 792191bea2a3035b1cd73195d1fa1a4c6555c2f7..0afd23eb09b5f2b5653dc13d144621f602e8e3b8 100644 (file)
@@ -27,6 +27,17 @@ using namespace std;
 
 struct PGLog {
   ////////////////////////////// sub classes //////////////////////////////
+  struct LogEntryHandler {
+    virtual void rollback(
+      const pg_log_entry_t &entry) = 0;
+    virtual void remove(
+      const hobject_t &hoid) = 0;
+    virtual void trim(
+      const pg_log_entry_t &entry) = 0;
+    virtual void cant_rollback(
+      const pg_log_entry_t &entry) = 0;
+    virtual ~LogEntryHandler() {}
+  };
 
   /* Exceptions */
   class read_log_error : public buffer::error {
@@ -142,7 +153,10 @@ struct PGLog {
        caller_ops[e.reqid] = &(log.back());
     }
 
-    void trim(eversion_t s, set<eversion_t> *trimmed);
+    void trim(
+      LogEntryHandler *handler,
+      eversion_t s,
+      set<eversion_t> *trimmed);
 
     ostream& print(ostream& out) const;
   };
@@ -304,7 +318,10 @@ public:
     const hobject_t &log_oid,
     ObjectStore::Transaction *t);
 
-  void trim(eversion_t trim_to, pg_info_t &info);
+  void trim(
+    LogEntryHandler *handler,
+    eversion_t trim_to,
+    pg_info_t &info);
 
   //////////////////// get or set log & missing ////////////////////
 
@@ -364,16 +381,39 @@ public:
                        pg_missing_t& omissing, int from) const;
 
 protected:
-  bool merge_old_entry(ObjectStore::Transaction& t, const pg_log_entry_t& oe,
-                      const pg_info_t& info, list<hobject_t>& remove_snap);
+  bool _merge_old_entry(
+    ObjectStore::Transaction& t,
+    const pg_log_entry_t &oe,
+    const pg_info_t& info,
+    pg_missing_t &missing,
+    eversion_t olog_can_rollback_to,
+    boost::optional<pair<eversion_t, hobject_t> > *new_divergent_prior,
+    LogEntryHandler *rollbacker) const;
+  bool merge_old_entry(
+    ObjectStore::Transaction& t,
+    const pg_log_entry_t& oe,
+    const pg_info_t& info,
+    LogEntryHandler *rollbacker) {
+    boost::optional<pair<eversion_t, hobject_t> > new_divergent_prior;
+    bool merged = _merge_old_entry(
+      t, oe, info, missing,
+      log.can_rollback_to,
+      &new_divergent_prior,
+      rollbacker);
+    if (new_divergent_prior)
+      add_divergent_prior(
+       (*new_divergent_prior).first,
+       (*new_divergent_prior).second);
+    return merged;
+  }
 public:
   void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead,
-                            pg_info_t &info, list<hobject_t>& remove_snap,
+                            pg_info_t &info, LogEntryHandler *rollbacker,
                             bool &dirty_info, bool &dirty_big_info);
 
   void merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog, int from,
-                      pg_info_t &info, list<hobject_t>& remove_snap,
-                      bool &dirty_info, bool &dirty_big_info);
+                pg_info_t &info, LogEntryHandler *rollbacker,
+                bool &dirty_info, bool &dirty_big_info);
 
   void write_log(ObjectStore::Transaction& t, const hobject_t &log_oid);
 
index 4fca14ed4be28c96221318556ffcbb9fa3b1c7d2..84eacec10a6914b05376192edca21bc6bea8e700 100644 (file)
@@ -405,7 +405,7 @@ public:
     t->remove(coll, hoid);
     t->collection_move_rename(
       coll,
-      ghobject_t(hoid, old_version, ghobject_t::NO_SHARD),
+      ghobject_t(hoid, old_version, 0),
       coll,
       hoid);
   }
@@ -415,6 +415,14 @@ public:
     ObjectStore::Transaction *t) {
     t->remove(coll, hoid);
   }
+
+  void trim_stashed_object(
+    const hobject_t &hoid,
+    version_t old_version,
+    ObjectStore::Transaction *t) {
+    t->remove(coll, ghobject_t(hoid, old_version, 0));
+  }
+
 private:
   void issue_op(
     const hobject_t &soid,
index 303e328aa4deb5af8a65b45e79df68b41b3ca748..f1af161ae3b0ce85c3424e99e6e413caebaf7920 100644 (file)
@@ -1802,7 +1802,7 @@ struct pg_log_t {
   eversion_t head;    // newest entry
   eversion_t tail;    // version prior to oldest
 
-  // We can rollback rollback-able objects to can_rollback_to
+  // We can rollback rollback-able entries > can_rollback_to
   eversion_t can_rollback_to;
 
   list<pg_log_entry_t> log;  // the actual log.
index e0863f726a08e064384a50cf769b83225304e806..39af882983e36da4045f182d60ba67f7aa4f036b 100644 (file)
@@ -35,6 +35,21 @@ public:
   }
 };
 
+struct TestHandler : public PGLog::LogEntryHandler {
+  list<hobject_t> &removed;
+  TestHandler(list<hobject_t> &removed) : removed(removed) {}
+
+  void rollback(
+    const pg_log_entry_t &entry) {}
+  void remove(
+    const hobject_t &hoid) {
+    removed.push_back(hoid);
+  }
+  void cant_rollback(const pg_log_entry_t &entry) {}
+  void trim(
+    const pg_log_entry_t &entry) {}
+};
+
 TEST_F(PGLogTest, rewind_divergent_log) {
   // newhead > log.tail : throw an assert
   {
@@ -47,7 +62,8 @@ TEST_F(PGLogTest, rewind_divergent_log) {
     bool dirty_big_info = false;
 
     log.tail = eversion_t(2, 1);
-    EXPECT_THROW(rewind_divergent_log(t, eversion_t(1, 1), info, remove_snap,
+    TestHandler h(remove_snap);
+    EXPECT_THROW(rewind_divergent_log(t, eversion_t(1, 1), info, &h,
                                      dirty_info, dirty_big_info),
                 FailedAssertion);
   }
@@ -88,6 +104,7 @@ TEST_F(PGLogTest, rewind_divergent_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       e.version = eversion_t(1, 1);
       e.soid.hash = 0x5;
@@ -119,7 +136,8 @@ TEST_F(PGLogTest, rewind_divergent_log) {
     EXPECT_FALSE(dirty_info);
     EXPECT_FALSE(dirty_big_info);
 
-    rewind_divergent_log(t, newhead, info, remove_snap,
+    TestHandler h(remove_snap);
+    rewind_divergent_log(t, newhead, info, &h,
                         dirty_info, dirty_big_info);
 
     EXPECT_TRUE(log.objects.count(divergent));
@@ -165,6 +183,7 @@ TEST_F(PGLogTest, rewind_divergent_log) {
     eversion_t newhead;
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       info.log_tail = log.tail = eversion_t(1, 1);
       newhead = eversion_t(1, 3);
@@ -185,7 +204,8 @@ TEST_F(PGLogTest, rewind_divergent_log) {
     EXPECT_FALSE(dirty_info);
     EXPECT_FALSE(dirty_big_info);
 
-    rewind_divergent_log(t, newhead, info, remove_snap,
+    TestHandler h(remove_snap);
+    rewind_divergent_log(t, newhead, info, &h,
                         dirty_info, dirty_big_info);
 
     EXPECT_TRUE(missing.is_missing(divergent_object));
@@ -206,6 +226,7 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
@@ -219,7 +240,8 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_FALSE(missing.have_missing());
     EXPECT_TRUE(log.empty());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_FALSE(is_dirty());
     EXPECT_TRUE(remove_snap.empty());
@@ -234,13 +256,15 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
     oe.op = pg_log_entry_t::CLONE;
 
     oe.soid.snap = CEPH_NOSNAP;
-    EXPECT_THROW(merge_old_entry(t, oe, info, remove_snap), FailedAssertion);
+    TestHandler h(remove_snap);
+    EXPECT_THROW(merge_old_entry(t, oe, info, &h), FailedAssertion);
     oe.soid.snap = 1U;
     missing.add(oe.soid, eversion_t(), eversion_t());
 
@@ -251,7 +275,7 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_TRUE(missing.is_missing(oe.soid));
     EXPECT_TRUE(log.empty());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_FALSE(is_dirty());
     EXPECT_EQ(oe.soid, remove_snap.front());
@@ -267,6 +291,7 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
@@ -279,7 +304,8 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_FALSE(missing.have_missing());
     EXPECT_EQ(1U, log.log.size());
 
-    EXPECT_TRUE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_TRUE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_FALSE(is_dirty());
     EXPECT_TRUE(remove_snap.empty());
@@ -299,6 +325,7 @@ TEST_F(PGLogTest, merge_old_entry) {
     list<hobject_t> remove_snap;
 
     pg_log_entry_t ne;
+    ne.mod_desc.mark_unrollbackable();
     ne.version = eversion_t(2,1);
     log.add(ne);
 
@@ -313,9 +340,11 @@ TEST_F(PGLogTest, merge_old_entry) {
     {
       log.log.front().op = pg_log_entry_t::DELETE;
       pg_log_entry_t oe;
+      oe.mod_desc.mark_unrollbackable();
       oe.version = eversion_t(1,1);
 
-      EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+      TestHandler h(remove_snap);
+      EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
     }
 
     // if the newer entry is not DELETE, the object must be in missing
@@ -324,23 +353,15 @@ TEST_F(PGLogTest, merge_old_entry) {
       ne.op = pg_log_entry_t::MODIFY;
       missing.add_next_event(ne);
       pg_log_entry_t oe;
+      oe.mod_desc.mark_unrollbackable();
       oe.version = eversion_t(1,1);
 
-      EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+      TestHandler h(remove_snap);
+      EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
       missing.rm(ne.soid, ne.version);
     }
 
-    // throw if the newer entry is not DELETE and not in missing
-    {
-      pg_log_entry_t &ne = log.log.front();
-      ne.op = pg_log_entry_t::MODIFY;
-      pg_log_entry_t oe;
-      oe.version = eversion_t(1,1);
-
-      EXPECT_THROW(merge_old_entry(t, oe, info, remove_snap), FailedAssertion);
-    }
-
     EXPECT_FALSE(is_dirty());
     EXPECT_TRUE(remove_snap.empty());
     EXPECT_TRUE(t.empty());
@@ -358,10 +379,12 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
     pg_log_entry_t ne;
+    ne.mod_desc.mark_unrollbackable();
     ne.version = eversion_t(1,1);
     ne.op = pg_log_entry_t::DELETE;
     log.add(ne);
@@ -375,7 +398,8 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_FALSE(missing.have_missing());
     EXPECT_EQ(1U, log.log.size());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_FALSE(is_dirty());
     EXPECT_TRUE(remove_snap.empty());
@@ -399,10 +423,12 @@ TEST_F(PGLogTest, merge_old_entry) {
 
       ObjectStore::Transaction t;
       pg_log_entry_t oe;
+      oe.mod_desc.mark_unrollbackable();
       pg_info_t info;
       list<hobject_t> remove_snap;
 
       pg_log_entry_t ne;
+      ne.mod_desc.mark_unrollbackable();
       ne.version = eversion_t(1,1);
       ne.op = pg_log_entry_t::MODIFY;
       log.add(ne);
@@ -419,7 +445,8 @@ TEST_F(PGLogTest, merge_old_entry) {
       eversion_t old_version(0, 0);
       // if the object is not already in missing, add it
       {
-        EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+       TestHandler h(remove_snap);
+        EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
         EXPECT_TRUE(missing.is_missing(ne.soid, ne.version));
         EXPECT_FALSE(missing.is_missing(ne.soid, old_version));
@@ -429,7 +456,8 @@ TEST_F(PGLogTest, merge_old_entry) {
         missing.revise_need(ne.soid, old_version);
         EXPECT_TRUE(missing.is_missing(ne.soid, old_version));
 
-        EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+       TestHandler h(remove_snap);
+        EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
         EXPECT_TRUE(missing.is_missing(ne.soid, ne.version));
         EXPECT_FALSE(missing.is_missing(ne.soid, old_version));
@@ -452,10 +480,12 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
     pg_log_entry_t ne;
+    ne.mod_desc.mark_unrollbackable();
     ne.version = eversion_t(1,1);
     ne.op = pg_log_entry_t::DELETE;
     log.add(ne);
@@ -470,10 +500,11 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_TRUE(missing.is_missing(oe.soid));
     EXPECT_EQ(1U, log.log.size());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_FALSE(is_dirty());
-    EXPECT_TRUE(remove_snap.empty());
+    EXPECT_TRUE(remove_snap.size() > 0);
     EXPECT_TRUE(t.empty());
     EXPECT_FALSE(missing.have_missing());
     EXPECT_EQ(1U, log.log.size());
@@ -488,25 +519,26 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
     info.log_tail = eversion_t(1,1);
     oe.op = pg_log_entry_t::MODIFY;
     oe.prior_version = eversion_t(2,1);
+    missing_add(oe.soid, oe.prior_version, eversion_t());
 
     EXPECT_FALSE(is_dirty());
     EXPECT_TRUE(remove_snap.empty());
     EXPECT_TRUE(t.empty());
-    EXPECT_FALSE(missing.have_missing());
     EXPECT_TRUE(log.empty());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_FALSE(is_dirty());
     EXPECT_TRUE(remove_snap.empty());
     EXPECT_TRUE(t.empty());
-    EXPECT_FALSE(missing.have_missing());
     EXPECT_TRUE(log.empty());
   }
 
@@ -523,6 +555,7 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
@@ -537,7 +570,8 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_FALSE(missing.have_missing());
     EXPECT_TRUE(log.empty());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_TRUE(is_dirty());
     EXPECT_EQ(oe.soid, remove_snap.front());
@@ -559,6 +593,7 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
@@ -573,7 +608,8 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_FALSE(missing.have_missing());
     EXPECT_TRUE(log.empty());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_TRUE(is_dirty());
     EXPECT_TRUE(remove_snap.empty());
@@ -594,6 +630,7 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
@@ -610,7 +647,8 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_TRUE(missing.is_missing(oe.soid));
     EXPECT_TRUE(log.empty());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_FALSE(is_dirty());
     EXPECT_TRUE(remove_snap.empty());
@@ -631,6 +669,7 @@ TEST_F(PGLogTest, merge_old_entry) {
 
     ObjectStore::Transaction t;
     pg_log_entry_t oe;
+    oe.mod_desc.mark_unrollbackable();
     pg_info_t info;
     list<hobject_t> remove_snap;
 
@@ -647,7 +686,8 @@ TEST_F(PGLogTest, merge_old_entry) {
     EXPECT_TRUE(missing.is_missing(oe.soid));
     EXPECT_TRUE(log.empty());
 
-    EXPECT_FALSE(merge_old_entry(t, oe, info, remove_snap));
+    TestHandler h(remove_snap);
+    EXPECT_FALSE(merge_old_entry(t, oe, info, &h));
 
     EXPECT_FALSE(is_dirty());
     EXPECT_EQ(oe.soid, remove_snap.front());
@@ -691,7 +731,8 @@ TEST_F(PGLogTest, merge_log) {
     EXPECT_FALSE(dirty_info);
     EXPECT_FALSE(dirty_big_info);
 
-    merge_log(t, oinfo, olog, fromosd, info, remove_snap,
+    TestHandler h(remove_snap);
+    merge_log(t, oinfo, olog, fromosd, info, &h,
               dirty_info, dirty_big_info);
 
     EXPECT_FALSE(missing.have_missing());
@@ -742,7 +783,8 @@ TEST_F(PGLogTest, merge_log) {
     EXPECT_FALSE(dirty_info);
     EXPECT_FALSE(dirty_big_info);
 
-    merge_log(t, oinfo, olog, fromosd, info, remove_snap,
+    TestHandler h(remove_snap);
+    merge_log(t, oinfo, olog, fromosd, info, &h,
               dirty_info, dirty_big_info);
 
     EXPECT_FALSE(missing.have_missing());
@@ -808,6 +850,7 @@ TEST_F(PGLogTest, merge_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       e.version = eversion_t(1, 4);
       e.soid.hash = 0x5;
@@ -847,7 +890,8 @@ TEST_F(PGLogTest, merge_log) {
     EXPECT_FALSE(dirty_info);
     EXPECT_FALSE(dirty_big_info);
 
-    merge_log(t, oinfo, olog, fromosd, info, remove_snap,
+    TestHandler h(remove_snap);
+    merge_log(t, oinfo, olog, fromosd, info, &h,
               dirty_info, dirty_big_info);
 
     EXPECT_FALSE(missing.have_missing());
@@ -903,6 +947,7 @@ TEST_F(PGLogTest, merge_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       e.version = eversion_t(1, 1);
       e.soid.hash = 0x5;
@@ -956,7 +1001,8 @@ TEST_F(PGLogTest, merge_log) {
     EXPECT_FALSE(dirty_info);
     EXPECT_FALSE(dirty_big_info);
 
-    merge_log(t, oinfo, olog, fromosd, info, remove_snap,
+    TestHandler h(remove_snap);
+    merge_log(t, oinfo, olog, fromosd, info, &h,
               dirty_info, dirty_big_info);
 
     /* When the divergent entry is a DELETE and the authoritative
@@ -1014,6 +1060,7 @@ TEST_F(PGLogTest, merge_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       e.version = eversion_t(1, 1);
       e.soid.hash = 0x5;
@@ -1056,7 +1103,8 @@ TEST_F(PGLogTest, merge_log) {
     EXPECT_FALSE(dirty_info);
     EXPECT_FALSE(dirty_big_info);
 
-    merge_log(t, oinfo, olog, fromosd, info, remove_snap,
+    TestHandler h(remove_snap);
+    merge_log(t, oinfo, olog, fromosd, info, &h,
               dirty_info, dirty_big_info);
 
     EXPECT_FALSE(missing.have_missing());
@@ -1086,7 +1134,8 @@ TEST_F(PGLogTest, merge_log) {
     // olog has been trimmed
     olog.tail = eversion_t(1, 1);
 
-    ASSERT_THROW(merge_log(t, oinfo, olog, fromosd, info, remove_snap,
+    TestHandler h(remove_snap);
+    ASSERT_THROW(merge_log(t, oinfo, olog, fromosd, info, &h,
                            dirty_info, dirty_big_info), FailedAssertion);
   }
 
@@ -1095,16 +1144,16 @@ TEST_F(PGLogTest, merge_log) {
             +--------+-------+---------+
             |        |object |         |
             |version | hash  | version |
-            |        |       |         |
-       tail > (1,1)  |  x5   |         |
+       tail > (0,0)  |       |         |
+            | (1,1)  |  x5   |         |
             |        |       |         |
             |        |       |         |
        head > (1,2)  |  x3   |         |
             |        |       |         |
-            |        |  x9   |  (2,3)  < tail
-            |        |       |         |
+            |        |       |  (2,3)  < tail
+            |        |  x9   |  (2,4)  |
             |        |       |         |
-            |        |  x5   |  (2,4)  < head
+            |        |  x5   |  (2,5)  < head
             |        |       |         |
             +--------+-------+---------+
 
@@ -1125,10 +1174,11 @@ TEST_F(PGLogTest, merge_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
+      log.tail = eversion_t();
       e.version = eversion_t(1, 1);
       e.soid.hash = 0x5;
-      log.tail = e.version;
       log.log.push_back(e);
       e.version = eversion_t(1, 2);
       e.soid.hash = 0x3;
@@ -1138,16 +1188,18 @@ TEST_F(PGLogTest, merge_log) {
 
       info.last_update = log.head;
 
-      e.version = eversion_t(2, 3);
+      olog.tail = eversion_t(2, 3);
+      e.version = eversion_t(2, 4);
       e.soid.hash = 0x9;
       olog.log.push_back(e);
-      e.version = eversion_t(2, 4);
+      e.version = eversion_t(2, 5);
       e.soid.hash = 0x5;
       olog.log.push_back(e);
       olog.head = e.version;
     }
 
-    EXPECT_THROW(merge_log(t, oinfo, olog, fromosd, info, remove_snap,
+    TestHandler h(remove_snap);
+    EXPECT_THROW(merge_log(t, oinfo, olog, fromosd, info, &h,
                            dirty_info, dirty_big_info), FailedAssertion);
   }
 }
@@ -1217,6 +1269,7 @@ TEST_F(PGLogTest, proc_replica_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       e.version = eversion_t(1, 2);
       e.soid.hash = 0x5;
@@ -1256,40 +1309,7 @@ TEST_F(PGLogTest, proc_replica_log) {
     EXPECT_EQ(olog.head, oinfo.last_complete);
   }
 
-  /*        +--------------------------+
-            |  log              olog   |
-            +--------+-------+---------+
-            |        |object |         |
-            |version | hash  | version |
-            |        |       |         |
-       tail > (1,1)  |  x5   |  (1,1)  < tail
-            |        |       |         |
-            |        |       |         |
-            | (1,2)  |  x3   |  (1,2)  |
-            |        |       |         |
-            |        |       |         |
-            | (1,3)  |  x9   |         |
-            | MODIFY |       |         |
-            |        |       |         |
-            |        |  x7   |  (1,4)  |
-            |        |       |         |
-            | (1,6)  |  x8   |  (1,5)  |
-            |        |       |         |
-            |        |  x9   |  (2,3)  < head < last_backfill
-            |        |       |  DELETE |
-            |        |       |         |
-       head > (1,7)  |  xa   |  (2,5)  |
-            |        |       |         |
-            +--------+-------+---------+
-
-      The log entry (1,3) modifies the object x9 but the olog entry
-      (2,3) deletes it : log is authoritative and the object is added
-      to missing. x7 is divergent and ignored. x8 has a more recent
-      version in the log and the olog entry is ignored. xa is past
-      last_backfill and ignored.
-
-  */
-  {
+ {
     clear();
 
     ObjectStore::Transaction t;
@@ -1298,56 +1318,83 @@ TEST_F(PGLogTest, proc_replica_log) {
     pg_missing_t omissing;
     int from = -1;
 
-    eversion_t last_update(1, 2);
     hobject_t divergent_object;
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
-      e.version = eversion_t(1, 1);
-      e.soid.hash = 0x5;
-      log.tail = e.version;
-      log.log.push_back(e);
-      e.version = last_update;
-      e.soid.hash = 0x3;
-      log.log.push_back(e);
-      e.version = eversion_t(1,3);
-      e.soid.hash = 0x9;
-      divergent_object = e.soid;
-      e.op = pg_log_entry_t::MODIFY;
-      log.log.push_back(e);
-      e.version = eversion_t(1, 6);
-      e.soid.hash = 0x8;
-      log.log.push_back(e);
-      e.version = eversion_t(1, 7);
-      e.soid.hash = 0xa;
-      log.log.push_back(e);
-      log.head = e.version;
+      {
+       e.soid = divergent_object;
+       e.soid.hash = 0x1;
+       e.version = eversion_t(1, 1);
+       log.tail = e.version;
+       log.log.push_back(e);
+
+       e.soid = divergent_object;
+       e.version = eversion_t(1, 2);
+       log.tail = e.version;
+       log.log.push_back(e);
+
+       e.soid.hash = 0x3;
+       e.version = eversion_t(1, 4);
+       log.log.push_back(e);
+
+       e.soid.hash = 0x7;
+       e.version = eversion_t(1, 5);
+       log.log.push_back(e);
+
+       e.soid.hash = 0x8;
+       e.version = eversion_t(1, 6);
+       log.log.push_back(e);
+
+       e.soid.hash = 0x9;
+       e.op = pg_log_entry_t::DELETE;
+       e.version = eversion_t(2, 7);
+       log.log.push_back(e);
+
+       e.soid.hash = 0xa;
+       e.version = eversion_t(2, 8);
+       log.head = e.version;
+       log.log.push_back(e);
+      }
       log.index();
 
-      e.version = eversion_t(1, 1);
-      e.soid.hash = 0x5;
-      olog.tail = e.version;
-      olog.log.push_back(e);
-      e.version = last_update;
-      e.soid.hash = 0x3;
-      olog.log.push_back(e);
-      e.version = eversion_t(1, 4); // divergent entry : will be ignored
-      e.soid.hash = 0x7;
-      olog.log.push_back(e);
-      e.version = eversion_t(1, 5); // log has newer entry : it will be ignored
-      e.soid.hash = 0x8;
-      olog.log.push_back(e);
-      e.version = eversion_t(2, 3);
-      e.soid.hash = 0x9;
-      oinfo.last_backfill = e.soid;
-      e.op = pg_log_entry_t::DELETE;
-      olog.log.push_back(e);
-      e.version = eversion_t(2, 4);
-      e.soid.hash = 0xa; // > last_backfill are ignored
-      olog.log.push_back(e);
-      olog.head = e.version;
-
+      {
+       e.soid = divergent_object;
+       e.soid.hash = 0x1;
+       e.version = eversion_t(1, 1);
+       olog.tail = e.version;
+       olog.log.push_back(e);
+
+       e.soid = divergent_object;
+       e.version = eversion_t(1, 2);
+       olog.log.push_back(e);
+
+       e.soid.hash = 0x3;
+       e.version = eversion_t(1, 4);
+       olog.log.push_back(e);
+
+       e.soid.hash = 0x7;
+       e.version = eversion_t(1, 5);
+       olog.log.push_back(e);
+
+       e.soid.hash = 0x8;
+       e.version = eversion_t(1, 6);
+       olog.log.push_back(e);
+
+       e.soid.hash = 0x9; // should be ignored, matches above
+       e.op = pg_log_entry_t::DELETE;
+       e.version = eversion_t(1, 7);
+       olog.log.push_back(e);
+
+       e.soid = divergent_object; // should be marked missing at version 1'2
+       e.op = pg_log_entry_t::MODIFY;
+       e.version = eversion_t(1, 8);
+       e.prior_version = eversion_t(1, 2);
+       olog.log.push_back(e);
+       olog.head = e.version;
+      }
       oinfo.last_update = olog.head;
       oinfo.last_complete = olog.head;
     }
@@ -1362,9 +1409,9 @@ TEST_F(PGLogTest, proc_replica_log) {
     EXPECT_TRUE(t.empty());
     EXPECT_TRUE(omissing.have_missing());
     EXPECT_TRUE(omissing.is_missing(divergent_object));
-    EXPECT_EQ(eversion_t(1, 3), omissing.missing[divergent_object].need);
-    EXPECT_EQ(last_update, oinfo.last_update);
-    EXPECT_EQ(last_update, oinfo.last_complete);
+    EXPECT_EQ(eversion_t(1, 2), omissing.missing[divergent_object].need);
+    EXPECT_EQ(eversion_t(1, 6), oinfo.last_update);
+    EXPECT_EQ(eversion_t(1, 1), oinfo.last_complete);
   }
 
   /*        +--------------------------+
@@ -1404,6 +1451,7 @@ TEST_F(PGLogTest, proc_replica_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       e.version = eversion_t(1, 1);
       e.soid.hash = 0x5;
@@ -1487,6 +1535,7 @@ TEST_F(PGLogTest, proc_replica_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       e.version = eversion_t(1, 1);
       e.soid.hash = 0x5;
@@ -1576,6 +1625,7 @@ TEST_F(PGLogTest, proc_replica_log) {
 
     {
       pg_log_entry_t e;
+      e.mod_desc.mark_unrollbackable();
 
       e.version = eversion_t(1, 1);
       e.soid.hash = 0x5;