From 80bf4068271bff3e8ff860c092e8c35f9784ae0e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 15 Mar 2008 10:49:37 -0700 Subject: [PATCH] basics are working --- src/mon/PGMap.h | 6 +- src/mon/PGMonitor.cc | 21 +++++-- src/msg/SimpleMessenger.cc | 4 +- src/osd/OSD.cc | 112 +++++++++++++++++++++++++++++++------ src/osd/OSD.h | 10 ++-- src/osd/PG.cc | 16 +++--- src/osd/PG.h | 40 ++----------- src/osd/osd_types.h | 26 +++++++++ 8 files changed, 158 insertions(+), 77 deletions(-) diff --git a/src/mon/PGMap.h b/src/mon/PGMap.h index 6cd85aa795835..e7b92090f58f3 100644 --- a/src/mon/PGMap.h +++ b/src/mon/PGMap.h @@ -23,8 +23,6 @@ #include "osd/osd_types.h" -#define PG_STATE_CREATING 0x100 // this had better not collide with PG::STATE_* in osd/PG.h - class PGMap { public: // the map @@ -109,7 +107,7 @@ public: total_pg_num_bytes += s.num_bytes; total_pg_num_blocks += s.num_blocks; total_pg_num_objects += s.num_objects; - if (s.state == PG_STATE_CREATING) + if (s.state & PG_STATE_CREATING) creating_pgs.insert(pgid); } void stat_pg_sub(pg_t pgid, pg_stat_t &s) { @@ -118,7 +116,7 @@ public: total_pg_num_bytes -= s.num_bytes; total_pg_num_blocks -= s.num_blocks; total_pg_num_objects -= s.num_objects; - if (s.state == PG_STATE_CREATING) + if (s.state & PG_STATE_CREATING) creating_pgs.erase(pgid); } void stat_osd_add(osd_stat_t &s) { diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index 639872f0e06ab..21a0c1a4d698a 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -100,7 +100,7 @@ bool PGMonitor::update_from_paxos() ++p) { if (p != pg_map.num_pg_by_state.begin()) ss << ", "; - ss << p->second << " " << PG::get_state_string(p->first) << "(" << p->first << ")"; + ss << p->second << " " << pg_state_string(p->first) << "(" << p->first << ")"; } string states = ss.str(); dout(0) << "v" << pg_map.version << " " << states << dendl; @@ -236,14 +236,23 @@ bool PGMonitor::handle_pg_stats(MPGStats *stats) continue; } - dout(15) << " got " << pgid << " reported at 
" << p->second.reported - << " state " << PG::get_state_string(p->second.state) + if (pg_map.pg_stat.count(pgid) == 0) { + dout(15) << " got " << pgid << " reported at " << p->second.reported + << " state " << pg_state_string(p->second.state) + << " but DNE in pg_map!!" + << dendl; + assert(0); + } + + dout(15) << " got " << pgid + << " reported at " << p->second.reported + << " state " << pg_state_string(pg_map.pg_stat[pgid].state) + << " -> " << pg_state_string(p->second.state) << dendl; pending_inc.pg_stat_updates[pgid] = p->second; - // we don't care about consistency; apply to live map. - if (pg_map.pg_stat.count(pgid)) - pg_map.stat_pg_sub(pgid, pg_map.pg_stat[pgid]); + // we don't care much about consistency, here; apply to live map. + pg_map.stat_pg_sub(pgid, pg_map.pg_stat[pgid]); pg_map.pg_stat[pgid] = p->second; pg_map.stat_pg_add(pgid, pg_map.pg_stat[pgid]); } diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index dd154f99c5f7f..c45386c373d92 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -729,12 +729,12 @@ void Rank::mark_down(entity_addr_t addr) lock.Lock(); if (rank_pipe.count(addr)) { Pipe *p = rank_pipe[addr]; - dout(0) << "mark_down " << addr << " -- " << p << dendl; + dout(2) << "mark_down " << addr << " -- " << p << dendl; p->lock.Lock(); p->stop(); p->lock.Unlock(); } else { - dout(0) << "mark_down " << addr << " -- pipe dne" << dendl; + dout(2) << "mark_down " << addr << " -- pipe dne" << dendl; } lock.Unlock(); } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 5e6a1176e2948..049789ddc703b 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -609,6 +609,25 @@ void OSD::load_pgs() +/* + * calculate pg primaries during an epoch interval + */ +void OSD::calc_primaries_during(pg_t pgid, epoch_t start, epoch_t end, set& pset) +{ + dout(15) << "calc_primaries_during " << pgid << " [" << start << "," << end << ")" << dendl; + + for (epoch_t e = start; e < end; e++) { + OSDMap oldmap; + get_map(e, 
oldmap); + vector acting; + oldmap.pg_to_acting_osds(pgid, acting); + dout(20) << " " << pgid << " in epoch " << e << " was " << acting << dendl; + if (acting.size()) + pset.insert(acting[0]); + } +} + + /** * check epochs starting from start to verify the pg acting set hasn't changed * up until now @@ -1395,6 +1414,13 @@ void OSD::advance_map(ObjectStore::Transaction& t) << " " << pg_map.size() << " pgs" << dendl; + // scan pg creations + for (hash_map::iterator p = creating_pgs.begin(); + p != creating_pgs.end(); + p++) { + dout(0) << "IMPLEMENT ME *********" << dendl; + } + // scan existing pg's for (hash_map::iterator it = pg_map.begin(); it != pg_map.end(); @@ -1439,7 +1465,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) } // deactivate. - pg->state_clear(PG::STATE_ACTIVE); + pg->state_clear(PG_STATE_ACTIVE); // reset primary state? if (oldrole == 0 || pg->get_role() == 0) @@ -1456,7 +1482,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) if (role != oldrole) { // old primary? if (oldrole == 0) { - pg->state_clear(PG::STATE_CLEAN); + pg->state_clear(PG_STATE_CLEAN); // take replay queue waiters list ls; @@ -1476,10 +1502,10 @@ void OSD::advance_map(ObjectStore::Transaction& t) // new primary? if (role == 0) { // i am new primary - pg->state_clear(PG::STATE_STRAY); + pg->state_clear(PG_STATE_STRAY); } else { // i am now replica|stray. we need to send a notify. - pg->state_set(PG::STATE_STRAY); + pg->state_set(PG_STATE_STRAY); if (nrep == 0) { // did they all shut down cleanly? @@ -1491,7 +1517,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) if (clean) { dout(1) << *pg << " is cleanly inactive" << dendl; } else { - pg->state_set(PG::STATE_CRASHED); + pg->state_set(PG_STATE_CRASHED); dout(1) << *pg << " is crashed" << dendl; } } @@ -1506,7 +1532,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) // did primary change? 
if (pg->get_primary() != oldprimary) { // we need to announce - pg->state_set(PG::STATE_STRAY); + pg->state_set(PG_STATE_STRAY); dout(10) << *pg << " " << oldacting << " -> " << pg->acting << ", acting primary " @@ -1516,8 +1542,8 @@ void OSD::advance_map(ObjectStore::Transaction& t) // primary is the same. if (role == 0) { // i am (still) primary. but my replica set changed. - pg->state_clear(PG::STATE_CLEAN); - pg->state_clear(PG::STATE_REPLAY); + pg->state_clear(PG_STATE_CLEAN); + pg->state_clear(PG_STATE_REPLAY); dout(10) << *pg << " " << oldacting << " -> " << pg->acting << ", replicas changed" << dendl; @@ -1728,7 +1754,9 @@ void OSD::handle_pg_create(MOSDPGCreate *m) if (!require_same_or_newer_map(m, m->epoch)) return; + map< int, map > query_map; ObjectStore::Transaction t; + int created = 0; for (map::iterator p = m->mkpg.begin(); p != m->mkpg.end(); @@ -1743,7 +1771,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m) int role = osdmap->calc_pg_role(whoami, acting, nrep); if (role != 0) { - dout(10) << "mkpg " << pgid << " no longer primary (role=" << role << "), skipping" << dendl; + dout(10) << "mkpg " << pgid << " not primary (role=" << role << "), skipping" << dendl; continue; } @@ -1757,19 +1785,34 @@ void OSD::handle_pg_create(MOSDPGCreate *m) PG::Info::History history; project_pg_history(pgid, history, created, acting); - if (history.same_primary_since <= created) { - dout(10) << "mkpg " << pgid << " creating now" << dendl; - try_create_pg(pgid, t); - } else { - dout(10) << "mkpg " << pgid << " e " << created << " : querying priors" << dendl; - assert(0); + if (history.same_primary_since > created) { + // calc prior primary set. may be empty, keep in mind. 
+ set pset; + calc_primaries_during(pgid, created, history.same_primary_since, pset); + pset.erase(whoami); + if (pset.size()) { + dout(10) << "mkpg " << pgid << " e " << created << " : querying priors " + << pset << dendl; + for (set::iterator p = pset.begin(); p != pset.end(); p++) + if (osdmap->is_up(*p)) + query_map[*p][pgid].type = PG::Query::INFO; + creating_pgs[pgid].created = created; + creating_pgs[pgid].prior.swap(pset); + continue; + } } + dout(10) << "mkpg " << pgid << " creating now" << dendl; + try_create_pg(pgid, t); + created++; } store->apply_transaction(t); + do_queries(query_map); delete m; -} + if (created) + update_heartbeat_sets(); +} // ---------------------------------------- @@ -1824,6 +1867,7 @@ void OSD::do_activators(map& activator_map) activator_map.clear(); } + /** PGNotify * from non-primary to primary * includes PG::Info. @@ -1841,6 +1885,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) // look for unknown PGs i'm primary for map< int, map > query_map; map activator_map; + int created = 0; for (list::iterator it = m->get_pg_list().begin(); it != m->get_pg_list().end(); @@ -1865,16 +1910,44 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) assert(role == 0); // otherwise, probably bug in project_pg_history. + epoch_t last_epoch_started = it->last_epoch_started; + + // is this an empty info? + if (it->last_update == eversion_t()) { + // is there a creation probe on this pg? 
+ if (creating_pgs.count(pgid)) { + creating_pgs[pgid].prior.erase(from); + if (!creating_pgs[pgid].prior.empty()) { + dout(10) << "handle_pg_notify pg " << pgid + << " doing creation probe, still waiting for " + << creating_pgs[pgid].prior << dendl; + continue; + } + dout(10) << "handle_pg_notify pg " << pgid + << " finished creation probe and DNE, creating" + << dendl; + last_epoch_started = + history.same_since = + history.same_primary_since = + history.same_acker_since = osdmap->get_epoch(); + } else { + dout(10) << "handle_pg_notify pg " << pgid + << " was doing creation probe, but found pg info on osd" << from << dendl; + } + } + creating_pgs.erase(pgid); + // ok, create PG! pg = _create_lock_pg(pgid, t); pg->acting.swap(acting); pg->set_role(role); pg->info.history = history; pg->clear_primary_state(); // yep, notably, set hml=false - pg->last_epoch_started_any = it->last_epoch_started; // _after_ clear_primary_state() + pg->last_epoch_started_any = last_epoch_started; // _after_ clear_primary_state() pg->build_prior(); pg->write_log(t); + created++; dout(10) << *pg << " is new" << dendl; // kick any waiters @@ -1901,7 +1974,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) if (!acting && (*it).last_epoch_started > 0) { dout(10) << *pg << " osd" << from << " has stray content: " << *it << dendl; pg->stray_set.insert(from); - pg->state_clear(PG::STATE_CLEAN); + pg->state_clear(PG_STATE_CLEAN); } // save info. 
@@ -1938,6 +2011,9 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) do_queries(query_map); do_activators(activator_map); + if (created) + update_heartbeat_sets(); + delete m; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 4a4a5e61989bf..350612f0e513a 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -282,6 +282,7 @@ private: void try_create_pg(pg_t pgid, ObjectStore::Transaction& t); void load_pgs(); + void calc_primaries_during(pg_t pgid, epoch_t start, epoch_t end, set& pset); void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from, vector& last); void activate_pg(pg_t pgid, epoch_t epoch); @@ -298,12 +299,11 @@ private: }; // -- pg creation -- - struct pg_create_info { - epoch_t first_epoch; - set prior_set; - set dne_set; + struct create_pg_info { + epoch_t created; + set prior; }; - map creating_pg; + hash_map creating_pgs; void handle_pg_create(class MOSDPGCreate *m); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 1ad05ab48afca..ef5fd91f215f3 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -656,14 +656,14 @@ void PG::peer(ObjectStore::Transaction& t, dout(10) << " cleanly stopped since epoch " << last_epoch_started_any << dendl; } else { dout(10) << " crashed since epoch " << last_epoch_started_any << dendl; - state_set(STATE_CRASHED); + state_set(PG_STATE_CRASHED); } } else { dout(10) << " still active from last started: " << last_started << dendl; } } else if (osd->osdmap->get_epoch() > info.epoch_created) { // FIXME hrm is htis right? dout(10) << " crashed since epoch " << last_epoch_started_any << dendl; - state_set(STATE_CRASHED); + state_set(PG_STATE_CRASHED); } dout(10) << " peers_complete_thru " << peers_complete_thru << dendl; @@ -820,7 +820,7 @@ void PG::peer(ObjectStore::Transaction& t, // -- crash recovery? 
if (is_crashed()) { dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window << dendl; - state_set(STATE_REPLAY); + state_set(PG_STATE_REPLAY); osd->timer.add_event_after(g_conf.osd_replay_window, new OSD::C_Activate(osd, info.pgid, osd->osdmap->get_epoch())); } @@ -837,12 +837,12 @@ void PG::activate(ObjectStore::Transaction& t, assert(!is_active()); // twiddle pg state - state_set(STATE_ACTIVE); - state_clear(STATE_STRAY); + state_set(PG_STATE_ACTIVE); + state_clear(PG_STATE_STRAY); if (is_crashed()) { //assert(is_replay()); // HELP.. not on replica? - state_clear(STATE_CRASHED); - state_clear(STATE_REPLAY); + state_clear(PG_STATE_CRASHED); + state_clear(PG_STATE_REPLAY); } last_epoch_started_any = info.last_epoch_started = osd->osdmap->get_epoch(); @@ -1007,7 +1007,7 @@ void PG::finish_recovery() { dout(10) << "finish_recovery" << dendl; - state_set(PG::STATE_CLEAN); + state_set(PG_STATE_CLEAN); purge_strays(); update_stats(); } diff --git a/src/osd/PG.h b/src/osd/PG.h index 1d02f0913a1cf..632903b1b6320 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -374,34 +374,6 @@ public: /*** PG ****/ -public: - // any - static const int STATE_ACTIVE = 1; // i am active. (primary: replicas too) - - // primary - static const int STATE_CLEAN = 2; // peers are complete, clean of stray replicas. - static const int STATE_CRASHED = 4; // all replicas went down. - static const int STATE_REPLAY = 8; // crashed, waiting for replay - - // non-primary - static const int STATE_STRAY = 16; // i must notify the primary i exist. 
- static const int STATE_CREATING = 256; // pg is being created (used by pgmonitor only) - - static std::string get_state_string(int state) { - std::string st; - if (state & STATE_ACTIVE) st += "active+"; - if (state & STATE_CLEAN) st += "clean+"; - if (state & STATE_CRASHED) st += "crashed+"; - if (state & STATE_REPLAY) st += "replay+"; - if (state & STATE_STRAY) st += "stray+"; - if (!st.length()) - st = "inactive"; - else - st.resize(st.length()-1); - return st; - } - protected: OSD *osd; @@ -615,12 +587,12 @@ public: bool is_complete() const { return info.last_complete == info.last_update; } - bool is_active() const { return state_test(STATE_ACTIVE); } - bool is_crashed() const { return state_test(STATE_CRASHED); } - bool is_replay() const { return state_test(STATE_REPLAY); } - //bool is_complete() { return state_test(STATE_COMPLETE); } - bool is_clean() const { return state_test(STATE_CLEAN); } - bool is_stray() const { return state_test(STATE_STRAY); } + bool is_active() const { return state_test(PG_STATE_ACTIVE); } + bool is_crashed() const { return state_test(PG_STATE_CRASHED); } + bool is_replay() const { return state_test(PG_STATE_REPLAY); } + //bool is_complete() { return state_test(PG_STATE_COMPLETE); } + bool is_clean() const { return state_test(PG_STATE_CLEAN); } + bool is_stray() const { return state_test(PG_STATE_STRAY); } bool is_empty() const { return info.last_update == eversion_t(0,0); } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 421761bc679ab..50042e2e160f0 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -222,6 +222,32 @@ struct osd_stat_t { }; + +/* + * pg states + */ +#define PG_STATE_CREATING 1 // pg is being created +#define PG_STATE_ACTIVE 2 // i am active. (primary: replicas too) +#define PG_STATE_CLEAN 4 // peers are complete, clean of stray replicas. +#define PG_STATE_CRASHED 8 // all replicas went down. 
+#define PG_STATE_REPLAY 16 // crashed, waiting for replay +#define PG_STATE_STRAY 32 // i must notify the primary i exist. + +static inline std::string pg_state_string(int state) { + std::string st; + if (state & PG_STATE_ACTIVE) st += "active+"; + if (state & PG_STATE_CLEAN) st += "clean+"; + if (state & PG_STATE_CRASHED) st += "crashed+"; + if (state & PG_STATE_REPLAY) st += "replay+"; + if (state & PG_STATE_STRAY) st += "stray+"; + if (state & PG_STATE_CREATING) st += "creating+"; + if (!st.length()) + st = "inactive"; + else + st.resize(st.length()-1); + return st; +} + /** pg_stat * aggregate stats for a single PG. */ -- 2.39.5