]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mds/quiesce: QuiesceAgent implementation and unit tests
author Leonid Usov <leonid.usov@ibm.com>
Sun, 26 Nov 2023 11:29:11 +0000 (13:29 +0200)
committer Leonid Usov <leonid.usov@ibm.com>
Thu, 14 Mar 2024 19:07:52 +0000 (15:07 -0400)
QuiesceAgent is the layer that converts updates from the QuiesceDb
into calls to the QuiesceProtocol APIs, and then sends async acks
back to the db manager following the quiesce protocol events.

Signed-off-by: Leonid Usov <leonid.usov@ibm.com>
(cherry picked from commit 3de0882ad36d4f08f2f171b1ffd263da5a78f00f)

src/mds/QuiesceAgent.cc [new file with mode: 0644]
src/mds/QuiesceAgent.h
src/test/mds/CMakeLists.txt
src/test/mds/TestQuiesceAgent.cc [new file with mode: 0644]

diff --git a/src/mds/QuiesceAgent.cc b/src/mds/QuiesceAgent.cc
new file mode 100644 (file)
index 0000000..c155054
--- /dev/null
@@ -0,0 +1,255 @@
+#include "mds/QuiesceAgent.h"
+#include "common/debug.h"
+#include "include/ceph_assert.h"
+#include <future>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds_quiesce
+#undef dout_prefix
+#define dout_prefix *_dout << "quiesce.agt <" << __func__ << "> "
+
+// Log through the mds_quiesce subsystem when it gathers messages at the
+// requested level; otherwise fall back to the generic mds subsystem.
+// NOTE: this macro opens a `do {` scope which is only closed by the
+// `dendl` override below, so the two must always be used as a pair.
+#undef dout
+#define dout(lvl)                                                        \
+  do {                                                                   \
+    auto subsys = ceph_subsys_mds;                                       \
+    if ((dout_context)->_conf->subsys.should_gather(dout_subsys, lvl)) { \
+      subsys = dout_subsys;                                              \
+    }                                                                    \
+  dout_impl(dout_context, ceph::dout::need_dynamic(subsys), lvl) dout_prefix
+
+// Ends the message and closes the `do { ... } while (0)` wrapper
+// that the `dout` override above has opened.
+#undef dendl
+#define dendl \
+  dendl_impl; \
+  }           \
+  while (0)
+
+/// Prints a one-line summary of a tracked roots version:
+/// the armed flag, the db version and the number of roots.
+template <class CharT, class Traits>
+std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr)
+{
+  os << "tracked_roots[armed: " << tr.armed;
+  os << ", v: " << tr.db_version;
+  os << " r: " << tr.roots.size() << "]";
+  return os;
+}
+
+bool QuiesceAgent::db_update(QuiesceMap& map)
+{
+  // copy of the current roots
+  TrackedRoots current_roots = tracked_roots();
+  TrackedRoots new_roots;
+
+  dout(20) << "got a db update version " << map.db_version << " with " << map.roots.size() << " roots" << dendl;
+
+  for (auto their_it = map.roots.begin(); their_it != map.roots.end();) {
+    auto &[root, info] = *their_it;
+    TrackedRootRef tracked_root_ref;
+
+    ceph_assert(info.state > QS__INVALID);
+
+    if (info.state >= QS__FAILURE) {
+      // we don't care about roots in failed states
+      dout(5) << "ignoring a root in a failed state: '" << root << "', " << info.state << dendl;
+      their_it = map.roots.erase(their_it);
+      continue;
+    }
+
+    if (const auto& my_it = current_roots.find(root); my_it != current_roots.end()) {
+      // keep the existing root
+      new_roots.insert(*my_it);
+      tracked_root_ref = my_it->second;
+    } else {
+      // introduce a new root
+      tracked_root_ref = std::make_shared<TrackedRoot>(info.state, info.ttl);
+      new_roots[root] = tracked_root_ref;
+    }
+
+    std::lock_guard l(*tracked_root_ref);
+    tracked_root_ref->update_committed(info);
+
+    auto actual_state = tracked_root_ref->get_actual_state();
+    
+    if (actual_state != info.state) {
+      // we have an update for the state
+      info.state = actual_state;
+      info.ttl = tracked_root_ref->get_ttl();
+    } else {
+      // our tracked root has the same state as the db
+      // we can just drop it from the response
+      their_it = map.roots.erase(their_it);
+      continue;
+    }
+    ++their_it;
+  }
+
+  // ack with the known state stored in `map`
+  set_pending_roots(map.db_version, std::move(new_roots));
+
+  // always send a synchronous ack
+  return true;
+}
+
+void* QuiesceAgent::agent_thread_main() {
+  working.clear();
+  std::unique_lock lock(agent_mutex);
+
+  while(!stop_agent_thread) {
+    if (pending.armed) {
+      working.roots.swap(pending.roots);
+      working.db_version = pending.db_version;
+    } else {
+      // copy current roots
+      working.roots = current.roots;
+      working.db_version = current.db_version;
+    }
+
+    dout(20)
+        << "current = " << current.db_version
+        << ", working = " << working.db_version
+        << ", pending = " << pending.db_version << dendl;
+
+    current.armed = false;
+    working.armed = true;
+
+    // it's safe to clear the pending roots under lock because it shouldn't
+    // ever hold a last shared ptr to quiesced tracked roots, causing their destructors to run cancel.
+    pending.clear();
+    lock.unlock();
+
+    QuiesceMap ack(working.db_version);
+  
+    // upkeep what we believe is the current state.
+    for (auto& [root, info] : working.roots) {
+
+      info->lock();
+      bool should_quiesce = info->should_quiesce();
+      bool issue_quiesce = should_quiesce && !info->quiesce_request && !info->quiesce_result;
+      std::optional<QuiesceInterface::RequestHandle> cancel_handle;
+      if (!should_quiesce && !info->cancel_result) {
+        cancel_handle = info->quiesce_request;
+      }
+      auto actual_state = info->get_actual_state();
+      if (info->committed_state != actual_state) {
+        ack.roots[root] = { actual_state, info->get_ttl() };
+      }
+      info->unlock();
+
+      if (issue_quiesce) {
+        std::weak_ptr<TrackedRoot> weak_info = info;
+        auto request_handle = quiesce_control.submit_request(root, new LambdaContext([weak_info, submitted_root = root, this](int rc) {
+          if (auto info = weak_info.lock()) {
+            dout(20) << "completing request (rc=" << rc << ") for '" << submitted_root << "'" << dendl;
+            info->lock();
+            info->quiesce_result = rc;
+            info->unlock();
+
+            // TODO: capturing QuiesceAgent& `this` is potentially dangerous
+            //       the assumption is that since the root pointer is weak
+            //       it will have been deleted by the QuiesceAgent shutdown sequence
+            set_upkeep_needed();
+          }
+          dout(20) << "done with submit callback for '" << submitted_root << "'" << dendl;
+        }));
+
+        dout(10) << "got request handle <" << request_handle << "> for '" << root << "'" << dendl;
+        info->lock();
+        info->quiesce_request = request_handle;
+        info->cancel = quiesce_control.cancel_request;
+        info->unlock();
+      } else if (cancel_handle) {
+        dout(10) << "Calling `cancel` on `" << root << "` with handle <" << *cancel_handle << ">" << dendl;
+        int rc = quiesce_control.cancel_request(*cancel_handle);
+        if (rc != 0) {
+          dout(1) << "ERROR (" << rc << ") when trying to cancel quiesce request id: " << *cancel_handle << dendl;
+        }
+        info->lock();
+        info->cancel_result = rc;
+        info->unlock();
+      }
+    }
+
+    lock.lock();
+
+    bool new_version = current.db_version < working.db_version;
+    current.roots.swap(working.roots);
+    current.db_version = working.db_version;
+
+    lock.unlock();
+
+    // clear the old roots and send the ack outside of the lock
+    working.roots.clear();
+    if (new_version || !ack.roots.empty()) {
+      dout(20) << "asyncrhonous ack for " << (new_version ? "a new" : "the current") << " version: " << ack << dendl;
+      int rc = quiesce_control.agent_ack(std::move(ack));
+      if (rc != 0) {
+        dout(3) << "got error: " << rc << " trying to send " << ack << dendl;
+      }
+    }
+    ack.reset();
+
+    lock.lock();
+
+    // notify that we're done working on this version and all acks (if any) were sent
+    working.clear();
+
+    // a new pending version could be set while we weren't locked
+    // if that's the case just go for another pass
+    // otherwise, wait for updates
+    if (!pending.armed && !current.armed && !stop_agent_thread) {
+      // for somebody waiting for the thread to idle
+      agent_cond.notify_all();
+      agent_cond.wait(lock);
+    }
+  }
+  agent_cond.notify_all();
+  return nullptr;
+}
+
+void QuiesceAgent::set_pending_roots(QuiesceDbVersion version, TrackedRoots&& new_roots)
+{
+  std::unique_lock l(agent_mutex);
+
+  auto actual_version = std::max(current.db_version, working.db_version);
+  bool rollback = actual_version > version;
+  
+  if (rollback) {
+    dout(5) << "version rollback to " << version 
+      << ". current = " << current.db_version 
+      << ", working = " << working.db_version 
+      << ", pending = " << pending.db_version << dendl;
+  }
+
+  // set the pending version unconditionally
+  pending.db_version = version;
+  pending.roots = std::move(new_roots);
+  pending.armed = true;
+
+  agent_cond.notify_all();
+}
+
+/// Request another pass of the agent thread over the current roots:
+/// arms the current set and notifies everybody waiting on `agent_cond`.
+void QuiesceAgent::set_upkeep_needed()
+{
+  std::lock_guard guard(agent_mutex);
+
+  dout(20) << "current = " << current.db_version
+           << ", working = " << working.db_version
+           << ", pending = " << pending.db_version << dendl;
+
+  current.armed = true;
+  agent_cond.notify_all();
+}
+
+/// Cancels the quiesce request of a root that is going away, provided that
+/// a request handle and a cancel functor are set and no cancel has been
+/// attempted for this root before.
+QuiesceAgent::TrackedRoot::~TrackedRoot()
+{
+  std::optional<QuiesceInterface::RequestHandle> request_handle;
+  bool cancel_attempted = false;
+
+  {
+    std::lock_guard guard(*this);
+    // take the handle over, leaving the member empty
+    quiesce_request.swap(request_handle);
+    cancel_attempted = cancel_result.has_value();
+  }
+
+  // the cancel call is made without holding the root's lock
+  if (!cancel_attempted && request_handle && cancel) {
+    dout(10) << "Calling `cancel` on an abandoned root with handle <" << request_handle << ">" << dendl;
+    cancel(*request_handle);
+  }
+
+  dout(20) << "done with request handle <" << request_handle << ">" << dendl;
+}
index 6a059dec3bc5b4c72075c4ed6c96390dba12e9a4..f5be435f2a2019a421beed1886828491fba19d16 100644 (file)
  *
  * This is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
  * License version 2.1, as published by the Free Software
  * Foundation.  See file COPYING.
  *
  */
 #pragma once
-#include "QuiesceDb.h"
+#include "mds/QuiesceDb.h"
+#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <thread>
+#include <unordered_map>
 
 class QuiesceAgent {
-  // subscribe to the QM map
-  // keeps runtime version of the map
-  // notifies the QM when runtime version is different from the last know requested version
-};
\ No newline at end of file
+  public:
+    struct ControlInterface {
+      QuiesceInterface::RequestSubmit submit_request;
+      QuiesceInterface::RequestCancel cancel_request;
+      QuiesceInterface::AgentAck agent_ack;
+      // TODO: do we need a "cancel all"?
+    };
+
+    /// Stores a copy of the control interface and starts the agent thread
+    /// under the name "quiesce.agt".
+    /// @param quiesce_control callbacks used to submit and cancel quiesce
+    ///        requests and to send acks
+    QuiesceAgent(const ControlInterface& quiesce_control)
+        : quiesce_control(quiesce_control)
+        , stop_agent_thread(false)
+        , agent_thread(this)
+    {
+      agent_thread.create("quiesce.agt");
+    }
+
+    /// Makes sure the agent thread doesn't outlive the agent.
+    ///
+    /// The thread works on the members of this object, so it has to be
+    /// stopped and joined before they are destroyed. Nothing is done if
+    /// the thread isn't running, e.g. after an explicit `shutdown()`.
+    /// NOTE: like `reset()`, this may call cancel on the active roots.
+    ~QuiesceAgent() {
+      if (agent_thread.is_started()) {
+        shutdown();
+      }
+    }
+
+    /// @brief  WARNING: will reset syncrhonously
+    ///         this may call cancel on active roots
+    ///         which may lead to a deadlock if the MDS
+    ///         lock is being held when calling this.
+    ///         Consider `reset_async` if you're holding
+    ///         the MDS lock.
+    void reset() {
+      std::unique_lock l(agent_mutex);
+
+      // prevent any pending change
+      pending.clear();
+
+      // let the system settle
+      await_idle_locked(l);
+
+      // we are idle, hence the current holds
+      // our only tracked set
+      TrackedRoots current_roots = current.clear();
+
+      l.unlock();
+
+      // do this outside of the lock
+      current_roots.clear();
+    }
+
+    /// Asynchronous counterpart of `reset`: queues an empty set of roots
+    /// with the version {0, 0} for the agent thread and doesn't wait
+    /// for the thread to process it.
+    void reset_async() {
+      TrackedRoots no_roots;
+      set_pending_roots(QuiesceDbVersion { 0, 0 }, std::move(no_roots));
+    }
+
+    /// Stops the agent thread, waits for it to exit and then
+    /// drops the current and the pending roots.
+    ///
+    /// Safe to call more than once: the thread is only joined
+    /// while it is still started.
+    void shutdown()
+    {
+      std::unique_lock l(agent_mutex);
+      stop_agent_thread = true;
+      agent_cond.notify_all();
+      l.unlock();
+
+      // joining a thread that isn't running is an error,
+      // which would turn a repeated shutdown into a failure
+      if (agent_thread.is_started()) {
+        agent_thread.join();
+      }
+
+      current.clear();
+      pending.clear();
+    }
+
+    bool db_update(QuiesceMap& map);
+
+    /// Per-root state kept by the agent: the handle of the submitted quiesce
+    /// request, the results of the quiesce and cancel calls, and the state
+    /// and expiration time last committed by the db.
+    /// The fields are public; the agent brackets its accesses
+    /// with `lock()` / `unlock()`.
+    struct TrackedRoot {
+        // handle of the submitted quiesce request, if any
+        std::optional<QuiesceInterface::RequestHandle> quiesce_request;
+        // we could have hidden the request handle
+        // inside the cancel functor, but then we'd lose
+        // the ability to identify individual requests
+        // when looking at the tracked root.
+        QuiesceInterface::RequestCancel cancel; 
+        // completion code of the quiesce request; empty until it completes
+        std::optional<int> quiesce_result;
+        // return code of the cancel call; empty until a cancel is attempted
+        std::optional<int> cancel_result;
+
+        // the state of the root as last reported by the db
+        QuiesceState committed_state;
+        // the time point at which the committed ttl runs out
+        QuiesceTimePoint expires_at;
+
+        /// @param state the committed state of the root
+        /// @param ttl   the time to live, counted from now
+        TrackedRoot(QuiesceState state, QuiesceTimeInterval ttl)
+            : committed_state(state)
+            , expires_at(interval_saturate_add_now(ttl))
+            , busy_lock(false)
+        {
+        }
+
+        /// Creates a root in the QS__INVALID state with a zero ttl.
+        TrackedRoot() : TrackedRoot(QS__INVALID, QuiesceTimeInterval::zero()) {}
+
+        /// @return true if the committed state is QS_QUIESCING or QS_QUIESCED
+        bool should_quiesce() const
+        {
+          return committed_state == QS_QUIESCING || committed_state == QS_QUIESCED;
+        }
+
+        /// @return true if the committed state is QS_RELEASING or QS_RELEASED
+        bool should_release() const {
+          return committed_state == QS_RELEASING || committed_state == QS_RELEASED;
+        }
+
+        ~TrackedRoot();
+
+        /// Takes over the state from the db and restarts the ttl from now.
+        void update_committed(QuiesceMap::RootInfo const & info) {
+          committed_state = info.state;
+          expires_at = interval_saturate_add_now(info.ttl);
+        }
+
+        /// @return the time left until `expires_at`: the max interval if the
+        ///         expiration is at the max value, zero if it has passed
+        QuiesceTimeInterval get_ttl() const
+        {
+          auto now = QuiesceClock::now();
+          if (expires_at.time_since_epoch() == QuiesceTimeInterval::max()) {
+            return QuiesceTimeInterval::max();
+          }
+          if (expires_at > now) {
+            return expires_at - now;
+          } else {
+            return QuiesceTimeInterval::zero();
+          }
+        }
+
+        /// Derives the state of the root from the recorded results:
+        ///  - quiesce succeeded, no cancel attempted: QS_QUIESCED
+        ///  - quiesce succeeded, cancel succeeded:    QS_RELEASED
+        ///  - quiesce succeeded, cancel failed:       QS_EXPIRED
+        ///  - quiesce completed with an error:        QS_FAILED
+        ///  - no quiesce result, committed state is
+        ///    releasing or released:                  QS_EXPIRED
+        ///  - otherwise:                              QS_QUIESCING
+        QuiesceState get_actual_state() const {
+          QuiesceState result = QS_QUIESCING;
+          // comparing an empty optional with 0 yields false
+          bool did_quiesce = quiesce_result == 0;
+          bool did_cancel = cancel_result == 0;
+          if (did_quiesce) {
+            if (cancel_result.has_value()) {
+              result = did_cancel ? QS_RELEASED : QS_EXPIRED;
+            } else {
+              result = QS_QUIESCED;
+            }
+          } else {
+            if (quiesce_result.has_value()) {
+              result = QS_FAILED;
+            } else if (should_release()) {
+              // we must have lost track of this root,
+              // probably, due to expiration. But even if due to an error,
+              // this is our best guess for the situation
+              result = QS_EXPIRED;
+            }
+          }
+          return result;
+        }
+
+        /// Busy-waits until the flag is acquired; there is no blocking or yielding.
+        void lock() const {
+          while (busy_lock.test_and_set(std::memory_order_acquire))
+            ; // spin
+        }
+
+        /// Releases the flag taken by `lock()`.
+        void unlock() const {
+          busy_lock.clear(std::memory_order_release);
+        }
+      private:
+        // mutable so that const methods can lock and unlock
+        mutable std::atomic_flag busy_lock;
+    };
+
+    using TrackedRootRef = std::shared_ptr<TrackedRoot>;
+
+    using TrackedRoots = std::unordered_map<QuiesceRoot, TrackedRootRef>;
+
+    /// @return a copy of the current set of roots, taken under the agent lock.
+    ///         The copy shares the root objects with the agent.
+    TrackedRoots tracked_roots() {
+      std::lock_guard guard(agent_mutex);
+      TrackedRoots snapshot = current.roots;
+      return snapshot;
+    }
+
+    /// Looks the root up in the current set under the agent lock.
+    /// @return the shared reference to the tracked root,
+    ///         or nullptr if the root isn't tracked
+    TrackedRootRef get_tracked_root(QuiesceRoot root) {
+      std::lock_guard guard(agent_mutex);
+      const auto found = current.roots.find(root);
+      if (found == current.roots.end()) {
+        return nullptr;
+      }
+      return found->second;
+    }
+
+    /// @return the db version of the current set, read under the agent lock
+    QuiesceDbVersion get_current_version() {
+      std::lock_guard guard(agent_mutex);
+      const QuiesceDbVersion version = current.db_version;
+      return version;
+    }
+
+  protected:
+    ControlInterface quiesce_control;
+
+    struct TrackedRootsVersion {
+      TrackedRoots roots;
+      QuiesceDbVersion db_version = {0, 0};
+      bool armed = false;
+      TrackedRoots clear() {
+        armed = false;
+        db_version = {0, 0};
+        TrackedRoots old = std::move(roots);
+        roots.clear();
+        return old;
+      }
+    };
+
+    template <class CharT, class Traits>
+    friend std::basic_ostream<CharT, Traits>&
+    operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr);
+
+    TrackedRootsVersion current;
+    TrackedRootsVersion working;
+    TrackedRootsVersion pending;
+
+    std::mutex agent_mutex;
+    std::condition_variable agent_cond;
+    bool stop_agent_thread;
+  
+    /// Waits on `agent_cond` until none of the current,
+    /// working and pending sets is armed.
+    /// @param lock the lock to wait with; the visible callers pass
+    ///             a held `std::unique_lock` on `agent_mutex`
+    /// @return the current db version at the time the agent was found idle
+    template<class L>
+    QuiesceDbVersion await_idle_locked(L &lock) {
+      while (current.armed || working.armed || pending.armed) {
+        agent_cond.wait(lock);
+      }
+
+      return current.db_version;
+    }
+
+    void set_pending_roots(QuiesceDbVersion db_version, TrackedRoots&& new_roots);
+
+    void set_upkeep_needed();
+
+    class AgentThread : public Thread {
+      public:
+          explicit AgentThread(QuiesceAgent* qa)
+              : qa(qa)
+          {
+          }
+          void* entry() override
+          {
+            return qa->agent_thread_main();
+          }
+
+      private:
+          QuiesceAgent* qa;
+    } agent_thread;
+
+    void* agent_thread_main();
+};
index 39b47bb2da4d056c5007d081525cb36662ce2b14..f80abe75083f1a12fcca8354e38b8c26674481a6 100644 (file)
@@ -24,3 +24,11 @@ add_executable(unittest_mds_quiesce_db
 add_ceph_unittest(unittest_mds_quiesce_db)
 target_link_libraries(unittest_mds_quiesce_db ceph-common global)
 
+# unittest_mds_quiesce_agent
+# QuiesceAgent.cc is compiled straight into the test binary
+# next to the test itself and the shared unit-main objects.
+add_executable(unittest_mds_quiesce_agent
+  TestQuiesceAgent.cc
+  ../../../src/mds/QuiesceAgent.cc
+  $<TARGET_OBJECTS:unit-main>
+)
+add_ceph_unittest(unittest_mds_quiesce_agent)
+target_link_libraries(unittest_mds_quiesce_agent ceph-common global)
diff --git a/src/test/mds/TestQuiesceAgent.cc b/src/test/mds/TestQuiesceAgent.cc
new file mode 100644 (file)
index 0000000..d7d526a
--- /dev/null
@@ -0,0 +1,545 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, RedHat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+#include "common/Cond.h"
+#include "mds/QuiesceAgent.h"
+#include "gtest/gtest.h"
+#include <algorithm>
+#include <functional>
+#include <queue>
+#include <ranges>
+#include <system_error>
+#include <thread>
+#include <future>
+
+class QuiesceAgentTest : public testing::Test {
+  using RequestHandle = QuiesceInterface::RequestHandle;
+  using QuiescingRoot = std::pair<RequestHandle, Context*>;
+  protected:
+    template< class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>, class Function, class... Args >
+    static bool timed_run(D timeout, Function&& f, Args&&... args ) {
+      std::promise<void> done;
+      auto future = done.get_future();
+
+      auto job = std::bind(f, args...);
+
+      auto tt = std::thread([job=std::move(job)](std::promise<void> done) {
+        job();
+        done.set_value();
+      }, std::move(done));
+
+      tt.detach();
+
+      return future.wait_for(timeout) != std::future_status::timeout;
+    }
+
+    /// Test subclass which opens up the protected internals of the agent.
+    struct TestQuiesceAgent : public QuiesceAgent {
+      using QuiesceAgent::QuiesceAgent;
+      AgentThread& get_agent_thread() {
+        return agent_thread;
+      }
+
+      /// @return the highest of the current, working and pending db versions
+      QuiesceDbVersion get_latest_version()
+      {
+        std::lock_guard l(agent_mutex);
+        return std::max({current.db_version, working.db_version, pending.db_version});
+      }
+      // NOTE: hands out the current roots without taking the agent lock
+      TrackedRoots& mutable_tracked_roots() {
+        return current.roots;
+      }
+
+      /// Blocks until the agent is idle.
+      /// @return the current db version at that moment
+      QuiesceDbVersion await_idle() {
+        std::unique_lock l(agent_mutex);
+        return await_idle_locked(l);
+      }
+    };
+    // the last asynchronous ack received from the agent
+    QuiesceMap latest_ack;
+    // the outstanding quiesce requests: root -> (handle, completion context)
+    std::unordered_map<QuiesceRoot, QuiescingRoot> quiesce_requests;
+    // source of the request ids; it's incremented before the first use,
+    // so it must start from a defined value
+    ceph_tid_t last_tid = 0;
+    // protects the fields above in the control callbacks
+    std::mutex mutex;
+
+    std::unique_ptr<TestQuiesceAgent> agent;
+
+    bool complete_quiesce(QuiesceRoot root, int rc = 0) {
+      std::lock_guard l(mutex);
+      if (auto it = quiesce_requests.find(root); it != quiesce_requests.end()) {
+        if (it->second.second) {
+          it->second.second->complete(rc);
+          it->second.second = nullptr;
+          if (rc != 0) {
+            // there was an error, no need to keep this request anymore
+            quiesce_requests.erase(it);
+          }
+          return true;
+        }
+      }
+      return false;
+    }
+
+    void SetUp() override {
+      
+      QuiesceAgent::ControlInterface ci;
+      quiesce_requests.clear();
+
+      ci.submit_request = [this](QuiesceRoot r, Context* c) {
+        std::lock_guard l(mutex);
+
+        // always create a new request id
+        auto req_id = metareqid_t(entity_name_t::MDS(0), ++last_tid);
+
+        auto [it, inserted] = quiesce_requests.try_emplace(r, req_id, c);
+
+        if (!inserted) {
+          // we must update the request id so that old one can't cancel this request.
+          it->second.first = req_id;
+          if (it->second.second) {
+            it->second.second->complete(-EINTR);
+            it->second.second = c;
+          } else {
+            // if we have no context, it means we've completed it
+            // since we weren't inserted, we must have successfully quiesced
+            c->complete(0);
+          }
+        }
+
+        return it->second.first;
+      };
+      
+      ci.cancel_request = [this](RequestHandle h) {
+        std::lock_guard l(mutex);
+        
+        for (auto it = quiesce_requests.cbegin(); it != quiesce_requests.cend(); it++) {
+          if (it->second.first == h) {
+            if (it->second.second) {
+              it->second.second->complete(-ECANCELED);
+            }
+            quiesce_requests.erase(it);
+            return 0;
+          }
+        }
+
+        return ENOENT;
+      };
+
+      ci.agent_ack = [this](QuiesceMap const& update) {
+        std::lock_guard l(mutex);
+        latest_ack = update;
+        return 0;
+      };
+
+      agent = std::make_unique<TestQuiesceAgent>(ci);
+    }
+
+    /// Shuts the agent down and destroys it, unless the test has done so already.
+    void TearDown() override {
+      if (!agent) {
+        return;
+      }
+      agent->shutdown();
+      agent.reset();
+    }
+
+
+    using R = QuiesceMap::Roots::value_type;
+    using RootInitList = std::initializer_list<R>;
+
+    std::optional<QuiesceMap> update(QuiesceDbVersion v, RootInitList roots)
+    {
+      QuiesceMap map(v, QuiesceMap::Roots { roots });
+
+      if (agent->db_update(map)) {
+        return map;
+      }
+
+      return std::nullopt;
+    }
+
+    std::optional<QuiesceMap> update(QuiesceSetVersion v, RootInitList roots)
+    {
+      return update(QuiesceDbVersion { 1, v }, roots);
+    }
+
+    std::optional<QuiesceMap> update(RootInitList roots)
+    {
+      return update(agent->get_latest_version() + 1, roots);
+    }
+
+    template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>>
+    bool await_idle_v(QuiesceDbVersion version, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
+    {
+      return timed_run(timeout, [this, version] {
+        while (version > agent->await_idle()) { };
+      });
+    }
+
+    /// Waits until the agent is idle.
+    /// @return false if that didn't happen within the timeout (10s by default)
+    template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>>
+    bool await_idle(D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
+    {
+      auto wait_for_idle = [this] {
+        agent->await_idle();
+      };
+      return timed_run(timeout, wait_for_idle);
+    }
+};
+
+// The agent thread should be started by the fixture, reach the idle state
+// after updates and a reset, and stop promptly on shutdown.
+TEST_F(QuiesceAgentTest, ThreadManagement) {
+  EXPECT_TRUE(agent->get_agent_thread().is_started());
+
+  EXPECT_TRUE(await_idle());
+
+  EXPECT_TRUE(update({ { "root1", QS_QUIESCING } }).has_value());
+
+  EXPECT_TRUE(await_idle());
+
+  EXPECT_TRUE(update({ { "root2", QS_QUIESCING } }).has_value());
+
+  // a synchronous reset right after an update, without waiting for idle
+  agent->reset();
+
+  EXPECT_TRUE(await_idle());
+
+  EXPECT_TRUE(update({ { "root3", QS_QUIESCING } }).has_value());
+
+  // make sure that the agent thread completes in a timely fashion
+  EXPECT_TRUE(timed_run(std::chrono::seconds(1), [this] { agent->shutdown(); agent.reset(); }));
+}
+
+TEST_F(QuiesceAgentTest, DbUpdates) {
+  {
+    auto ack = update(1, { 
+      { "root0", QS_QUIESCING }, // this shouldn't be reported because its state isn't different from QUIESCING
+      { "root1", QS_QUIESCING }, // ditto
+      { "root2", QS_QUIESCED }, // this should be reported back as quiescing
+      { "root3", QS_RELEASING }, // this should be reported back as expired
+      { "root4", QS_RELEASED }, // this should be reported back as expired
+      { "root5", QS_EXPIRED }, // this should be ignored
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(1, ack->db_version);
+    EXPECT_EQ(3, ack->roots.size());
+    EXPECT_EQ(QS_QUIESCING, ack->roots.at("root2").state);
+    EXPECT_EQ(QS_EXPIRED, ack->roots.at("root3").state);
+    EXPECT_EQ(QS_EXPIRED, ack->roots.at("root4").state);
+    EXPECT_TRUE(await_idle());
+  }
+
+  EXPECT_EQ(1, agent->get_current_version());
+
+  {
+    auto roots = agent->tracked_roots();
+    EXPECT_EQ(5, roots.size());
+    EXPECT_EQ(QS_QUIESCING, roots.at("root0")->committed_state);
+    EXPECT_EQ(QS_QUIESCING, roots.at("root1")->committed_state);
+    EXPECT_EQ(QS_QUIESCED, roots.at("root2")->committed_state);
+    EXPECT_EQ(QS_RELEASING, roots.at("root3")->committed_state);
+    EXPECT_EQ(QS_RELEASED, roots.at("root4")->committed_state);
+  
+    // manipulate root0 and root1 as if they were quiesced and root2 as if it was released
+    auto& root0 = *roots.at("root0");
+    root0.quiesce_result = 0;
+    EXPECT_EQ(QS_QUIESCED, root0.get_actual_state());
+
+    auto& root1 = *roots.at("root1");
+    root1.quiesce_result = 0;
+    EXPECT_EQ(QS_QUIESCED, root1.get_actual_state());
+
+    auto& root2 = *roots.at("root2");
+    root2.quiesce_result = 0;
+    root2.cancel_result = 0;
+    EXPECT_EQ(QS_RELEASED, root2.get_actual_state());
+  }
+
+  {
+    auto ack = update(2, { 
+      { "root0", QS_RELEASING }, // this should be reported back as quiesced
+      { "root1", QS_QUIESCING }, // this should be reported back as quiesced
+      { "root2", QS_RELEASING }, // this should be reported back as released
+    });
+
+    EXPECT_EQ(2, ack->db_version);
+    EXPECT_EQ(3, ack->roots.size());
+    EXPECT_EQ(QS_QUIESCED, ack->roots.at("root0").state);
+    EXPECT_EQ(QS_QUIESCED, ack->roots.at("root1").state);
+    EXPECT_EQ(QS_RELEASED, ack->roots.at("root2").state);
+  }
+
+  EXPECT_TRUE(await_idle());
+  {
+    auto roots = agent->tracked_roots();
+    EXPECT_EQ(3, roots.size());
+    EXPECT_EQ(QS_RELEASING, roots.at("root0")->committed_state);
+    EXPECT_EQ(QS_QUIESCING, roots.at("root1")->committed_state);
+    EXPECT_EQ(QS_RELEASING, roots.at("root2")->committed_state);
+  }
+
+  {
+    // we should be able to set pending version to anything
+    // and the agent should follow, including rolling back to 0
+    auto ack = update({200, 0}, {});
+
+    EXPECT_TRUE(await_idle());
+    EXPECT_EQ(0, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+    EXPECT_EQ((QuiesceDbVersion {200, 0}), agent->get_current_version());
+  }
+}
+
+TEST_F(QuiesceAgentTest, QuiesceProtocol) {
+
+  {
+    auto ack = update(1, { 
+      { "root1", QS_QUIESCING },
+      { "root2", QS_QUIESCING },
+      { "root3", QS_QUIESCING },
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(1, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  {
+    auto tracked = agent->tracked_roots();
+    EXPECT_EQ(3, tracked.size());
+    EXPECT_EQ(QS_QUIESCING, tracked.at("root1")->committed_state);
+    EXPECT_EQ(QS_QUIESCING, tracked.at("root2")->committed_state);
+    EXPECT_EQ(QS_QUIESCING, tracked.at("root3")->committed_state);
+
+    EXPECT_EQ(QS_QUIESCING, tracked.at("root1")->get_actual_state());
+    EXPECT_EQ(QS_QUIESCING, tracked.at("root2")->get_actual_state());
+    EXPECT_EQ(QS_QUIESCING, tracked.at("root3")->get_actual_state());
+
+    // we should have seen the quiesce requests for all roots
+    EXPECT_EQ(tracked.at("root1")->quiesce_request.value(), quiesce_requests.at("root1").first);
+    EXPECT_EQ(tracked.at("root2")->quiesce_request.value(), quiesce_requests.at("root2").first);
+    EXPECT_EQ(tracked.at("root3")->quiesce_request.value(), quiesce_requests.at("root3").first);
+  }
+
+  EXPECT_EQ(3, quiesce_requests.size());
+
+  // complete one root with success
+  EXPECT_TRUE(complete_quiesce("root1"));
+
+  EXPECT_TRUE(await_idle());
+  // we should have seen an ack sent
+  EXPECT_EQ(1, latest_ack.db_version);
+  EXPECT_EQ(1, latest_ack.roots.size());
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+
+  latest_ack.reset();
+
+  // complete the other root with failure
+  EXPECT_TRUE(complete_quiesce("root2", -1));
+
+  EXPECT_TRUE(await_idle());
+  EXPECT_EQ(1, latest_ack.db_version);
+  ASSERT_EQ(2, latest_ack.roots.size());
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+  EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
+
+  latest_ack.reset();
+
+  // complete the third root with success
+  // complete one root with success
+  EXPECT_TRUE(complete_quiesce("root3"));
+
+  EXPECT_TRUE(await_idle());
+
+  // we should see the two quiesced roots in the ack,
+  EXPECT_EQ(1, latest_ack.db_version);
+  ASSERT_EQ(3, latest_ack.roots.size());
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+  EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root3").state);
+
+  {
+    auto ack = update(2, {
+      { "root2", QS_QUIESCING },
+      { "root3", QS_RELEASING },
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(2, ack->db_version);
+    // this update doesn't have root1, so it should be untracked and cancelled
+    // root2 is still quiescing, no updates for it
+    // root3 is released asyncrhonously so for now it should be QUIESCED
+    ASSERT_EQ(2, ack->roots.size());
+    EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
+    EXPECT_EQ(QS_QUIESCED, ack->roots.at("root3").state);
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  {
+    // make sure that root1 isn't tracked
+    auto tracked = agent->tracked_roots();
+    EXPECT_EQ(2, agent->get_current_version());
+    ASSERT_EQ(2, tracked.size());
+    EXPECT_EQ(QS_QUIESCING, tracked.at("root2")->committed_state);
+    EXPECT_EQ(QS_RELEASING, tracked.at("root3")->committed_state);
+  }
+
+  // we should have also seen cancelations for root1 and root3.
+  // We observe this by missing them from the quiesce_requests
+  // NB: root2 shouldn't be part of requests either since it was completed with failure
+  EXPECT_EQ(0, quiesce_requests.size());
+}
+
+TEST_F(QuiesceAgentTest, DuplicateQuiesceRequest) {
+  {
+    auto ack = update(1, { 
+      { "root1", QS_QUIESCING },
+      { "root2", QS_QUIESCING },
+      { "root3", QS_QUIESCING },
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(1, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  QuiesceAgent::TrackedRootRef pinned1, pinned2;
+
+  {
+    auto tracked = agent->tracked_roots();
+    ASSERT_EQ(3, tracked.size());
+    EXPECT_EQ(tracked.at("root1")->quiesce_request.value(), quiesce_requests.at("root1").first);
+    EXPECT_EQ(tracked.at("root2")->quiesce_request.value(), quiesce_requests.at("root2").first);
+    EXPECT_EQ(tracked.at("root3")->quiesce_request.value(), quiesce_requests.at("root3").first);
+
+    // copying the shared ref will keep the object alive
+    pinned1 = tracked.at("root1");
+    pinned2 = tracked.at("root2");
+  }
+
+  // root 1 should be quiesced now
+  EXPECT_TRUE(complete_quiesce("root1"));
+
+  EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
+  EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state());
+
+  // imagine that we lost our root records for a moment
+  {
+    auto ack = update(2, {
+      { "root3", QS_QUIESCING },
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(2, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  {
+    auto tracked = agent->tracked_roots();
+    EXPECT_EQ(1, tracked.size());
+    EXPECT_EQ(tracked.at("root3")->quiesce_request.value(), quiesce_requests.at("root3").first);
+  }
+
+  // since we have those pinned, they should still be live
+
+  EXPECT_TRUE(pinned1.unique());
+  EXPECT_TRUE(pinned2.unique());
+
+  EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
+  EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state());
+
+  EXPECT_TRUE(quiesce_requests.contains("root1"));
+  EXPECT_TRUE(quiesce_requests.contains("root2"));
+
+  latest_ack.reset();
+  // now, bring the roots back
+  {
+    auto ack = update(3, { 
+      { "root1", QS_QUIESCING },
+      { "root2", QS_QUIESCING },
+      { "root3", QS_QUIESCING },
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(3, ack->db_version);
+    // even though root1 is already quiesced,
+    // we should not know about it synchronously
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  // now we should have seen the ack with root2 quiesced
+  EXPECT_EQ(3, latest_ack.db_version);
+  EXPECT_EQ(1, latest_ack.roots.size());
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+
+  // the actual state of the pinned objects shouldn't have changed
+  EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
+  EXPECT_EQ(QS_FAILED, pinned2->get_actual_state());
+
+  EXPECT_EQ(0, *pinned1->quiesce_result);
+  EXPECT_EQ(-EINTR, *pinned2->quiesce_result);
+
+  // releasing the pinned objects will attempt to cancel, but that shouldn't interfere with the current state
+  pinned1.reset();
+  pinned2.reset();
+
+  EXPECT_TRUE(quiesce_requests.contains("root1"));
+  EXPECT_TRUE(quiesce_requests.contains("root2"));
+
+  EXPECT_TRUE(complete_quiesce("root2"));
+
+  EXPECT_TRUE(await_idle());
+  EXPECT_EQ(3, latest_ack.db_version);
+  EXPECT_EQ(2, latest_ack.roots.size());
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root2").state);
+}
+
+// A root which is reported as timed out by the db before its quiesce
+// request has completed should stop being tracked by the agent.
+TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
+{
+  {
+    auto ack = update(1, {
+                             { "root1", QS_QUIESCING },
+                         });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(1, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  // QuiesceAgent::TrackedRootRef pinned1, pinned2;
+
+  {
+    // the quiesce request for the root should have been submitted
+    auto tracked = agent->tracked_roots();
+    EXPECT_EQ(1, tracked.size());
+    EXPECT_EQ(tracked.at("root1")->quiesce_request.value(), quiesce_requests.at("root1").first);
+  }
+
+  // with a new update we got our root 1 timedout (this is the same as not listing it at all)
+  {
+    auto ack = update(2, {
+                             { "root1", QS_TIMEDOUT },
+                         });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(2, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  {
+    // nothing should be tracked after the update
+    auto tracked = agent->tracked_roots();
+    EXPECT_EQ(0, tracked.size());
+  }
+}