From: Samuel Just Date: Fri, 15 May 2015 17:34:29 +0000 (-0700) Subject: osd/: convert scrub to use the OpWQ X-Git-Tag: v9.0.2~48^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0f9979607629a0844f88b05140cb20bad4f48ea5;p=ceph.git osd/: convert scrub to use the OpWQ Fixes: 8635 Signed-off-by: Samuel Just --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 98f15ff550d6..998e68c61b5c 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -588,9 +588,6 @@ OPTION(osd_recovery_thread_timeout, OPT_INT, 30) OPTION(osd_recovery_thread_suicide_timeout, OPT_INT, 300) OPTION(osd_recovery_sleep, OPT_FLOAT, 0) // seconds to sleep between recovery ops OPTION(osd_snap_trim_sleep, OPT_FLOAT, 0) -OPTION(osd_scrub_thread_timeout, OPT_INT, 60) -OPTION(osd_scrub_thread_suicide_timeout, OPT_INT, 60) -OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10) OPTION(osd_scrub_invalid_stats, OPT_BOOL, true) OPTION(osd_remove_thread_timeout, OPT_INT, 60*60) OPTION(osd_remove_thread_suicide_timeout, OPT_INT, 10*60*60) @@ -753,6 +750,10 @@ OPTION(osd_recovery_op_priority, OPT_U32, 10) OPTION(osd_snap_trim_priority, OPT_U32, 5) OPTION(osd_snap_trim_cost, OPT_U32, 1<<20) // set default cost equal to 1MB io +OPTION(osd_scrub_priority, OPT_U32, 5) +// set default cost equal to 50MB io +OPTION(osd_scrub_cost, OPT_U32, 50<<20) + /** * osd_recovery_op_warn_multiple scales the normal warning threshhold, * osd_op_complaint_time, so that slow recovery ops won't cause noise diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 8c810f0fe4f3..b0d9d279423c 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -160,6 +160,10 @@ void PGQueueable::RunVis::operator()(PGSnapTrim &op) { return pg->snap_trimmer(op.epoch_queued); } +void PGQueueable::RunVis::operator()(PGScrub &op) { + return pg->scrub(op.epoch_queued, handle); +} + //Initial features in new superblock. //Features here are also automatically upgraded CompatSet OSD::get_osd_initial_compat_set() { @@ -204,7 +208,6 @@ OSDService::OSDService(OSD *osd) : op_wq(osd->op_shardedwq), peering_wq(osd->peering_wq), recovery_wq(osd->recovery_wq), - scrub_wq(osd->scrub_wq), recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->recovery_tp), op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp), @@ -1544,11 +1547,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, cct->_conf->osd_recovery_thread_suicide_timeout, &recovery_tp), replay_queue_lock("OSD::replay_queue_lock"), - scrub_wq( - this, - cct->_conf->osd_scrub_thread_timeout, - cct->_conf->osd_scrub_thread_suicide_timeout, - &disk_tp), remove_wq( store, cct->_conf->osd_remove_thread_timeout, @@ -5581,6 +5579,8 @@ epoch_t op_required_epoch(OpRequestRef op) return replica_op_required_epoch(op); case MSG_OSD_EC_READ_REPLY: return replica_op_required_epoch(op); + case MSG_OSD_REP_SCRUB: + return replica_op_required_epoch(op); default: assert(0); return 0; @@ -5696,6 +5696,7 @@ bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap) break; case MSG_OSD_REP_SCRUB: handle_replica_op(op, osdmap); + break; default: assert(0); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index d01d8f3ea936..df968594edf6 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -317,18 +317,27 @@ typedef ceph::shared_ptr DeletingStateRef; class OSD; +struct PGScrub { + epoch_t epoch_queued; + PGScrub(epoch_t e) : epoch_queued(e) {} + ostream &operator<<(ostream &rhs) { + return rhs << "PGScrub"; + } +}; + struct PGSnapTrim { epoch_t epoch_queued; PGSnapTrim(epoch_t e) : epoch_queued(e) {} ostream &operator<<(ostream &rhs) { - return rhs << "SnapTrim"; + return rhs << "PGSnapTrim"; } }; class PGQueueable { typedef boost::variant< OpRequestRef, - PGSnapTrim + PGSnapTrim, + PGScrub > QVariant; QVariant qvariant; int cost; @@ -343,6 +352,7 @@ class PGQueueable { : osd(osd), pg(pg), handle(handle) {} void operator()(OpRequestRef &op); void operator()(PGSnapTrim &op); + void operator()(PGScrub &op); }; public: PGQueueable(OpRequestRef op) @@ -356,6 +366,11 @@ public: const entity_inst_t &owner) : qvariant(op), cost(cost), priority(priority), start_time(start_time), owner(owner) {} + PGQueueable( + const PGScrub &op, int cost, unsigned priority, utime_t start_time, + const entity_inst_t &owner) + : qvariant(op), cost(cost), priority(priority), start_time(start_time), + owner(owner) {} boost::optional maybe_get_op() { OpRequestRef *op = boost::get(&qvariant); return op ? *op : boost::optional(); @@ -391,7 +406,6 @@ public: ShardedThreadPool::ShardedWQ < pair > &op_wq; ThreadPool::BatchWorkQueue &peering_wq; ThreadPool::WorkQueue &recovery_wq; - ThreadPool::WorkQueue &scrub_wq; GenContextWQ recovery_gen_wq; GenContextWQ op_gen_wq; ClassHandler *&class_handler; @@ -769,8 +783,16 @@ public: ceph_clock_now(cct), entity_inst_t()))); } - bool queue_for_scrub(PG *pg) { - return scrub_wq.queue(pg); + void queue_for_scrub(PG *pg) { + op_wq.queue( + make_pair( + pg, + PGQueueable( + PGScrub(pg->get_osdmap()->get_epoch()), + cct->_conf->osd_scrub_cost, + cct->_conf->osd_scrub_priority, + ceph_clock_now(cct), + entity_inst_t()))); } // osd map cache (past osd maps) @@ -2156,51 +2178,6 @@ protected: bool scrub_should_schedule(); bool scrub_time_permit(utime_t now); - xlist scrub_queue; - - struct ScrubWQ : public ThreadPool::WorkQueue { - OSD *osd; - ScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp) - : ThreadPool::WorkQueue("OSD::ScrubWQ", ti, si, tp), osd(o) {} - - bool _empty() { - return osd->scrub_queue.empty(); - } - bool _enqueue(PG *pg) { - if (pg->scrub_item.is_on_list()) { - return false; - } - pg->get("ScrubWQ"); - osd->scrub_queue.push_back(&pg->scrub_item); - return true; - } - void _dequeue(PG *pg) { - if (pg->scrub_item.remove_myself()) { - pg->put("ScrubWQ"); - } - } - PG *_dequeue() { - if (osd->scrub_queue.empty()) - return NULL; - PG *pg = osd->scrub_queue.front(); - osd->scrub_queue.pop_front(); - return pg; - } - void _process( - PG *pg, - ThreadPool::TPHandle &handle) { - pg->scrub(handle); - pg->put("ScrubWQ"); - } - void _clear() { - while (!osd->scrub_queue.empty()) { - PG *pg = osd->scrub_queue.front(); - osd->scrub_queue.pop_front(); - pg->put("ScrubWQ"); - } - } - } scrub_wq; - // -- removing -- struct RemoveWQ : public ThreadPool::WorkQueueVal > { diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 22df0afc4828..c90b67ba9297 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -195,8 +195,9 @@ PG::PG(OSDService *o, OSDMapRef curmap, coll(p), pg_log(cct), pgmeta_oid(p.make_pgmeta_oid()), missing_loc(this), - recovery_item(this), scrub_item(this), stat_queue_item(this), + recovery_item(this), stat_queue_item(this), snap_trim_queued(false), + scrub_queued(false), recovery_ops_active(0), role(0), state(0), @@ -1937,6 +1938,20 @@ void PG::queue_snap_trim() } } +bool PG::requeue_scrub() +{ + assert(is_locked()); + if (scrub_queued) { + dout(10) << __func__ << ": already queued" << dendl; + return false; + } else { + dout(10) << __func__ << ": queueing" << dendl; + scrub_queued = true; + osd->queue_for_scrub(this); + return true; + } +} + bool PG::queue_scrub() { assert(_lock.is_locked()); @@ -1953,7 +1968,7 @@ bool PG::queue_scrub() state_set(PG_STATE_REPAIR); scrubber.must_repair = false; } - osd->queue_for_scrub(this); + requeue_scrub(); return true; } @@ -3365,7 +3380,7 @@ void PG::sub_op_scrub_map(OpRequestRef op) scrubber.waiting_on_whom.erase(m->from); if (scrubber.waiting_on == 0) { - osd->scrub_wq.queue(this); + requeue_scrub(); } } @@ -3491,7 +3506,6 @@ void PG::schedule_backfill_full_retry() void PG::clear_scrub_reserved() { - osd->scrub_wq.dequeue(this); scrubber.reserved_peers.clear(); scrubber.reserve_failed = false; @@ -3782,13 +3796,8 @@ void PG::replica_scrub( * scrub will be chunky if all OSDs in PG support chunky scrub * scrub will fail if OSDs are too old. */ -void PG::scrub(ThreadPool::TPHandle &handle) +void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle) { - lock(); - if (deleting) { - unlock(); - return; - } if (g_conf->osd_scrub_sleep > 0 && (scrubber.state == PG::Scrubber::NEW_CHUNK || scrubber.state == PG::Scrubber::INACTIVE)) { @@ -3800,6 +3809,11 @@ void PG::scrub(ThreadPool::TPHandle &handle) lock(); dout(20) << __func__ << " slept for " << t << dendl; } + if (deleting || pg_has_reset_since(queued)) { + return; + } + assert(scrub_queued); + scrub_queued = false; if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) { dout(10) << "scrub -- not primary or active or not clean" << dendl; @@ -3807,7 +3821,6 @@ void PG::scrub(ThreadPool::TPHandle &handle) state_clear(PG_STATE_REPAIR); state_clear(PG_STATE_DEEP_SCRUB); publish_stats_to_osd(); - unlock(); return; } @@ -3838,8 +3851,6 @@ void PG::scrub(ThreadPool::TPHandle &handle) } chunky_scrub(handle); - - unlock(); } /* @@ -4135,7 +4146,7 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle) if (scrubber.end < hobject_t::get_max()) { scrubber.state = PG::Scrubber::NEW_CHUNK; - osd->scrub_wq.queue(this); + requeue_scrub(); done = true; } else { scrubber.state = PG::Scrubber::FINISH; @@ -4816,6 +4827,7 @@ void PG::start_peering_interval( peer_purged.clear(); actingbackfill.clear(); snap_trim_queued = false; + scrub_queued = false; // reset primary state? if (was_old_primary || is_primary()) { @@ -5132,6 +5144,8 @@ bool PG::can_discard_request(OpRequestRef& op) return can_discard_replica_op(op); case MSG_OSD_EC_READ_REPLY: return can_discard_replica_op(op); + case MSG_OSD_REP_SCRUB: + return can_discard_replica_op(op); case MSG_OSD_PG_SCAN: return can_discard_scan(op); @@ -5213,6 +5227,11 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op) return !have_same_or_newer_map( cur_epoch, static_cast(op->get_req())->map_epoch); + + case MSG_OSD_REP_SCRUB: + return !have_same_or_newer_map( + cur_epoch, + static_cast(op->get_req())->map_epoch); } assert(0); return false; diff --git a/src/osd/PG.h b/src/osd/PG.h index ae87319b40f7..b6808eba4eef 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -434,8 +434,9 @@ public: /* You should not use these items without taking their respective queue locks * (if they have one) */ - xlist::item recovery_item, scrub_item, stat_queue_item; + xlist::item recovery_item, stat_queue_item; bool snap_trim_queued; + bool scrub_queued; int recovery_ops_active; set waiting_on_backfill; @@ -1182,7 +1183,7 @@ public: const hobject_t& soid, list > *ok_peers, pg_shard_t bad_peer); - void scrub(ThreadPool::TPHandle &handle); + void scrub(epoch_t queued, ThreadPool::TPHandle &handle); void chunky_scrub(ThreadPool::TPHandle &handle); void scrub_compare_maps(); void scrub_process_inconsistent(); @@ -2156,6 +2157,7 @@ public: void log_weirdness(); void queue_snap_trim(); + bool requeue_scrub(); bool queue_scrub(); /// share pg info after a pg is active diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 3d776a5696c4..737305648d97 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -7266,7 +7266,7 @@ void ReplicatedPG::op_applied(const eversion_t &applied_version) if (is_primary()) { if (scrubber.active) { if (last_update_applied == scrubber.subset_last_update) { - osd->scrub_wq.queue(this); + requeue_scrub(); } } else { assert(scrubber.start == scrubber.end); @@ -8130,7 +8130,7 @@ void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc) waiting_for_blocked_object.erase(p); if (obc->requeue_scrub_on_unblock) - osd->queue_for_scrub(this); + requeue_scrub(); } SnapSetContext *ReplicatedPG::create_snapset_context(const hobject_t& oid) @@ -8363,7 +8363,7 @@ void ReplicatedPG::_applied_recovered_object(ObjectContextRef obc) // requeue an active chunky scrub waiting on recovery ops if (!deleting && active_pushes == 0 && scrubber.is_chunky_scrub_active()) { - osd->scrub_wq.queue(this); + requeue_scrub(); } unlock(); @@ -8747,7 +8747,6 @@ void ReplicatedPG::on_shutdown() // remove from queues osd->recovery_wq.dequeue(this); - osd->scrub_wq.dequeue(this); osd->pg_stat_queue_dequeue(this); osd->dequeue_pg(this, 0); osd->peering_wq.dequeue(this); @@ -11196,7 +11195,7 @@ void ReplicatedPG::_scrub_digest_updated() { dout(20) << __func__ << dendl; if (--scrubber.num_digest_updates_pending == 0) { - osd->scrub_wq.queue(this); + requeue_scrub(); } }