From 601f59857e7347168799def55e846bb53e3bed50 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 14 Mar 2011 17:25:46 -0700 Subject: [PATCH] PG,OSD: activate pg during replay Replay PGs already accept and queue transactions. PGs will now go to active during replay in order to simplify the state reported to the user and to allow recovery to being. Signed-off-by: Samuel Just --- src/osd/OSD.cc | 37 ++++++++++---------------- src/osd/PG.cc | 70 +++++++++++++++++++++++++------------------------- src/osd/PG.h | 1 + 3 files changed, 49 insertions(+), 59 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index a503091769ce0..427544f6c70a6 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4896,25 +4896,19 @@ void OSD::activate_pg(pg_t pgid, utime_t activate_at) { assert(osd_lock.is_locked()); - map< int, map > query_map; // peer -> PG -> get_summary_since - + dout(10) << "activate_pg" << dendl; if (pg_map.count(pgid)) { PG *pg = _lookup_lock_pg(pgid); - if (pg->is_crashed() && + if (pg->is_active() && pg->is_replay() && pg->get_role() == 0 && pg->replay_until == activate_at) { - ObjectStore::Transaction *t = new ObjectStore::Transaction; - C_Contexts *fin = new C_Contexts; - pg->activate(*t, fin->contexts, query_map); - int tr = store->queue_transaction(&pg->osr, t, new ObjectStore::C_DeleteTransaction(t), fin); - assert(tr == 0); + + pg->replay_queued_ops(); } pg->unlock(); } - do_queries(query_map); - // wake up _all_ pg waiters; raw pg -> actual pg mapping may have shifted wake_all_pg_waiters(); } @@ -5187,22 +5181,17 @@ void OSD::handle_op(MOSDOp *op) // pg must be active. - if (!pg->is_active()) { - // replay? + if (pg->is_replay()) { if (op->get_version().version > 0) { - if (op->get_version() > pg->info.last_update) { - dout(7) << *pg << " queueing replay at " << op->get_version() - << " for " << *op << dendl; - pg->replay_queue[op->get_version()] = op; - pg->unlock(); - return; - } else { - dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update - << " for " << *op - << ", will queue for WRNOOP" << dendl; - } + dout(7) << *pg << " queueing replay at " << op->get_version() + << " for " << *op << dendl; + pg->replay_queue[op->get_version()] = op; + pg->unlock(); + return; } - + } + + if (!pg->is_active()) { dout(7) << *pg << " not active (yet)" << dendl; pg->waiting_for_active.push_back(op); pg->unlock(); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 8c32f3ef1b0e9..fbd9779b812e3 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1712,16 +1712,16 @@ void PG::do_peer(ObjectStore::Transaction& t, list& tfin, dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window << " until " << replay_until << dendl; state_set(PG_STATE_REPLAY); - state_clear(PG_STATE_PEERING); osd->replay_queue_lock.Lock(); osd->replay_queue.push_back(pair(info.pgid, replay_until)); osd->replay_queue_lock.Unlock(); - } - else if (!is_active()) { - // -- ok, activate! + } + + if (!is_active()) { activate(t, tfin, query_map, activator_map); } - else if (is_all_uptodate()) + + if (is_all_uptodate()) finish_recovery(t, tfin); } @@ -1797,11 +1797,7 @@ void PG::activate(ObjectStore::Transaction& t, list& tfin, state_clear(PG_STATE_STRAY); state_clear(PG_STATE_DOWN); state_clear(PG_STATE_PEERING); - if (is_crashed()) { - //assert(is_replay()); // HELP.. not on replica? - state_clear(PG_STATE_CRASHED); - state_clear(PG_STATE_REPLAY); - } + state_clear(PG_STATE_CRASHED); if (is_primary() && osd->osdmap->get_pg_size(info.pgid) != acting.size()) state_set(PG_STATE_DEGRADED); @@ -1976,35 +1972,39 @@ void PG::activate(ObjectStore::Transaction& t, list& tfin, update_stats(); } - - // replay (queue them _before_ other waiting ops!) - if (!replay_queue.empty()) { - eversion_t c = info.last_update; - list replay; - for (map::iterator p = replay_queue.begin(); - p != replay_queue.end(); - p++) { - if (p->first <= info.last_update) { - dout(10) << "activate will WRNOOP " << p->first << " " << *p->second << dendl; - replay.push_back(p->second); - continue; - } - if (p->first.version != c.version+1) { - dout(10) << "activate replay " << p->first - << " skipping " << c.version+1 - p->first.version - << " ops" - << dendl; - } - dout(10) << "activate replay " << p->first << " " << *p->second << dendl; - replay.push_back(p->second); + // waiters + if (!is_replay()) { + osd->take_waiters(waiting_for_active); + } +} + + +void PG::replay_queued_ops() +{ + assert(is_replay() && is_active() && !is_crashed()); + eversion_t c = info.last_update; + list replay; + dout(10) << "replay_queued_ops" << dendl; + state_clear(PG_STATE_REPLAY); + + for (map::iterator p = replay_queue.begin(); + p != replay_queue.end(); + p++) { + if (p->first.version != c.version+1) { + dout(10) << "activate replay " << p->first + << " skipping " << c.version+1 - p->first.version + << " ops" + << dendl; c = p->first; } - replay_queue.clear(); - osd->take_waiters(replay); + dout(10) << "activate replay " << p->first << " " << *p->second << dendl; + replay.push_back(p->second); } - - // waiters + replay_queue.clear(); + osd->take_waiters(replay); osd->take_waiters(waiting_for_active); + state_clear(PG_STATE_REPLAY); + update_stats(); } void PG::_activate_committed(epoch_t e) diff --git a/src/osd/PG.h b/src/osd/PG.h index 7810cb1ba6f3b..b79ac7f48c523 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -861,6 +861,7 @@ public: map< int, map >& query_map, map *activator_map=0); void build_might_have_unfound(); + void replay_queued_ops(); void activate(ObjectStore::Transaction& t, list& tfin, map< int, map >& query_map, map *activator_map=0); -- 2.39.5