From: Samuel Just Date: Wed, 12 Jun 2013 22:50:55 +0000 (-0700) Subject: ReplicatedPG: split send_push into build_push_op and send_push_op X-Git-Tag: v0.67-rc1~138^2~1^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=82cb922e89645e342ca6b2362af02c6004ca9f29;p=ceph.git ReplicatedPG: split send_push into build_push_op and send_push_op Signed-off-by: Samuel Just --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index f01425b4414c..f0f959fa16be 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5723,45 +5723,48 @@ void ReplicatedPG::handle_push(OpRequestRef op) int ReplicatedPG::send_push(int prio, int peer, const ObjectRecoveryInfo &recovery_info, - ObjectRecoveryProgress progress, + const ObjectRecoveryProgress &progress, ObjectRecoveryProgress *out_progress) { - ObjectRecoveryProgress new_progress = progress; + PushOp op; + int r = build_push_op(recovery_info, progress, out_progress, &op); + if (r < 0) + return r; + return send_push_op(prio, peer, op); +} - tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); - MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid, - false, 0, get_osdmap()->get_epoch(), - tid, recovery_info.version); - subop->set_priority(prio); +int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info, + const ObjectRecoveryProgress &progress, + ObjectRecoveryProgress *out_progress, + PushOp *out_op) +{ + ObjectRecoveryProgress _new_progress; + if (!out_progress) + out_progress = &_new_progress; + ObjectRecoveryProgress &new_progress = *out_progress; + new_progress = progress; dout(7) << "send_push_op " << recovery_info.soid << " v " << recovery_info.version << " size " << recovery_info.size - << " to osd." << peer << " recovery_info: " << recovery_info << dendl; - subop->ops = vector(1); - subop->ops[0].op.op = CEPH_OSD_OP_PUSH; - if (progress.first) { - osd->store->omap_get_header(coll, recovery_info.soid, &subop->omap_header); - osd->store->getattrs(coll, recovery_info.soid, subop->attrset); + osd->store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header); + osd->store->getattrs(coll, recovery_info.soid, out_op->attrset); // Debug bufferlist bv; - bv.push_back(subop->attrset[OI_ATTR]); + bv.push_back(out_op->attrset[OI_ATTR]); object_info_t oi(bv); if (oi.version != recovery_info.version) { osd->clog.error() << info.pgid << " push " << recovery_info.soid << " v " - << recovery_info.version << " to osd." << peer << " failed because local copy is " << oi.version << "\n"; - subop->put(); - return -1; + return -EINVAL; } new_progress.first = false; @@ -5775,10 +5778,10 @@ int ReplicatedPG::send_push(int prio, int peer, for (iter->lower_bound(progress.omap_recovered_to); iter->valid(); iter->next()) { - if (!subop->omap_entries.empty() && + if (!out_op->omap_entries.empty() && available <= (iter->key().size() + iter->value().length())) break; - subop->omap_entries.insert(make_pair(iter->key(), iter->value())); + out_op->omap_entries.insert(make_pair(iter->key(), iter->value())); if ((iter->key().size() + iter->value().length()) <= available) available -= (iter->key().size() + iter->value().length()); @@ -5792,15 +5795,15 @@ int ReplicatedPG::send_push(int prio, int peer, } if (available > 0) { - subop->data_included.span_of(recovery_info.copy_subset, + out_op->data_included.span_of(recovery_info.copy_subset, progress.data_recovered_to, available); } else { - subop->data_included.clear(); + out_op->data_included.clear(); } - for (interval_set::iterator p = subop->data_included.begin(); - p != subop->data_included.end(); + for (interval_set::iterator p = out_op->data_included.begin(); + p != out_op->data_included.end(); ++p) { bufferlist bit; osd->store->read(coll, recovery_info.soid, @@ -5812,30 +5815,54 @@ int ReplicatedPG::send_push(int prio, int peer, p.set_len(bit.length()); new_progress.data_complete = true; } - subop->ops[0].indata.claim_append(bit); + out_op->data.claim_append(bit); } - if (!subop->data_included.empty()) - new_progress.data_recovered_to = subop->data_included.range_end(); + if (!out_op->data_included.empty()) + new_progress.data_recovered_to = out_op->data_included.range_end(); if (new_progress.is_complete(recovery_info)) { new_progress.data_complete = true; info.stats.stats.sum.num_objects_recovered++; } - info.stats.stats.sum.num_keys_recovered += subop->omap_entries.size(); - info.stats.stats.sum.num_bytes_recovered += subop->ops[0].indata.length(); + info.stats.stats.sum.num_keys_recovered += out_op->omap_entries.size(); + info.stats.stats.sum.num_bytes_recovered += out_op->data.length(); osd->logger->inc(l_osd_push); - osd->logger->inc(l_osd_push_outb, subop->ops[0].indata.length()); + osd->logger->inc(l_osd_push_outb, out_op->data.length()); // send - subop->recovery_info = recovery_info; - subop->recovery_progress = new_progress; - subop->current_progress = progress; + out_op->version = recovery_info.version; + out_op->soid = recovery_info.soid; + out_op->recovery_info = recovery_info; + out_op->after_progress = new_progress; + out_op->before_progress = progress; + return 0; +} + +int ReplicatedPG::send_push_op(int prio, int peer, PushOp &pop) +{ + tid_t tid = osd->get_tid(); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, pop.soid, + false, 0, get_osdmap()->get_epoch(), + tid, pop.recovery_info.version); + subop->ops = vector(1); + subop->ops[0].op.op = CEPH_OSD_OP_PUSH; + + subop->set_priority(prio); + subop->version = pop.version; + subop->ops[0].indata.claim(pop.data); + subop->data_included.swap(pop.data_included); + subop->omap_header.claim(pop.omap_header); + subop->omap_entries.swap(pop.omap_entries); + subop->attrset.swap(pop.attrset); + subop->recovery_info = pop.recovery_info; + subop->current_progress = pop.before_progress; + subop->recovery_progress = pop.after_progress; + osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch()); - if (out_progress) - *out_progress = new_progress; return 0; } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index c8b6e15bb843..bc44dee0cff3 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -560,8 +560,15 @@ protected: void handle_push(OpRequestRef op); int send_push(int priority, int peer, const ObjectRecoveryInfo& recovery_info, - ObjectRecoveryProgress progress, + const ObjectRecoveryProgress &progress, ObjectRecoveryProgress *out_progress = 0); + int build_push_op(const ObjectRecoveryInfo &recovery_info, + const ObjectRecoveryProgress &progress, + ObjectRecoveryProgress *out_progress, + PushOp *out_op); + int send_push_op(int priority, int peer, + PushOp &pop); + int send_pull(int priority, int peer, const ObjectRecoveryInfo& recovery_info, ObjectRecoveryProgress progress);