]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: introduce scheduler implementations and operation throttler
authorSamuel Just <sjust@redhat.com>
Tue, 1 Oct 2019 22:38:20 +0000 (15:38 -0700)
committerXuehan Xu <xxhdx1985126@163.com>
Sun, 26 Apr 2020 07:46:35 +0000 (15:46 +0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
Signed-off-by: Xuehan Xu <xxhdx1985126@163.com>
12 files changed:
src/common/options.cc
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd_operation.cc
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operations/background_recovery.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/background_recovery.h [new file with mode: 0644]
src/crimson/osd/scheduler/mclock_scheduler.cc [new file with mode: 0644]
src/crimson/osd/scheduler/mclock_scheduler.h [new file with mode: 0644]
src/crimson/osd/scheduler/scheduler.cc [new file with mode: 0644]
src/crimson/osd/scheduler/scheduler.h [new file with mode: 0644]
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

index 822bde5ce53d0d4f004d34cb6622932054e5e27f..fbf044d52958a8ac129ebf6e1c3721555262e1e0 100644 (file)
@@ -5338,7 +5338,11 @@ std::vector<Option> get_global_options() {
 
     Option("crimson_osd_obc_lru_size", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(10)
-    .set_description("Number of obcs to cache")
+    .set_description("Number of obcs to cache"),
+
+    Option("crimson_osd_scheduler_concurrency", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(0)
+    .set_description("The maximum number concurrent IO operations, 0 for unlimited")
 
   });
 }
index cfd7dc722a7e649b68e8a3d2f4e66e92dd0bdc3f..a4298c97fe7fea54c306e92244daa5db0ef41d2c 100644 (file)
@@ -18,6 +18,9 @@ add_executable(crimson-osd
   osd_operations/peering_event.cc
   osd_operations/pg_advance_map.cc
   osd_operations/replicated_request.cc
+  osd_operations/background_recovery.cc
+  scheduler/scheduler.cc
+  scheduler/mclock_scheduler.cc
   osdmap_gate.cc
   pg_map.cc
   objclass.cc
@@ -33,7 +36,11 @@ add_executable(crimson-osd
   watch.cc
   )
 target_link_libraries(crimson-osd
-  crimson-common crimson-os crimson fmt::fmt)
+  crimson-common
+  crimson-os
+  crimson
+  fmt::fmt
+  dmclock::dmclock)
 set_target_properties(crimson-osd PROPERTIES
   POSITION_INDEPENDENT_CODE ${EXE_LINKER_USE_PIE})
 install(TARGETS crimson-osd DESTINATION bin)
index 03024cb2017c966040e770862fb1b7012868c478..f906db29934ff5b340dc718013f91c66060a9bed 100644 (file)
@@ -51,6 +51,75 @@ void Blocker::dump(ceph::Formatter* f) const
   f->close_section();
 }
 
+
+OperationThrottler::OperationThrottler(ConfigProxy &conf)
+  : scheduler(crimson::osd::scheduler::make_scheduler(conf))
+{
+  conf.add_observer(this);
+  update_from_config(conf);
+}
+
+void OperationThrottler::wake()
+{
+  while ((!max_in_progress || in_progress < max_in_progress) &&
+        !scheduler->empty()) {
+    auto item = scheduler->dequeue();
+    item.wake.set_value();
+    ++in_progress;
+    --pending;
+  }
+}
+
+void OperationThrottler::release_throttle()
+{
+  ceph_assert(in_progress > 0);
+  --in_progress;
+  wake();
+}
+
+blocking_future<> OperationThrottler::acquire_throttle(
+  crimson::osd::scheduler::params_t params)
+{
+  crimson::osd::scheduler::item_t item{params, seastar::promise<>()};
+  auto fut = item.wake.get_future();
+  scheduler->enqueue(std::move(item));
+  return make_blocking_future(std::move(fut));
+}
+
+void OperationThrottler::dump_detail(Formatter *f) const
+{
+  f->dump_unsigned("max_in_progress", max_in_progress);
+  f->dump_unsigned("in_progress", in_progress);
+  f->open_object_section("scheduler");
+  {
+    scheduler->dump(*f);
+  }
+  f->close_section();
+}
+
+void OperationThrottler::update_from_config(const ConfigProxy &conf)
+{
+  max_in_progress = conf.get_val<uint64_t>("crimson_osd_scheduler_concurrency");
+  wake();
+}
+
+const char** OperationThrottler::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "crimson_osd_scheduler_concurrency",
+    NULL
+  };
+  return KEYS;
+}
+
+void OperationThrottler::handle_conf_change(
+  const ConfigProxy& conf,
+  const std::set<std::string> &changed)
+{
+  update_from_config(conf);
+}
+
+
 void OrderedPipelinePhase::Handle::exit()
 {
   if (phase) {
index fe06023a4416b0155f16a437daa820ecf71cefec..a4382c09586c4e4f71877a2605ee7b95db13980f 100644 (file)
@@ -14,6 +14,7 @@
 #include <seastar/core/future.hh>
 
 #include "include/ceph_assert.h"
+#include "crimson/osd/scheduler/scheduler.h"
 
 namespace ceph {
   class Formatter;
@@ -23,12 +24,13 @@ namespace crimson::osd {
 
 enum class OperationTypeCode {
   client_request = 0,
-  peering_event = 1,
-  compound_peering_request = 2,
-  pg_advance_map = 3,
-  pg_creation = 4,
-  replicated_request = 5,
-  last_op = 6
+  peering_event,
+  compound_peering_request,
+  pg_advance_map,
+  pg_creation,
+  replicated_request,
+  background_recovery,
+  last_op
 };
 
 static constexpr const char* const OP_NAMES[] = {
@@ -38,6 +40,7 @@ static constexpr const char* const OP_NAMES[] = {
   "pg_advance_map",
   "pg_creation",
   "replicated_request",
+  "background_recovery",
 };
 
 // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
@@ -226,6 +229,70 @@ public:
   }
 };
 
+/**
+ * Throttles set of currently running operations
+ *
+ * Very primitive currently, assumes all ops are equally
+ * expensive and simply limits the number that can be
+ * concurrently active.
+ */
+class OperationThrottler : public Blocker, md_config_obs_t {
+public:
+  OperationThrottler(ConfigProxy &conf);
+
+  const char** get_tracked_conf_keys() const final;
+  void handle_conf_change(const ConfigProxy& conf,
+                         const std::set<std::string> &changed) final;
+  void update_from_config(const ConfigProxy &conf);
+
+  template <typename F>
+  auto with_throttle(
+    OperationRef op,
+    crimson::osd::scheduler::params_t params,
+    F &&f) {
+    if (!max_in_progress) return f();
+    auto fut = acquire_throttle(params);
+    return op->with_blocking_future(std::move(fut))
+      .then(std::forward<F>(f))
+      .then([this](auto x) {
+       release_throttle();
+       return x;
+      });
+  }
+
+  template <typename F>
+  seastar::future<> with_throttle_while(
+    OperationRef op,
+    crimson::osd::scheduler::params_t params,
+    F &&f) {
+    return with_throttle(op, params, f).then([this, params, op, f](bool cont) {
+      if (cont)
+       return with_throttle_while(op, params, f);
+      else
+       return seastar::now();
+    });
+  }
+protected:
+  void dump_detail(Formatter *f) const final;
+  const char *get_type_name() const final {
+    return "OperationThrottler";
+  }
+private:
+  crimson::osd::scheduler::SchedulerRef scheduler;
+
+  uint64_t max_in_progress = 0;
+  uint64_t in_progress = 0;
+
+  uint64_t pending = 0;
+
+  void wake();
+
+  blocking_future<> acquire_throttle(
+    crimson::osd::scheduler::params_t params);
+
+  void release_throttle();
+};
+
 /**
  * Ensures that at most one op may consider itself in the phase at a time.
  * Ops will see enter() unblock in the order in which they tried to enter
diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc
new file mode 100644 (file)
index 0000000..25f0934
--- /dev/null
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "messages/MOSDOp.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/shard_services.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace crimson::osd {
+
+BackgroundRecovery::BackgroundRecovery(
+  ShardServices &ss,
+  PG &pg,
+  : ss(ss), pg(pg), scheduler_class(scheduler_class)
+  crimson::osd::scheduler::scheduler_class_t scheduler_class)
+{}
+
+seastar::future<bool> BackgroundRecovery::do_recovery()
+{
+  return seastar::make_ready_future<bool>(false);
+}
+
+void BackgroundRecovery::print(std::ostream &lhs) const
+{
+  lhs << "BackgroundRecovery(" << pg.get_pgid() << ")";
+}
+
+void BackgroundRecovery::dump_detail(Formatter *f) const
+{
+  f->dump_stream("pgid") << pg.get_pgid();
+}
+
+seastar::future<> BackgroundRecovery::start()
+{
+  logger().debug("{}: start", *this);
+
+  IRef ref = this;
+  return ss.throttler.with_throttle_while(
+    this, get_scheduler_params(), [ref, this] {
+      return do_recovery();
+    });
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h
new file mode 100644 (file)
index 0000000..57ca754
--- /dev/null
@@ -0,0 +1,44 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+class MOSDOp;
+
+namespace crimson::osd {
+class PG;
+class ShardServices;
+
+class BackgroundRecovery final : public OperationT<BackgroundRecovery> {
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::background_recovery;
+
+  BackgroundRecovery(
+    ShardServices &ss,
+    PG &pg,
+    crimson::osd::scheduler::scheduler_class_t scheduler_class);
+
+  void print(std::ostream &) const final;
+  void dump_detail(Formatter *f) const final;
+  seastar::future<> start();
+private:
+  ShardServices &ss;
+  PG &pg;
+  crimson::osd::scheduler::scheduler_class_t scheduler_class;
+
+  auto get_scheduler_params() const {
+    return ceph::osd::scheduler::params_t{
+      1, // cost
+      0, // owner
+      scheduler_class
+    };
+  }
+
+  seastar::future<bool> do_recovery();
+};
+
+}
diff --git a/src/crimson/osd/scheduler/mclock_scheduler.cc b/src/crimson/osd/scheduler/mclock_scheduler.cc
new file mode 100644 (file)
index 0000000..195ea8d
--- /dev/null
@@ -0,0 +1,165 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#include <memory>
+#include <functional>
+
+#include "crimson/osd/scheduler/mclock_scheduler.h"
+#include "common/dout.h"
+
+namespace dmc = crimson::dmclock;
+using namespace std::placeholders;
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+#undef dout_prefix
+#define dout_prefix *_dout
+
+
+namespace crimson::osd::scheduler {
+
+mClockScheduler::mClockScheduler(ConfigProxy &conf) :
+  scheduler(
+    std::bind(&mClockScheduler::ClientRegistry::get_info,
+             &client_registry,
+             _1),
+    dmc::AtLimit::Allow,
+    conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
+{
+  conf.add_observer(this);
+  client_registry.update_from_config(conf);
+}
+
+void mClockScheduler::ClientRegistry::update_from_config(const ConfigProxy &conf)
+{
+  default_external_client_info.update(
+    conf.get_val<uint64_t>("osd_mclock_scheduler_client_res"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_client_wgt"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_client_lim"));
+
+  internal_client_infos[
+    static_cast<size_t>(scheduler_class_t::background_recovery)].update(
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_res"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_wgt"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_recovery_lim"));
+
+  internal_client_infos[
+    static_cast<size_t>(scheduler_class_t::background_best_effort)].update(
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_res"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_wgt"),
+    conf.get_val<uint64_t>("osd_mclock_scheduler_background_best_effort_lim"));
+}
+
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_external_client(
+  const client_profile_id_t &client) const
+{
+  auto ret = external_client_infos.find(client);
+  if (ret == external_client_infos.end())
+    return &default_external_client_info;
+  else
+    return &(ret->second);
+}
+
+const dmc::ClientInfo *mClockScheduler::ClientRegistry::get_info(
+  const scheduler_id_t &id) const {
+  switch (id.class_id) {
+  case scheduler_class_t::immediate:
+    ceph_assert(0 == "Cannot schedule immediate");
+    return (dmc::ClientInfo*)nullptr;
+  case scheduler_class_t::repop:
+  case scheduler_class_t::client:
+    return get_external_client(id.client_profile_id);
+  default:
+    ceph_assert(static_cast<size_t>(id.class_id) < internal_client_infos.size());
+    return &internal_client_infos[static_cast<size_t>(id.class_id)];
+  }
+}
+
+void mClockScheduler::dump(ceph::Formatter &f) const
+{
+}
+
+void mClockScheduler::enqueue(item_t&& item)
+{
+  auto id = get_scheduler_id(item);
+  auto cost = item.params.cost;
+
+  if (scheduler_class_t::immediate == item.params.klass) {
+    immediate.push_front(std::move(item));
+  } else {
+    scheduler.add_request(
+      std::move(item),
+      id,
+      cost);
+  }
+}
+
+void mClockScheduler::enqueue_front(item_t&& item)
+{
+  immediate.push_back(std::move(item));
+  // TODO: item may not be immediate, update mclock machinery to permit
+  // putting the item back in the queue
+}
+
+item_t mClockScheduler::dequeue()
+{
+  if (!immediate.empty()) {
+    auto ret = std::move(immediate.back());
+    immediate.pop_back();
+    return ret;
+  } else {
+    mclock_queue_t::PullReq result = scheduler.pull_request();
+    if (result.is_future()) {
+      ceph_assert(
+       0 == "Not implemented, user would have to be able to be woken up");
+      return std::move(*(item_t*)nullptr);
+    } else if (result.is_none()) {
+      ceph_assert(
+       0 == "Impossible, must have checked empty() first");
+      return std::move(*(item_t*)nullptr);
+    } else {
+      ceph_assert(result.is_retn());
+
+      auto &retn = result.get_retn();
+      return std::move(*retn.request);
+    }
+  }
+}
+
+const char** mClockScheduler::get_tracked_conf_keys() const
+{
+  static const char* KEYS[] = {
+    "osd_mclock_scheduler_client_res",
+    "osd_mclock_scheduler_client_wgt",
+    "osd_mclock_scheduler_client_lim",
+    "osd_mclock_scheduler_background_recovery_res",
+    "osd_mclock_scheduler_background_recovery_wgt",
+    "osd_mclock_scheduler_background_recovery_lim",
+    "osd_mclock_scheduler_background_best_effort_res",
+    "osd_mclock_scheduler_background_best_effort_wgt",
+    "osd_mclock_scheduler_background_best_effort_lim",
+    NULL
+  };
+  return KEYS;
+}
+
+void mClockScheduler::handle_conf_change(
+  const ConfigProxy& conf,
+  const std::set<std::string> &changed)
+{
+  client_registry.update_from_config(conf);
+}
+
+}
diff --git a/src/crimson/osd/scheduler/mclock_scheduler.h b/src/crimson/osd/scheduler/mclock_scheduler.h
new file mode 100644 (file)
index 0000000..c3edbe7
--- /dev/null
@@ -0,0 +1,130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include <ostream>
+#include <map>
+#include <vector>
+
+#include "boost/variant.hpp"
+
+#include "dmclock/src/dmclock_server.h"
+
+#include "crimson/osd/scheduler/scheduler.h"
+#include "common/config.h"
+#include "include/cmp.h"
+#include "common/ceph_context.h"
+
+
+namespace crimson::osd::scheduler {
+
+using client_id_t = uint64_t;
+using profile_id_t = uint64_t;
+
+struct client_profile_id_t {
+  client_id_t client_id;
+  profile_id_t profile_id;
+};
+
+WRITE_EQ_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+WRITE_CMP_OPERATORS_2(client_profile_id_t, client_id, profile_id)
+
+
+struct scheduler_id_t {
+  scheduler_class_t class_id;
+  client_profile_id_t client_profile_id;
+};
+
+WRITE_EQ_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+WRITE_CMP_OPERATORS_2(scheduler_id_t, class_id, client_profile_id)
+
+/**
+ * Scheduler implementation based on mclock.
+ *
+ * TODO: explain configs
+ */
+class mClockScheduler : public Scheduler, md_config_obs_t {
+
+  class ClientRegistry {
+    std::array<
+      crimson::dmclock::ClientInfo,
+      static_cast<size_t>(scheduler_class_t::client)
+    > internal_client_infos = {
+      // Placeholder, gets replaced with configured values
+      crimson::dmclock::ClientInfo(1, 1, 1),
+      crimson::dmclock::ClientInfo(1, 1, 1)
+    };
+
+    crimson::dmclock::ClientInfo default_external_client_info = {1, 1, 1};
+    std::map<client_profile_id_t,
+            crimson::dmclock::ClientInfo> external_client_infos;
+    const crimson::dmclock::ClientInfo *get_external_client(
+      const client_profile_id_t &client) const;
+  public:
+    void update_from_config(const ConfigProxy &conf);
+    const crimson::dmclock::ClientInfo *get_info(
+      const scheduler_id_t &id) const;
+  } client_registry;
+
+  using mclock_queue_t = crimson::dmclock::PullPriorityQueue<
+    scheduler_id_t,
+    item_t,
+    true,
+    true,
+    2>;
+  mclock_queue_t scheduler;
+  std::list<item_t> immediate;
+
+  static scheduler_id_t get_scheduler_id(const item_t &item) {
+    return scheduler_id_t{
+      item.params.klass,
+       client_profile_id_t{
+       item.params.owner,
+         0
+         }
+    };
+  }
+
+public:
+  mClockScheduler(ConfigProxy &conf);
+
+  // Enqueue op in the back of the regular queue
+  void enqueue(item_t &&item) final;
+
+  // Enqueue the op in the front of the regular queue
+  void enqueue_front(item_t &&item) final;
+
+  // Return an op to be dispatch
+  item_t dequeue() final;
+
+  // Returns if the queue is empty
+  bool empty() const final {
+    return immediate.empty() && scheduler.empty();
+  }
+
+  // Formatted output of the queue
+  void dump(ceph::Formatter &f) const final;
+
+  void print(std::ostream &ostream) const final {
+    ostream << "mClockScheduler";
+  }
+
+  const char** get_tracked_conf_keys() const final;
+  void handle_conf_change(const ConfigProxy& conf,
+                         const std::set<std::string> &changed) final;
+};
+
+}
diff --git a/src/crimson/osd/scheduler/scheduler.cc b/src/crimson/osd/scheduler/scheduler.cc
new file mode 100644 (file)
index 0000000..93ffa40
--- /dev/null
@@ -0,0 +1,181 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <ostream>
+
+#include <seastar/core/print.hh>
+
+#include "crimson/osd/scheduler/scheduler.h"
+#include "crimson/osd/scheduler/mclock_scheduler.h"
+#include "common/WeightedPriorityQueue.h"
+
+namespace crimson::osd::scheduler {
+
+std::ostream &operator<<(std::ostream &lhs, const scheduler_class_t &c)
+{
+  switch (c) {
+  case scheduler_class_t::background_best_effort:
+    return lhs << "background_best_effort";
+  case scheduler_class_t::background_recovery:
+    return lhs << "background_recovery";
+  case scheduler_class_t::client:
+    return lhs << "client";
+  case scheduler_class_t::repop:
+    return lhs << "repop";
+  case scheduler_class_t::immediate:
+    return lhs << "immediate";
+  default:
+    return lhs;
+  }
+}
+
+/**
+ * Implements Scheduler in terms of OpQueue
+ *
+ * Templated on queue type to avoid dynamic dispatch, T should implement
+ * OpQueue<Scheduleritem_t, client_t>.  This adapter is mainly responsible for
+ * the boilerplate priority cutoff/strict concept which is needed for
+ * OpQueue based implementations.
+ */
+template <typename T>
+class ClassedOpQueueScheduler : public Scheduler {
+  const scheduler_class_t cutoff;
+  T queue;
+
+  using priority_t = uint64_t;
+  std::array<
+    priority_t,
+    static_cast<size_t>(scheduler_class_t::immediate)
+  > priority_map = {
+    // Placeholder, gets replaced with configured values
+    0, 0, 0
+  };
+
+  static scheduler_class_t get_io_prio_cut(ConfigProxy &conf) {
+    if (conf.get_val<std::string>("osd_op_queue_cut_off") == "debug_random") {
+      srand(time(NULL));
+      return (rand() % 2 < 1) ?
+       scheduler_class_t::repop : scheduler_class_t::immediate;
+    } else if (conf.get_val<std::string>("osd_op_queue_cut_off") == "high") {
+      return scheduler_class_t::immediate;
+    } else {
+      return scheduler_class_t::repop;
+    }
+  }
+
+  bool use_strict(scheduler_class_t kl) const {
+    return static_cast<uint8_t>(kl) >= static_cast<uint8_t>(cutoff);
+  }
+
+  priority_t get_priority(scheduler_class_t kl) const {
+    ceph_assert(static_cast<size_t>(kl) <
+               static_cast<size_t>(scheduler_class_t::immediate));
+    return priority_map[static_cast<size_t>(kl)];
+  }
+
+public:
+  template <typename... Args>
+  ClassedOpQueueScheduler(ConfigProxy &conf, Args&&... args) :
+    cutoff(get_io_prio_cut(conf)),
+    queue(std::forward<Args>(args)...)
+  {
+    priority_map[
+      static_cast<size_t>(scheduler_class_t::background_best_effort)
+    ] = conf.get_val<uint64_t>("osd_scrub_priority");
+    priority_map[
+      static_cast<size_t>(scheduler_class_t::background_recovery)
+    ] = conf.get_val<uint64_t>("osd_recovery_op_priority");
+    priority_map[
+      static_cast<size_t>(scheduler_class_t::client)
+    ] = conf.get_val<uint64_t>("osd_client_op_priority");
+    priority_map[
+      static_cast<size_t>(scheduler_class_t::repop)
+    ] = conf.get_val<uint64_t>("osd_client_op_priority");
+  }
+
+  void enqueue(item_t &&item) final {
+    if (use_strict(item.params.klass))
+      queue.enqueue_strict(
+       item.params.owner, get_priority(item.params.klass), std::move(item));
+    else
+      queue.enqueue(
+       item.params.owner, get_priority(item.params.klass),
+       item.params.cost, std::move(item));
+  }
+
+  void enqueue_front(item_t &&item) final {
+    if (use_strict(item.params.klass))
+      queue.enqueue_strict_front(
+       item.params.owner, get_priority(item.params.klass), std::move(item));
+    else
+      queue.enqueue_front(
+       item.params.owner, get_priority(item.params.klass),
+       item.params.cost, std::move(item));
+  }
+
+  bool empty() const final {
+    return queue.empty();
+  }
+
+  item_t dequeue() final {
+    return queue.dequeue();
+  }
+
+  void dump(ceph::Formatter &f) const final {
+    return queue.dump(&f);
+  }
+
+  void print(std::ostream &out) const final {
+    out << "ClassedOpQueueScheduler(queue=";
+    queue.print(out);
+    out << ", cutoff=" << cutoff << ")";
+  }
+
+  ~ClassedOpQueueScheduler() final {};
+};
+
+SchedulerRef make_scheduler(ConfigProxy &conf)
+{
+  const std::string _type = conf.get_val<std::string>("osd_op_queue");
+  const std::string *type = &_type;
+  if (*type == "debug_random") {
+    static const std::string index_lookup[] = { "mclock_scheduler",
+                                               "wpq" };
+    srand(time(NULL));
+    unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+    type = &index_lookup[which];
+  }
+
+  if (*type == "wpq" ) {
+    // default is 'wpq'
+    return std::make_unique<
+      ClassedOpQueueScheduler<WeightedPriorityQueue<item_t, client_t>>>(
+       conf,
+       conf.get_val<uint64_t>("osd_op_pq_max_tokens_per_priority"),
+       conf->osd_op_pq_min_cost
+      );
+  } else if (*type == "mclock_scheduler") {
+    return std::make_unique<mClockScheduler>(conf);
+  } else {
+    ceph_assert("Invalid choice of wq" == 0);
+    return std::unique_ptr<mClockScheduler>();
+  }
+}
+
+std::ostream &operator<<(std::ostream &lhs, const Scheduler &rhs) {
+  rhs.print(lhs);
+  return lhs;
+}
+
+}
diff --git a/src/crimson/osd/scheduler/scheduler.h b/src/crimson/osd/scheduler/scheduler.h
new file mode 100644 (file)
index 0000000..a014991
--- /dev/null
@@ -0,0 +1,82 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <ostream>
+
+#include "crimson/common/config_proxy.h"
+
+namespace crimson::osd::scheduler {
+
+enum class scheduler_class_t : uint8_t {
+  background_best_effort = 0,
+  background_recovery,
+  client,
+  repop,
+  immediate,
+};
+
+std::ostream &operator<<(std::ostream &, const scheduler_class_t &);
+
+using client_t = uint64_t;
+using cost_t = uint64_t;
+
+struct params_t {
+  cost_t cost = 1;
+  client_t owner;
+  scheduler_class_t klass;
+};
+
+struct item_t {
+  params_t params;
+  seastar::promise<> wake;
+};
+
+/**
+ * Base interface for classes responsible for choosing
+ * op processing order in the OSD.
+ */
+class Scheduler {
+public:
+  // Enqueue op for scheduling
+  virtual void enqueue(item_t &&item) = 0;
+
+  // Enqueue op for processing as though it were enqueued prior
+  // to other items already scheduled.
+  virtual void enqueue_front(item_t &&item) = 0;
+
+  // Returns true iff there are no ops scheduled
+  virtual bool empty() const = 0;
+
+  // Return next op to be processed
+  virtual item_t dequeue() = 0;
+
+  // Dump formatted representation for the queue
+  virtual void dump(ceph::Formatter &f) const = 0;
+
+  // Print human readable brief description with relevant parameters
+  virtual void print(std::ostream &out) const = 0;
+
+  // Destructor
+  virtual ~Scheduler() {};
+};
+
+std::ostream &operator<<(std::ostream &lhs, const Scheduler &);
+using SchedulerRef = std::unique_ptr<Scheduler>;
+
+SchedulerRef make_scheduler(ConfigProxy &);
+
+}
index b89fd0b8310f24156e9c414d5243fbb13b98f884..fc81ecb02075709183d3f66f4579d3d6c8e9cdb8 100644 (file)
@@ -39,6 +39,7 @@ ShardServices::ShardServices(
       monc(monc),
       mgrc(mgrc),
       store(store),
+      throttler(crimson::common::local_conf()),
       obc_registry(crimson::common::local_conf()),
       local_reserver(
        &cct,
index 67d03eff09ec62934f7195d77136263c15631d30..443fd0ee45180dc3979fe660ffb09dabdb662f4f 100644 (file)
@@ -84,8 +84,9 @@ public:
     return osdmap_service;
   }
 
-  // Op Tracking
+  // Op Management
   OperationRegistry registry;
+  OperationThrottler throttler;
 
   template <typename T, typename... Args>
   auto start_operation(Args&&... args) {