peer_missing[from].swap(omissing);
}
-void PG::proc_replica_info(int from, Info &oinfo)
+bool PG::proc_replica_info(int from, Info &oinfo)
{
- dout(10) << "proc_replica_info osd" << from << " " << oinfo << dendl;
+ map<int,PG::Info>::iterator p = peer_info.find(from);
+ if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
+ dout(10) << " got dup osd" << from << " info " << oinfo << ", identical to ours" << dendl;
+ return false;
+ }
+
+ dout(10) << " got osd" << from << " " << oinfo << dendl;
assert(is_primary());
peer_info[from] = oinfo;
might_have_unfound.insert(from);
}
}
update_stats();
+
+ return true;
}
pg->generate_past_intervals();
auto_ptr<PgPriorSet> &prior_set = context< Peering >().prior_set;
- if (!prior_set.get()) {
- stringstream out;
+ if (!prior_set.get())
pg->build_prior(prior_set);
- dout(10) << "PG::do_peer: peer prior_set is "
- << *prior_set << dendl;
- if (pg->need_up_thru) {
- dout(10) << "transitioning to pending, need upthru" << dendl;
- post_event(NeedNewMap());
- } else {
- for (set<int>::const_iterator it = prior_set->cur.begin();
- it != prior_set->cur.end();
- ++it) {
- int peer = *it;
- if (pg->peer_info.count(peer)) {
- dout(10) << " have osd" << peer << " info " << pg->peer_info[peer] << dendl;
- } else if (peer_info_requested.count(peer)) {
- dout(10) << " already requested info from osd" << peer << dendl;
- } else if (!pg->osd->osdmap->is_up(peer)) {
- dout(10) << " not querying info from down osd" << peer << dendl;
- } else {
- dout(10) << " querying info from osd" << peer << dendl;
- context< RecoveryMachine >().send_query(peer, Query(Query::INFO, pg->info.history));
- peer_info_requested.insert(peer);
- }
+ get_infos();
+}
+
+void PG::RecoveryState::GetInfo::get_infos() {
+ PG *pg = context< RecoveryMachine >().pg;
+ auto_ptr<PgPriorSet> &prior_set = context< Peering >().prior_set;
+
+ if (pg->need_up_thru) {
+ dout(10) << "transitioning to pending, need upthru" << dendl;
+ post_event(NeedNewMap());
+ } else {
+ for (set<int>::const_iterator it = prior_set->cur.begin();
+ it != prior_set->cur.end();
+ ++it) {
+ int peer = *it;
+ if (pg->peer_info.count(peer)) {
+ dout(10) << " have osd" << peer << " info " << pg->peer_info[peer] << dendl;
+ continue;
+ }
+ if (peer_info_requested.count(peer)) {
+ dout(10) << " already requested info from osd" << peer << dendl;
+ } else if (!pg->osd->osdmap->is_up(peer)) {
+ dout(10) << " not querying info from down osd" << peer << dendl;
+ } else {
+ dout(10) << " querying info from osd" << peer << dendl;
+ context< RecoveryMachine >().send_query(peer, Query(Query::INFO, pg->info.history));
+ peer_info_requested.insert(peer);
}
}
}
-
-#warning is this right?
- if (peer_info_requested.empty()) {
+
+ if (peer_info_requested.empty() && prior_set->down.empty()) {
post_event(GotInfo());
}
}
boost::statechart::result
PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt) {
- if (peer_info_requested.count(infoevt.from)) {
- peer_info_requested.erase(infoevt.from);
- }
- if (peer_info_requested.empty()) {
- post_event(GotInfo());
+ set<int>::iterator p = peer_info_requested.find(infoevt.from);
+ if (p != peer_info_requested.end())
+ peer_info_requested.erase(p);
+
+ PG *pg = context< RecoveryMachine >().pg;
+ epoch_t old_start = pg->info.history.last_epoch_started;
+ if (pg->proc_replica_info(infoevt.from, infoevt.info)) {
+ // we got something new ...
+ auto_ptr<PgPriorSet> &prior_set = context< Peering >().prior_set;
+ if (old_start < pg->info.history.last_epoch_started) {
+ dout(10) << " last_epoch_started moved forward, rebuilding prior" << dendl;
+ pg->build_prior(prior_set);
+ get_infos();
+ } else {
+ // are we done getting everything?
+ if (peer_info_requested.empty() && prior_set->down.empty())
+ post_event(GotInfo());
+ }
}
- return forward_event();
+ return discard_event();
+}
+
+boost::statechart::result
+PG::RecoveryState::GetInfo::react(const MInfoRec& infoevt) {
+ // notify and info are equivalent in this context.
+ post_event(MInfoRec(infoevt.from, infoevt.info));
+ return discard_event();
}
void PG::RecoveryState::GetInfo::exit() {
GetInfo(my_context ctx);
void exit();
+ void get_infos();
typedef boost::mpl::list <
boost::statechart::transition< GotInfo, GetLog >,
- boost::statechart::custom_reaction< MNotifyRec >
+ boost::statechart::custom_reaction< MNotifyRec >,
+ boost::statechart::custom_reaction< MInfoRec >
> reactions;
boost::statechart::result react(const MNotifyRec& infoevt);
+ boost::statechart::result react(const MInfoRec& infoevt);
};
struct GetMissing;
Missing& omissing, int from);
void proc_master_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog,
Missing& omissing, int from);
- void proc_replica_info(int from, Info &info);
+ bool proc_replica_info(int from, Info &info);
bool merge_old_entry(ObjectStore::Transaction& t, Log::Entry& oe);
void merge_log(ObjectStore::Transaction& t, Info &oinfo, Log &olog, int from);
void search_for_missing(const Info &oinfo, const Missing *omissing,