return pgb->get_parent()->gen_dbg_prefix(*_dout);
 }
 
+static ostream& _prefix(std::ostream *_dout, ECBackend::RecoveryBackend *pgb) {
+  return pgb->get_parent()->gen_dbg_prefix(*_dout);
+}
+
 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
-  list<ECBackend::RecoveryOp> ops;
+  list<ECBackend::RecoveryBackend::RecoveryOp> ops;
 };
 
 static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
   return lhs << "]";
 }
 
-ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
+ostream &operator<<(ostream &lhs, const ECBackend::RecoveryBackend::RecoveryOp &rhs)
 {
   return lhs << "RecoveryOp("
             << "hoid=" << rhs.hoid
             << " recovery_info=" << rhs.recovery_info
             << " recovery_progress=" << rhs.recovery_progress
             << " obc refcount=" << rhs.obc.use_count()
-            << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
+            << " state=" << ECBackend::RecoveryBackend::RecoveryOp::tostr(rhs.state)
             << " waiting_on_pushes=" << rhs.waiting_on_pushes
             << " extent_requested=" << rhs.extent_requested
             << ")";
 }
 
-void ECBackend::RecoveryOp::dump(Formatter *f) const
+void ECBackend::RecoveryBackend::RecoveryOp::dump(Formatter *f) const
 {
   f->dump_stream("hoid") << hoid;
   f->dump_stream("v") << v;
   : PGBackend(cct, pg, store, coll, ch),
     read_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener()),
     rmw_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener(), *this),
-    unstable_hashinfo_registry(cct, ec_impl),
+    recovery_backend(cct, coll, ec_impl, this->sinfo, read_pipeline, unstable_hashinfo_registry, get_parent()->get_eclistener()),
     ec_impl(ec_impl),
-    sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
+    sinfo(ec_impl->get_data_chunk_count(), stripe_width),
+    unstable_hashinfo_registry(cct, ec_impl) {
   ceph_assert((ec_impl->get_data_chunk_count() *
          ec_impl->get_chunk_size(stripe_width)) == stripe_width);
 }
 
 PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
+{
+  return recovery_backend.open_recovery_op();
+}
+
+PGBackend::RecoveryHandle *ECBackend::RecoveryBackend::open_recovery_op()
 {
   return new ECRecoveryHandle;
 }
 
-void ECBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res)
+void ECBackend::RecoveryBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res)
 {
   dout(10) << __func__ << ": Read error " << hoid << " r="
           << res.r << " errors=" << res.errors << dendl;
   const PushOp &op,
   RecoveryMessages *m,
   bool is_repair)
+{
+  if (get_parent()->pg_is_remote_backfilling()) {
+    get_parent()->pg_add_local_num_bytes(op.data.length());
+    get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
+    dout(10) << __func__ << " " << op.soid
+             << " add new actual data by " << op.data.length()
+             << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
+             << dendl;
+  }
+
+  recovery_backend.handle_recovery_push(op, m, is_repair);
+
+  if (op.after_progress.data_complete) {
+    if ((get_parent()->pgb_is_primary())) {
+      if (get_parent()->pg_is_repair() || is_repair)
+        get_parent()->inc_osd_stat_repaired();
+    } else {
+      // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
+      if (is_repair)
+        get_parent()->inc_osd_stat_repaired();
+      if (get_parent()->pg_is_remote_backfilling()) {
+        struct stat st;
+        int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
+                            get_parent()->whoami_shard().shard), &st);
+        if (r == 0) {
+          get_parent()->pg_sub_local_num_bytes(st.st_size);
+         // XXX: This can be way overestimated for small objects
+         get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
+         dout(10) << __func__ << " " << op.soid
+                  << " sub actual data by " << st.st_size
+                  << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
+                  << dendl;
+        }
+      }
+    }
+  }
+}
+
+void ECBackend::RecoveryBackend::handle_recovery_push(
+  const PushOp &op,
+  RecoveryMessages *m,
+  bool is_repair)
 {
   if (get_parent()->check_failsafe_full()) {
     dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
     ceph_assert(op.data.length() == 0);
   }
 
-  if (get_parent()->pg_is_remote_backfilling()) {
-    get_parent()->pg_add_local_num_bytes(op.data.length());
-    get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
-    dout(10) << __func__ << " " << op.soid
-             << " add new actual data by " << op.data.length()
-             << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
-             << dendl;
-  }
-
   if (op.before_progress.first) {
     ceph_assert(op.attrset.count(string("_")));
     m->t.setattrs(
        ObjectContextRef(),
        false,
        &m->t);
-      if (get_parent()->pg_is_remote_backfilling()) {
-        struct stat st;
-        int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
-                            get_parent()->whoami_shard().shard), &st);
-        if (r == 0) {
-          get_parent()->pg_sub_local_num_bytes(st.st_size);
-         // XXX: This can be way overestimated for small objects
-         get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
-         dout(10) << __func__ << " " << op.soid
-                  << " sub actual data by " << st.st_size
-                  << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
-                  << dendl;
-        }
-      }
     }
   }
   m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
   m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
 }
 
-void ECBackend::handle_recovery_push_reply(
+void ECBackend::RecoveryBackend::handle_recovery_push_reply(
   const PushReplyOp &op,
   pg_shard_t from,
   RecoveryMessages *m)
   continue_recovery_op(rop, m);
 }
 
-void ECBackend::handle_recovery_read_complete(
+void ECBackend::RecoveryBackend::handle_recovery_read_complete(
   const hobject_t &hoid,
   boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
   std::optional<map<string, bufferlist, less<>> > attrs,
           << ")"
           << dendl;
   ceph_assert(recovery_ops.count(hoid));
-  RecoveryOp &op = recovery_ops[hoid];
+  RecoveryBackend::RecoveryOp &op = recovery_ops[hoid];
   ceph_assert(op.returned_data.empty());
   map<int, bufferlist*> target;
   for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
 };
 
 struct RecoveryReadCompleter : ECCommon::ReadCompleter {
-  RecoveryReadCompleter(ECBackend& backend)
+  RecoveryReadCompleter(ECBackend::RecoveryBackend& backend)
     : backend(backend) {}
 
   void finish_single_request(
     backend.dispatch_recovery_messages(rm, priority);
   }
 
-  ECBackend& backend;
+  ECBackend::RecoveryBackend& backend;
   RecoveryMessages rm;
 };
 
-void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
+void ECBackend::RecoveryBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
 {
   for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
        i != m.pushes.end();
        m.pushes.erase(i++)) {
     MOSDPGPush *msg = new MOSDPGPush();
     msg->set_priority(priority);
-    msg->map_epoch = get_osdmap_epoch();
+    msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
     msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
     msg->from = get_parent()->whoami_shard();
     msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
     msg->pushes.swap(i->second);
     msg->compute_cost(cct);
     msg->is_repair = get_parent()->pg_is_repair();
-    get_parent()->send_message(
-      i->first.osd,
-      msg);
+    std::vector wrapped_msg {
+      std::make_pair(i->first.osd, static_cast<Message*>(msg))
+    };
+    get_parent()->send_message_osd_cluster(wrapped_msg, msg->map_epoch);
   }
   map<int, MOSDPGPushReply*> replies;
   for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
        m.push_replies.erase(i++)) {
     MOSDPGPushReply *msg = new MOSDPGPushReply();
     msg->set_priority(priority);
-    msg->map_epoch = get_osdmap_epoch();
+    msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
     msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
     msg->from = get_parent()->whoami_shard();
     msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
 void ECBackend::RecoveryBackend::continue_recovery_op(
   RecoveryBackend::RecoveryOp &op,
   RecoveryMessages *m)
-{
-}
-
-void ECBackend::continue_recovery_op(
-  RecoveryOp &op,
-  RecoveryMessages *m)
 {
   dout(10) << __func__ << ": continuing " << op << dendl;
+  using RecoveryOp = RecoveryBackend::RecoveryOp;
   while (1) {
     switch (op.state) {
     case RecoveryOp::IDLE: {
           ceph_assert(recovery_ops.count(op.hoid));
           eversion_t v = recovery_ops[op.hoid].v;
           recovery_ops.erase(op.hoid);
+         // TODO: not in crimson yet
           get_parent()->on_failed_pull({get_parent()->whoami_shard()},
                                        op.hoid, v);
           return;
        ceph_assert(!op.recovery_progress.first);
        dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
                 << dendl;
+       // in crimson
        get_parent()->cancel_pull(op.hoid);
        recovery_ops.erase(op.hoid);
        return;
        pop.before_progress = op.recovery_progress;
        pop.after_progress = after_progress;
        if (*mi != get_parent()->primary_shard())
+         // already in crimson -- junction point with PeeringState
          get_parent()->begin_peer_recover(
            *mi,
            op.hoid);
          stat.num_bytes_recovered = op.recovery_info.size;
          stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
          stat.num_objects_recovered = 1;
+         // TODO: not in crimson yet
          if (get_parent()->pg_is_repair())
            stat.num_objects_repaired = 1;
+         // pg_recovery.cc in crimson has it
          get_parent()->on_global_recover(op.hoid, stat, false);
          dout(10) << __func__ << ": WRITING return " << op << dendl;
          recovery_ops.erase(op.hoid);
   RecoveryHandle *_h,
   int priority)
 {
-  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
+  ceph_assert(_h);
+  ECRecoveryHandle &h = static_cast<ECRecoveryHandle&>(*_h);
+  recovery_backend.run_recovery_op(h, priority);
+  send_recovery_deletes(priority, h.deletes);
+  delete _h;
+}
+
+void ECBackend::RecoveryBackend::run_recovery_op(
+  ECRecoveryHandle &h,
+  int priority)
+{
   RecoveryMessages m;
-  for (list<RecoveryOp>::iterator i = h->ops.begin();
-       i != h->ops.end();
+  for (list<RecoveryOp>::iterator i = h.ops.begin();
+       i != h.ops.end();
        ++i) {
     dout(10) << __func__ << ": starting " << *i << dendl;
     ceph_assert(!recovery_ops.count(i->hoid));
     RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
     continue_recovery_op(op, &m);
   }
-
   dispatch_recovery_messages(m, priority);
-  send_recovery_deletes(priority, h->deletes);
-  delete _h;
 }
 
 int ECBackend::recover_object(
   ObjectContextRef head,
   ObjectContextRef obc,
   RecoveryHandle *_h)
+{
+  return recovery_backend.recover_object(hoid, v, head, obc, _h);
+}
+
+int ECBackend::RecoveryBackend::recover_object(
+  const hobject_t &hoid,
+  eversion_t v,
+  ObjectContextRef head,
+  ObjectContextRef obc,
+  RecoveryHandle *_h)
 {
   ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
   h->ops.push_back(RecoveryOp());
         ++i) {
       handle_recovery_push(*i, &rm, op->is_repair);
     }
-    dispatch_recovery_messages(rm, priority);
+    recovery_backend.dispatch_recovery_messages(rm, priority);
     return true;
   }
   case MSG_OSD_PG_PUSH_REPLY: {
     for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
         i != op->replies.end();
         ++i) {
-      handle_recovery_push_reply(*i, op->from, &rm);
+      recovery_backend.handle_recovery_push_reply(*i, op->from, &rm);
     }
-    dispatch_recovery_messages(rm, priority);
+    recovery_backend.dispatch_recovery_messages(rm, priority);
     return true;
   }
   default:
 
 void ECBackend::clear_recovery_state()
 {
-  recovery_ops.clear();
+  recovery_backend.recovery_ops.clear();
 }
 
 void ECBackend::dump_recovery_info(Formatter *f) const
 {
   f->open_array_section("recovery_ops");
-  for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
-       i != recovery_ops.end();
+  for (map<hobject_t, RecoveryBackend::RecoveryOp>::const_iterator i = recovery_backend.recovery_ops.begin();
+       i != recovery_backend.recovery_ops.end();
        ++i) {
     f->open_object_section("op");
     i->second.dump(f);
 
 
   void kick_reads();
 
-  uint64_t get_recovery_chunk_size() const {
-    return round_up_to(cct->_conf->osd_recovery_max_chunk,
-                       sinfo.get_stripe_width());
-  }
-
   /**
    * Recovery
    *
    * Transaction, and reads in a RecoveryMessages object which is passed
    * among the recovery methods.
    */
+public:
+  struct RecoveryBackend {
+    CephContext* cct;
+    const coll_t &coll;
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    ReadPipeline& read_pipeline;
+    UnstableHashInfoRegistry& unstable_hashinfo_registry;
+    // TODO: lay an interface down here
+    ECListener* parent;
+
+    ECListener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+    void add_temp_obj(const hobject_t &oid) { get_parent()->add_temp_obj(oid); }
+    void clear_temp_obj(const hobject_t &oid) { get_parent()->clear_temp_obj(oid); }
+
+    RecoveryBackend(CephContext* cct,
+                   const coll_t &coll,
+                ceph::ErasureCodeInterfaceRef ec_impl,
+                const ECUtil::stripe_info_t& sinfo,
+               ReadPipeline& read_pipeline,
+               UnstableHashInfoRegistry& unstable_hashinfo_registry,
+                ECListener* parent)
+      : cct(cct),
+        coll(coll),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+       read_pipeline(read_pipeline),
+       unstable_hashinfo_registry(unstable_hashinfo_registry),
+        parent(parent) {
+    }
+  // <<<----
   struct RecoveryOp {
     hobject_t hoid;
     eversion_t v;
 
     static const char* tostr(state_t state) {
       switch (state) {
-      case ECBackend::RecoveryOp::IDLE:
+      case RecoveryOp::IDLE:
        return "IDLE";
-      case ECBackend::RecoveryOp::READING:
+      case RecoveryOp::READING:
        return "READING";
-      case ECBackend::RecoveryOp::WRITING:
+      case RecoveryOp::WRITING:
        return "WRITING";
-      case ECBackend::RecoveryOp::COMPLETE:
+      case RecoveryOp::COMPLETE:
        return "COMPLETE";
       default:
        ceph_abort();
   friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
   std::map<hobject_t, RecoveryOp> recovery_ops;
 
+  uint64_t get_recovery_chunk_size() const {
+    return round_up_to(cct->_conf->osd_recovery_max_chunk,
+                       sinfo.get_stripe_width());
+  }
+
+  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
+
+  RecoveryHandle *open_recovery_op();
+  void run_recovery_op(
+    struct ECRecoveryHandle &h,
+    int priority);
+  int recover_object(
+    const hobject_t &hoid,
+    eversion_t v,
+    ObjectContextRef head,
+    ObjectContextRef obc,
+    RecoveryHandle *h);
   void continue_recovery_op(
-    RecoveryOp &op,
+    RecoveryBackend::RecoveryOp &op,
     RecoveryMessages *m);
-  friend struct RecoveryMessages;
-  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
-  friend struct OnRecoveryReadComplete;
-  friend struct RecoveryReadCompleter;
   void handle_recovery_read_complete(
     const hobject_t &hoid,
     boost::tuple<uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > &to_read,
     const PushReplyOp &op,
     pg_shard_t from,
     RecoveryMessages *m);
+  friend struct RecoveryMessages;
+  int get_ec_data_chunk_count() const {
+    return ec_impl->get_data_chunk_count();
+  }
+  void _failed_push(const hobject_t &hoid, ECCommon::read_result_t &res);
+  };
+  friend ostream &operator<<(ostream &lhs, const RecoveryBackend::RecoveryOp &rhs);
+  friend struct RecoveryMessages;
+  friend struct OnRecoveryReadComplete;
+  friend struct RecoveryReadCompleter;
+
+  void handle_recovery_push(
+    const PushOp &op,
+    RecoveryMessages *m,
+    bool is_repair);
 
 public:
   struct ReadPipeline read_pipeline;
   struct RMWPipeline rmw_pipeline;
+  struct RecoveryBackend recovery_backend;
 
   ceph::ErasureCodeInterfaceRef ec_impl;
 
   uint64_t be_get_ondisk_size(uint64_t logical_size) const final {
     return sinfo.logical_to_next_chunk_offset(logical_size);
   }
-  void _failed_push(const hobject_t &hoid, ECBackend::read_result_t &res);
 };
 ostream &operator<<(ostream &lhs, const ECBackend::RMWPipeline::pipeline_state_t &rhs);