From: Samuel Just
Date: Fri, 12 Dec 2014 22:06:27 +0000 (-0800)
Subject: osd/: move recovery into opwq
X-Git-Tag: v11.0.0~465^2~4
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=28cc10f5f7720cbf6a08cfd049a967fde444fa88;p=ceph.git

osd/: move recovery into opwq

Signed-off-by: Samuel Just
---

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 67d729ce3370..2da949a2edd8 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -865,6 +865,10 @@ OPTION(osd_scrub_priority, OPT_U32, 5)
 // set default cost equal to 50MB io
 OPTION(osd_scrub_cost, OPT_U32, 50<<20)
 
+OPTION(osd_recovery_priority, OPT_U32, 5)
+// set default cost equal to 20MB io
+OPTION(osd_recovery_cost, OPT_U32, 20<<20)
+
 /**
  * osd_recovery_op_warn_multiple scales the normal warning threshhold,
  * osd_op_complaint_time, so that slow recovery ops won't cause noise
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 973f3eb2cd90..c57a776e8daa 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -171,6 +171,11 @@ void PGQueueable::RunVis::operator()(const PGScrub &op) {
   return pg->scrub(op.epoch_queued, handle);
 }
 
+void PGQueueable::RunVis::operator()(const PGRecovery &op) {
+  /// TODO: need to handle paused recovery
+  return osd->do_recovery(pg.get(), op.epoch_queued, handle);
+}
+
 //Initial features in new superblock.
 //Features here are also automatically upgraded
 CompatSet OSD::get_osd_initial_compat_set() {
@@ -215,9 +220,8 @@ OSDService::OSDService(OSD *osd) :
   monc(osd->monc),
   op_wq(osd->op_shardedwq),
   peering_wq(osd->peering_wq),
-  recovery_wq(osd->recovery_wq),
   recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
-                  &osd->recovery_tp),
+                  &osd->disk_tp),
   op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp),
   class_handler(osd->class_handler),
   pg_epoch_lock("OSDService::pg_epoch_lock"),
@@ -1356,17 +1360,6 @@ OSDMapRef OSDService::try_get_map(epoch_t epoch)
   return _add_map(map);
 }
 
-bool OSDService::queue_for_recovery(PG *pg)
-{
-  bool b = recovery_wq.queue(pg);
-  if (b)
-    dout(10) << "queue_for_recovery queued " << *pg << dendl;
-  else
-    dout(10) << "queue_for_recovery already queued " << *pg << dendl;
-  return b;
-}
-
-
 // ops
 
@@ -1659,7 +1652,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   osd_tp(cct, "OSD::osd_tp", "tp_osd", cct->_conf->osd_op_threads, "osd_op_threads"),
   osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
     cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
-  recovery_tp(cct, "OSD::recovery_tp", "tp_osd_recov", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
   disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
   paused_recovery(false),
@@ -1705,12 +1697,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_command_thread_timeout,
     cct->_conf->osd_command_thread_suicide_timeout,
     &command_tp),
+  recovery_lock("OSD::recovery_lock"),
   recovery_ops_active(0),
-  recovery_wq(
-    this,
-    cct->_conf->osd_recovery_thread_timeout,
-    cct->_conf->osd_recovery_thread_suicide_timeout,
-    &recovery_tp),
   replay_queue_lock("OSD::replay_queue_lock"),
   remove_wq(
     store,
@@ -2194,7 +2182,6 @@ int OSD::init()
 
   osd_tp.start();
   osd_op_tp.start();
-  recovery_tp.start();
   disk_tp.start();
   command_tp.start();
 
@@ -2684,10 +2671,6 @@ int OSD::shutdown()
   heartbeat_lock.Unlock();
   heartbeat_thread.join();
 
-  recovery_tp.drain();
-  recovery_tp.stop();
-  dout(10) << "recovery tp stopped" << dendl;
-
   osd_tp.drain();
   peering_wq.clear();
   osd_tp.stop();
@@ -4340,8 +4323,9 @@ void OSD::tick()
   }
 
   if (is_active()) {
-    // periodically kick recovery work queue
-    recovery_tp.wake();
+    if (!scrub_random_backoff()) {
+      sched_scrub();
+    }
 
     check_replay_queue();
 
@@ -5768,7 +5752,7 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
             << "to " << cct->_conf->osd_recovery_delay_start;
       defer_recovery_until = ceph_clock_now(cct);
       defer_recovery_until += cct->_conf->osd_recovery_delay_start;
-      recovery_wq.wake();
+      /// TODO
     }
 
   else if (prefix == "cpu_profiler") {
@@ -7374,13 +7358,13 @@ void OSD::activate_map()
     if (!paused_recovery) {
       dout(1) << "pausing recovery (NORECOVER flag set)" << dendl;
       paused_recovery = true;
-      recovery_tp.pause_new();
+      /// TODO
     }
   } else {
     if (paused_recovery) {
      dout(1) << "resuming recovery (NORECOVER flag cleared)" << dendl;
+      /// TODO
       paused_recovery = false;
-      recovery_tp.unpause();
     }
   }
@@ -8375,7 +8359,7 @@ bool OSD::_recover_now()
   return true;
 }
 
-void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
+void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
 {
   if (g_conf->osd_recovery_sleep > 0) {
     handle.suspend_tp_timeout();
@@ -8387,7 +8371,7 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
   }
 
   // see how many we should try to start. note that this is a bit racy.
-  recovery_wq.lock();
+  recovery_lock.Lock();
   int max = MIN(cct->_conf->osd_recovery_max_active - recovery_ops_active,
                 cct->_conf->osd_recovery_max_single_start);
   if (max > 0) {
@@ -8398,19 +8382,23 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
     dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/"
              << cct->_conf->osd_recovery_max_active << " rops)" << dendl;
   }
-  recovery_wq.unlock();
+  recovery_lock.Unlock();
 
   if (max <= 0) {
     dout(10) << "do_recovery raced and failed to start anything; requeuing "
             << *pg << dendl;
-    recovery_wq.queue(pg);
+    service.queue_for_recovery(pg, true);
     return;
   } else {
     pg->lock_suspend_timeout(handle);
-    if (pg->deleting || !(pg->is_peered() && pg->is_primary())) {
+    if (pg->pg_has_reset_since(queued) ||
+        pg->deleting || !(pg->is_peered() && pg->is_primary())) {
       pg->unlock();
       goto out;
     }
+    assert(pg->recovery_queued);
+    pg->recovery_queued = false;
+
     dout(10) << "do_recovery starting " << max << " " << *pg << dendl;
 #ifdef DEBUG_RECOVERY_OIDS
     dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl;
@@ -8421,7 +8409,6 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
     dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
 
     // If no recovery op is started, don't bother to manipulate the RecoveryCtx
     if (!started && (more || !pg->have_unfound())) {
-      pg->unlock();
       goto out;
     }
@@ -8438,9 +8425,9 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
       pg->discover_all_missing(*rctx.query_map);
       if (rctx.query_map->empty()) {
        dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl;
-       recovery_wq.lock();
-       recovery_wq._dequeue(pg);
-       recovery_wq.unlock();
+      } else {
+       dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl;
+       pg->queue_recovery();
       }
     }
 
@@ -8451,18 +8438,17 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
   }
 
  out:
-  recovery_wq.lock();
+  recovery_lock.Lock();
   if (max > 0) {
     assert(recovery_ops_active >= max);
     recovery_ops_active -= max;
   }
-  recovery_wq._wake();
-  recovery_wq.unlock();
+  recovery_lock.Unlock();
 }
 
 void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
 {
-  recovery_wq.lock();
+  recovery_lock.Lock();
   dout(10) << "start_recovery_op " << *pg << " " << soid
           << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
           << dendl;
@@ -8475,12 +8461,12 @@ void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
   recovery_oids[pg->info.pgid].insert(soid);
 #endif
 
-  recovery_wq.unlock();
+  recovery_lock.Unlock();
 }
 
 void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
 {
-  recovery_wq.lock();
+  recovery_lock.Lock();
   dout(10) << "finish_recovery_op " << *pg << " " << soid
           << " dequeue=" << dequeue
           << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)" << dendl;
@@ -8496,14 +8482,7 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
   recovery_oids[pg->info.pgid].erase(soid);
 #endif
 
-  if (dequeue)
-    recovery_wq._dequeue(pg);
-  else {
-    recovery_wq._queue_front(pg);
-  }
-
-  recovery_wq._wake();
-  recovery_wq.unlock();
+  recovery_lock.Unlock();
 }
 
 // =========================================================
@@ -9338,20 +9317,6 @@ int OSD::init_op_flags(OpRequestRef& op)
   return 0;
 }
 
-bool OSD::RecoveryWQ::_enqueue(PG *pg) {
-  if (!pg->recovery_item.is_on_list()) {
-    pg->get("RecoveryWQ");
-    osd->recovery_queue.push_back(&pg->recovery_item);
-
-    if (osd->cct->_conf->osd_recovery_delay_start > 0) {
-      osd->defer_recovery_until = ceph_clock_now(osd->cct);
-      osd->defer_recovery_until += osd->cct->_conf->osd_recovery_delay_start;
-    }
-    return true;
-  }
-  return false;
-}
-
 void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
   set<PG*> got;
   for (list<PG*>::iterator i = peering_queue.begin();
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index fd40d3b1867b..87dc5522ea6a 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -345,11 +345,21 @@ struct PGSnapTrim {
   }
 };
 
+struct PGRecovery {
+  epoch_t epoch_queued;
+  PGRecovery(epoch_t e) : epoch_queued(e) {}
+  ostream &operator<<(ostream &rhs) {
+    return rhs << "PGRecovery";
+  }
+};
+
+
 class PGQueueable {
   typedef boost::variant<
     OpRequestRef,
     PGSnapTrim,
-    PGScrub
+    PGScrub,
+    PGRecovery
     > QVariant;
   QVariant qvariant;
   int cost;
@@ -365,6 +375,7 @@ class PGQueueable {
     void operator()(const OpRequestRef &op);
     void operator()(const PGSnapTrim &op);
     void operator()(const PGScrub &op);
+    void operator()(const PGRecovery &op);
   };
 public:
   // cppcheck-suppress noExplicitConstructor
@@ -384,9 +395,14 @@ public:
     const entity_inst_t &owner)
     : qvariant(op), cost(cost), priority(priority), start_time(start_time),
       owner(owner) {}
-  boost::optional<OpRequestRef> maybe_get_op() {
-    OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
-    return op ? *op : boost::optional<OpRequestRef>();
+  PGQueueable(
+    const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
+    const entity_inst_t &owner)
+    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
+      owner(owner) {}
+  const boost::optional<OpRequestRef> maybe_get_op() const {
+    const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
+    return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
   }
   void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
     RunVis v(osd, pg, handle);
@@ -419,7 +435,6 @@ public:
   MonClient *&monc;
   ShardedThreadPool::ShardedWQ < pair<PGRef, PGQueueable> > &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
-  ThreadPool::WorkQueue<PG> &recovery_wq;
   GenContextWQ recovery_gen_wq;
   GenContextWQ op_gen_wq;
   ClassHandler *&class_handler;
@@ -868,7 +883,21 @@ public:
   void send_pg_temp();
 
   void queue_for_peering(PG *pg);
-  bool queue_for_recovery(PG *pg);
+  void queue_for_recovery(PG *pg, bool front = false) {
+    pair<PGRef, PGQueueable> to_queue = make_pair(
+      pg,
+      PGQueueable(
+        PGRecovery(pg->get_osdmap()->get_epoch()),
+        cct->_conf->osd_recovery_cost,
+        cct->_conf->osd_recovery_priority,
+        ceph_clock_now(cct),
+        entity_inst_t()));
+    if (front) {
+      op_wq.queue_front(to_queue);
+    } else {
+      op_wq.queue(to_queue);
+    }
+  }
   void queue_for_snap_trim(PG *pg) {
     op_wq.queue(
       make_pair(
@@ -1278,7 +1307,6 @@ private:
 
   ThreadPool osd_tp;
   ShardedThreadPool osd_op_tp;
-  ThreadPool recovery_tp;
   ThreadPool disk_tp;
   ThreadPool command_tp;
 
@@ -2219,59 +2247,16 @@ protected:
   void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
 
   // -- pg recovery --
-  xlist<PG*> recovery_queue;
+  Mutex recovery_lock;
   utime_t defer_recovery_until;
   int recovery_ops_active;
 #ifdef DEBUG_RECOVERY_OIDS
   map<spg_t, set<hobject_t> > recovery_oids;
 #endif
 
-  struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
-    OSD *osd;
-    RecoveryWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, si, tp), osd(o) {}
-
-    bool _empty() {
-      return osd->recovery_queue.empty();
-    }
-    bool _enqueue(PG *pg);
-    void _dequeue(PG *pg) {
-      if (pg->recovery_item.remove_myself())
-        pg->put("RecoveryWQ");
-    }
-    PG *_dequeue() {
-      if (osd->recovery_queue.empty())
-        return NULL;
-
-      if (!osd->_recover_now())
-        return NULL;
-
-      PG *pg = osd->recovery_queue.front();
-      osd->recovery_queue.pop_front();
-      return pg;
-    }
-    void _queue_front(PG *pg) {
-      if (!pg->recovery_item.is_on_list()) {
-        pg->get("RecoveryWQ");
-        osd->recovery_queue.push_front(&pg->recovery_item);
-      }
-    }
-    void _process(PG *pg, ThreadPool::TPHandle &handle) override {
-      osd->do_recovery(pg, handle);
-      pg->put("RecoveryWQ");
-    }
-    void _clear() {
-      while (!osd->recovery_queue.empty()) {
-        PG *pg = osd->recovery_queue.front();
-        osd->recovery_queue.pop_front();
-        pg->put("RecoveryWQ");
-      }
-    }
-  } recovery_wq;
-
   void start_recovery_op(PG *pg, const hobject_t& soid);
   void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
-  void do_recovery(PG *pg, ThreadPool::TPHandle &handle);
+  void do_recovery(PG *pg, epoch_t epoch_queued, ThreadPool::TPHandle &handle);
   bool _recover_now();
 
   // replay / delayed pg activation
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index fa6b8375bba6..a8c78164d6c2 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -210,9 +210,10 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   coll(p), pg_log(cct),
   pgmeta_oid(p.make_pgmeta_oid()),
   missing_loc(this),
-  recovery_item(this), stat_queue_item(this),
+  stat_queue_item(this),
   snap_trim_queued(false),
   scrub_queued(false),
+  recovery_queued(false),
   recovery_ops_active(0),
   role(-1),
   state(0),
@@ -949,8 +950,6 @@ void PG::clear_primary_state()
   scrubber.reserved_peers.clear();
   scrub_after_recovery = false;
 
-  osd->recovery_wq.dequeue(this);
-
   agent_clear();
 }
 
@@ -1847,8 +1846,6 @@ void PG::activate(ObjectStore::Transaction& t,
     build_might_have_unfound();
 
     state_set(PG_STATE_DEGRADED);
-    dout(10) << "activate - starting recovery" << dendl;
-    osd->queue_for_recovery(this);
     if (have_unfound())
       discover_all_missing(query_map);
   }
@@ -2069,6 +2066,20 @@ bool PG::requeue_scrub()
   }
 }
 
+void PG::queue_recovery(bool front)
+{
+  if (!is_primary() || !is_peered()) {
+    dout(10) << "queue_recovery -- not primary or not peered " << dendl;
+    assert(!recovery_queued);
+  } else if (recovery_queued) {
+    dout(10) << "queue_recovery -- already queued" << dendl;
+  } else {
+    dout(10) << "queue_recovery -- queuing" << dendl;
+    recovery_queued = true;
+    osd->queue_for_recovery(this, front);
+  }
+}
+
 bool PG::queue_scrub()
 {
   assert(_lock.is_locked());
@@ -2229,6 +2240,9 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
 #endif
   // TODOSAM: osd->osd-> not good
   osd->osd->finish_recovery_op(this, soid, dequeue);
+
+  if (!dequeue) {
+  }
 }
 
 static void split_replay_queue(
@@ -6034,7 +6048,7 @@ PG::RecoveryState::Backfilling::Backfilling(my_context ctx)
   context< RecoveryMachine >().log_enter(state_name);
   PG *pg = context< RecoveryMachine >().pg;
   pg->backfill_reserved = true;
-  pg->osd->queue_for_recovery(pg);
+  pg->queue_recovery();
   pg->state_clear(PG_STATE_BACKFILL_TOOFULL);
   pg->state_clear(PG_STATE_BACKFILL_WAIT);
   pg->state_set(PG_STATE_BACKFILL);
@@ -6063,8 +6077,6 @@ PG::RecoveryState::Backfilling::react(const RemoteReservationRejected &)
     }
   }
 
-  pg->osd->recovery_wq.dequeue(pg);
-
   pg->waiting_on_backfill.clear();
 
   pg->finish_recovery_op(hobject_t::get_max());
@@ -6466,7 +6478,7 @@ PG::RecoveryState::Recovering::Recovering(my_context ctx)
   PG *pg = context< RecoveryMachine >().pg;
   pg->state_clear(PG_STATE_RECOVERY_WAIT);
   pg->state_set(PG_STATE_RECOVERING);
-  pg->osd->queue_for_recovery(pg);
+  pg->queue_recovery();
 }
 
 void PG::RecoveryState::Recovering::release_reservations()
@@ -6734,7 +6746,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
   if (!pg->is_clean() &&
       !pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
       (!pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) || pg->is_degraded())) {
-    pg->osd->queue_for_recovery(pg);
+    pg->queue_recovery();
   }
   return forward_event();
 }
@@ -6800,7 +6812,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const MLogRec& logevt
     logevt.from,
     context< RecoveryMachine >().get_recovery_ctx());
   if (got_missing)
-    pg->osd->queue_for_recovery(pg);
+    pg->queue_recovery();
   return discard_event();
 }
 
diff --git a/src/osd/PG.h b/src/osd/PG.h
index efcd99d63348..f86bcfb5aede 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -488,9 +488,10 @@ public:
   /* You should not use these items without taking their respective queue locks
    * (if they have one) */
-  xlist<PG*>::item recovery_item, stat_queue_item;
+  xlist<PG*>::item stat_queue_item;
   bool snap_trim_queued;
   bool scrub_queued;
+  bool recovery_queued;
 
   int recovery_ops_active;
   set<pg_shard_t> waiting_on_backfill;
 
@@ -2258,6 +2259,7 @@ public:
   void queue_snap_trim();
 
   bool requeue_scrub();
+  void queue_recovery(bool front = false);
   bool queue_scrub();
   unsigned get_scrub_priority();
 
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 43ef40ffb998..5a1f037b2e0d 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -9885,7 +9885,7 @@ void ReplicatedPG::mark_all_unfound_lost(
     void operator()() {
       pg->requeue_ops(pg->waiting_for_all_missing);
       pg->waiting_for_all_missing.clear();
-      pg->osd->queue_for_recovery(pg);
+      pg->queue_recovery();
     }
   };
   submit_log_entries(
@@ -9895,7 +9895,7 @@ void ReplicatedPG::mark_all_unfound_lost(
     [=]() {
      requeue_ops(waiting_for_all_missing);
      waiting_for_all_missing.clear();
-     osd->queue_for_recovery(this);
+     queue_recovery();
 
   stringstream ss;
   ss << "pg has " << num_unfound
@@ -10026,7 +10026,6 @@ void ReplicatedPG::on_shutdown()
   dout(10) << "on_shutdown" << dendl;
 
   // remove from queues
-  osd->recovery_wq.dequeue(this);
   osd->pg_stat_queue_dequeue(this);
   osd->dequeue_pg(this, 0);
   osd->peering_wq.dequeue(this);
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index fc5e5d466731..d0cb85535eeb 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -842,7 +842,7 @@ protected:
       &requeue_recovery,
       &requeue_snaptrim);
     if (requeue_recovery)
-      osd->recovery_wq.queue(this);
+      queue_recovery();
     if (requeue_snaptrim)
       queue_snap_trim();
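
Note for readers of this archived diff (not part of the patch): the change drops the dedicated recovery work queue and thread pool and instead queues recovery as a PGRecovery item on the same sharded op queue that already carries client ops, snap trims, and scrubs, so recovery competes there with the new osd_recovery_cost and osd_recovery_priority options. As a reading aid, here is a minimal standalone sketch of the boost::variant plus static_visitor dispatch that PGQueueable::RunVis relies on. DemoOp, DemoRecovery, and DemoRunner are invented names for illustration only; they are not Ceph types.

  // demo_dispatch.cc -- illustrative sketch, not Ceph code.
  // Mirrors the pattern above: one variant of queueable item types,
  // one visitor with an operator() per type.
  #include <boost/variant.hpp>
  #include <iostream>

  struct DemoOp       { int id; };                 // stands in for OpRequestRef
  struct DemoRecovery { unsigned epoch_queued; };  // stands in for PGRecovery

  typedef boost::variant<DemoOp, DemoRecovery> DemoVariant;

  // Analogue of PGQueueable::RunVis: the visitor picks the right handler.
  struct DemoRunner : public boost::static_visitor<void> {
    void operator()(const DemoOp &op) const {
      std::cout << "dispatch client op " << op.id << std::endl;
    }
    void operator()(const DemoRecovery &op) const {
      std::cout << "dispatch recovery queued at epoch "
                << op.epoch_queued << std::endl;
    }
  };

  int main() {
    DemoOp op;        op.id = 42;
    DemoRecovery rec; rec.epoch_queued = 7;
    DemoVariant queue[2] = { DemoVariant(op), DemoVariant(rec) };
    DemoRunner runner;
    for (int i = 0; i < 2; ++i)
      boost::apply_visitor(runner, queue[i]);  // calls the matching operator()
    return 0;
  }

Built against Boost, this prints one line per queued item, which is roughly what the OSD's op worker threads now do when they pop a PGRecovery entry and call do_recovery() through the visitor.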