i != pg_map.end();
i++) {
PG *pg = i->second;
-
- // replicas (new and old) ping primary.
- if (pg->get_role() == 0) {
- assert(pg->acting[0] == whoami);
- for (unsigned i=0; i<pg->acting.size(); i++)
- _add_heartbeat_source(pg->acting[i], old_from, old_from_stamp, old_con);
- for (unsigned i=0; i<pg->up.size(); i++)
- _add_heartbeat_source(pg->up[i], old_from, old_from_stamp, old_con);
- for (map<int,pg_info_t>::iterator p = pg->peer_info.begin(); p != pg->peer_info.end(); ++p)
- if (osdmap->is_up(p->first))
- _add_heartbeat_source(p->first, old_from, old_from_stamp, old_con);
- }
+ pg->heartbeat_peer_lock.Lock();
+ dout(20) << *pg << " heartbeat_peers " << pg->heartbeat_peers << dendl;
+ for (set<int>::iterator p = pg->heartbeat_peers.begin();
+ p != pg->heartbeat_peers.end();
+ ++p)
+ if (osdmap->is_up(*p))
+ _add_heartbeat_source(*p, old_from, old_from_stamp, old_con);
+ pg->heartbeat_peer_lock.Unlock();
}
for (map<int,epoch_t>::iterator p = old_from.begin();
}
update_stats();
+ // was this a new info? if so, update peers!
+ if (p == peer_info.end())
+ update_heartbeat_peers();
+
return true;
}
// Drop peer_info (and matching peer_missing) entries for any OSD the
// given osdmap reports as down.  Because heartbeat_peers is derived in
// part from peer_info (see update_heartbeat_peers), re-derive the
// heartbeat peer set if anything was actually removed.
void PG::remove_down_peer_info(const OSDMapRef osdmap)
{
// Remove any downed osds from peer_info
+ bool removed = false;
map<int,pg_info_t>::iterator p = peer_info.begin();
while (p != peer_info.end()) {
if (!osdmap->is_up(p->first)) {
dout(10) << " dropping down osd." << p->first << " info " << p->second << dendl;
peer_missing.erase(p->first);
// post-increment before erase keeps the iterator valid (map::erase
// only invalidates the erased element's iterator)
peer_info.erase(p++);
+ removed = true;
} else
p++;
}
+
+ // if we removed anyone, update peers (which include peer_info)
+ if (removed)
+ update_heartbeat_peers();
}
/*
{
dout(10) << "purge_strays " << stray_set << dendl;
+ bool removed = false;
for (set<int>::iterator p = stray_set.begin();
p != stray_set.end();
p++) {
dout(10) << "not sending PGRemove to down osd." << *p << dendl;
}
peer_info.erase(*p);
+ removed = true;
}
+ // if we removed anyone, update peers (which include peer_info)
+ if (removed)
+ update_heartbeat_peers();
+
stray_set.clear();
// clear _requested maps; we may have to peer() again if we discover
peer_missing_requested.clear();
}
-
-
+// Rebuild heartbeat_peers from scratch.  Caller must hold the PG lock
+// (asserted below); heartbeat_peers itself is additionally guarded by
+// heartbeat_peer_lock because it is read outside the PG lock by the
+// OSD heartbeat path (which takes heartbeat_peer_lock around its scan).
+void PG::update_heartbeat_peers()
+{
+ assert(is_locked());
+ heartbeat_peer_lock.Lock();
+ heartbeat_peers.clear();
+// role == 0: this OSD is the primary for the PG (elsewhere asserted as
+// acting[0] == whoami); only the primary tracks heartbeat peers here.
+ if (role == 0) {
+// peers = everyone in the acting set, the up set, and anyone we hold
+// peer_info for (set<int> de-duplicates overlapping membership)
+ for (unsigned i=0; i<acting.size(); i++)
+ heartbeat_peers.insert(acting[i]);
+ for (unsigned i=0; i<up.size(); i++)
+ heartbeat_peers.insert(up[i]);
+ for (map<int,pg_info_t>::iterator p = peer_info.begin(); p != peer_info.end(); ++p)
+ heartbeat_peers.insert(p->first);
+ }
+ dout(10) << "update_heartbeat_peers " << heartbeat_peers << dendl;
+ heartbeat_peer_lock.Unlock();
+}
void PG::update_stats()
{
{
PG *pg = context< RecoveryMachine >().pg;
pg->proc_replica_info(notify.from, notify.info);
+ pg->update_heartbeat_peers();
return transit< Primary >();
}
context< RecoveryMachine >().send_notify(pg->get_primary(),
pg->info);
}
+
+ pg->update_heartbeat_peers();
+
return transit< Started >();
}
dout(10) << "Active: handling ActMap" << dendl;
assert(pg->is_active());
assert(pg->is_primary());
+
pg->check_recovery_op_pulls(pg->get_osdmap());
if (g_conf->osd_check_for_log_corruption)
epoch_t last_peering_reset;
+
+ /* heartbeat peers */
+public:
+ Mutex heartbeat_peer_lock;
+ set<int> heartbeat_peers;
+
+protected:
/**
* BackfillInterval
*
void purge_strays();
+ void update_heartbeat_peers();
+
Context *finish_sync_event;
void finish_recovery(ObjectStore::Transaction& t, list<Context*>& tfin);
state(0),
need_up_thru(false),
last_peering_reset(0),
+ heartbeat_peer_lock("PG::heartbeat_peer_lock"),
backfill_target(-1),
pg_stats_lock("PG::pg_stats_lock"),
pg_stats_valid(false),