#include "QuiesceDbManager.h"
#include "QuiesceAgent.h"
+
+#include "messages/MMDSQuiesceDbListing.h"
+#include "messages/MMDSQuiesceDbAck.h"
+
#include <boost/url.hpp>
#include <chrono>
#include <ranges>
#include <algorithm>
+#include <queue>
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds_quiesce
}
});
- dout(20) << "Submitting a quiesce db request " << (set_id ? "for" : "without a") << " setid " << set_id.value_or("") << ", operation: " << ctx->request.op_string() << dendl;
+ dout(20) << "Submitting " << ctx->request << dendl;
int rc = quiesce_db_manager->submit_request(ctx);
if (rc != 0) {
bufferlist bl;
+ // on_finish was moved there, so should only call via the ctx.
+ ctx->on_finish(rc, "Error submitting the command to the local db manager", bl);
delete ctx;
- on_finish(rc, "Error submitting the command to the local db manager", bl);
}
}
+static void rebind_agent_callback(std::shared_ptr<QuiesceAgent> agt, std::shared_ptr<QuiesceDbManager> mgr) {
+ if (!agt || !mgr) {
+ return;
+ }
+ std::weak_ptr<QuiesceAgent> weak_agent = agt;
+ mgr->reset_agent_callback([weak_agent](QuiesceMap& update) {
+ if (auto agent = weak_agent.lock()) {
+ return agent->db_update(update);
+ } else {
+ return false;
+ }
+ });
+}
+
void MDSRank::quiesce_cluster_update() {
+ // the quiesce leader is the lowest rank with the highest state up to ACTIVE
+ auto less_leader = [](MDSMap::mds_info_t const* l, MDSMap::mds_info_t const* r) {
+ ceph_assert(l->rank != MDS_RANK_NONE);
+ ceph_assert(r->rank != MDS_RANK_NONE);
+ ceph_assert(l->state <= MDSMap::STATE_ACTIVE);
+ ceph_assert(r->state <= MDSMap::STATE_ACTIVE);
+ if (l->rank == r->rank) {
+ return l->state < r->state;
+ } else {
+ return l->rank > r->rank;
+ }
+ };
+
+ std::priority_queue<MDSMap::mds_info_t const*, std::vector<MDSMap::mds_info_t const*>, decltype(less_leader)> member_info(less_leader);
QuiesceClusterMembership membership;
- mds_rank_t leader = 0; // MAYBE LATER: initialize this from the map
+ QuiesceInterface::PeerId me = mds_gid_t(monc->get_global_id());
+
+ for (auto&& [gid, info] : mdsmap->get_mds_info()) {
+ // if it has a rank and state <= ACTIVE, it's good enough
+ if (info.rank != MDS_RANK_NONE && info.state <= MDSMap::STATE_ACTIVE) {
+ member_info.push(&info);
+ membership.members.insert(info.global_id);
+ }
+ }
+
+ QuiesceInterface::PeerId leader =
+ member_info.empty()
+ ? QuiesceClusterMembership::INVALID_MEMBER
+ : member_info.top()->global_id;
membership.epoch = mdsmap->get_epoch();
membership.leader = leader;
- membership.me = whoami;
- membership.fs_id = mdsmap->get_info(whoami).join_fscid;
+ membership.me = me;
membership.fs_name = mdsmap->get_fs_name();
- mdsmap->get_mds_set(membership.members);
- dout(5) << "epoch:" << membership.epoch << " leader:" << membership.leader << " members:" << membership.members << dendl;
+ dout(5) << "epoch:" << membership.epoch << " me:" << me << " leader:" << leader << " members:" << membership.members
+ << (mdsmap->is_degraded() ? " (degraded)" : "") << dendl;
- membership.send_ack = [=,this](QuiesceMap&& ack) {
- if (whoami == leader) {
- // loopback
- quiesce_db_manager->submit_ack_from(whoami, std::move(ack));
- return 0;
- } else {
- // TODO: implement messaging
- return -ENOTSUP;
+ if (leader != QuiesceClusterMembership::INVALID_MEMBER) {
+ membership.send_ack = [=, this](QuiesceMap&& ack) {
+ if (me == leader) {
+ // loopback
+ quiesce_db_manager->submit_ack_from(me, std::move(ack));
+ return 0;
+ } else {
+ std::lock_guard guard(mds_lock);
+
+ if (mdsmap->get_state_gid(leader) == MDSMap::STATE_NULL) {
+ dout(5) << "couldn't find the leader " << leader << " in the map" << dendl;
+ return -ENOENT;
+ }
+ auto addrs = mdsmap->get_info_gid(leader).addrs;
+
+ auto ack_msg = make_message<MMDSQuiesceDbAck>(me);
+ dout(10) << "sending ack " << ack << " to the leader " << leader << dendl;
+ ack_msg->diff_map = std::move(ack);
+ return send_message_mds(ack_msg, addrs);
+ }
+ };
+
+ membership.send_listing_to = [=, this](QuiesceInterface::PeerId to, QuiesceDbListing&& db) {
+ std::lock_guard guard(mds_lock);
+ if (mdsmap->get_state_gid(to) == MDSMap::STATE_NULL) {
+ dout(5) << "couldn't find the peer " << to << " in the map" << dendl;
+ return -ENOENT;
+ }
+ auto addrs = mdsmap->get_info_gid(to).addrs;
+ auto listing_msg = make_message<MMDSQuiesceDbListing>(me);
+ dout(10) << "sending listing " << db << " to the peer " << to << dendl;
+ listing_msg->db_listing = std::move(db);
+ return send_message_mds(listing_msg, addrs);
+ };
+ }
+
+ QuiesceDbManager::RequestContext* inject_request = nullptr;
+
+ bool degraded = mdsmap->is_degraded();
+
+ if (degraded && membership.is_leader()) {
+ dout(5) << "WARNING: injecting a cancel all request"
+ << " members: " << membership.members
+ << " in: " << mdsmap->get_num_in_mds()
+ << " up: " << mdsmap->get_num_up_mds()
+ << " sr: " << mdsmap->get_num_standby_replay_mds()
+ << dendl;
+
+ struct CancelAll: public QuiesceDbManager::RequestContext {
+ mds_rank_t whoami;
+ CancelAll(mds_rank_t whoami) : whoami(whoami) {
+ request.cancel_roots();
+ }
+ void finish(int rc) override {
+ dout(rc == 0 ? 15 : 3) << "injected cancel all completed with rc: " << rc << dendl;
+ }
+ };
+
+ inject_request = new CancelAll(whoami);
+ }
+
+ if (!is_active()) {
+ quiesce_db_manager->reset_agent_callback([whoami = whoami, degraded, is_sr = is_standby_replay()](QuiesceMap& quiesce_map) {
+ for (auto it = quiesce_map.roots.begin(); it != quiesce_map.roots.end();) {
+ switch (it->second.state) {
+ case QS_QUIESCING:
+ if (degraded) {
+ it->second.state = QS_FAILED;
+ dout(3) << "DEGRADED RESPONDER: reporting '" << it->first << "' as " << it->second.state << dendl;
+ ++it;
+ } else if (is_sr) {
+ it->second.state = QS_QUIESCED;
+ dout(15) << "STANDBY REPLAY RESPONDER: reporting '" << it->first << "' as " << it->second.state << dendl;
+ ++it;
+ } else {
+ // just ack.
+ dout(20) << "INTACTIVE RESPONDER: reporting '" << it->first << "' as " << it->second.state << dendl;
+ it = quiesce_map.roots.erase(it);
+ }
+ break;
+ default:
+ it = quiesce_map.roots.erase(it);
+ break;
+ }
+ }
+ return true;
+ });
+
+ if (quiesce_agent) {
+ // reset the agent if it's present
+ // because it won't receive any more callbacks
+ quiesce_agent->reset_async();
}
- };
+ } else {
+ rebind_agent_callback(quiesce_agent, quiesce_db_manager);
+ }
- membership.send_listing_to = [=](mds_rank_t to, QuiesceDbListing&& db) {
- // TODO: implement messaging
- return -ENOTSUP;
- };
+ quiesce_db_manager->update_membership(membership, inject_request);
+}
- quiesce_db_manager->update_membership(membership);
+bool MDSRank::quiesce_dispatch(const cref_t<Message> &m) {
+ switch(m->get_type()) {
+ case MSG_MDS_QUIESCE_DB_LISTING:
+ {
+ const auto& req = ref_cast<MMDSQuiesceDbListing>(m);
+ if (quiesce_db_manager) {
+ dout(10) << "got " << req->db_listing << " from peer " << req->gid << dendl;
+ int result = quiesce_db_manager->submit_listing_from(req->gid, std::move(req->db_listing));
+ if (result != 0) {
+ dout(3) << "error (" << result << ") submitting " << req->db_listing << " from peer " << req->gid << dendl;
+ }
+ } else {
+ dout(5) << "no db manager to process " << req->db_listing << dendl;
+ }
+ return true;
+ }
+ case MSG_MDS_QUIESCE_DB_ACK:
+ {
+ const auto& req = ref_cast<MMDSQuiesceDbAck>(m);
+ if (quiesce_db_manager) {
+ dout(10) << "got ack " << req->diff_map << " from peer " << req->gid << dendl;
+ int result = quiesce_db_manager->submit_ack_from(req->gid, std::move(req->diff_map));
+ if (result != 0) {
+ dout(3) << "error (" << result << ") submitting an ack from peer " << req->gid << dendl;
+ }
+ } else {
+ dout(5) << "no db manager to process an ack: " << req->diff_map << dendl;
+ }
+ return true;
+ }
+ default:
+ return false;
+ }
}
void MDSRank::quiesce_agent_setup() {
return std::nullopt;
}
}
- std::optional<double> dummy_quiesce_after;
+ std::optional<double> debug_quiesce_after;
if (auto pit = uri->params().find("q"); pit != uri->params().end()) {
try {
- dummy_quiesce_after = (*pit).has_value ? std::stod((*pit).value) : 1 /*second*/;
+ debug_quiesce_after = (*pit).has_value ? std::stod((*pit).value) : 1 /*second*/;
} catch (...) {
dout(5) << "error parsing the time for debug quiesce for query: " << uri->query() << dendl;
c->complete(-EINVAL);
return std::nullopt;
}
}
+ std::optional<double> debug_fail_after;
+ if (auto pit = uri->params().find("f"); pit != uri->params().end()) {
+ try {
+ debug_fail_after = (*pit).has_value ? std::stod((*pit).value) : 1 /*second*/;
+ } catch (...) {
+ dout(5) << "error parsing the time for debug fail for query: " << uri->query() << dendl;
+ c->complete(-EINVAL);
+ return std::nullopt;
+ }
+ }
+ std::optional<mds_rank_t> debug_rank;
+ if (auto pit = uri->params().find("r"); pit != uri->params().end()) {
+ try {
+ if ((*pit).has_value) {
+ debug_rank = (mds_rank_t)std::stoul((*pit).value);
+ }
+ } catch (...) {
+ dout(5) << "error parsing the rank for debug pin for query: " << uri->query() << dendl;
+ c->complete(-EINVAL);
+ return std::nullopt;
+ }
+ }
+
+ if (debug_rank && (debug_rank >= mdsmap->get_max_mds())) {
+ dout(5) << "invalid rank: " << uri->query() << dendl;
+ c->complete(-EINVAL);
+ return std::nullopt;
+ }
auto path = uri->path();
dout(20) << "got request to quiesce '" << path << "'" << dendl;
std::lock_guard l(mds_lock);
- if (!dummy_quiesce_after) {
+ if (!debug_quiesce_after && !debug_fail_after && !debug_rank) {
// the real deal!
+ if (mdsmap->is_degraded()) {
+ dout(3) << "DEGRADED: refusing to quiesce" << dendl;
+ c->complete(EPERM);
+ return std::nullopt;
+ }
auto qc = new MDCache::C_MDS_QuiescePath(mdcache, c);
auto mdr = mdcache->quiesce_path(filepath(path), qc, nullptr, quiesce_delay_ms);
return mdr ? mdr->reqid : std::optional<RequestHandle>();
} else {
- /* dummy quiesce */
+ /* dummy quiesce/fail */
// always create a new request id
auto req_id = metareqid_t(entity_name_t::MDS(whoami), issue_tid());
auto [it, inserted] = quiesce_requests->try_emplace(path, req_id, c);
// since we weren't inserted, we must have successfully quiesced
c->complete(0);
}
+ } else if (debug_rank && (debug_rank != whoami)) {
+ // the root was pinned to a different rank
+ // we should acknowledge the quiesce regardless of the other flags
+ it->second.second->complete(0);
+ it->second.second = nullptr;
} else {
- // do quiesce if needed
+ // do quiesce or fail
+
+ bool do_fail = false;
+ double delay;
+ if (debug_quiesce_after.has_value() && debug_fail_after.has_value()) {
+ do_fail = debug_fail_after < debug_quiesce_after;
+ } else {
+ do_fail = debug_fail_after.has_value();
+ }
+
+ if (do_fail) {
+ delay = debug_fail_after.value();
+ } else {
+ delay = debug_quiesce_after.value();
+ }
- auto quiesce_task = new LambdaContext([quiesce_requests, req_id, this](int) {
+ auto quiesce_task = new LambdaContext([quiesce_requests, req_id, do_fail, this](int) {
// the mds lock should be held by the timer
dout(20) << "quiesce_task: callback by the timer" << dendl;
auto it = std::ranges::find(*quiesce_requests, req_id, [](auto x) { return x.second.first; });
if (it != quiesce_requests->end() && it->second.second != nullptr) {
- dout(20) << "quiesce_task: completing the root '" << it->first << "'" << dendl;
- it->second.second->complete(0);
+ dout(20) << "quiesce_task: completing the root '" << it->first << "' as failed: " << do_fail << dendl;
+ it->second.second->complete(do_fail ? -EBADF : 0);
it->second.second = nullptr;
}
dout(20) << "quiesce_task: done" << dendl;
});
dout(20) << "scheduling a quiesce_task (" << quiesce_task
- << ") to fire after " << *dummy_quiesce_after
+ << ") to fire after " << delay
<< " seconds on timer " << &timer << dendl;
- timer.add_event_after(*dummy_quiesce_after, quiesce_task);
+ timer.add_event_after(delay, quiesce_task);
}
return it->second.first;
}
};
quiesce_agent.reset(new QuiesceAgent(ci));
-
- std::weak_ptr<QuiesceAgent> weak_agent = quiesce_agent;
- quiesce_db_manager->reset_agent_callback([weak_agent](QuiesceMap& update) {
- if (auto agent = weak_agent.lock()) {
- return agent->db_update(update);
- } else {
- return false;
- }
- });
+ rebind_agent_callback(quiesce_agent, quiesce_db_manager);
};
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+#include "QuiesceDb.h"
+#include "include/encoding.h"
+#include <stdint.h>
+
+void encode(QuiesceDbVersion const& v, bufferlist& bl, uint64_t features = 0)
+{
+ encode(v.epoch, bl, features);
+ encode(v.set_version, bl, features);
+}
+
+void decode(QuiesceDbVersion& v, bufferlist::const_iterator& p)
+{
+ decode(v.epoch, p);
+ decode(v.set_version, p);
+}
+
+void encode(QuiesceState const & state, bufferlist& bl, uint64_t features=0)
+{
+ static_assert(QuiesceState::QS__MAX <= UINT8_MAX);
+ uint8_t v = (uint8_t)state;
+ encode(v, bl, features);
+}
+
+void decode(QuiesceState & state, bufferlist::const_iterator& p)
+{
+ uint8_t v = 0;
+ decode(v, p);
+ state = (QuiesceState)v;
+}
+
+void encode(QuiesceTimeInterval const & interval, bufferlist& bl, uint64_t features=0)
+{
+ encode(interval.count(), bl, features);
+}
+
+void decode(QuiesceTimeInterval & interval, bufferlist::const_iterator& p)
+{
+ QuiesceClock::rep count;
+ decode(count, p);
+ interval = QuiesceTimeInterval { count };
+}
+
+void encode(RecordedQuiesceState const& rstate, bufferlist& bl, uint64_t features = 0)
+{
+ encode(rstate.state, bl, features);
+ encode(rstate.at_age.count(), bl, features);
+}
+
+void decode(RecordedQuiesceState& rstate, bufferlist::const_iterator& p)
+{
+ decode(rstate.state, p);
+ decode(rstate.at_age, p);
+}
+
+void encode(QuiesceSet::MemberInfo const& member, bufferlist& bl, uint64_t features = 0)
+{
+ encode(member.rstate, bl, features);
+ encode(member.excluded, bl, features);
+}
+
+void decode(QuiesceSet::MemberInfo& member, bufferlist::const_iterator& p)
+{
+ decode(member.rstate, p);
+ decode(member.excluded, p);
+}
+
+void encode(QuiesceSet const& set, bufferlist& bl, uint64_t features = 0)
+{
+ encode(set.version, bl, features);
+ encode(set.rstate, bl, features);
+ encode(set.timeout, bl, features);
+ encode(set.expiration, bl, features);
+ encode(set.members, bl, features);
+}
+
+void decode(QuiesceSet& set, bufferlist::const_iterator& p)
+{
+ decode(set.version, p);
+ decode(set.rstate, p);
+ decode(set.timeout, p);
+ decode(set.expiration, p);
+ decode(set.members, p);
+}
+
+void encode(QuiesceDbRequest const& req, bufferlist& bl, uint64_t features = 0)
+{
+ encode(req.control.raw, bl, features);
+ encode(req.set_id, bl);
+ encode(req.if_version, bl);
+ encode(req.timeout, bl);
+ encode(req.expiration, bl);
+ encode(req.await, bl);
+ encode(req.roots, bl);
+}
+
+void decode(QuiesceDbRequest& req, bufferlist::const_iterator& p)
+{
+ decode(req.control.raw, p);
+ decode(req.set_id, p);
+ decode(req.if_version, p);
+ decode(req.timeout, p);
+ decode(req.expiration, p);
+ decode(req.await, p);
+ decode(req.roots, p);
+}
+
+void encode(QuiesceDbListing const& listing, bufferlist& bl, uint64_t features = 0)
+{
+ encode(listing.db_version, bl, features);
+ encode(listing.db_age, bl, features);
+ encode(listing.sets, bl, features);
+}
+
+void decode(QuiesceDbListing& listing, bufferlist::const_iterator& p)
+{
+ decode(listing.db_version, p);
+ decode(listing.db_age, p);
+ decode(listing.sets, p);
+}
+
+void encode(QuiesceMap::RootInfo const& root, bufferlist& bl, uint64_t features = 0)
+{
+ encode(root.state, bl, features);
+ encode(root.ttl, bl, features);
+}
+
+void decode(QuiesceMap::RootInfo& root, bufferlist::const_iterator& p)
+{
+ decode(root.state, p);
+ decode(root.ttl, p);
+}
+
+void encode(QuiesceMap const& map, bufferlist& bl, uint64_t features = 0)
+{
+ encode(map.db_version, bl, features);
+ encode(map.roots, bl, features);
+}
+
+void decode(QuiesceMap& map, bufferlist::const_iterator& p)
+{
+ decode(map.db_version, p);
+ decode(map.roots, p);
+}
+