]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mds: add MDS dmClock scheduler for subvolume QoS support
authorYongseok Oh <yongseok.oh@linecorp.com>
Wed, 9 Dec 2020 07:54:48 +0000 (16:54 +0900)
committerVenky Shankar <vshankar@redhat.com>
Thu, 27 Nov 2025 08:13:47 +0000 (13:43 +0530)
Signed-off-by: Yongseok Oh <yongseok.oh@linecorp.com>
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/common/options/mds.yaml.in
src/mds/CMakeLists.txt
src/mds/MDSDaemon.cc
src/mds/MDSDmclockScheduler.cc [new file with mode: 0644]
src/mds/MDSDmclockScheduler.h [new file with mode: 0644]
src/mds/MDSRank.cc
src/mds/MDSRank.h
src/mds/Server.cc

index b7ec143ab6fd53fb62d4170774d0e256624fde8b..a31f2ec13d20979e3e64f23ef15269189cf8a3eb 100644 (file)
@@ -1811,4 +1811,40 @@ options:
   default: 30
   min: 30
   services:
-  - mds
\ No newline at end of file
+  - mds
+- name: mds_dmclock_enable
+  type: bool
+  level: advanced
+  desc: enable MDS dmClock QoS scheduler
+  long_desc: This option can be configured when reservation, weight, and limit values are
+    greater than zero.
+  default: false
+  services:
+  - mds
+- name: mds_dmclock_reservation
+  type: float
+  level: advanced
+  desc: dmclock reservation for each MDS client
+  default: 1000.0
+  services:
+  - mds
+  flags:
+  - runtime
+- name: mds_dmclock_weight
+  type: float
+  level: advanced
+  desc: dmclock weight for each MDS client
+  default: 1000.0
+  services:
+  - mds
+  flags:
+  - runtime
+- name: mds_dmclock_limit
+  type: float
+  level: advanced
+  desc: dmclock limit for each MDS client
+  default: 1000.0
+  services:
+  - mds
+  flags:
+  - runtime
index f3980c7e04b509a569ee35664be2f83568342959..8def0893896b0060256015c4bc7d2799f209e8fc 100644 (file)
@@ -45,6 +45,7 @@ set(mds_srcs
   QuiesceDbManager.cc
   QuiesceAgent.cc
   MDSRankQuiesce.cc
+  MDSDmclockScheduler.cc
   ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
   ${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
   ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc
@@ -52,5 +53,6 @@ set(mds_srcs
 add_library(mds STATIC ${mds_srcs})
 target_link_libraries(mds PRIVATE
   legacy-option-headers Boost::url
-  heap_profiler cpu_profiler osdc ${LUA_LIBRARIES})
+  heap_profiler cpu_profiler osdc ${LUA_LIBRARIES}
+  PUBLIC dmclock::dmclock)
 target_include_directories(mds PRIVATE "${LUA_INCLUDE_DIR}")
index 1e0a6ed07b97ec86108257fcd4c8521b8104fb8c..15731bb90bf801a8e7f5b34263e0226096fdeb0a 100644 (file)
@@ -565,6 +565,28 @@ void MDSDaemon::set_up_admin_socket()
     asok_hook,
     "dump stray folder content");
   ceph_assert(r == 0);
+  r = admin_socket->register_command("dump qos",
+                                     asok_hook,
+                                     "dump qos info");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("qos set "
+                                     "name=path,type=CephString,req=true "
+                                     "name=reservation,type=CephInt,req=true "
+                                     "name=weight,type=CephInt,req=true "
+                                     "name=limit,type=CephInt,req=true",
+                                     asok_hook,
+                                     "set qos info");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("qos rm "
+                                     "name=path,type=CephString,req=true",
+                                     asok_hook,
+                                     "rm qos info");
+  ceph_assert(r == 0);
+  r = admin_socket->register_command("qos get "
+                                     "name=path,type=CephString,req=true",
+                                     asok_hook,
+                                     "get qos info");
+  ceph_assert(r == 0);
 }
 
 void MDSDaemon::clean_up_admin_socket()
diff --git a/src/mds/MDSDmclockScheduler.cc b/src/mds/MDSDmclockScheduler.cc
new file mode 100644 (file)
index 0000000..2dec598
--- /dev/null
@@ -0,0 +1,888 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 LINE
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "Server.h"
+#include "SessionMap.h"
+#include "MDSDmclockScheduler.h"
+#include "mds/MDSMap.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << "mds." << get_nodeid() << ".dmclock_scheduler "
+
+using std::string;
+using TOPNSPC::common::cmd_getval;
+
+// Log helper: prints a VolumeInfo's session count and QoS triple, or a
+// placeholder when the pointer is null.
+ostream& operator<<(ostream& os, VolumeInfo* vi)
+{
+  if (vi) {
+    os << "VolumeInfo: (session_cnt " << vi->get_session_cnt() << ") "
+        << " reservation " << vi->get_reservation()
+        << " weight " << vi->get_weight()   // fixed: missing space glued label to value
+        << " limit " << vi->get_limit();
+  } else {
+    os << "VolumeInfo has nullptr";
+  }
+  return os;
+}
+
+const VolumeId MDSDmclockScheduler::convert_subvol_root(const VolumeId &volume_id)
+{
+  filepath subvol_root_path(volume_id);
+
+  if (subvol_root_path.depth() > SUBVOL_ROOT_DEPTH) {
+    return "/" + subvol_root_path.prefixpath(SUBVOL_ROOT_DEPTH).get_path();
+  }
+
+  return volume_id;
+}
+
+const VolumeId MDSDmclockScheduler::get_volume_id(Session *session)
+{
+  ceph_assert(session != nullptr);
+  auto client_root_entry = session->info.client_metadata.find("root");
+  if (client_root_entry == session->info.client_metadata.end() || client_root_entry->second == "") {
+    return "";
+  }
+  return convert_subvol_root(client_root_entry->second);
+}
+
+const VolumeId MDSDmclockScheduler::get_session_id(Session *session)
+{
+  ceph_assert(session != nullptr);
+  return std::to_string(session->info.inst.name.num());
+}
+
+template<typename R>
+void MDSDmclockScheduler::enqueue_client_request(const R &mds_req, VolumeId volume_id, Cost cost)
+{
+  dout(10) << __func__ << " volume_id " << volume_id << dendl;
+
+  std::unique_lock<std::mutex> lock(queue_mutex);
+  request_queue.emplace_back(new ClientRequest(mds_req, volume_id, crimson::dmclock::get_time(), cost));
+  /* wake up*/
+  lock.unlock();
+  queue_cvar.notify_all();
+}
+
+void MDSDmclockScheduler::handle_client_request(const MDSReqRef &mds_req)
+{
+  dout(10) << __func__ << " " << *mds_req << dendl;
+
+  if (mds->mds_dmclock_scheduler->default_conf.is_enabled() == true &&
+      mds_req->get_orig_source().is_client() &&
+      mds->is_active() &&
+      check_volume_info_updated(get_volume_id(mds->get_session(mds_req))) &&
+      check_volume_info_validity(get_volume_id(mds->get_session(mds_req)))) {
+    Cost cost = get_op_cost(mds_req->get_op());
+    enqueue_client_request<MDSReqRef>(mds_req, get_volume_id(mds->get_session(mds_req)), cost);
+  } else {
+    mds->server->handle_client_request(mds_req);
+  }
+}
+
+// Handler invoked when the dmClock queue dispatches a queued client request:
+// records the observed queueing latency, then forwards the request to Server
+// under the MDS lock.
+// NOTE(review): phase_type and cost are accepted to match the queue's
+// callback signature but are unused in this body.
+void MDSDmclockScheduler::submit_request_to_mds(const VolumeId& vid, std::unique_ptr<ClientRequest>&& request,
+                                                const PhaseType& phase_type, const uint64_t cost)
+{
+  dout(10) << __func__ << " volume_id " << vid << dendl;
+
+  // Feed the request's arrival time into per-volume throttle accounting.
+  hit_volume_throttle(vid, request->time);
+
+  const MDSReqRef& req = request->mds_req_ref;
+
+  mds_lock();
+
+  mds->server->handle_client_request(req);
+
+  mds_unlock();
+
+  // Done after dropping mds_lock; takes volume_info_lock internally.
+  decrease_inflight_request(request->get_volume_id());
+}
+
+void MDSDmclockScheduler::shutdown()
+{
+  dout(10) << __func__ << " state " << get_state_str() << dendl;
+
+  if (default_conf.is_enabled() == true) {
+    disable_qos_feature();
+  }
+
+  std::unique_lock<std::mutex> lock(queue_mutex);
+  state = SchedulerState::FINISHING;
+  lock.unlock();
+  queue_cvar.notify_all();
+
+  if (scheduler_thread.joinable()) {
+    scheduler_thread.join();
+  }
+
+  state = SchedulerState::SHUTDOWN;
+
+  dout(10) << __func__ << " state " << get_state_str() << dendl;
+}
+
+MDSDmclockScheduler::~MDSDmclockScheduler()
+{
+  dout(10) << __func__ << dendl;
+  shutdown();
+  delete dmclock_queue;
+}
+
+const ClientInfo *MDSDmclockScheduler::get_client_info(const VolumeId &vid)
+{
+  dout(10) << __func__ << " volume_id " << vid << dendl;
+  std::lock_guard lock(volume_info_lock);
+
+  auto vi = get_volume_info_ptr(vid);
+  const ClientInfo *ci = nullptr;
+  if (vi != nullptr) {
+    if (vi->is_use_default() == true) {
+      dout(15) << __func__ << " default QoS " << *default_conf.get_qos_info() << dendl;
+      ci = default_conf.get_qos_info();
+    } else {
+      dout(15) << __func__ <<  " per client specific QoS " << vi->get_qos_info() << dendl;
+      ci = vi->get_qos_info();
+    }
+  }
+  return ci;
+}
+
+// Formatter output for the "dump qos" admin-socket command: global scheduler
+// state followed by one section per tracked volume.
+void MDSDmclockScheduler::dump(Formatter *f) const
+{
+  f->open_object_section("qos_info");
+
+  f->open_object_section("qos_state");
+  f->dump_bool("qos_enabled", default_conf.is_enabled());
+  if (default_conf.is_enabled()) {
+    f->dump_string("state", get_state_str());
+    f->dump_float("default_reservation", default_conf.get_reservation());
+    f->dump_float("default_weight", default_conf.get_weight());
+    f->dump_float("default_limit", default_conf.get_limit());
+    f->dump_int("mds_dmclock_queue_size", get_request_queue_size());
+    f->dump_int("inflight_requests", total_inflight_requests);
+  }
+  f->close_section(); // qos_state
+
+  f->open_array_section("volume_infos");
+  std::lock_guard lock(volume_info_lock);
+  for (auto it = volume_info_map.begin(); it != volume_info_map.end(); it++) {
+    // NOTE(review): this copies each VolumeInfo per iteration; a const
+    // reference would avoid the copy if VolumeInfo::dump is const -- confirm.
+    auto vol_info = it->second;
+    f->open_object_section("volume_info");
+    vol_info.dump(f, it->first);
+    f->close_section();
+  }
+  f->close_section(); // volume_infos
+
+  f->close_section(); // qos_info
+}
+
+VolumeInfo *MDSDmclockScheduler::get_volume_info_ptr(const VolumeId &vid)
+{
+  auto it = volume_info_map.find(vid);
+  if (it != volume_info_map.end()) {
+    return &it->second;
+  }
+  return nullptr;
+}
+
+bool MDSDmclockScheduler::copy_volume_info(const VolumeId &vid, VolumeInfo &vi)
+{
+  std::lock_guard lock(volume_info_lock);
+  auto it = volume_info_map.find(vid);
+  if (it != volume_info_map.end()) {
+    vi = it->second;
+    return true;
+  }
+  return false;
+}
+
+bool MDSDmclockScheduler::check_volume_info_existence(const VolumeId &vid)
+{
+  std::lock_guard lock(volume_info_lock);
+  if (get_volume_info_ptr(vid) != nullptr) {
+    return true;
+  }
+  return false;
+}
+
+bool MDSDmclockScheduler::check_volume_info_updated(const VolumeId &vid)
+{
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+  if (vi != nullptr && vi->get_updated()) {
+    return true;
+  }
+  return false;
+}
+
+void MDSDmclockScheduler::hit_volume_throttle(const VolumeId &vid, Time arrival)
+{
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+
+  if (vi != nullptr && vi->get_updated()) {
+    double expected_lat;
+    double expected_iops;
+
+    if (vi->is_use_default()) {
+      expected_lat = (double)1.0 / default_conf.get_limit();
+      expected_iops = default_conf.get_limit();
+    } else {
+      expected_lat = (double)1.0 / vi->get_limit();
+      expected_iops = vi->get_limit();
+    }
+
+    double latency = crimson::dmclock::get_time() - arrival;
+
+    dout(10) << __func__ << " Volume " << vid <<
+      " current latency " << latency <<
+      " expected latency " << expected_lat <<
+      " inflight request " << vi->get_inflight_request() <<
+      " expected iops " << expected_iops << dendl;
+
+    if (latency > expected_lat && vi->get_inflight_request() > expected_iops) {
+      vi->hit_throttle();
+    }
+
+    vi->update_latency(latency);
+  }
+}
+
+bool MDSDmclockScheduler::check_volume_info_validity(const VolumeId &vid)
+{
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+  if (vi != nullptr) {
+    if (vi->is_use_default() == true) {
+      const ClientInfo *ci = default_conf.get_qos_info();
+      if (ci->reservation > 0.0 &&
+          ci->weight > 0.0 &&
+          ci->limit > 0.0) {
+        return true;
+      }
+
+    } else {
+      if (vi->get_reservation() > 0.0 &&
+          vi->get_weight() > 0.0 &&
+          vi->get_limit() > 0.0)
+      return true;
+    }
+  }
+  return false;
+}
+
+void MDSDmclockScheduler::set_volume_info_updated(const VolumeId &vid)
+{
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+  if (vi != nullptr) {
+    vi->set_updated(true);
+  }
+}
+
+void MDSDmclockScheduler::increase_inflight_request(const VolumeId &vid)
+{
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+  ceph_assert(vi!=nullptr);
+  vi->increase_inflight_request();
+  total_inflight_requests++;
+}
+
+void MDSDmclockScheduler::decrease_inflight_request(const VolumeId &vid)
+{
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+  ceph_assert(vi!=nullptr);
+  vi->decrease_inflight_request();
+  total_inflight_requests--;
+}
+
+int MDSDmclockScheduler::get_inflight_request(const VolumeId &vid)
+{
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+  ceph_assert(vi!=nullptr);
+  return vi->get_inflight_request();
+}
+
+void MDSDmclockScheduler::create_volume_info(const VolumeId &vid, const ClientInfo &client_info,
+                                              const bool use_default)
+{
+  dout(10) << __func__ << " volume_id " << vid << " client_info "
+          << client_info << " use_default" <<  use_default << dendl;
+
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+
+  if (vi == nullptr) {
+    auto [it, success]  = volume_info_map.insert(std::make_pair(std::move(vid), VolumeInfo()));
+    ceph_assert(success==true);
+    vi = &it->second;
+  }
+
+  enqueue_update_request(vid, client_info, use_default);
+}
+
+void MDSDmclockScheduler::add_session_to_volume_info(const VolumeId &vid, const SessionId &sid)
+{
+  dout(10) << __func__ << " volume_id " << vid << " session_id " << sid << dendl;
+
+  std::lock_guard lock(volume_info_lock);
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+  ceph_assert(vi!=nullptr);
+  vi->add_session(sid);
+}
+
+void MDSDmclockScheduler::delete_session_from_volume_info(const VolumeId &vid, const SessionId &sid)
+{
+  dout(10) << __func__ << " volume_id " << vid << " session_id " << sid << dendl;
+
+  std::lock_guard lock(volume_info_lock);
+
+  auto it = volume_info_map.find(vid);
+  if (it != volume_info_map.end()) {
+    auto vi = &it->second;
+    vi->remove_session(sid);
+    if (vi->get_session_cnt() == 0) {
+      dout(15) << __func__ << " erase volume info due to no sessions (volume_id  "
+                <<  vid << " session_id " << sid << ")" << dendl;
+      ceph_assert(vi->get_inflight_request()==0);
+      volume_info_map.erase(it);
+    }
+    /* the dmclock library supports only removal of idle clients in the background */
+  }
+}
+
+void MDSDmclockScheduler::update_volume_info(const VolumeId &vid, const ClientInfo& client_info, const bool use_default)
+{
+  dout(10) << __func__ << " volume_id " << vid << " " << client_info << " use_default " << use_default << dendl;
+
+  std::lock_guard lock(volume_info_lock);
+  
+  VolumeInfo* vi = get_volume_info_ptr(vid);
+  if (vi) {
+    vi->update(client_info, use_default);
+  } else {
+    dout(5) << " VolumeInfo is unavaiable (vid = " << vid << ")" << dendl;
+  }
+}
+
+void MDSDmclockScheduler::set_default_volume_info(const VolumeId &vid)
+{
+  ClientInfo client_info(0.0, 0.0, 0.0);
+  dout(10) << __func__ << " vid " << vid << " " << client_info << dendl;
+  enqueue_update_request(vid, client_info, true);
+}
+
+void MDSDmclockScheduler::add_or_remove_session_map(const bool is_add)
+{
+  auto sessionmap = get_session_map();
+  if (sessionmap == nullptr) {
+    dout(10) << __func__ << " sessionmap has nullptr" << dendl;
+    return;
+  }
+  dout(0) << __func__ << dendl;
+
+  std::list<int> session_state_list = {Session::STATE_OPEN, Session::STATE_STALE};
+
+  for (int session_state : session_state_list) {
+      dout(0) << " session state " << session_state << dendl;
+    if (auto it = sessionmap->by_state.find(session_state); it != sessionmap->by_state.end()) {
+      for (const auto &session : *(it->second)) {
+        dout(0) << " session " << session << dendl;
+        if (is_add) {
+          add_session(session);
+        } else {
+          dout(0) << " remove session " << session << dendl;
+          remove_session(session);
+        }
+      }
+    }
+  }
+}
+
+// Register a client session with its volume's QoS bookkeeping, creating a
+// default (use_default=true) VolumeInfo entry on the volume's first session.
+void MDSDmclockScheduler::add_session(Session *session)
+{
+  if (get_default_conf().is_enabled() == false) {
+    return;
+  }
+
+  // NOTE(review): this branch logs that the session cannot be added but does
+  // not return, so the session is registered anyway -- a `return` looks
+  // intended here; confirm before changing.
+  if (!get_mds_is_active()) {
+    dout(5) << __func__ << " Session cannot be added as mds is not active" << dendl;
+  }
+
+  if (session == nullptr) {
+    dout(5) << __func__ << " session is nullptr" << dendl;
+    return;
+  }
+
+  VolumeId vid = get_volume_id(session);
+  SessionId sid = get_session_id(session);
+
+  dout(10) << __func__ << " volume_id " << vid << " session_id " << sid <<  dendl;
+
+  if (check_volume_info_existence(vid) == false) {
+    // First session for this volume: start with zeroed QoS values and let
+    // the default configuration apply until "qos set" overrides it.
+    ClientInfo client_info(0.0, 0.0, 0.0);
+    bool use_default = true;
+
+    create_volume_info(vid, client_info, use_default);
+  } 
+
+  add_session_to_volume_info(vid, sid);
+}
+
+void MDSDmclockScheduler::remove_session(Session *session)
+{
+  if (get_default_conf().is_enabled() == false) {
+    return;
+  }
+  VolumeId vid = get_volume_id(session);
+  SessionId sid = get_session_id(session);
+  dout(10) << __func__ << " volume_id " << vid << " session_id " << sid << dendl;
+  delete_session_from_volume_info(vid, sid);
+}
+
+uint32_t MDSDmclockScheduler::get_request_queue_size() const
+{
+  std::unique_lock<std::mutex> lock(queue_mutex);
+  return request_queue.size();
+}
+
+void MDSDmclockScheduler::enqueue_update_request(const VolumeId& vid, const ClientInfo& client_info, const bool use_default)
+{
+    dout(10) << __func__ <<  " volume_id " << vid << dendl;
+
+    std::unique_lock<std::mutex> lock(queue_mutex);
+
+    request_queue.emplace_back(new UpdateRequest(vid, client_info, use_default));
+
+    /* wake up*/
+    lock.unlock();
+    queue_cvar.notify_all();
+}
+
+void MDSDmclockScheduler::enqueue_update_request(const VolumeId& vid, const ClientInfo& client_info, const bool use_default, RequestCB cb_func)
+{
+    dout(10) << __func__ <<  " volume_id " << vid << dendl;
+
+    std::unique_lock<std::mutex> lock(queue_mutex);
+
+    request_queue.emplace_back(new UpdateRequest(vid, client_info, use_default, cb_func));
+    /* wake up*/
+    lock.unlock();
+    queue_cvar.notify_all();
+}
+
+void MDSDmclockScheduler::process_request_handler()
+{
+  std::unique_lock<std::mutex> lock(queue_mutex);
+
+  if (request_queue.size() == 0) {
+    queue_cvar.wait(lock);
+  }
+
+  while (request_queue.size()) {
+    std::unique_ptr<Request> request = std::move(request_queue.front());
+    request_queue.erase(request_queue.begin());
+
+    dout(10) << __func__ << " Process request type " << request->get_request_type_str() << dendl;
+
+    lock.unlock();
+
+    switch(request->get_request_type()) {
+      case RequestType::CLIENT_REQUEST:
+      {
+        std::unique_ptr<ClientRequest> c_request(static_cast<ClientRequest *>(request.release()));
+
+        dout(10) << " Process client request (queue size " << request_queue.size()
+                << " volume_id " << c_request->get_volume_id()
+                << " time " << c_request->time
+                << " cost " << c_request->cost << ")" << dendl;
+
+        increase_inflight_request(c_request->get_volume_id());
+
+        dmclock_queue->add_request(std::move(c_request), std::move(c_request->get_volume_id()),
+            {0, 0}, c_request->time, c_request->cost);
+
+        break;
+      }
+      case RequestType::UPDATE_REQUEST:
+      {
+        std::unique_ptr<UpdateRequest> c_request(static_cast<UpdateRequest *>(request.release()));
+
+        dout(10) << " Process update request (queue size " << request_queue.size()
+                  << " volume_id " << c_request->get_volume_id() << ")" << dendl;
+
+        update_volume_info(c_request->get_volume_id(), c_request->client_info, c_request->use_default);
+        dmclock_queue->update_client_info(c_request->get_volume_id());
+        set_volume_info_updated(c_request->get_volume_id());
+
+        if (c_request->cb_func) {
+          c_request->cb_func();
+        }
+        break;
+      }
+      default:
+        derr << __func__ << " received unknown message " << request->get_request_type_str() << dendl;
+    }
+
+    lock.lock();
+  }
+}
+
+void MDSDmclockScheduler::process_request()
+{
+  dout(10) << __func__ << " thread has been invoked" << dendl;
+  /*
+   * process_request_handler() is responsible for handling request_queue.
+   * The function uses mutex-based locking, and
+   * if request_queue is empty, it waits until the request is queued.
+   * Therefore, it prevents excessive use of CPU by not busy waiting.
+   */
+  while (state == SchedulerState::RUNNING) {
+    process_request_handler();
+  }
+  dout(10) << __func__ << " thread has been joined" << dendl;
+}
+
+void MDSDmclockScheduler::begin_schedule_thread()
+{
+  scheduler_thread = std::thread([this](){process_request();});
+}
+
+void MDSDmclockScheduler::enable_qos_feature()
+{
+  dout(10) << __func__ << dendl;
+
+  default_conf.set_status(true);
+
+  if (!get_mds_is_active()) {
+    dout(0) << " MDS Rank is not active and QoS feature cannot be enabled." << dendl;
+    return;
+  }
+
+  add_or_remove_session_map(true);
+}
+
+void MDSDmclockScheduler::cancel_inflight_request()
+{
+  dout(10) << __func__ << dendl;
+  std::lock_guard lock(volume_info_lock);
+  std::list<Queue::RequestRef> req_list;
+
+  auto accum_f = [&req_list] (Queue::RequestRef&& r)
+                  {
+                    req_list.push_front(std::move(r));
+                  };
+
+  for (auto it : volume_info_map) {
+    if (it.second.get_inflight_request()) {
+      dmclock_queue->remove_by_client(it.first, true, accum_f);
+    }
+  }
+
+  dout(10) << __func__ << " canceled requests " << req_list.size() << dendl;
+
+  for (auto& it : req_list) {
+    dout(10) << " canceled request volume_id " << it->get_volume_id() << " " << *it->mds_req_ref << dendl;
+    handle_request_func(it->get_volume_id(), std::move(it), PhaseType::reservation, 1);
+  }
+  ceph_assert(dmclock_queue->empty() == true);
+}
+
+void MDSDmclockScheduler::disable_qos_feature()
+{
+  uint32_t queue_size;
+  bool dmclock_empty;
+
+  dout(10) << __func__ << dendl;
+
+  default_conf.set_status(false);
+
+  do
+  {
+    mds_unlock();
+    queue_cvar.notify_all();
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    queue_size = get_request_queue_size();
+    mds_lock();
+  } while(queue_size);
+
+  do
+  {
+    mds_unlock();
+    cancel_inflight_request();
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    dmclock_empty = dmclock_queue->empty();
+    mds_lock();
+  } while(!dmclock_empty);
+
+  add_or_remove_session_map(false);
+}
+
+void MDSDmclockScheduler::try_enable_qos_feature()
+{
+  if (g_conf().get_val<bool>("mds_dmclock_enable")) {
+    enable_qos_feature();
+  }
+}
+
+void MDSDmclockScheduler::try_disable_qos_feature()
+{
+  if (g_conf().get_val<bool>("mds_dmclock_enable")) {
+    disable_qos_feature();
+  }
+}
+
+// React to runtime changes of the mds_dmclock_* options: toggle the QoS
+// feature and refresh the default reservation/weight/limit values.
+void MDSDmclockScheduler::handle_conf_change(const std::set<std::string>& changed)
+{
+  dout(10) << __func__ << dendl;
+
+  if (changed.count("mds_dmclock_enable")) {
+    bool new_val = g_conf().get_val<bool>("mds_dmclock_enable");
+    if (default_conf.is_enabled() != new_val)
+    {
+      if (new_val == true) {
+        enable_qos_feature();
+      } else {
+        disable_qos_feature();
+      }
+    }
+  }
+
+  // NOTE(review): with `|| default_conf.is_enabled()`, all three values are
+  // re-applied on *every* config change while QoS is enabled, even when the
+  // option itself did not change. If only-on-change was intended, `&&` was
+  // likely meant -- confirm against the restart comment below.
+  if (changed.count("mds_dmclock_reservation") || default_conf.is_enabled() == true) {
+    dout(10) << " set reservation " << g_conf().get_val<double>("mds_dmclock_reservation") << dendl;
+    default_conf.set_reservation(g_conf().get_val<double>("mds_dmclock_reservation"));
+    ceph_assert(default_conf.get_reservation() == g_conf().get_val<double>("mds_dmclock_reservation"));
+  }
+  if (changed.count("mds_dmclock_weight") || default_conf.is_enabled() == true) {
+    dout(10) << " set weight " << g_conf().get_val<double>("mds_dmclock_weight") << dendl;
+    default_conf.set_weight(g_conf().get_val<double>("mds_dmclock_weight"));
+    ceph_assert(default_conf.get_weight() == g_conf().get_val<double>("mds_dmclock_weight"));
+  }
+  if (changed.count("mds_dmclock_limit") || default_conf.is_enabled() == true) {
+    dout(10) << " set limit " << g_conf().get_val<double>("mds_dmclock_limit") << dendl;
+    default_conf.set_limit(g_conf().get_val<double>("mds_dmclock_limit"));
+    ceph_assert(default_conf.get_limit() == g_conf().get_val<double>("mds_dmclock_limit"));
+  }
+
+  /* need to check whether conf is updated from ceph.conf when the MDS is restarted */
+  dout(10) << __func__ <<  " enable " << default_conf.is_enabled()
+            << " reservation " << default_conf.get_reservation()
+            << " weight " << default_conf.get_weight()
+            << " limit " << default_conf.get_limit() << dendl;
+}
+
+mds_rank_t MDSDmclockScheduler::get_nodeid()
+{
+  if (mds != nullptr) {
+    return mds->get_nodeid();
+  }
+  return 0;
+}
+
+void MDSDmclockScheduler::set_mds_is_active(bool value)
+{
+  mds_is_active = value;
+}
+
+bool MDSDmclockScheduler::get_mds_is_active()
+{
+  return mds_is_active;
+}
+SessionMap *MDSDmclockScheduler::get_session_map()
+{
+  if (mds != nullptr) {
+    return &mds->sessionmap;
+  }
+  return 0;
+}
+
+void MDSDmclockScheduler::mds_lock()
+{
+  if (mds != nullptr) {
+    mds->mds_lock.lock();
+  }
+}
+
+void MDSDmclockScheduler::mds_unlock()
+{
+  if (mds != nullptr) {
+    mds->mds_lock.unlock();
+  }
+}
+
+// Human-readable name of the scheduler's lifecycle state (for dumps/logs).
+std::string_view MDSDmclockScheduler::get_state_str() const
+{
+  switch(state) {
+    case SchedulerState::INIT:
+      return "INIT";
+    case SchedulerState::RUNNING:
+      return "RUNNING";
+    case SchedulerState::FINISHING:
+      return "FINISHING";
+    case SchedulerState::SHUTDOWN:
+      return "SHUTDOWN";
+    default:
+      return "UNKNOWN";   // fixed typo: was "UNKONWN"
+  }
+}
+
+Cost MDSDmclockScheduler::get_op_cost(const int op)
+{
+  switch (op) {
+    case CEPH_MDS_OP_LOOKUP:
+    case CEPH_MDS_OP_LOOKUPHASH:
+    case CEPH_MDS_OP_LOOKUPPARENT:
+    case CEPH_MDS_OP_LOOKUPINO:
+    case CEPH_MDS_OP_LOOKUPNAME:
+    case CEPH_MDS_OP_GETATTR:
+    case CEPH_MDS_OP_READDIR:
+    case CEPH_MDS_OP_OPEN:
+    case CEPH_MDS_OP_LOOKUPSNAP:
+    case CEPH_MDS_OP_LSSNAP:
+    case CEPH_MDS_OP_GETFILELOCK:
+      return 1;
+    case CEPH_MDS_OP_SETXATTR:
+    case CEPH_MDS_OP_SETATTR:
+    case CEPH_MDS_OP_RMXATTR:
+    case CEPH_MDS_OP_SETLAYOUT:
+    case CEPH_MDS_OP_SETDIRLAYOUT:
+    case CEPH_MDS_OP_MKNOD:
+    case CEPH_MDS_OP_LINK:
+    case CEPH_MDS_OP_UNLINK:
+    case CEPH_MDS_OP_RENAME:
+    case CEPH_MDS_OP_MKDIR:
+    case CEPH_MDS_OP_RMDIR:
+    case CEPH_MDS_OP_SYMLINK:
+    case CEPH_MDS_OP_CREATE:
+    case CEPH_MDS_OP_MKSNAP:
+    case CEPH_MDS_OP_RMSNAP:
+    case CEPH_MDS_OP_RENAMESNAP:
+    case CEPH_MDS_OP_SETFILELOCK:
+    default:
+      return 3;
+  }
+}
+
+bool MDSDmclockScheduler::_process_asok_qos_set(const cmdmap_t &cmdmap, std::ostream &ss)
+{
+  bool rc;
+  string path;
+  rc = cmd_getval(cmdmap, "path", path);
+  if (!rc) {
+    ss << "malformed path";
+    return false;
+  }
+
+  int64_t reservation;
+  rc = cmd_getval(cmdmap, "reservation", reservation);
+  if (!rc || reservation < 1) {
+    ss << "malformed reservation";
+    return false;
+  }
+
+  int64_t weight;
+  rc = cmd_getval(cmdmap, "weight", weight);
+  if (!rc || weight < 1) {
+    ss << "malformed weight";
+    return false;
+  }
+
+  int64_t limit;
+  rc = cmd_getval(cmdmap, "limit", limit);
+  if (!rc || limit < 1) {
+    ss << "malformed limit";
+    return false;
+  }
+
+  if (reservation > limit) {
+    ss << "reservation cannot be greater than limit";
+    return false;
+  }
+
+  if (check_volume_info_existence(path) == false) {
+    ss << "volume_info doesn't exist (" << path << ")" ;
+    return false;
+  }
+
+  dout(5) << "dmclock path " << path << " reservation=" << reservation
+    << " weight=" << weight
+    << " limit=" << limit <<dendl;
+
+  ClientInfo info(reservation, weight, limit);
+
+  enqueue_update_request(path, info, false);
+
+  return true;
+}
+
+void MDSDmclockScheduler::process_asok_qos_set(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f)
+{
+  bool success = _process_asok_qos_set(cmdmap, ss);
+  f->open_object_section("results");
+  f->dump_int("return_code", int(success));
+  f->close_section(); // results
+}
+
+bool MDSDmclockScheduler::_process_asok_qos_rm(const cmdmap_t &cmdmap, std::ostream &ss)
+{
+  string path;
+  if(!cmd_getval(cmdmap, "path", path)) {
+    ss << "malformed path";
+    return false;
+  }
+
+  if (check_volume_info_existence(path) == false) {
+    ss << "volume_info doesn't exist (" << path << ")" ;
+    return false;
+  }
+
+  dout(5) << "dmclock path " << path << dendl;
+
+  set_default_volume_info(path);
+
+  return true;
+}
+
+
+void MDSDmclockScheduler::process_asok_qos_rm(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f)
+{
+  bool success = _process_asok_qos_rm(cmdmap, ss);
+  f->open_object_section("results");
+  f->dump_int("return_code", int(success));
+  f->close_section(); // results
+}
+
+void MDSDmclockScheduler::process_asok_qos_get(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f)
+{
+  string path;
+  if(!cmd_getval(cmdmap, "path", path)) {
+    ss << "malformed path";
+    return;
+  }
+
+  VolumeInfo vi;
+
+  if (!copy_volume_info(path, vi)) {
+    ss << "volume_info doesn't exist (" << path << ")" ;
+    return;
+  }
+
+  f->open_object_section("volume_qos_info");
+  vi.dump(f, path);
+  f->close_section();
+}
diff --git a/src/mds/MDSDmclockScheduler.h b/src/mds/MDSDmclockScheduler.h
new file mode 100644 (file)
index 0000000..f224bff
--- /dev/null
@@ -0,0 +1,495 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 LINE
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef MDS_DMCLOCK_SCHEDULER_H_
+#define MDS_DMCLOCK_SCHEDULER_H_
+
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
+
+#include "include/types.h"
+#include "mdstypes.h"
+
+#include "MDSRank.h"
+#include "messages/MClientReply.h"
+#include "messages/MClientRequest.h"
+#include "messages/MClientSession.h"
+#include "msg/Messenger.h"
+#include "dmclock/src/dmclock_server.h"
+#include "CInode.h"
+
+class ClientRequest;
+
+using MDSReqRef = cref_t<MClientRequest>;
+using crimson::dmclock::ClientInfo;
+using crimson::dmclock::AtLimit;
+using crimson::dmclock::PhaseType;
+using crimson::dmclock::ReqParams;
+using crimson::dmclock::Cost;
+using Time = double;
+using ClientId = std::string;
+using VolumeId = ClientId;
+using SessionId = std::string;
+using std::placeholders::_1;
+using std::placeholders::_2;
+using std::placeholders::_3;
+using std::placeholders::_4;
+using std::ostream;
+using std::to_string;
+using std::vector;
+
+using Queue = crimson::dmclock::PushPriorityQueue<VolumeId, ClientRequest>;
+
// Kinds of work items that flow through the scheduler's request queue.
enum class RequestType {
  CLIENT_REQUEST = 0,  // a client MDS operation subject to dmClock scheduling
  UPDATE_REQUEST       // an internal request to update a volume's QoS info
};

// Optional completion callback attached to a request.
using RequestCB = std::function<void()>;
+
+class Request {
+private:
+  RequestType type;
+  VolumeId volume_id;
+public:
+  Request(RequestType _type, VolumeId _volume_id) :
+    type(_type), volume_id(_volume_id) {};
+
+  Request(RequestType _type, VolumeId _volume_id, RequestCB _cb_func) :
+    type(_type), volume_id(_volume_id), cb_func(_cb_func) {};
+
+  RequestType get_request_type() const
+  {
+    return type;
+  }
+
+  std::string_view get_request_type_str() const
+  {
+    switch(type) {
+      case RequestType::CLIENT_REQUEST:
+        return "CLIENT_REQUEST";
+      case RequestType::UPDATE_REQUEST:
+        return "UPDATE_REQUEST";
+      default:
+        return "UNKOWN_REQUEST";
+    }
+  }
+
+  const VolumeId& get_volume_id() const
+  {
+    return volume_id;
+  }
+  RequestCB cb_func;
+};
+
+class ClientRequest : public Request {
+public:
+  const MDSReqRef mds_req_ref;
+  Time time;
+  uint32_t cost;
+  explicit ClientRequest(const MDSReqRef &_mds_req_ref, VolumeId _id,
+      double _time, uint32_t _cost) :
+      Request(RequestType::CLIENT_REQUEST, _id),
+      mds_req_ref(_mds_req_ref), time(_time), cost(_cost) {};
+};
+
+class UpdateRequest : public Request {
+public:
+  ClientInfo client_info;
+  bool use_default;
+
+  UpdateRequest(VolumeId _id, const ClientInfo& _client_info, const bool _use_default):
+    Request(RequestType::UPDATE_REQUEST, _id),
+    client_info(_client_info.reservation, _client_info.weight, _client_info.limit),
+    use_default(_use_default) {};
+  UpdateRequest(VolumeId _id, const ClientInfo& _client_info, const bool _use_default, RequestCB _cb_func):
+    Request(RequestType::UPDATE_REQUEST, _id, _cb_func),
+    client_info(_client_info.reservation, _client_info.weight, _client_info.limit),
+    use_default(_use_default) {};
+};
+
+class QoSInfo : public ClientInfo {
+public:
+  explicit QoSInfo(const double reservation, const double weight, const double limit) :
+    ClientInfo(reservation, weight, limit) {};
+
+  void set_reservation(const double _reservation)
+  {
+    reservation = _reservation;
+    reservation_inv = 1.0 / _reservation;
+  }
+
+  void set_weight(const double _weight)
+  {
+    weight = _weight;
+    weight_inv = 1.0 / _weight;
+  }
+
+  void set_limit(const double _limit)
+  {
+    limit = _limit;
+    limit_inv = 1.0 / _limit;
+  }
+
+  double get_reservation() const
+  {
+    return reservation;
+  }
+
+  double get_weight() const
+  {
+    return weight;
+  }
+
+  double get_limit() const
+  {
+    return limit;
+  }
+
+  const ClientInfo* get_qos_info() const
+  {
+    return this;
+  }
+};
+
+class VolumeInfo : public QoSInfo {
+private:
+  bool use_default;
+  bool updated;
+  std::set<SessionId> session_list;
+  int inflight_requests;
+  DecayCounter throttle;
+  double latency_sum;
+  double latency_cnt;
+
+public:
+  explicit VolumeInfo():
+    QoSInfo(0.0, 0.0, 0.0), use_default(true), inflight_requests(0)
+  {
+    throttle = DecayCounter(60.0); // 60secs
+    latency_sum = 0.0;
+    latency_cnt = 0.0;
+  };
+
+  uint64_t get_throttle_cnt() const
+  {
+    return (uint64_t)throttle.get();
+  }
+
+  void hit_throttle()
+  {
+    throttle.adjust();
+  }
+
+  void update_latency(double latency)
+  {
+    latency_sum += latency;
+    latency_cnt += 1;
+
+    if (latency_cnt >= 1000) {
+      latency_sum /= 2;
+      latency_cnt /= 2;
+    }
+  }
+
+  double get_latency_avg() const
+  {
+    double val = latency_sum / latency_cnt;
+    if (std::isnan(val))
+      return 0.0;
+    return val;
+  }
+
+  int32_t get_session_cnt() const
+  {
+    return session_list.size();
+  }
+
+  bool is_use_default() const
+  {
+    return use_default;
+  }
+  void set_use_default(bool _use_default)
+  {
+    use_default = _use_default;
+  }
+
+  void set_updated(bool flag)
+  {
+    updated = flag;
+  }
+
+  bool get_updated()
+  {
+    return updated;
+  }
+
+  void update(const ClientInfo& client_info, const bool use_default)
+  {
+    set_reservation(client_info.reservation);
+    set_weight(client_info.weight);
+    set_limit(client_info.limit);
+    set_use_default(use_default);
+  }
+
+  void add_session(const SessionId &sid)
+  {
+    session_list.insert(sid);
+  }
+
+  void remove_session(const SessionId &sid)
+  {
+    auto it = session_list.find(sid);
+    if (it != session_list.end()) {
+      session_list.erase(it);
+    }
+  }
+
+  int get_inflight_request() const
+  {
+    return inflight_requests;
+  }
+
+  void increase_inflight_request()
+  {
+    inflight_requests++;
+  }
+
+  void decrease_inflight_request()
+  {
+    inflight_requests--;
+  }
+
+  void dump(Formatter *f, const std::string &vid) const
+  {
+    f->dump_string("volume_id", vid);
+    f->dump_bool("use_default", is_use_default());
+    if (!is_use_default()) {
+      f->dump_float("reservation", get_reservation());
+      f->dump_float("weight", get_weight());
+      f->dump_float("limit", get_limit());
+    }
+    f->dump_int("throttle", get_throttle_cnt());
+    f->dump_float("latency_avg", get_latency_avg());
+    f->dump_int("inflight_requests", get_inflight_request());
+    f->dump_int("session_cnt", get_session_cnt());
+    {
+      f->open_array_section("session_list");
+      for (auto &it : session_list) {
+        f->dump_string("session_id", it);
+      }
+      f->close_section();
+    }
+  }
+};
+
+ostream& operator<<(ostream& os, const VolumeInfo* vi);
+
+class mds_dmclock_conf : public QoSInfo {
+private:
+  bool enabled;
+
+public:
+  mds_dmclock_conf(): QoSInfo(0.0, 0.0, 0.0), enabled(false){};
+
+  bool get_status() const
+  {
+    return enabled;
+  }
+
+  bool is_enabled() const
+  {
+    return enabled;
+  }
+
+  void set_status(const bool _enabled)
+  {
+    enabled = _enabled;
+  }
+};
+
// Lifecycle states of the scheduler; transitions are driven by the
// constructor and shutdown().
enum class SchedulerState {
  INIT,       // initial state
  RUNNING,    // normal operation
  FINISHING,  // shutdown in progress
  SHUTDOWN,   // fully stopped
};
+
+class MDSRank;
+
+class MDSDmclockScheduler {
+private:
+  SchedulerState state;
+  mds_dmclock_conf default_conf;
+  int total_inflight_requests;
+  MDSRank *mds;
+  bool mds_is_active;
+  Queue *dmclock_queue;
+  std::map<VolumeId, VolumeInfo> volume_info_map;
+  mutable std::mutex volume_info_lock;
+
+public:
+  static constexpr uint32_t SUBVOL_ROOT_DEPTH = 3;
+
+  std::map<VolumeId, VolumeInfo> &get_volume_info_map()
+  {
+    return volume_info_map;
+  }
+  mds_dmclock_conf get_default_conf()
+  {
+    return default_conf;
+  }
+
+  /* volume QoS info management */
+  void create_volume_info(const VolumeId &vid, const ClientInfo &client_info, const bool use_default);
+  void add_session_to_volume_info(const VolumeId &vid, const SessionId &sid);
+  void update_volume_info(const VolumeId &vid, const ClientInfo& client_info, const bool use_default);
+  VolumeInfo *get_volume_info_ptr(const VolumeId &vid);
+  bool copy_volume_info(const VolumeId &vid, VolumeInfo &vi);
+  bool check_volume_info_existence(const VolumeId &vid);
+  bool check_volume_info_updated(const VolumeId &vid);
+  bool check_volume_info_validity(const VolumeId &vid);
+  void hit_volume_throttle(const VolumeId &vid, Time arrival);
+  void set_volume_info_updated(const VolumeId &vid);
+  void delete_session_from_volume_info(const VolumeId &vid, const SessionId &sid);
+  void set_default_volume_info(const VolumeId &vid);
+  void dump(Formatter *f) const;
+
+  void add_session(Session *session);
+  void remove_session(Session *session);
+  void add_or_remove_session_map(const bool is_add);
+
+  void handle_client_request(const MDSReqRef &req);
+  template<typename R>
+  void enqueue_client_request(const R &mds_req, VolumeId volume_id, Cost cost);
+  void submit_request_to_mds(const VolumeId &, std::unique_ptr<ClientRequest> &&, const PhaseType&, const uint64_t);
+  const ClientInfo *get_client_info(const VolumeId &vid);
+
+  void handle_conf_change(const std::set<std::string>& changed);
+
+  void enable_qos_feature();
+  void disable_qos_feature();
+
+  void try_enable_qos_feature();
+  void try_disable_qos_feature();
+
+  CInode *read_xattrs(const VolumeId vid);
+
+  /* request event handler */
+  void begin_schedule_thread();
+  void process_request();
+  void process_request_handler();
+  std::thread scheduler_thread;
+  mutable std::mutex queue_mutex;
+  std::condition_variable queue_cvar;
+
+  std::deque<std::unique_ptr<Request>> request_queue;
+  void enqueue_update_request(const VolumeId& vid, const ClientInfo& client_info, const bool use_default);
+  void enqueue_update_request(const VolumeId& vid, const ClientInfo& client_info, const bool use_default, RequestCB cb_func);
+  uint32_t get_request_queue_size() const;
+
+  const VolumeId get_volume_id(Session *session);
+  const SessionId get_session_id(Session *session);
+  const VolumeId convert_subvol_root(const VolumeId& volume_id);
+
+  using RejectThreshold = Time;
+  using AtLimitParam = boost::variant<AtLimit, RejectThreshold>;
+
+  Queue::ClientInfoFunc client_info_func;
+  Queue::CanHandleRequestFunc can_handle_func;
+  Queue::HandleRequestFunc handle_request_func;
+
+  MDSDmclockScheduler(MDSRank *m, const Queue::ClientInfoFunc _client_info_func,
+      const Queue::CanHandleRequestFunc _can_handle_func,
+      const Queue::HandleRequestFunc _handle_request_func) : mds(m)
+  {
+    if (_client_info_func) {
+      client_info_func = _client_info_func;
+    } else {
+      client_info_func = std::bind(&MDSDmclockScheduler::get_client_info, this, _1);
+    }
+
+    if (_can_handle_func) {
+      can_handle_func = _can_handle_func;
+    } else {
+      can_handle_func = []()->bool{ return true;};
+    }
+
+    if (_handle_request_func) {
+      handle_request_func = _handle_request_func;
+    } else {
+      handle_request_func = std::bind(&MDSDmclockScheduler::submit_request_to_mds, this, _1, _2, _3, _4);
+    }
+
+    dmclock_queue = new Queue(client_info_func,
+        can_handle_func,
+        handle_request_func);
+
+    state = SchedulerState::RUNNING;
+    total_inflight_requests = 0;
+
+    begin_schedule_thread();
+
+    default_conf.set_reservation(g_conf().get_val<double>("mds_dmclock_reservation"));
+    default_conf.set_weight(g_conf().get_val<double>("mds_dmclock_weight"));
+    default_conf.set_limit(g_conf().get_val<double>("mds_dmclock_limit"));
+    default_conf.set_status(g_conf().get_val<bool>("mds_dmclock_enable"));
+  }
+
+  MDSDmclockScheduler(MDSRank *m) :
+    MDSDmclockScheduler(m,
+      Queue::ClientInfoFunc(),
+      Queue::CanHandleRequestFunc(),
+      Queue::HandleRequestFunc())
+  {
+    // empty
+  }
+
+  ~MDSDmclockScheduler();
+
+  SessionMap *get_session_map();
+  mds_rank_t get_nodeid();
+  void mds_lock();
+  void mds_unlock();
+  void set_mds_is_active(bool value);
+  bool get_mds_is_active();
+
+  Queue *get_dmclock_queue()
+  {
+    return dmclock_queue;
+  }
+
+  void cancel_inflight_request();
+  void increase_inflight_request(const VolumeId &vid);
+  void decrease_inflight_request(const VolumeId &vid);
+  int get_inflight_request(const VolumeId &vid);
+  Cost get_op_cost(int op);
+  bool _process_asok_qos_set(const cmdmap_t &cmdmap, std::ostream &ss);
+  void process_asok_qos_set(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
+  void process_asok_qos_get(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
+  bool _process_asok_qos_rm(const cmdmap_t &cmdmap, std::ostream &ss);
+  void process_asok_qos_rm(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
+
+  void shutdown();
+  friend ostream& operator<<(ostream& os, const VolumeInfo* vi);
+
+  std::string_view get_state_str() const;
+};
+
+#endif // MDS_DMCLOCK_SCHEDULER_H_
index 02816d700c53bf8d9a631c866abbd2a636dd30a3..8de63c111f7fd09254df740c4fe278ac8fa5a132 100644 (file)
@@ -526,6 +526,8 @@ MDSRank::MDSRank(
 
   _heartbeat_reset_grace = g_conf().get_val<uint64_t>("mds_heartbeat_reset_grace");
   heartbeat_grace = g_conf().get_val<double>("mds_heartbeat_grace");
+  mds_dmclock_scheduler = new MDSDmclockScheduler(this);
+
   op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
                                          cct->_conf->mds_op_log_threshold);
   op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
@@ -553,6 +555,7 @@ MDSRank::~MDSRank()
 
   if (server) { delete server; server = 0; }
   if (locker) { delete locker; locker = 0; }
+  if (mds_dmclock_scheduler) { delete mds_dmclock_scheduler; mds_dmclock_scheduler = 0; }
 
   if (logger) {
     g_ceph_context->get_perfcounters_collection()->remove(logger);
@@ -2122,6 +2125,8 @@ void MDSRank::active_start()
   dout(1) << "active_start" << dendl;
 
   m_is_active = true;
+  mds_dmclock_scheduler->set_mds_is_active(true);
+  mds_dmclock_scheduler->try_enable_qos_feature();
 
   if (last_state == MDSMap::STATE_CREATING ||
       last_state == MDSMap::STATE_STARTING) {
@@ -2242,6 +2247,9 @@ void MDSRank::stopping_start()
 {
   dout(2) << "Stopping..." << dendl;
 
+  mds_dmclock_scheduler->try_disable_qos_feature();
+  mds_dmclock_scheduler->set_mds_is_active(false);
+
   if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
     std::vector<Session*> victims;
     const auto& sessions = sessionmap.get_sessions();
@@ -3117,6 +3125,18 @@ void MDSRankDispatcher::handle_asok_command(
      dout(10) << "dump_stray wait" << dendl;
     }
     return;
+  } else if (command == "dump qos") {
+    std::lock_guard l(mds_lock);
+    mds_dmclock_scheduler->dump(f);
+  } else if (command == "qos set") {
+    std::lock_guard l(mds_lock);
+    mds_dmclock_scheduler->process_asok_qos_set(cmdmap, *css, f);
+  } else if (command == "qos rm") {
+    std::lock_guard l(mds_lock);
+    mds_dmclock_scheduler->process_asok_qos_rm(cmdmap, *css, f);
+  } else if (command == "qos get") {
+    std::lock_guard l(mds_lock);
+    mds_dmclock_scheduler->process_asok_qos_get(cmdmap, *css, f);
   } else {
     r = -ENOSYS;
   }
@@ -4129,6 +4149,10 @@ std::vector<std::string> MDSRankDispatcher::get_tracked_keys()
     "mds_cap_revoke_eviction_timeout",
     "mds_debug_subtrees",
     "mds_dir_max_entries",
+    "mds_dmclock_enable",
+    "mds_dmclock_limit",
+    "mds_dmclock_reservation",
+    "mds_dmclock_weight",
     "mds_dump_cache_threshold_file",
     "mds_dump_cache_threshold_formatter",
     "mds_enable_op_tracker",
@@ -4261,6 +4285,7 @@ void MDSRankDispatcher::handle_conf_change(const ConfigProxy& conf, const std::s
     mdlog->handle_conf_change(changed, *mdsmap);
     purge_queue.handle_conf_change(changed, *mdsmap);
     scrubstack->handle_conf_change(changed);
+    mds_dmclock_scheduler->handle_conf_change(changed);
   }));
 }
 
index 80657432c430cfebfb595cd5cefdeac474be73b9..2621f85b3cb2afaa4d8a32f6609e581a7f008126 100644 (file)
@@ -30,6 +30,7 @@
 #include "SessionMap.h"
 #include "PurgeQueue.h"
 #include "MetricsHandler.h"
+#include "MDSDmclockScheduler.h"
 
 // Full .h import instead of forward declaration for PerfCounter, for the
 // benefit of those including this header and using MDSRank::logger
@@ -154,6 +155,7 @@ class ScrubStack;
 class C_ExecAndReply;
 class QuiesceDbManager;
 class QuiesceAgent;
+class MDSDmclockScheduler;
 
 /**
  * The public part of this class's interface is what's exposed to all
@@ -426,6 +428,8 @@ class MDSRank {
 
     SessionMap sessionmap;
 
+    MDSDmclockScheduler *mds_dmclock_scheduler = nullptr;
+
     PerfCounters *logger = nullptr, *mlogger = nullptr;
     OpTracker op_tracker;
 
index 56d55cc42d1c9f42194d4e2ab4a61b5650047d69..ee50177f5afc25679aff80f6f35b64f249be9154 100644 (file)
@@ -400,7 +400,7 @@ void Server::dispatch(const cref_t<Message> &m)
     handle_client_session(ref_cast<MClientSession>(m));
     return;
   case CEPH_MSG_CLIENT_REQUEST:
-    handle_client_request(ref_cast<MClientRequest>(m));
+    mds->mds_dmclock_scheduler->handle_client_request(ref_cast<MClientRequest>(m));
     return;
   case CEPH_MSG_CLIENT_REPLY:
     handle_client_reply(ref_cast<MClientReply>(m));
@@ -953,6 +953,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
             << ", noop" << dendl;
     // close must have been canceled (by an import?), or any number of other things..
   } else if (open) {
+    mds->mds_dmclock_scheduler->add_session(session);
     ceph_assert(session->is_opening());
     mds->sessionmap.set_state(session, Session::STATE_OPEN);
     mds->sessionmap.touch_session(session);
@@ -1013,6 +1014,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
         session->get_connection()->mark_disposable();
       }
 
+      mds->mds_dmclock_scheduler->remove_session(session);
       // reset session
       mds->send_message_client(make_message<MClientSession>(CEPH_SESSION_CLOSE), session);
       mds->sessionmap.set_state(session, Session::STATE_CLOSED);
@@ -1026,6 +1028,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
         mds->sessionmap.set_state(session, Session::STATE_CLOSED);
         session->set_connection(nullptr);
       }
+      mds->mds_dmclock_scheduler->remove_session(session);
       metrics_handler->remove_session(session);
       mds->sessionmap.remove_session(session);
     } else {
@@ -1114,6 +1117,7 @@ void Server::finish_force_open_sessions(map<client_t,pair<Session*,uint64_t> >&
        it.second.second = mds->sessionmap.set_state(session, Session::STATE_OPEN);
        mds->sessionmap.touch_session(session);
         metrics_handler->add_session(session);
+        mds->mds_dmclock_scheduler->add_session(session);
 
        auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
        if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
@@ -1675,6 +1679,7 @@ void Server::handle_client_reconnect(const cref_t<MClientReconnect> &m)
 
   if (!m->has_more()) {
     metrics_handler->add_session(session);
+    mds->mds_dmclock_scheduler->add_session(session);
     // notify client of success with an OPEN
     auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
     if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {