]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Merge branch 'master' into wip-optracker
authorGreg Farnum <greg@inktank.com>
Thu, 3 Oct 2013 22:50:40 +0000 (15:50 -0700)
committerGreg Farnum <greg@inktank.com>
Thu, 3 Oct 2013 22:50:40 +0000 (15:50 -0700)
Conflicts:
src/osd/OpRequest.h
src/osd/PG.cc
src/osd/ReplicatedPG.cc

Signed-off-by: Greg Farnum <greg@inktank.com>
1  2 
src/common/TrackedOp.h
src/osd/Makefile.am
src/osd/OSD.cc
src/osd/PG.cc
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h
src/test/encoding/types.h

index 9e00c14b1784a33f786e160fc0b9451d3d9ac7f3,753331df7f381ca90f6d30a2834f57b85651c7e7..44e03905759a4da1390c627cca746b457ba3159e
  #include "msg/Message.h"
  #include <tr1/memory>
  
 +class TrackedOp;
 +typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;
 +
 +class OpTracker;
 +class OpHistory {
 +  set<pair<utime_t, TrackedOpRef> > arrived;
 +  set<pair<double, TrackedOpRef> > duration;
 +  void cleanup(utime_t now);
 +  bool shutdown;
 +  OpTracker *tracker;
 +  uint32_t history_size;
 +  uint32_t history_duration;
 +
 +public:
 +  OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_),
 +  history_size(0), history_duration(0) {}
 +  ~OpHistory() {
 +    assert(arrived.empty());
 +    assert(duration.empty());
 +  }
 +  void insert(utime_t now, TrackedOpRef op);
 +  void dump_ops(utime_t now, Formatter *f);
 +  void on_shutdown();
 +  void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
 +    history_size = new_size;
 +    history_duration = new_duration;
 +  }
 +};
 +
 +class OpTracker {
 +  class RemoveOnDelete {
 +    OpTracker *tracker;
 +  public:
 +    RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
 +    void operator()(TrackedOp *op);
 +  };
 +  friend class RemoveOnDelete;
 +  friend class OpHistory;
 +  uint64_t seq;
 +  Mutex ops_in_flight_lock;
 +  xlist<TrackedOp *> ops_in_flight;
 +  OpHistory history;
 +  float complaint_time;
 +  int log_threshold;
 +
 +public:
 +  CephContext *cct;
 +  OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"),
 +      history(this), complaint_time(0), log_threshold(0), cct(cct_) {}
 +  void set_complaint_and_threshold(float time, int threshold) {
 +    complaint_time = time;
 +    log_threshold = threshold;
 +  }
 +  void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
 +    history.set_size_and_duration(new_size, new_duration);
 +  }
 +  void dump_ops_in_flight(Formatter *f);
 +  void dump_historic_ops(Formatter *f);
 +  void register_inflight_op(xlist<TrackedOp*>::item *i);
 +  void unregister_inflight_op(TrackedOp *i);
 +
 +  void get_age_ms_histogram(pow2_hist_t *h);
 +
 +  /**
 +   * Look for Ops which are too old, and insert warning
 +   * strings for each Op that is too old.
 +   *
 +   * @param warning_strings A vector<string> reference which is filled
 +   * with a warning string for each old Op.
 +   * @return True if there are any Ops to warn on, false otherwise.
 +   */
 +  bool check_ops_in_flight(std::vector<string> &warning_strings);
 +  void mark_event(TrackedOp *op, const string &evt);
 +  void _mark_event(TrackedOp *op, const string &evt, utime_t now);
 +
 +  void on_shutdown() {
 +    Mutex::Locker l(ops_in_flight_lock);
 +    history.on_shutdown();
 +  }
 +  ~OpTracker() {
 +    assert(ops_in_flight.empty());
 +  }
 +
 +  template <typename T>
 +  typename T::Ref create_request(Message *ref)
 +  {
 +    typename T::Ref retval(new T(ref, this),
 +                         RemoveOnDelete(this));
 +    
 +    _mark_event(retval.get(), "header_read", ref->get_recv_stamp());
 +    _mark_event(retval.get(), "throttled", ref->get_throttle_stamp());
 +    _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp());
 +    _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp());
 +    
 +    retval->init_from_message();
 +    
 +    return retval;
 +  }
 +};
 +
  class TrackedOp {
-   uint8_t warn_interval_multiplier; // limits output of a given op warning
 +private:
 +  friend class OpHistory;
 +  friend class OpTracker;
 +  xlist<TrackedOp*>::item xitem;
 +protected:
 +  Message *request; /// the logical request we are tracking
 +  OpTracker *tracker; /// the tracker we are associated with
 +
 +  list<pair<utime_t, string> > events; /// list of events and their times
 +  Mutex lock; /// to protect the events list
 +  string current; /// the current state the event is in
 +  uint64_t seq; /// a unique value set by the OpTracker
 +
++  uint32_t warn_interval_multiplier; // limits output of a given op warning
 +
 +  TrackedOp(Message *req, OpTracker *_tracker) :
 +    xitem(this),
 +    request(req),
 +    tracker(_tracker),
 +    lock("TrackedOp::lock"),
 +    seq(0),
 +    warn_interval_multiplier(1)
 +  {
 +    tracker->register_inflight_op(&xitem);
 +  }
 +
 +  virtual void init_from_message() {}
 +  /// output any type-specific data you want to get when dump() is called
 +  virtual void _dump(utime_t now, Formatter *f) const {}
 +  /// if you want something else to happen when events are marked, implement
 +  virtual void _event_marked() {}
 +
  public:
 -  virtual void mark_event(const string &event) = 0;
 -  virtual ~TrackedOp() {}
 +  virtual ~TrackedOp() { assert(request); request->put(); }
 +
 +  utime_t get_arrived() const {
 +    return request->get_recv_stamp();
 +  }
 +  // This function maybe needs some work; assumes last event is completion time
 +  double get_duration() const {
 +    return events.size() ?
 +      (events.rbegin()->first - get_arrived()) :
 +      0.0;
 +  }
 +  Message *get_req() const { return request; }
 +
 +  void mark_event(const string &event);
 +  virtual const char *state_string() const {
 +    return events.rbegin()->second.c_str();
 +  }
 +  void dump(utime_t now, Formatter *f) const;
  };
 -typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;
  
  #endif
Simple merge
diff --cc src/osd/OSD.cc
Simple merge
diff --cc src/osd/PG.cc
Simple merge
index 0000000000000000000000000000000000000000,9868e7af2c8771af7d9b090351f0495f8e770216..b39207e14f85c427cef9b12d8781b71ba4208482
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,196 +1,196 @@@
 -  switch (op->request->get_type()) {
+ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+ // vim: ts=8 sw=2 smarttab
+ /*
+  * Ceph - scalable distributed file system
+  *
+  * Copyright (C) 2013 Inktank Storage, Inc.
+  *
+  * This is free software; you can redistribute it and/or
+  * modify it under the terms of the GNU Lesser General Public
+  * License version 2.1, as published by the Free Software
+  * Foundation.  See file COPYING.
+  *
+  */
+ #include "ReplicatedBackend.h"
+ #include "messages/MOSDSubOp.h"
+ #include "messages/MOSDSubOpReply.h"
+ #include "messages/MOSDPGPush.h"
+ #include "messages/MOSDPGPull.h"
+ #include "messages/MOSDPGPushReply.h"
+ #define dout_subsys ceph_subsys_osd
+ #define DOUT_PREFIX_ARGS this
+ #undef dout_prefix
+ #define dout_prefix _prefix(_dout, this)
+ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
+   return *_dout << pgb->get_parent()->gen_dbg_prefix();
+ }
+ ReplicatedBackend::ReplicatedBackend(
+   PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
+   PGBackend(pg), temp_created(false),
+   temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)),
+   coll(coll), osd(osd), cct(osd->cct) {}
+ void ReplicatedBackend::run_recovery_op(
+   PGBackend::RecoveryHandle *_h,
+   int priority)
+ {
+   RPGHandle *h = static_cast<RPGHandle *>(_h);
+   send_pushes(priority, h->pushes);
+   send_pulls(priority, h->pulls);
+   delete h;
+ }
+ void ReplicatedBackend::recover_object(
+   const hobject_t &hoid,
+   ObjectContextRef head,
+   ObjectContextRef obc,
+   RecoveryHandle *_h
+   )
+ {
+   dout(10) << __func__ << ": " << hoid << dendl;
+   RPGHandle *h = static_cast<RPGHandle *>(_h);
+   if (get_parent()->get_local_missing().is_missing(hoid)) {
+     assert(!obc);
+     // pull
+     prepare_pull(
+       hoid,
+       head,
+       h);
+     return;
+   } else {
+     assert(obc);
+     int started = start_pushes(
+       hoid,
+       obc,
+       h);
+     assert(started > 0);
+   }
+ }
+ void ReplicatedBackend::check_recovery_sources(const OSDMapRef osdmap)
+ {
+   for(map<int, set<hobject_t> >::iterator i = pull_from_peer.begin();
+       i != pull_from_peer.end();
+       ) {
+     if (osdmap->is_down(i->first)) {
+       dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
+              << ", osdmap has it marked down" << dendl;
+       for (set<hobject_t>::iterator j = i->second.begin();
+          j != i->second.end();
+          ++j) {
+       assert(pulling.count(*j) == 1);
+       get_parent()->cancel_pull(*j);
+       pulling.erase(*j);
+       }
+       pull_from_peer.erase(i++);
+     } else {
+       ++i;
+     }
+   }
+ }
+ bool ReplicatedBackend::handle_message(
+   OpRequestRef op
+   )
+ {
+   dout(10) << __func__ << ": " << op << dendl;
 -    MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
++  switch (op->get_req()->get_type()) {
+   case MSG_OSD_PG_PUSH:
+     // TODOXXX: needs to be active possibly
+     do_push(op);
+     return true;
+   case MSG_OSD_PG_PULL:
+     do_pull(op);
+     return true;
+   case MSG_OSD_PG_PUSH_REPLY:
+     do_push_reply(op);
+     return true;
+   case MSG_OSD_SUBOP: {
 -    MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
++    MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
+     if (m->ops.size() >= 1) {
+       OSDOp *first = &m->ops[0];
+       switch (first->op.op) {
+       case CEPH_OSD_OP_PULL:
+       sub_op_pull(op);
+       return true;
+       case CEPH_OSD_OP_PUSH:
+         // TODOXXX: needs to be active possibly
+       sub_op_push(op);
+       return true;
+       default:
+       break;
+       }
+     }
+     break;
+   }
+   case MSG_OSD_SUBOPREPLY: {
++    MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
+     if (r->ops.size() >= 1) {
+       OSDOp &first = r->ops[0];
+       switch (first.op.op) {
+       case CEPH_OSD_OP_PUSH:
+       // continue peer recovery
+       sub_op_push_reply(op);
+       return true;
+       }
+     }
+     break;
+   }
+   default:
+     break;
+   }
+   return false;
+ }
+ void ReplicatedBackend::clear_state()
+ {
+   // clear pushing/pulling maps
+   pushing.clear();
+   pulling.clear();
+   pull_from_peer.clear();
+ }
+ void ReplicatedBackend::on_change(ObjectStore::Transaction *t)
+ {
+   dout(10) << __func__ << dendl;
+   // clear temp
+   for (set<hobject_t>::iterator i = temp_contents.begin();
+        i != temp_contents.end();
+        ++i) {
+     dout(10) << __func__ << ": Removing oid "
+            << *i << " from the temp collection" << dendl;
+     t->remove(get_temp_coll(t), *i);
+   }
+   temp_contents.clear();
+   clear_state();
+ }
+ coll_t ReplicatedBackend::get_temp_coll(ObjectStore::Transaction *t)
+ {
+   if (temp_created)
+     return temp_coll;
+   if (!osd->store->collection_exists(temp_coll))
+       t->create_collection(temp_coll);
+   temp_created = true;
+   return temp_coll;
+ }
+ void ReplicatedBackend::on_flushed()
+ {
+   if (have_temp_coll() &&
+       !osd->store->collection_empty(get_temp_coll())) {
+     vector<hobject_t> objects;
+     osd->store->collection_list(get_temp_coll(), objects);
+     derr << __func__ << ": found objects in the temp collection: "
+        << objects << ", crashing now"
+        << dendl;
+     assert(0 == "found garbage in the temp collection");
+   }
+ }
index 401ad9014ff133a0ab47d1d424eea7bdae0ad717,fb5e45a1a712ceb2c74dd4ab1c109b3be830f2b2..1e2a863e3893bd225457bb929bfc6b2cd902f26e
@@@ -79,6 -80,159 +80,159 @@@ PGLSFilter::~PGLSFilter(
  {
  }
  
 -  latency -= op->request->get_recv_stamp();
+ static void log_subop_stats(
+   OSDService *osd,
+   OpRequestRef op, int tag_inb, int tag_lat)
+ {
+   utime_t now = ceph_clock_now(g_ceph_context);
+   utime_t latency = now;
 -  uint64_t inb = op->request->get_data().length();
++  latency -= op->get_req()->get_recv_stamp();
++  uint64_t inb = op->get_req()->get_data().length();
+   osd->logger->inc(l_osd_sop);
+   osd->logger->inc(l_osd_sop_inb, inb);
+   osd->logger->tinc(l_osd_sop_lat, latency);
+   if (tag_inb)
+     osd->logger->inc(tag_inb, inb);
+   osd->logger->tinc(tag_lat, latency);
+ }
+ // ======================
+ // PGBackend::Listener
+ void ReplicatedPG::on_local_recover_start(
+   const hobject_t &oid,
+   ObjectStore::Transaction *t)
+ {
+   pg_log.revise_have(oid, eversion_t());
+   remove_snap_mapped_object(*t, oid);
+   t->remove(coll, oid);
+ }
+ void ReplicatedPG::on_local_recover(
+   const hobject_t &hoid,
+   const object_stat_sum_t &stat_diff,
+   const ObjectRecoveryInfo &_recovery_info,
+   ObjectContextRef obc,
+   ObjectStore::Transaction *t
+   )
+ {
+   ObjectRecoveryInfo recovery_info(_recovery_info);
+   if (recovery_info.soid.snap < CEPH_NOSNAP) {
+     assert(recovery_info.oi.snaps.size());
+     OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+     set<snapid_t> snaps(
+       recovery_info.oi.snaps.begin(),
+       recovery_info.oi.snaps.end());
+     snap_mapper.add_oid(
+       recovery_info.soid,
+       snaps,
+       &_t);
+   }
+   if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+       pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
+     assert(is_primary());
+     const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
+     if (latest->op == pg_log_entry_t::LOST_REVERT &&
+       latest->reverting_to == recovery_info.version) {
+       dout(10) << " got old revert version " << recovery_info.version
+              << " for " << *latest << dendl;
+       recovery_info.version = latest->version;
+       // update the attr to the revert event version
+       recovery_info.oi.prior_version = recovery_info.oi.version;
+       recovery_info.oi.version = latest->version;
+       bufferlist bl;
+       ::encode(recovery_info.oi, bl);
+       t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
+     }
+   }
+   // keep track of active pushes for scrub
+   ++active_pushes;
+   recover_got(recovery_info.soid, recovery_info.version);
+   if (is_primary()) {
+     info.stats.stats.sum.add(stat_diff);
+     assert(obc);
+     obc->obs.exists = true;
+     obc->ondisk_write_lock();
+     obc->obs.oi = recovery_info.oi;  // may have been updated above
+     t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
+     t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+     publish_stats_to_osd();
+     if (waiting_for_missing_object.count(hoid)) {
+       dout(20) << " kicking waiters on " << hoid << dendl;
+       requeue_ops(waiting_for_missing_object[hoid]);
+       waiting_for_missing_object.erase(hoid);
+       if (pg_log.get_missing().missing.size() == 0) {
+       requeue_ops(waiting_for_all_missing);
+       waiting_for_all_missing.clear();
+       }
+     }
+   } else {
+     t->register_on_applied(
+       new C_OSD_AppliedRecoveredObjectReplica(this));
+   }
+   t->register_on_commit(
+     new C_OSD_CommittedPushedObject(
+       this,
+       get_osdmap()->get_epoch(),
+       info.last_complete));
+   // update pg
+   dirty_info = true;
+   write_if_dirty(*t);
+ }
+ void ReplicatedPG::on_global_recover(
+   const hobject_t &soid)
+ {
+   publish_stats_to_osd();
+   dout(10) << "pushed " << soid << " to all replicas" << dendl;
+   assert(recovering.count(soid));
+   recovering.erase(soid);
+   finish_recovery_op(soid);
+   if (waiting_for_degraded_object.count(soid)) {
+     requeue_ops(waiting_for_degraded_object[soid]);
+     waiting_for_degraded_object.erase(soid);
+   }
+   finish_degraded_object(soid);
+ }
+ void ReplicatedPG::on_peer_recover(
+   int peer,
+   const hobject_t &soid,
+   const ObjectRecoveryInfo &recovery_info,
+   const object_stat_sum_t &stat)
+ {
+   info.stats.stats.sum.add(stat);
+   publish_stats_to_osd();
+   // done!
+   peer_missing[peer].got(soid, recovery_info.version);
+   if (peer == backfill_target && backfills_in_flight.count(soid))
+     backfills_in_flight.erase(soid);
+ }
+ void ReplicatedPG::begin_peer_recover(
+   int peer,
+   const hobject_t soid)
+ {
+   peer_missing[peer].revise_have(soid, eversion_t());
+ }
  // =======================
  // pg changes
  
@@@ -644,6 -797,62 +797,62 @@@ void ReplicatedPG::get_src_oloc(const o
      src_oloc.key = oid.name;
  }
  
 -  switch (op->request->get_type()) {
+ void ReplicatedPG::do_request(
+   OpRequestRef op,
+   ThreadPool::TPHandle &handle)
+ {
+   // do any pending flush
+   do_pending_flush();
+   if (!op_has_sufficient_caps(op)) {
+     osd->reply_op_error(op, -EPERM);
+     return;
+   }
+   assert(!op_must_wait_for_map(get_osdmap(), op));
+   if (can_discard_request(op)) {
+     return;
+   }
+   if (!flushed) {
+     dout(20) << " !flushed, waiting for active on " << op << dendl;
+     waiting_for_active.push_back(op);
+     return;
+   }
+   if (pgbackend->handle_message(op))
+     return;
++  switch (op->get_req()->get_type()) {
+   case CEPH_MSG_OSD_OP:
+     if (is_replay() || !is_active()) {
+       dout(20) << " replay, waiting for active on " << op << dendl;
+       waiting_for_active.push_back(op);
+       return;
+     }
+     do_op(op); // do it now
+     break;
+   case MSG_OSD_SUBOP:
+     do_sub_op(op);
+     break;
+   case MSG_OSD_SUBOPREPLY:
+     do_sub_op_reply(op);
+     break;
+   case MSG_OSD_PG_SCAN:
+     do_scan(op, handle);
+     break;
+   case MSG_OSD_PG_BACKFILL:
+     do_backfill(op);
+     break;
+   default:
+     assert(0 == "bad message type in do_request");
+   }
+ }
  /** do_op - do an op
   * pg lock will be held (if multithreaded)
   * osd_lock NOT held.
@@@ -1229,32 -1451,12 +1451,12 @@@ void ReplicatedPG::log_op_stats(OpConte
           << " lat " << latency << dendl;
  }
  
- void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat)
- {
-   utime_t now = ceph_clock_now(cct);
-   utime_t latency = now;
-   latency -= op->get_req()->get_recv_stamp();
-   uint64_t inb = op->get_req()->get_data().length();
-   osd->logger->inc(l_osd_sop);
-   osd->logger->inc(l_osd_sop_inb, inb);
-   osd->logger->tinc(l_osd_sop_lat, latency);
-   if (tag_inb)
-     osd->logger->inc(tag_inb, inb);
-   osd->logger->tinc(tag_lat, latency);
- }
  void ReplicatedPG::do_sub_op(OpRequestRef op)
  {
 -  MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
 +  MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
    assert(have_same_or_newer_map(m->map_epoch));
    assert(m->get_header().type == MSG_OSD_SUBOP);
 -  dout(15) << "do_sub_op " << *op->request << dendl;
 +  dout(15) << "do_sub_op " << *op->get_req() << dendl;
  
    OSDOp *first = NULL;
    if (m->ops.size() >= 1) {
@@@ -1395,9 -1584,9 +1584,9 @@@ void ReplicatedPG::do_scan
    }
  }
  
- void ReplicatedPG::_do_push(OpRequestRef op)
+ void ReplicatedBackend::_do_push(OpRequestRef op)
  {
 -  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
 +  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
    assert(m->get_header().type == MSG_OSD_PG_PUSH);
    int from = m->get_source().num();
  
    reply->replies.swap(replies);
    reply->compute_cost(cct);
  
-   t->register_on_complete(new C_OSD_SendMessageOnConn(
-                           osd, reply, m->get_connection()));
+   t->register_on_complete(
+     new C_OSD_SendMessageOnConn(
+       osd, reply, m->get_connection()));
  
-   osd->store->queue_transaction(osr.get(), t);
+   get_parent()->queue_transaction(t);
  }
  
- void ReplicatedPG::_do_pull_response(OpRequestRef op)
+ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
+   ReplicatedBackend *bc;
+   list<ObjectContextRef> to_continue;
+   int priority;
+   C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
+     : bc(bc), priority(priority) {}
+   void finish(ThreadPool::TPHandle &handle) {
+     ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
+     for (list<ObjectContextRef>::iterator i =
+          to_continue.begin();
+        i != to_continue.end();
+        ++i) {
+       if (!bc->start_pushes((*i)->obs.oi.soid, *i, h)) {
+       bc->get_parent()->on_global_recover(
+         (*i)->obs.oi.soid);
+       }
+       handle.reset_tp_timeout();
+     }
+     bc->run_recovery_op(h, priority);
+   }
+ };
+ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
  {
 -  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
 +  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
    assert(m->get_header().type == MSG_OSD_PG_PUSH);
    int from = m->get_source().num();
  
      reply->pulls.swap(replies);
      reply->compute_cost(cct);
  
-     t->register_on_complete(new C_OSD_SendMessageOnConn(
-                             osd, reply, m->get_connection()));
+     t->register_on_complete(
+       new C_OSD_SendMessageOnConn(
+       osd, reply, m->get_connection()));
    }
  
-   osd->store->queue_transaction(osr.get(), t);
+   get_parent()->queue_transaction(t);
  }
  
- void ReplicatedPG::do_pull(OpRequestRef op)
+ void ReplicatedBackend::do_pull(OpRequestRef op)
  {
 -  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request);
 +  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
    assert(m->get_header().type == MSG_OSD_PG_PULL);
    int from = m->get_source().num();
  
    send_pushes(m->get_priority(), replies);
  }
  
- void ReplicatedPG::do_push_reply(OpRequestRef op)
+ void ReplicatedBackend::do_push_reply(OpRequestRef op)
  {
 -  MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->request);
 +  MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
    assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY);
    int from = m->get_source().num();
  
@@@ -6484,9 -6590,9 +6590,9 @@@ void ReplicatedBackend::prep_push_op_bl
    op->soid = soid;
  }
  
- void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
+ void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
  {
 -  MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request);
 +  MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
    const hobject_t& soid = reply->get_poid();
    assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
    dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
    PushOp pop;
    bool more = handle_push_reply(peer, rop, &pop);
    if (more)
-     send_push_op_legacy(pushing[soid][peer].priority, peer, pop);
 -    send_push_op_legacy(op->request->get_priority(), peer, pop);
++    send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
  }
  
- bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
+ bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
  {
    const hobject_t &soid = op.soid;
    if (pushing.count(soid) == 0) {
@@@ -6585,9 -6684,9 +6684,9 @@@ void ReplicatedPG::finish_degraded_obje
   * process request to pull an entire object.
   * NOTE: called from opqueue.
   */
- void ReplicatedPG::sub_op_pull(OpRequestRef op)
+ void ReplicatedBackend::sub_op_pull(OpRequestRef op)
  {
 -  MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
 +  MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
    assert(m->get_header().type == MSG_OSD_SUBOP);
  
    op->mark_started();
@@@ -6776,10 -6876,10 +6876,10 @@@ void ReplicatedBackend::trim_pushed_dat
  /** op_push
   * NOTE: called from opqueue.
   */
- void ReplicatedPG::sub_op_push(OpRequestRef op)
+ void ReplicatedBackend::sub_op_push(OpRequestRef op)
  {
    op->mark_started();
 -  MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
 +  MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
  
    PushOp pop;
    pop.soid = m->recovery_info.soid;
        m->get_source().num(),
        resp.recovery_info,
        resp.recovery_progress);
-     }
+     } else {
+       C_ReplicatedBackend_OnPullComplete *c =
+       new C_ReplicatedBackend_OnPullComplete(
+         this,
 -        op->request->get_priority());
++        op->get_req()->get_priority());
+       c->to_continue.swap(to_continue);
+       t->register_on_complete(
+       new C_QueueInWQ(
+         &osd->push_wq,
+         get_parent()->bless_gencontext(c)));
+     }
 -    run_recovery_op(h, op->request->get_priority());
++    run_recovery_op(h, op->get_req()->get_priority());
    } else {
      PushReplyOp resp;
      MOSDSubOpReply *reply = new MOSDSubOpReply(
Simple merge
Simple merge
Simple merge