From: Leonid Usov Date: Thu, 29 Feb 2024 12:08:18 +0000 (+0200) Subject: mds/quiesce: declare QuiesceDbPeerListing and QuiesceDbPeerAck X-Git-Tag: testing/wip-root-testing-20240411.174241~99^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=eef16a18a6e4e253475531460148f2cc9e3de843;p=ceph-ci.git mds/quiesce: declare QuiesceDbPeerListing and QuiesceDbPeerAck With these dedicated structs we can fully defer to QuiesceDbEncoding when encoding/decoding quiesce db messages Signed-off-by: Leonid Usov (cherry picked from commit 205fd3388ef0ed3011bb21384e79b34b6a6611ec) --- diff --git a/src/include/cephfs/types.h b/src/include/cephfs/types.h index 23f6e44a9c2..1b66929240b 100644 --- a/src/include/cephfs/types.h +++ b/src/include/cephfs/types.h @@ -56,6 +56,17 @@ struct std::hash { } }; +inline void encode(const mds_gid_t &v, bufferlist& bl, uint64_t features = 0) { + uint64_t vv = v; + encode_raw(vv, bl); +} + +inline void decode(mds_gid_t &v, bufferlist::const_iterator& p) { + uint64_t vv; + decode_raw(vv, p); + v = vv; +} + typedef int32_t fs_cluster_id_t; constexpr fs_cluster_id_t FS_CLUSTER_ID_NONE = -1; diff --git a/src/mds/MDSRankQuiesce.cc b/src/mds/MDSRankQuiesce.cc index bcc8e149ed7..53d82cb1364 100644 --- a/src/mds/MDSRankQuiesce.cc +++ b/src/mds/MDSRankQuiesce.cc @@ -260,7 +260,7 @@ void MDSRank::quiesce_cluster_update() { membership.send_ack = [=, this](QuiesceMap&& ack) { if (me == membership.leader) { // loopback - quiesce_db_manager->submit_ack_from(me, std::move(ack)); + quiesce_db_manager->submit_peer_ack({me, std::move(ack)}); return 0; } else { std::lock_guard guard(mds_lock); @@ -273,7 +273,7 @@ void MDSRank::quiesce_cluster_update() { auto ack_msg = make_message(); dout(10) << "sending ack " << ack << " to the leader " << membership.leader << dendl; - ack_msg->encode_payload_from(me, ack); + ack_msg->encode_payload_from({me, std::move(ack)}); return send_message_mds(ack_msg, addrs); } }; @@ -287,7 +287,7 @@ void MDSRank::quiesce_cluster_update() { auto addrs = mdsmap->get_info_gid(to).addrs; auto listing_msg = make_message(); dout(10) << "sending listing " << db << " to the peer " << to << dendl; - listing_msg->encode_payload_from(me, db); + listing_msg->encode_payload_from({me, std::move(db)}); return send_message_mds(listing_msg, addrs); }; } @@ -363,16 +363,16 @@ bool MDSRank::quiesce_dispatch(const cref_t &m) { { const auto& req = ref_cast(m); mds_gid_t gid; - QuiesceDbListing db_listing; - req->decode_payload_into(gid, db_listing); + QuiesceDbPeerListing peer_listing; + req->decode_payload_into(peer_listing); if (quiesce_db_manager) { - dout(10) << "got " << db_listing << " from peer " << gid << dendl; - int result = quiesce_db_manager->submit_listing_from(gid, std::move(db_listing)); + dout(10) << "got " << peer_listing << dendl; + int result = quiesce_db_manager->submit_peer_listing(std::move(peer_listing)); if (result != 0) { - dout(3) << "error (" << result << ") submitting " << db_listing << " from peer " << gid << dendl; + dout(3) << "error (" << result << ") submitting " << peer_listing << dendl; } } else { - dout(5) << "no db manager to process " << db_listing << dendl; + dout(5) << "no db manager to process " << peer_listing << dendl; } return true; } @@ -380,16 +380,16 @@ bool MDSRank::quiesce_dispatch(const cref_t &m) { { const auto& req = ref_cast(m); mds_gid_t gid; - QuiesceMap diff_map; - req->decode_payload_into(gid, diff_map); + QuiesceDbPeerAck peer_ack; + req->decode_payload_into(peer_ack); if (quiesce_db_manager) { - dout(10) << "got ack " << diff_map << " from peer " << gid << dendl; - int result = quiesce_db_manager->submit_ack_from(gid, std::move(diff_map)); + dout(10) << "got " << peer_ack << dendl; + int result = quiesce_db_manager->submit_peer_ack(std::move(peer_ack)); if (result != 0) { - dout(3) << "error (" << result << ") submitting an ack from peer " << gid << dendl; + dout(3) << "error (" << result << ") submitting and ack from " << peer_ack.origin << dendl; } } else { - dout(5) << "no db manager to process an ack: " << diff_map << dendl; + dout(5) << "no db manager to process " << peer_ack << dendl; } return true; } diff --git a/src/mds/QuiesceDb.h b/src/mds/QuiesceDb.h index 5fedcfe5a5c..8fccc43d448 100644 --- a/src/mds/QuiesceDb.h +++ b/src/mds/QuiesceDb.h @@ -119,6 +119,10 @@ using QuiesceSetId = std::string; using QuiesceRoot = std::string; using QuiesceSetVersion = uint64_t; +namespace QuiesceInterface { + using PeerId = mds_gid_t; +} + struct QuiesceDbVersion { epoch_t epoch; QuiesceSetVersion set_version; @@ -563,7 +567,7 @@ operator<<(std::basic_ostream& os, const QuiesceDbRequest& req) /// contain all sets that have their version > than the last acked by the peer. struct QuiesceDbListing { QuiesceDbVersion db_version = {0, 0}; - /// @brief Crusially, the precise `db_age` must be included in every db listing + /// @brief Crucially, the precise `db_age` must be included in every db listing /// This data is used by all replicas to update their calculated DB TIME ZERO. /// All events in the database are measured relative to the DB TIME ZERO QuiesceTimeInterval db_age = QuiesceTimeInterval::zero(); @@ -600,6 +604,18 @@ operator<<(std::basic_ostream& os, const QuiesceDbListing& dbl) return os << "q-db[v:" << dbl.db_version << " sets:" << active << "/" << inactive << "]"; } +struct QuiesceDbPeerListing { + QuiesceInterface::PeerId origin; + QuiesceDbListing db; +}; + +template +static std::basic_ostream& +operator<<(std::basic_ostream& os, const QuiesceDbPeerListing& dbl) +{ + return os << dbl.db << " from " << dbl.origin; +} + /// @brief `QuiesceMap` is a root-centric representation of the quiesce database /// It lists roots with their effective states as of particular version. /// Additionally, the same structure is used by the peers when reporting @@ -662,6 +678,18 @@ operator<<(std::basic_ostream& os, const QuiesceMap& map) return os << "q-map[v:" << map.db_version << " roots:" << active << "/" << inactive << "]"; } +struct QuiesceDbPeerAck { + QuiesceInterface::PeerId origin; + QuiesceMap diff_map; +}; + +template +static std::basic_ostream& +operator<<(std::basic_ostream& os, const QuiesceDbPeerAck& ack) +{ + return os << "ack " << ack.diff_map << " from " << ack.origin; +} + inline QuiesceTimeInterval interval_saturate_add(QuiesceTimeInterval lhs, QuiesceTimeInterval rhs) { // assuming an unsigned time interval. @@ -685,7 +713,6 @@ inline QuiesceTimePoint interval_saturate_add_now(QuiesceTimeInterval interval) }; namespace QuiesceInterface { - using PeerId = mds_gid_t; /// @brief A callback from the manager to the agent with an up-to-date root list /// The map is mutable and will be used as synchronous agent ack if the return value is true using AgentNotify = std::function; diff --git a/src/mds/QuiesceDbEncoding.h b/src/mds/QuiesceDbEncoding.h index 756e63cf9e3..ff2167c5d40 100644 --- a/src/mds/QuiesceDbEncoding.h +++ b/src/mds/QuiesceDbEncoding.h @@ -15,6 +15,11 @@ #include "include/encoding.h" #include +struct QuiesceDbEncoding { + static constexpr int version = 1; + static constexpr int compat = 1; +}; + void encode(QuiesceDbVersion const& v, bufferlist& bl, uint64_t features = 0) { encode(v.epoch, bl, features); @@ -131,6 +136,18 @@ void decode(QuiesceDbListing& listing, bufferlist::const_iterator& p) decode(listing.sets, p); } +void encode(QuiesceDbPeerListing const& listing, bufferlist& bl, uint64_t features = 0) +{ + encode(listing.origin, bl, features); + encode(listing.db, bl, features); +} + +void decode(QuiesceDbPeerListing& listing, bufferlist::const_iterator& p) +{ + decode(listing.origin, p); + decode(listing.db, p); +} + void encode(QuiesceMap::RootInfo const& root, bufferlist& bl, uint64_t features = 0) { encode(root.state, bl, features); @@ -155,3 +172,14 @@ void decode(QuiesceMap& map, bufferlist::const_iterator& p) decode(map.roots, p); } +void encode(QuiesceDbPeerAck const& ack, bufferlist& bl, uint64_t features = 0) +{ + encode(ack.origin, bl, features); + encode(ack.diff_map, bl, features); +} + +void decode(QuiesceDbPeerAck& ack, bufferlist::const_iterator& p) +{ + decode(ack.origin, p); + decode(ack.diff_map, p); +} diff --git a/src/mds/QuiesceDbManager.cc b/src/mds/QuiesceDbManager.cc index 1ff998f6068..ff497aafe45 100644 --- a/src/mds/QuiesceDbManager.cc +++ b/src/mds/QuiesceDbManager.cc @@ -256,7 +256,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates { // as a replica, we only care about the latest update while (db_updates.size() > 1) { - dout(10) << "skipping an older update from " << db_updates.front().first << " version " << db_updates.front().second.db_version << dendl; + dout(10) << "skipping an older update from " << db_updates.front().origin << " version " << db_updates.front().db.db_version << dendl; db_updates.pop(); } @@ -265,7 +265,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates return QuiesceTimeInterval::max(); } - QuiesceDbListing &update = db_updates.back().second; + QuiesceDbListing &update = db_updates.back().db; if (update.db_version.epoch != membership.epoch) { dout(10) << "ignoring db update from another epoch: " << update.db_version << " != " << db_version() << dendl; @@ -317,7 +317,8 @@ bool QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_update // only consider db submissions from unknown peers while (!unknown_peers.empty() && !db_updates.empty()) { - auto &[from, update] = db_updates.front(); + auto &from = db_updates.front().origin; + auto &update = db_updates.front().db; if (update.db_version.epoch == membership.epoch && unknown_peers.erase(from) > 0) { // see if this peer's version is newer than mine if (db.set_version < update.db_version.set_version) { diff --git a/src/mds/QuiesceDbManager.h b/src/mds/QuiesceDbManager.h index 7b7e77ff2f6..08c8392d981 100644 --- a/src/mds/QuiesceDbManager.h +++ b/src/mds/QuiesceDbManager.h @@ -75,19 +75,20 @@ class QuiesceDbManager { submit_condition.notify_all(); return 0; } + // acks the messaging system - int submit_ack_from(QuiesceInterface::PeerId sender, const QuiesceMap& diff_map) { + int submit_peer_ack(QuiesceDbPeerAck&& ack) { std::lock_guard l(submit_mutex); if (!cluster_membership || !cluster_membership->is_leader()) { return -EPERM; } - if (!cluster_membership->members.contains(sender)) { + if (!cluster_membership->members.contains(ack.origin)) { return -ESTALE; } - pending_acks.push({ sender, diff_map }); + pending_acks.push(std::move(ack)); submit_condition.notify_all(); return 0; } @@ -97,18 +98,18 @@ class QuiesceDbManager { // -> EPERM if this is the leader // process an incoming listing from a leader - int submit_listing_from(QuiesceInterface::PeerId sender, QuiesceDbListing&& listing) { + int submit_peer_listing(QuiesceDbPeerListing&& listing) { std::lock_guard l(submit_mutex); if (!cluster_membership) { return -EPERM; } - if (cluster_membership->epoch != listing.db_version.epoch) { + if (cluster_membership->epoch != listing.db.db_version.epoch) { return -ESTALE; } - pending_db_updates.push({sender, std::move(listing)}); + pending_db_updates.push(std::move(listing)); submit_condition.notify_all(); return 0; } @@ -187,8 +188,8 @@ class QuiesceDbManager { std::optional agent_callback; std::optional cluster_membership; - std::queue> pending_db_updates; - std::queue> pending_acks; + std::queue pending_db_updates; + std::queue pending_acks; std::deque pending_requests; class QuiesceDbThread : public Thread { diff --git a/src/messages/MMDSQuiesceDbAck.h b/src/messages/MMDSQuiesceDbAck.h index 1d56451e89b..b68445dfb09 100644 --- a/src/messages/MMDSQuiesceDbAck.h +++ b/src/messages/MMDSQuiesceDbAck.h @@ -34,15 +34,10 @@ public: // noop to prevent unnecessary overheads } - void encode_payload_from(mds_gid_t const&gid, QuiesceMap const&diff_map) + void encode_payload_from(QuiesceDbPeerAck const& ack) { - using ceph::encode; - - ceph_assert(gid != MDS_GID_NONE); - - ENCODE_START(1, 1, payload); - encode(gid, payload); - encode(diff_map, payload); + ENCODE_START(QuiesceDbEncoding::version, QuiesceDbEncoding::compat, payload); + encode(ack, payload); ENCODE_FINISH(payload); } @@ -50,13 +45,11 @@ public: // noop to prevent unnecessary overheads } - void decode_payload_into(mds_gid_t &gid, QuiesceMap &diff_map) const + void decode_payload_into(QuiesceDbPeerAck &ack) const { - using ceph::decode; auto p = payload.cbegin(); - DECODE_START(1, p); - decode(gid, p); - decode(diff_map, p); + DECODE_START(QuiesceDbEncoding::version, p); + decode(ack, p); DECODE_FINISH(p); } diff --git a/src/messages/MMDSQuiesceDbListing.h b/src/messages/MMDSQuiesceDbListing.h index f57de50e22f..5fd068adb56 100644 --- a/src/messages/MMDSQuiesceDbListing.h +++ b/src/messages/MMDSQuiesceDbListing.h @@ -33,15 +33,10 @@ public: // noop to prevent unnecessary overheads } - void encode_payload_from(mds_gid_t const& gid, QuiesceDbListing const& db_listing) + void encode_payload_from(QuiesceDbPeerListing const& peer_listing) { - using ceph::encode; - - ceph_assert(gid != MDS_GID_NONE); - - ENCODE_START(1, 1, payload); - encode(gid, payload); - encode(db_listing, payload); + ENCODE_START(QuiesceDbEncoding::version, QuiesceDbEncoding::compat, payload); + encode(peer_listing, payload); ENCODE_FINISH(payload); } @@ -49,13 +44,11 @@ public: // noop to prevent unnecessary overheads } - void decode_payload_into(mds_gid_t &gid, QuiesceDbListing &db_listing) const + void decode_payload_into(QuiesceDbPeerListing &peer_listing) const { - using ceph::decode; auto p = payload.cbegin(); - DECODE_START(1, p); - decode(gid, p); - decode(db_listing, p); + DECODE_START(QuiesceDbEncoding::version, p); + decode(peer_listing, p); DECODE_FINISH(p); } diff --git a/src/test/mds/TestQuiesceDb.cc b/src/test/mds/TestQuiesceDb.cc index 3c48474fa32..8cd168424e5 100644 --- a/src/test/mds/TestQuiesceDb.cc +++ b/src/test/mds/TestQuiesceDb.cc @@ -151,7 +151,7 @@ class QuiesceDbTest: public testing::Test { if (epoch == this->epoch) { if (this->managers.contains(recipient)) { dout(10) << "listing from " << me << " (leader=" << leader << ") to " << recipient << " for version " << listing.db_version << " with " << listing.sets.size() << " sets" << dendl; - this->managers[recipient]->submit_listing_from(me, std::move(listing)); + this->managers[recipient]->submit_peer_listing({me, std::move(listing)}); comms_cond.notify_all(); return 0; } @@ -181,7 +181,7 @@ class QuiesceDbTest: public testing::Test { it++; } } - this->managers[leader]->submit_ack_from(me, std::move(diff_map)); + this->managers[leader]->submit_peer_ack({me, std::move(diff_map)}); comms_cond.notify_all(); l.unlock(); while(!done_hooks.empty()) {