From de8110ebb39401d3e09cb871f843776421ccd56f Mon Sep 17 00:00:00 2001 From: Byungsu Park Date: Wed, 1 Nov 2017 15:56:50 +0900 Subject: [PATCH] osd: Handle the mclock queue's return value to track the each dmclock queue client's status. Signed-off-by: Byungsu Park --- src/common/mClockPriorityQueue.h | 29 ++++++++++++++++++++++- src/osd/OSD.cc | 2 +- src/osd/OpRequest.cc | 2 +- src/osd/OpRequest.h | 2 ++ src/osd/PrimaryLogPG.cc | 22 ++++++++++------- src/osd/mClockClientQueue.cc | 7 +++++- src/osdc/Objecter.cc | 3 +++ src/test/osd/TestMClockClientQueue.cc | 34 +++++++++++++++++++++++++++ 8 files changed, 88 insertions(+), 13 deletions(-) diff --git a/src/common/mClockPriorityQueue.h b/src/common/mClockPriorityQueue.h index f51c22b53b4..cbf06fa30e7 100644 --- a/src/common/mClockPriorityQueue.h +++ b/src/common/mClockPriorityQueue.h @@ -39,6 +39,7 @@ namespace ceph { using priority_t = unsigned; using cost_t = unsigned; + using Retn = std::pair; typedef std::list > ListPairs; @@ -329,7 +330,7 @@ namespace ceph { T dequeue() override final { assert(!empty()); - if (!(high_queue.empty())) { + if (!high_queue.empty()) { T ret = std::move(high_queue.rbegin()->second.front().second); high_queue.rbegin()->second.pop_front(); if (high_queue.rbegin()->second.empty()) { @@ -350,6 +351,32 @@ namespace ceph { return std::move(*(retn.request)); } + Retn dequeue_distributed() { + assert(!empty()); + dmc::PhaseType resp_params = dmc::PhaseType(); + + if (!high_queue.empty()) { + T ret = std::move(high_queue.rbegin()->second.front().second); + high_queue.rbegin()->second.pop_front(); + if (high_queue.rbegin()->second.empty()) { + high_queue.erase(high_queue.rbegin()->first); + } + return std::make_pair(std::move(ret), resp_params); + } + + if (!queue_front.empty()) { + T ret = std::move(queue_front.front().second); + queue_front.pop_front(); + return std::make_pair(std::move(ret), resp_params); + } + + auto pr = queue.pull_request(); + assert(pr.is_retn()); + auto& retn = pr.get_retn(); + resp_params = retn.phase; + return std::make_pair(std::move(*(retn.request)), resp_params); + } + void dump(ceph::Formatter *f) const override final { f->open_array_section("high_queues"); for (typename SubQueues::const_iterator p = high_queue.begin(); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 7518ac79732..9569f3f051d 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1613,7 +1613,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags, - true); + true, op->qos_resp); reply->set_reply_versions(v, uv); m->get_connection()->send_message(reply); } diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index 235db029d49..970984b5a92 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -27,7 +27,7 @@ OpRequest::OpRequest(Message *req, OpTracker *tracker) : TrackedOp(tracker, req->get_recv_stamp()), rmw_flags(0), request(req), hit_flag_points(0), latest_flag_point(0), - hitset_inserted(false) { + hitset_inserted(false), qos_resp(dmc::PhaseType::reservation) { if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) { // don't warn as quickly for low priority ops warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple; diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index c0e77730dcb..3d95ea341d4 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -24,6 +24,7 @@ #include "include/memory.h" #include "osd/osd_types.h" #include "common/TrackedOp.h" +#include "common/mClockCommon.h" /** * The OpRequest takes in a Message* and takes over a single reference @@ -115,6 +116,7 @@ public: epoch_t min_epoch = 0; ///< min epoch needed to handle this msg bool hitset_inserted; + dmc::PhaseType qos_resp; const Message *get_req() const { return request; } Message *get_nonconst_req() { return request; } diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 3ceee7cb48b..27eab290393 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1489,7 +1489,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op) // reply MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, - false); + false, op->qos_resp); reply->claim_op_out_data(ops); reply->set_result(result); reply->set_reply_versions(info.last_update, info.last_user_version); @@ -2391,7 +2391,7 @@ void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid, MOSDOpReply *reply = orig_reply.detach(); if (reply == nullptr) { reply = new MOSDOpReply(m, r, pg->get_osdmap()->get_epoch(), - flags, true); + flags, true, op->qos_resp); } ldpp_dout(pg, 10) << " sending commit on " << *m << " " << reply << dendl; pg->osd->send_message_osd_client(reply, m->get_connection()); @@ -2676,8 +2676,8 @@ void PrimaryLogPG::do_cache_redirect(OpRequestRef op) { const MOSDOp *m = static_cast(op->get_req()); int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); - MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, - get_osdmap()->get_epoch(), flags, false); + MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, get_osdmap()->get_epoch(), + flags, false, op->qos_resp); request_redirect_t redir(m->get_object_locator(), pool.info.tier_of); reply->set_redirect(redir); dout(10) << "sending redirect to pool " << pool.info.tier_of << " for op " @@ -2831,7 +2831,8 @@ void PrimaryLogPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r) const MOSDOp *m = static_cast(op->get_req()); OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this); - ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false); + ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false, + op->qos_resp); ctx->user_at_version = prdop->user_version; ctx->data_off = prdop->data_offset; ctx->ignore_log_op_stats = true; @@ -3020,7 +3021,8 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r) if (reply) pwop->ctx->reply = NULL; else { - reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(), 0, true); + reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(), 0, true, + pwop->op->qos_resp); reply->set_reply_versions(eversion_t(), pwop->user_version); } reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); @@ -3227,7 +3229,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx) bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0; // prepare the reply ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, - successful_write); + successful_write, op->qos_resp); // Write operations aren't allowed to return a data payload because // we can't do so reliably. If the client has to resend the request @@ -3315,7 +3317,8 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx) if (reply) ctx->reply = nullptr; else { - reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); + reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true, + ctx->op->qos_resp); reply->set_reply_versions(ctx->at_version, ctx->user_at_version); } @@ -7841,7 +7844,8 @@ void PrimaryLogPG::fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid, dout(20) << __func__ << " got reqids " << reply_obj.reqids << dendl; ::encode(reply_obj, osd_op.outdata, features); osd_op.rval = -ENOENT; - MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false); + MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false, + op->qos_resp); reply->claim_op_out_data(m->ops); reply->set_result(-ENOENT); reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); diff --git a/src/osd/mClockClientQueue.cc b/src/osd/mClockClientQueue.cc index 47f86a50e05..993f223b102 100644 --- a/src/osd/mClockClientQueue.cc +++ b/src/osd/mClockClientQueue.cc @@ -93,6 +93,11 @@ namespace ceph { // Return an op to be dispatched inline Request mClockClientQueue::dequeue() { - return queue.dequeue(); + std::pair retn = queue.dequeue_distributed(); + + if (boost::optional _op = retn.first.maybe_get_op()) { + (*_op)->qos_resp = retn.second; + } + return std::move(retn.first); } } // namespace ceph diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 470b5b9fd95..310e14bd4b9 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -3510,6 +3510,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) /* get it before we call _finish_op() */ auto completion_lock = s->get_lock(op->target.base_oid); + if (mclock_service_tracker) { + qos_trk->track_resp(op->target.osd, m->get_qos_resp()); + } ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl; _finish_op(op, 0); diff --git a/src/test/osd/TestMClockClientQueue.cc b/src/test/osd/TestMClockClientQueue.cc index 82490e594d2..3799a7e26b6 100644 --- a/src/test/osd/TestMClockClientQueue.cc +++ b/src/test/osd/TestMClockClientQueue.cc @@ -131,6 +131,40 @@ TEST_F(MClockClientQueueTest, TestEnqueue) { } +TEST_F(MClockClientQueueTest, TestDistributedEnqueue) { + Request r1 = create_snaptrim(100, client1); + Request r2 = create_snaptrim(101, client2); + Request r3 = create_snaptrim(102, client3); + Request r4 = create_snaptrim(103, client1); + Request r5 = create_snaptrim(104, client2); + Request r6 = create_snaptrim(105, client3); + + r4.set_qos_params(dmc::ReqParams(50,1)); + r5.set_qos_params(dmc::ReqParams(30,1)); + r6.set_qos_params(dmc::ReqParams(10,1)); + + q.enqueue(client1, 12, 0, std::move(r1)); + q.enqueue(client2, 12, 0, std::move(r2)); + q.enqueue(client3, 12, 0, std::move(r3)); + q.enqueue(client1, 12, 0, std::move(r4)); + q.enqueue(client2, 12, 0, std::move(r5)); + q.enqueue(client3, 12, 0, std::move(r6)); + + Request r = q.dequeue(); + r = q.dequeue(); + r = q.dequeue(); + + r = q.dequeue(); + ASSERT_EQ(105u, r.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(104u, r.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(103u, r.get_map_epoch()); +} + + TEST_F(MClockClientQueueTest, TestEnqueueStrict) { q.enqueue_strict(client1, 12, create_snaptrim(100, client1)); q.enqueue_strict(client2, 13, create_snaptrim(101, client2)); -- 2.39.5