switch (op->get_req()->get_type()) {
case MSG_OSD_PG_PULL:
return true;
- case MSG_OSD_SUBOP: {
- const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
- if (m->ops.size() >= 1) {
- const OSDOp *first = &m->ops[0];
- switch (first->op.op) {
- case CEPH_OSD_OP_PULL:
- return true;
- default:
- return false;
- }
- } else {
- return false;
- }
- }
default:
return false;
}
case MSG_OSD_SUBOP: {
const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
- if (m->ops.size() >= 1) {
- const OSDOp *first = &m->ops[0];
- switch (first->op.op) {
- case CEPH_OSD_OP_PULL:
- sub_op_pull(op);
- return true;
- case CEPH_OSD_OP_PUSH:
- sub_op_push(op);
- return true;
- default:
- break;
- }
- } else {
+ if (m->ops.size() == 0) {
sub_op_modify(op);
return true;
}
return true;
}
- case MSG_OSD_SUBOPREPLY: {
- const MOSDSubOpReply *r = static_cast<const MOSDSubOpReply*>(op->get_req());
- if (r->ops.size() >= 1) {
- const OSDOp &first = r->ops[0];
- switch (first.op.op) {
- case CEPH_OSD_OP_PUSH:
- // continue peer recovery
- sub_op_push_reply(op);
- return true;
- }
- }
- break;
- }
-
case MSG_OSD_REPOPREPLY: {
sub_op_modify_reply(op);
return true;
pi.recovery_progress = new_progress;
}
-int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer,
- const ObjectRecoveryInfo &recovery_info,
- ObjectRecoveryProgress progress)
-{
- // send op
- ceph_tid_t tid = get_parent()->get_tid();
- osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
-
- dout(10) << "send_pull_op " << recovery_info.soid << " "
- << recovery_info.version
- << " first=" << progress.first
- << " data " << recovery_info.copy_subset
- << " from osd." << peer
- << " tid " << tid << dendl;
-
- MOSDSubOp *subop = new MOSDSubOp(
- rid, parent->whoami_shard(),
- get_info().pgid, recovery_info.soid,
- CEPH_OSD_FLAG_ACK,
- get_osdmap()->get_epoch(), tid,
- recovery_info.version);
- subop->set_priority(prio);
- subop->ops = vector<OSDOp>(1);
- subop->ops[0].op.op = CEPH_OSD_OP_PULL;
- subop->ops[0].op.extent.length = cct->_conf->osd_recovery_max_chunk;
- subop->recovery_info = recovery_info;
- subop->recovery_progress = progress;
-
- get_parent()->send_message_osd_cluster(
- peer.osd, subop, get_osdmap()->get_epoch());
-
- get_parent()->get_logger()->inc(l_osd_pull);
- return 0;
-}
-
void ReplicatedBackend::submit_push_data(
const ObjectRecoveryInfo &recovery_info,
bool first,
return 0;
}
-int ReplicatedBackend::send_push_op_legacy(int prio, pg_shard_t peer, PushOp &pop)
-{
- ceph_tid_t tid = get_parent()->get_tid();
- osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
- MOSDSubOp *subop = new MOSDSubOp(
- rid, parent->whoami_shard(),
- spg_t(get_info().pgid.pgid, peer.shard), pop.soid,
- 0, get_osdmap()->get_epoch(),
- tid, pop.recovery_info.version);
- subop->ops = vector<OSDOp>(1);
- subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
-
- subop->set_priority(prio);
- subop->version = pop.version;
- subop->ops[0].indata.claim(pop.data);
- subop->data_included.swap(pop.data_included);
- subop->omap_header.claim(pop.omap_header);
- subop->omap_entries.swap(pop.omap_entries);
- subop->attrset.swap(pop.attrset);
- subop->recovery_info = pop.recovery_info;
- subop->current_progress = pop.before_progress;
- subop->recovery_progress = pop.after_progress;
-
- get_parent()->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
- return 0;
-}
-
void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
{
op->recovery_info.version = eversion_t();
op->soid = soid;
}
-void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
-{
- const MOSDSubOpReply *reply = static_cast<const MOSDSubOpReply*>(op->get_req());
- const hobject_t& soid = reply->get_poid();
- assert(reply->get_type() == MSG_OSD_SUBOPREPLY);
- dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
- pg_shard_t peer = reply->from;
-
- op->mark_started();
-
- PushReplyOp rop;
- rop.soid = soid;
- PushOp pop;
- bool more = handle_push_reply(peer, rop, &pop);
- if (more)
- send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
-}
-
bool ReplicatedBackend::handle_push_reply(
pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
{
}
}
-/** op_pull
- * process request to pull an entire object.
- * NOTE: called from opqueue.
- */
-void ReplicatedBackend::sub_op_pull(OpRequestRef op)
-{
- const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
- assert(m->get_type() == MSG_OSD_SUBOP);
-
- op->mark_started();
-
- const hobject_t soid = m->poid;
-
- dout(7) << "pull" << soid << " v " << m->version
- << " from " << m->get_source()
- << dendl;
-
- assert(!is_primary()); // we should be a replica or stray.
-
- PullOp pop;
- pop.soid = soid;
- pop.recovery_info = m->recovery_info;
- pop.recovery_progress = m->recovery_progress;
-
- PushOp reply;
- handle_pull(m->from, pop, &reply);
- send_push_op_legacy(
- m->get_priority(),
- m->from,
- reply);
-
- log_subop_stats(get_parent()->get_logger(), op, l_osd_sop_pull);
-}
-
void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
{
const hobject_t &soid = op.soid;
}
}
-/** op_push
- * NOTE: called from opqueue.
- */
-void ReplicatedBackend::sub_op_push(OpRequestRef op)
-{
- op->mark_started();
- // don't bother with const-ness here; we're about to kill MOSDSubOp
- // anyway.
- MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_nonconst_req());
-
- PushOp pop;
- pop.soid = m->poid;
- pop.version = m->version;
- m->claim_data(pop.data);
- pop.data_included.swap(m->data_included);
- pop.omap_header.swap(m->omap_header);
- pop.omap_entries.swap(m->omap_entries);
- pop.attrset.swap(m->attrset);
- pop.recovery_info = m->recovery_info;
- pop.before_progress = m->current_progress;
- pop.after_progress = m->recovery_progress;
- ObjectStore::Transaction t;
-
- if (is_primary()) {
- PullOp resp;
- RPGHandle *h = _open_recovery_op();
- list<pull_complete_info> to_continue;
- bool more = handle_pull_response(
- m->from, pop, &resp,
- &to_continue, &t);
- if (more) {
- send_pull_legacy(
- m->get_priority(),
- m->from,
- resp.recovery_info,
- resp.recovery_progress);
- } else {
- C_ReplicatedBackend_OnPullComplete *c =
- new C_ReplicatedBackend_OnPullComplete(
- this,
- op->get_req()->get_priority());
- c->to_continue.swap(to_continue);
- t.register_on_complete(
- new PG_RecoveryQueueAsync(
- get_parent(),
- get_parent()->bless_gencontext(c)));
- }
- run_recovery_op(h, op->get_req()->get_priority());
- } else {
- PushReplyOp resp;
- MOSDSubOpReply *reply = new MOSDSubOpReply(
- m, parent->whoami_shard(), 0,
- get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- reply->set_priority(m->get_priority());
- assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
- handle_push(m->from, pop, &resp, &t);
- t.register_on_complete(new PG_SendMessageOnConn(
- get_parent(), reply, m->get_connection()));
- }
- get_parent()->queue_transaction(std::move(t));
- return;
-}
-
void ReplicatedBackend::_failed_push(pg_shard_t from, const hobject_t &soid)
{
list<pg_shard_t> fl = { from };