}
// wake up _all_ pg waiters; raw pg -> actual pg mapping may have shifted
- for (hash_map<pg_t, list<Message*> >::iterator p = waiting_for_pg.begin();
- p != waiting_for_pg.end();
- p++)
- take_waiters(p->second);
- waiting_for_pg.clear();
-
+ wake_all_pg_waiters();
// finishers?
finished_lock.Lock();
logger->set("numpg", pg_map.size());
+ wake_all_pg_waiters(); // the pg mapping may have shifted
+
update_heartbeat_peers();
}
t.collection_setattr(pg->info.pgid, "info", (char*)&pg->info, sizeof(pg->info));
pg->write_log(t);
- if (waiting_for_pg.count(pg->info.pgid)) {
- take_waiters(waiting_for_pg[pg->info.pgid]);
- waiting_for_pg.erase(pg->info.pgid);
- }
+ wake_pg_waiters(pg->info.pgid);
+
pg->peer(t, query_map, &info_map);
pg->update_stats();
pg->unlock();
PG *pg = try_create_pg(pgid, t);
if (pg) {
created++;
- if (waiting_for_pg.count(pgid)) {
- take_waiters(waiting_for_pg[pgid]);
- waiting_for_pg.erase(pgid);
- }
+ wake_pg_waiters(pg->info.pgid);
pg->peer(t, query_map, &info_map);
pg->update_stats();
pg->unlock();
dout(10) << *pg << " is new" << dendl;
// kick any waiters
- if (waiting_for_pg.count(pgid)) {
- take_waiters(waiting_for_pg[pgid]);
- waiting_for_pg.erase(pgid);
- }
+ wake_pg_waiters(pg->info.pgid);
} else {
// already had it. am i (still) the primary?
pg = _lookup_lock_pg(pgid);
vector<int>& last);
void activate_pg(pg_t pgid, epoch_t epoch);
+ void wake_pg_waiters(pg_t pgid) {
+ if (waiting_for_pg.count(pgid)) {
+ take_waiters(waiting_for_pg[pgid]);
+ waiting_for_pg.erase(pgid);
+ }
+ }
+ void wake_all_pg_waiters() {
+ for (hash_map<pg_t, list<Message*> >::iterator p = waiting_for_pg.begin();
+ p != waiting_for_pg.end();
+ p++)
+ take_waiters(p->second);
+ waiting_for_pg.clear();
+ }
+
class C_Activate : public Context {
OSD *osd;
pg_t pgid;