From 462476404d1728122c4e44c1ed7a5ffa267b50b2 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 20 Jul 2010 12:30:53 -0700 Subject: [PATCH] osd: push in chunks Signed-off-by: Sage Weil --- src/messages/MOSDSubOp.h | 20 +++++++++- src/osd/ReplicatedPG.cc | 83 +++++++++++++++++++++++++--------------- src/osd/ReplicatedPG.h | 5 ++- 3 files changed, 74 insertions(+), 34 deletions(-) diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index e4aecf8b114ca..aacd023857bd0 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -64,7 +64,9 @@ public: interval_set data_subset; map > clone_subsets; - virtual void decode_payload() { + bool first, complete; + + virtual void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(map_epoch, p); ::decode(reqid, p); @@ -96,9 +98,16 @@ public: ::decode(attrset, p); ::decode(data_subset, p); ::decode(clone_subsets, p); + + if (header.version >= 2) { + ::decode(first, p); + ::decode(complete, p); + } } virtual void encode_payload() { + header.version = 2; + ::encode(map_epoch, payload); ::encode(reqid, payload); ::encode(pgid, payload); @@ -131,6 +140,8 @@ public: header.data_off = ops[0].op.extent.offset; else header.data_off = 0; + ::encode(first, payload); + ::encode(complete, payload); } @@ -144,7 +155,8 @@ public: acks_wanted(aw), noop(noop_), old_exists(false), old_size(0), - version(v) + version(v), + first(false), complete(false) { memset(&peer_stat, 0, sizeof(peer_stat)); set_tid(rtid); @@ -162,6 +174,10 @@ public: << " " << ops; if (noop) out << " (NOOP)"; + if (first) + out << " first"; + if (complete) + out << " complete"; out << " v " << version << " snapset=" << snapset << " snapc=" << snapc; if (!data_subset.empty()) out << " subset " << data_subset; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index e450c696a63ac..00e77651ff68e 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3209,14 +3209,17 @@ void ReplicatedPG::push_start(const sobject_t& soid, int peer, { // take note. push_info_t *pi = &pushing[soid][peer]; + pi->size = size; pi->version = version; pi->data_subset = data_subset; pi->clone_subsets = clone_subsets; - //pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf.osd_recovery_max_chunk); + + pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf.osd_recovery_max_chunk); + bool complete = pi->data_subset_pushing == pi->data_subset; dout(10) << "push_start " << soid << " size " << size << " data " << data_subset << " cloning " << clone_subsets << dendl; - send_push_op(soid, peer, size, pi->data_subset, pi->clone_subsets); + send_push_op(soid, peer, size, true, complete, pi->data_subset_pushing, pi->clone_subsets); } @@ -3225,7 +3228,7 @@ void ReplicatedPG::push_start(const sobject_t& soid, int peer, */ void ReplicatedPG::send_push_op(const sobject_t& soid, int peer, - uint64_t size, + uint64_t size, bool first, bool complete, interval_set &data_subset, map >& clone_subsets) { @@ -3272,10 +3275,12 @@ void ReplicatedPG::send_push_op(const sobject_t& soid, int peer, //subop->ops[0].op.extent.offset = 0; //subop->ops[0].op.extent.length = size; subop->ops[0].data = bl; - subop->data_subset.swap(data_subset); - subop->clone_subsets.swap(clone_subsets); + subop->data_subset = data_subset; + subop->clone_subsets = clone_subsets; subop->attrset.swap(attrset); subop->old_size = size; + subop->first = first; + subop->complete = complete; osd->messenger->send_message(subop, osd->osdmap->get_inst(peer)); } @@ -3296,26 +3301,41 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply) } else { push_info_t *pi = &pushing[soid][peer]; - peer_missing[peer].got(soid, pi->version); - if (peer_missing[peer].num_missing() == 0) - uptodate_set.insert(peer); - - pushing[soid].erase(peer); - pi = NULL; - - update_stats(); - - if (pushing[soid].empty()) { - pushing.erase(soid); - dout(10) << "pushed " << soid << " to all replicas" << dendl; - finish_recovery_op(soid); - if (waiting_for_degraded_object.count(soid)) { - osd->take_waiters(waiting_for_degraded_object[soid]); - waiting_for_degraded_object.erase(soid); - } + bool complete = false; + if (pi->data_subset.empty() || + pi->data_subset.end() == pi->data_subset_pushing.end()) + complete = true; + + if (!complete) { + // push more + uint64_t from = pi->data_subset_pushing.end(); + dout(10) << " pushing more, " << pi->data_subset << " from " << from << dendl; + pi->data_subset_pushing.span_of(pi->data_subset, from, g_conf.osd_recovery_max_chunk); + complete = pi->data_subset.end() == pi->data_subset_pushing.end(); + send_push_op(soid, peer, pi->size, false, complete, pi->data_subset_pushing, pi->clone_subsets); } else { - dout(10) << "pushed " << soid << ", still waiting for push ack from " - << pushing[soid].size() << " others" << dendl; + // done! + peer_missing[peer].got(soid, pi->version); + if (peer_missing[peer].num_missing() == 0) + uptodate_set.insert(peer); + + pushing[soid].erase(peer); + pi = NULL; + + update_stats(); + + if (pushing[soid].empty()) { + pushing.erase(soid); + dout(10) << "pushed " << soid << " to all replicas" << dendl; + finish_recovery_op(soid); + if (waiting_for_degraded_object.count(soid)) { + osd->take_waiters(waiting_for_degraded_object[soid]); + waiting_for_degraded_object.erase(soid); + } + } else { + dout(10) << "pushed " << soid << ", still waiting for push ack from " + << pushing[soid].size() << " others" << dendl; + } } } reply->put(); @@ -3342,7 +3362,7 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op) assert(r == 0); uint64_t size = st.st_size; - send_push_op(soid, op->get_source().num(), size, op->data_subset, op->clone_subsets); + send_push_op(soid, op->get_source().num(), size, op->first, op->complete, op->data_subset, op->clone_subsets); op->put(); } @@ -3431,8 +3451,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) clone_subsets = op->clone_subsets; pull_info_t *pi = 0; - bool first = true; - bool complete = true; + bool first = op->first; + bool complete = op->complete; if (is_primary()) { if (pulling.count(soid) == 0) { dout(10) << " not pulling, ignoring" << dendl; @@ -3500,9 +3520,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) } if (pi->data_subset.empty()) { - first = complete = true; + complete = true; } else { - first = pi->data_subset.start() == data_subset.start(); complete = pi->data_subset.end() == data_subset.end(); } } @@ -3591,6 +3610,10 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) dout(10) << " log.complete_to = " << log.complete_to->version << dendl; } + // update pg + write_info(*t); + + // track ObjectContext if (is_primary()) { dout(10) << " setting up obc for " << soid << dendl; @@ -3606,12 +3629,12 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) } else { onreadable = new ObjectStore::C_DeleteTransaction(t); } + } else { onreadable = new ObjectStore::C_DeleteTransaction(t); } // apply to disk! - write_info(*t); int r = osd->store->queue_transaction(&osr, t, onreadable, new C_OSD_Commit(this, info.history.same_acting_since, diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 9a74a868b5df7..65dbcab0b4c33 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -457,8 +457,9 @@ protected: // push struct push_info_t { + uint64_t size; eversion_t version; - interval_set data_subset; + interval_set data_subset, data_subset_pushing; map > clone_subsets; }; map > pushing; @@ -478,7 +479,7 @@ protected: interval_set &data_subset, map >& clone_subsets); void send_push_op(const sobject_t& oid, int dest, - uint64_t size, + uint64_t size, bool first, bool complete, interval_set& data_subset, map >& clone_subsets); -- 2.39.5