#include "msg/Message.h"
#include <tr1/memory>
+class TrackedOp;
+typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;
+
+class OpTracker;
+class OpHistory {
+ set<pair<utime_t, TrackedOpRef> > arrived;
+ set<pair<double, TrackedOpRef> > duration;
+ void cleanup(utime_t now);
+ bool shutdown;
+ OpTracker *tracker;
+ uint32_t history_size;
+ uint32_t history_duration;
+
+public:
+ OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_),
+ history_size(0), history_duration(0) {}
+ ~OpHistory() {
+ assert(arrived.empty());
+ assert(duration.empty());
+ }
+ void insert(utime_t now, TrackedOpRef op);
+ void dump_ops(utime_t now, Formatter *f);
+ void on_shutdown();
+ void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
+ history_size = new_size;
+ history_duration = new_duration;
+ }
+};
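+
+// Behavioural sketch of insert()/cleanup() (illustrative only; the real
+// definitions live in the .cc): each op is indexed both by arrival time
+// and by duration, and cleanup() trims the oldest entries until the
+// history fits within history_size and history_duration:
+//
+//   void OpHistory::insert(utime_t now, TrackedOpRef op) {
+//     if (shutdown)
+//       return;
+//     arrived.insert(make_pair(op->get_arrived(), op));
+//     duration.insert(make_pair(op->get_duration(), op));
+//     cleanup(now);
+//   }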
+
+class OpTracker {
+ class RemoveOnDelete {
+ OpTracker *tracker;
+ public:
+ RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
+ void operator()(TrackedOp *op);
+ };
+ friend class RemoveOnDelete;
+ friend class OpHistory;
+ uint64_t seq;
+ Mutex ops_in_flight_lock;
+ xlist<TrackedOp *> ops_in_flight;
+ OpHistory history;
+ float complaint_time;
+ int log_threshold;
+
+public:
+ CephContext *cct;
+ OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"),
+ history(this), complaint_time(0), log_threshold(0), cct(cct_) {}
+ void set_complaint_and_threshold(float time, int threshold) {
+ complaint_time = time;
+ log_threshold = threshold;
+ }
+ void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
+ history.set_size_and_duration(new_size, new_duration);
+ }
+ void dump_ops_in_flight(Formatter *f);
+ void dump_historic_ops(Formatter *f);
+ void register_inflight_op(xlist<TrackedOp*>::item *i);
+ void unregister_inflight_op(TrackedOp *i);
+
+ void get_age_ms_histogram(pow2_hist_t *h);
+
+ /**
+ * Look for Ops which are too old, and insert a warning
+ * string for each one that is found.
+ *
+ * @param warning_strings A vector<string> reference which is filled
+ * with a warning string for each old Op.
+ * @return True if there are any Ops to warn on, false otherwise.
+ */
+ bool check_ops_in_flight(std::vector<string> &warning_strings);
+ void mark_event(TrackedOp *op, const string &evt);
+ void _mark_event(TrackedOp *op, const string &evt, utime_t now);
+
+ void on_shutdown() {
+ Mutex::Locker l(ops_in_flight_lock);
+ history.on_shutdown();
+ }
+ ~OpTracker() {
+ assert(ops_in_flight.empty());
+ }
+
+ template <typename T>
+ typename T::Ref create_request(Message *ref)
+ {
+ typename T::Ref retval(new T(ref, this),
+ RemoveOnDelete(this));
+
+ _mark_event(retval.get(), "header_read", ref->get_recv_stamp());
+ _mark_event(retval.get(), "throttled", ref->get_throttle_stamp());
+ _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp());
+ _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp());
+
+ retval->init_from_message();
+
+ return retval;
+ }
+};
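+
+// Usage sketch (illustrative; assumes a TrackedOp subclass such as an
+// OpRequest type that typedefs its shared_ptr as Ref): RemoveOnDelete is
+// installed as the shared_ptr deleter, so the op is handed back to the
+// tracker (and from there to the OpHistory) when the last Ref is dropped.
+//
+//   OpRequest::Ref op = op_tracker.create_request<OpRequest>(m);
+//   op->mark_event("queued_for_pg");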
+
class TrackedOp {
- uint8_t warn_interval_multiplier; // limits output of a given op warning
+private:
+ friend class OpHistory;
+ friend class OpTracker;
+ xlist<TrackedOp*>::item xitem;
+protected:
+ Message *request; /// the logical request we are tracking
+ OpTracker *tracker; /// the tracker we are associated with
+
+ list<pair<utime_t, string> > events; /// list of events and their times
+ Mutex lock; /// to protect the events list
+ string current; /// the current state the op is in
+ uint64_t seq; /// a unique value set by the OpTracker
+
++ uint32_t warn_interval_multiplier; // limits output of a given op warning
+
+ TrackedOp(Message *req, OpTracker *_tracker) :
+ xitem(this),
+ request(req),
+ tracker(_tracker),
+ lock("TrackedOp::lock"),
+ seq(0),
+ warn_interval_multiplier(1)
+ {
+ tracker->register_inflight_op(&xitem);
+ }
+
+ virtual void init_from_message() {}
+ /// output any type-specific data you want to get when dump() is called
+ virtual void _dump(utime_t now, Formatter *f) const {}
+ /// if you want something else to happen when events are marked, implement
+ virtual void _event_marked() {}
+
public:
- virtual void mark_event(const string &event) = 0;
- virtual ~TrackedOp() {}
+ virtual ~TrackedOp() { assert(request); request->put(); }
+
+ utime_t get_arrived() const {
+ return request->get_recv_stamp();
+ }
+ // This function may need some work; it assumes the last event is the completion time
+ double get_duration() const {
+ return events.size() ?
+ (events.rbegin()->first - get_arrived()) :
+ 0.0;
+ }
+ Message *get_req() const { return request; }
+
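+ /// record an event on this op (the out-of-line definition is expected
+ /// to timestamp the event and forward to tracker->mark_event(this, event))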
+ void mark_event(const string &event);
+ virtual const char *state_string() const {
+ return events.rbegin()->second.c_str();
+ }
+ void dump(utime_t now, Formatter *f) const;
};
-typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;
#endif
--- /dev/null
+ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+ // vim: ts=8 sw=2 smarttab
+ /*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+ #include "ReplicatedBackend.h"
+ #include "messages/MOSDSubOp.h"
+ #include "messages/MOSDSubOpReply.h"
+ #include "messages/MOSDPGPush.h"
+ #include "messages/MOSDPGPull.h"
+ #include "messages/MOSDPGPushReply.h"
+
+ #define dout_subsys ceph_subsys_osd
+ #define DOUT_PREFIX_ARGS this
+ #undef dout_prefix
+ #define dout_prefix _prefix(_dout, this)
+ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
+ return *_dout << pgb->get_parent()->gen_dbg_prefix();
+ }
+
+ ReplicatedBackend::ReplicatedBackend(
+ PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
+ PGBackend(pg), temp_created(false),
+ temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)),
+ coll(coll), osd(osd), cct(osd->cct) {}
+
+ void ReplicatedBackend::run_recovery_op(
+ PGBackend::RecoveryHandle *_h,
+ int priority)
+ {
+ RPGHandle *h = static_cast<RPGHandle *>(_h);
+ send_pushes(priority, h->pushes);
+ send_pulls(priority, h->pulls);
+ delete h;
+ }
+
+ void ReplicatedBackend::recover_object(
+ const hobject_t &hoid,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *_h
+ )
+ {
+ dout(10) << __func__ << ": " << hoid << dendl;
+ RPGHandle *h = static_cast<RPGHandle *>(_h);
+ if (get_parent()->get_local_missing().is_missing(hoid)) {
+ assert(!obc);
+ // pull
+ prepare_pull(
+ hoid,
+ head,
+ h);
+ return;
+ } else {
+ assert(obc);
+ int started = start_pushes(
+ hoid,
+ obc,
+ h);
+ assert(started > 0);
+ }
+ }
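+
+// Typical driver sequence (illustrative; the actual callers live in the
+// PG recovery code):
+//
+//   PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
+//   pgbackend->recover_object(hoid, head, obc, h); // queues pushes/pulls
+//   pgbackend->run_recovery_op(h, priority);       // sends them, frees h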
+
+ void ReplicatedBackend::check_recovery_sources(const OSDMapRef osdmap)
+ {
+ for (map<int, set<hobject_t> >::iterator i = pull_from_peer.begin();
+ i != pull_from_peer.end();
+ ) {
+ if (osdmap->is_down(i->first)) {
+ dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
+ << ", osdmap has it marked down" << dendl;
+ for (set<hobject_t>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ assert(pulling.count(*j) == 1);
+ get_parent()->cancel_pull(*j);
+ pulling.erase(*j);
+ }
+ pull_from_peer.erase(i++);
+ } else {
+ ++i;
+ }
+ }
+ }
+
+ bool ReplicatedBackend::handle_message(
+ OpRequestRef op
+ )
+ {
+ dout(10) << __func__ << ": " << op << dendl;
- switch (op->request->get_type()) {
++ switch (op->get_req()->get_type()) {
+ case MSG_OSD_PG_PUSH:
+ // TODOXXX: possibly needs the PG to be active
+ do_push(op);
+ return true;
+
+ case MSG_OSD_PG_PULL:
+ do_pull(op);
+ return true;
+
+ case MSG_OSD_PG_PUSH_REPLY:
+ do_push_reply(op);
+ return true;
+
+ case MSG_OSD_SUBOP: {
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
++ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
+ if (m->ops.size() >= 1) {
+ OSDOp *first = &m->ops[0];
+ switch (first->op.op) {
+ case CEPH_OSD_OP_PULL:
+ sub_op_pull(op);
+ return true;
+ case CEPH_OSD_OP_PUSH:
+ // TODOXXX: possibly needs the PG to be active
+ sub_op_push(op);
+ return true;
+ default:
+ break;
+ }
+ }
+ break;
+ }
+
+ case MSG_OSD_SUBOPREPLY: {
- MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
++ MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
+ if (r->ops.size() >= 1) {
+ OSDOp &first = r->ops[0];
+ switch (first.op.op) {
+ case CEPH_OSD_OP_PUSH:
+ // continue peer recovery
+ sub_op_push_reply(op);
+ return true;
+ }
+ }
+ break;
+ }
+
+ default:
+ break;
+ }
+ return false;
+ }
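+
+// Note on the dispatch above: push/pull traffic arrives either as the
+// dedicated MOSDPGPush/MOSDPGPull/MOSDPGPushReply messages, or from older
+// peers as MOSDSubOp/MOSDSubOpReply whose first op is CEPH_OSD_OP_PUSH or
+// CEPH_OSD_OP_PULL (the legacy path served by sub_op_push/sub_op_pull and
+// send_push_op_legacy below).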
+
+ void ReplicatedBackend::clear_state()
+ {
+ // clear pushing/pulling maps
+ pushing.clear();
+ pulling.clear();
+ pull_from_peer.clear();
+ }
+
+ void ReplicatedBackend::on_change(ObjectStore::Transaction *t)
+ {
+ dout(10) << __func__ << dendl;
+ // clear temp
+ for (set<hobject_t>::iterator i = temp_contents.begin();
+ i != temp_contents.end();
+ ++i) {
+ dout(10) << __func__ << ": Removing oid "
+ << *i << " from the temp collection" << dendl;
+ t->remove(get_temp_coll(t), *i);
+ }
+ temp_contents.clear();
+ clear_state();
+ }
+
+ coll_t ReplicatedBackend::get_temp_coll(ObjectStore::Transaction *t)
+ {
+ if (temp_created)
+ return temp_coll;
+ if (!osd->store->collection_exists(temp_coll))
+ t->create_collection(temp_coll);
+ temp_created = true;
+ return temp_coll;
+ }
+
+ void ReplicatedBackend::on_flushed()
+ {
+ if (have_temp_coll() &&
+ !osd->store->collection_empty(get_temp_coll())) {
+ vector<hobject_t> objects;
+ osd->store->collection_list(get_temp_coll(), objects);
+ derr << __func__ << ": found objects in the temp collection: "
+ << objects << ", crashing now"
+ << dendl;
+ assert(0 == "found garbage in the temp collection");
+ }
+ }
{
}
+ static void log_subop_stats(
+ OSDService *osd,
+ OpRequestRef op, int tag_inb, int tag_lat)
+ {
+ utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t latency = now;
- latency -= op->request->get_recv_stamp();
++ latency -= op->get_req()->get_recv_stamp();
+
- uint64_t inb = op->request->get_data().length();
++ uint64_t inb = op->get_req()->get_data().length();
+
+ osd->logger->inc(l_osd_sop);
+
+ osd->logger->inc(l_osd_sop_inb, inb);
+ osd->logger->tinc(l_osd_sop_lat, latency);
+
+ if (tag_inb)
+ osd->logger->inc(tag_inb, inb);
+ osd->logger->tinc(tag_lat, latency);
+ }
+
+ // ======================
+ // PGBackend::Listener
+
+
+ void ReplicatedPG::on_local_recover_start(
+ const hobject_t &oid,
+ ObjectStore::Transaction *t)
+ {
+ pg_log.revise_have(oid, eversion_t());
+ remove_snap_mapped_object(*t, oid);
+ t->remove(coll, oid);
+ }
+
+ void ReplicatedPG::on_local_recover(
+ const hobject_t &hoid,
+ const object_stat_sum_t &stat_diff,
+ const ObjectRecoveryInfo &_recovery_info,
+ ObjectContextRef obc,
+ ObjectStore::Transaction *t
+ )
+ {
+ ObjectRecoveryInfo recovery_info(_recovery_info);
+ if (recovery_info.soid.snap < CEPH_NOSNAP) {
+ assert(recovery_info.oi.snaps.size());
+ OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+ set<snapid_t> snaps(
+ recovery_info.oi.snaps.begin(),
+ recovery_info.oi.snaps.end());
+ snap_mapper.add_oid(
+ recovery_info.soid,
+ snaps,
+ &_t);
+ }
+
+ if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+ pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
+ assert(is_primary());
+ const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
+ if (latest->op == pg_log_entry_t::LOST_REVERT &&
+ latest->reverting_to == recovery_info.version) {
+ dout(10) << " got old revert version " << recovery_info.version
+ << " for " << *latest << dendl;
+ recovery_info.version = latest->version;
+ // update the attr to the revert event version
+ recovery_info.oi.prior_version = recovery_info.oi.version;
+ recovery_info.oi.version = latest->version;
+ bufferlist bl;
+ ::encode(recovery_info.oi, bl);
+ t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
+ }
+ }
+
+ // keep track of active pushes for scrub
+ ++active_pushes;
+
+ recover_got(recovery_info.soid, recovery_info.version);
+
+ if (is_primary()) {
+ info.stats.stats.sum.add(stat_diff);
+
+ assert(obc);
+ obc->obs.exists = true;
+ obc->ondisk_write_lock();
+ obc->obs.oi = recovery_info.oi; // may have been updated above
+
+ t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
+ t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+
+ publish_stats_to_osd();
+ if (waiting_for_missing_object.count(hoid)) {
+ dout(20) << " kicking waiters on " << hoid << dendl;
+ requeue_ops(waiting_for_missing_object[hoid]);
+ waiting_for_missing_object.erase(hoid);
+ if (pg_log.get_missing().missing.size() == 0) {
+ requeue_ops(waiting_for_all_missing);
+ waiting_for_all_missing.clear();
+ }
+ }
+ } else {
+ t->register_on_applied(
+ new C_OSD_AppliedRecoveredObjectReplica(this));
+ }
+
+ t->register_on_commit(
+ new C_OSD_CommittedPushedObject(
+ this,
+ get_osdmap()->get_epoch(),
+ info.last_complete));
+
+ // update pg
+ dirty_info = true;
+ write_if_dirty(*t);
+ }
+
+ void ReplicatedPG::on_global_recover(
+ const hobject_t &soid)
+ {
+ publish_stats_to_osd();
+ dout(10) << "pushed " << soid << " to all replicas" << dendl;
+ assert(recovering.count(soid));
+ recovering.erase(soid);
+ finish_recovery_op(soid);
+ if (waiting_for_degraded_object.count(soid)) {
+ requeue_ops(waiting_for_degraded_object[soid]);
+ waiting_for_degraded_object.erase(soid);
+ }
+ finish_degraded_object(soid);
+ }
+
+ void ReplicatedPG::on_peer_recover(
+ int peer,
+ const hobject_t &soid,
+ const ObjectRecoveryInfo &recovery_info,
+ const object_stat_sum_t &stat)
+ {
+ info.stats.stats.sum.add(stat);
+ publish_stats_to_osd();
+ // done!
+ peer_missing[peer].got(soid, recovery_info.version);
+ if (peer == backfill_target && backfills_in_flight.count(soid))
+ backfills_in_flight.erase(soid);
+ }
+
+ void ReplicatedPG::begin_peer_recover(
+ int peer,
+ const hobject_t soid)
+ {
+ peer_missing[peer].revise_have(soid, eversion_t());
+ }
+
// =======================
// pg changes
src_oloc.key = oid.name;
}
+ void ReplicatedPG::do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle)
+ {
+ // do any pending flush
+ do_pending_flush();
+
+ if (!op_has_sufficient_caps(op)) {
+ osd->reply_op_error(op, -EPERM);
+ return;
+ }
+ assert(!op_must_wait_for_map(get_osdmap(), op));
+ if (can_discard_request(op)) {
+ return;
+ }
+ if (!flushed) {
+ dout(20) << " !flushed, waiting for active on " << op << dendl;
+ waiting_for_active.push_back(op);
+ return;
+ }
+
+ if (pgbackend->handle_message(op))
+ return;
+
- switch (op->request->get_type()) {
++ switch (op->get_req()->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ if (is_replay() || !is_active()) {
+ dout(20) << " replay, waiting for active on " << op << dendl;
+ waiting_for_active.push_back(op);
+ return;
+ }
+ do_op(op); // do it now
+ break;
+
+ case MSG_OSD_SUBOP:
+ do_sub_op(op);
+ break;
+
+ case MSG_OSD_SUBOPREPLY:
+ do_sub_op_reply(op);
+ break;
+
+ case MSG_OSD_PG_SCAN:
+ do_scan(op, handle);
+ break;
+
+ case MSG_OSD_PG_BACKFILL:
+ do_backfill(op);
+ break;
+
+ default:
+ assert(0 == "bad message type in do_request");
+ }
+ }
+
+
/** do_op - do an op
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
<< " lat " << latency << dendl;
}
- void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat)
- {
- utime_t now = ceph_clock_now(cct);
- utime_t latency = now;
- latency -= op->get_req()->get_recv_stamp();
-
- uint64_t inb = op->get_req()->get_data().length();
-
- osd->logger->inc(l_osd_sop);
-
- osd->logger->inc(l_osd_sop_inb, inb);
- osd->logger->tinc(l_osd_sop_lat, latency);
-
- if (tag_inb)
- osd->logger->inc(tag_inb, inb);
- osd->logger->tinc(tag_lat, latency);
- }
-
-
-
void ReplicatedPG::do_sub_op(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(have_same_or_newer_map(m->map_epoch));
assert(m->get_header().type == MSG_OSD_SUBOP);
- dout(15) << "do_sub_op " << *op->request << dendl;
+ dout(15) << "do_sub_op " << *op->get_req() << dendl;
OSDOp *first = NULL;
if (m->ops.size() >= 1) {
}
}
- void ReplicatedPG::_do_push(OpRequestRef op)
+ void ReplicatedBackend::_do_push(OpRequestRef op)
{
- MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
+ MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_PUSH);
int from = m->get_source().num();
reply->replies.swap(replies);
reply->compute_cost(cct);
- t->register_on_complete(new C_OSD_SendMessageOnConn(
- osd, reply, m->get_connection()));
+ t->register_on_complete(
+ new C_OSD_SendMessageOnConn(
+ osd, reply, m->get_connection()));
- osd->store->queue_transaction(osr.get(), t);
+ get_parent()->queue_transaction(t);
}
- void ReplicatedPG::_do_pull_response(OpRequestRef op)
+ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
+ ReplicatedBackend *bc;
+ list<ObjectContextRef> to_continue;
+ int priority;
+ C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
+ : bc(bc), priority(priority) {}
+
+ void finish(ThreadPool::TPHandle &handle) {
+ ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
+ for (list<ObjectContextRef>::iterator i =
+ to_continue.begin();
+ i != to_continue.end();
+ ++i) {
+ if (!bc->start_pushes((*i)->obs.oi.soid, *i, h)) {
+ bc->get_parent()->on_global_recover(
+ (*i)->obs.oi.soid);
+ }
+ handle.reset_tp_timeout();
+ }
+ bc->run_recovery_op(h, priority);
+ }
+ };
+
+ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
- MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
+ MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_PUSH);
int from = m->get_source().num();
reply->pulls.swap(replies);
reply->compute_cost(cct);
- t->register_on_complete(new C_OSD_SendMessageOnConn(
- osd, reply, m->get_connection()));
+ t->register_on_complete(
+ new C_OSD_SendMessageOnConn(
+ osd, reply, m->get_connection()));
}
- osd->store->queue_transaction(osr.get(), t);
+ get_parent()->queue_transaction(t);
}
- void ReplicatedPG::do_pull(OpRequestRef op)
+ void ReplicatedBackend::do_pull(OpRequestRef op)
{
- MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request);
+ MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_PULL);
int from = m->get_source().num();
send_pushes(m->get_priority(), replies);
}
- void ReplicatedPG::do_push_reply(OpRequestRef op)
+ void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
- MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->request);
+ MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY);
int from = m->get_source().num();
op->soid = soid;
}
- void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
+ void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
{
- MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request);
+ MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
const hobject_t& soid = reply->get_poid();
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
PushOp pop;
bool more = handle_push_reply(peer, rop, &pop);
if (more)
- send_push_op_legacy(pushing[soid][peer].priority, peer, pop);
- send_push_op_legacy(op->request->get_priority(), peer, pop);
++ send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
}
- bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
+ bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
{
const hobject_t &soid = op.soid;
if (pushing.count(soid) == 0) {
* process request to pull an entire object.
* NOTE: called from opqueue.
*/
- void ReplicatedPG::sub_op_pull(OpRequestRef op)
+ void ReplicatedBackend::sub_op_pull(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
op->mark_started();
/** op_push
* NOTE: called from opqueue.
*/
- void ReplicatedPG::sub_op_push(OpRequestRef op)
+ void ReplicatedBackend::sub_op_push(OpRequestRef op)
{
op->mark_started();
- MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
+ MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
PushOp pop;
pop.soid = m->recovery_info.soid;
m->get_source().num(),
resp.recovery_info,
resp.recovery_progress);
- }
+ } else {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
- op->request->get_priority());
++ op->get_req()->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ new C_QueueInWQ(
+ &osd->push_wq,
+ get_parent()->bless_gencontext(c)));
+ }
- run_recovery_op(h, op->request->get_priority());
++ run_recovery_op(h, op->get_req()->get_priority());
} else {
PushReplyOp resp;
MOSDSubOpReply *reply = new MOSDSubOpReply(