read_latency_calc(g_conf.osd_max_opq<1 ? 1:g_conf.osd_max_opq),
qlen_calc(3),
iat_averager(g_conf.osd_flash_crowd_iat_alpha),
- snap_trimmer_thread(this)
+ snap_trimmer_thread(this),
+ recovery_ops_active(0), recovery_stop(false), recovery_thread(this)
{
messenger = m;
monmap = mm;
booting = true;
do_mon_report(); // start mon report timer
+ recovery_thread.create();
+
// start the heartbeat
timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Heartbeat(this));
timer.cancel_all();
timer.join();
- // flush data to disk
osd_lock.Unlock();
+
+ // flush data to disk
store->sync();
+
+ stop_recovery_thread();
+
osd_lock.Lock();
// finish ops
logger->fset("rdlatm", my_stat.read_latency_mine);
logger->fset("fshdin", my_stat.frac_rd_ops_shed_in);
logger->fset("fshdout", my_stat.frac_rd_ops_shed_out);
- dout(12) << "_refresh_my_stat " << my_stat << dendl;
+ dout(30) << "_refresh_my_stat " << my_stat << dendl;
stat_rd_ops = 0;
stat_rd_ops_shed_in = 0;
}
pg_stat_queue_lock.Lock();
if (!pg_stat_pending.empty() || osd_stat_pending) {
- dout(10) << "requeueing pg_stat_pending" << dendl;
+ dout(30) << "requeueing pg_stat_pending" << dendl;
retry = true;
osd_stat_updated = osd_stat_updated || osd_stat_pending;
osd_stat_pending = false;
{
assert(osd_lock.is_locked());
- dout(10) << "send_pg_stats" << dendl;
+ dout(20) << "send_pg_stats" << dendl;
// grab queue
assert(pg_stat_pending.empty());
if (g_conf.osd_recovery_delay_start > 0) {
defer_recovery_until = g_clock.now();
defer_recovery_until += g_conf.osd_recovery_delay_start;
- }
-
- recovery_lock.Unlock();
-
- if (defer_recovery_until == utime_t())
- maybe_start_recovery();
- else
timer.add_event_at(defer_recovery_until, new C_StartRecovery(this));
+ } else if (_recover_now())
+ recovery_cond.Signal();
+ recovery_lock.Unlock();
}
-void OSD::maybe_start_recovery()
+// Decide whether the recovery thread should start more recovery ops right now.
+// Returns true only when at least one PG is queued in recovering_pgs, fewer
+// than g_conf.osd_recovery_max_active ops are in flight, and the deferral
+// deadline (defer_recovery_until) is not still in the future.
+// The callers visible in this change invoke it with recovery_lock held.
+bool OSD::_recover_now()
{
- if (g_clock.now() > defer_recovery_until &&
- recovery_ops_active == 0) {
- dout(10) << "maybe_start_recovery starting" << dendl;
- do_recovery();
+  if (recovering_pgs.empty())
+    return false;
+
+  if (recovery_ops_active >= g_conf.osd_recovery_max_active) {
+    dout(15) << "_recover_now max " << g_conf.osd_recovery_max_active << " active" << dendl;
+    return false;
+  }
+  // Hold off only while we are still *before* the deferral deadline.  The
+  // test must be '<': with '>=' recovery would be refused forever once the
+  // deadline passed (and always, when defer_recovery_until is left at its
+  // default value).
+  if (g_clock.now() < defer_recovery_until) {
+    dout(15) << "_recover_now defer until " << defer_recovery_until << dendl;
+    return false;
}
-}
-void OSD::do_recovery()
+  return true;
+}
+void OSD::_do_recovery()  // start up to (max_active - active) recovery ops on the PG at the front of recovering_pgs
{
- while (1) {
- recovery_lock.Lock();
- if (recovering_pgs.empty()) {
- dout(10) << "do_recovery -- no recoverying pgs, nothing to do" << dendl;
- recovery_lock.Unlock();
- return;
- }
- if (recovery_ops_active >= g_conf.osd_recovery_max_active) {
- recovery_lock.Unlock();
- return;
- }
- int max = g_conf.osd_recovery_max_active - recovery_ops_active;
-
- PG *pg = recovering_pgs.front();
- pg->get();
+ assert(recovery_lock.is_locked());  // caller must enter with recovery_lock held
- dout(10) << "do_recovery starting " << max
- << " (" << recovery_ops_active
- << "/" << g_conf.osd_recovery_max_active << " active) on "
- << *pg << dendl;
+ int max = g_conf.osd_recovery_max_active - recovery_ops_active;  // remaining op budget
+
+ PG *pg = recovering_pgs.front();  // caller (_recover_now) has already checked the queue is non-empty
+ pg->get();  // presumably takes a reference that put_unlock() below drops -- TODO confirm
+
+ dout(10) << "do_recovery starting " << max
+ << " (" << recovery_ops_active
+ << "/" << g_conf.osd_recovery_max_active << " active) on "
+ << *pg << dendl;
+
+ recovery_lock.Unlock();  // recovery_lock is released before the pg lock is taken
+
+ pg->lock();
+ int started = pg->start_recovery_ops(max);
+ recovery_ops_active += started;  // NOTE(review): updated after recovery_lock was dropped above -- confirm this cannot race with readers under recovery_lock
+ pg->recovery_ops_active += started;
+ if (started < max)  // fewer ops started than the budget allowed
+ pg->recovery_item.remove_myself();  // unlink this PG's recovery_item (presumably from recovering_pgs)
+ pg->put_unlock();
+
+ recovery_lock.Lock();  // re-acquire so the function returns with recovery_lock held, as on entry
+}
- recovery_lock.Unlock();
-
- pg->lock();
- int started = pg->start_recovery_ops(max);
- recovery_ops_active += started;
- pg->recovery_ops_active += started;
- if (started < max)
- pg->recovery_item.remove_myself();
- pg->put_unlock();
+// Main loop of the recovery thread.  Runs with recovery_lock held except
+// while blocked in recovery_cond.Wait() or inside _do_recovery(), which
+// temporarily drops the lock.  Exits once recovery_stop is set.
+void OSD::recovery_entry()
+{
+  recovery_lock.Lock();
+  dout(10) << "recovery_entry - start" << dendl;
+  while (!recovery_stop) {
+    // _do_recovery() releases recovery_lock while it works, so a stop request
+    // (flag + Signal) can arrive while nobody is waiting on the condition.
+    // Re-check the flag before starting more work ...
+    while (!recovery_stop && _recover_now())
+      _do_recovery();
+    // ... and before sleeping, otherwise the already-delivered signal is lost
+    // and stop_recovery_thread() would block forever in join().
+    if (recovery_stop)
+      break;
+    recovery_cond.Wait(recovery_lock);
}
+  dout(10) << "recovery_entry - done" << dendl;
+  recovery_lock.Unlock();
}
+
+
void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
{
dout(10) << "finish_recovery_op " << *pg << " count " << count
else
recovering_pgs.push_front(&pg->recovery_item); // requeue
+ recovery_cond.Signal();
recovery_lock.Unlock();
-
- // continue recovery?
- if (count && g_clock.now() > defer_recovery_until)
- do_recovery();
}
void OSD::defer_recovery(PG *pg)
// -- pg recovery --
Mutex recovery_lock;
+ Cond recovery_cond;
xlist<PG*> recovering_pgs;
utime_t defer_recovery_until;
int recovery_ops_active;
+ bool recovery_stop;
Mutex remove_list_lock;
map<epoch_t, map<int, vector<pg_t> > > remove_list;
void queue_for_recovery(PG *pg);
- void maybe_start_recovery();
- void do_recovery();
void finish_recovery_op(PG *pg, int count, bool more);
void defer_recovery(PG *pg);
+ void _do_recovery();
+ void recovery_entry();
+ bool _recover_now();
+ void kick_recovery() {  // wake the recovery thread so it re-evaluates _recover_now()
+ recovery_lock.Lock();
+ recovery_cond.Signal();  // signalled while holding recovery_lock, the mutex recovery_entry() waits with
+ recovery_lock.Unlock();
+ }
+ void stop_recovery_thread() {  // ask the recovery thread to exit, then wait for it
+ recovery_lock.Lock();
+ recovery_stop = true;  // flag checked by the loop in recovery_entry()
+ recovery_cond.Signal();  // wake the thread if it is blocked in recovery_cond.Wait()
+ recovery_lock.Unlock();
+ recovery_thread.join();  // joined only after recovery_lock is released; recovery_entry() needs that lock to exit
+ }
void queue_for_removal(int osd, pg_t pgid) {
remove_list_lock.Lock();
remove_list_lock.Unlock();
}
+ struct RecoveryThread : public Thread {  // thread wrapper whose entry point runs OSD::recovery_entry()
+ OSD *osd;  // the OSD whose recovery loop this thread drives
+ RecoveryThread(OSD *o) : osd(o) {}
+ void *entry() {
+ osd->recovery_entry();  // returns once recovery_stop has been observed
+ return 0;
+ }
+ } recovery_thread;  // the OSD's single recovery thread; joined by stop_recovery_thread()
+
struct C_StartRecovery : public Context {
OSD *osd;
C_StartRecovery(OSD *o) : osd(o) {}
void finish(int r) {
- osd->maybe_start_recovery();
+ osd->kick_recovery();  // only signal the recovery thread; recovery no longer runs inside this callback
}
};