// set default cost equal to 50MB io
OPTION(osd_scrub_cost, OPT_U32, 50<<20)
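+// default priority for queued PGRecovery items; kept low so that client ops
+// (typically priority 63) win when they share the unified op queue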
+OPTION(osd_recovery_priority, OPT_U32, 5)
+// set default cost equal to 20MB io
+OPTION(osd_recovery_cost, OPT_U32, 20<<20)
+
/**
* osd_recovery_op_warn_multiple scales the normal warning threshold,
* osd_op_complaint_time, so that slow recovery ops won't cause noise
return pg->scrub(op.epoch_queued, handle);
}
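+// PGRecovery visitor arm: recovery work is now dispatched from the sharded
+// op queue alongside client ops, snap trim, and scrub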
+void PGQueueable::RunVis::operator()(const PGRecovery &op) {
+ /// TODO: need to handle paused recovery
+ return osd->do_recovery(pg.get(), op.epoch_queued, handle);
+}
+
//Initial features in new superblock.
//Features here are also automatically upgraded
CompatSet OSD::get_osd_initial_compat_set() {
monc(osd->monc),
op_wq(osd->op_shardedwq),
peering_wq(osd->peering_wq),
- recovery_wq(osd->recovery_wq),
recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
- &osd->recovery_tp),
+ &osd->disk_tp),
op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp),
class_handler(osd->class_handler),
pg_epoch_lock("OSDService::pg_epoch_lock"),
return _add_map(map);
}
-bool OSDService::queue_for_recovery(PG *pg)
-{
- bool b = recovery_wq.queue(pg);
- if (b)
- dout(10) << "queue_for_recovery queued " << *pg << dendl;
- else
- dout(10) << "queue_for_recovery already queued " << *pg << dendl;
- return b;
-}
-
-
// ops
osd_tp(cct, "OSD::osd_tp", "tp_osd", cct->_conf->osd_op_threads, "osd_op_threads"),
osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
- recovery_tp(cct, "OSD::recovery_tp", "tp_osd_recov", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
paused_recovery(false),
cct->_conf->osd_command_thread_timeout,
cct->_conf->osd_command_thread_suicide_timeout,
&command_tp),
+ recovery_lock("OSD::recovery_lock"),
recovery_ops_active(0),
- recovery_wq(
- this,
- cct->_conf->osd_recovery_thread_timeout,
- cct->_conf->osd_recovery_thread_suicide_timeout,
- &recovery_tp),
replay_queue_lock("OSD::replay_queue_lock"),
remove_wq(
store,
osd_tp.start();
osd_op_tp.start();
- recovery_tp.start();
disk_tp.start();
command_tp.start();
heartbeat_lock.Unlock();
heartbeat_thread.join();
- recovery_tp.drain();
- recovery_tp.stop();
- dout(10) << "recovery tp stopped" << dendl;
-
osd_tp.drain();
peering_wq.clear();
osd_tp.stop();
}
if (is_active()) {
- // periodically kick recovery work queue
- recovery_tp.wake();
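+    // recovery items are queued explicitly now; there is no recovery tp
+    // left to kick periodically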
+ if (!scrub_random_backoff()) {
+ sched_scrub();
+ }
check_replay_queue();
<< "to " << cct->_conf->osd_recovery_delay_start;
defer_recovery_until = ceph_clock_now(cct);
defer_recovery_until += cct->_conf->osd_recovery_delay_start;
- recovery_wq.wake();
+  /// TODO: kick queued recovery after the new deferral window (replaces recovery_wq.wake())
}
else if (prefix == "cpu_profiler") {
if (!paused_recovery) {
dout(1) << "pausing recovery (NORECOVER flag set)" << dendl;
paused_recovery = true;
- recovery_tp.pause_new();
+      /// TODO: pause queued recovery ops (replaces recovery_tp.pause_new())
}
} else {
if (paused_recovery) {
dout(1) << "resuming recovery (NORECOVER flag cleared)" << dendl;
+      /// TODO: requeue recovery ops paused above (replaces recovery_tp.unpause())
paused_recovery = false;
- recovery_tp.unpause();
}
}
return true;
}
-void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
+void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
{
if (g_conf->osd_recovery_sleep > 0) {
handle.suspend_tp_timeout();
}
// see how many we should try to start. note that this is a bit racy.
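// (concurrent do_recovery calls may each see room and race to reserve it;
// the loser requeues below)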
- recovery_wq.lock();
+ recovery_lock.Lock();
int max = MIN(cct->_conf->osd_recovery_max_active - recovery_ops_active,
cct->_conf->osd_recovery_max_single_start);
if (max > 0) {
  dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
           << " rops)" << dendl;
  recovery_ops_active += max;  // take them while we have the lock
} else {
  dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
           << " rops)" << dendl;
}
- recovery_wq.unlock();
+ recovery_lock.Unlock();
if (max <= 0) {
dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl;
- recovery_wq.queue(pg);
+ service.queue_for_recovery(pg, true);
return;
} else {
pg->lock_suspend_timeout(handle);
- if (pg->deleting || !(pg->is_peered() && pg->is_primary())) {
+ if (pg->pg_has_reset_since(queued) ||
+ pg->deleting || !(pg->is_peered() && pg->is_primary())) {
pg->unlock();
goto out;
}
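+    // consume the queued marker so the PG can be requeued while we work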
+ assert(pg->recovery_queued);
+ pg->recovery_queued = false;
+
dout(10) << "do_recovery starting " << max << " " << *pg << dendl;
#ifdef DEBUG_RECOVERY_OIDS
dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl;
dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
// If no recovery op is started, don't bother to manipulate the RecoveryCtx
if (!started && (more || !pg->have_unfound())) {
  pg->unlock();
  goto out;
}
pg->discover_all_missing(*rctx.query_map);
if (rctx.query_map->empty()) {
dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl;
- recovery_wq.lock();
- recovery_wq._dequeue(pg);
- recovery_wq.unlock();
+ } else {
+ dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl;
+ pg->queue_recovery();
}
}
}
out:
- recovery_wq.lock();
+ recovery_lock.Lock();
if (max > 0) {
assert(recovery_ops_active >= max);
recovery_ops_active -= max;
}
- recovery_wq._wake();
- recovery_wq.unlock();
+ recovery_lock.Unlock();
}
void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
{
- recovery_wq.lock();
+ recovery_lock.Lock();
dout(10) << "start_recovery_op " << *pg << " " << soid
<< " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
<< dendl;
recovery_oids[pg->info.pgid].insert(soid);
#endif
- recovery_wq.unlock();
+ recovery_lock.Unlock();
}
void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
{
- recovery_wq.lock();
+ recovery_lock.Lock();
dout(10) << "finish_recovery_op " << *pg << " " << soid
<< " dequeue=" << dequeue
<< " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
recovery_oids[pg->info.pgid].erase(soid);
#endif
- if (dequeue)
- recovery_wq._dequeue(pg);
- else {
- recovery_wq._queue_front(pg);
- }
-
- recovery_wq._wake();
- recovery_wq.unlock();
+ recovery_lock.Unlock();
}
// =========================================================
return 0;
}
-bool OSD::RecoveryWQ::_enqueue(PG *pg) {
- if (!pg->recovery_item.is_on_list()) {
- pg->get("RecoveryWQ");
- osd->recovery_queue.push_back(&pg->recovery_item);
-
- if (osd->cct->_conf->osd_recovery_delay_start > 0) {
- osd->defer_recovery_until = ceph_clock_now(osd->cct);
- osd->defer_recovery_until += osd->cct->_conf->osd_recovery_delay_start;
- }
- return true;
- }
- return false;
-}
-
void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
set<PG*> got;
for (list<PG*>::iterator i = peering_queue.begin();
}
};
+struct PGRecovery {
+  epoch_t epoch_queued;
+  explicit PGRecovery(epoch_t e) : epoch_queued(e) {}
+  ostream &operator<<(ostream &rhs) {
+    return rhs << "PGRecovery";
+  }
+};
+
+
class PGQueueable {
typedef boost::variant<
OpRequestRef,
PGSnapTrim,
- PGScrub
+ PGScrub,
+ PGRecovery
> QVariant;
QVariant qvariant;
int cost;
void operator()(const OpRequestRef &op);
void operator()(const PGSnapTrim &op);
void operator()(const PGScrub &op);
+ void operator()(const PGRecovery &op);
};
public:
// cppcheck-suppress noExplicitConstructor
const entity_inst_t &owner)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner) {}
- boost::optional<OpRequestRef> maybe_get_op() {
- OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
- return op ? *op : boost::optional<OpRequestRef>();
+ PGQueueable(
+ const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
+ const entity_inst_t &owner)
+ : qvariant(op), cost(cost), priority(priority), start_time(start_time),
+ owner(owner) {}
+  boost::optional<OpRequestRef> maybe_get_op() const {
+ const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
+ return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
}
void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
RunVis v(osd, pg, handle);
MonClient *&monc;
ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
ThreadPool::BatchWorkQueue<PG> &peering_wq;
- ThreadPool::WorkQueue<PG> &recovery_wq;
GenContextWQ recovery_gen_wq;
GenContextWQ op_gen_wq;
ClassHandler *&class_handler;
void send_pg_temp();
void queue_for_peering(PG *pg);
- bool queue_for_recovery(PG *pg);
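+  // wrap the PG in a PGRecovery item carrying the current map epoch and the
+  // configured recovery cost/priority, and push it onto the sharded op wq;
+  // front=true requeues at the head (used when do_recovery races and can
+  // start nothing)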
+ void queue_for_recovery(PG *pg, bool front = false) {
+ pair<PGRef, PGQueueable> to_queue = make_pair(
+ pg,
+ PGQueueable(
+ PGRecovery(pg->get_osdmap()->get_epoch()),
+ cct->_conf->osd_recovery_cost,
+ cct->_conf->osd_recovery_priority,
+ ceph_clock_now(cct),
+ entity_inst_t()));
+ if (front) {
+ op_wq.queue_front(to_queue);
+ } else {
+ op_wq.queue(to_queue);
+ }
+ }
void queue_for_snap_trim(PG *pg) {
op_wq.queue(
make_pair(
ThreadPool osd_tp;
ShardedThreadPool osd_op_tp;
- ThreadPool recovery_tp;
ThreadPool disk_tp;
ThreadPool command_tp;
void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
// -- pg recovery --
- xlist<PG*> recovery_queue;
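+  // protects recovery_ops_active (and recovery_oids with DEBUG_RECOVERY_OIDS)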
+ Mutex recovery_lock;
utime_t defer_recovery_until;
int recovery_ops_active;
#ifdef DEBUG_RECOVERY_OIDS
map<spg_t, set<hobject_t, hobject_t::BitwiseComparator> > recovery_oids;
#endif
- struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
- OSD *osd;
- RecoveryWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, si, tp), osd(o) {}
-
- bool _empty() {
- return osd->recovery_queue.empty();
- }
- bool _enqueue(PG *pg);
- void _dequeue(PG *pg) {
- if (pg->recovery_item.remove_myself())
- pg->put("RecoveryWQ");
- }
- PG *_dequeue() {
- if (osd->recovery_queue.empty())
- return NULL;
-
- if (!osd->_recover_now())
- return NULL;
-
- PG *pg = osd->recovery_queue.front();
- osd->recovery_queue.pop_front();
- return pg;
- }
- void _queue_front(PG *pg) {
- if (!pg->recovery_item.is_on_list()) {
- pg->get("RecoveryWQ");
- osd->recovery_queue.push_front(&pg->recovery_item);
- }
- }
- void _process(PG *pg, ThreadPool::TPHandle &handle) override {
- osd->do_recovery(pg, handle);
- pg->put("RecoveryWQ");
- }
- void _clear() {
- while (!osd->recovery_queue.empty()) {
- PG *pg = osd->recovery_queue.front();
- osd->recovery_queue.pop_front();
- pg->put("RecoveryWQ");
- }
- }
- } recovery_wq;
-
void start_recovery_op(PG *pg, const hobject_t& soid);
void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
- void do_recovery(PG *pg, ThreadPool::TPHandle &handle);
+ void do_recovery(PG *pg, epoch_t epoch_queued, ThreadPool::TPHandle &handle);
bool _recover_now();
// replay / delayed pg activation
coll(p), pg_log(cct),
pgmeta_oid(p.make_pgmeta_oid()),
missing_loc(this),
- recovery_item(this), stat_queue_item(this),
+ stat_queue_item(this),
snap_trim_queued(false),
scrub_queued(false),
+ recovery_queued(false),
recovery_ops_active(0),
role(-1),
state(0),
scrubber.reserved_peers.clear();
scrub_after_recovery = false;
- osd->recovery_wq.dequeue(this);
-
agent_clear();
}
build_might_have_unfound();
state_set(PG_STATE_DEGRADED);
- dout(10) << "activate - starting recovery" << dendl;
- osd->queue_for_recovery(this);
if (have_unfound())
discover_all_missing(query_map);
}
}
}
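+// queue this PG for recovery via the unified op queue; recovery_queued
+// dedups so each PG has at most one PGRecovery item in flight (the flag is
+// cleared when do_recovery consumes the item)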
+void PG::queue_recovery(bool front)
+{
+ if (!is_primary() || !is_peered()) {
+ dout(10) << "queue_recovery -- not primary or not peered " << dendl;
+ assert(!recovery_queued);
+ } else if (recovery_queued) {
+ dout(10) << "queue_recovery -- already queued" << dendl;
+ } else {
+ dout(10) << "queue_recovery -- queuing" << dendl;
+ recovery_queued = true;
+ osd->queue_for_recovery(this, front);
+ }
+}
+
bool PG::queue_scrub()
{
assert(_lock.is_locked());
#endif
// TODOSAM: osd->osd-> not good
osd->osd->finish_recovery_op(this, soid, dequeue);
+
+  if (!dequeue) {
+    // replaces the old recovery_wq._queue_front(pg): keep recovery going
+    queue_recovery();
+  }
}
static void split_replay_queue(
context< RecoveryMachine >().log_enter(state_name);
PG *pg = context< RecoveryMachine >().pg;
pg->backfill_reserved = true;
- pg->osd->queue_for_recovery(pg);
+ pg->queue_recovery();
pg->state_clear(PG_STATE_BACKFILL_TOOFULL);
pg->state_clear(PG_STATE_BACKFILL_WAIT);
pg->state_set(PG_STATE_BACKFILL);
}
}
- pg->osd->recovery_wq.dequeue(pg);
-
pg->waiting_on_backfill.clear();
pg->finish_recovery_op(hobject_t::get_max());
PG *pg = context< RecoveryMachine >().pg;
pg->state_clear(PG_STATE_RECOVERY_WAIT);
pg->state_set(PG_STATE_RECOVERING);
- pg->osd->queue_for_recovery(pg);
+ pg->queue_recovery();
}
void PG::RecoveryState::Recovering::release_reservations()
if (!pg->is_clean() &&
!pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
(!pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) || pg->is_degraded())) {
- pg->osd->queue_for_recovery(pg);
+ pg->queue_recovery();
}
return forward_event();
}
logevt.from,
context< RecoveryMachine >().get_recovery_ctx());
if (got_missing)
- pg->osd->queue_for_recovery(pg);
+ pg->queue_recovery();
return discard_event();
}
/* You should not use these items without taking their respective queue locks
* (if they have one) */
- xlist<PG*>::item recovery_item, stat_queue_item;
+ xlist<PG*>::item stat_queue_item;
bool snap_trim_queued;
bool scrub_queued;
+ bool recovery_queued;
int recovery_ops_active;
set<pg_shard_t> waiting_on_backfill;
void queue_snap_trim();
bool requeue_scrub();
+ void queue_recovery(bool front = false);
bool queue_scrub();
unsigned get_scrub_priority();
void operator()() {
pg->requeue_ops(pg->waiting_for_all_missing);
pg->waiting_for_all_missing.clear();
- pg->osd->queue_for_recovery(pg);
+ pg->queue_recovery();
}
};
submit_log_entries(
[=]() {
requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
- osd->queue_for_recovery(this);
+ queue_recovery();
stringstream ss;
ss << "pg has " << num_unfound
dout(10) << "on_shutdown" << dendl;
// remove from queues
- osd->recovery_wq.dequeue(this);
osd->pg_stat_queue_dequeue(this);
osd->dequeue_pg(this, 0);
osd->peering_wq.dequeue(this);
&requeue_recovery,
&requeue_snaptrim);
if (requeue_recovery)
- osd->recovery_wq.queue(this);
+ queue_recovery();
if (requeue_snaptrim)
queue_snap_trim();