map_cache_lock("OSD::map_cache_lock"),
pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
tid_lock("OSD::tid_lock"),
- recovery_lock("OSD::recovery_lock"),
- recovery_ops_active(0), recovery_stop(false), recovery_pause(false),
+ recovery_ops_active(0),
+ recovery_wq(this),
remove_list_lock("OSD::remove_list_lock"),
- recovery_thread(this),
snap_trim_wq(this),
scrub_wq(this)
{
booting = true;
do_mon_report(); // start mon report timer
- recovery_thread.create();
+ recovery_wq.start();
scrub_wq.start();
snap_trim_wq.start();
delete threadpool;
threadpool = 0;
- stop_recovery_thread();
- dout(10) << "recovery thread stopped" << dendl;
-
+ recovery_wq.stop();
+ dout(10) << "recovery wq stopped" << dendl;
scrub_wq.stop();
dout(10) << "scrub wq stopped" << dendl;
snap_trim_wq.stop();
dout(10) << "_remove_unlock_pg " << pgid << dendl;
+ recovery_wq.dequeue(pg);
snap_trim_wq.dequeue(pg);
scrub_wq.dequeue(pg);
messenger->get_myinst());
return;
}
+
+ // periodically kick recovery work queue
+ recovery_wq.kick();
// get CPU load avg
ifstream in("/proc/loadavg");
booting = boot_pending = false;
wait_for_no_ops();
- pause_recovery_thread();
+ recovery_wq.pause();
scrub_wq.pause();
snap_trim_wq.pause();
map_lock.get_write();
store->sync();
map_lock.put_write();
- unpause_recovery_thread();
+ recovery_wq.unpause();
scrub_wq.unpause();
snap_trim_wq.unpause();
}
-
-void OSD::queue_for_recovery(PG *pg)
+bool OSD::queue_for_recovery(PG *pg)
{
- recovery_lock.Lock();
- if (!pg->recovery_item.get_xlist()) {
- recovering_pgs.push_back(&pg->recovery_item);
- pg->get();
- dout(10) << "queue_for_recovery " << *pg
- << " -- " << recovering_pgs.size() << " queued" << dendl;
- } else {
- dout(10) << "queue_for_recovery " << *pg << " -- already queued" << dendl;
- }
-
- // delay recovery start?
- if (g_conf.osd_recovery_delay_start > 0) {
- defer_recovery_until = g_clock.now();
- defer_recovery_until += g_conf.osd_recovery_delay_start;
- timer.add_event_at(defer_recovery_until, new C_StartRecovery(this));
- } else if (_recover_now())
- recovery_cond.Signal();
- recovery_lock.Unlock();
+ bool b = recovery_wq.queue(pg);
+ if (b)
+ dout(10) << "queue_for_recovery queued " << *pg << dendl;
+ else
+ dout(10) << "queue_for_recovery already queued " << *pg << dendl;
+ return b;
}
bool OSD::_recover_now()
{
- if (recovering_pgs.empty())
- return false;
-
if (recovery_ops_active >= g_conf.osd_recovery_max_active) {
dout(15) << "_recover_now max " << g_conf.osd_recovery_max_active << " active" << dendl;
return false;
return true;
}
-void OSD::_do_recovery()
+
+void OSD::do_recovery(PG *pg)
{
- assert(recovery_lock.is_locked());
+ pg->lock();
int max = g_conf.osd_recovery_max_active - recovery_ops_active;
-
- PG *pg = recovering_pgs.front();
- pg->get();
-
+
dout(10) << "do_recovery starting " << max
<< " (" << recovery_ops_active
<< "/" << g_conf.osd_recovery_max_active << " active) on "
<< *pg << dendl;
-
- recovery_lock.Unlock();
-
- pg->lock();
+
int started = pg->start_recovery_ops(max);
recovery_ops_active += started;
pg->recovery_ops_active += started;
if (started < max)
pg->recovery_item.remove_myself();
+
pg->put_unlock();
-
- recovery_lock.Lock();
}
-void OSD::recovery_entry()
-{
- recovery_lock.Lock();
- dout(10) << "recovery_entry - start" << dendl;
- while (!recovery_stop) {
- if (!recovery_pause && _recover_now())
- _do_recovery();
- recovery_cond.Wait(recovery_lock);
- }
- dout(10) << "recovery_entry - done" << dendl;
- recovery_lock.Unlock();
-}
{
dout(10) << "finish_recovery_op " << *pg << " count " << count
<< " dequeue=" << dequeue << dendl;
- recovery_lock.Lock();
+ recovery_wq.lock();
// adjust count
recovery_ops_active -= count;
if (dequeue)
pg->recovery_item.remove_myself();
- else
- recovering_pgs.push_front(&pg->recovery_item); // requeue
+ else {
+ pg->get();
+ recovery_queue.push_front(&pg->recovery_item); // requeue
+ }
- recovery_cond.Signal();
- recovery_lock.Unlock();
+ recovery_wq._kick();
+ recovery_wq.unlock();
}
void OSD::defer_recovery(PG *pg)
{
dout(10) << "defer_recovery " << *pg << dendl;
- recovery_lock.Lock();
// move pg to the end of the queue...
- recovering_pgs.push_back(&pg->recovery_item);
-
- recovery_lock.Unlock();
+ recovery_wq.lock();
+ pg->get();
+ recovery_queue.push_back(&pg->recovery_item);
+ recovery_wq._kick();
+ recovery_wq.unlock();
}
int& created);
// -- pg recovery --
- Mutex recovery_lock;
- Cond recovery_cond;
- xlist<PG*> recovering_pgs;
+ xlist<PG*> recovery_queue;
utime_t defer_recovery_until;
int recovery_ops_active;
- bool recovery_stop;
- bool recovery_pause;
- Mutex remove_list_lock;
- map<epoch_t, map<int, vector<pg_t> > > remove_list;
+ struct RecoveryWQ : public WorkQueue<PG> {
+ OSD *osd;
+ RecoveryWQ(OSD *o) : WorkQueue<PG>("OSD::RecoveryWQ"), osd(o) {}
+
+ bool _enqueue(PG *pg) {
+ if (!pg->recovery_item.get_xlist()) {
+ pg->get();
+ osd->recovery_queue.push_back(&pg->recovery_item);
+
+ if (g_conf.osd_recovery_delay_start > 0) {
+ osd->defer_recovery_until = g_clock.now();
+ osd->defer_recovery_until += g_conf.osd_recovery_delay_start;
+ }
+ return true;
+ }
+ return false;
+ }
+ void _dequeue(PG *pg) {
+ pg->recovery_item.remove_myself();
+ }
+ PG * _dequeue() {
+ if (osd->recovery_queue.empty())
+ return NULL;
+
+ if (!osd->_recover_now())
+ return NULL;
+
+ PG *pg = osd->recovery_queue.front();
+ osd->recovery_queue.pop_front();
+ return pg;
+ }
+ void _process(PG *pg) {
+ osd->do_recovery(pg);
+ }
+ } recovery_wq;
- void queue_for_recovery(PG *pg);
+ bool queue_for_recovery(PG *pg);
void finish_recovery_op(PG *pg, int count, bool more);
void defer_recovery(PG *pg);
- void _do_recovery();
- void recovery_entry();
+ void do_recovery(PG *pg);
bool _recover_now();
- void kick_recovery() {
- recovery_lock.Lock();
- recovery_cond.Signal();
- recovery_lock.Unlock();
- }
- void stop_recovery_thread() {
- osd_lock.Unlock();
- recovery_lock.Lock();
- recovery_stop = true;
- recovery_cond.Signal();
- recovery_lock.Unlock();
- recovery_thread.join();
- osd_lock.Lock();
- }
- void pause_recovery_thread() {
- recovery_lock.Lock();
- recovery_pause = true;
- recovery_lock.Unlock();
- }
- void unpause_recovery_thread() {
- recovery_lock.Lock();
- recovery_pause = false;
- recovery_cond.Signal();
- recovery_lock.Unlock();
- }
+
+ Mutex remove_list_lock;
+ map<epoch_t, map<int, vector<pg_t> > > remove_list;
void queue_for_removal(int osd, pg_t pgid) {
remove_list_lock.Lock();
remove_list_lock.Unlock();
}
- struct RecoveryThread : public Thread {
- OSD *osd;
- RecoveryThread(OSD *o) : osd(o) {}
- void *entry() {
- osd->recovery_entry();
- return 0;
- }
- } recovery_thread;
-
- struct C_StartRecovery : public Context {
- OSD *osd;
- C_StartRecovery(OSD *o) : osd(o) {}
- void finish(int r) {
- osd->kick_recovery();
- }
- };
-
void activate_pg(pg_t pgid, epoch_t epoch);
class C_Activate : public Context {
OSD *osd;
SnapTrimWQ(OSD *o) : WorkQueue<PG>("OSD::SnapTrimWQ"), osd(o) {}
- void _enqueue(PG *pg) {
+ bool _enqueue(PG *pg) {
+ if (pg->snap_trim_item.is_on_xlist())
+ return false;
osd->snap_trim_queue.push_back(&pg->snap_trim_item);
+ return true;
}
void _dequeue(PG *pg) {
pg->snap_trim_item.remove_myself();
OSD *osd;
ScrubWQ(OSD *o) : WorkQueue<PG>("OSD::ScrubWQ"), osd(o) {}
- void _enqueue(PG *pg) {
+ bool _enqueue(PG *pg) {
+ if (pg->scrub_item.is_on_xlist())
+ return false;
osd->scrub_queue.push_back(&pg->scrub_item);
+ return true;
}
void _dequeue(PG *pg) {
pg->scrub_item.remove_myself();