]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: Move ReplicatedBackend methods into ReplicatedBackend.cc 4196/head
authorSamuel Just <sjust@redhat.com>
Thu, 26 Mar 2015 17:50:19 +0000 (10:50 -0700)
committerSamuel Just <sjust@redhat.com>
Thu, 26 Mar 2015 17:50:19 +0000 (10:50 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedPG.cc

index 680c27a5e8be6091284542d670ad384b4979086a..b86d4d1e7444f1661af0604e0f2804eb02160b0f 100644 (file)
@@ -30,6 +30,35 @@ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
   return *_dout << pgb->get_parent()->gen_dbg_prefix();
 }
 
+static void log_subop_stats(
+  PerfCounters *logger,
+  OpRequestRef op, int subop)
+{
+  utime_t now = ceph_clock_now(g_ceph_context);
+  utime_t latency = now;
+  latency -= op->get_req()->get_recv_stamp();
+
+
+  logger->inc(l_osd_sop);
+  logger->tinc(l_osd_sop_lat, latency);
+  logger->inc(subop);
+
+  if (subop != l_osd_sop_pull) {
+    uint64_t inb = op->get_req()->get_data().length();
+    logger->inc(l_osd_sop_inb, inb);
+    if (subop == l_osd_sop_w) {
+      logger->inc(l_osd_sop_w_inb, inb);
+      logger->tinc(l_osd_sop_w_lat, latency);
+    } else if (subop == l_osd_sop_push) {
+      logger->inc(l_osd_sop_push_inb, inb);
+      logger->tinc(l_osd_sop_push_lat, latency);
+    } else
+      assert("no support subop" == 0);
+  } else {
+    logger->tinc(l_osd_sop_pull_lat, latency);
+  }
+}
+
 ReplicatedBackend::ReplicatedBackend(
   PGBackend::Listener *pg,
   coll_t coll,
@@ -797,3 +826,1597 @@ void ReplicatedBackend::be_deep_scrub(
   o.omap_digest = oh.digest();
   o.omap_digest_present = true;
 }
+
+void ReplicatedBackend::_do_push(OpRequestRef op)
+{
+  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
+  assert(m->get_type() == MSG_OSD_PG_PUSH);
+  pg_shard_t from = m->from;
+
+  vector<PushReplyOp> replies;
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  for (vector<PushOp>::iterator i = m->pushes.begin();
+       i != m->pushes.end();
+       ++i) {
+    replies.push_back(PushReplyOp());
+    handle_push(from, *i, &(replies.back()), t);
+  }
+
+  MOSDPGPushReply *reply = new MOSDPGPushReply;
+  reply->from = get_parent()->whoami_shard();
+  reply->set_priority(m->get_priority());
+  reply->pgid = get_info().pgid;
+  reply->map_epoch = m->map_epoch;
+  reply->replies.swap(replies);
+  reply->compute_cost(cct);
+
+  t->register_on_complete(
+    new PG_SendMessageOnConn(
+      get_parent(), reply, m->get_connection()));
+
+  t->register_on_applied(
+    new ObjectStore::C_DeleteTransaction(t));
+  get_parent()->queue_transaction(t);
+}
+
+struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
+  ReplicatedBackend *bc;
+  list<hobject_t> to_continue;
+  int priority;
+  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
+    : bc(bc), priority(priority) {}
+
+  void finish(ThreadPool::TPHandle &handle) {
+    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
+    for (list<hobject_t>::iterator i =
+          to_continue.begin();
+        i != to_continue.end();
+        ++i) {
+      map<hobject_t, ReplicatedBackend::PullInfo>::iterator j =
+       bc->pulling.find(*i);
+      assert(j != bc->pulling.end());
+      if (!bc->start_pushes(*i, j->second.obc, h)) {
+       bc->get_parent()->on_global_recover(
+         *i);
+      }
+      bc->pulling.erase(*i);
+      handle.reset_tp_timeout();
+    }
+    bc->run_recovery_op(h, priority);
+  }
+};
+
+void ReplicatedBackend::_do_pull_response(OpRequestRef op)
+{
+  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
+  assert(m->get_type() == MSG_OSD_PG_PUSH);
+  pg_shard_t from = m->from;
+
+  vector<PullOp> replies(1);
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+  list<hobject_t> to_continue;
+  for (vector<PushOp>::iterator i = m->pushes.begin();
+       i != m->pushes.end();
+       ++i) {
+    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
+    if (more)
+      replies.push_back(PullOp());
+  }
+  if (!to_continue.empty()) {
+    C_ReplicatedBackend_OnPullComplete *c =
+      new C_ReplicatedBackend_OnPullComplete(
+       this,
+       m->get_priority());
+    c->to_continue.swap(to_continue);
+    t->register_on_complete(
+      new PG_RecoveryQueueAsync(
+       get_parent(),
+       get_parent()->bless_gencontext(c)));
+  }
+  replies.erase(replies.end() - 1);
+
+  if (replies.size()) {
+    MOSDPGPull *reply = new MOSDPGPull;
+    reply->from = parent->whoami_shard();
+    reply->set_priority(m->get_priority());
+    reply->pgid = get_info().pgid;
+    reply->map_epoch = m->map_epoch;
+    reply->pulls.swap(replies);
+    reply->compute_cost(cct);
+
+    t->register_on_complete(
+      new PG_SendMessageOnConn(
+       get_parent(), reply, m->get_connection()));
+  }
+
+  t->register_on_applied(
+    new ObjectStore::C_DeleteTransaction(t));
+  get_parent()->queue_transaction(t);
+}
+
+void ReplicatedBackend::do_pull(OpRequestRef op)
+{
+  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
+  assert(m->get_type() == MSG_OSD_PG_PULL);
+  pg_shard_t from = m->from;
+
+  map<pg_shard_t, vector<PushOp> > replies;
+  for (vector<PullOp>::iterator i = m->pulls.begin();
+       i != m->pulls.end();
+       ++i) {
+    replies[from].push_back(PushOp());
+    handle_pull(from, *i, &(replies[from].back()));
+  }
+  send_pushes(m->get_priority(), replies);
+}
+
+void ReplicatedBackend::do_push_reply(OpRequestRef op)
+{
+  MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
+  assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
+  pg_shard_t from = m->from;
+
+  vector<PushOp> replies(1);
+  for (vector<PushReplyOp>::iterator i = m->replies.begin();
+       i != m->replies.end();
+       ++i) {
+    bool more = handle_push_reply(from, *i, &(replies.back()));
+    if (more)
+      replies.push_back(PushOp());
+  }
+  replies.erase(replies.end() - 1);
+
+  map<pg_shard_t, vector<PushOp> > _replies;
+  _replies[from].swap(replies);
+  send_pushes(m->get_priority(), _replies);
+}
+
+template<typename T, int MSGTYPE>
+Message * ReplicatedBackend::generate_subop(
+  const hobject_t &soid,
+  const eversion_t &at_version,
+  ceph_tid_t tid,
+  osd_reqid_t reqid,
+  eversion_t pg_trim_to,
+  eversion_t pg_trim_rollback_to,
+  hobject_t new_temp_oid,
+  hobject_t discard_temp_oid,
+  const vector<pg_log_entry_t> &log_entries,
+  boost::optional<pg_hit_set_history_t> &hset_hist,
+  InProgressOp *op,
+  ObjectStore::Transaction *op_t,
+  pg_shard_t peer,
+  const pg_info_t &pinfo)
+{
+  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+  assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP);
+  // forward the write/update/whatever
+  T *wr = new T(
+    reqid, parent->whoami_shard(),
+    spg_t(get_info().pgid.pgid, peer.shard),
+    soid, acks_wanted,
+    get_osdmap()->get_epoch(),
+    tid, at_version);
+
+  // ship resulting transaction, log entries, and pg_stats
+  if (!parent->should_send_op(peer, soid)) {
+    dout(10) << "issue_repop shipping empty opt to osd." << peer
+            <<", object " << soid
+            << " beyond MAX(last_backfill_started "
+            << ", pinfo.last_backfill "
+            << pinfo.last_backfill << ")" << dendl;
+    ObjectStore::Transaction t;
+    t.set_use_tbl(op_t->get_use_tbl());
+    ::encode(t, wr->get_data());
+  } else {
+    ::encode(*op_t, wr->get_data());
+  }
+
+  ::encode(log_entries, wr->logbl);
+
+  if (pinfo.is_incomplete())
+    wr->pg_stats = pinfo.stats;  // reflects backfill progress
+  else
+    wr->pg_stats = get_info().stats;
+
+  wr->pg_trim_to = pg_trim_to;
+  wr->pg_trim_rollback_to = pg_trim_rollback_to;
+
+  wr->new_temp_oid = new_temp_oid;
+  wr->discard_temp_oid = discard_temp_oid;
+  wr->updated_hit_set_history = hset_hist;
+  return wr;
+}
+
+void ReplicatedBackend::issue_op(
+  const hobject_t &soid,
+  const eversion_t &at_version,
+  ceph_tid_t tid,
+  osd_reqid_t reqid,
+  eversion_t pg_trim_to,
+  eversion_t pg_trim_rollback_to,
+  hobject_t new_temp_oid,
+  hobject_t discard_temp_oid,
+  const vector<pg_log_entry_t> &log_entries,
+  boost::optional<pg_hit_set_history_t> &hset_hist,
+  InProgressOp *op,
+  ObjectStore::Transaction *op_t)
+{
+
+  if (parent->get_actingbackfill_shards().size() > 1) {
+    ostringstream ss;
+    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
+    replicas.erase(parent->whoami_shard());
+    ss << "waiting for subops from " << replicas;
+    if (op->op)
+      op->op->mark_sub_op_sent(ss.str());
+  }
+  for (set<pg_shard_t>::const_iterator i =
+        parent->get_actingbackfill_shards().begin();
+       i != parent->get_actingbackfill_shards().end();
+       ++i) {
+    if (*i == parent->whoami_shard()) continue;
+    pg_shard_t peer = *i;
+    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;
+
+    Message *wr;
+    uint64_t min_features = parent->min_peer_features();
+    if (!(min_features & CEPH_FEATURE_OSD_REPOP)) {
+      dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl;
+      wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>(
+           soid,
+           at_version,
+           tid,
+           reqid,
+           pg_trim_to,
+           pg_trim_rollback_to,
+           new_temp_oid,
+           discard_temp_oid,
+           log_entries,
+           hset_hist,
+           op,
+           op_t,
+           peer,
+           pinfo);
+    } else {
+      wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>(
+           soid,
+           at_version,
+           tid,
+           reqid,
+           pg_trim_to,
+           pg_trim_rollback_to,
+           new_temp_oid,
+           discard_temp_oid,
+           log_entries,
+           hset_hist,
+           op,
+           op_t,
+           peer,
+           pinfo);
+    }
+
+    get_parent()->send_message_osd_cluster(
+      peer.osd, wr, get_osdmap()->get_epoch());
+  }
+}
+
+// sub op modify
+void ReplicatedBackend::sub_op_modify(OpRequestRef op) {
+  Message *m = op->get_req();
+  int msg_type = m->get_type();
+  if (msg_type == MSG_OSD_SUBOP) {
+    sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op);
+  } else if (msg_type == MSG_OSD_REPOP) {
+    sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op);
+  } else {
+    assert(0);
+  }
+}
+
+template<typename T, int MSGTYPE>
+void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
+{
+  T *m = static_cast<T *>(op->get_req());
+  int msg_type = m->get_type();
+  assert(MSGTYPE == msg_type);
+  assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP);
+
+  const hobject_t& soid = m->poid;
+
+  dout(10) << "sub_op_modify trans"
+           << " " << soid
+           << " v " << m->version
+          << (m->logbl.length() ? " (transaction)" : " (parallel exec")
+          << " " << m->logbl.length()
+          << dendl;
+
+  // sanity checks
+  assert(m->map_epoch >= get_info().history.same_interval_since);
+
+  // we better not be missing this.
+  assert(!parent->get_log().get_missing().is_missing(soid));
+
+  int ackerosd = m->get_source().num();
+
+  op->mark_started();
+
+  RepModifyRef rm(new RepModify);
+  rm->op = op;
+  rm->ackerosd = ackerosd;
+  rm->last_complete = get_info().last_complete;
+  rm->epoch_started = get_osdmap()->get_epoch();
+
+  assert(m->logbl.length());
+  // shipped transaction and log entries
+  vector<pg_log_entry_t> log;
+
+  bufferlist::iterator p = m->get_data().begin();
+  ::decode(rm->opt, p);
+  rm->localt.set_use_tbl(rm->opt.get_use_tbl());
+
+  if (m->new_temp_oid != hobject_t()) {
+    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
+    add_temp_obj(m->new_temp_oid);
+    get_temp_coll(&rm->localt);
+  }
+  if (m->discard_temp_oid != hobject_t()) {
+    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
+    if (rm->opt.empty()) {
+      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
+              << " since we won't get the transaction" << dendl;
+      rm->localt.remove(temp_coll, m->discard_temp_oid);
+    }
+    clear_temp_obj(m->discard_temp_oid);
+  }
+
+  p = m->logbl.begin();
+  ::decode(log, p);
+  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
+  bool update_snaps = false;
+  if (!rm->opt.empty()) {
+    // If the opt is non-empty, we infer we are before
+    // last_backfill (according to the primary, not our
+    // not-quite-accurate value), and should update the
+    // collections now.  Otherwise, we do it later on push.
+    update_snaps = true;
+  }
+  parent->update_stats(m->pg_stats);
+  parent->log_operation(
+    log,
+    m->updated_hit_set_history,
+    m->pg_trim_to,
+    m->pg_trim_rollback_to,
+    update_snaps,
+    &(rm->localt));
+
+  rm->bytes_written = rm->opt.get_encoded_bytes();
+
+  op->mark_started();
+
+  rm->localt.append(rm->opt);
+  rm->localt.register_on_commit(
+    parent->bless_context(
+      new C_OSD_RepModifyCommit(this, rm)));
+  rm->localt.register_on_applied(
+    parent->bless_context(
+      new C_OSD_RepModifyApply(this, rm)));
+  parent->queue_transaction(&(rm->localt), op);
+  // op is cleaned up by oncommit/onapply when both are executed
+}
+
+void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
+{
+  rm->op->mark_event("sub_op_applied");
+  rm->applied = true;
+
+  dout(10) << "sub_op_modify_applied on " << rm << " op "
+          << *rm->op->get_req() << dendl;
+  Message *m = rm->op->get_req();
+
+  Message *ack = NULL;
+  eversion_t version;
+
+  if (m->get_type() == MSG_OSD_SUBOP) {
+    // doesn't have CLIENT SUBOP feature ,use Subop
+    MOSDSubOp *req = static_cast<MOSDSubOp*>(m);
+    version = req->version;
+    if (!rm->committed)
+      ack = new MOSDSubOpReply(
+       req, parent->whoami_shard(),
+       0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  } else if (m->get_type() == MSG_OSD_REPOP) {
+    MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
+    version = req->version;
+    if (!rm->committed)
+      ack = new MOSDRepOpReply(
+       static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
+       0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  } else {
+    assert(0);
+  }
+
+  // send ack to acker only if we haven't sent a commit already
+  if (ack) {
+    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
+    get_parent()->send_message_osd_cluster(
+      rm->ackerosd, ack, get_osdmap()->get_epoch());
+  }
+
+  parent->op_applied(version);
+}
+
+void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
+{
+  rm->op->mark_commit_sent();
+  rm->committed = true;
+
+  // send commit.
+  dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
+          << ", sending commit to osd." << rm->ackerosd
+          << dendl;
+
+  assert(get_osdmap()->is_up(rm->ackerosd));
+  get_parent()->update_last_complete_ondisk(rm->last_complete);
+
+  Message *m = rm->op->get_req();
+  Message *commit;
+  if (m->get_type() == MSG_OSD_SUBOP) {
+    // doesn't have CLIENT SUBOP feature ,use Subop
+    MOSDSubOpReply  *reply = new MOSDSubOpReply(
+      static_cast<MOSDSubOp*>(m),
+      get_parent()->whoami_shard(),
+      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+    reply->set_last_complete_ondisk(rm->last_complete);
+    commit = reply;
+  } else if (m->get_type() == MSG_OSD_REPOP) {
+    MOSDRepOpReply *reply = new MOSDRepOpReply(
+      static_cast<MOSDRepOp*>(m),
+      get_parent()->whoami_shard(),
+      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+    reply->set_last_complete_ondisk(rm->last_complete);
+    commit = reply;
+  }
+  else {
+    assert(0);
+  }
+
+  commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
+  get_parent()->send_message_osd_cluster(
+    rm->ackerosd, commit, get_osdmap()->get_epoch());
+
+  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
+}
+
+
+// ===========================================================
+
+void ReplicatedBackend::calc_head_subsets(
+  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+  const pg_missing_t& missing,
+  const hobject_t &last_backfill,
+  interval_set<uint64_t>& data_subset,
+  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+{
+  dout(10) << "calc_head_subsets " << head
+          << " clone_overlap " << snapset.clone_overlap << dendl;
+
+  uint64_t size = obc->obs.oi.size;
+  if (size)
+    data_subset.insert(0, size);
+
+  if (get_parent()->get_pool().allow_incomplete_clones()) {
+    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
+    return;
+  }
+
+  if (!cct->_conf->osd_recover_clone_overlap) {
+    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
+    return;
+  }
+
+
+  interval_set<uint64_t> cloning;
+  interval_set<uint64_t> prev;
+  if (size)
+    prev.insert(0, size);
+
+  for (int j=snapset.clones.size()-1; j>=0; j--) {
+    hobject_t c = head;
+    c.snap = snapset.clones[j];
+    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
+    if (!missing.is_missing(c) && c < last_backfill) {
+      dout(10) << "calc_head_subsets " << head << " has prev " << c
+              << " overlap " << prev << dendl;
+      clone_subsets[c] = prev;
+      cloning.union_of(prev);
+      break;
+    }
+    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
+            << " overlap " << prev << dendl;
+  }
+
+
+  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
+    dout(10) << "skipping clone, too many holes" << dendl;
+    clone_subsets.clear();
+    cloning.clear();
+  }
+
+  // what's left for us to push?
+  data_subset.subtract(cloning);
+
+  dout(10) << "calc_head_subsets " << head
+          << "  data_subset " << data_subset
+          << "  clone_subsets " << clone_subsets << dendl;
+}
+
+void ReplicatedBackend::calc_clone_subsets(
+  SnapSet& snapset, const hobject_t& soid,
+  const pg_missing_t& missing,
+  const hobject_t &last_backfill,
+  interval_set<uint64_t>& data_subset,
+  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+{
+  dout(10) << "calc_clone_subsets " << soid
+          << " clone_overlap " << snapset.clone_overlap << dendl;
+
+  uint64_t size = snapset.clone_size[soid.snap];
+  if (size)
+    data_subset.insert(0, size);
+
+  if (get_parent()->get_pool().allow_incomplete_clones()) {
+    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
+    return;
+  }
+
+  if (!cct->_conf->osd_recover_clone_overlap) {
+    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
+    return;
+  }
+
+  unsigned i;
+  for (i=0; i < snapset.clones.size(); i++)
+    if (snapset.clones[i] == soid.snap)
+      break;
+
+  // any overlap with next older clone?
+  interval_set<uint64_t> cloning;
+  interval_set<uint64_t> prev;
+  if (size)
+    prev.insert(0, size);
+  for (int j=i-1; j>=0; j--) {
+    hobject_t c = soid;
+    c.snap = snapset.clones[j];
+    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
+    if (!missing.is_missing(c) && c < last_backfill) {
+      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
+              << " overlap " << prev << dendl;
+      clone_subsets[c] = prev;
+      cloning.union_of(prev);
+      break;
+    }
+    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
+            << " overlap " << prev << dendl;
+  }
+
+  // overlap with next newest?
+  interval_set<uint64_t> next;
+  if (size)
+    next.insert(0, size);
+  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
+    hobject_t c = soid;
+    c.snap = snapset.clones[j];
+    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
+    if (!missing.is_missing(c) && c < last_backfill) {
+      dout(10) << "calc_clone_subsets " << soid << " has next " << c
+              << " overlap " << next << dendl;
+      clone_subsets[c] = next;
+      cloning.union_of(next);
+      break;
+    }
+    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
+            << " overlap " << next << dendl;
+  }
+
+  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
+    dout(10) << "skipping clone, too many holes" << dendl;
+    clone_subsets.clear();
+    cloning.clear();
+  }
+
+
+  // what's left for us to push?
+  data_subset.subtract(cloning);
+
+  dout(10) << "calc_clone_subsets " << soid
+          << "  data_subset " << data_subset
+          << "  clone_subsets " << clone_subsets << dendl;
+}
+
+void ReplicatedBackend::prepare_pull(
+  eversion_t v,
+  const hobject_t& soid,
+  ObjectContextRef headctx,
+  RPGHandle *h)
+{
+  assert(get_parent()->get_local_missing().missing.count(soid));
+  eversion_t _v = get_parent()->get_local_missing().missing.find(
+    soid)->second.need;
+  assert(_v == v);
+  const map<hobject_t, set<pg_shard_t> > &missing_loc(
+    get_parent()->get_missing_loc_shards());
+  const map<pg_shard_t, pg_missing_t > &peer_missing(
+    get_parent()->get_shard_missing());
+  map<hobject_t, set<pg_shard_t> >::const_iterator q = missing_loc.find(soid);
+  assert(q != missing_loc.end());
+  assert(!q->second.empty());
+
+  // pick a pullee
+  vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
+  random_shuffle(shuffle.begin(), shuffle.end());
+  vector<pg_shard_t>::iterator p = shuffle.begin();
+  assert(get_osdmap()->is_up(p->osd));
+  pg_shard_t fromshard = *p;
+
+  dout(7) << "pull " << soid
+         << " v " << v
+         << " on osds " << *p
+         << " from osd." << fromshard
+         << dendl;
+
+  assert(peer_missing.count(fromshard));
+  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
+  if (pmissing.is_missing(soid, v)) {
+    assert(pmissing.missing.find(soid)->second.have != v);
+    dout(10) << "pulling soid " << soid << " from osd " << fromshard
+            << " at version " << pmissing.missing.find(soid)->second.have
+            << " rather than at version " << v << dendl;
+    v = pmissing.missing.find(soid)->second.have;
+    assert(get_parent()->get_log().get_log().objects.count(soid) &&
+          (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
+           pg_log_entry_t::LOST_REVERT) &&
+          (get_parent()->get_log().get_log().objects.find(
+            soid)->second->reverting_to ==
+           v));
+  }
+
+  ObjectRecoveryInfo recovery_info;
+
+  if (soid.is_snap()) {
+    assert(!get_parent()->get_local_missing().is_missing(
+            soid.get_head()) ||
+          !get_parent()->get_local_missing().is_missing(
+            soid.get_snapdir()));
+    assert(headctx);
+    // check snapset
+    SnapSetContext *ssc = headctx->ssc;
+    assert(ssc);
+    dout(10) << " snapset " << ssc->snapset << dendl;
+    calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
+                      get_info().last_backfill,
+                      recovery_info.copy_subset,
+                      recovery_info.clone_subset);
+    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
+    dout(10) << " pulling " << recovery_info << dendl;
+  } else {
+    // pulling head or unversioned object.
+    // always pull the whole thing.
+    recovery_info.copy_subset.insert(0, (uint64_t)-1);
+    recovery_info.size = ((uint64_t)-1);
+  }
+
+  h->pulls[fromshard].push_back(PullOp());
+  PullOp &op = h->pulls[fromshard].back();
+  op.soid = soid;
+
+  op.recovery_info = recovery_info;
+  op.recovery_info.soid = soid;
+  op.recovery_info.version = v;
+  op.recovery_progress.data_complete = false;
+  op.recovery_progress.omap_complete = false;
+  op.recovery_progress.data_recovered_to = 0;
+  op.recovery_progress.first = true;
+
+  assert(!pulling.count(soid));
+  pull_from_peer[fromshard].insert(soid);
+  PullInfo &pi = pulling[soid];
+  pi.head_ctx = headctx;
+  pi.recovery_info = op.recovery_info;
+  pi.recovery_progress = op.recovery_progress;
+}
+
+/*
+ * intelligently push an object to a replica.  make use of existing
+ * clones/heads and dup data ranges where possible.
+ */
+void ReplicatedBackend::prep_push_to_replica(
+  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
+  PushOp *pop)
+{
+  const object_info_t& oi = obc->obs.oi;
+  uint64_t size = obc->obs.oi.size;
+
+  dout(10) << __func__ << ": " << soid << " v" << oi.version
+          << " size " << size << " to osd." << peer << dendl;
+
+  map<hobject_t, interval_set<uint64_t> > clone_subsets;
+  interval_set<uint64_t> data_subset;
+
+  // are we doing a clone on the replica?
+  if (soid.snap && soid.snap < CEPH_NOSNAP) {
+    hobject_t head = soid;
+    head.snap = CEPH_NOSNAP;
+
+    // try to base push off of clones that succeed/preceed poid
+    // we need the head (and current SnapSet) locally to do that.
+    if (get_parent()->get_local_missing().is_missing(head)) {
+      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
+      return prep_push(obc, soid, peer, pop);
+    }
+    hobject_t snapdir = head;
+    snapdir.snap = CEPH_SNAPDIR;
+    if (get_parent()->get_local_missing().is_missing(snapdir)) {
+      dout(15) << "push_to_replica missing snapdir " << snapdir
+              << ", pushing raw clone" << dendl;
+      return prep_push(obc, soid, peer, pop);
+    }
+
+    SnapSetContext *ssc = obc->ssc;
+    assert(ssc);
+    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
+    map<pg_shard_t, pg_missing_t>::const_iterator pm =
+      get_parent()->get_shard_missing().find(peer);
+    assert(pm != get_parent()->get_shard_missing().end());
+    map<pg_shard_t, pg_info_t>::const_iterator pi =
+      get_parent()->get_shard_info().find(peer);
+    assert(pi != get_parent()->get_shard_info().end());
+    calc_clone_subsets(ssc->snapset, soid,
+                      pm->second,
+                      pi->second.last_backfill,
+                      data_subset, clone_subsets);
+  } else if (soid.snap == CEPH_NOSNAP) {
+    // pushing head or unversioned object.
+    // base this on partially on replica's clones?
+    SnapSetContext *ssc = obc->ssc;
+    assert(ssc);
+    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
+    calc_head_subsets(
+      obc,
+      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
+      get_parent()->get_shard_info().find(peer)->second.last_backfill,
+      data_subset, clone_subsets);
+  }
+
+  prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
+}
+
+void ReplicatedBackend::prep_push(ObjectContextRef obc,
+                            const hobject_t& soid, pg_shard_t peer,
+                            PushOp *pop)
+{
+  interval_set<uint64_t> data_subset;
+  if (obc->obs.oi.size)
+    data_subset.insert(0, obc->obs.oi.size);
+  map<hobject_t, interval_set<uint64_t> > clone_subsets;
+
+  prep_push(obc, soid, peer,
+           obc->obs.oi.version, data_subset, clone_subsets,
+           pop);
+}
+
+void ReplicatedBackend::prep_push(
+  ObjectContextRef obc,
+  const hobject_t& soid, pg_shard_t peer,
+  eversion_t version,
+  interval_set<uint64_t> &data_subset,
+  map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+  PushOp *pop)
+{
+  get_parent()->begin_peer_recover(peer, soid);
+  // take note.
+  PushInfo &pi = pushing[soid][peer];
+  pi.obc = obc;
+  pi.recovery_info.size = obc->obs.oi.size;
+  pi.recovery_info.copy_subset = data_subset;
+  pi.recovery_info.clone_subset = clone_subsets;
+  pi.recovery_info.soid = soid;
+  pi.recovery_info.oi = obc->obs.oi;
+  pi.recovery_info.version = version;
+  pi.recovery_progress.first = true;
+  pi.recovery_progress.data_recovered_to = 0;
+  pi.recovery_progress.data_complete = 0;
+  pi.recovery_progress.omap_complete = 0;
+
+  ObjectRecoveryProgress new_progress;
+  int r = build_push_op(pi.recovery_info,
+                       pi.recovery_progress,
+                       &new_progress,
+                       pop,
+                       &(pi.stat));
+  assert(r == 0);
+  pi.recovery_progress = new_progress;
+}
+
+int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer,
+                                       const ObjectRecoveryInfo &recovery_info,
+                                       ObjectRecoveryProgress progress)
+{
+  // send op
+  ceph_tid_t tid = get_parent()->get_tid();
+  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
+
+  dout(10) << "send_pull_op " << recovery_info.soid << " "
+          << recovery_info.version
+          << " first=" << progress.first
+          << " data " << recovery_info.copy_subset
+          << " from osd." << peer
+          << " tid " << tid << dendl;
+
+  MOSDSubOp *subop = new MOSDSubOp(
+    rid, parent->whoami_shard(),
+    get_info().pgid, recovery_info.soid,
+    CEPH_OSD_FLAG_ACK,
+    get_osdmap()->get_epoch(), tid,
+    recovery_info.version);
+  subop->set_priority(prio);
+  subop->ops = vector<OSDOp>(1);
+  subop->ops[0].op.op = CEPH_OSD_OP_PULL;
+  subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk;
+  subop->recovery_info = recovery_info;
+  subop->recovery_progress = progress;
+
+  get_parent()->send_message_osd_cluster(
+    peer.osd, subop, get_osdmap()->get_epoch());
+
+  get_parent()->get_logger()->inc(l_osd_pull);
+  return 0;
+}
+
+void ReplicatedBackend::submit_push_data(
+  ObjectRecoveryInfo &recovery_info,
+  bool first,
+  bool complete,
+  const interval_set<uint64_t> &intervals_included,
+  bufferlist data_included,
+  bufferlist omap_header,
+  map<string, bufferlist> &attrs,
+  map<string, bufferlist> &omap_entries,
+  ObjectStore::Transaction *t)
+{
+  coll_t target_coll;
+  if (first && complete) {
+    target_coll = coll;
+  } else {
+    dout(10) << __func__ << ": Creating oid "
+            << recovery_info.soid << " in the temp collection" << dendl;
+    add_temp_obj(recovery_info.soid);
+    target_coll = get_temp_coll(t);
+  }
+
+  if (first) {
+    get_parent()->on_local_recover_start(recovery_info.soid, t);
+    t->remove(get_temp_coll(t), recovery_info.soid);
+    t->touch(target_coll, recovery_info.soid);
+    t->truncate(target_coll, recovery_info.soid, recovery_info.size);
+    t->omap_setheader(target_coll, recovery_info.soid, omap_header);
+  }
+  uint64_t off = 0;
+  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
+       p != intervals_included.end();
+       ++p) {
+    bufferlist bit;
+    bit.substr_of(data_included, off, p.get_len());
+    t->write(target_coll, recovery_info.soid,
+            p.get_start(), p.get_len(), bit);
+    off += p.get_len();
+  }
+
+  t->omap_setkeys(target_coll, recovery_info.soid,
+                 omap_entries);
+  t->setattrs(target_coll, recovery_info.soid,
+             attrs);
+
+  if (complete) {
+    if (!first) {
+      dout(10) << __func__ << ": Removing oid "
+              << recovery_info.soid << " from the temp collection" << dendl;
+      clear_temp_obj(recovery_info.soid);
+      t->collection_move(coll, target_coll, recovery_info.soid);
+    }
+
+    submit_push_complete(recovery_info, t);
+  }
+}
+
+void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
+                                            ObjectStore::Transaction *t)
+{
+  for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
+        recovery_info.clone_subset.begin();
+       p != recovery_info.clone_subset.end();
+       ++p) {
+    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
+        q != p->second.end();
+        ++q) {
+      dout(15) << " clone_range " << p->first << " "
+              << q.get_start() << "~" << q.get_len() << dendl;
+      t->clone_range(coll, p->first, recovery_info.soid,
+                    q.get_start(), q.get_len(), q.get_start());
+    }
+  }
+}
+
+ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
+  const ObjectRecoveryInfo& recovery_info,
+  SnapSetContext *ssc)
+{
+  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
+    return recovery_info;
+  ObjectRecoveryInfo new_info = recovery_info;
+  new_info.copy_subset.clear();
+  new_info.clone_subset.clear();
+  assert(ssc);
+  calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
+                    get_info().last_backfill,
+                    new_info.copy_subset, new_info.clone_subset);
+  return new_info;
+}
+
+bool ReplicatedBackend::handle_pull_response(
+  pg_shard_t from, PushOp &pop, PullOp *response,
+  list<hobject_t> *to_continue,
+  ObjectStore::Transaction *t
+  )
+{
+  interval_set<uint64_t> data_included = pop.data_included;
+  bufferlist data;
+  data.claim(pop.data);
+  dout(10) << "handle_pull_response "
+          << pop.recovery_info
+          << pop.after_progress
+          << " data.size() is " << data.length()
+          << " data_included: " << data_included
+          << dendl;
+  if (pop.version == eversion_t()) {
+    // replica doesn't have it!
+    _failed_push(from, pop.soid);
+    return false;
+  }
+
+  hobject_t &hoid = pop.soid;
+  assert((data_included.empty() && data.length() == 0) ||
+        (!data_included.empty() && data.length() > 0));
+
+  if (!pulling.count(hoid)) {
+    return false;
+  }
+
+  PullInfo &pi = pulling[hoid];
+  if (pi.recovery_info.size == (uint64_t(-1))) {
+    pi.recovery_info.size = pop.recovery_info.size;
+    pi.recovery_info.copy_subset.intersection_of(
+      pop.recovery_info.copy_subset);
+  }
+
+  bool first = pi.recovery_progress.first;
+  if (first) {
+    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
+    pi.recovery_info.oi = pi.obc->obs.oi;
+    pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
+  }
+
+
+  interval_set<uint64_t> usable_intervals;
+  bufferlist usable_data;
+  trim_pushed_data(pi.recovery_info.copy_subset,
+                  data_included,
+                  data,
+                  &usable_intervals,
+                  &usable_data);
+  data_included = usable_intervals;
+  data.claim(usable_data);
+
+
+  pi.recovery_progress = pop.after_progress;
+
+  pi.stat.num_bytes_recovered += data.length();
+
+  dout(10) << "new recovery_info " << pi.recovery_info
+          << ", new progress " << pi.recovery_progress
+          << dendl;
+
+  bool complete = pi.is_complete();
+
+  submit_push_data(pi.recovery_info, first,
+                  complete,
+                  data_included, data,
+                  pop.omap_header,
+                  pop.attrset,
+                  pop.omap_entries,
+                  t);
+
+  pi.stat.num_keys_recovered += pop.omap_entries.size();
+
+  if (complete) {
+    to_continue->push_back(hoid);
+    pi.stat.num_objects_recovered++;
+    get_parent()->on_local_recover(
+      hoid, pi.stat, pi.recovery_info, pi.obc, t);
+    pull_from_peer[from].erase(hoid);
+    if (pull_from_peer[from].empty())
+      pull_from_peer.erase(from);
+    return false;
+  } else {
+    response->soid = pop.soid;
+    response->recovery_info = pi.recovery_info;
+    response->recovery_progress = pi.recovery_progress;
+    return true;
+  }
+}
+
+void ReplicatedBackend::handle_push(
+  pg_shard_t from, PushOp &pop, PushReplyOp *response,
+  ObjectStore::Transaction *t)
+{
+  dout(10) << "handle_push "
+          << pop.recovery_info
+          << pop.after_progress
+          << dendl;
+  bufferlist data;
+  data.claim(pop.data);
+  bool first = pop.before_progress.first;
+  bool complete = pop.after_progress.data_complete &&
+    pop.after_progress.omap_complete;
+
+  response->soid = pop.recovery_info.soid;
+  submit_push_data(pop.recovery_info,
+                  first,
+                  complete,
+                  pop.data_included,
+                  data,
+                  pop.omap_header,
+                  pop.attrset,
+                  pop.omap_entries,
+                  t);
+
+  if (complete)
+    get_parent()->on_local_recover(
+      pop.recovery_info.soid,
+      object_stat_sum_t(),
+      pop.recovery_info,
+      ObjectContextRef(), // ok, is replica
+      t);
+}
+
+void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
+{
+  for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
+       i != pushes.end();
+       ++i) {
+    ConnectionRef con = get_parent()->get_con_osd_cluster(
+      i->first.osd,
+      get_osdmap()->get_epoch());
+    if (!con)
+      continue;
+    if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+      for (vector<PushOp>::iterator j = i->second.begin();
+          j != i->second.end();
+          ++j) {
+       dout(20) << __func__ << ": sending push (legacy) " << *j
+                << " to osd." << i->first << dendl;
+       send_push_op_legacy(prio, i->first, *j);
+      }
+    } else {
+      vector<PushOp>::iterator j = i->second.begin();
+      while (j != i->second.end()) {
+       uint64_t cost = 0;
+       uint64_t pushes = 0;
+       MOSDPGPush *msg = new MOSDPGPush();
+       msg->from = get_parent()->whoami_shard();
+       msg->pgid = get_parent()->primary_spg_t();
+       msg->map_epoch = get_osdmap()->get_epoch();
+       msg->set_priority(prio);
+       for (;
+            (j != i->second.end() &&
+             cost < cct->_conf->osd_max_push_cost &&
+             pushes < cct->_conf->osd_max_push_objects) ;
+            ++j) {
+         dout(20) << __func__ << ": sending push " << *j
+                  << " to osd." << i->first << dendl;
+         cost += j->cost(cct);
+         pushes += 1;
+         msg->pushes.push_back(*j);
+       }
+       msg->compute_cost(cct);
+       get_parent()->send_message_osd_cluster(msg, con);
+      }
+    }
+  }
+}
+
+void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
+{
+  for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
+       i != pulls.end();
+       ++i) {
+    ConnectionRef con = get_parent()->get_con_osd_cluster(
+      i->first.osd,
+      get_osdmap()->get_epoch());
+    if (!con)
+      continue;
+    if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
+      for (vector<PullOp>::iterator j = i->second.begin();
+          j != i->second.end();
+          ++j) {
+       dout(20) << __func__ << ": sending pull (legacy) " << *j
+                << " to osd." << i->first << dendl;
+       send_pull_legacy(
+         prio,
+         i->first,
+         j->recovery_info,
+         j->recovery_progress);
+      }
+    } else {
+      dout(20) << __func__ << ": sending pulls " << i->second
+              << " to osd." << i->first << dendl;
+      MOSDPGPull *msg = new MOSDPGPull();
+      msg->from = parent->whoami_shard();
+      msg->set_priority(prio);
+      msg->pgid = get_parent()->primary_spg_t();
+      msg->map_epoch = get_osdmap()->get_epoch();
+      msg->pulls.swap(i->second);
+      msg->compute_cost(cct);
+      get_parent()->send_message_osd_cluster(msg, con);
+    }
+  }
+}
+
+int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
+                                    const ObjectRecoveryProgress &progress,
+                                    ObjectRecoveryProgress *out_progress,
+                                    PushOp *out_op,
+                                    object_stat_sum_t *stat)
+{
+  ObjectRecoveryProgress _new_progress;
+  if (!out_progress)
+    out_progress = &_new_progress;
+  ObjectRecoveryProgress &new_progress = *out_progress;
+  new_progress = progress;
+
+  dout(7) << "send_push_op " << recovery_info.soid
+         << " v " << recovery_info.version
+         << " size " << recovery_info.size
+         << " recovery_info: " << recovery_info
+          << dendl;
+
+  if (progress.first) {
+    store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
+    store->getattrs(coll, recovery_info.soid, out_op->attrset);
+
+    // Debug
+    bufferlist bv = out_op->attrset[OI_ATTR];
+    object_info_t oi(bv);
+
+    if (oi.version != recovery_info.version) {
+      get_parent()->clog_error() << get_info().pgid << " push "
+                                << recovery_info.soid << " v "
+                                << recovery_info.version
+                                << " failed because local copy is "
+                                << oi.version << "\n";
+      return -EINVAL;
+    }
+
+    new_progress.first = false;
+  }
+
+  uint64_t available = cct->_conf->osd_recovery_max_chunk;
+  if (!progress.omap_complete) {
+    ObjectMap::ObjectMapIterator iter =
+      store->get_omap_iterator(coll,
+                              recovery_info.soid);
+    for (iter->lower_bound(progress.omap_recovered_to);
+        iter->valid();
+        iter->next()) {
+      if (!out_op->omap_entries.empty() &&
+         available <= (iter->key().size() + iter->value().length()))
+       break;
+      out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
+
+      if ((iter->key().size() + iter->value().length()) <= available)
+       available -= (iter->key().size() + iter->value().length());
+      else
+       available = 0;
+    }
+    if (!iter->valid())
+      new_progress.omap_complete = true;
+    else
+      new_progress.omap_recovered_to = iter->key();
+  }
+
+  if (available > 0) {
+    if (!recovery_info.copy_subset.empty()) {
+      interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
+      bufferlist bl;
+      int r = store->fiemap(coll, recovery_info.soid, 0,
+                            copy_subset.range_end(), bl);
+      if (r >= 0)  {
+        interval_set<uint64_t> fiemap_included;
+        map<uint64_t, uint64_t> m;
+        bufferlist::iterator iter = bl.begin();
+        ::decode(m, iter);
+        map<uint64_t, uint64_t>::iterator miter;
+        for (miter = m.begin(); miter != m.end(); ++miter) {
+          fiemap_included.insert(miter->first, miter->second);
+        }
+
+        copy_subset.intersection_of(fiemap_included);
+      }
+      out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
+                                    available);
+      if (out_op->data_included.empty()) // zero filled section, skip to end!
+        new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
+      else
+        new_progress.data_recovered_to = out_op->data_included.range_end();
+    }
+  } else {
+    out_op->data_included.clear();
+  }
+
+  for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
+       p != out_op->data_included.end();
+       ++p) {
+    bufferlist bit;
+    store->read(coll, recovery_info.soid,
+                    p.get_start(), p.get_len(), bit);
+    if (p.get_len() != bit.length()) {
+      dout(10) << " extent " << p.get_start() << "~" << p.get_len()
+              << " is actually " << p.get_start() << "~" << bit.length()
+              << dendl;
+      interval_set<uint64_t>::iterator save = p++;
+      if (bit.length() == 0)
+        out_op->data_included.erase(save);     //Remove this empty interval
+      else
+        save.set_len(bit.length());
+      // Remove any other intervals present
+      while (p != out_op->data_included.end()) {
+        interval_set<uint64_t>::iterator save = p++;
+        out_op->data_included.erase(save);
+      }
+      new_progress.data_complete = true;
+      out_op->data.claim_append(bit);
+      break;
+    }
+    out_op->data.claim_append(bit);
+  }
+
+  if (new_progress.is_complete(recovery_info)) {
+    new_progress.data_complete = true;
+    if (stat)
+      stat->num_objects_recovered++;
+  }
+
+  if (stat) {
+    stat->num_keys_recovered += out_op->omap_entries.size();
+    stat->num_bytes_recovered += out_op->data.length();
+  }
+
+  get_parent()->get_logger()->inc(l_osd_push);
+  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
+
+  // send
+  out_op->version = recovery_info.version;
+  out_op->soid = recovery_info.soid;
+  out_op->recovery_info = recovery_info;
+  out_op->after_progress = new_progress;
+  out_op->before_progress = progress;
+  return 0;
+}
+
+int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop)
+{
+  ceph_tid_t tid = get_parent()->get_tid();
+  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
+  MOSDSubOp *subop = new MOSDSubOp(
+    rid, parent->whoami_shard(),
+    spg_t(get_info().pgid.pgid, peer.shard), pop.soid,
+    0, get_osdmap()->get_epoch(),
+    tid, pop.recovery_info.version);
+  subop->ops = vector<OSDOp>(1);
+  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
+
+  subop->set_priority(prio);
+  subop->version = pop.version;
+  subop->ops[0].indata.claim(pop.data);
+  subop->data_included.swap(pop.data_included);
+  subop->omap_header.claim(pop.omap_header);
+  subop->omap_entries.swap(pop.omap_entries);
+  subop->attrset.swap(pop.attrset);
+  subop->recovery_info = pop.recovery_info;
+  subop->current_progress = pop.before_progress;
+  subop->recovery_progress = pop.after_progress;
+
+  get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
+  return 0;
+}
+
+void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
+{
+  op->recovery_info.version = eversion_t();
+  op->version = eversion_t();
+  op->soid = soid;
+}
+
+void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
+{
+  MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
+  const hobject_t& soid = reply->get_poid();
+  assert(reply->get_type() == MSG_OSD_SUBOPREPLY);
+  dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
+  pg_shard_t peer = reply->from;
+
+  op->mark_started();
+
+  PushReplyOp rop;
+  rop.soid = soid;
+  PushOp pop;
+  bool more = handle_push_reply(peer, rop, &pop);
+  if (more)
+    send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
+}
+
+bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply)
+{
+  const hobject_t &soid = op.soid;
+  if (pushing.count(soid) == 0) {
+    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
+            << ", or anybody else"
+            << dendl;
+    return false;
+  } else if (pushing[soid].count(peer) == 0) {
+    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
+            << dendl;
+    return false;
+  } else {
+    PushInfo *pi = &pushing[soid][peer];
+
+    if (!pi->recovery_progress.data_complete) {
+      dout(10) << " pushing more from, "
+              << pi->recovery_progress.data_recovered_to
+              << " of " << pi->recovery_info.copy_subset << dendl;
+      ObjectRecoveryProgress new_progress;
+      int r = build_push_op(
+       pi->recovery_info,
+       pi->recovery_progress, &new_progress, reply,
+       &(pi->stat));
+      assert(r == 0);
+      pi->recovery_progress = new_progress;
+      return true;
+    } else {
+      // done!
+      get_parent()->on_peer_recover(
+       peer, soid, pi->recovery_info,
+       pi->stat);
+
+      pushing[soid].erase(peer);
+      pi = NULL;
+
+
+      if (pushing[soid].empty()) {
+       get_parent()->on_global_recover(soid);
+       pushing.erase(soid);
+      } else {
+       dout(10) << "pushed " << soid << ", still waiting for push ack from "
+                << pushing[soid].size() << " others" << dendl;
+      }
+      return false;
+    }
+  }
+}
+
+/** op_pull
+ * process request to pull an entire object.
+ * NOTE: called from opqueue.
+ */
+void ReplicatedBackend::sub_op_pull(OpRequestRef op)
+{
+  MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
+  assert(m->get_type() == MSG_OSD_SUBOP);
+
+  op->mark_started();
+
+  const hobject_t soid = m->poid;
+
+  dout(7) << "pull" << soid << " v " << m->version
+          << " from " << m->get_source()
+          << dendl;
+
+  assert(!is_primary());  // we should be a replica or stray.
+
+  PullOp pop;
+  pop.soid = soid;
+  pop.recovery_info = m->recovery_info;
+  pop.recovery_progress = m->recovery_progress;
+
+  PushOp reply;
+  handle_pull(m->from, pop, &reply);
+  send_push_op_legacy(
+    m->get_priority(),
+    m->from,
+    reply);
+
+  log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull);
+}
+
+void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
+{
+  const hobject_t &soid = op.soid;
+  struct stat st;
+  int r = store->stat(coll, soid, &st);
+  if (r != 0) {
+    get_parent()->clog_error() << get_info().pgid << " "
+                              << peer << " tried to pull " << soid
+                              << " but got " << cpp_strerror(-r) << "\n";
+    prep_push_op_blank(soid, reply);
+  } else {
+    ObjectRecoveryInfo &recovery_info = op.recovery_info;
+    ObjectRecoveryProgress &progress = op.recovery_progress;
+    if (progress.first && recovery_info.size == ((uint64_t)-1)) {
+      // Adjust size and copy_subset
+      recovery_info.size = st.st_size;
+      recovery_info.copy_subset.clear();
+      if (st.st_size)
+        recovery_info.copy_subset.insert(0, st.st_size);
+      assert(recovery_info.clone_subset.empty());
+    }
+
+    r = build_push_op(recovery_info, progress, 0, reply);
+    if (r < 0)
+      prep_push_op_blank(soid, reply);
+  }
+}
+
+/**
+ * trim received data to remove what we don't want
+ *
+ * @param copy_subset intervals we want
+ * @param data_included intervals we got
+ * @param data_recieved data we got
+ * @param intervals_usable intervals we want to keep
+ * @param data_usable matching data we want to keep
+ */
+void ReplicatedBackend::trim_pushed_data(
+  const interval_set<uint64_t> &copy_subset,
+  const interval_set<uint64_t> &intervals_received,
+  bufferlist data_received,
+  interval_set<uint64_t> *intervals_usable,
+  bufferlist *data_usable)
+{
+  if (intervals_received.subset_of(copy_subset)) {
+    *intervals_usable = intervals_received;
+    *data_usable = data_received;
+    return;
+  }
+
+  intervals_usable->intersection_of(copy_subset,
+                                   intervals_received);
+
+  uint64_t off = 0;
+  for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
+       p != intervals_received.end();
+       ++p) {
+    interval_set<uint64_t> x;
+    x.insert(p.get_start(), p.get_len());
+    x.intersection_of(copy_subset);
+    for (interval_set<uint64_t>::const_iterator q = x.begin();
+        q != x.end();
+        ++q) {
+      bufferlist sub;
+      uint64_t data_off = off + (q.get_start() - p.get_start());
+      sub.substr_of(data_received, data_off, q.get_len());
+      data_usable->claim_append(sub);
+    }
+    off += p.get_len();
+  }
+}
+
+/** op_push
+ * NOTE: called from opqueue.
+ */
+void ReplicatedBackend::sub_op_push(OpRequestRef op)
+{
+  op->mark_started();
+  MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
+
+  PushOp pop;
+  pop.soid = m->recovery_info.soid;
+  pop.version = m->version;
+  m->claim_data(pop.data);
+  pop.data_included.swap(m->data_included);
+  pop.omap_header.swap(m->omap_header);
+  pop.omap_entries.swap(m->omap_entries);
+  pop.attrset.swap(m->attrset);
+  pop.recovery_info = m->recovery_info;
+  pop.before_progress = m->current_progress;
+  pop.after_progress = m->recovery_progress;
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;
+
+  if (is_primary()) {
+    PullOp resp;
+    RPGHandle *h = _open_recovery_op();
+    list<hobject_t> to_continue;
+    bool more = handle_pull_response(
+      m->from, pop, &resp,
+      &to_continue, t);
+    if (more) {
+      send_pull_legacy(
+       m->get_priority(),
+       m->from,
+       resp.recovery_info,
+       resp.recovery_progress);
+    } else {
+      C_ReplicatedBackend_OnPullComplete *c =
+       new C_ReplicatedBackend_OnPullComplete(
+         this,
+         op->get_req()->get_priority());
+      c->to_continue.swap(to_continue);
+      t->register_on_complete(
+       new PG_RecoveryQueueAsync(
+         get_parent(),
+         get_parent()->bless_gencontext(c)));
+    }
+    run_recovery_op(h, op->get_req()->get_priority());
+  } else {
+    PushReplyOp resp;
+    MOSDSubOpReply *reply = new MOSDSubOpReply(
+      m, parent->whoami_shard(), 0,
+      get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+    reply->set_priority(m->get_priority());
+    assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+    handle_push(m->from, pop, &resp, t);
+    t->register_on_complete(new PG_SendMessageOnConn(
+                             get_parent(), reply, m->get_connection()));
+  }
+  t->register_on_applied(
+    new ObjectStore::C_DeleteTransaction(t));
+  get_parent()->queue_transaction(t);
+  return;
+}
+
+void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
+{
+  get_parent()->failed_push(from, soid);
+  pull_from_peer[from].erase(soid);
+  if (pull_from_peer[from].empty())
+    pull_from_peer.erase(from);
+  pulling.erase(soid);
+}
+
+int ReplicatedBackend::start_pushes(
+  const hobject_t &soid,
+  ObjectContextRef obc,
+  RPGHandle *h)
+{
+  int pushes = 0;
+  // who needs it?
+  assert(get_parent()->get_actingbackfill_shards().size() > 0);
+  for (set<pg_shard_t>::iterator i =
+        get_parent()->get_actingbackfill_shards().begin();
+       i != get_parent()->get_actingbackfill_shards().end();
+       ++i) {
+    if (*i == get_parent()->whoami_shard()) continue;
+    pg_shard_t peer = *i;
+    map<pg_shard_t, pg_missing_t>::const_iterator j =
+      get_parent()->get_shard_missing().find(peer);
+    assert(j != get_parent()->get_shard_missing().end());
+    if (j->second.is_missing(soid)) {
+      ++pushes;
+      h->pushes[peer].push_back(PushOp());
+      prep_push_to_replica(obc, soid, peer,
+                          &(h->pushes[peer].back())
+       );
+    }
+  }
+  return pushes;
+}
index 64b88eb0266af5c6f2983d1be05a933aeca89fbd..c748aa9c74e02dbd972ed9054442b9f7fa19888f 100644 (file)
@@ -90,35 +90,6 @@ PGLSFilter::~PGLSFilter()
 {
 }
 
-static void log_subop_stats(
-  PerfCounters *logger,
-  OpRequestRef op, int subop)
-{
-  utime_t now = ceph_clock_now(g_ceph_context);
-  utime_t latency = now;
-  latency -= op->get_req()->get_recv_stamp();
-
-
-  logger->inc(l_osd_sop);
-  logger->tinc(l_osd_sop_lat, latency);
-  logger->inc(subop);
-
-  if (subop != l_osd_sop_pull) {
-    uint64_t inb = op->get_req()->get_data().length();
-    logger->inc(l_osd_sop_inb, inb);
-    if (subop == l_osd_sop_w) {
-      logger->inc(l_osd_sop_w_inb, inb);
-      logger->tinc(l_osd_sop_w_lat, latency);
-    } else if (subop == l_osd_sop_push) {
-      logger->inc(l_osd_sop_push_inb, inb);
-      logger->tinc(l_osd_sop_push_lat, latency);
-    } else
-      assert("no support subop" == 0);
-  } else {
-    logger->tinc(l_osd_sop_pull_lat, latency);
-  }
-}
-
 struct OnReadComplete : public Context {
   ReplicatedPG *pg;
   ReplicatedPG::OpContext *opcontext;
@@ -2605,150 +2576,6 @@ void ReplicatedPG::do_scan(
   }
 }
 
-void ReplicatedBackend::_do_push(OpRequestRef op)
-{
-  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_PUSH);
-  pg_shard_t from = m->from;
-
-  vector<PushReplyOp> replies;
-  ObjectStore::Transaction *t = new ObjectStore::Transaction;
-  for (vector<PushOp>::iterator i = m->pushes.begin();
-       i != m->pushes.end();
-       ++i) {
-    replies.push_back(PushReplyOp());
-    handle_push(from, *i, &(replies.back()), t);
-  }
-
-  MOSDPGPushReply *reply = new MOSDPGPushReply;
-  reply->from = get_parent()->whoami_shard();
-  reply->set_priority(m->get_priority());
-  reply->pgid = get_info().pgid;
-  reply->map_epoch = m->map_epoch;
-  reply->replies.swap(replies);
-  reply->compute_cost(cct);
-
-  t->register_on_complete(
-    new PG_SendMessageOnConn(
-      get_parent(), reply, m->get_connection()));
-
-  t->register_on_applied(
-    new ObjectStore::C_DeleteTransaction(t));
-  get_parent()->queue_transaction(t);
-}
-
-struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
-  ReplicatedBackend *bc;
-  list<hobject_t> to_continue;
-  int priority;
-  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
-    : bc(bc), priority(priority) {}
-
-  void finish(ThreadPool::TPHandle &handle) {
-    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
-    for (list<hobject_t>::iterator i =
-          to_continue.begin();
-        i != to_continue.end();
-        ++i) {
-      map<hobject_t, ReplicatedBackend::PullInfo>::iterator j =
-       bc->pulling.find(*i);
-      assert(j != bc->pulling.end());
-      if (!bc->start_pushes(*i, j->second.obc, h)) {
-       bc->get_parent()->on_global_recover(
-         *i);
-      }
-      bc->pulling.erase(*i);
-      handle.reset_tp_timeout();
-    }
-    bc->run_recovery_op(h, priority);
-  }
-};
-
-void ReplicatedBackend::_do_pull_response(OpRequestRef op)
-{
-  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_PUSH);
-  pg_shard_t from = m->from;
-
-  vector<PullOp> replies(1);
-  ObjectStore::Transaction *t = new ObjectStore::Transaction;
-  list<hobject_t> to_continue;
-  for (vector<PushOp>::iterator i = m->pushes.begin();
-       i != m->pushes.end();
-       ++i) {
-    bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
-    if (more)
-      replies.push_back(PullOp());
-  }
-  if (!to_continue.empty()) {
-    C_ReplicatedBackend_OnPullComplete *c =
-      new C_ReplicatedBackend_OnPullComplete(
-       this,
-       m->get_priority());
-    c->to_continue.swap(to_continue);
-    t->register_on_complete(
-      new PG_RecoveryQueueAsync(
-       get_parent(),
-       get_parent()->bless_gencontext(c)));
-  }
-  replies.erase(replies.end() - 1);
-
-  if (replies.size()) {
-    MOSDPGPull *reply = new MOSDPGPull;
-    reply->from = parent->whoami_shard();
-    reply->set_priority(m->get_priority());
-    reply->pgid = get_info().pgid;
-    reply->map_epoch = m->map_epoch;
-    reply->pulls.swap(replies);
-    reply->compute_cost(cct);
-
-    t->register_on_complete(
-      new PG_SendMessageOnConn(
-       get_parent(), reply, m->get_connection()));
-  }
-
-  t->register_on_applied(
-    new ObjectStore::C_DeleteTransaction(t));
-  get_parent()->queue_transaction(t);
-}
-
-void ReplicatedBackend::do_pull(OpRequestRef op)
-{
-  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_PULL);
-  pg_shard_t from = m->from;
-
-  map<pg_shard_t, vector<PushOp> > replies;
-  for (vector<PullOp>::iterator i = m->pulls.begin();
-       i != m->pulls.end();
-       ++i) {
-    replies[from].push_back(PushOp());
-    handle_pull(from, *i, &(replies[from].back()));
-  }
-  send_pushes(m->get_priority(), replies);
-}
-
-void ReplicatedBackend::do_push_reply(OpRequestRef op)
-{
-  MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
-  pg_shard_t from = m->from;
-
-  vector<PushOp> replies(1);
-  for (vector<PushReplyOp>::iterator i = m->replies.begin();
-       i != m->replies.end();
-       ++i) {
-    bool more = handle_push_reply(from, *i, &(replies.back()));
-    if (more)
-      replies.push_back(PushOp());
-  }
-  replies.erase(replies.end() - 1);
-
-  map<pg_shard_t, vector<PushOp> > _replies;
-  _replies[from].swap(replies);
-  send_pushes(m->get_priority(), _replies);
-}
-
 void ReplicatedPG::do_backfill(OpRequestRef op)
 {
   MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req());
@@ -7608,136 +7435,6 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
   repop->ctx->op_t = NULL;
 }
 
-template<typename T, int MSGTYPE>
-Message * ReplicatedBackend::generate_subop(
-  const hobject_t &soid,
-  const eversion_t &at_version,
-  ceph_tid_t tid,
-  osd_reqid_t reqid,
-  eversion_t pg_trim_to,
-  eversion_t pg_trim_rollback_to,
-  hobject_t new_temp_oid,
-  hobject_t discard_temp_oid,
-  const vector<pg_log_entry_t> &log_entries,
-  boost::optional<pg_hit_set_history_t> &hset_hist,
-  InProgressOp *op,
-  ObjectStore::Transaction *op_t,
-  pg_shard_t peer,
-  const pg_info_t &pinfo)
-{
-  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
-  assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP);
-  // forward the write/update/whatever
-  T *wr = new T(
-    reqid, parent->whoami_shard(),
-    spg_t(get_info().pgid.pgid, peer.shard),
-    soid, acks_wanted,
-    get_osdmap()->get_epoch(),
-    tid, at_version);
-
-  // ship resulting transaction, log entries, and pg_stats
-  if (!parent->should_send_op(peer, soid)) {
-    dout(10) << "issue_repop shipping empty opt to osd." << peer
-            <<", object " << soid
-            << " beyond MAX(last_backfill_started "
-            << ", pinfo.last_backfill "
-            << pinfo.last_backfill << ")" << dendl;
-    ObjectStore::Transaction t;
-    t.set_use_tbl(op_t->get_use_tbl());
-    ::encode(t, wr->get_data());
-  } else {
-    ::encode(*op_t, wr->get_data());
-  }
-
-  ::encode(log_entries, wr->logbl);
-
-  if (pinfo.is_incomplete())
-    wr->pg_stats = pinfo.stats;  // reflects backfill progress
-  else
-    wr->pg_stats = get_info().stats;
-    
-  wr->pg_trim_to = pg_trim_to;
-  wr->pg_trim_rollback_to = pg_trim_rollback_to;
-
-  wr->new_temp_oid = new_temp_oid;
-  wr->discard_temp_oid = discard_temp_oid;
-  wr->updated_hit_set_history = hset_hist;
-  return wr;
-}
-
-void ReplicatedBackend::issue_op(
-  const hobject_t &soid,
-  const eversion_t &at_version,
-  ceph_tid_t tid,
-  osd_reqid_t reqid,
-  eversion_t pg_trim_to,
-  eversion_t pg_trim_rollback_to,
-  hobject_t new_temp_oid,
-  hobject_t discard_temp_oid,
-  const vector<pg_log_entry_t> &log_entries,
-  boost::optional<pg_hit_set_history_t> &hset_hist,
-  InProgressOp *op,
-  ObjectStore::Transaction *op_t)
-{
-
-  if (parent->get_actingbackfill_shards().size() > 1) {
-    ostringstream ss;
-    set<pg_shard_t> replicas = parent->get_actingbackfill_shards();
-    replicas.erase(parent->whoami_shard());
-    ss << "waiting for subops from " << replicas;
-    if (op->op)
-      op->op->mark_sub_op_sent(ss.str());
-  }
-  for (set<pg_shard_t>::const_iterator i =
-        parent->get_actingbackfill_shards().begin();
-       i != parent->get_actingbackfill_shards().end();
-       ++i) {
-    if (*i == parent->whoami_shard()) continue;
-    pg_shard_t peer = *i;
-    const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;
-
-    Message *wr;
-    uint64_t min_features = parent->min_peer_features();
-    if (!(min_features & CEPH_FEATURE_OSD_REPOP)) {
-      dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl;
-      wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>(
-           soid,
-           at_version,
-           tid,
-           reqid,
-           pg_trim_to,
-           pg_trim_rollback_to,
-           new_temp_oid,
-           discard_temp_oid,
-           log_entries,
-           hset_hist,
-           op,
-           op_t,
-           peer,
-           pinfo);
-    } else {
-      wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>(
-           soid,
-           at_version,
-           tid,
-           reqid,
-           pg_trim_to,
-           pg_trim_rollback_to,
-           new_temp_oid,
-           discard_temp_oid,
-           log_entries,
-           hset_hist,
-           op,
-           op_t,
-           peer,
-           pinfo);
-    }
-
-    get_parent()->send_message_osd_cluster(
-      peer.osd, wr, get_osdmap()->get_epoch());
-  }
-}
-
 ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc,
                                                 ceph_tid_t rep_tid)
 {
@@ -8439,341 +8136,6 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
   }
 }
 
-// sub op modify
-void ReplicatedBackend::sub_op_modify(OpRequestRef op) {
-  Message *m = op->get_req();
-  int msg_type = m->get_type();
-  if (msg_type == MSG_OSD_SUBOP) {
-    sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op);
-  } else if (msg_type == MSG_OSD_REPOP) {
-    sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op);
-  } else {
-    assert(0);
-  }
-}
-
-template<typename T, int MSGTYPE>
-void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
-{
-  T *m = static_cast<T *>(op->get_req());
-  int msg_type = m->get_type();
-  assert(MSGTYPE == msg_type);
-  assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP);
-
-  const hobject_t& soid = m->poid;
-
-  dout(10) << "sub_op_modify trans"
-           << " " << soid 
-           << " v " << m->version
-          << (m->logbl.length() ? " (transaction)" : " (parallel exec")
-          << " " << m->logbl.length()
-          << dendl;  
-
-  // sanity checks
-  assert(m->map_epoch >= get_info().history.same_interval_since);
-  
-  // we better not be missing this.
-  assert(!parent->get_log().get_missing().is_missing(soid));
-
-  int ackerosd = m->get_source().num();
-  
-  op->mark_started();
-
-  RepModifyRef rm(new RepModify);
-  rm->op = op;
-  rm->ackerosd = ackerosd;
-  rm->last_complete = get_info().last_complete;
-  rm->epoch_started = get_osdmap()->get_epoch();
-
-  assert(m->logbl.length());
-  // shipped transaction and log entries
-  vector<pg_log_entry_t> log;
-
-  bufferlist::iterator p = m->get_data().begin();
-  ::decode(rm->opt, p);
-  rm->localt.set_use_tbl(rm->opt.get_use_tbl());
-
-  if (m->new_temp_oid != hobject_t()) {
-    dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
-    add_temp_obj(m->new_temp_oid);
-    get_temp_coll(&rm->localt);
-  }
-  if (m->discard_temp_oid != hobject_t()) {
-    dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
-    if (rm->opt.empty()) {
-      dout(10) << __func__ << ": removing object " << m->discard_temp_oid
-              << " since we won't get the transaction" << dendl;
-      rm->localt.remove(temp_coll, m->discard_temp_oid);
-    }
-    clear_temp_obj(m->discard_temp_oid);
-  }
-
-  p = m->logbl.begin();
-  ::decode(log, p);
-  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
-
-  bool update_snaps = false;
-  if (!rm->opt.empty()) {
-    // If the opt is non-empty, we infer we are before
-    // last_backfill (according to the primary, not our
-    // not-quite-accurate value), and should update the
-    // collections now.  Otherwise, we do it later on push.
-    update_snaps = true;
-  }
-  parent->update_stats(m->pg_stats);
-  parent->log_operation(
-    log,
-    m->updated_hit_set_history,
-    m->pg_trim_to,
-    m->pg_trim_rollback_to,
-    update_snaps,
-    &(rm->localt));
-
-  rm->bytes_written = rm->opt.get_encoded_bytes();
-  
-  op->mark_started();
-
-  rm->localt.append(rm->opt);
-  rm->localt.register_on_commit(
-    parent->bless_context(
-      new C_OSD_RepModifyCommit(this, rm)));
-  rm->localt.register_on_applied(
-    parent->bless_context(
-      new C_OSD_RepModifyApply(this, rm)));
-  parent->queue_transaction(&(rm->localt), op);
-  // op is cleaned up by oncommit/onapply when both are executed
-}
-
-void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
-{
-  rm->op->mark_event("sub_op_applied");
-  rm->applied = true;
-
-  dout(10) << "sub_op_modify_applied on " << rm << " op "
-          << *rm->op->get_req() << dendl;
-  Message *m = rm->op->get_req();
-
-  Message *ack = NULL;
-  eversion_t version;
-
-  if (m->get_type() == MSG_OSD_SUBOP) {
-    // doesn't have CLIENT SUBOP feature ,use Subop
-    MOSDSubOp *req = static_cast<MOSDSubOp*>(m);
-    version = req->version;
-    if (!rm->committed)
-      ack = new MOSDSubOpReply(
-       req, parent->whoami_shard(),
-       0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-  } else if (m->get_type() == MSG_OSD_REPOP) {
-    MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
-    version = req->version;
-    if (!rm->committed)
-      ack = new MOSDRepOpReply(
-       static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
-       0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-  } else {
-    assert(0);
-  }
-
-  // send ack to acker only if we haven't sent a commit already
-  if (ack) {
-    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
-    get_parent()->send_message_osd_cluster(
-      rm->ackerosd, ack, get_osdmap()->get_epoch());
-  }
-  
-  parent->op_applied(version);
-}
-
-void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
-{
-  rm->op->mark_commit_sent();
-  rm->committed = true;
-
-  // send commit.
-  dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
-          << ", sending commit to osd." << rm->ackerosd
-          << dendl;
-  
-  assert(get_osdmap()->is_up(rm->ackerosd));
-  get_parent()->update_last_complete_ondisk(rm->last_complete);
-
-  Message *m = rm->op->get_req();
-  Message *commit;
-  if (m->get_type() == MSG_OSD_SUBOP) {
-    // doesn't have CLIENT SUBOP feature ,use Subop
-    MOSDSubOpReply  *reply = new MOSDSubOpReply(
-      static_cast<MOSDSubOp*>(m),
-      get_parent()->whoami_shard(),
-      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
-    reply->set_last_complete_ondisk(rm->last_complete);
-    commit = reply;
-  } else if (m->get_type() == MSG_OSD_REPOP) {
-    MOSDRepOpReply *reply = new MOSDRepOpReply(
-      static_cast<MOSDRepOp*>(m),
-      get_parent()->whoami_shard(),
-      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
-    reply->set_last_complete_ondisk(rm->last_complete);
-    commit = reply;
-  }
-  else {
-    assert(0);
-  }
-
-  commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
-  get_parent()->send_message_osd_cluster(
-    rm->ackerosd, commit, get_osdmap()->get_epoch());
-  
-  log_subop_stats(get_parent()->get_logger(), rm->op, l_osd_sop_w);
-}
-
-
-// ===========================================================
-
-void ReplicatedBackend::calc_head_subsets(
-  ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
-  const pg_missing_t& missing,
-  const hobject_t &last_backfill,
-  interval_set<uint64_t>& data_subset,
-  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
-{
-  dout(10) << "calc_head_subsets " << head
-          << " clone_overlap " << snapset.clone_overlap << dendl;
-
-  uint64_t size = obc->obs.oi.size;
-  if (size)
-    data_subset.insert(0, size);
-
-  if (get_parent()->get_pool().allow_incomplete_clones()) {
-    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
-    return;
-  }
-
-  if (!cct->_conf->osd_recover_clone_overlap) {
-    dout(10) << "calc_head_subsets " << head << " -- osd_recover_clone_overlap disabled" << dendl;
-    return;
-  }
-
-
-  interval_set<uint64_t> cloning;
-  interval_set<uint64_t> prev;
-  if (size)
-    prev.insert(0, size);    
-  
-  for (int j=snapset.clones.size()-1; j>=0; j--) {
-    hobject_t c = head;
-    c.snap = snapset.clones[j];
-    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
-    if (!missing.is_missing(c) && c < last_backfill) {
-      dout(10) << "calc_head_subsets " << head << " has prev " << c
-              << " overlap " << prev << dendl;
-      clone_subsets[c] = prev;
-      cloning.union_of(prev);
-      break;
-    }
-    dout(10) << "calc_head_subsets " << head << " does not have prev " << c
-            << " overlap " << prev << dendl;
-  }
-
-
-  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
-    dout(10) << "skipping clone, too many holes" << dendl;
-    clone_subsets.clear();
-    cloning.clear();
-  }
-
-  // what's left for us to push?
-  data_subset.subtract(cloning);
-
-  dout(10) << "calc_head_subsets " << head
-          << "  data_subset " << data_subset
-          << "  clone_subsets " << clone_subsets << dendl;
-}
-
-void ReplicatedBackend::calc_clone_subsets(
-  SnapSet& snapset, const hobject_t& soid,
-  const pg_missing_t& missing,
-  const hobject_t &last_backfill,
-  interval_set<uint64_t>& data_subset,
-  map<hobject_t, interval_set<uint64_t> >& clone_subsets)
-{
-  dout(10) << "calc_clone_subsets " << soid
-          << " clone_overlap " << snapset.clone_overlap << dendl;
-
-  uint64_t size = snapset.clone_size[soid.snap];
-  if (size)
-    data_subset.insert(0, size);
-
-  if (get_parent()->get_pool().allow_incomplete_clones()) {
-    dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
-    return;
-  }
-
-  if (!cct->_conf->osd_recover_clone_overlap) {
-    dout(10) << "calc_clone_subsets " << soid << " -- osd_recover_clone_overlap disabled" << dendl;
-    return;
-  }
-  
-  unsigned i;
-  for (i=0; i < snapset.clones.size(); i++)
-    if (snapset.clones[i] == soid.snap)
-      break;
-
-  // any overlap with next older clone?
-  interval_set<uint64_t> cloning;
-  interval_set<uint64_t> prev;
-  if (size)
-    prev.insert(0, size);    
-  for (int j=i-1; j>=0; j--) {
-    hobject_t c = soid;
-    c.snap = snapset.clones[j];
-    prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
-    if (!missing.is_missing(c) && c < last_backfill) {
-      dout(10) << "calc_clone_subsets " << soid << " has prev " << c
-              << " overlap " << prev << dendl;
-      clone_subsets[c] = prev;
-      cloning.union_of(prev);
-      break;
-    }
-    dout(10) << "calc_clone_subsets " << soid << " does not have prev " << c
-            << " overlap " << prev << dendl;
-  }
-  
-  // overlap with next newest?
-  interval_set<uint64_t> next;
-  if (size)
-    next.insert(0, size);    
-  for (unsigned j=i+1; j<snapset.clones.size(); j++) {
-    hobject_t c = soid;
-    c.snap = snapset.clones[j];
-    next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
-    if (!missing.is_missing(c) && c < last_backfill) {
-      dout(10) << "calc_clone_subsets " << soid << " has next " << c
-              << " overlap " << next << dendl;
-      clone_subsets[c] = next;
-      cloning.union_of(next);
-      break;
-    }
-    dout(10) << "calc_clone_subsets " << soid << " does not have next " << c
-            << " overlap " << next << dendl;
-  }
-
-  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
-    dout(10) << "skipping clone, too many holes" << dendl;
-    clone_subsets.clear();
-    cloning.clear();
-  }
-
-  
-  // what's left for us to push?
-  data_subset.subtract(cloning);
-
-  dout(10) << "calc_clone_subsets " << soid
-          << "  data_subset " << data_subset
-          << "  clone_subsets " << clone_subsets << dendl;
-}
-
-
 /** pull - request object from a peer
  */
 
@@ -8785,98 +8147,6 @@ void ReplicatedBackend::calc_clone_subsets(
  */
 enum { PULL_NONE, PULL_OTHER, PULL_YES };
 
-void ReplicatedBackend::prepare_pull(
-  eversion_t v,
-  const hobject_t& soid,
-  ObjectContextRef headctx,
-  RPGHandle *h)
-{
-  assert(get_parent()->get_local_missing().missing.count(soid));
-  eversion_t _v = get_parent()->get_local_missing().missing.find(
-    soid)->second.need;
-  assert(_v == v);
-  const map<hobject_t, set<pg_shard_t> > &missing_loc(
-    get_parent()->get_missing_loc_shards());
-  const map<pg_shard_t, pg_missing_t > &peer_missing(
-    get_parent()->get_shard_missing());
-  map<hobject_t, set<pg_shard_t> >::const_iterator q = missing_loc.find(soid);
-  assert(q != missing_loc.end());
-  assert(!q->second.empty());
-
-  // pick a pullee
-  vector<pg_shard_t> shuffle(q->second.begin(), q->second.end());
-  random_shuffle(shuffle.begin(), shuffle.end());
-  vector<pg_shard_t>::iterator p = shuffle.begin();
-  assert(get_osdmap()->is_up(p->osd));
-  pg_shard_t fromshard = *p;
-
-  dout(7) << "pull " << soid
-         << " v " << v
-         << " on osds " << *p
-         << " from osd." << fromshard
-         << dendl;
-
-  assert(peer_missing.count(fromshard));
-  const pg_missing_t &pmissing = peer_missing.find(fromshard)->second;
-  if (pmissing.is_missing(soid, v)) {
-    assert(pmissing.missing.find(soid)->second.have != v);
-    dout(10) << "pulling soid " << soid << " from osd " << fromshard
-            << " at version " << pmissing.missing.find(soid)->second.have
-            << " rather than at version " << v << dendl;
-    v = pmissing.missing.find(soid)->second.have;
-    assert(get_parent()->get_log().get_log().objects.count(soid) &&
-          (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
-           pg_log_entry_t::LOST_REVERT) &&
-          (get_parent()->get_log().get_log().objects.find(
-            soid)->second->reverting_to ==
-           v));
-  }
-  
-  ObjectRecoveryInfo recovery_info;
-
-  if (soid.is_snap()) {
-    assert(!get_parent()->get_local_missing().is_missing(
-            soid.get_head()) ||
-          !get_parent()->get_local_missing().is_missing(
-            soid.get_snapdir()));
-    assert(headctx);
-    // check snapset
-    SnapSetContext *ssc = headctx->ssc;
-    assert(ssc);
-    dout(10) << " snapset " << ssc->snapset << dendl;
-    calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
-                      get_info().last_backfill,
-                      recovery_info.copy_subset,
-                      recovery_info.clone_subset);
-    // FIXME: this may overestimate if we are pulling multiple clones in parallel...
-    dout(10) << " pulling " << recovery_info << dendl;
-  } else {
-    // pulling head or unversioned object.
-    // always pull the whole thing.
-    recovery_info.copy_subset.insert(0, (uint64_t)-1);
-    recovery_info.size = ((uint64_t)-1);
-  }
-
-  h->pulls[fromshard].push_back(PullOp());
-  PullOp &op = h->pulls[fromshard].back();
-  op.soid = soid;
-
-  op.recovery_info = recovery_info;
-  op.recovery_info.soid = soid;
-  op.recovery_info.version = v;
-  op.recovery_progress.data_complete = false;
-  op.recovery_progress.omap_complete = false;
-  op.recovery_progress.data_recovered_to = 0;
-  op.recovery_progress.first = true;
-
-  assert(!pulling.count(soid));
-  pull_from_peer[fromshard].insert(soid);
-  PullInfo &pi = pulling[soid];
-  pi.head_ctx = headctx;
-  pi.recovery_info = op.recovery_info;
-  pi.recovery_progress = op.recovery_progress;
-}
-
 int ReplicatedPG::recover_missing(
   const hobject_t &soid, eversion_t v,
   int priority,
@@ -8966,694 +8236,6 @@ void ReplicatedPG::send_remove_op(
   osd->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
 }
 
-/*
- * intelligently push an object to a replica.  make use of existing
- * clones/heads and dup data ranges where possible.
- */
-void ReplicatedBackend::prep_push_to_replica(
-  ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
-  PushOp *pop)
-{
-  const object_info_t& oi = obc->obs.oi;
-  uint64_t size = obc->obs.oi.size;
-
-  dout(10) << __func__ << ": " << soid << " v" << oi.version
-          << " size " << size << " to osd." << peer << dendl;
-
-  map<hobject_t, interval_set<uint64_t> > clone_subsets;
-  interval_set<uint64_t> data_subset;
-
-  // are we doing a clone on the replica?
-  if (soid.snap && soid.snap < CEPH_NOSNAP) {  
-    hobject_t head = soid;
-    head.snap = CEPH_NOSNAP;
-
-    // try to base push off of clones that succeed/preceed poid
-    // we need the head (and current SnapSet) locally to do that.
-    if (get_parent()->get_local_missing().is_missing(head)) {
-      dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
-      return prep_push(obc, soid, peer, pop);
-    }
-    hobject_t snapdir = head;
-    snapdir.snap = CEPH_SNAPDIR;
-    if (get_parent()->get_local_missing().is_missing(snapdir)) {
-      dout(15) << "push_to_replica missing snapdir " << snapdir
-              << ", pushing raw clone" << dendl;
-      return prep_push(obc, soid, peer, pop);
-    }
-    
-    SnapSetContext *ssc = obc->ssc;
-    assert(ssc);
-    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
-    map<pg_shard_t, pg_missing_t>::const_iterator pm =
-      get_parent()->get_shard_missing().find(peer);
-    assert(pm != get_parent()->get_shard_missing().end());
-    map<pg_shard_t, pg_info_t>::const_iterator pi =
-      get_parent()->get_shard_info().find(peer);
-    assert(pi != get_parent()->get_shard_info().end());
-    calc_clone_subsets(ssc->snapset, soid,
-                      pm->second,
-                      pi->second.last_backfill,
-                      data_subset, clone_subsets);
-  } else if (soid.snap == CEPH_NOSNAP) {
-    // pushing head or unversioned object.
-    // base this on partially on replica's clones?
-    SnapSetContext *ssc = obc->ssc;
-    assert(ssc);
-    dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
-    calc_head_subsets(
-      obc,
-      ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
-      get_parent()->get_shard_info().find(peer)->second.last_backfill,
-      data_subset, clone_subsets);
-  }
-
-  prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
-}
-
-void ReplicatedBackend::prep_push(ObjectContextRef obc,
-                            const hobject_t& soid, pg_shard_t peer,
-                            PushOp *pop)
-{
-  interval_set<uint64_t> data_subset;
-  if (obc->obs.oi.size)
-    data_subset.insert(0, obc->obs.oi.size);
-  map<hobject_t, interval_set<uint64_t> > clone_subsets;
-
-  prep_push(obc, soid, peer,
-           obc->obs.oi.version, data_subset, clone_subsets,
-           pop);
-}
-
-void ReplicatedBackend::prep_push(
-  ObjectContextRef obc,
-  const hobject_t& soid, pg_shard_t peer,
-  eversion_t version,
-  interval_set<uint64_t> &data_subset,
-  map<hobject_t, interval_set<uint64_t> >& clone_subsets,
-  PushOp *pop)
-{
-  get_parent()->begin_peer_recover(peer, soid);
-  // take note.
-  PushInfo &pi = pushing[soid][peer];
-  pi.obc = obc;
-  pi.recovery_info.size = obc->obs.oi.size;
-  pi.recovery_info.copy_subset = data_subset;
-  pi.recovery_info.clone_subset = clone_subsets;
-  pi.recovery_info.soid = soid;
-  pi.recovery_info.oi = obc->obs.oi;
-  pi.recovery_info.version = version;
-  pi.recovery_progress.first = true;
-  pi.recovery_progress.data_recovered_to = 0;
-  pi.recovery_progress.data_complete = 0;
-  pi.recovery_progress.omap_complete = 0;
-
-  ObjectRecoveryProgress new_progress;
-  int r = build_push_op(pi.recovery_info,
-                       pi.recovery_progress,
-                       &new_progress,
-                       pop,
-                       &(pi.stat));
-  assert(r == 0);
-  pi.recovery_progress = new_progress;
-}
-
-int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer,
-                                       const ObjectRecoveryInfo &recovery_info,
-                                       ObjectRecoveryProgress progress)
-{
-  // send op
-  ceph_tid_t tid = get_parent()->get_tid();
-  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
-
-  dout(10) << "send_pull_op " << recovery_info.soid << " "
-          << recovery_info.version
-          << " first=" << progress.first
-          << " data " << recovery_info.copy_subset
-          << " from osd." << peer
-          << " tid " << tid << dendl;
-
-  MOSDSubOp *subop = new MOSDSubOp(
-    rid, parent->whoami_shard(),
-    get_info().pgid, recovery_info.soid,
-    CEPH_OSD_FLAG_ACK,
-    get_osdmap()->get_epoch(), tid,
-    recovery_info.version);
-  subop->set_priority(prio);
-  subop->ops = vector<OSDOp>(1);
-  subop->ops[0].op.op = CEPH_OSD_OP_PULL;
-  subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk;
-  subop->recovery_info = recovery_info;
-  subop->recovery_progress = progress;
-
-  get_parent()->send_message_osd_cluster(
-    peer.osd, subop, get_osdmap()->get_epoch());
-
-  get_parent()->get_logger()->inc(l_osd_pull);
-  return 0;
-}
-
-void ReplicatedBackend::submit_push_data(
-  ObjectRecoveryInfo &recovery_info,
-  bool first,
-  bool complete,
-  const interval_set<uint64_t> &intervals_included,
-  bufferlist data_included,
-  bufferlist omap_header,
-  map<string, bufferlist> &attrs,
-  map<string, bufferlist> &omap_entries,
-  ObjectStore::Transaction *t)
-{
-  coll_t target_coll;
-  if (first && complete) {
-    target_coll = coll;
-  } else {
-    dout(10) << __func__ << ": Creating oid "
-            << recovery_info.soid << " in the temp collection" << dendl;
-    add_temp_obj(recovery_info.soid);
-    target_coll = get_temp_coll(t);
-  }
-
-  if (first) {
-    get_parent()->on_local_recover_start(recovery_info.soid, t);
-    t->remove(get_temp_coll(t), recovery_info.soid);
-    t->touch(target_coll, recovery_info.soid);
-    t->truncate(target_coll, recovery_info.soid, recovery_info.size);
-    t->omap_setheader(target_coll, recovery_info.soid, omap_header);
-  }
-  uint64_t off = 0;
-  for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
-       p != intervals_included.end();
-       ++p) {
-    bufferlist bit;
-    bit.substr_of(data_included, off, p.get_len());
-    t->write(target_coll, recovery_info.soid,
-            p.get_start(), p.get_len(), bit);
-    off += p.get_len();
-  }
-
-  t->omap_setkeys(target_coll, recovery_info.soid,
-                 omap_entries);
-  t->setattrs(target_coll, recovery_info.soid,
-             attrs);
-
-  if (complete) {
-    if (!first) {
-      dout(10) << __func__ << ": Removing oid "
-              << recovery_info.soid << " from the temp collection" << dendl;
-      clear_temp_obj(recovery_info.soid);
-      t->collection_move(coll, target_coll, recovery_info.soid);
-    }
-
-    submit_push_complete(recovery_info, t);
-  }
-}
-
-void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
-                                            ObjectStore::Transaction *t)
-{
-  for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
-        recovery_info.clone_subset.begin();
-       p != recovery_info.clone_subset.end();
-       ++p) {
-    for (interval_set<uint64_t>::const_iterator q = p->second.begin();
-        q != p->second.end();
-        ++q) {
-      dout(15) << " clone_range " << p->first << " "
-              << q.get_start() << "~" << q.get_len() << dendl;
-      t->clone_range(coll, p->first, recovery_info.soid,
-                    q.get_start(), q.get_len(), q.get_start());
-    }
-  }
-}
-
-ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
-  const ObjectRecoveryInfo& recovery_info,
-  SnapSetContext *ssc)
-{
-  if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
-    return recovery_info;
-  ObjectRecoveryInfo new_info = recovery_info;
-  new_info.copy_subset.clear();
-  new_info.clone_subset.clear();
-  assert(ssc);
-  calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
-                    get_info().last_backfill,
-                    new_info.copy_subset, new_info.clone_subset);
-  return new_info;
-}
-
-bool ReplicatedBackend::handle_pull_response(
-  pg_shard_t from, PushOp &pop, PullOp *response,
-  list<hobject_t> *to_continue,
-  ObjectStore::Transaction *t
-  )
-{
-  interval_set<uint64_t> data_included = pop.data_included;
-  bufferlist data;
-  data.claim(pop.data);
-  dout(10) << "handle_pull_response "
-          << pop.recovery_info
-          << pop.after_progress
-          << " data.size() is " << data.length()
-          << " data_included: " << data_included
-          << dendl;
-  if (pop.version == eversion_t()) {
-    // replica doesn't have it!
-    _failed_push(from, pop.soid);
-    return false;
-  }
-
-  hobject_t &hoid = pop.soid;
-  assert((data_included.empty() && data.length() == 0) ||
-        (!data_included.empty() && data.length() > 0));
-
-  if (!pulling.count(hoid)) {
-    return false;
-  }
-
-  PullInfo &pi = pulling[hoid];
-  if (pi.recovery_info.size == (uint64_t(-1))) {
-    pi.recovery_info.size = pop.recovery_info.size;
-    pi.recovery_info.copy_subset.intersection_of(
-      pop.recovery_info.copy_subset);
-  }
-
-  bool first = pi.recovery_progress.first;
-  if (first) {
-    pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
-    pi.recovery_info.oi = pi.obc->obs.oi;
-    pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
-  }
-
-
-  interval_set<uint64_t> usable_intervals;
-  bufferlist usable_data;
-  trim_pushed_data(pi.recovery_info.copy_subset,
-                  data_included,
-                  data,
-                  &usable_intervals,
-                  &usable_data);
-  data_included = usable_intervals;
-  data.claim(usable_data);
-
-
-  pi.recovery_progress = pop.after_progress;
-
-  pi.stat.num_bytes_recovered += data.length();
-
-  dout(10) << "new recovery_info " << pi.recovery_info
-          << ", new progress " << pi.recovery_progress
-          << dendl;
-
-  bool complete = pi.is_complete();
-
-  submit_push_data(pi.recovery_info, first,
-                  complete,
-                  data_included, data,
-                  pop.omap_header,
-                  pop.attrset,
-                  pop.omap_entries,
-                  t);
-
-  pi.stat.num_keys_recovered += pop.omap_entries.size();
-
-  if (complete) {
-    to_continue->push_back(hoid);
-    pi.stat.num_objects_recovered++;
-    get_parent()->on_local_recover(
-      hoid, pi.stat, pi.recovery_info, pi.obc, t);
-    pull_from_peer[from].erase(hoid);
-    if (pull_from_peer[from].empty())
-      pull_from_peer.erase(from);
-    return false;
-  } else {
-    response->soid = pop.soid;
-    response->recovery_info = pi.recovery_info;
-    response->recovery_progress = pi.recovery_progress;
-    return true;
-  }
-}
-
-void ReplicatedBackend::handle_push(
-  pg_shard_t from, PushOp &pop, PushReplyOp *response,
-  ObjectStore::Transaction *t)
-{
-  dout(10) << "handle_push "
-          << pop.recovery_info
-          << pop.after_progress
-          << dendl;
-  bufferlist data;
-  data.claim(pop.data);
-  bool first = pop.before_progress.first;
-  bool complete = pop.after_progress.data_complete &&
-    pop.after_progress.omap_complete;
-
-  response->soid = pop.recovery_info.soid;
-  submit_push_data(pop.recovery_info,
-                  first,
-                  complete,
-                  pop.data_included,
-                  data,
-                  pop.omap_header,
-                  pop.attrset,
-                  pop.omap_entries,
-                  t);
-
-  if (complete)
-    get_parent()->on_local_recover(
-      pop.recovery_info.soid,
-      object_stat_sum_t(),
-      pop.recovery_info,
-      ObjectContextRef(), // ok, is replica
-      t);
-}
-
-void ReplicatedBackend::send_pushes(int prio, map<pg_shard_t, vector<PushOp> > &pushes)
-{
-  for (map<pg_shard_t, vector<PushOp> >::iterator i = pushes.begin();
-       i != pushes.end();
-       ++i) {
-    ConnectionRef con = get_parent()->get_con_osd_cluster(
-      i->first.osd,
-      get_osdmap()->get_epoch());
-    if (!con)
-      continue;
-    if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
-      for (vector<PushOp>::iterator j = i->second.begin();
-          j != i->second.end();
-          ++j) {
-       dout(20) << __func__ << ": sending push (legacy) " << *j
-                << " to osd." << i->first << dendl;
-       send_push_op_legacy(prio, i->first, *j);
-      }
-    } else {
-      vector<PushOp>::iterator j = i->second.begin();
-      while (j != i->second.end()) {
-       uint64_t cost = 0;
-       uint64_t pushes = 0;
-       MOSDPGPush *msg = new MOSDPGPush();
-       msg->from = get_parent()->whoami_shard();
-       msg->pgid = get_parent()->primary_spg_t();
-       msg->map_epoch = get_osdmap()->get_epoch();
-       msg->set_priority(prio);
-       for (;
-            (j != i->second.end() &&
-             cost < cct->_conf->osd_max_push_cost &&
-             pushes < cct->_conf->osd_max_push_objects) ;
-            ++j) {
-         dout(20) << __func__ << ": sending push " << *j
-                  << " to osd." << i->first << dendl;
-         cost += j->cost(cct);
-         pushes += 1;
-         msg->pushes.push_back(*j);
-       }
-       msg->compute_cost(cct);
-       get_parent()->send_message_osd_cluster(msg, con);
-      }
-    }
-  }
-}
-
-void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &pulls)
-{
-  for (map<pg_shard_t, vector<PullOp> >::iterator i = pulls.begin();
-       i != pulls.end();
-       ++i) {
-    ConnectionRef con = get_parent()->get_con_osd_cluster(
-      i->first.osd,
-      get_osdmap()->get_epoch());
-    if (!con)
-      continue;
-    if (!(con->get_features() & CEPH_FEATURE_OSD_PACKED_RECOVERY)) {
-      for (vector<PullOp>::iterator j = i->second.begin();
-          j != i->second.end();
-          ++j) {
-       dout(20) << __func__ << ": sending pull (legacy) " << *j
-                << " to osd." << i->first << dendl;
-       send_pull_legacy(
-         prio,
-         i->first,
-         j->recovery_info,
-         j->recovery_progress);
-      }
-    } else {
-      dout(20) << __func__ << ": sending pulls " << i->second
-              << " to osd." << i->first << dendl;
-      MOSDPGPull *msg = new MOSDPGPull();
-      msg->from = parent->whoami_shard();
-      msg->set_priority(prio);
-      msg->pgid = get_parent()->primary_spg_t();
-      msg->map_epoch = get_osdmap()->get_epoch();
-      msg->pulls.swap(i->second);
-      msg->compute_cost(cct);
-      get_parent()->send_message_osd_cluster(msg, con);
-    }
-  }
-}
-
-int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
-                                    const ObjectRecoveryProgress &progress,
-                                    ObjectRecoveryProgress *out_progress,
-                                    PushOp *out_op,
-                                    object_stat_sum_t *stat)
-{
-  ObjectRecoveryProgress _new_progress;
-  if (!out_progress)
-    out_progress = &_new_progress;
-  ObjectRecoveryProgress &new_progress = *out_progress;
-  new_progress = progress;
-
-  dout(7) << "send_push_op " << recovery_info.soid
-         << " v " << recovery_info.version
-         << " size " << recovery_info.size
-         << " recovery_info: " << recovery_info
-          << dendl;
-
-  if (progress.first) {
-    store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
-    store->getattrs(coll, recovery_info.soid, out_op->attrset);
-
-    // Debug
-    bufferlist bv = out_op->attrset[OI_ATTR];
-    object_info_t oi(bv);
-
-    if (oi.version != recovery_info.version) {
-      get_parent()->clog_error() << get_info().pgid << " push "
-                                << recovery_info.soid << " v "
-                                << recovery_info.version
-                                << " failed because local copy is "
-                                << oi.version << "\n";
-      return -EINVAL;
-    }
-
-    new_progress.first = false;
-  }
-
-  uint64_t available = cct->_conf->osd_recovery_max_chunk;
-  if (!progress.omap_complete) {
-    ObjectMap::ObjectMapIterator iter =
-      store->get_omap_iterator(coll,
-                              recovery_info.soid);
-    for (iter->lower_bound(progress.omap_recovered_to);
-        iter->valid();
-        iter->next()) {
-      if (!out_op->omap_entries.empty() &&
-         available <= (iter->key().size() + iter->value().length()))
-       break;
-      out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
-
-      if ((iter->key().size() + iter->value().length()) <= available)
-       available -= (iter->key().size() + iter->value().length());
-      else
-       available = 0;
-    }
-    if (!iter->valid())
-      new_progress.omap_complete = true;
-    else
-      new_progress.omap_recovered_to = iter->key();
-  }
-
-  if (available > 0) {
-    if (!recovery_info.copy_subset.empty()) {
-      interval_set<uint64_t> copy_subset = recovery_info.copy_subset;
-      bufferlist bl;
-      int r = store->fiemap(coll, recovery_info.soid, 0,
-                            copy_subset.range_end(), bl);
-      if (r >= 0)  {
-        interval_set<uint64_t> fiemap_included;
-        map<uint64_t, uint64_t> m;
-        bufferlist::iterator iter = bl.begin();
-        ::decode(m, iter);
-        map<uint64_t, uint64_t>::iterator miter;
-        for (miter = m.begin(); miter != m.end(); ++miter) {
-          fiemap_included.insert(miter->first, miter->second);
-        }
-
-        copy_subset.intersection_of(fiemap_included);
-      }
-      out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
-                                    available);
-      if (out_op->data_included.empty()) // zero filled section, skip to end!
-        new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
-      else
-        new_progress.data_recovered_to = out_op->data_included.range_end();
-    }
-  } else {
-    out_op->data_included.clear();
-  }
-
-  for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
-       p != out_op->data_included.end();
-       ++p) {
-    bufferlist bit;
-    store->read(coll, recovery_info.soid,
-                    p.get_start(), p.get_len(), bit);
-    if (p.get_len() != bit.length()) {
-      dout(10) << " extent " << p.get_start() << "~" << p.get_len()
-              << " is actually " << p.get_start() << "~" << bit.length()
-              << dendl;
-      interval_set<uint64_t>::iterator save = p++;
-      if (bit.length() == 0)
-        out_op->data_included.erase(save);     //Remove this empty interval
-      else
-        save.set_len(bit.length());
-      // Remove any other intervals present
-      while (p != out_op->data_included.end()) {
-        interval_set<uint64_t>::iterator save = p++;
-        out_op->data_included.erase(save);
-      }
-      new_progress.data_complete = true;
-      out_op->data.claim_append(bit);
-      break;
-    }
-    out_op->data.claim_append(bit);
-  }
-
-  if (new_progress.is_complete(recovery_info)) {
-    new_progress.data_complete = true;
-    if (stat)
-      stat->num_objects_recovered++;
-  }
-
-  if (stat) {
-    stat->num_keys_recovered += out_op->omap_entries.size();
-    stat->num_bytes_recovered += out_op->data.length();
-  }
-
-  get_parent()->get_logger()->inc(l_osd_push);
-  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
-  
-  // send
-  out_op->version = recovery_info.version;
-  out_op->soid = recovery_info.soid;
-  out_op->recovery_info = recovery_info;
-  out_op->after_progress = new_progress;
-  out_op->before_progress = progress;
-  return 0;
-}
-
-int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop)
-{
-  ceph_tid_t tid = get_parent()->get_tid();
-  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
-  MOSDSubOp *subop = new MOSDSubOp(
-    rid, parent->whoami_shard(),
-    spg_t(get_info().pgid.pgid, peer.shard), pop.soid,
-    0, get_osdmap()->get_epoch(),
-    tid, pop.recovery_info.version);
-  subop->ops = vector<OSDOp>(1);
-  subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
-
-  subop->set_priority(prio);
-  subop->version = pop.version;
-  subop->ops[0].indata.claim(pop.data);
-  subop->data_included.swap(pop.data_included);
-  subop->omap_header.claim(pop.omap_header);
-  subop->omap_entries.swap(pop.omap_entries);
-  subop->attrset.swap(pop.attrset);
-  subop->recovery_info = pop.recovery_info;
-  subop->current_progress = pop.before_progress;
-  subop->recovery_progress = pop.after_progress;
-
-  get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
-  return 0;
-}
-
-void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
-{
-  op->recovery_info.version = eversion_t();
-  op->version = eversion_t();
-  op->soid = soid;
-}
-
-void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
-{
-  MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
-  const hobject_t& soid = reply->get_poid();
-  assert(reply->get_type() == MSG_OSD_SUBOPREPLY);
-  dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
-  pg_shard_t peer = reply->from;
-
-  op->mark_started();
-  
-  PushReplyOp rop;
-  rop.soid = soid;
-  PushOp pop;
-  bool more = handle_push_reply(peer, rop, &pop);
-  if (more)
-    send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
-}
-
-bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply)
-{
-  const hobject_t &soid = op.soid;
-  if (pushing.count(soid) == 0) {
-    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
-            << ", or anybody else"
-            << dendl;
-    return false;
-  } else if (pushing[soid].count(peer) == 0) {
-    dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
-            << dendl;
-    return false;
-  } else {
-    PushInfo *pi = &pushing[soid][peer];
-
-    if (!pi->recovery_progress.data_complete) {
-      dout(10) << " pushing more from, "
-              << pi->recovery_progress.data_recovered_to
-              << " of " << pi->recovery_info.copy_subset << dendl;
-      ObjectRecoveryProgress new_progress;
-      int r = build_push_op(
-       pi->recovery_info,
-       pi->recovery_progress, &new_progress, reply,
-       &(pi->stat));
-      assert(r == 0);
-      pi->recovery_progress = new_progress;
-      return true;
-    } else {
-      // done!
-      get_parent()->on_peer_recover(
-       peer, soid, pi->recovery_info,
-       pi->stat);
-      
-      pushing[soid].erase(peer);
-      pi = NULL;
-      
-      
-      if (pushing[soid].empty()) {
-       get_parent()->on_global_recover(soid);
-       pushing.erase(soid);
-      } else {
-       dout(10) << "pushed " << soid << ", still waiting for push ack from " 
-                << pushing[soid].size() << " others" << dendl;
-      }
-      return false;
-    }
-  }
-}
-
 void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
 {
   dout(10) << "finish_degraded_object " << oid << dendl;
@@ -9678,69 +8260,6 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
   }
 }
 
-/** op_pull
- * process request to pull an entire object.
- * NOTE: called from opqueue.
- */
-void ReplicatedBackend::sub_op_pull(OpRequestRef op)
-{
-  MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
-  assert(m->get_type() == MSG_OSD_SUBOP);
-
-  op->mark_started();
-
-  const hobject_t soid = m->poid;
-
-  dout(7) << "pull" << soid << " v " << m->version
-          << " from " << m->get_source()
-          << dendl;
-
-  assert(!is_primary());  // we should be a replica or stray.
-
-  PullOp pop;
-  pop.soid = soid;
-  pop.recovery_info = m->recovery_info;
-  pop.recovery_progress = m->recovery_progress;
-
-  PushOp reply;
-  handle_pull(m->from, pop, &reply);
-  send_push_op_legacy(
-    m->get_priority(),
-    m->from,
-    reply);
-
-  log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull);
-}
-
-void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
-{
-  const hobject_t &soid = op.soid;
-  struct stat st;
-  int r = store->stat(coll, soid, &st);
-  if (r != 0) {
-    get_parent()->clog_error() << get_info().pgid << " "
-                              << peer << " tried to pull " << soid
-                              << " but got " << cpp_strerror(-r) << "\n";
-    prep_push_op_blank(soid, reply);
-  } else {
-    ObjectRecoveryInfo &recovery_info = op.recovery_info;
-    ObjectRecoveryProgress &progress = op.recovery_progress;
-    if (progress.first && recovery_info.size == ((uint64_t)-1)) {
-      // Adjust size and copy_subset
-      recovery_info.size = st.st_size;
-      recovery_info.copy_subset.clear();
-      if (st.st_size)
-        recovery_info.copy_subset.insert(0, st.st_size);
-      assert(recovery_info.clone_subset.empty());
-    }
-
-    r = build_push_op(recovery_info, progress, 0, reply);
-    if (r < 0)
-      prep_push_op_blank(soid, reply);
-  }
-}
-
-
 void ReplicatedPG::_committed_pushed_object(
   epoch_t epoch, eversion_t last_complete)
 {
@@ -9827,113 +8346,6 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
 }
 
 
-/**
- * trim received data to remove what we don't want
- *
- * @param copy_subset intervals we want
- * @param data_included intervals we got
- * @param data_recieved data we got
- * @param intervals_usable intervals we want to keep
- * @param data_usable matching data we want to keep
- */
-void ReplicatedBackend::trim_pushed_data(
-  const interval_set<uint64_t> &copy_subset,
-  const interval_set<uint64_t> &intervals_received,
-  bufferlist data_received,
-  interval_set<uint64_t> *intervals_usable,
-  bufferlist *data_usable)
-{
-  if (intervals_received.subset_of(copy_subset)) {
-    *intervals_usable = intervals_received;
-    *data_usable = data_received;
-    return;
-  }
-
-  intervals_usable->intersection_of(copy_subset,
-                                   intervals_received);
-
-  uint64_t off = 0;
-  for (interval_set<uint64_t>::const_iterator p = intervals_received.begin();
-       p != intervals_received.end();
-       ++p) {
-    interval_set<uint64_t> x;
-    x.insert(p.get_start(), p.get_len());
-    x.intersection_of(copy_subset);
-    for (interval_set<uint64_t>::const_iterator q = x.begin();
-        q != x.end();
-        ++q) {
-      bufferlist sub;
-      uint64_t data_off = off + (q.get_start() - p.get_start());
-      sub.substr_of(data_received, data_off, q.get_len());
-      data_usable->claim_append(sub);
-    }
-    off += p.get_len();
-  }
-}
-
-/** op_push
- * NOTE: called from opqueue.
- */
-void ReplicatedBackend::sub_op_push(OpRequestRef op)
-{
-  op->mark_started();
-  MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
-
-  PushOp pop;
-  pop.soid = m->recovery_info.soid;
-  pop.version = m->version;
-  m->claim_data(pop.data);
-  pop.data_included.swap(m->data_included);
-  pop.omap_header.swap(m->omap_header);
-  pop.omap_entries.swap(m->omap_entries);
-  pop.attrset.swap(m->attrset);
-  pop.recovery_info = m->recovery_info;
-  pop.before_progress = m->current_progress;
-  pop.after_progress = m->recovery_progress;
-  ObjectStore::Transaction *t = new ObjectStore::Transaction;
-
-  if (is_primary()) {
-    PullOp resp;
-    RPGHandle *h = _open_recovery_op();
-    list<hobject_t> to_continue;
-    bool more = handle_pull_response(
-      m->from, pop, &resp,
-      &to_continue, t);
-    if (more) {
-      send_pull_legacy(
-       m->get_priority(),
-       m->from,
-       resp.recovery_info,
-       resp.recovery_progress);
-    } else {
-      C_ReplicatedBackend_OnPullComplete *c =
-       new C_ReplicatedBackend_OnPullComplete(
-         this,
-         op->get_req()->get_priority());
-      c->to_continue.swap(to_continue);
-      t->register_on_complete(
-       new PG_RecoveryQueueAsync(
-         get_parent(),
-         get_parent()->bless_gencontext(c)));
-    }
-    run_recovery_op(h, op->get_req()->get_priority());
-  } else {
-    PushReplyOp resp;
-    MOSDSubOpReply *reply = new MOSDSubOpReply(
-      m, parent->whoami_shard(), 0,
-      get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-    reply->set_priority(m->get_priority());
-    assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
-    handle_push(m->from, pop, &resp, t);
-    t->register_on_complete(new PG_SendMessageOnConn(
-                             get_parent(), reply, m->get_connection()));
-  }
-  t->register_on_applied(
-    new ObjectStore::C_DeleteTransaction(t));
-  get_parent()->queue_transaction(t);
-  return;
-}
-
 void ReplicatedPG::failed_push(pg_shard_t from, const hobject_t &soid)
 {
   assert(recovering.count(soid));
@@ -9945,15 +8357,6 @@ void ReplicatedPG::failed_push(pg_shard_t from, const hobject_t &soid)
   finish_recovery_op(soid);  // close out this attempt,
 }
 
-void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
-{
-  get_parent()->failed_push(from, soid);
-  pull_from_peer[from].erase(soid);
-  if (pull_from_peer[from].empty())
-    pull_from_peer.erase(from);
-  pulling.erase(soid);
-}
-
 void ReplicatedPG::sub_op_remove(OpRequestRef op)
 {
   MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
@@ -9968,7 +8371,6 @@ void ReplicatedPG::sub_op_remove(OpRequestRef op)
   assert(r == 0);
 }
 
-
 eversion_t ReplicatedPG::pick_newest_available(const hobject_t& oid)
 {
   eversion_t v;
@@ -10966,34 +9368,6 @@ int ReplicatedPG::prep_object_replica_pushes(
   return 1;
 }
 
-int ReplicatedBackend::start_pushes(
-  const hobject_t &soid,
-  ObjectContextRef obc,
-  RPGHandle *h)
-{
-  int pushes = 0;
-  // who needs it?  
-  assert(get_parent()->get_actingbackfill_shards().size() > 0);
-  for (set<pg_shard_t>::iterator i =
-        get_parent()->get_actingbackfill_shards().begin();
-       i != get_parent()->get_actingbackfill_shards().end();
-       ++i) {
-    if (*i == get_parent()->whoami_shard()) continue;
-    pg_shard_t peer = *i;
-    map<pg_shard_t, pg_missing_t>::const_iterator j =
-      get_parent()->get_shard_missing().find(peer);
-    assert(j != get_parent()->get_shard_missing().end());
-    if (j->second.is_missing(soid)) {
-      ++pushes;
-      h->pushes[peer].push_back(PushOp());
-      prep_push_to_replica(obc, soid, peer,
-                          &(h->pushes[peer].back())
-       );
-    }
-  }
-  return pushes;
-}
-
 int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
 {
   dout(10) << __func__ << "(" << max << ")" << dendl;