void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
{
MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request);
+ const hobject_t& soid = reply->get_poid();
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
+ int peer = reply->get_source().num();
op->mark_started();
- int peer = reply->get_source().num();
- const hobject_t& soid = reply->get_poid();
-
+ PushReplyOp rop;
+ rop.soid = soid;
+ PushOp pop;
+ bool more = handle_push_reply(peer, rop, &pop);
+ if (more)
+ send_push_op(pushing[soid][peer].priority, peer, pop);
+}
+
+bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
+{
+ const hobject_t &soid = op.soid;
if (pushing.count(soid) == 0) {
dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
<< ", or anybody else"
<< dendl;
+ return false;
} else if (pushing[soid].count(peer) == 0) {
dout(10) << "huh, i wasn't pushing " << soid << " to osd." << peer
<< dendl;
+ return false;
} else {
PushInfo *pi = &pushing[soid][peer];
<< pi->recovery_progress.data_recovered_to
<< " of " << pi->recovery_info.copy_subset << dendl;
ObjectRecoveryProgress new_progress;
- send_push(
- pi->priority,
- peer, pi->recovery_info,
- pi->recovery_progress, &new_progress);
+ build_push_op(
+ pi->recovery_info,
+ pi->recovery_progress, &new_progress, reply);
pi->recovery_progress = new_progress;
+ return true;
} else {
// done!
if (peer == backfill_target && backfills_in_flight.count(soid))
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
}
+ return false;
}
}
}
void sub_op_push(OpRequestRef op);
void _failed_push(int from, const hobject_t &soid);
void sub_op_push_reply(OpRequestRef op);
+ bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply);
void sub_op_pull(OpRequestRef op);
void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);