--- /dev/null
+#include "mds/QuiesceAgent.h"
+#include "common/debug.h"
+#include "include/ceph_assert.h"
+#include <future>
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds_quiesce
+#undef dout_prefix
+#define dout_prefix *_dout << "quiesce.agt <" << __func__ << "> "
+
+// Redefine dout for this file: a message is logged under the dedicated
+// mds_quiesce subsystem when that subsystem gathers at level `lvl`, and
+// under the general mds subsystem otherwise.
+// The macro opens a `do {` scope which is closed by the matching dendl
+// below, so every dout(...) must be terminated with dendl.
+#undef dout
+#define dout(lvl) \
+ do { \
+ auto subsys = ceph_subsys_mds; \
+ if ((dout_context)->_conf->subsys.should_gather(dout_subsys, lvl)) { \
+ subsys = dout_subsys; \
+ } \
+ dout_impl(dout_context, ceph::dout::need_dynamic(subsys), lvl) dout_prefix
+
+// Terminates a dout(...) statement and closes the `do { ... } while (0)`
+// scope that dout opened.
+#undef dendl
+#define dendl \
+ dendl_impl; \
+ } \
+ while (0)
+
+/// Streams a one-line summary of a tracked roots set: whether it is armed,
+/// its db version and the number of roots it holds.
+template <class CharT, class Traits>
+std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr)
+{
+  os << "tracked_roots[armed: " << tr.armed;
+  os << ", v: " << tr.db_version;
+  os << " r: " << tr.roots.size() << "]";
+  return os;
+}
+
+// Processes a quiesce db update and prepares the synchronous ack in place.
+//
+// For every root listed in `map`:
+//  - a root in a failed state (>= QS__FAILURE) is erased from `map` and is
+//    not added to the new set of tracked roots;
+//  - a root found in the copy of the current tracked roots is reused, any
+//    other root gets a freshly created TrackedRoot;
+//  - the committed state of the tracked root is updated from the db info;
+//  - if the actual state of the tracked root differs from the db state, the
+//    entry in `map` is overwritten with the actual state and ttl, otherwise
+//    the entry is erased from `map`.
+//
+// On return `map` holds only the roots whose actual state differs from the
+// db state, and the new set of roots has been handed to set_pending_roots().
+// Current roots that aren't listed in `map` aren't carried into the new set.
+//
+// @param map in: the db update; out: the contents of the ack
+// @return always true (see the comment at the return statement)
+bool QuiesceAgent::db_update(QuiesceMap& map)
+{
+ // copy of the current roots
+ TrackedRoots current_roots = tracked_roots();
+ TrackedRoots new_roots;
+
+ dout(20) << "got a db update version " << map.db_version << " with " << map.roots.size() << " roots" << dendl;
+
+ // the iterator is advanced manually because entries may get erased from map.roots
+ for (auto their_it = map.roots.begin(); their_it != map.roots.end();) {
+ auto &[root, info] = *their_it;
+ TrackedRootRef tracked_root_ref;
+
+ ceph_assert(info.state > QS__INVALID);
+
+ if (info.state >= QS__FAILURE) {
+ // we don't care about roots in failed states
+ dout(5) << "ignoring a root in a failed state: '" << root << "', " << info.state << dendl;
+ their_it = map.roots.erase(their_it);
+ continue;
+ }
+
+ if (const auto& my_it = current_roots.find(root); my_it != current_roots.end()) {
+ // keep the existing root
+ new_roots.insert(*my_it);
+ tracked_root_ref = my_it->second;
+ } else {
+ // introduce a new root
+ tracked_root_ref = std::make_shared<TrackedRoot>(info.state, info.ttl);
+ new_roots[root] = tracked_root_ref;
+ }
+
+ // the root's own lock is held until the end of this iteration
+ std::lock_guard l(*tracked_root_ref);
+ tracked_root_ref->update_committed(info);
+
+ auto actual_state = tracked_root_ref->get_actual_state();
+
+ if (actual_state != info.state) {
+ // we have an update for the state
+ info.state = actual_state;
+ info.ttl = tracked_root_ref->get_ttl();
+ } else {
+ // our tracked root has the same state as the db
+ // we can just drop it from the response
+ their_it = map.roots.erase(their_it);
+ continue;
+ }
+ ++their_it;
+ }
+
+ // ack with the known state stored in `map`
+ set_pending_roots(map.db_version, std::move(new_roots));
+
+ // always send a synchronous ack
+ return true;
+}
+
+/// Entry point of the agent thread.
+///
+/// Every pass of the loop
+///  1. under `agent_mutex`, picks the set of roots to work on: the pending
+///     set if it is armed, or a copy of the current set otherwise;
+///  2. without the mutex, submits quiesce requests for the roots that should
+///     be quiesced and have neither a request nor a result yet, cancels the
+///     requests of the roots that should no longer be quiesced, and collects
+///     into `ack` every root whose actual state differs from its committed
+///     state;
+///  3. publishes the working set as the current one and sends the ack if the
+///     version is new or there is something to report;
+///  4. waits on `agent_cond` unless more work was armed in the meantime or
+///     the thread was asked to stop.
+///
+/// @return always nullptr
+void* QuiesceAgent::agent_thread_main() {
+  working.clear();
+  std::unique_lock lock(agent_mutex);
+
+  while(!stop_agent_thread) {
+    if (pending.armed) {
+      working.roots.swap(pending.roots);
+      working.db_version = pending.db_version;
+    } else {
+      // copy current roots
+      working.roots = current.roots;
+      working.db_version = current.db_version;
+    }
+
+    dout(20)
+        << "current = " << current.db_version
+        << ", working = " << working.db_version
+        << ", pending = " << pending.db_version << dendl;
+
+    current.armed = false;
+    working.armed = true;
+
+    // it's safe to clear the pending roots under lock because it shouldn't
+    // ever hold a last shared ptr to quiesced tracked roots, causing their destructors to run cancel.
+    pending.clear();
+    lock.unlock();
+
+    QuiesceMap ack(working.db_version);
+
+    // upkeep what we believe is the current state.
+    for (auto& [root, info] : working.roots) {
+
+      // take the decisions under the root's lock, act on them after releasing it
+      info->lock();
+      bool should_quiesce = info->should_quiesce();
+      bool issue_quiesce = should_quiesce && !info->quiesce_request && !info->quiesce_result;
+      std::optional<QuiesceInterface::RequestHandle> cancel_handle;
+      if (!should_quiesce && !info->cancel_result) {
+        cancel_handle = info->quiesce_request;
+      }
+      auto actual_state = info->get_actual_state();
+      if (info->committed_state != actual_state) {
+        ack.roots[root] = { actual_state, info->get_ttl() };
+      }
+      info->unlock();
+
+      if (issue_quiesce) {
+        // the completion only holds a weak reference, so it becomes a no-op
+        // for the root once the tracked root has been destroyed
+        std::weak_ptr<TrackedRoot> weak_info = info;
+        auto request_handle = quiesce_control.submit_request(root, new LambdaContext([weak_info, submitted_root = root, this](int rc) {
+          if (auto info = weak_info.lock()) {
+            dout(20) << "completing request (rc=" << rc << ") for '" << submitted_root << "'" << dendl;
+            info->lock();
+            info->quiesce_result = rc;
+            info->unlock();
+
+            // TODO: capturing QuiesceAgent& `this` is potentially dangerous
+            //       the assumption is that since the root pointer is weak
+            //       it will have been deleted by the QuiesceAgent shutdown sequence
+            set_upkeep_needed();
+          }
+          dout(20) << "done with submit callback for '" << submitted_root << "'" << dendl;
+        }));
+
+        dout(10) << "got request handle <" << request_handle << "> for '" << root << "'" << dendl;
+        info->lock();
+        info->quiesce_request = request_handle;
+        info->cancel = quiesce_control.cancel_request;
+        info->unlock();
+      } else if (cancel_handle) {
+        dout(10) << "Calling `cancel` on `" << root << "` with handle <" << *cancel_handle << ">" << dendl;
+        int rc = quiesce_control.cancel_request(*cancel_handle);
+        if (rc != 0) {
+          dout(1) << "ERROR (" << rc << ") when trying to cancel quiesce request id: " << *cancel_handle << dendl;
+        }
+        info->lock();
+        info->cancel_result = rc;
+        info->unlock();
+      }
+    }
+
+    lock.lock();
+
+    bool new_version = current.db_version < working.db_version;
+    current.roots.swap(working.roots);
+    current.db_version = working.db_version;
+
+    lock.unlock();
+
+    // clear the old roots and send the ack outside of the lock
+    working.roots.clear();
+    if (new_version || !ack.roots.empty()) {
+      dout(20) << "asyncrhonous ack for " << (new_version ? "a new" : "the current") << " version: " << ack << dendl;
+      // `ack` is moved into the callback below and must not be read after that,
+      // so keep a copy of its version for the error report
+      const auto ack_version = ack.db_version;
+      int rc = quiesce_control.agent_ack(std::move(ack));
+      if (rc != 0) {
+        dout(3) << "got error: " << rc << " trying to send an ack for version " << ack_version << dendl;
+      }
+    }
+    ack.reset();
+
+    lock.lock();
+
+    // notify that we're done working on this version and all acks (if any) were sent
+    working.clear();
+
+    // a new pending version could be set while we weren't locked
+    // if that's the case just go for another pass
+    // otherwise, wait for updates
+    if (!pending.armed && !current.armed && !stop_agent_thread) {
+      // for somebody waiting for the thread to idle
+      agent_cond.notify_all();
+      agent_cond.wait(lock);
+    }
+  }
+  agent_cond.notify_all();
+  return nullptr;
+}
+
+/// Arms the pending set with the given version and roots and wakes up
+/// everybody waiting on `agent_cond`.
+/// The version is accepted unconditionally; a version older than what the
+/// agent already has is only reported in the log.
+void QuiesceAgent::set_pending_roots(QuiesceDbVersion version, TrackedRoots&& new_roots)
+{
+  std::lock_guard guard(agent_mutex);
+
+  // the newest version among the current and the working sets
+  auto newest_known = std::max(current.db_version, working.db_version);
+
+  if (newest_known > version) {
+    dout(5) << "version rollback to " << version
+            << ". current = " << current.db_version
+            << ", working = " << working.db_version
+            << ", pending = " << pending.db_version << dendl;
+  }
+
+  // whatever was pending before gets replaced
+  pending.armed = true;
+  pending.roots = std::move(new_roots);
+  pending.db_version = version;
+
+  agent_cond.notify_all();
+}
+
+/// Arms the current set and wakes up everybody waiting on `agent_cond`,
+/// so that the agent thread makes another pass over the current roots.
+void QuiesceAgent::set_upkeep_needed()
+{
+  std::lock_guard guard(agent_mutex);
+
+  dout(20) << "current = " << current.db_version << ", working = " << working.db_version << ", pending = " << pending.db_version << dendl;
+
+  current.armed = true;
+  agent_cond.notify_all();
+}
+
+/// Cancels the quiesce request of a root that is going away, unless a cancel
+/// result was already recorded, no request handle is stored or no cancel
+/// functor is set.
+QuiesceAgent::TrackedRoot::~TrackedRoot()
+{
+  std::optional<QuiesceInterface::RequestHandle> abandoned_handle;
+  bool never_cancelled = false;
+
+  {
+    // take the handle out under the root's own lock
+    std::lock_guard guard(*this);
+    abandoned_handle.swap(quiesce_request);
+    never_cancelled = !cancel_result.has_value();
+  }
+
+  if (never_cancelled && abandoned_handle && cancel) {
+    dout(10) << "Calling `cancel` on an abandoned root with handle <" << abandoned_handle << ">" << dendl;
+    cancel(*abandoned_handle);
+  }
+
+  dout(20) << "done with request handle <" << abandoned_handle << ">" << dendl;
+}
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
+ * License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
-#include "QuiesceDb.h"
+#include "mds/QuiesceDb.h"
+#include <functional>
+#include <optional>
+#include <map>
+#include <mutex>
+#include <thread>
class QuiesceAgent {
- // subscribe to the QM map
- // keeps runtime version of the map
- // notifies the QM when runtime version is different from the last know requested version
-};
\ No newline at end of file
+ public:
+ struct ControlInterface {
+ QuiesceInterface::RequestSubmit submit_request;
+ QuiesceInterface::RequestCancel cancel_request;
+ QuiesceInterface::AgentAck agent_ack;
+ // TODO: do we need a "cancel all"?
+ };
+
+ /// Stores a copy of the control interface and creates the agent thread
+ /// under the name "quiesce.agt".
+ /// @param quiesce_control functors used to submit and cancel quiesce
+ /// requests and to send acks
+ QuiesceAgent(const ControlInterface& quiesce_control)
+ : quiesce_control(quiesce_control)
+ , stop_agent_thread(false)
+ , agent_thread(this) {
+ agent_thread.create("quiesce.agt");
+ };
+
+ /// Stops and joins the agent thread if it is still running.
+ /// Signalling the thread doesn't make it leave its loop, and the object
+ /// must not be destroyed while the thread can still touch the mutex,
+ /// the condition variable and the root sets, so go through the regular
+ /// shutdown sequence instead.
+ /// NOTE(review): relies on is_started() turning false once the thread
+ /// has been joined by an earlier shutdown() - confirm.
+ ~QuiesceAgent() {
+   if (agent_thread.is_started()) {
+     shutdown();
+   }
+ }
+
+ /// @brief WARNING: resets synchronously.
+ ///        Dropping the tracked roots may call cancel on active roots,
+ ///        which may lead to a deadlock if the MDS lock is being held
+ ///        by the caller.
+ ///        Consider `reset_async` if you're holding the MDS lock.
+ void reset() {
+   TrackedRoots released;
+
+   {
+     std::unique_lock guard(agent_mutex);
+
+     // discard any change that is still pending
+     pending.clear();
+
+     // wait for the agent thread to settle
+     await_idle_locked(guard);
+
+     // being idle means that `current` holds our only tracked set;
+     // take it over while still locked
+     released = current.clear();
+   }
+
+   // let go of the roots outside of the lock
+   released.clear();
+ }
+
+ void reset_async() {
+ set_pending_roots({0, 0}, {});
+ }
+
+ /// Asks the agent thread to stop, joins it and then clears the
+ /// current and the pending sets.
+ void shutdown()
+ {
+   {
+     // raise the stop flag and wake the thread up under the lock
+     std::lock_guard guard(agent_mutex);
+     stop_agent_thread = true;
+     agent_cond.notify_all();
+   }
+
+   // join without holding the lock so that the thread can finish its pass
+   agent_thread.join();
+
+   current.clear();
+   pending.clear();
+ }
+
+ bool db_update(QuiesceMap& map);
+
+ struct TrackedRoot {
+ std::optional<QuiesceInterface::RequestHandle> quiesce_request;
+ // we could have hidden the request handle
+ // inside the cancel functor, but then we'd lose
+ // the ability to identify individual requests
+ // when looking at the tracked root.
+ QuiesceInterface::RequestCancel cancel;
+ std::optional<int> quiesce_result;
+ std::optional<int> cancel_result;
+
+ QuiesceState committed_state;
+ QuiesceTimePoint expires_at;
+
+ TrackedRoot(QuiesceState state, QuiesceTimeInterval ttl)
+ : committed_state(state)
+ , expires_at(interval_saturate_add_now(ttl))
+ , busy_lock(false)
+ {
+ }
+
+ TrackedRoot() : TrackedRoot(QS__INVALID, QuiesceTimeInterval::zero()) {}
+
+ bool should_quiesce() const
+ {
+ return committed_state == QS_QUIESCING || committed_state == QS_QUIESCED;
+ }
+
+ bool should_release() const {
+ return committed_state == QS_RELEASING || committed_state == QS_RELEASED;
+ }
+
+ ~TrackedRoot();
+
+ void update_committed(QuiesceMap::RootInfo const & info) {
+ committed_state = info.state;
+ expires_at = interval_saturate_add_now(info.ttl);
+ }
+
+ /// Time left until `expires_at`.
+ /// @return the maximum interval when `expires_at` is saturated at the
+ ///         maximum, zero when the expiration isn't in the future,
+ ///         and the remaining interval otherwise
+ QuiesceTimeInterval get_ttl() const
+ {
+   const auto sampled_now = QuiesceClock::now();
+   const bool never_expires = expires_at.time_since_epoch() == QuiesceTimeInterval::max();
+   if (never_expires) {
+     return QuiesceTimeInterval::max();
+   }
+   if (expires_at > sampled_now) {
+     return expires_at - sampled_now;
+   }
+   return QuiesceTimeInterval::zero();
+ }
+
+ /// Derives the actual state of the root from the recorded results:
+ ///  - quiesce succeeded, no cancel result: QS_QUIESCED
+ ///  - quiesce succeeded, cancel succeeded: QS_RELEASED
+ ///  - quiesce succeeded, cancel failed:    QS_EXPIRED
+ ///  - quiesce failed:                      QS_FAILED
+ ///  - no quiesce result, release wanted:   QS_EXPIRED
+ ///  - otherwise:                           QS_QUIESCING
+ QuiesceState get_actual_state() const {
+   const bool quiesce_finished = quiesce_result.has_value();
+
+   if (quiesce_finished && *quiesce_result == 0) {
+     if (!cancel_result.has_value()) {
+       return QS_QUIESCED;
+     }
+     if (*cancel_result == 0) {
+       return QS_RELEASED;
+     }
+     return QS_EXPIRED;
+   }
+
+   if (quiesce_finished) {
+     return QS_FAILED;
+   }
+
+   if (should_release()) {
+     // we must have lost track of this root,
+     // probably, due to expiration. But even if due to an error,
+     // this is our best guess for the situation
+     return QS_EXPIRED;
+   }
+
+   return QS_QUIESCING;
+ }
+
+ void lock() const {
+ while (busy_lock.test_and_set(std::memory_order_acquire))
+ ; // spin
+ }
+
+ void unlock() const {
+ busy_lock.clear(std::memory_order_release);
+ }
+ private:
+ mutable std::atomic_flag busy_lock;
+ };
+
+ using TrackedRootRef = std::shared_ptr<TrackedRoot>;
+
+ using TrackedRoots = std::unordered_map<QuiesceRoot, TrackedRootRef>;
+
+ TrackedRoots tracked_roots() {
+ std::lock_guard l(agent_mutex);
+ return current.roots;
+ }
+
+ /// Looks up a root in the current set under `agent_mutex`.
+ /// @return a shared reference to the tracked root, or nullptr if the
+ ///         root isn't in the current set
+ TrackedRootRef get_tracked_root(QuiesceRoot root) {
+   std::lock_guard guard(agent_mutex);
+   auto found = current.roots.find(root);
+   if (found == current.roots.end()) {
+     return nullptr;
+   }
+   return found->second;
+ }
+
+ QuiesceDbVersion get_current_version() {
+ std::lock_guard l(agent_mutex);
+ return current.db_version;
+ }
+
+ protected:
+ ControlInterface quiesce_control;
+
+ struct TrackedRootsVersion {
+ TrackedRoots roots;
+ QuiesceDbVersion db_version = {0, 0};
+ bool armed = false;
+ TrackedRoots clear() {
+ armed = false;
+ db_version = {0, 0};
+ TrackedRoots old = std::move(roots);
+ roots.clear();
+ return old;
+ }
+ };
+
+ template <class CharT, class Traits>
+ friend std::basic_ostream<CharT, Traits>&
+ operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr);
+
+ TrackedRootsVersion current;
+ TrackedRootsVersion working;
+ TrackedRootsVersion pending;
+
+ std::mutex agent_mutex;
+ std::condition_variable agent_cond;
+ bool stop_agent_thread;
+
+ /// Waits on `agent_cond` until none of the current, working and pending
+ /// sets is armed.
+ /// @param lock a lock that is passed to `agent_cond.wait()`; the callers
+ /// in this file pass a std::unique_lock holding `agent_mutex`
+ /// @return the db version of the current set at the time of the wake up
+ template<class L>
+ QuiesceDbVersion await_idle_locked(L &lock) {
+ agent_cond.wait(lock, [this] {
+ return !(current.armed || working.armed || pending.armed);
+ });
+
+ return current.db_version;
+ }
+
+ void set_pending_roots(QuiesceDbVersion db_version, TrackedRoots&& new_roots);
+
+ void set_upkeep_needed();
+
+ class AgentThread : public Thread {
+ public:
+ explicit AgentThread(QuiesceAgent* qa)
+ : qa(qa)
+ {
+ }
+ void* entry() override
+ {
+ return qa->agent_thread_main();
+ }
+
+ private:
+ QuiesceAgent* qa;
+ } agent_thread;
+
+ void* agent_thread_main();
+};
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, RedHat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "common/Cond.h"
+#include "mds/QuiesceAgent.h"
+#include "gtest/gtest.h"
+#include <algorithm>
+#include <functional>
+#include <queue>
+#include <ranges>
+#include <system_error>
+#include <thread>
+#include <future>
+
+class QuiesceAgentTest : public testing::Test {
+ using RequestHandle = QuiesceInterface::RequestHandle;
+ using QuiescingRoot = std::pair<RequestHandle, Context*>;
+ protected:
+ /// Runs `f(args...)` on a detached thread and waits for it for at most
+ /// `timeout`.
+ /// @return true if the job finished within the timeout, false otherwise.
+ /// The thread is detached, so a job that timed out is not stopped or
+ /// joined by this function.
+ template< class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>, class Function, class... Args >
+ static bool timed_run(D timeout, Function&& f, Args&&... args ) {
+ std::promise<void> done;
+ auto future = done.get_future();
+
+ // the callable and its arguments are bound (copied) up front
+ auto job = std::bind(f, args...);
+
+ // the promise is moved into the thread, which fulfills it after the job
+ auto tt = std::thread([job=std::move(job)](std::promise<void> done) {
+ job();
+ done.set_value();
+ }, std::move(done));
+
+ tt.detach();
+
+ return future.wait_for(timeout) != std::future_status::timeout;
+ }
+
+ struct TestQuiesceAgent : public QuiesceAgent {
+ using QuiesceAgent::QuiesceAgent;
+ AgentThread& get_agent_thread() {
+ return agent_thread;
+ }
+
+ QuiesceDbVersion get_latest_version()
+ {
+ std::lock_guard l(agent_mutex);
+ return std::max({current.db_version, working.db_version, pending.db_version});
+ }
+ TrackedRoots& mutable_tracked_roots() {
+ return current.roots;
+ }
+
+ QuiesceDbVersion await_idle() {
+ std::unique_lock l(agent_mutex);
+ return await_idle_locked(l);
+ }
+ };
+ QuiesceMap latest_ack;
+ std::unordered_map<QuiesceRoot, QuiescingRoot> quiesce_requests;
+ ceph_tid_t last_tid;
+ std::mutex mutex;
+
+ std::unique_ptr<TestQuiesceAgent> agent;
+
+ /// Completes the outstanding quiesce context of `root` with `rc`.
+ /// A request completed with an error is dropped from `quiesce_requests`.
+ /// @return false if there's no request for the root or its context has
+ ///         already been completed, true otherwise
+ bool complete_quiesce(QuiesceRoot root, int rc = 0) {
+   std::lock_guard l(mutex);
+
+   auto found = quiesce_requests.find(root);
+   if (found == quiesce_requests.end() || !found->second.second) {
+     return false;
+   }
+
+   Context*& outstanding = found->second.second;
+   outstanding->complete(rc);
+   outstanding = nullptr;
+
+   if (rc != 0) {
+     // there was an error, no need to keep this request anymore
+     quiesce_requests.erase(found);
+   }
+   return true;
+ }
+
+ /// Creates the agent under test on top of a fake control interface:
+ ///  - submit_request records the context in `quiesce_requests` under a
+ ///    freshly generated request id;
+ ///  - cancel_request completes and drops the request with the given id;
+ ///  - agent_ack stores the received map in `latest_ack`.
+ void SetUp() override {
+
+   QuiesceAgent::ControlInterface ci;
+   quiesce_requests.clear();
+   // `last_tid` is declared without an initializer; start from a known
+   // value instead of incrementing an indeterminate one
+   last_tid = 0;
+
+   ci.submit_request = [this](QuiesceRoot r, Context* c) {
+     std::lock_guard l(mutex);
+
+     // always create a new request id
+     auto req_id = metareqid_t(entity_name_t::MDS(0), ++last_tid);
+
+     auto [it, inserted] = quiesce_requests.try_emplace(r, req_id, c);
+
+     if (!inserted) {
+       // we must update the request id so that old one can't cancel this request.
+       it->second.first = req_id;
+       if (it->second.second) {
+         it->second.second->complete(-EINTR);
+         it->second.second = c;
+       } else {
+         // if we have no context, it means we've completed it
+         // since we weren't inserted, we must have successfully quiesced
+         c->complete(0);
+       }
+     }
+
+     return it->second.first;
+   };
+
+   ci.cancel_request = [this](RequestHandle h) {
+     std::lock_guard l(mutex);
+
+     for (auto it = quiesce_requests.cbegin(); it != quiesce_requests.cend(); ++it) {
+       if (it->second.first == h) {
+         if (it->second.second) {
+           it->second.second->complete(-ECANCELED);
+         }
+         // erasing invalidates `it`, which is fine since we return right away
+         quiesce_requests.erase(it);
+         return 0;
+       }
+     }
+
+     // negative errno, in line with the other error codes of this fixture
+     return -ENOENT;
+   };
+
+   ci.agent_ack = [this](QuiesceMap const& update) {
+     std::lock_guard l(mutex);
+     latest_ack = update;
+     return 0;
+   };
+
+   agent = std::make_unique<TestQuiesceAgent>(ci);
+ }
+
+ void TearDown() override {
+ if (agent) {
+ agent->shutdown();
+ agent.reset();
+ }
+ }
+
+
+ using R = QuiesceMap::Roots::value_type;
+ using RootInitList = std::initializer_list<R>;
+
+ std::optional<QuiesceMap> update(QuiesceDbVersion v, RootInitList roots)
+ {
+ QuiesceMap map(v, QuiesceMap::Roots { roots });
+
+ if (agent->db_update(map)) {
+ return map;
+ }
+
+ return std::nullopt;
+ }
+
+ std::optional<QuiesceMap> update(QuiesceSetVersion v, RootInitList roots)
+ {
+ return update(QuiesceDbVersion { 1, v }, roots);
+ }
+
+ std::optional<QuiesceMap> update(RootInitList roots)
+ {
+ return update(agent->get_latest_version() + 1, roots);
+ }
+
+ template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>>
+ bool await_idle_v(QuiesceDbVersion version, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
+ {
+ return timed_run(timeout, [this, version] {
+ while (version > agent->await_idle()) { };
+ });
+ }
+
+ template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>>
+ bool await_idle(D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
+ {
+ return timed_run(timeout, [this] {
+ agent->await_idle();
+ });
+ }
+};
+
+TEST_F(QuiesceAgentTest, ThreadManagement) {
+ EXPECT_TRUE(agent->get_agent_thread().is_started());
+
+ EXPECT_TRUE(await_idle());
+
+ EXPECT_TRUE(update({ { "root1", QS_QUIESCING } }).has_value());
+
+ EXPECT_TRUE(await_idle());
+
+ EXPECT_TRUE(update({ { "root2", QS_QUIESCING } }).has_value());
+
+ agent->reset();
+
+ EXPECT_TRUE(await_idle());
+
+ EXPECT_TRUE(update({ { "root3", QS_QUIESCING } }).has_value());
+
+ // make sure that the agent thread completes in a timely fashion
+ EXPECT_TRUE(timed_run(std::chrono::seconds(1), [this] { agent->shutdown(); agent.reset(); }));
+}
+
+TEST_F(QuiesceAgentTest, DbUpdates) {
+ {
+ auto ack = update(1, {
+ { "root0", QS_QUIESCING }, // this shouldn't be reported because its state isn't different from QUIESCING
+ { "root1", QS_QUIESCING }, // ditto
+ { "root2", QS_QUIESCED }, // this should be reported back as quiescing
+ { "root3", QS_RELEASING }, // this should be reported back as expired
+ { "root4", QS_RELEASED }, // this should be reported back as expired
+ { "root5", QS_EXPIRED }, // this should be ignored
+ });
+
+ ASSERT_TRUE(ack.has_value());
+ EXPECT_EQ(1, ack->db_version);
+ EXPECT_EQ(3, ack->roots.size());
+ EXPECT_EQ(QS_QUIESCING, ack->roots.at("root2").state);
+ EXPECT_EQ(QS_EXPIRED, ack->roots.at("root3").state);
+ EXPECT_EQ(QS_EXPIRED, ack->roots.at("root4").state);
+ EXPECT_TRUE(await_idle());
+ }
+
+ EXPECT_EQ(1, agent->get_current_version());
+
+ {
+ auto roots = agent->tracked_roots();
+ EXPECT_EQ(5, roots.size());
+ EXPECT_EQ(QS_QUIESCING, roots.at("root0")->committed_state);
+ EXPECT_EQ(QS_QUIESCING, roots.at("root1")->committed_state);
+ EXPECT_EQ(QS_QUIESCED, roots.at("root2")->committed_state);
+ EXPECT_EQ(QS_RELEASING, roots.at("root3")->committed_state);
+ EXPECT_EQ(QS_RELEASED, roots.at("root4")->committed_state);
+
+ // manipulate root0 and root1 as if they were quiesced and root2 as if it was released
+ auto& root0 = *roots.at("root0");
+ root0.quiesce_result = 0;
+ EXPECT_EQ(QS_QUIESCED, root0.get_actual_state());
+
+ auto& root1 = *roots.at("root1");
+ root1.quiesce_result = 0;
+ EXPECT_EQ(QS_QUIESCED, root1.get_actual_state());
+
+ auto& root2 = *roots.at("root2");
+ root2.quiesce_result = 0;
+ root2.cancel_result = 0;
+ EXPECT_EQ(QS_RELEASED, root2.get_actual_state());
+ }
+
+ {
+ auto ack = update(2, {
+ { "root0", QS_RELEASING }, // this should be reported back as quiesced
+ { "root1", QS_QUIESCING }, // this should be reported back as quiesced
+ { "root2", QS_RELEASING }, // this should be reported back as released
+ });
+
+ EXPECT_EQ(2, ack->db_version);
+ EXPECT_EQ(3, ack->roots.size());
+ EXPECT_EQ(QS_QUIESCED, ack->roots.at("root0").state);
+ EXPECT_EQ(QS_QUIESCED, ack->roots.at("root1").state);
+ EXPECT_EQ(QS_RELEASED, ack->roots.at("root2").state);
+ }
+
+ EXPECT_TRUE(await_idle());
+ {
+ auto roots = agent->tracked_roots();
+ EXPECT_EQ(3, roots.size());
+ EXPECT_EQ(QS_RELEASING, roots.at("root0")->committed_state);
+ EXPECT_EQ(QS_QUIESCING, roots.at("root1")->committed_state);
+ EXPECT_EQ(QS_RELEASING, roots.at("root2")->committed_state);
+ }
+
+ {
+ // we should be able to set pending version to anything
+ // and the agent should follow, including rolling back to 0
+ auto ack = update({200, 0}, {});
+
+ EXPECT_TRUE(await_idle());
+ EXPECT_EQ(0, ack->db_version);
+ EXPECT_EQ(0, ack->roots.size());
+ EXPECT_EQ((QuiesceDbVersion {200, 0}), agent->get_current_version());
+ }
+}
+
+TEST_F(QuiesceAgentTest, QuiesceProtocol) {
+
+ {
+ auto ack = update(1, {
+ { "root1", QS_QUIESCING },
+ { "root2", QS_QUIESCING },
+ { "root3", QS_QUIESCING },
+ });
+
+ ASSERT_TRUE(ack.has_value());
+ EXPECT_EQ(1, ack->db_version);
+ EXPECT_EQ(0, ack->roots.size());
+ }
+
+ EXPECT_TRUE(await_idle());
+
+ {
+ auto tracked = agent->tracked_roots();
+ EXPECT_EQ(3, tracked.size());
+ EXPECT_EQ(QS_QUIESCING, tracked.at("root1")->committed_state);
+ EXPECT_EQ(QS_QUIESCING, tracked.at("root2")->committed_state);
+ EXPECT_EQ(QS_QUIESCING, tracked.at("root3")->committed_state);
+
+ EXPECT_EQ(QS_QUIESCING, tracked.at("root1")->get_actual_state());
+ EXPECT_EQ(QS_QUIESCING, tracked.at("root2")->get_actual_state());
+ EXPECT_EQ(QS_QUIESCING, tracked.at("root3")->get_actual_state());
+
+ // we should have seen the quiesce requests for all roots
+ EXPECT_EQ(tracked.at("root1")->quiesce_request.value(), quiesce_requests.at("root1").first);
+ EXPECT_EQ(tracked.at("root2")->quiesce_request.value(), quiesce_requests.at("root2").first);
+ EXPECT_EQ(tracked.at("root3")->quiesce_request.value(), quiesce_requests.at("root3").first);
+ }
+
+ EXPECT_EQ(3, quiesce_requests.size());
+
+ // complete one root with success
+ EXPECT_TRUE(complete_quiesce("root1"));
+
+ EXPECT_TRUE(await_idle());
+ // we should have seen an ack sent
+ EXPECT_EQ(1, latest_ack.db_version);
+ EXPECT_EQ(1, latest_ack.roots.size());
+ EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+
+ latest_ack.reset();
+
+ // complete the other root with failure
+ EXPECT_TRUE(complete_quiesce("root2", -1));
+
+ EXPECT_TRUE(await_idle());
+ EXPECT_EQ(1, latest_ack.db_version);
+ ASSERT_EQ(2, latest_ack.roots.size());
+ EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+ EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
+
+ latest_ack.reset();
+
+ // complete the third root with success
+ EXPECT_TRUE(complete_quiesce("root3"));
+
+ EXPECT_TRUE(await_idle());
+
+ // we should see the two quiesced roots in the ack,
+ EXPECT_EQ(1, latest_ack.db_version);
+ ASSERT_EQ(3, latest_ack.roots.size());
+ EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+ EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
+ EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root3").state);
+
+ {
+ auto ack = update(2, {
+ { "root2", QS_QUIESCING },
+ { "root3", QS_RELEASING },
+ });
+
+ ASSERT_TRUE(ack.has_value());
+ EXPECT_EQ(2, ack->db_version);
+ // this update doesn't have root1, so it should be untracked and cancelled
+ // root2 is still quiescing, no updates for it
+ // root3 is released asynchronously so for now it should be QUIESCED
+ ASSERT_EQ(2, ack->roots.size());
+ EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
+ EXPECT_EQ(QS_QUIESCED, ack->roots.at("root3").state);
+ }
+
+ EXPECT_TRUE(await_idle());
+
+ {
+ // make sure that root1 isn't tracked
+ auto tracked = agent->tracked_roots();
+ EXPECT_EQ(2, agent->get_current_version());
+ ASSERT_EQ(2, tracked.size());
+ EXPECT_EQ(QS_QUIESCING, tracked.at("root2")->committed_state);
+ EXPECT_EQ(QS_RELEASING, tracked.at("root3")->committed_state);
+ }
+
+ // we should have also seen cancelations for root1 and root3.
+ // We observe this by missing them from the quiesce_requests
+ // NB: root2 shouldn't be part of requests either since it was completed with failure
+ EXPECT_EQ(0, quiesce_requests.size());
+}
+
+// Verifies that re-introducing roots whose previous TrackedRoot objects are
+// still alive (pinned by the test) results in new quiesce requests, and that
+// letting go of the old objects doesn't cancel the new requests.
+TEST_F(QuiesceAgentTest, DuplicateQuiesceRequest) {
+  {
+    auto ack = update(1, {
+      { "root1", QS_QUIESCING },
+      { "root2", QS_QUIESCING },
+      { "root3", QS_QUIESCING },
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(1, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  QuiesceAgent::TrackedRootRef pinned1, pinned2;
+
+  {
+    auto tracked = agent->tracked_roots();
+    ASSERT_EQ(3, tracked.size());
+    EXPECT_EQ(tracked.at("root1")->quiesce_request.value(), quiesce_requests.at("root1").first);
+    EXPECT_EQ(tracked.at("root2")->quiesce_request.value(), quiesce_requests.at("root2").first);
+    EXPECT_EQ(tracked.at("root3")->quiesce_request.value(), quiesce_requests.at("root3").first);
+
+    // copying the shared ref will keep the object alive
+    pinned1 = tracked.at("root1");
+    pinned2 = tracked.at("root2");
+  }
+
+  // root 1 should be quiesced now
+  EXPECT_TRUE(complete_quiesce("root1"));
+
+  EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
+  EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state());
+
+  // imagine that we lost our root records for a moment
+  {
+    auto ack = update(2, {
+      { "root3", QS_QUIESCING },
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(2, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  {
+    auto tracked = agent->tracked_roots();
+    EXPECT_EQ(1, tracked.size());
+    EXPECT_EQ(tracked.at("root3")->quiesce_request.value(), quiesce_requests.at("root3").first);
+  }
+
+  // since we have those pinned, they should still be live
+  // and we should be their only owners
+  // (shared_ptr::unique() is gone in C++20, hence use_count())
+  EXPECT_EQ(1, pinned1.use_count());
+  EXPECT_EQ(1, pinned2.use_count());
+
+  EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
+  EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state());
+
+  EXPECT_TRUE(quiesce_requests.contains("root1"));
+  EXPECT_TRUE(quiesce_requests.contains("root2"));
+
+  latest_ack.reset();
+  // now, bring the roots back
+  {
+    auto ack = update(3, {
+      { "root1", QS_QUIESCING },
+      { "root2", QS_QUIESCING },
+      { "root3", QS_QUIESCING },
+    });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(3, ack->db_version);
+    // even though root1 is already quiesced,
+    // we should not know about it synchronously
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle());
+
+  // now we should have seen the ack with root1 quiesced
+  EXPECT_EQ(3, latest_ack.db_version);
+  EXPECT_EQ(1, latest_ack.roots.size());
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+
+  // pinned1 keeps its state, while the outstanding request of pinned2
+  // has been interrupted (-EINTR) by the duplicate submit for root2
+  EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
+  EXPECT_EQ(QS_FAILED, pinned2->get_actual_state());
+
+  EXPECT_EQ(0, *pinned1->quiesce_result);
+  EXPECT_EQ(-EINTR, *pinned2->quiesce_result);
+
+  // releasing the pinned objects will attempt to cancel, but that shouldn't interfere with the current state
+  pinned1.reset();
+  pinned2.reset();
+
+  EXPECT_TRUE(quiesce_requests.contains("root1"));
+  EXPECT_TRUE(quiesce_requests.contains("root2"));
+
+  EXPECT_TRUE(complete_quiesce("root2"));
+
+  EXPECT_TRUE(await_idle());
+  EXPECT_EQ(3, latest_ack.db_version);
+  EXPECT_EQ(2, latest_ack.roots.size());
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root2").state);
+}
+
+TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
+{
+ {
+ auto ack = update(1, {
+ { "root1", QS_QUIESCING },
+ });
+
+ ASSERT_TRUE(ack.has_value());
+ EXPECT_EQ(1, ack->db_version);
+ EXPECT_EQ(0, ack->roots.size());
+ }
+
+ EXPECT_TRUE(await_idle());
+
+ // QuiesceAgent::TrackedRootRef pinned1, pinned2;
+
+ {
+ auto tracked = agent->tracked_roots();
+ EXPECT_EQ(1, tracked.size());
+ EXPECT_EQ(tracked.at("root1")->quiesce_request.value(), quiesce_requests.at("root1").first);
+ }
+
+ // with a new update we got our root 1 timedout (this is the same as not listing it at all)
+ {
+ auto ack = update(2, {
+ { "root1", QS_TIMEDOUT },
+ });
+
+ ASSERT_TRUE(ack.has_value());
+ EXPECT_EQ(2, ack->db_version);
+ EXPECT_EQ(0, ack->roots.size());
+ }
+
+ EXPECT_TRUE(await_idle());
+
+ {
+ auto tracked = agent->tracked_roots();
+ EXPECT_EQ(0, tracked.size());
+ }
+}