From: Yongseok Oh Date: Wed, 9 Dec 2020 07:54:48 +0000 (+0900) Subject: mds: add MDS dmClock scheduler for subvolume QoS support X-Git-Tag: testing/wip-vshankar-testing-20260212.053105~1^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3283138c962c98f9bc986368883434a98c5cdcd5;p=ceph-ci.git mds: add MDS dmClock scheduler for subvolume QoS support Signed-off-by: Yongseok Oh Signed-off-by: Venky Shankar --- diff --git a/src/common/options/mds.yaml.in b/src/common/options/mds.yaml.in index b7ec143ab6f..a31f2ec13d2 100644 --- a/src/common/options/mds.yaml.in +++ b/src/common/options/mds.yaml.in @@ -1811,4 +1811,40 @@ options: default: 30 min: 30 services: - - mds \ No newline at end of file + - mds +- name: mds_dmclock_enable + type: bool + level: advanced + desc: enable MDS dmClock QoS scheduler + log_dest: This option can be configured when reservation, weight, and limit values are + greater than zero. + default: false + services: + - mds +- name: mds_dmclock_reservation + type: float + level: advanced + desc: dmclock reservation for each MDS client + default: 1000.0 + services: + - mds + flags: + - runtime +- name: mds_dmclock_weight + type: float + level: advanced + desc: dmclock weight for each MDS client + default: 1000.0 + services: + - mds + flags: + - runtime +- name: mds_dmclock_limit + type: float + level: advanced + desc: dmclock limit for each MDS client + default: 1000.0 + services: + - mds + flags: + - runtime diff --git a/src/mds/CMakeLists.txt b/src/mds/CMakeLists.txt index f3980c7e04b..8def0893896 100644 --- a/src/mds/CMakeLists.txt +++ b/src/mds/CMakeLists.txt @@ -45,6 +45,7 @@ set(mds_srcs QuiesceDbManager.cc QuiesceAgent.cc MDSRankQuiesce.cc + MDSDmclockScheduler.cc ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc ${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc @@ -52,5 +53,6 @@ set(mds_srcs add_library(mds STATIC ${mds_srcs}) target_link_libraries(mds PRIVATE 
legacy-option-headers Boost::url - heap_profiler cpu_profiler osdc ${LUA_LIBRARIES}) + heap_profiler cpu_profiler osdc ${LUA_LIBRARIES} + PUBLIC dmclock::dmclock) target_include_directories(mds PRIVATE "${LUA_INCLUDE_DIR}") diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index 1e0a6ed07b9..15731bb90bf 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -565,6 +565,28 @@ void MDSDaemon::set_up_admin_socket() asok_hook, "dump stray folder content"); ceph_assert(r == 0); + r = admin_socket->register_command("dump qos", + asok_hook, + "dump qos info"); + ceph_assert(r == 0); + r = admin_socket->register_command("qos set " + "name=path,type=CephString,req=true " + "name=reservation,type=CephInt,req=true " + "name=weight,type=CephInt,req=true " + "name=limit,type=CephInt,req=true", + asok_hook, + "set qos info"); + ceph_assert(r == 0); + r = admin_socket->register_command("qos rm " + "name=path,type=CephString,req=true", + asok_hook, + "rm qos info"); + ceph_assert(r == 0); + r = admin_socket->register_command("qos get " + "name=path,type=CephString,req=true", + asok_hook, + "get qos info"); + ceph_assert(r == 0); } void MDSDaemon::clean_up_admin_socket() diff --git a/src/mds/MDSDmclockScheduler.cc b/src/mds/MDSDmclockScheduler.cc new file mode 100644 index 00000000000..2dec598b6e1 --- /dev/null +++ b/src/mds/MDSDmclockScheduler.cc @@ -0,0 +1,888 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 LINE + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "Server.h" +#include "SessionMap.h" +#include "MDSDmclockScheduler.h" +#include "mds/MDSMap.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_prefix *_dout << "mds." << get_nodeid() << ".dmclock_scheduler " + +using std::string; +using TOPNSPC::common::cmd_getval; + +ostream& operator<<(ostream& os, VolumeInfo* vi) +{ + if (vi) { + os << "VolumeInfo: (session_cnt " << vi->get_session_cnt() << ") " + << " reservation " << vi->get_reservation() + << " weight" << vi->get_weight() + << " limit " << vi->get_limit(); + } else { + os << "VolumeInfo has nullptr"; + } + return os; +} + +const VolumeId MDSDmclockScheduler::convert_subvol_root(const VolumeId &volume_id) +{ + filepath subvol_root_path(volume_id); + + if (subvol_root_path.depth() > SUBVOL_ROOT_DEPTH) { + return "/" + subvol_root_path.prefixpath(SUBVOL_ROOT_DEPTH).get_path(); + } + + return volume_id; +} + +const VolumeId MDSDmclockScheduler::get_volume_id(Session *session) +{ + ceph_assert(session != nullptr); + auto client_root_entry = session->info.client_metadata.find("root"); + if (client_root_entry == session->info.client_metadata.end() || client_root_entry->second == "") { + return ""; + } + return convert_subvol_root(client_root_entry->second); +} + +const VolumeId MDSDmclockScheduler::get_session_id(Session *session) +{ + ceph_assert(session != nullptr); + return std::to_string(session->info.inst.name.num()); +} + +template +void MDSDmclockScheduler::enqueue_client_request(const R &mds_req, VolumeId volume_id, Cost cost) +{ + dout(10) << __func__ << " volume_id " << volume_id << dendl; + + std::unique_lock lock(queue_mutex); + request_queue.emplace_back(new ClientRequest(mds_req, volume_id, crimson::dmclock::get_time(), cost)); + /* wake up*/ + lock.unlock(); + queue_cvar.notify_all(); +} + +void MDSDmclockScheduler::handle_client_request(const MDSReqRef &mds_req) +{ + dout(10) << __func__ << " " << *mds_req << dendl; + 
+ if (mds->mds_dmclock_scheduler->default_conf.is_enabled() == true && + mds_req->get_orig_source().is_client() && + mds->is_active() && + check_volume_info_updated(get_volume_id(mds->get_session(mds_req))) && + check_volume_info_validity(get_volume_id(mds->get_session(mds_req)))) { + Cost cost = get_op_cost(mds_req->get_op()); + enqueue_client_request(mds_req, get_volume_id(mds->get_session(mds_req)), cost); + } else { + mds->server->handle_client_request(mds_req); + } +} + +void MDSDmclockScheduler::submit_request_to_mds(const VolumeId& vid, std::unique_ptr&& request, + const PhaseType& phase_type, const uint64_t cost) +{ + dout(10) << __func__ << " volume_id " << vid << dendl; + + hit_volume_throttle(vid, request->time); + + const MDSReqRef& req = request->mds_req_ref; + + mds_lock(); + + mds->server->handle_client_request(req); + + mds_unlock(); + + decrease_inflight_request(request->get_volume_id()); +} + +void MDSDmclockScheduler::shutdown() +{ + dout(10) << __func__ << " state " << get_state_str() << dendl; + + if (default_conf.is_enabled() == true) { + disable_qos_feature(); + } + + std::unique_lock lock(queue_mutex); + state = SchedulerState::FINISHING; + lock.unlock(); + queue_cvar.notify_all(); + + if (scheduler_thread.joinable()) { + scheduler_thread.join(); + } + + state = SchedulerState::SHUTDOWN; + + dout(10) << __func__ << " state " << get_state_str() << dendl; +} + +MDSDmclockScheduler::~MDSDmclockScheduler() +{ + dout(10) << __func__ << dendl; + shutdown(); + delete dmclock_queue; +} + +const ClientInfo *MDSDmclockScheduler::get_client_info(const VolumeId &vid) +{ + dout(10) << __func__ << " volume_id " << vid << dendl; + std::lock_guard lock(volume_info_lock); + + auto vi = get_volume_info_ptr(vid); + const ClientInfo *ci = nullptr; + if (vi != nullptr) { + if (vi->is_use_default() == true) { + dout(15) << __func__ << " default QoS " << *default_conf.get_qos_info() << dendl; + ci = default_conf.get_qos_info(); + } else { + dout(15) << __func__ << 
" per client specific QoS " << vi->get_qos_info() << dendl; + ci = vi->get_qos_info(); + } + } + return ci; +} + +void MDSDmclockScheduler::dump(Formatter *f) const +{ + f->open_object_section("qos_info"); + + f->open_object_section("qos_state"); + f->dump_bool("qos_enabled", default_conf.is_enabled()); + if (default_conf.is_enabled()) { + f->dump_string("state", get_state_str()); + f->dump_float("default_reservation", default_conf.get_reservation()); + f->dump_float("default_weight", default_conf.get_weight()); + f->dump_float("default_limit", default_conf.get_limit()); + f->dump_int("mds_dmclock_queue_size", get_request_queue_size()); + f->dump_int("inflight_requests", total_inflight_requests); + } + f->close_section(); // qos_state + + f->open_array_section("volume_infos"); + std::lock_guard lock(volume_info_lock); + for (auto it = volume_info_map.begin(); it != volume_info_map.end(); it++) { + auto vol_info = it->second; + f->open_object_section("volume_info"); + vol_info.dump(f, it->first); + f->close_section(); + } + f->close_section(); // volume_infos + + f->close_section(); // qos_info +} + +VolumeInfo *MDSDmclockScheduler::get_volume_info_ptr(const VolumeId &vid) +{ + auto it = volume_info_map.find(vid); + if (it != volume_info_map.end()) { + return &it->second; + } + return nullptr; +} + +bool MDSDmclockScheduler::copy_volume_info(const VolumeId &vid, VolumeInfo &vi) +{ + std::lock_guard lock(volume_info_lock); + auto it = volume_info_map.find(vid); + if (it != volume_info_map.end()) { + vi = it->second; + return true; + } + return false; +} + +bool MDSDmclockScheduler::check_volume_info_existence(const VolumeId &vid) +{ + std::lock_guard lock(volume_info_lock); + if (get_volume_info_ptr(vid) != nullptr) { + return true; + } + return false; +} + +bool MDSDmclockScheduler::check_volume_info_updated(const VolumeId &vid) +{ + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + if (vi != nullptr && vi->get_updated()) { + 
return true; + } + return false; +} + +void MDSDmclockScheduler::hit_volume_throttle(const VolumeId &vid, Time arrival) +{ + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + + if (vi != nullptr && vi->get_updated()) { + double expected_lat; + double expected_iops; + + if (vi->is_use_default()) { + expected_lat = (double)1.0 / default_conf.get_limit(); + expected_iops = default_conf.get_limit(); + } else { + expected_lat = (double)1.0 / vi->get_limit(); + expected_iops = vi->get_limit(); + } + + double latency = crimson::dmclock::get_time() - arrival; + + dout(10) << __func__ << " Volume " << vid << + " current latency " << latency << + " expected latency " << expected_lat << + " inflight request " << vi->get_inflight_request() << + " expected iops " << expected_iops << dendl; + + if (latency > expected_lat && vi->get_inflight_request() > expected_iops) { + vi->hit_throttle(); + } + + vi->update_latency(latency); + } +} + +bool MDSDmclockScheduler::check_volume_info_validity(const VolumeId &vid) +{ + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + if (vi != nullptr) { + if (vi->is_use_default() == true) { + const ClientInfo *ci = default_conf.get_qos_info(); + if (ci->reservation > 0.0 && + ci->weight > 0.0 && + ci->limit > 0.0) { + return true; + } + + } else { + if (vi->get_reservation() > 0.0 && + vi->get_weight() > 0.0 && + vi->get_limit() > 0.0) + return true; + } + } + return false; +} + +void MDSDmclockScheduler::set_volume_info_updated(const VolumeId &vid) +{ + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + if (vi != nullptr) { + vi->set_updated(true); + } +} + +void MDSDmclockScheduler::increase_inflight_request(const VolumeId &vid) +{ + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + ceph_assert(vi!=nullptr); + vi->increase_inflight_request(); + total_inflight_requests++; +} + +void 
MDSDmclockScheduler::decrease_inflight_request(const VolumeId &vid) +{ + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + ceph_assert(vi!=nullptr); + vi->decrease_inflight_request(); + total_inflight_requests--; +} + +int MDSDmclockScheduler::get_inflight_request(const VolumeId &vid) +{ + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + ceph_assert(vi!=nullptr); + return vi->get_inflight_request(); +} + +void MDSDmclockScheduler::create_volume_info(const VolumeId &vid, const ClientInfo &client_info, + const bool use_default) +{ + dout(10) << __func__ << " volume_id " << vid << " client_info " + << client_info << " use_default" << use_default << dendl; + + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + + if (vi == nullptr) { + auto [it, success] = volume_info_map.insert(std::make_pair(std::move(vid), VolumeInfo())); + ceph_assert(success==true); + vi = &it->second; + } + + enqueue_update_request(vid, client_info, use_default); +} + +void MDSDmclockScheduler::add_session_to_volume_info(const VolumeId &vid, const SessionId &sid) +{ + dout(10) << __func__ << " volume_id " << vid << " session_id " << sid << dendl; + + std::lock_guard lock(volume_info_lock); + VolumeInfo* vi = get_volume_info_ptr(vid); + ceph_assert(vi!=nullptr); + vi->add_session(sid); +} + +void MDSDmclockScheduler::delete_session_from_volume_info(const VolumeId &vid, const SessionId &sid) +{ + dout(10) << __func__ << " volume_id " << vid << " session_id " << sid << dendl; + + std::lock_guard lock(volume_info_lock); + + auto it = volume_info_map.find(vid); + if (it != volume_info_map.end()) { + auto vi = &it->second; + vi->remove_session(sid); + if (vi->get_session_cnt() == 0) { + dout(15) << __func__ << " erase volume info due to no sessions (volume_id " + << vid << " session_id " << sid << ")" << dendl; + ceph_assert(vi->get_inflight_request()==0); + volume_info_map.erase(it); + } + /* 
the dmclock library supports only removal of idle clients in the background */ + } +} + +void MDSDmclockScheduler::update_volume_info(const VolumeId &vid, const ClientInfo& client_info, const bool use_default) +{ + dout(10) << __func__ << " volume_id " << vid << " " << client_info << " use_default " << use_default << dendl; + + std::lock_guard lock(volume_info_lock); + + VolumeInfo* vi = get_volume_info_ptr(vid); + if (vi) { + vi->update(client_info, use_default); + } else { + dout(5) << " VolumeInfo is unavaiable (vid = " << vid << ")" << dendl; + } +} + +void MDSDmclockScheduler::set_default_volume_info(const VolumeId &vid) +{ + ClientInfo client_info(0.0, 0.0, 0.0); + dout(10) << __func__ << " vid " << vid << " " << client_info << dendl; + enqueue_update_request(vid, client_info, true); +} + +void MDSDmclockScheduler::add_or_remove_session_map(const bool is_add) +{ + auto sessionmap = get_session_map(); + if (sessionmap == nullptr) { + dout(10) << __func__ << " sessionmap has nullptr" << dendl; + return; + } + dout(0) << __func__ << dendl; + + std::list session_state_list = {Session::STATE_OPEN, Session::STATE_STALE}; + + for (int session_state : session_state_list) { + dout(0) << " session state " << session_state << dendl; + if (auto it = sessionmap->by_state.find(session_state); it != sessionmap->by_state.end()) { + for (const auto &session : *(it->second)) { + dout(0) << " session " << session << dendl; + if (is_add) { + add_session(session); + } else { + dout(0) << " remove session " << session << dendl; + remove_session(session); + } + } + } + } +} + +void MDSDmclockScheduler::add_session(Session *session) +{ + if (get_default_conf().is_enabled() == false) { + return; + } + + if (!get_mds_is_active()) { + dout(5) << __func__ << " Session cannot be added as mds is not active" << dendl; + } + + if (session == nullptr) { + dout(5) << __func__ << " session is nullptr" << dendl; + return; + } + + VolumeId vid = get_volume_id(session); + SessionId sid = 
get_session_id(session); + + dout(10) << __func__ << " volume_id " << vid << " session_id " << sid << dendl; + + if (check_volume_info_existence(vid) == false) { + ClientInfo client_info(0.0, 0.0, 0.0); + bool use_default = true; + + create_volume_info(vid, client_info, use_default); + } + + add_session_to_volume_info(vid, sid); +} + +void MDSDmclockScheduler::remove_session(Session *session) +{ + if (get_default_conf().is_enabled() == false) { + return; + } + VolumeId vid = get_volume_id(session); + SessionId sid = get_session_id(session); + dout(10) << __func__ << " volume_id " << vid << " session_id " << sid << dendl; + delete_session_from_volume_info(vid, sid); +} + +uint32_t MDSDmclockScheduler::get_request_queue_size() const +{ + std::unique_lock lock(queue_mutex); + return request_queue.size(); +} + +void MDSDmclockScheduler::enqueue_update_request(const VolumeId& vid, const ClientInfo& client_info, const bool use_default) +{ + dout(10) << __func__ << " volume_id " << vid << dendl; + + std::unique_lock lock(queue_mutex); + + request_queue.emplace_back(new UpdateRequest(vid, client_info, use_default)); + + /* wake up*/ + lock.unlock(); + queue_cvar.notify_all(); +} + +void MDSDmclockScheduler::enqueue_update_request(const VolumeId& vid, const ClientInfo& client_info, const bool use_default, RequestCB cb_func) +{ + dout(10) << __func__ << " volume_id " << vid << dendl; + + std::unique_lock lock(queue_mutex); + + request_queue.emplace_back(new UpdateRequest(vid, client_info, use_default, cb_func)); + /* wake up*/ + lock.unlock(); + queue_cvar.notify_all(); +} + +void MDSDmclockScheduler::process_request_handler() +{ + std::unique_lock lock(queue_mutex); + + if (request_queue.size() == 0) { + queue_cvar.wait(lock); + } + + while (request_queue.size()) { + std::unique_ptr request = std::move(request_queue.front()); + request_queue.erase(request_queue.begin()); + + dout(10) << __func__ << " Process request type " << request->get_request_type_str() << dendl; + + 
lock.unlock(); + + switch(request->get_request_type()) { + case RequestType::CLIENT_REQUEST: + { + std::unique_ptr c_request(static_cast(request.release())); + + dout(10) << " Process client request (queue size " << request_queue.size() + << " volume_id " << c_request->get_volume_id() + << " time " << c_request->time + << " cost " << c_request->cost << ")" << dendl; + + increase_inflight_request(c_request->get_volume_id()); + + dmclock_queue->add_request(std::move(c_request), std::move(c_request->get_volume_id()), + {0, 0}, c_request->time, c_request->cost); + + break; + } + case RequestType::UPDATE_REQUEST: + { + std::unique_ptr c_request(static_cast(request.release())); + + dout(10) << " Process update request (queue size " << request_queue.size() + << " volume_id " << c_request->get_volume_id() << ")" << dendl; + + update_volume_info(c_request->get_volume_id(), c_request->client_info, c_request->use_default); + dmclock_queue->update_client_info(c_request->get_volume_id()); + set_volume_info_updated(c_request->get_volume_id()); + + if (c_request->cb_func) { + c_request->cb_func(); + } + break; + } + default: + derr << __func__ << " received unknown message " << request->get_request_type_str() << dendl; + } + + lock.lock(); + } +} + +void MDSDmclockScheduler::process_request() +{ + dout(10) << __func__ << " thread has been invoked" << dendl; + /* + * process_request_handler() is responsible for handling request_queue. + * The function uses mutex-based locking, and + * if request_queue is empty, it waits until the request is queued. + * Therefore, it prevents excessive use of CPU by not busy waiting. 
+ */ + while (state == SchedulerState::RUNNING) { + process_request_handler(); + } + dout(10) << __func__ << " thread has been joined" << dendl; +} + +void MDSDmclockScheduler::begin_schedule_thread() +{ + scheduler_thread = std::thread([this](){process_request();}); +} + +void MDSDmclockScheduler::enable_qos_feature() +{ + dout(10) << __func__ << dendl; + + default_conf.set_status(true); + + if (!get_mds_is_active()) { + dout(0) << " MDS Rank is not active and QoS feature cannot be enabled." << dendl; + return; + } + + add_or_remove_session_map(true); +} + +void MDSDmclockScheduler::cancel_inflight_request() +{ + dout(10) << __func__ << dendl; + std::lock_guard lock(volume_info_lock); + std::list req_list; + + auto accum_f = [&req_list] (Queue::RequestRef&& r) + { + req_list.push_front(std::move(r)); + }; + + for (auto it : volume_info_map) { + if (it.second.get_inflight_request()) { + dmclock_queue->remove_by_client(it.first, true, accum_f); + } + } + + dout(10) << __func__ << " canceled requests " << req_list.size() << dendl; + + for (auto& it : req_list) { + dout(10) << " canceled request volume_id " << it->get_volume_id() << " " << *it->mds_req_ref << dendl; + handle_request_func(it->get_volume_id(), std::move(it), PhaseType::reservation, 1); + } + ceph_assert(dmclock_queue->empty() == true); +} + +void MDSDmclockScheduler::disable_qos_feature() +{ + uint32_t queue_size; + bool dmclock_empty; + + dout(10) << __func__ << dendl; + + default_conf.set_status(false); + + do + { + mds_unlock(); + queue_cvar.notify_all(); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + queue_size = get_request_queue_size(); + mds_lock(); + } while(queue_size); + + do + { + mds_unlock(); + cancel_inflight_request(); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + dmclock_empty = dmclock_queue->empty(); + mds_lock(); + } while(!dmclock_empty); + + add_or_remove_session_map(false); +} + +void MDSDmclockScheduler::try_enable_qos_feature() +{ + if 
(g_conf().get_val("mds_dmclock_enable")) { + enable_qos_feature(); + } +} + +void MDSDmclockScheduler::try_disable_qos_feature() +{ + if (g_conf().get_val("mds_dmclock_enable")) { + disable_qos_feature(); + } +} + +void MDSDmclockScheduler::handle_conf_change(const std::set& changed) +{ + dout(10) << __func__ << dendl; + + if (changed.count("mds_dmclock_enable")) { + bool new_val = g_conf().get_val("mds_dmclock_enable"); + if (default_conf.is_enabled() != new_val) + { + if (new_val == true) { + enable_qos_feature(); + } else { + disable_qos_feature(); + } + } + } + + if (changed.count("mds_dmclock_reservation") || default_conf.is_enabled() == true) { + dout(10) << " set reservation " << g_conf().get_val("mds_dmclock_reservation") << dendl; + default_conf.set_reservation(g_conf().get_val("mds_dmclock_reservation")); + ceph_assert(default_conf.get_reservation() == g_conf().get_val("mds_dmclock_reservation")); + } + if (changed.count("mds_dmclock_weight") || default_conf.is_enabled() == true) { + dout(10) << " set weight " << g_conf().get_val("mds_dmclock_weight") << dendl; + default_conf.set_weight(g_conf().get_val("mds_dmclock_weight")); + ceph_assert(default_conf.get_weight() == g_conf().get_val("mds_dmclock_weight")); + } + if (changed.count("mds_dmclock_limit") || default_conf.is_enabled() == true) { + dout(10) << " set limit " << g_conf().get_val("mds_dmclock_limit") << dendl; + default_conf.set_limit(g_conf().get_val("mds_dmclock_limit")); + ceph_assert(default_conf.get_limit() == g_conf().get_val("mds_dmclock_limit")); + } + + /* need to check whether conf is updated from ceph.conf when the MDS is restarted */ + dout(10) << __func__ << " enable " << default_conf.is_enabled() + << " reservation " << default_conf.get_reservation() + << " weight " << default_conf.get_weight() + << " limit " << default_conf.get_limit() << dendl; +} + +mds_rank_t MDSDmclockScheduler::get_nodeid() +{ + if (mds != nullptr) { + return mds->get_nodeid(); + } + return 0; +} + +void 
MDSDmclockScheduler::set_mds_is_active(bool value) +{ + mds_is_active = value; +} + +bool MDSDmclockScheduler::get_mds_is_active() +{ + return mds_is_active; +} +SessionMap *MDSDmclockScheduler::get_session_map() +{ + if (mds != nullptr) { + return &mds->sessionmap; + } + return 0; +} + +void MDSDmclockScheduler::mds_lock() +{ + if (mds != nullptr) { + mds->mds_lock.lock(); + } +} + +void MDSDmclockScheduler::mds_unlock() +{ + if (mds != nullptr) { + mds->mds_lock.unlock(); + } +} + +std::string_view MDSDmclockScheduler::get_state_str() const +{ + switch(state) { + case SchedulerState::INIT: + return "INIT"; + case SchedulerState::RUNNING: + return "RUNNING"; + case SchedulerState::FINISHING: + return "FINISHING"; + case SchedulerState::SHUTDOWN: + return "SHUTDOWN"; + default: + return "UNKONWN"; + } +} + +Cost MDSDmclockScheduler::get_op_cost(const int op) +{ + switch (op) { + case CEPH_MDS_OP_LOOKUP: + case CEPH_MDS_OP_LOOKUPHASH: + case CEPH_MDS_OP_LOOKUPPARENT: + case CEPH_MDS_OP_LOOKUPINO: + case CEPH_MDS_OP_LOOKUPNAME: + case CEPH_MDS_OP_GETATTR: + case CEPH_MDS_OP_READDIR: + case CEPH_MDS_OP_OPEN: + case CEPH_MDS_OP_LOOKUPSNAP: + case CEPH_MDS_OP_LSSNAP: + case CEPH_MDS_OP_GETFILELOCK: + return 1; + case CEPH_MDS_OP_SETXATTR: + case CEPH_MDS_OP_SETATTR: + case CEPH_MDS_OP_RMXATTR: + case CEPH_MDS_OP_SETLAYOUT: + case CEPH_MDS_OP_SETDIRLAYOUT: + case CEPH_MDS_OP_MKNOD: + case CEPH_MDS_OP_LINK: + case CEPH_MDS_OP_UNLINK: + case CEPH_MDS_OP_RENAME: + case CEPH_MDS_OP_MKDIR: + case CEPH_MDS_OP_RMDIR: + case CEPH_MDS_OP_SYMLINK: + case CEPH_MDS_OP_CREATE: + case CEPH_MDS_OP_MKSNAP: + case CEPH_MDS_OP_RMSNAP: + case CEPH_MDS_OP_RENAMESNAP: + case CEPH_MDS_OP_SETFILELOCK: + default: + return 3; + } +} + +bool MDSDmclockScheduler::_process_asok_qos_set(const cmdmap_t &cmdmap, std::ostream &ss) +{ + bool rc; + string path; + rc = cmd_getval(cmdmap, "path", path); + if (!rc) { + ss << "malformed path"; + return false; + } + + int64_t reservation; + rc = 
cmd_getval(cmdmap, "reservation", reservation); + if (!rc || reservation < 1) { + ss << "malformed reservation"; + return false; + } + + int64_t weight; + rc = cmd_getval(cmdmap, "weight", weight); + if (!rc || weight < 1) { + ss << "malformed weight"; + return false; + } + + int64_t limit; + rc = cmd_getval(cmdmap, "limit", limit); + if (!rc || limit < 1) { + ss << "malformed limit"; + return false; + } + + if (reservation > limit) { + ss << "reservation cannot be greater than limit"; + return false; + } + + if (check_volume_info_existence(path) == false) { + ss << "volume_info doesn't exist (" << path << ")" ; + return false; + } + + dout(5) << "dmclock path " << path << " reservation=" << reservation + << " weight=" << weight + << " limit=" << limit <open_object_section("results"); + f->dump_int("return_code", int(success)); + f->close_section(); // results +} + +bool MDSDmclockScheduler::_process_asok_qos_rm(const cmdmap_t &cmdmap, std::ostream &ss) +{ + string path; + if(!cmd_getval(cmdmap, "path", path)) { + ss << "malformed path"; + return false; + } + + if (check_volume_info_existence(path) == false) { + ss << "volume_info doesn't exist (" << path << ")" ; + return false; + } + + dout(5) << "dmclock path " << path << dendl; + + set_default_volume_info(path); + + return true; +} + + +void MDSDmclockScheduler::process_asok_qos_rm(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f) +{ + bool success = _process_asok_qos_rm(cmdmap, ss); + f->open_object_section("results"); + f->dump_int("return_code", int(success)); + f->close_section(); // results +} + +void MDSDmclockScheduler::process_asok_qos_get(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f) +{ + string path; + if(!cmd_getval(cmdmap, "path", path)) { + ss << "malformed path"; + return; + } + + VolumeInfo vi; + + if (!copy_volume_info(path, vi)) { + ss << "volume_info doesn't exist (" << path << ")" ; + return; + } + + f->open_object_section("volume_qos_info"); + vi.dump(f, path); + 
f->close_section(); +} diff --git a/src/mds/MDSDmclockScheduler.h b/src/mds/MDSDmclockScheduler.h new file mode 100644 index 00000000000..f224bfff3fc --- /dev/null +++ b/src/mds/MDSDmclockScheduler.h @@ -0,0 +1,495 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2020 LINE + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef MDS_DMCLOCK_SCHEDULER_H_ +#define MDS_DMCLOCK_SCHEDULER_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include "include/types.h" +#include "mdstypes.h" + +#include "MDSRank.h" +#include "messages/MClientReply.h" +#include "messages/MClientRequest.h" +#include "messages/MClientSession.h" +#include "msg/Messenger.h" +#include "dmclock/src/dmclock_server.h" +#include "CInode.h" + +class ClientRequest; + +using MDSReqRef = cref_t; +using crimson::dmclock::ClientInfo; +using crimson::dmclock::AtLimit; +using crimson::dmclock::PhaseType; +using crimson::dmclock::ReqParams; +using crimson::dmclock::Cost; +using Time = double; +using ClientId = std::string; +using VolumeId = ClientId; +using SessionId = std::string; +using std::placeholders::_1; +using std::placeholders::_2; +using std::placeholders::_3; +using std::placeholders::_4; +using std::ostream; +using std::to_string; +using std::vector; + +using Queue = crimson::dmclock::PushPriorityQueue; + +enum class RequestType { + CLIENT_REQUEST = 0, + UPDATE_REQUEST +}; + +using RequestCB = std::function; + +class Request { +private: + RequestType type; + VolumeId volume_id; +public: + Request(RequestType _type, VolumeId _volume_id) : + type(_type), volume_id(_volume_id) {}; + + Request(RequestType _type, VolumeId _volume_id, RequestCB _cb_func) : + type(_type), 
volume_id(_volume_id), cb_func(_cb_func) {}; + + RequestType get_request_type() const + { + return type; + } + + std::string_view get_request_type_str() const + { + switch(type) { + case RequestType::CLIENT_REQUEST: + return "CLIENT_REQUEST"; + case RequestType::UPDATE_REQUEST: + return "UPDATE_REQUEST"; + default: + return "UNKOWN_REQUEST"; + } + } + + const VolumeId& get_volume_id() const + { + return volume_id; + } + RequestCB cb_func; +}; + +class ClientRequest : public Request { +public: + const MDSReqRef mds_req_ref; + Time time; + uint32_t cost; + explicit ClientRequest(const MDSReqRef &_mds_req_ref, VolumeId _id, + double _time, uint32_t _cost) : + Request(RequestType::CLIENT_REQUEST, _id), + mds_req_ref(_mds_req_ref), time(_time), cost(_cost) {}; +}; + +class UpdateRequest : public Request { +public: + ClientInfo client_info; + bool use_default; + + UpdateRequest(VolumeId _id, const ClientInfo& _client_info, const bool _use_default): + Request(RequestType::UPDATE_REQUEST, _id), + client_info(_client_info.reservation, _client_info.weight, _client_info.limit), + use_default(_use_default) {}; + UpdateRequest(VolumeId _id, const ClientInfo& _client_info, const bool _use_default, RequestCB _cb_func): + Request(RequestType::UPDATE_REQUEST, _id, _cb_func), + client_info(_client_info.reservation, _client_info.weight, _client_info.limit), + use_default(_use_default) {}; +}; + +class QoSInfo : public ClientInfo { +public: + explicit QoSInfo(const double reservation, const double weight, const double limit) : + ClientInfo(reservation, weight, limit) {}; + + void set_reservation(const double _reservation) + { + reservation = _reservation; + reservation_inv = 1.0 / _reservation; + } + + void set_weight(const double _weight) + { + weight = _weight; + weight_inv = 1.0 / _weight; + } + + void set_limit(const double _limit) + { + limit = _limit; + limit_inv = 1.0 / _limit; + } + + double get_reservation() const + { + return reservation; + } + + double get_weight() const + 
{ + return weight; + } + + double get_limit() const + { + return limit; + } + + const ClientInfo* get_qos_info() const + { + return this; + } +}; + +class VolumeInfo : public QoSInfo { +private: + bool use_default; + bool updated; + std::set session_list; + int inflight_requests; + DecayCounter throttle; + double latency_sum; + double latency_cnt; + +public: + explicit VolumeInfo(): + QoSInfo(0.0, 0.0, 0.0), use_default(true), inflight_requests(0) + { + throttle = DecayCounter(60.0); // 60secs + latency_sum = 0.0; + latency_cnt = 0.0; + }; + + uint64_t get_throttle_cnt() const + { + return (uint64_t)throttle.get(); + } + + void hit_throttle() + { + throttle.adjust(); + } + + void update_latency(double latency) + { + latency_sum += latency; + latency_cnt += 1; + + if (latency_cnt >= 1000) { + latency_sum /= 2; + latency_cnt /= 2; + } + } + + double get_latency_avg() const + { + double val = latency_sum / latency_cnt; + if (std::isnan(val)) + return 0.0; + return val; + } + + int32_t get_session_cnt() const + { + return session_list.size(); + } + + bool is_use_default() const + { + return use_default; + } + void set_use_default(bool _use_default) + { + use_default = _use_default; + } + + void set_updated(bool flag) + { + updated = flag; + } + + bool get_updated() + { + return updated; + } + + void update(const ClientInfo& client_info, const bool use_default) + { + set_reservation(client_info.reservation); + set_weight(client_info.weight); + set_limit(client_info.limit); + set_use_default(use_default); + } + + void add_session(const SessionId &sid) + { + session_list.insert(sid); + } + + void remove_session(const SessionId &sid) + { + auto it = session_list.find(sid); + if (it != session_list.end()) { + session_list.erase(it); + } + } + + int get_inflight_request() const + { + return inflight_requests; + } + + void increase_inflight_request() + { + inflight_requests++; + } + + void decrease_inflight_request() + { + inflight_requests--; + } + + void dump(Formatter *f, 
const std::string &vid) const
+  {
+    f->dump_string("volume_id", vid);
+    f->dump_bool("use_default", is_use_default());
+    if (!is_use_default()) {
+      f->dump_float("reservation", get_reservation());
+      f->dump_float("weight", get_weight());
+      f->dump_float("limit", get_limit());
+    }
+    f->dump_int("throttle", get_throttle_cnt());
+    f->dump_float("latency_avg", get_latency_avg());
+    f->dump_int("inflight_requests", get_inflight_request());
+    f->dump_int("session_cnt", get_session_cnt());
+    {
+      f->open_array_section("session_list");
+      for (auto &it : session_list) {
+        f->dump_string("session_id", it);
+      }
+      f->close_section();
+    }
+  }
+};
+
+ostream& operator<<(ostream& os, const VolumeInfo* vi);
+
+class mds_dmclock_conf : public QoSInfo {
+private:
+  bool enabled;
+
+public:
+  mds_dmclock_conf(): QoSInfo(0.0, 0.0, 0.0), enabled(false){};
+
+  bool get_status() const
+  {
+    return enabled;
+  }
+
+  bool is_enabled() const
+  {
+    return enabled;
+  }
+
+  void set_status(const bool _enabled)
+  {
+    enabled = _enabled;
+  }
+};
+
+enum class SchedulerState {
+  INIT,
+  RUNNING,
+  FINISHING,
+  SHUTDOWN,
+};
+
+class MDSRank;
+
+class MDSDmclockScheduler {
+private:
+  SchedulerState state;
+  mds_dmclock_conf default_conf;
+  int total_inflight_requests;
+  MDSRank *mds;
+  bool mds_is_active;
+  Queue *dmclock_queue;
+  std::map<VolumeId, VolumeInfo> volume_info_map;
+  mutable std::mutex volume_info_lock;
+
+public:
+  static constexpr uint32_t SUBVOL_ROOT_DEPTH = 3;
+
+  std::map<VolumeId, VolumeInfo> &get_volume_info_map()
+  {
+    return volume_info_map;
+  }
+  mds_dmclock_conf get_default_conf()
+  {
+    return default_conf;
+  }
+
+  /* volume QoS info management */
+  void create_volume_info(const VolumeId &vid, const ClientInfo &client_info, const bool use_default);
+  void add_session_to_volume_info(const VolumeId &vid, const SessionId &sid);
+  void update_volume_info(const VolumeId &vid, const ClientInfo& client_info, const bool use_default);
+  VolumeInfo *get_volume_info_ptr(const VolumeId &vid);
+  bool copy_volume_info(const
VolumeId &vid, VolumeInfo &vi);
+  bool check_volume_info_existence(const VolumeId &vid);
+  bool check_volume_info_updated(const VolumeId &vid);
+  bool check_volume_info_validity(const VolumeId &vid);
+  void hit_volume_throttle(const VolumeId &vid, Time arrival);
+  void set_volume_info_updated(const VolumeId &vid);
+  void delete_session_from_volume_info(const VolumeId &vid, const SessionId &sid);
+  void set_default_volume_info(const VolumeId &vid);
+  void dump(Formatter *f) const;
+
+  void add_session(Session *session);
+  void remove_session(Session *session);
+  void add_or_remove_session_map(const bool is_add);
+
+  void handle_client_request(const MDSReqRef &req);
+  template<typename R>
+  void enqueue_client_request(const R &mds_req, VolumeId volume_id, Cost cost);
+  void submit_request_to_mds(const VolumeId &, std::unique_ptr<MDSReqRef> &&, const PhaseType&, const uint64_t);
+  const ClientInfo *get_client_info(const VolumeId &vid);
+
+  void handle_conf_change(const std::set<std::string>& changed);
+
+  void enable_qos_feature();
+  void disable_qos_feature();
+
+  void try_enable_qos_feature();
+  void try_disable_qos_feature();
+
+  CInode *read_xattrs(const VolumeId vid);
+
+  /* request event handler */
+  void begin_schedule_thread();
+  void process_request();
+  void process_request_handler();
+  std::thread scheduler_thread;
+  mutable std::mutex queue_mutex;
+  std::condition_variable queue_cvar;
+
+  std::deque<std::unique_ptr<Request>> request_queue;
+  void enqueue_update_request(const VolumeId& vid, const ClientInfo& client_info, const bool use_default);
+  void enqueue_update_request(const VolumeId& vid, const ClientInfo& client_info, const bool use_default, RequestCB cb_func);
+  uint32_t get_request_queue_size() const;
+
+  const VolumeId get_volume_id(Session *session);
+  const SessionId get_session_id(Session *session);
+  const VolumeId convert_subvol_root(const VolumeId& volume_id);
+
+  using RejectThreshold = Time;
+  using AtLimitParam = boost::variant<AtLimit, RejectThreshold>;
+
+  Queue::ClientInfoFunc client_info_func;
Queue::CanHandleRequestFunc can_handle_func;
+  Queue::HandleRequestFunc handle_request_func;
+
+  MDSDmclockScheduler(MDSRank *m, const Queue::ClientInfoFunc _client_info_func,
+      const Queue::CanHandleRequestFunc _can_handle_func,
+      const Queue::HandleRequestFunc _handle_request_func) : mds(m)
+  {
+    if (_client_info_func) {
+      client_info_func = _client_info_func;
+    } else {
+      client_info_func = std::bind(&MDSDmclockScheduler::get_client_info, this, _1);
+    }
+
+    if (_can_handle_func) {
+      can_handle_func = _can_handle_func;
+    } else {
+      can_handle_func = []()->bool{ return true;};
+    }
+
+    if (_handle_request_func) {
+      handle_request_func = _handle_request_func;
+    } else {
+      handle_request_func = std::bind(&MDSDmclockScheduler::submit_request_to_mds, this, _1, _2, _3, _4);
+    }
+
+    dmclock_queue = new Queue(client_info_func,
+                              can_handle_func,
+                              handle_request_func);
+
+    state = SchedulerState::RUNNING;
+    total_inflight_requests = 0;
+
+    begin_schedule_thread();
+
+    default_conf.set_reservation(g_conf().get_val<double>("mds_dmclock_reservation"));
+    default_conf.set_weight(g_conf().get_val<double>("mds_dmclock_weight"));
+    default_conf.set_limit(g_conf().get_val<double>("mds_dmclock_limit"));
+    default_conf.set_status(g_conf().get_val<bool>("mds_dmclock_enable"));
+  }
+
+  MDSDmclockScheduler(MDSRank *m) :
+    MDSDmclockScheduler(m,
+                        Queue::ClientInfoFunc(),
+                        Queue::CanHandleRequestFunc(),
+                        Queue::HandleRequestFunc())
+  {
+    // empty
+  }
+
+  ~MDSDmclockScheduler();
+
+  SessionMap *get_session_map();
+  mds_rank_t get_nodeid();
+  void mds_lock();
+  void mds_unlock();
+  void set_mds_is_active(bool value);
+  bool get_mds_is_active();
+
+  Queue *get_dmclock_queue()
+  {
+    return dmclock_queue;
+  }
+
+  void cancel_inflight_request();
+  void increase_inflight_request(const VolumeId &vid);
+  void decrease_inflight_request(const VolumeId &vid);
+  int get_inflight_request(const VolumeId &vid);
+  Cost get_op_cost(int op);
+  bool _process_asok_qos_set(const cmdmap_t &cmdmap, std::ostream &ss);
+  void
process_asok_qos_set(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
+  void process_asok_qos_get(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
+  bool _process_asok_qos_rm(const cmdmap_t &cmdmap, std::ostream &ss);
+  void process_asok_qos_rm(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
+
+  void shutdown();
+  friend ostream& operator<<(ostream& os, const VolumeInfo* vi);
+
+  std::string_view get_state_str() const;
+};
+
+#endif // MDS_DMCLOCK_SCHEDULER_H_
diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc
index 02816d700c5..8de63c111f7 100644
--- a/src/mds/MDSRank.cc
+++ b/src/mds/MDSRank.cc
@@ -526,6 +526,8 @@ MDSRank::MDSRank(
   _heartbeat_reset_grace = g_conf().get_val<uint64_t>("mds_heartbeat_reset_grace");
   heartbeat_grace = g_conf().get_val<double>("mds_heartbeat_grace");
 
+  mds_dmclock_scheduler = new MDSDmclockScheduler(this);
+
   op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
                                          cct->_conf->mds_op_log_threshold);
   op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
@@ -553,6 +555,7 @@ MDSRank::~MDSRank()
   if (server) { delete server; server = 0; }
   if (locker) { delete locker; locker = 0; }
+  if (mds_dmclock_scheduler) { delete mds_dmclock_scheduler; mds_dmclock_scheduler = 0; }
 
   if (logger) {
     g_ceph_context->get_perfcounters_collection()->remove(logger);
@@ -2122,6 +2125,8 @@ void MDSRank::active_start()
   dout(1) << "active_start" << dendl;
   m_is_active = true;
+  mds_dmclock_scheduler->set_mds_is_active(true);
+  mds_dmclock_scheduler->try_enable_qos_feature();
 
   if (last_state == MDSMap::STATE_CREATING ||
       last_state == MDSMap::STATE_STARTING) {
@@ -2242,6 +2247,9 @@ void MDSRank::stopping_start()
 {
   dout(2) << "Stopping..."
<< dendl;
 
+  mds_dmclock_scheduler->try_disable_qos_feature();
+  mds_dmclock_scheduler->set_mds_is_active(false);
+
   if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
     std::vector<Session*> victims;
     const auto& sessions = sessionmap.get_sessions();
@@ -3117,6 +3125,18 @@ void MDSRankDispatcher::handle_asok_command(
       dout(10) << "dump_stray wait" << dendl;
     }
     return;
+  } else if (command == "dump qos") {
+    std::lock_guard l(mds_lock);
+    mds_dmclock_scheduler->dump(f);
+  } else if (command == "qos set") {
+    std::lock_guard l(mds_lock);
+    mds_dmclock_scheduler->process_asok_qos_set(cmdmap, *css, f);
+  } else if (command == "qos rm") {
+    std::lock_guard l(mds_lock);
+    mds_dmclock_scheduler->process_asok_qos_rm(cmdmap, *css, f);
+  } else if (command == "qos get") {
+    std::lock_guard l(mds_lock);
+    mds_dmclock_scheduler->process_asok_qos_get(cmdmap, *css, f);
   } else {
     r = -ENOSYS;
   }
@@ -4129,6 +4149,10 @@ std::vector<std::string> MDSRankDispatcher::get_tracked_keys()
     "mds_cap_revoke_eviction_timeout",
     "mds_debug_subtrees",
     "mds_dir_max_entries",
+    "mds_dmclock_enable",
+    "mds_dmclock_limit",
+    "mds_dmclock_reservation",
+    "mds_dmclock_weight",
     "mds_dump_cache_threshold_file",
     "mds_dump_cache_threshold_formatter",
     "mds_enable_op_tracker",
@@ -4261,6 +4285,7 @@ void MDSRankDispatcher::handle_conf_change(const ConfigProxy& conf, const std::s
     mdlog->handle_conf_change(changed, *mdsmap);
     purge_queue.handle_conf_change(changed, *mdsmap);
     scrubstack->handle_conf_change(changed);
+    mds_dmclock_scheduler->handle_conf_change(changed);
   }));
 }
diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h
index 80657432c43..2621f85b3cb 100644
--- a/src/mds/MDSRank.h
+++ b/src/mds/MDSRank.h
@@ -30,6 +30,7 @@
 #include "SessionMap.h"
 #include "PurgeQueue.h"
 #include "MetricsHandler.h"
+#include "MDSDmclockScheduler.h"
 
 // Full .h import instead of forward declaration for PerfCounter, for the
 // benefit of those including this header and using MDSRank::logger
@@ -154,6 +155,7 @@ class ScrubStack;
 class
C_ExecAndReply;
 class QuiesceDbManager;
 class QuiesceAgent;
+class MDSDmclockScheduler;
 
 /**
  * The public part of this class's interface is what's exposed to all
@@ -426,6 +428,8 @@ class MDSRank {
 
   SessionMap sessionmap;
 
+  MDSDmclockScheduler *mds_dmclock_scheduler = nullptr;
+
   PerfCounters *logger = nullptr, *mlogger = nullptr;
   OpTracker op_tracker;
 
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index 56d55cc42d1..ee50177f5af 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -400,7 +400,7 @@ void Server::dispatch(const cref_t<Message> &m)
     handle_client_session(ref_cast<MClientSession>(m));
     return;
   case CEPH_MSG_CLIENT_REQUEST:
-    handle_client_request(ref_cast<MClientRequest>(m));
+    mds->mds_dmclock_scheduler->handle_client_request(ref_cast<MClientRequest>(m));
     return;
   case CEPH_MSG_CLIENT_REPLY:
     handle_client_reply(ref_cast<MClientReply>(m));
@@ -953,6 +953,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
             << ", noop" << dendl;
     // close must have been canceled (by an import?), or any number of other things..
   } else if (open) {
+    mds->mds_dmclock_scheduler->add_session(session);
     ceph_assert(session->is_opening());
     mds->sessionmap.set_state(session, Session::STATE_OPEN);
     mds->sessionmap.touch_session(session);
@@ -1013,6 +1014,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
       session->get_connection()->mark_disposable();
     }
 
+    mds->mds_dmclock_scheduler->remove_session(session);
     // reset session
     mds->send_message_client(make_message<MClientSession>(CEPH_SESSION_CLOSE), session);
     mds->sessionmap.set_state(session, Session::STATE_CLOSED);
@@ -1026,6 +1028,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve
       mds->sessionmap.set_state(session, Session::STATE_CLOSED);
       session->set_connection(nullptr);
     }
+    mds->mds_dmclock_scheduler->remove_session(session);
     metrics_handler->remove_session(session);
     mds->sessionmap.remove_session(session);
   } else {
@@ -1114,6 +1117,7 @@ void Server::finish_force_open_sessions(map<client_t,pair<Session*,uint64_t> >&
       it.second.second =
mds->sessionmap.set_state(session, Session::STATE_OPEN);
       mds->sessionmap.touch_session(session);
       metrics_handler->add_session(session);
+      mds->mds_dmclock_scheduler->add_session(session);
 
       auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
       if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {
@@ -1675,6 +1679,7 @@ void Server::handle_client_reconnect(const cref_t<MClientReconnect> &m)
 
   if (!m->has_more()) {
     metrics_handler->add_session(session);
+    mds->mds_dmclock_scheduler->add_session(session);
     // notify client of success with an OPEN
     auto reply = make_message<MClientSession>(CEPH_SESSION_OPEN);
     if (session->info.has_feature(CEPHFS_FEATURE_MIMIC)) {