peer_missing[from].claim(omissing);
}
-bool PG::proc_replica_info(
- pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch)
-{
- map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
- if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
- dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
- return false;
- }
-
- if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) {
- dout(10) << " got info " << oinfo << " from down osd." << from
- << " discarding" << dendl;
- return false;
- }
-
- dout(10) << " got osd." << from << " " << oinfo << dendl;
- ceph_assert(is_primary());
- peer_info[from] = oinfo;
- might_have_unfound.insert(from);
-
- update_history(oinfo.history);
-
- // stray?
- if (!is_up(from) && !is_acting(from)) {
- dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
- stray_set.insert(from);
- if (is_clean()) {
- purge_strays();
- }
- }
-
- // was this a new info? if so, update peers!
- if (p == peer_info.end())
- update_heartbeat_peers();
-
- return true;
-}
-
void PG::remove_snap_mapped_object(
ObjectStore::Transaction &t, const hobject_t &soid)
{
return false;
}
-void PG::remove_down_peer_info(const OSDMapRef osdmap)
-{
- // Remove any downed osds from peer_info
- bool removed = false;
- map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
- while (p != peer_info.end()) {
- if (!osdmap->is_up(p->first.osd)) {
- dout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
- peer_missing.erase(p->first);
- peer_log_requested.erase(p->first);
- peer_missing_requested.erase(p->first);
- peer_purged.erase(p->first); // so we can re-purge if necessary
- peer_info.erase(p++);
- removed = true;
- } else
- ++p;
- }
-
- // if we removed anyone, update peers (which include peer_info)
- if (removed)
- update_heartbeat_peers();
- check_recovery_sources(osdmap);
-}
-
/*
* Returns true unless there is a non-lost OSD in might_have_unfound.
*/
// if we removed anyone, update peers (which include peer_info)
if (removed)
- update_heartbeat_peers();
+ recovery_state.update_heartbeat_peers();
stray_set.clear();
probe_targets.clear();
}
-void PG::update_heartbeat_peers()
+void PG::update_heartbeat_peers(set<int> new_peers)
{
- ceph_assert(is_locked());
-
- if (!is_primary())
- return;
-
- set<int> new_peers;
- for (unsigned i=0; i<acting.size(); i++) {
- if (acting[i] != CRUSH_ITEM_NONE)
- new_peers.insert(acting[i]);
- }
- for (unsigned i=0; i<up.size(); i++) {
- if (up[i] != CRUSH_ITEM_NONE)
- new_peers.insert(up[i]);
- }
- for (map<pg_shard_t,pg_info_t>::iterator p = peer_info.begin();
- p != peer_info.end();
- ++p)
- new_peers.insert(p->first.osd);
-
bool need_update = false;
heartbeat_peer_lock.Lock();
if (new_peers == heartbeat_peers) {
machine.initiate();
}
+bool PeeringState::proc_replica_info(
+ pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch)
+{
+ map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);
+ if (p != peer_info.end() && p->second.last_update == oinfo.last_update) {
+ dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;
+ return false;
+ }
+
+ if (!get_osdmap()->has_been_up_since(from.osd, send_epoch)) {
+ dout(10) << " got info " << oinfo << " from down osd." << from
+ << " discarding" << dendl;
+ return false;
+ }
+
+ dout(10) << " got osd." << from << " " << oinfo << dendl;
+ ceph_assert(is_primary());
+ peer_info[from] = oinfo;
+ might_have_unfound.insert(from);
+
+ pg->update_history(oinfo.history);
+
+ // stray?
+ if (!is_up(from) && !is_acting(from)) {
+ dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
+ stray_set.insert(from);
+ if (is_clean()) {
+ pg->purge_strays();
+ }
+ }
+
+ // was this a new info? if so, update peers!
+ if (p == peer_info.end())
+ update_heartbeat_peers();
+
+ return true;
+}
+
+
+void PeeringState::remove_down_peer_info(const OSDMapRef &osdmap)
+{
+ // Remove any downed osds from peer_info
+ bool removed = false;
+ map<pg_shard_t, pg_info_t>::iterator p = peer_info.begin();
+ while (p != peer_info.end()) {
+ if (!osdmap->is_up(p->first.osd)) {
+ psdout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
+ peer_missing.erase(p->first);
+ peer_log_requested.erase(p->first);
+ peer_missing_requested.erase(p->first);
+ peer_purged.erase(p->first);
+ peer_info.erase(p++);
+ removed = true;
+ } else
+ ++p;
+ }
+
+ // if we removed anyone, update peers (which include peer_info)
+ if (removed)
+ update_heartbeat_peers();
+ pg->check_recovery_sources(osdmap);
+}
+
+void PeeringState::update_heartbeat_peers()
+{
+ if (!is_primary())
+ return;
+
+ set<int> new_peers;
+ for (unsigned i=0; i<acting.size(); i++) {
+ if (acting[i] != CRUSH_ITEM_NONE)
+ new_peers.insert(acting[i]);
+ }
+ for (unsigned i=0; i<up.size(); i++) {
+ if (up[i] != CRUSH_ITEM_NONE)
+ new_peers.insert(up[i]);
+ }
+ for (map<pg_shard_t,pg_info_t>::iterator p = peer_info.begin();
+ p != peer_info.end();
+ ++p) {
+ new_peers.insert(p->first.osd);
+ }
+ pl->update_heartbeat_peers(std::move(new_peers));
+}
+
void PeeringState::write_if_dirty(ObjectStore::Transaction& t)
{
pl->prepare_write(
boost::statechart::result PeeringState::Initial::react(const MNotifyRec& notify)
{
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
- pg->proc_replica_info(
+ ps->proc_replica_info(
notify.from, notify.notify.info, notify.notify.epoch_sent);
pg->set_last_peering_reset();
return transit< Primary >();
boost::statechart::result PeeringState::Started::react(const AdvMap& advmap)
{
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
psdout(10) << "Started advmap" << dendl;
pg->check_full_transition(advmap.lastmap, advmap.osdmap);
post_event(advmap);
return transit< Reset >();
}
- pg->remove_down_peer_info(advmap.osdmap);
+ ps->remove_down_peer_info(advmap.osdmap);
return discard_event();
}
boost::statechart::result PeeringState::Reset::react(const AdvMap& advmap)
{
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
psdout(10) << "Reset advmap" << dendl;
advmap.newacting, advmap.acting_primary,
context< PeeringMachine >().get_cur_transaction());
}
- pg->remove_down_peer_info(advmap.osdmap);
+ ps->remove_down_peer_info(advmap.osdmap);
pg->check_past_interval_bounds();
return discard_event();
}
boost::statechart::result PeeringState::Reset::react(const ActMap&)
{
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
if (pg->should_send_notify() && pg->get_primary().osd >= 0) {
context< PeeringMachine >().send_notify(
pg->past_intervals);
}
- pg->update_heartbeat_peers();
+ ps->update_heartbeat_peers();
pg->take_waiters();
return transit< Started >();
boost::statechart::result PeeringState::Primary::react(const MNotifyRec& notevt)
{
- PG *pg = context< PeeringMachine >().pg;
+ PeeringState *ps = context< PeeringMachine >().state;
psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl;
- pg->proc_replica_info(
+ ps->proc_replica_info(
notevt.from, notevt.notify.info, notevt.notify.epoch_sent);
return discard_event();
}
boost::statechart::result PeeringState::Active::react(const MNotifyRec& notevt)
{
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
ceph_assert(pg->is_primary());
if (pg->peer_info.count(notevt.from)) {
psdout(10) << "Active: got notify from " << notevt.from
<< ", calling proc_replica_info and discover_all_missing"
<< dendl;
- pg->proc_replica_info(
+ ps->proc_replica_info(
notevt.from, notevt.notify.info, notevt.notify.epoch_sent);
if (pg->have_unfound() || (pg->is_degraded() && pg->might_have_unfound.count(notevt.from))) {
pg->discover_all_missing(*context< PeeringMachine >().get_query_map());
boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt)
{
+ PeeringState *ps = context< PeeringMachine >().state;
PG *pg = context< PeeringMachine >().pg;
set<pg_shard_t>::iterator p = peer_info_requested.find(infoevt.from);
}
epoch_t old_start = pg->info.history.last_epoch_started;
- if (pg->proc_replica_info(
+ if (ps->proc_replica_info(
infoevt.from, infoevt.notify.info, infoevt.notify.epoch_sent)) {
// we got something new ...
PastIntervals::PriorSet &prior_set = context< Peering >().prior_set;
}
boost::statechart::result PeeringState::Incomplete::react(const MNotifyRec& notevt) {
- PG *pg = context< PeeringMachine >().pg;
+ PeeringState *ps = context< PeeringMachine >().state;
psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl;
- if (pg->proc_replica_info(
+ if (ps->proc_replica_info(
notevt.from, notevt.notify.info, notevt.notify.epoch_sent)) {
// We got something new, try again!
return transit< GetLog >();