From 4c2404e85ac44b5f4b79c0ae284a3c6c829b3e3e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 25 Nov 2008 06:45:48 -0800 Subject: [PATCH] osd: clean up repop code --- src/osd/ReplicatedPG.cc | 324 +++++++++++++++++++--------------------- src/osd/ReplicatedPG.h | 48 +++--- 2 files changed, 181 insertions(+), 191 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 4a2effcb419e4..0d5fa0f6ebf6a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1168,38 +1168,57 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t } + + + + + // ======================================================================== // rep op gather class C_OSD_ModifyCommit : public Context { public: ReplicatedPG *pg; - tid_t rep_tid; - eversion_t pg_last_complete; - C_OSD_ModifyCommit(ReplicatedPG *p, tid_t rt, eversion_t lc) : pg(p), rep_tid(rt), pg_last_complete(lc) { - pg->get(); // we're copying the pointer + ReplicatedPG::RepGather *repop; + + C_OSD_ModifyCommit(ReplicatedPG *p, ReplicatedPG::RepGather *rg) : + pg(p), repop(rg) { + repop->get(); + pg->get(); // we're copying the pointer } void finish(int r) { pg->lock(); if (!pg->is_deleted()) - pg->op_modify_ondisk(rep_tid, pg_last_complete); + pg->op_modify_ondisk(repop); + repop->put(); pg->put_unlock(); } }; - -void ReplicatedPG::get_rep_gather(RepGather *repop) +/** op_modify_commit + * transaction commit on the acker. + */ +void ReplicatedPG::op_modify_ondisk(RepGather *repop) { - //repop->lock.Lock(); - dout(10) << "get_repop " << *repop << dendl; + if (repop->aborted) { + dout(10) << "op_modify_ondisk " << *repop->op << " -- aborted" << dendl; + } else { + dout(10) << "op_modify_ondisk " << *repop->op << dendl; + assert(repop->waitfor_disk.count(osd->get_nodeid())); + repop->waitfor_nvram.erase(osd->get_nodeid()); + repop->waitfor_disk.erase(osd->get_nodeid()); + repop->pg_complete_thru[osd->get_nodeid()] = repop->pg_local_last_complete; + eval_repop(repop); + } } + void ReplicatedPG::apply_repop(RepGather *repop) { dout(10) << "apply_repop applying update on " << *repop << dendl; assert(!repop->applied); - Context *oncommit = new C_OSD_ModifyCommit(this, repop->rep_tid, repop->pg_local_last_complete); + Context *oncommit = new C_OSD_ModifyCommit(this, repop); unsigned r = osd->store->apply_transaction(repop->t, oncommit); if (r) dout(-10) << "apply_repop apply transaction return " << r << " on " << *repop << dendl; @@ -1245,19 +1264,18 @@ void ReplicatedPG::apply_repop(RepGather *repop) } update_stats(); - } -void ReplicatedPG::put_rep_gather(RepGather *repop) +void ReplicatedPG::eval_repop(RepGather *repop) { - dout(10) << "put_repop " << *repop << dendl; + dout(10) << "eval_repop " << *repop << dendl; // disk? if (repop->can_send_disk()) { if (repop->op->wants_ondisk()) { // send commit. MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ONDISK); - dout(10) << "put_repop sending commit on " << *repop << " " << reply << dendl; + dout(10) << " sending commit on " << *repop << " " << reply << dendl; osd->messenger->send_message(reply, repop->op->get_orig_source_inst()); repop->sent_disk = true; } @@ -1268,7 +1286,7 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) if (repop->op->wants_onnvram()) { // send commit. MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ONNVRAM); - dout(10) << "put_repop sending onnvram on " << *repop << " " << reply << dendl; + dout(10) << " sending onnvram on " << *repop << " " << reply << dendl; osd->messenger->send_message(reply, repop->op->get_orig_source_inst()); repop->sent_nvram = true; } @@ -1283,7 +1301,7 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) if (repop->op->wants_ack()) { // send ack MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ACK); - dout(10) << "put_repop sending ack on " << *repop << " " << reply << dendl; + dout(10) << " sending ack on " << *repop << " " << reply << dendl; osd->messenger->send_message(reply, repop->op->get_orig_source_inst()); repop->sent_ack = true; } @@ -1305,20 +1323,19 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) } if (min > peers_complete_thru) { - dout(10) << "put_repop peers_complete_thru " - << peers_complete_thru << " -> " << min - << dendl; + dout(10) << " peers_complete_thru " + << peers_complete_thru << " -> " << min + << dendl; peers_complete_thru = min; } } - dout(10) << "put_repop deleting " << *repop << dendl; - - assert(rep_gather.count(repop->rep_tid)); - rep_gather.erase(repop->rep_tid); - - delete repop->op; - delete repop; + dout(10) << " removing " << *repop << dendl; + assert(!repop_queue.empty()); + assert(repop_queue.front() == repop); + repop_queue.pop_front(); + repop_map.erase(repop->rep_tid); + repop->put(); } } @@ -1347,42 +1364,32 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now) osd->messenger->send_message(wr, osd->osdmap->get_inst(dest)); } -ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv, - SnapSet& snapset, SnapContext& snapc) +ReplicatedPG::RepGather *ReplicatedPG::new_repop(MOSDOp *op, tid_t rep_tid, eversion_t nv, + SnapSet& snapset, SnapContext& snapc) { - dout(10) << "new_rep_gather rep_tid " << rep_tid << " on " << *op << dendl; + dout(10) << "new_repop rep_tid " << rep_tid << " on " << *op << dendl; + RepGather *repop = new RepGather(op, rep_tid, nv, info.last_complete, snapset, snapc); - // osds. commits all come to me. - for (unsigned i=0; iosds.insert(osd); - repop->waitfor_disk.insert(osd); - } - - // primary. all osds ack to me. + // initialize gather sets for (unsigned i=0; iwaitfor_ack.insert(osd); + repop->waitfor_disk.insert(osd); } repop->start = g_clock.now(); - rep_gather[repop->rep_tid] = repop; - - // anyone waiting? (acks that got here before the op did) - if (waiting_for_repop.count(repop->rep_tid)) { - osd->take_waiters(waiting_for_repop[repop->rep_tid]); - waiting_for_repop.erase(repop->rep_tid); - } + repop_queue.push_back(repop); + repop_map[repop->rep_tid] = repop; + repop->get(); return repop; } -void ReplicatedPG::repop_ack(RepGather *repop, - int result, int ack_type, +void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, int fromosd, eversion_t pg_complete_thru) { MOSDOp *op = repop->op; @@ -1392,34 +1399,25 @@ void ReplicatedPG::repop_ack(RepGather *repop, << " ack_type " << ack_type << " from osd" << fromosd << dendl; - - get_rep_gather(repop); - { - if (ack_type & CEPH_OSD_OP_ONDISK) { - // disk - assert(repop->waitfor_disk.count(fromosd)); - repop->waitfor_disk.erase(fromosd); - repop->waitfor_nvram.erase(fromosd); - repop->waitfor_ack.erase(fromosd); - repop->pg_complete_thru[fromosd] = pg_complete_thru; - } else if (ack_type & CEPH_OSD_OP_ONNVRAM) { - // nvram - repop->waitfor_nvram.erase(fromosd); - repop->waitfor_ack.erase(fromosd); - } else { - // ack - repop->waitfor_ack.erase(fromosd); - } + + if (ack_type & CEPH_OSD_OP_ONDISK) { + // disk + assert(repop->waitfor_disk.count(fromosd)); + repop->waitfor_disk.erase(fromosd); + repop->waitfor_nvram.erase(fromosd); + repop->waitfor_ack.erase(fromosd); + repop->pg_complete_thru[fromosd] = pg_complete_thru; + } else if (ack_type & CEPH_OSD_OP_ONNVRAM) { + // nvram + repop->waitfor_nvram.erase(fromosd); + repop->waitfor_ack.erase(fromosd); + } else { + // ack + repop->waitfor_ack.erase(fromosd); } - put_rep_gather(repop); -} - - - - - - + eval_repop(repop); +} @@ -1436,77 +1434,9 @@ void ReplicatedPG::repop_ack(RepGather *repop, -/** op_modify_commit - * transaction commit on the acker. - */ -void ReplicatedPG::op_modify_ondisk(tid_t rep_tid, eversion_t pg_complete_thru) -{ - if (rep_gather.count(rep_tid)) { - RepGather *repop = rep_gather[rep_tid]; - - dout(10) << "op_modify_ondisk " << *repop->op << dendl; - get_rep_gather(repop); - { - assert(repop->waitfor_disk.count(osd->get_nodeid())); - repop->waitfor_nvram.erase(osd->get_nodeid()); - repop->waitfor_disk.erase(osd->get_nodeid()); - repop->pg_complete_thru[osd->get_nodeid()] = pg_complete_thru; - } - put_rep_gather(repop); - dout(10) << "op_modify_ondisk done on " << repop << dendl; - } else { - dout(10) << "op_modify_ondisk rep_tid " << rep_tid << " dne" << dendl; - } -} -// commit (to disk) callback -class C_OSD_RepModifyCommit : public Context { -public: - ReplicatedPG *pg; - MOSDSubOp *op; - int destosd; - - eversion_t pg_last_complete; - - Mutex lock; - Cond cond; - bool acked; - bool waiting; - - C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) : - pg(p), op(oo), destosd(dosd), pg_last_complete(lc), - lock("C_OSD_RepModifyCommit::lock"), - acked(false), waiting(false) { - pg->get(); // we're copying the pointer. - } - void finish(int r) { - lock.Lock(); - assert(!waiting); - while (!acked) { - waiting = true; - cond.Wait(lock); - } - assert(acked); - lock.Unlock(); - - pg->lock(); - pg->sub_op_modify_ondisk(op, destosd, pg_last_complete); - pg->put_unlock(); - } - void ack() { - lock.Lock(); - assert(!acked); - acked = true; - if (waiting) cond.Signal(); - - // discard my reference to buffer - op->get_data().clear(); - - lock.Unlock(); - } -}; void ReplicatedPG::op_modify(MOSDOp *op) { @@ -1645,7 +1575,8 @@ void ReplicatedPG::op_modify(MOSDOp *op) // issue replica writes tid_t rep_tid = osd->get_tid(); - RepGather *repop = new_rep_gather(op, rep_tid, av, snapset, snapc); + RepGather *repop = new_repop(op, rep_tid, av, snapset, snapc); + for (unsigned i=1; iwaitfor_ack.count(whoami)); - repop->waitfor_ack.erase(whoami); - } - put_rep_gather(repop); + assert(repop->waitfor_ack.count(whoami)); + repop->waitfor_ack.erase(whoami); + eval_repop(repop); + repop->put(); } + + // sub op modify + +// commit (to disk) callback +class C_OSD_RepModifyCommit : public Context { +public: + ReplicatedPG *pg; + + MOSDSubOp *op; + int destosd; + + eversion_t pg_last_complete; + + Mutex lock; + Cond cond; + bool acked; + bool waiting; + + C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDSubOp *oo, int dosd, eversion_t lc) : + pg(p), op(oo), destosd(dosd), pg_last_complete(lc), + lock("C_OSD_RepModifyCommit::lock"), + acked(false), waiting(false) { + pg->get(); // we're copying the pointer. + } + void finish(int r) { + lock.Lock(); + assert(!waiting); + while (!acked) { + waiting = true; + cond.Wait(lock); + } + assert(acked); + lock.Unlock(); + + pg->lock(); + pg->sub_op_modify_ondisk(op, destosd, pg_last_complete); + pg->put_unlock(); + } + void ack() { + lock.Lock(); + assert(!acked); + acked = true; + if (waiting) cond.Signal(); + + // discard my reference to buffer + op->get_data().clear(); + + lock.Unlock(); + } +}; + void ReplicatedPG::sub_op_modify(MOSDSubOp *op) { pobject_t poid = op->poid; @@ -1763,17 +1743,15 @@ void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r) osd->take_peer_stat(fromosd, r->get_peer_stat()); - if (rep_gather.count(rep_tid)) { + if (repop_map.count(rep_tid)) { // oh, good. - repop_ack(rep_gather[rep_tid], + repop_ack(repop_map[rep_tid], r->get_result(), r->ack_type, fromosd, r->get_pg_complete_thru()); - delete r; - } else { - // early ack. - waiting_for_repop[rep_tid].push_back(r); } + + delete r; } @@ -2385,14 +2363,15 @@ void ReplicatedPG::on_osd_failure(int o) { dout(10) << "on_osd_failure " << o << dendl; - hash_map::iterator p = rep_gather.begin(); - while (p != rep_gather.end()) { - RepGather *repop = p->second; - p++; - dout(-1) << "checking repop tid " << repop->rep_tid << dendl; + // artificially ack failed osds + deque::iterator p = repop_queue.begin(); + while (p != repop_queue.end()) { + RepGather *repop = *p++; + dout(-1) << " artificialling acking repop tid " << repop->rep_tid << dendl; if (repop->waitfor_ack.count(o) || + repop->waitfor_nvram.count(o) || repop->waitfor_disk.count(o)) - repop_ack(repop, -1, true, o); + repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, o); } // remove from pushing map @@ -2419,26 +2398,27 @@ void ReplicatedPG::on_change() // apply all local repops // (pg is inactive; we will repeer) - for (hash_map::iterator p = rep_gather.begin(); - p != rep_gather.end(); + for (deque::iterator p = repop_queue.begin(); + p != repop_queue.end(); p++) - if (!p->second->applied) - apply_repop(p->second); + if (!(*p)->applied) + apply_repop(*p); - hash_map::iterator p = rep_gather.begin(); - while (p != rep_gather.end()) { - RepGather *repop = p->second; + deque::iterator p = repop_queue.begin(); + while (p != repop_queue.end()) { + RepGather *repop = *p; - if (acting.empty() || acting[0] != osd->get_nodeid()) { + if (!is_primary()) { // no longer primary. hose repops. - dout(-1) << "no longer primary, aborting repop tid " << repop->rep_tid << dendl; - rep_gather.erase(p++); - delete repop->op; - delete repop; + dout(-1) << " aborting repop tid " << repop->rep_tid << dendl; + repop->aborted = true; + repop_queue.erase(p++); + repop_map.erase(repop->rep_tid); + repop->put(); } else { // still primary. artificially ack+commit any replicas who dropped out of the pg p++; - dout(-1) << "checking repop tid " << repop->rep_tid << dendl; + dout(-1) << " checking for dropped osds on repop tid " << repop->rep_tid << dendl; set all; set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(), repop->waitfor_ack.begin(), repop->waitfor_ack.end(), diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index d33427dcc1797..61ff11b3f8d39 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -28,34 +28,34 @@ public: */ class RepGather { public: + int nref; + class MOSDOp *op; tid_t rep_tid; ObjectStore::Transaction t; - bool applied; + bool applied, aborted; pg_stat_t stats; set waitfor_ack; set waitfor_nvram; set waitfor_disk; + bool sent_ack, sent_nvram, sent_disk; utime_t start; - - bool sent_ack, sent_nvram, sent_disk; - set osds; eversion_t old_version, at_version; SnapSet snapset; SnapContext snapc; - eversion_t pg_local_last_complete; + eversion_t pg_local_last_complete; map pg_complete_thru; RepGather(MOSDOp *o, tid_t rt, eversion_t av, eversion_t lc, SnapSet& ss, SnapContext& sc) : - op(o), rep_tid(rt), - applied(false), + nref(1), op(o), rep_tid(rt), + applied(false), aborted(false), sent_ack(false), sent_nvram(false), sent_disk(false), at_version(av), snapset(ss), snapc(sc), @@ -79,28 +79,38 @@ public: bool can_delete() { return waitfor_ack.empty() && waitfor_nvram.empty() && waitfor_disk.empty(); } + + void get() { + nref++; + } + void put() { + if (--nref == 0) { + delete op; + delete this; + } + } }; protected: // replica ops // [primary|tail] - hash_map rep_gather; - hash_map > waiting_for_repop; - - // load balancing - set balancing_reads; - set unbalancing_reads; - hash_map > waiting_for_unbalanced_reads; // i.e. primary-lock + deque repop_queue; + map repop_map; - void get_rep_gather(RepGather*); void apply_repop(RepGather *repop); - void put_rep_gather(RepGather*); + void eval_repop(RepGather*); void issue_repop(RepGather *repop, int dest, utime_t now); - RepGather *new_rep_gather(MOSDOp *op, tid_t rep_tid, eversion_t nv, - SnapSet& snapset, SnapContext& snapc); + RepGather *new_repop(MOSDOp *op, tid_t rep_tid, eversion_t nv, + SnapSet& snapset, SnapContext& snapc); void repop_ack(RepGather *repop, int result, int ack_type, int fromosd, eversion_t pg_complete_thru=eversion_t(0,0)); + + // load balancing + set balancing_reads; + set unbalancing_reads; + hash_map > waiting_for_unbalanced_reads; // i.e. primary-lock + // push/pull map > pulling; // which objects are currently being pulled, and from where @@ -122,7 +132,7 @@ protected: // modify - void op_modify_ondisk(tid_t rep_tid, eversion_t pg_complete_thru); + void op_modify_ondisk(RepGather *repop); void sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete); void _make_clone(ObjectStore::Transaction& t, -- 2.39.5