recovery_ops_active(0),
recovery_wq(this, &recovery_tp),
remove_list_lock("OSD::remove_list_lock"),
+ replay_queue_lock("OSD::replay_queue_lock"),
snap_trim_wq(this, &disk_tp),
scrub_wq(this, &disk_tp)
{
map_lock.get_read();
+ check_replay_queue();
+
// mon report?
utime_t now = g_clock.now();
if (now - last_mon_report > g_conf.osd_mon_report_interval)
map_lock.put_read();
timer.add_event_after(1.0, new C_Tick(this));
+
+
+ // finishers?
+ finished_lock.Lock();
+ if (finished.empty()) {
+ finished_lock.Unlock();
+ } else {
+ list<Message*> waiting;
+ waiting.splice(waiting.begin(), finished);
+
+ finished_lock.Unlock();
+ osd_lock.Unlock();
+
+ for (list<Message*>::iterator it = waiting.begin();
+ it != waiting.end();
+ it++) {
+ dispatch(*it);
+ }
+
+ osd_lock.Lock();
+ }
}
// =========================================
// take osd_lock, map_log (read)
pg->unlock();
- osd_lock.Lock();
map_lock.get_read();
pg->lock();
out2:
map_lock.put_read();
- osd_lock.Unlock();
out:
pg->unlock();
+void OSD::check_replay_queue()
+{
+ utime_t now = g_clock.now();
+ list< pair<pg_t,utime_t> > pgids;
+ replay_queue_lock.Lock();
+ while (!replay_queue.empty() &&
+ replay_queue.front().second <= now) {
+ pgids.push_back(replay_queue.front());
+ replay_queue.pop_front();
+ }
+ replay_queue_lock.Unlock();
+
+ for (list< pair<pg_t,utime_t> >::iterator p = pgids.begin(); p != pgids.end(); p++)
+ activate_pg(p->first, p->second);
+}
+
/*
* NOTE: this is called from SafeTimer, so caller holds osd_lock
*/
-void OSD::activate_pg(pg_t pgid, epoch_t epoch)
+void OSD::activate_pg(pg_t pgid, utime_t activate_at)
{
assert(osd_lock.is_locked());
if (pg->is_crashed() &&
pg->is_replay() &&
pg->get_role() == 0 &&
- pg->info.history.same_primary_since <= epoch) {
+ pg->replay_until == activate_at) {
ObjectStore::Transaction t;
pg->activate(t);
store->apply_transaction(t);
// wake up _all_ pg waiters; raw pg -> actual pg mapping may have shifted
wake_all_pg_waiters();
-
- // finishers?
- finished_lock.Lock();
- if (finished.empty()) {
- finished_lock.Unlock();
- } else {
- list<Message*> waiting;
- waiting.splice(waiting.begin(), finished);
-
- finished_lock.Unlock();
- osd_lock.Unlock();
-
- for (list<Message*>::iterator it = waiting.begin();
- it != waiting.end();
- it++) {
- dispatch(*it);
- }
-
- osd_lock.Lock();
- }
}
remove_list_lock.Unlock();
}
- void activate_pg(pg_t pgid, epoch_t epoch);
-
- class C_Activate : public Context {
- OSD *osd;
- pg_t pgid;
- epoch_t epoch;
- public:
- C_Activate(OSD *o, pg_t p, epoch_t e) : osd(o), pgid(p), epoch(e) {}
- void finish(int r) {
- osd->activate_pg(pgid, epoch);
- }
- };
+ // replay / delayed pg activation
+ Mutex replay_queue_lock;
+ list< pair<pg_t, utime_t > > replay_queue;
+
+ void check_replay_queue();
+ void activate_pg(pg_t pgid, utime_t activate_at);
// -- snap trimming --
// -- crash recovery?
if (is_crashed()) {
- dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window << dendl;
+ replay_until = g_clock.now();
+ replay_until += g_conf.osd_replay_window;
+ dout(10) << "crashed, allowing op replay for " << g_conf.osd_replay_window
+ << " until " << replay_until << dendl;
state_set(PG_STATE_REPLAY);
- osd->timer.add_event_after(g_conf.osd_replay_window,
- new OSD::C_Activate(osd, info.pgid, osd->osdmap->get_epoch()));
+ osd->replay_queue_lock.Lock();
+ osd->replay_queue.push_back(pair<pg_t,utime_t>(info.pgid, replay_until));
+ osd->replay_queue_lock.Unlock();
}
else if (!is_active()) {
// -- ok, activate!
void PG::clear_stats()
{
+ dout(15) << "clear_stats" << dendl;
pg_stats_lock.Lock();
pg_stats_valid = false;
pg_stats_lock.Unlock();