SUBSYS(mds_log, 1, 5)
SUBSYS(mds_log_expire, 1, 5)
SUBSYS(mds_migrator, 1, 5)
+SUBSYS(mds_quiesce, 3, 5)
SUBSYS(buffer, 0, 1)
SUBSYS(timer, 0, 1)
SUBSYS(filer, 0, 1)
--- /dev/null
+#include <boost/url/src.hpp>
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#pragma once
+#include "QuiesceDb.h"
+
+class QuiesceAgent {
+ // subscribe to the QM map
+ // keeps a runtime version of the map
+ // notifies the QM when the runtime version differs from the last known requested version
+};
\ No newline at end of file
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#pragma once
+#include <string>
+#include <map>
+#include <unordered_map>
+#include <optional>
+#include <vector>
+#include <ranges>
+
+#include "mds/mdstypes.h"
+#include "common/ceph_time.h"
+
+// NB! The order of the states in the enum is important!
+// There are places in the code that aggregate multiple states
+// via min or max, depending on the task.
+// The order of states represents the natural lifecycle
+// of a set and its members; this is especially important
+// for the active states.
+enum QuiesceState: uint8_t {
+ QS__INVALID,
+
+ // these states are considered "active"
+ QS_QUIESCING, QS__ACTIVE = QS_QUIESCING,
+ QS_QUIESCED,
+ QS_RELEASING,
+
+ // the below states are all terminal, or "inactive"
+ QS_RELEASED, QS__TERMINAL = QS_RELEASED,
+ // the below states are all about types of failure
+ QS_EXPIRED, QS__FAILURE = QS_EXPIRED,
+
+ QS_FAILED,
+
+ // the below states aren't allowed for roots, only for sets
+ QS_CANCELED, QS__SET_ONLY = QS_CANCELED,
+ QS_TIMEDOUT,
+
+ QS__MAX,
+};
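+
+// Compile-time checks of the ordering assumptions above (added for
+// illustration; the relationships follow directly from the enum definition):
+static_assert(QS__ACTIVE == QS_QUIESCING && QS_QUIESCING < QS_QUIESCED);
+static_assert(QS_QUIESCED < QS_RELEASING && QS_RELEASING < QS__TERMINAL);
+static_assert(QS__TERMINAL < QS__FAILURE && QS__FAILURE < QS__SET_ONLY);
+static_assert(QS__SET_ONLY < QS__MAX);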
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceState& qs)
+{
+ switch (qs) {
+ case QS__INVALID:
+ return os << "QS__INVALID (" << (int)qs << ")";
+ case QS_QUIESCING:
+ return os << "QS_QUIESCING (" << (int)qs << ")";
+ case QS_QUIESCED:
+ return os << "QS_QUIESCED (" << (int)qs << ")";
+ case QS_RELEASING:
+ return os << "QS_RELEASING (" << (int)qs << ")";
+ case QS_RELEASED:
+ return os << "QS_RELEASED (" << (int)qs << ")";
+ case QS_FAILED:
+ return os << "QS_FAILED (" << (int)qs << ")";
+ case QS_CANCELED:
+ return os << "QS_CANCELED (" << (int)qs << ")";
+ case QS_TIMEDOUT:
+ return os << "QS_TIMEDOUT (" << (int)qs << ")";
+ case QS_EXPIRED:
+ return os << "QS_EXPIRED (" << (int)qs << ")";
+ default:
+ return os << "!Unknown quiesce state! (" << (int)qs << ")";
+ }
+};
+
+inline const char * quiesce_state_name(QuiesceState state) {
+ switch (state) {
+ case QS__INVALID:
+ return "<invalid>";
+ case QS_QUIESCING:
+ return "QUIESCING";
+ case QS_QUIESCED:
+ return "QUIESCED";
+ case QS_RELEASING:
+ return "RELEASING";
+ case QS_RELEASED:
+ return "RELEASED";
+ case QS_FAILED:
+ return "FAILED";
+ case QS_CANCELED:
+ return "CANCELED";
+ case QS_TIMEDOUT:
+ return "TIMEDOUT";
+ case QS_EXPIRED:
+ return "EXPIRED";
+ default:
+ return "<unknown>";
+ }
+}
+
+// Since MDS clocks are not synchronized, and the quiesce db has to be replicated,
+// we measure all events in the quiesce database relative to the database age.
+// The age of the database is just large enough to have the earliest events carry
+// a non-negative age stamp.
+// This is sufficient because we only care to honor the timeouts that are relative
+// to the other recorded database events.
+// This approach also relieves us of storing or transferring absolute time stamps:
+// every client can deduce the lower boundary of an event's absolute time given the
+// message roundtrip timing - if they bother enough. Otherwise, they can just subtract
+// the received database age from now() and get their own absolute time reference.
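+//
+// A minimal sketch of that client-side derivation (illustrative only;
+// `received_db_age` and `event_age` are hypothetical values taken from
+// a received listing):
+//
+//   auto local_time_zero = QuiesceClock::now() - received_db_age;
+//   auto event_time = local_time_zero + event_age;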
+
+using QuiesceClock = ceph::coarse_real_clock;
+using QuiesceTimePoint = QuiesceClock::time_point;
+using QuiesceTimeInterval = QuiesceClock::duration;
+using QuiesceSetId = std::string;
+using QuiesceRoot = std::string;
+using QuiesceSetVersion = uint64_t;
+
+struct QuiesceDbVersion {
+ epoch_t epoch;
+ QuiesceSetVersion set_version;
+ auto operator<=>(QuiesceDbVersion const& other) const = default;
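+ // NB: despite its name, this operator mutates *this (it acts like +=)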
+ QuiesceDbVersion& operator+(unsigned int delta) {
+ set_version += delta;
+ return *this;
+ }
+};
+
+inline auto operator==(int const& set_version, QuiesceDbVersion const& db_version)
+{
+ return db_version.set_version == (QuiesceSetVersion)set_version;
+}
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbVersion& dbv)
+{
+ return os << "(" << dbv.epoch << ":" << dbv.set_version << ")";
+}
+
+struct QuiesceTimeIntervalSec {
+ const QuiesceTimeInterval interval;
+ QuiesceTimeIntervalSec(const QuiesceTimeInterval &interval) : interval(interval) {}
+};
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceTimeIntervalSec& sec)
+{
+ using std::chrono::duration_cast;
+ using dd = std::chrono::duration<double>;
+ const auto precision = os.precision();
+ const auto flags = os.flags();
+
+ os
+ << std::fixed
+ << std::setprecision(1)
+ << duration_cast<dd>(sec.interval).count()
+ << std::setprecision(precision);
+
+ os.flags(flags);
+ return os;
+}
+
+struct RecordedQuiesceState {
+ QuiesceState state;
+ QuiesceTimeInterval at_age;
+
+ operator QuiesceState() const {
+ return state;
+ }
+
+ bool update(const QuiesceState &state, const QuiesceTimeInterval &at_age) {
+ if (state != this->state) {
+ this->state = state;
+ this->at_age = at_age;
+ return true;
+ }
+ return false;
+ }
+
+ RecordedQuiesceState(QuiesceState state, QuiesceTimeInterval at_age) : state(state), at_age(at_age) {}
+ RecordedQuiesceState() : RecordedQuiesceState (QS__INVALID, QuiesceTimeInterval::zero()) {}
+ RecordedQuiesceState(RecordedQuiesceState const&) = default;
+ RecordedQuiesceState(RecordedQuiesceState &&) = default;
+ RecordedQuiesceState& operator=(RecordedQuiesceState const&) = default;
+ RecordedQuiesceState& operator=(RecordedQuiesceState &&) = default;
+};
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const RecordedQuiesceState& rstate)
+{
+ return os << rstate.state;
+}
+
+/// @brief `QuiesceSet` is the only record type in the quiesce database
+/// It encodes sufficient information to have the db taken over by
+/// a new manager and correctly decide on the next state transition.
+struct QuiesceSet {
+ /// @brief A member of a set represents a single root this set wants quiesced
+ /// It carries the information about the current known state of this root
+ /// and whether it got excluded from this set.
+ /// It's possible that holding on to excluded members is overkill.
+ struct MemberInfo {
+ RecordedQuiesceState rstate;
+ bool excluded = false;
+ MemberInfo(QuiesceState state, QuiesceTimeInterval at_age)
+ : rstate(state, at_age)
+ , excluded(false)
+ {
+ }
+ MemberInfo(QuiesceTimeInterval at_age)
+ : MemberInfo(QS_QUIESCING, at_age)
+ {
+ }
+ MemberInfo() = default;
+ MemberInfo(MemberInfo const& o) = default;
+ MemberInfo(MemberInfo &&) = default;
+ MemberInfo& operator=(MemberInfo const& o) = default;
+ MemberInfo& operator=(MemberInfo &&) = default;
+
+ bool is_quiescing() const { return rstate.state < QS_QUIESCED; }
+ bool is_failed() const { return rstate.state >= QS__FAILURE; }
+ };
+
+ /// @brief The db version when this set got modified last
+ QuiesceSetVersion version = 0;
+ /// @brief The last recorded state change of this set
+ RecordedQuiesceState rstate;
+ /// @brief How much time to give every new member to reach the quiesced state
+ /// By default the value is zero. It means that new sets which don't
+ /// have this field explicitly updated will immediately reach QS_TIMEDOUT.
+ QuiesceTimeInterval timeout = QuiesceTimeInterval::zero();
+ /// @brief How much time to allow the set to spend in QS_QUIESCED or QS_RELEASING states
+ /// The expiration timer is reset every time a successful await is executed
+ /// on a QS_QUIESCED set.
+ QuiesceTimeInterval expiration = QuiesceTimeInterval::zero();
+ using Members = std::unordered_map<QuiesceRoot, MemberInfo>;
+ Members members;
+
+ /// @brief The effective state of a member is just the max of its parent set state and its own state.
+ /// The exception is when the set is releasing: any ack from a peer
+ /// that confirms the member's quiesced state is treated as RELEASED.
+ /// @param reported_state the reported state of the member
+ /// @return the effective member state
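+ /// e.g. for a QS_RELEASING set, a member reported as QS_QUIESCED is
+ /// effectively QS_RELEASED, while for a QS_QUIESCED set a member reported
+ /// below QS_QUIESCED takes the effective state back to QS_QUIESCING.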
+ QuiesceState get_effective_member_state(QuiesceState reported_state) const
+ {
+ if (is_releasing()) {
+ if (reported_state >= QS_QUIESCED && reported_state <= QS_RELEASED) {
+ return QS_RELEASED;
+ }
+ }
+ if (is_quiesced() && reported_state < QS_QUIESCED) {
+ // we need to change back to quiescing
+ return QS_QUIESCING;
+ }
+ return std::max(reported_state, rstate.state);
+ }
+
+ /// @brief The requested state of a member is what we send to the agents
+ /// when executing the quiesce protocol. This state is deliberately reduced
+ /// to provoke clients to ack back and thus confirm their current state.
+ /// @return the requested member state
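+ /// e.g. even a QS_QUIESCED set requests QS_QUIESCING from its members,
+ /// so that agents keep re-acking and confirming the quiesced state.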
+ QuiesceState get_requested_member_state() const
+ {
+ if (rstate.state >= QS__TERMINAL) {
+ return rstate.state;
+ }
+ if (rstate.state <= QS_QUIESCED) {
+ // request quiescing even if we are already quiesced
+ return QS_QUIESCING;
+ }
+ // we can't have anything else unless the state enum was changed
+ // which will have to be addressed here.
+ ceph_assert(rstate.state == QS_RELEASING);
+ return QS_RELEASING;
+ }
+
+ bool is_active() const {
+ return
+ rstate.state > QS__INVALID
+ && rstate.state < QS__TERMINAL;
+ }
+
+ QuiesceState next_state(QuiesceState min_member_state) const;
+
+ bool is_quiescing() const { return rstate.state < QS_QUIESCED; }
+ bool is_quiesced() const { return rstate.state == QS_QUIESCED; }
+ bool is_releasing() const { return rstate.state == QS_RELEASING; }
+ bool is_released() const { return rstate.state == QS_RELEASED; }
+
+ QuiesceSet() = default;
+ QuiesceSet(QuiesceSet const &) = default;
+ QuiesceSet(QuiesceSet &&) = default;
+ QuiesceSet& operator=(QuiesceSet const &) = default;
+ QuiesceSet& operator=(QuiesceSet &&) = default;
+};
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceSet::MemberInfo& member)
+{
+ return os << (member.excluded ? "(excluded)" : "") << member.rstate;
+}
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceSet& set)
+{
+ size_t active = 0, inactive = 0;
+
+ for (auto && [_, m]: set.members) {
+ if (m.excluded) {
+ ++inactive;
+ } else {
+ ++active;
+ }
+ }
+
+ return os << "q-set[" << set.rstate << " v:" << set.version << ", m:" << active << "/" << inactive
+ << ", t:" << QuiesceTimeIntervalSec(set.timeout) << ", e:" << QuiesceTimeIntervalSec(set.expiration) << "]";
+}
+
+/// @brief QuiesceDbRequest is the only client interface to the database.
+/// This structure alone should be capable of encoding the full variety
+/// of different requests that can be submitted by the client.
+struct QuiesceDbRequest {
+ /// @brief `RootsOp` should be considered together with the `roots` set below
+ /// to know the operation. Each name in the enum carries two verbs: the
+ /// first applies when `roots` is not empty, and the second when
+ /// `roots` is empty.
+ enum RootsOp: uint8_t {
+ INCLUDE_OR_QUERY,
+ EXCLUDE_OR_RELEASE,
+ RESET_OR_CANCEL,
+ __INVALID
+ };
+
+ enum Flags: uint8_t {
+ NONE = 0,
+ VERBOSE = 1,
+ EXCLUSIVE = 2,
+ };
+
+ struct Control {
+ union {
+ struct {
+ RootsOp roots_op;
+ Flags flags;
+ };
+ uint64_t raw;
+ };
+ Control() : raw(0) {}
+ Control(RootsOp op) : raw(0) {
+ roots_op = op;
+ }
+ bool operator==(const Control& other) const {
+ return other.raw == raw;
+ }
+ };
+
+ Control control;
+
+ /// @brief `set_id` is optional to allow for the following operations:
+ /// * including roots without providing a set id will generate a new set with a unique id
+ /// ** NB! the new set id will be stored in this field by the db manager
+ /// * querying without a set id will return the full db
+ /// * cancelling without a set id will cancel all active sets
+ std::optional<std::string> set_id;
+ /// @brief When `if_version` is provided, the request will only be executed
+ /// if the named set has exactly this version; otherwise ESTALE is returned
+ /// and no set modification is performed.
+ /// Requires a set_id.
+ std::optional<QuiesceSetVersion> if_version;
+ /// @brief Updates the quiesce timeout of an active set.
+ /// Requires a set_id. An attempt to update an inactive set will result in EPERM.
+ std::optional<QuiesceTimeInterval> timeout;
+ /// @brief Updates the quiesce expiration of an active set.
+ /// Requires a set_id. An attempt to update an inactive set will result in EPERM.
+ std::optional<QuiesceTimeInterval> expiration;
+ /// @brief When `await` is set, then after performing the other encoded operations,
+ /// this request is put on the await queue of the given set.
+ /// The value of this member defines the await timeout.
+ /// Requires a set_id. The result code is one of the following:
+ /// EPERM - the set is not in one of the awaitable states:
+ /// [QS_QUIESCING, QS_QUIESCED, QS_RELEASING, QS_RELEASED]
+ /// SUCCESS - the set is currently QS_QUIESCED or QS_RELEASED.
+ /// When an await is completed successfully for a QS_QUIESCED set,
+ /// this set's quiesce expiration timer is reset.
+ /// EINTR - the set had a change in members or in state
+ /// ECANCELED - the set was canceled
+ /// ETIMEDOUT - at least one of the set members failed to quiesce
+ /// within the configured quiesce timeout.
+ /// OR the set is RELEASING and it couldn't reach RELEASED before it expired
+ /// NB: the quiesce timeout is measured for every member separately
+ /// from the moment that member is included.
+ /// EINPROGRESS - the time limit configured for this await call has elapsed
+ /// before the set changed state.
+ std::optional<QuiesceTimeInterval> await;
+ using Roots = std::unordered_set<QuiesceRoot>;
+ /// @brief `roots` helps identify the wanted operation and provides
+ /// the actual roots to mutate the members of the set.
+ Roots roots;
+
+ bool operator==(const QuiesceDbRequest&) const = default;
+
+ bool is_valid() const {
+ return control.roots_op < __INVALID && (
+ // Everything goes if a set id is provided
+ set_id
+ // or it's a new set creation, in which case the request should be including roots
+ || includes_roots()
+ // Otherwise, the allowed wildcard operations are: query and cancel all.
+ // Also, one can't await a wildcard
+ || ((is_cancel_all() || is_query()) && !await && !timeout && !expiration && !if_version)
+ );
+ }
+
+ bool is_mutating() const { return (control.roots_op != INCLUDE_OR_QUERY) || !roots.empty() || timeout || expiration; }
+ bool is_cancel_all() const { return !set_id && is_cancel(); }
+ bool excludes_roots() const { return control.roots_op == RESET_OR_CANCEL || (control.roots_op == EXCLUDE_OR_RELEASE && !roots.empty()); }
+ bool includes_roots() const { return (control.roots_op == RESET_OR_CANCEL || control.roots_op == INCLUDE_OR_QUERY) && !roots.empty(); }
+
+ bool is_include() const { return control.roots_op == INCLUDE_OR_QUERY && !roots.empty(); }
+ bool is_query() const { return control.roots_op == INCLUDE_OR_QUERY && roots.empty(); }
+ bool is_exclude() const { return control.roots_op == EXCLUDE_OR_RELEASE && !roots.empty(); }
+ bool is_release() const { return control.roots_op == EXCLUDE_OR_RELEASE && roots.empty(); }
+ bool is_reset() const { return control.roots_op == RESET_OR_CANCEL && !roots.empty(); }
+ bool is_cancel() const { return control.roots_op == RESET_OR_CANCEL && roots.empty(); }
+
+ bool is_verbose() const { return control.flags & Flags::VERBOSE; }
+ bool is_exclusive() const { return control.flags & Flags::EXCLUSIVE; }
+
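+ /// For RESET_OR_CANCEL, the roots listed in the request are the ones to
+ /// keep: every member *not* listed gets excluded, so a cancel (empty
+ /// roots) excludes all members.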
+ bool should_exclude(QuiesceRoot root) const {
+ switch (control.roots_op) {
+ case INCLUDE_OR_QUERY:
+ return false;
+ case EXCLUDE_OR_RELEASE:
+ return roots.contains(root);
+ case RESET_OR_CANCEL:
+ return !roots.contains(root);
+ default: ceph_abort("unknown roots_op"); return false;
+ }
+ }
+
+ void reset(std::invocable<QuiesceDbRequest&> auto const &config)
+ {
+ set_id.reset();
+ if_version.reset();
+ timeout.reset();
+ expiration.reset();
+ await.reset();
+ roots.clear();
+ control.raw = 0; // implies roots_op == INCLUDE_OR_QUERY;
+
+ config(*this);
+ }
+ void reset() {
+ reset([](auto&r){});
+ }
+
+ template<typename R = Roots>
+ requires requires ( R&& roots) {
+ Roots(std::forward<R>(roots));
+ }
+ void set_roots(RootsOp op, R&& roots) {
+ control.roots_op = op;
+ this->roots = Roots(std::forward<R>(roots));
+ }
+
+ template <std::ranges::range R>
+ void set_roots(RootsOp op, const R& roots_range)
+ {
+ control.roots_op = op;
+ this->roots = Roots(roots_range.begin(), roots_range.end());
+ }
+
+ template <typename R = Roots>
+ void include_roots(R&& roots)
+ {
+ set_roots(INCLUDE_OR_QUERY, std::forward<R>(roots));
+ }
+
+ template <typename R = Roots>
+ void exclude_roots(R&& roots)
+ {
+ set_roots(EXCLUDE_OR_RELEASE, std::forward<R>(roots));
+ }
+
+ void release_roots() {
+ set_roots(EXCLUDE_OR_RELEASE, {});
+ }
+
+ template <typename R = Roots>
+ void reset_roots(R&& roots)
+ {
+ set_roots(RESET_OR_CANCEL, std::forward<R>(roots));
+ }
+
+ void cancel_roots()
+ {
+ set_roots(RESET_OR_CANCEL, {});
+ }
+
+ template <typename S = std::string>
+ void query(S&& set_id) {
+ reset([&set_id](auto &r){
+ r.set_id = std::forward<S>(set_id);
+ });
+ }
+
+ const char * op_string() const {
+ switch (control.roots_op) {
+ case INCLUDE_OR_QUERY:
+ return roots.empty() ? "query" : "include";
+ case EXCLUDE_OR_RELEASE:
+ return roots.empty() ? "release" : "exclude";
+ case RESET_OR_CANCEL:
+ return roots.empty() ? "cancel" : "reset";
+ default:
+ return "<unknown>";
+ }
+ }
+
+
+ QuiesceDbRequest() {}
+ QuiesceDbRequest(const QuiesceDbRequest &) = default;
+ QuiesceDbRequest(QuiesceDbRequest &&) = default;
+ QuiesceDbRequest(std::invocable<QuiesceDbRequest&> auto const &config) {
+ reset(config);
+ }
+};
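+
+// A hedged usage sketch (illustrative only; the paths and durations are
+// made up, and the plumbing that submits the request to the db manager
+// is omitted):
+//
+//   QuiesceDbRequest req([](auto& r) {
+//     r.include_roots({"file:/dir1", "file:/dir2"});
+//     r.timeout = std::chrono::seconds(30);    // time to reach QS_QUIESCED
+//     r.expiration = std::chrono::seconds(60); // time allowed to stay quiesced
+//     r.await = std::chrono::seconds(10);      // block this call for up to 10s
+//   });
+//   // req.set_id is left empty, so the db manager will generate a new
+//   // unique set id and store it back into this request.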
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbRequest& req)
+{
+ os << "q-req[" << req.op_string();
+
+ if (req.set_id) {
+ os << " \"" << *req.set_id << "\"";
+ }
+
+ if (req.if_version) {
+ os << " ?v:" << *req.if_version;
+ }
+
+ if (req.await) {
+ os << " a:" << QuiesceTimeIntervalSec(*req.await);
+ }
+
+ return os << " roots:" << req.roots.size() << "]";
+}
+
+/// @brief A `QuiesceDbListing` represents a subset of the database, up to
+/// a full database. The contents of the listing are decided by the leader
+/// based on the acks it got from each replica: the update will contain
+/// all sets whose version is greater than the last version acked by that peer.
+struct QuiesceDbListing {
+ QuiesceDbVersion db_version = {0, 0};
+ /// @brief Crucially, the precise `db_age` must be included in every db listing.
+ /// This data is used by all replicas to update their calculated DB TIME ZERO.
+ /// All events in the database are measured relative to the DB TIME ZERO
+ QuiesceTimeInterval db_age = QuiesceTimeInterval::zero();
+ std::unordered_map<QuiesceSetId, QuiesceSet> sets;
+
+ void clear() {
+ db_version = {0, 0};
+ db_age = QuiesceTimeInterval::zero();
+ sets.clear();
+ }
+
+ QuiesceDbListing(epoch_t epoch) : db_version {epoch, 0} {}
+ QuiesceDbListing() = default;
+ QuiesceDbListing(QuiesceDbListing const&) = default;
+ QuiesceDbListing(QuiesceDbListing &&) = default;
+ QuiesceDbListing& operator=(QuiesceDbListing const&) = default;
+ QuiesceDbListing& operator=(QuiesceDbListing &&) = default;
+};
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbListing& dbl)
+{
+ size_t active = 0, inactive = 0;
+
+ for (auto&& [_, s] : dbl.sets) {
+ if (s.is_active()) {
+ ++active;
+ } else {
+ ++inactive;
+ }
+ }
+
+ return os << "q-db[v:" << dbl.db_version << " sets:" << active << "/" << inactive << "]";
+}
+
+/// @brief `QuiesceMap` is a root-centric representation of the quiesce database.
+/// It lists roots with their effective states as of a particular version.
+/// Additionally, the same structure is used by the peers when reporting
+/// actual root states that differ from what the DB version encodes.
+struct QuiesceMap {
+ QuiesceDbVersion db_version;
+ struct RootInfo {
+ QuiesceState state;
+ QuiesceTimeInterval ttl = QuiesceTimeInterval::zero();
+ bool is_valid() const { return state > QS__INVALID && state < QS__SET_ONLY; }
+ RootInfo() : RootInfo(QS__INVALID) {}
+ RootInfo(QuiesceState state) : RootInfo(state,QuiesceTimeInterval::zero()) {}
+ RootInfo(QuiesceState state, QuiesceTimeInterval ttl)
+ : state(state)
+ , ttl(ttl)
+ {
+ }
+ inline bool operator==(const RootInfo& other) const {
+ return state == other.state && ttl == other.ttl;
+ }
+
+ RootInfo(RootInfo const&) = default;
+ RootInfo(RootInfo &&) = default;
+ RootInfo& operator=(RootInfo const&) = default;
+ RootInfo& operator=(RootInfo &&) = default;
+ };
+ using Roots = std::unordered_map<QuiesceRoot, RootInfo>;
+ Roots roots;
+ void reset() {
+ db_version = {0, 0};
+ roots.clear();
+ }
+
+ QuiesceMap() : db_version({0, 0}), roots() { }
+ QuiesceMap(QuiesceDbVersion db_version) : db_version(db_version), roots() { }
+ QuiesceMap(QuiesceDbVersion db_version, Roots &&roots) : db_version(db_version), roots(roots) { }
+ QuiesceMap(QuiesceDbVersion db_version, Roots const& roots) : db_version(db_version), roots(roots) { }
+
+ QuiesceMap(QuiesceMap const&) = default;
+ QuiesceMap(QuiesceMap &&) = default;
+ QuiesceMap& operator=(QuiesceMap const&) = default;
+ QuiesceMap& operator=(QuiesceMap &&) = default;
+};
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceMap& map)
+{
+
+ size_t active = 0, inactive = 0;
+
+ for (auto&& [_, r] : map.roots) {
+ if (r.state < QS__TERMINAL) {
+ ++active;
+ } else {
+ ++inactive;
+ }
+ }
+
+ return os << "q-map[v:" << map.db_version << " roots:" << active << "/" << inactive << "]";
+}
+
+inline QuiesceTimeInterval interval_saturate_add(QuiesceTimeInterval lhs, QuiesceTimeInterval rhs)
+{
+ // assuming an unsigned time interval.
+ // TODO: make this function generic and also saturate add signed values
+ assert(std::is_unsigned_v<QuiesceTimeInterval::rep>);
+
+ QuiesceTimeInterval result = lhs + rhs;
+
+ // the sum can't be smaller than either part
+ // since we're working with an unsigned value
+ if (result < lhs || result < rhs) {
+ // this must have been an overflow
+ return QuiesceTimeInterval::max();
+ }
+
+ return result;
+};
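+// e.g. (given the unsigned rep asserted above)
+//   interval_saturate_add(QuiesceTimeInterval::max(), std::chrono::seconds(1))
+// returns QuiesceTimeInterval::max() instead of wrapping around.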
+
+inline QuiesceTimePoint interval_saturate_add_now(QuiesceTimeInterval interval) {
+ return QuiesceTimePoint(interval_saturate_add(QuiesceClock::now().time_since_epoch(), interval));
+};
+
+namespace QuiesceInterface {
+ using PeerId = mds_gid_t;
+ /// @brief A callback from the manager to the agent with an up-to-date root list.
+ /// The map is mutable and will be used as a synchronous agent ack if the return value is true.
+ using AgentNotify = std::function<bool(QuiesceMap&)>;
+ /// @brief Used to send asynchronous acks from agents about changes to the root states.
+ /// The transport layer should include sufficient information to know the sender of the ack.
+ using AgentAck = std::function<int(QuiesceMap&&)>;
+ /// @brief Used by the leader to replicate the DB changes to its peers
+ using DbPeerUpdate = std::function<int(PeerId, QuiesceDbListing&&)>;
+
+ using RequestHandle = metareqid_t;
+ /// @brief Used by the agent to initiate an ongoing quiesce request for the given quiesce root
+ /// The context will be completed when the quiescing is achieved by this rank. The IO pause
+ /// should continue until the request is canceled.
+ /// Repeated requests for the same root should succeed, returning a _new_ request id;
+ /// the old context should be completed with an error EINTR, and the old request id should be invalidated.
+ /// If the root has already reached quiescence by the time the repeated request is submitted
+ /// then the new context should be immediately (syncrhonously) completed with success and then discarded.
+ /// Syncrhonous errors should be reported by completing the supplied context, and the return value
+ /// should be std::nullopt in such cases
+ using RequestSubmit = std::function<std::optional<RequestHandle>(QuiesceRoot, Context*)>;
+ /// @brief Cancels the quiesce request. May be called at any time after the request got submitted
+ using RequestCancel = std::function<int(const RequestHandle&)>;
+};
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "mds/QuiesceDbManager.h"
+#include "common/debug.h"
+#include "fmt/format.h"
+#include "include/ceph_assert.h"
+#include <algorithm>
+#include <random>
+#include <ranges>
+#include <type_traits>
+#include "boost/url.hpp"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds_quiesce
+#undef dout_prefix
+#define dout_prefix *_dout << "quiesce.mgr <" << __func__ << "> "
+
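+// The dout/dendl overrides below pick the logging subsystem dynamically:
+// a message is attributed to mds_quiesce when that subsystem gathers at
+// the requested level, and falls back to the generic mds subsystem otherwise.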
+#undef dout
+#define dout(lvl) \
+ do { \
+ auto subsys = ceph_subsys_mds; \
+ if ((dout_context)->_conf->subsys.should_gather(dout_subsys, lvl)) { \
+ subsys = dout_subsys; \
+ } \
+ dout_impl(dout_context, ceph::dout::need_dynamic(subsys), lvl) dout_prefix
+
+#undef dendl
+#define dendl \
+ dendl_impl; \
+ } \
+ while (0)
+
+#define dset(suffix) "[" << set_id << "@" << set.version << "] " << suffix
+#define dsetroot(suffix) "[" << set_id << "@" << set.version << "," << root << "] " << suffix
+
+const QuiesceInterface::PeerId QuiesceClusterMembership::INVALID_MEMBER = MDS_GID_NONE;
+
+static QuiesceTimeInterval time_distance(QuiesceTimePoint lhs, QuiesceTimePoint rhs) {
+ if (lhs > rhs) {
+ return lhs - rhs;
+ } else {
+ return rhs - lhs;
+ }
+}
+
+bool QuiesceDbManager::db_thread_has_work() const
+{
+ return false
+ || pending_acks.size() > 0
+ || pending_requests.size() > 0
+ || pending_db_updates.size() > 0
+ || (agent_callback.has_value() && agent_callback->if_newer < db_version())
+ || (!cluster_membership.has_value() || cluster_membership->epoch != membership.epoch);
+}
+
+void* QuiesceDbManager::quiesce_db_thread_main()
+{
+ db_thread_enter();
+
+ std::unique_lock ls(submit_mutex);
+ QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max();
+ QuiesceDbVersion last_acked = {0, 0};
+
+ while (true) {
+
+ auto db_age = db.get_age();
+
+ if (!db_thread_has_work() && next_event_at_age > db_age) {
+ submit_condition.wait_for(ls, next_event_at_age - db_age);
+ }
+
+ if (!membership_upkeep()) {
+ break;
+ }
+
+ {
+ decltype(pending_acks) acks(std::move(pending_acks));
+ decltype(pending_requests) requests(std::move(pending_requests));
+ decltype(pending_db_updates) db_updates(std::move(pending_db_updates));
+
+ ls.unlock();
+
+ if (membership.is_leader()) {
+ if (leader_bootstrap(std::move(db_updates), next_event_at_age)) {
+ // we're good to process things
+ next_event_at_age = leader_upkeep(std::move(acks), std::move(requests));
+ } else {
+ // not yet there. Put the requests back onto the queue
+ ls.lock();
+ while (!requests.empty()) {
+ pending_requests.emplace_front(std::move(requests.back()));
+ requests.pop_back();
+ }
+ continue;
+ }
+ } else {
+ next_event_at_age = replica_upkeep(std::move(db_updates));
+ }
+ }
+
+ complete_requests();
+
+ // by default, only send ack if the version has changed
+ bool send_ack = last_acked != db_version();
+ QuiesceMap quiesce_map(db_version());
+ {
+ std::lock_guard lc(agent_mutex);
+ if (agent_callback) {
+ if (agent_callback->if_newer < db_version()) {
+ dout(20) << "notifying agent with db version " << db_version() << dendl;
+ calculate_quiesce_map(quiesce_map);
+ send_ack = agent_callback->notify(quiesce_map);
+ agent_callback->if_newer = db_version();
+ } else {
+ send_ack = false;
+ }
+ } else {
+ // by default, ack the db version and agree to whatever was sent
+ // This means that a quiesce cluster member with an empty agent callback
+ // will cause roots to stay quiescing indefinitely
+ dout(5) << "no agent callback registered, responding with an empty ack" << dendl;
+ }
+ }
+
+ if (send_ack) {
+ auto db_version = quiesce_map.db_version;
+ dout(20) << "synchronous agent ack: " << quiesce_map << dendl;
+ auto rc = membership.send_ack(std::move(quiesce_map));
+ if (rc != 0) {
+ dout(1) << "ERROR ("<< rc <<") when sending synchronous agent ack "
+ << quiesce_map << dendl;
+ } else {
+ last_acked = db_version;
+ }
+ }
+
+ ls.lock();
+ }
+
+ ls.unlock();
+
+ db_thread_exit();
+
+ return nullptr;
+}
+
+void QuiesceDbManager::update_membership(const QuiesceClusterMembership& new_membership, RequestContext* inject_request)
+{
+ std::unique_lock lock(submit_mutex);
+
+ bool will_participate = new_membership.members.contains(new_membership.me);
+ dout(20) << "will participate: " << std::boolalpha << will_participate << std::noboolalpha << dendl;
+
+ if (cluster_membership && !will_participate) {
+ // stop the thread
+ cluster_membership.reset();
+ submit_condition.notify_all();
+ lock.unlock();
+ ceph_assert(quiesce_db_thread.is_started());
+ dout(5) << "stopping the db mgr thread at epoch: " << new_membership.epoch << dendl;
+ quiesce_db_thread.join();
+ } else if (will_participate) {
+ if (!cluster_membership) {
+ // start the thread
+ dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
+ quiesce_db_thread.create("quiesce_db_mgr");
+ } else {
+ submit_condition.notify_all();
+ }
+ if (inject_request) {
+ pending_requests.push_front(inject_request);
+ }
+ cluster_membership = new_membership;
+
+ std::lock_guard lc(agent_mutex);
+ if (agent_callback) {
+ agent_callback->if_newer = {0, 0};
+ }
+ }
+
+ if (!will_participate && inject_request) {
+ inject_request->complete(-EPERM);
+ }
+}
+
+bool QuiesceDbManager::membership_upkeep()
+{
+ if (cluster_membership && cluster_membership->epoch == membership.epoch) {
+ // no changes
+ return true;
+ }
+
+ bool was_leader = membership.epoch > 0 && membership.leader == membership.me;
+ bool is_leader = cluster_membership && cluster_membership->leader == cluster_membership->me;
+ if (cluster_membership) {
+ dout(10) << "epoch: " << cluster_membership->epoch << " is_leader: " << is_leader << " was_leader: " << was_leader << dendl;
+ } else {
+ dout(10) << "shutdown! was_leader: " << was_leader << dendl;
+ }
+
+ if (is_leader) {
+ // remove peers that aren't present anymore
+ for (auto peer_it = peers.begin(); peer_it != peers.end();) {
+ if (cluster_membership->members.contains(peer_it->first)) {
+ peer_it++;
+ } else {
+ peer_it = peers.erase(peer_it);
+ }
+ }
+ // create empty info for new peers
+ for (auto peer : cluster_membership->members) {
+ peers.try_emplace(peer);
+ }
+
+ if (db.set_version == 0) {
+ db.time_zero = QuiesceClock::now();
+ db.sets.clear();
+ }
+
+ } else {
+ peers.clear();
+ // abort awaits with EINPROGRESS
+ // the reason is that we don't really have a new version
+ // of any of the sets, we just aren't authoritative anymore
+ // hence, EINPROGRESS is a more appropriate response than, say, EINTR
+ for (auto & [_, await_ctx]: awaits) {
+ done_requests[await_ctx.req_ctx] = EINPROGRESS;
+ }
+ awaits.clear();
+ // reject pending requests
+ while (!pending_requests.empty()) {
+ done_requests[pending_requests.front()] = EPERM;
+ pending_requests.pop_front();
+ }
+ }
+
+ if (cluster_membership) {
+ membership = *cluster_membership;
+ }
+
+ return cluster_membership.has_value();
+}
+
+QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates)&& db_updates)
+{
+ // as a replica, we only care about the latest update
+ while (db_updates.size() > 1) {
+ dout(10) << "skipping an older update from " << db_updates.front().first << " version " << db_updates.front().second.db_version << dendl;
+ db_updates.pop();
+ }
+
+ if (db_updates.empty()) {
+ // no db updates, wait forever
+ return QuiesceTimeInterval::max();
+ }
+
+ QuiesceDbListing &update = db_updates.back().second;
+
+ if (update.db_version.epoch != membership.epoch) {
+ dout(10) << "ignoring db update from another epoch: " << update.db_version << " != " << db_version() << dendl;
+ return QuiesceTimeInterval::max();
+ }
+
+ if (update.db_version.set_version == 0) {
+ // a listing with set_version 0 is a discovery request from the leader,
+ // asking us to upload our local version of the db
+ update.sets = db.sets;
+ update.db_version.set_version = db.set_version;
+ update.db_age = db.get_age();
+ membership.send_listing_to(membership.leader, std::move(update));
+ return QuiesceTimeInterval::max();
+ }
+
+ auto time_zero = QuiesceClock::now() - update.db_age;
+ if (time_distance(time_zero, db.time_zero) > std::chrono::seconds(1)) {
+ dout(10) << "significant db_time_zero change to " << time_zero << " from " << db.time_zero << dendl;
+ }
+ db.time_zero = time_zero;
+
+ if (db.set_version > update.db_version.set_version) {
+ dout(3) << "got an older version of DB from the leader: " << db.set_version << " > " << update.db_version.set_version << dendl;
+ dout(3) << "discarding the DB" << dendl;
+ db.reset();
+ } else {
+ for (auto& [qs_id, qs] : update.sets) {
+ db.sets.insert_or_assign(qs_id, std::move(qs));
+ }
+ db.set_version = update.db_version.set_version;
+ }
+
+ // wait forever
+ return QuiesceTimeInterval::max();
+}
+
+bool QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_updates, QuiesceTimeInterval &next_event_at_age)
+{
+ // check that we've heard from all peers in this epoch
+ std::unordered_set<QuiesceInterface::PeerId> unknown_peers;
+ for (auto&& [peer, info] : peers) {
+ if (info.diff_map.db_version.epoch < membership.epoch && info.diff_map.db_version.set_version == 0) {
+ if (peer != membership.me) {
+ unknown_peers.insert(peer);
+ }
+ }
+ }
+
+ // only consider db submissions from unknown peers
+ while (!unknown_peers.empty() && !db_updates.empty()) {
+ auto &[from, update] = db_updates.front();
+ if (update.db_version.epoch == membership.epoch && unknown_peers.erase(from) > 0) {
+ // see if this peer's version is newer than mine
+ if (db.set_version < update.db_version.set_version) {
+ dout(3) << "preferring version from peer "
+ << from << " (" << update.db_version
+ << ") over mine (" << db_version() << ")"
+ << " and incrementing it to collect acks" << dendl;
+ db.time_zero = QuiesceClock::now() - update.db_age;
+ db.set_version = update.db_version.set_version + 1;
+ db.sets = update.sets;
+ }
+ // record that we've seen this peer;
+ // set the epoch correctly but use set version 0 because it's not an ack yet.
+ peers[from] = PeerInfo {QuiesceMap({membership.epoch, 0}), QuiesceClock::now()};
+ }
+ db_updates.pop();
+ }
+
+ for (auto & peer: unknown_peers) {
+ PeerInfo & info = peers[peer];
+
+ QuiesceTimePoint next_discovery = info.last_seen + std::chrono::seconds(1);
+ if (info.last_seen == QuiesceClock::zero() || next_discovery < QuiesceClock::now()) {
+ // send a discovery request to unknown peers
+ dout(5) << " sending a discovery request to " << peer << dendl;
+ membership.send_listing_to(peer, QuiesceDbListing(membership.epoch));
+ info.last_seen = QuiesceClock::now();
+ next_discovery = info.last_seen + std::chrono::seconds(1);
+ }
+ QuiesceTimeInterval next_discovery_at_age = next_discovery - db.time_zero;
+
+ next_event_at_age = std::min(next_event_at_age, next_discovery_at_age);
+ }
+
+ // true if all peers are known
+ return unknown_peers.empty();
+}
+
+QuiesceTimeInterval QuiesceDbManager::leader_upkeep(decltype(pending_acks)&& acks, decltype(pending_requests)&& requests)
+{
+ // record peer acks
+ while (!acks.empty()) {
+ auto& [from, diff_map] = acks.front();
+ leader_record_ack(from, std::move(diff_map));
+ acks.pop();
+ }
+
+ // process requests
+ while (!requests.empty()) {
+ auto req_ctx = requests.front();
+ int result = leader_process_request(req_ctx);
+ if (result != EBUSY) {
+ done_requests[req_ctx] = result;
+ }
+ requests.pop_front();
+ }
+
+ QuiesceTimeInterval next_db_event_at_age = leader_upkeep_db();
+ QuiesceTimeInterval next_await_event_at_age = leader_upkeep_awaits();
+
+ return std::min(next_db_event_at_age, next_await_event_at_age);
+}
+
+void QuiesceDbManager::complete_requests() {
+ for (auto [req, res]: done_requests) {
+ auto & r = req->response;
+ r.clear();
+ if (membership.leader == membership.me) {
+ r.db_age = db.get_age();
+ r.db_version = db_version();
+
+ if (req->request.set_id) {
+ Db::Sets::const_iterator it = db.sets.find(*req->request.set_id);
+ if (it != db.sets.end()) {
+ r.sets.emplace(*it);
+ }
+ } else if (req->request.is_query()) {
+ for (auto && it : std::as_const(db.sets)) {
+ r.sets.emplace(it);
+ }
+ }
+ }
+ // non-zero result codes are all errors
+ req->complete(-res);
+ }
+ done_requests.clear();
+}
+
+void QuiesceDbManager::leader_record_ack(QuiesceInterface::PeerId from, QuiesceMap&& diff_map)
+{
+ auto it = peers.find(from);
+
+ if (it == peers.end()) {
+ // ignore updates from unknown peers
+ return;
+ }
+
+ auto & info = it->second;
+
+ if (diff_map.db_version > db_version()) {
+ dout(3) << "ignoring unknown version ack by rank " << from << " (" << diff_map.db_version << " > " << db_version() << ")" << dendl;
+ dout(5) << "will send the peer a full DB" << dendl;
+ info.diff_map.reset();
+ } else {
+ info.diff_map = std::move(diff_map);
+ info.last_seen = QuiesceClock::now();
+ }
+}
+
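+// Produces a short random id: a single 32-bit mt19937 draw rendered as hex.
+// Uniqueness is not guaranteed here; the caller re-rolls on collision
+// (see leader_process_request).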
+static std::string random_hex_string() {
+ std::mt19937 gen(std::random_device {} ());
+ return fmt::format("{:x}", gen());
+}
+
+bool QuiesceDbManager::sanitize_roots(QuiesceDbRequest::Roots& roots)
+{
+ static const std::string file_scheme = "file";
+ static const std::string inode_scheme = "inode";
+ static const std::unordered_set<std::string> supported_schemes { file_scheme, inode_scheme };
+ QuiesceDbRequest::Roots result;
+ for (auto &root : roots) {
+ auto parsed_uri = boost::urls::parse_uri_reference(root);
+ if (!parsed_uri) {
+ dout(2) << "Couldn't parse root '" << root << "' as URI (error: " << parsed_uri.error() << ")" << dendl;
+ return false;
+ }
+
+ boost::url root_url = parsed_uri.value();
+ root_url.normalize();
+
+ if (!root_url.has_scheme()) {
+ root_url.set_scheme(file_scheme);
+ } else if (!supported_schemes.contains(root_url.scheme())) {
+ dout(2) << "Unsupported root URL scheme '" << root_url.scheme() << "'" << dendl;
+ return false;
+ }
+
+ if (root_url.has_authority()) {
+ auto auth_str = root_url.authority().buffer();
+ bool ok_remove = false;
+ if (auth_str == membership.fs_name) {
+ ok_remove = true;
+ } else {
+ try {
+ ok_remove = std::stoll(auth_str) == membership.fs_id;
+ } catch (...) { }
+ }
+ if (ok_remove) {
+ // OK, but remove the authority for now
+ // we may want to enforce it if we decide to keep a single database for all file systems
+ dout(10) << "Removing the fs name or id '" << auth_str << "' from the root url authority section" << dendl;
+ root_url.remove_authority();
+ } else {
+ dout(2) << "The root url '" << root_url.buffer()
+ << "' includes an authority section '" << auth_str
+ << "' which doesn't match the fs id (" << membership.fs_id
+ << ") or name ('" << membership.fs_name << "')" << dendl;
+ return false;
+ }
+ }
+
+ std::string sanitized_path;
+ sanitized_path.reserve(root_url.path().size());
+ // deal with the file path
+ // * make it absolute (start with a slash)
+ // * remove repeated slashes
+ // * remove the trailing slash
+ bool skip_slash = true;
+ for (auto&& c : root_url.path()) {
+ if (c != '/' || !skip_slash) {
+ sanitized_path.push_back(c);
+ }
+ skip_slash = c == '/';
+ }
+
+ if (sanitized_path.size() > 0 && sanitized_path.back() == '/') {
+ sanitized_path.pop_back();
+ }
+
+ if (root_url.scheme() == file_scheme) {
+ sanitized_path.insert(sanitized_path.begin(), '/');
+ } else if (root_url.scheme() == inode_scheme) {
+ uint64_t inodeno = 0;
+ try {
+ inodeno = std::stoull(sanitized_path);
+ } catch (...) { }
+
+ if (!inodeno || fmt::format("{}", inodeno) != sanitized_path) {
+ dout(2) << "Root '" << root << "' does not encode a vaild inode number" << dendl;
+ return false;
+ }
+ }
+
+ root_url.set_path(sanitized_path);
+
+ if (root_url.buffer() != root) {
+ dout(10) << "Normalized root '" << root << "' to '" << root_url.buffer() << "'" << dendl;
+ }
+ result.insert(root_url.buffer());
+ }
+ roots.swap(result);
+ return true;
+}
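+
+// Illustrative (assumed) outcomes of the sanitization above:
+//   "dir1//dir2/"        -> gets the "file" scheme; path collapses to "/dir1/dir2"
+//   "file://<fs_name>/a" -> a matching authority is stripped; path stays "/a"
+//   "inode:0x2a"         -> rejected: the path must be a plain decimal inode number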
+
+int QuiesceDbManager::leader_process_request(RequestContext* req_ctx)
+{
+ QuiesceDbRequest &request = req_ctx->request;
+
+ if (!request.is_valid()) {
+ dout(2) << "rejecting an invalid request" << dendl;
+ return EINVAL;
+ }
+
+ if (!sanitize_roots(request.roots)) {
+ dout(2) << "failed to sanitize roots for a request" << dendl;
+ return EINVAL;
+ }
+
+ const auto db_age = db.get_age();
+
+ if (request.is_cancel_all()) {
+ dout(3) << "WARNING: got a cancel all request" << dendl;
+ // special case - reset all
+ // this will only work on active sets
+ for (auto &[set_id, set]: db.sets) {
+ if (set.is_active()) {
+ bool did_update = false;
+ for (auto&& [_, member]: set.members) {
+ did_update |= !member.excluded;
+ member.excluded = true;
+ }
+
+ ceph_assert(did_update);
+ ceph_assert(set.rstate.update(QS_CANCELED, db_age));
+ set.version = db.set_version+1;
+ }
+ }
+ return 0;
+ }
+
+ // figure out the set to update
+ auto set_it = db.sets.end();
+
+ if (request.set_id) {
+ set_it = db.sets.find(*request.set_id);
+ } else if (request.if_version > 0) {
+ dout(2) << "can't expect a non-zero version (" << *request.if_version << ") for a new set" << dendl;
+ return EINVAL;
+ }
+
+ if (set_it == db.sets.end()) {
+ if (request.includes_roots() && request.if_version <= 0) {
+ // such requests may introduce a new set
+ if (!request.set_id) {
+ // we should generate a unique set id
+ QuiesceSetId new_set_id;
+ do {
+ new_set_id = random_hex_string();
+ } while (db.sets.contains(new_set_id));
+ // update the set_id in the request so that we can
+ // later know which set got created
+ request.set_id.emplace(std::move(new_set_id));
+ }
+ set_it = db.sets.emplace(*request.set_id, QuiesceSet()).first;
+ } else if (request.is_mutating() || request.await) {
+ ceph_assert(request.set_id.has_value());
+ dout(2) << "coudn't find set with id '" << *request.set_id << "'" << dendl;
+ return ENOENT;
+ }
+ }
+
+ if (set_it != db.sets.end()) {
+ auto& [set_id, set] = *set_it;
+
+ int result = leader_update_set(*set_it, request);
+ if (result != 0) {
+ return result;
+ }
+
+ if (request.await) {
+ // this check may have a false negative for a quiesced set
+ // that will be released in another request in the same batch
+ // in that case, this await will be enqueued but then found and completed
+ // with the same error in `leader_upkeep_awaits`
+ if ((set.is_releasing() || set.is_released()) && !request.is_release()) {
+ dout(2) << dset("can't quiesce-await a set that was released (") << set.rstate.state << ")" << dendl;
+ return EPERM;
+ }
+
+ auto expire_at_age = interval_saturate_add(db_age, *request.await);
+ awaits.emplace(std::piecewise_construct,
+ std::forward_as_tuple(set_id),
+ std::forward_as_tuple(expire_at_age, req_ctx));
+ // let the caller know that the request isn't done yet
+ return EBUSY;
+ }
+ }
+
+ // if we got here it must be a success
+ return 0;
+}
+
+int QuiesceDbManager::leader_update_set(Db::Sets::value_type& set_it, const QuiesceDbRequest& request)
+{
+ auto & [set_id, set] = set_it;
+ if (request.if_version && set.version != *request.if_version) {
+ dout(10) << dset("is newer than requested (") << *request.if_version << ") " << dendl;
+ return ESTALE;
+ }
+
+ if (!request.is_mutating()) {
+ return 0;
+ }
+
+ bool did_update = false;
+ bool did_update_roots = false;
+
+ if (request.is_release()) {
+ // the release command is allowed in states
+ // quiesced, releasing, released
+ switch (set.rstate.state) {
+ case QS_QUIESCED:
+ // we only update the state to RELEASING,
+ // and not the age. This is to keep counting
+ // towards the quiesce expiration.
+ // TODO: this could be reconsidered, but would
+ // then probably require an additional timestamp
+ set.rstate.state = QS_RELEASING;
+ did_update = true;
+ dout(15) << dset("") << "updating state to: " << set.rstate.state << dendl;
+ case QS_RELEASING:
+ case QS_RELEASED:
+ break;
+ default:
+ dout(2) << dset("can't release in the state: ") << set.rstate.state << dendl;
+ return EPERM;
+ }
+ } else {
+ const auto db_age = db.get_age();
+ bool reset = false;
+
+ if (!request.is_reset()) {
+ // only active or new sets can be modified
+ if (!set.is_active() && set.version > 0) {
+ dout(2) << dset("rejecting modification in the terminal state: ") << set.rstate.state << dendl;
+ return EPERM;
+ } else if (request.includes_roots() && set.is_releasing()) {
+ dout(2) << dset("rejecting new roots in the QS_RELEASING state") << dendl;
+ return EPERM;
+ }
+ } else {
+ // a reset request can be used to resurrect a set from whichever state it's in now
+ if (set.rstate.state > QS_QUIESCED) {
+ dout(5) << dset("reset back to a QUIESCING state") << dendl;
+ did_update = set.rstate.update(QS_QUIESCING, db_age);
+ ceph_assert(did_update);
+ reset = true;
+ }
+ }
+
+ if (request.timeout) {
+ set.timeout = *request.timeout;
+ did_update = true;
+ }
+
+ if (request.expiration) {
+ set.expiration = *request.expiration;
+ did_update = true;
+ }
+
+ size_t included_count = 0;
+ QuiesceState min_member_state = QS__MAX;
+
+ for (auto& [root, info] : set.members) {
+ if (request.should_exclude(root)) {
+ did_update_roots |= !info.excluded;
+ info.excluded = true;
+ } else if (!info.excluded) {
+ included_count ++;
+
+ QuiesceState effective_member_state;
+
+ if (reset) {
+ dout(5) << dsetroot("reset back to a QUIESCING state") << dendl;
+ info.rstate.state = QS_QUIESCING;
+ info.rstate.at_age = db_age;
+ did_update_roots = true;
+ effective_member_state = info.rstate.state;
+ } else {
+ QuiesceState min_reported_state;
+ QuiesceState max_reported_state;
+ size_t reporting_peers = check_peer_reports(set_id, set, root, info, min_reported_state, max_reported_state);
+
+ if (reporting_peers == peers.size() && max_reported_state < QS__FAILURE) {
+ effective_member_state = set.get_effective_member_state(min_reported_state);
+ } else {
+ effective_member_state = info.rstate.state;
+ }
+ }
+
+ min_member_state = std::min(min_member_state, effective_member_state);
+ }
+ }
+
+ if (request.includes_roots()) {
+ for (auto const& root : request.roots) {
+ auto const& [member_it, emplaced] = set.members.try_emplace(root, db_age);
+ auto& [_, info] = *member_it;
+ if (emplaced || info.excluded) {
+ info.excluded = false;
+ did_update_roots = true;
+ included_count++;
+ info.rstate = { QS_QUIESCING, db_age };
+ min_member_state = std::min(min_member_state, QS_QUIESCING);
+ }
+ }
+ }
+
+ did_update |= did_update_roots;
+
+ if (included_count == 0) {
+ dout(20) << dset("cancelled due to 0 included members") << dendl;
+ did_update = set.rstate.update(QS_CANCELED, db_age);
+ ceph_assert(did_update);
+ } else if (min_member_state < QS__MAX) {
+ auto next_state = set.next_state(min_member_state);
+ if (did_update |= set.rstate.update(next_state, db_age)) {
+ dout(15) << dset("updated to match the min state of the remaining (") << included_count << ") members: " << set.rstate.state << dendl;
+ }
+ }
+ }
+
+ if (did_update) {
+ dout(20) << dset("updating version from ") << set.version << " to " << db.set_version + 1 << dendl;
+ set.version = db.set_version + 1;
+ if (did_update_roots) {
+ // any awaits pending on this set must be interrupted
+ // NB! even though the set may be QUIESCED now, it could only
+ // get there due to exclusion of quiescing roots, which is
+ // not a valid way to successfully await a set, hence EINTR.
+ // However, if the set had all roots removed then we
+ // should respond with ECANCELED to notify that no more await
+ // attempts will be permitted.
+ auto range = awaits.equal_range(set_id);
+ int rc = EINTR;
+ if (!set.is_active()) {
+ ceph_assert(set.rstate.state == QS_CANCELED);
+ rc = ECANCELED;
+ }
+ for (auto it = range.first; it != range.second; it++) {
+ done_requests[it->second.req_ctx] = rc;
+ }
+ if (range.first != range.second) {
+ dout(10) << dset("interrupting awaits with rc = ") << rc << " due to a change in members" << dendl;
+ }
+ awaits.erase(range.first, range.second);
+ }
+ }
+
+ return 0;
+}
+
+QuiesceTimeInterval QuiesceDbManager::leader_upkeep_db()
+{
+ std::map<QuiesceInterface::PeerId, std::deque<std::reference_wrapper<Db::Sets::value_type>>> peer_updates;
+
+ QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max();
+ QuiesceSetVersion max_set_version = db.set_version;
+
+ for(auto & set_it: db.sets) {
+ auto & [set_id, set] = set_it;
+ auto next_set_event_at_age = leader_upkeep_set(set_it);
+
+ max_set_version = std::max(max_set_version, set.version);
+ next_event_at_age = std::min(next_event_at_age, next_set_event_at_age);
+
+ for(auto const & [peer, info]: peers) {
+ // update remote peers if their version is lower than this set's
+ // don't update myself
+ if (peer == membership.me) {
+ continue;
+ }
+ if (info.diff_map.db_version.set_version < set.version) {
+ peer_updates[peer].emplace_back(set_it);
+ }
+ }
+ }
+
+ db.set_version = max_set_version;
+
+ // update the peers
+ for (auto &[peer, sets]: peer_updates) {
+ QuiesceDbListing update;
+ update.db_age = db.get_age();
+ update.db_version = db_version();
+ std::ranges::copy(sets, std::inserter(update.sets, update.sets.end()));
+
+ dout(20) << "updating peer " << peer << " with " << sets.size()
+ << " sets modified in db version range ("
+ << peers[peer].diff_map.db_version << ".." << db.set_version << "]" << dendl;
+
+ auto rc = membership.send_listing_to(peer, std::move(update));
+ if (rc != 0) {
+ dout(1) << "ERROR (" << rc << ") trying to replicate db version "
+ << db.set_version << " with " << sets.size()
+ << " sets to the peer " << peer << dendl;
+ }
+ }
+
+ return next_event_at_age;
+}
+
+QuiesceState QuiesceSet::next_state(QuiesceState min_member_state) const {
+ ceph_assert(min_member_state > QS__INVALID);
+ ceph_assert(rstate.state < QS__TERMINAL);
+
+ if (is_releasing() && min_member_state == QS_QUIESCED) {
+ // keep releasing
+ return QS_RELEASING;
+ }
+
+ // otherwise, follow the member state
+ return min_member_state;
+}
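+
+// e.g. a QS_QUIESCING set whose "slowest" member is QS_QUIESCED advances
+// to QS_QUIESCED, while a QS_RELEASING set holds at QS_RELEASING until
+// every member reports past QS_QUIESCED.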
+
+size_t QuiesceDbManager::check_peer_reports(const QuiesceSetId& set_id, const QuiesceSet& set, const QuiesceRoot& root, const QuiesceSet::MemberInfo& member, QuiesceState& min_reported_state, QuiesceState& max_reported_state) {
+ min_reported_state = QS__MAX;
+ max_reported_state = QS__INVALID;
+
+ size_t up_to_date_peers = 0;
+
+ for (auto& [peer, info] : peers) {
+ // we consider the last bit of information we had from a given peer
+ // however, we want to skip peers which haven't been bootstrapped yet
+ if (info.diff_map.db_version.set_version == 0) {
+ continue;
+ }
+ auto dit = info.diff_map.roots.find(root);
+ QuiesceState reported_state = set.get_requested_member_state();
+
+ if (dit != info.diff_map.roots.end()) {
+ // the peer has something to say about this root
+ auto const& pr_state = dit->second;
+ if (!pr_state.is_valid()) {
+ dout(5) << dsetroot("ignoring an invalid peer state ") << pr_state.state << dendl;
+ continue;
+ }
+ reported_state = pr_state.state;
+ }
+
+ // but we only consider the peer up to date given the version
+ if (info.diff_map.db_version >= QuiesceDbVersion { membership.epoch, set.version }) {
+ up_to_date_peers++;
+ }
+
+ min_reported_state = std::min(min_reported_state, reported_state);
+ max_reported_state = std::max(max_reported_state, reported_state);
+ }
+
+ if (min_reported_state == QS__MAX) {
+ min_reported_state = set.get_requested_member_state();
+ max_reported_state = set.get_requested_member_state();
+ }
+
+ return up_to_date_peers;
+}
+
+QuiesceTimeInterval QuiesceDbManager::leader_upkeep_set(Db::Sets::value_type& set_it)
+{
+ auto& [set_id, set] = set_it;
+
+ if (!set.is_active()) {
+ return QuiesceTimeInterval::max();
+ }
+
+ QuiesceTimeInterval end_of_life = QuiesceTimeInterval::max();
+
+ const auto db_age = db.get_age();
+ // no quiescing could have started before the current db_age
+
+ QuiesceState min_member_state = QS__MAX;
+ size_t included_members = 0;
+ // for each included member, apply recorded acks and check quiesce timeouts
+ for (auto& [root, member] : set.members) {
+ if (member.excluded) {
+ continue;
+ }
+ included_members++;
+
+ QuiesceState min_reported_state;
+ QuiesceState max_reported_state;
+
+ size_t reporting_peers = check_peer_reports(set_id, set, root, member, min_reported_state, max_reported_state);
+ auto effective_state = set.get_effective_member_state(min_reported_state);
+
+ if (max_reported_state >= QS__FAILURE) {
+ // if at least one peer is reporting a failure state then move to it
+ dout(5) << dsetroot("reported by at least one peer as: ") << max_reported_state << dendl;
+ if (member.rstate.update(max_reported_state, db_age)) {
+ dout(15) << dsetroot("updating member state to ") << member.rstate.state << dendl;
+ set.version = db.set_version + 1;
+ }
+ } else if (effective_state < member.rstate.state) {
+ // someone has reported a rollback state for the root
+ dout(15) << dsetroot("reported by at least one peer as ") << min_reported_state << " vs. the expected " << member.rstate.state << dendl;
+ if (member.rstate.update(effective_state, db_age)) {
+ dout(15) << dsetroot("updating member state to ") << member.rstate.state << dendl;
+ set.version = db.set_version + 1;
+ }
+ } else if (reporting_peers == peers.size()) {
+ dout(20) << dsetroot("min reported state for all (") << reporting_peers << ") peers: " << min_reported_state
+ << ". Effective state: " << effective_state << dendl;
+ if (member.rstate.update(effective_state, db_age)) {
+ dout(15) << dsetroot("updating member state to ") << member.rstate.state << dendl;
+ set.version = db.set_version + 1;
+ }
+ }
+
+ if (member.is_quiescing()) {
+ // the quiesce timeout applies in this case
+ auto timeout_at_age = interval_saturate_add(member.rstate.at_age, set.timeout);
+ if (timeout_at_age <= db_age) {
+ // NB: deliberately not changing the member state
+ dout(10) << dsetroot("detected a member quiesce timeout") << dendl;
+ ceph_assert(set.rstate.update(QS_TIMEDOUT, db_age));
+ set.version = db.set_version + 1;
+ break;
+ }
+ end_of_life = std::min(end_of_life, timeout_at_age);
+ } else if (member.is_failed()) {
+ // if at least one member is in a failure state
+ // then the set must receive it as well
+ dout(5) << dsetroot("propagating the terminal member state to the set level: ") << member.rstate.state << dendl;
+ ceph_assert(set.rstate.update(member.rstate.state, db_age));
+ set.version = db.set_version + 1;
+ break;
+ }
+
+ min_member_state = std::min(min_member_state, member.rstate.state);
+ }
+
+ if (!set.is_active()) {
+ return QuiesceTimeInterval::max();
+ }
+
+ // we should have at least one included member to be active
+ ceph_assert(included_members > 0);
+ auto next_state = set.next_state(min_member_state);
+
+ if (set.rstate.update(next_state, db_age)) {
+ set.version = db.set_version + 1;
+ dout(15) << dset("updated set state to match member reports: ") << set.rstate.state << dendl;
+ }
+
+ if (set.is_quiesced() || set.is_released()) {
+ // any awaits pending on this set should be completed now,
+ // before the set may enter a QS_EXPIRED state
+ // due to a zero expiration timeout.
+ // this could be used for barriers.
+ auto range = awaits.equal_range(set_id);
+ for (auto it = range.first; it != range.second; it++) {
+ done_requests[it->second.req_ctx] = 0;
+ if (set.is_quiesced()) {
+ // since we've just completed a _quiesce_ await
+ // we should also reset the recorded age of the QUIESCED state
+ // to postpone the expiration time checked below
+ set.rstate.at_age = db_age;
+ set.version = db.set_version + 1;
+ dout(20) << dset("reset quiesced state age upon successful await") << dendl;
+ }
+ }
+ awaits.erase(range.first, range.second);
+ }
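+
+ // A hypothetical usage sketch (not part of this change): thanks to the
+ // ordering above, a set whose expiration is left at zero acts as a
+ // one-shot barrier: an await on it completes exactly when all members
+ // report QUIESCED, and the set expires on the very next upkeep.
+ //
+ //   r.set_id = "barrier";
+ //   r.timeout = 10s;            // expiration deliberately left at zero
+ //   r.include_roots({"root1"});
+ //   r.await = QuiesceTimeInterval::max();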
+
+ // check timeouts:
+ if (set.is_quiescing()) {
+ // sanity check that we haven't missed this before
+ ceph_assert(end_of_life > db_age);
+ } else if (set.is_active()) {
+ auto expire_at_age = interval_saturate_add(set.rstate.at_age, set.expiration);
+ if (expire_at_age <= db_age) {
+ // we have expired
+ ceph_assert(set.rstate.update(QS_EXPIRED, db_age));
+ set.version = db.set_version + 1;
+ } else {
+ end_of_life = std::min(end_of_life, expire_at_age);
+ }
+ }
+
+ return end_of_life;
+}
+
+QuiesceTimeInterval QuiesceDbManager::leader_upkeep_awaits()
+{
+ QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max();
+ for (auto it = awaits.begin(); it != awaits.end();) {
+ auto & [set_id, actx] = *it;
+ Db::Sets::const_iterator set_it = db.sets.find(set_id);
+
+ int rc = db.get_age() >= actx.expire_at_age ? EINPROGRESS : EBUSY;
+
+ if (set_it == db.sets.cend()) {
+ rc = ENOENT;
+ } else {
+ auto const & set = set_it->second;
+
+ switch(set.rstate.state) {
+ case QS_CANCELED:
+ rc = ECANCELED;
+ break;
+ case QS_EXPIRED:
+ case QS_TIMEDOUT:
+ rc = ETIMEDOUT;
+ break;
+ case QS_QUIESCED:
+ rc = 0; // fallthrough
+ case QS_QUIESCING:
+ ceph_assert(!actx.req_ctx->request.is_release());
+ break;
+ case QS_RELEASED:
+ rc = 0; // fallthrough
+ case QS_RELEASING:
+ if (!actx.req_ctx->request.is_release()) {
+ // technically possible for a quiesce await
+ // to get here if a concurrent release request
+ // was submitted in the same batch;
+ // see the corresponding check in
+ // `leader_process_request`
+ rc = EPERM;
+ }
+ break;
+ case QS_FAILED:
+ rc = EBADF;
+ break;
+ default: ceph_abort("unexpected quiesce set state");
+ }
+ }
+
+ if (rc != EBUSY) {
+ dout(10) << "completing an await for the set '" << set_id << "' with rc: " << rc << dendl;
+ done_requests[actx.req_ctx] = rc;
+ it = awaits.erase(it);
+ } else {
+ next_event_at_age = std::min(next_event_at_age, actx.expire_at_age);
+ ++it;
+ }
+ }
+ return next_event_at_age;
+}
+
+static QuiesceTimeInterval get_root_ttl(const QuiesceSet & set, const QuiesceSet::MemberInfo &member, QuiesceTimeInterval db_age) {
+
+ QuiesceTimeInterval end_of_life = db_age;
+
+ if (set.is_quiesced() || set.is_releasing()) {
+ end_of_life = set.rstate.at_age + set.expiration;
+ } else if (set.is_active()) {
+ auto age = db_age; // taking the upper bound here
+ if (member.is_quiescing()) {
+ // we know that this member is on a timer
+ age = member.rstate.at_age;
+ }
+ end_of_life = age + set.timeout;
+ }
+
+ if (end_of_life > db_age) {
+ return end_of_life - db_age;
+ } else {
+ return QuiesceTimeInterval::zero();
+ }
+}
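+
+// For example, for a quiesced set with a 60s expiration whose QUIESCED state
+// was recorded at db age 10s, the ttl reported at db age 20s is
+// (10s + 60s) - 20s = 50s.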
+
+void QuiesceDbManager::calculate_quiesce_map(QuiesceMap &map)
+{
+ map.roots.clear();
+ map.db_version = db_version();
+ auto db_age = db.get_age();
+
+ for(auto & [set_id, set]: db.sets) {
+ if (set.is_active()) {
+ // we only report active sets
+ for(auto & [root, member]: set.members) {
+ if (member.excluded) {
+ continue;
+ }
+
+ // for a quiesce map, we want to report active roots as either QUIESCING or RELEASING
+ // this is to make sure that clients always have a reason to report back and confirm
+ // the quiesced state.
+ auto requested = set.get_requested_member_state();
+ auto ttl = get_root_ttl(set, member, db_age);
+ auto root_it = map.roots.try_emplace(root, QuiesceMap::RootInfo { requested, ttl }).first;
+
+ // the min below resolves conditions when members representing the same root have different state/ttl
+ // e.g. if at least one member is QUIESCING then the root should be QUIESCING
+ root_it->second.state = std::min(root_it->second.state, requested);
+ root_it->second.ttl = std::min(root_it->second.ttl, ttl);
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#pragma once
+#include "mds/QuiesceDb.h"
+#include "include/Context.h"
+#include <memory>
+#include <functional>
+#include <mutex>
+#include <condition_variable>
+#include <set>
+#include <queue>
+
+template <>
+struct std::hash<mds_gid_t> {
+ size_t operator()(const mds_gid_t& gid) const
+ {
+ return hash<uint64_t> {}(gid);
+ }
+};
+
+struct QuiesceClusterMembership {
+ static const QuiesceInterface::PeerId INVALID_MEMBER;
+
+ epoch_t epoch = 0;
+ fs_cluster_id_t fs_id = FS_CLUSTER_ID_NONE;
+ std::string fs_name;
+
+ QuiesceInterface::PeerId me = INVALID_MEMBER;
+ QuiesceInterface::PeerId leader = INVALID_MEMBER;
+ std::set<QuiesceInterface::PeerId> members;
+
+ // A courier interface to decouple from the messaging layer.
+ // Failures can be ignored: the manager will call this repeatedly if needed.
+ QuiesceInterface::DbPeerUpdate send_listing_to;
+ QuiesceInterface::AgentAck send_ack;
+
+ bool is_leader() const { return leader == me && me != INVALID_MEMBER; }
+};
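+
+// A minimal wiring sketch, assuming a single-member "cluster" and
+// hypothetical messenger hooks (compare configure_cluster in the unit tests):
+//
+//   QuiesceClusterMembership mem;
+//   mem.epoch = mdsmap_epoch; // assumed to come from the MDSMap
+//   mem.me = mem.leader = my_gid;
+//   mem.members = { my_gid };
+//   mem.send_listing_to = [](auto peer, auto listing) { /* messenger */ return 0; };
+//   mem.send_ack = [](auto diff_map) { /* deliver to the leader */ return 0; };
+//   manager.update_membership(mem);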
+
+class QuiesceDbManager {
+ public:
+
+ struct RequestContext : public Context {
+ QuiesceDbRequest request;
+ QuiesceDbListing response;
+ };
+
+ QuiesceDbManager() : quiesce_db_thread(this) {};
+ virtual ~QuiesceDbManager()
+ {
+ update_membership({});
+ }
+
+ // This will reset the manager state
+ // according to the new cluster config
+ void update_membership(const QuiesceClusterMembership& new_membership) {
+ update_membership(new_membership, nullptr);
+ }
+ void update_membership(const QuiesceClusterMembership& new_membership, RequestContext* inject_request);
+
+ // ============================
+ // quiesce db leader interface:
+ // -> EPERM unless this is the leader
+
+ // client interface to the DB
+ int submit_request(RequestContext* ctx) {
+ std::lock_guard l(submit_mutex);
+
+ if (!cluster_membership || !cluster_membership->is_leader()) {
+ return -EPERM;
+ }
+
+ pending_requests.push_back(ctx);
+ submit_condition.notify_all();
+ return 0;
+ }
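+
+ // A hypothetical caller sketch: subclass RequestContext (compare
+ // TestRequestContext in the unit tests), fill in `request`, and submit;
+ // `complete(rc)` fires once the db thread resolves the request:
+ //
+ //   auto* ctx = new MyRequestContext(); // hypothetical subclass
+ //   ctx->request.set_id = "set1";
+ //   ctx->request.include_roots({"root1"});
+ //   if (int rc = mgr.submit_request(ctx); rc < 0) {
+ //     ctx->complete(rc); // not accepted, complete it ourselves
+ //   }
+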
+ // acks the messaging system
+ int submit_ack_from(QuiesceInterface::PeerId sender, const QuiesceMap& diff_map) {
+ std::lock_guard l(submit_mutex);
+
+ if (!cluster_membership || !cluster_membership->is_leader()) {
+ return -EPERM;
+ }
+
+ if (!cluster_membership->members.contains(sender)) {
+ return -ESTALE;
+ }
+
+ pending_acks.push({ sender, diff_map });
+ submit_condition.notify_all();
+ return 0;
+ }
+
+ // =============================
+ // quiesce db replica interface:
+ // -> EPERM if this is the leader
+
+ // process an incoming listing from a leader
+ int submit_listing_from(QuiesceInterface::PeerId sender, QuiesceDbListing&& listing) {
+ std::lock_guard l(submit_mutex);
+
+ if (!cluster_membership) {
+ return -EPERM;
+ }
+
+ if (cluster_membership->epoch != listing.db_version.epoch) {
+ return -ESTALE;
+ }
+
+ pending_db_updates.push({sender, std::move(listing)});
+ submit_condition.notify_all();
+ return 0;
+ }
+
+ // =============================
+ // Quiesce Agent interface:
+
+ int submit_agent_ack(QuiesceMap&& diff_map)
+ {
+ std::unique_lock l(submit_mutex);
+ if (!cluster_membership) {
+ return -EPERM;
+ }
+
+ if (cluster_membership->leader == cluster_membership->me) {
+ // local delivery
+ pending_acks.push({ cluster_membership->me, std::move(diff_map) });
+ submit_condition.notify_all();
+ } else {
+ // send to the leader outside of the lock
+ auto send_ack = cluster_membership->send_ack;
+ l.unlock();
+ send_ack(std::move(diff_map));
+ }
+ return 0;
+ }
+
+ struct AgentCallback {
+ using Notify = std::function<bool(QuiesceMap&)>;
+ Notify notify;
+ QuiesceDbVersion if_newer = {0, 0};
+
+ AgentCallback(const Notify ¬ify, QuiesceDbVersion if_newer = {0, 0})
+ : notify(notify)
+ , if_newer(if_newer)
+ {
+ }
+ };
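+
+ // E.g. an agent that immediately acks quiescing roots (this mirrors
+ // QUIESCING_AGENT_CB from the unit tests); returning true asks the manager
+ // to deliver the (possibly modified) map back as an ack:
+ //
+ //   mgr.reset_agent_callback([](QuiesceMap& map) {
+ //     for (auto& [root, info] : map.roots) {
+ //       if (info.state == QS_QUIESCING) {
+ //         info.state = QS_QUIESCED;
+ //       }
+ //     }
+ //     return true;
+ //   });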
+
+ std::optional<AgentCallback> reset_agent_callback(AgentCallback::Notify notify, QuiesceDbVersion if_newer = {0, 0}) {
+ return reset_agent_callback(AgentCallback(notify, if_newer));
+ }
+
+ std::optional<AgentCallback> reset_agent_callback(std::optional<AgentCallback> callback_if_newer = std::nullopt)
+ {
+ std::lock_guard ls(submit_mutex);
+ std::lock_guard lc(agent_mutex);
+ agent_callback.swap(callback_if_newer);
+ if (agent_callback) {
+ submit_condition.notify_all();
+ }
+ return callback_if_newer;
+ }
+
+ std::optional<AgentCallback> reset_agent_callback(QuiesceDbVersion if_newer)
+ {
+ std::lock_guard ls(submit_mutex);
+ std::lock_guard lc(agent_mutex);
+ if (agent_callback) {
+ agent_callback->if_newer = if_newer;
+ submit_condition.notify_all();
+ }
+ return agent_callback;
+ }
+
+ std::optional<AgentCallback> get_agent_callback() const
+ {
+ std::lock_guard lc(agent_mutex);
+ return agent_callback;
+ }
+
+ protected:
+ mutable std::mutex submit_mutex;
+ mutable std::mutex agent_mutex;
+ std::condition_variable submit_condition;
+
+ std::optional<AgentCallback> agent_callback;
+ std::optional<QuiesceClusterMembership> cluster_membership;
+ std::queue<std::pair<QuiesceInterface::PeerId, QuiesceDbListing>> pending_db_updates;
+ std::queue<std::pair<QuiesceInterface::PeerId, QuiesceMap>> pending_acks;
+ std::deque<RequestContext*> pending_requests;
+
+ class QuiesceDbThread : public Thread {
+ public:
+ explicit QuiesceDbThread(QuiesceDbManager* qm)
+ : qm(qm)
+ {
+ }
+ void* entry() override
+ {
+ return qm->quiesce_db_thread_main();
+ }
+
+ private:
+ QuiesceDbManager* qm;
+ } quiesce_db_thread;
+
+ // =============================================
+ // The below is managed by the quiesce db thread
+
+ // the database.
+ struct Db {
+ QuiesceTimePoint time_zero;
+ QuiesceSetVersion set_version = 0;
+ using Sets = std::unordered_map<QuiesceSetId, QuiesceSet>;
+ Sets sets;
+
+ QuiesceTimeInterval get_age() const {
+ return QuiesceClock::now() - time_zero;
+ }
+ void reset() {
+ set_version = 0;
+ sets.clear();
+ time_zero = QuiesceClock::now();
+ }
+ } db;
+
+ QuiesceDbVersion db_version() const { return {membership.epoch, db.set_version}; }
+
+ QuiesceClusterMembership membership;
+
+ struct PeerInfo {
+ QuiesceMap diff_map;
+ QuiesceTimePoint last_seen;
+ PeerInfo(QuiesceMap&& diff_map, QuiesceTimePoint last_seen)
+ : diff_map(std::move(diff_map))
+ , last_seen(last_seen)
+ {
+ }
+ PeerInfo() { }
+ };
+ std::unordered_map<QuiesceInterface::PeerId, PeerInfo> peers;
+
+ struct AwaitContext {
+ QuiesceTimeInterval expire_at_age = QuiesceTimeInterval::zero();
+ RequestContext* req_ctx = nullptr;
+ AwaitContext(QuiesceTimeInterval exp, RequestContext* r)
+ : expire_at_age(exp)
+ , req_ctx(r)
+ {
+ }
+ };
+ // multiple awaits may be active per set
+ std::unordered_multimap<QuiesceSetId, AwaitContext> awaits;
+ std::unordered_map<RequestContext*, int> done_requests;
+
+ void* quiesce_db_thread_main();
+
+ void db_thread_enter() {
+ // this will invalidate the membership, see membership_upkeep()
+ membership.epoch = 0;
+ peers.clear();
+ awaits.clear();
+ done_requests.clear();
+ db.reset();
+ }
+
+ void db_thread_exit() {
+ complete_requests();
+ }
+
+ bool db_thread_has_work() const;
+
+ bool membership_upkeep();
+
+ QuiesceTimeInterval replica_upkeep(decltype(pending_db_updates)&& db_updates);
+ bool leader_bootstrap(decltype(pending_db_updates)&& db_updates, QuiesceTimeInterval &next_event_at_age);
+ QuiesceTimeInterval leader_upkeep(decltype(pending_acks)&& acks, decltype(pending_requests)&& requests);
+
+
+ void leader_record_ack(QuiesceInterface::PeerId from, QuiesceMap&& diff_map);
+ int leader_process_request(RequestContext* req_ctx);
+ bool sanitize_roots(QuiesceDbRequest::Roots &roots);
+ int leader_update_set(Db::Sets::value_type& set_it, const QuiesceDbRequest& req);
+ QuiesceTimeInterval leader_upkeep_set(Db::Sets::value_type& set_it);
+ QuiesceTimeInterval leader_upkeep_db();
+ QuiesceTimeInterval leader_upkeep_awaits();
+
+ size_t check_peer_reports(const QuiesceSetId& set_id, const QuiesceSet& set, const QuiesceRoot& root, const QuiesceSet::MemberInfo& member, QuiesceState& min_reported_state, QuiesceState& max_reported_state);
+
+ void calculate_quiesce_map(QuiesceMap &map);
+
+ void complete_requests();
+};
\ No newline at end of file
add_ceph_unittest(unittest_mds_sessionfilter)
target_link_libraries(unittest_mds_sessionfilter mds osdc ceph-common global ${BLKID_LIBRARIES})
+# unittest_mds_quiesce_db
+add_executable(unittest_mds_quiesce_db
+ TestQuiesceDb.cc
+ ../../../src/mds/QuiesceDbManager.cc
+ ../../../src/mds/BoostUrlImpl.cc
+ $<TARGET_OBJECTS:unit-main>
+)
+add_ceph_unittest(unittest_mds_quiesce_db)
+target_link_libraries(unittest_mds_quiesce_db ceph-common global)
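+
+# A typical local build-and-run sequence (a sketch; exact commands depend
+# on the build tree):
+#   ninja unittest_mds_quiesce_db && ctest -R unittest_mds_quiesce_db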
+
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "mds/QuiesceDbManager.h"
+#include "gtest/gtest.h"
+#include "common/Cond.h"
+#include <ranges>
+#include <system_error>
+#include <thread>
+#include <queue>
+#include <functional>
+#include <algorithm>
+#include <iostream>
+#include <future>
+#include <list>
+#include <array>
+#include <utility>
+#include <cstdlib>
+#include "fmt/format.h"
+#include "common/debug.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mds_quiesce
+#undef dout_prefix
+#define dout_prefix *_dout << "== test == "
+
+struct GenericVerboseErrorCode {
+ int error_code;
+ GenericVerboseErrorCode(int error_code) : error_code(std::abs(error_code)) {}
+ auto operator<=>(const GenericVerboseErrorCode&) const = default;
+};
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const GenericVerboseErrorCode& ec)
+{
+ if (0 == ec.error_code) {
+ return os << "Success(0)";
+ } else {
+ return os << std::generic_category().message(ec.error_code) << "(" << ec.error_code << ")";
+ }
+};
+
+class QuiesceDbTest: public testing::Test {
+ protected:
+ template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>, class Function, class... Args>
+ static bool timed_run(D timeout, Function&& f, Args&&... args)
+ {
+ std::promise<void> done;
+ auto future = done.get_future();
+
+ auto job = std::bind(f, args...);
+
+ auto tt = std::thread([job = std::move(job)](std::promise<void> done) {
+ job();
+ done.set_value();
+ },
+ std::move(done));
+
+ tt.detach();
+
+ return future.wait_for(timeout) != std::future_status::timeout;
+ }
+ struct TestQuiesceDbManager: public QuiesceDbManager
+ {
+ using QuiesceDbManager::QuiesceDbManager;
+ using QuiesceDbManager::Db;
+ Db& internal_db() {
+ return db;
+ }
+ QuiesceClusterMembership& internal_membership() {
+ return membership;
+ }
+ decltype(pending_requests)& internal_pending_requests() {
+ return pending_requests;
+ }
+ decltype(awaits)& internal_awaits() {
+ return awaits;
+ }
+ decltype(peers)& internal_peers() {
+ return peers;
+ }
+ };
+
+ epoch_t epoch = 0;
+ std::map<QuiesceInterface::PeerId, std::unique_ptr<TestQuiesceDbManager>> managers;
+
+ std::mutex comms_mutex;
+ std::condition_variable comms_cond;
+
+ fs_cluster_id_t fs_id = 1;
+ std::string fs_name = "a";
+
+ std::unordered_map<QuiesceInterface::PeerId, QuiesceMap> latest_acks;
+ using AckHook = std::function<bool(QuiesceInterface::PeerId, QuiesceMap&)>;
+ std::list<std::pair<AckHook, std::promise<void>>> ack_hooks;
+
+ std::future<void> add_ack_hook(AckHook&& predicate)
+ {
+ std::lock_guard l(comms_mutex);
+ auto &&[_, promise] = ack_hooks.emplace_back(predicate, std::promise<void> {});
+ return promise.get_future();
+ }
+
+ void SetUp() override {
+ for (QuiesceInterface::PeerId r = mds_gid_t(1); r < mds_gid_t(11); r++) {
+ managers[r].reset(new TestQuiesceDbManager());
+ }
+ }
+
+ void TearDown() override
+ {
+ dout(6) << "\n tearing down the cluster" << dendl;
+ // We want the managers destructed before last_request is destructed.
+ // Entries must be removed from `managers` under the comms lock
+ // to avoid racing with messaging between the managers.
+ // The map is then actually cleared, destructing the managers,
+ // outside the lock: the destruction joins the db threads,
+ // which in turn might attempt to send a message.
+ std::unique_lock l(comms_mutex);
+ auto mgrs = std::move(managers);
+ l.unlock();
+ mgrs.clear();
+ }
+
+ void configure_cluster(std::vector<QuiesceInterface::PeerId> leader_and_replicas = { mds_gid_t(1), mds_gid_t(2), mds_gid_t(3) })
+ {
+ ++epoch;
+ ASSERT_GE(leader_and_replicas.size(), 1);
+ std::set<QuiesceInterface::PeerId> members(leader_and_replicas.begin(), leader_and_replicas.end());
+ auto leader = leader_and_replicas[0];
+ for (const auto &[this_peer, mgr] : managers) {
+ QuiesceClusterMembership mem = {
+ epoch,
+ fs_id,
+ fs_name,
+ this_peer,
+ leader,
+ members,
+ [epoch = this->epoch, this, leader, me = this_peer](auto recipient, auto listing) {
+ std::unique_lock l(comms_mutex);
+ if (epoch == this->epoch) {
+ if (this->managers.contains(recipient)) {
+ dout(10) << "listing from " << me << " (leader=" << leader << ") to " << recipient << " for version " << listing.db_version << " with " << listing.sets.size() << " sets" << dendl;
+ this->managers[recipient]->submit_listing_from(me, std::move(listing));
+ comms_cond.notify_all();
+ return 0;
+ }
+ }
+ return -1;
+ },
+ [epoch = this->epoch, this, leader, me = this_peer](auto diff_map) {
+ std::unique_lock l(comms_mutex);
+ if (epoch == this->epoch) {
+ if (this->managers.contains(leader)) {
+ std::queue<std::promise<void>> done_hooks;
+ dout(10) << "ack from " << me << " to the leader (" << leader << ") for version " << diff_map.db_version << " with " << diff_map.roots.size() << " roots" << dendl;
+ auto [it, inserted] = latest_acks.insert({me, diff_map});
+ if (!inserted) {
+ if (it->second.db_version == diff_map.db_version) {
+ if (it->second.roots == diff_map.roots) {
+ dout(1) << "WARNING: detected a potentialy redundant ack" << dendl;
+ }
+ }
+ it->second = diff_map;
+ }
+ for (auto it = ack_hooks.begin(); it != ack_hooks.end();) {
+ if (it->first(me, diff_map)) {
+ done_hooks.emplace(std::move(it->second));
+ it = ack_hooks.erase(it);
+ } else {
+ it++;
+ }
+ }
+ this->managers[leader]->submit_ack_from(me, std::move(diff_map));
+ comms_cond.notify_all();
+ l.unlock();
+ while(!done_hooks.empty()) {
+ done_hooks.front().set_value();
+ done_hooks.pop();
+ }
+ return 0;
+ }
+ }
+ return -1;
+ }
+ };
+ mgr->update_membership(mem);
+ }
+ dout(6) << "\n === configured cluster with the following members, starting with the leader: " << leader_and_replicas << dendl;
+ }
+
+ struct TestRequestContext: public QuiesceDbManager::RequestContext, public C_SaferCond {
+ void finish(int r) override { C_SaferCond::finish(r); }
+ void complete(int r) override { C_SaferCond::complete(r); }
+
+ const QuiesceDbTest& parent;
+ TestRequestContext(const QuiesceDbTest& parent) : parent(parent) {}
+ ~TestRequestContext() {
+ wait();
+ }
+
+ bool start(std::invocable<QuiesceDbRequest&> auto const & c)
+ {
+ done = false;
+ response.clear();
+ request.reset(c);
+
+ int rr = -1;
+
+ for (auto& [rank, mgr] : parent.managers) {
+ if (!(rr = mgr->submit_request(this))) {
+ break;
+ }
+ }
+
+ if (rr != 0) {
+ // change the error to something never returned for a request
+ // EPIPE seems reasonable as we couldn't find the leader to send the command to
+ complete(EPIPE);
+ return false;
+ }
+
+ return true;
+ }
+
+ GenericVerboseErrorCode check_result() {
+ std::unique_lock l{lock};
+ if (done) {
+ return ERR(rval);
+ }
+ // this error is never returned by the manager
+ return NA();
+ }
+
+ GenericVerboseErrorCode wait_result() {
+ return ERR(wait());
+ }
+
+ GenericVerboseErrorCode wait_result_for(double seconds)
+ {
+ return ERR(wait_for(seconds));
+ }
+ };
+
+ std::deque<std::unique_ptr<TestRequestContext>> requests;
+ std::unique_ptr<TestRequestContext> last_request;
+
+ const QuiesceDbManager::AgentCallback::Notify QUIESCING_AGENT_CB = [](QuiesceMap& quiesce_map) {
+ dout(15) << "QUIESCING_AGENT_CB: notified with " << quiesce_map.roots.size() << " roots for version " << quiesce_map.db_version << dendl;
+ for (auto it = quiesce_map.roots.begin(); it != quiesce_map.roots.end();) {
+ switch (it->second.state) {
+ case QS_QUIESCING:
+ it->second.state = QS_QUIESCED;
+ dout(10) << "QUIESCING_AGENT_CB: reporting '" << it->first << "' as " << it->second.state << dendl;
+ it++;
+ break;
+ default:
+ it = quiesce_map.roots.erase(it);
+ break;
+ }
+ }
+ return true;
+ };
+
+ const QuiesceDbManager::AgentCallback::Notify FAILING_AGENT_CB = [](QuiesceMap& quiesce_map) {
+ dout(15) << "FAILING_AGENT_CB: notified with " << quiesce_map.roots.size() << " roots for version " << quiesce_map.db_version << dendl;
+ for (auto it = quiesce_map.roots.begin(); it != quiesce_map.roots.end();) {
+ switch (it->second.state) {
+ case QS_QUIESCING:
+ it->second.state = QS_FAILED;
+ dout(10) << "FAILING_AGENT_CB: reporting '" << it->first << "' as " << it->second.state << dendl;
+ it++;
+ break;
+ default:
+ it = quiesce_map.roots.erase(it);
+ break;
+ }
+ }
+ return true;
+ };
+
+ const QuiesceDbManager::AgentCallback::Notify SILENT_AGENT_CB = [](QuiesceMap& quiesce_map) {
+ dout(15) << "SILENT_AGENT_CB: nacking quiesce map version " << quiesce_map.db_version << " with " << quiesce_map.roots.size() << " roots" << dendl;
+ return false;
+ };
+
+ GenericVerboseErrorCode
+ run_request(std::invocable<QuiesceDbRequest&> auto const& c)
+ {
+ last_request.reset(new TestRequestContext(*this));
+ last_request->start(c);
+ return ERR(last_request->wait());
+ }
+
+ GenericVerboseErrorCode
+ run_request_for(double seconds, std::invocable<QuiesceDbRequest&> auto const& c)
+ {
+ last_request.reset(new TestRequestContext(*this));
+ last_request->start(c);
+ return ERR(last_request->wait_for(seconds));
+ }
+
+ TestRequestContext& start_request(std::invocable<QuiesceDbRequest&> auto const& c)
+ {
+ auto &ptr = requests.emplace_back(new TestRequestContext(*this));
+ ptr->start(c);
+ return *ptr;
+ }
+
+ TestQuiesceDbManager::Db& db(QuiesceInterface::PeerId peer) {
+ return managers[peer]->internal_db();
+ }
+
+ static GenericVerboseErrorCode ERR(int val) {
+ return GenericVerboseErrorCode(val);
+ }
+ static GenericVerboseErrorCode OK()
+ {
+ return ERR(0);
+ }
+ static GenericVerboseErrorCode NA() {
+ return ERR(EBUSY);
+ }
+
+ static QuiesceTimeInterval sec(double val) {
+ return std::chrono::duration_cast<QuiesceTimeInterval>(std::chrono::duration<double>(val));
+ }
+};
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, ManagerStartup) {
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+ ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(2) }));
+ ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2) }));
+ ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, SetCreation) {
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+
+ // create a named set by resetting roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set0";
+ r.reset_roots({"root1"});
+ }));
+
+ // the set must have timed out immediately since we haven't configured
+ // the quiesce timeout.
+ ASSERT_TRUE(last_request->response.sets.contains("set0"));
+ EXPECT_EQ(QS_TIMEDOUT, last_request->response.sets.at("set0").rstate.state);
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.contains(*last_request->request.set_id));
+
+ // create a named set by including roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.include_roots({"root1"});
+ }));
+
+ // the set must have timed out immediately since we haven't configured
+ // the quiesce timeout.
+ ASSERT_TRUE(last_request->response.sets.contains("set1"));
+ EXPECT_EQ(QS_TIMEDOUT, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.contains(*last_request->request.set_id));
+
+ // create a new unique set by including roots
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.include_roots({"root2"});
+ }));
+
+ // the manager must have filled the set id with a unique value
+ ASSERT_TRUE(last_request->request.set_id.has_value());
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.contains(*last_request->request.set_id));
+
+ // create a new unique set by resetting roots
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.reset_roots({"root2"});
+ }));
+
+ // the manager must have filled the set id with a unique value
+ ASSERT_TRUE(last_request->request.set_id.has_value());
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.contains(*last_request->request.set_id));
+
+ // prevent modification of a named set when a new set is desired
+ EXPECT_EQ(ERR(ESTALE), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.if_version = 0;
+ r.roots.emplace("root3");
+ }));
+ EXPECT_EQ(1, last_request->response.sets.size());
+ EXPECT_TRUE(last_request->response.sets.contains("set1"));
+
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.if_version = 0;
+ r.roots.emplace("root4");
+ }));
+
+ EXPECT_EQ(1, last_request->response.sets.size());
+ EXPECT_TRUE(last_request->response.sets.contains("set2"));
+ EXPECT_EQ(QS_TIMEDOUT, last_request->response.sets.at("set2").rstate.state);
+
+ // try to create a new named set while requiring a non-zero version
+ EXPECT_EQ(ERR(ENOENT), run_request([](auto& r) {
+ r.set_id = "set3";
+ r.if_version = 1;
+ r.roots.emplace("root4");
+ }));
+
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ // try to create a new anonymous set while requiring a non-zero version
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.if_version = 2;
+ r.roots.emplace("root4");
+ }));
+
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ // an empty string is a valid set id.
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "";
+ r.roots.emplace("root1");
+ }));
+}
+
+template<class T>
+constexpr
+std::array<std::optional<T>, 2> nullopt_and_default() {
+ return {std::nullopt, T{}};
+}
+
+template<class F, class... V, size_t... S>
+ requires std::invocable<F, V...>
+void cartesian_apply(F func, std::array<V, S> const & ... array_args) {
+ // inspired by https://stackoverflow.com/a/31169617/5171225
+
+ // the iteration count is a product of all array sizes
+ const long long N = (S * ...);
+
+ for (long long n = 0; n < N; ++n) {
+ std::lldiv_t q { n, 0 };
+
+ // we use parameter pack expansion as part of the brace initializer
+ // to sequentially compute each array's index, treating n as a
+ // mixed-radix number with one digit per argument array
+ auto apply_tuple = std::tuple<V const &...> {
+ (q = std::lldiv(q.quot, array_args.size()), array_args.at(q.rem))
+ ...
+ };
+
+ if (!std::apply(func, apply_tuple)) {
+ return;
+ }
+ }
+}
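+
+// A usage sketch: the functor is invoked once per combination of the argument
+// arrays (2 * 3 = 6 calls here); returning false stops the iteration early:
+//
+//   cartesian_apply([](int a, char c) { std::cout << a << c << "\n"; return true; },
+//                   std::array{1, 2}, std::array{'x', 'y', 'z'});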
+
+template<class... Args>
+void coutall(Args&&... args) {
+ ((std::cout << args << " "), ...);
+ std::cout << std::endl;
+}
+
+TEST_F(QuiesceDbTest, QuiesceRequestValidation)
+{
+
+ auto checkRequest = [](
+ decltype(std::declval<QuiesceDbRequest>().control.roots_op) const& op,
+ decltype(std::declval<QuiesceDbRequest>().set_id) const& set_id,
+ decltype(std::declval<QuiesceDbRequest>().if_version) const& if_version,
+ decltype(std::declval<QuiesceDbRequest>().timeout) const& timeout,
+ decltype(std::declval<QuiesceDbRequest>().expiration) const& expiration,
+ decltype(std::declval<QuiesceDbRequest>().await) const& await,
+ decltype(std::declval<QuiesceDbRequest>().roots) const& roots) {
+ QuiesceDbRequest r;
+ r.control.roots_op = op;
+ r.set_id = set_id;
+ r.if_version = if_version;
+ r.timeout = timeout;
+ r.expiration = expiration;
+ r.await = await;
+ r.roots = roots;
+
+ if (op >= QuiesceDbRequest::RootsOp::__INVALID) {
+ EXPECT_FALSE(r.is_valid())
+ << "op: " << r.op_string() << ", set_id: " << bool(set_id)
+ << ", if_version: " << bool(if_version)
+ << ", timeout: " << bool(timeout) << ", expiration: "
+ << bool(expiration) << ", await: "
+ << bool(await) << ", roots.size(): " << roots.size();
+ } else {
+ // if set id is provided, all goes
+ if (set_id) {
+ EXPECT_TRUE(r.is_valid())
+ << "op: " << r.op_string() << ", set_id: " << bool(set_id)
+ << ", if_version: " << bool(if_version)
+ << ", timeout: " << bool(timeout) << ", expiration: "
+ << bool(expiration) << ", await: "
+ << bool(await) << ", roots.size(): " << roots.size();
+ } else {
+ // without the set id we can create a new set
+ // or perform operations on all sets
+ if (roots.size() > 0) {
+ // if roots are provided, we assume creation
+ // all combinations are valid unless it's an exclude,
+ // which doesn't make sense without a set id
+ EXPECT_NE(r.is_exclude(), r.is_valid())
+ << "op: " << r.op_string() << ", set_id: " << bool(set_id)
+ << ", if_version: " << bool(if_version)
+ << ", timeout: " << bool(timeout) << ", expiration: "
+ << bool(expiration) << ", await: "
+ << bool(await) << ", roots.size(): " << roots.size();
+ } else {
+ // means it's a query or a "cancel all"
+ // no other parameters should be set
+ if (if_version || timeout || expiration || await) {
+ EXPECT_FALSE(r.is_valid())
+ << "op: " << r.op_string() << ", set_id: " << bool(set_id)
+ << ", if_version: " << bool(if_version)
+ << ", timeout: " << bool(timeout) << ", expiration: "
+ << bool(expiration) << ", await: "
+ << bool(await) << ", roots.size(): " << roots.size();
+ } else {
+ EXPECT_NE(r.is_release(), r.is_valid())
+ << "op: " << r.op_string() << ", set_id: " << bool(set_id)
+ << ", if_version: " << bool(if_version)
+ << ", timeout: " << bool(timeout) << ", expiration: "
+ << bool(expiration) << ", await: "
+ << bool(await) << ", roots.size(): " << roots.size();
+ }
+ }
+ }
+ }
+
+ return !testing::Test::HasFailure();
+ };
+
+ const auto ops = std::array { QuiesceDbRequest::RootsOp::INCLUDE_OR_QUERY, QuiesceDbRequest::RootsOp::EXCLUDE_OR_RELEASE, QuiesceDbRequest::RootsOp::RESET_OR_CANCEL, QuiesceDbRequest::RootsOp::__INVALID };
+ const auto strings = nullopt_and_default<std::string>();
+ const auto versions = nullopt_and_default<QuiesceSetVersion>();
+ const auto intervals = nullopt_and_default<QuiesceTimeInterval>();
+ const auto roots = std::array { QuiesceDbRequest::Roots {}, QuiesceDbRequest::Roots { "root1" } };
+
+ cartesian_apply(checkRequest,
+ ops, strings, versions, intervals, intervals, intervals, roots);
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, RootSanitization)
+{
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+ // a positive test with all kinds of expected fixes
+ ASSERT_EQ(OK(), run_request([this](auto& r) {
+ r.set_id = "set1";
+ r.include_roots({
+ "file:root1",
+ fmt::format("file://{}/root2", fs_id),
+ fmt::format("//{}/root3", fs_name),
+ fmt::format("inode://{}/4", fs_id),
+ fmt::format("inode://{}/5", fs_name),
+ "inode:18446744073709551615",
+ "inode:/18446744073709551614",
+ "inode:/18446744073709551613/",
+ "root6/.///./..////root6//"
+ });
+ }));
+
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root1"));
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root2"));
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root3"));
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("inode:4"));
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("inode:5"));
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("inode:18446744073709551615"));
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("inode:18446744073709551614"));
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("inode:18446744073709551613"));
+ EXPECT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root6/root6"));
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.include_roots({
+ "//10/root1"
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "//badfsname/root1"
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "inode://badfsname/1"
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "inode:-4"
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "inode:18446744073709551616" // too big to fit a uint64_t
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "inode:1/2/3/4"
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "inode:abcd"
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "inode:123-456"
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "inode:"
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.set_id = "badset";
+ r.include_roots({
+ "inode:0" // zero is an invalid inodeno
+ });
+ }));
+ EXPECT_EQ(0, last_request->response.sets.size());
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, SetModification)
+{
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+
+ // create a named set by including roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.timeout = sec(60);
+ r.expiration = sec(60);
+ r.include_roots({"root1"});
+ }));
+
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.contains("set1"));
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root1"));
+
+ // include more roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.include_roots({"root2", "root3"});
+ }));
+
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root2"));
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root3"));
+ ASSERT_EQ(db(mds_gid_t(1)).sets.at("set1").members.size(), 3);
+
+ auto latest_v = last_request->response.sets.at("set1").version;
+
+ // including present roots shouldn't bump the version
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.include_roots({ "root2", "root3" });
+ }));
+
+ ASSERT_EQ(latest_v, last_request->response.sets.at("set1").version);
+ ASSERT_EQ(latest_v, db(mds_gid_t(1)).sets.at("set1").version);
+
+ // resetting to the same roots shouldn't bump the version
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.reset_roots({ "root1","root2", "root3" });
+ }));
+
+ ASSERT_EQ(latest_v, last_request->response.sets.at("set1").version);
+ ASSERT_EQ(latest_v, db(mds_gid_t(1)).sets.at("set1").version);
+
+ // exclude roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.exclude_roots({ "root1", "root4" }); // root4 wasn't included, noop
+ }));
+
+ // the db doesn't delete set members, only marks them as excluded
+ ASSERT_EQ(db(mds_gid_t(1)).sets.at("set1").members.size(), 3);
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.at("file:/root1").excluded);
+ ASSERT_FALSE(db(mds_gid_t(1)).sets.at("set1").members.at("file:/root2").excluded);
+ ASSERT_FALSE(db(mds_gid_t(1)).sets.at("set1").members.at("file:/root3").excluded);
+ ASSERT_FALSE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root4"));
+
+ // reset roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.reset_roots({"root4"});
+ }));
+
+ ASSERT_EQ(db(mds_gid_t(1)).sets.at("set1").members.size(), 4);
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.at("file:/root1").excluded);
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.at("file:/root2").excluded);
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.at("file:/root3").excluded);
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.contains("file:/root4"));
+ ASSERT_FALSE(db(mds_gid_t(1)).sets.at("set1").members.at("file:/root4").excluded);
+
+ // reset is an including op, should allow creating a set with it
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.timeout = sec(60);
+ r.expiration = sec(60);
+ r.reset_roots({"root5"});
+ }));
+
+ ASSERT_FALSE(db(mds_gid_t(1)).sets.at("set2").members.at("file:/root5").excluded);
+
+ // cancel with no set_id should cancel all active sets
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.control.roots_op = QuiesceDbRequest::RootsOp::RESET_OR_CANCEL;
+ }));
+
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set1").members.at("file:/root4").excluded);
+ ASSERT_TRUE(db(mds_gid_t(1)).sets.at("set2").members.at("file:/root5").excluded);
+
+ ASSERT_EQ(QuiesceState::QS_CANCELED, db(mds_gid_t(1)).sets.at("set1").rstate.state);
+ ASSERT_EQ(QuiesceState::QS_CANCELED, db(mds_gid_t(1)).sets.at("set2").rstate.state);
+
+ // reset can be used to resurrect a set from a terminal state
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.timeout = sec(60);
+ r.expiration = sec(60);
+ r.reset_roots({ "root5" });
+ }));
+
+ ASSERT_EQ(QuiesceState::QS_QUIESCING, db(mds_gid_t(1)).sets.at("set1").rstate.state);
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, Timeouts) {
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+
+ // install the agent callback to reach the QUIESCED state
+ managers.at(mds_gid_t(1))->reset_agent_callback(QUIESCING_AGENT_CB);
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.timeout = sec(0.1);
+ r.expiration = sec(0.1);
+ r.include_roots({"root1"});
+ r.await = sec(1);
+ }));
+
+ ASSERT_EQ(QuiesceState::QS_QUIESCED, last_request->response.sets.at("set1").rstate.state);
+
+ std::this_thread::sleep_for(sec(0.15));
+
+ ASSERT_EQ(QuiesceState::QS_EXPIRED, db(mds_gid_t(1)).sets.at("set1").rstate.state);
+
+ // reset can be used to resurrect a set from a terminal state
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.reset_roots({ "root5" });
+ }));
+ ASSERT_EQ(QuiesceState::QS_QUIESCING, last_request->response.sets.at("set1").rstate.state);
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.timeout = sec(0.1);
+ r.expiration = sec(0.1);
+ r.include_roots({ "root1" });
+ r.await = sec(1);
+ }));
+
+ ASSERT_EQ(QuiesceState::QS_QUIESCED, last_request->response.sets.at("set2").rstate.state);
+
+ // prevent the db agent from acking the roots
+ managers.at(mds_gid_t(1))->reset_agent_callback(SILENT_AGENT_CB);
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.release_roots();
+ }));
+
+ ASSERT_EQ(QuiesceState::QS_RELEASING, last_request->response.sets.at("set2").rstate.state);
+
+ std::this_thread::sleep_for(sec(0.15));
+
+ ASSERT_EQ(QuiesceState::QS_EXPIRED, db(mds_gid_t(1)).sets.at("set2").rstate.state);
+
+ // reset can be used to resurrect a set from a terminal state
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.reset_roots({ "root1" });
+ }));
+ ASSERT_EQ(QuiesceState::QS_QUIESCING, last_request->response.sets.at("set2").rstate.state);
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set3";
+ r.timeout = sec(0.1);
+ r.include_roots({ "root1" });
+ }));
+
+ ASSERT_EQ(QuiesceState::QS_QUIESCING, db(mds_gid_t(1)).sets.at("set3").rstate.state);
+
+ std::this_thread::sleep_for(sec(0.15));
+
+ ASSERT_EQ(QuiesceState::QS_TIMEDOUT, db(mds_gid_t(1)).sets.at("set3").rstate.state);
+
+ // reset can be used to resurrect a set from a terminal state
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set3";
+ r.reset_roots({ "root1" });
+ }));
+ ASSERT_EQ(QuiesceState::QS_QUIESCING, last_request->response.sets.at("set3").rstate.state);
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, Failures) {
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.timeout = sec(0.1);
+ r.expiration = sec(0.1);
+ r.include_roots({"root1"});
+ }));
+
+ EXPECT_EQ(QuiesceState::QS_QUIESCING, last_request->response.sets.at("set1").rstate.state);
+
+ {
+ // wait for the agent to ack root1 as failed
+ auto did_ack = add_ack_hook([](auto rank, auto const& ack) {
+ return ack.roots.contains("file:/root1") && ack.roots.at("file:/root1").state == QS_FAILED;
+ });
+
+ // allow acks
+ managers.at(mds_gid_t(1))->reset_agent_callback(FAILING_AGENT_CB);
+
+ EXPECT_EQ(std::future_status::ready, did_ack.wait_for(std::chrono::milliseconds(100)));
+ }
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ }));
+
+ EXPECT_EQ(QuiesceState::QS_FAILED, db(mds_gid_t(1)).sets.at("set1").rstate.state);
+ EXPECT_EQ(QuiesceState::QS_FAILED, last_request->response.sets.at("set1").rstate.state);
+
+ ASSERT_EQ(ERR(EBADF), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.timeout = sec(0.1);
+ r.expiration = sec(0.1);
+ r.include_roots({ "root1" });
+ r.await = sec(1);
+ }));
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, InterruptedQuiesceAwait)
+{
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+
+ auto then = QuiesceClock::now();
+
+ // await timeout should result in a EINPROGRESS given that the set
+ // isn't modified in the meantime
+ ASSERT_EQ(ERR(EINPROGRESS), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.timeout = sec(100);
+ r.roots.emplace("root1");
+ r.await = sec(0.1);
+ }));
+
+ ASSERT_EQ(QuiesceState::QS_QUIESCING, db(mds_gid_t(1)).sets.at("set1").rstate.state);
+ ASSERT_GE(QuiesceClock::now() - then, *last_request->request.await);
+
+ // start an asynchronous await request
+ auto & await = start_request([](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(100);
+ });
+
+ // flush the pending requests by running a simple query
+ EXPECT_EQ(OK(), run_request([](auto& r) { r.query("set1"); }));
+
+ // still running
+ EXPECT_EQ(NA(), await.check_result());
+
+ // modify the set but don't change roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.expiration = sec(100);
+ r.timeout = sec(10);
+ r.roots.emplace("root1");
+ }));
+
+ // should still be running
+ EXPECT_EQ(NA(), await.check_result());
+
+ // add another set
+ then = QuiesceClock::now();
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.timeout = sec(0.1);
+ r.roots.emplace("root1");
+ }));
+
+ // should still be running
+ EXPECT_EQ(NA(), await.check_result());
+
+ // modify roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.roots.emplace("root2");
+ }));
+
+ EXPECT_EQ(ERR(EINTR), await.wait_result());
+
+ // start async await on set2
+ auto & await2 = start_request([](auto& r) {
+ r.set_id = "set2";
+ r.await = sec(100);
+ });
+
+ // should be running
+ EXPECT_EQ(NA(), await2.check_result());
+
+ // and another one, this time wait for it to finish
+ ASSERT_EQ(ERR(ETIMEDOUT), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.await = sec(100);
+ }));
+
+ // the other await on the same set must have finished with the same result
+ EXPECT_EQ(ERR(ETIMEDOUT), await2.wait_result());
+
+ // shouldn't have taken much longer than the timeout configured on the set
+ auto epsilon = sec(0.01);
+ ASSERT_LE(QuiesceClock::now() - then - epsilon, last_request->response.sets.at("set2").timeout);
+
+ // let's cancel set 1 while awaiting it a few times
+
+ // start async await on set1
+ auto& await3 = start_request([](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(100);
+ });
+
+ auto& await4 = start_request([](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(100);
+ });
+
+ // should be running
+ EXPECT_EQ(NA(), await3.check_result());
+ EXPECT_EQ(NA(), await4.check_result());
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.reset_roots({});
+ }));
+
+ EXPECT_EQ(ERR(ECANCELED), await3.wait_result());
+ EXPECT_EQ(ERR(ECANCELED), await4.wait_result());
+
+ // awaiting a set in a terminal state should immediately
+ // complete with the corresponding error
+ ASSERT_EQ(ERR(ECANCELED), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(100);
+ }));
+ ASSERT_EQ(ERR(ETIMEDOUT), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.await = sec(100);
+ }));
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, RepeatedQuiesceAwait) {
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+
+ // let us reach quiescing
+ managers.at(mds_gid_t(1))->reset_agent_callback(QUIESCING_AGENT_CB);
+
+ // pick an expiration timeout
+ auto expiration = sec(0.1);
+
+ // create a set and let it quiesce
+ ASSERT_EQ(OK(), run_request([=](auto& r) {
+ r.set_id = "set1";
+ r.timeout = sec(0.1);
+ r.expiration = expiration;
+ r.roots.emplace("root1");
+ r.await = QuiesceTimeInterval::max();
+ }));
+
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set1").rstate.state);
+
+ // sleep for half the expiration interval multiple times
+ // each time sending another await request
+ // the expectation is that every time we call await
+ // the expiration timer is reset, hence we should be able to
+ // sustain the loop for arbitrarily long
+ for (int i = 0; i < 10; i++) {
+ std::this_thread::sleep_for(expiration/2);
+ ASSERT_EQ(OK(), run_request([i](auto& r) {
+ r.set_id = "set1";
+ if (i % 2) {
+ // this shouldn't affect anything
+ r.reset_roots({"root1"});
+ }
+ r.await = sec(0);
+ }));
+ }
+
+ // Prevent the set from reaching the RELEASED state
+ managers.at(mds_gid_t(1))->reset_agent_callback(SILENT_AGENT_CB);
+
+ // start releasing and observe that the timer isn't reset in this case,
+ // so after a few EINPROGRESS we eventually reach timeout due to expiration
+ for (int i = 0; i < 2; i++) {
+ ASSERT_EQ(ERR(EINPROGRESS), run_request([=](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ r.await = (expiration*2)/5;
+ }));
+ }
+
+ // NB: the ETIMEDOUT is the await result, while the set itself should be EXPIRED
+ EXPECT_EQ(ERR(ETIMEDOUT), run_request([=](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ r.await = expiration;
+ }));
+
+ EXPECT_EQ(QS_EXPIRED, last_request->response.sets.at("set1").rstate.state);
+
+ EXPECT_EQ(ERR(ETIMEDOUT), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(0.1);
+ }));
+
+ EXPECT_EQ(ERR(EPERM), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ }));
+
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, ReleaseAwait)
+{
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+
+ // create some sets
+ for (auto&& set_id : { "set1", "set2", "set3" }) {
+ ASSERT_EQ(OK(), run_request([set_id](auto& r) {
+ r.set_id = set_id;
+ r.timeout = sec(100);
+ r.expiration = sec(100);
+ r.include_roots({ "root1", "root2"});
+ })) << "creating " << set_id;
+ EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at(set_id).rstate.state);
+ }
+
+ // we shouldn't be able to release-await a quiescing set
+ for (auto&& set_id : { "set1", "set2" }) {
+ ASSERT_EQ(ERR(EPERM), run_request([set_id](auto& r) {
+ r.set_id = set_id;
+ r.release_roots();
+ r.await = sec(1);
+ })) << "bad release-await " << set_id;
+ }
+
+ managers.at(mds_gid_t(1))->reset_agent_callback(QUIESCING_AGENT_CB);
+
+ for (auto&& set_id : { "set1", "set2", "set3" }) {
+ ASSERT_EQ(OK(), run_request([set_id](auto& r) {
+ r.set_id = set_id;
+ r.await = sec(0.1);
+ })) << "quiesce-await " << set_id;
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at(set_id).rstate.state);
+ }
+
+ managers.at(mds_gid_t(1))->reset_agent_callback(SILENT_AGENT_CB);
+
+ auto & release_await1 = start_request([](auto &r) {
+ r.set_id = "set1";
+ r.release_roots();
+ r.await = sec(100);
+ });
+
+ auto& release_await2 = start_request([](auto& r) {
+ r.set_id = "set2";
+ r.release_roots();
+ r.await = sec(100);
+ });
+
+ EXPECT_EQ(OK(), run_request([](auto &r){}));
+ // releasing should be in progress
+ EXPECT_EQ(NA(), release_await1.check_result());
+ EXPECT_EQ(NA(), release_await2.check_result());
+ EXPECT_EQ(QS_RELEASING, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(QS_RELEASING, last_request->response.sets.at("set2").rstate.state);
+ auto releasing_v1 = last_request->response.sets.at("set1").version;
+
+ // we can request release again without any version bump
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ }));
+
+ EXPECT_EQ(releasing_v1, last_request->response.sets.at("set1").version );
+
+ // we can release-await with a short await timeout
+ EXPECT_EQ(ERR(EINPROGRESS), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ r.await = sec(0.1);
+ }));
+
+ // we can't quiesce-await a set that's releasing
+ EXPECT_EQ(ERR(EPERM), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(0.1);
+ }));
+
+ // shouldn't be able to add roots to a releasing set
+ EXPECT_EQ(ERR(EPERM), run_request([](auto &r) {
+ r.set_id = "set1";
+ r.include_roots({"root3"});
+ }));
+
+ // still on the same set version
+ EXPECT_EQ(releasing_v1, last_request->response.sets.at("set1").version );
+
+ // it should be allowed to exclude roots from a releasing set
+ EXPECT_EQ(OK(), run_request([](auto &r) {
+ r.set_id = "set2";
+ r.exclude_roots({"root2"});
+ }));
+
+ // the corresponding await must have been interrupted due to the change to the members
+ EXPECT_EQ(ERR(EINTR), release_await2.wait_result_for(0.1));
+
+ // still releasing
+ EXPECT_EQ(QS_RELEASING, last_request->response.sets.at("set2").rstate.state);
+
+ // await again
+ auto& release_await22 = start_request([](auto& r) {
+ r.set_id = "set2";
+ r.release_roots();
+ r.await = sec(100);
+ });
+
+ EXPECT_EQ(NA(), release_await22.check_result());
+
+ // excluding the last root should cancel the set
+ EXPECT_EQ(OK(), run_request([](auto &r) {
+ r.set_id = "set2";
+ r.exclude_roots({"root1"});
+ }));
+
+ EXPECT_EQ(ERR(ECANCELED), release_await22.wait_result_for(0.1));
+
+ std::atomic<QuiesceState> root1_state(QS__INVALID);
+ managers.at(mds_gid_t(1))->reset_agent_callback([&](auto &map){
+ if (map.roots.contains("file:/root1")) {
+ root1_state = map.roots.at("file:/root1").state;
+ root1_state.notify_all();
+ }
+ return false;
+ });
+
+ // validate that root1 is still reported to the agents as QUIESCING
+ // even though we are already releasing set1
+ // this is because there is another set with this root which is not releasing
+ EXPECT_TRUE(timed_run(sec(0.1), [&](){root1_state.wait(QS__INVALID);}));
+ EXPECT_EQ(QS_QUIESCING, root1_state.load());
+
+ // allow acks
+ managers.at(mds_gid_t(1))->reset_agent_callback(QUIESCING_AGENT_CB);
+ EXPECT_EQ(OK(), release_await1.wait_result_for(0.1));
+
+ EXPECT_EQ(QS_RELEASED, release_await1.response.sets.at("set1").rstate.state);
+
+ // it should be OK to request release or release-await on a RELEASED set
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ }));
+
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ r.await = sec(0.1);
+ }));
+
+ // it's invalid to send a release without a set id
+ EXPECT_EQ(ERR(EINVAL), run_request([](auto& r) {
+ r.release_roots();
+ }));
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, LeaderShutdown)
+{
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1) }));
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.timeout = sec(60);
+ r.expiration = sec(60);
+ r.include_roots({ "root1" });
+ }));
+
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.timeout = sec(60);
+ r.expiration = sec(60);
+ r.include_roots({ "root2", "root3"});
+ }));
+
+ std::queue<TestRequestContext*> outstanding_awaits;
+ std::queue<TestRequestContext*> pending_requests;
+
+ // let's have several awaits pending
+ for(auto&& set_id: {"set1", "set2"}) {
+ for (int i=0; i<2; i++) {
+ outstanding_awaits.emplace(&start_request([set_id](auto&r) {
+ r.set_id = set_id;
+ r.await = sec(100);
+ }));
+ EXPECT_EQ(NA(), outstanding_awaits.front()->check_result());
+ }
+ }
+
+ // flush the pending requests by running a simple query
+ EXPECT_EQ(OK(), run_request([](auto& r) { r.query("set1"); }));
+
+ ASSERT_EQ(outstanding_awaits.size(), managers.at(mds_gid_t(1))->internal_awaits().size());
+
+ std::mutex agent_mutex;
+ std::condition_variable agent_cond;
+ bool callback_reached = false;
+
+ // block the db thread with a malicious agent callback
+ managers.at(mds_gid_t(1))->reset_agent_callback([&](auto& map) {
+ std::unique_lock l(agent_mutex);
+ callback_reached = true;
+ agent_cond.notify_all();
+ l.unlock();
+ std::this_thread::sleep_for(sec(0.1));
+ return false;
+ });
+
+ {
+ std::unique_lock l(agent_mutex);
+ agent_cond.wait(l, [&]{return callback_reached;});
+ }
+
+ // now that the db thread is sleeping we can pile up some pending requests
+ pending_requests.emplace(&start_request([](auto& r) {
+ r.set_id = "set3";
+ r.include_roots({"root4"});
+ }));
+ EXPECT_EQ(NA(), pending_requests.front()->check_result());
+
+ pending_requests.emplace(&start_request([](auto& r) {
+ r.set_id = "set4";
+ r.include_roots({"root5"});
+ }));
+ EXPECT_EQ(NA(), pending_requests.front()->check_result());
+
+ pending_requests.emplace(&start_request([](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(100);
+ }));
+ EXPECT_EQ(NA(), pending_requests.front()->check_result());
+
+ ASSERT_EQ(managers.at(mds_gid_t(1))->internal_pending_requests().size(), pending_requests.size());
+
+ // reset the membership of the manager
+ // this will block until the db thread exits
+ managers.at(mds_gid_t(1))->update_membership({});
+
+ // as of now all requests must have finished
+ while(!outstanding_awaits.empty()) {
+ auto& r = *outstanding_awaits.front();
+ EXPECT_EQ(ERR(EINPROGRESS), r.check_result());
+ outstanding_awaits.pop();
+ }
+
+ while (!pending_requests.empty()) {
+ auto& r = *pending_requests.front();
+ EXPECT_EQ(ERR(EPERM), r.check_result());
+ pending_requests.pop();
+ }
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, MultiRankQuiesce)
+{
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3) }));
+
+ std::vector<TestRequestContext*> awaits;
+
+ // create and await several sets
+ // we deliberately avoid setting the expiration timeout in this test
+ for (auto&& set_id: {"set1", "set2", "set3"}) {
+ awaits.emplace_back(&start_request([set_id](auto& r) {
+ r.set_id = set_id;
+ r.timeout = sec(100);
+ r.include_roots({"root1"});
+ r.await = sec(100);
+ }));
+ }
+
+ // flush the pending requests by running a simple query
+ ASSERT_EQ(OK(), run_request([](auto&r){r.query("set1");}));
+
+ ASSERT_EQ(awaits.size(), managers.at(mds_gid_t(1))->internal_awaits().size());
+
+ for (auto&& await: awaits) {
+ EXPECT_EQ(NA(), await->check_result()) << await->request.set_id.value();
+ }
+
+ {
+ std::unordered_set<QuiesceInterface::PeerId> peers_quiesced;
+ auto did_ack = add_ack_hook([&](auto p, auto const &m) {
+ if (m.roots.contains("file:/root1") && (m.roots.at("file:/root1").state == QS_QUIESCED)) {
+ peers_quiesced.insert(p);
+ }
+ return peers_quiesced.size() >= 2;
+ });
+
+ // let two of the three peers ack quiescing of the root
+ managers.at(mds_gid_t(1))->reset_agent_callback(QUIESCING_AGENT_CB);
+ managers.at(mds_gid_t(2))->reset_agent_callback(QUIESCING_AGENT_CB);
+
+ ASSERT_EQ(std::future_status::ready, did_ack.wait_for(std::chrono::milliseconds(100)));
+ }
+
+ // kick the db queue with a simple query
+ ASSERT_EQ(OK(), run_request([](auto& r) { r.query("set1"); }));
+
+ // should still be waiting for the last agent
+ EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at("set1").rstate.state);
+ for (auto&& await: awaits) {
+ EXPECT_EQ(NA(), await->check_result()) << await->request.set_id.value();
+ }
+
+ {
+ // wait for the late peer to ack root1 as quiesced
+ auto did_ack = add_ack_hook([](auto gid, auto const& ack) {
+ return gid == mds_gid_t(3) && ack.roots.contains("file:/root1") && ack.roots.at("file:/root1").state == QS_QUIESCED;
+ });
+
+ // allow acks
+ managers.at(mds_gid_t(3))->reset_agent_callback(QUIESCING_AGENT_CB);
+
+ EXPECT_EQ(std::future_status::ready, did_ack.wait_for(std::chrono::milliseconds(100)));
+ }
+
+ // kick the db queue with an empty request
+ ASSERT_EQ(OK(), run_request([](auto& r) {}));
+
+ // all three sets must be expired because they had zero (default) expiration
+ EXPECT_EQ(QS_EXPIRED, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(QS_EXPIRED, last_request->response.sets.at("set2").rstate.state);
+ EXPECT_EQ(QS_EXPIRED, last_request->response.sets.at("set3").rstate.state);
+
+ // the pending quiesce awaits must still have completed successfully,
+ // even though the sets expired immediately after quiescing
+ for (auto&& await : awaits) {
+ EXPECT_EQ(OK(), await->check_result()) << await->request.set_id.value();
+ }
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, MultiRankRelease)
+{
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3) }));
+ managers.at(mds_gid_t(1))->reset_agent_callback(QUIESCING_AGENT_CB);
+ managers.at(mds_gid_t(2))->reset_agent_callback(QUIESCING_AGENT_CB);
+ managers.at(mds_gid_t(3))->reset_agent_callback(QUIESCING_AGENT_CB);
+
+ // quiesce two sets
+ for (auto&& set_id : { "set1", "set2" }) {
+ ASSERT_EQ(OK(), run_request([set_id](auto& r) {
+ r.set_id = set_id;
+ r.timeout = sec(60);
+ r.expiration = sec(60);
+ r.await = sec(100);
+ r.include_roots({ "root1" });
+ }));
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at(set_id).rstate.state);
+ }
+
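+ // remember the version of the quiesced set so we can verify below
+ // that releasing bumps it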
+ auto quiesced_v = db(mds_gid_t(1)).sets.at("set1").version;
+
+ // prevent one of the acks
+ managers.at(mds_gid_t(2))->reset_agent_callback(SILENT_AGENT_CB);
+
+ // release roots
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ }));
+
+ EXPECT_EQ(QS_RELEASING, last_request->response.sets.at("set1").rstate.state);
+ auto releasing_v = last_request->response.sets.at("set1").version;
+ ASSERT_NE(quiesced_v, releasing_v);
+
+ auto& async_release = start_request([](auto& r) {
+ r.set_id = "set2";
+ r.await = sec(100);
+ r.release_roots();
+ });
+
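+ // the release await can't complete while gid 2 is silenced:
+ // releasing, like quiescing, needs acks from all members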
+ EXPECT_EQ(NA(), async_release.check_result());
+
+ // it shouldn't hurt to run the release twice for set1
+ ASSERT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.release_roots();
+ }));
+
+ EXPECT_EQ(releasing_v, last_request->response.sets.at("set1").version);
+
+ // we shouldn't be able to quiesce-await a releasing set
+ ASSERT_EQ(ERR(EPERM), run_request_for(1, [](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(100);
+ }));
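+ // (EPERM: a quiesce-await is only meaningful while the set may still
+ // reach or hold QUIESCED, not once it has advanced to RELEASING)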
+
+ auto latest_v = db(mds_gid_t(1)).set_version;
+
+ // wait for all peers to sync version
+ {
+ std::unique_lock l(comms_mutex);
+ auto result = comms_cond.wait_for(l, std::chrono::milliseconds(100), [&] {
+ auto min_v = std::min({ db(mds_gid_t(1)).set_version, db(mds_gid_t(2)).set_version, db(mds_gid_t(3)).set_version });
+ return min_v >= latest_v;
+ });
+ ASSERT_TRUE(result);
+ }
+
+ // all replicas must agree
+ for (auto&& gid : {mds_gid_t(1), mds_gid_t(2), mds_gid_t(3)}) {
+ EXPECT_EQ(QS_RELEASING, db(gid).sets.at("set1").rstate.state) << "db of gid " << gid;
+ EXPECT_EQ(QS_RELEASING, db(gid).sets.at("set2").rstate.state) << "db of gid " << gid;
+ }
+
+ // wait for the late peer to ack back
+ auto did_ack = add_ack_hook([](auto gid, auto const &ack){
+ return gid == mds_gid_t(2);
+ });
+
+ // allow acks
+ managers.at(mds_gid_t(2))->reset_agent_callback(QUIESCING_AGENT_CB);
+
+ EXPECT_EQ(std::future_status::ready, did_ack.wait_for(std::chrono::milliseconds(100)));
+
+ ASSERT_EQ(OK(), run_request([](auto& r) { }));
+
+ EXPECT_EQ(QS_RELEASED, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(QS_RELEASED, last_request->response.sets.at("set2").rstate.state);
+ EXPECT_EQ(OK(), async_release.check_result());
+
+ // validate that we can release-await RELEASED sets
+ // but can't quiesce-await the same
+ for (auto&& set_id : { "set1", "set2" }) {
+ ASSERT_EQ(OK(), run_request([set_id](auto& r) {
+ r.set_id = set_id;
+ r.await = sec(100);
+ r.release_roots();
+ }));
+ ASSERT_EQ(ERR(EPERM), run_request([set_id](auto& r) {
+ r.set_id = set_id;
+ r.await = sec(100);
+ }));
+ }
+}
+
+/* ================================================================ */
+TEST_F(QuiesceDbTest, MultiRankRecovery)
+{
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3) }));
+ managers.at(mds_gid_t(1))->reset_agent_callback(QUIESCING_AGENT_CB);
+ managers.at(mds_gid_t(2))->reset_agent_callback(QUIESCING_AGENT_CB);
+ managers.at(mds_gid_t(3))->reset_agent_callback(QUIESCING_AGENT_CB);
+
+ // quiesce two sets
+ for (auto&& set_id : { "set1", "set2" }) {
+ ASSERT_EQ(OK(), run_request([set_id](auto& r) {
+ r.set_id = set_id;
+ r.timeout = sec(60);
+ r.expiration = sec(60);
+ r.await = sec(100);
+ r.include_roots({ "root1" });
+ }));
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at(set_id).rstate.state);
+ }
+
+ auto did_ack41 = add_ack_hook([](auto gid, auto const &ack){
+ return gid == mds_gid_t(4) && ack.db_version.set_version > 0;
+ });
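+ // the hook above resolves once gid 4 acks with a non-empty db version,
+ // i.e. only after it has bootstrapped its db from the surviving peers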
+
+ // reconfigure the cluster so that a new member is assigned leader
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(4), mds_gid_t(2), mds_gid_t(3) }));
+
+ EXPECT_EQ(std::future_status::ready, did_ack41.wait_for(std::chrono::milliseconds(2000)));
+
+ // we expect the db to be populated since the new leader must have discovered newer versions
+ // we expect the sets to become quiescing since there's at least one member that's not acking (the new one)
+ EXPECT_EQ(OK(), run_request([](auto& r) {}));
+ ASSERT_EQ(2, last_request->response.sets.size());
+ EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at("set2").rstate.state);
+
+ // reconfigure the cluster back to quiescing members
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3) }));
+
+ // with the quiescing members restored, awaiting the sets
+ // should succeed and report them QUIESCED again
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set1";
+ r.await = sec(1);
+ }));
+ ASSERT_EQ(1, last_request->response.sets.size());
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(OK(), run_request([](auto& r) {
+ r.set_id = "set2";
+ r.await = sec(1);
+ }));
+ ASSERT_EQ(1, last_request->response.sets.size());
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set2").rstate.state);
+
+ // lose a non-leader node
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2) }));
+
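+ // losing an acking, non-leader peer must not regress the aggregated
+ // state: both remaining members have already acked QUIESCED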
+ EXPECT_EQ(OK(), run_request([](auto& r) {}));
+ ASSERT_EQ(2, last_request->response.sets.size());
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set2").rstate.state);
+
+ auto did_ack3 = add_ack_hook([](auto gid, auto const &ack){
+ return gid == mds_gid_t(3) && ack.db_version.set_version > 0;
+ });
+
+ // add back a quiescing peer
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3)}));
+
+ EXPECT_EQ(OK(), run_request([](auto& r) {}));
+ ASSERT_EQ(2, last_request->response.sets.size());
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set2").rstate.state);
+
+ EXPECT_EQ(std::future_status::ready, did_ack3.wait_for(std::chrono::milliseconds(2000)));
+
+ EXPECT_EQ(OK(), run_request([](auto& r) {}));
+ ASSERT_EQ(2, last_request->response.sets.size());
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set2").rstate.state);
+
+ auto did_ack42 = add_ack_hook([](auto gid, auto const &ack){
+ return gid == mds_gid_t(4) && ack.db_version.set_version > 0;
+ });
+
+ // add a non-quiescing peer
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3), mds_gid_t(4) }));
+
+ EXPECT_EQ(std::future_status::ready, did_ack42.wait_for(std::chrono::milliseconds(2000)));
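+ // as with the leader replacement above, a member that never acks
+ // (gid 4) drags the aggregated state back to QUIESCING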
+ EXPECT_EQ(OK(), run_request([](auto& r) {}));
+ ASSERT_EQ(2, last_request->response.sets.size());
+ EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at("set1").rstate.state);
+ EXPECT_EQ(QS_QUIESCING, last_request->response.sets.at("set2").rstate.state);
+}
\ No newline at end of file