]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds/quiesce: declare QuiesceDbPeerListing and QuiesceDbPeerAck
authorLeonid Usov <leonid.usov@ibm.com>
Thu, 29 Feb 2024 12:08:18 +0000 (14:08 +0200)
committerLeonid Usov <leonid.usov@ibm.com>
Thu, 14 Mar 2024 19:10:04 +0000 (15:10 -0400)
With these dedicated structs we can fully defer to QuiesceDbEncoding
when encoding/decoding quiesce db messages

Signed-off-by: Leonid Usov <leonid.usov@ibm.com>
(cherry picked from commit 205fd3388ef0ed3011bb21384e79b34b6a6611ec)

src/include/cephfs/types.h
src/mds/MDSRankQuiesce.cc
src/mds/QuiesceDb.h
src/mds/QuiesceDbEncoding.h
src/mds/QuiesceDbManager.cc
src/mds/QuiesceDbManager.h
src/messages/MMDSQuiesceDbAck.h
src/messages/MMDSQuiesceDbListing.h
src/test/mds/TestQuiesceDb.cc

index 23f6e44a9c2b37a6e9252b8a7ca7275ce5110d9e..1b66929240b67e5a0f991722461c01f9a7226f93 100644 (file)
@@ -56,6 +56,17 @@ struct std::hash<mds_gid_t> {
   }
 };
 
+inline void encode(const mds_gid_t &v, bufferlist& bl, uint64_t features = 0) {
+  uint64_t vv = v;
+  encode_raw(vv, bl);
+}
+
+inline void decode(mds_gid_t &v, bufferlist::const_iterator& p) {
+  uint64_t vv;
+  decode_raw(vv, p);
+  v = vv;
+}
+
 typedef int32_t fs_cluster_id_t;
 constexpr fs_cluster_id_t FS_CLUSTER_ID_NONE = -1;
 
index bcc8e149ed7fa6708a190e3be49330405f4e159e..53d82cb1364835afe49b0fae06d2abdd15cb50e9 100644 (file)
@@ -260,7 +260,7 @@ void MDSRank::quiesce_cluster_update() {
     membership.send_ack = [=, this](QuiesceMap&& ack) {
       if (me == membership.leader) {
         // loopback
-        quiesce_db_manager->submit_ack_from(me, std::move(ack));
+        quiesce_db_manager->submit_peer_ack({me, std::move(ack)});
         return 0;
       } else {
         std::lock_guard guard(mds_lock);
@@ -273,7 +273,7 @@ void MDSRank::quiesce_cluster_update() {
 
         auto ack_msg = make_message<MMDSQuiesceDbAck>();
         dout(10) << "sending ack " << ack << " to the leader " << membership.leader << dendl;
-        ack_msg->encode_payload_from(me, ack);
+        ack_msg->encode_payload_from({me, std::move(ack)});
         return send_message_mds(ack_msg, addrs);
       }
     };
@@ -287,7 +287,7 @@ void MDSRank::quiesce_cluster_update() {
       auto addrs = mdsmap->get_info_gid(to).addrs;
       auto listing_msg = make_message<MMDSQuiesceDbListing>();
       dout(10) << "sending listing " << db << " to the peer " << to << dendl;
-      listing_msg->encode_payload_from(me, db);
+      listing_msg->encode_payload_from({me, std::move(db)});
       return send_message_mds(listing_msg, addrs);
     };
   }
@@ -363,16 +363,16 @@ bool MDSRank::quiesce_dispatch(const cref_t<Message> &m) {
       {
         const auto& req = ref_cast<MMDSQuiesceDbListing>(m);
         mds_gid_t gid;
-        QuiesceDbListing db_listing;
-        req->decode_payload_into(gid, db_listing);
+        QuiesceDbPeerListing peer_listing;
+        req->decode_payload_into(peer_listing);
         if (quiesce_db_manager) {
-          dout(10) << "got " << db_listing << " from peer " << gid << dendl;
-          int result = quiesce_db_manager->submit_listing_from(gid, std::move(db_listing));
+          dout(10) << "got " << peer_listing << dendl;
+          int result = quiesce_db_manager->submit_peer_listing(std::move(peer_listing));
           if (result != 0) {
-            dout(3) << "error (" << result << ") submitting " << db_listing << " from peer " << gid << dendl;
+            dout(3) << "error (" << result << ") submitting " << peer_listing << dendl;
           }
         } else {
-          dout(5) << "no db manager to process " << db_listing << dendl;
+          dout(5) << "no db manager to process " << peer_listing << dendl;
         }
         return true;
       }
@@ -380,16 +380,16 @@ bool MDSRank::quiesce_dispatch(const cref_t<Message> &m) {
       {
         const auto& req = ref_cast<MMDSQuiesceDbAck>(m);
         mds_gid_t gid;
-        QuiesceMap diff_map;
-        req->decode_payload_into(gid, diff_map);
+        QuiesceDbPeerAck peer_ack;
+        req->decode_payload_into(peer_ack);
         if (quiesce_db_manager) {
-          dout(10) << "got ack " << diff_map << " from peer " << gid << dendl;
-          int result = quiesce_db_manager->submit_ack_from(gid, std::move(diff_map));
+          dout(10) << "got " << peer_ack << dendl;
+          int result = quiesce_db_manager->submit_peer_ack(std::move(peer_ack));
           if (result != 0) {
-            dout(3) << "error (" << result << ") submitting an ack from peer " << gid << dendl;
+            dout(3) << "error (" << result << ") submitting and ack from " << peer_ack.origin << dendl;
           }
         } else {
-          dout(5) << "no db manager to process an ack: " << diff_map << dendl;
+          dout(5) << "no db manager to process " << peer_ack << dendl;
         }
         return true;
       }
index 5fedcfe5a5c1cd2af1baeee6274ce0248f83eb43..8fccc43d448e047f630a10592476d0a050d449d6 100644 (file)
@@ -119,6 +119,10 @@ using QuiesceSetId = std::string;
 using QuiesceRoot = std::string;
 using QuiesceSetVersion = uint64_t;
 
+namespace QuiesceInterface {
+  using PeerId = mds_gid_t;
+}
+
 struct QuiesceDbVersion {
   epoch_t epoch;
   QuiesceSetVersion set_version;
@@ -563,7 +567,7 @@ operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbRequest& req)
 ///         contain all sets that have their version > than the last acked by the peer.
 struct QuiesceDbListing {
   QuiesceDbVersion db_version = {0, 0};
-  /// @brief  Crusially, the precise `db_age` must be included in every db listing
+  /// @brief  Crucially, the precise `db_age` must be included in every db listing
   ///         This data is used by all replicas to update their calculated DB TIME ZERO.
   ///         All events in the database are measured relative to the DB TIME ZERO
   QuiesceTimeInterval db_age = QuiesceTimeInterval::zero();
@@ -600,6 +604,18 @@ operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbListing& dbl)
   return os << "q-db[v:" << dbl.db_version << " sets:" << active << "/" << inactive << "]";
 }
 
+struct QuiesceDbPeerListing {
+  QuiesceInterface::PeerId origin;
+  QuiesceDbListing db;
+};
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbPeerListing& dbl)
+{
+  return os << dbl.db << " from " << dbl.origin;
+}
+
 /// @brief  `QuiesceMap` is a root-centric representation of the quiesce database
 ///         It lists roots with their effective states as of particular version.
 ///         Additionally, the same structure is used by the peers when reporting
@@ -662,6 +678,18 @@ operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceMap& map)
   return os << "q-map[v:" << map.db_version << " roots:" << active << "/" << inactive << "]";
 }
 
+struct QuiesceDbPeerAck {
+  QuiesceInterface::PeerId origin;
+  QuiesceMap diff_map;
+};
+
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbPeerAck& ack)
+{
+  return os << "ack " << ack.diff_map << " from " << ack.origin;
+}
+
 inline QuiesceTimeInterval interval_saturate_add(QuiesceTimeInterval lhs, QuiesceTimeInterval rhs)
 {
   // assuming an unsigned time interval.
@@ -685,7 +713,6 @@ inline QuiesceTimePoint interval_saturate_add_now(QuiesceTimeInterval interval)
 };
 
 namespace QuiesceInterface {
-  using PeerId = mds_gid_t;
   /// @brief  A callback from the manager to the agent with an up-to-date root list
   ///         The map is mutable and will be used as synchronous agent ack if the return value is true
   using AgentNotify = std::function<bool(QuiesceMap&)>;
index 756e63cf9e3e9d065ad0751050847377cc4f331d..ff2167c5d405b2c8888debed7e77f686c08c8345 100644 (file)
 #include "include/encoding.h"
 #include <stdint.h>
 
+struct QuiesceDbEncoding {
+  static constexpr int version = 1;
+  static constexpr int compat = 1;
+};
+
 void encode(QuiesceDbVersion const& v, bufferlist& bl, uint64_t features = 0)
 {
   encode(v.epoch, bl, features);
@@ -131,6 +136,18 @@ void decode(QuiesceDbListing& listing, bufferlist::const_iterator& p)
   decode(listing.sets, p);
 }
 
+void encode(QuiesceDbPeerListing const& listing, bufferlist& bl, uint64_t features = 0)
+{
+  encode(listing.origin, bl, features);
+  encode(listing.db, bl, features);
+}
+
+void decode(QuiesceDbPeerListing& listing, bufferlist::const_iterator& p)
+{
+  decode(listing.origin, p);
+  decode(listing.db, p);
+}
+
 void encode(QuiesceMap::RootInfo const& root, bufferlist& bl, uint64_t features = 0)
 {
   encode(root.state, bl, features);
@@ -155,3 +172,14 @@ void decode(QuiesceMap& map, bufferlist::const_iterator& p)
   decode(map.roots, p);
 }
 
+void encode(QuiesceDbPeerAck const& ack, bufferlist& bl, uint64_t features = 0)
+{
+  encode(ack.origin, bl, features);
+  encode(ack.diff_map, bl, features);
+}
+
+void decode(QuiesceDbPeerAck& ack, bufferlist::const_iterator& p)
+{
+  decode(ack.origin, p);
+  decode(ack.diff_map, p);
+}
index 1ff998f606868fde6f9df9f0a84f7ceaa5cfba17..ff497aafe45211b858088d9d770da603fc562d57 100644 (file)
@@ -256,7 +256,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
 {
   // as a replica, we only care about the latest update
   while (db_updates.size() > 1) {
-    dout(10) << "skipping an older update from " << db_updates.front().first << " version " << db_updates.front().second.db_version << dendl;
+    dout(10) << "skipping an older update from " << db_updates.front().origin << " version " << db_updates.front().db.db_version << dendl;
     db_updates.pop();
   }
 
@@ -265,7 +265,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
     return QuiesceTimeInterval::max();
   }
 
-  QuiesceDbListing &update = db_updates.back().second;
+  QuiesceDbListing &update = db_updates.back().db;
 
   if (update.db_version.epoch != membership.epoch) {
     dout(10) << "ignoring db update from another epoch: " << update.db_version << " != " << db_version() << dendl;
@@ -317,7 +317,8 @@ bool QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_update
 
   // only consider db submissions from unknown peers
   while (!unknown_peers.empty() && !db_updates.empty()) {
-    auto &[from, update] = db_updates.front();
+    auto &from = db_updates.front().origin;
+    auto &update = db_updates.front().db;
     if (update.db_version.epoch == membership.epoch && unknown_peers.erase(from) > 0) {
       // see if this peer's version is newer than mine
       if (db.set_version < update.db_version.set_version) {
index 7b7e77ff2f676c6ee2d0566b8238cb015073600c..08c8392d9815243848f6fc889355be89b4bfddfb 100644 (file)
@@ -75,19 +75,20 @@ class QuiesceDbManager {
       submit_condition.notify_all();
       return 0;
     }
+  
     // acks the messaging system
-    int submit_ack_from(QuiesceInterface::PeerId sender, const QuiesceMap& diff_map) {
+    int submit_peer_ack(QuiesceDbPeerAck&& ack) {
       std::lock_guard l(submit_mutex);
 
       if (!cluster_membership || !cluster_membership->is_leader()) {
         return -EPERM;
       }
 
-      if (!cluster_membership->members.contains(sender)) {
+      if (!cluster_membership->members.contains(ack.origin)) {
         return -ESTALE;
       }
 
-      pending_acks.push({ sender, diff_map });
+      pending_acks.push(std::move(ack));
       submit_condition.notify_all();
       return 0;
     }
@@ -97,18 +98,18 @@ class QuiesceDbManager {
     //    -> EPERM if this is the leader
 
     // process an incoming listing from a leader
-    int submit_listing_from(QuiesceInterface::PeerId sender, QuiesceDbListing&& listing) {
+    int submit_peer_listing(QuiesceDbPeerListing&& listing) {
       std::lock_guard l(submit_mutex);
 
       if (!cluster_membership) {
         return -EPERM;
       }
 
-      if (cluster_membership->epoch != listing.db_version.epoch) {
+      if (cluster_membership->epoch != listing.db.db_version.epoch) {
         return -ESTALE;
       }
 
-      pending_db_updates.push({sender, std::move(listing)});
+      pending_db_updates.push(std::move(listing));
       submit_condition.notify_all();
       return 0;
     }
@@ -187,8 +188,8 @@ class QuiesceDbManager {
 
     std::optional<AgentCallback> agent_callback;
     std::optional<QuiesceClusterMembership> cluster_membership;
-    std::queue<std::pair<QuiesceInterface::PeerId, QuiesceDbListing>> pending_db_updates;
-    std::queue<std::pair<QuiesceInterface::PeerId, QuiesceMap>> pending_acks;
+    std::queue<QuiesceDbPeerListing> pending_db_updates;
+    std::queue<QuiesceDbPeerAck> pending_acks;
     std::deque<RequestContext*> pending_requests;
 
     class QuiesceDbThread : public Thread {
index 1d56451e89bc1b566ae02d9a9522abfbdec918dc..b68445dfb09136c7cf9d1c9e390ec35b8bb21b39 100644 (file)
@@ -34,15 +34,10 @@ public:
     // noop to prevent unnecessary overheads
   }
 
-  void encode_payload_from(mds_gid_t const&gid, QuiesceMap const&diff_map)
+  void encode_payload_from(QuiesceDbPeerAck const& ack)
   {
-    using ceph::encode;
-
-    ceph_assert(gid != MDS_GID_NONE);
-
-    ENCODE_START(1, 1, payload);
-    encode(gid, payload);
-    encode(diff_map, payload);
+    ENCODE_START(QuiesceDbEncoding::version, QuiesceDbEncoding::compat, payload);
+    encode(ack, payload);
     ENCODE_FINISH(payload);
   }
 
@@ -50,13 +45,11 @@ public:
     // noop to prevent unnecessary overheads
   }
 
-  void decode_payload_into(mds_gid_t &gid, QuiesceMap &diff_map) const
+  void decode_payload_into(QuiesceDbPeerAck &ack) const
   {
-    using ceph::decode;
     auto p = payload.cbegin();
-    DECODE_START(1, p);
-    decode(gid, p);
-    decode(diff_map, p);
+    DECODE_START(QuiesceDbEncoding::version, p);
+    decode(ack, p);
     DECODE_FINISH(p);
   }
 
index f57de50e22fb52c049295467de40ed97f293fb2d..5fd068adb5601aed6265e0b1033474d18f1f4334 100644 (file)
@@ -33,15 +33,10 @@ public:
     // noop to prevent unnecessary overheads
   }
 
-  void encode_payload_from(mds_gid_t const& gid, QuiesceDbListing const& db_listing)
+  void encode_payload_from(QuiesceDbPeerListing const& peer_listing)
   {
-    using ceph::encode;
-
-    ceph_assert(gid != MDS_GID_NONE);
-
-    ENCODE_START(1, 1, payload);
-    encode(gid, payload);
-    encode(db_listing, payload);
+    ENCODE_START(QuiesceDbEncoding::version, QuiesceDbEncoding::compat, payload);
+    encode(peer_listing, payload);
     ENCODE_FINISH(payload);
   }
 
@@ -49,13 +44,11 @@ public:
     // noop to prevent unnecessary overheads
   }
 
-  void decode_payload_into(mds_gid_t &gid, QuiesceDbListing &db_listing) const
+  void decode_payload_into(QuiesceDbPeerListing &peer_listing) const
   {
-    using ceph::decode;
     auto p = payload.cbegin();
-    DECODE_START(1, p);
-    decode(gid, p);
-    decode(db_listing, p);
+    DECODE_START(QuiesceDbEncoding::version, p);
+    decode(peer_listing, p);
     DECODE_FINISH(p);
   }
 
index 3c48474fa32b516c92b6836786cedebe15383093..8cd168424e56b9f5e690b00a13aa125f0c270c7c 100644 (file)
@@ -151,7 +151,7 @@ class QuiesceDbTest: public testing::Test {
             if (epoch == this->epoch) {
               if (this->managers.contains(recipient)) {
                 dout(10) << "listing from " << me << " (leader=" << leader << ") to " << recipient << " for version " << listing.db_version << " with " << listing.sets.size() << " sets" << dendl;
-                this->managers[recipient]->submit_listing_from(me, std::move(listing));
+                this->managers[recipient]->submit_peer_listing({me, std::move(listing)});
                 comms_cond.notify_all();
                 return 0;
               }
@@ -181,7 +181,7 @@ class QuiesceDbTest: public testing::Test {
                     it++;
                   }
                 }
-                this->managers[leader]->submit_ack_from(me, std::move(diff_map));
+                this->managers[leader]->submit_peer_ack({me, std::move(diff_map)});
                 comms_cond.notify_all();
                 l.unlock();
                 while(!done_hooks.empty()) {