From ee14d9841ccea9362d1f2c3ca10a59545ae90b18 Mon Sep 17 00:00:00 2001 From: sage Date: Fri, 15 Sep 2006 21:53:31 +0000 Subject: [PATCH] cleanup of osd failure recovery git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@858 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/TODO | 11 +- ceph/common/Clock.h | 1 + ceph/osd/OSD.cc | 382 ++++++++++++++++++++++++------------------ ceph/osd/OSD.h | 7 +- ceph/osd/PG.cc | 94 ++++++++--- ceph/osd/PG.h | 44 ++--- ceph/osdc/Objecter.cc | 89 +++++----- ceph/osdc/Objecter.h | 43 +++-- 8 files changed, 381 insertions(+), 290 deletions(-) diff --git a/ceph/TODO b/ceph/TODO index 7d670169710e9..66dadb3f38603 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -4,17 +4,13 @@ - how to get usage feedback to monitor? -- fix tare weirdness - - osd/rados -- clean up writeahead logs +- consider implications of nvram writeahead logs +- deal with divergent replicas that recover - fix heartbeat wrt new replication -- figure out new rep failure cases -- same_primary_since -> same_tail_since - mark residual pgs obsolete ??? -- deal with divergent disconnected primaries - rdlocks +- optimize remove wrt recovery pushes - pg_bit changes - use pg->info.same_role_since wrt replication ops. - report crashed pgs? @@ -39,7 +35,6 @@ monitor objecter -- handle new rep mode failure modes... head, tail, middle objectcacher - ocacher flushing diff --git a/ceph/common/Clock.h b/ceph/common/Clock.h index 10543a41cee76..1fed020eddfa4 100644 --- a/ceph/common/Clock.h +++ b/ceph/common/Clock.h @@ -127,6 +127,7 @@ class Clock { public: Clock() { // set offset + tare(); } // real time. diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 2f2533e507e1d..802ac1eb2c412 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -64,6 +64,7 @@ #include "config.h" #undef dout #define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cerr << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " " char *osd_base_path = "./osddata"; char *ebofs_base_path = "./ebofsdev"; @@ -433,7 +434,7 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch) if (pg->is_crashed() && pg->is_replay() && pg->get_role() == 0 && - pg->info.same_primary_since <= epoch) { + pg->info.history.same_primary_since <= epoch) { ObjectStore::Transaction t; pg->activate(t); store->apply_transaction(t); @@ -980,9 +981,9 @@ void OSD::advance_map(ObjectStore::Transaction& t) pg->acting.swap(acting); pg->last_epoch_started_any = pg->info.last_epoch_started = - pg->info.same_primary_since = - pg->info.same_acker_since = - pg->info.same_role_since = osdmap->get_epoch(); + pg->info.history.same_since = + pg->info.history.same_primary_since = + pg->info.history.same_acker_since = osdmap->get_epoch(); pg->activate(t); dout(7) << "created " << *pg << endl; @@ -999,9 +1000,9 @@ void OSD::advance_map(ObjectStore::Transaction& t) pg->set_role(role); pg->last_epoch_started_any = pg->info.last_epoch_started = - pg->info.same_primary_since = - pg->info.same_acker_since = - pg->info.same_role_since = osdmap->get_epoch(); + pg->info.history.same_primary_since = + pg->info.history.same_acker_since = + pg->info.history.same_since = osdmap->get_epoch(); pg->activate(t); dout(7) << "created " << *pg << endl; @@ -1023,16 +1024,16 @@ void OSD::advance_map(ObjectStore::Transaction& t) } // get new acting set - vector acting; - int nrep = osdmap->pg_to_acting_osds(pgid, acting); - int role = osdmap->calc_pg_role(whoami, acting, nrep); + vector tacting; + int nrep = osdmap->pg_to_acting_osds(pgid, tacting); + int role = osdmap->calc_pg_role(whoami, tacting, nrep); // no change? - if (acting == pg->acting) + if (tacting == pg->acting) continue; - int primary = -1; - if (nrep > 0) primary = acting[0]; + // -- there was a change! -- + _lock_pg(pgid); int oldrole = pg->get_role(); int oldprimary = pg->get_primary(); @@ -1040,25 +1041,49 @@ void OSD::advance_map(ObjectStore::Transaction& t) vector oldacting = pg->acting; // update PG - pg->acting.swap(acting); + pg->acting.swap(tacting); pg->set_role(role); // did primary|acker change? - if (oldprimary != primary) { - pg->info.same_primary_since = osdmap->get_epoch(); + pg->info.history.same_since = osdmap->get_epoch(); + if (oldprimary != pg->get_primary()) { + pg->info.history.same_primary_since = osdmap->get_epoch(); pg->cancel_recovery(); } if (oldacker != pg->get_acker()) { - pg->info.same_acker_since = osdmap->get_epoch(); + pg->info.history.same_acker_since = osdmap->get_epoch(); } - if (oldprimary != primary || oldacker != pg->get_acker()) { + + // deactivate. + pg->state_clear(PG::STATE_ACTIVE); + + // discard any repops in progress. + if (oldacker == whoami) { // drop our write-ahead log. (we'll only have on if we were just the acker) pg->trim_write_ahead(); + + // drop repops + for (map::iterator p = pg->repop_gather.begin(); + p != pg->repop_gather.end(); + p++) { + dout(-1) << *pg << " discarding repop " << p->second << endl; + delete p->second->op; + delete p->second; + } + pg->repop_gather.clear(); + + // and repop waiters + for (map >::iterator p = pg->waiting_for_repop.begin(); + p != pg->waiting_for_repop.end(); + p++) + for (list::iterator pm = p->second.begin(); + pm != p->second.end(); + pm++) + delete *pm; + pg->waiting_for_repop.clear(); } if (role != oldrole) { - pg->info.same_role_since = osdmap->get_epoch(); - // old primary? if (oldrole == 0) { pg->state_clear(PG::STATE_CLEAN); @@ -1089,12 +1114,10 @@ void OSD::advance_map(ObjectStore::Transaction& t) // new primary? if (role == 0) { // i am new primary - pg->state_clear(PG::STATE_ACTIVE); pg->state_clear(PG::STATE_STRAY); pg->last_epoch_started_any = pg->info.last_epoch_started; } else { // i am now replica|stray. we need to send a notify. - pg->state_clear(PG::STATE_ACTIVE); pg->state_set(PG::STATE_STRAY); if (nrep == 0) { @@ -1104,30 +1127,28 @@ void OSD::advance_map(ObjectStore::Transaction& t) } // my role changed. - dout(10) << *pg << " " << oldacting << " -> " << acting + dout(10) << *pg << " " << oldacting << " -> " << pg->acting << ", role " << oldrole << " -> " << role << endl; } else { // no role change. // did primary change? - if (primary != oldprimary) { + if (pg->get_primary() != oldprimary) { // we need to announce pg->state_set(PG::STATE_STRAY); - pg->state_clear(PG::STATE_ACTIVE); - dout(10) << *pg << " " << oldacting << " -> " << acting + dout(10) << *pg << " " << oldacting << " -> " << pg->acting << ", acting primary " - << oldprimary << " -> " << primary + << oldprimary << " -> " << pg->get_primary() << endl; } else { // primary is the same. if (role == 0) { // i am (still) primary. but my replica set changed. - pg->state_clear(PG::STATE_ACTIVE); pg->state_clear(PG::STATE_CLEAN); pg->state_clear(PG::STATE_REPLAY); - dout(10) << *pg << " " << oldacting << " -> " << acting + dout(10) << *pg << " " << oldacting << " -> " << pg->acting << ", replicas changed" << endl; // completely restart peering process. @@ -1156,6 +1177,8 @@ void OSD::advance_map(ObjectStore::Transaction& t) } } + + _unlock_pg(pgid); } } } @@ -1183,7 +1206,8 @@ void OSD::activate_map(ObjectStore::Transaction& t) pg->build_prior(); pg->peer(t, query_map); } - else if (pg->is_stray()) { + else if (pg->is_stray() && + pg->get_primary() >= 0) { // i am residual|replica notify_list[pg->get_primary()].push_back(pg->info); } @@ -1394,30 +1418,63 @@ void OSD::load_pgs() } /** - * check epochs starting from start to verify the primary hasn't changed + * check epochs starting from start to verify the pg acting set hasn't changed * up until now */ -epoch_t OSD::calc_pg_primary_since(int primary, pg_t pgid, epoch_t start) +void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from) { + dout(-15) << "project_pg_history " << hex << pgid << dec + << " from " << from << " to " << osdmap->get_epoch() + << ", start " << h + << endl; + + vector last; + osdmap->pg_to_acting_osds(pgid, last); + for (epoch_t e = osdmap->get_epoch()-1; - e >= start; + e >= from; e--) { // verify during intermediate epoch - vector acting; - OSDMap oldmap; get_map(e, oldmap); + + vector acting; oldmap.pg_to_acting_osds(pgid, acting); - if (acting[0] != primary) - return e+1; // nope, primary only goes back through e! - } + // acting set change? + if (acting != last && + e <= h.same_since) { + dout(-15) << "project_pg_history " << hex << pgid << dec << " changed in " << e+1 + << " from " << acting << " -> " << last << endl; + h.same_since = e+1; + } - return start; // same all the way back thru start! -} + // primary change? + if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) && + e <= h.same_primary_since) { + dout(-15) << "project_pg_history " << hex << pgid << dec << " primary changed in " << e+1 << endl; + h.same_primary_since = e+1; + + if (g_conf.osd_rep == OSD_REP_PRIMARY) + h.same_acker_since = h.same_primary_since; + } + // acker change? + if (g_conf.osd_rep != OSD_REP_PRIMARY) { + if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) && + e <= h.same_acker_since) { + dout(-15) << "project_pg_history " << hex << pgid << dec << " acker changed in " << e+1 << endl; + h.same_acker_since = e+1; + } + } + if (h.same_since > e && + h.same_primary_since > e && + h.same_acker_since > e) break; + } + dout(-15) << "project_pg_history end " << h << endl; +} /** do_notifies @@ -1488,37 +1545,21 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) PG *pg; if (pg_map.count(pgid) == 0) { - // check mapping. - vector acting; - int nrep = osdmap->pg_to_acting_osds(pgid, acting); - if (!nrep) { - dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " has null mapping" << endl; - continue; - } - - // am i still the primary? - assert(it->same_primary_since <= osdmap->get_epoch()); - if (acting.empty() || acting[0] != whoami) { - // not primary now, so who cares! - dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and i'm not the primary" << endl; + // same primary? + PG::Info::History history = it->history; + project_pg_history(pgid, history, m->get_epoch()); + + if (m->get_epoch() < history.same_primary_since) { + dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and primary changed in " + << history.same_primary_since << " (msg from " << m->get_epoch() << ")" << endl; continue; - } else { - // ok, well, i'm primary now... was it continuous since caller's epoch? - epoch_t since = calc_pg_primary_since(whoami, pgid, m->get_epoch()); - if (since > m->get_epoch()) { - dout(10) << "handle_pg_notify pg " << hex << pgid << dec << " dne, and i wasn't primary during intermediate epoch " << since - << " (caller " << m->get_epoch() << " < " << since << " < now " << osdmap->get_epoch() << ")" << endl; - continue; - } } // ok, create PG! pg = create_pg(pgid, t); - pg->acting.swap( acting ); + osdmap->pg_to_acting_osds(pgid, pg->acting); pg->set_role(0); - pg->info.same_primary_since = it->same_primary_since; - pg->info.same_acker_since = it->same_acker_since; - pg->info.same_role_since = osdmap->get_epoch(); + pg->info.history = history; pg->last_epoch_started_any = it->last_epoch_started; pg->build_prior(); @@ -1533,21 +1574,14 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) waiting_for_pg.erase(pgid); } - pg = _lock_pg(pgid); + _lock_pg(pgid); } else { // already had it. am i (still) the primary? pg = _lock_pg(pgid); - if (pg->is_primary()) { - if (pg->info.same_primary_since > m->get_epoch()) { - dout(10) << *pg << " requestor epoch " << m->get_epoch() - << " < my primary start epoch " << pg->info.same_primary_since - << endl; - _unlock_pg(pgid); - continue; - } - } else { - dout(10) << *pg << " not primary" << endl; - assert(m->get_epoch() < osdmap->get_epoch()); + if (m->get_epoch() < pg->info.history.same_primary_since) { + dout(10) << *pg << " handle_pg_notify primary changed in " + << pg->info.history.same_primary_since + << " (msg from " << m->get_epoch() << ")" << endl; _unlock_pg(pgid); continue; } @@ -1628,12 +1662,12 @@ void OSD::handle_pg_log(MOSDPGLog *m) assert(pg); dout(7) << "handle_pg_log " << *pg - << " got " << m->log + << " got " << m->log << " " << m->missing << " from " << m->get_source() << endl; ObjectStore::Transaction t; - if (pg->acting[0] == whoami) { + if (pg->is_primary()) { // i am PRIMARY assert(pg->peer_log_requested.count(from) || pg->peer_summary_requested.count(from)); @@ -1657,9 +1691,7 @@ void OSD::handle_pg_log(MOSDPGLog *m) pg->merge_log(m->log, m->missing, from); assert(pg->missing.num_lost() == 0); - // ok active! - pg->info.same_primary_since = m->info.same_primary_since; - pg->info.same_acker_since = m->info.same_acker_since; + // ok activate! pg->activate(t); } @@ -1689,61 +1721,60 @@ void OSD::handle_pg_query(MOSDPGQuery *m) it != m->pg_list.end(); it++) { pg_t pgid = it->first; - PG *pg; + PG *pg = 0; if (pg_map.count(pgid) == 0) { + // same primary? + PG::Info::History history = it->second.history; + project_pg_history(pgid, history, m->get_epoch()); + + if (m->get_epoch() < history.same_primary_since) { + dout(10) << " pg " << hex << pgid << dec << " dne, and primary has changed in " + << history.same_primary_since << " (msg from " << m->get_epoch() << ")" << endl; + continue; + } + // get active rush mapping vector acting; int nrep = osdmap->pg_to_acting_osds(pgid, acting); int role = osdmap->calc_pg_role(whoami, acting, nrep); - if (role == 0) { - dout(10) << " pg " << hex << pgid << dec << " dne, and i am primary. just waiting for notify." << endl; - continue; - } if (role < 0) { dout(10) << " pg " << hex << pgid << dec << " dne, and i am not an active replica" << endl; PG::Info empty(pgid); notify_list[from].push_back(empty); continue; } - + assert(role > 0); + ObjectStore::Transaction t; - PG *pg = create_pg(pgid, t); + pg = create_pg(pgid, t); pg->acting.swap( acting ); pg->set_role(role); - - pg->info.same_primary_since = it->second.same_primary_since; - pg->info.same_acker_since = it->second.same_acker_since; - pg->info.same_role_since = osdmap->get_epoch(); + pg->info.history = history; t.collection_setattr(pgid, "info", (char*)&pg->info, sizeof(pg->info)); store->apply_transaction(t); dout(10) << *pg << " dne (before), but i am role " << role << endl; - } - pg = _lock_pg(pgid); - - // verify this is from same primary - if (pg->is_primary()) { - dout(10) << *pg << " i am primary, skipping" << endl; - _unlock_pg(pgid); - continue; - } else { - if (from == pg->acting[0]) { - if (m->get_epoch() < pg->info.same_primary_since) { - dout(10) << *pg << " not same primary since " << m->get_epoch() << ", skipping" << endl; - _unlock_pg(pgid); - continue; - } - } else { - dout(10) << *pg << " query not from primary, skipping" << endl; - assert(m->get_epoch() < osdmap->get_epoch()); + _lock_pg(pgid); + } else { + pg = _lock_pg(pgid); + + // same primary? + if (m->get_epoch() < pg->info.history.same_primary_since) { + dout(10) << *pg << " handle_pg_query primary changed in " + << pg->info.history.same_primary_since + << " (msg from " << m->get_epoch() << ")" << endl; _unlock_pg(pgid); continue; } } + // ok, process query! + assert(!pg->acting.empty()); + assert(from == pg->acting[0]); + if (it->second.type == PG::Query::INFO) { // info dout(10) << *pg << " sending info" << endl; @@ -1828,9 +1859,10 @@ void OSD::handle_pg_remove(MOSDPGRemove *m) /** pull - request object from a peer */ -void OSD::pull(PG *pg, object_t oid, eversion_t v) +void OSD::pull(PG *pg, object_t oid) { assert(pg->missing.loc.count(oid)); + eversion_t v = pg->missing.missing[oid]; int osd = pg->missing.loc[oid]; dout(7) << *pg << " pull " << hex << oid << dec @@ -1905,19 +1937,33 @@ void OSD::op_pull(MOSDOp *op, PG *pg) const eversion_t v = op->get_version(); int from = op->get_source().num(); - dout(7) << "op_pull " << hex << oid << dec << " v " << op->get_version() + dout(7) << *pg << " op_pull " << hex << oid << dec << " v " << op->get_version() << " from " << op->get_source() << endl; // is a replica asking? are they missing it? - if (pg->is_primary() && - (pg->peer_missing.count(from) == 0 || - !pg->peer_missing[from].is_missing(oid))) { - dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << endl; - delete op; - return; + if (pg->is_primary()) { + // primary + assert(pg->peer_missing.count(from)); // we had better know this, from the peering process. + + if (!pg->peer_missing[from].is_missing(oid)) { + dout(7) << *pg << " op_pull replica isn't actually missing it, we must have already pushed to them" << endl; + delete op; + return; + } + + // do we have it yet? + if (waitfor_missing_object(op, pg)) + return; + } else { + // non-primary + if (pg->missing.is_missing(oid)) { + dout(7) << *pg << " op_pull not primary, and missing " << hex << oid << dec << ", ignoring" << endl; + delete op; + return; + } } - + // push it back! push(pg, oid, op->get_source().num()); } @@ -1932,15 +1978,14 @@ void OSD::op_push(MOSDOp *op, PG *pg) eversion_t v = op->get_version(); if (!pg->missing.is_missing(oid)) { - dout(7) << "op_push not missing object " << hex << oid << dec << endl; + dout(7) << *pg << " op_push not missing " << hex << oid << dec << endl; return; } - dout(7) << "op_push " + dout(7) << *pg << " op_push " << hex << oid << dec << " v " << v << " size " << op->get_length() << " " << op->get_data().length() - << " in " << *pg << endl; assert(op->get_data().length() == op->get_length()); @@ -1952,6 +1997,13 @@ void OSD::op_push(MOSDOp *op, PG *pg) t.setattrs(oid, op->get_attrset()); t.collection_add(pg->info.pgid, oid); + // close out pull op? + num_pulling--; + if (pg->objects_pulling.count(oid)) + pg->objects_pulling.erase(oid); + pg->missing.got(oid, v); + + // raise last_complete? assert(pg->log.complete_to != pg->log.log.end()); while (pg->log.complete_to != pg->log.log.end()) { @@ -1962,34 +2014,30 @@ void OSD::op_push(MOSDOp *op, PG *pg) } dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl; + // apply to disk! t.collection_setattr(pg->info.pgid, "info", &pg->info, sizeof(pg->info)); unsigned r = store->apply_transaction(t); assert(r == 0); - // close out pull op? - num_pulling--; - if (pg->objects_pulling.count(oid)) - pg->objects_pulling.erase(oid); - pg->missing.got(oid, v); // am i primary? are others missing this too? if (pg->is_primary()) { for (unsigned i=1; iacting.size(); i++) { int peer = pg->acting[i]; - if (pg->peer_missing.count(peer) && - pg->peer_missing[peer].is_missing(oid)) { + assert(pg->peer_missing.count(peer)); + if (pg->peer_missing[peer].is_missing(oid)) { // ok, push it, and they (will) have it now. pg->peer_missing[peer].got(oid, v); push(pg, oid, peer); } } - - // continue recovery - pg->do_recovery(); } + // continue recovery + pg->do_recovery(); + // kick waiters if (pg->waiting_for_missing_object.count(oid)) take_waiters(pg->waiting_for_missing_object[oid]); @@ -2206,21 +2254,23 @@ void OSD::handle_op(MOSDOp *op) if (read) { // read. am i the (same) acker? if (pg->get_acker() != whoami || - op->get_map_epoch() < pg->info.same_acker_since) { + op->get_map_epoch() < pg->info.history.same_acker_since) { dout(7) << "acting acker is osd" << pg->get_acker() - << " since " << pg->info.same_acker_since + << " since " << pg->info.history.same_acker_since << ", dropping" << endl; assert(op->get_map_epoch() < osdmap->get_epoch()); + delete op; return; } } else { // write. am i the (same) primary? if (pg->get_primary() != whoami || - op->get_map_epoch() < pg->info.same_primary_since) { + op->get_map_epoch() < pg->info.history.same_primary_since) { dout(7) << "acting primary is osd" << pg->get_primary() - << " since " << pg->info.same_primary_since + << " since " << pg->info.history.same_primary_since << ", dropping" << endl; assert(op->get_map_epoch() < osdmap->get_epoch()); + delete op; return; } } @@ -2255,27 +2305,33 @@ void OSD::handle_op(MOSDOp *op) // have pg? if (!pg) { - dout(7) << "handle_rep_op " << op - << " pgid " << hex << pgid << dec << " dne" << endl; + derr(-7) << "handle_rep_op " << op + << " pgid " << hex << pgid << dec << " dne" << endl; delete op; + //assert(0); // wtf, shouldn't happen. return; } - // check osd map: same primary+acker? - if (op->get_map_epoch() != osdmap->get_epoch()) { - if (op->get_map_epoch() < pg->info.same_primary_since || - op->get_map_epoch() < pg->info.same_acker_since) { - // drop message. - delete op; - return; - } - assert(pg->get_role() >= 0); - - dout(5) << "handle_rep_op map " << op->get_map_epoch() << " != " << osdmap->get_epoch() - << ", but primary+acker same in " << *pg - << endl; + // check osd map: same set, or primary+acker? + if (g_conf.osd_rep == OSD_REP_CHAIN && + op->get_map_epoch() < pg->info.history.same_since) { + dout(10) << "handle_rep_op pg changed " << pg->info.history + << " after " << op->get_map_epoch() + << ", dropping" << endl; + delete op; + return; } - + if (g_conf.osd_rep != OSD_REP_CHAIN && + (op->get_map_epoch() < pg->info.history.same_primary_since || + op->get_map_epoch() < pg->info.history.same_acker_since)) { + dout(10) << "handle_rep_op pg primary|acker changed " << pg->info.history + << " after " << op->get_map_epoch() + << ", dropping" << endl; + delete op; + return; + } + + assert(pg->get_role() >= 0); dout(7) << "handle_rep_op " << op << " in " << *pg << endl; } @@ -2367,10 +2423,10 @@ void OSD::dequeue_op(pg_t pgid) ls.pop_front(); if (pgid) { - dout(10) << "dequeue_op write pg " << hex << pgid << dec << " op " << op << ", " + dout(10) << "dequeue_op " << op << " write pg " << hex << pgid << dec << ls.size() << " / " << (pending_ops-1) << " more pending" << endl; } else { - dout(10) << "dequeue_op read op " << op << ", " + dout(10) << "dequeue_op " << op << " read " << ls.size() << " / " << (pending_ops-1) << " more pending" << endl; } @@ -2390,7 +2446,7 @@ void OSD::dequeue_op(pg_t pgid) _unlock_pg(pgid); } - dout(10) << "dequeue_op finish op " << op << endl; + dout(10) << "dequeue_op " << op << " finish" << endl; assert(pending_ops > 0); if (pending_ops > g_conf.osd_max_opq) @@ -2412,7 +2468,7 @@ void OSD::dequeue_op(pg_t pgid) */ void OSD::do_op(Message *m, PG *pg) { - //dout(15) << "do_op " << *op << " in " << *pg << endl; + //dout(15) << "do_op " << *m << endl; if (m->get_type() == MSG_OSD_OP) { MOSDOp *op = (MOSDOp*)m; @@ -2549,7 +2605,7 @@ bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg) << " in " << *pg << ", pulling" << endl; - pull(pg, oid, v); + pull(pg, oid); } pg->waiting_for_missing_object[oid].push_back(op); return true; @@ -2695,13 +2751,13 @@ void OSD::put_repop_gather(PG *pg, PG::RepOpGather *repop) // adjust peers_complete_thru if (!repop->pg_complete_thru.empty()) { eversion_t min = pg->info.last_complete; // hrm.... - for (unsigned i=1; iacting.size(); i++) { - if (repop->pg_complete_thru[i] < min) // note: if we haven't heard, it'll be zero, which is what we want. - min = repop->pg_complete_thru[i]; + for (unsigned i=0; iacting.size(); i++) { + if (repop->pg_complete_thru[pg->acting[i]] < min) // note: if we haven't heard, it'll be zero, which is what we want. + min = repop->pg_complete_thru[pg->acting[i]]; } if (min > pg->peers_complete_thru) { - dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl; + dout(10) << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << " in " << *pg << endl; pg->peers_complete_thru = min; } } @@ -2877,7 +2933,7 @@ void OSD::op_modify(MOSDOp *op, PG *pg) // are any peers missing this? for (unsigned i=1; iacting.size(); i++) { int peer = pg->acting[i]; - if (pg->peer_missing.count(i) && + if (pg->peer_missing.count(peer) && pg->peer_missing[peer].is_missing(oid)) { // push it before this update. FIXME, this is probably extra much work (eg if we're about to overwrite) pg->peer_missing[peer].got(oid); diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index ef5fa1a4cf826..05ba3e03d430c 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -192,15 +192,14 @@ public: // PG hash_map pg_map; - //void get_pg_list(list& ls); void load_pgs(); bool pg_exists(pg_t pg); PG *create_pg(pg_t pg, ObjectStore::Transaction& t); // create new PG PG *get_pg(pg_t pg); // return existing PG, or null - //void close_pg(pg_t pg); // close in-memory state void _remove_pg(pg_t pg); // remove from store and memory - epoch_t calc_pg_primary_since(int primary, pg_t pgid, epoch_t start); + void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from); + void activate_pg(pg_t pgid, epoch_t epoch); class C_Activate : public Context { @@ -236,7 +235,7 @@ public: void do_queries(map< int, map >& query_map); void repeer(PG *pg, map< int, map >& query_map); - void pull(PG *pg, object_t, eversion_t); + void pull(PG *pg, object_t oid); void push(PG *pg, object_t oid, int dest); bool require_current_map(Message *m, epoch_t v); diff --git a/ceph/osd/PG.cc b/ceph/osd/PG.cc index d4cbeedcfcb7e..542624d1f68a9 100644 --- a/ceph/osd/PG.cc +++ b/ceph/osd/PG.cc @@ -224,11 +224,12 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd) dout(10) << "merge_log missing " << hex << p->first << dec << " " << p->second << " also LOST on source, osd" << fromosd << endl; } - } else { + } + else if (p->second <= olog.top) { dout(10) << "merge_log missing " << hex << p->first << dec << " " << p->second << " on source, osd" << fromosd << endl; missing.loc[p->first] = fromosd; - } + } } dout(10) << "merge_log missing " << hex << missing.missing << dec << endl; @@ -391,7 +392,7 @@ void PG::peer(ObjectStore::Transaction& t, } dout(10) << " querying info from osd" << *it << endl; - query_map[*it][info.pgid] = Query(Query::INFO, info.same_primary_since, info.same_acker_since); + query_map[*it][info.pgid] = Query(Query::INFO, info.history); peer_info_requested.insert(*it); } if (missing_info) return; @@ -465,7 +466,20 @@ void PG::peer(ObjectStore::Transaction& t, } } - // gather log? + // gather log+missing? + // ...from all active + for (unsigned i=1; iwhoami) { if (peer_log_requested.count(newest_update_osd) || peer_summary_requested.count(newest_update_osd)) { @@ -479,8 +493,7 @@ void PG::peer(ObjectStore::Transaction& t, << " v " << newest_update << ", querying since " << oldest_update_needed << endl; - query_map[newest_update_osd][info.pgid] = Query(Query::LOG, oldest_update_needed, - info.same_primary_since, info.same_acker_since); + query_map[newest_update_osd][info.pgid] = Query(Query::LOG, oldest_update_needed, info.history); peer_log_requested[newest_update_osd] = oldest_update_needed; } else { dout(10) << " newest update on osd" << newest_update_osd @@ -490,8 +503,7 @@ void PG::peer(ObjectStore::Transaction& t, assert((peer_info[newest_update_osd].last_complete >= peer_info[newest_update_osd].log_bottom) || peer_info[newest_update_osd].log_backlog); // or else we're in trouble. - query_map[newest_update_osd][info.pgid] = Query(Query::BACKLOG, - info.same_primary_since, info.same_acker_since); + query_map[newest_update_osd][info.pgid] = Query(Query::BACKLOG, info.history); peer_summary_requested.insert(newest_update_osd); } } @@ -499,6 +511,18 @@ void PG::peer(ObjectStore::Transaction& t, } else { dout(10) << " i have the most up-to-date pg v " << info.last_update << endl; } + + // did we get them all? + bool have_missing = true; + for (unsigned i=1; iwhoami); // can't be me, or we're in trouble. - query_map[who][info.pgid] = Query(Query::BACKLOG, - info.same_primary_since, info.same_acker_since); + query_map[who][info.pgid] = Query(Query::BACKLOG, info.history); peer_summary_requested.insert(who); } return; @@ -557,9 +580,7 @@ void PG::peer(ObjectStore::Transaction& t, } dout(10) << " requesting summary/backlog from osd" << peer << endl; - query_map[peer][info.pgid] = Query(Query::INFO, - info.same_primary_since, - info.same_acker_since); + query_map[peer][info.pgid] = Query(Query::INFO, info.history); peer_summary_requested.insert(peer); waiting = true; } @@ -589,7 +610,8 @@ void PG::peer(ObjectStore::Transaction& t, state_set(STATE_REPLAY); g_timer.add_event_after(g_conf.osd_replay_window, new OSD::C_Activate(osd, info.pgid, osd->osdmap->get_epoch())); - } else { + } + else if (!is_active()) { // -- ok, activate! activate(t); } @@ -598,6 +620,8 @@ void PG::peer(ObjectStore::Transaction& t, void PG::activate(ObjectStore::Transaction& t) { + assert(!is_active()); + // twiddle pg state state_set(STATE_ACTIVE); state_clear(STATE_STRAY); @@ -629,7 +653,8 @@ void PG::activate(ObjectStore::Transaction& t) log.complete_to == log.log.end(); log.requested_to = log.log.end(); } - else if (is_primary()) { + //else if (is_primary()) { + else if (true) { dout(10) << "activate - not complete, " << missing << ", starting recovery" << endl; // init complete_to @@ -638,7 +663,7 @@ void PG::activate(ObjectStore::Transaction& t) log.complete_to++; assert(log.complete_to != log.log.end()); } - + // start recovery log.requested_to = log.complete_to; do_recovery(); @@ -660,10 +685,6 @@ void PG::activate(ObjectStore::Transaction& t) int peer = acting[i]; assert(peer_info.count(peer)); - if (peer_info[peer].is_clean()) - clean_set.insert(peer); - - MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(), info.pgid); m->info = info; @@ -681,19 +702,42 @@ void PG::activate(ObjectStore::Transaction& t) assert(peer_info[peer].last_update < info.last_update); m->log.copy_after(log, peer_info[peer].last_update); } + + // update local version of peer's missing list! + { + eversion_t plu = peer_info[peer].last_update; + Missing& pm = peer_missing[peer]; + for (list::iterator p = m->log.log.begin(); + p != m->log.log.end(); + p++) + if (p->version > plu) + pm.add(p->oid, p->version); + } - dout(10) << "sending " << m->log << " " << m->missing + dout(10) << "activate sending " << m->log << " " << m->missing << " to osd" << peer << endl; - //m->log.print(cout); - osd->messenger->send_message(m, MSG_ADDR_OSD(peer)); + + // update our missing + if (peer_missing[peer].num_missing() == 0) { + dout(10) << "activate peer osd" << peer << " already clean, " << peer_info[peer] << endl; + assert(peer_info[peer].last_complete == info.last_update); + clean_set.insert(peer); + } else { + dout(10) << "activate peer osd" << peer << " " << peer_info[peer] + << " missing " << peer_missing[peer] << endl; + } + } + + // discard unneeded peering state + //peer_log.clear(); // actually, do this carefully, in case peer() is called again. // all clean? if (is_all_clean()) { state_set(STATE_CLEAN); - dout(10) << "all replicas clean" << endl; + dout(10) << "activate all replicas clean" << endl; clean_replicas(); } } @@ -839,7 +883,7 @@ bool PG::do_recovery() if (latest->is_update() && !objects_pulling.count(latest->oid) && missing.is_missing(latest->oid)) { - osd->pull(this, latest->oid, latest->version); + osd->pull(this, latest->oid); return true; } diff --git a/ceph/osd/PG.h b/ceph/osd/PG.h index 9ea3600accb09..19c7b7d749416 100644 --- a/ceph/osd/PG.h +++ b/ceph/osd/PG.h @@ -90,19 +90,21 @@ public: epoch_t last_epoch_started; // last epoch started. epoch_t last_epoch_finished; // last epoch finished. - epoch_t same_primary_since; // upper bound: same primary at least back through this epoch. - epoch_t same_acker_since; // upper bound: same acker at least back through this epoch. - epoch_t same_role_since; // upper bound: i have held same role since + struct History { + epoch_t same_since; // same acting set since + epoch_t same_primary_since; // same primary at least back through this epoch. + epoch_t same_acker_since; // same acker at least back through this epoch. + History() : same_since(0), same_primary_since(0), same_acker_since(0) {} + } history; + Info(pg_t p=0) : pgid(p), log_backlog(false), - last_epoch_started(0), last_epoch_finished(0), - same_primary_since(0), same_acker_since(0), - same_role_since(0) {} + last_epoch_started(0), last_epoch_finished(0) {} bool is_clean() { return last_update == last_complete; } }; - - + + /** * Query - used to ask a peer for information about a pg. * @@ -116,16 +118,13 @@ public: int type; eversion_t version; - epoch_t same_primary_since; - epoch_t same_acker_since; - - Query() : type(-1), same_primary_since(0), same_acker_since(0) {} - Query(int t, epoch_t ps, epoch_t as) : - type(t), - same_primary_since(ps), same_acker_since(as) {} - Query(int t, eversion_t v, epoch_t ps, epoch_t as) : - type(t), version(v), - same_primary_since(ps), same_acker_since(as) {} + Info::History history; + + Query() : type(-1) {} + Query(int t, Info::History& h) : + type(t), history(h) {} + Query(int t, eversion_t v, Info::History& h) : + type(t), version(v), history(h) {} }; @@ -409,7 +408,7 @@ public: public: // any static const int STATE_ACTIVE = 1; // i am active. (primary: replicas too) - + // primary static const int STATE_CLEAN = 2; // peers are complete, clean of stray replicas. static const int STATE_CRASHED = 4; // all replicas went down. @@ -601,6 +600,12 @@ public: }; + +inline ostream& operator<<(ostream& out, const PG::Info::History& h) +{ + return out << h.same_since << "/" << h.same_primary_since << "/" << h.same_acker_since; +} + inline ostream& operator<<(ostream& out, const PG::Info& pgi) { return out << "pginfo(" << hex << pgi.pgid << dec @@ -608,6 +613,7 @@ inline ostream& operator<<(ostream& out, const PG::Info& pgi) << " (" << pgi.log_bottom << "," << pgi.last_update << "]" << (pgi.log_backlog ? "+backlog":"") << " e " << pgi.last_epoch_started << "/" << pgi.last_epoch_finished + << " " << pgi.history << ")"; } diff --git a/ceph/osdc/Objecter.cc b/ceph/osdc/Objecter.cc index 08860bd7a9736..95a961587e7b6 100644 --- a/ceph/osdc/Objecter.cc +++ b/ceph/osdc/Objecter.cc @@ -107,51 +107,45 @@ void Objecter::scan_pgs(set& changed_pgs) i++) { pg_t pgid = i->first; PG& pg = i->second; + + // calc new. + vector other; + osdmap->pg_to_acting_osds(pgid, other); - int oldu = pg.updater; - int oldr = pg.reader; - pg.calc(pgid, osdmap); - - if (oldu != pg.updater || - oldr != pg.reader) { - /* - if (oldu < 0) { - dout(10) << "scan_pgs pg " << hex << pgid << dec - << " (" << pg.active_tids << ")" - << " updater " << oldu << " -> " << pg.updater - << " (was crashed)" - << endl; - //recovering_pgs.insert(pgid); - } - else if (osdmap->is_down(oldu)) { - dout(10) << "scan_pgs pg " << hex << pgid << dec - << " (" << pg.active_tids << ")" - << " updater " << oldu << " -> " << pg.updater - << " (updater went down)" - << endl; - //down_pgs.insert(pgid); - } - else { - dout(10) << "scan_pgs pg " << hex << pgid << dec - << " (" << pg.active_tids << ")" - << " updater " << oldu << " -> " << pg.updater - << " (primary changed)" - << endl; - } - */ - dout(10) << "scan_pgs pg " << hex << pgid << dec - << " (" << pg.active_tids << ")" - << " updater " << oldu << " -> " << pg.updater - << ", reader " << oldr << " -> " << pg.reader - << endl; - changed_pgs.insert(pgid); + if (other == pg.acting) + continue; // no change. + + other.swap(pg.acting); + + if (g_conf.osd_rep == OSD_REP_PRIMARY) { + // same primary? + if (!other.empty() && + !pg.acting.empty() && + other[0] == pg.acting[0]) + continue; + } + else if (g_conf.osd_rep == OSD_REP_SPLAY) { + // same primary and acker? + if (!other.empty() && + !pg.acting.empty() && + other[0] == pg.acting[0] && + other[other.size()-1] == pg.acting[pg.acting.size()-1]) + continue; } + else if (g_conf.osd_rep == OSD_REP_CHAIN) { + // any change is significant. + } + + // changed significantly. + dout(10) << "scan_pgs pg " << hex << pgid << dec + << " (" << pg.active_tids << ")" + << " " << other << " -> " << pg.acting + << endl; + changed_pgs.insert(pgid); } } -void Objecter::kick_requests(set& changed_pgs/*, - set& down_pgs, - set& recovering_pgs*/) +void Objecter::kick_requests(set& changed_pgs) { dout(10) << "kick_requests in pgs " << hex << changed_pgs << dec << endl; @@ -274,11 +268,11 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex) << " oid " << hex << ex.oid << dec << " " << ex.start << "~" << ex.length << " (" << ex.buffer_extents.size() << " buffer fragments)" << " pg " << hex << ex.pgid << dec - << " osd" << pg.reader + << " osd" << pg.acker() << endl; - if (pg.reader >= 0) - messenger->send_message(m, MSG_ADDR_OSD(pg.reader), 0); + if (pg.acker() >= 0) + messenger->send_message(m, MSG_ADDR_OSD(pg.acker()), 0); // add to gather set rd->ops[last_tid] = ex; @@ -517,7 +511,6 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) { // find PG &pg = get_pg( ex.pgid ); - //int osd = osdmap->get_pg_acting_primary( ex.pgid ); // send tid_t tid; @@ -566,10 +559,10 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) << " oid " << hex << ex.oid << dec << " " << ex.start << "~" << ex.length << " pg " << hex << ex.pgid << dec - << " osd" << pg.updater + << " osd" << pg.primary() << endl; - if (pg.updater >= 0) - messenger->send_message(m, MSG_ADDR_OSD(pg.updater), 0); + if (pg.primary() >= 0) + messenger->send_message(m, MSG_ADDR_OSD(pg.primary()), 0); return tid; } @@ -601,7 +594,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) PG &pg = get_pg( m->get_pg() ); // ignore? - if (pg.reader != m->get_source().num()) { + if (pg.acker() != m->get_source().num()) { dout(7) << " ignoring ack|commit from non-acker" << endl; delete m; return; diff --git a/ceph/osdc/Objecter.h b/ceph/osdc/Objecter.h index 902d270ed7fdc..140bb58dc7efb 100644 --- a/ceph/osdc/Objecter.h +++ b/ceph/osdc/Objecter.h @@ -68,15 +68,7 @@ class Objecter { OSDWrite(bufferlist &b) : OSDModify(OSD_OP_WRITE), bl(b) {} }; - /* - class OSDLock : public OSDModify { - public: - map by_osd; - map::iterator next; - OSDLock(int o) : OSDModify(o) {} - }; - */ - + private: // pending ops @@ -89,27 +81,32 @@ class Objecter { */ class PG { public: - int updater; // where i write - int reader; // where i read, and expect acks from + vector acting; set active_tids; // active ops - - PG() : updater(-1), reader(-1) {} - - void calc(pg_t pgid, OSDMap *osdmap) { // return true if change - updater = osdmap->get_pg_acting_primary(pgid); + + PG() {} + + // primary - where i write + int primary() { + if (acting.empty()) return -1; + return acting[0]; + } + // acker - where i read, and receive acks from + int acker() { + if (acting.empty()) return -1; if (g_conf.osd_rep == OSD_REP_PRIMARY) - reader = updater; - else - reader = osdmap->get_pg_acting_tail(pgid); + return acting[0]; + else + return acting[acting.size()-1]; } }; hash_map pg_map; - + PG &get_pg(pg_t pgid) { if (!pg_map.count(pgid)) - pg_map[pgid].calc(pgid, osdmap); + osdmap->pg_to_acting_osds(pgid, pg_map[pgid].acting); return pg_map[pgid]; } void close_pg(pg_t pgid) { @@ -117,8 +114,8 @@ class Objecter { assert(pg_map[pgid].active_tids.empty()); pg_map.erase(pgid); } - void scan_pgs(set& chnaged_pgs);//, set& down_pgs); - void kick_requests(set& changed_pgs);//, set& down_pgs); + void scan_pgs(set& chnaged_pgs); + void kick_requests(set& changed_pgs); public: -- 2.39.5