From: Samuel Just Date: Tue, 3 May 2011 00:03:56 +0000 (-0700) Subject: OSD,PG: Peering refactor X-Git-Tag: v0.28~74^2~46 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=48eb343ba222381bcf8eb789020701598c8c3c9f;p=ceph.git OSD,PG: Peering refactor Previously, peering was handled by a defacto state machine in do_peer and related methods. Peering state will now be encapsulated in RecoveryState, which uses boost::state_chart internally to enforce an explicit state machine abstraction. OSD::handle_pg_* pass off to PG::handle_*, which pass messages to the state machine. Signed-off-by: Samuel Just --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 4c23be8897d..75cace88af1 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1168,6 +1168,11 @@ PG *OSD::get_or_create_pg(const PG::Info& info, epoch_t epoch, int from, int& cr // kick any waiters wake_pg_waiters(pg->info.pgid); + + map > query_map; + PG::RecoveryCtx rctx(&query_map, 0, 0, 0, 0); // GetInfo only needs a query_map + pg->handle_create(&rctx); + do_queries(query_map); } else { // already had it. did the mapping change? pg = _lookup_lock_pg(info.pgid); @@ -3203,25 +3208,8 @@ void OSD::advance_map(ObjectStore::Transaction& t) PG *pg = it->second; pg->lock(); - if (pg->handle_advance_map(*osdmap, *lastmap)) { - pg->unlock(); - continue; - } - - // make sure we clear out any pg_temp change requests - pg_temp_wanted.erase(pgid); - pg->prior_set.reset(NULL); - - - pg->cancel_recovery(); - - - // sanity check pg_temp - if (pg->acting.empty() && pg->up.size() && pg->up[0] == whoami) { - dout(10) << *pg << " acting empty, but i am up[0], clearing pg_temp" << dendl; - queue_want_pg_temp(pg->info.pgid, pg->acting); - } - + dout(10) << "Scanning pg " << *pg << dendl; + pg->handle_advance_map(*osdmap, *lastmap, 0); pg->unlock(); } } @@ -3236,8 +3224,6 @@ void OSD::activate_map(ObjectStore::Transaction& t, list& tfin) map< int, map > query_map; // peer -> PG -> get_summary_since map info_map; // peer -> message - epoch_t up_thru = osdmap->get_up_thru(whoami); - int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0; epoch_t oldest_last_clean = osdmap->get_epoch(); @@ -3248,7 +3234,6 @@ void OSD::activate_map(ObjectStore::Transaction& t, list& tfin) it++) { PG *pg = it->second; pg->lock(); - pg->check_recovery_op_pulls(osdmap); if (pg->is_primary()) num_pg_primary++; @@ -3260,44 +3245,16 @@ void OSD::activate_map(ObjectStore::Transaction& t, list& tfin) if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean) oldest_last_clean = pg->info.history.last_epoch_clean; - if (g_conf.osd_check_for_log_corruption) - pg->check_log_for_corruption(store); - - if (pg->is_active() && pg->is_primary() && - (pg->missing.num_missing() > pg->missing_loc.size())) { - if (pg->all_unfound_are_lost(osdmap)) { - pg->mark_all_unfound_as_lost(t); - } - } - if (!osdmap->have_pg_pool(pg->info.pgid.pool())) { //pool is deleted! queue_pg_for_deletion(pg); pg->unlock(); continue; } - if (pg->is_active()) { - // i am active - if (pg->is_primary() && - !pg->snap_trimq.empty() && - pg->is_clean()) - pg->queue_snap_trim(); - } - else if (pg->is_primary() && - !pg->is_active()) { - // i am (inactive) primary - if ((!pg->is_peering() && !pg->is_replay()) || - (pg->need_up_thru && up_thru >= pg->info.history.same_acting_since)) - pg->do_peer(t, tfin, query_map, &info_map); - } - else if (pg->is_stray() && - pg->get_primary() >= 0) { - // i am residual|replica - notify_list[pg->get_primary()].push_back(pg->info); - } - if (pg->is_primary()) - pg->update_stats(); + PG::RecoveryCtx rctx(&query_map, &info_map, ¬ify_list, &tfin, &t); + pg->handle_activate_map(&rctx); + pg->unlock(); } @@ -3652,7 +3609,9 @@ void OSD::kick_pg_split_queue() wake_pg_waiters(pg->info.pgid); - pg->do_peer(*t, fin->contexts, query_map, &info_map); + PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); + pg->handle_create(&rctx); + pg->update_stats(); pg->unlock(); created++; @@ -3869,7 +3828,8 @@ void OSD::handle_pg_create(MOSDPGCreate *m) creating_pgs.erase(pgid); wake_pg_waiters(pg->info.pgid); - pg->do_peer(*t, fin->contexts, query_map, &info_map); + PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); + pg->handle_create(&rctx); pg->update_stats(); int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); @@ -3975,39 +3935,8 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) if (!pg) continue; - if (pg->peer_info.count(from) && - pg->peer_info[from].last_update == it->last_update) { - dout(10) << *pg << " got dup osd" << from << " info " << *it << ", identical to ours" << dendl; - } else if (pg->peer_info.count(from) && - pg->is_active()) { - dout(10) << *pg << " got dup osd" << from << " info " << *it - << " but pg is active, keeping our info " << pg->peer_info[from] - << dendl; - } else { - dout(10) << *pg << " got osd" << from << " info " << *it << dendl; - pg->peer_info[from] = *it; - pg->might_have_unfound.insert(from); - - unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp); - pg->info.history.merge(it->history); - reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp); - - // stray? - if (!pg->is_acting(from)) { - dout(10) << *pg << " osd" << from << " has stray content: " << *it << dendl; - pg->stray_set.insert(from); - pg->state_clear(PG_STATE_CLEAN); - } - - pg->do_peer(*t, fin->contexts, query_map, &info_map); - pg->update_stats(); - } - - if (pg->is_active() && pg->have_unfound()) { - // Make sure we've requested MISSING information from every OSD - // we know about. - pg->discover_all_missing(query_map); - } + PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); + pg->handle_notify(from, *it, &rctx); int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); assert(tr == 0); @@ -4025,140 +3954,6 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) m->put(); } - - -/** PGLog - * from non-primary to primary - * includes log and info - * from primary to non-primary - * includes log for use in recovery - * NOTE: called with opqueue active. - */ - -void OSD::_process_pg_info(epoch_t epoch, int from, - PG::Info &info, - PG::Log &log, - PG::Missing *missing, - map< int, map >& query_map, - map* info_map, - int& created) -{ - ObjectStore::Transaction *t; - C_Contexts *fin; - PG *pg; - - pg = get_or_create_pg(info, epoch, from, created, false, &t, &fin); - if (!pg) - return; - - dout(10) << *pg << ": " << __func__ << " info: " << info << ", "; - if (log.empty()) - *_dout << "(log omitted)"; - else - *_dout << "log: " << log; - *_dout << ", "; - if (!missing) - *_dout << "(missing omitted)"; - else - *_dout << "missing: " << *missing; - *_dout << dendl; - - // don't update history (yet) if we are active and primary; the replica - // may be telling us they have activated (and committed) but we can't - // share that until _everyone_ does the same. - if (pg->is_active() && pg->is_primary() && pg->is_acting(from) && - pg->info.history.last_epoch_started < pg->info.history.same_acting_since && - info.history.last_epoch_started >= pg->info.history.same_acting_since) { - dout(10) << " peer osd" << from << " activated and committed" << dendl; - pg->peer_activated.insert(from); - if (pg->peer_activated.size() == pg->acting.size()) - pg->all_activated_and_committed(); - } else { - unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp); - pg->info.history.merge(info.history); - reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp); - } - - // dump log - dout(15) << *pg << " my log = "; - pg->log.print(*_dout); - *_dout << std::endl; - if (!log.empty()) { - *_dout << *pg << " osd" << from << " log = "; - log.print(*_dout); - } - *_dout << dendl; - - if (pg->is_primary()) { - // i am PRIMARY - if (pg->is_active()) { - // PG is ACTIVE - if (!pg->is_clean()) { - dout(10) << *pg << " searching osd" << from << " log for unfound items." << dendl; - pg->search_for_missing(info, missing, from); - if (pg->have_unfound()) { - // Make sure we've requested MISSING information from every OSD - // we know about. - pg->discover_all_missing(query_map); - } - queue_for_recovery(pg); // in case we found something. - } - } - else if (missing) { - // PG is INACTIVE - pg->proc_replica_log(*t, info, log, *missing, from); - - // peer - pg->do_peer(*t, fin->contexts, query_map, info_map); - pg->update_stats(); - } - } else if (!pg->info.dne()) { - if (!pg->is_active()) { - // INACTIVE REPLICA - assert(from == pg->acting[0]); - pg->merge_log(*t, info, log, from); - - // We should have the right logs before activating. - assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog); - assert(pg->log.head == pg->info.last_update); - - pg->activate(*t, fin->contexts, query_map, info_map); - } else { - // ACTIVE REPLICA - assert(pg->is_replica()); - - // just update our stats - dout(10) << *pg << " writing updated stats" << dendl; - pg->info.stats = info.stats; - - // Handle changes to purged_snaps - interval_set p; - p.union_of(info.purged_snaps, pg->info.purged_snaps); - p.subtract(pg->info.purged_snaps); - pg->info.purged_snaps = info.purged_snaps; - if (!p.empty()) { - dout(10) << " purged_snaps " << pg->info.purged_snaps - << " -> " << info.purged_snaps - << " removed " << p << dendl; - pg->adjust_local_snaps(*t, p); - } - } - - pg->write_info(*t); - - if (!log.empty()) { - dout(10) << *pg << ": inactive replica merging new PG log entries" << dendl; - pg->merge_log(*t, info, log, from); - } - } - - int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(tr == 0); - - pg->unlock(); -} - - void OSD::handle_pg_log(MOSDPGLog *m) { dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl; @@ -4167,17 +3962,29 @@ void OSD::handle_pg_log(MOSDPGLog *m) return; int from = m->get_source().num(); - int created = 0; if (!require_same_or_newer_map(m, m->get_epoch())) return; + int created = 0; + ObjectStore::Transaction *t; + C_Contexts *fin; + PG *pg = get_or_create_pg(m->info, m->get_epoch(), + from, created, false, &t, &fin); + if (!pg) + return; + map< int, map > query_map; - _process_pg_info(m->get_epoch(), from, - m->info, m->log, &m->missing, query_map, 0, - created); + map< int, MOSDPGInfo* > info_map; + PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); + pg->handle_log(from, m, &rctx); + pg->unlock(); do_queries(query_map); + do_infos(info_map); + + int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); + assert(!tr); + if (created) update_heartbeat_peers(); - m->put(); } @@ -4190,18 +3997,28 @@ void OSD::handle_pg_info(MOSDPGInfo *m) int from = m->get_source().num(); if (!require_same_or_newer_map(m, m->get_epoch())) return; + map< int, MOSDPGInfo* > info_map; - PG::Log empty_log; - map info_map; int created = 0; - map< int, map > query_map; for (vector::iterator p = m->pg_info.begin(); p != m->pg_info.end(); - ++p) - _process_pg_info(m->get_epoch(), from, *p, empty_log, NULL, query_map, &info_map, created); + ++p) { + ObjectStore::Transaction *t = 0; + C_Contexts *fin = 0; + PG *pg = get_or_create_pg(*p, m->get_epoch(), + from, created, false, &t, &fin); + PG::RecoveryCtx rctx(0, &info_map, 0, &fin->contexts, t); + if (!pg) + continue; + pg->handle_info(from, *p, &rctx); + + int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); + assert(!tr); + + pg->unlock(); + } - do_queries(query_map); do_infos(info_map); if (created) update_heartbeat_peers(); @@ -4255,6 +4072,8 @@ void OSD::handle_pg_trim(MOSDPGTrim *m) void OSD::handle_pg_missing(MOSDPGMissing *m) { + assert(0); // MOSDPGMissing is fantastical +#if 0 dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl; if (!require_osd_peer(m)) @@ -4267,13 +4086,14 @@ void OSD::handle_pg_missing(MOSDPGMissing *m) map< int, map > query_map; PG::Log empty_log; int created = 0; - _process_pg_info(m->get_epoch(), from, m->info, + _pro-cess_pg_info(m->get_epoch(), from, m->info, //misspelling added to prevent erroneous finds empty_log, &m->missing, query_map, NULL, created); do_queries(query_map); if (created) update_heartbeat_peers(); m->put(); +#endif } /** PGQuery @@ -4292,7 +4112,6 @@ void OSD::handle_pg_query(MOSDPGQuery *m) if (!require_same_or_newer_map(m, m->get_epoch())) return; - int created = 0; map< int, vector > notify_list; for (map::iterator it = m->pg_list.begin(); @@ -4322,15 +4141,15 @@ void OSD::handle_pg_query(MOSDPGQuery *m) PG::Info empty(pgid); notify_list[from].push_back(empty); continue; - } else { - pg = _lookup_lock_pg(pgid); - if (m->get_epoch() < pg->info.history.same_acting_since) { - dout(10) << *pg << " handle_pg_query changed in " - << pg->info.history.same_acting_since - << " (msg from " << m->get_epoch() << ")" << dendl; - pg->unlock(); - continue; - } + } + + pg = _lookup_lock_pg(pgid); + if (m->get_epoch() < pg->info.history.same_acting_since) { + dout(10) << *pg << " handle_pg_query changed in " + << pg->info.history.same_acting_since + << " (msg from " << m->get_epoch() << ")" << dendl; + pg->unlock(); + continue; } if (pg->deleting) { @@ -4352,56 +4171,14 @@ void OSD::handle_pg_query(MOSDPGQuery *m) reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp); // ok, process query! - assert(!pg->acting.empty()); - assert(from == pg->acting[0]); - - if (it->second.type == PG::Query::INFO) { - // info - dout(10) << *pg << " sending info" << dendl; - notify_list[from].push_back(pg->info); - } else { - if (it->second.type == PG::Query::BACKLOG && - !pg->log.backlog) { - dout(10) << *pg << " requested info+missing+backlog - queueing for backlog" << dendl; - queue_generate_backlog(pg); - } else { - MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), pg->info); - mlog->missing = pg->missing; - - // primary -> other, when building master log - if (it->second.type == PG::Query::LOG) { - dout(10) << *pg << " sending info+missing+log since " << it->second.since - << dendl; - mlog->log.copy_after(pg->log, it->second.since); - } - - if (it->second.type == PG::Query::BACKLOG) { - dout(10) << *pg << " sending info+missing+backlog" << dendl; - assert(pg->log.backlog); - mlog->log = pg->log; - } - else if (it->second.type == PG::Query::FULLLOG) { - dout(10) << *pg << " sending info+missing+full log" << dendl; - mlog->log.copy_non_backlog(pg->log); - } - - dout(10) << *pg << " sending " << mlog->log << " " << mlog->missing << dendl; - //m->log.print(cout); - - _share_map_outgoing(osdmap->get_cluster_inst(from)); - cluster_messenger->send_message(mlog, m->get_connection()); - } - } - + PG::RecoveryCtx rctx(0, 0, ¬ify_list, 0, 0); + pg->handle_query(from, it->second, &rctx); pg->unlock(); } do_notifies(notify_list); m->put(); - - if (created) - update_heartbeat_peers(); } @@ -4627,6 +4404,13 @@ void OSD::generate_backlog(PG *pg) pg->lock(); dout(10) << *pg << " generate_backlog" << dendl; + int tr; + map< int, map > query_map; + map< int, MOSDPGInfo* > info_map; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + C_Contexts *fin = new C_Contexts; + PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); + if (!pg->generate_backlog_epoch) { dout(10) << *pg << " generate_backlog was canceled" << dendl; goto out; @@ -4647,27 +4431,14 @@ void OSD::generate_backlog(PG *pg) goto out2; } - if (!pg->is_primary()) { - dout(10) << *pg << " sending info+missing+backlog to primary" << dendl; - assert(!pg->is_active()); // for now - MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info); - m->missing = pg->missing; - m->log = pg->log; - _share_map_outgoing(osdmap->get_cluster_inst(pg->get_primary())); - cluster_messenger->send_message(m, osdmap->get_cluster_inst(pg->get_primary())); - } else { - dout(10) << *pg << " generated backlog, peering" << dendl; - - map< int, map > query_map; // peer -> PG -> get_summary_since - ObjectStore::Transaction *t = new ObjectStore::Transaction; - C_Contexts *fin = new C_Contexts; - pg->do_peer(*t, fin->contexts, query_map, NULL); - do_queries(query_map); - pg->write_info(*t); - pg->write_log(*t); - int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(tr == 0); - } + pg->handle_backlog_generated(&rctx); + do_queries(query_map); + do_infos(info_map); + pg->write_info(*t); + pg->write_log(*t); + tr = store->queue_transaction(&pg->osr, t, + new ObjectStore::C_DeleteTransaction(t), fin); + assert(!tr); out2: map_lock.put_read(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 748ccab71a4..40ff50644ba 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -661,15 +661,6 @@ protected: void queue_pg_for_deletion(PG *pg); void _remove_pg(PG *pg); - // helper for handle_pg_log and handle_pg_info - void _process_pg_info(epoch_t epoch, int from, - PG::Info &info, - PG::Log &log, - PG::Missing *missing, - map< int, map >& query_map, - map* info_map, - int& created); - // backlogs xlist backlog_queue; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 96604c1e4b1..3596ba00917 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -177,91 +177,98 @@ void PG::trim_write_ahead() } -void PG::proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from) +void PG::proc_master_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from) { dout(10) << "proc_replica_log for osd" << from << ": " << olog << " " << omissing << dendl; - assert(!is_active()); + assert(!is_active() && is_primary()); - if (!have_master_log) { - // merge log into our own log to build master log. no need to - // make any adjustments to their missing map; we are taking their - // log to be authoritative (i.e., their entries are by definitely - // non-divergent). - merge_log(t, oinfo, olog, from); + // merge log into our own log to build master log. no need to + // make any adjustments to their missing map; we are taking their + // log to be authoritative (i.e., their entries are by definitely + // non-divergent). + merge_log(t, oinfo, olog, from); + peer_info[from] = oinfo; + dout(10) << " peer osd" << from << " now " << oinfo << " " << omissing << dendl; + might_have_unfound.insert(from); - } else if (is_acting(from)) { - // replica. have master log. - // populate missing; check for divergence + search_for_missing(oinfo, &omissing, from); + peer_missing[from].swap(omissing); +} - /* - basically what we're doing here is rewinding the remote log, - dropping divergent entries, until we find something that matches - our master log. we then reset last_update to reflect the new - point up to which missing is accurate. - - later, in activate(), missing will get wound forward again and - we will send the peer enough log to arrive at the same state. - */ - - list::const_reverse_iterator pp = olog.log.rbegin(); - eversion_t lu(oinfo.last_update); - while (true) { - if (pp == olog.log.rend()) { - lu = olog.tail; - break; - } - const Log::Entry& oe = *pp; +void PG::proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from) { + /* + basically what we're doing here is rewinding the remote log, + dropping divergent entries, until we find something that matches + our master log. we then reset last_update to reflect the new + point up to which missing is accurate. - // don't continue past the tail of our log. - if (oe.version <= log.tail) - break; + later, in activate(), missing will get wound forward again and + we will send the peer enough log to arrive at the same state. + */ - if (!log.objects.count(oe.soid)) { - dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl; - ++pp; - continue; - } + for (map::iterator i = omissing.missing.begin(); + i != omissing.missing.end(); + ++i) { + dout(10) << "Missing sobject: " << i->first << dendl; + } + list::const_reverse_iterator pp = olog.log.rbegin(); + eversion_t lu(oinfo.last_update); + while (true) { + if (pp == olog.log.rend()) { + lu = olog.tail; + break; + } + const Log::Entry& oe = *pp; + + // don't continue past the tail of our log. + if (oe.version <= log.tail) + break; + + if (!log.objects.count(oe.soid)) { + dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl; + ++pp; + continue; + } - Log::Entry& ne = *log.objects[oe.soid]; - if (ne.version == oe.version) { - dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl; - lu = pp->version; - break; - } - if (ne.version > oe.version) { - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; + Log::Entry& ne = *log.objects[oe.soid]; + if (ne.version == oe.version) { + dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl; + lu = pp->version; + break; + } + if (ne.version > oe.version) { + dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; + } else { + if (oe.is_delete()) { + if (ne.is_delete()) { + // old and new are delete + dout(20) << " had " << oe << " new " << ne << " : both deletes" << dendl; + } else { + // old delete, new update. + dout(20) << " had " << oe << " new " << ne << " : missing" << dendl; + omissing.add(ne.soid, ne.version, eversion_t()); + } } else { - if (oe.is_delete()) { - if (ne.is_delete()) { - // old and new are delete - dout(20) << " had " << oe << " new " << ne << " : both deletes" << dendl; - } else { - // old delete, new update. - dout(20) << " had " << oe << " new " << ne << " : missing" << dendl; - omissing.add(ne.soid, ne.version, eversion_t()); - } + if (ne.is_delete()) { + // old update, new delete + dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; + omissing.rm(oe.soid, oe.version); } else { - if (ne.is_delete()) { - // old update, new delete - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; - omissing.rm(oe.soid, oe.version); - } else { - // old update, new update - dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; - omissing.revise_need(ne.soid, ne.version); - } + // old update, new update + dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl; + omissing.revise_need(ne.soid, ne.version); } } + } - ++pp; - } + ++pp; + } - if (lu < oinfo.last_update) { - dout(10) << " peer osd" << from << " last_update now " << lu << dendl; - oinfo.last_update = lu; - if (lu < oinfo.last_complete) - oinfo.last_complete = lu; - } + if (lu < oinfo.last_update) { + dout(10) << " peer osd" << from << " last_update now " << lu << dendl; + oinfo.last_update = lu; + if (lu < oinfo.last_complete) + oinfo.last_complete = lu; } peer_info[from] = oinfo; @@ -269,9 +276,35 @@ void PG::proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, M might_have_unfound.insert(from); search_for_missing(oinfo, &omissing, from); + for (map::iterator i = omissing.missing.begin(); + i != omissing.missing.end(); + ++i) { + dout(10) << "Final Missing sobject: " << i->first << dendl; + } peer_missing[from].swap(omissing); } +void PG::proc_replica_info(int from, Info &info) +{ + assert(is_primary()); + peer_info[from] = info; + might_have_unfound.insert(from); + + osd->unreg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp); + info.history.merge(info.history); + osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp); + + // stray? + if (!is_acting(from)) { + dout(10) << " osd" << from << " has stray content: " << info << dendl; + stray_set.insert(from); + if (is_clean()) { + purge_strays(); + } + } + update_stats(); +} + /* * merge an old (possibly divergent) log entry into the new log. this @@ -915,10 +948,8 @@ void PG::trim_past_intervals() // true if the given map affects the prior set -bool PG::prior_set_affected(const OSDMap *osdmap) const +bool PG::prior_set_affected(PgPriorSet &prior, const OSDMap *osdmap) const { - const PgPriorSet &prior = *prior_set.get(); - for (set::iterator p = prior.cur.begin(); p != prior.cur.end(); ++p) @@ -1083,7 +1114,7 @@ void PG::mark_all_unfound_as_lost(ObjectStore::Transaction& t) share_pg_log(old_last_update); } -void PG::build_prior() +void PG::build_prior(std::auto_ptr &prior_set) { if (1) { // sanity check @@ -1093,11 +1124,6 @@ void PG::build_prior() assert(info.history.last_epoch_started >= it->second.history.last_epoch_started); } } - generate_past_intervals(); - - state_clear(PG_STATE_CRASHED); - state_clear(PG_STATE_DOWN); - stringstream out; prior_set.reset(new PgPriorSet(osd->whoami, *osd->osdmap, @@ -1109,7 +1135,6 @@ void PG::build_prior() dout(10) << out << dendl; PgPriorSet &prior(*prior_set.get()); - dout(10) << "build_prior: " << *this << " " << (prior.crashed ? " crashed":"") << (prior.pg_down ? " down":"") @@ -1142,7 +1167,6 @@ void PG::clear_primary_state() // clear peering state have_master_log = false; - prior_set.reset(NULL); stray_set.clear(); peer_info_requested.clear(); peer_log_requested.clear(); @@ -1216,127 +1240,6 @@ bool PG::choose_acting(int newest_update_osd) return true; } -// if false, stop. -bool PG::recover_master_log(map< int, map >& query_map, - eversion_t &oldest_update) -{ - dout(10) << "recover_master_log" << dendl; - - if (is_down()) - return false; - - if (peer_info_requested.empty()) { - stringstream out; - prior_set->gen_query_map(*osd->osdmap, info, query_map); - dout(10) << out << dendl; - for (map< int, map >::const_iterator i = query_map.begin(); - i != query_map.end(); ++i) { - peer_info_requested.insert(i->first); - } - } - - bool lack_info = false; - for (set::const_iterator it = prior_set->cur.begin(); - it != prior_set->cur.end(); - ++it) { - if (peer_info.count(*it)) { - dout(10) << " have info from osd" << *it - << ": " << peer_info[*it] - << dendl; - continue; - } - lack_info = true; - - if (peer_info_requested.find(*it) != peer_info_requested.end()) { - dout(10) << " waiting for osd" << *it << dendl; - continue; - } - } - if (lack_info) - return false; - - // -- ok, we have all (prior_set) info. (and maybe others.) - dout(10) << " have prior_set info. min_last_complete_ondisk " << min_last_complete_ondisk << dendl; - - bool need_backlog, wait_for_backlog; - int pull_from; - eversion_t newest_update; - stringstream out; - choose_log_location(need_backlog, - wait_for_backlog, pull_from, newest_update, - oldest_update); - dout(10) << out << dendl; - - if (pull_from == -1) { - pull_from = osd->whoami; - } - - // -- do i need to generate backlog? - if (need_backlog && !info.log_backlog) { - osd->queue_generate_backlog(this); - } - - // -- decide what acting set i want, based on state of up set - if (!choose_acting(pull_from)) - return false; - - // gather log(+missing) from that person! - if (pull_from != osd->whoami) { - const Info& pi = peer_info[pull_from]; - if (pi.log_tail <= log.head) { - if (peer_log_requested.count(pull_from)) { - dout(10) << " newest update on osd" << pull_from - << " v " << newest_update - << ", already queried log" - << dendl; - } else { - // we'd _like_ it back to oldest_update, but take what we can get. - dout(10) << " newest update on osd" << pull_from - << " v " << newest_update - << ", querying since oldest_update " << oldest_update - << dendl; - query_map[pull_from][info.pgid] = Query(Query::LOG, oldest_update, info.history); - peer_log_requested.insert(pull_from); - } - } else { - dout(10) << " newest update on osd" << pull_from - << ", whose log.tail " << pi.log_tail - << " > my log.head " << log.head - << ", i will need backlog for me+them." << dendl; - // it's possible another peer could fill in the missing bits, but - // pretty unlikely. someday it may be worth the complexity to - // try. until then, just get the full backlogs. - if (!log.backlog) { - osd->queue_generate_backlog(this); - return false; - } - - if (peer_backlog_requested.count(pull_from)) { - dout(10) << " newest update on osd" << pull_from - << " v " << newest_update - << ", already queried summary/backlog" - << dendl; - } else { - dout(10) << " newest update on osd" << pull_from - << " v " << newest_update - << ", querying entire summary/backlog" - << dendl; - query_map[pull_from][info.pgid] = Query(Query::BACKLOG, info.history); - peer_backlog_requested.insert(pull_from); - } - } - return false; - } else { - dout(10) << " newest_update " << info.last_update << " (me)" << dendl; - } - - dout(10) << " oldest_update " << oldest_update << dendl; - - have_master_log = true; - - return true; -} - void PG::choose_log_location(bool &need_backlog, bool &wait_on_backlog, int &pull_from, @@ -1416,149 +1319,6 @@ void PG::choose_log_location(bool &need_backlog, } } -void PG::do_peer(ObjectStore::Transaction& t, list& tfin, - map< int, map >& query_map, - map *activator_map) -{ - dout(10) << "PG::do_peer: peer up " << up << ", acting " - << acting << dendl; - - if (!is_active()) - state_set(PG_STATE_PEERING); - - if (!prior_set.get()) - build_prior(); - - dout(10) << "PG::do_peer: peer prior_set is " - << *prior_set << dendl; - - eversion_t oldest_update; - if (!have_master_log) { - if (!recover_master_log(query_map, oldest_update)) - return; - } - else { - if (up != acting) { - // are we done generating backlog(s)? - if (!choose_acting(osd->whoami)) - return; - } - } - - /** COLLECT MISSING+LOG FROM PEERS **********/ - /* - we also detect divergent replicas here by pulling the full log - from everyone. - - for example: - 0: 1: 2: - 2'6 2'6 2'6 - 2'7 2'7 2'7 - 3'8 | 2'8 2'8 - 3'9 | 2'9 - - */ - - // gather missing from peers - bool have_all_missing = true; - for (unsigned i=1; iosdmap->get_up_thru(osd->whoami) - << " < same_since " << info.history.same_acting_since - << ", must notify monitor" << dendl; - need_up_thru = true; - osd->queue_want_up_thru(info.history.same_acting_since); - return; - } else { - dout(10) << "up_thru " << osd->osdmap->get_up_thru(osd->whoami) - << " >= same_since " << info.history.same_acting_since - << ", all is well" << dendl; - } - } - - // -- crash recovery? - if (is_crashed()) { - replay_until = g_clock.now(); - replay_until += g_conf.osd_replay_window; - dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window - << " until " << replay_until << dendl; - state_set(PG_STATE_REPLAY); - osd->replay_queue_lock.Lock(); - osd->replay_queue.push_back(pair(info.pgid, replay_until)); - osd->replay_queue_lock.Unlock(); - } - - if (!is_active()) { - activate(t, tfin, query_map, activator_map); - } - - if (is_all_uptodate()) - finish_recovery(t, tfin); -} - /* Build the might_have_unfound set. * * This is used by the primary OSD during recovery. @@ -1625,6 +1385,17 @@ void PG::activate(ObjectStore::Transaction& t, list& tfin, map *activator_map) { assert(!is_active()); + // -- crash recovery? + if (is_crashed()) { + replay_until = g_clock.now(); + replay_until += g_conf.osd_replay_window; + dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window + << " until " << replay_until << dendl; + state_set(PG_STATE_REPLAY); + osd->replay_queue_lock.Lock(); + osd->replay_queue.push_back(pair(info.pgid, replay_until)); + osd->replay_queue_lock.Unlock(); + } // twiddle pg state state_set(PG_STATE_ACTIVE); @@ -1658,9 +1429,6 @@ void PG::activate(ObjectStore::Transaction& t, list& tfin, need_up_thru = false; - // clear prior set (and dependency info)... we are done peering! - prior_set.reset(NULL); - // if we are building a backlog, cancel it! if (up == acting) osd->cancel_generate_backlog(this); @@ -1730,7 +1498,7 @@ void PG::activate(ObjectStore::Transaction& t, list& tfin, if (pi.last_update == info.last_update && !need_old_log_entries) { // empty log - if (activator_map) { + if (!pi.is_empty() && activator_map) { dout(10) << "activate peer osd" << peer << " is up to date, queueing in pending_activators" << dendl; if (activator_map->count(peer) == 0) (*activator_map)[peer] = new MOSDPGInfo(osd->osdmap->get_epoch()); @@ -1850,7 +1618,9 @@ void PG::_activate_committed(epoch_t e) if (is_primary()) { peer_activated.insert(osd->whoami); - dout(10) << "_activate_committed " << e << " peer_activated now " << peer_activated << dendl; + dout(10) << "_activate_committed " << e << " peer_activated now " << peer_activated + << " last_epoch_started " << info.history.last_epoch_started + << " same_acting_since " << info.history.same_acting_since << dendl; if (peer_activated.size() == acting.size()) all_activated_and_committed(); } else { @@ -2044,6 +1814,7 @@ void PG::purge_strays() peer_info.erase(*p); } + state_set(PG_STATE_CLEAN); stray_set.clear(); // clear _requested maps; we may have to peer() again if we discover @@ -3503,14 +3274,52 @@ void PG::share_pg_log(const eversion_t &oldver) } } -bool PG::handle_advance_map(OSDMap &osdmap, - const OSDMap &lastmap) +void PG::fulfill_info(int from, const Query &query, + pair ¬ify_info) { - if (acting_up_affected(osdmap, lastmap)) { - return true; + assert(!acting.empty()); + assert(from == acting[0]); + assert(query.type == PG::Query::INFO); + + // info + dout(10) << "sending info" << dendl; + notify_info = make_pair(from, info); +} + +void PG::fulfill_log(int from, const Query &query) +{ + assert(!acting.empty()); + assert(from == acting[0]); + assert(query.type != PG::Query::INFO); + if (query.type == PG::Query::BACKLOG && + !log.backlog) { + assert(0); // generated in the state machine } else { - warm_restart(); - return false; + MOSDPGLog *mlog = new MOSDPGLog(osd->osdmap->get_epoch(), info); + mlog->missing = missing; + + // primary -> other, when building master log + if (query.type == PG::Query::LOG) { + dout(10) << " sending info+missing+log since " << query.since + << dendl; + mlog->log.copy_after(log, query.since); + } + + if (query.type == PG::Query::BACKLOG) { + dout(10) << "sending info+missing+backlog" << dendl; + assert(log.backlog); + mlog->log = log; + } + else if (query.type == PG::Query::FULLLOG) { + dout(10) << " sending info+missing+full log" << dendl; + mlog->log.copy_non_backlog(log); + } + + dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl; + + osd->_share_map_outgoing(osd->osdmap->get_cluster_inst(from)); + osd->cluster_messenger->send_message(mlog, + osd->osdmap->get_cluster_inst(from)); } } @@ -3687,6 +3496,26 @@ void PG::warm_restart() } } +void PG::process_primary_info(ObjectStore::Transaction &t, const Info &oinfo) +{ + assert(is_replica()); + assert(is_active()); + info.stats = oinfo.stats; + + // Handle changes to purged_snaps + interval_set p; + p.union_of(oinfo.purged_snaps, info.purged_snaps); + p.subtract(info.purged_snaps); + info.purged_snaps = oinfo.purged_snaps; + if (!p.empty()) { + dout(10) << " purged_snaps " << info.purged_snaps + << " -> " << oinfo.purged_snaps + << " removed " << p << dendl; + adjust_local_snaps(t, p); + } + write_info(t); +} + unsigned int PG::Missing::num_missing() const { return missing.size(); @@ -3886,6 +3715,583 @@ std::ostream& operator<<(std::ostream& oss, << "lost=" << prior.lost << " ]]"; return oss; } + +/*------------ Recovery State Machine----------------*/ +#undef dout_prefix +#define dout_prefix (*_dout << context< RecoveryMachine >().pg->gen_prefix() << "StateMachine: ") + +/*------Started-------*/ +boost::statechart::result +PG::RecoveryState::Started::react(const AdvMap& advmap) { + dout(10) << "Started advmap" << dendl; + PG *pg = context< RecoveryMachine >().pg; + if (pg->acting_up_affected(advmap.osdmap, advmap.lastmap)) { + dout(10) << "up or acting affected, transitioning to Reset" << dendl; + return transit< Reset >(); + } + return discard_event(); +} + +/*--------Reset---------*/ +boost::statechart::result +PG::RecoveryState::Reset::react(const AdvMap& advmap) { + PG *pg = context< RecoveryMachine >().pg; + dout(10) << "Reset advmap" << dendl; + if (pg->acting_up_affected(advmap.osdmap, advmap.lastmap)) { + dout(10) << "up or acting affected, calling warm_restart again" << dendl; + pg->warm_restart(); + } + return discard_event(); +} + +boost::statechart::result +PG::RecoveryState::Reset::react(const ActMap& actmap) { + PG *pg = context< RecoveryMachine >().pg; + if (pg->is_stray() && pg->get_primary() >= 0) { + context< RecoveryMachine >().send_notify(pg->get_primary(), + pg->info); + } + return transit< Started >(); +} + +PG::RecoveryState::Reset::Reset(my_context ctx) : my_base(ctx) { + PG *pg = context< RecoveryMachine >().pg; + dout(10) << "Reseting" << dendl; + pg->warm_restart(); + post_event(Initialize()); +} + +/*-------Start---------*/ +PG::RecoveryState::Start::Start(my_context ctx) : my_base(ctx) { + PG *pg = context< RecoveryMachine >().pg; + if (pg->is_primary()) { + dout(1) << "transitioning to Primary" << dendl; + post_event(MakePrimary()); + } else { //is_stray + dout(1) << "transitioning to Stray" << dendl; + post_event(MakeStray()); + } +} + +/*---------Primary--------*/ +boost::statechart::result +PG::RecoveryState::Primary::react(const BacklogComplete&) { + PG *pg = context< RecoveryMachine >().pg; + pg->choose_acting(pg->osd->whoami); + return discard_event(); +} + +boost::statechart::result +PG::RecoveryState::Primary::react(const MNotifyRec& notevt) { + dout(7) << "handle_pg_notify from " << notevt.from << dendl; + PG *pg = context< RecoveryMachine >().pg; + if (pg->peer_info.count(notevt.from) && + pg->peer_info[notevt.from].last_update == notevt.info.last_update) { + dout(10) << *pg << " got dup osd" << notevt.from << " info " << notevt.info + << ", identical to ours" << dendl; + } else { + pg->proc_replica_info(notevt.from, notevt.info); + } + return discard_event(); +} + +boost::statechart::result +PG::RecoveryState::Primary::react(const ActMap&) { + dout(7) << "handle ActMap primary" << dendl; + PG *pg = context< RecoveryMachine >().pg; + pg->update_stats(); + return forward_event(); +} + +/*---------Peering--------*/ +PG::RecoveryState::Peering::Peering(my_context ctx) + : my_base(ctx) { + PG *pg = context< RecoveryMachine >().pg; + assert(!pg->is_active()); + assert(!pg->is_peering()); + assert(pg->is_primary()); + pg->state_set(PG_STATE_PEERING); +} + +boost::statechart::result +PG::RecoveryState::Peering::react(const AdvMap& advmap) { + PG *pg = context< RecoveryMachine >().pg; + dout(10) << "Peering advmap" << dendl; + if (pg->prior_set_affected(*prior_set.get(), &advmap.osdmap)) { + dout(1) << "Peering, priors_set_affected, going to Reset" << dendl; + return transit< Reset >(); + } + return forward_event(); +} + +void PG::RecoveryState::Peering::exit() { + dout(10) << "Leaving Peering" << dendl; +} + +/*---------Active---------*/ +PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx) { + PG *pg = context< RecoveryMachine >().pg; + assert(pg->is_primary()); + dout(10) << "In Active, about to call activate" << dendl; + pg->activate(*context< RecoveryMachine >().get_cur_transaction(), + *context< RecoveryMachine >().get_context_list(), + *context< RecoveryMachine >().get_query_map(), + context< RecoveryMachine >().get_info_map()); + assert(pg->is_active()); + dout(10) << "Activate Finished" << dendl; +} + +boost::statechart::result +PG::RecoveryState::Active::react(const AdvMap& advmap) { + PG *pg = context< RecoveryMachine >().pg; + dout(10) << "Active advmap" << dendl; + if (!pg->pool->newly_removed_snaps.empty()) { + pg->snap_trimq.union_of(pg->pool->newly_removed_snaps); + dout(10) << *pg << " snap_trimq now " << pg->snap_trimq << dendl; + pg->dirty_info = true; + } + return forward_event(); +} + + +boost::statechart::result +PG::RecoveryState::Active::react(const ActMap&) { + PG *pg = context< RecoveryMachine >().pg; + dout(10) << "Active: handling ActMap" << dendl; + assert(pg->is_active()); + assert(pg->is_primary()); + pg->check_recovery_op_pulls(pg->osd->osdmap); + + if (g_conf.osd_check_for_log_corruption) + pg->check_log_for_corruption(pg->osd->store); + + if (pg->missing.num_missing() > pg->missing_loc.size()) { + if (pg->all_unfound_are_lost(pg->osd->osdmap)) { + pg->mark_all_unfound_as_lost( + *context< RecoveryMachine >().get_cur_transaction()); + } + } + + if (!pg->snap_trimq.empty() && + pg->is_clean()) { + dout(10) << "Active: queuing snap trim" << dendl; + pg->queue_snap_trim(); + } + + if (pg->is_all_uptodate()) { + dout(10) << "Active: all up to date, going clean" << dendl; + pg->finish_recovery(*context< RecoveryMachine >().get_cur_transaction(), + *context< RecoveryMachine >().get_context_list()); + } + + pg->choose_acting(pg->osd->whoami); + + return forward_event(); +} + + +boost::statechart::result +PG::RecoveryState::Active::react(const MNotifyRec& notevt) { + PG *pg = context< RecoveryMachine >().pg; + assert(pg->is_active()); + assert(pg->is_primary()); + if (pg->peer_info.count(notevt.from)) { + dout(10) << "Active: got notify from " << notevt.from + << ", already have info from that osd, ignoring" + << dendl; + } else { + dout(10) << "Active: got notify from " << notevt.from + << ", calling proc_replica_info and discover_all_missing" + << dendl; + pg->proc_replica_info(notevt.from, notevt.info); + if (pg->have_unfound()) { + pg->discover_all_missing(*context< RecoveryMachine >().get_query_map()); + } + } + return discard_event(); +} + +boost::statechart::result +PG::RecoveryState::Active::react(const MInfoRec& infoevt) { + PG *pg = context< RecoveryMachine >().pg; + assert(pg->is_active()); + assert(pg->is_primary()); + + // don't update history (yet) if we are active and primary; the replica + // may be telling us they have activated (and committed) but we can't + // share that until _everyone_ does the same. + if (pg->is_acting(infoevt.from)) { + assert(pg->info.history.last_epoch_started < + pg->info.history.same_acting_since); + assert(infoevt.info.history.last_epoch_started >= + pg->info.history.same_acting_since); + dout(10) << " peer osd" << infoevt.from << " activated and committed" + << dendl; + pg->peer_activated.insert(infoevt.from); + } + + if (pg->peer_activated.size() == pg->acting.size()) { + pg->all_activated_and_committed(); + } + return discard_event(); +} + +/*------ReplicaActive-----*/ +PG::RecoveryState::ReplicaActive::ReplicaActive(my_context ctx) + : my_base(ctx) { + dout(10) << "In ReplicaActive, about to call activate" << dendl; + PG *pg = context< RecoveryMachine >().pg; + map< int, map< pg_t, Query> > query_map; + pg->activate(*context< RecoveryMachine >().get_cur_transaction(), + *context< RecoveryMachine >().get_context_list(), + query_map, NULL); + dout(10) << "Activate Finished" << dendl; +} + +boost::statechart::result +PG::RecoveryState::ReplicaActive::react(const MInfoRec& infoevt) { + PG *pg = context< RecoveryMachine >().pg; + pg->process_primary_info(*context().get_cur_transaction(), + infoevt.info); + return discard_event(); +} + +/*-------Stray---*/ +PG::RecoveryState::Stray::Stray(my_context ctx) + : my_base(ctx), backlog_requested(false) { + PG *pg = context< RecoveryMachine >().pg; + assert(!pg->is_active()); + assert(!pg->is_peering()); + assert(!pg->is_primary()); + pg->state_set(PG_STATE_PEERING); +} + +boost::statechart::result +PG::RecoveryState::Stray::react(const MLogRec& logevt) { + PG *pg = context< RecoveryMachine >().pg; + MOSDPGLog *msg = logevt.msg; + dout(10) << "received log from " << logevt.from << dendl; + pg->merge_log(*context().get_cur_transaction(), + msg->info, msg->log, logevt.from); + + assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog); + assert(pg->log.head == pg->info.last_update); + + pg->write_info(*context().get_cur_transaction()); + dout(10) << "activating!" << dendl; + post_event(Activate()); + return discard_event(); +}; + +boost::statechart::result +PG::RecoveryState::Stray::react(const MInfoRec& infoevt) { + PG *pg = context< RecoveryMachine >().pg; + dout(10) << "received info from " << infoevt.from << dendl; + + Log empty_log; + pg->merge_log(*context().get_cur_transaction(), + infoevt.info, empty_log, infoevt.from); + + assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog); + assert(pg->log.head == pg->info.last_update); + + pg->write_info(*context().get_cur_transaction()); + dout(10) << "activating!" << dendl; + post_event(Activate()); + return discard_event(); +}; + +boost::statechart::result +PG::RecoveryState::Stray::react(const BacklogComplete&) { + PG *pg = context< RecoveryMachine >().pg; + assert(backlog_requested); + dout(10) << "BacklogComplete" << dendl; + for (map::iterator i = pending_queries.begin(); + i != pending_queries.end(); + pending_queries.erase(i++)) { + dout(10) << "sending log to " << i->first << dendl; + pg->fulfill_log(i->first, i->second); + } + backlog_requested = false; + return discard_event(); +} + +boost::statechart::result +PG::RecoveryState::Stray::react(const MQuery& query) { + PG *pg = context< RecoveryMachine >().pg; + if (query.query.type == Query::BACKLOG) { + if (!pg->log.backlog) { + dout(10) << "Stray, need a backlog!" + << dendl; + pending_queries[query.from] = query.query; + if (!backlog_requested) { + dout(10) << "Stray, generating a backlog!" + << dendl; + backlog_requested = true; + pg->osd->queue_generate_backlog(pg); + } + return discard_event(); + } + } + + if (query.query.type == Query::INFO) { + pair notify_info; + pg->fulfill_info(query.from, query.query, notify_info); + context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second); + } else { + pg->fulfill_log(query.from, query.query); + } + return discard_event(); +} + +/*--------GetInfo---------*/ +PG::RecoveryState::GetInfo::GetInfo(my_context ctx) : my_base(ctx) { + PG *pg = context< RecoveryMachine >().pg; + pg->generate_past_intervals(); + auto_ptr &prior_set = context< Peering >().prior_set; + + if (!prior_set.get()) { + stringstream out; + pg->build_prior(prior_set); + dout(10) << "PG::do_peer: peer prior_set is " + << *prior_set << dendl; + + if (pg->need_up_thru) { + dout(10) << "transitioning to pending, need upthru" << dendl; + post_event(NeedNewMap()); + } else { + stringstream out; + map< int, map< pg_t, Query > > query_map; + prior_set->gen_query_map(*pg->osd->osdmap, + pg->info, + query_map); + for (map< int, map< pg_t, Query> >::iterator i = query_map.begin(); + i != query_map.end(); + ++i) { + for (map< pg_t, Query >::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + context< RecoveryMachine >().send_query(i->first, j->second); + peer_info_requested.insert(i->first); + } + } + } + } + if (peer_info_requested.empty()) { + post_event(GotInfo()); + } +} + +boost::statechart::result +PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt) { + if (peer_info_requested.count(infoevt.from)) { + peer_info_requested.erase(infoevt.from); + } + if (peer_info_requested.empty()) { + post_event(GotInfo()); + } + return forward_event(); +} + +/*------GetLog------------*/ +PG::RecoveryState::GetLog::GetLog(my_context ctx) : + my_base(ctx), newest_update_osd(-1), need_backlog(false), msg(0) { + PG *pg = context< RecoveryMachine >().pg; + dout(10) << "In GetLog, selecting log location" << dendl; + eversion_t newest_update; + eversion_t oldest_update; + stringstream out; + pg->choose_log_location(need_backlog, + wait_on_backlog, + newest_update_osd, + newest_update, + oldest_update); + + if (!pg->choose_acting(newest_update_osd == -1 ? 0 : newest_update_osd)) { + dout(10) << "transitioning to pending, need new acting" << dendl; + post_event(NeedNewMap()); + } else { + + if (need_backlog && !pg->log.backlog) { + dout(10) << "In GetLog, need backlog" << dendl; + pg->osd->queue_generate_backlog(pg); + } + + if (newest_update_osd != -1) { + dout(10) << "Sending request to osd" << newest_update_osd + << "for the master log" << dendl; + context().send_query(newest_update_osd, + Query(wait_on_backlog ? Query::BACKLOG : Query::LOG, + oldest_update, + pg->info.history)); + } + + if (pg->log.backlog) { + wait_on_backlog = false; + } + + if (!wait_on_backlog && newest_update_osd == -1) { + dout(10) << "GetLog: neither backlog nor master log needed, " + << "moving to GetMissing" << dendl; + post_event(GotLog()); + } + } +} + +boost::statechart::result +PG::RecoveryState::GetLog::react(const MLogRec& logevt) { + assert(!msg); + assert(logevt.from == newest_update_osd); + dout(10) << "GetLog: recieved master log from osd" + << logevt.from << dendl; + msg = logevt.msg; + msg->get(); + if (!wait_on_backlog) { + dout(10) << "GetLog: already have/don't need backlog." + << "moving on to GetMissing" << dendl; + post_event(GotLog()); + } + dout(10) << "GetLog: Still need backlog" << dendl; + return discard_event(); +} + +boost::statechart::result +PG::RecoveryState::GetLog::react(const BacklogComplete&) { + wait_on_backlog = false; + if (msg || newest_update_osd == -1) { + dout(10) << "GetLog: already have/don't need master log." + << "moving on to GetMissing" << dendl; + post_event(GotLog()); + } else { + dout(10) << "GetLog: Still need master log" << dendl; + } + return forward_event(); +} + +void PG::RecoveryState::GetLog::exit() { + PG *pg = context< RecoveryMachine >().pg; + if (msg) { + dout(10) << "processing master log" << dendl; + pg->proc_master_log(*context().get_cur_transaction(), + msg->info, msg->log, msg->missing, + newest_update_osd); + } +} + +PG::RecoveryState::GetLog::~GetLog() { + dout(10) << "leaving GetLog" << dendl; + if (msg) msg->put(); +} + +/*------GetMissing--------*/ +PG::RecoveryState::GetMissing::GetMissing(my_context ctx) : my_base(ctx) { + PG *pg = context< RecoveryMachine >().pg; + map &peer_missing = pg->peer_missing; + for (vector::iterator i = pg->acting.begin()++; + i != pg->acting.end(); + ++i) { + const Info& pi = pg->peer_info[*i]; + if (pi.is_empty()) continue; // Does not have the PG yet + if (pi.last_update == pi.last_complete && // peer has no missing + pi.last_update == pg->info.last_update) { // peer is up to date + // replica has no missing and identical log as us. no need to + // pull anything. + dout(10) << "osd" << *i << " has no missing, identical log" << dendl; + peer_missing[*i]; + pg->search_for_missing(pg->peer_info[*i], &peer_missing[*i], *i); + } + dout(10) << "Requesting missing from osd" << *i << dendl; + context< RecoveryMachine >().send_query(*i, + Query(Query::LOG, + eversion_t(pi.history.last_epoch_started, 0), + pg->info.history)); + peer_missing_requested.insert(*i); + } + + if (peer_missing_requested.empty()) { + post_event(Activate()); + } +} + +boost::statechart::result PG::RecoveryState::GetMissing::react(const MLogRec& logevt) { + PG *pg = context< RecoveryMachine >().pg; + dout(10) << "GetMissing: Got a missing from osd" << logevt.from + << "waiting on " << peer_missing_requested.size() - 1 + << " more" << dendl; + MOSDPGLog *msg = logevt.msg; + peer_missing_requested.erase(logevt.from); + pg->proc_replica_log(*context().get_cur_transaction(), + msg->info, msg->log, msg->missing, logevt.from); + + if (peer_missing_requested.empty()) { + post_event(Activate()); + } + return discard_event(); +}; + +/*----Public Methods-----*/ +void PG::RecoveryState::handle_notify(int from, PG::Info& i, + RecoveryCtx *rctx) +{ + start_handle(rctx); + machine.process_event(MNotifyRec(from, i)); + end_handle(); +} + +void PG::RecoveryState::handle_info(int from, PG::Info& i, + RecoveryCtx *rctx) +{ + start_handle(rctx); + machine.process_event(MInfoRec(from, i)); + end_handle(); +} + +void PG::RecoveryState::handle_log(int from, + MOSDPGLog *msg, + RecoveryCtx *rctx) +{ + start_handle(rctx); + machine.process_event(MLogRec(from, msg)); + end_handle(); +} + +void PG::RecoveryState::handle_query(int from, const PG::Query& q, + RecoveryCtx *rctx) +{ + start_handle(rctx); + machine.process_event(MQuery(from, q)); + end_handle(); +} + +void PG::RecoveryState::handle_advance_map(OSDMap &osdmap, OSDMap &lastmap, + RecoveryCtx *rctx) +{ + start_handle(rctx); + machine.process_event(AdvMap(osdmap, lastmap)); + end_handle(); +} + +void PG::RecoveryState::handle_activate_map(RecoveryCtx *rctx) +{ + start_handle(rctx); + machine.process_event(ActMap()); + end_handle(); +} + +void PG::RecoveryState::handle_backlog_generated(RecoveryCtx *rctx) +{ + start_handle(rctx); + machine.process_event(BacklogComplete()); + end_handle(); +} + +void PG::RecoveryState::handle_create(RecoveryCtx *rctx) +{ + start_handle(rctx); + machine.process_event(Initialize()); + end_handle(); +} +/*---------------------------------------------------*/ #undef dout_prefix #define dout_prefix (*_dout << pg->gen_prefix() << "PgPriorSet: ") diff --git a/src/osd/PG.h b/src/osd/PG.h index 1a1c159d1ed..90f611b475a 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -27,6 +27,13 @@ #include "msg/Messenger.h" #include "messages/MOSDRepScrub.h" +#include +#include +#include +#include +#include +#include + #include "common/DecayCounter.h" #include @@ -47,6 +54,7 @@ class MOSDOp; class MOSDSubOp; class MOSDSubOpReply; class MOSDPGInfo; +class MOSDPGLog; struct PGPool { @@ -766,7 +774,320 @@ public: friend std::ostream& operator<<(std::ostream& oss, const struct PgPriorSet &prior); - std::auto_ptr < PgPriorSet > prior_set; +public: + struct RecoveryCtx { + map< int, map > *query_map; + map< int, MOSDPGInfo* > *info_map; + map< int, vector > *notify_list; + list< Context* > *context_list; + ObjectStore::Transaction *transaction; + RecoveryCtx() : query_map(0), info_map(0), notify_list(0), + context_list(0), transaction(0) {} + RecoveryCtx(map< int, map > *query_map, + map< int, MOSDPGInfo* > *info_map, + map< int, vector > *notify_list, + list< Context* > *context_list, + ObjectStore::Transaction *transaction) + : query_map(query_map), info_map(info_map), + notify_list(notify_list), + context_list(context_list), transaction(transaction) {} + }; + + /* Encapsulates PG recovery process */ + class RecoveryState { + void start_handle(RecoveryCtx *new_ctx) { + assert(!rctx); + rctx = new_ctx; + } + + void end_handle() { + rctx = 0; + } + + struct MInfoRec : boost::statechart::event< MInfoRec > { + int from; + Info &info; + MInfoRec(int from, Info &info) : + from(from), info(info) {} + }; + + struct MLogRec : boost::statechart::event< MLogRec > { + int from; + MOSDPGLog *msg; + MLogRec(int from, MOSDPGLog *msg) : + from(from), msg(msg) {} + }; + + struct MNotifyRec : boost::statechart::event< MNotifyRec > { + int from; + Info &info; + MNotifyRec(int from, Info &info) : + from(from), info(info) {} + }; + + struct MQuery : boost::statechart::event< MQuery > { + int from; + const Query &query; + MQuery(int from, const Query &query): + from(from), query(query) {} + }; + + struct AdvMap : boost::statechart::event< AdvMap > { + OSDMap &osdmap; + OSDMap &lastmap; + AdvMap(OSDMap &osdmap, OSDMap &lastmap): + osdmap(osdmap), lastmap(lastmap) {} + }; + + struct BacklogComplete : boost::statechart::event< BacklogComplete > {}; + struct ActMap : boost::statechart::event< ActMap > {}; + struct Activate : boost::statechart::event< Activate > {}; + struct Initialize : boost::statechart::event< Initialize > {}; + + + /* States */ + struct Initial; + class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > { + RecoveryState *state; + public: + PG *pg; + + RecoveryMachine(RecoveryState *state, PG *pg) : state(state), pg(pg) {} + + /* Accessor functions for state methods */ + ObjectStore::Transaction* get_cur_transaction() { + assert(state->rctx->transaction); + return state->rctx->transaction; + } + + void send_query(int to, const Query &query) { + assert(state->rctx->query_map); + (*state->rctx->query_map)[to][pg->info.pgid] = query; + } + + map > *get_query_map() { + assert(state->rctx->query_map); + return state->rctx->query_map; + } + + map *get_info_map() { + assert(state->rctx->info_map); + return state->rctx->info_map; + } + + list< Context* > *get_context_list() { + assert(state->rctx->context_list); + return state->rctx->context_list; + } + + void send_notify(int to, const Info &info) { + assert(state->rctx->notify_list); + (*state->rctx->notify_list)[to].push_back(info); + } + }; + friend class RecoveryMachine; + + /* States */ + struct Crashed : + boost::statechart::state< Crashed, RecoveryMachine > { + Crashed(my_context ctx) : my_base(ctx) { assert(0); } + }; + + struct Started; + struct Initial : + boost::statechart::simple_state< Initial, RecoveryMachine > { + typedef boost::mpl::list < + boost::statechart::transition< Initialize, Started > + > reactions; + }; + + struct Reset : + boost::statechart::state< Reset, RecoveryMachine > { + typedef boost::mpl::list < + boost::statechart::custom_reaction< AdvMap >, + boost::statechart::custom_reaction< ActMap > + > reactions; + + /* Entry function for RecoveryMachine. Should initialize relevant peering + * state */ + Reset(my_context ctx); + boost::statechart::result react(const AdvMap&); + boost::statechart::result react(const ActMap&); + }; + + struct Start; + struct Started : + boost::statechart::simple_state< Started, RecoveryMachine, Start > { + typedef boost::mpl::list < + boost::statechart::custom_reaction< AdvMap > + > reactions; + + boost::statechart::result react(const AdvMap&); + }; + + struct MakePrimary : boost::statechart::event< MakePrimary > {}; + struct MakeStray : boost::statechart::event< MakeStray > {}; + struct Primary; + struct Stray; + struct Start : + boost::statechart::state< Start, Started > { + typedef boost::mpl::list < + boost::statechart::transition< MakePrimary, Primary >, + boost::statechart::transition< MakeStray, Stray > + > reactions; + Start(my_context ctx); + }; + + struct Peering; + struct Pending; + struct NeedNewMap : boost::statechart::event< NeedNewMap > {}; + struct Primary : + boost::statechart::simple_state< Primary, Started, Peering > { + typedef boost::mpl::list < + boost::statechart::custom_reaction< ActMap >, + boost::statechart::custom_reaction< BacklogComplete >, + boost::statechart::custom_reaction< MNotifyRec >, + boost::statechart::transition< NeedNewMap, Pending > + > reactions; + boost::statechart::result react(const BacklogComplete&); + boost::statechart::result react(const ActMap&); + boost::statechart::result react(const MNotifyRec&); + }; + + struct Pending : + boost::statechart::simple_state< Pending, Primary> {}; + + + struct GetInfo; + struct Active; + struct Peering : + boost::statechart::state< Peering, Primary, GetInfo > { + typedef boost::mpl::list < + boost::statechart::transition< Activate, Active >, + boost::statechart::custom_reaction< AdvMap > + > reactions; + std::auto_ptr< PgPriorSet > prior_set; + + Peering(my_context ctx); + boost::statechart::result react(const AdvMap &advmap); + void exit(); + }; + + struct Active : + boost::statechart::state< Active, Primary > { + typedef boost::mpl::list < + boost::statechart::custom_reaction< ActMap >, + boost::statechart::custom_reaction< AdvMap >, + boost::statechart::custom_reaction< MInfoRec >, + boost::statechart::custom_reaction< MNotifyRec > + > reactions; + + Active(my_context ctx); + boost::statechart::result react(const ActMap&); + boost::statechart::result react(const AdvMap&); + boost::statechart::result react(const MInfoRec& infoevt); + boost::statechart::result react(const MNotifyRec& notevt); + }; + + struct ReplicaActive : boost::statechart::state< ReplicaActive, Started > { + typedef boost::mpl::list < + boost::statechart::transition< MQuery, Crashed >, + boost::statechart::custom_reaction< MInfoRec > + > reactions; + + ReplicaActive(my_context ctx); + boost::statechart::result react(const MInfoRec& infoevt); + }; + + struct Stray : boost::statechart::state< Stray, Started > { + bool backlog_requested; + map pending_queries; + typedef boost::mpl::list < + boost::statechart::custom_reaction< MQuery >, + boost::statechart::custom_reaction< MLogRec >, + boost::statechart::custom_reaction< MInfoRec >, + boost::statechart::custom_reaction< BacklogComplete >, + boost::statechart::transition< Activate, ReplicaActive > + > reactions; + + Stray(my_context ctx); + + boost::statechart::result react(const MQuery& query); + boost::statechart::result react(const BacklogComplete&); + boost::statechart::result react(const MLogRec& logevt); + boost::statechart::result react(const MInfoRec& infoevt); + }; + + struct GetLog; + struct GotInfo : boost::statechart::event< GotInfo > {}; + struct GetInfo : + boost::statechart::state< GetInfo, Peering > { + set peer_info_requested; + typedef boost::mpl::list < + boost::statechart::transition< GotInfo, GetLog >, + boost::statechart::custom_reaction< MNotifyRec >, + boost::statechart::transition< MLogRec, Crashed >, + boost::statechart::transition< BacklogComplete, Crashed > + > reactions; + + GetInfo(my_context ctx); + boost::statechart::result react(const MNotifyRec& infoevt); + }; + + struct GetMissing; + struct GotLog : boost::statechart::event< GotLog > {}; + struct GetLog : + boost::statechart::state< GetLog, Peering > { + int newest_update_osd; + bool need_backlog; + bool wait_on_backlog; + MOSDPGLog *msg; + typedef boost::mpl::list < + boost::statechart::custom_reaction< MLogRec >, + boost::statechart::custom_reaction< BacklogComplete >, + boost::statechart::transition< GotLog, GetMissing > + > reactions; + + GetLog(my_context ctx); + boost::statechart::result react(const MLogRec& logevt); + boost::statechart::result react(const BacklogComplete&); + void exit(); + ~GetLog(); + }; + + struct GetMissing : + boost::statechart::state< GetMissing, Peering > { + set peer_missing_requested; + typedef boost::mpl::list < + boost::statechart::custom_reaction< MLogRec > + > reactions; + + GetMissing(my_context ctx); + boost::statechart::result react(const MLogRec& logevt); + }; + + RecoveryMachine machine; + PG *pg; + RecoveryCtx *rctx; + + public: + RecoveryState(PG *pg) : machine(this, pg), pg(pg), rctx(0) { + machine.initiate(); + } + + void handle_notify(int from, Info& i, RecoveryCtx *ctx); + void handle_info(int from, Info& i, RecoveryCtx *ctx); + void handle_log(int from, + MOSDPGLog *msg, + RecoveryCtx *ctx); + void handle_query(int from, const PG::Query& q, RecoveryCtx *ctx); + void handle_advance_map(OSDMap &osdmap, OSDMap &lastmap, RecoveryCtx *ctx); + void handle_activate_map(RecoveryCtx *ctx); + void handle_backlog_generated(RecoveryCtx *ctx); + void handle_create(RecoveryCtx *ctx); + } recovery_state; + +protected: bool need_up_thru; set stray_set; // non-acting osds that have PG data. @@ -828,9 +1149,9 @@ public: void generate_past_intervals(); void trim_past_intervals(); - void build_prior(); + void build_prior(std::auto_ptr &prior_set); void clear_prior(); - bool prior_set_affected(const OSDMap *map) const; + bool prior_set_affected(PgPriorSet &prior, const OSDMap *osdmap) const; bool all_unfound_are_lost(const OSDMap* osdmap) const; void mark_obj_as_lost(ObjectStore::Transaction& t, @@ -856,6 +1177,9 @@ public: void proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from); + void proc_master_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, + Missing& omissing, int from); + void proc_replica_info(int from, Info &info); bool merge_old_entry(ObjectStore::Transaction& t, Log::Entry& oe); void merge_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, int from); void search_for_missing(const Info &oinfo, const Missing *omissing, @@ -892,6 +1216,8 @@ public: void _activate_committed(epoch_t e); void all_activated_and_committed(); + void process_primary_info(ObjectStore::Transaction &t, const Info &info); + bool have_unfound() const { return missing.num_missing() > missing_loc.size(); } @@ -977,6 +1303,7 @@ public: role(0), state(0), have_master_log(true), + recovery_state(this), need_up_thru(false), pg_stats_lock("PG::pg_stats_lock"), pg_stats_valid(false), @@ -1056,11 +1383,24 @@ public: void warm_restart(); + void fulfill_info(int from, const Query &query, + pair ¬ify_info); + void fulfill_log(int from, const Query &query); bool acting_up_affected(OSDMap &osdmap, const OSDMap &lastmap); + // abstract bits - virtual bool handle_advance_map(OSDMap &osdmap, - const OSDMap &lastmap); + virtual void handle_notify(int from, PG::Info& i, RecoveryCtx *ctx) = 0; + virtual void handle_info(int from, PG::Info& i, RecoveryCtx *ctx) = 0; + virtual void handle_log(int from, + MOSDPGLog *msg, + RecoveryCtx *ctx) = 0; + virtual void handle_query(int from, const PG::Query& q, RecoveryCtx *ctx) = 0; + virtual void handle_advance_map(OSDMap &osdmap, OSDMap &lastmap, + RecoveryCtx *ctx) = 0; + virtual void handle_activate_map(RecoveryCtx *ctx) = 0; + virtual void handle_backlog_generated(RecoveryCtx *ctx) = 0; + virtual void handle_create(RecoveryCtx *ctx) = 0; virtual void do_op(MOSDOp *op) = 0; virtual void do_sub_op(MOSDSubOp *op) = 0; virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0; diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index cd70f783c51..ef83da2331e 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -662,6 +662,35 @@ public: { } ~ReplicatedPG() {} + + void handle_notify(int from, PG::Info& i, RecoveryCtx *rctx) { + recovery_state.handle_notify(from, i, rctx); + } + void handle_info(int from, PG::Info& i, RecoveryCtx *rctx) { + recovery_state.handle_info(from, i, rctx); + } + void handle_log(int from, + MOSDPGLog *msg, + RecoveryCtx *rctx) { + recovery_state.handle_log(from, msg, rctx); + } + void handle_query(int from, const PG::Query& q, RecoveryCtx *rctx) { + recovery_state.handle_query(from, q, rctx); + } + void handle_advance_map(OSDMap &osdmap, OSDMap &lastmap, + RecoveryCtx *rctx) { + recovery_state.handle_advance_map(osdmap, lastmap, rctx); + } + void handle_activate_map(RecoveryCtx *rctx) { + recovery_state.handle_activate_map(rctx); + } + void handle_backlog_generated(RecoveryCtx *rctx) { + recovery_state.handle_backlog_generated(rctx); + } + void handle_create(RecoveryCtx *rctx) { + recovery_state.handle_create(rctx); + } + void do_op(MOSDOp *op); void do_pg_op(MOSDOp *op); void do_sub_op(MOSDSubOp *op);