osd: chunky scrub, scrub PGs a chunk of objects at a time
author Mike Ryan <mike.ryan@inktank.com>
Mon, 16 Jul 2012 22:58:26 +0000 (15:58 -0700)
committer Mike Ryan <mike.ryan@inktank.com>
Wed, 5 Sep 2012 21:19:13 +0000 (14:19 -0700)
Chunky scrub is a more efficient form of scrub. It blocks writes on a subset of
objects and scrubs those, allowing writes through to the rest of the PG.

The scrub takes longer to complete than a classic scrub, but improves
overall write throughput.

This feature is backward-compatible with classic scrub. If the primary
detects that any replica does not have the chunky scrub feature, it
falls back to the less efficient classic scrub.

Signed-off-by: Mike Ryan <mike.ryan@inktank.com>
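
The fallback check is a simple bitmask test against each replica's connection
features (see the PG::scrub() hunk below). A minimal standalone sketch of that
test, assuming a Connection type that exposes a features bitmask the way
Ceph's does:

    #include <cstdint>
    #include <vector>

    static const uint64_t CHUNKY_SCRUB_BIT = 1ull << 19;  // CEPH_FEATURE_CHUNKY_SCRUB

    struct Connection { uint64_t features; };  // stand-in for Ceph's Connection

    // True only if every replica advertises chunky-scrub support; a single
    // old peer is enough to force the classic scrub path.
    bool all_peers_support_chunky(const std::vector<Connection>& peers) {
      for (const Connection& c : peers)
        if (!(c.features & CHUNKY_SCRUB_BIT))
          return false;
      return true;
    }
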
src/include/ceph_features.h
src/messages/MOSDRepScrub.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 71654a35446e3b2596bd5339f0a1d0df3a21dc86..d00ae7c938b016b1d4828c06ac14283bfe147397 100644
@@ -23,6 +23,7 @@
 #define CEPH_FEATURE_QUERY_T        (1<<16)
 #define CEPH_FEATURE_INDEP_PG_MAP   (1<<17)
 #define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
+#define CEPH_FEATURE_CHUNKY_SCRUB   (1<<19)
 
 /*
  * Features supported.  Should be everything above.
@@ -46,7 +47,8 @@
         CEPH_FEATURE_QUERY_T |          \
         CEPH_FEATURE_MONENC |           \
         CEPH_FEATURE_INDEP_PG_MAP |     \
-        CEPH_FEATURE_CRUSH_TUNABLES)
+        CEPH_FEATURE_CRUSH_TUNABLES |   \
+        CEPH_FEATURE_CHUNKY_SCRUB)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
 
index af3569c0a320fa1897eafb848a04d1870c47b160..184d153bcc99fc966aa24c52e6479adfc4bb62d5 100644
 
 struct MOSDRepScrub : public Message {
 
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 3;
+  static const int COMPAT_VERSION = 2;
 
   pg_t pgid;             // PG to scrub
   eversion_t scrub_from; // only scrub log entries after scrub_from
   eversion_t scrub_to;   // last_update_applied when message sent
   epoch_t map_epoch;
+  bool chunky;           // true for chunky scrubs
+  hobject_t start;       // lower bound of scrub, inclusive
+  hobject_t end;         // upper bound of scrub, exclusive
 
-  MOSDRepScrub() : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION) { }
+  MOSDRepScrub() : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION) { }
   MOSDRepScrub(pg_t pgid, eversion_t scrub_from, eversion_t scrub_to,
               epoch_t map_epoch)
-    : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION),
+    : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
       pgid(pgid),
       scrub_from(scrub_from),
       scrub_to(scrub_to),
-      map_epoch(map_epoch) { }
-  
+      map_epoch(map_epoch),
+      chunky(false) { }
+
+  MOSDRepScrub(pg_t pgid, eversion_t scrub_to, epoch_t map_epoch,
+               hobject_t start, hobject_t end)
+    : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
+      pgid(pgid),
+      scrub_to(scrub_to),
+      map_epoch(map_epoch),
+      chunky(true),
+      start(start),
+      end(end) { }
+
+
 private:
   ~MOSDRepScrub() {}
 
@@ -48,7 +64,9 @@ public:
   void print(ostream& out) const {
     out << "replica scrub(pg: ";
     out << pgid << ",from:" << scrub_from << ",to:" << scrub_to
-       << "epoch:" << map_epoch;
+        << ",epoch:" << map_epoch << ",start:" << start << ",end:" << end
+        << ",chunky:" << chunky
+        << ",version:" << header.version;
     out << ")";
   }
 
@@ -57,6 +75,9 @@ public:
     ::encode(scrub_from, payload);
     ::encode(scrub_to, payload);
     ::encode(map_epoch, payload);
+    ::encode(chunky, payload);
+    ::encode(start, payload);
+    ::encode(end, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
@@ -64,6 +85,14 @@ public:
     ::decode(scrub_from, p);
     ::decode(scrub_to, p);
     ::decode(map_epoch, p);
+
+    if (header.version >= 3) {
+      ::decode(chunky, p);
+      ::decode(start, p);
+      ::decode(end, p);
+    } else { // v2 scrub: non-chunky
+      chunky = false;
+    }
   }
 };
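
The encode/decode changes above follow Ceph's usual message-versioning
pattern: HEAD_VERSION stamps outgoing messages, COMPAT_VERSION marks the
oldest decoder that may still parse them, and fields added in v3 are decoded
only when the sender's header version says they are present. A simplified,
hedged sketch of the pattern with stand-in types (the real code works on a
bufferlist via ::encode/::decode):

    #include <cstdint>
    #include <cstring>

    struct Header { uint8_t version; };     // stand-in for ceph_msg_header

    struct VersionedMsg {
      static const int HEAD_VERSION = 3;    // version we encode
      static const int COMPAT_VERSION = 2;  // oldest decoder that can parse us

      Header header;                        // filled in from the wire
      uint32_t map_epoch = 0;
      bool chunky = false;                  // field added in v3

      // Read v3 fields only if the sender actually wrote them.
      void decode(const uint8_t* p) {
        std::memcpy(&map_epoch, p, sizeof(map_epoch));
        p += sizeof(map_epoch);
        if (header.version >= 3)
          chunky = (*p != 0);               // v3 sender: chunky flag present
        else
          chunky = false;                   // v2 sender: classic scrub implied
      }
    };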
 
index 61476ccfebcd8016e0ec5bf42572de838211ccec..7fea23ba5a39da1ad6e08af9aab8243771405e88 100644
@@ -84,6 +84,10 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   scrub_reserved(false), scrub_reserve_failed(false),
   scrub_waiting_on(0),
   active_rep_scrub(0),
+  scrub_is_chunky(false),
+  scrub_errors(0),
+  scrub_fixed(0),
+  active_pushes(0),
   recovery_state(this)
 {
 }
@@ -2767,29 +2771,40 @@ void PG::sub_op_scrub_map(OpRequestRef op)
 
   dout(10) << " got osd." << from << " scrub map" << dendl;
   bufferlist::iterator p = m->get_data().begin();
-  if (scrub_received_maps.count(from)) {
-    ScrubMap incoming;
-    incoming.decode(p, info.pgid.pool());
-    dout(10) << "from replica " << from << dendl;
-    dout(10) << "map version is " << incoming.valid_through << dendl;
-    scrub_received_maps[from].merge_incr(incoming);
-  } else {
+
+  if (scrub_is_chunky) { // chunky scrub
     scrub_received_maps[from].decode(p, info.pgid.pool());
+    dout(10) << "map version is " << scrub_received_maps[from].valid_through << dendl;
+  } else {               // classic scrub
+    if (scrub_received_maps.count(from)) {
+      ScrubMap incoming;
+      incoming.decode(p, info.pgid.pool());
+      dout(10) << "from replica " << from << dendl;
+      dout(10) << "map version is " << incoming.valid_through << dendl;
+      scrub_received_maps[from].merge_incr(incoming);
+    } else {
+      scrub_received_maps[from].decode(p, info.pgid.pool());
+    }
   }
 
   --scrub_waiting_on;
   scrub_waiting_on_whom.erase(from);
+
   if (scrub_waiting_on == 0) {
-    if (finalizing_scrub) { // incremental lists received
-      osd->scrub_finalize_wq.queue(this);
-    } else {                // initial lists received
-      scrub_block_writes = true;
-      if (last_update_applied == info.last_update) {
-        finalizing_scrub = true;
-        scrub_gather_replica_maps();
-        ++scrub_waiting_on;
-        scrub_waiting_on_whom.insert(osd->whoami);
-        osd->scrub_wq.queue(this);
+    if (scrub_is_chunky) {    // chunky scrub
+      osd->scrub_wq.queue(this);
+    } else {                  // classic scrub
+      if (finalizing_scrub) { // incremental lists received
+        osd->scrub_finalize_wq.queue(this);
+      } else {                // initial lists received
+        scrub_block_writes = true;
+        if (last_update_applied == info.last_update) {
+          finalizing_scrub = true;
+          scrub_gather_replica_maps();
+          ++scrub_waiting_on;
+          scrub_waiting_on_whom.insert(osd->whoami);
+          osd->scrub_wq.queue(this);
+        }
       }
     }
   }
@@ -2821,7 +2836,8 @@ void PG::_scan_list(ScrubMap &map, vector<hobject_t> &ls)
   }
 }
 
-void PG::_request_scrub_map(int replica, eversion_t version)
+// send scrub v2-compatible messages (classic scrub)
+void PG::_request_scrub_map_classic(int replica, eversion_t version)
 {
   assert(replica != osd->whoami);
   dout(10) << "scrub  requesting scrubmap from osd." << replica << dendl;
@@ -2832,6 +2848,18 @@ void PG::_request_scrub_map(int replica, eversion_t version)
                                        get_osdmap()->get_cluster_inst(replica));
 }
 
+// send scrub v3 messages (chunky scrub)
+void PG::_request_scrub_map(int replica, eversion_t version, hobject_t start, hobject_t end)
+{
+  assert(replica != osd->whoami);
+  dout(10) << "scrub  requesting scrubmap from osd." << replica << dendl;
+  MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
+                                              get_osdmap()->get_epoch(),
+                                              start, end);
+  osd->cluster_messenger->send_message(repscrubop,
+                                       get_osdmap()->get_cluster_inst(replica));
+}
+
 void PG::sub_op_scrub_reserve(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
@@ -2954,6 +2982,37 @@ void PG::scrub_unreserve_replicas()
   }
 }
 
+/*
+ * build a scrub map over a chunk without releasing the lock
+ * only used by chunky scrub
+ */
+int PG::build_scrub_map_chunk(ScrubMap &map, hobject_t start, hobject_t end)
+{
+  dout(10) << "build_scrub_map" << dendl;
+  dout(20) << "scrub_map_chunk [" << start << "," << end << ")" << dendl;
+
+  map.valid_through = info.last_update;
+
+  // objects
+  vector<hobject_t> ls;
+  int ret = osd->store->collection_list_range(coll, start, end, 0, &ls);
+  if (ret < 0) {
+    dout(5) << "collection_list_range error: " << ret << dendl;
+    return ret;
+  }
+
+  _scan_list(map, ls);
+
+  // pg attrs
+  osd->store->collection_getattrs(coll, map.attrs);
+
+  // log
+  osd->store->read(coll_t(), log_oid, 0, 0, map.logbl);
+  dout(10) << " done.  pg log is " << map.logbl.length() << " bytes" << dendl;
+
+  return 0;
+}
+
 /*
  * build a (sorted) summary of pg content for purposes of scrubbing
  * called while holding pg lock
@@ -3053,6 +3112,8 @@ void PG::repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer
 }
 
 /* replica_scrub
+ *
+ * Classic behavior:
  *
  * If msg->scrub_from is not set, replica_scrub calls build_scrubmap to
  * build a complete map (with the pg lock dropped).
@@ -3062,6 +3123,12 @@ void PG::repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer
  * replica_scrub returns to be requeued by sub_op_modify_applied.
  * replica_scrub then builds an incremental scrub map with the 
  * pg lock held.
+ *
+ * Chunky behavior:
+ *
+ * Wait for last_update_applied to match msg->scrub_to as above. Wait
+ * for pushes to complete in case of recent recovery. Build a single
+ * scrubmap of objects that are in the range [msg->start, msg->end).
  */
 void PG::replica_scrub(MOSDRepScrub *msg)
 {
@@ -3081,27 +3148,47 @@ void PG::replica_scrub(MOSDRepScrub *msg)
   }
 
   ScrubMap map;
-  if (msg->scrub_from > eversion_t()) {
-    if (finalizing_scrub) {
-      assert(last_update_applied == info.last_update);
-      assert(last_update_applied == msg->scrub_to);
-    } else {
-      finalizing_scrub = 1;
-      if (last_update_applied != msg->scrub_to) {
-       active_rep_scrub = msg;
-       msg->get();
-       return;
-      }
+
+  if (msg->chunky) { // chunky scrub
+    if (last_update_applied < msg->scrub_to) {
+      dout(10) << "waiting for last_update_applied to catch up" << dendl;
+      active_rep_scrub = msg;
+      msg->get();
+      return;
     }
-    build_inc_scrub_map(map, msg->scrub_from);
-    finalizing_scrub = 0;
+
+    if (active_pushes > 0) {
+      dout(10) << "waiting for active pushes to finish" << dendl;
+      active_rep_scrub = msg;
+      msg->get();
+      return;
+    }
+
+    build_scrub_map_chunk(map, msg->start, msg->end);
+
   } else {
-    build_scrub_map(map);
-  }
+    if (msg->scrub_from > eversion_t()) {
+      if (finalizing_scrub) {
+        assert(last_update_applied == info.last_update);
+        assert(last_update_applied == msg->scrub_to);
+      } else {
+        finalizing_scrub = 1;
+        if (last_update_applied != msg->scrub_to) {
+          active_rep_scrub = msg;
+          msg->get();
+          return;
+        }
+      }
+      build_inc_scrub_map(map, msg->scrub_from);
+      finalizing_scrub = 0;
+    } else {
+      build_scrub_map(map);
+    }
 
-  if (msg->map_epoch < info.history.same_interval_since) {
-    dout(10) << "scrub  pg changed, aborting" << dendl;
-    return;
+    if (msg->map_epoch < info.history.same_interval_since) {
+      dout(10) << "scrub  pg changed, aborting" << dendl;
+      return;
+    }
   }
 
   vector<OSDOp> scrub(1);
@@ -3120,30 +3207,11 @@ void PG::replica_scrub(MOSDRepScrub *msg)
 /* Scrub:
  * PG_STATE_SCRUBBING is set when the scrub is queued
  * 
- * Once the initial scrub has completed and the requests have gone out to 
- * replicas for maps, we set scrub_active and wait for the replicas to
- * complete their maps. Once the maps are received, scrub_block_writes is set.
- * scrub_waiting_on is set to the number of maps outstanding (active.size()).
- *
- * If last_update_applied is behind the head of the log, scrub returns to be
- * requeued by op_applied.
- *
- * Once last_update_applied == info.last_update, scrub catches itself up and
- * decrements scrub_waiting_on.
- *
- * sub_op_scrub_map similarly decrements scrub_waiting_on for each map received.
- * 
- * Once scrub_waiting_on hits 0 (either in scrub or sub_op_scrub_map) 
- * scrub_finalize is queued.
- * 
- * In scrub_finalize, if any replica maps are too old, new ones are requested,
- * scrub_waiting_on is reset, and scrub_finalize returns to be requeued by
- * sub_op_scrub_map.  If all maps are up to date, scrub_finalize checks 
- * the maps and performs repairs.
+ * scrub will be chunky if all OSDs in the PG support chunky scrub;
+ * it falls back to classic scrub in any other case
  */
 void PG::scrub()
 {
-
   lock();
   if (deleting) {
     unlock();
@@ -3157,6 +3225,74 @@ void PG::scrub()
     return;
   }
 
+  // when starting a scrub, determine which type of scrub to do
+  if (!scrub_active) {
+    OSDMapRef curmap = osd->get_osdmap();
+    scrub_is_chunky = true;
+    for (unsigned i=1; i<acting.size(); i++) {
+      Connection *con = osd->cluster_messenger->get_connection(curmap->get_cluster_inst(acting[i]));
+      if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) {
+        dout(20) << "OSD " << acting[i]
+                 << " does not support chunky scrubs, falling back to classic"
+                 << dendl;
+        scrub_is_chunky = false;
+        break;
+      }
+    }
+
+    dout(10) << "starting a new " << (scrub_is_chunky ? "chunky" : "classic") << " scrub" << dendl;
+  }
+
+  if (scrub_is_chunky) {
+    chunky_scrub();
+  } else {
+    classic_scrub();
+  }
+
+  unlock();
+}
+
+/*
+ * Classic scrub is a two-stage scrub: an initial scrub with writes enabled
+ * followed by a finalize with writes blocked.
+ *
+ * A request is sent out to all replicas for initial scrub maps. Once they reply
+ * (sub_op_scrub_map), writes are blocked for all objects in the PG.
+ *
+ * Finalize: Primaries and replicas wait for all writes in the log to be applied
+ * (op_applied), then build an incremental scrub map of all the changes since
+ * the beginning of the scrub.
+ *
+ * Once the primary has received all maps, it compares them and performs
+ * repairs.
+ *
+ * The initial stage of the scrub is handled by scrub_wq and the final stage by
+ * scrub_finalize_wq.
+ *
+ * Relevant variables:
+ *
+ * scrub_waiting_on (int)
+ * scrub_waiting_on_whom
+ *    Number of OSDs that still need to build an initial/incremental scrub map.
+ *    This is decremented in sub_op_scrub_map.
+ *
+ * last_update_applied
+ *    The last update that's hit the disk. In the finalize stage, we block
+ *    writes and wait for all writes to flush by checking:
+ *
+ *      last_update_applied == info.last_update
+ *
+ *    This is checked in op_applied.
+ *
+ * scrub_block_writes
+ *    Flag to determine if writes are blocked.
+ *
+ * finalizing_scrub
+ *    Flag set when we're in the finalize stage.
+ *
+ */
+void PG::classic_scrub()
+{
   if (!scrub_active) {
     dout(10) << "scrub start" << dendl;
     scrub_active = true;
@@ -3184,7 +3320,7 @@ void PG::scrub()
 
     // request maps from replicas
     for (unsigned i=1; i<acting.size(); i++) {
-      _request_scrub_map(acting[i], eversion_t());
+      _request_scrub_map_classic(acting[i], eversion_t());
     }
 
     // Unlocks and relocks...
@@ -3195,7 +3331,6 @@ void PG::scrub()
       dout(10) << "scrub  pg changed, aborting" << dendl;
       scrub_clear_state();
       scrub_unreserve_replicas();
-      unlock();
       return;
     }
 
@@ -3207,13 +3342,11 @@ void PG::scrub()
       scrub_block_writes = true;
     } else {
       dout(10) << "wait for replicas to build initial scrub map" << dendl;
-      unlock();
       return;
     }
 
     if (last_update_applied != info.last_update) {
       dout(10) << "wait for cleanup" << dendl;
-      unlock();
       return;
     }
 
@@ -3234,7 +3367,6 @@ void PG::scrub()
     dout(10) << "scrub  pg changed, aborting" << dendl;
     scrub_clear_state();
     scrub_unreserve_replicas();
-    unlock();
     return;
   }
   
@@ -3250,8 +3382,271 @@ void PG::scrub()
     assert(last_update_applied == info.last_update);
     osd->scrub_finalize_wq.queue(this);
   }
-  
-  unlock();
+}
+
+/*
+ * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
+ * chunk.
+ *
+ * The object store is partitioned into chunks which end on hash boundaries. For
+ * each chunk, the following logic is performed:
+ *
+ *  (1) Block writes on the chunk
+ *  (2) Request maps from replicas
+ *  (3) Wait for pushes to be applied (after recovery)
+ *  (4) Wait for writes to flush on the chunk
+ *  (5) Wait for maps from replicas
+ *  (6) Compare / repair all scrub maps
+ *
+ * This logic is encoded in the very linear state machine:
+ *
+ *              +---------------------+
+ *  ____________v_____________        |
+ * |                          |       |
+ * |      SCRUB_INACTIVE      |       |
+ * |__________________________|       |
+ *              |                     |
+ *              |   +-------------+   |
+ *  ____________v___v_________    |   |
+ * |                          |   |   |
+ * |      SCRUB_NEW_CHUNK     |   |   |
+ * |__________________________|   |   |
+ *              |                 |   |
+ *   ___________v_____________    |   |
+ *  |                         |   |   |
+ *  |    SCRUB_WAIT_PUSHES    |   |   |
+ *  |_________________________|   |   |
+ *              |                 |   |
+ *  ____________v_____________    |   |
+ * |                          |   |   |
+ * |  SCRUB_WAIT_LAST_UPDATE  |   |   |
+ * |__________________________|   |   |
+ *              |                 |   |
+ *  ____________v_____________    |   |
+ * |                          |   |   |
+ * |      SCRUB_BUILD_MAP     |   |   |
+ * |__________________________|   |   |
+ *              |                 |   |
+ *  ____________v_____________    |   |
+ * |                          |   |   |
+ * |    SCRUB_WAIT_REPLICAS   |   |   |
+ * |__________________________|   |   |
+ *              |                 |   |
+ *  ____________v_____________    |   |
+ * |                          |   |   |
+ * |    SCRUB_COMPARE_MAPS    |   |   |
+ * |__________________________|   |   |
+ *              |   |             |   |
+ *              |   +-------------+   |
+ *  ____________v_____________        |
+ * |                          |       |
+ * |       SCRUB_FINISH       |       |
+ * |__________________________|       |
+ *              |                     |
+ *              +---------------------+
+ *
+ * The primary determines the latest update affecting the chunk by walking the
+ * log. If it sees a log entry pertaining to an object in the chunk, it tells
+ * the replicas to wait until that update is applied before building a scrub
+ * map. Both the primary and replicas will wait for any active pushes to be
+ * applied.
+ *
+ * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
+ *
+ * scrub_state encodes the current state of the scrub (refer to state diagram
+ * for details).
+ */
+void PG::chunky_scrub() {
+  // check for map changes
+  if (is_chunky_scrub_active()) {
+    if (scrub_epoch_start != info.history.same_interval_since) {
+      dout(10) << "scrub  pg changed, aborting" << dendl;
+      scrub_clear_state();
+      scrub_unreserve_replicas();
+      return;
+    }
+  }
+
+  bool done = false;
+  int ret;
+
+  while (!done) {
+    dout(20) << "scrub state " << scrub_string(scrub_state) << dendl;
+
+    switch (scrub_state) {
+      case SCRUB_INACTIVE:
+        dout(10) << "scrub start" << dendl;
+
+        update_stats();
+        scrub_epoch_start = info.history.same_interval_since;
+        scrub_active = true;
+
+        osd->sched_scrub_lock.Lock();
+        if (scrub_reserved) {
+          --(osd->scrubs_pending);
+          assert(osd->scrubs_pending >= 0);
+          scrub_reserved = false;
+          scrub_reserved_peers.clear();
+        }
+        ++(osd->scrubs_active);
+        osd->sched_scrub_lock.Unlock();
+
+        scrub_start = hobject_t();
+        scrub_state = SCRUB_NEW_CHUNK;
+
+        break;
+
+      case SCRUB_NEW_CHUNK:
+        primary_scrubmap = ScrubMap();
+        scrub_received_maps.clear();
+
+        {
+
+          // get the start and end of our scrub chunk
+          //
+          // start and end need to lie on a hash boundary. We test for this by
+          // requesting a list and searching backward from the end looking for a
+          // boundary. If there's no boundary, we request a list after the first
+          // list, and so forth.
+
+          bool boundary_found = false;
+          hobject_t start = scrub_start;
+          while (!boundary_found) {
+            vector<hobject_t> objects;
+            ret = osd->store->collection_list_partial(coll, start,
+                                                      5, 5, 0,
+                                                      &objects, &scrub_end);
+            assert(ret >= 0);
+
+            // in case we don't find a boundary: start again at the end
+            start = scrub_end;
+
+            // special case: reached end of file store, implicitly a boundary
+            if (objects.size() == 0) {
+              break;
+            }
+
+            // search backward from the end looking for a boundary
+            objects.push_back(scrub_end);
+            while (!boundary_found && objects.size() > 1) {
+              hobject_t end = objects.back();
+              objects.pop_back();
+
+              if (objects.back().get_filestore_key() != end.get_filestore_key()) {
+                scrub_end = end;
+                boundary_found = true;
+              }
+            }
+          }
+        }
+
+        scrub_block_writes = true;
+
+        // walk the log to find the latest update that affects our chunk
+        scrub_subset_last_update = eversion_t();
+        for (list<pg_log_entry_t>::iterator p = log.log.begin();
+             p != log.log.end();
+             ++p) {
+          if (p->soid >= scrub_start && p->soid < scrub_end)
+            scrub_subset_last_update = p->version;
+        }
+
+        // ask replicas to wait until last_update_applied >= scrub_subset_last_update and then scan
+        scrub_waiting_on_whom.insert(osd->whoami);
+        ++scrub_waiting_on;
+
+        // request maps from replicas
+        for (unsigned i=1; i<acting.size(); i++) {
+          _request_scrub_map(acting[i], scrub_subset_last_update,
+                             scrub_start, scrub_end);
+          scrub_waiting_on_whom.insert(acting[i]);
+          ++scrub_waiting_on;
+        }
+
+        scrub_state = SCRUB_WAIT_PUSHES;
+
+        break;
+
+      case SCRUB_WAIT_PUSHES:
+        if (active_pushes == 0) {
+          scrub_state = SCRUB_WAIT_LAST_UPDATE;
+        } else {
+          dout(15) << "wait for pushes to apply" << dendl;
+          done = true;
+        }
+        break;
+
+      case SCRUB_WAIT_LAST_UPDATE:
+        if (last_update_applied >= scrub_subset_last_update) {
+          scrub_state = SCRUB_BUILD_MAP;
+        } else {
+          // will be requeued by op_applied
+          dout(15) << "wait for writes to flush" << dendl;
+          done = true;
+        }
+        break;
+
+      case SCRUB_BUILD_MAP:
+        assert(last_update_applied >= scrub_subset_last_update);
+
+        // build my own scrub map
+        ret = build_scrub_map_chunk(primary_scrubmap, scrub_start, scrub_end);
+        if (ret < 0) {
+          dout(5) << "error building scrub map: " << ret << ", aborting" << dendl;
+          scrub_clear_state();
+          scrub_unreserve_replicas();
+          return;
+        }
+
+        --scrub_waiting_on;
+        scrub_waiting_on_whom.erase(osd->whoami);
+
+        scrub_state = SCRUB_WAIT_REPLICAS;
+        break;
+
+      case SCRUB_WAIT_REPLICAS:
+        if (scrub_waiting_on > 0) {
+          // will be requeued by sub_op_scrub_map
+          dout(10) << "wait for replicas to build scrub map" << dendl;
+          done = true;
+        } else {
+          scrub_state = SCRUB_COMPARE_MAPS;
+        }
+        break;
+
+      case SCRUB_COMPARE_MAPS:
+        assert(last_update_applied >= scrub_subset_last_update);
+        assert(scrub_waiting_on == 0);
+
+        scrub_compare_maps();
+        scrub_block_writes = false;
+
+        // requeue the writes from the chunk that just finished
+        requeue_ops(waiting_for_active);
+
+        if (scrub_end < hobject_t::get_max()) {
+          // schedule another leg of the scrub
+          scrub_start = scrub_end;
+
+          scrub_state = SCRUB_NEW_CHUNK;
+          osd->scrub_wq.queue(this);
+          done = true;
+        } else {
+          scrub_state = SCRUB_FINISH;
+        }
+
+        break;
+
+      case SCRUB_FINISH:
+        scrub_finish();
+        scrub_state = SCRUB_INACTIVE;
+        done = true;
+
+        break;
+
+      default:
+        assert(0);
+    }
+  }
 }
 
 void PG::scrub_clear_state()
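
The boundary search in SCRUB_NEW_CHUNK above is worth reading in isolation:
chunks must end on a hash boundary so that the primary and all replicas agree
on exactly which objects fall inside the chunk. A standalone sketch of the
backward scan, with a hypothetical Obj type standing in for hobject_t:

    #include <cstdint>
    #include <vector>

    struct Obj { uint64_t filestore_key; };  // ~ hobject_t::get_filestore_key()

    // Scan a listed window of objects backward for a point where the hash
    // key changes; the objects before that index form a chunk ending on a
    // hash boundary. Returns -1 if the whole window shares one key, in
    // which case the caller lists the next window and retries.
    int find_chunk_boundary(const std::vector<Obj>& window) {
      for (int i = (int)window.size() - 1; i > 0; --i)
        if (window[i - 1].filestore_key != window[i].filestore_key)
          return i;
      return -1;
    }
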
@@ -3276,6 +3671,16 @@ void PG::scrub_clear_state()
     active_rep_scrub = NULL;
   }
   scrub_received_maps.clear();
+
+  scrub_state = SCRUB_INACTIVE;
+  scrub_start = hobject_t();
+  scrub_end = hobject_t();
+  scrub_subset_last_update = eversion_t();
+  scrub_errors = 0;
+  scrub_fixed = 0;
+
+  // type-specific state clear
+  _scrub_clear_state();
 }
 
 bool PG::scrub_gather_replica_maps() {
@@ -3290,7 +3695,7 @@ bool PG::scrub_gather_replica_maps() {
       scrub_waiting_on++;
       scrub_waiting_on_whom.insert(p->first);
       // Need to request another incremental map
-      _request_scrub_map(p->first, p->second.valid_through);
+      _request_scrub_map_classic(p->first, p->second.valid_through);
     }
   }
   
@@ -3398,31 +3803,8 @@ void PG::_compare_scrubmaps(const map<int,ScrubMap*> &maps,
   }
 }
 
-void PG::scrub_finalize() {
-  lock();
-  if (deleting) {
-    unlock();
-    return;
-  }
-
-  assert(last_update_applied == info.last_update);
-
-  if (scrub_epoch_start != info.history.same_interval_since) {
-    dout(10) << "scrub  pg changed, aborting" << dendl;
-    scrub_clear_state();
-    scrub_unreserve_replicas();
-    unlock();
-    return;
-  }
-  
-  if (!scrub_gather_replica_maps()) {
-    dout(10) << "maps not yet up to date, sent out new requests" << dendl;
-    unlock();
-    return;
-  }
-
-  dout(10) << "scrub_finalize has maps, analyzing" << dendl;
-  int errors = 0, fixed = 0;
+void PG::scrub_compare_maps() {
+  dout(10) << "scrub_compare_maps has maps, analyzing" << dendl;
   bool repair = state_test(PG_STATE_REPAIR);
   const char *mode = repair ? "repair":"scrub";
   if (acting.size() > 1) {
@@ -3489,27 +3871,67 @@ void PG::scrub_finalize() {
   }
 
   // ok, do the pg-type specific scrubbing
-  _scrub(primary_scrubmap, &errors, &fixed);
+  _scrub(primary_scrubmap);
+}
+
+void PG::scrub_finalize() {
+  lock();
+  if (deleting) {
+    unlock();
+    return;
+  }
+
+  assert(last_update_applied == info.last_update);
+
+  if (scrub_epoch_start != info.history.same_interval_since) {
+    dout(10) << "scrub  pg changed, aborting" << dendl;
+    scrub_clear_state();
+    scrub_unreserve_replicas();
+    unlock();
+    return;
+  }
+
+  if (!scrub_gather_replica_maps()) {
+    dout(10) << "maps not yet up to date, sent out new requests" << dendl;
+    unlock();
+    return;
+  }
+
+  scrub_compare_maps();
+
+  scrub_finish();
+
+  dout(10) << "scrub done" << dendl;
+  unlock();
+}
+
+// the part that actually finalizes a scrub
+void PG::scrub_finish() {
+  bool repair = state_test(PG_STATE_REPAIR);
+  const char *mode = repair ? "repair":"scrub";
+
+  // type-specific finish (can tally more errors)
+  _scrub_finish();
+
+  if (scrub_errors == 0 || (repair && (scrub_errors - scrub_fixed) == 0))
+    state_clear(PG_STATE_INCONSISTENT);
 
   {
     stringstream oss;
     oss << info.pgid << " " << mode << " ";
-    if (errors)
-      oss << errors << " errors";
+    if (scrub_errors)
+      oss << scrub_errors << " errors";
     else
       oss << "ok";
     if (repair)
-      oss << ", " << fixed << " fixed";
+      oss << ", " << scrub_fixed << " fixed";
     oss << "\n";
-    if (errors)
+    if (scrub_errors)
       osd->clog.error(oss);
     else
       osd->clog.info(oss);
   }
 
-  if (errors == 0 || (repair && (errors - fixed) == 0))
-    state_clear(PG_STATE_INCONSISTENT);
-
   // finish up
   osd->unreg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
   info.history.last_scrub = info.last_update;
@@ -3529,9 +3951,6 @@ void PG::scrub_finalize() {
   if (is_active() && is_primary()) {
     share_pg_info();
   }
-
-  dout(10) << "scrub done" << dendl;
-  unlock();
 }
 
 void PG::share_pg_info()
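
Taken as a whole, chunky_scrub() is a re-entrant state machine driven by
scrub_wq: each pass runs states until one must wait on an external event (a
push draining, a write flushing, a replica map arriving), and the event
handler requeues the PG so the loop resumes where it left off. A much-reduced,
hedged sketch of that driver pattern (states and counts invented for
illustration):

    #include <cstdio>

    enum ScrubState { INACTIVE, NEW_CHUNK, WAIT_REPLICAS, FINISH };

    struct MiniScrub {
      ScrubState state = INACTIVE;
      int maps_outstanding = 0;

      // Re-entered on every requeue; runs until a state must wait.
      void run() {
        bool done = false;
        while (!done) {
          switch (state) {
          case INACTIVE:
            state = NEW_CHUNK;
            break;
          case NEW_CHUNK:
            maps_outstanding = 2;      // pretend two replicas were asked
            state = WAIT_REPLICAS;
            break;
          case WAIT_REPLICAS:
            if (maps_outstanding > 0)
              done = true;             // wait; a map arrival requeues us
            else
              state = FINISH;
            break;
          case FINISH:
            std::puts("chunk scrubbed");
            state = INACTIVE;
            done = true;
            break;
          }
        }
      }

      // Analogous to sub_op_scrub_map decrementing scrub_waiting_on.
      void map_received() {
        if (--maps_outstanding == 0)
          run();                       // "requeue": resume the machine
      }
    };
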
index 72b5c594cc065ed7948b90b061c3cfaf5382cc40..e2f240f7ded4237dab02d9b1b33bfde0a52ca097 100644
@@ -785,6 +785,43 @@ public:
   ScrubMap primary_scrubmap;
   MOSDRepScrub *active_rep_scrub;
 
+  bool scrub_is_chunky;
+
+  hobject_t scrub_start;
+  hobject_t scrub_end;
+  eversion_t scrub_subset_last_update;
+  int scrub_errors;
+  int scrub_fixed;
+
+  int active_pushes;
+
+  enum ScrubState {
+    SCRUB_INACTIVE,
+    SCRUB_NEW_CHUNK,
+    SCRUB_WAIT_PUSHES,
+    SCRUB_WAIT_LAST_UPDATE,
+    SCRUB_BUILD_MAP,
+    SCRUB_WAIT_REPLICAS,
+    SCRUB_COMPARE_MAPS,
+    SCRUB_FINISH,
+  } scrub_state;
+
+  static const char *scrub_string(const PG::ScrubState& state) {
+    const char *ret = NULL;
+    switch( state )
+    {
+      case SCRUB_INACTIVE: ret = "SCRUB_INACTIVE"; break;
+      case SCRUB_NEW_CHUNK: ret = "SCRUB_NEW_CHUNK"; break;
+      case SCRUB_WAIT_PUSHES: ret = "SCRUB_WAIT_PUSHES"; break;
+      case SCRUB_WAIT_LAST_UPDATE: ret = "SCRUB_WAIT_LAST_UPDATE"; break;
+      case SCRUB_BUILD_MAP: ret = "SCRUB_BUILD_MAP"; break;
+      case SCRUB_WAIT_REPLICAS: ret = "SCRUB_WAIT_REPLICAS"; break;
+      case SCRUB_COMPARE_MAPS: ret = "SCRUB_COMPARE_MAPS"; break;
+      case SCRUB_FINISH: ret = "SCRUB_FINISH"; break;
+    }
+    return ret;
+  }
+
   void repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer, int ok_peer);
   bool _compare_scrub_objects(ScrubMap::object &auth,
                              ScrubMap::object &candidate,
@@ -795,14 +832,22 @@ public:
                          map<hobject_t, int> &authoritative,
                          ostream &errorstream);
   void scrub();
+  void classic_scrub();
+  void chunky_scrub();
+  void scrub_compare_maps();
   void scrub_finalize();
+  void scrub_finish();
   void scrub_clear_state();
   bool scrub_gather_replica_maps();
   void _scan_list(ScrubMap &map, vector<hobject_t> &ls);
-  void _request_scrub_map(int replica, eversion_t version);
+  void _request_scrub_map_classic(int replica, eversion_t version);
+  void _request_scrub_map(int replica, eversion_t version, hobject_t start, hobject_t end);
+  int build_scrub_map_chunk(ScrubMap &map, hobject_t start, hobject_t end);
   void build_scrub_map(ScrubMap &map);
   void build_inc_scrub_map(ScrubMap &map, eversion_t v);
-  virtual int _scrub(ScrubMap &map, int* errors, int* fixed) { return 0; }
+  virtual void _scrub(ScrubMap &map) { }
+  virtual void _scrub_clear_state() { }
+  virtual void _scrub_finish() { }
   virtual coll_t get_temp_coll() = 0;
   virtual bool have_temp_coll() = 0;
   void clear_scrub_reserved();
@@ -1436,6 +1481,8 @@ public:
 
   bool       is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); }
 
+  bool       is_chunky_scrub_active() const { return scrub_state != SCRUB_INACTIVE; }
+
   bool  is_empty() const { return info.last_update == eversion_t(0,0); }
 
   void init(int role, vector<int>& up, vector<int>& acting, pg_history_t& history,
index 7be408437936b7ad12d83d10b685e85daefd87f0..11a75b6ab6edfe7f993fae43dc5d0d82f7e507ee 100644
@@ -615,17 +615,22 @@ void ReplicatedPG::do_op(OpRequestRef op)
 
   dout(10) << "do_op " << *m << (m->may_write() ? " may_write" : "") << dendl;
 
+  hobject_t head(m->get_oid(), m->get_object_locator().key,
+                CEPH_NOSNAP, m->get_pg().ps(),
+                info.pgid.pool());
+
   if (scrub_block_writes && m->may_write()) {
-    dout(20) << __func__ << ": waiting for scrub" << dendl;
-    waiting_for_active.push_back(op);
-    op->mark_delayed();
-    return;
+    // classic (non-chunky) scrubs block all writes
+    // chunky scrubs only block writes to a range
+    if (!scrub_is_chunky || (head >= scrub_start && head < scrub_end)) {
+      dout(20) << __func__ << ": waiting for scrub" << dendl;
+      waiting_for_active.push_back(op);
+      op->mark_delayed();
+      return;
+    }
   }
 
   // missing object?
-  hobject_t head(m->get_oid(), m->get_object_locator().key,
-                CEPH_NOSNAP, m->get_pg().ps(),
-                info.pgid.pool());
   if (is_missing_object(head)) {
     wait_for_missing_object(head, op);
     return;
@@ -1439,7 +1444,7 @@ void ReplicatedPG::snap_trimmer()
   dout(10) << "snap_trimmer entry" << dendl;
   if (is_primary()) {
     entity_inst_t nobody;
-    if (!mode.try_write(nobody) || scrub_block_writes) {
+    if (!mode.try_write(nobody) || scrub_active) {
       dout(10) << " can't write, requeueing" << dendl;
       queue_snap_trim();
       unlock();
@@ -3348,6 +3353,12 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
       pending_backfill_updates[soid].stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
   }
 
+  if (scrub_active && scrub_is_chunky) {
+    assert(soid < scrub_start || soid >= scrub_end);
+    if (soid < scrub_start)
+      scrub_cstat.add(ctx->delta_stats, ctx->obc->obs.oi.category);
+  }
+
   return result;
 }
 
@@ -3456,7 +3467,15 @@ void ReplicatedPG::op_applied(RepGather *repop)
     assert(info.last_update >= repop->v);
     assert(last_update_applied < repop->v);
     last_update_applied = repop->v;
-    if (last_update_applied == info.last_update && scrub_block_writes) {
+
+    // chunky scrub
+    if (scrub_active && scrub_is_chunky) {
+      if (last_update_applied == scrub_subset_last_update) {
+        osd->scrub_wq.queue(this);
+      }
+
+    // classic scrub
+    } else if (last_update_applied == info.last_update && scrub_block_writes) {
       dout(10) << "requeueing scrub for cleanup" << dendl;
       finalizing_scrub = true;
       scrub_gather_replica_maps();
@@ -4360,8 +4379,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
     assert(info.last_update >= m->version);
     assert(last_update_applied < m->version);
     last_update_applied = m->version;
-    if (finalizing_scrub) {
-      assert(active_rep_scrub);
+    if (active_rep_scrub) {
       if (last_update_applied == active_rep_scrub->scrub_to) {
        osd->rep_scrub_wq.queue(active_rep_scrub);
        active_rep_scrub = 0;
@@ -5078,7 +5096,7 @@ void ReplicatedPG::handle_push(OpRequestRef op)
   bool complete = m->recovery_progress.data_complete &&
     m->recovery_progress.omap_complete;
   ObjectStore::Transaction *t = new ObjectStore::Transaction;
-  Context *onreadable = new ObjectStore::C_DeleteTransaction(t);
+  Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t);
   Context *onreadable_sync = 0;
   submit_push_data(m->recovery_info,
                   first,
@@ -5387,6 +5405,34 @@ void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, Object
   lock();
   dout(10) << "_applied_recovered_object " << *obc << dendl;
   put_object_context(obc);
+
+  assert(active_pushes >= 1);
+  --active_pushes;
+
+  // requeue an active chunky scrub waiting on recovery ops
+  if (active_pushes == 0 && is_chunky_scrub_active()) {
+    osd->scrub_wq.queue(this);
+  }
+
+  unlock();
+  delete t;
+}
+
+void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t)
+{
+  lock();
+  dout(10) << "_applied_recovered_object_replica" << dendl;
+
+  assert(active_pushes >= 1);
+  --active_pushes;
+
+  // requeue an active chunky scrub waiting on recovery ops
+  if (active_pushes == 0 &&
+      active_rep_scrub && active_rep_scrub->chunky) {
+    osd->rep_scrub_wq.queue(active_rep_scrub);
+    active_rep_scrub = 0;
+  }
+
   unlock();
   delete t;
 }
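
The active_pushes bookkeeping above is a drain counter: every push in flight
increments it, every applied push decrements it, and a scrub parked behind
recovery is requeued only once the counter drains to zero. A hedged sketch of
the idea with hypothetical names:

    #include <cassert>
    #include <functional>

    struct PushGate {
      int active_pushes = 0;
      bool scrub_waiting = false;
      std::function<void()> requeue_scrub;  // e.g. queue the PG into scrub_wq

      void start_push() { ++active_pushes; }

      void push_applied() {
        assert(active_pushes >= 1);
        if (--active_pushes == 0 && scrub_waiting && requeue_scrub)
          requeue_scrub();  // last push drained: let the scrub proceed
      }
    };
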
@@ -5476,6 +5522,10 @@ void ReplicatedPG::trim_pushed_data(
 void ReplicatedPG::sub_op_push(OpRequestRef op)
 {
   op->mark_started();
+
+  // keep track of active pushes for scrub
+  ++active_pushes;
+
   if (is_primary()) {
     handle_pull_response(op);
   } else {
@@ -6074,6 +6124,8 @@ int ReplicatedPG::recover_primary(int max)
 
              recover_got(soid, latest->version);
 
+             ++active_pushes;
+
              osd->store->queue_transaction(osr.get(), t,
                                            new C_OSD_AppliedRecoveredObject(this, t, obc),
                                            new C_OSD_CommittedPushedObject(this, OpRequestRef(),
@@ -6538,7 +6590,7 @@ void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
 // SCRUB
 
 
-int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
+void ReplicatedPG::_scrub(ScrubMap& scrubmap)
 {
   dout(10) << "_scrub" << dendl;
 
@@ -6551,8 +6603,6 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
   SnapSet snapset;
   vector<snapid_t>::reverse_iterator curclone;
 
-  object_stat_collection_t cstat;
-
   bufferlist last_data;
 
   for (map<hobject_t,ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin(); 
@@ -6569,7 +6619,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
       if (p->second.attrs.count(SS_ATTR) == 0) {
        osd->clog.error() << mode << " " << info.pgid << " " << soid
                          << " no '" << SS_ATTR << "' attr";
-       ++(*errors);
+        ++scrub_errors;
        continue;
       }
       bufferlist bl;
@@ -6581,7 +6631,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
       if (head != hobject_t()) {
        osd->clog.error() << mode << " " << info.pgid << " " << head
                          << " missing clones";
-       ++(*errors);
+        ++scrub_errors;
       }
       
       // what will be next?
@@ -6606,7 +6656,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
     }
     if (soid.snap == CEPH_SNAPDIR) {
       string cat;
-      cstat.add(stat, cat);
+      scrub_cstat.add(stat, cat);
       continue;
     }
 
@@ -6614,7 +6664,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
     if (p->second.attrs.count(OI_ATTR) == 0) {
       osd->clog.error() << mode << " " << info.pgid << " " << soid
                        << " no '" << OI_ATTR << "' attr";
-      ++(*errors);
+      ++scrub_errors;
       continue;
     }
     bufferlist bv;
@@ -6625,7 +6675,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
       osd->clog.error() << mode << " " << info.pgid << " " << soid
                        << " on disk size (" << p->second.size
                        << ") does not match object info size (" << oi.size << ")";
-      ++(*errors);
+      ++scrub_errors;
     }
 
     dout(20) << mode << "  " << soid << " " << oi << dendl;
@@ -6640,7 +6690,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
       if (!snapset.head_exists) {
        osd->clog.error() << mode << " " << info.pgid << " " << soid
                          << " snapset.head_exists=false, but object exists";
-       ++(*errors);
+        ++scrub_errors;
        continue;
       }
     } else if (soid.snap) {
@@ -6652,7 +6702,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
       if (soid.snap != *curclone) {
        osd->clog.error() << mode << " " << info.pgid << " " << soid
                          << " expected clone " << *curclone;
-       ++(*errors);
+        ++scrub_errors;
        assert(soid.snap == *curclone);
       }
 
@@ -6673,35 +6723,45 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
     }
 
     string cat; // fixme
-    cstat.add(stat, cat);
-  }  
+    scrub_cstat.add(stat, cat);
+  }
   
+  dout(10) << "_scrub (" << mode << ") finish" << dendl;
+}
+
+void ReplicatedPG::_scrub_clear_state()
+{
+  scrub_cstat = object_stat_collection_t();
+}
+
+void ReplicatedPG::_scrub_finish()
+{
+  bool repair = state_test(PG_STATE_REPAIR);
+  const char *mode = repair ? "repair":"scrub";
+
   dout(10) << mode << " got "
-          << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
-          << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
-          << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes."
+          << scrub_cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
+          << scrub_cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
+          << scrub_cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes."
           << dendl;
 
-  if (cstat.sum.num_objects != info.stats.stats.sum.num_objects ||
-      cstat.sum.num_object_clones != info.stats.stats.sum.num_object_clones ||
-      cstat.sum.num_bytes != info.stats.stats.sum.num_bytes) {
+  if (scrub_cstat.sum.num_objects != info.stats.stats.sum.num_objects ||
+      scrub_cstat.sum.num_object_clones != info.stats.stats.sum.num_object_clones ||
+      scrub_cstat.sum.num_bytes != info.stats.stats.sum.num_bytes) {
     osd->clog.error() << info.pgid << " " << mode
                      << " stat mismatch, got "
-                     << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
-                     << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
-                     << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes.\n";
-    ++(*errors);
+                     << scrub_cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
+                     << scrub_cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
+                     << scrub_cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes.\n";
+    ++scrub_errors;
 
     if (repair) {
-      ++(*fixed);
-      info.stats.stats = cstat;
+      ++scrub_fixed;
+      info.stats.stats = scrub_cstat;
       update_stats();
       share_pg_info();
     }
   }
-
-  dout(10) << "_scrub (" << mode << ") finish" << dendl;
-  return (*errors);
 }
 
 /*---SnapTrimmer Logging---*/
@@ -6749,7 +6809,7 @@ boost::statechart::result ReplicatedPG::NotTrimming::react(const SnapTrim&)
   } else if (!pg->is_primary() || !pg->is_active() || !pg->is_clean()) {
     dout(10) << "NotTrimming not primary, active, clean" << dendl;
     return discard_event();
-  } else if (pg->scrub_block_writes) {
+  } else if (pg->scrub_active) {
     dout(10) << "NotTrimming finalizing scrub" << dendl;
     pg->queue_snap_trim();
     return discard_event();
index 2cd54301f3b7a116b59a6c6daf0ddd484857d6a4..9c2836e7e090c7ab4a00a59c0a062521aaad67f4 100644
@@ -885,6 +885,15 @@ protected:
       pg->put();
     }
   };
+  struct C_OSD_AppliedRecoveredObjectReplica : public Context {
+    boost::intrusive_ptr<ReplicatedPG> pg;
+    ObjectStore::Transaction *t;
+    C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p, ObjectStore::Transaction *tt) :
+      pg(p), t(tt) {}
+    void finish(int r) {
+      pg->_applied_recovered_object_replica(t);
+    }
+  };
 
   void sub_op_remove(OpRequestRef op);
 
@@ -894,6 +903,7 @@ protected:
 
   void sub_op_modify_reply(OpRequestRef op);
   void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc);
+  void _applied_recovered_object_replica(ObjectStore::Transaction *t);
   void _committed_pushed_object(OpRequestRef op, epoch_t same_since, eversion_t lc);
   void recover_got(hobject_t oid, eversion_t v);
   void sub_op_push(OpRequestRef op);
@@ -905,7 +915,10 @@ protected:
 
 
   // -- scrub --
-  virtual int _scrub(ScrubMap& map, int* errors, int* fixed);
+  virtual void _scrub(ScrubMap& map);
+  virtual void _scrub_clear_state();
+  virtual void _scrub_finish();
+  object_stat_collection_t scrub_cstat;
 
   void apply_and_flush_repops(bool requeue);