]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: recover discontiguous peers using backfill instead of backlog
authorSage Weil <sage.weil@dreamhost.com>
Fri, 2 Dec 2011 17:39:17 +0000 (09:39 -0800)
committerSamuel Just <samuel.just@dreamhost.com>
Wed, 14 Dec 2011 19:31:32 +0000 (11:31 -0800)
Instead of generating a huge list of objects to recover, and then pushing
them, iterate over the collection and copy objects as we go.

Disable various bits of backlog code; it will all get ripped out shortly.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index f64e8a50cbe5e416377d59e41b5d705b3aa2eb9c..763d5a8c115a0010fb3016f93aa6118ff24dcce8 100644 (file)
@@ -68,6 +68,7 @@
 #include "messages/MOSDPGCreate.h"
 #include "messages/MOSDPGTrim.h"
 #include "messages/MOSDPGScan.h"
+#include "messages/MOSDPGBackfill.h"
 #include "messages/MOSDPGMissing.h"
 
 #include "messages/MOSDAlive.h"
@@ -2875,6 +2876,9 @@ void OSD::_dispatch(Message *m)
       case MSG_OSD_PG_SCAN:
        handle_pg_scan((MOSDPGScan*)m);
        break;
+      case MSG_OSD_PG_BACKFILL:
+       handle_pg_backfill((MOSDPGBackfill*)m);
+       break;
 
        // client ops
       case CEPH_MSG_OSD_OP:
@@ -4548,6 +4552,46 @@ bool OSD::scan_is_queueable(PG *pg, MOSDPGScan *m)
   return true;
 }
 
+void OSD::handle_pg_backfill(MOSDPGBackfill *m)
+{
+  dout(10) << "handle_pg_backfill " << *m << " from " << m->get_source() << dendl;
+  
+  if (!require_osd_peer(m))
+    return;
+  if (!require_same_or_newer_map(m, m->query_epoch))
+    return;
+
+  PG *pg;
+  
+  if (!_have_pg(m->pgid)) {
+    m->put();
+    return;
+  }
+
+  pg = _lookup_lock_pg(m->pgid);
+  assert(pg);
+
+  pg->get();
+  enqueue_op(pg, m);
+  pg->unlock();
+  pg->put();
+}
+
+bool OSD::backfill_is_queueable(PG *pg, MOSDPGBackfill *m)
+{
+  assert(pg->is_locked());
+
+  if (m->query_epoch < pg->info.history.same_interval_since) {
+    dout(10) << *pg << " got old backfill, ignoring" << dendl;
+    m->put();
+    return false;
+  }
+
+  return true;
+}
+
+
+
 void OSD::handle_pg_missing(MOSDPGMissing *m)
 {
   assert(0); // MOSDPGMissing is fantastical
@@ -4946,7 +4990,8 @@ void OSD::generate_backlog(PG *pg)
     ObjectStore::Transaction *t = new ObjectStore::Transaction;
     C_Contexts *fin = new C_Contexts(g_ceph_context);
     PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
-    pg->handle_backlog_generated(&rctx);
+    //pg->handle_backlog_generated(&rctx);
+#warning dead code
     do_queries(query_map);
     do_infos(info_map);
     tr = store->queue_transaction(&pg->osr, t,
@@ -5482,6 +5527,11 @@ void OSD::enqueue_op(PG *pg, Message *op)
       return;
     break;
 
+  case MSG_OSD_PG_BACKFILL:
+    if (!backfill_is_queueable(pg, (MOSDPGBackfill*)op))
+      return;
+    break;
+
   default:
     assert(0 == "enqueued an illegal message type");
   }
@@ -5592,6 +5642,10 @@ void OSD::dequeue_op(PG *pg)
     pg->do_scan((MOSDPGScan*)op);
     break;
 
+  case MSG_OSD_PG_BACKFILL:
+    pg->do_backfill((MOSDPGBackfill*)op);
+    break;
+
   default:
     assert(0 == "bad message type in dequeue_op");
   }
index 961ca27caa3f7dda266f067734d9e4eaac43b67b..6a73c818404828dfada64d161d10cc761da16acf 100644 (file)
@@ -589,6 +589,9 @@ protected:
   void handle_pg_scan(class MOSDPGScan *m);
   bool scan_is_queueable(PG *pg, MOSDPGScan *m);
 
+  void handle_pg_backfill(class MOSDPGBackfill *m);
+  bool backfill_is_queueable(PG *pg, MOSDPGBackfill *m);
+
   void handle_pg_remove(class MOSDPGRemove *m);
   void queue_pg_for_deletion(PG *pg);
   void _remove_pg(PG *pg);
index 0e8b100bf075fd96bbdd4e4ea20781a80a7a72eb..4b74d1b29d636fbc148a4184156bec59072e551b 100644 (file)
@@ -3666,8 +3666,8 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
 
 void PG::proc_primary_info(ObjectStore::Transaction &t, const Info &oinfo)
 {
-  assert(is_replica());
-  assert(is_active());
+  assert(!is_primary());
+  assert(is_stray() || is_active());
   info.stats = oinfo.stats;
 
   osd->unreg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
@@ -4093,7 +4093,8 @@ void PG::RecoveryState::Peering::exit() {
 }
 
 /*---------Active---------*/
-PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx) {
+PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx)
+{
   state_name = "Started/Primary/Active";
   context< RecoveryMachine >().log_enter(state_name);
 
@@ -4108,8 +4109,8 @@ PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx) {
   dout(10) << "Activate Finished" << dendl;
 }
 
-boost::statechart::result 
-PG::RecoveryState::Active::react(const AdvMap& advmap) {
+boost::statechart::result PG::RecoveryState::Active::react(const AdvMap& advmap)
+{
   PG *pg = context< RecoveryMachine >().pg;
   dout(10) << "Active advmap" << dendl;
   if (!pg->pool->newly_removed_snaps.empty()) {
@@ -4120,8 +4121,8 @@ PG::RecoveryState::Active::react(const AdvMap& advmap) {
   return forward_event();
 }
     
-boost::statechart::result 
-PG::RecoveryState::Active::react(const ActMap&) {
+boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
+{
   PG *pg = context< RecoveryMachine >().pg;
   dout(10) << "Active: handling ActMap" << dendl;
   assert(pg->is_active());
@@ -4157,8 +4158,8 @@ PG::RecoveryState::Active::react(const ActMap&) {
   return forward_event();
 }
 
-boost::statechart::result 
-PG::RecoveryState::Active::react(const MNotifyRec& notevt) {
+boost::statechart::result PG::RecoveryState::Active::react(const MNotifyRec& notevt)
+{
   PG *pg = context< RecoveryMachine >().pg;
   assert(pg->is_active());
   assert(pg->is_primary());
@@ -4178,8 +4179,8 @@ PG::RecoveryState::Active::react(const MNotifyRec& notevt) {
   return discard_event();
 }
 
-boost::statechart::result 
-PG::RecoveryState::Active::react(const MInfoRec& infoevt) {
+boost::statechart::result PG::RecoveryState::Active::react(const MInfoRec& infoevt)
+{
   PG *pg = context< RecoveryMachine >().pg;
   assert(pg->is_active());
   assert(pg->is_primary());
@@ -4203,8 +4204,8 @@ PG::RecoveryState::Active::react(const MInfoRec& infoevt) {
   return discard_event();
 }
 
-boost::statechart::result
-PG::RecoveryState::Active::react(const MLogRec& logevt) {
+boost::statechart::result PG::RecoveryState::Active::react(const MLogRec& logevt)
+{
   dout(10) << "searching osd." << logevt.from
            << " log for unfound items" << dendl;
   PG *pg = context< RecoveryMachine >().pg;
@@ -4215,7 +4216,22 @@ PG::RecoveryState::Active::react(const MLogRec& logevt) {
   return discard_event();
 }
 
-void PG::RecoveryState::Active::exit() {
+boost::statechart::result PG::RecoveryState::Active::react(const BackfillComplete& evt)
+{
+  PG *pg = context< RecoveryMachine >().pg;
+
+  int newest_update_osd;
+  if (!pg->choose_acting(newest_update_osd, pg->backfill)) {
+    post_event(NeedNewMap());
+  } else {
+    assert(0 == "we shouldn't get here");
+  }
+
+  return discard_event();
+}
+
+void PG::RecoveryState::Active::exit()
+ {
   context< RecoveryMachine >().log_exit(state_name, enter_time);
 }
 
@@ -4294,14 +4310,13 @@ boost::statechart::result
 PG::RecoveryState::Stray::react(const MLogRec& logevt) {
   PG *pg = context< RecoveryMachine >().pg;
   MOSDPGLog *msg = logevt.msg;
-  dout(10) << "received log from " << logevt.from << dendl;
+  dout(10) << "got log from osd." << logevt.from << dendl;
   pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(),
                msg->info, msg->log, logevt.from);
 
   assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
   assert(pg->log.head == pg->info.last_update);
 
-  dout(10) << "activating!" << dendl;
   post_event(Activate());
   return discard_event();
 }
@@ -4309,12 +4324,22 @@ PG::RecoveryState::Stray::react(const MLogRec& logevt) {
 boost::statechart::result PG::RecoveryState::Stray::react(const MInfoRec& infoevt)
 {
   PG *pg = context< RecoveryMachine >().pg;
-  dout(10) << "received info from " << infoevt.from << dendl;
-  assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
-  assert(pg->log.head == pg->info.last_update);
+  dout(10) << "got info from osd." << infoevt.from << dendl;
 
-  dout(10) << "activating!" << dendl;
-  post_event(Activate());
+  if (pg->is_replica()) {
+    assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
+    assert(pg->log.head == pg->info.last_update);
+    post_event(Activate());
+  } else {
+    // pg creation for backfill
+    dout(10) << "updating info to " << infoevt.info << dendl;
+    pg->info = infoevt.info;
+
+    ObjectStore::Transaction *t = new ObjectStore::Transaction;
+    pg->write_info(*t);
+    int tr = pg->osd->store->queue_transaction(&pg->osr, t);
+    assert(tr == 0);
+  }
   return discard_event();
 }
 
@@ -4742,11 +4767,11 @@ void PG::RecoveryState::handle_activate_map(RecoveryCtx *rctx)
   end_handle();
 }
 
-void PG::RecoveryState::handle_backlog_generated(RecoveryCtx *rctx)
+void PG::RecoveryState::handle_backfill_complete(RecoveryCtx *rctx)
 {
-  dout(10) << "handle_backlog_generated" << dendl;
+  dout(10) << "handle_backfill_complete" << dendl;
   start_handle(rctx);
-  //machine.process_event(BacklogComplete());
+  machine.process_event(BackfillComplete());
   end_handle();
 }
 
index 5a65053fe0e055277b937363294092779965ebe3..0ebf0a6d92d9d34784c78711920430c52db04dd6 100644 (file)
@@ -56,6 +56,7 @@ class MOSDOp;
 class MOSDSubOp;
 class MOSDSubOpReply;
 class MOSDPGScan;
+class MOSDPGBackfill;
 class MOSDPGInfo;
 class MOSDPGLog;
 
@@ -175,7 +176,7 @@ public:
     eversion_t log_tail;     // oldest log entry.
     bool       log_backlog;    // do we store a complete log?
 
-    interval_set<__u32> incomplete;  // incomplete hash ranges prior to last_complete
+    interval_set<uint64_t> incomplete;  // incomplete hash ranges prior to last_complete
 
     interval_set<snapid_t> purged_snaps;
 
@@ -997,6 +998,9 @@ public:
        osdmap(osdmap), lastmap(lastmap), newup(newup), newacting(newacting) {}
     };
 
+    struct BackfillComplete : boost::statechart::event< BackfillComplete > {
+      BackfillComplete() : boost::statechart::event< BackfillComplete >() {}
+    };
     struct ActMap : boost::statechart::event< ActMap > {
       ActMap() : boost::statechart::event< ActMap >() {}
     };
@@ -1205,13 +1209,15 @@ public:
        boost::statechart::custom_reaction< AdvMap >,
        boost::statechart::custom_reaction< MInfoRec >,
        boost::statechart::custom_reaction< MNotifyRec >,
-       boost::statechart::custom_reaction< MLogRec >
+       boost::statechart::custom_reaction< MLogRec >,
+       boost::statechart::custom_reaction< BackfillComplete >
        > reactions;
       boost::statechart::result react(const ActMap&);
       boost::statechart::result react(const AdvMap&);
       boost::statechart::result react(const MInfoRec& infoevt);
       boost::statechart::result react(const MNotifyRec& notevt);
       boost::statechart::result react(const MLogRec& logevt);
+      boost::statechart::result react(const BackfillComplete&);
     };
 
     struct ReplicaActive : boost::statechart::state< ReplicaActive, Started >, NamedState {
@@ -1336,7 +1342,7 @@ public:
                            vector<int>& newup, vector<int>& newacting, 
                            RecoveryCtx *ctx);
     void handle_activate_map(RecoveryCtx *ctx);
-    void handle_backlog_generated(RecoveryCtx *ctx);
+    void handle_backfill_complete(RecoveryCtx *ctx);
     void handle_create(RecoveryCtx *ctx);
     void handle_loaded(RecoveryCtx *ctx);
   } recovery_state;
@@ -1361,14 +1367,15 @@ protected:
     // info about a backfill interval on a peer
     map<hobject_t,eversion_t> objects;
     hobject_t begin, end;
-
+    
+    /// true if there are no objects in this interval
     bool empty() {
       return objects.empty();
     }
 
-    /// true if interval starts at end of range
+    /// true if interval extends to the end of the range
     bool at_end() {
-      return begin == hobject_t::get_max();
+      return end == hobject_t::get_max();
     }
 
     /// drop first entry, and adjust @begin accordingly
@@ -1381,6 +1388,9 @@ protected:
        begin = objects.begin()->first;
     }
   };
+  
+  BackfillInterval backfill_info;
+  map<int,BackfillInterval> peer_backfill_info;
 
   epoch_t last_peering_reset;
 
@@ -1700,8 +1710,8 @@ public:
   void handle_activate_map(RecoveryCtx *rctx) {
     recovery_state.handle_activate_map(rctx);
   }
-  void handle_backlog_generated(RecoveryCtx *rctx) {
-    recovery_state.handle_backlog_generated(rctx);
+  void handle_backfill_complete(RecoveryCtx *rctx) {
+    recovery_state.handle_backfill_complete(rctx);
   }
   void handle_create(RecoveryCtx *rctx) {
     recovery_state.handle_create(rctx);
@@ -1716,6 +1726,7 @@ public:
   virtual void do_sub_op(MOSDSubOp *op) = 0;
   virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0;
   virtual void do_scan(MOSDPGScan *op) = 0;
+  virtual void do_backfill(MOSDPGBackfill *op) = 0;
   virtual bool snap_trimmer() = 0;
 
   virtual bool same_for_read_since(epoch_t e) = 0;
@@ -1769,7 +1780,7 @@ inline ostream& operator<<(ostream& out, const PG::Info& pgi)
     out << " (" << pgi.log_tail << "," << pgi.last_update << "]"
         << (pgi.log_backlog ? "+backlog":"");
     if (!pgi.incomplete.empty())
-      out << " incomp " << pgi.incomplete;
+      out << " incomp " << std::hex << pgi.incomplete << std::dec;
   }
   //out << " c " << pgi.epoch_created;
   out << " n=" << pgi.stats.stats.sum.num_objects;
index b66c22432094fea6e081ae842f9b581f78232a12..3aae3839a8e580800db7277fad4bfe88b048551d 100644 (file)
@@ -29,6 +29,7 @@
 #include "messages/MOSDPGRemove.h"
 #include "messages/MOSDPGTrim.h"
 #include "messages/MOSDPGScan.h"
+#include "messages/MOSDPGBackfill.h"
 
 #include "messages/MOSDPing.h"
 #include "messages/MWatchNotify.h"
@@ -844,6 +845,7 @@ void ReplicatedPG::do_scan(MOSDPGScan *m)
   case MOSDPGScan::OP_SCAN_GET_DIGEST:
     {
       BackfillInterval bi;
+      osr.flush();
       scan_range(m->begin, 100, 200, &bi);
       MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
                                         get_osdmap()->get_epoch(), m->query_epoch,
@@ -866,7 +868,57 @@ void ReplicatedPG::do_scan(MOSDPGScan *m)
     }
     break;
   }
-  
+
+  m->put();
+}
+
+void ReplicatedPG::do_backfill(MOSDPGBackfill *m)
+{
+  dout(10) << "do_backfill " << *m << dendl;
+
+  switch (m->op) {
+  case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
+    {
+      assert(get_role() < 0);
+
+      info.incomplete = m->incomplete;
+
+      ObjectStore::Transaction *t = new ObjectStore::Transaction;
+      write_info(*t);
+      int tr = osd->store->queue_transaction(&osr, t);
+      assert(tr == 0);
+    }
+    break;
+
+  case MOSDPGBackfill::OP_BACKFILL_FINISH:
+    {
+      assert(get_role() < 0);
+      info.last_complete = info.last_update;
+      info.incomplete.clear();
+      
+      ObjectStore::Transaction *t = new ObjectStore::Transaction;
+      log.clear();
+      log.head = info.last_update;
+      log.tail = info.last_update;
+      write_log(*t);
+      write_info(*t);
+      int tr = osd->store->queue_transaction(&osr, t);
+      assert(tr == 0);
+
+      MOSDPGBackfill *reply = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
+                                                get_osdmap()->get_epoch(), m->query_epoch, info.pgid);
+      osd->cluster_messenger->send_message(reply, m->get_connection());
+    }
+    break;
+
+  case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
+    {
+      assert(is_primary());
+      finish_recovery_op(hobject_t::get_max());
+    }
+    break;
+  }
+
   m->put();
 }
 
@@ -3812,6 +3864,21 @@ void ReplicatedPG::send_pull_op(const hobject_t& soid, eversion_t v, bool first,
   osd->logger->inc(l_osd_pull);
 }
 
+void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
+{
+  tid_t tid = osd->get_tid();
+  osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+
+  dout(10) << "send_remove_op " << oid << " from osd." << peer
+          << " tid " << tid << dendl;
+  
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, oid, false, CEPH_OSD_FLAG_ACK,
+                                  get_osdmap()->get_epoch(), tid, v);
+  subop->ops = vector<OSDOp>(1);
+  subop->ops[0].op.op = CEPH_OSD_OP_DELETE;
+
+  osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
+}
 
 /*
  * intelligently push an object to a replica.  make use of existing
@@ -4035,7 +4102,8 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
                   pi->data_subset_pushing, pi->clone_subsets);
     } else {
       // done!
-      peer_missing[peer].got(soid, pi->version);
+      if (pi->version > peer_info[peer].log_tail)
+       peer_missing[peer].got(soid, pi->version);
       
       pushing[soid].erase(peer);
       pi = NULL;
@@ -4893,14 +4961,24 @@ int ReplicatedPG::start_recovery_ops(int max)
     // second chance to recovery replicas
     started = recover_replicas(max);
   }
+  if (!backfill.empty() && started < max) {
+    started += recover_backfill(max - started);
+  }
 
   dout(10) << " started " << started << dendl;
-
   osd->logger->inc(l_osd_rop, started);
 
-  if (started)
+  if (started || recovery_ops_active > 0)
     return started;
 
+  assert(recovery_ops_active == 0);
+
+  if (backfill.size()) {
+    PG::RecoveryCtx rctx(0, 0, 0, 0, 0);
+    handle_backfill_complete(&rctx);
+    return 0;
+  }
+
   if (is_all_uptodate()) {
     dout(10) << __func__ << ": all OSDs in the PG are up-to-date!" << dendl;
     log.reset_recovery_pointers();
@@ -5186,15 +5264,173 @@ int ReplicatedPG::recover_replicas(int max)
   return started;
 }
 
+int ReplicatedPG::recover_backfill(int max)
+{
+  dout(10) << "recover_backfill (" << max << ")" << dendl;
+  assert(!backfill.empty());
+  
+  // initially just backfill one peer at a time.  FIXME.
+  int peer = *backfill.begin();
+  Info& pinfo = peer_info[peer];
+  BackfillInterval& pbi = peer_backfill_info[peer];
+
+  dout(10) << " peer osd." << peer << " " << pinfo
+          << " interval " << pbi.begin << "-" << pbi.end << " " << pbi.objects.size() << " objects" << dendl;
+
+  // does the pg exist yet on the peer?
+  if (pinfo.dne()) {
+    // ok, we know they have no objects.
+    pbi.end = hobject_t::get_max();
+
+    // fill in pinfo
+    pinfo.last_update = info.last_update;
+    pinfo.log_tail = info.last_update;
+    pinfo.incomplete.insert(0, 0x100000000ull);
+    pinfo.history = info.history;
+    dout(10) << " peer osd." << peer << " pg dne; setting info to " << pinfo << dendl;
+
+    // create pg on remote
+    MOSDPGInfo *mp = new MOSDPGInfo(get_osdmap()->get_epoch());
+    mp->pg_info.push_back(pinfo);
+    osd->cluster_messenger->send_message(mp, get_osdmap()->get_cluster_inst(peer));
+  }
+
+  int ops = 0;
+  while (ops < max) {
+    if (!backfill_info.at_end() && (backfill_info.end <= pbi.begin ||
+                                   backfill_info.empty())) {
+      osr.flush();
+      scan_range(backfill_info.end, 10, 20, &backfill_info);
+    }
+
+    dout(20) << "   my backfill " << backfill_info.begin << "-" << backfill_info.end
+            << " " << backfill_info.objects << dendl;
+    dout(20) << " peer backfill " << pbi.begin << "-" << pbi.end << " " << pbi.objects << dendl;
+
+    if (!pbi.at_end() && (pbi.end <= backfill_info.begin ||
+                         pbi.empty())) {
+      epoch_t e = get_osdmap()->get_epoch();
+      MOSDPGScan *m = new MOSDPGScan(MOSDPGScan::OP_SCAN_GET_DIGEST, e, e, info.pgid,
+                                    pbi.end, hobject_t());
+      osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+      start_recovery_op(pbi.end);
+      ops++;
+      break;
+    }
+    
+    if (backfill_info.empty()) {
+      // this only happens when we reach the end of the collection.
+      assert(backfill_info.at_end());
+      if (pbi.empty()) {
+       assert(pbi.at_end());
+       dout(10) << " reached end for both local and peer" << dendl;
+       if (pbi.begin != hobject_t::get_max()) {
+         pbi.begin = hobject_t::get_max();
+
+         pinfo.incomplete.clear();
+
+         epoch_t e = get_osdmap()->get_epoch();
+         MOSDPGBackfill *m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_FINISH, e, e, info.pgid);
+         osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+         start_recovery_op(hobject_t::get_max());
+         ops++;
+       }
+       return ops;
+      }
+
+      // remove peer objects < backfill_info.end
+      const hobject_t& pf = pbi.objects.begin()->first;
+      eversion_t pv = pbi.objects.begin()->second;
+      assert(pf < backfill_info.end);
+      
+      dout(20) << " removing peer " << pf << " <= local end " << backfill_info.end << dendl;
+      send_remove_op(pf, pv, peer);
+      pbi.pop_front();
+      continue;
+    }
+
+    const hobject_t& my_first = backfill_info.objects.begin()->first;
+    eversion_t mv = backfill_info.objects.begin()->second;
+
+    if (pbi.empty()) {
+      assert(pbi.at_end());
+      dout(20) << " pushing local " << my_first << " " << backfill_info.objects.begin()->second
+              << " to peer osd." << peer << dendl;
+      push_backfill_object(my_first, mv, peer);
+      backfill_info.pop_front();
+      pbi.begin = my_first;
+      ++ops;
+      continue;
+    }
+
+    const hobject_t& peer_first = pbi.objects.begin()->first;
+    eversion_t pv = pbi.objects.begin()->second;
+
+    if (peer_first < my_first) {
+      dout(20) << " removing peer " << peer_first << " <= local " << my_first << dendl;
+      send_remove_op(peer_first, pv, peer);
+      pbi.pop_front();
+    } else if (peer_first == my_first) {
+      if (pv == mv) {
+       dout(20) << " keeping peer " << peer_first << " " << pv << dendl;
+      } else {
+       dout(20) << " replacing peer " << peer_first << " with local " << mv << dendl;
+       push_backfill_object(my_first, mv, peer);
+       ++ops;
+      }
+      pbi.pop_front();
+      backfill_info.pop_front();
+    } else {
+      // peer_first > my_first
+      dout(20) << " pushing local " << my_first << " " << mv
+              << " to peer osd." << peer << dendl;
+      push_backfill_object(my_first, mv, peer);
+      backfill_info.pop_front();
+      ++ops;
+    }
+  }
+
+  if (!pinfo.incomplete.empty()) {
+    hobject_t b;
+    b.set_filestore_key(pinfo.incomplete.range_start());
+    dout(20) << " b " << b << " pbi.begin " << pbi.begin << " " << pinfo << dendl;
+    if (b < pbi.begin) {
+      pinfo.incomplete.erase(b.get_filestore_key(), pbi.begin.get_filestore_key() - b.get_filestore_key());
+      dout(10) << " peer osd." << peer << " info.incomplete now "
+              << std::hex << pinfo.incomplete << std::dec << dendl;
+      
+      epoch_t e = get_osdmap()->get_epoch();
+      MOSDPGBackfill *m = new MOSDPGBackfill(MOSDPGBackfill::OP_BACKFILL_PROGRESS, e, e, info.pgid);
+      m->incomplete = pinfo.incomplete;
+      osd->cluster_messenger->send_message(m, get_osdmap()->get_cluster_inst(peer));
+    }
+  }
+  return ops;
+}
+
+void ReplicatedPG::push_backfill_object(hobject_t oid, eversion_t v, int peer)
+{
+  dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
+  start_recovery_op(oid);
+  ObjectContext *obc = get_object_context(oid, OLOC_BLANK, false);
+  obc->ondisk_read_lock();
+  push_to_replica(obc, oid, peer);
+  obc->ondisk_read_unlock();
+  put_object_context(obc);
+}
+
 void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi)
 {
   assert(is_locked());
   dout(10) << "scan_range from " << begin << dendl;
   bi->begin = begin;
 
-  vector<hobject_t> ls(max);
+  vector<hobject_t> ls;
+  ls.reserve(max);
   int r = osd->store->collection_list_partial(coll, begin, min, max, &ls, &bi->end);
   assert(r >= 0);
+  dout(10) << " got " << ls.size() << " items, next " << bi->end << dendl;
+  dout(20) << ls << dendl;
 
   for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
     ObjectContext *obc = NULL;
index 5d2af6aed3aae62ccb4b0e3ce54acf373759a219..49fcc3174db41651858b92abe1960be02438bb2d 100644 (file)
@@ -611,6 +611,7 @@ protected:
   int start_recovery_ops(int max);
   int recover_primary(int max);
   int recover_replicas(int max);
+  int recover_backfill(int max);
 
   /**
    * scan a (hash) range of objects in the current pg
@@ -622,6 +623,10 @@ protected:
    */
   void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi);
 
+  void push_backfill_object(hobject_t oid, eversion_t v, int peer);
+  void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
+
+
   void dump_watchers(ObjectContext *obc);
   void remove_watcher(ObjectContext *obc, entity_name_t entity);
   void remove_notify(ObjectContext *obc, Watch::Notification *notif);
@@ -741,6 +746,7 @@ public:
   void do_sub_op(MOSDSubOp *op);
   void do_sub_op_reply(MOSDSubOpReply *op);
   void do_scan(MOSDPGScan *op);
+  void do_backfill(MOSDPGBackfill *op);
   bool get_obs_to_trim(snapid_t &snap_to_trim,
                       coll_t &col_to_trim,
                       vector<hobject_t> &obs_to_trim);