{
assert(osd_lock.is_locked());
- map< int, map<pg_t,PG::Query> > query_map; // peer -> PG -> get_summary_since
-
+ dout(10) << "activate_pg" << dendl;
if (pg_map.count(pgid)) {
PG *pg = _lookup_lock_pg(pgid);
- if (pg->is_crashed() &&
+ if (pg->is_active() &&
pg->is_replay() &&
pg->get_role() == 0 &&
pg->replay_until == activate_at) {
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- C_Contexts *fin = new C_Contexts;
- pg->activate(*t, fin->contexts, query_map);
- int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin);
- assert(tr == 0);
+
+ pg->replay_queued_ops();
}
pg->unlock();
}
- do_queries(query_map);
-
// wake up _all_ pg waiters; raw pg -> actual pg mapping may have shifted
wake_all_pg_waiters();
}
// pg must be active.
- if (!pg->is_active()) {
- // replay?
+ if (pg->is_replay()) {
if (op->get_version().version > 0) {
- if (op->get_version() > pg->info.last_update) {
- dout(7) << *pg << " queueing replay at " << op->get_version()
- << " for " << *op << dendl;
- pg->replay_queue[op->get_version()] = op;
- pg->unlock();
- return;
- } else {
- dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update
- << " for " << *op
- << ", will queue for WRNOOP" << dendl;
- }
+ dout(7) << *pg << " queueing replay at " << op->get_version()
+ << " for " << *op << dendl;
+ pg->replay_queue[op->get_version()] = op;
+ pg->unlock();
+ return;
}
-
+ }
+
+ if (!pg->is_active()) {
dout(7) << *pg << " not active (yet)" << dendl;
pg->waiting_for_active.push_back(op);
pg->unlock();
dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window
<< " until " << replay_until << dendl;
state_set(PG_STATE_REPLAY);
- state_clear(PG_STATE_PEERING);
osd->replay_queue_lock.Lock();
osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
osd->replay_queue_lock.Unlock();
- }
- else if (!is_active()) {
- // -- ok, activate!
+ }
+
+ if (!is_active()) {
activate(t, tfin, query_map, activator_map);
}
- else if (is_all_uptodate())
+
+ if (is_all_uptodate())
finish_recovery(t, tfin);
}
state_clear(PG_STATE_STRAY);
state_clear(PG_STATE_DOWN);
state_clear(PG_STATE_PEERING);
- if (is_crashed()) {
- //assert(is_replay()); // HELP.. not on replica?
- state_clear(PG_STATE_CRASHED);
- state_clear(PG_STATE_REPLAY);
- }
+ state_clear(PG_STATE_CRASHED);
if (is_primary() &&
osd->osdmap->get_pg_size(info.pgid) != acting.size())
state_set(PG_STATE_DEGRADED);
update_stats();
}
-
- // replay (queue them _before_ other waiting ops!)
- if (!replay_queue.empty()) {
- eversion_t c = info.last_update;
- list<Message*> replay;
- for (map<eversion_t,MOSDOp*>::iterator p = replay_queue.begin();
- p != replay_queue.end();
- p++) {
- if (p->first <= info.last_update) {
- dout(10) << "activate will WRNOOP " << p->first << " " << *p->second << dendl;
- replay.push_back(p->second);
- continue;
- }
- if (p->first.version != c.version+1) {
- dout(10) << "activate replay " << p->first
- << " skipping " << c.version+1 - p->first.version
- << " ops"
- << dendl;
- }
- dout(10) << "activate replay " << p->first << " " << *p->second << dendl;
- replay.push_back(p->second);
+ // waiters
+ if (!is_replay()) {
+ osd->take_waiters(waiting_for_active);
+ }
+}
+
+
+void PG::replay_queued_ops()
+{
+ assert(is_replay() && is_active() && !is_crashed());
+ eversion_t c = info.last_update;
+ list<Message*> replay;
+ dout(10) << "replay_queued_ops" << dendl;
+ state_clear(PG_STATE_REPLAY);
+
+ for (map<eversion_t,MOSDOp*>::iterator p = replay_queue.begin();
+ p != replay_queue.end();
+ p++) {
+ if (p->first.version != c.version+1) {
+ dout(10) << "activate replay " << p->first
+ << " skipping " << c.version+1 - p->first.version
+ << " ops"
+ << dendl;
c = p->first;
}
- replay_queue.clear();
- osd->take_waiters(replay);
+ dout(10) << "activate replay " << p->first << " " << *p->second << dendl;
+ replay.push_back(p->second);
}
-
- // waiters
+ replay_queue.clear();
+ osd->take_waiters(replay);
osd->take_waiters(waiting_for_active);
+ state_clear(PG_STATE_REPLAY);
+ update_stats();
}
void PG::_activate_committed(epoch_t e)
map< int, map<pg_t,Query> >& query_map,
map<int, MOSDPGInfo*> *activator_map=0);
void build_might_have_unfound();
+ void replay_queued_ops();
void activate(ObjectStore::Transaction& t, list<Context*>& tfin,
map< int, map<pg_t,Query> >& query_map,
map<int, MOSDPGInfo*> *activator_map=0);