]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
src/osd: replace OpQueue abstraction in osd with Scheduler
authorSamuel Just <sjust@redhat.com>
Wed, 11 Sep 2019 20:42:44 +0000 (13:42 -0700)
committerSamuel Just <sjust@redhat.com>
Tue, 22 Oct 2019 22:11:51 +0000 (15:11 -0700)
OpQueue is overkill for mclock based schedulers.  The interface doesn't
need to externalize the _strict modifiers, the scheduler can figure that
out from the item itself.  Introduce simpler Scheduler interface and add
an adapter for the existing OpQueue based implementations.

Signed-off-by: Samuel Just <sjust@redhat.com>
22 files changed:
src/ceph_osd.cc
src/common/OpQueue.h
src/common/PrioritizedQueue.h
src/common/WeightedPriorityQueue.h
src/common/mClockPriorityQueue.h
src/osd/CMakeLists.txt
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OpQueueItem.cc [deleted file]
src/osd/OpQueueItem.h [deleted file]
src/osd/PG.cc
src/osd/PrimaryLogPG.cc
src/osd/mClockClientQueue.h
src/osd/mClockOpClassQueue.h
src/osd/mClockOpClassSupport.cc
src/osd/mClockOpClassSupport.h
src/osd/scheduler/OpScheduler.cc [new file with mode: 0644]
src/osd/scheduler/OpScheduler.h [new file with mode: 0644]
src/osd/scheduler/OpSchedulerItem.cc [new file with mode: 0644]
src/osd/scheduler/OpSchedulerItem.h [new file with mode: 0644]
src/test/osd/TestMClockClientQueue.cc
src/test/osd/TestMClockOpClassQueue.cc

index b24ca32ab02562e0d7736181b43a02720af1c862..3e03e8632396e461f45efa134aec0c4264d55e98 100644 (file)
@@ -68,12 +68,12 @@ TracepointProvider::Traits cyg_profile_traits("libcyg_profile_tp.so",
 
 } // anonymous namespace
 
-OSD *osd = nullptr;
+OSD *osdptr = nullptr;
 
 void handle_osd_signal(int signum)
 {
-  if (osd)
-    osd->handle_signal(signum);
+  if (osdptr)
+    osdptr->handle_signal(signum);
 }
 
 static void usage()
@@ -673,21 +673,21 @@ flushjournal_out:
     forker.exit(1);
   }
 
-  osd = new OSD(g_ceph_context,
-                store,
-                whoami,
-                ms_cluster,
-                ms_public,
-                ms_hb_front_client,
-                ms_hb_back_client,
-                ms_hb_front_server,
-                ms_hb_back_server,
-                ms_objecter,
-                &mc,
-                data_path,
-                journal_path);
-
-  int err = osd->pre_init();
+  osdptr = new OSD(g_ceph_context,
+                  store,
+                  whoami,
+                  ms_cluster,
+                  ms_public,
+                  ms_hb_front_client,
+                  ms_hb_back_client,
+                  ms_hb_front_server,
+                  ms_hb_back_server,
+                  ms_objecter,
+                  &mc,
+                  data_path,
+                  journal_path);
+
+  int err = osdptr->pre_init();
   if (err < 0) {
     derr << TEXT_RED << " ** ERROR: osd pre_init failed: " << cpp_strerror(-err)
         << TEXT_NORMAL << dendl;
@@ -703,7 +703,7 @@ flushjournal_out:
   ms_objecter->start();
 
   // start osd
-  err = osd->init();
+  err = osdptr->init();
   if (err < 0) {
     derr << TEXT_RED << " ** ERROR: osd init failed: " << cpp_strerror(-err)
          << TEXT_NORMAL << dendl;
@@ -721,7 +721,7 @@ flushjournal_out:
   register_async_signal_handler_oneshot(SIGINT, handle_osd_signal);
   register_async_signal_handler_oneshot(SIGTERM, handle_osd_signal);
 
-  osd->final_init();
+  osdptr->final_init();
 
   if (g_conf().get_val<bool>("inject_early_sigterm"))
     kill(getpid(), SIGTERM);
@@ -740,7 +740,7 @@ flushjournal_out:
   shutdown_async_signal_handler();
 
   // done
-  delete osd;
+  delete osdptr;
   delete ms_public;
   delete ms_hb_front_client;
   delete ms_hb_back_client;
index 451130e5389436aa925c0f2171568b2d4078b675..0204f4b44039727507107ba0b01cdd84159e3fac 100644 (file)
@@ -63,6 +63,9 @@ public:
   // Formatted output of the queue
   virtual void dump(ceph::Formatter *f) const = 0;
 
+  // Human readable brief description of queue and relevant parameters
+  virtual void print(std::ostream &f) const = 0;
+
   // Don't leak resources on destruction
   virtual ~OpQueue() {};
 };
index 6d7de1291f66de5ad66ddb208d053a63f204e394..9adf21aafe11214e3e2b46ed7a6755c22dbe5d40 100644 (file)
@@ -343,6 +343,10 @@ public:
     }
     f->close_section();
   }
+
+  void print(std::ostream &ostream) const final {
+    ostream << "PrioritizedQueue";
+  }
 };
 
 #endif
index a05174e8185da34683111f3df0f0fc61a46b6894..cf34709b979483f7476b9e22b7fbdca9c1551b3a 100644 (file)
@@ -344,6 +344,10 @@ class WeightedPriorityQueue :  public OpQueue <T, K>
       normal.dump(f);
       f->close_section();
     }
+
+    void print(std::ostream &ostream) const final {
+      ostream << "WeightedPriorityQueue";
+    }
 };
 
 #endif
index ae4259207a32ab17bf689cbc57ea9fdaade9732c..c1f9f3c2517dd7a7fc5770a9dc0c4e15ff251b5a 100644 (file)
@@ -360,6 +360,10 @@ namespace ceph {
       f->dump_int("size", queue.request_count());
       f->close_section();
     } // dump
+
+    void print(std::ostream &os) const final {
+      os << "mClockPriorityQueue";
+    }
   };
 
 } // namespace ceph
index 3d7950ce5cc49a7e51c829a59466012af9b8ca87..3cc60752999456c2833f39f9786e03af37136799 100644 (file)
@@ -32,7 +32,8 @@ set(osd_srcs
   mClockOpClassSupport.cc
   mClockOpClassQueue.cc
   mClockClientQueue.cc
-  OpQueueItem.cc
+  scheduler/OpScheduler.cc
+  scheduler/OpSchedulerItem.cc
   PeeringState.cc
   PGStateUtils.cc
   MissingLoc.cc
index dbf1e754f70fa700be9db38e8ffbac5365fd620e..1de0cf47e1afc35275883bc7b82ce7d62bb973b9 100644 (file)
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, whoami, get_osdmap_epoch())
 
+using namespace ceph::osd::scheduler;
 
 static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
   return *_dout << "osd." << whoami << " " << epoch << " ";
@@ -1673,12 +1674,12 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
               << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch();
 }
 
-void OSDService::enqueue_back(OpQueueItem&& qi)
+void OSDService::enqueue_back(OpSchedulerItem&& qi)
 {
   osd->op_shardedwq.queue(std::move(qi));
 }
 
-void OSDService::enqueue_front(OpQueueItem&& qi)
+void OSDService::enqueue_front(OpSchedulerItem&& qi)
 {
   osd->op_shardedwq.queue_front(std::move(qi));
 }
@@ -1689,8 +1690,8 @@ void OSDService::queue_recovery_context(
 {
   epoch_t e = get_osdmap_epoch();
   enqueue_back(
-    OpQueueItem(
-      unique_ptr<OpQueueItem::OpQueueable>(
+    OpSchedulerItem(
+      unique_ptr<OpSchedulerItem::OpQueueable>(
        new PGRecoveryContext(pg->get_pgid(), c, e)),
       cct->_conf->osd_recovery_cost,
       cct->_conf->osd_recovery_priority,
@@ -1703,8 +1704,8 @@ void OSDService::queue_for_snap_trim(PG *pg)
 {
   dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
   enqueue_back(
-    OpQueueItem(
-      unique_ptr<OpQueueItem::OpQueueable>(
+    OpSchedulerItem(
+      unique_ptr<OpSchedulerItem::OpQueueable>(
        new PGSnapTrim(pg->get_pgid(), pg->get_osdmap_epoch())),
       cct->_conf->osd_snap_trim_cost,
       cct->_conf->osd_snap_trim_priority,
@@ -1721,8 +1722,8 @@ void OSDService::queue_for_scrub(PG *pg, bool with_high_priority)
   }
   const auto epoch = pg->get_osdmap_epoch();
   enqueue_back(
-    OpQueueItem(
-      unique_ptr<OpQueueItem::OpQueueable>(new PGScrub(pg->get_pgid(), epoch)),
+    OpSchedulerItem(
+      unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(pg->get_pgid(), epoch)),
       cct->_conf->osd_scrub_cost,
       scrub_queue_priority,
       ceph_clock_now(),
@@ -1734,8 +1735,8 @@ void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
 {
   dout(10) << __func__ << " on " << pgid << " e " << e  << dendl;
   enqueue_back(
-    OpQueueItem(
-      unique_ptr<OpQueueItem::OpQueueable>(
+    OpSchedulerItem(
+      unique_ptr<OpSchedulerItem::OpQueueable>(
        new PGDelete(pgid, e)),
       cct->_conf->osd_pg_delete_cost,
       cct->_conf->osd_pg_delete_priority,
@@ -1888,8 +1889,8 @@ void OSDService::_queue_for_recovery(
 {
   ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
   enqueue_back(
-    OpQueueItem(
-      unique_ptr<OpQueueItem::OpQueueable>(
+    OpSchedulerItem(
+      unique_ptr<OpSchedulerItem::OpQueueable>(
        new PGRecovery(
          p.second->get_pgid(), p.first, reserved_pushes)),
       cct->_conf->osd_recovery_cost,
@@ -2148,8 +2149,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   op_tracker(cct, cct->_conf->osd_enable_op_tracker,
                   cct->_conf->osd_num_op_tracker_shard),
   test_ops_hook(NULL),
-  op_queue(get_io_queue()),
-  op_prio_cutoff(get_io_prio_cut()),
   op_shardedwq(
     this,
     cct->_conf->osd_op_thread_timeout,
@@ -2199,10 +2198,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     OSDShard *one_shard = new OSDShard(
       i,
       cct,
-      this,
-      cct->_conf->osd_op_pq_max_tokens_per_priority,
-      cct->_conf->osd_op_pq_min_cost,
-      op_queue);
+      this);
     shards.push_back(one_shard);
   }
 }
@@ -3453,8 +3449,6 @@ int OSD::init()
   load_pgs();
 
   dout(2) << "superblock: I am osd." << superblock.whoami << dendl;
-  dout(0) << "using " << op_queue << " op queue with priority op cut off at " <<
-    op_prio_cutoff << "." << dendl;
 
   create_logger();
 
@@ -9533,8 +9527,8 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch)
   op->mark_queued_for_pg();
   logger->tinc(l_osd_op_before_queue_op_lat, latency);
   op_shardedwq.queue(
-    OpQueueItem(
-      unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
+    OpSchedulerItem(
+      unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
       cost, priority, stamp, owner, epoch));
 }
 
@@ -9542,8 +9536,8 @@ void OSD::enqueue_peering_evt(spg_t pgid, PGPeeringEventRef evt)
 {
   dout(15) << __func__ << " " << pgid << " " << evt->get_desc() << dendl;
   op_shardedwq.queue(
-    OpQueueItem(
-      unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
+    OpSchedulerItem(
+      unique_ptr<OpSchedulerItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
       10,
       cct->_conf->osd_peering_op_priority,
       utime_t(),
@@ -9555,8 +9549,8 @@ void OSD::enqueue_peering_evt_front(spg_t pgid, PGPeeringEventRef evt)
 {
   dout(15) << __func__ << " " << pgid << " " << evt->get_desc() << dendl;
   op_shardedwq.queue_front(
-    OpQueueItem(
-      unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
+    OpSchedulerItem(
+      unique_ptr<OpSchedulerItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
       10,
       cct->_conf->osd_peering_op_priority,
       utime_t(),
@@ -10272,13 +10266,13 @@ void OSDShard::_wake_pg_slot(
   for (auto i = slot->to_process.rbegin();
        i != slot->to_process.rend();
        ++i) {
-    _enqueue_front(std::move(*i), osd->op_prio_cutoff);
+    scheduler->enqueue_front(std::move(*i));
   }
   slot->to_process.clear();
   for (auto i = slot->waiting.rbegin();
        i != slot->waiting.rend();
        ++i) {
-    _enqueue_front(std::move(*i), osd->op_prio_cutoff);
+    scheduler->enqueue_front(std::move(*i));
   }
   slot->waiting.clear();
   for (auto i = slot->waiting_peering.rbegin();
@@ -10288,7 +10282,7 @@ void OSDShard::_wake_pg_slot(
     // items are waiting for maps we don't have yet.  FIXME, maybe,
     // someday, if we decide this inefficiency matters
     for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
-      _enqueue_front(std::move(*j), osd->op_prio_cutoff);
+      scheduler->enqueue_front(std::move(*j));
     }
   }
   slot->waiting_peering.clear();
@@ -10480,6 +10474,26 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
   }
 }
 
+OSDShard::OSDShard(
+  int id,
+  CephContext *cct,
+  OSD *osd)
+  : shard_id(id),
+    cct(cct),
+    osd(osd),
+    shard_name(string("OSDShard.") + stringify(id)),
+    sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
+    sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
+    osdmap_lock_name(shard_name + "::osdmap_lock"),
+    osdmap_lock{make_mutex(osdmap_lock_name)},
+    shard_lock_name(shard_name + "::shard_lock"),
+    shard_lock{make_mutex(shard_lock_name)},
+    scheduler(ceph::osd::scheduler::make_scheduler(cct)),
+    context_queue(sdata_wait_lock, sdata_cond)
+{
+  dout(0) << "using op scheduler " << *scheduler << dendl;
+}
+
 
 // =============================================================
 
@@ -10491,7 +10505,7 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
 void OSD::ShardedOpWQ::_add_slot_waiter(
   spg_t pgid,
   OSDShardPGSlot *slot,
-  OpQueueItem&& qi)
+  OpSchedulerItem&& qi)
 {
   if (qi.is_peering()) {
     dout(20) << __func__ << " " << pgid
@@ -10525,7 +10539,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 
   // peek at spg_t
   sdata->shard_lock.lock();
-  if (sdata->pqueue->empty() &&
+  if (sdata->scheduler->empty() &&
       (!is_smallest_thread_index || sdata->context_queue.empty())) {
     std::unique_lock wait_lock{sdata->sdata_wait_lock};
     if (is_smallest_thread_index && !sdata->context_queue.empty()) {
@@ -10538,7 +10552,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       sdata->sdata_cond.wait(wait_lock);
       wait_lock.unlock();
       sdata->shard_lock.lock();
-      if (sdata->pqueue->empty() &&
+      if (sdata->scheduler->empty() &&
          !(is_smallest_thread_index && !sdata->context_queue.empty())) {
        sdata->shard_lock.unlock();
        return;
@@ -10558,7 +10572,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     sdata->context_queue.move_to(oncommits);
   }
 
-  if (sdata->pqueue->empty()) {
+  if (sdata->scheduler->empty()) {
     if (osd->is_stopping()) {
       sdata->shard_lock.unlock();
       for (auto c : oncommits) {
@@ -10572,7 +10586,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     return;
   }
 
-  OpQueueItem item = sdata->pqueue->dequeue();
+  OpSchedulerItem item = sdata->scheduler->dequeue();
   if (osd->is_stopping()) {
     sdata->shard_lock.unlock();
     for (auto c : oncommits) {
@@ -10805,25 +10819,21 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   handle_oncommits(oncommits);
 }
 
-void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
+void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
   uint32_t shard_index =
     item.get_ordering_token().hash_to_shard(osd->shards.size());
 
+  dout(20) << __func__ << " " << item << dendl;
+
   OSDShard* sdata = osd->shards[shard_index];
   assert (NULL != sdata);
-  unsigned priority = item.get_priority();
-  unsigned cost = item.get_cost();
-  sdata->shard_lock.lock();
 
-  dout(20) << __func__ << " " << item << dendl;
-  bool empty = sdata->pqueue->empty();
-  if (priority >= osd->op_prio_cutoff)
-    sdata->pqueue->enqueue_strict(
-      item.get_owner(), priority, std::move(item));
-  else
-    sdata->pqueue->enqueue(
-      item.get_owner(), priority, cost, std::move(item));
-  sdata->shard_lock.unlock();
+  bool empty = true;
+  {
+    std::lock_guard l{sdata->shard_lock};
+    empty = sdata->scheduler->empty();
+    sdata->scheduler->enqueue(std::move(item));
+  }
 
   if (empty) {
     std::lock_guard l{sdata->sdata_wait_lock};
@@ -10831,7 +10841,7 @@ void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
   }
 }
 
-void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
+void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item)
 {
   auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
   auto& sdata = osd->shards[shard_index];
@@ -10841,7 +10851,7 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
   if (p != sdata->pg_slots.end() &&
       !p->second->to_process.empty()) {
     // we may be racing with _process, which has dequeued a new item
-    // from pqueue, put it on to_process, and is now busy taking the
+    // from scheduler, put it on to_process, and is now busy taking the
     // pg lock.  ensure this old requeued item is ordered before any
     // such newer item in to_process.
     p->second->to_process.push_front(std::move(item));
@@ -10853,7 +10863,7 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
   } else {
     dout(20) << __func__ << " " << item << dendl;
   }
-  sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff);
+  sdata->scheduler->enqueue_front(std::move(item));
   sdata->shard_lock.unlock();
   std::lock_guard l{sdata->sdata_wait_lock};
   sdata->sdata_cond.notify_one();
@@ -10890,22 +10900,3 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
 }
  
 }} // namespace ceph::osd_cmds
-
-
-std::ostream& operator<<(std::ostream& out, const io_queue& q) {
-  switch(q) {
-  case io_queue::prioritized:
-    out << "prioritized";
-    break;
-  case io_queue::weightedpriority:
-    out << "weightedpriority";
-    break;
-  case io_queue::mclock_opclass:
-    out << "mclock_opclass";
-    break;
-  case io_queue::mclock_client:
-    out << "mclock_client";
-    break;
-  }
-  return out;
-}
index eeef8c5d206bfcf092b9a9f6c37df81a7e813034..7713c208f8a6e3227c595cd1961f000a79d1c6fd 100644 (file)
@@ -41,7 +41,7 @@
 #include "OpRequest.h"
 #include "Session.h"
 
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpScheduler.h"
 
 #include <atomic>
 #include <map>
@@ -55,8 +55,6 @@
 #include "common/sharedptr_registry.hpp"
 #include "common/WeightedPriorityQueue.h"
 #include "common/PrioritizedQueue.h"
-#include "osd/mClockOpClassQueue.h"
-#include "osd/mClockClientQueue.h"
 #include "messages/MOSDOp.h"
 #include "common/EventTrace.h"
 #include "osd/osd_perf_counters.h"
@@ -105,6 +103,7 @@ class MMonGetPurgedSnapsReply;
 class OSD;
 
 class OSDService {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
 public:
   OSD *osd;
   CephContext *cct;
@@ -125,8 +124,8 @@ public:
   md_config_cacher_t<Option::size_t> osd_max_object_size;
   md_config_cacher_t<bool> osd_skip_data_digest;
 
-  void enqueue_back(OpQueueItem&& qi);
-  void enqueue_front(OpQueueItem&& qi);
+  void enqueue_back(OpSchedulerItem&& qi);
+  void enqueue_front(OpSchedulerItem&& qi);
 
   void maybe_inject_dispatch_delay() {
     if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
@@ -903,15 +902,6 @@ public:
   ~OSDService();
 };
 
-
-enum class io_queue {
-  prioritized,
-  weightedpriority,
-  mclock_opclass,
-  mclock_client,
-};
-
-
 /*
 
   Each PG slot includes queues for events that are processing and/or waiting
@@ -941,7 +931,7 @@ enum class io_queue {
     don't affect the given PG.)
 
   - we maintain two separate wait lists, *waiting* and *waiting_peering*. The
-    OpQueueItem has an is_peering() bool to determine which we use.  Waiting
+    OpSchedulerItem has an is_peering() bool to determine which we use.  Waiting
     peering events are queued up by epoch required.
 
   - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
@@ -963,14 +953,15 @@ enum class io_queue {
   */
 
 struct OSDShardPGSlot {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
   PGRef pg;                      ///< pg reference
-  deque<OpQueueItem> to_process; ///< order items for this slot
+  deque<OpSchedulerItem> to_process; ///< order items for this slot
   int num_running = 0;          ///< _process threads doing pg lookup/lock
 
-  deque<OpQueueItem> waiting;   ///< waiting for pg (or map + pg)
+  deque<OpSchedulerItem> waiting;   ///< waiting for pg (or map + pg)
 
   /// waiting for map (peering evt)
-  map<epoch_t,deque<OpQueueItem>> waiting_peering;
+  map<epoch_t,deque<OpSchedulerItem>> waiting_peering;
 
   /// incremented by wake_pg_waiters; indicates racing _process threads
   /// should bail out (their op has been requeued)
@@ -1010,7 +1001,7 @@ struct OSDShard {
   ceph::mutex shard_lock;   ///< protects remaining members below
 
   /// map of slots for each spg_t.  maintains ordering of items dequeued
-  /// from pqueue while _process thread drops shard lock to acquire the
+  /// from scheduler while _process thread drops shard lock to acquire the
   /// pg lock.  stale slots are removed by consume_map.
   unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;
 
@@ -1032,25 +1023,12 @@ struct OSDShard {
   ceph::condition_variable min_pg_epoch_cond;
 
   /// priority queue
-  std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
+  ceph::osd::scheduler::OpSchedulerRef scheduler;
 
   bool stop_waiting = false;
 
   ContextQueue context_queue;
 
-  void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
-    unsigned priority = item.get_priority();
-    unsigned cost = item.get_cost();
-    if (priority >= cutoff)
-      pqueue->enqueue_strict_front(
-       item.get_owner(),
-       priority, std::move(item));
-    else
-      pqueue->enqueue_front(
-       item.get_owner(),
-       priority, cost, std::move(item));
-  }
-
   void _attach_pg(OSDShardPGSlot *slot, PG *pg);
   void _detach_pg(OSDShardPGSlot *slot);
 
@@ -1083,38 +1061,13 @@ struct OSDShard {
   OSDShard(
     int id,
     CephContext *cct,
-    OSD *osd,
-    uint64_t max_tok_per_prio, uint64_t min_cost,
-    io_queue opqueue)
-    : shard_id(id),
-      cct(cct),
-      osd(osd),
-      shard_name(string("OSDShard.") + stringify(id)),
-      sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
-      sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
-      osdmap_lock_name(shard_name + "::osdmap_lock"),
-      osdmap_lock{make_mutex(osdmap_lock_name)},
-      shard_lock_name(shard_name + "::shard_lock"),
-      shard_lock{make_mutex(shard_lock_name)},
-      context_queue(sdata_wait_lock, sdata_cond) {
-    if (opqueue == io_queue::weightedpriority) {
-      pqueue = std::make_unique<
-       WeightedPriorityQueue<OpQueueItem,uint64_t>>(
-         max_tok_per_prio, min_cost);
-    } else if (opqueue == io_queue::prioritized) {
-      pqueue = std::make_unique<
-       PrioritizedQueue<OpQueueItem,uint64_t>>(
-         max_tok_per_prio, min_cost);
-    } else if (opqueue == io_queue::mclock_opclass) {
-      pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
-    } else if (opqueue == io_queue::mclock_client) {
-      pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
-    }
-  }
+    OSD *osd);
 };
 
 class OSD : public Dispatcher,
            public md_config_obs_t {
+  using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
+
   /** OSD **/
   // global lock
   ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
@@ -1591,24 +1544,18 @@ private:
   friend struct C_FinishSplits;
   friend struct C_OpenPGs;
 
-  // -- op queue --
-  friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
-
-  const io_queue op_queue;
-public:
-  const unsigned int op_prio_cutoff;
 protected:
 
   /*
    * The ordered op delivery chain is:
    *
-   *   fast dispatch -> pqueue back
-   *                    pqueue front <-> to_process back
+   *   fast dispatch -> scheduler back
+   *                    scheduler front <-> to_process back
    *                                     to_process front  -> RunVis(item)
    *                                                      <- queue_front()
    *
-   * The pqueue is per-shard, and to_process is per pg_slot.  Items can be
-   * pushed back up into to_process and/or pqueue while order is preserved.
+   * The scheduler is per-shard, and to_process is per pg_slot.  Items can be
+   * pushed back up into to_process and/or scheduler while order is preserved.
    *
    * Multiple worker threads can operate on each shard.
    *
@@ -1619,13 +1566,13 @@ protected:
    * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
    * and already requeued the items.
    */
-  friend class PGOpItem;
-  friend class PGPeeringItem;
-  friend class PGRecovery;
-  friend class PGDelete;
+  friend class ceph::osd::scheduler::PGOpItem;
+  friend class ceph::osd::scheduler::PGPeeringItem;
+  friend class ceph::osd::scheduler::PGRecovery;
+  friend class ceph::osd::scheduler::PGDelete;
 
   class ShardedOpWQ
-    : public ShardedThreadPool::ShardedWQ<OpQueueItem>
+    : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
   {
     OSD *osd;
 
@@ -1634,23 +1581,23 @@ protected:
                time_t ti,
                time_t si,
                ShardedThreadPool* tp)
-      : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
+      : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
         osd(o) {
     }
 
     void _add_slot_waiter(
       spg_t token,
       OSDShardPGSlot *slot,
-      OpQueueItem&& qi);
+      OpSchedulerItem&& qi);
 
     /// try to do some work
     void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
 
     /// enqueue a new item
-    void _enqueue(OpQueueItem&& item) override;
+    void _enqueue(OpSchedulerItem&& item) override;
 
     /// requeue an old item (at the front of the line)
-    void _enqueue_front(OpQueueItem&& item) override;
+    void _enqueue_front(OpSchedulerItem&& item) override;
       
     void return_waiting_threads() override {
       for(uint32_t i = 0; i < osd->num_shards; i++) {
@@ -1681,7 +1628,7 @@ protected:
 
        std::scoped_lock l{sdata->shard_lock};
        f->open_object_section(queue_name);
-       sdata->pqueue->dump(f);
+       sdata->scheduler->dump(*f);
        f->close_section();
       }
     }
@@ -1692,9 +1639,9 @@ protected:
       ceph_assert(sdata);
       std::lock_guard l(sdata->shard_lock);
       if (thread_index < osd->num_shards) {
-       return sdata->pqueue->empty() && sdata->context_queue.empty();
+       return sdata->scheduler->empty() && sdata->context_queue.empty();
       } else {
-       return sdata->pqueue->empty();
+       return sdata->scheduler->empty();
       }
     }
 
@@ -2041,39 +1988,6 @@ private:
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override;
 
-  io_queue get_io_queue() const {
-    if (cct->_conf->osd_op_queue == "debug_random") {
-      static io_queue index_lookup[] = { io_queue::prioritized,
-                                        io_queue::weightedpriority,
-                                        io_queue::mclock_opclass,
-                                        io_queue::mclock_client };
-      srand(time(NULL));
-      unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
-      return index_lookup[which];
-    } else if (cct->_conf->osd_op_queue == "prioritized") {
-      return io_queue::prioritized;
-    } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
-      return io_queue::mclock_opclass;
-    } else if (cct->_conf->osd_op_queue == "mclock_client") {
-      return io_queue::mclock_client;
-    } else {
-      // default / catch-all is 'wpq'
-      return io_queue::weightedpriority;
-    }
-  }
-
-  unsigned int get_io_prio_cut() const {
-    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
-      srand(time(NULL));
-      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
-    } else if (cct->_conf->osd_op_queue_cut_off == "high") {
-      return CEPH_MSG_PRIO_HIGH;
-    } else {
-      // default / catch-all is 'low'
-      return CEPH_MSG_PRIO_LOW;
-    }
-  }
-
  public:
   /* internal and external can point to the same messenger, they will still
    * be cleaned up properly*/
@@ -2172,9 +2086,6 @@ private:
 };
 
 
-std::ostream& operator<<(std::ostream& out, const io_queue& q);
-
-
 //compatibility of the executable
 extern const CompatSet::Feature ceph_osd_feature_compat[];
 extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
diff --git a/src/osd/OpQueueItem.cc b/src/osd/OpQueueItem.cc
deleted file mode 100644 (file)
index 1deb1e7..0000000
+++ /dev/null
@@ -1,84 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 Red Hat Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include "OpQueueItem.h"
-#include "OSD.h"
-
-void PGOpItem::run(
-  OSD *osd,
-  OSDShard *sdata,
-  PGRef& pg,
-  ThreadPool::TPHandle &handle)
-{
-  osd->dequeue_op(pg, op, handle);
-  pg->unlock();
-}
-
-void PGPeeringItem::run(
-  OSD *osd,
-  OSDShard *sdata,
-  PGRef& pg,
-  ThreadPool::TPHandle &handle)
-{
-  osd->dequeue_peering_evt(sdata, pg.get(), evt, handle);
-}
-
-void PGSnapTrim::run(
-  OSD *osd,
-  OSDShard *sdata,
-  PGRef& pg,
-  ThreadPool::TPHandle &handle)
-{
-  pg->snap_trimmer(epoch_queued);
-  pg->unlock();
-}
-
-void PGScrub::run(
-  OSD *osd,
-  OSDShard *sdata,
-  PGRef& pg,
-  ThreadPool::TPHandle &handle)
-{
-  pg->scrub(epoch_queued, handle);
-  pg->unlock();
-}
-
-void PGRecovery::run(
-  OSD *osd,
-  OSDShard *sdata,
-  PGRef& pg,
-  ThreadPool::TPHandle &handle)
-{
-  osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle);
-  pg->unlock();
-}
-
-void PGRecoveryContext::run(
-  OSD *osd,
-  OSDShard *sdata,
-  PGRef& pg,
-  ThreadPool::TPHandle &handle)
-{
-  c.release()->complete(handle);
-  pg->unlock();
-}
-
-void PGDelete::run(
-  OSD *osd,
-  OSDShard *sdata,
-  PGRef& pg,
-  ThreadPool::TPHandle &handle)
-{
-  osd->dequeue_delete(sdata, pg.get(), epoch_queued, handle);
-}
diff --git a/src/osd/OpQueueItem.h b/src/osd/OpQueueItem.h
deleted file mode 100644 (file)
index 0680e0b..0000000
+++ /dev/null
@@ -1,342 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 Red Hat Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#pragma once
-
-#include <ostream>
-
-#include "include/types.h"
-#include "include/utime.h"
-#include "osd/OpRequest.h"
-#include "osd/PG.h"
-#include "PGPeeringEvent.h"
-
-class OSD;
-class OSDShard;
-
-class OpQueueItem {
-public:
-  class OrderLocker {
-  public:
-    using Ref = unique_ptr<OrderLocker>;
-    virtual void lock() = 0;
-    virtual void unlock() = 0;
-    virtual ~OrderLocker() {}
-  };
-  // Abstraction for operations queueable in the op queue
-  class OpQueueable {
-  public:
-    enum class op_type_t {
-      client_op,
-      peering_event,
-      bg_snaptrim,
-      bg_recovery,
-      bg_scrub,
-      bg_pg_delete
-    };
-    using Ref = std::unique_ptr<OpQueueable>;
-
-    /// Items with the same queue token will end up in the same shard
-    virtual uint32_t get_queue_token() const = 0;
-
-    /* Items will be dequeued and locked atomically w.r.t. other items with the
-       * same ordering token */
-    virtual const spg_t& get_ordering_token() const = 0;
-    virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0;
-    virtual op_type_t get_op_type() const = 0;
-    virtual std::optional<OpRequestRef> maybe_get_op() const {
-      return std::nullopt;
-    }
-
-    virtual uint64_t get_reserved_pushes() const {
-      return 0;
-    }
-
-    virtual bool is_peering() const {
-      return false;
-    }
-    virtual bool peering_requires_pg() const {
-      ceph_abort();
-    }
-    virtual const PGCreateInfo *creates_pg() const {
-      return nullptr;
-    }
-
-    virtual ostream &print(ostream &rhs) const = 0;
-
-    virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
-    virtual ~OpQueueable() {}
-    friend ostream& operator<<(ostream& out, const OpQueueable& q) {
-      return q.print(out);
-    }
-
-  };
-
-private:
-  OpQueueable::Ref qitem;
-  int cost;
-  unsigned priority;
-  utime_t start_time;
-  uint64_t owner;  ///< global id (e.g., client.XXX)
-  epoch_t map_epoch;    ///< an epoch we expect the PG to exist in
-
-public:
-  OpQueueItem(
-    OpQueueable::Ref &&item,
-    int cost,
-    unsigned priority,
-    utime_t start_time,
-    uint64_t owner,
-    epoch_t e)
-    : qitem(std::move(item)),
-      cost(cost),
-      priority(priority),
-      start_time(start_time),
-      owner(owner),
-      map_epoch(e)
-  {}
-  OpQueueItem(OpQueueItem &&) = default;
-  OpQueueItem(const OpQueueItem &) = delete;
-  OpQueueItem &operator=(OpQueueItem &&) = default;
-  OpQueueItem &operator=(const OpQueueItem &) = delete;
-
-  OrderLocker::Ref get_order_locker(PGRef pg) {
-    return qitem->get_order_locker(pg);
-  }
-  uint32_t get_queue_token() const {
-    return qitem->get_queue_token();
-  }
-  const spg_t& get_ordering_token() const {
-    return qitem->get_ordering_token();
-  }
-  using op_type_t = OpQueueable::op_type_t;
-  OpQueueable::op_type_t get_op_type() const {
-    return qitem->get_op_type();
-  }
-  std::optional<OpRequestRef> maybe_get_op() const {
-    return qitem->maybe_get_op();
-  }
-  uint64_t get_reserved_pushes() const {
-    return qitem->get_reserved_pushes();
-  }
-  void run(OSD *osd, OSDShard *sdata,PGRef& pg, ThreadPool::TPHandle &handle) {
-    qitem->run(osd, sdata, pg, handle);
-  }
-  unsigned get_priority() const { return priority; }
-  int get_cost() const { return cost; }
-  utime_t get_start_time() const { return start_time; }
-  uint64_t get_owner() const { return owner; }
-  epoch_t get_map_epoch() const { return map_epoch; }
-
-  bool is_peering() const {
-    return qitem->is_peering();
-  }
-
-  const PGCreateInfo *creates_pg() const {
-    return qitem->creates_pg();
-  }
-
-  bool peering_requires_pg() const {
-    return qitem->peering_requires_pg();
-  }
-
-  friend ostream& operator<<(ostream& out, const OpQueueItem& item) {
-     out << "OpQueueItem("
-        << item.get_ordering_token() << " " << *item.qitem
-        << " prio " << item.get_priority()
-        << " cost " << item.get_cost()
-        << " e" << item.get_map_epoch();
-     if (item.get_reserved_pushes()) {
-       out << " reserved_pushes " << item.get_reserved_pushes();
-     }
-    return out << ")";
-  }
-}; // class OpQueueItem
-
-/// Implements boilerplate for operations queued for the pg lock
-class PGOpQueueable : public OpQueueItem::OpQueueable {
-  spg_t pgid;
-protected:
-  const spg_t& get_pgid() const {
-    return pgid;
-  }
-public:
-  explicit PGOpQueueable(spg_t pg) : pgid(pg) {}
-  uint32_t get_queue_token() const override final {
-    return get_pgid().ps();
-  }
-
-  const spg_t& get_ordering_token() const override final {
-    return get_pgid();
-  }
-
-  OpQueueItem::OrderLocker::Ref get_order_locker(PGRef pg) override final {
-    class Locker : public OpQueueItem::OrderLocker {
-      PGRef pg;
-    public:
-      explicit Locker(PGRef pg) : pg(pg) {}
-      void lock() override final {
-       pg->lock();
-      }
-      void unlock() override final {
-       pg->unlock();
-      }
-    };
-    return OpQueueItem::OrderLocker::Ref(
-      new Locker(pg));
-  }
-};
-
-class PGOpItem : public PGOpQueueable {
-  OpRequestRef op;
-public:
-  PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {}
-  op_type_t get_op_type() const override final {
-    return op_type_t::client_op;
-  }
-  ostream &print(ostream &rhs) const override final {
-    return rhs << "PGOpItem(op=" << *(op->get_req()) << ")";
-  }
-  std::optional<OpRequestRef> maybe_get_op() const override final {
-    return op;
-  }
-  void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGPeeringItem : public PGOpQueueable {
-  PGPeeringEventRef evt;
-public:
-  PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {}
-  op_type_t get_op_type() const override final {
-    return op_type_t::peering_event;
-  }
-  ostream &print(ostream &rhs) const override final {
-    return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
-  }
-  void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-  bool is_peering() const override {
-    return true;
-  }
-  bool peering_requires_pg() const override {
-    return evt->requires_pg;
-  }
-  const PGCreateInfo *creates_pg() const override {
-    return evt->create_info.get();
-  }
-};
-
-class PGSnapTrim : public PGOpQueueable {
-  epoch_t epoch_queued;
-public:
-  PGSnapTrim(
-    spg_t pg,
-    epoch_t epoch_queued)
-    : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
-  op_type_t get_op_type() const override final {
-    return op_type_t::bg_snaptrim;
-  }
-  ostream &print(ostream &rhs) const override final {
-    return rhs << "PGSnapTrim(pgid=" << get_pgid()
-              << "epoch_queued=" << epoch_queued
-              << ")";
-  }
-  void run(
-    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGScrub : public PGOpQueueable {
-  epoch_t epoch_queued;
-public:
-  PGScrub(
-    spg_t pg,
-    epoch_t epoch_queued)
-    : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
-  op_type_t get_op_type() const override final {
-    return op_type_t::bg_scrub;
-  }
-  ostream &print(ostream &rhs) const override final {
-    return rhs << "PGScrub(pgid=" << get_pgid()
-              << "epoch_queued=" << epoch_queued
-              << ")";
-  }
-  void run(
-    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGRecovery : public PGOpQueueable {
-  epoch_t epoch_queued;
-  uint64_t reserved_pushes;
-public:
-  PGRecovery(
-    spg_t pg,
-    epoch_t epoch_queued,
-    uint64_t reserved_pushes)
-    : PGOpQueueable(pg),
-      epoch_queued(epoch_queued),
-      reserved_pushes(reserved_pushes) {}
-  op_type_t get_op_type() const override final {
-    return op_type_t::bg_recovery;
-  }
-  virtual ostream &print(ostream &rhs) const override final {
-    return rhs << "PGRecovery(pgid=" << get_pgid()
-              << "epoch_queued=" << epoch_queued
-              << "reserved_pushes=" << reserved_pushes
-              << ")";
-  }
-  virtual uint64_t get_reserved_pushes() const override final {
-    return reserved_pushes;
-  }
-  virtual void run(
-    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGRecoveryContext : public PGOpQueueable {
-  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
-  epoch_t epoch;
-public:
-  PGRecoveryContext(spg_t pgid,
-                   GenContext<ThreadPool::TPHandle&> *c, epoch_t epoch)
-    : PGOpQueueable(pgid),
-      c(c), epoch(epoch) {}
-  op_type_t get_op_type() const override final {
-    return op_type_t::bg_recovery;
-  }
-  ostream &print(ostream &rhs) const override final {
-    return rhs << "PGRecoveryContext(pgid=" << get_pgid()
-              << " c=" << c.get() << " epoch=" << epoch
-              << ")";
-  }
-  void run(
-    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGDelete : public PGOpQueueable {
-  epoch_t epoch_queued;
-public:
-  PGDelete(
-    spg_t pg,
-    epoch_t epoch_queued)
-    : PGOpQueueable(pg),
-      epoch_queued(epoch_queued) {}
-  op_type_t get_op_type() const override final {
-    return op_type_t::bg_pg_delete;
-  }
-  ostream &print(ostream &rhs) const override final {
-    return rhs << "PGDelete(" << get_pgid()
-              << " e" << epoch_queued
-              << ")";
-  }
-  void run(
-    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
index 949e8507d6660bdad6c72906a5c7f9bb40c5a097..e4e82891c5ddc0920f826cec4ba8e312b35145b4 100644 (file)
@@ -25,6 +25,7 @@
 #include "OpRequest.h"
 #include "ScrubStore.h"
 #include "Session.h"
+#include "osd/scheduler/OpSchedulerItem.h"
 
 #include "common/Timer.h"
 #include "common/perf_counters.h"
@@ -76,6 +77,8 @@
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
 
+using namespace ceph::osd::scheduler;
+
 template <class T>
 static ostream& _prefix(std::ostream *_dout, T *t)
 {
@@ -1270,8 +1273,8 @@ void PG::requeue_op(OpRequestRef op)
   } else {
     dout(20) << __func__ << " " << op << dendl;
     osd->enqueue_front(
-      OpQueueItem(
-        unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+      OpSchedulerItem(
+        unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
        op->get_req()->get_cost(),
        op->get_req()->get_priority(),
        op->get_req()->get_recv_stamp(),
@@ -1304,8 +1307,8 @@ void PG::requeue_map_waiters()
       dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
       for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
        auto req = *q;
-       osd->enqueue_front(OpQueueItem(
-          unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, req)),
+       osd->enqueue_front(OpSchedulerItem(
+          unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
          req->get_req()->get_cost(),
          req->get_req()->get_priority(),
          req->get_req()->get_recv_stamp(),
index ec013eb085ac45b6abd38c6472c7f759fb68546e..2ef4ebf8b6cf67c56400de01e2cc0f73a4b4bee4 100644 (file)
@@ -76,6 +76,8 @@ static ostream& _prefix(std::ostream *_dout, T *pg) {
 
 MEMPOOL_DEFINE_OBJECT_FACTORY(PrimaryLogPG, replicatedpg, osd);
 
+using namespace ceph::osd::scheduler;
+
 /**
  * The CopyCallback class defines an interface for completions to the
  * copy_start code. Users of the copy infrastructure must implement
@@ -11606,8 +11608,8 @@ void PrimaryLogPG::_applied_recovered_object_replica()
        scrubber.active_rep_scrub->get_req())->chunky) {
     auto& op = scrubber.active_rep_scrub;
     osd->enqueue_back(
-      OpQueueItem(
-        unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+      OpSchedulerItem(
+        unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
        op->get_req()->get_cost(),
        op->get_req()->get_priority(),
        op->get_req()->get_recv_stamp(),
index 84454ff6bd0504dbd96bf8d42f792f0aaa29710f..ea0866e3031792824ad1bdef3bb037957a2e5ba5 100644 (file)
 #include "common/config.h"
 #include "common/ceph_context.h"
 #include "common/mClockPriorityQueue.h"
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpSchedulerItem.h"
 #include "osd/mClockOpClassSupport.h"
 
 
 namespace ceph {
 
-  using Request = OpQueueItem;
+  using Request = ceph::osd::scheduler::OpSchedulerItem;
   using Client = uint64_t;
 
   // This class exists to bridge the ceph code, which treats the class
@@ -103,9 +103,13 @@ namespace ceph {
     // Formatted output of the queue
     void dump(ceph::Formatter *f) const override final;
 
+    void print(std::ostream &ostream) const final {
+      ostream << "mClockClientQueue";
+    }
+
   protected:
 
     InnerClient get_inner_client(const Client& cl, const Request& request);
-  }; // class mClockClientAdapter
+  }; // class mClockClientQueue
 
 } // namespace ceph
index 3ad7f71972d179e0b972e1c4937d0fa902483c18..0e069ee3f267d409e95f13cd8fc83c6be70bb070 100644 (file)
 #include "common/config.h"
 #include "common/ceph_context.h"
 #include "common/mClockPriorityQueue.h"
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpSchedulerItem.h"
 #include "osd/mClockOpClassSupport.h"
 
 
 namespace ceph {
 
-  using Request = OpQueueItem;
+  using Request = ceph::osd::scheduler::OpSchedulerItem;
   using Client = uint64_t;
 
   // This class exists to bridge the ceph code, which treats the class
@@ -121,5 +121,9 @@ namespace ceph {
 
     // Formatted output of the queue
     void dump(ceph::Formatter *f) const override final;
-  }; // class mClockOpClassAdapter
+
+    void print(std::ostream &ostream) const final {
+      ostream << "mClockOpClassQueue";
+    }
+  }; // class mClockOpClassQueue
 } // namespace ceph
index 49cad6ced58e25b6a1193f1ef9e16dc4b90d15f3..c8501eb90fd76495adbf062b17b9876c802ed69b 100644 (file)
  */
 
 
-#include "common/dout.h"
 #include "osd/mClockOpClassSupport.h"
-#include "osd/OpQueueItem.h"
-
+#include "common/dout.h"
 #include "include/ceph_assert.h"
 
 namespace ceph {
@@ -80,7 +78,8 @@ namespace ceph {
     }
 
     osd_op_type_t
-    OpClassClientInfoMgr::osd_op_type(const OpQueueItem& op) const {
+    OpClassClientInfoMgr::osd_op_type(
+      const ceph::osd::scheduler::OpSchedulerItem& op) const {
       osd_op_type_t type = convert_op_type(op.get_op_type());
       if (osd_op_type_t::client_op != type) {
        return type;
index 1ea1043eb08457ab7b79ec7c0c252a09cbd81bbf..f99c64176993fa52c95ddfc170bc34aadc08f32e 100644 (file)
 
 #include "dmclock/src/dmclock_server.h"
 #include "osd/OpRequest.h"
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpSchedulerItem.h"
 
 
 namespace ceph {
   namespace mclock {
 
-    using op_item_type_t = OpQueueItem::OpQueueable::op_type_t;
+    using op_item_type_t =
+      ceph::osd::scheduler::OpSchedulerItem::OpQueueable::op_type_t;
     
     enum class osd_op_type_t {
       client_op, osd_rep_op, bg_snaptrim, bg_recovery, bg_scrub, bg_pg_delete,
@@ -93,7 +94,8 @@ namespace ceph {
        }
       }
 
-      osd_op_type_t osd_op_type(const OpQueueItem&) const;
+      osd_op_type_t osd_op_type(
+       const ceph::osd::scheduler::OpSchedulerItem&) const;
 
       // used for debugging since faster implementation can be done
       // with rep_op_msg_bitmap
diff --git a/src/osd/scheduler/OpScheduler.cc b/src/osd/scheduler/OpScheduler.cc
new file mode 100644 (file)
index 0000000..f888627
--- /dev/null
@@ -0,0 +1,77 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <ostream>
+
+#include "osd/scheduler/OpScheduler.h"
+
+#include "common/PrioritizedQueue.h"
+#include "common/WeightedPriorityQueue.h"
+#include "osd/scheduler/mClockScheduler.h"
+#include "osd/mClockClientQueue.h"
+#include "osd/mClockOpClassQueue.h"
+
+namespace ceph::osd::scheduler {
+
+OpSchedulerRef make_scheduler(CephContext *cct)
+{
+  const std::string *type = &cct->_conf->osd_op_queue;
+  if (*type == "debug_random") {
+    static const std::string index_lookup[] = { "prioritized",
+                                               "mclock_opclass",
+                                               "mclock_client",
+                                               "wpq" };
+    srand(time(NULL));
+    unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+    type = &index_lookup[which];
+  }
+
+  if (*type == "prioritized") {
+    return std::make_unique<
+      ClassedOpQueueScheduler<PrioritizedQueue<OpSchedulerItem, client>>>(
+       cct,
+       cct->_conf->osd_op_pq_max_tokens_per_priority,
+       cct->_conf->osd_op_pq_min_cost
+      );
+  } else if (*type == "mclock_opclass") {
+    return std::make_unique<
+      ClassedOpQueueScheduler<mClockOpClassQueue>>(
+       cct,
+       cct
+    );
+  } else if (*type == "mclock_client") {
+    return std::make_unique<
+      ClassedOpQueueScheduler<mClockClientQueue>>(
+       cct,
+       cct
+    );
+  } else if (*type == "wpq" ) {
+    // default is 'wpq'
+    return std::make_unique<
+      ClassedOpQueueScheduler<WeightedPriorityQueue<OpSchedulerItem, client>>>(
+       cct,
+       cct->_conf->osd_op_pq_max_tokens_per_priority,
+       cct->_conf->osd_op_pq_min_cost
+    );
+  } else {
+    ceph_assert("Invalid choice of wq" == 0);
+  }
+}
+
+std::ostream &operator<<(std::ostream &lhs, const OpScheduler &rhs) {
+  rhs.print(lhs);
+  return lhs;
+}
+
+}
diff --git a/src/osd/scheduler/OpScheduler.h b/src/osd/scheduler/OpScheduler.h
new file mode 100644 (file)
index 0000000..ae770a7
--- /dev/null
@@ -0,0 +1,137 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <ostream>
+
+#include "common/ceph_context.h"
+#include "osd/scheduler/OpSchedulerItem.h"
+
+namespace ceph::osd::scheduler {
+
+using client = uint64_t;
+
+/**
+ * Base interface for classes responsible for choosing
+ * op processing order in the OSD.
+ */
+class OpScheduler {
+public:
+  // Enqueue op for scheduling
+  virtual void enqueue(OpSchedulerItem &&item) = 0;
+
+  // Enqueue op for processing as though it were enqueued prior
+  // to other items already scheduled.
+  virtual void enqueue_front(OpSchedulerItem &&item) = 0;
+
+  // Returns true iff there are no ops scheduled
+  virtual bool empty() const = 0;
+
+  // Return next op to be processed
+  virtual OpSchedulerItem dequeue() = 0;
+
+  // Dump formatted representation for the queue
+  virtual void dump(ceph::Formatter &f) const = 0;
+
+  // Print human readable brief description with relevant parameters
+  virtual void print(std::ostream &out) const = 0;
+
+  // Destructor
+  virtual ~OpScheduler() {};
+};
+
+std::ostream &operator<<(std::ostream &lhs, const OpScheduler &);
+using OpSchedulerRef = std::unique_ptr<OpScheduler>;
+
+OpSchedulerRef make_scheduler(CephContext *cct);
+
+/**
+ * Implements OpScheduler in terms of OpQueue
+ *
+ * Templated on queue type to avoid dynamic dispatch, T should implement
+ * OpQueue<OpSchedulerItem, client>.  This adapter is mainly responsible for
+ * the boilerplate priority cutoff/strict concept which is needed for
+ * OpQueue based implementations.
+ */
+template <typename T>
+class ClassedOpQueueScheduler : public OpScheduler {
+  unsigned cutoff;
+  T queue;
+
+  static unsigned int get_io_prio_cut(CephContext *cct) {
+    if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+      srand(time(NULL));
+      return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+    } else if (cct->_conf->osd_op_queue_cut_off == "high") {
+      return CEPH_MSG_PRIO_HIGH;
+    } else {
+      // default / catch-all is 'low'
+      return CEPH_MSG_PRIO_LOW;
+    }
+  }
+public:
+  template <typename... Args>
+  ClassedOpQueueScheduler(CephContext *cct, Args&&... args) :
+    cutoff(get_io_prio_cut(cct)),
+    queue(std::forward<Args>(args)...)
+  {}
+
+  void enqueue(OpSchedulerItem &&item) final {
+    unsigned priority = item.get_priority();
+    unsigned cost = item.get_cost();
+
+    if (priority >= cutoff)
+      queue.enqueue_strict(
+       item.get_owner(), priority, std::move(item));
+    else
+      queue.enqueue(
+       item.get_owner(), priority, cost, std::move(item));
+  }
+
+  void enqueue_front(OpSchedulerItem &&item) final {
+    unsigned priority = item.get_priority();
+    unsigned cost = item.get_cost();
+    if (priority >= cutoff)
+      queue.enqueue_strict_front(
+       item.get_owner(),
+       priority, std::move(item));
+    else
+      queue.enqueue_front(
+       item.get_owner(),
+       priority, cost, std::move(item));
+  }
+
+  bool empty() const final {
+    return queue.empty();
+  }
+
+  OpSchedulerItem dequeue() final {
+    return queue.dequeue();
+  }
+
+  void dump(ceph::Formatter &f) const final {
+    return queue.dump(&f);
+  }
+
+  void print(std::ostream &out) const final {
+    out << "ClassedOpQueueScheduler(queue=";
+    queue.print(out);
+    out << ", cutoff=" << cutoff << ")";
+  }
+
+  ~ClassedOpQueueScheduler() final {};
+};
+
+}
diff --git a/src/osd/scheduler/OpSchedulerItem.cc b/src/osd/scheduler/OpSchedulerItem.cc
new file mode 100644 (file)
index 0000000..28069dc
--- /dev/null
@@ -0,0 +1,88 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "osd/scheduler/OpSchedulerItem.h"
+#include "osd/OSD.h"
+
+namespace ceph::osd::scheduler {
+
+void PGOpItem::run(
+  OSD *osd,
+  OSDShard *sdata,
+  PGRef& pg,
+  ThreadPool::TPHandle &handle)
+{
+  osd->dequeue_op(pg, op, handle);
+  pg->unlock();
+}
+
+void PGPeeringItem::run(
+  OSD *osd,
+  OSDShard *sdata,
+  PGRef& pg,
+  ThreadPool::TPHandle &handle)
+{
+  osd->dequeue_peering_evt(sdata, pg.get(), evt, handle);
+}
+
+void PGSnapTrim::run(
+  OSD *osd,
+  OSDShard *sdata,
+  PGRef& pg,
+  ThreadPool::TPHandle &handle)
+{
+  pg->snap_trimmer(epoch_queued);
+  pg->unlock();
+}
+
+void PGScrub::run(
+  OSD *osd,
+  OSDShard *sdata,
+  PGRef& pg,
+  ThreadPool::TPHandle &handle)
+{
+  pg->scrub(epoch_queued, handle);
+  pg->unlock();
+}
+
+void PGRecovery::run(
+  OSD *osd,
+  OSDShard *sdata,
+  PGRef& pg,
+  ThreadPool::TPHandle &handle)
+{
+  osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle);
+  pg->unlock();
+}
+
+void PGRecoveryContext::run(
+  OSD *osd,
+  OSDShard *sdata,
+  PGRef& pg,
+  ThreadPool::TPHandle &handle)
+{
+  c.release()->complete(handle);
+  pg->unlock();
+}
+
+void PGDelete::run(
+  OSD *osd,
+  OSDShard *sdata,
+  PGRef& pg,
+  ThreadPool::TPHandle &handle)
+{
+  osd->dequeue_delete(sdata, pg.get(), epoch_queued, handle);
+}
+
+}
diff --git a/src/osd/scheduler/OpSchedulerItem.h b/src/osd/scheduler/OpSchedulerItem.h
new file mode 100644 (file)
index 0000000..5636221
--- /dev/null
@@ -0,0 +1,438 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <ostream>
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "osd/OpRequest.h"
+#include "osd/PG.h"
+#include "osd/PGPeeringEvent.h"
+#include "common/mClockCommon.h"
+#include "messages/MOSDOp.h"
+
+
+class OSD;
+class OSDShard;
+
+namespace ceph::osd::scheduler {
+
+enum class op_scheduler_class : uint8_t {
+  background_recovery = 0,
+  background_best_effort,
+  immediate,
+  client,
+};
+
+class OpSchedulerItem {
+public:
+  class OrderLocker {
+  public:
+    using Ref = unique_ptr<OrderLocker>;
+    virtual void lock() = 0;
+    virtual void unlock() = 0;
+    virtual ~OrderLocker() {}
+  };
+
+  // Abstraction for operations queueable in the op queue
+  class OpQueueable {
+  public:
+    enum class op_type_t {
+      client_op,
+      peering_event,
+      bg_snaptrim,
+      bg_recovery,
+      bg_scrub,
+      bg_pg_delete
+    };
+    using Ref = std::unique_ptr<OpQueueable>;
+
+    /// Items with the same queue token will end up in the same shard
+    virtual uint32_t get_queue_token() const = 0;
+
+    /* Items will be dequeued and locked atomically w.r.t. other items with the
+       * same ordering token */
+    virtual const spg_t& get_ordering_token() const = 0;
+    virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0;
+    virtual op_type_t get_op_type() const = 0;
+    virtual std::optional<OpRequestRef> maybe_get_op() const {
+      return std::nullopt;
+    }
+
+    virtual uint64_t get_reserved_pushes() const {
+      return 0;
+    }
+
+    virtual bool is_peering() const {
+      return false;
+    }
+    virtual bool peering_requires_pg() const {
+      ceph_abort();
+    }
+    virtual const PGCreateInfo *creates_pg() const {
+      return nullptr;
+    }
+
+    virtual ostream &print(ostream &rhs) const = 0;
+
+    virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
+    virtual op_scheduler_class get_scheduler_class() const = 0;
+
+    virtual std::optional<ceph::qos::mclock_profile_params_t>
+    get_mclock_profile_params() const {
+      return std::nullopt;
+    }
+
+    virtual std::optional<ceph::qos::dmclock_request_t>
+    get_dmclock_request_state() const {
+      return std::nullopt;
+    }
+
+    virtual ~OpQueueable() {}
+    friend ostream& operator<<(ostream& out, const OpQueueable& q) {
+      return q.print(out);
+    }
+
+  };
+
+private:
+  OpQueueable::Ref qitem;
+  int cost;
+  unsigned priority;
+  utime_t start_time;
+  uint64_t owner;  ///< global id (e.g., client.XXX)
+  epoch_t map_epoch;    ///< an epoch we expect the PG to exist in
+
+public:
+  OpSchedulerItem(
+    OpQueueable::Ref &&item,
+    int cost,
+    unsigned priority,
+    utime_t start_time,
+    uint64_t owner,
+    epoch_t e)
+    : qitem(std::move(item)),
+      cost(cost),
+      priority(priority),
+      start_time(start_time),
+      owner(owner),
+      map_epoch(e)
+  {}
+  OpSchedulerItem(OpSchedulerItem &&) = default;
+  OpSchedulerItem(const OpSchedulerItem &) = delete;
+  OpSchedulerItem &operator=(OpSchedulerItem &&) = default;
+  OpSchedulerItem &operator=(const OpSchedulerItem &) = delete;
+
+  OrderLocker::Ref get_order_locker(PGRef pg) {
+    return qitem->get_order_locker(pg);
+  }
+  uint32_t get_queue_token() const {
+    return qitem->get_queue_token();
+  }
+  const spg_t& get_ordering_token() const {
+    return qitem->get_ordering_token();
+  }
+  using op_type_t = OpQueueable::op_type_t;
+  OpQueueable::op_type_t get_op_type() const {
+    return qitem->get_op_type();
+  }
+  std::optional<OpRequestRef> maybe_get_op() const {
+    return qitem->maybe_get_op();
+  }
+  uint64_t get_reserved_pushes() const {
+    return qitem->get_reserved_pushes();
+  }
+  void run(OSD *osd, OSDShard *sdata,PGRef& pg, ThreadPool::TPHandle &handle) {
+    qitem->run(osd, sdata, pg, handle);
+  }
+  unsigned get_priority() const { return priority; }
+  int get_cost() const { return cost; }
+  utime_t get_start_time() const { return start_time; }
+  uint64_t get_owner() const { return owner; }
+  epoch_t get_map_epoch() const { return map_epoch; }
+
+  auto get_mclock_profile_params() const {
+    return qitem->get_mclock_profile_params();
+  }
+  auto get_dmclock_request_state() const {
+    return qitem->get_dmclock_request_state();
+  }
+
+  bool is_peering() const {
+    return qitem->is_peering();
+  }
+
+  const PGCreateInfo *creates_pg() const {
+    return qitem->creates_pg();
+  }
+
+  bool peering_requires_pg() const {
+    return qitem->peering_requires_pg();
+  }
+
+  op_scheduler_class get_scheduler_class() const {
+    return qitem->get_scheduler_class();
+  }
+
+  friend ostream& operator<<(ostream& out, const OpSchedulerItem& item) {
+     out << "OpSchedulerItem("
+        << item.get_ordering_token() << " " << *item.qitem
+        << " prio " << item.get_priority()
+        << " cost " << item.get_cost()
+        << " e" << item.get_map_epoch();
+     if (item.get_reserved_pushes()) {
+       out << " reserved_pushes " << item.get_reserved_pushes();
+     }
+    return out << ")";
+  }
+}; // class OpSchedulerItem
+
+/// Implements boilerplate for operations queued for the pg lock
+class PGOpQueueable : public OpSchedulerItem::OpQueueable {
+  spg_t pgid;
+protected:
+  const spg_t& get_pgid() const {
+    return pgid;
+  }
+public:
+  explicit PGOpQueueable(spg_t pg) : pgid(pg) {}
+  uint32_t get_queue_token() const final {
+    return get_pgid().ps();
+  }
+
+  const spg_t& get_ordering_token() const final {
+    return get_pgid();
+  }
+
+  OpSchedulerItem::OrderLocker::Ref get_order_locker(PGRef pg) final {
+    class Locker : public OpSchedulerItem::OrderLocker {
+      PGRef pg;
+    public:
+      explicit Locker(PGRef pg) : pg(pg) {}
+      void lock() final {
+       pg->lock();
+      }
+      void unlock() final {
+       pg->unlock();
+      }
+    };
+    return OpSchedulerItem::OrderLocker::Ref(
+      new Locker(pg));
+  }
+};
+
+class PGOpItem : public PGOpQueueable {
+  OpRequestRef op;
+
+  const MOSDOp *maybe_get_mosd_op() const {
+    auto req = op->get_req();
+    if (req->get_type() == CEPH_MSG_OSD_OP) {
+      return op->get_req<MOSDOp>();
+    } else {
+      return nullptr;
+    }
+  }
+
+public:
+  PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {}
+  op_type_t get_op_type() const final {
+
+    return op_type_t::client_op;
+  }
+
+  ostream &print(ostream &rhs) const final {
+    return rhs << "PGOpItem(op=" << *(op->get_req()) << ")";
+  }
+
+  std::optional<OpRequestRef> maybe_get_op() const final {
+    return op;
+  }
+
+  op_scheduler_class get_scheduler_class() const final {
+    if (maybe_get_mosd_op()) {
+      return op_scheduler_class::client;
+    } else {
+      return op_scheduler_class::immediate;
+    }
+  }
+
+  std::optional<ceph::qos::mclock_profile_params_t>
+  get_mclock_profile_params() const final {
+    auto op = maybe_get_mosd_op();
+    if (!op)
+      return std::nullopt;
+
+    return op->get_mclock_profile_params();
+  }
+
+  std::optional<ceph::qos::dmclock_request_t>
+  get_dmclock_request_state() const final {
+    auto op = maybe_get_mosd_op();
+    if (!op)
+      return std::nullopt;
+
+    return op->get_dmclock_request_state();
+  }
+
+  void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+};
+
+class PGPeeringItem : public PGOpQueueable {
+  PGPeeringEventRef evt;
+public:
+  PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {}
+  op_type_t get_op_type() const final {
+    return op_type_t::peering_event;
+  }
+  ostream &print(ostream &rhs) const final {
+    return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
+  }
+  void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+  bool is_peering() const override {
+    return true;
+  }
+  bool peering_requires_pg() const override {
+    return evt->requires_pg;
+  }
+  const PGCreateInfo *creates_pg() const override {
+    return evt->create_info.get();
+  }
+  op_scheduler_class get_scheduler_class() const final {
+    return op_scheduler_class::immediate;
+  }
+};
+
+class PGSnapTrim : public PGOpQueueable {
+  epoch_t epoch_queued;
+public:
+  PGSnapTrim(
+    spg_t pg,
+    epoch_t epoch_queued)
+    : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+  op_type_t get_op_type() const final {
+    return op_type_t::bg_snaptrim;
+  }
+  ostream &print(ostream &rhs) const final {
+    return rhs << "PGSnapTrim(pgid=" << get_pgid()
+              << "epoch_queued=" << epoch_queued
+              << ")";
+  }
+  void run(
+    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+  op_scheduler_class get_scheduler_class() const final {
+    return op_scheduler_class::background_best_effort;
+  }
+};
+
+class PGScrub : public PGOpQueueable {
+  epoch_t epoch_queued;
+public:
+  PGScrub(
+    spg_t pg,
+    epoch_t epoch_queued)
+    : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+  op_type_t get_op_type() const final {
+    return op_type_t::bg_scrub;
+  }
+  ostream &print(ostream &rhs) const final {
+    return rhs << "PGScrub(pgid=" << get_pgid()
+              << "epoch_queued=" << epoch_queued
+              << ")";
+  }
+  void run(
+    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+  op_scheduler_class get_scheduler_class() const final {
+    return op_scheduler_class::background_best_effort;
+  }
+};
+
+class PGRecovery : public PGOpQueueable {
+  epoch_t epoch_queued;
+  uint64_t reserved_pushes;
+public:
+  PGRecovery(
+    spg_t pg,
+    epoch_t epoch_queued,
+    uint64_t reserved_pushes)
+    : PGOpQueueable(pg),
+      epoch_queued(epoch_queued),
+      reserved_pushes(reserved_pushes) {}
+  op_type_t get_op_type() const final {
+    return op_type_t::bg_recovery;
+  }
+  ostream &print(ostream &rhs) const final {
+    return rhs << "PGRecovery(pgid=" << get_pgid()
+              << "epoch_queued=" << epoch_queued
+              << "reserved_pushes=" << reserved_pushes
+              << ")";
+  }
+  uint64_t get_reserved_pushes() const final {
+    return reserved_pushes;
+  }
+  void run(
+    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+  op_scheduler_class get_scheduler_class() const final {
+    return op_scheduler_class::background_recovery;
+  }
+};
+
+class PGRecoveryContext : public PGOpQueueable {
+  unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
+  epoch_t epoch;
+public:
+  PGRecoveryContext(spg_t pgid,
+                   GenContext<ThreadPool::TPHandle&> *c, epoch_t epoch)
+    : PGOpQueueable(pgid),
+      c(c), epoch(epoch) {}
+  op_type_t get_op_type() const final {
+    return op_type_t::bg_recovery;
+  }
+  ostream &print(ostream &rhs) const final {
+    return rhs << "PGRecoveryContext(pgid=" << get_pgid()
+              << " c=" << c.get() << " epoch=" << epoch
+              << ")";
+  }
+  void run(
+    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+  op_scheduler_class get_scheduler_class() const final {
+    return op_scheduler_class::background_recovery;
+  }
+};
+
+class PGDelete : public PGOpQueueable {
+  epoch_t epoch_queued;
+public:
+  PGDelete(
+    spg_t pg,
+    epoch_t epoch_queued)
+    : PGOpQueueable(pg),
+      epoch_queued(epoch_queued) {}
+  op_type_t get_op_type() const final {
+    return op_type_t::bg_pg_delete;
+  }
+  ostream &print(ostream &rhs) const final {
+    return rhs << "PGDelete(" << get_pgid()
+              << " e" << epoch_queued
+              << ")";
+  }
+  void run(
+    OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+  op_scheduler_class get_scheduler_class() const final {
+    return op_scheduler_class::background_best_effort;
+  }
+};
+
+}
index 70e054c7b933feccd53bf8437f81580102b3fa67..157f32fb70813a60ca5ff6c780ae77228c9376ad 100644 (file)
@@ -8,6 +8,7 @@
 
 #include "osd/mClockClientQueue.h"
 
+using namespace ceph::osd::scheduler;
 
 int main(int argc, char **argv) {
   std::vector<const char*> args(argv, argv+argc);
@@ -36,26 +37,59 @@ public:
     client3(100000001)
   {}
 
-#if 0 // more work needed here
-  Request create_client_op(epoch_t e, uint64_t owner) {
-    return Request(spg_t(), OpQueueItem(OpRequestRef(), e));
+  struct MockDmclockItem : public PGOpQueueable {
+    ceph::qos::dmclock_request_t request;
+    MockDmclockItem(decltype(request) _request) :
+      PGOpQueueable(spg_t()), request(_request) {}
+
+public:
+    op_type_t get_op_type() const final {
+      return op_type_t::client_op;
+    }
+
+    ostream &print(ostream &rhs) const final { return rhs; }
+
+    std::optional<OpRequestRef> maybe_get_op() const final {
+      return std::nullopt;
+    }
+
+    op_scheduler_class get_scheduler_class() const final {
+      return op_scheduler_class::client;
+    }
+
+    std::optional<ceph::qos::dmclock_request_t>
+    get_dmclock_request_state() const final {
+      return request;
+    }
+
+    void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final {}
+  };
+
+  template <typename... Args>
+  Request create_dmclock(epoch_t e, uint64_t owner, Args... args) {
+    return Request(
+      OpSchedulerItem(
+       unique_ptr<OpSchedulerItem::OpQueueable>(
+         new MockDmclockItem(
+           std::forward<Args>(args)...)),
+       12, 12,
+       utime_t(), owner, e));
   }
-#endif
 
   Request create_snaptrim(epoch_t e, uint64_t owner) {
-    return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGSnapTrim(spg_t(), e)),
+    return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGSnapTrim(spg_t(), e)),
                               12, 12,
                               utime_t(), owner, e));
   }
 
   Request create_scrub(epoch_t e, uint64_t owner) {
-    return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGScrub(spg_t(), e)),
+    return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(spg_t(), e)),
                               12, 12,
                               utime_t(), owner, e));
   }
 
   Request create_recovery(epoch_t e, uint64_t owner) {
-    return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGRecovery(spg_t(), e, 64)),
+    return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGRecovery(spg_t(), e, 64)),
                               12, 12,
                               utime_t(), owner, e));
   }
@@ -131,6 +165,36 @@ TEST_F(MClockClientQueueTest, TestEnqueue) {
 }
 
 
+TEST_F(MClockClientQueueTest, TestDistributedEnqueue) {
+  Request r1 = create_snaptrim(100, client1);
+  Request r2 = create_snaptrim(101, client2);
+  Request r3 = create_snaptrim(102, client3);
+  Request r4 = create_dmclock(103, client1, dmc::ReqParams(50,1));
+  Request r5 = create_dmclock(104, client2, dmc::ReqParams(30,1));
+  Request r6 = create_dmclock(105, client3, dmc::ReqParams(10,1));
+
+  q.enqueue(client1, 12, 0, std::move(r1));
+  q.enqueue(client2, 12, 0, std::move(r2));
+  q.enqueue(client3, 12, 0, std::move(r3));
+  q.enqueue(client1, 12, 0, std::move(r4));
+  q.enqueue(client2, 12, 0, std::move(r5));
+  q.enqueue(client3, 12, 0, std::move(r6));
+
+  Request r = q.dequeue();
+  r = q.dequeue();
+  r = q.dequeue();
+
+  r = q.dequeue();
+  ASSERT_EQ(105u, r.get_map_epoch());
+
+  r = q.dequeue();
+  ASSERT_EQ(104u, r.get_map_epoch());
+
+  r = q.dequeue();
+  ASSERT_EQ(103u, r.get_map_epoch());
+}
+
+
 TEST_F(MClockClientQueueTest, TestEnqueueStrict) {
   q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
   q.enqueue_strict(client2, 13, create_snaptrim(101, client2));
index 0f6b564ab5052f757f5c167cfa15d251d3ced9c8..b9da8379fb5149fcde7eb743ccdb21f66d15757d 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "osd/mClockOpClassQueue.h"
 
+using namespace ceph::osd::scheduler;
 
 int main(int argc, char **argv) {
   std::vector<const char*> args(argv, argv+argc);
@@ -40,24 +41,24 @@ public:
 
 #if 0 // more work needed here
   Request create_client_op(epoch_t e, uint64_t owner) {
-    return Request(spg_t(), OpQueueItem(OpRequestRef(), e));
+    return Request(spg_t(), OpSchedulerItem(OpRequestRef(), e));
   }
 #endif
 
   Request create_snaptrim(epoch_t e, uint64_t owner) {
-    return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGSnapTrim(spg_t(), e)),
+    return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGSnapTrim(spg_t(), e)),
                               12, 12,
                               utime_t(), owner, e));
   }
 
   Request create_scrub(epoch_t e, uint64_t owner) {
-    return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGScrub(spg_t(), e)),
+    return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(spg_t(), e)),
                               12, 12,
                               utime_t(), owner, e));
   }
 
   Request create_recovery(epoch_t e, uint64_t owner) {
-    return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGRecovery(spg_t(), e, 64)),
+    return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGRecovery(spg_t(), e, 64)),
                               12, 12,
                               utime_t(), owner, e));
   }