]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: Handle the mclock queue's return value to track the each dmclock queue
authorByungsu Park <bspark8@sk.com>
Wed, 1 Nov 2017 06:56:50 +0000 (15:56 +0900)
committerTaewoong Kim <taewoong.kim@sk.com>
Wed, 8 Nov 2017 12:45:09 +0000 (21:45 +0900)
client's status.

Signed-off-by: Byungsu Park <bspark8@sk.com>
src/common/mClockPriorityQueue.h
src/osd/OSD.cc
src/osd/OpRequest.cc
src/osd/OpRequest.h
src/osd/PrimaryLogPG.cc
src/osd/mClockClientQueue.cc
src/osdc/Objecter.cc
src/test/osd/TestMClockClientQueue.cc

index f51c22b53b47c026bfed6a0e33e80239b958d093..cbf06fa30e7ee7f4b1db4b4a16f193d9aeff7f4c 100644 (file)
@@ -39,6 +39,7 @@ namespace ceph {
 
     using priority_t = unsigned;
     using cost_t = unsigned;
+    using Retn = std::pair<T, dmc::PhaseType>;
 
     typedef std::list<std::pair<cost_t, T> > ListPairs;
 
@@ -329,7 +330,7 @@ namespace ceph {
     T dequeue() override final {
       assert(!empty());
 
-      if (!(high_queue.empty())) {
+      if (!high_queue.empty()) {
        T ret = std::move(high_queue.rbegin()->second.front().second);
        high_queue.rbegin()->second.pop_front();
        if (high_queue.rbegin()->second.empty()) {
@@ -350,6 +351,32 @@ namespace ceph {
       return std::move(*(retn.request));
     }
 
+    Retn dequeue_distributed() {
+      assert(!empty());
+      dmc::PhaseType resp_params = dmc::PhaseType();
+
+      if (!high_queue.empty()) {
+       T ret = std::move(high_queue.rbegin()->second.front().second);
+       high_queue.rbegin()->second.pop_front();
+       if (high_queue.rbegin()->second.empty()) {
+         high_queue.erase(high_queue.rbegin()->first);
+       }
+       return std::make_pair(std::move(ret), resp_params);
+      }
+
+      if (!queue_front.empty()) {
+       T ret = std::move(queue_front.front().second);
+       queue_front.pop_front();
+       return std::make_pair(std::move(ret), resp_params);
+      }
+
+      auto pr = queue.pull_request();
+      assert(pr.is_retn());
+      auto& retn = pr.get_retn();
+      resp_params = retn.phase;
+      return std::make_pair(std::move(*(retn.request)), resp_params);
+    }
+
     void dump(ceph::Formatter *f) const override final {
       f->open_array_section("high_queues");
       for (typename SubQueues::const_iterator p = high_queue.begin();
index 7518ac7973204c8c5b3d7d52c1191afcc3ce4a9b..9569f3f051dc6ef9baab73b240208e8f93eaca35 100644 (file)
@@ -1613,7 +1613,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
   flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
 
   MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
-                                      true);
+                                      true, op->qos_resp);
   reply->set_reply_versions(v, uv);
   m->get_connection()->send_message(reply);
 }
index 235db029d49f41e541c2c53ef205c8e2016546bb..970984b5a928d3fd9057ce5b96f5cc7e2a10f4ae 100644 (file)
@@ -27,7 +27,7 @@ OpRequest::OpRequest(Message *req, OpTracker *tracker) :
   TrackedOp(tracker, req->get_recv_stamp()),
   rmw_flags(0), request(req),
   hit_flag_points(0), latest_flag_point(0),
-  hitset_inserted(false) {
+  hitset_inserted(false), qos_resp(dmc::PhaseType::reservation) {
   if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
     // don't warn as quickly for low priority ops
     warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;
index c0e77730dcb2a4e0568f2a5f592056761a466b8b..3d95ea341d403ccb80d125dd0762c68beda87c0b 100644 (file)
@@ -24,6 +24,7 @@
 #include "include/memory.h"
 #include "osd/osd_types.h"
 #include "common/TrackedOp.h"
+#include "common/mClockCommon.h"
 
 /**
  * The OpRequest takes in a Message* and takes over a single reference
@@ -115,6 +116,7 @@ public:
   epoch_t min_epoch = 0;      ///< min epoch needed to handle this msg
 
   bool hitset_inserted;
+  dmc::PhaseType qos_resp;
   const Message *get_req() const { return request; }
   Message *get_nonconst_req() { return request; }
 
index 3ceee7cb48b34c7b492f4d5ee4a154a5bcad93ae..27eab290393558f45b409e43c9c29aad4d418062 100644 (file)
@@ -1489,7 +1489,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
   // reply
   MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(),
                                       CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
-                                      false);
+                                      false, op->qos_resp);
   reply->claim_op_out_data(ops);
   reply->set_result(result);
   reply->set_reply_versions(info.last_update, info.last_user_version);
@@ -2391,7 +2391,7 @@ void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
       MOSDOpReply *reply = orig_reply.detach();
       if (reply == nullptr) {
        reply = new MOSDOpReply(m, r, pg->get_osdmap()->get_epoch(),
-                               flags, true);
+                               flags, true, op->qos_resp);
       }
       ldpp_dout(pg, 10) << " sending commit on " << *m << " " << reply << dendl;
       pg->osd->send_message_osd_client(reply, m->get_connection());
@@ -2676,8 +2676,8 @@ void PrimaryLogPG::do_cache_redirect(OpRequestRef op)
 {
   const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
   int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
-  MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT,
-                                      get_osdmap()->get_epoch(), flags, false);
+  MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, get_osdmap()->get_epoch(),
+                                       flags, false, op->qos_resp);
   request_redirect_t redir(m->get_object_locator(), pool.info.tier_of);
   reply->set_redirect(redir);
   dout(10) << "sending redirect to pool " << pool.info.tier_of << " for op "
@@ -2831,7 +2831,8 @@ void PrimaryLogPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r)
 
   const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
   OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
-  ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
+  ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false,
+                               op->qos_resp);
   ctx->user_at_version = prdop->user_version;
   ctx->data_off = prdop->data_offset;
   ctx->ignore_log_op_stats = true;
@@ -3020,7 +3021,8 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
     if (reply)
       pwop->ctx->reply = NULL;
     else {
-      reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(), 0, true);
+      reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(), 0, true,
+                             pwop->op->qos_resp);
       reply->set_reply_versions(eversion_t(), pwop->user_version);
     }
     reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
@@ -3227,7 +3229,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
   bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
   // prepare the reply
   ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0,
-                              successful_write);
+                              successful_write, op->qos_resp);
 
   // Write operations aren't allowed to return a data payload because
   // we can't do so reliably. If the client has to resend the request
@@ -3315,7 +3317,8 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
        if (reply)
          ctx->reply = nullptr;
        else {
-         reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+         reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true,
+                                 ctx->op->qos_resp);
          reply->set_reply_versions(ctx->at_version,
                                    ctx->user_at_version);
        }
@@ -7841,7 +7844,8 @@ void PrimaryLogPG::fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
   dout(20) << __func__ << " got reqids " << reply_obj.reqids << dendl;
   ::encode(reply_obj, osd_op.outdata, features);
   osd_op.rval = -ENOENT;
-  MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
+  MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false,
+                                       op->qos_resp);
   reply->claim_op_out_data(m->ops);
   reply->set_result(-ENOENT);
   reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
index 47f86a50e0514c1fd9b09e68c38dcb1d98ee4fa7..993f223b10249218e7a13258e0b1b3d05695e330 100644 (file)
@@ -93,6 +93,11 @@ namespace ceph {
 
   // Return an op to be dispatched
   inline Request mClockClientQueue::dequeue() {
-    return queue.dequeue();
+    std::pair<Request, dmc::PhaseType> retn = queue.dequeue_distributed();
+
+    if (boost::optional<OpRequestRef> _op = retn.first.maybe_get_op()) {
+      (*_op)->qos_resp = retn.second;
+    }
+    return std::move(retn.first);
   }
 } // namespace ceph
index 470b5b9fd95df5471290a7b6a090a51963a4838d..310e14bd4b92754180ecb77f88447234103d07e2 100644 (file)
@@ -3510,6 +3510,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   /* get it before we call _finish_op() */
   auto completion_lock = s->get_lock(op->target.base_oid);
 
+  if (mclock_service_tracker) {
+    qos_trk->track_resp(op->target.osd, m->get_qos_resp());
+  }
   ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
   _finish_op(op, 0);
 
index 82490e594d2f10cb26ee670c3458b5ccb13a2c73..3799a7e26b6baae76cae06c9ed63e830caebe794 100644 (file)
@@ -131,6 +131,40 @@ TEST_F(MClockClientQueueTest, TestEnqueue) {
 }
 
 
+TEST_F(MClockClientQueueTest, TestDistributedEnqueue) {
+  Request r1 = create_snaptrim(100, client1);
+  Request r2 = create_snaptrim(101, client2);
+  Request r3 = create_snaptrim(102, client3);
+  Request r4 = create_snaptrim(103, client1);
+  Request r5 = create_snaptrim(104, client2);
+  Request r6 = create_snaptrim(105, client3);
+
+  r4.set_qos_params(dmc::ReqParams(50,1));
+  r5.set_qos_params(dmc::ReqParams(30,1));
+  r6.set_qos_params(dmc::ReqParams(10,1));
+
+  q.enqueue(client1, 12, 0, std::move(r1));
+  q.enqueue(client2, 12, 0, std::move(r2));
+  q.enqueue(client3, 12, 0, std::move(r3));
+  q.enqueue(client1, 12, 0, std::move(r4));
+  q.enqueue(client2, 12, 0, std::move(r5));
+  q.enqueue(client3, 12, 0, std::move(r6));
+
+  Request r = q.dequeue();
+  r = q.dequeue();
+  r = q.dequeue();
+
+  r = q.dequeue();
+  ASSERT_EQ(105u, r.get_map_epoch());
+
+  r = q.dequeue();
+  ASSERT_EQ(104u, r.get_map_epoch());
+
+  r = q.dequeue();
+  ASSERT_EQ(103u, r.get_map_epoch());
+}
+
+
 TEST_F(MClockClientQueueTest, TestEnqueueStrict) {
   q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
   q.enqueue_strict(client2, 13, create_snaptrim(101, client2));