From 997582c52332d26f6cadccfe1d47b9f3411634a4 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 20 Nov 2008 13:56:36 -0800 Subject: [PATCH] osd: convert recovery to a work queue --- src/osd/OSD.cc | 101 +++++++++++++++++------------------------------- src/osd/OSD.h | 103 +++++++++++++++++++++++-------------------------- 2 files changed, 85 insertions(+), 119 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index e8fe2da42fd90..cdd7744215596 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -280,10 +280,9 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, const char *dev) : map_cache_lock("OSD::map_cache_lock"), pg_stat_queue_lock("OSD::pg_stat_queue_lock"), tid_lock("OSD::tid_lock"), - recovery_lock("OSD::recovery_lock"), - recovery_ops_active(0), recovery_stop(false), recovery_pause(false), + recovery_ops_active(0), + recovery_wq(this), remove_list_lock("OSD::remove_list_lock"), - recovery_thread(this), snap_trim_wq(this), scrub_wq(this) { @@ -451,7 +450,7 @@ int OSD::init() booting = true; do_mon_report(); // start mon report timer - recovery_thread.create(); + recovery_wq.start(); scrub_wq.start(); snap_trim_wq.start(); @@ -488,9 +487,8 @@ int OSD::shutdown() delete threadpool; threadpool = 0; - stop_recovery_thread(); - dout(10) << "recovery thread stopped" << dendl; - + recovery_wq.stop(); + dout(10) << "recovery wq stopped" << dendl; scrub_wq.stop(); dout(10) << "scrub wq stopped" << dendl; snap_trim_wq.stop(); @@ -667,6 +665,7 @@ void OSD::_remove_unlock_pg(PG *pg) dout(10) << "_remove_unlock_pg " << pgid << dendl; + recovery_wq.dequeue(pg); snap_trim_wq.dequeue(pg); scrub_wq.dequeue(pg); @@ -961,6 +960,9 @@ void OSD::heartbeat() messenger->get_myinst()); return; } + + // periodically kick recovery work queue + recovery_wq.kick(); // get CPU load avg ifstream in("/proc/loadavg"); @@ -1519,7 +1521,7 @@ void OSD::handle_osd_map(MOSDMap *m) booting = boot_pending = false; wait_for_no_ops(); - pause_recovery_thread(); + recovery_wq.pause(); scrub_wq.pause(); snap_trim_wq.pause(); map_lock.get_write(); @@ -1714,7 +1716,7 @@ void OSD::handle_osd_map(MOSDMap *m) store->sync(); map_lock.put_write(); - unpause_recovery_thread(); + recovery_wq.unpause(); scrub_wq.unpause(); snap_trim_wq.unpause(); @@ -2938,34 +2940,18 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch) } - -void OSD::queue_for_recovery(PG *pg) +bool OSD::queue_for_recovery(PG *pg) { - recovery_lock.Lock(); - if (!pg->recovery_item.get_xlist()) { - recovering_pgs.push_back(&pg->recovery_item); - pg->get(); - dout(10) << "queue_for_recovery " << *pg - << " -- " << recovering_pgs.size() << " queued" << dendl; - } else { - dout(10) << "queue_for_recovery " << *pg << " -- already queued" << dendl; - } - - // delay recovery start? - if (g_conf.osd_recovery_delay_start > 0) { - defer_recovery_until = g_clock.now(); - defer_recovery_until += g_conf.osd_recovery_delay_start; - timer.add_event_at(defer_recovery_until, new C_StartRecovery(this)); - } else if (_recover_now()) - recovery_cond.Signal(); - recovery_lock.Unlock(); + bool b = recovery_wq.queue(pg); + if (b) + dout(10) << "queue_for_recovery queued " << *pg << dendl; + else + dout(10) << "queue_for_recovery already queued " << *pg << dendl; + return b; } bool OSD::_recover_now() { - if (recovering_pgs.empty()) - return false; - if (recovery_ops_active >= g_conf.osd_recovery_max_active) { dout(15) << "_recover_now max " << g_conf.osd_recovery_max_active << " active" << dendl; return false; @@ -2977,45 +2963,27 @@ bool OSD::_recover_now() return true; } -void OSD::_do_recovery() + +void OSD::do_recovery(PG *pg) { - assert(recovery_lock.is_locked()); + pg->lock(); int max = g_conf.osd_recovery_max_active - recovery_ops_active; - - PG *pg = recovering_pgs.front(); - pg->get(); - + dout(10) << "do_recovery starting " << max << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " active) on " << *pg << dendl; - - recovery_lock.Unlock(); - - pg->lock(); + int started = pg->start_recovery_ops(max); recovery_ops_active += started; pg->recovery_ops_active += started; if (started < max) pg->recovery_item.remove_myself(); + pg->put_unlock(); - - recovery_lock.Lock(); } -void OSD::recovery_entry() -{ - recovery_lock.Lock(); - dout(10) << "recovery_entry - start" << dendl; - while (!recovery_stop) { - if (!recovery_pause && _recover_now()) - _do_recovery(); - recovery_cond.Wait(recovery_lock); - } - dout(10) << "recovery_entry - done" << dendl; - recovery_lock.Unlock(); -} @@ -3023,7 +2991,7 @@ void OSD::finish_recovery_op(PG *pg, int count, bool dequeue) { dout(10) << "finish_recovery_op " << *pg << " count " << count << " dequeue=" << dequeue << dendl; - recovery_lock.Lock(); + recovery_wq.lock(); // adjust count recovery_ops_active -= count; @@ -3031,22 +2999,25 @@ void OSD::finish_recovery_op(PG *pg, int count, bool dequeue) if (dequeue) pg->recovery_item.remove_myself(); - else - recovering_pgs.push_front(&pg->recovery_item); // requeue + else { + pg->get(); + recovery_queue.push_front(&pg->recovery_item); // requeue + } - recovery_cond.Signal(); - recovery_lock.Unlock(); + recovery_wq._kick(); + recovery_wq.unlock(); } void OSD::defer_recovery(PG *pg) { dout(10) << "defer_recovery " << *pg << dendl; - recovery_lock.Lock(); // move pg to the end of the queue... - recovering_pgs.push_back(&pg->recovery_item); - - recovery_lock.Unlock(); + recovery_wq.lock(); + pg->get(); + recovery_queue.push_back(&pg->recovery_item); + recovery_wq._kick(); + recovery_wq.unlock(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 727962ba11469..a17f75d2a8a29 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -417,48 +417,54 @@ private: int& created); // -- pg recovery -- - Mutex recovery_lock; - Cond recovery_cond; - xlist recovering_pgs; + xlist recovery_queue; utime_t defer_recovery_until; int recovery_ops_active; - bool recovery_stop; - bool recovery_pause; - Mutex remove_list_lock; - map > > remove_list; + struct RecoveryWQ : public WorkQueue { + OSD *osd; + RecoveryWQ(OSD *o) : WorkQueue("OSD::RecoveryWQ"), osd(o) {} + + bool _enqueue(PG *pg) { + if (!pg->recovery_item.get_xlist()) { + pg->get(); + osd->recovery_queue.push_back(&pg->recovery_item); + + if (g_conf.osd_recovery_delay_start > 0) { + osd->defer_recovery_until = g_clock.now(); + osd->defer_recovery_until += g_conf.osd_recovery_delay_start; + } + return true; + } + return false; + } + void _dequeue(PG *pg) { + pg->recovery_item.remove_myself(); + } + PG * _dequeue() { + if (osd->recovery_queue.empty()) + return NULL; + + if (!osd->_recover_now()) + return NULL; + + PG *pg = osd->recovery_queue.front(); + osd->recovery_queue.pop_front(); + return pg; + } + void _process(PG *pg) { + osd->do_recovery(pg); + } + } recovery_wq; - void queue_for_recovery(PG *pg); + bool queue_for_recovery(PG *pg); void finish_recovery_op(PG *pg, int count, bool more); void defer_recovery(PG *pg); - void _do_recovery(); - void recovery_entry(); + void do_recovery(PG *pg); bool _recover_now(); - void kick_recovery() { - recovery_lock.Lock(); - recovery_cond.Signal(); - recovery_lock.Unlock(); - } - void stop_recovery_thread() { - osd_lock.Unlock(); - recovery_lock.Lock(); - recovery_stop = true; - recovery_cond.Signal(); - recovery_lock.Unlock(); - recovery_thread.join(); - osd_lock.Lock(); - } - void pause_recovery_thread() { - recovery_lock.Lock(); - recovery_pause = true; - recovery_lock.Unlock(); - } - void unpause_recovery_thread() { - recovery_lock.Lock(); - recovery_pause = false; - recovery_cond.Signal(); - recovery_lock.Unlock(); - } + + Mutex remove_list_lock; + map > > remove_list; void queue_for_removal(int osd, pg_t pgid) { remove_list_lock.Lock(); @@ -466,23 +472,6 @@ private: remove_list_lock.Unlock(); } - struct RecoveryThread : public Thread { - OSD *osd; - RecoveryThread(OSD *o) : osd(o) {} - void *entry() { - osd->recovery_entry(); - return 0; - } - } recovery_thread; - - struct C_StartRecovery : public Context { - OSD *osd; - C_StartRecovery(OSD *o) : osd(o) {} - void finish(int r) { - osd->kick_recovery(); - } - }; - void activate_pg(pg_t pgid, epoch_t epoch); class C_Activate : public Context { @@ -504,8 +493,11 @@ private: OSD *osd; SnapTrimWQ(OSD *o) : WorkQueue("OSD::SnapTrimWQ"), osd(o) {} - void _enqueue(PG *pg) { + bool _enqueue(PG *pg) { + if (pg->snap_trim_item.is_on_xlist()) + return false; osd->snap_trim_queue.push_back(&pg->snap_trim_item); + return true; } void _dequeue(PG *pg) { pg->snap_trim_item.remove_myself(); @@ -530,8 +522,11 @@ private: OSD *osd; ScrubWQ(OSD *o) : WorkQueue("OSD::ScrubWQ"), osd(o) {} - void _enqueue(PG *pg) { + bool _enqueue(PG *pg) { + if (pg->scrub_item.is_on_xlist()) + return false; osd->scrub_queue.push_back(&pg->scrub_item); + return true; } void _dequeue(PG *pg) { pg->scrub_item.remove_myself(); -- 2.39.5