From: Greg Farnum Date: Thu, 3 Oct 2013 22:50:40 +0000 (-0700) Subject: Merge branch 'master' into wip-optracker X-Git-Tag: v0.72-rc1~93^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b9d4e97378aa90687e23de73493ec92b280cae6a;p=ceph.git Merge branch 'master' into wip-optracker Conflicts: src/osd/OpRequest.h src/osd/PG.cc src/osd/ReplicatedPG.cc Signed-off-by: Greg Farnum --- b9d4e97378aa90687e23de73493ec92b280cae6a diff --cc src/common/TrackedOp.h index 9e00c14b1784,753331df7f38..44e03905759a --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@@ -22,158 -21,11 +22,158 @@@ #include "msg/Message.h" #include +class TrackedOp; +typedef std::tr1::shared_ptr TrackedOpRef; + +class OpTracker; +class OpHistory { + set > arrived; + set > duration; + void cleanup(utime_t now); + bool shutdown; + OpTracker *tracker; + uint32_t history_size; + uint32_t history_duration; + +public: + OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_), + history_size(0), history_duration(0) {} + ~OpHistory() { + assert(arrived.empty()); + assert(duration.empty()); + } + void insert(utime_t now, TrackedOpRef op); + void dump_ops(utime_t now, Formatter *f); + void on_shutdown(); + void set_size_and_duration(uint32_t new_size, uint32_t new_duration) { + history_size = new_size; + history_duration = new_duration; + } +}; + +class OpTracker { + class RemoveOnDelete { + OpTracker *tracker; + public: + RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {} + void operator()(TrackedOp *op); + }; + friend class RemoveOnDelete; + friend class OpHistory; + uint64_t seq; + Mutex ops_in_flight_lock; + xlist ops_in_flight; + OpHistory history; + float complaint_time; + int log_threshold; + +public: + CephContext *cct; + OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), + history(this), complaint_time(0), log_threshold(0), cct(cct_) {} + void set_complaint_and_threshold(float time, int threshold) { + complaint_time = time; + log_threshold = threshold; + } + void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) { + history.set_size_and_duration(new_size, new_duration); + } + void dump_ops_in_flight(Formatter *f); + void dump_historic_ops(Formatter *f); + void register_inflight_op(xlist::item *i); + void unregister_inflight_op(TrackedOp *i); + + void get_age_ms_histogram(pow2_hist_t *h); + + /** + * Look for Ops which are too old, and insert warning + * strings for each Op that is too old. + * + * @param warning_strings A vector reference which is filled + * with a warning string for each old Op. + * @return True if there are any Ops to warn on, false otherwise. + */ + bool check_ops_in_flight(std::vector &warning_strings); + void mark_event(TrackedOp *op, const string &evt); + void _mark_event(TrackedOp *op, const string &evt, utime_t now); + + void on_shutdown() { + Mutex::Locker l(ops_in_flight_lock); + history.on_shutdown(); + } + ~OpTracker() { + assert(ops_in_flight.empty()); + } + + template + typename T::Ref create_request(Message *ref) + { + typename T::Ref retval(new T(ref, this), + RemoveOnDelete(this)); + + _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); + _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); + _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); + _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); + + retval->init_from_message(); + + return retval; + } +}; + class TrackedOp { +private: + friend class OpHistory; + friend class OpTracker; + xlist::item xitem; +protected: + Message *request; /// the logical request we are tracking + OpTracker *tracker; /// the tracker we are associated with + + list > events; /// list of events and their times + Mutex lock; /// to protect the events list + string current; /// the current state the event is in + uint64_t seq; /// a unique value set by the OpTracker + - uint8_t warn_interval_multiplier; // limits output of a given op warning ++ uint32_t warn_interval_multiplier; // limits output of a given op warning + + TrackedOp(Message *req, OpTracker *_tracker) : + xitem(this), + request(req), + tracker(_tracker), + lock("TrackedOp::lock"), + seq(0), + warn_interval_multiplier(1) + { + tracker->register_inflight_op(&xitem); + } + + virtual void init_from_message() {} + /// output any type-specific data you want to get when dump() is called + virtual void _dump(utime_t now, Formatter *f) const {} + /// if you want something else to happen when events are marked, implement + virtual void _event_marked() {} + public: - virtual void mark_event(const string &event) = 0; - virtual ~TrackedOp() {} + virtual ~TrackedOp() { assert(request); request->put(); } + + utime_t get_arrived() const { + return request->get_recv_stamp(); + } + // This function maybe needs some work; assumes last event is completion time + double get_duration() const { + return events.size() ? + (events.rbegin()->first - get_arrived()) : + 0.0; + } + Message *get_req() const { return request; } + + void mark_event(const string &event); + virtual const char *state_string() const { + return events.rbegin()->second.c_str(); + } + void dump(utime_t now, Formatter *f) const; }; -typedef std::tr1::shared_ptr TrackedOpRef; #endif diff --cc src/osd/ReplicatedBackend.cc index 000000000000,9868e7af2c87..b39207e14f85 mode 000000,100644..100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@@ -1,0 -1,196 +1,196 @@@ + // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + // vim: ts=8 sw=2 smarttab + /* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + #include "ReplicatedBackend.h" + #include "messages/MOSDSubOp.h" + #include "messages/MOSDSubOpReply.h" + #include "messages/MOSDPGPush.h" + #include "messages/MOSDPGPull.h" + #include "messages/MOSDPGPushReply.h" + + #define dout_subsys ceph_subsys_osd + #define DOUT_PREFIX_ARGS this + #undef dout_prefix + #define dout_prefix _prefix(_dout, this) + static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) { + return *_dout << pgb->get_parent()->gen_dbg_prefix(); + } + + ReplicatedBackend::ReplicatedBackend( + PGBackend::Listener *pg, coll_t coll, OSDService *osd) : + PGBackend(pg), temp_created(false), + temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)), + coll(coll), osd(osd), cct(osd->cct) {} + + void ReplicatedBackend::run_recovery_op( + PGBackend::RecoveryHandle *_h, + int priority) + { + RPGHandle *h = static_cast(_h); + send_pushes(priority, h->pushes); + send_pulls(priority, h->pulls); + delete h; + } + + void ReplicatedBackend::recover_object( + const hobject_t &hoid, + ObjectContextRef head, + ObjectContextRef obc, + RecoveryHandle *_h + ) + { + dout(10) << __func__ << ": " << hoid << dendl; + RPGHandle *h = static_cast(_h); + if (get_parent()->get_local_missing().is_missing(hoid)) { + assert(!obc); + // pull + prepare_pull( + hoid, + head, + h); + return; + } else { + assert(obc); + int started = start_pushes( + hoid, + obc, + h); + assert(started > 0); + } + } + + void ReplicatedBackend::check_recovery_sources(const OSDMapRef osdmap) + { + for(map >::iterator i = pull_from_peer.begin(); + i != pull_from_peer.end(); + ) { + if (osdmap->is_down(i->first)) { + dout(10) << "check_recovery_sources resetting pulls from osd." << i->first + << ", osdmap has it marked down" << dendl; + for (set::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + assert(pulling.count(*j) == 1); + get_parent()->cancel_pull(*j); + pulling.erase(*j); + } + pull_from_peer.erase(i++); + } else { + ++i; + } + } + } + + bool ReplicatedBackend::handle_message( + OpRequestRef op + ) + { + dout(10) << __func__ << ": " << op << dendl; - switch (op->request->get_type()) { ++ switch (op->get_req()->get_type()) { + case MSG_OSD_PG_PUSH: + // TODOXXX: needs to be active possibly + do_push(op); + return true; + + case MSG_OSD_PG_PULL: + do_pull(op); + return true; + + case MSG_OSD_PG_PUSH_REPLY: + do_push_reply(op); + return true; + + case MSG_OSD_SUBOP: { - MOSDSubOp *m = static_cast(op->request); ++ MOSDSubOp *m = static_cast(op->get_req()); + if (m->ops.size() >= 1) { + OSDOp *first = &m->ops[0]; + switch (first->op.op) { + case CEPH_OSD_OP_PULL: + sub_op_pull(op); + return true; + case CEPH_OSD_OP_PUSH: + // TODOXXX: needs to be active possibly + sub_op_push(op); + return true; + default: + break; + } + } + break; + } + + case MSG_OSD_SUBOPREPLY: { - MOSDSubOpReply *r = static_cast(op->request); ++ MOSDSubOpReply *r = static_cast(op->get_req()); + if (r->ops.size() >= 1) { + OSDOp &first = r->ops[0]; + switch (first.op.op) { + case CEPH_OSD_OP_PUSH: + // continue peer recovery + sub_op_push_reply(op); + return true; + } + } + break; + } + + default: + break; + } + return false; + } + + void ReplicatedBackend::clear_state() + { + // clear pushing/pulling maps + pushing.clear(); + pulling.clear(); + pull_from_peer.clear(); + } + + void ReplicatedBackend::on_change(ObjectStore::Transaction *t) + { + dout(10) << __func__ << dendl; + // clear temp + for (set::iterator i = temp_contents.begin(); + i != temp_contents.end(); + ++i) { + dout(10) << __func__ << ": Removing oid " + << *i << " from the temp collection" << dendl; + t->remove(get_temp_coll(t), *i); + } + temp_contents.clear(); + clear_state(); + } + + coll_t ReplicatedBackend::get_temp_coll(ObjectStore::Transaction *t) + { + if (temp_created) + return temp_coll; + if (!osd->store->collection_exists(temp_coll)) + t->create_collection(temp_coll); + temp_created = true; + return temp_coll; + } + + void ReplicatedBackend::on_flushed() + { + if (have_temp_coll() && + !osd->store->collection_empty(get_temp_coll())) { + vector objects; + osd->store->collection_list(get_temp_coll(), objects); + derr << __func__ << ": found objects in the temp collection: " + << objects << ", crashing now" + << dendl; + assert(0 == "found garbage in the temp collection"); + } + } diff --cc src/osd/ReplicatedPG.cc index 401ad9014ff1,fb5e45a1a712..1e2a863e3893 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@@ -79,6 -80,159 +80,159 @@@ PGLSFilter::~PGLSFilter( { } + static void log_subop_stats( + OSDService *osd, + OpRequestRef op, int tag_inb, int tag_lat) + { + utime_t now = ceph_clock_now(g_ceph_context); + utime_t latency = now; - latency -= op->request->get_recv_stamp(); ++ latency -= op->get_req()->get_recv_stamp(); + - uint64_t inb = op->request->get_data().length(); ++ uint64_t inb = op->get_req()->get_data().length(); + + osd->logger->inc(l_osd_sop); + + osd->logger->inc(l_osd_sop_inb, inb); + osd->logger->tinc(l_osd_sop_lat, latency); + + if (tag_inb) + osd->logger->inc(tag_inb, inb); + osd->logger->tinc(tag_lat, latency); + } + + // ====================== + // PGBackend::Listener + + + void ReplicatedPG::on_local_recover_start( + const hobject_t &oid, + ObjectStore::Transaction *t) + { + pg_log.revise_have(oid, eversion_t()); + remove_snap_mapped_object(*t, oid); + t->remove(coll, oid); + } + + void ReplicatedPG::on_local_recover( + const hobject_t &hoid, + const object_stat_sum_t &stat_diff, + const ObjectRecoveryInfo &_recovery_info, + ObjectContextRef obc, + ObjectStore::Transaction *t + ) + { + ObjectRecoveryInfo recovery_info(_recovery_info); + if (recovery_info.soid.snap < CEPH_NOSNAP) { + assert(recovery_info.oi.snaps.size()); + OSDriver::OSTransaction _t(osdriver.get_transaction(t)); + set snaps( + recovery_info.oi.snaps.begin(), + recovery_info.oi.snaps.end()); + snap_mapper.add_oid( + recovery_info.soid, + snaps, + &_t); + } + + if (pg_log.get_missing().is_missing(recovery_info.soid) && + pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) { + assert(is_primary()); + const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second; + if (latest->op == pg_log_entry_t::LOST_REVERT && + latest->reverting_to == recovery_info.version) { + dout(10) << " got old revert version " << recovery_info.version + << " for " << *latest << dendl; + recovery_info.version = latest->version; + // update the attr to the revert event version + recovery_info.oi.prior_version = recovery_info.oi.version; + recovery_info.oi.version = latest->version; + bufferlist bl; + ::encode(recovery_info.oi, bl); + t->setattr(coll, recovery_info.soid, OI_ATTR, bl); + } + } + + // keep track of active pushes for scrub + ++active_pushes; + + recover_got(recovery_info.soid, recovery_info.version); + + if (is_primary()) { + info.stats.stats.sum.add(stat_diff); + + assert(obc); + obc->obs.exists = true; + obc->ondisk_write_lock(); + obc->obs.oi = recovery_info.oi; // may have been updated above + + + t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc)); + t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc)); + + publish_stats_to_osd(); + if (waiting_for_missing_object.count(hoid)) { + dout(20) << " kicking waiters on " << hoid << dendl; + requeue_ops(waiting_for_missing_object[hoid]); + waiting_for_missing_object.erase(hoid); + if (pg_log.get_missing().missing.size() == 0) { + requeue_ops(waiting_for_all_missing); + waiting_for_all_missing.clear(); + } + } + } else { + t->register_on_applied( + new C_OSD_AppliedRecoveredObjectReplica(this)); + + } + + t->register_on_commit( + new C_OSD_CommittedPushedObject( + this, + get_osdmap()->get_epoch(), + info.last_complete)); + + // update pg + dirty_info = true; + write_if_dirty(*t); + + } + + void ReplicatedPG::on_global_recover( + const hobject_t &soid) + { + publish_stats_to_osd(); + dout(10) << "pushed " << soid << " to all replicas" << dendl; + assert(recovering.count(soid)); + recovering.erase(soid); + finish_recovery_op(soid); + if (waiting_for_degraded_object.count(soid)) { + requeue_ops(waiting_for_degraded_object[soid]); + waiting_for_degraded_object.erase(soid); + } + finish_degraded_object(soid); + } + + void ReplicatedPG::on_peer_recover( + int peer, + const hobject_t &soid, + const ObjectRecoveryInfo &recovery_info, + const object_stat_sum_t &stat) + { + info.stats.stats.sum.add(stat); + publish_stats_to_osd(); + // done! + peer_missing[peer].got(soid, recovery_info.version); + if (peer == backfill_target && backfills_in_flight.count(soid)) + backfills_in_flight.erase(soid); + } + + void ReplicatedPG::begin_peer_recover( + int peer, + const hobject_t soid) + { + peer_missing[peer].revise_have(soid, eversion_t()); + } + // ======================= // pg changes @@@ -644,6 -797,62 +797,62 @@@ void ReplicatedPG::get_src_oloc(const o src_oloc.key = oid.name; } + void ReplicatedPG::do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle) + { + // do any pending flush + do_pending_flush(); + + if (!op_has_sufficient_caps(op)) { + osd->reply_op_error(op, -EPERM); + return; + } + assert(!op_must_wait_for_map(get_osdmap(), op)); + if (can_discard_request(op)) { + return; + } + if (!flushed) { + dout(20) << " !flushed, waiting for active on " << op << dendl; + waiting_for_active.push_back(op); + return; + } + + if (pgbackend->handle_message(op)) + return; + - switch (op->request->get_type()) { ++ switch (op->get_req()->get_type()) { + case CEPH_MSG_OSD_OP: + if (is_replay() || !is_active()) { + dout(20) << " replay, waiting for active on " << op << dendl; + waiting_for_active.push_back(op); + return; + } + do_op(op); // do it now + break; + + case MSG_OSD_SUBOP: + do_sub_op(op); + break; + + case MSG_OSD_SUBOPREPLY: + do_sub_op_reply(op); + break; + + case MSG_OSD_PG_SCAN: + do_scan(op, handle); + break; + + case MSG_OSD_PG_BACKFILL: + do_backfill(op); + break; + + default: + assert(0 == "bad message type in do_request"); + } + } + + /** do_op - do an op * pg lock will be held (if multithreaded) * osd_lock NOT held. @@@ -1229,32 -1451,12 +1451,12 @@@ void ReplicatedPG::log_op_stats(OpConte << " lat " << latency << dendl; } - void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat) - { - utime_t now = ceph_clock_now(cct); - utime_t latency = now; - latency -= op->get_req()->get_recv_stamp(); - - uint64_t inb = op->get_req()->get_data().length(); - - osd->logger->inc(l_osd_sop); - - osd->logger->inc(l_osd_sop_inb, inb); - osd->logger->tinc(l_osd_sop_lat, latency); - - if (tag_inb) - osd->logger->inc(tag_inb, inb); - osd->logger->tinc(tag_lat, latency); - } - - - void ReplicatedPG::do_sub_op(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(have_same_or_newer_map(m->map_epoch)); assert(m->get_header().type == MSG_OSD_SUBOP); - dout(15) << "do_sub_op " << *op->request << dendl; + dout(15) << "do_sub_op " << *op->get_req() << dendl; OSDOp *first = NULL; if (m->ops.size() >= 1) { @@@ -1395,9 -1584,9 +1584,9 @@@ void ReplicatedPG::do_scan } } - void ReplicatedPG::_do_push(OpRequestRef op) + void ReplicatedBackend::_do_push(OpRequestRef op) { - MOSDPGPush *m = static_cast(op->request); + MOSDPGPush *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH); int from = m->get_source().num(); @@@ -1417,15 -1606,39 +1606,39 @@@ reply->replies.swap(replies); reply->compute_cost(cct); - t->register_on_complete(new C_OSD_SendMessageOnConn( - osd, reply, m->get_connection())); + t->register_on_complete( + new C_OSD_SendMessageOnConn( + osd, reply, m->get_connection())); - osd->store->queue_transaction(osr.get(), t); + get_parent()->queue_transaction(t); } - void ReplicatedPG::_do_pull_response(OpRequestRef op) + struct C_ReplicatedBackend_OnPullComplete : GenContext { + ReplicatedBackend *bc; + list to_continue; + int priority; + C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority) + : bc(bc), priority(priority) {} + + void finish(ThreadPool::TPHandle &handle) { + ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); + for (list::iterator i = + to_continue.begin(); + i != to_continue.end(); + ++i) { + if (!bc->start_pushes((*i)->obs.oi.soid, *i, h)) { + bc->get_parent()->on_global_recover( + (*i)->obs.oi.soid); + } + handle.reset_tp_timeout(); + } + bc->run_recovery_op(h, priority); + } + }; + + void ReplicatedBackend::_do_pull_response(OpRequestRef op) { - MOSDPGPush *m = static_cast(op->request); + MOSDPGPush *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH); int from = m->get_source().num(); @@@ -1448,16 -1673,17 +1673,17 @@@ reply->pulls.swap(replies); reply->compute_cost(cct); - t->register_on_complete(new C_OSD_SendMessageOnConn( - osd, reply, m->get_connection())); + t->register_on_complete( + new C_OSD_SendMessageOnConn( + osd, reply, m->get_connection())); } - osd->store->queue_transaction(osr.get(), t); + get_parent()->queue_transaction(t); } - void ReplicatedPG::do_pull(OpRequestRef op) + void ReplicatedBackend::do_pull(OpRequestRef op) { - MOSDPGPull *m = static_cast(op->request); + MOSDPGPull *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PULL); int from = m->get_source().num(); @@@ -1471,9 -1697,9 +1697,9 @@@ send_pushes(m->get_priority(), replies); } - void ReplicatedPG::do_push_reply(OpRequestRef op) + void ReplicatedBackend::do_push_reply(OpRequestRef op) { - MOSDPGPushReply *m = static_cast(op->request); + MOSDPGPushReply *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY); int from = m->get_source().num(); @@@ -6484,9 -6590,9 +6590,9 @@@ void ReplicatedBackend::prep_push_op_bl op->soid = soid; } - void ReplicatedPG::sub_op_push_reply(OpRequestRef op) + void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) { - MOSDSubOpReply *reply = static_cast(op->request); + MOSDSubOpReply *reply = static_cast(op->get_req()); const hobject_t& soid = reply->get_poid(); assert(reply->get_header().type == MSG_OSD_SUBOPREPLY); dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; @@@ -6499,10 -6605,10 +6605,10 @@@ PushOp pop; bool more = handle_push_reply(peer, rop, &pop); if (more) - send_push_op_legacy(pushing[soid][peer].priority, peer, pop); - send_push_op_legacy(op->request->get_priority(), peer, pop); ++ send_push_op_legacy(op->get_req()->get_priority(), peer, pop); } - bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) + bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) { const hobject_t &soid = op.soid; if (pushing.count(soid) == 0) { @@@ -6585,9 -6684,9 +6684,9 @@@ void ReplicatedPG::finish_degraded_obje * process request to pull an entire object. * NOTE: called from opqueue. */ - void ReplicatedPG::sub_op_pull(OpRequestRef op) + void ReplicatedBackend::sub_op_pull(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); op->mark_started(); @@@ -6776,10 -6876,10 +6876,10 @@@ void ReplicatedBackend::trim_pushed_dat /** op_push * NOTE: called from opqueue. */ - void ReplicatedPG::sub_op_push(OpRequestRef op) + void ReplicatedBackend::sub_op_push(OpRequestRef op) { op->mark_started(); - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); PushOp pop; pop.soid = m->recovery_info.soid; @@@ -6803,7 -6907,18 +6907,18 @@@ m->get_source().num(), resp.recovery_info, resp.recovery_progress); - } + } else { + C_ReplicatedBackend_OnPullComplete *c = + new C_ReplicatedBackend_OnPullComplete( + this, - op->request->get_priority()); ++ op->get_req()->get_priority()); + c->to_continue.swap(to_continue); + t->register_on_complete( + new C_QueueInWQ( + &osd->push_wq, + get_parent()->bless_gencontext(c))); + } - run_recovery_op(h, op->request->get_priority()); ++ run_recovery_op(h, op->get_req()->get_priority()); } else { PushReplyOp resp; MOSDSubOpReply *reply = new MOSDSubOpReply(