From: Samuel Just Date: Thu, 18 Dec 2014 20:01:57 +0000 (-0800) Subject: osd/: fast dispatch, standard op path for MOSDRepScrub X-Git-Tag: v9.0.2~48^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=732e5fe6c164d301a9c9cedfc2365f1e88cbb9c7;p=ceph.git osd/: fast dispatch, standard op path for MOSDRepScrub Previously, it had its own queue, but better to simply use the OpWQ along with everything else. It also allows MOSDRepScrub to be fast dispatched. We use NORMAL priority since we want the replica scrub message processed before recovery or client IO messages -- the primary is blocking writes on the objects until the replica responds. Signed-off-by: Samuel Just --- Review notes (not part of the commit message): (1) OSD::dispatch_op_fast(): the new MSG_OSD_REP_SCRUB case now ends with break; without it control fell through into default: assert(0) after handle_replica_op() returned. (2) PG::replica_scrub(): the msg->get() left behind in the last_update_applied branch is removed, matching the active_pushes branch; active_rep_scrub is an OpRequestRef after this patch and the put() that used to balance that get() is deleted in PG.h, so the extra reference was never released. (3) static_cast target types dropped by the HTML export are restored where a visible declaration fixes them; other stripped template arguments are left as found. Hunk line counts and the start offsets of the following OSD.cc hunks are adjusted for (1) and (2). diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 311d0d364810..f6e941901934 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -198,7 +198,6 @@ OSDService::OSDService(OSD *osd) : recovery_wq(osd->recovery_wq), snap_trim_wq(osd->snap_trim_wq), scrub_wq(osd->scrub_wq), - rep_scrub_wq(osd->rep_scrub_wq), recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->recovery_tp), op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp), @@ -1545,11 +1544,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, cct->_conf->osd_scrub_thread_timeout, cct->_conf->osd_scrub_thread_suicide_timeout, &disk_tp), - rep_scrub_wq( - this, - cct->_conf->osd_scrub_thread_timeout, - cct->_conf->osd_scrub_thread_suicide_timeout, - &disk_tp), remove_wq( store, cct->_conf->osd_remove_thread_timeout, @@ -5595,7 +5589,6 @@ void OSD::dispatch_op(OpRequestRef op) case MSG_OSD_PG_CREATE: handle_pg_create(op); break; - case MSG_OSD_PG_NOTIFY: handle_pg_notify(op); break; @@ -5696,6 +5689,9 @@ bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap) case MSG_OSD_EC_READ_REPLY: handle_replica_op(op, osdmap); break; + case MSG_OSD_REP_SCRUB: + handle_replica_op(op, osdmap); + break; default: assert(0); } @@ -5740,10 +5736,6 @@ void OSD::_dispatch(Message *m) 
handle_scrub(static_cast<MOSDScrub*>(m)); break; - case MSG_OSD_REP_SCRUB: - handle_rep_scrub(static_cast<MOSDRepScrub*>(m)); - break; - // -- need OSDMap -- default: @@ -5766,26 +5758,6 @@ void OSD::_dispatch(Message *m) } -void OSD::handle_rep_scrub(MOSDRepScrub *m) -{ - dout(10) << __func__ << " " << *m << dendl; - if (!require_self_aliveness(m, m->map_epoch)) { - m->put(); - return; - } - if (!require_osd_peer(m)) { - m->put(); - return; - } - if (osdmap->get_epoch() >= m->map_epoch && - !require_same_peer_instance(m, osdmap, true)) { - m->put(); - return; - } - - rep_scrub_wq.queue(m); -} - void OSD::handle_scrub(MOSDScrub *m) { dout(10) << "handle_scrub " << *m << dendl; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 56ccb4ffc136..16cde08bac33 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -339,7 +339,6 @@ public: ThreadPool::WorkQueue &recovery_wq; ThreadPool::WorkQueue &snap_trim_wq; ThreadPool::WorkQueue &scrub_wq; - ThreadPool::WorkQueue &rep_scrub_wq; GenContextWQ recovery_gen_wq; GenContextWQ op_gen_wq; ClassHandler *&class_handler; @@ -2168,60 +2167,6 @@ protected: } } scrub_wq; - struct RepScrubWQ : public ThreadPool::WorkQueue { - private: - OSD *osd; - list rep_scrub_queue; - - public: - RepScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp) - : ThreadPool::WorkQueue("OSD::RepScrubWQ", ti, si, tp), osd(o) {} - - bool _empty() { - return rep_scrub_queue.empty(); - } - bool _enqueue(MOSDRepScrub *msg) { - rep_scrub_queue.push_back(msg); - return true; - } - void _dequeue(MOSDRepScrub *msg) { - assert(0); // Not applicable for this wq - return; - } - MOSDRepScrub *_dequeue() { - if (rep_scrub_queue.empty()) - return NULL; - MOSDRepScrub *msg = rep_scrub_queue.front(); - rep_scrub_queue.pop_front(); - return msg; - } - void _process( - MOSDRepScrub *msg, - ThreadPool::TPHandle &handle) { - PG *pg = NULL; - { - Mutex::Locker lock(osd->osd_lock); - if (osd->is_stopping() || - !osd->_have_pg(msg->pgid)) { - msg->put(); - return; - } - pg = osd->_lookup_lock_pg(msg->pgid); - } 
- assert(pg); - pg->replica_scrub(msg, handle); - msg->put(); - pg->unlock(); - } - void _clear() { - while (!rep_scrub_queue.empty()) { - MOSDRepScrub *msg = rep_scrub_queue.front(); - rep_scrub_queue.pop_front(); - msg->put(); - } - } - } rep_scrub_wq; - // -- removing -- struct RemoveWQ : public ThreadPool::WorkQueueVal > { @@ -2274,6 +2219,7 @@ protected: case MSG_OSD_EC_WRITE_REPLY: case MSG_OSD_EC_READ: case MSG_OSD_EC_READ_REPLY: + case MSG_OSD_REP_SCRUB: return true; default: return false; @@ -2327,7 +2273,6 @@ private: static int write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami); - void handle_rep_scrub(MOSDRepScrub *m); void handle_scrub(struct MOSDScrub *m); void handle_osd_ping(class MOSDPing *m); void handle_op(OpRequestRef& op, OSDMapRef& osdmap); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 7d338bc640a4..7bbf7626ccbf 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -3377,6 +3377,8 @@ void PG::_request_scrub_map( spg_t(info.pgid.pgid, replica.shard), version, get_osdmap()->get_epoch(), start, end, deep, seed); + // default priority, we want the rep scrub processed prior to any recovery + // or client io messages (we are holding a lock!) osd->send_message_osd_cluster( replica.osd, repscrubop, get_osdmap()->get_epoch()); } @@ -3715,9 +3717,10 @@ void PG::repair_object( * scrubmap of objects that are in the range [msg->start, msg->end). 
*/ void PG::replica_scrub( - MOSDRepScrub *msg, + OpRequestRef op, ThreadPool::TPHandle &handle) { + MOSDRepScrub *msg = static_cast<MOSDRepScrub*>(op->get_req()); assert(!scrubber.active_rep_scrub); dout(7) << "replica_scrub" << dendl; @@ -3733,15 +3736,13 @@ void PG::replica_scrub( assert(msg->chunky); if (last_update_applied < msg->scrub_to) { dout(10) << "waiting for last_update_applied to catch up" << dendl; - scrubber.active_rep_scrub = msg; - msg->get(); + scrubber.active_rep_scrub = op; return; } if (active_pushes > 0) { dout(10) << "waiting for active pushes to finish" << dendl; - scrubber.active_rep_scrub = msg; - msg->get(); + scrubber.active_rep_scrub = op; return; } diff --git a/src/osd/PG.h b/src/osd/PG.h index d51c2c96c79c..2f4dc35b110d 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1035,7 +1035,6 @@ public: epoch_start(0), active(false), queue_snap_trim(false), waiting_on(0), shallow_errors(0), deep_errors(0), fixed(0), - active_rep_scrub(0), must_scrub(false), must_deep_scrub(false), must_repair(false), num_digest_updates_pending(0), state(INACTIVE), @@ -1059,7 +1058,7 @@ public: int fixed; ScrubMap primary_scrubmap; map received_maps; - MOSDRepScrub *active_rep_scrub; + OpRequestRef active_rep_scrub; utime_t scrub_reg_stamp; // stamp we registered for // flags to indicate explicitly requested scrubs (by admin) @@ -1145,8 +1144,7 @@ public: waiting_on = 0; waiting_on_whom.clear(); if (active_rep_scrub) { - active_rep_scrub->put(); - active_rep_scrub = NULL; + active_rep_scrub = OpRequestRef(); } received_maps.clear(); @@ -1230,7 +1228,7 @@ public: void unreg_next_scrub(); void replica_scrub( - struct MOSDRepScrub *op, + OpRequestRef op, ThreadPool::TPHandle &handle); void sub_op_scrub_map(OpRequestRef op); void sub_op_scrub_reserve(OpRequestRef op); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 4c549a5fd3d3..f2225b7eef22 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1315,6 +1315,10 @@ void ReplicatedPG::do_request( 
do_backfill(op); break; + case MSG_OSD_REP_SCRUB: + replica_scrub(op, handle); + break; + default: assert(0 == "bad message type in do_request"); } @@ -7272,9 +7276,13 @@ void ReplicatedPG::op_applied(const eversion_t &applied_version) } } else { if (scrubber.active_rep_scrub) { - if (last_update_applied == scrubber.active_rep_scrub->scrub_to) { - osd->rep_scrub_wq.queue(scrubber.active_rep_scrub); - scrubber.active_rep_scrub = 0; + if (last_update_applied == static_cast<MOSDRepScrub*>( + scrubber.active_rep_scrub->get_req())->scrub_to) { + osd->op_wq.queue( + make_pair( + this, + scrubber.active_rep_scrub)); + scrubber.active_rep_scrub = OpRequestRef(); } } } @@ -8374,9 +8382,13 @@ void ReplicatedPG::_applied_recovered_object_replica() // requeue an active chunky scrub waiting on recovery ops if (!deleting && active_pushes == 0 && - scrubber.active_rep_scrub && scrubber.active_rep_scrub->chunky) { - osd->rep_scrub_wq.queue(scrubber.active_rep_scrub); - scrubber.active_rep_scrub = 0; + scrubber.active_rep_scrub && static_cast<MOSDRepScrub*>( + scrubber.active_rep_scrub->get_req())->chunky) { + osd->op_wq.queue( + make_pair( + this, + scrubber.active_rep_scrub)); + scrubber.active_rep_scrub = OpRequestRef(); } unlock();