// kick any waiters
wake_pg_waiters(pg->info.pgid);
+
+ map<int, map<pg_t, PG::Query> > query_map;
+ PG::RecoveryCtx rctx(&query_map, 0, 0, 0, 0); // GetInfo only needs a query_map
+ pg->handle_create(&rctx);
+ do_queries(query_map);
} else {
// already had it. did the mapping change?
pg = _lookup_lock_pg(info.pgid);
PG *pg = it->second;
pg->lock();
- if (pg->handle_advance_map(*osdmap, *lastmap)) {
- pg->unlock();
- continue;
- }
-
- // make sure we clear out any pg_temp change requests
- pg_temp_wanted.erase(pgid);
- pg->prior_set.reset(NULL);
-
-
- pg->cancel_recovery();
-
-
- // sanity check pg_temp
- if (pg->acting.empty() && pg->up.size() && pg->up[0] == whoami) {
- dout(10) << *pg << " acting empty, but i am up[0], clearing pg_temp" << dendl;
- queue_want_pg_temp(pg->info.pgid, pg->acting);
- }
-
+ dout(10) << "Scanning pg " << *pg << dendl;
+ pg->handle_advance_map(*osdmap, *lastmap, 0);
pg->unlock();
}
}
map< int, map<pg_t,PG::Query> > query_map; // peer -> PG -> get_summary_since
map<int,MOSDPGInfo*> info_map; // peer -> message
- epoch_t up_thru = osdmap->get_up_thru(whoami);
-
int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
epoch_t oldest_last_clean = osdmap->get_epoch();
it++) {
PG *pg = it->second;
pg->lock();
- pg->check_recovery_op_pulls(osdmap);
if (pg->is_primary())
num_pg_primary++;
if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean)
oldest_last_clean = pg->info.history.last_epoch_clean;
- if (g_conf.osd_check_for_log_corruption)
- pg->check_log_for_corruption(store);
-
- if (pg->is_active() && pg->is_primary() &&
- (pg->missing.num_missing() > pg->missing_loc.size())) {
- if (pg->all_unfound_are_lost(osdmap)) {
- pg->mark_all_unfound_as_lost(t);
- }
- }
-
if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
//pool is deleted!
queue_pg_for_deletion(pg);
pg->unlock();
continue;
}
- if (pg->is_active()) {
- // i am active
- if (pg->is_primary() &&
- !pg->snap_trimq.empty() &&
- pg->is_clean())
- pg->queue_snap_trim();
- }
- else if (pg->is_primary() &&
- !pg->is_active()) {
- // i am (inactive) primary
- if ((!pg->is_peering() && !pg->is_replay()) ||
- (pg->need_up_thru && up_thru >= pg->info.history.same_acting_since))
- pg->do_peer(t, tfin, query_map, &info_map);
- }
- else if (pg->is_stray() &&
- pg->get_primary() >= 0) {
- // i am residual|replica
- notify_list[pg->get_primary()].push_back(pg->info);
- }
- if (pg->is_primary())
- pg->update_stats();
+    PG::RecoveryCtx rctx(&query_map, &info_map, &notify_list, &tfin, &t);
+ pg->handle_activate_map(&rctx);
+
pg->unlock();
}
wake_pg_waiters(pg->info.pgid);
- pg->do_peer(*t, fin->contexts, query_map, &info_map);
+ PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+ pg->handle_create(&rctx);
+
pg->update_stats();
pg->unlock();
created++;
creating_pgs.erase(pgid);
wake_pg_waiters(pg->info.pgid);
- pg->do_peer(*t, fin->contexts, query_map, &info_map);
+ PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+ pg->handle_create(&rctx);
pg->update_stats();
int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
if (!pg)
continue;
- if (pg->peer_info.count(from) &&
- pg->peer_info[from].last_update == it->last_update) {
- dout(10) << *pg << " got dup osd" << from << " info " << *it << ", identical to ours" << dendl;
- } else if (pg->peer_info.count(from) &&
- pg->is_active()) {
- dout(10) << *pg << " got dup osd" << from << " info " << *it
- << " but pg is active, keeping our info " << pg->peer_info[from]
- << dendl;
- } else {
- dout(10) << *pg << " got osd" << from << " info " << *it << dendl;
- pg->peer_info[from] = *it;
- pg->might_have_unfound.insert(from);
-
- unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
- pg->info.history.merge(it->history);
- reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
-
- // stray?
- if (!pg->is_acting(from)) {
- dout(10) << *pg << " osd" << from << " has stray content: " << *it << dendl;
- pg->stray_set.insert(from);
- pg->state_clear(PG_STATE_CLEAN);
- }
-
- pg->do_peer(*t, fin->contexts, query_map, &info_map);
- pg->update_stats();
- }
-
- if (pg->is_active() && pg->have_unfound()) {
- // Make sure we've requested MISSING information from every OSD
- // we know about.
- pg->discover_all_missing(query_map);
- }
+ PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+ pg->handle_notify(from, *it, &rctx);
int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
assert(tr == 0);
m->put();
}
-
-
-/** PGLog
- * from non-primary to primary
- * includes log and info
- * from primary to non-primary
- * includes log for use in recovery
- * NOTE: called with opqueue active.
- */
-
-void OSD::_process_pg_info(epoch_t epoch, int from,
- PG::Info &info,
- PG::Log &log,
- PG::Missing *missing,
- map< int, map<pg_t,PG::Query> >& query_map,
- map<int, MOSDPGInfo*>* info_map,
- int& created)
-{
- ObjectStore::Transaction *t;
- C_Contexts *fin;
- PG *pg;
-
- pg = get_or_create_pg(info, epoch, from, created, false, &t, &fin);
- if (!pg)
- return;
-
- dout(10) << *pg << ": " << __func__ << " info: " << info << ", ";
- if (log.empty())
- *_dout << "(log omitted)";
- else
- *_dout << "log: " << log;
- *_dout << ", ";
- if (!missing)
- *_dout << "(missing omitted)";
- else
- *_dout << "missing: " << *missing;
- *_dout << dendl;
-
- // don't update history (yet) if we are active and primary; the replica
- // may be telling us they have activated (and committed) but we can't
- // share that until _everyone_ does the same.
- if (pg->is_active() && pg->is_primary() && pg->is_acting(from) &&
- pg->info.history.last_epoch_started < pg->info.history.same_acting_since &&
- info.history.last_epoch_started >= pg->info.history.same_acting_since) {
- dout(10) << " peer osd" << from << " activated and committed" << dendl;
- pg->peer_activated.insert(from);
- if (pg->peer_activated.size() == pg->acting.size())
- pg->all_activated_and_committed();
- } else {
- unreg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
- pg->info.history.merge(info.history);
- reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
- }
-
- // dump log
- dout(15) << *pg << " my log = ";
- pg->log.print(*_dout);
- *_dout << std::endl;
- if (!log.empty()) {
- *_dout << *pg << " osd" << from << " log = ";
- log.print(*_dout);
- }
- *_dout << dendl;
-
- if (pg->is_primary()) {
- // i am PRIMARY
- if (pg->is_active()) {
- // PG is ACTIVE
- if (!pg->is_clean()) {
- dout(10) << *pg << " searching osd" << from << " log for unfound items." << dendl;
- pg->search_for_missing(info, missing, from);
- if (pg->have_unfound()) {
- // Make sure we've requested MISSING information from every OSD
- // we know about.
- pg->discover_all_missing(query_map);
- }
- queue_for_recovery(pg); // in case we found something.
- }
- }
- else if (missing) {
- // PG is INACTIVE
- pg->proc_replica_log(*t, info, log, *missing, from);
-
- // peer
- pg->do_peer(*t, fin->contexts, query_map, info_map);
- pg->update_stats();
- }
- } else if (!pg->info.dne()) {
- if (!pg->is_active()) {
- // INACTIVE REPLICA
- assert(from == pg->acting[0]);
- pg->merge_log(*t, info, log, from);
-
- // We should have the right logs before activating.
- assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
- assert(pg->log.head == pg->info.last_update);
-
- pg->activate(*t, fin->contexts, query_map, info_map);
- } else {
- // ACTIVE REPLICA
- assert(pg->is_replica());
-
- // just update our stats
- dout(10) << *pg << " writing updated stats" << dendl;
- pg->info.stats = info.stats;
-
- // Handle changes to purged_snaps
- interval_set<snapid_t> p;
- p.union_of(info.purged_snaps, pg->info.purged_snaps);
- p.subtract(pg->info.purged_snaps);
- pg->info.purged_snaps = info.purged_snaps;
- if (!p.empty()) {
- dout(10) << " purged_snaps " << pg->info.purged_snaps
- << " -> " << info.purged_snaps
- << " removed " << p << dendl;
- pg->adjust_local_snaps(*t, p);
- }
- }
-
- pg->write_info(*t);
-
- if (!log.empty()) {
- dout(10) << *pg << ": inactive replica merging new PG log entries" << dendl;
- pg->merge_log(*t, info, log, from);
- }
- }
-
- int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(tr == 0);
-
- pg->unlock();
-}
-
-
void OSD::handle_pg_log(MOSDPGLog *m)
{
dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
return;
int from = m->get_source().num();
- int created = 0;
if (!require_same_or_newer_map(m, m->get_epoch())) return;
+ int created = 0;
+ ObjectStore::Transaction *t;
+ C_Contexts *fin;
+ PG *pg = get_or_create_pg(m->info, m->get_epoch(),
+ from, created, false, &t, &fin);
+ if (!pg)
+ return;
+
map< int, map<pg_t,PG::Query> > query_map;
- _process_pg_info(m->get_epoch(), from,
- m->info, m->log, &m->missing, query_map, 0,
- created);
+ map< int, MOSDPGInfo* > info_map;
+ PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+ pg->handle_log(from, m, &rctx);
+ pg->unlock();
do_queries(query_map);
+ do_infos(info_map);
+
+ int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
+ assert(!tr);
+
if (created)
update_heartbeat_peers();
-
m->put();
}
int from = m->get_source().num();
if (!require_same_or_newer_map(m, m->get_epoch())) return;
+ map< int, MOSDPGInfo* > info_map;
- PG::Log empty_log;
- map<int,MOSDPGInfo*> info_map;
int created = 0;
- map< int, map<pg_t,PG::Query> > query_map;
for (vector<PG::Info>::iterator p = m->pg_info.begin();
p != m->pg_info.end();
- ++p)
- _process_pg_info(m->get_epoch(), from, *p, empty_log, NULL, query_map, &info_map, created);
+ ++p) {
+ ObjectStore::Transaction *t = 0;
+ C_Contexts *fin = 0;
+ PG *pg = get_or_create_pg(*p, m->get_epoch(),
+ from, created, false, &t, &fin);
+    if (!pg)
+      continue;
+    PG::RecoveryCtx rctx(0, &info_map, 0, &fin->contexts, t);
+    pg->handle_info(from, *p, &rctx);
+
+ int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
+ assert(!tr);
+
+ pg->unlock();
+ }
- do_queries(query_map);
do_infos(info_map);
if (created)
update_heartbeat_peers();
void OSD::handle_pg_missing(MOSDPGMissing *m)
{
+ assert(0); // MOSDPGMissing is fantastical
+#if 0
dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
if (!require_osd_peer(m))
map< int, map<pg_t,PG::Query> > query_map;
PG::Log empty_log;
int created = 0;
- _process_pg_info(m->get_epoch(), from, m->info,
+ _pro-cess_pg_info(m->get_epoch(), from, m->info, //misspelling added to prevent erroneous finds
empty_log, &m->missing, query_map, NULL, created);
do_queries(query_map);
if (created)
update_heartbeat_peers();
m->put();
+#endif
}
/** PGQuery
if (!require_same_or_newer_map(m, m->get_epoch())) return;
- int created = 0;
map< int, vector<PG::Info> > notify_list;
for (map<pg_t,PG::Query>::iterator it = m->pg_list.begin();
PG::Info empty(pgid);
notify_list[from].push_back(empty);
continue;
- } else {
- pg = _lookup_lock_pg(pgid);
- if (m->get_epoch() < pg->info.history.same_acting_since) {
- dout(10) << *pg << " handle_pg_query changed in "
- << pg->info.history.same_acting_since
- << " (msg from " << m->get_epoch() << ")" << dendl;
- pg->unlock();
- continue;
- }
+ }
+
+ pg = _lookup_lock_pg(pgid);
+ if (m->get_epoch() < pg->info.history.same_acting_since) {
+ dout(10) << *pg << " handle_pg_query changed in "
+ << pg->info.history.same_acting_since
+ << " (msg from " << m->get_epoch() << ")" << dendl;
+ pg->unlock();
+ continue;
}
if (pg->deleting) {
reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
// ok, process query!
- assert(!pg->acting.empty());
- assert(from == pg->acting[0]);
-
- if (it->second.type == PG::Query::INFO) {
- // info
- dout(10) << *pg << " sending info" << dendl;
- notify_list[from].push_back(pg->info);
- } else {
- if (it->second.type == PG::Query::BACKLOG &&
- !pg->log.backlog) {
- dout(10) << *pg << " requested info+missing+backlog - queueing for backlog" << dendl;
- queue_generate_backlog(pg);
- } else {
- MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), pg->info);
- mlog->missing = pg->missing;
-
- // primary -> other, when building master log
- if (it->second.type == PG::Query::LOG) {
- dout(10) << *pg << " sending info+missing+log since " << it->second.since
- << dendl;
- mlog->log.copy_after(pg->log, it->second.since);
- }
-
- if (it->second.type == PG::Query::BACKLOG) {
- dout(10) << *pg << " sending info+missing+backlog" << dendl;
- assert(pg->log.backlog);
- mlog->log = pg->log;
- }
- else if (it->second.type == PG::Query::FULLLOG) {
- dout(10) << *pg << " sending info+missing+full log" << dendl;
- mlog->log.copy_non_backlog(pg->log);
- }
-
- dout(10) << *pg << " sending " << mlog->log << " " << mlog->missing << dendl;
- //m->log.print(cout);
-
- _share_map_outgoing(osdmap->get_cluster_inst(from));
- cluster_messenger->send_message(mlog, m->get_connection());
- }
- }
-
+    PG::RecoveryCtx rctx(0, 0, &notify_list, 0, 0);
+ pg->handle_query(from, it->second, &rctx);
pg->unlock();
}
do_notifies(notify_list);
m->put();
-
- if (created)
- update_heartbeat_peers();
}
pg->lock();
dout(10) << *pg << " generate_backlog" << dendl;
+ int tr;
+ map< int, map<pg_t,PG::Query> > query_map;
+ map< int, MOSDPGInfo* > info_map;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ C_Contexts *fin = new C_Contexts;
+ PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
+
if (!pg->generate_backlog_epoch) {
dout(10) << *pg << " generate_backlog was canceled" << dendl;
goto out;
goto out2;
}
- if (!pg->is_primary()) {
- dout(10) << *pg << " sending info+missing+backlog to primary" << dendl;
- assert(!pg->is_active()); // for now
- MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
- m->missing = pg->missing;
- m->log = pg->log;
- _share_map_outgoing(osdmap->get_cluster_inst(pg->get_primary()));
- cluster_messenger->send_message(m, osdmap->get_cluster_inst(pg->get_primary()));
- } else {
- dout(10) << *pg << " generated backlog, peering" << dendl;
-
- map< int, map<pg_t,PG::Query> > query_map; // peer -> PG -> get_summary_since
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- C_Contexts *fin = new C_Contexts;
- pg->do_peer(*t, fin->contexts, query_map, NULL);
- do_queries(query_map);
- pg->write_info(*t);
- pg->write_log(*t);
- int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(tr == 0);
- }
+ pg->handle_backlog_generated(&rctx);
+ do_queries(query_map);
+ do_infos(info_map);
+ pg->write_info(*t);
+ pg->write_log(*t);
+ tr = store->queue_transaction(&pg->osr, t,
+ new ObjectStore::C_DeleteTransaction(t), fin);
+ assert(!tr);
out2:
map_lock.put_read();
}
-void PG::proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from)
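+/*
+ * proc_master_log: merge an authoritative log from the osd chosen in GetLog
+ * into our own to build the master log.  Primary-only, and only while not
+ * yet active.
+ */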
+void PG::proc_master_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from)
{
dout(10) << "proc_replica_log for osd" << from << ": " << olog << " " << omissing << dendl;
- assert(!is_active());
+ assert(!is_active() && is_primary());
- if (!have_master_log) {
- // merge log into our own log to build master log. no need to
- // make any adjustments to their missing map; we are taking their
- // log to be authoritative (i.e., their entries are by definitely
- // non-divergent).
- merge_log(t, oinfo, olog, from);
+ // merge log into our own log to build master log. no need to
+ // make any adjustments to their missing map; we are taking their
+  // log to be authoritative (i.e., their entries are by definition
+ // non-divergent).
+ merge_log(t, oinfo, olog, from);
+ peer_info[from] = oinfo;
+ dout(10) << " peer osd" << from << " now " << oinfo << " " << omissing << dendl;
+ might_have_unfound.insert(from);
- } else if (is_acting(from)) {
- // replica. have master log.
- // populate missing; check for divergence
+ search_for_missing(oinfo, &omissing, from);
+ peer_missing[from].swap(omissing);
+}
- /*
- basically what we're doing here is rewinding the remote log,
- dropping divergent entries, until we find something that matches
- our master log. we then reset last_update to reflect the new
- point up to which missing is accurate.
-
- later, in activate(), missing will get wound forward again and
- we will send the peer enough log to arrive at the same state.
- */
-
- list<Log::Entry>::const_reverse_iterator pp = olog.log.rbegin();
- eversion_t lu(oinfo.last_update);
- while (true) {
- if (pp == olog.log.rend()) {
- lu = olog.tail;
- break;
- }
- const Log::Entry& oe = *pp;
+void PG::proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, Missing& omissing, int from)
+{
+ /*
+ basically what we're doing here is rewinding the remote log,
+ dropping divergent entries, until we find something that matches
+ our master log. we then reset last_update to reflect the new
+ point up to which missing is accurate.
- // don't continue past the tail of our log.
- if (oe.version <= log.tail)
- break;
+ later, in activate(), missing will get wound forward again and
+ we will send the peer enough log to arrive at the same state.
+ */
- if (!log.objects.count(oe.soid)) {
- dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl;
- ++pp;
- continue;
- }
+ for (map<sobject_t, Missing::item>::iterator i = omissing.missing.begin();
+ i != omissing.missing.end();
+ ++i) {
+ dout(10) << "Missing sobject: " << i->first << dendl;
+ }
+ list<Log::Entry>::const_reverse_iterator pp = olog.log.rbegin();
+ eversion_t lu(oinfo.last_update);
+ while (true) {
+ if (pp == olog.log.rend()) {
+ lu = olog.tail;
+ break;
+ }
+ const Log::Entry& oe = *pp;
+
+ // don't continue past the tail of our log.
+ if (oe.version <= log.tail)
+ break;
+
+ if (!log.objects.count(oe.soid)) {
+ dout(10) << " had " << oe << " new dne : divergent, ignoring" << dendl;
+ ++pp;
+ continue;
+ }
- Log::Entry& ne = *log.objects[oe.soid];
- if (ne.version == oe.version) {
- dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl;
- lu = pp->version;
- break;
- }
- if (ne.version > oe.version) {
- dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+ Log::Entry& ne = *log.objects[oe.soid];
+ if (ne.version == oe.version) {
+ dout(10) << " had " << oe << " new " << ne << " : match, stopping" << dendl;
+ lu = pp->version;
+ break;
+ }
+ if (ne.version > oe.version) {
+ dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+ } else {
+ if (oe.is_delete()) {
+ if (ne.is_delete()) {
+ // old and new are delete
+ dout(20) << " had " << oe << " new " << ne << " : both deletes" << dendl;
+ } else {
+ // old delete, new update.
+ dout(20) << " had " << oe << " new " << ne << " : missing" << dendl;
+ omissing.add(ne.soid, ne.version, eversion_t());
+ }
} else {
- if (oe.is_delete()) {
- if (ne.is_delete()) {
- // old and new are delete
- dout(20) << " had " << oe << " new " << ne << " : both deletes" << dendl;
- } else {
- // old delete, new update.
- dout(20) << " had " << oe << " new " << ne << " : missing" << dendl;
- omissing.add(ne.soid, ne.version, eversion_t());
- }
+ if (ne.is_delete()) {
+ // old update, new delete
+ dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+ omissing.rm(oe.soid, oe.version);
} else {
- if (ne.is_delete()) {
- // old update, new delete
- dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
- omissing.rm(oe.soid, oe.version);
- } else {
- // old update, new update
- dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
- omissing.revise_need(ne.soid, ne.version);
- }
+ // old update, new update
+ dout(10) << " had " << oe << " new " << ne << " : new will supercede" << dendl;
+ omissing.revise_need(ne.soid, ne.version);
}
}
+ }
- ++pp;
- }
+ ++pp;
+ }
- if (lu < oinfo.last_update) {
- dout(10) << " peer osd" << from << " last_update now " << lu << dendl;
- oinfo.last_update = lu;
- if (lu < oinfo.last_complete)
- oinfo.last_complete = lu;
- }
+ if (lu < oinfo.last_update) {
+ dout(10) << " peer osd" << from << " last_update now " << lu << dendl;
+ oinfo.last_update = lu;
+ if (lu < oinfo.last_complete)
+ oinfo.last_complete = lu;
}
peer_info[from] = oinfo;
might_have_unfound.insert(from);
search_for_missing(oinfo, &omissing, from);
+ for (map<sobject_t, Missing::item>::iterator i = omissing.missing.begin();
+ i != omissing.missing.end();
+ ++i) {
+ dout(10) << "Final Missing sobject: " << i->first << dendl;
+ }
peer_missing[from].swap(omissing);
}
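+/*
+ * proc_replica_info: record a peer's Info (from a notify), fold its history
+ * into our own, and mark the peer as stray if it is not in the acting set.
+ */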
+void PG::proc_replica_info(int from, Info &oinfo)
+{
+  assert(is_primary());
+  peer_info[from] = oinfo;
+  might_have_unfound.insert(from);
+
+  osd->unreg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
+  info.history.merge(oinfo.history);
+  osd->reg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
+
+  // stray?
+  if (!is_acting(from)) {
+    dout(10) << " osd" << from << " has stray content: " << oinfo << dendl;
+ stray_set.insert(from);
+ if (is_clean()) {
+ purge_strays();
+ }
+ }
+ update_stats();
+}
+
/*
* merge an old (possibly divergent) log entry into the new log. this
// true if the given map affects the prior set
-bool PG::prior_set_affected(const OSDMap *osdmap) const
+bool PG::prior_set_affected(PgPriorSet &prior, const OSDMap *osdmap) const
{
- const PgPriorSet &prior = *prior_set.get();
-
for (set<int>::iterator p = prior.cur.begin();
p != prior.cur.end();
++p)
share_pg_log(old_last_update);
}
-void PG::build_prior()
+void PG::build_prior(std::auto_ptr<PgPriorSet> &prior_set)
{
if (1) {
// sanity check
assert(info.history.last_epoch_started >= it->second.history.last_epoch_started);
}
}
- generate_past_intervals();
-
- state_clear(PG_STATE_CRASHED);
- state_clear(PG_STATE_DOWN);
-
stringstream out;
prior_set.reset(new PgPriorSet(osd->whoami,
*osd->osdmap,
dout(10) << out << dendl;
PgPriorSet &prior(*prior_set.get());
-
dout(10) << "build_prior: " << *this << " "
<< (prior.crashed ? " crashed":"")
<< (prior.pg_down ? " down":"")
// clear peering state
have_master_log = false;
- prior_set.reset(NULL);
stray_set.clear();
peer_info_requested.clear();
peer_log_requested.clear();
return true;
}
-// if false, stop.
-bool PG::recover_master_log(map< int, map<pg_t,Query> >& query_map,
- eversion_t &oldest_update)
-{
- dout(10) << "recover_master_log" << dendl;
-
- if (is_down())
- return false;
-
- if (peer_info_requested.empty()) {
- stringstream out;
- prior_set->gen_query_map(*osd->osdmap, info, query_map);
- dout(10) << out << dendl;
- for (map< int, map<pg_t, Query> >::const_iterator i = query_map.begin();
- i != query_map.end(); ++i) {
- peer_info_requested.insert(i->first);
- }
- }
-
- bool lack_info = false;
- for (set<int>::const_iterator it = prior_set->cur.begin();
- it != prior_set->cur.end();
- ++it) {
- if (peer_info.count(*it)) {
- dout(10) << " have info from osd" << *it
- << ": " << peer_info[*it]
- << dendl;
- continue;
- }
- lack_info = true;
-
- if (peer_info_requested.find(*it) != peer_info_requested.end()) {
- dout(10) << " waiting for osd" << *it << dendl;
- continue;
- }
- }
- if (lack_info)
- return false;
-
- // -- ok, we have all (prior_set) info. (and maybe others.)
- dout(10) << " have prior_set info. min_last_complete_ondisk " << min_last_complete_ondisk << dendl;
-
- bool need_backlog, wait_for_backlog;
- int pull_from;
- eversion_t newest_update;
- stringstream out;
- choose_log_location(need_backlog,
- wait_for_backlog, pull_from, newest_update,
- oldest_update);
- dout(10) << out << dendl;
-
- if (pull_from == -1) {
- pull_from = osd->whoami;
- }
-
- // -- do i need to generate backlog?
- if (need_backlog && !info.log_backlog) {
- osd->queue_generate_backlog(this);
- }
-
- // -- decide what acting set i want, based on state of up set
- if (!choose_acting(pull_from))
- return false;
-
- // gather log(+missing) from that person!
- if (pull_from != osd->whoami) {
- const Info& pi = peer_info[pull_from];
- if (pi.log_tail <= log.head) {
- if (peer_log_requested.count(pull_from)) {
- dout(10) << " newest update on osd" << pull_from
- << " v " << newest_update
- << ", already queried log"
- << dendl;
- } else {
- // we'd _like_ it back to oldest_update, but take what we can get.
- dout(10) << " newest update on osd" << pull_from
- << " v " << newest_update
- << ", querying since oldest_update " << oldest_update
- << dendl;
- query_map[pull_from][info.pgid] = Query(Query::LOG, oldest_update, info.history);
- peer_log_requested.insert(pull_from);
- }
- } else {
- dout(10) << " newest update on osd" << pull_from
- << ", whose log.tail " << pi.log_tail
- << " > my log.head " << log.head
- << ", i will need backlog for me+them." << dendl;
- // it's possible another peer could fill in the missing bits, but
- // pretty unlikely. someday it may be worth the complexity to
- // try. until then, just get the full backlogs.
- if (!log.backlog) {
- osd->queue_generate_backlog(this);
- return false;
- }
-
- if (peer_backlog_requested.count(pull_from)) {
- dout(10) << " newest update on osd" << pull_from
- << " v " << newest_update
- << ", already queried summary/backlog"
- << dendl;
- } else {
- dout(10) << " newest update on osd" << pull_from
- << " v " << newest_update
- << ", querying entire summary/backlog"
- << dendl;
- query_map[pull_from][info.pgid] = Query(Query::BACKLOG, info.history);
- peer_backlog_requested.insert(pull_from);
- }
- }
- return false;
- } else {
- dout(10) << " newest_update " << info.last_update << " (me)" << dendl;
- }
-
- dout(10) << " oldest_update " << oldest_update << dendl;
-
- have_master_log = true;
-
- return true;
-}
-
void PG::choose_log_location(bool &need_backlog,
bool &wait_on_backlog,
int &pull_from,
}
}
-void PG::do_peer(ObjectStore::Transaction& t, list<Context*>& tfin,
- map< int, map<pg_t,Query> >& query_map,
- map<int, MOSDPGInfo*> *activator_map)
-{
- dout(10) << "PG::do_peer: peer up " << up << ", acting "
- << acting << dendl;
-
- if (!is_active())
- state_set(PG_STATE_PEERING);
-
- if (!prior_set.get())
- build_prior();
-
- dout(10) << "PG::do_peer: peer prior_set is "
- << *prior_set << dendl;
-
- eversion_t oldest_update;
- if (!have_master_log) {
- if (!recover_master_log(query_map, oldest_update))
- return;
- }
- else {
- if (up != acting) {
- // are we done generating backlog(s)?
- if (!choose_acting(osd->whoami))
- return;
- }
- }
-
- /** COLLECT MISSING+LOG FROM PEERS **********/
- /*
- we also detect divergent replicas here by pulling the full log
- from everyone.
-
- for example:
- 0: 1: 2:
- 2'6 2'6 2'6
- 2'7 2'7 2'7
- 3'8 | 2'8 2'8
- 3'9 | 2'9
-
- */
-
- // gather missing from peers
- bool have_all_missing = true;
- for (unsigned i=1; i<acting.size(); i++) {
- int peer = acting[i];
- Info& pi = peer_info[peer];
- dout(10) << " peer osd" << peer << " " << pi << dendl;
-
- if (pi.is_empty())
- continue;
- if (peer_missing.find(peer) == peer_missing.end()) {
- if (pi.last_update == pi.last_complete && // peer has no missing
- pi.last_update == info.last_update) { // peer is up to date
- // replica has no missing and identical log as us. no need to
- // pull anything.
- dout(10) << " infering up to date and no missing (last_update==last_complete) for osd" << peer << dendl;
- peer_missing[peer].num_missing(); // just create the entry.
- search_for_missing(peer_info[peer], &peer_missing[peer], peer);
- continue;
- } else {
- dout(10) << " still need log+missing from osd" << peer << dendl;
- have_all_missing = false;
- }
- }
- if (peer_log_requested.find(peer) != peer_log_requested.end())
- continue;
- if (peer_backlog_requested.find(peer) != peer_backlog_requested.end())
- continue;
-
- assert(pi.last_update <= log.head);
-
- if (pi.last_update < log.tail) {
- // we need the full backlog in order to build this node's missing map.
- dout(10) << " osd" << peer << " last_update " << pi.last_update
- << " < log.tail " << log.tail
- << ", pulling missing+backlog" << dendl;
- query_map[peer][info.pgid] = Query(Query::BACKLOG, info.history);
- peer_backlog_requested.insert(peer);
- } else {
- // we need just enough log to get any divergent items so that we
- // can appropriate adjust the missing map. that can be as far back
- // as the peer's last_epoch_started.
- eversion_t from(pi.history.last_epoch_started, 0);
- dout(10) << " osd" << peer << " last_update " << pi.last_update
- << ", pulling missing+log from it's last_epoch_started " << from << dendl;
- query_map[peer][info.pgid] = Query(Query::LOG, from, info.history);
- peer_log_requested.insert(peer);
- }
- }
- if (!have_all_missing)
- return;
-
- {
- int num_missing = missing.num_missing();
- int num_locs = missing_loc.size();
- dout(10) << "num_missing = " << num_missing
- << ", num_unfound = " << (num_missing - num_locs) << dendl;
- }
-
- // sanity check
- assert(info.last_complete >= log.tail || log.backlog);
-
- // -- do need to notify the monitor?
- if (true) {
- // NOTE: we can skip the up_thru check if this is a new PG and there
- // were no prior intervals.
- if (info.history.epoch_created < info.history.same_acting_since &&
- osd->osdmap->get_up_thru(osd->whoami) < info.history.same_acting_since) {
- dout(10) << "up_thru " << osd->osdmap->get_up_thru(osd->whoami)
- << " < same_since " << info.history.same_acting_since
- << ", must notify monitor" << dendl;
- need_up_thru = true;
- osd->queue_want_up_thru(info.history.same_acting_since);
- return;
- } else {
- dout(10) << "up_thru " << osd->osdmap->get_up_thru(osd->whoami)
- << " >= same_since " << info.history.same_acting_since
- << ", all is well" << dendl;
- }
- }
-
- // -- crash recovery?
- if (is_crashed()) {
- replay_until = g_clock.now();
- replay_until += g_conf.osd_replay_window;
- dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window
- << " until " << replay_until << dendl;
- state_set(PG_STATE_REPLAY);
- osd->replay_queue_lock.Lock();
- osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
- osd->replay_queue_lock.Unlock();
- }
-
- if (!is_active()) {
- activate(t, tfin, query_map, activator_map);
- }
-
- if (is_all_uptodate())
- finish_recovery(t, tfin);
-}
-
/* Build the might_have_unfound set.
*
* This is used by the primary OSD during recovery.
map<int, MOSDPGInfo*> *activator_map)
{
assert(!is_active());
+ // -- crash recovery?
+ if (is_crashed()) {
+ replay_until = g_clock.now();
+ replay_until += g_conf.osd_replay_window;
+ dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window
+ << " until " << replay_until << dendl;
+ state_set(PG_STATE_REPLAY);
+ osd->replay_queue_lock.Lock();
+ osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
+ osd->replay_queue_lock.Unlock();
+ }
// twiddle pg state
state_set(PG_STATE_ACTIVE);
need_up_thru = false;
- // clear prior set (and dependency info)... we are done peering!
- prior_set.reset(NULL);
-
// if we are building a backlog, cancel it!
if (up == acting)
osd->cancel_generate_backlog(this);
if (pi.last_update == info.last_update && !need_old_log_entries) {
// empty log
- if (activator_map) {
+ if (!pi.is_empty() && activator_map) {
dout(10) << "activate peer osd" << peer << " is up to date, queueing in pending_activators" << dendl;
if (activator_map->count(peer) == 0)
(*activator_map)[peer] = new MOSDPGInfo(osd->osdmap->get_epoch());
if (is_primary()) {
peer_activated.insert(osd->whoami);
- dout(10) << "_activate_committed " << e << " peer_activated now " << peer_activated << dendl;
+ dout(10) << "_activate_committed " << e << " peer_activated now " << peer_activated
+ << " last_epoch_started " << info.history.last_epoch_started
+ << " same_acting_since " << info.history.same_acting_since << dendl;
if (peer_activated.size() == acting.size())
all_activated_and_committed();
} else {
peer_info.erase(*p);
}
+ state_set(PG_STATE_CLEAN);
stray_set.clear();
// clear _requested maps; we may have to peer() again if we discover
}
}
-bool PG::handle_advance_map(OSDMap &osdmap,
- const OSDMap &lastmap)
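+/*
+ * fulfill_info / fulfill_log: answer a primary's Query from our local info,
+ * log and missing.
+ */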
+void PG::fulfill_info(int from, const Query &query,
+                      pair<int, Info> &notify_info)
{
- if (acting_up_affected(osdmap, lastmap)) {
- return true;
+ assert(!acting.empty());
+ assert(from == acting[0]);
+ assert(query.type == PG::Query::INFO);
+
+ // info
+ dout(10) << "sending info" << dendl;
+ notify_info = make_pair(from, info);
+}
+
+void PG::fulfill_log(int from, const Query &query)
+{
+ assert(!acting.empty());
+ assert(from == acting[0]);
+ assert(query.type != PG::Query::INFO);
+ if (query.type == PG::Query::BACKLOG &&
+ !log.backlog) {
+ assert(0); // generated in the state machine
} else {
- warm_restart();
- return false;
+ MOSDPGLog *mlog = new MOSDPGLog(osd->osdmap->get_epoch(), info);
+ mlog->missing = missing;
+
+ // primary -> other, when building master log
+ if (query.type == PG::Query::LOG) {
+ dout(10) << " sending info+missing+log since " << query.since
+ << dendl;
+ mlog->log.copy_after(log, query.since);
+ }
+
+ if (query.type == PG::Query::BACKLOG) {
+ dout(10) << "sending info+missing+backlog" << dendl;
+ assert(log.backlog);
+ mlog->log = log;
+ }
+ else if (query.type == PG::Query::FULLLOG) {
+ dout(10) << " sending info+missing+full log" << dendl;
+ mlog->log.copy_non_backlog(log);
+ }
+
+ dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
+
+ osd->_share_map_outgoing(osd->osdmap->get_cluster_inst(from));
+ osd->cluster_messenger->send_message(mlog,
+ osd->osdmap->get_cluster_inst(from));
}
}
}
}
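+/*
+ * process_primary_info: on an active replica, take the primary's stats and
+ * purged_snaps (trimming any newly purged snaps locally) and persist our info.
+ */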
+void PG::process_primary_info(ObjectStore::Transaction &t, const Info &oinfo)
+{
+ assert(is_replica());
+ assert(is_active());
+ info.stats = oinfo.stats;
+
+ // Handle changes to purged_snaps
+ interval_set<snapid_t> p;
+ p.union_of(oinfo.purged_snaps, info.purged_snaps);
+ p.subtract(info.purged_snaps);
+ info.purged_snaps = oinfo.purged_snaps;
+ if (!p.empty()) {
+ dout(10) << " purged_snaps " << info.purged_snaps
+ << " -> " << oinfo.purged_snaps
+ << " removed " << p << dendl;
+ adjust_local_snaps(t, p);
+ }
+ write_info(t);
+}
+
unsigned int PG::Missing::num_missing() const
{
return missing.size();
<< "lost=" << prior.lost << " ]]";
return oss;
}
+
+/*------------ Recovery State Machine----------------*/
+#undef dout_prefix
+#define dout_prefix (*_dout << context< RecoveryMachine >().pg->gen_prefix() << "StateMachine: ")
+
+/*------Started-------*/
+boost::statechart::result
+PG::RecoveryState::Started::react(const AdvMap& advmap) {
+ dout(10) << "Started advmap" << dendl;
+ PG *pg = context< RecoveryMachine >().pg;
+ if (pg->acting_up_affected(advmap.osdmap, advmap.lastmap)) {
+ dout(10) << "up or acting affected, transitioning to Reset" << dendl;
+ return transit< Reset >();
+ }
+ return discard_event();
+}
+
+/*--------Reset---------*/
+boost::statechart::result
+PG::RecoveryState::Reset::react(const AdvMap& advmap) {
+ PG *pg = context< RecoveryMachine >().pg;
+ dout(10) << "Reset advmap" << dendl;
+ if (pg->acting_up_affected(advmap.osdmap, advmap.lastmap)) {
+ dout(10) << "up or acting affected, calling warm_restart again" << dendl;
+ pg->warm_restart();
+ }
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::Reset::react(const ActMap& actmap) {
+ PG *pg = context< RecoveryMachine >().pg;
+ if (pg->is_stray() && pg->get_primary() >= 0) {
+ context< RecoveryMachine >().send_notify(pg->get_primary(),
+ pg->info);
+ }
+ return transit< Started >();
+}
+
+PG::RecoveryState::Reset::Reset(my_context ctx) : my_base(ctx) {
+ PG *pg = context< RecoveryMachine >().pg;
+  dout(10) << "Resetting" << dendl;
+ pg->warm_restart();
+ post_event(Initialize());
+}
+
+/*-------Start---------*/
+PG::RecoveryState::Start::Start(my_context ctx) : my_base(ctx) {
+ PG *pg = context< RecoveryMachine >().pg;
+ if (pg->is_primary()) {
+ dout(1) << "transitioning to Primary" << dendl;
+ post_event(MakePrimary());
+ } else { //is_stray
+ dout(1) << "transitioning to Stray" << dendl;
+ post_event(MakeStray());
+ }
+}
+
+/*---------Primary--------*/
+boost::statechart::result
+PG::RecoveryState::Primary::react(const BacklogComplete&) {
+ PG *pg = context< RecoveryMachine >().pg;
+ pg->choose_acting(pg->osd->whoami);
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::Primary::react(const MNotifyRec& notevt) {
+ dout(7) << "handle_pg_notify from " << notevt.from << dendl;
+ PG *pg = context< RecoveryMachine >().pg;
+ if (pg->peer_info.count(notevt.from) &&
+ pg->peer_info[notevt.from].last_update == notevt.info.last_update) {
+ dout(10) << *pg << " got dup osd" << notevt.from << " info " << notevt.info
+ << ", identical to ours" << dendl;
+ } else {
+ pg->proc_replica_info(notevt.from, notevt.info);
+ }
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::Primary::react(const ActMap&) {
+ dout(7) << "handle ActMap primary" << dendl;
+ PG *pg = context< RecoveryMachine >().pg;
+ pg->update_stats();
+ return forward_event();
+}
+
+/*---------Peering--------*/
+PG::RecoveryState::Peering::Peering(my_context ctx)
+ : my_base(ctx) {
+ PG *pg = context< RecoveryMachine >().pg;
+ assert(!pg->is_active());
+ assert(!pg->is_peering());
+ assert(pg->is_primary());
+ pg->state_set(PG_STATE_PEERING);
+}
+
+boost::statechart::result
+PG::RecoveryState::Peering::react(const AdvMap& advmap) {
+ PG *pg = context< RecoveryMachine >().pg;
+ dout(10) << "Peering advmap" << dendl;
+ if (pg->prior_set_affected(*prior_set.get(), &advmap.osdmap)) {
+ dout(1) << "Peering, priors_set_affected, going to Reset" << dendl;
+ return transit< Reset >();
+ }
+ return forward_event();
+}
+
+void PG::RecoveryState::Peering::exit() {
+ dout(10) << "Leaving Peering" << dendl;
+}
+
+/*---------Active---------*/
+PG::RecoveryState::Active::Active(my_context ctx) : my_base(ctx) {
+ PG *pg = context< RecoveryMachine >().pg;
+ assert(pg->is_primary());
+ dout(10) << "In Active, about to call activate" << dendl;
+ pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
+ *context< RecoveryMachine >().get_context_list(),
+ *context< RecoveryMachine >().get_query_map(),
+ context< RecoveryMachine >().get_info_map());
+ assert(pg->is_active());
+ dout(10) << "Activate Finished" << dendl;
+}
+
+boost::statechart::result
+PG::RecoveryState::Active::react(const AdvMap& advmap) {
+ PG *pg = context< RecoveryMachine >().pg;
+ dout(10) << "Active advmap" << dendl;
+ if (!pg->pool->newly_removed_snaps.empty()) {
+ pg->snap_trimq.union_of(pg->pool->newly_removed_snaps);
+ dout(10) << *pg << " snap_trimq now " << pg->snap_trimq << dendl;
+ pg->dirty_info = true;
+ }
+ return forward_event();
+}
+
+
+boost::statechart::result
+PG::RecoveryState::Active::react(const ActMap&) {
+ PG *pg = context< RecoveryMachine >().pg;
+ dout(10) << "Active: handling ActMap" << dendl;
+ assert(pg->is_active());
+ assert(pg->is_primary());
+ pg->check_recovery_op_pulls(pg->osd->osdmap);
+
+ if (g_conf.osd_check_for_log_corruption)
+ pg->check_log_for_corruption(pg->osd->store);
+
+ if (pg->missing.num_missing() > pg->missing_loc.size()) {
+ if (pg->all_unfound_are_lost(pg->osd->osdmap)) {
+ pg->mark_all_unfound_as_lost(
+ *context< RecoveryMachine >().get_cur_transaction());
+ }
+ }
+
+ if (!pg->snap_trimq.empty() &&
+ pg->is_clean()) {
+ dout(10) << "Active: queuing snap trim" << dendl;
+ pg->queue_snap_trim();
+ }
+
+ if (pg->is_all_uptodate()) {
+ dout(10) << "Active: all up to date, going clean" << dendl;
+ pg->finish_recovery(*context< RecoveryMachine >().get_cur_transaction(),
+ *context< RecoveryMachine >().get_context_list());
+ }
+
+ pg->choose_acting(pg->osd->whoami);
+
+ return forward_event();
+}
+
+
+boost::statechart::result
+PG::RecoveryState::Active::react(const MNotifyRec& notevt) {
+ PG *pg = context< RecoveryMachine >().pg;
+ assert(pg->is_active());
+ assert(pg->is_primary());
+ if (pg->peer_info.count(notevt.from)) {
+ dout(10) << "Active: got notify from " << notevt.from
+ << ", already have info from that osd, ignoring"
+ << dendl;
+ } else {
+ dout(10) << "Active: got notify from " << notevt.from
+ << ", calling proc_replica_info and discover_all_missing"
+ << dendl;
+ pg->proc_replica_info(notevt.from, notevt.info);
+ if (pg->have_unfound()) {
+ pg->discover_all_missing(*context< RecoveryMachine >().get_query_map());
+ }
+ }
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::Active::react(const MInfoRec& infoevt) {
+ PG *pg = context< RecoveryMachine >().pg;
+ assert(pg->is_active());
+ assert(pg->is_primary());
+
+ // don't update history (yet) if we are active and primary; the replica
+ // may be telling us they have activated (and committed) but we can't
+ // share that until _everyone_ does the same.
+ if (pg->is_acting(infoevt.from)) {
+ assert(pg->info.history.last_epoch_started <
+ pg->info.history.same_acting_since);
+ assert(infoevt.info.history.last_epoch_started >=
+ pg->info.history.same_acting_since);
+ dout(10) << " peer osd" << infoevt.from << " activated and committed"
+ << dendl;
+ pg->peer_activated.insert(infoevt.from);
+ }
+
+ if (pg->peer_activated.size() == pg->acting.size()) {
+ pg->all_activated_and_committed();
+ }
+ return discard_event();
+}
+
+/*------ReplicaActive-----*/
+PG::RecoveryState::ReplicaActive::ReplicaActive(my_context ctx)
+ : my_base(ctx) {
+ dout(10) << "In ReplicaActive, about to call activate" << dendl;
+ PG *pg = context< RecoveryMachine >().pg;
+ map< int, map< pg_t, Query> > query_map;
+ pg->activate(*context< RecoveryMachine >().get_cur_transaction(),
+ *context< RecoveryMachine >().get_context_list(),
+ query_map, NULL);
+ dout(10) << "Activate Finished" << dendl;
+}
+
+boost::statechart::result
+PG::RecoveryState::ReplicaActive::react(const MInfoRec& infoevt) {
+ PG *pg = context< RecoveryMachine >().pg;
+ pg->process_primary_info(*context<RecoveryMachine>().get_cur_transaction(),
+ infoevt.info);
+ return discard_event();
+}
+
+/*-------Stray---*/
+PG::RecoveryState::Stray::Stray(my_context ctx)
+ : my_base(ctx), backlog_requested(false) {
+ PG *pg = context< RecoveryMachine >().pg;
+ assert(!pg->is_active());
+ assert(!pg->is_peering());
+ assert(!pg->is_primary());
+ pg->state_set(PG_STATE_PEERING);
+}
+
+boost::statechart::result
+PG::RecoveryState::Stray::react(const MLogRec& logevt) {
+ PG *pg = context< RecoveryMachine >().pg;
+ MOSDPGLog *msg = logevt.msg;
+ dout(10) << "received log from " << logevt.from << dendl;
+ pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(),
+ msg->info, msg->log, logevt.from);
+
+ assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
+ assert(pg->log.head == pg->info.last_update);
+
+ pg->write_info(*context<RecoveryMachine>().get_cur_transaction());
+ dout(10) << "activating!" << dendl;
+ post_event(Activate());
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::Stray::react(const MInfoRec& infoevt) {
+ PG *pg = context< RecoveryMachine >().pg;
+ dout(10) << "received info from " << infoevt.from << dendl;
+
+ Log empty_log;
+ pg->merge_log(*context<RecoveryMachine>().get_cur_transaction(),
+ infoevt.info, empty_log, infoevt.from);
+
+ assert(pg->log.tail <= pg->info.last_complete || pg->log.backlog);
+ assert(pg->log.head == pg->info.last_update);
+
+ pg->write_info(*context<RecoveryMachine>().get_cur_transaction());
+ dout(10) << "activating!" << dendl;
+ post_event(Activate());
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::Stray::react(const BacklogComplete&) {
+ PG *pg = context< RecoveryMachine >().pg;
+ assert(backlog_requested);
+ dout(10) << "BacklogComplete" << dendl;
+ for (map<int, Query>::iterator i = pending_queries.begin();
+ i != pending_queries.end();
+ pending_queries.erase(i++)) {
+ dout(10) << "sending log to " << i->first << dendl;
+ pg->fulfill_log(i->first, i->second);
+ }
+ backlog_requested = false;
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::Stray::react(const MQuery& query) {
+ PG *pg = context< RecoveryMachine >().pg;
+ if (query.query.type == Query::BACKLOG) {
+ if (!pg->log.backlog) {
+ dout(10) << "Stray, need a backlog!"
+ << dendl;
+ pending_queries[query.from] = query.query;
+ if (!backlog_requested) {
+ dout(10) << "Stray, generating a backlog!"
+ << dendl;
+ backlog_requested = true;
+ pg->osd->queue_generate_backlog(pg);
+ }
+ return discard_event();
+ }
+ }
+
+ if (query.query.type == Query::INFO) {
+ pair<int, Info> notify_info;
+ pg->fulfill_info(query.from, query.query, notify_info);
+ context< RecoveryMachine >().send_notify(notify_info.first, notify_info.second);
+ } else {
+ pg->fulfill_log(query.from, query.query);
+ }
+ return discard_event();
+}
+
+/*--------GetInfo---------*/
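+// Build the prior set and query each prior osd for its Info; once every
+// outstanding request has been answered, GotInfo moves us on to GetLog.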
+PG::RecoveryState::GetInfo::GetInfo(my_context ctx) : my_base(ctx) {
+ PG *pg = context< RecoveryMachine >().pg;
+ pg->generate_past_intervals();
+ auto_ptr<PgPriorSet> &prior_set = context< Peering >().prior_set;
+
+ if (!prior_set.get()) {
+    pg->build_prior(prior_set);
+    dout(10) << "GetInfo: peer prior_set is "
+             << *prior_set << dendl;
+
+ if (pg->need_up_thru) {
+ dout(10) << "transitioning to pending, need upthru" << dendl;
+ post_event(NeedNewMap());
+ } else {
+ map< int, map< pg_t, Query > > query_map;
+ prior_set->gen_query_map(*pg->osd->osdmap,
+ pg->info,
+ query_map);
+ for (map< int, map< pg_t, Query> >::iterator i = query_map.begin();
+ i != query_map.end();
+ ++i) {
+ for (map< pg_t, Query >::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ context< RecoveryMachine >().send_query(i->first, j->second);
+ peer_info_requested.insert(i->first);
+ }
+ }
+ }
+ }
+ if (peer_info_requested.empty()) {
+ post_event(GotInfo());
+ }
+}
+
+boost::statechart::result
+PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt) {
+ if (peer_info_requested.count(infoevt.from)) {
+ peer_info_requested.erase(infoevt.from);
+ }
+ if (peer_info_requested.empty()) {
+ post_event(GotInfo());
+ }
+ return forward_event();
+}
+
+/*------GetLog------------*/
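+// Pull the log (and, if needed, backlog) from the osd holding the newest
+// update, as selected by choose_log_location(); the result is merged in
+// exit() via proc_master_log() before we move on to GetMissing.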
+PG::RecoveryState::GetLog::GetLog(my_context ctx) :
+  my_base(ctx), newest_update_osd(-1), need_backlog(false),
+  wait_on_backlog(false), msg(0) {
+ PG *pg = context< RecoveryMachine >().pg;
+ dout(10) << "In GetLog, selecting log location" << dendl;
+ eversion_t newest_update;
+ eversion_t oldest_update;
+ stringstream out;
+ pg->choose_log_location(need_backlog,
+ wait_on_backlog,
+ newest_update_osd,
+ newest_update,
+ oldest_update);
+
+  if (!pg->choose_acting(newest_update_osd == -1 ?
+                         pg->osd->whoami : newest_update_osd)) {
+ dout(10) << "transitioning to pending, need new acting" << dendl;
+ post_event(NeedNewMap());
+ } else {
+
+ if (need_backlog && !pg->log.backlog) {
+ dout(10) << "In GetLog, need backlog" << dendl;
+ pg->osd->queue_generate_backlog(pg);
+ }
+
+ if (newest_update_osd != -1) {
+ dout(10) << "Sending request to osd" << newest_update_osd
+               << " for the master log" << dendl;
+ context<RecoveryMachine>().send_query(newest_update_osd,
+ Query(wait_on_backlog ? Query::BACKLOG : Query::LOG,
+ oldest_update,
+ pg->info.history));
+ }
+
+ if (pg->log.backlog) {
+ wait_on_backlog = false;
+ }
+
+ if (!wait_on_backlog && newest_update_osd == -1) {
+ dout(10) << "GetLog: neither backlog nor master log needed, "
+ << "moving to GetMissing" << dendl;
+ post_event(GotLog());
+ }
+ }
+}
+
+boost::statechart::result
+PG::RecoveryState::GetLog::react(const MLogRec& logevt) {
+ assert(!msg);
+ assert(logevt.from == newest_update_osd);
+  dout(10) << "GetLog: received master log from osd"
+ << logevt.from << dendl;
+ msg = logevt.msg;
+ msg->get();
+  if (!wait_on_backlog) {
+    dout(10) << "GetLog: already have/don't need backlog."
+              << " moving on to GetMissing" << dendl;
+    post_event(GotLog());
+  } else {
+    dout(10) << "GetLog: still need backlog" << dendl;
+  }
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::GetLog::react(const BacklogComplete&) {
+ wait_on_backlog = false;
+ if (msg || newest_update_osd == -1) {
+ dout(10) << "GetLog: already have/don't need master log."
+              << " moving on to GetMissing" << dendl;
+ post_event(GotLog());
+ } else {
+ dout(10) << "GetLog: Still need master log" << dendl;
+ }
+ return forward_event();
+}
+
+void PG::RecoveryState::GetLog::exit() {
+ PG *pg = context< RecoveryMachine >().pg;
+ if (msg) {
+ dout(10) << "processing master log" << dendl;
+ pg->proc_master_log(*context<RecoveryMachine>().get_cur_transaction(),
+ msg->info, msg->log, msg->missing,
+ newest_update_osd);
+ }
+}
+
+PG::RecoveryState::GetLog::~GetLog() {
+ dout(10) << "leaving GetLog" << dendl;
+ if (msg) msg->put();
+}
+
+/*------GetMissing--------*/
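+// Request log+missing from every other acting osd so divergent entries can
+// be accounted for before we activate.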
+PG::RecoveryState::GetMissing::GetMissing(my_context ctx) : my_base(ctx) {
+ PG *pg = context< RecoveryMachine >().pg;
+ map<int, Missing> &peer_missing = pg->peer_missing;
+  // check missing on every other osd in the acting set (acting[0] is us)
+  for (vector<int>::iterator i = pg->acting.begin() + 1;
+ i != pg->acting.end();
+ ++i) {
+ const Info& pi = pg->peer_info[*i];
+ if (pi.is_empty()) continue; // Does not have the PG yet
+ if (pi.last_update == pi.last_complete && // peer has no missing
+ pi.last_update == pg->info.last_update) { // peer is up to date
+ // replica has no missing and identical log as us. no need to
+ // pull anything.
+ dout(10) << "osd" << *i << " has no missing, identical log" << dendl;
+      peer_missing[*i];      // just create the entry
+      pg->search_for_missing(pg->peer_info[*i], &peer_missing[*i], *i);
+      continue;
+    }
+ dout(10) << "Requesting missing from osd" << *i << dendl;
+ context< RecoveryMachine >().send_query(*i,
+ Query(Query::LOG,
+ eversion_t(pi.history.last_epoch_started, 0),
+ pg->info.history));
+ peer_missing_requested.insert(*i);
+ }
+
+ if (peer_missing_requested.empty()) {
+ post_event(Activate());
+ }
+}
+
+boost::statechart::result PG::RecoveryState::GetMissing::react(const MLogRec& logevt) {
+ PG *pg = context< RecoveryMachine >().pg;
+ dout(10) << "GetMissing: Got a missing from osd" << logevt.from
+           << ", waiting on " << peer_missing_requested.size() - 1
+ << " more" << dendl;
+ MOSDPGLog *msg = logevt.msg;
+ peer_missing_requested.erase(logevt.from);
+ pg->proc_replica_log(*context<RecoveryMachine>().get_cur_transaction(),
+ msg->info, msg->log, msg->missing, logevt.from);
+
+ if (peer_missing_requested.empty()) {
+ post_event(Activate());
+ }
+ return discard_event();
+}
+
+/*----Public Methods-----*/
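+// Thin entry points used by OSD/PG code: each installs the caller's
+// RecoveryCtx, feeds a single event into the state machine, and clears the
+// context again before returning.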
+void PG::RecoveryState::handle_notify(int from, PG::Info& i,
+ RecoveryCtx *rctx)
+{
+ start_handle(rctx);
+ machine.process_event(MNotifyRec(from, i));
+ end_handle();
+}
+
+void PG::RecoveryState::handle_info(int from, PG::Info& i,
+ RecoveryCtx *rctx)
+{
+ start_handle(rctx);
+ machine.process_event(MInfoRec(from, i));
+ end_handle();
+}
+
+void PG::RecoveryState::handle_log(int from,
+ MOSDPGLog *msg,
+ RecoveryCtx *rctx)
+{
+ start_handle(rctx);
+ machine.process_event(MLogRec(from, msg));
+ end_handle();
+}
+
+void PG::RecoveryState::handle_query(int from, const PG::Query& q,
+ RecoveryCtx *rctx)
+{
+ start_handle(rctx);
+ machine.process_event(MQuery(from, q));
+ end_handle();
+}
+
+void PG::RecoveryState::handle_advance_map(OSDMap &osdmap, OSDMap &lastmap,
+ RecoveryCtx *rctx)
+{
+ start_handle(rctx);
+ machine.process_event(AdvMap(osdmap, lastmap));
+ end_handle();
+}
+
+void PG::RecoveryState::handle_activate_map(RecoveryCtx *rctx)
+{
+ start_handle(rctx);
+ machine.process_event(ActMap());
+ end_handle();
+}
+
+void PG::RecoveryState::handle_backlog_generated(RecoveryCtx *rctx)
+{
+ start_handle(rctx);
+ machine.process_event(BacklogComplete());
+ end_handle();
+}
+
+void PG::RecoveryState::handle_create(RecoveryCtx *rctx)
+{
+ start_handle(rctx);
+ machine.process_event(Initialize());
+ end_handle();
+}
+/*---------------------------------------------------*/
#undef dout_prefix
#define dout_prefix (*_dout << pg->gen_prefix() << "PgPriorSet: ")
#include "msg/Messenger.h"
#include "messages/MOSDRepScrub.h"
+#include <boost/statechart/custom_reaction.hpp>
+#include <boost/statechart/event.hpp>
+#include <boost/statechart/simple_state.hpp>
+#include <boost/statechart/state.hpp>
+#include <boost/statechart/state_machine.hpp>
+#include <boost/statechart/transition.hpp>
+#include <boost/mpl/list.hpp>
+
#include "common/DecayCounter.h"
#include <list>
class MOSDSubOp;
class MOSDSubOpReply;
class MOSDPGInfo;
+class MOSDPGLog;
struct PGPool {
friend std::ostream& operator<<(std::ostream& oss,
const struct PgPriorSet &prior);
- std::auto_ptr < PgPriorSet > prior_set;
+public:
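+  /*
+   * RecoveryCtx bundles the outputs of one recovery-event dispatch: queries,
+   * notifies and infos to send, contexts to queue, and the transaction to
+   * fill.  The OSD builds one per handle_*() call and flushes it afterwards
+   * (do_queries/do_infos/do_notifies, queue_transaction).
+   */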
+ struct RecoveryCtx {
+ map< int, map<pg_t, Query> > *query_map;
+ map< int, MOSDPGInfo* > *info_map;
+ map< int, vector<Info> > *notify_list;
+ list< Context* > *context_list;
+ ObjectStore::Transaction *transaction;
+ RecoveryCtx() : query_map(0), info_map(0), notify_list(0),
+ context_list(0), transaction(0) {}
+ RecoveryCtx(map< int, map<pg_t, Query> > *query_map,
+ map< int, MOSDPGInfo* > *info_map,
+ map< int, vector<Info> > *notify_list,
+ list< Context* > *context_list,
+ ObjectStore::Transaction *transaction)
+ : query_map(query_map), info_map(info_map),
+ notify_list(notify_list),
+ context_list(context_list), transaction(transaction) {}
+ };
+
+ /* Encapsulates PG recovery process */
+ class RecoveryState {
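+    /* rctx is only valid while a single handle_*() call is in flight;
+     * start_handle/end_handle bracket each event dispatch. */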
+ void start_handle(RecoveryCtx *new_ctx) {
+ assert(!rctx);
+ rctx = new_ctx;
+ }
+
+ void end_handle() {
+ rctx = 0;
+ }
+
+ struct MInfoRec : boost::statechart::event< MInfoRec > {
+ int from;
+ Info &info;
+ MInfoRec(int from, Info &info) :
+ from(from), info(info) {}
+ };
+
+ struct MLogRec : boost::statechart::event< MLogRec > {
+ int from;
+ MOSDPGLog *msg;
+ MLogRec(int from, MOSDPGLog *msg) :
+ from(from), msg(msg) {}
+ };
+
+ struct MNotifyRec : boost::statechart::event< MNotifyRec > {
+ int from;
+ Info &info;
+ MNotifyRec(int from, Info &info) :
+ from(from), info(info) {}
+ };
+
+ struct MQuery : boost::statechart::event< MQuery > {
+ int from;
+ const Query &query;
+ MQuery(int from, const Query &query):
+ from(from), query(query) {}
+ };
+
+ struct AdvMap : boost::statechart::event< AdvMap > {
+ OSDMap &osdmap;
+ OSDMap &lastmap;
+ AdvMap(OSDMap &osdmap, OSDMap &lastmap):
+ osdmap(osdmap), lastmap(lastmap) {}
+ };
+
+ struct BacklogComplete : boost::statechart::event< BacklogComplete > {};
+ struct ActMap : boost::statechart::event< ActMap > {};
+ struct Activate : boost::statechart::event< Activate > {};
+ struct Initialize : boost::statechart::event< Initialize > {};
+
+
+ /* States */
+ struct Initial;
+ class RecoveryMachine : public boost::statechart::state_machine< RecoveryMachine, Initial > {
+ RecoveryState *state;
+ public:
+ PG *pg;
+
+ RecoveryMachine(RecoveryState *state, PG *pg) : state(state), pg(pg) {}
+
+ /* Accessor functions for state methods */
+ ObjectStore::Transaction* get_cur_transaction() {
+ assert(state->rctx->transaction);
+ return state->rctx->transaction;
+ }
+
+ void send_query(int to, const Query &query) {
+ assert(state->rctx->query_map);
+ (*state->rctx->query_map)[to][pg->info.pgid] = query;
+ }
+
+ map<int, map<pg_t, Query> > *get_query_map() {
+ assert(state->rctx->query_map);
+ return state->rctx->query_map;
+ }
+
+ map<int, MOSDPGInfo*> *get_info_map() {
+ assert(state->rctx->info_map);
+ return state->rctx->info_map;
+ }
+
+ list< Context* > *get_context_list() {
+ assert(state->rctx->context_list);
+ return state->rctx->context_list;
+ }
+
+ void send_notify(int to, const Info &info) {
+ assert(state->rctx->notify_list);
+ (*state->rctx->notify_list)[to].push_back(info);
+ }
+ };
+ friend class RecoveryMachine;
+
+ /* States */
+ struct Crashed :
+ boost::statechart::state< Crashed, RecoveryMachine > {
+ Crashed(my_context ctx) : my_base(ctx) { assert(0); }
+ };
+
+ struct Started;
+ struct Initial :
+ boost::statechart::simple_state< Initial, RecoveryMachine > {
+ typedef boost::mpl::list <
+ boost::statechart::transition< Initialize, Started >
+ > reactions;
+ };
+
+ struct Reset :
+ boost::statechart::state< Reset, RecoveryMachine > {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< AdvMap >,
+ boost::statechart::custom_reaction< ActMap >
+ > reactions;
+
+ /* Entry function for RecoveryMachine. Should initialize relevant peering
+ * state */
+ Reset(my_context ctx);
+ boost::statechart::result react(const AdvMap&);
+ boost::statechart::result react(const ActMap&);
+ };
+
+ struct Start;
+ struct Started :
+ boost::statechart::simple_state< Started, RecoveryMachine, Start > {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< AdvMap >
+ > reactions;
+
+ boost::statechart::result react(const AdvMap&);
+ };
+
+ struct MakePrimary : boost::statechart::event< MakePrimary > {};
+ struct MakeStray : boost::statechart::event< MakeStray > {};
+ struct Primary;
+ struct Stray;
+ struct Start :
+ boost::statechart::state< Start, Started > {
+ typedef boost::mpl::list <
+ boost::statechart::transition< MakePrimary, Primary >,
+ boost::statechart::transition< MakeStray, Stray >
+ > reactions;
+ Start(my_context ctx);
+ };
+
+ struct Peering;
+ struct Pending;
+ struct NeedNewMap : boost::statechart::event< NeedNewMap > {};
+ struct Primary :
+ boost::statechart::simple_state< Primary, Started, Peering > {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< ActMap >,
+ boost::statechart::custom_reaction< BacklogComplete >,
+ boost::statechart::custom_reaction< MNotifyRec >,
+ boost::statechart::transition< NeedNewMap, Pending >
+ > reactions;
+ boost::statechart::result react(const BacklogComplete&);
+ boost::statechart::result react(const ActMap&);
+ boost::statechart::result react(const MNotifyRec&);
+ };
+
+ struct Pending :
+ boost::statechart::simple_state< Pending, Primary> {};
+
+
+ struct GetInfo;
+ struct Active;
+ struct Peering :
+ boost::statechart::state< Peering, Primary, GetInfo > {
+ typedef boost::mpl::list <
+ boost::statechart::transition< Activate, Active >,
+ boost::statechart::custom_reaction< AdvMap >
+ > reactions;
+ std::auto_ptr< PgPriorSet > prior_set;
+
+ Peering(my_context ctx);
+ boost::statechart::result react(const AdvMap &advmap);
+ void exit();
+ };
+
+ struct Active :
+ boost::statechart::state< Active, Primary > {
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< ActMap >,
+ boost::statechart::custom_reaction< AdvMap >,
+ boost::statechart::custom_reaction< MInfoRec >,
+ boost::statechart::custom_reaction< MNotifyRec >
+ > reactions;
+
+ Active(my_context ctx);
+ boost::statechart::result react(const ActMap&);
+ boost::statechart::result react(const AdvMap&);
+ boost::statechart::result react(const MInfoRec& infoevt);
+ boost::statechart::result react(const MNotifyRec& notevt);
+ };
+
+ struct ReplicaActive : boost::statechart::state< ReplicaActive, Started > {
+ typedef boost::mpl::list <
+ boost::statechart::transition< MQuery, Crashed >,
+ boost::statechart::custom_reaction< MInfoRec >
+ > reactions;
+
+ ReplicaActive(my_context ctx);
+ boost::statechart::result react(const MInfoRec& infoevt);
+ };
+
+ struct Stray : boost::statechart::state< Stray, Started > {
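+      /* BACKLOG queries received before our backlog exists; answered when
+       * BacklogComplete fires. */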
+ bool backlog_requested;
+ map<int, Query> pending_queries;
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< MQuery >,
+ boost::statechart::custom_reaction< MLogRec >,
+ boost::statechart::custom_reaction< MInfoRec >,
+ boost::statechart::custom_reaction< BacklogComplete >,
+ boost::statechart::transition< Activate, ReplicaActive >
+ > reactions;
+
+ Stray(my_context ctx);
+
+ boost::statechart::result react(const MQuery& query);
+ boost::statechart::result react(const BacklogComplete&);
+ boost::statechart::result react(const MLogRec& logevt);
+ boost::statechart::result react(const MInfoRec& infoevt);
+ };
+
+ struct GetLog;
+ struct GotInfo : boost::statechart::event< GotInfo > {};
+ struct GetInfo :
+ boost::statechart::state< GetInfo, Peering > {
+ set<int> peer_info_requested;
+ typedef boost::mpl::list <
+ boost::statechart::transition< GotInfo, GetLog >,
+ boost::statechart::custom_reaction< MNotifyRec >,
+ boost::statechart::transition< MLogRec, Crashed >,
+ boost::statechart::transition< BacklogComplete, Crashed >
+ > reactions;
+
+ GetInfo(my_context ctx);
+ boost::statechart::result react(const MNotifyRec& infoevt);
+ };
+
+ struct GetMissing;
+ struct GotLog : boost::statechart::event< GotLog > {};
+ struct GetLog :
+ boost::statechart::state< GetLog, Peering > {
+ int newest_update_osd;
+ bool need_backlog;
+ bool wait_on_backlog;
+ MOSDPGLog *msg;
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< MLogRec >,
+ boost::statechart::custom_reaction< BacklogComplete >,
+ boost::statechart::transition< GotLog, GetMissing >
+ > reactions;
+
+ GetLog(my_context ctx);
+ boost::statechart::result react(const MLogRec& logevt);
+ boost::statechart::result react(const BacklogComplete&);
+ void exit();
+ ~GetLog();
+ };
+
+ struct GetMissing :
+ boost::statechart::state< GetMissing, Peering > {
+ set<int> peer_missing_requested;
+ typedef boost::mpl::list <
+ boost::statechart::custom_reaction< MLogRec >
+ > reactions;
+
+ GetMissing(my_context ctx);
+ boost::statechart::result react(const MLogRec& logevt);
+ };
+
+ RecoveryMachine machine;
+ PG *pg;
+ RecoveryCtx *rctx;
+
+ public:
+ RecoveryState(PG *pg) : machine(this, pg), pg(pg), rctx(0) {
+ machine.initiate();
+ }
+
+ void handle_notify(int from, Info& i, RecoveryCtx *ctx);
+ void handle_info(int from, Info& i, RecoveryCtx *ctx);
+ void handle_log(int from,
+ MOSDPGLog *msg,
+ RecoveryCtx *ctx);
+ void handle_query(int from, const PG::Query& q, RecoveryCtx *ctx);
+ void handle_advance_map(OSDMap &osdmap, OSDMap &lastmap, RecoveryCtx *ctx);
+ void handle_activate_map(RecoveryCtx *ctx);
+ void handle_backlog_generated(RecoveryCtx *ctx);
+ void handle_create(RecoveryCtx *ctx);
+ } recovery_state;
+
+protected:
bool need_up_thru;
set<int> stray_set; // non-acting osds that have PG data.
void generate_past_intervals();
void trim_past_intervals();
- void build_prior();
+ void build_prior(std::auto_ptr<PgPriorSet> &prior_set);
void clear_prior();
- bool prior_set_affected(const OSDMap *map) const;
+ bool prior_set_affected(PgPriorSet &prior, const OSDMap *osdmap) const;
bool all_unfound_are_lost(const OSDMap* osdmap) const;
void mark_obj_as_lost(ObjectStore::Transaction& t,
void proc_replica_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog,
Missing& omissing, int from);
+ void proc_master_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog,
+ Missing& omissing, int from);
+  void proc_replica_info(int from, Info &oinfo);
bool merge_old_entry(ObjectStore::Transaction& t, Log::Entry& oe);
void merge_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, int from);
void search_for_missing(const Info &oinfo, const Missing *omissing,
void _activate_committed(epoch_t e);
void all_activated_and_committed();
+ void process_primary_info(ObjectStore::Transaction &t, const Info &info);
+
bool have_unfound() const {
return missing.num_missing() > missing_loc.size();
}
role(0),
state(0),
have_master_log(true),
+ recovery_state(this),
need_up_thru(false),
pg_stats_lock("PG::pg_stats_lock"),
pg_stats_valid(false),
void warm_restart();
+ void fulfill_info(int from, const Query &query,
+                    pair<int, Info> &notify_info);
+ void fulfill_log(int from, const Query &query);
bool acting_up_affected(OSDMap &osdmap,
const OSDMap &lastmap);
+
// abstract bits
- virtual bool handle_advance_map(OSDMap &osdmap,
- const OSDMap &lastmap);
+ virtual void handle_notify(int from, PG::Info& i, RecoveryCtx *ctx) = 0;
+ virtual void handle_info(int from, PG::Info& i, RecoveryCtx *ctx) = 0;
+ virtual void handle_log(int from,
+ MOSDPGLog *msg,
+ RecoveryCtx *ctx) = 0;
+ virtual void handle_query(int from, const PG::Query& q, RecoveryCtx *ctx) = 0;
+ virtual void handle_advance_map(OSDMap &osdmap, OSDMap &lastmap,
+ RecoveryCtx *ctx) = 0;
+ virtual void handle_activate_map(RecoveryCtx *ctx) = 0;
+ virtual void handle_backlog_generated(RecoveryCtx *ctx) = 0;
+ virtual void handle_create(RecoveryCtx *ctx) = 0;
virtual void do_op(MOSDOp *op) = 0;
virtual void do_sub_op(MOSDSubOp *op) = 0;
virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0;