]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mds/quiesce: MDSRankQuiesce - integration of the quiesce db manager
authorLeonid Usov <leonid.usov@ibm.com>
Thu, 30 Nov 2023 14:42:22 +0000 (16:42 +0200)
committerLeonid Usov <leonid.usov@ibm.com>
Thu, 14 Mar 2024 19:07:52 +0000 (15:07 -0400)
* create an instance of the QuiesceDbManager in the rank
* update membership with a new mdsmap
* add an admin socket command for sending requests to the manager

Signed-off-by: Leonid Usov <leonid.usov@ibm.com>
(cherry picked from commit edf4bce948477ebd57eaeb9eecdd4beae6a2c6a7)

src/mds/CMakeLists.txt
src/mds/MDCache.h
src/mds/MDSDaemon.cc
src/mds/MDSRank.cc
src/mds/MDSRank.h
src/mds/MDSRankQuiesce.cc [new file with mode: 0644]

index 88c8a1db0854aaabeee0c56bc160511da0cb8532..0c6c31a3c51a090add184350ca327d4df9b16010 100644 (file)
@@ -42,6 +42,10 @@ set(mds_srcs
   MDSPinger.cc
   MetricAggregator.cc
   MetricsHandler.cc
+  QuiesceDbManager.cc
+  QuiesceAgent.cc
+  MDSRankQuiesce.cc
+  BoostUrlImpl.cc
   ${CMAKE_SOURCE_DIR}/src/common/TrackedOp.cc
   ${CMAKE_SOURCE_DIR}/src/common/MemoryModel.cc
   ${CMAKE_SOURCE_DIR}/src/osdc/Journaler.cc
index 94347b8249cc361514b3187a95ac7c3209f443ef..30e004b97a6eaddcf0be5b8fa6b165b35c310321 100644 (file)
@@ -15,6 +15,7 @@
 #define CEPH_MDCACHE_H
 
 #include <atomic>
+#include <chrono>
 #include <string_view>
 #include <thread>
 
@@ -137,6 +138,8 @@ static const int PREDIRTY_PRIMARY = 1; // primary dn, adjust nested accounting
 static const int PREDIRTY_DIR = 2;     // update parent dir mtime/size
 static const int PREDIRTY_SHALLOW = 4; // only go to immediate parent (for easier rollback)
 
+using namespace std::literals::chrono_literals;
+
 class MDCache {
  public:
   typedef std::map<mds_rank_t, ref_t<MCacheExpire>> expiremap;
@@ -525,6 +528,73 @@ class MDCache {
                               std::map<dirfrag_t,std::vector<dirfrag_t> >& subtrees);
   ESubtreeMap *create_subtree_map();
 
+  class QuiesceStatistics {
+public:
+    void inc_inodes() {
+      inodes++;
+    }
+    void inc_inodes_quiesced() {
+      inodes_quiesced++;
+    }
+    uint64_t get_inodes() const {
+      return inodes;
+    }
+    uint64_t get_inodes_quiesced() const {
+      return inodes_quiesced;
+    }
+    void add_failed(const MDRequestRef& mdr, int rc) {
+      failed[mdr] = rc;
+    }
+    int get_failed(const MDRequestRef& mdr) const {
+      auto it = failed.find(mdr);
+      return it == failed.end() ? 0 : it->second;
+    }
+    const auto& get_failed() const {
+      return failed;
+    }
+    void dump(Formatter* f) const {
+      f->dump_unsigned("inodes", inodes);
+      f->dump_unsigned("inodes_quiesced", inodes_quiesced);
+      f->open_array_section("failed");
+      for (auto& [mdr, rc] : failed) {
+        f->open_object_section("failure");
+        f->dump_object("request", *mdr);
+        f->dump_int("result", rc);
+        f->close_section();
+      }
+      f->close_section();
+    }
+private:
+    uint64_t inodes = 0;
+    uint64_t inodes_quiesced = 0;
+    std::map<MDRequestRef, int> failed;
+  };
+  class C_MDS_QuiescePath : public MDSInternalContext {
+  public:
+    C_MDS_QuiescePath(MDCache *c, Context* _finisher=nullptr) :
+      MDSInternalContext(c->mds), cache(c), finisher(_finisher) {}
+    ~C_MDS_QuiescePath() {
+      if (finisher) {
+        finisher->complete(-ECANCELED);
+        finisher = nullptr;
+      }
+    }
+    void set_req(const MDRequestRef& _mdr) {
+      mdr = _mdr;
+    }
+    void finish(int r) override {
+      if (finisher) {
+        finisher->complete(r);
+        finisher = nullptr;
+      }
+    }
+    QuiesceStatistics qs;
+    MDCache *cache;
+    MDRequestRef mdr;
+    Context* finisher = nullptr;
+  };
+  MDRequestRef quiesce_path(filepath p, C_MDS_QuiescePath* c, Formatter *f = nullptr, std::chrono::milliseconds delay = 0ms) { c->complete(-ENOTSUP); return nullptr; }
+
   void clean_open_file_lists();
   void dump_openfiles(Formatter *f);
   bool dump_inode(Formatter *f, uint64_t number);
@@ -1366,6 +1436,9 @@ class MDCache {
   void finish_uncommitted_fragment(dirfrag_t basedirfrag, int op);
   void rollback_uncommitted_fragment(dirfrag_t basedirfrag, frag_vec_t&& old_frags);
 
+  void dispatch_quiesce_path(const MDRequestRef& mdr) { }
+  void dispatch_quiesce_inode(const MDRequestRef& mdr) { }
+
   void upkeep_main(void);
 
   uint64_t cache_memory_limit;
index dc9ea99e6c73365f176922fba97f36b391fc0489..e69bacf8f49d7509177bb6927bd2bc5acd60f558 100644 (file)
@@ -458,6 +458,25 @@ void MDSDaemon::set_up_admin_socket()
                                     asok_hook,
                                     "Respawn this MDS");
   ceph_assert(r == 0);
+  r = admin_socket->register_command("quiesce db "
+                                     "name=roots,type=CephString,n=N,req=false "
+                                     "-- "
+                                     "name=set_id,type=CephString,req=false "
+                                     "name=timeout,type=CephFloat,range=0,req=false "
+                                     "name=expiration,type=CephFloat,range=0,req=false "
+                                     "name=await_for,type=CephFloat,range=0,req=false "
+                                     "name=await,type=CephBool,req=false "
+                                     "name=if_version,type=CephInt,range=0,req=false "
+                                     "name=include,type=CephBool,req=false "
+                                     "name=exclude,type=CephBool,req=false "
+                                     "name=reset,type=CephBool,req=false "
+                                     "name=release,type=CephBool,req=false "
+                                     "name=query,type=CephBool,req=false "
+                                     "name=all,type=CephBool,req=false "
+                                     "name=cancel,type=CephBool,req=false",
+      asok_hook,
+      "submit queries to the local QuiesceDbManager");
+  ceph_assert(r == 0);
   r = admin_socket->register_command(
     "heap " \
     "name=heapcmd,type=CephChoices,strings="                           \
index 9cedeea1b1360798a0e1e77f4129c19f8cf5252a..7d3f9cda9a226ce8d484f2795a24711763543544 100644 (file)
@@ -18,6 +18,7 @@
 #include "common/errno.h"
 #include "common/likely.h"
 #include "common/async/blocked_completion.h"
+#include "common/cmdparse.h"
 
 #include "messages/MClientRequestForward.h"
 #include "messages/MMDSLoadTargets.h"
 #include "events/ELid.h"
 #include "Mutation.h"
 
-
 #include "MDSRank.h"
 
+#include "QuiesceDbManager.h"
+#include "QuiesceAgent.h"
+
+#include <cmath>
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds
 #undef dout_prefix
@@ -548,6 +553,8 @@ MDSRank::MDSRank(
   server = new Server(this, &metrics_handler);
   locker = new Locker(this, mdcache);
 
+  quiesce_db_manager.reset(new QuiesceDbManager());
+
   _heartbeat_reset_grace = g_conf().get_val<uint64_t>("mds_heartbeat_reset_grace");
   heartbeat_grace = g_conf().get_val<double>("mds_heartbeat_grace");
   op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
@@ -859,12 +866,21 @@ void MDSRankDispatcher::shutdown()
 
   progress_thread.shutdown();
 
+  if (quiesce_db_manager) {
+    // shutdown the manager
+    quiesce_db_manager->update_membership({});
+  }
+
   // release mds_lock for finisher/messenger threads (e.g.
   // MDSDaemon::ms_handle_reset called from Messenger).
   mds_lock.unlock();
 
   // shut down messenger
   messenger->shutdown();
+  if (quiesce_agent) {
+    // reset any tracked roots
+    quiesce_agent->shutdown();
+  }
 
   mds_lock.lock();
 
@@ -2131,6 +2147,8 @@ void MDSRank::active_start()
   mdcache->reissue_all_caps();
 
   finish_contexts(g_ceph_context, waiting_for_active);  // kick waiters
+
+  quiesce_agent_setup();
 }
 
 void MDSRank::recovery_done(int oldstate)
@@ -2588,6 +2606,8 @@ void MDSRankDispatcher::handle_mds_map(
     metric_aggregator->notify_mdsmap(*mdsmap);
   }
   metrics_handler.notify_mdsmap(*mdsmap);
+
+  quiesce_cluster_update();
 }
 
 void MDSRank::handle_mds_recovery(mds_rank_t who)
@@ -2935,6 +2955,9 @@ void MDSRankDispatcher::handle_asok_command(
       goto out;
     }
     damage_table.erase(id);
+  } else if (command == "quiesce db") {
+    command_quiesce_db(cmdmap, on_finish);
+    return;
   } else {
     r = -CEPHFS_ENOSYS;
   }
index 6ee2e093478ef613f42c75b09cdf8cbb3ab8cd86..02512be7916b584b11eb43242a6d5b60d7c94162 100644 (file)
@@ -151,6 +151,8 @@ class MgrClient;
 class Finisher;
 class ScrubStack;
 class C_ExecAndReply;
+class QuiesceDbManager;
+class QuiesceAgent;
 
 /**
  * The public part of this class's interface is what's exposed to all
@@ -433,6 +435,9 @@ class MDSRank {
 
     bool cluster_degraded = false;
 
+    std::shared_ptr<QuiesceDbManager> quiesce_db_manager;
+    std::shared_ptr<QuiesceAgent> quiesce_agent;
+
     Finisher *finisher;
   protected:
     typedef enum {
@@ -524,6 +529,7 @@ class MDSRank {
     void command_dump_inode(Formatter *f, const cmdmap_t &cmdmap, std::ostream &ss);
     void command_dump_dir(Formatter *f, const cmdmap_t &cmdmap, std::ostream &ss);
     void command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish);
+    void command_quiesce_db(const cmdmap_t& cmdmap, std::function<void(int, const std::string&, bufferlist&)> on_finish);
 
     // FIXME the state machine logic should be separable from the dispatch
     // logic that calls it.
@@ -562,6 +568,9 @@ class MDSRank {
     void handle_mds_recovery(mds_rank_t who);
     void handle_mds_failure(mds_rank_t who);
 
+    void quiesce_cluster_update();
+    void quiesce_agent_setup();
+
     /* Update MDSMap export_targets for this rank. Called on ::tick(). */
     void update_targets();
 
diff --git a/src/mds/MDSRankQuiesce.cc b/src/mds/MDSRankQuiesce.cc
new file mode 100644 (file)
index 0000000..49e6f39
--- /dev/null
@@ -0,0 +1,385 @@
+#include "MDSRank.h"
+#include "MDCache.h"
+
+#include "QuiesceDbManager.h"
+#include "QuiesceAgent.h"
+#include <boost/url.hpp>
+#include <chrono>
+#include <ranges>
+#include <algorithm>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds_quiesce
+#undef dout_prefix
+#define dout_prefix *_dout << "quiesce.mds." << whoami << " <" << __func__ << "> "
+
+#undef dout
+#define dout(lvl)                                                        \
+  do {                                                                   \
+    auto subsys = ceph_subsys_mds;                                       \
+    if ((dout_context)->_conf->subsys.should_gather(dout_subsys, lvl)) { \
+      subsys = dout_subsys;                                              \
+    }                                                                    \
+  dout_impl(dout_context, ceph::dout::need_dynamic(subsys), lvl) dout_prefix
+
+#undef dendl
+#define dendl \
+  dendl_impl; \
+  }           \
+  while (0)
+
+void MDSRank::command_quiesce_db(const cmdmap_t& cmdmap, std::function<void(int, const std::string&, bufferlist&)> on_finish)
+{
+  // validate the command:
+  using ceph::common::cmd_getval;
+  using ceph::common::cmd_getval_or;
+  using std::chrono::duration_cast;
+  using dd = std::chrono::duration<double>;
+
+  bool op_include = cmd_getval_or<bool>(cmdmap, "include", false);
+  bool op_query = cmd_getval_or<bool>(cmdmap, "query", false);
+  bool op_exclude = cmd_getval_or<bool>(cmdmap, "exclude", false);
+  bool op_reset = cmd_getval_or<bool>(cmdmap, "reset", false);
+  bool op_release = cmd_getval_or<bool>(cmdmap, "release", false);
+  bool op_cancel = cmd_getval_or<bool>(cmdmap, "cancel", false);
+  bool all = cmd_getval_or<bool>(cmdmap, "all", false);
+  std::optional<std::string> set_id = cmd_getval<std::string>(cmdmap, "set_id");
+
+  auto roots = cmd_getval_or<std::vector<std::string>>(cmdmap, "roots", std::vector<std::string> {});
+
+  int all_ops = op_include + op_exclude + op_reset + op_release + op_cancel + op_query;
+
+  if (all_ops > 1) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operations [include, exclude, reset, release, cancel, query] are mutually exclusive", bl);
+    return;
+  } else if (all_ops == 0) {
+    op_include = true;
+  }
+
+  if ((op_release || op_cancel) && roots.size() > 0) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operations [release, cancel] don't take roots", bl);
+    return;
+  }
+
+  if (op_cancel && !set_id && !all) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operation `cancel` requires a `--set-id` or `--all` to cancel all active sets", bl);
+    return;
+  }
+
+  if (op_reset && !set_id) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operation `reset` requires a `--set-id`", bl);
+    return;
+  }
+
+  if (op_reset && roots.empty()) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operation `reset` expects at least one root", bl);
+    return;
+  }
+
+  if (op_query && roots.size() > 0) {
+    bufferlist bl;
+    on_finish(-EINVAL, "Operation `query` doesn't take any roots", bl);
+    return;
+  }
+
+  if (!quiesce_db_manager) {
+    bufferlist bl;
+    on_finish(-EFAULT, "No quiesce db manager instance", bl);
+    return;
+  }
+
+  struct Ctx : public QuiesceDbManager::RequestContext {
+    std::function<void(int, const std::string&, bufferlist&)> on_finish;
+    bool all = false;
+
+    double sec(QuiesceTimeInterval duration) {
+      return duration_cast<dd>(duration).count();
+    }
+
+    double age(QuiesceTimeInterval of, QuiesceTimeInterval ref) {
+      return sec(ref - of);
+    }
+
+    double age(QuiesceTimeInterval of = QuiesceTimeInterval::zero()) {
+      return age(of, response.db_age);
+    }
+
+    void finish(int rc)
+    {
+      auto f = Formatter::create_unique("json-pretty");
+      CachedStackStringStream css;
+      bufferlist outbl;
+
+      auto dump_seconds = [&f](const std::string_view& name, double seconds) {
+        f->dump_format_unquoted(name, "%0.1f", seconds);
+      };
+
+      f->open_object_section("response"); {
+        f->dump_int("epoch", response.db_version.epoch);
+        f->dump_int("set_version", response.db_version.set_version);
+        f->open_object_section("sets"); {
+          for (auto&& [set_id, set] : response.sets) {
+            if (!all && !set.is_active() && set_id != request.set_id) {
+              continue;
+            }
+            f->open_object_section(set_id); {
+              f->dump_int("version", set.version);
+              QuiesceTimeInterval ref = response.db_age;
+              if (!set.is_active()) {
+                ref = set.rstate.at_age;
+              }
+              dump_seconds("age_ref", age(ref));
+              f->open_object_section("state"); {
+                f->dump_string("name", quiesce_state_name(set.rstate.state));
+                dump_seconds("age", age(set.rstate.at_age, ref));
+              } f->close_section();
+              dump_seconds("timeout", sec(set.timeout));
+              dump_seconds("expiration", sec(set.expiration));
+              f->open_object_section("members"); {
+                for (auto&& [root, info] : set.members) {
+                  f->open_object_section(root); {
+                    f->dump_bool("excluded", info.excluded);
+                    f->open_object_section("state"); {
+                      f->dump_string("name", quiesce_state_name(info.rstate.state));
+                      dump_seconds("age", age(info.rstate.at_age, ref));
+                    } f->close_section();
+                  } f->close_section();
+                }
+              } f->close_section();
+            } f->close_section();
+          }
+        } f->close_section();
+      } f->close_section();
+
+      f->flush(outbl);
+      on_finish(rc, css->str(), outbl);
+    }
+  };
+
+  auto* ctx = new Ctx();
+
+  ctx->on_finish = std::move(on_finish);
+  ctx->all = all;
+
+  ctx->request.reset([&](auto& r) {
+    r.set_id = set_id;
+
+    if (op_include) {
+      r.include_roots(roots);
+    } else if (op_exclude) {
+      r.exclude_roots(roots);
+    } else if (op_reset) {
+      r.reset_roots(roots);
+    } else if (op_release) {
+      r.release_roots();
+    } else if (op_cancel) {
+      r.cancel_roots();
+    }
+
+    double timeout;
+
+    if (cmd_getval(cmdmap, "await_for", timeout)) {
+      r.await = duration_cast<QuiesceTimeInterval>(dd(timeout));
+    } else if (cmd_getval_or<bool>(cmdmap, "await", false)) {
+      r.await = QuiesceTimeInterval::max();
+    }
+
+    if (cmd_getval(cmdmap, "expiration", timeout)) {
+      r.expiration = duration_cast<QuiesceTimeInterval>(dd(timeout));
+    }
+
+    if (cmd_getval(cmdmap, "timeout", timeout)) {
+      r.timeout = duration_cast<QuiesceTimeInterval>(dd(timeout));
+    }
+
+    int64_t ifv;
+    if (cmd_getval(cmdmap, "if_version", ifv)) {
+      r.if_version = QuiesceSetVersion(ifv);
+    }
+  });
+
+  dout(20) << "Submitting a quiesce db request " << (set_id ? "for" : "without a") << " setid " << set_id.value_or("") << ", operation: " << ctx->request.op_string() << dendl;
+  int rc = quiesce_db_manager->submit_request(ctx);
+  if (rc != 0) {
+    bufferlist bl;
+    delete ctx;
+    on_finish(rc, "Error submitting the command to the local db manager", bl);
+  }
+}
+
+void MDSRank::quiesce_cluster_update() {
+  QuiesceClusterMembership membership;
+
+  mds_rank_t leader = 0; // MAYBE LATER: initialize this from the map
+
+  membership.epoch = mdsmap->get_epoch();
+  membership.leader = leader;
+  membership.me = whoami;
+  membership.fs_id = mdsmap->get_info(whoami).join_fscid;
+  membership.fs_name = mdsmap->get_fs_name();
+  mdsmap->get_mds_set(membership.members);
+
+  dout(5) << "epoch:" << membership.epoch << " leader:" << membership.leader << " members:" << membership.members << dendl;
+
+  membership.send_ack = [=,this](QuiesceMap&& ack) {
+    if (whoami == leader) {
+      // loopback
+      quiesce_db_manager->submit_ack_from(whoami, std::move(ack));
+      return 0;
+    } else {
+      // TODO: implement messaging
+      return -ENOTSUP;
+    }
+  };
+
+  membership.send_listing_to = [=](mds_rank_t to, QuiesceDbListing&& db) {
+    // TODO: implement messaging
+    return -ENOTSUP;
+  };
+
+  quiesce_db_manager->update_membership(membership);
+}
+
+void MDSRank::quiesce_agent_setup() {
+  // TODO: replace this with a non-debug implementation
+  //       Potentially, allow the debug interface under some runtime configuration
+
+  ceph_assert(quiesce_db_manager);
+
+  using RequestHandle = QuiesceInterface::RequestHandle;
+  using QuiescingRoot = std::pair<RequestHandle, Context*>;
+  auto quiesce_requests = std::make_shared<std::unordered_map<QuiesceRoot, QuiescingRoot>>();
+
+  QuiesceAgent::ControlInterface ci;
+
+  ci.submit_request = [this, quiesce_requests](QuiesceRoot root, Context* c)
+      -> std::optional<RequestHandle> {
+    auto uri = boost::urls::parse_uri_reference(root);
+    if (!uri) {
+      dout(5) << "error parsing the quiesce root as an URI: " << uri.error() << dendl;
+      c->complete(uri.error());
+      return std::nullopt;
+    } else {
+      dout(20) << "parsed root '" << root <<"' as : " << uri->path() << " " << uri->query() << dendl;
+    }
+
+    std::chrono::milliseconds quiesce_delay_ms = 0ms;
+    if (auto pit = uri->params().find("delayms"); pit != uri->params().end()) {
+      try {
+        quiesce_delay_ms = std::chrono::milliseconds((*pit).has_value ? std::stoul((*pit).value) : 1000);
+      } catch (...) {
+        dout(5) << "error parsing the time to quiesce for query: " << uri->query() << dendl;
+        c->complete(-EINVAL);
+        return std::nullopt;
+      }
+    }
+    std::optional<double> dummy_quiesce_after;
+    if (auto pit = uri->params().find("q"); pit != uri->params().end()) {
+      try {
+        dummy_quiesce_after = (*pit).has_value ? std::stod((*pit).value) : 1 /*second*/;
+      } catch (...) {
+        dout(5) << "error parsing the time for debug quiesce for query: " << uri->query() << dendl;
+        c->complete(-EINVAL);
+        return std::nullopt;
+      }
+    }
+
+    auto path = uri->path();
+    dout(20) << "got request to quiesce '" << path << "'" << dendl;
+
+    std::lock_guard l(mds_lock);
+
+    if (!dummy_quiesce_after) {
+      // the real deal!
+      auto qc = new MDCache::C_MDS_QuiescePath(mdcache, c);
+      auto mdr = mdcache->quiesce_path(filepath(path), qc, nullptr, quiesce_delay_ms);
+      return mdr ? mdr->reqid : std::optional<RequestHandle>();
+    } else {
+      /* dummy quiesce */
+      // always create a new request id
+      auto req_id = metareqid_t(entity_name_t::MDS(whoami), issue_tid());
+      auto [it, inserted] = quiesce_requests->try_emplace(path, req_id, c);
+
+      if (!inserted) {
+        dout(3) << "duplicate quiesce request for root '" << it->first << "'" << dendl;
+        // we must update the request id so that old one can't cancel this request.
+        it->second.first = req_id;
+        if (it->second.second) {
+          it->second.second->complete(-EINTR);
+          it->second.second = c;
+        } else {
+          // if we have no context, it means we've completed it
+          // since we weren't inserted, we must have successfully quiesced
+          c->complete(0);
+        }
+      } else {
+        // do quiesce if needed
+
+        auto quiesce_task = new LambdaContext([quiesce_requests, req_id, this](int) {
+          // the mds lock should be held by the timer
+          dout(20) << "quiesce_task: callback by the timer" << dendl;
+          auto it = std::ranges::find(*quiesce_requests, req_id, [](auto x) { return x.second.first; });
+          if (it != quiesce_requests->end() && it->second.second != nullptr) {
+            dout(20) << "quiesce_task: completing the root '" << it->first << "'" << dendl;
+            it->second.second->complete(0);
+            it->second.second = nullptr;
+          }
+          dout(20) << "quiesce_task: done" << dendl;
+        });
+
+        dout(20) << "scheduling a quiesce_task (" << quiesce_task
+                 << ") to fire after " << *dummy_quiesce_after
+                 << " seconds on timer " << &timer << dendl;
+        timer.add_event_after(*dummy_quiesce_after, quiesce_task);
+      }
+      return it->second.first;
+    }
+  };
+
+  ci.cancel_request = [this, quiesce_requests](RequestHandle h) {
+    std::lock_guard l(mds_lock);
+
+    if (mdcache->have_request(h)) {
+      auto qimdr = mdcache->request_get(h);
+      mdcache->request_kill(qimdr);
+      return 0;
+    }
+
+    auto it = std::ranges::find(*quiesce_requests, h, [](auto x) { return x.second.first; });
+    if (it != quiesce_requests->end()) {
+      if (auto ctx = it->second.second; ctx) {
+        dout(20) << "canceling request with id '" << h << "' for root '" << it->first << "'" << dendl;
+        ctx->complete(-ECANCELED);
+      }
+      quiesce_requests->erase(it);
+      return 0;
+    }
+
+    return ENOENT;
+  };
+
+  std::weak_ptr<QuiesceDbManager> weak_db_manager = quiesce_db_manager;
+  ci.agent_ack = [weak_db_manager](QuiesceMap && update) {
+    if (auto manager = weak_db_manager.lock()) {
+      return manager->submit_agent_ack( std::move(update));
+    } else {
+      return ENOENT;
+    }
+  };
+
+  quiesce_agent.reset(new QuiesceAgent(ci));
+
+  std::weak_ptr<QuiesceAgent> weak_agent = quiesce_agent;
+  quiesce_db_manager->reset_agent_callback([weak_agent](QuiesceMap& update) {
+    if (auto agent = weak_agent.lock()) {
+      return agent->db_update(update);
+    } else {
+      return false;
+    }
+  });
+};