int ReplicatedPG::send_push(int prio, int peer,
const ObjectRecoveryInfo &recovery_info,
- ObjectRecoveryProgress progress,
+ const ObjectRecoveryProgress &progress,
ObjectRecoveryProgress *out_progress)
{
- ObjectRecoveryProgress new_progress = progress;
+ PushOp op;
+ int r = build_push_op(recovery_info, progress, out_progress, &op);
+ if (r < 0)
+ return r;
+ return send_push_op(prio, peer, op);
+}
- tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
- false, 0, get_osdmap()->get_epoch(),
- tid, recovery_info.version);
- subop->set_priority(prio);
+int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
+ const ObjectRecoveryProgress &progress,
+ ObjectRecoveryProgress *out_progress,
+ PushOp *out_op)
+{
+ ObjectRecoveryProgress _new_progress;
+ if (!out_progress)
+ out_progress = &_new_progress;
+ ObjectRecoveryProgress &new_progress = *out_progress;
+ new_progress = progress;
dout(7) << "send_push_op " << recovery_info.soid
<< " v " << recovery_info.version
<< " size " << recovery_info.size
- << " to osd." << peer
<< " recovery_info: " << recovery_info
<< dendl;
- subop->ops = vector<OSDOp>(1);
- subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
-
if (progress.first) {
- osd->store->omap_get_header(coll, recovery_info.soid, &subop->omap_header);
- osd->store->getattrs(coll, recovery_info.soid, subop->attrset);
+ osd->store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
+ osd->store->getattrs(coll, recovery_info.soid, out_op->attrset);
// Debug
bufferlist bv;
- bv.push_back(subop->attrset[OI_ATTR]);
+ bv.push_back(out_op->attrset[OI_ATTR]);
object_info_t oi(bv);
if (oi.version != recovery_info.version) {
osd->clog.error() << info.pgid << " push "
<< recovery_info.soid << " v "
- << recovery_info.version << " to osd." << peer
<< " failed because local copy is "
<< oi.version << "\n";
- subop->put();
- return -1;
+ return -EINVAL;
}
new_progress.first = false;
for (iter->lower_bound(progress.omap_recovered_to);
iter->valid();
iter->next()) {
- if (!subop->omap_entries.empty() &&
+ if (!out_op->omap_entries.empty() &&
available <= (iter->key().size() + iter->value().length()))
break;
- subop->omap_entries.insert(make_pair(iter->key(), iter->value()));
+ out_op->omap_entries.insert(make_pair(iter->key(), iter->value()));
if ((iter->key().size() + iter->value().length()) <= available)
available -= (iter->key().size() + iter->value().length());
}
if (available > 0) {
- subop->data_included.span_of(recovery_info.copy_subset,
+ out_op->data_included.span_of(recovery_info.copy_subset,
progress.data_recovered_to,
available);
} else {
- subop->data_included.clear();
+ out_op->data_included.clear();
}
- for (interval_set<uint64_t>::iterator p = subop->data_included.begin();
- p != subop->data_included.end();
+ for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
+ p != out_op->data_included.end();
++p) {
bufferlist bit;
osd->store->read(coll, recovery_info.soid,
p.set_len(bit.length());
new_progress.data_complete = true;
}
- subop->ops[0].indata.claim_append(bit);
+ out_op->data.claim_append(bit);
}
- if (!subop->data_included.empty())
- new_progress.data_recovered_to = subop->data_included.range_end();
+ if (!out_op->data_included.empty())
+ new_progress.data_recovered_to = out_op->data_included.range_end();
if (new_progress.is_complete(recovery_info)) {
new_progress.data_complete = true;
info.stats.stats.sum.num_objects_recovered++;
}
- info.stats.stats.sum.num_keys_recovered += subop->omap_entries.size();
- info.stats.stats.sum.num_bytes_recovered += subop->ops[0].indata.length();
+ info.stats.stats.sum.num_keys_recovered += out_op->omap_entries.size();
+ info.stats.stats.sum.num_bytes_recovered += out_op->data.length();
osd->logger->inc(l_osd_push);
- osd->logger->inc(l_osd_push_outb, subop->ops[0].indata.length());
+ osd->logger->inc(l_osd_push_outb, out_op->data.length());
// send
- subop->recovery_info = recovery_info;
- subop->recovery_progress = new_progress;
- subop->current_progress = progress;
+ out_op->version = recovery_info.version;
+ out_op->soid = recovery_info.soid;
+ out_op->recovery_info = recovery_info;
+ out_op->after_progress = new_progress;
+ out_op->before_progress = progress;
+ return 0;
+}
+
+int ReplicatedPG::send_push_op(int prio, int peer, PushOp &pop)
+{
+ tid_t tid = osd->get_tid();
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
+ MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, pop.soid,
+ false, 0, get_osdmap()->get_epoch(),
+ tid, pop.recovery_info.version);
+ subop->ops = vector<OSDOp>(1);
+ subop->ops[0].op.op = CEPH_OSD_OP_PUSH;
+
+ subop->set_priority(prio);
+ subop->version = pop.version;
+ subop->ops[0].indata.claim(pop.data);
+ subop->data_included.swap(pop.data_included);
+ subop->omap_header.claim(pop.omap_header);
+ subop->omap_entries.swap(pop.omap_entries);
+ subop->attrset.swap(pop.attrset);
+ subop->recovery_info = pop.recovery_info;
+ subop->current_progress = pop.before_progress;
+ subop->recovery_progress = pop.after_progress;
+
osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
- if (out_progress)
- *out_progress = new_progress;
return 0;
}