From f49f78d03ba9786f4e8423c530f0e016a8d814fa Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 22 Jan 2009 16:14:52 -0800 Subject: [PATCH] osd: reset peering, in-flight repops on every pg change Instead of complicated (and flawed) logic for letting in-progress rep ops complete while the PG changes (but primary remains the same), reset the pg state entirely. Apply any ops we have in hand, but drop everything else, including PUSH/PULL ops. This vastly simplifies the logic in the OSD and makes it easy to reason about things. Fix clients (Objecter, osd_client) to resubmit ops when PG membership changes (not just on primary change). --- src/osd/OSD.cc | 197 ++++++++++++++++++---------------------- src/osd/ReplicatedPG.cc | 114 +++-------------------- src/osdc/Objecter.cc | 19 ---- 3 files changed, 104 insertions(+), 226 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c1aa6b63846a2..ed5ab2ad266d1 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3372,129 +3372,100 @@ void OSD::handle_op(MOSDOp *op) } } - if (!op->get_source().is_osd()) { - // REGULAR OP (non-replication) - - // note original source - op->clear_payload(); // and hose encoded payload (in case we forward) - - // have pg? - if (!pg) { - dout(7) << "hit non-existent pg " - << pgid - << ", waiting" << dendl; - waiting_for_pg[pgid].push_back(op); + // we don't need encoded payload anymore + op->clear_payload(); + + // have pg? + if (!pg) { + dout(7) << "hit non-existent pg " + << pgid + << ", waiting" << dendl; + waiting_for_pg[pgid].push_back(op); + return; + } + + // pg must be same-ish... + if (!op->is_modify()) { + // read + if (!pg->same_for_read_since(op->get_map_epoch())) { + dout(7) << "handle_rep_op pg changed " << pg->info.history + << " after " << op->get_map_epoch() + << ", dropping" << dendl; + assert(op->get_map_epoch() < osdmap->get_epoch()); + pg->unlock(); + delete op; return; } - - // pg must be same-ish... 
- if (!op->is_modify()) { - // read - if (!pg->same_for_read_since(op->get_map_epoch())) { - dout(7) << "handle_rep_op pg changed " << pg->info.history - << " after " << op->get_map_epoch() - << ", dropping" << dendl; - assert(op->get_map_epoch() < osdmap->get_epoch()); - pg->unlock(); - delete op; - return; - } - - if (op->get_oid().snap > 0) { - // snap read. hrm. - // are we missing a revision that we might need? - // let's get them all. - for (unsigned i=0; i<op->get_snaps().size(); i++) { - object_t oid = op->get_oid(); - oid.snap = op->get_snaps()[i]; - if (pg->is_missing_object(oid)) { - dout(10) << "handle_op _may_ need missing rev " << oid << ", pulling" << dendl; - pg->wait_for_missing_object(op->get_oid(), op); - pg->unlock(); - return; - } - } - } - - } else { - // modify - if ((!pg->is_primary() || - !pg->same_for_modify_since(op->get_map_epoch()))) { - dout(7) << "handle_op pg changed " << pg->info.history - << " after " << op->get_map_epoch() - << ", dropping" << dendl; - assert(op->get_map_epoch() < osdmap->get_epoch()); - pg->unlock(); - delete op; - return; - } - - // scrubbing? - if (pg->is_scrubbing()) { - dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl; - pg->waiting_for_active.push_back(op); - pg->unlock(); - return; - } - } - // pg must be active. - if (!pg->is_active()) { - // replay? - if (op->get_version().version > 0) { - if (op->get_version() > pg->info.last_update) { - dout(7) << *pg << " queueing replay at " << op->get_version() - << " for " << *op << dendl; - pg->replay_queue[op->get_version()] = op; + if (op->get_oid().snap > 0) { + // snap read. hrm. + // are we missing a revision that we might need? + // let's get them all. 
+ for (unsigned i=0; i<op->get_snaps().size(); i++) { + object_t oid = op->get_oid(); + oid.snap = op->get_snaps()[i]; + if (pg->is_missing_object(oid)) { + dout(10) << "handle_op _may_ need missing rev " << oid << ", pulling" << dendl; + pg->wait_for_missing_object(op->get_oid(), op); pg->unlock(); - return; - } else { - dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update - << " for " << *op - << ", will queue for WRNOOP" << dendl; - } + return; + } } - - dout(7) << *pg << " not active (yet)" << dendl; - pg->waiting_for_active.push_back(op); - pg->unlock(); - return; } - // missing object? - if (pg->is_missing_object(op->get_oid())) { - pg->wait_for_missing_object(op->get_oid(), op); - pg->unlock(); - return; - } - - dout(10) << "handle_op " << *op << " in " << *pg << dendl; - } else { - // REPLICATION OP (it's from another OSD) - - // have pg? - if (!pg) { - derr(-7) << "handle_rep_op " << *op - << " pgid " << pgid << " dne" << dendl; + // modify + if ((!pg->is_primary() || + !pg->same_for_modify_since(op->get_map_epoch()))) { + dout(7) << "handle_op pg changed " << pg->info.history + << " after " << op->get_map_epoch() + << ", dropping" << dendl; + assert(op->get_map_epoch() < osdmap->get_epoch()); + pg->unlock(); delete op; - //assert(0); // wtf, shouldn't happen. return; } - // check osd map: same set, or primary+acker? - if (!pg->same_for_rep_modify_since(op->get_map_epoch())) { - dout(10) << "handle_rep_op pg changed " << pg->info.history - << " after " << op->get_map_epoch() - << ", dropping" << dendl; + // scrubbing? + if (pg->is_scrubbing()) { + dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl; + pg->waiting_for_active.push_back(op); pg->unlock(); - delete op; return; } - - assert(pg->get_role() >= 0); - dout(7) << "handle_rep_op " << op << " in " << *pg << dendl; } + + // pg must be active. + if (!pg->is_active()) { + // replay? 
+ if (op->get_version().version > 0) { + if (op->get_version() > pg->info.last_update) { + dout(7) << *pg << " queueing replay at " << op->get_version() + << " for " << *op << dendl; + pg->replay_queue[op->get_version()] = op; + pg->unlock(); + return; + } else { + dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update + << " for " << *op + << ", will queue for WRNOOP" << dendl; + } + } + + dout(7) << *pg << " not active (yet)" << dendl; + pg->waiting_for_active.push_back(op); + pg->unlock(); + return; + } + + // missing object? + if (pg->is_missing_object(op->get_oid())) { + pg->wait_for_missing_object(op->get_oid(), op); + pg->unlock(); + return; + } + + dout(10) << "handle_op " << *op << " in " << *pg << dendl; // proprocess op? if (pg->preprocess_op(op, now)) { @@ -3554,6 +3525,18 @@ void OSD::handle_sub_op(MOSDSubOp *op) } PG *pg = _lookup_lock_pg(pgid); + + // same pg? + // if pg changes _at all_, we reset and repeer! + if (op->map_epoch < pg->info.history.same_since) { + dout(10) << "handle_sub_op pg changed " << pg->info.history + << " after " << op->map_epoch + << ", dropping" << dendl; + pg->unlock(); + delete op; + return; + } + if (g_conf.osd_maxthreads < 1) { pg->do_sub_op(op); // do it now } else { diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index dbb4777dc3916..c679cafdf1e3b 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1764,17 +1764,8 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) << dendl; // sanity checks - if (op->map_epoch < info.history.same_primary_since) { - dout(10) << "sub_op_modify discarding old sub_op from " - << op->map_epoch << " < " << info.history.same_primary_since << dendl; - delete op; - return; - } - if (!is_active()) { - dout(10) << "sub_op_modify not active" << dendl; - delete op; - return; - } + assert(op->map_epoch >= info.history.same_primary_since); + assert(is_active()); assert(is_replica()); // note peer's stat @@ -2246,16 +2237,8 @@ void 
ReplicatedPG::sub_op_pull(MOSDSubOp *op) << " from " << op->get_source() << dendl; - if (op->map_epoch < info.history.same_primary_since) { - dout(10) << "sub_op_pull discarding old sub_op from " - << op->map_epoch << " < " << info.history.same_primary_since << dendl; - delete op; - return; - } - assert(!is_primary()); // we should be a replica or stray. - // push it back! push(poid, op->get_source().num(), op->data_subset, op->clone_subsets); delete op; } @@ -2282,24 +2265,6 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) interval_set<__u64> data_subset; map > clone_subsets; - if (!is_primary()) { - // non-primary should only accept pushes from the current primary. - if (op->map_epoch < info.history.same_primary_since) { - dout(10) << "sub_op_push discarding old sub_op from " - << op->map_epoch << " < " << info.history.same_primary_since << dendl; - delete op; - return; - } - // FIXME: actually, no, what i really want here is a personal "same_role_since" - if (!is_active()) { - dout(10) << "sub_op_push not active" << dendl; - delete op; - return; - } - } else { - // primary will accept pushes anytime. - } - // are we missing (this specific version)? 
// (if version is wrong, it is either old (we don't want it) or // newer (peering is buggy)) @@ -2498,31 +2463,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) void ReplicatedPG::on_osd_failure(int o) { - dout(10) << "on_osd_failure " << o << dendl; - - // artificially ack failed osds - xlist::iterator p = repop_queue.begin(); - while (!p.end()) { - RepGather *repop = *p; - ++p; - dout(-1) << " artificialling acking repop tid " << repop->rep_tid << dendl; - if (repop->waitfor_ack.count(o) || - repop->waitfor_nvram.count(o) || - repop->waitfor_disk.count(o)) - repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, o); - } - - // remove from pushing map - { - map >::iterator p = pulling.begin(); - while (p != pulling.end()) - if (p->second.second == o) { - dout(10) << " forgetting pull of " << p->first << " " << p->second.first - << " from osd" << o << dendl; - pulling.erase(p++); - } else - p++; - } + //dout(10) << "on_osd_failure " << o << dendl; } void ReplicatedPG::on_shutdown() @@ -2548,47 +2489,20 @@ void ReplicatedPG::on_change() { dout(10) << "on_change" << dendl; - // apply all local repops - // (pg is inactive; we will repeer) - for (xlist::iterator p = repop_queue.begin(); - !p.end(); ++p) - if (!(*p)->applied) - apply_repop(*p); - - xlist::iterator p = repop_queue.begin(); - while (!p.end()) { - RepGather *repop = *p; - ++p; - - if (!is_primary()) { - // no longer primary. hose repops. - dout(-1) << " aborting repop tid " << repop->rep_tid << dendl; - repop->aborted = true; - repop->queue_item.remove_myself(); - repop_map.erase(repop->rep_tid); - repop->put(); - } else { - // still primary. 
artificially ack+commit any replicas who dropped out of the pg - dout(-1) << " checking for dropped osds on repop tid " << repop->rep_tid << dendl; - set all; - set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(), - repop->waitfor_ack.begin(), repop->waitfor_ack.end(), - inserter(all, all.begin())); - for (set::iterator q = all.begin(); q != all.end(); q++) { - if (*q == osd->get_nodeid()) - continue; - bool have = false; - for (unsigned i=1; irep_tid << dendl; + if (!repop->applied) + apply_repop(repop); + repop->aborted = true; + repop->put(); } - // clear pushing map + // clear pushing/pulling maps pushing.clear(); + pulling.clear(); } void ReplicatedPG::on_role_change() diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 43ed4d20cae09..333635e0e6c16 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -215,25 +215,6 @@ void Objecter::scan_pgs(set& changed_pgs) other.swap(pg.acting); - if (g_conf.osd_rep == OSD_REP_PRIMARY) { - // same primary? - if (!other.empty() && - !pg.acting.empty() && - other[0] == pg.acting[0]) - continue; - } - else if (g_conf.osd_rep == OSD_REP_SPLAY) { - // same primary and acker? - if (!other.empty() && - !pg.acting.empty() && - other[0] == pg.acting[0] && - other[other.size() > 1 ? 1:0] == pg.acting[pg.acting.size() > 1 ? 1:0]) - continue; - } - else if (g_conf.osd_rep == OSD_REP_CHAIN) { - // any change is significant. - } - // changed significantly. dout(10) << "scan_pgs pg " << pgid << " (" << pg.active_tids << ")" -- 2.39.5