]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: Handle possible future work items returned from mclock scheduler.
authorSridhar Seshasayee <sseshasa@redhat.com>
Thu, 8 Oct 2020 14:03:02 +0000 (19:33 +0530)
committerSridhar Seshasayee <sseshasa@redhat.com>
Tue, 20 Oct 2020 07:35:51 +0000 (13:05 +0530)
It's quite possible for a scheduler like mClock to return no work items
based on parameters like reservation, weight and limit. For e.g., if
a client exceeds its limit for a given time period, the scheduler returns
a response indicating that the operation in its queue will be scheduled at
a later point in time. The osd in such a scenario must handle this case.

mClockScheduler::dequeue() now either returns an item to be scheduled or
a time (double) indicating when the op will be ready for dequeuing. The
next dequeue is attempted in the subsequent cycle after waiting until the
'future_time' expires or when the worker thread is notified as part of
OSD::ShardedOpWQ::_process() within the sharded threadpool worker loop.

Signed-off-by: Sridhar Seshasayee <sseshasa@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/scheduler/OpScheduler.h
src/osd/scheduler/mClockScheduler.cc
src/osd/scheduler/mClockScheduler.h
src/test/osd/TestMClockScheduler.cc

index 932ce8539f9389872f1bad1b772e7327ba6e803e..cfbb82feb0e0a39c9bd94f8a1752c753344c773a 100644 (file)
@@ -10487,21 +10487,54 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     sdata->context_queue.move_to(oncommits);
   }
 
-  if (sdata->scheduler->empty()) {
+  WorkItem work_item;
+  while (!std::get_if<OpSchedulerItem>(&work_item)) {
+    if (sdata->scheduler->empty()) {
+      if (osd->is_stopping()) {
+        sdata->shard_lock.unlock();
+        for (auto c : oncommits) {
+          dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+          delete c;
+        }
+        return;    // OSD shutdown, discard.
+      }
+      sdata->shard_lock.unlock();
+      handle_oncommits(oncommits);
+      return;
+    }
+
+    work_item = sdata->scheduler->dequeue();
     if (osd->is_stopping()) {
       sdata->shard_lock.unlock();
       for (auto c : oncommits) {
-       dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
-       delete c;
+        dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+        delete c;
       }
       return;    // OSD shutdown, discard.
     }
-    sdata->shard_lock.unlock();
-    handle_oncommits(oncommits);
-    return;
-  }
 
-  OpSchedulerItem item = sdata->scheduler->dequeue();
+    // If the work item is scheduled in the future, wait until
+    // the time returned in the dequeue response before retrying.
+    if (auto when_ready = std::get_if<double>(&work_item)) {
+      if (is_smallest_thread_index) {
+        sdata->shard_lock.unlock();
+        handle_oncommits(oncommits);
+        return;
+      }
+      std::unique_lock wait_lock{sdata->sdata_wait_lock};
+      auto future_time = ceph::real_clock::from_double(*when_ready);
+      dout(10) << __func__ << " dequeue future request at " << future_time << dendl;
+      sdata->shard_lock.unlock();
+      ++sdata->waiting_threads;
+      sdata->sdata_cond.wait_until(wait_lock, future_time);
+      --sdata->waiting_threads;
+      wait_lock.unlock();
+      sdata->shard_lock.lock();
+    }
+  } // while
+
+  // Access the stored item
+  auto item = std::move(std::get<OpSchedulerItem>(work_item));
   if (osd->is_stopping()) {
     sdata->shard_lock.unlock();
     for (auto c : oncommits) {
@@ -10750,9 +10783,13 @@ void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
     sdata->scheduler->enqueue(std::move(item));
   }
 
-  if (empty) {
+  {
     std::lock_guard l{sdata->sdata_wait_lock};
-    sdata->sdata_cond.notify_all();
+    if (empty) {
+      sdata->sdata_cond.notify_all();
+    } else if (sdata->waiting_threads) {
+      sdata->sdata_cond.notify_one();
+    }
   }
 }
 
index dc25f604c35aaf4b1198e94cecb720d7485a9d28..c12ec84c75a5a75f5636a8208fb122b03919310c 100644 (file)
@@ -984,6 +984,7 @@ struct OSDShard {
   std::string sdata_wait_lock_name;
   ceph::mutex sdata_wait_lock;
   ceph::condition_variable sdata_cond;
+  int waiting_threads = 0;
 
   ceph::mutex osdmap_lock;  ///< protect shard_osdmap updates vs users w/o shard_lock
   OSDMapRef shard_osdmap;
index 4f95dfe1da2b623057f9661a2698adfa89264775..0a17118f02f4b0c97f6052b4cad0003289685303 100644 (file)
@@ -15,6 +15,7 @@
 #pragma once
 
 #include <ostream>
+#include <variant>
 
 #include "common/ceph_context.h"
 #include "osd/scheduler/OpSchedulerItem.h"
@@ -22,6 +23,7 @@
 namespace ceph::osd::scheduler {
 
 using client = uint64_t;
+using WorkItem = std::variant<std::monostate, OpSchedulerItem, double>;
 
 /**
  * Base interface for classes responsible for choosing
@@ -40,7 +42,7 @@ public:
   virtual bool empty() const = 0;
 
   // Return next op to be processed
-  virtual OpSchedulerItem dequeue() = 0;
+  virtual WorkItem dequeue() = 0;
 
   // Dump formatted representation for the queue
   virtual void dump(ceph::Formatter &f) const = 0;
@@ -117,7 +119,7 @@ public:
     return queue.empty();
   }
 
-  OpSchedulerItem dequeue() final {
+  WorkItem dequeue() final {
     return queue.dequeue();
   }
 
index 0e96fbfae0d7f5788904726935620f363f80b941..c075ed7d2d3095aba64dd4996d10f81da4e83c8a 100644 (file)
@@ -35,7 +35,7 @@ mClockScheduler::mClockScheduler(CephContext *cct) :
     std::bind(&mClockScheduler::ClientRegistry::get_info,
              &client_registry,
              _1),
-    dmc::AtLimit::Allow,
+    dmc::AtLimit::Wait,
     cct->_conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
 {
   cct->_conf.add_observer(this);
@@ -114,18 +114,16 @@ void mClockScheduler::enqueue_front(OpSchedulerItem&& item)
   // putting the item back in the queue
 }
 
-OpSchedulerItem mClockScheduler::dequeue()
+WorkItem mClockScheduler::dequeue()
 {
   if (!immediate.empty()) {
     auto ret = std::move(immediate.back());
     immediate.pop_back();
-    return ret;
+    return std::move(ret);
   } else {
     mclock_queue_t::PullReq result = scheduler.pull_request();
     if (result.is_future()) {
-      ceph_assert(
-       0 == "Not implemented, user would have to be able to be woken up");
-      return std::move(*(OpSchedulerItem*)nullptr);
+      return result.getTime();
     } else if (result.is_none()) {
       ceph_assert(
        0 == "Impossible, must have checked empty() first");
index 1b469a1a12e28a1e6b909936bdbbbb2d64d9324c..2573afead3b362d6d18ff842b3bef8b59bdbf0d1 100644 (file)
@@ -110,7 +110,7 @@ public:
   void enqueue_front(OpSchedulerItem &&item) final;
 
   // Return an op to be dispatch
-  OpSchedulerItem dequeue() final;
+  WorkItem dequeue() final;
 
   // Returns if the queue is empty
   bool empty() const final {
index 65e2a1ae91fc9cb8dd07d09560f93360cfef3793..775dbb2f993433bacb57b255f4cd18a73395dbb9 100644 (file)
@@ -77,6 +77,11 @@ OpSchedulerItem create_item(
     utime_t(), owner, e);
 }
 
+OpSchedulerItem get_item(WorkItem item)
+{
+  return std::move(std::get<OpSchedulerItem>(item));
+}
+
 TEST_F(mClockSchedulerTest, TestEmpty) {
   ASSERT_TRUE(q.empty());
 
@@ -88,8 +93,8 @@ TEST_F(mClockSchedulerTest, TestEmpty) {
 
   std::list<OpSchedulerItem> reqs;
 
-  reqs.push_back(q.dequeue());
-  reqs.push_back(q.dequeue());
+  reqs.push_back(get_item(q.dequeue()));
+  reqs.push_back(get_item(q.dequeue()));
 
   ASSERT_FALSE(q.empty());
 
@@ -115,19 +120,19 @@ TEST_F(mClockSchedulerTest, TestSingleClientOrderedEnqueueDequeue) {
   q.enqueue(create_item(103, client1));
   q.enqueue(create_item(104, client1));
 
-  auto r = q.dequeue();
+  auto r = get_item(q.dequeue());
   ASSERT_EQ(100u, r.get_map_epoch());
 
-  r = q.dequeue();
+  r = get_item(q.dequeue());
   ASSERT_EQ(101u, r.get_map_epoch());
 
-  r = q.dequeue();
+  r = get_item(q.dequeue());
   ASSERT_EQ(102u, r.get_map_epoch());
 
-  r = q.dequeue();
+  r = get_item(q.dequeue());
   ASSERT_EQ(103u, r.get_map_epoch());
 
-  r = q.dequeue();
+  r = get_item(q.dequeue());
   ASSERT_EQ(104u, r.get_map_epoch());
 }
 
@@ -145,7 +150,7 @@ TEST_F(mClockSchedulerTest, TestMultiClientOrderedEnqueueDequeue) {
   }
   for (unsigned i = 0; i < NUM * 3; ++i) {
     ASSERT_FALSE(q.empty());
-    auto r = q.dequeue();
+    auto r = get_item(q.dequeue());
     auto owner = r.get_owner();
     auto niter = next.find(owner);
     ASSERT_FALSE(niter == next.end());