return 0;
}
-void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer)
+void ReplicatedPG::prep_push_op_blank(const hobject_t& soid, PushOp *op)
{
- // send a blank push back to the primary
- tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
- get_osdmap()->get_epoch(), tid, eversion_t());
- subop->ops = vector<OSDOp>(1);
- subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
- subop->first = false;
- subop->complete = false;
- osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
+ op->recovery_info.version = eversion_t();
+ op->version = eversion_t();
+ op->soid = soid;
}
void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
const hobject_t soid = m->poid;
- dout(7) << "op_pull " << soid << " v " << m->version
+ dout(7) << "pull" << soid << " v " << m->version
<< " from " << m->get_source()
<< dendl;
assert(!is_primary()); // we should be a replica or stray.
+ PullOp pop;
+ pop.soid = soid;
+ pop.recovery_info = m->recovery_info;
+ pop.recovery_progress = m->recovery_progress;
+
+ PushOp reply;
+ handle_pull(m->get_source().num(), pop, &reply);
+ send_push_op(
+ m->get_priority(),
+ m->get_source().num(),
+ reply);
+
+ log_subop_stats(op, 0, l_osd_sop_pull_lat);
+}
+
+void ReplicatedPG::handle_pull(int peer, PullOp &op, PushOp *reply)
+{
+ const hobject_t &soid = op.soid;
struct stat st;
int r = osd->store->stat(coll, soid, &st);
if (r != 0) {
- osd->clog.error() << info.pgid << " " << m->get_source() << " tried to pull " << soid
+ osd->clog.error() << info.pgid << " " << peer << " tried to pull " << soid
<< " but got " << cpp_strerror(-r) << "\n";
- send_push_op_blank(soid, m->get_source().num());
+ prep_push_op_blank(soid, reply);
} else {
- ObjectRecoveryInfo recovery_info = m->recovery_info;
- ObjectRecoveryProgress progress = m->recovery_progress;
+ ObjectRecoveryInfo &recovery_info = op.recovery_info;
+ ObjectRecoveryProgress &progress = op.recovery_progress;
if (progress.first && recovery_info.size == ((uint64_t)-1)) {
// Adjust size and copy_subset
recovery_info.size = st.st_size;
assert(recovery_info.clone_subset.empty());
}
- r = send_push(m->get_priority(),
- m->get_source().num(),
- recovery_info, progress);
+ r = build_push_op(recovery_info, progress, 0, reply);
if (r < 0)
- send_push_op_blank(soid, m->get_source().num());
+ prep_push_op_blank(soid, reply);
}
-
- log_subop_stats(op, 0, l_osd_sop_pull_lat);
}
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets,
PushOp *op);
- void send_push_op_blank(const hobject_t& soid, int peer);
+ void prep_push_op_blank(const hobject_t& soid, PushOp *op);
void finish_degraded_object(const hobject_t& oid);
void sub_op_push_reply(OpRequestRef op);
bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply);
void sub_op_pull(OpRequestRef op);
+ void handle_pull(int peer, PullOp &op, PushOp *reply);
void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);