From 9ba14085dabdd7a3a583baf3d2aaf2d6859f53ea Mon Sep 17 00:00:00 2001 From: Leonid Usov Date: Thu, 30 Nov 2023 16:42:22 +0200 Subject: [PATCH] mds/quiesce: MDSRankQuiesce - integration of the quiesce db manager * create an instance of the QuiesceDbManager in the rank * update membership with a new mdsmap * add an admin socket command for sending requests to the manager Signed-off-by: Leonid Usov (cherry picked from commit edf4bce948477ebd57eaeb9eecdd4beae6a2c6a7) --- src/mds/CMakeLists.txt | 4 + src/mds/MDCache.h | 73 ++++++++ src/mds/MDSDaemon.cc | 19 ++ src/mds/MDSRank.cc | 25 ++- src/mds/MDSRank.h | 9 + src/mds/MDSRankQuiesce.cc | 385 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 514 insertions(+), 1 deletion(-) create mode 100644 src/mds/MDSRankQuiesce.cc diff --git a/src/mds/CMakeLists.txt b/src/mds/CMakeLists.txt index 88c8a1db085..0c6c31a3c51 100644 --- a/src/mds/CMakeLists.txt +++ b/src/mds/CMakeLists.txt @@ -42,6 +42,10 @@ set(mds_srcs MDSPinger.cc MetricAggregator.cc MetricsHandler.cc + QuiesceDbManager.cc + QuiesceAgent.cc + MDSRankQuiesce.cc + BoostUrlImpl.cc ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc ${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 94347b8249c..30e004b97a6 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -15,6 +15,7 @@ #define CEPH_MDCACHE_H #include +#include #include #include @@ -137,6 +138,8 @@ static const int PREDIRTY_PRIMARY = 1; // primary dn, adjust nested accounting static const int PREDIRTY_DIR = 2; // update parent dir mtime/size static const int PREDIRTY_SHALLOW = 4; // only go to immediate parent (for easier rollback) +using namespace std::literals::chrono_literals; + class MDCache { public: typedef std::map> expiremap; @@ -525,6 +528,73 @@ class MDCache { std::map >& subtrees); ESubtreeMap *create_subtree_map(); + class QuiesceStatistics { +public: + void inc_inodes() { + inodes++; + } + void 
inc_inodes_quiesced() { + inodes_quiesced++; + } + uint64_t get_inodes() const { + return inodes; + } + uint64_t get_inodes_quiesced() const { + return inodes_quiesced; + } + void add_failed(const MDRequestRef& mdr, int rc) { + failed[mdr] = rc; + } + int get_failed(const MDRequestRef& mdr) const { + auto it = failed.find(mdr); + return it == failed.end() ? 0 : it->second; + } + const auto& get_failed() const { + return failed; + } + void dump(Formatter* f) const { + f->dump_unsigned("inodes", inodes); + f->dump_unsigned("inodes_quiesced", inodes_quiesced); + f->open_array_section("failed"); + for (auto& [mdr, rc] : failed) { + f->open_object_section("failure"); + f->dump_object("request", *mdr); + f->dump_int("result", rc); + f->close_section(); + } + f->close_section(); + } +private: + uint64_t inodes = 0; + uint64_t inodes_quiesced = 0; + std::map failed; + }; + class C_MDS_QuiescePath : public MDSInternalContext { + public: + C_MDS_QuiescePath(MDCache *c, Context* _finisher=nullptr) : + MDSInternalContext(c->mds), cache(c), finisher(_finisher) {} + ~C_MDS_QuiescePath() { + if (finisher) { + finisher->complete(-ECANCELED); + finisher = nullptr; + } + } + void set_req(const MDRequestRef& _mdr) { + mdr = _mdr; + } + void finish(int r) override { + if (finisher) { + finisher->complete(r); + finisher = nullptr; + } + } + QuiesceStatistics qs; + MDCache *cache; + MDRequestRef mdr; + Context* finisher = nullptr; + }; + MDRequestRef quiesce_path(filepath p, C_MDS_QuiescePath* c, Formatter *f = nullptr, std::chrono::milliseconds delay = 0ms) { c->complete(-ENOTSUP); return nullptr; } + void clean_open_file_lists(); void dump_openfiles(Formatter *f); bool dump_inode(Formatter *f, uint64_t number); @@ -1366,6 +1436,9 @@ class MDCache { void finish_uncommitted_fragment(dirfrag_t basedirfrag, int op); void rollback_uncommitted_fragment(dirfrag_t basedirfrag, frag_vec_t&& old_frags); + void dispatch_quiesce_path(const MDRequestRef& mdr) { } + void 
dispatch_quiesce_inode(const MDRequestRef& mdr) { } + void upkeep_main(void); uint64_t cache_memory_limit; diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index dc9ea99e6c7..e69bacf8f49 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -458,6 +458,25 @@ void MDSDaemon::set_up_admin_socket() asok_hook, "Respawn this MDS"); ceph_assert(r == 0); + r = admin_socket->register_command("quiesce db " + "name=roots,type=CephString,n=N,req=false " + "-- " + "name=set_id,type=CephString,req=false " + "name=timeout,type=CephFloat,range=0,req=false " + "name=expiration,type=CephFloat,range=0,req=false " + "name=await_for,type=CephFloat,range=0,req=false " + "name=await,type=CephBool,req=false " + "name=if_version,type=CephInt,range=0,req=false " + "name=include,type=CephBool,req=false " + "name=exclude,type=CephBool,req=false " + "name=reset,type=CephBool,req=false " + "name=release,type=CephBool,req=false " + "name=query,type=CephBool,req=false " + "name=all,type=CephBool,req=false " + "name=cancel,type=CephBool,req=false", + asok_hook, + "submit queries to the local QuiesceDbManager"); + ceph_assert(r == 0); r = admin_socket->register_command( "heap " \ "name=heapcmd,type=CephChoices,strings=" \ diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 9cedeea1b13..7d3f9cda9a2 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -18,6 +18,7 @@ #include "common/errno.h" #include "common/likely.h" #include "common/async/blocked_completion.h" +#include "common/cmdparse.h" #include "messages/MClientRequestForward.h" #include "messages/MMDSLoadTargets.h" @@ -42,9 +43,13 @@ #include "events/ELid.h" #include "Mutation.h" - #include "MDSRank.h" +#include "QuiesceDbManager.h" +#include "QuiesceAgent.h" + +#include + #define dout_context g_ceph_context #define dout_subsys ceph_subsys_mds #undef dout_prefix @@ -548,6 +553,8 @@ MDSRank::MDSRank( server = new Server(this, &metrics_handler); locker = new Locker(this, mdcache); + quiesce_db_manager.reset(new 
QuiesceDbManager()); + _heartbeat_reset_grace = g_conf().get_val("mds_heartbeat_reset_grace"); heartbeat_grace = g_conf().get_val("mds_heartbeat_grace"); op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time, @@ -859,12 +866,21 @@ void MDSRankDispatcher::shutdown() progress_thread.shutdown(); + if (quiesce_db_manager) { + // shutdown the manager + quiesce_db_manager->update_membership({}); + } + // release mds_lock for finisher/messenger threads (e.g. // MDSDaemon::ms_handle_reset called from Messenger). mds_lock.unlock(); // shut down messenger messenger->shutdown(); + if (quiesce_agent) { + // reset any tracked roots + quiesce_agent->shutdown(); + } mds_lock.lock(); @@ -2131,6 +2147,8 @@ void MDSRank::active_start() mdcache->reissue_all_caps(); finish_contexts(g_ceph_context, waiting_for_active); // kick waiters + + quiesce_agent_setup(); } void MDSRank::recovery_done(int oldstate) @@ -2588,6 +2606,8 @@ void MDSRankDispatcher::handle_mds_map( metric_aggregator->notify_mdsmap(*mdsmap); } metrics_handler.notify_mdsmap(*mdsmap); + + quiesce_cluster_update(); } void MDSRank::handle_mds_recovery(mds_rank_t who) @@ -2935,6 +2955,9 @@ void MDSRankDispatcher::handle_asok_command( goto out; } damage_table.erase(id); + } else if (command == "quiesce db") { + command_quiesce_db(cmdmap, on_finish); + return; } else { r = -CEPHFS_ENOSYS; } diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index 6ee2e093478..02512be7916 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -151,6 +151,8 @@ class MgrClient; class Finisher; class ScrubStack; class C_ExecAndReply; +class QuiesceDbManager; +class QuiesceAgent; /** * The public part of this class's interface is what's exposed to all @@ -433,6 +435,9 @@ class MDSRank { bool cluster_degraded = false; + std::shared_ptr quiesce_db_manager; + std::shared_ptr quiesce_agent; + Finisher *finisher; protected: typedef enum { @@ -524,6 +529,7 @@ class MDSRank { void command_dump_inode(Formatter *f, const cmdmap_t 
&cmdmap, std::ostream &ss); void command_dump_dir(Formatter *f, const cmdmap_t &cmdmap, std::ostream &ss); void command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish); + void command_quiesce_db(const cmdmap_t& cmdmap, std::function on_finish); // FIXME the state machine logic should be separable from the dispatch // logic that calls it. @@ -562,6 +568,9 @@ class MDSRank { void handle_mds_recovery(mds_rank_t who); void handle_mds_failure(mds_rank_t who); + void quiesce_cluster_update(); + void quiesce_agent_setup(); + /* Update MDSMap export_targets for this rank. Called on ::tick(). */ void update_targets(); diff --git a/src/mds/MDSRankQuiesce.cc b/src/mds/MDSRankQuiesce.cc new file mode 100644 index 00000000000..49e6f395225 --- /dev/null +++ b/src/mds/MDSRankQuiesce.cc @@ -0,0 +1,385 @@ +#include "MDSRank.h" +#include "MDCache.h" + +#include "QuiesceDbManager.h" +#include "QuiesceAgent.h" +#include +#include +#include +#include + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mds_quiesce +#undef dout_prefix +#define dout_prefix *_dout << "quiesce.mds." 
<< whoami << " <" << __func__ << "> "
+
+#undef dout
+#define dout(lvl) \
+  do { \
+    auto subsys = ceph_subsys_mds; \
+    if ((dout_context)->_conf->subsys.should_gather(dout_subsys, lvl)) { \
+      subsys = dout_subsys; \
+    } \
+  dout_impl(dout_context, ceph::dout::need_dynamic(subsys), lvl) dout_prefix
+
+#undef dendl
+#define dendl \
+  dendl_impl; \
+  } \
+  while (0)
+// Handles the "quiesce db" admin socket command: validates the option combination, builds a request and submits it to the local QuiesceDbManager.
+void MDSRank::command_quiesce_db(const cmdmap_t& cmdmap, std::function<void(int, const std::string&, bufferlist&)> on_finish)
+{
+  // validate the command:
+  using ceph::common::cmd_getval;
+  using ceph::common::cmd_getval_or;
+  using std::chrono::duration_cast;
+  using dd = std::chrono::duration<double>;
+
+  bool op_include = cmd_getval_or<bool>(cmdmap, "include", false);
+  bool op_query = cmd_getval_or<bool>(cmdmap, "query", false);
+  bool op_exclude = cmd_getval_or<bool>(cmdmap, "exclude", false);
+  bool op_reset = cmd_getval_or<bool>(cmdmap, "reset", false);
+  bool op_release = cmd_getval_or<bool>(cmdmap, "release", false);
+  bool op_cancel = cmd_getval_or<bool>(cmdmap, "cancel", false);
+  bool all = cmd_getval_or<bool>(cmdmap, "all", false);
+  std::optional<std::string> set_id = cmd_getval<std::string>(cmdmap, "set_id");
+
+  auto roots = cmd_getval_or<std::vector<std::string>>(cmdmap, "roots", std::vector<std::string> {});
+
+  int all_ops = op_include + op_exclude + op_reset + op_release + op_cancel + op_query;
+
+  if (all_ops > 1) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operations [include, exclude, reset, release, cancel, query] are mutually exclusive", bl);
+    return;
+  } else if (all_ops == 0) {
+    op_include = true; // no explicit operation given: behave as `include`
+  }
+
+  if ((op_release || op_cancel) && roots.size() > 0) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operations [release, cancel] don't take roots", bl);
+    return;
+  }
+
+  if (op_cancel && !set_id && !all) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operation `cancel` requires a `--set-id` or `--all` to cancel all active sets", bl);
+    return;
+  }
+
+  if (op_reset && !set_id) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operation `reset` requires a `--set-id`", bl);
+    return;
+  }
+
+  if (op_reset && roots.empty()) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operation `reset` expects at least one root", bl);
+    return;
+  }
+
+  if (op_query && roots.size() > 0) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operation `query` doesn't take any roots", bl);
+    return;
+  }
+
+  if (!quiesce_db_manager) {
+    bufferlist bl;
+    on_finish(-EFAULT, "No quiesce db manager instance", bl);
+    return;
+  }
+
+  struct Ctx : public QuiesceDbManager::RequestContext {
+    std::function<void(int, const std::string&, bufferlist&)> on_finish;
+    bool all = false;
+
+    double sec(QuiesceTimeInterval duration) {
+      return duration_cast<dd>(duration).count();
+    }
+
+    double age(QuiesceTimeInterval of, QuiesceTimeInterval ref) {
+      return sec(ref - of);
+    }
+
+    double age(QuiesceTimeInterval of = QuiesceTimeInterval::zero()) {
+      return age(of, response.db_age);
+    }
+
+    void finish(int rc)
+    {
+      auto f = Formatter::create_unique("json-pretty");
+      CachedStackStringStream css;
+      bufferlist outbl;
+
+      auto dump_seconds = [&f](const std::string_view& name, double seconds) {
+        f->dump_format_unquoted(name, "%0.1f", seconds);
+      };
+
+      f->open_object_section("response"); {
+        f->dump_int("epoch", response.db_version.epoch);
+        f->dump_int("set_version", response.db_version.set_version);
+        f->open_object_section("sets"); {
+          for (auto&& [set_id, set] : response.sets) {
+            if (!all && !set.is_active() && set_id != request.set_id) {
+              continue; // inactive sets are listed only with `all` or when explicitly requested
+            }
+            f->open_object_section(set_id); {
+              f->dump_int("version", set.version);
+              QuiesceTimeInterval ref = response.db_age;
+              if (!set.is_active()) {
+                ref = set.rstate.at_age;
+              }
+              dump_seconds("age_ref", age(ref));
+              f->open_object_section("state"); {
+                f->dump_string("name", quiesce_state_name(set.rstate.state));
+                dump_seconds("age", age(set.rstate.at_age, ref));
+              } f->close_section();
+              dump_seconds("timeout", sec(set.timeout));
+              dump_seconds("expiration", sec(set.expiration));
+              f->open_object_section("members"); {
+                for (auto&& [root, info] : set.members) {
+                  f->open_object_section(root); {
+                    f->dump_bool("excluded", info.excluded);
+                    f->open_object_section("state"); {
+                      f->dump_string("name", quiesce_state_name(info.rstate.state));
+                      dump_seconds("age", age(info.rstate.at_age, ref));
+                    } f->close_section();
+                  } f->close_section();
+                }
+              } f->close_section();
+            } f->close_section();
+          }
+        } f->close_section();
+      } f->close_section();
+
+      f->flush(outbl);
+      on_finish(rc, css->str(), outbl);
+    }
+  };
+
+  auto* ctx = new Ctx();
+
+  ctx->on_finish = std::move(on_finish); // from here on the callback is reachable only through ctx
+  ctx->all = all;
+
+  ctx->request.reset([&](auto& r) {
+    r.set_id = set_id;
+
+    if (op_include) {
+      r.include_roots(roots);
+    } else if (op_exclude) {
+      r.exclude_roots(roots);
+    } else if (op_reset) {
+      r.reset_roots(roots);
+    } else if (op_release) {
+      r.release_roots();
+    } else if (op_cancel) {
+      r.cancel_roots();
+    }
+
+    double timeout;
+
+    if (cmd_getval(cmdmap, "await_for", timeout)) {
+      r.await = duration_cast<QuiesceTimeInterval>(dd(timeout));
+    } else if (cmd_getval_or<bool>(cmdmap, "await", false)) {
+      r.await = QuiesceTimeInterval::max();
+    }
+
+    if (cmd_getval(cmdmap, "expiration", timeout)) {
+      r.expiration = duration_cast<QuiesceTimeInterval>(dd(timeout));
+    }
+
+    if (cmd_getval(cmdmap, "timeout", timeout)) {
+      r.timeout = duration_cast<QuiesceTimeInterval>(dd(timeout));
+    }
+
+    int64_t ifv;
+    if (cmd_getval(cmdmap, "if_version", ifv)) {
+      r.if_version = QuiesceSetVersion(ifv);
+    }
+  });
+
+  dout(20) << "Submitting a quiesce db request " << (set_id ? "for" : "without a") << " setid " << set_id.value_or("") << ", operation: " << ctx->request.op_string() << dendl;
+  int rc = quiesce_db_manager->submit_request(ctx);
+  if (rc != 0) {
+    bufferlist bl;
+    ctx->on_finish(rc, "Error submitting the command to the local db manager", bl); // the by-value on_finish was moved from; report via ctx
+    delete ctx;
+  }
+}
+// Builds a QuiesceClusterMembership from the current mdsmap (leader is fixed to rank 0 for now) and passes it to the db manager.
+void MDSRank::quiesce_cluster_update() {
+  QuiesceClusterMembership membership;
+
+  mds_rank_t leader = 0; // MAYBE LATER: initialize this from the map
+
+  membership.epoch = mdsmap->get_epoch();
+  membership.leader = leader;
+  membership.me = whoami;
+  membership.fs_id = mdsmap->get_info(whoami).join_fscid;
+  membership.fs_name = mdsmap->get_fs_name();
+  mdsmap->get_mds_set(membership.members);
+
+  dout(5) << "epoch:" << membership.epoch << " leader:" << membership.leader << " members:" << membership.members << dendl;
+
+  membership.send_ack = [=,this](QuiesceMap&& ack) {
+    if (whoami == leader) {
+      // loopback
+      quiesce_db_manager->submit_ack_from(whoami, std::move(ack));
+      return 0;
+    } else {
+      // TODO: implement messaging
+      return -ENOTSUP;
+    }
+  };
+
+  membership.send_listing_to = [=](mds_rank_t to, QuiesceDbListing&& db) {
+    // TODO: implement messaging
+    return -ENOTSUP;
+  };
+
+  quiesce_db_manager->update_membership(membership);
+}
+// Creates the QuiesceAgent and connects it to the db manager; roots carrying a `q` URI parameter take a timer-driven dummy path instead of MDCache::quiesce_path.
+void MDSRank::quiesce_agent_setup() {
+  // TODO: replace this with a non-debug implementation
+  //       Potentially, allow the debug interface under some runtime configuration
+
+  ceph_assert(quiesce_db_manager);
+
+  using RequestHandle = QuiesceInterface::RequestHandle;
+  using QuiescingRoot = std::pair<RequestHandle, Context*>;
+  auto quiesce_requests = std::make_shared<std::map<QuiesceRoot, QuiescingRoot>>();
+
+  QuiesceAgent::ControlInterface ci;
+
+  ci.submit_request = [this, quiesce_requests](QuiesceRoot root, Context* c)
+      -> std::optional<RequestHandle> {
+    auto uri = boost::urls::parse_uri_reference(root);
+    if (!uri) {
+      dout(5) << "error parsing the quiesce root as an URI: " << uri.error() << dendl;
+      c->complete(uri.error());
+      return std::nullopt;
+    } else {
+      dout(20) << "parsed root '" << root <<"' as : " << uri->path() << " " << uri->query() << dendl;
+    }
+
+    std::chrono::milliseconds quiesce_delay_ms = 0ms;
+    if (auto pit = uri->params().find("delayms"); pit != uri->params().end()) {
+      try {
+        quiesce_delay_ms = std::chrono::milliseconds((*pit).has_value ? std::stoul((*pit).value) : 1000);
+      } catch (...) {
+        dout(5) << "error parsing the time to quiesce for query: " << uri->query() << dendl;
+        c->complete(-EINVAL);
+        return std::nullopt;
+      }
+    }
+    std::optional<double> dummy_quiesce_after;
+    if (auto pit = uri->params().find("q"); pit != uri->params().end()) {
+      try {
+        dummy_quiesce_after = (*pit).has_value ? std::stod((*pit).value) : 1 /*second*/;
+      } catch (...) {
+        dout(5) << "error parsing the time for debug quiesce for query: " << uri->query() << dendl;
+        c->complete(-EINVAL);
+        return std::nullopt;
+      }
+    }
+
+    auto path = uri->path();
+    dout(20) << "got request to quiesce '" << path << "'" << dendl;
+
+    std::lock_guard l(mds_lock);
+
+    if (!dummy_quiesce_after) {
+      // the real deal!
+      auto qc = new MDCache::C_MDS_QuiescePath(mdcache, c);
+      auto mdr = mdcache->quiesce_path(filepath(path), qc, nullptr, quiesce_delay_ms);
+      return mdr ? mdr->reqid : std::optional<RequestHandle>();
+    } else {
+      /* dummy quiesce */
+      // always create a new request id
+      auto req_id = metareqid_t(entity_name_t::MDS(whoami), issue_tid());
+      auto [it, inserted] = quiesce_requests->try_emplace(path, req_id, c);
+
+      if (!inserted) {
+        dout(3) << "duplicate quiesce request for root '" << it->first << "'" << dendl;
+        // we must update the request id so that old one can't cancel this request.
+        it->second.first = req_id;
+        if (it->second.second) {
+          it->second.second->complete(-EINTR);
+          it->second.second = c;
+        } else {
+          // if we have no context, it means we've completed it
+          // since we weren't inserted, we must have successfully quiesced
+          c->complete(0);
+        }
+      } else {
+        // do quiesce if needed
+
+        auto quiesce_task = new LambdaContext([quiesce_requests, req_id, this](int) {
+          // the mds lock should be held by the timer
+          dout(20) << "quiesce_task: callback by the timer" << dendl;
+          auto it = std::ranges::find(*quiesce_requests, req_id, [](const auto& x) { return x.second.first; });
+          if (it != quiesce_requests->end() && it->second.second != nullptr) {
+            dout(20) << "quiesce_task: completing the root '" << it->first << "'" << dendl;
+            it->second.second->complete(0);
+            it->second.second = nullptr;
+          }
+          dout(20) << "quiesce_task: done" << dendl;
+        });
+
+        dout(20) << "scheduling a quiesce_task (" << quiesce_task
+                 << ") to fire after " << *dummy_quiesce_after
+                 << " seconds on timer " << &timer << dendl;
+        timer.add_event_after(*dummy_quiesce_after, quiesce_task);
+      }
+      return it->second.first;
+    }
+  };
+
+  ci.cancel_request = [this, quiesce_requests](RequestHandle h) {
+    std::lock_guard l(mds_lock);
+
+    if (mdcache->have_request(h)) {
+      auto qimdr = mdcache->request_get(h);
+      mdcache->request_kill(qimdr);
+      return 0;
+    }
+
+    auto it = std::ranges::find(*quiesce_requests, h, [](const auto& x) { return x.second.first; });
+    if (it != quiesce_requests->end()) {
+      if (auto ctx = it->second.second; ctx) {
+        dout(20) << "canceling request with id '" << h << "' for root '" << it->first << "'" << dendl;
+        ctx->complete(-ECANCELED);
+      }
+      quiesce_requests->erase(it);
+      return 0;
+    }
+
+    return ENOENT;
+  };
+
+  std::weak_ptr<QuiesceDbManager> weak_db_manager = quiesce_db_manager;
+  ci.agent_ack = [weak_db_manager](QuiesceMap && update) {
+    if (auto manager = weak_db_manager.lock()) {
+      return manager->submit_agent_ack(std::move(update));
+    } else {
+      return ENOENT;
+    }
+  };
+
+  quiesce_agent.reset(new QuiesceAgent(ci));
+
+  std::weak_ptr<QuiesceAgent> weak_agent = quiesce_agent;
+  quiesce_db_manager->reset_agent_callback([weak_agent](QuiesceMap& update) {
+    if (auto agent = weak_agent.lock()) {
+      return agent->db_update(update);
+    } else {
+      return false;
+    }
+  });
+}
-- 
2.39.5