From 87003118a1f434fe7d391b1f02d52ac6ade6ebe9 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 15 Mar 2008 12:28:28 -0700 Subject: [PATCH] mon, osd recovery and pg creation cleanups - osd mkpg cleanup - osd recovery fixes - pg state reporting fixes --- src/TODO | 6 +- src/mon/Monitor.cc | 1 + src/mon/OSDMonitor.cc | 15 +++-- src/mon/OSDMonitor.h | 2 + src/mon/PGMap.h | 3 +- src/mon/PGMonitor.cc | 56 ++++++++++------ src/mon/PGMonitor.h | 7 +- src/osd/OSD.cc | 144 +++++++++++++++++++++++----------------- src/osd/OSD.h | 9 ++- src/osd/PG.cc | 1 - src/osd/PG.h | 3 + src/osd/ReplicatedPG.cc | 28 +++----- src/start.sh | 3 +- 13 files changed, 160 insertions(+), 118 deletions(-) diff --git a/src/TODO b/src/TODO index 6f578f8007547..7666de36b8ce4 100644 --- a/src/TODO +++ b/src/TODO @@ -171,11 +171,9 @@ osd/rados 2: B C 3: A C -> prior_set can be , bc C would carry any epoch 2 updates - 1: A B C - 2: B C - 3: A C -> prior_set can be , bc C would carry any epoch 2 updates - -> so: we need at least 1 osd from each epoch, IFF we make store sync on osdmap boundaries. + + -> so, use calc_priors_during in build_prior, then make recovery code check for is_up - paxos replication (i.e. majority voting)? diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index dab077c98564f..d3a7642cd769f 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -402,6 +402,7 @@ void Monitor::tick() osdmon->tick(); mdsmon->tick(); + pgmon->tick(); // next tick! reset_tick(); diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 89ef45dd2eafa..e4afa18f8cbdd 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -159,10 +159,7 @@ bool OSDMonitor::update_from_paxos() mon->store->put_bl_sn(bl, "osdmap_full", osdmap.epoch); // share - dout(1) << osdmap.get_num_osds() << " osds, " - << osdmap.get_num_up_osds() << " up, " - << osdmap.get_num_in_osds() << " in" - << dendl; + print_summary_stats(1); } mon->store->put_int(osdmap.epoch, "osdmap_full","last_epoch"); @@ -177,6 +174,13 @@ bool OSDMonitor::update_from_paxos() return true; } +void OSDMonitor::print_summary_stats(int dbl) +{ + dout(dbl) << osdmap.get_num_osds() << " osds: " + << osdmap.get_num_up_osds() << " up, " + << osdmap.get_num_in_osds() << " in" + << dendl; +} void OSDMonitor::create_pending() { @@ -637,6 +641,8 @@ void OSDMonitor::bcast_full_osd() void OSDMonitor::tick() { + print_summary_stats(10); + if (!mon->is_leader()) return; if (!paxos->is_active()) return; @@ -664,6 +670,7 @@ void OSDMonitor::tick() propose_pending(); } + #define SWAP_PRIMARIES_AT_START 0 #define SWAP_TIME 1 diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 49fcbf767e4a8..df591002f112c 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -115,6 +115,8 @@ private: bool preprocess_out(class MOSDOut *m); bool prepare_out(class MOSDOut *m); + void print_summary_stats(int dbl=10); + public: OSDMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } diff --git a/src/mon/PGMap.h b/src/mon/PGMap.h index e7b92090f58f3..7cebd57158d56 100644 --- a/src/mon/PGMap.h +++ b/src/mon/PGMap.h @@ -112,7 +112,8 @@ public: } void stat_pg_sub(pg_t pgid, pg_stat_t &s) { num_pg--; - num_pg_by_state[s.state]--; + if (--num_pg_by_state[s.state] == 0) + num_pg_by_state.erase(s.state); total_pg_num_bytes -= s.num_bytes; total_pg_num_blocks -= s.num_blocks; total_pg_num_objects -= s.num_objects; diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index 21a0c1a4d698a..29e17bc8131a6 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -42,9 +42,11 @@ Tick function to update the map based on performance every N seconds */ -/* -void PGMonitor::tick() { +void PGMonitor::tick() +{ + print_summary_stats(10); + /* // magic incantation that Sage told me if (!mon->is_leader()) return; if (!paxos->is_active()) return; @@ -58,10 +60,9 @@ void PGMonitor::tick() { if (mon->osdmon->paxos->is_readable()) { // safely use mon->osdmon->osdmap } - + */ } -*/ void PGMonitor::create_initial() { dout(10) << "create_initial -- creating initial map" << dendl; @@ -94,17 +95,7 @@ bool PGMonitor::update_from_paxos() inc._decode(bl, off); pg_map.apply_incremental(inc); - std::stringstream ss; - for (hash_map::iterator p = pg_map.num_pg_by_state.begin(); - p != pg_map.num_pg_by_state.end(); - ++p) { - if (p != pg_map.num_pg_by_state.begin()) - ss << ", "; - ss << p->second << " " << pg_state_string(p->first) << "(" << p->first << ")"; - } - string states = ss.str(); - dout(0) << "v" << pg_map.version << " " << states << dendl; - + print_summary_stats(0); } else { dout(7) << "update_from_paxos couldn't read incremental " << pg_map.version+1 << dendl; return false; @@ -121,6 +112,24 @@ bool PGMonitor::update_from_paxos() return true; } +void PGMonitor::print_summary_stats(int dbl) +{ + std::stringstream ss; + for (hash_map::iterator p = pg_map.num_pg_by_state.begin(); + p != pg_map.num_pg_by_state.end(); + ++p) { + if (p != pg_map.num_pg_by_state.begin()) + ss << ", "; + ss << p->second << " " << pg_state_string(p->first);// << "(" << p->first << ")"; + } + string states = ss.str(); + dout(dbl) << "v" << pg_map.version << " " + << pg_map.pg_stat.size() << " pgs: " + << states << dendl; + if (!pg_map.creating_pgs.empty()) + dout(20) << " creating_pgs = " << pg_map.creating_pgs << dendl; +} + void PGMonitor::create_pending() { pending_inc = PGMap::Incremental(); @@ -138,7 +147,6 @@ void PGMonitor::encode_pending(bufferlist &bl) bool PGMonitor::preprocess_query(Message *m) { dout(10) << "preprocess_query " << *m << " from " << m->get_source_inst() << dendl; - switch (m->get_type()) { case CEPH_MSG_STATFS: handle_statfs((MStatfs*)m); @@ -147,14 +155,18 @@ bool PGMonitor::preprocess_query(Message *m) case MSG_PGSTATS: { MPGStats *stats = (MPGStats*)m; + int from = m->get_source().num(); + if (pg_map.osd_stat.count(from) || + memcmp(&pg_map.osd_stat[from], &stats->osd_stat, sizeof(stats->osd_stat)) != 0) + return false; // new osd stat for (map::iterator p = stats->pg_stat.begin(); p != stats->pg_stat.end(); p++) { if (pg_map.pg_stat.count(p->first) == 0 || - pg_map.pg_stat[p->first].reported < p->second.reported) - return false; + memcmp(&pg_map.pg_stat[p->first], &p->second, sizeof(p->second)) != 0) + return false; // new pg stat(s) } - dout(10) << " message contains no new pg stats" << dendl; + dout(10) << " message contains no new osd|pg stats" << dendl; return true; } @@ -170,7 +182,7 @@ bool PGMonitor::prepare_update(Message *m) dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << dendl; switch (m->get_type()) { case MSG_PGSTATS: - return handle_pg_stats((MPGStats*)m); + return prepare_pg_stats((MPGStats*)m); default: assert(0); @@ -203,9 +215,9 @@ void PGMonitor::handle_statfs(MStatfs *statfs) delete statfs; } -bool PGMonitor::handle_pg_stats(MPGStats *stats) +bool PGMonitor::prepare_pg_stats(MPGStats *stats) { - dout(10) << "handle_pg_stats " << *stats << " from " << stats->get_source() << dendl; + dout(10) << "prepare_pg_stats " << *stats << " from " << stats->get_source() << dendl; int from = stats->get_source().num(); if (!stats->get_source().is_osd() || !mon->osdmon->osdmap.is_up(from) || diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h index b3fc0c0395db4..69593b455dc29 100644 --- a/src/mon/PGMonitor.h +++ b/src/mon/PGMonitor.h @@ -50,15 +50,18 @@ private: bool preprocess_query(Message *m); // true if processed. bool prepare_update(Message *m); + bool prepare_pg_stats(MPGStats *stats); + void handle_statfs(MStatfs *statfs); - bool handle_pg_stats(MPGStats *stats); + + void print_summary_stats(int dbl=5); map last_sent_pg_create; // per osd throttle public: PGMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } - //void tick(); // check state, take actions + void tick(); // check state, take actions void register_new_pgs(); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 049789ddc703b..bf22c7b51ed52 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -472,7 +472,7 @@ int OSD::read_superblock() // ====================================================== // PG's -PG *OSD::_new_lock_pg(pg_t pgid) +PG *OSD::_open_lock_pg(pg_t pgid) { // create PG *pg; @@ -500,7 +500,7 @@ PG *OSD::_create_lock_pg(pg_t pgid, ObjectStore::Transaction& t) dout(0) << "_create_lock_pg on " << pgid << ", already have " << *pg_map[pgid] << dendl; // open - PG *pg = _new_lock_pg(pgid); + PG *pg = _open_lock_pg(pgid); // create collection assert(!store->collection_exists(pgid)); @@ -509,6 +509,27 @@ PG *OSD::_create_lock_pg(pg_t pgid, ObjectStore::Transaction& t) return pg; } +PG * OSD::_create_lock_new_pg(pg_t pgid, vector& acting, ObjectStore::Transaction& t) +{ + dout(20) << "_create_lock_new_pg pgid " << pgid << " -> " << acting << dendl; + assert(whoami == acting[0]); + + PG *pg = _create_lock_pg(pgid, t); + pg->set_role(0); + pg->acting.swap(acting); + pg->info.epoch_created = + pg->last_epoch_started_any = + pg->info.last_epoch_started = + pg->info.history.same_since = + pg->info.history.same_primary_since = + pg->info.history.same_acker_since = osdmap->get_epoch(); + pg->write_log(t); + + dout(7) << "_create_lock_new_pg " << *pg << dendl; + return pg; +} + + bool OSD::_have_pg(pg_t pgid) { return pg_map.count(pgid); @@ -554,29 +575,6 @@ void OSD::_remove_unlock_pg(PG *pg) pg->put_unlock(); // will delete, if last reference } - -void OSD::try_create_pg(pg_t pgid, ObjectStore::Transaction& t) -{ - vector acting; - int nrep = osdmap->pg_to_acting_osds(pgid, acting); - dout(20) << "pgid " << pgid << " -> " << acting << dendl; - int role = osdmap->calc_pg_role(whoami, acting, nrep); - if (role < 0) return; - - PG *pg = _create_lock_pg(pgid, t); - pg->set_role(role); - pg->acting.swap(acting); - pg->last_epoch_started_any = - pg->info.last_epoch_started = - pg->info.history.same_since = - pg->info.history.same_primary_since = - pg->info.history.same_acker_since = osdmap->get_epoch(); - pg->write_log(t); - - dout(7) << "created " << *pg << dendl; - pg->unlock(); -} - void OSD::load_pgs() { dout(10) << "load_pgs" << dendl; @@ -589,7 +587,7 @@ void OSD::load_pgs() it != ls.end(); it++) { pg_t pgid = *it; - PG *pg = _new_lock_pg(pgid); + PG *pg = _open_lock_pg(pgid); // read pg info store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info)); @@ -610,11 +608,13 @@ void OSD::load_pgs() /* - * calculate pg primaries during an epoch interval + * calculate prior pg members during an epoch interval [start,end) + * - from each epoch, include all osds up then AND now + * - if no osds from then are up now, include them all, even tho they're not reachable now */ -void OSD::calc_primaries_during(pg_t pgid, epoch_t start, epoch_t end, set& pset) +void OSD::calc_priors_during(pg_t pgid, epoch_t start, epoch_t end, set& pset) { - dout(15) << "calc_primaries_during " << pgid << " [" << start << "," << end << ")" << dendl; + dout(15) << "calc_priors_during " << pgid << " [" << start << "," << end << ")" << dendl; for (epoch_t e = start; e < end; e++) { OSDMap oldmap; @@ -622,9 +622,21 @@ void OSD::calc_primaries_during(pg_t pgid, epoch_t start, epoch_t end, set& vector acting; oldmap.pg_to_acting_osds(pgid, acting); dout(20) << " " << pgid << " in epoch " << e << " was " << acting << dendl; - if (acting.size()) - pset.insert(acting[0]); + int added = 0; + for (unsigned i=0; iis_up(acting[i])) { + pset.insert(acting[i]); + added++; + } + if (!added && acting.size()) { + // sucky. add down osds, even tho we can't reach them right now. + for (unsigned i=0; i created) { // calc prior primary set. may be empty, keep in mind. set pset; - calc_primaries_during(pgid, created, history.same_primary_since, pset); + calc_priors_during(pgid, created, history.same_primary_since, pset); pset.erase(whoami); if (pset.size()) { dout(10) << "mkpg " << pgid << " e " << created << " : querying priors " @@ -1802,7 +1814,8 @@ void OSD::handle_pg_create(MOSDPGCreate *m) } } dout(10) << "mkpg " << pgid << " creating now" << dendl; - try_create_pg(pgid, t); + PG *pg = _create_lock_new_pg(pgid, acting, t); + pg->unlock(); created++; } @@ -1811,7 +1824,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m) delete m; if (created) - update_heartbeat_sets(); + update_heartbeat_peers(); } @@ -1891,7 +1904,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) it != m->get_pg_list().end(); it++) { pg_t pgid = it->pgid; - PG *pg; + PG *pg = 0; if (!_have_pg(pgid)) { // same primary? @@ -1912,40 +1925,46 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) epoch_t last_epoch_started = it->last_epoch_started; - // is this an empty info? - if (it->last_update == eversion_t()) { - // is there a creation probe on this pg? + // DNE on source? + if (it->dne()) { + // is there a creation pending on this pg? if (creating_pgs.count(pgid)) { creating_pgs[pgid].prior.erase(from); if (!creating_pgs[pgid].prior.empty()) { dout(10) << "handle_pg_notify pg " << pgid << " doing creation probe, still waiting for " << creating_pgs[pgid].prior << dendl; + /* + * hrm, what happens if one of the prior osds fails permanently? + * admin needs to recreate the pg basically? + */ continue; } dout(10) << "handle_pg_notify pg " << pgid << " finished creation probe and DNE, creating" << dendl; - last_epoch_started = - history.same_since = - history.same_primary_since = - history.same_acker_since = osdmap->get_epoch(); + pg = _create_lock_new_pg(pgid, acting, t); + // fall through } else { dout(10) << "handle_pg_notify pg " << pgid - << " was doing creation probe, but found pg info on osd" << from << dendl; + << " DNE on source, but creation probe, ignoring" << dendl; + continue; } } creating_pgs.erase(pgid); - // ok, create PG! - pg = _create_lock_pg(pgid, t); - pg->acting.swap(acting); - pg->set_role(role); - pg->info.history = history; - pg->clear_primary_state(); // yep, notably, set hml=false - pg->last_epoch_started_any = last_epoch_started; // _after_ clear_primary_state() - pg->build_prior(); - pg->write_log(t); + // ok, create PG locally using provided Info and History + if (!pg) { + pg = _create_lock_pg(pgid, t); + pg->acting.swap(acting); + pg->set_role(role); + pg->info.history = history; + pg->info.epoch_created = it->epoch_created; + pg->last_epoch_started_any = last_epoch_started; // _after_ clear_primary_state() + pg->clear_primary_state(); // yep, notably, set hml=false + pg->build_prior(); + pg->write_log(t); + } created++; dout(10) << *pg << " is new" << dendl; @@ -1983,22 +2002,20 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) if (had) { if (pg->is_active() && - (*it).is_uptodate() && acting) { + (*it).is_uptodate() && + acting) { pg->uptodate_set.insert(from); dout(10) << *pg << " osd" << from << " now uptodate (" << pg->uptodate_set << "): " << *it << dendl; - if (pg->is_all_uptodate()) - pg->finish_recovery(); } else { // hmm, maybe keep an eye out for cases where we see this, but peer should happen. dout(10) << *pg << " already had notify info from osd" << from << ": " << *it << dendl; } + if (pg->is_all_uptodate()) + pg->finish_recovery(); } else { - // adjust prior? if (it->last_epoch_started > pg->last_epoch_started_any) pg->adjust_prior(); - - // peer pg->peer(t, query_map, &activator_map); } @@ -2012,7 +2029,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) do_activators(activator_map); if (created) - update_heartbeat_sets(); + update_heartbeat_peers(); delete m; } @@ -2131,6 +2148,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m) if (!require_same_or_newer_map(m, m->get_epoch())) return; + int created = 0; map< int, list > notify_list; for (map::iterator it = m->pg_list.begin(); @@ -2170,6 +2188,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m) pg->info.history = history; pg->write_log(t); store->apply_transaction(t); + created++; dout(10) << *pg << " dne (before), but i am role " << role << dendl; } else { @@ -2235,6 +2254,9 @@ void OSD::handle_pg_query(MOSDPGQuery *m) do_notifies(notify_list); delete m; + + if (created) + update_heartbeat_peers(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 350612f0e513a..f8f509966922a 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -100,7 +100,7 @@ private: set heartbeat_to, heartbeat_from; map heartbeat_from_stamp; - void update_heartbeat_sets(); + void update_heartbeat_peers(); void heartbeat(); class C_Heartbeat : public Context { @@ -275,14 +275,13 @@ private: bool _have_pg(pg_t pgid); PG *_lookup_lock_pg(pg_t pgid); - PG *_new_lock_pg(pg_t pg); // create new PG (in memory) + PG *_open_lock_pg(pg_t pg); // create new PG (in memory) PG *_create_lock_pg(pg_t pg, ObjectStore::Transaction& t); // create new PG + PG *_create_lock_new_pg(pg_t pgid, vector& acting, ObjectStore::Transaction& t); void _remove_unlock_pg(PG *pg); // remove from store and memory - void try_create_pg(pg_t pgid, ObjectStore::Transaction& t); - void load_pgs(); - void calc_primaries_during(pg_t pgid, epoch_t start, epoch_t end, set& pset); + void calc_priors_during(pg_t pgid, epoch_t start, epoch_t end, set& pset); void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from, vector& last); void activate_pg(pg_t pgid, epoch_t epoch); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index ef5fd91f215f3..a1247847e3ec2 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1006,7 +1006,6 @@ void PG::activate(ObjectStore::Transaction& t, void PG::finish_recovery() { dout(10) << "finish_recovery" << dendl; - state_set(PG_STATE_CLEAN); purge_strays(); update_stats(); diff --git a/src/osd/PG.h b/src/osd/PG.h index 632903b1b6320..7edb71cc73068 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -83,6 +83,7 @@ public: last_epoch_started(0), last_epoch_finished(0) {} bool is_uptodate() const { return last_update == last_complete; } bool is_empty() const { return last_update.version == 0; } + bool dne() const { return epoch_created == 0; } }; @@ -648,6 +649,8 @@ inline ostream& operator<<(ostream& out, const PG::Info::History& h) inline ostream& operator<<(ostream& out, const PG::Info& pgi) { out << pgi.pgid << "("; + if (pgi.dne()) + out << " DNE"; if (pgi.is_empty()) out << " empty"; else diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 6aa1b6bdf1857..867ce6c1002a6 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1558,6 +1558,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) if (is_primary()) { // continue recovery + if (info.is_uptodate()) + uptodate_set.insert(osd->get_nodeid()); do_recovery(); } else { // ack if i'm a replica and being pushed to. @@ -1646,21 +1648,6 @@ void ReplicatedPG::cancel_recovery() bool ReplicatedPG::do_recovery() { assert(is_primary()); - /*if (!is_primary()) { - dout(10) << "do_recovery not primary, doing nothing" << dendl; - return true; - } - */ - - if (info.is_uptodate()) { // am i up to date? - if (!is_all_uptodate()) { - dout(-10) << "do_recovery i'm clean but replicas aren't, starting peer recovery" << dendl; - do_peer_recovery(); - } else { - dout(-10) << "do_recovery all clean, nothing to do" << dendl; - } - return true; - } dout(-10) << "do_recovery pulling " << objects_pulling.size() << " in pg, " << osd->num_pulling << "/" << g_conf.osd_max_pull << " total" @@ -1708,10 +1695,14 @@ bool ReplicatedPG::do_recovery() if (is_primary()) { // i am primary - dout(-7) << "do_recovery complete, cleaning strays" << dendl; uptodate_set.insert(osd->whoami); - if (is_all_uptodate()) + if (is_all_uptodate()) { + dout(-7) << "do_recovery complete" << dendl; finish_recovery(); + } else { + dout(-10) << "do_recovery primary now complete, starting peer recovery" << dendl; + do_peer_recovery(); + } } else { // tell primary dout(7) << "do_recovery complete, telling primary" << dendl; @@ -1760,6 +1751,9 @@ void ReplicatedPG::do_peer_recovery() if (is_all_uptodate()) finish_recovery(); + else { + dout(10) << "do_peer_recovery not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl; + } } void ReplicatedPG::purge_strays() diff --git a/src/start.sh b/src/start.sh index 94a6ebe6115a3..44c83aea5d9b2 100755 --- a/src/start.sh +++ b/src/start.sh @@ -1,6 +1,7 @@ #!/bin/sh ./stop.sh +rm core* test -d out || mkdir out rm out/* @@ -38,7 +39,7 @@ $CEPH_BIN/cmonctl osd setmap -i .ceph_osdmap for osd in 0 1 2 3 do $CEPH_BIN/cosd --mkfs_for_osd $osd dev/osd$osd # initialize empty object store - $CEPH_BIN/cosd $ARGS dev/osd$osd --debug_osd 10 + $CEPH_BIN/cosd $ARGS dev/osd$osd --debug_osd 20 done # mds -- 2.39.5