]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
nvmeofgw: beacon diff implementation in the monitor unit tests works and tested...
authorLeonid Chernin <leonidc@il.ibm.com>
Mon, 15 Sep 2025 11:04:04 +0000 (14:04 +0300)
committerLeonid Chernin <leonidc@il.ibm.com>
Wed, 15 Oct 2025 07:28:09 +0000 (10:28 +0300)
           coming according to new schema fixes for failover/failback/gw fast-reboot.

           src/messages/MNVMeofGwBeacon.h: add sequence number
   truncate prev_beacon_subsystems on beacon sequence mismatch
   do not process OOO map

Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
nvmeofgw : monitor changes for the beacon-diff feature
     process beacons by beacon-diff new schema
     detect sequence out of order(ooo) condition and handle it
     in case ooo detected send ack to the gw with the expected correct sequence
     skip failovers for some interval when ooo detected
     ignore all becons with incorrect sequences until gw sends expected one

Signed-off-by: Leonid Chernin <leonidc@il.ibm.com>
nvmeofgw: this commit adds upgrade rules for the beacon-diff

Signed-off-by: Leonid Chernin <leonidc@il.ibm.com>
nvmeofgw: CR change: use fmt::format() instead of std::hex stream manipulators

Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
nvmeofgw: CR change: use BEACON_SUBSYS_VERSION_ENHANCED instead of verson number

Signed-off-by: Leonid Chernin <leonidc@il.ibm.com>
mon: CR change: centralize beacon version constants to eliminate duplication

Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
Fixes: https://tracker.ceph.com/issues/72394
18 files changed:
src/CMakeLists.txt
src/include/ceph_features.h
src/messages/MNVMeofGwBeacon.h
src/mon/NVMeofGwBeaconConstants.h [new file with mode: 0644]
src/mon/NVMeofGwMap.cc
src/mon/NVMeofGwMap.h
src/mon/NVMeofGwMon.cc
src/mon/NVMeofGwMon.h
src/mon/NVMeofGwSerialize.h
src/mon/NVMeofGwTypes.h
src/msg/Message.cc
src/nvmeof/NVMeofGwMonitorClient.cc
src/nvmeof/NVMeofGwMonitorClient.h
src/nvmeof/NVMeofGwUtils.cc [new file with mode: 0644]
src/nvmeof/NVMeofGwUtils.h [new file with mode: 0644]
src/test/CMakeLists.txt
src/test/test_nvmeof_gw_utils.cc [new file with mode: 0644]
src/test/test_nvmeof_mon_encoding.cc

index ab832ad7d7914a6041293e21933c5ba8dca5646d..0b0711f652625c75aa8aafdbd09ad9c643778ad7 100644 (file)
@@ -1077,6 +1077,7 @@ if(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT)
     ${nvmeof_monitor_grpc_hdrs}
     ceph_nvmeof_monitor_client.cc
     nvmeof/NVMeofGwClient.cc
+    nvmeof/NVMeofGwUtils.cc
     nvmeof/NVMeofGwMonitorGroupClient.cc
     nvmeof/NVMeofGwMonitorClient.cc)
   add_executable(ceph-nvmeof-monitor-client ${ceph_nvmeof_monitor_client_srcs})
index 5d8b38fc0a1d7436993585a0e50bfa21989ff789..b9ca2690abef3333b9cf8a6c0b5d9ea98736819b 100644 (file)
@@ -149,6 +149,7 @@ DEFINE_CEPH_FEATURE(43, 1, OSD_POOLRESEND)   // 4.13
 DEFINE_CEPH_FEATURE(44, 2, NVMEOFHA)
 DEFINE_CEPH_FEATURE_RETIRED(45, 1, OSD_SET_ALLOC_HINT, JEWEL, LUMINOUS)
 DEFINE_CEPH_FEATURE(45, 2, NVMEOFHAMAP)
+DEFINE_CEPH_FEATURE(51, 2, NVMEOF_BEACONDIFF)
 // available
 DEFINE_CEPH_FEATURE(46, 1, OSD_FADVISE_FLAGS)
 DEFINE_CEPH_FEATURE_RETIRED(46, 1, OSD_REPOP, JEWEL, LUMINOUS) // overlap
@@ -162,6 +163,7 @@ DEFINE_CEPH_FEATURE(49, 2, SERVER_SQUID);
 DEFINE_CEPH_FEATURE_RETIRED(50, 1, MON_METADATA, MIMIC, OCTOPUS)
 DEFINE_CEPH_FEATURE(50, 2, SERVER_TENTACLE);
 DEFINE_CEPH_FEATURE_RETIRED(51, 1, OSD_BITWISE_HOBJ_SORT, MIMIC, OCTOPUS)
+DEFINE_CEPH_FEATURE(51, 2, NVMEOF_BEACON_DIFF)
 // available
 DEFINE_CEPH_FEATURE_RETIRED(52, 1, OSD_PROXY_WRITE_FEATURES, MIMIC, OCTOPUS)
 // available
@@ -258,6 +260,7 @@ DEFINE_CEPH_FEATURE_RETIRED(63, 1, RESERVED_BROKEN, LUMINOUS, QUINCY) // client-
         CEPH_FEATUREMASK_SERVER_REEF | \
         CEPH_FEATUREMASK_SERVER_SQUID | \
         CEPH_FEATUREMASK_SERVER_TENTACLE | \
+        CEPH_FEATUREMASK_NVMEOF_BEACON_DIFF | \
         0ULL)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
index 61daa5269bf6dee7785a5905b60cb5c58b0184b9..f18bb3cd7c8821695306d99656835bfdee82a12c 100644 (file)
 #include "mon/MonCommand.h"
 #include "mon/NVMeofGwMap.h"
 #include "include/types.h"
+#include "mon/NVMeofGwBeaconConstants.h"
 
 class MNVMeofGwBeacon final : public PaxosServiceMessage {
 private:
-  static constexpr int HEAD_VERSION = 1;
-  static constexpr int COMPAT_VERSION = 1;
 
 protected:
     std::string       gw_id;
@@ -36,10 +35,12 @@ protected:
     gw_availability_t availability;                         // in absence of  beacon  heartbeat messages it becomes inavailable
     epoch_t           last_osd_epoch;
     epoch_t           last_gwmap_epoch;
+    uint64_t          sequence = 0;                         // sequence number for each beacon message
 
 public:
   MNVMeofGwBeacon()
-    : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}
+    : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, BEACON_VERSION_ENHANCED,
+      BEACON_VERSION_LEGACY}, sequence(0)
   {
     set_priority(CEPH_MSG_PRIO_HIGH);
   }
@@ -50,11 +51,16 @@ public:
         const BeaconSubsystems& subsystems_,
         const gw_availability_t& availability_,
         const epoch_t& last_osd_epoch_,
-        const epoch_t& last_gwmap_epoch_
-  )
-    : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION},
+        const epoch_t& last_gwmap_epoch_,
+        uint64_t sequence_ = 0,  // default sequence for backward compatibility
+        bool enable_diff = false)  // default to legacy behavior for backward compatibility
+    : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON,
+                          static_cast<version_t>(enable_diff ? 1 : 0), // user_version: 1=enhanced, 0=legacy
+                          enable_diff ? BEACON_VERSION_ENHANCED :
+                          BEACON_VERSION_LEGACY, BEACON_VERSION_LEGACY},// Minimum compatible version
       gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_),
-      availability(availability_), last_osd_epoch(last_osd_epoch_), last_gwmap_epoch(last_gwmap_epoch_)
+      availability(availability_), last_osd_epoch(last_osd_epoch_),
+      last_gwmap_epoch(last_gwmap_epoch_), sequence(sequence_)
   {
     set_priority(CEPH_MSG_PRIO_HIGH);
   }
@@ -78,6 +84,7 @@ public:
   const epoch_t&           get_last_osd_epoch() const   { return last_osd_epoch; }
   const epoch_t&           get_last_gwmap_epoch() const { return last_gwmap_epoch; }
   const BeaconSubsystems&  get_subsystems()     const   { return subsystems; };
+  uint64_t get_sequence() const { return sequence; }
 
 private:
   ~MNVMeofGwBeacon() final {}
@@ -92,10 +99,14 @@ public:
     encode(gw_id, payload);
     encode(gw_pool, payload);
     encode(gw_group, payload);
-    encode(subsystems, payload);
+    encode(subsystems, payload, features);
     encode((uint32_t)availability, payload);
     encode(last_osd_epoch, payload);
     encode(last_gwmap_epoch, payload);
+    // Only encode sequence for enhanced beacons (HEAD_VERSION >= 2)
+    if (get_header().version >= BEACON_VERSION_ENHANCED) {
+      encode(sequence, payload);
+    }
   }
 
   void decode_payload() override {
@@ -112,6 +123,12 @@ public:
     availability = static_cast<gw_availability_t>(tmp);
     decode(last_osd_epoch, p);
     decode(last_gwmap_epoch, p);
+    // Only decode sequence for enhanced beacons (HEAD_VERSION >= 2)
+    if (get_header().version >= BEACON_VERSION_ENHANCED && !p.end()) {
+      decode(sequence, p);
+    } else {
+      sequence = 0;  // Legacy beacons don't have sequence field
+    }
   }
 
 private:
diff --git a/src/mon/NVMeofGwBeaconConstants.h b/src/mon/NVMeofGwBeaconConstants.h
new file mode 100644 (file)
index 0000000..7453905
--- /dev/null
@@ -0,0 +1,24 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef CEPH_NVMEOFGWBEACONCONSTANTS_H
+#define CEPH_NVMEOFGWBEACONCONSTANTS_H
+
+// This header contains version constants used across multiple files
+// to avoid duplication and maintain consistency.
+
+// Beacon version constants
+#define BEACON_VERSION_LEGACY 1             // Legacy beacon format (no diff support)
+#define BEACON_VERSION_ENHANCED 2           // Enhanced beacon format (with diff support)
+
+#endif /* CEPH_NVMEOFGWBEACONCONSTANTS_H */
index f6461ebac41d4ac75403f35394201d0f2092bda6..b2202705a583431a1943a4c888003b7b6a086af6 100755 (executable)
@@ -49,7 +49,8 @@ void NVMeofGwMap::to_gmap(
       }
 
       auto gw_state = NvmeGwClientState(
-       gw_created.ana_grp_id, epoch, availability);
+       gw_created.ana_grp_id, epoch, availability, gw_created.beacon_sequence,
+       gw_created.beacon_sequence_ooo);
       for (const auto& sub: gw_created.subsystems) {
        gw_state.subsystems.insert({
            sub.nqn,
@@ -82,10 +83,10 @@ void NVMeofGwMap::remove_grp_id(
 }
 
 int NVMeofGwMap::cfg_add_gw(
-  const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
+  const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool test)
 {
   std::set<NvmeAnaGrpId> allocated;
-  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
+  if (test || HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
     auto gw_epoch_it = gw_epoch.find(group_key);
     if (gw_epoch_it == gw_epoch.end()) {
       gw_epoch[group_key] = epoch;
@@ -169,18 +170,26 @@ int NVMeofGwMap::cfg_add_gw(
 }
 
 int NVMeofGwMap::cfg_delete_gw(
-  const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
+  const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool test)
 {
+  if (test)
+    return do_delete_gw(gw_id, group_key);
+
   if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHA)) {
     dout(10) << " has NVMEOFHA: 1" << dendl;
     for (auto& gws_states: created_gws[group_key]) {
       if (gws_states.first == gw_id) {
         auto& state = gws_states.second;
+        if (state.availability == gw_availability_t::GW_AVAILABLE) {
+                  /*prevent failover because blocklisting right now cause IO errors */
+                  dout(4) << "Delete GW: set skip-failovers for group " << gw_id
+                                  << " group " << group_key << dendl;
+                  skip_failovers_for_group(group_key, 5);
+               }
         state.availability = gw_availability_t::GW_DELETING;
         dout(4) << " Deleting  GW :"<< gw_id  << " in state  "
             << state.availability <<  " Resulting GW availability: "
             << state.availability  << dendl;
-        state.subsystems.clear();//ignore subsystems of this GW
         utime_t now = ceph_clock_now();
         mon->nvmegwmon()->gws_deleting_time[group_key][gw_id] = now;
         return 0;
@@ -360,10 +369,16 @@ void NVMeofGwMap::track_deleting_gws(const NvmeGroupKey& group_key,
   }
 }
 
-void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key)
+void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key,
+   int interval_sec)
 {
-  const auto skip_failovers = g_conf().get_val<std::chrono::seconds>
-    ("mon_nvmeofgw_skip_failovers_interval");
+  std::chrono::seconds skip_failovers;
+  if (interval_sec == 0) {
+    skip_failovers = g_conf().get_val<std::chrono::seconds>
+             ("mon_nvmeofgw_skip_failovers_interval");
+       } else {
+       skip_failovers = std::chrono::seconds(interval_sec);
+  }
   for (auto& gw_created: created_gws[group_key]) {
     gw_created.second.allow_failovers_ts = std::chrono::system_clock::now()
         + skip_failovers;
@@ -408,6 +423,7 @@ int NVMeofGwMap::process_gw_map_gw_down(
     auto& st = gw_state->second;
     st.set_unavailable_state();
     st.set_last_gw_down_ts();
+    st.reset_beacon_sequence();
     for (auto& state_itr: created_gws[group_key][gw_id].sm_state) {
       fsm_handle_gw_down(
        gw_id, group_key, state_itr.second,
@@ -1063,14 +1079,23 @@ int NVMeofGwMap::blocklist_gw(
   // find_already_created_gw(gw_id, group_key);
   NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
   NvmeNonceVector nonces;
+
+  NvmeAnaNonceMap nonce_map;
+  for (const auto& sub: gw_map.subsystems) { // recreate nonce map from subsystems
+    for (const auto& ns: sub.namespaces) {
+         auto& nonce_vec = nonce_map[ns.anagrpid-1]; //Converting  ana groups to offsets
+         if (std::find(nonce_vec.begin(), nonce_vec.end(), ns.nonce) == nonce_vec.end())
+           nonce_vec.push_back(ns.nonce);
+    }
+  }
   for (auto& state_itr: gw_map.sm_state) {
     // to make blocklist on all clusters of the failing GW
-    nonces.insert(nonces.end(), gw_map.nonce_map[state_itr.first].begin(),
-        gw_map.nonce_map[state_itr.first].end());
+    nonces.insert(nonces.end(), nonce_map[state_itr.first].begin(),
+        nonce_map[state_itr.first].end());
   }
-
+  gw_map.subsystems.clear();
   if (nonces.size() > 0) {
-    NvmeNonceVector &nonce_vector = gw_map.nonce_map[grpid];;
+    NvmeNonceVector &nonce_vector = nonces;
     std::string str = "[";
     entity_addrvec_t addr_vect;
 
@@ -1144,6 +1169,45 @@ void  NVMeofGwMap::validate_gw_map(const NvmeGroupKey& group_key)
   }
 }
 
+bool NVMeofGwMap::put_gw_beacon_sequence_number(const NvmeGwId &gw_id,
+         int gw_version, const NvmeGroupKey& group_key,
+         uint64_t beacon_sequence, uint64_t& old_sequence)
+{
+  bool rc = true;
+  NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
+
+  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) ||
+                 (gw_version > 0) ) {
+    uint64_t seq_number = gw_map.beacon_sequence;
+    if ((beacon_sequence != seq_number+1) &&
+        !(beacon_sequence == 0 && seq_number == 0 )) {// new GW startup
+        rc = false;
+        old_sequence = seq_number;
+        dout(4) << "Warning: GW " << gw_id
+                << " sent beacon sequence out of order, expected "
+                << seq_number +1 << " received " << beacon_sequence << dendl;
+        gw_map.beacon_sequence_ooo = true;
+    } else {
+      gw_map.beacon_sequence = beacon_sequence;
+    }
+  }
+  return rc;
+}
+
+bool NVMeofGwMap::set_gw_beacon_sequence_number(const NvmeGwId &gw_id,
+        int gw_version, const NvmeGroupKey& group_key, uint64_t beacon_sequence)
+{
+  NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
+  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) ||
+                 (gw_version > 0)) {
+      gw_map.beacon_sequence = beacon_sequence;
+      gw_map.beacon_sequence_ooo = false;
+      dout(10) << gw_id << " set beacon_sequence " << beacon_sequence << dendl;
+  }
+  return true;
+}
+
+
 void NVMeofGwMap::update_active_timers(bool &propose_pending)
 {
   const auto now = std::chrono::system_clock::now();
index f01a3d740c3eb105a746113ad22da048e7713ba2..0328b12611705e413b1ed5c553f5ec325179b702 100755 (executable)
@@ -65,15 +65,17 @@ public:
   void to_gmap(std::map<NvmeGroupKey, NvmeGwMonClientStates>& Gmap) const;
   void track_deleting_gws(const NvmeGroupKey& group_key,
     const BeaconSubsystems&  subs, bool &propose_pending);
-  int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
-  int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
   void check_all_gws_in_deleting_state(const NvmeGwId &gw_id,
     const NvmeGroupKey& group_key);
+  int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+    bool test = false);
+  int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+    bool test = false);
   void process_gw_map_ka(
     const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
     epoch_t& last_osd_epoch,  bool &propose_pending);
   int process_gw_map_gw_down(
-    const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+    const NvmeGwId &gw_id, const Nv/meGroupKey& group_key,
     bool &propose_pending);
   int process_gw_map_gw_no_subsys_no_listeners(
     const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
@@ -92,7 +94,13 @@ public:
        const NvmeGroupKey& group_key, bool &propose_pending);
   void set_addr_vect(const NvmeGwId &gw_id,
       const NvmeGroupKey& group_key, const entity_addr_t &addr_vect);
-  void skip_failovers_for_group(const NvmeGroupKey& group_key);
+  void skip_failovers_for_group(const NvmeGroupKey& group_key,
+      int interval_sec = 0);
+  bool put_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version,
+      const NvmeGroupKey& group_key, uint64_t beacon_sequence,
+      uint64_t& old_sequence);
+  bool set_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version,
+         const NvmeGroupKey& group_key, uint64_t beacon_sequence);
 private:
   int  do_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
   int  do_erase_gw_id(const NvmeGwId &gw_id,
index c3a13eba1ab68f97c4fb18e3ac566e179c2d72f1..2b029f8cf24646999fbd91d69a2e14b5099a5056 100644 (file)
@@ -71,6 +71,9 @@ void NVMeofGwMon::synchronize_last_beacon()
       // force send ack after nearest beacon after leader re-election
       gw_created_pair.second.beacon_index =
           g_conf().get_val<uint64_t>("mon_nvmeofgw_beacons_till_ack");
+     // force send full beacon after leader election
+      gw_created_pair.second.beacon_sequence = 0;
+      gw_created_pair.second.beacon_sequence_ooo = true;
     }
   }
 }
@@ -138,7 +141,9 @@ void NVMeofGwMon::tick()
   for (auto &[group_key, gws_states]: pending_map.created_gws) {
     BeaconSubsystems *subsystems = &empty_subsystems;
     for (auto& gw_state : gws_states) { // loop for GWs inside nqn group
-      subsystems = &gw_state.second.subsystems;
+      if (gw_state.second.availability == gw_availability_t::GW_AVAILABLE) {
+        subsystems = &gw_state.second.subsystems;
+      }
       if (subsystems->size()) { // Set subsystems to the valid value
         break;
       }
@@ -172,7 +177,7 @@ version_t NVMeofGwMon::get_trim_to() const
  * function called to restore in pending map all data that is not serialized
  * to paxos peons. Othervise it would be overriden in "pending_map = map"
  * currently "allow_failovers_ts", "last_gw_down_ts",
- * "last_gw_map_epoch_valid" variables are restored
+ * "last_gw_map_epoch_valid", "beacon_sequence", "beacon_index" variables are restored
  */
 void NVMeofGwMon::restore_pending_map_info(NVMeofGwMap & tmp_map) {
   std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
@@ -196,6 +201,12 @@ void NVMeofGwMon::restore_pending_map_info(NVMeofGwMap & tmp_map) {
           gw_created_pair.second.last_gw_down_ts;
       pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
          gw_created_pair.second.last_gw_map_epoch_valid;
+      pending_map.created_gws[group_key][gw_id].beacon_index =
+            gw_created_pair.second.beacon_index;
+      pending_map.created_gws[group_key][gw_id].beacon_sequence =
+            gw_created_pair.second.beacon_sequence;
+      pending_map.created_gws[group_key][gw_id].beacon_sequence_ooo =
+            gw_created_pair.second.beacon_sequence_ooo;
     }
   }
 }
@@ -234,7 +245,8 @@ void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t)
   }
   pending_map.encode(bl, features);
   dout(10) << " has NVMEOFHA: " << HAVE_FEATURE(features, NVMEOFHA)
-       << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP) << dendl;
+       << " has NVMEOFHAMAP: " <<  HAVE_FEATURE(features, NVMEOFHAMAP)
+       << " has BEACON_DIFF: " <<  HAVE_FEATURE(features, NVMEOF_BEACON_DIFF) << dendl;
   put_version(t, pending_map.epoch, bl);
   put_last_committed(t, pending_map.epoch);
 
@@ -247,7 +259,9 @@ void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t)
 void NVMeofGwMon::update_from_paxos(bool *need_bootstrap)
 {
   version_t version = get_last_committed();
-
+  uint64_t features = mon.get_quorum_con_features();
+  dout(20) << " has BEACON_DIFF: " <<  HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)
+                  << dendl;
   if (version != map.epoch) {
     dout(10) << " NVMeGW loading version " << version
             << " " << map.epoch << dendl;
@@ -649,23 +663,176 @@ epoch_t NVMeofGwMon::get_ack_map_epoch(bool gw_created,
   return rc;
 }
 
+void NVMeofGwMon::do_send_map_ack(MonOpRequestRef op,
+       bool gw_created, bool gw_propose,
+    uint64_t stored_sequence, bool is_correct_sequence,
+    const NvmeGroupKey& group_key, const NvmeGwId &gw_id) {
+  /* always send beacon ack to gw in Created state,
+   * it should be temporary state
+   * if epoch-filter-bit: send ack to beacon in case no propose
+   * or if changed something not relevant to gw-epoch
+  */
+  NVMeofGwMap ack_map;
+  if (gw_created) {
+       NvmeGwMonState& pending_gw_map = pending_map.created_gws[group_key][gw_id];
+    // respond with a map slice correspondent to the same GW
+    ack_map.created_gws[group_key][gw_id] = (gw_propose) ? //avail = CREATED
+      pending_gw_map : map.created_gws[group_key][gw_id];
+    ack_map.created_gws[group_key][gw_id].beacon_sequence =
+      pending_gw_map.beacon_sequence;
+    if (!is_correct_sequence) {
+      dout(4) << " GW " << gw_id <<
+      " sending ACK due to receiving beacon_sequence out of order" << dendl;
+      ack_map.created_gws[group_key][gw_id].beacon_sequence = stored_sequence;
+      ack_map.created_gws[group_key][gw_id].beacon_sequence_ooo = true;
+    } else {
+        ack_map.created_gws[group_key][gw_id].beacon_sequence_ooo = false;
+    }
+    if (gw_propose) {
+     dout(10) << "GW in Created " << gw_id << " ack map " << ack_map << dendl;
+    }
+  }
+  ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
+  if (!gw_created)
+    dout(10) << "gw not created, ack map "
+             << ack_map << " epoch " << ack_map.epoch << dendl;
+  dout(20) << "ack_map " << ack_map <<dendl;
+  auto msg = make_message<MNVMeofGwMap>(ack_map);
+  mon.send_reply(op, msg.detach());
+}
+
+/*
+ * Any subsystem was added to the Beacon only if it( or it's encapsulated fields)
+ * was changed, added or deleted. If nothing changed the beacon did not
+ * encode any subsystem.
+ * New descriptor was added under the subsystem to describe
+ * the change : ADDED, DELETED, CHANGED
+ * rules for apply the subsystems to the map:
+ * Pass all subsystems in the beacon->sub list
+ * if descriptor is ADDED or CHANGED do the following
+ * look for subs nqn in the gw.subs list and if found - substitute,
+ * if not found - add
+ * if descriptor is DELETED do the following
+ * look for subs nqn in the gw.subs list and if found - delete
+ */
+int NVMeofGwMon::apply_beacon(const NvmeGwId &gw_id, int gw_version,
+        const NvmeGroupKey& group_key, void *msg,
+        const BeaconSubsystems& sub, gw_availability_t &avail,
+        bool &propose_pending)
+{
+  bool found = false;
+  bool changed = false;
+  BeaconSubsystems &gw_subs =
+                     pending_map.created_gws[group_key][gw_id].subsystems;
+  auto &state = pending_map.created_gws[group_key][gw_id];
+
+  if (!HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOF_BEACON_DIFF) &&
+                 gw_version == 0) {
+    if (gw_subs != sub) {
+      dout(10) << "BEACON_DIFF logic not applied."
+          "subsystems of GW changed, propose pending " << gw_id << dendl;
+      gw_subs = sub;
+      //rebuild  nonce map.
+      state.nonce_map = ((MNVMeofGwBeacon *)msg)->get_nonce_map();
+      changed = true;
+    }
+  } else {
+    if (state.beacon_sequence_ooo) {
+      dout(10) << "Good sequence after out of order detection "
+         "sequence "<< state.beacon_sequence << " " << gw_id << dendl;
+      // need to clear subsystems for correct calculation of difference
+      state.subsystems.clear();
+      state.beacon_sequence_ooo = false;
+      propose_pending = true;
+    }
+
+    for (auto &subs_it: sub) {
+      if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED ||
+          subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED) {
+        found = false;
+        for (auto &gw_subs_it: gw_subs) {
+          if (gw_subs_it.nqn == subs_it.nqn) {
+            gw_subs_it = subs_it;
+            dout(10) << "subsystem changed " << subs_it.nqn << " change descr "
+                     << (uint32_t)subs_it.change_descriptor << dendl;
+            found = true;
+            changed = true;
+            break;
+          }
+        }
+        if (!found) {
+          gw_subs.push_back(subs_it);
+          changed = true;
+          dout(10) << "subsystem added " << subs_it.nqn <<  " change descr "
+                 << (uint32_t)subs_it.change_descriptor << dendl;
+        }
+      }
+      else
+        if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED)
+        {
+           auto it = std::find_if(gw_subs.begin(), gw_subs.end(),
+           [&](const auto& gw_subs_it) {
+             return gw_subs_it.nqn == subs_it.nqn;
+           });
+           if (it != gw_subs.end()) {
+             gw_subs.erase(it);
+             dout(10) << "subsystem deleted " << subs_it.nqn << " change descr "
+                      << (uint32_t)subs_it.change_descriptor << dendl;
+             changed = true;
+           }
+        }
+    }
+  }
+  if (changed) {
+    avail = gw_availability_t::GW_AVAILABLE;
+  }
+  if (gw_subs.size() == 0) {
+      avail = gw_availability_t::GW_CREATED;
+      dout(10) << "No-subsystems condition detected for GW " << gw_id <<dendl;
+    } else {
+      bool listener_found = false;
+      for (auto &subs: gw_subs) {
+        if (subs.listeners.size()) {
+          listener_found = true;
+          break;
+        }
+      }
+      if (!listener_found) {
+       dout(10) << "No-listeners condition detected for GW " << gw_id << dendl;
+       avail = gw_availability_t::GW_CREATED;
+      }
+    }// for HA no-subsystems and no-listeners are same usecases
+  if (avail == gw_availability_t::GW_UNAVAILABLE) {
+         dout(4) << "Warning: UNAVAILABLE gw " << gw_id << dendl;
+  }
+  return (changed == true ? 1:0);
+}
+
 bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
 {
   auto m = op->get_req<MNVMeofGwBeacon>();
-
-  dout(20) << "availability " <<  m->get_availability()
+  uint64_t sequence = m->get_sequence();
+  int version = m->version;
+  dout(10) << "availability " << m->get_availability()
+       << " sequence " << sequence << " version " << version
           << " GW : " << m->get_gw_id()
           << " osdmap_epoch " << m->get_last_osd_epoch()
-          << " subsystems " << m->get_subsystems() << dendl;
+          << " subsystems " << m->get_subsystems().size() << dendl;
+
   ConnectionRef con = op->get_connection();
   NvmeGwId gw_id = m->get_gw_id();
   NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(),  m->get_gw_group());
+  //"avail" variable will be changed inside the function
+  // when it becomes CREATED for several reasons GW's load balance group
+  //  is serviced by another GW
   gw_availability_t  avail = m->get_availability();
   bool propose = false;
   bool nonce_propose = false;
   bool timer_propose = false;
   bool gw_propose    = false;
   bool gw_created = true;
+  bool correct_sequence = true;
+  uint64_t stored_sequence;
   NVMeofGwMap ack_map;
   bool epoch_filter_enabled = HAVE_FEATURE(mon.get_quorum_con_features(),
                               NVMEOFHAMAP);
@@ -686,6 +853,9 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
               << map.created_gws << dendl;
       goto set_propose;
     } else {
+      pending_map.created_gws[group_key][gw_id].subsystems.clear();
+      pending_map.set_gw_beacon_sequence_number(gw_id, version,
+            group_key, sequence);
       dout(4) << "GW beacon: Created state - full startup done " << gw_id
               << " GW state in monitor data-base : "
               << pending_map.created_gws[group_key][gw_id].availability
@@ -714,6 +884,8 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
   } else { // first GW beacon should come with avail = Created
     // if GW reports Avail/Unavail but in monitor's database it is Unavailable
     if (gw != group_gws.end()) {
+      correct_sequence = pending_map.put_gw_beacon_sequence_number
+           (gw_id, version, group_key, sequence, stored_sequence);
       // it means it did not perform "exit" after failover was set by
       // NVMeofGWMon
       if ((pending_map.created_gws[group_key][gw_id].availability ==
@@ -731,6 +903,18 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
        mon.send_reply(op, msg.detach());
        goto false_return;
       }
+      if (!correct_sequence) {
+        if (avail == gw_availability_t::GW_AVAILABLE) {
+          /*prevent failover - give GW a chance to send the expected sequence */
+          dout(4) << "sequence ooo: set skip-failovers for group " << gw_id
+                  << " group " << group_key << dendl;
+          pending_map.skip_failovers_for_group(group_key, 7);
+        }
+        avail = gw_availability_t::GW_CREATED;
+        // availability would be set to Active and GW receive the full map
+        // when it sends the correct beacon-sequence,
+        goto check_availability;
+      }
     }
   }
   // Beacon from GW in !Created state but it does not appear in the map
@@ -756,44 +940,12 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
     pending_map.set_addr_vect(gw_id, group_key, con->get_peer_addr());
     gw_propose = true;
   }
-  // deep copy the whole nonce map of this GW
-  if (m->get_nonce_map().size()) {
-    if (pending_map.created_gws[group_key][gw_id].nonce_map !=
-       m->get_nonce_map()) {
-      dout(10) << "nonce map of GW  changed , propose pending "
-              << gw_id << dendl;
-      pending_map.created_gws[group_key][gw_id].nonce_map = m->get_nonce_map();
-      dout(10) << "nonce map of GW " << gw_id << " "
-              << pending_map.created_gws[group_key][gw_id].nonce_map  << dendl;
-      nonce_propose = true;
-    }
-  } else {
-    dout(10) << "Warning: received empty nonce map in the beacon of GW "
-            << gw_id << " avail " << (int)avail << dendl;
-  }
-
-  if (sub.size() == 0) {
-    avail = gw_availability_t::GW_CREATED;
-    dout(20) << "No-subsystems condition detected for GW " << gw_id <<dendl;
-  } else {
-    bool listener_found = false;
-    for (auto &subs: sub) {
-      if (subs.listeners.size()) {
-        listener_found = true;
-        break;
-      }
-    }
-    if (!listener_found) {
-     dout(10) << "No-listeners condition detected for GW " << gw_id << dendl;
-     avail = gw_availability_t::GW_CREATED;
-    }
-  }// for HA no-subsystems and no-listeners are same usecases
-  if (pending_map.created_gws[group_key][gw_id].subsystems != sub) {
-    dout(10) << "subsystems of GW changed, propose pending " << gw_id << dendl;
-    pending_map.created_gws[group_key][gw_id].subsystems =  sub;
-    dout(20) << "subsystems of GW " << gw_id << " "
-            << pending_map.created_gws[group_key][gw_id].subsystems << dendl;
+  if (apply_beacon(gw_id, version, group_key, (void *)m, sub, avail, propose) !=0) {
     nonce_propose = true;
+    dout(10) << "subsystem(subs/listener/nonce/NM) of GW changed, propose pending "
+             << gw_id << " available " << avail <<  dendl;
+    dout(20) << "subsystems of GW " << gw_id << " "
+             << pending_map.created_gws[group_key][gw_id].subsystems << dendl;
   }
   pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
     (get_ack_map_epoch(true, group_key) == m->get_last_gwmap_epoch());
@@ -803,10 +955,11 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
             << " epoch " << get_ack_map_epoch(true, group_key)
             << " beacon_epoch " << m->get_last_gwmap_epoch() <<  dendl;
   }
+
+check_availability:
   if (avail == gw_availability_t::GW_AVAILABLE) {
     // check pending_map.epoch vs m->get_version() -
     // if different - drop the beacon
-
     LastBeacon lb = {gw_id, group_key};
     last_beacon[lb] = now;
     epoch_t last_osd_epoch = m->get_last_osd_epoch();
@@ -819,9 +972,10 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
   // Periodic: check active FSM timers
   pending_map.update_active_timers(timer_propose);
 
- set_propose:
+set_propose:
   propose |= (timer_propose | gw_propose | nonce_propose);
-  apply_ack_logic = (avail == gw_availability_t::GW_AVAILABLE) ? true : false;
+  apply_ack_logic = ((avail == gw_availability_t::GW_AVAILABLE)
+                      && correct_sequence) ? true : false;
   if ( (apply_ack_logic &&
       ((pending_map.created_gws[group_key][gw_id].beacon_index++
           % beacons_till_ack) == 0))|| (!apply_ack_logic) ) {
@@ -836,22 +990,8 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
   if (send_ack && ((!gw_propose && epoch_filter_enabled) ||
                     (!propose && !epoch_filter_enabled) ||
                     (avail == gw_availability_t::GW_CREATED)) ) {
-          /* always send beacon ack to gw in Created state,
-           * it should be temporary state
-           * if epoch-filter-bit: send ack to beacon in case no propose
-           * or if changed something not relevant to gw-epoch
-          */
-    if (gw_created) {
-      // respond with a map slice correspondent to the same GW
-      ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];
-    }
-    ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
-    if (!gw_created)
-      dout(10) << "gw not created, ack map "
-                        << ack_map << " epoch " << ack_map.epoch << dendl;
-    dout(20) << "ack_map " << ack_map <<dendl;
-    auto msg = make_message<MNVMeofGwMap>(ack_map);
-    mon.send_reply(op, msg.detach());
+    do_send_map_ack(op, gw_created, gw_propose, stored_sequence,
+                               correct_sequence, group_key, gw_id);
   } else {
     mon.no_reply(op);
   }
index 07e7f33524e6dfd858aa3fef62197e5efcc01cb8..548cd218bee9fcb4cf583be48fd396dd5ac51c2d 100644 (file)
@@ -101,6 +101,12 @@ private:
   void recreate_gw_epoch();
   void restore_pending_map_info(NVMeofGwMap & tmp_map);
   void cleanup_pending_map();
+  int apply_beacon(const NvmeGwId &gw_id, int gw_version,
+             const NvmeGroupKey& group_key, void *msg,
+                        const BeaconSubsystems& sub, gw_availability_t &avail, bool &propose_pending);
+  void do_send_map_ack(MonOpRequestRef op, bool gw_created, bool gw_propose,
+       uint64_t stored_sequence, bool is_correct_sequence,
+       const NvmeGroupKey& group_key, const NvmeGwId &gw_id);
 };
 
 #endif /* MON_NVMEGWMONITOR_H_ */
index b259d7b87466f25eb96affa7e21365e42ca663d0..244524c299c82a40dedf5db861311256b45f7aef 100755 (executable)
@@ -13,6 +13,9 @@
  */
 #ifndef MON_NVMEOFGWSERIALIZE_H_
 #define MON_NVMEOFGWSERIALIZE_H_
+
+#include "mon/NVMeofGwBeaconConstants.h"
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mon
 #undef dout_prefix
@@ -107,7 +110,8 @@ inline std::ostream& operator<<(std::ostream& os, const BeaconListener value) {
 }
 
 inline std::ostream& operator<<(std::ostream& os, const BeaconSubsystem value) {
-  os << "BeaconSubsystem( nqn:" << value.nqn << ", listeners [ ";
+  os << "BeaconSubsystem( nqn:" << value.nqn << " descr "
+     << (uint32_t)value.change_descriptor << ", listeners [ ";
   for (const auto& list: value.listeners) os << list << " ";
   os << "] namespaces [ ";
   for (const auto& ns: value.namespaces) os << ns << " ";
@@ -124,7 +128,9 @@ inline std::ostream& operator<<(
   std::ostream& os, const NvmeGwClientState value) {
   os <<  "NvmeGwState { group id: " << value.group_id
      << " gw_map_epoch " <<  value.gw_map_epoch
-     << " availablilty "<< value.availability
+     << " availablilty " << value.availability
+     << " sequence " << value.last_beacon_seq_number
+     << " sequence-ooo " << value.last_beacon_seq_ooo
      << " GwSubsystems: [ ";
   for (const auto& sub: value.subsystems) {
     os << sub.second << " ";
@@ -274,7 +280,7 @@ inline void encode(
   for (const auto& sub: subsystems) {
     encode(sub.second.nqn, bl);
     if (version == 1) {
-      dout(20) << "encode ana_state vector version1 = " << version << dendl;
+      dout(20) << "encode ana_state vector version1 = " << (int)version << dendl;
       /* Version 1 requires exactly 16 entries */
       ana_state_t filled(sub.second.ana_state);
       filled.resize(
@@ -284,7 +290,7 @@ inline void encode(
          0));
       encode(filled, bl);
     } else {
-      dout(20) << "encode ana_state vector version2 = " << version << dendl;
+      dout(20) << "encode ana_state vector version2 = " << (int)version << dendl;
       encode(sub.second.ana_state, bl);
     }
   }
@@ -308,23 +314,37 @@ inline  void decode(
 }
 
 inline void encode(const NvmeGwClientState& state,  ceph::bufferlist &bl, uint64_t features) {
-  ENCODE_START(1, 1, bl);
+  uint8_t version = 1;
+  if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+     version = BEACON_VERSION_ENHANCED;
+  }
+  ENCODE_START(version, version, bl);
   encode(state.group_id, bl);
   encode(state.gw_map_epoch, bl);
   encode (state.subsystems, bl, features);
   encode((uint32_t)state.availability, bl);
+  if (version >= 2) {
+    encode((uint64_t)state.last_beacon_seq_number, bl);
+    encode(state.last_beacon_seq_ooo, bl);
+  }
   ENCODE_FINISH(bl);
 }
 
 inline  void decode(
   NvmeGwClientState& state,  ceph::bufferlist::const_iterator& bl) {
-  DECODE_START(1, bl);
+  DECODE_START(2, bl);
   decode(state.group_id, bl);
   decode(state.gw_map_epoch, bl);
   decode(state.subsystems, bl);
   uint32_t avail;
+  uint64_t last_beacon_seq_number;
   decode(avail, bl);
   state.availability = (gw_availability_t)avail;
+  if (struct_v >= 2) {
+    decode(last_beacon_seq_number, bl);
+    state.last_beacon_seq_number = last_beacon_seq_number;
+    decode(state.last_beacon_seq_ooo, bl);
+  }
   DECODE_FINISH(bl);
 }
 
@@ -418,6 +438,7 @@ inline void encode(const NvmeAnaNonceMap& nonce_map,  ceph::bufferlist &bl,
   uint64_t features) {
   ENCODE_START(1, 1, bl);
   encode((uint32_t)nonce_map.size(), bl);
+  dout(20) << "encode nonce map  size " << nonce_map.size() << dendl;
   for (auto& ana_group_nonces : nonce_map) {
     // ana group id
     encode(ana_group_nonces.first, bl);
@@ -458,10 +479,12 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
     version = 3;
   }
   ENCODE_START(version, version, bl);
+  dout(20) << "encode NvmeGwMonStates. struct_v: " << (int)version << dendl;
   encode ((uint32_t)gws.size(), bl); // number of gws in the group
   for (auto& gw : gws) {
     encode(gw.first, bl);// GW_id
     encode(gw.second.ana_grp_id, bl); // GW owns this group-id
+    dout(20) << "encode gw-id " << gw.first << dendl;
     if (version >= 2) {
       encode((uint32_t)gw.second.sm_state.size(), bl);
       for (auto &state_it:gw.second.sm_state) {
@@ -471,7 +494,9 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
       encode((uint32_t)gw.second.availability, bl);
       encode((uint16_t)gw.second.performed_full_startup, bl);
       encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl);
-      encode(gw.second.subsystems, bl);
+      dout(20) << "encode availability " << gw.second.availability
+               << " startup " << (int)gw.second.performed_full_startup << dendl;
+      encode(gw.second.subsystems, bl, features);
 
       encode((uint32_t)gw.second.blocklist_data.size(), bl);
       for (auto &blklst_itr: gw.second.blocklist_data) {
@@ -488,7 +513,7 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
       encode((uint32_t)gw.second.availability, bl);
       encode((uint16_t)gw.second.performed_full_startup, bl);
       encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl);
-      encode(gw.second.subsystems, bl); // TODO reuse but put features - encode version
+      encode(gw.second.subsystems, bl, features);
       Blocklist_data bl_data[MAX_SUPPORTED_ANA_GROUPS];
       for (auto &blklst_itr: gw.second.blocklist_data) {
         bl_data[blklst_itr.first].osd_epoch   = blklst_itr.second.osd_epoch;
@@ -590,6 +615,7 @@ inline void decode(
       dout(20) << "decode addr_vect and beacon_index" << dendl;
       gw_created.addr_vect.decode(bl);
       decode(gw_created.beacon_index, bl);
+      dout(20) << "decoded beacon_index " << gw_created.beacon_index << dendl;
     }
 
     gws[gw_name] = gw_created;
@@ -795,7 +821,7 @@ inline void decode(NvmeGwTimers& md, ceph::buffer::list::const_iterator &bl) {
 }
 
 inline void encode(const BeaconNamespace& ns,  ceph::bufferlist &bl) {
-  ENCODE_START(1, 1, bl);
+  ENCODE_START(BEACON_VERSION_LEGACY, BEACON_VERSION_LEGACY, bl);
   encode(ns.anagrpid, bl);
   encode(ns.nonce, bl);
   ENCODE_FINISH(bl);
@@ -809,7 +835,7 @@ inline void decode(BeaconNamespace& ns, ceph::buffer::list::const_iterator &bl)
 }
 
 inline void encode(const BeaconListener& ls,  ceph::bufferlist &bl) {
-  ENCODE_START(1, 1, bl);
+  ENCODE_START(BEACON_VERSION_LEGACY, BEACON_VERSION_LEGACY, bl);
   encode(ls.address_family, bl);
   encode(ls.address, bl);
   encode(ls.svcid, bl);
@@ -824,22 +850,41 @@ inline void decode(BeaconListener& ls, ceph::buffer::list::const_iterator &bl) {
   DECODE_FINISH(bl);
 }
 
-inline void encode(const BeaconSubsystem& sub,  ceph::bufferlist &bl) {
-  ENCODE_START(1, 1, bl);
+inline void encode(const BeaconSubsystem& sub,  ceph::bufferlist &bl, uint64_t features) {
+  uint8_t version = BEACON_VERSION_LEGACY;  // Default to legacy version
+  if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+    version = BEACON_VERSION_ENHANCED;  // Use enhanced version if feature supported
+  }
+  // For legacy encoding, skip deleted subsystems to maintain compatibility
+  if (version == BEACON_VERSION_LEGACY &&
+      sub.change_descriptor != subsystem_change_t::SUBSYSTEM_ADDED) {
+    dout(4) << "encode BeaconSubsystem: skipping subsystem " << sub.nqn
+            << " with change_descriptor " << (int)sub.change_descriptor
+            << " in legacy mode" << dendl;
+    return; // Skip encoding this subsystem entirely
+  }
+
+  ENCODE_START(version, version, bl);
   encode(sub.nqn, bl);
+  dout(20) << "encode BeaconSubsystems " << sub.nqn <<" features " <<  features
+           << " version " << (int)version << dendl;
   encode((uint32_t)sub.listeners.size(), bl);
   for (const auto& ls: sub.listeners)
     encode(ls, bl);
   encode((uint32_t)sub.namespaces.size(), bl);
   for (const auto& ns: sub.namespaces)
     encode(ns, bl);
+  if (version >= BEACON_VERSION_ENHANCED) {
+    encode((uint32_t)sub.change_descriptor, bl);
+    dout(20) << "encode BeaconSubsystems change-descr: " << (uint32_t)sub.change_descriptor << dendl;
+  }
   ENCODE_FINISH(bl);
 }
 
 inline void decode(BeaconSubsystem& sub, ceph::buffer::list::const_iterator &bl) {
-  DECODE_START(1, bl);
-  dout(20) << "decode BeaconSubsystems " << dendl;
+  DECODE_START(BEACON_VERSION_ENHANCED, bl);  // Always decode with enhanced version support
   decode(sub.nqn, bl);
+  dout(20) << "decode BeaconSubsystems " << sub.nqn << dendl;
   uint32_t s;
   sub.listeners.clear();
   decode(s, bl);
@@ -856,6 +901,12 @@ inline void decode(BeaconSubsystem& sub, ceph::buffer::list::const_iterator &bl)
     decode(ns, bl);
     sub.namespaces.push_back(ns);
   }
+  if (struct_v >= BEACON_VERSION_ENHANCED) {
+    uint32_t change_desc;
+    decode(change_desc, bl);
+    sub.change_descriptor = static_cast<subsystem_change_t>(change_desc);
+    dout(20) << "decode BeaconSubsystems version >= " << BEACON_VERSION_ENHANCED << dendl;
+  }
   DECODE_FINISH(bl);
 }
 
index 6a2f9506e2e3cf8afb3fcf2e736f53f47c8ca53e..cd22dcbc4fe6a4250295e8fb23ba017580d51063 100755 (executable)
 #include <iomanip>
 #include <map>
 #include <iostream>
+#include <list>
+#include <vector>
+#include <cstdint>
+#include <chrono>
+#include "include/types.h"
+#include "msg/msg_types.h"
 
 using NvmeGwId = std::string;
 using NvmeGroupKey = std::pair<std::string, std::string>;
@@ -47,6 +53,12 @@ enum class gw_availability_t {
   GW_DELETED
 };
 
+enum class subsystem_change_t {
+  SUBSYSTEM_ADDED,
+  SUBSYSTEM_CHANGED,
+  SUBSYSTEM_DELETED
+};
+
 #define REDUNDANT_GW_ANA_GROUP_ID 0xFF
 using SmState = std::map < NvmeAnaGrpId, gw_states_per_group_t>;
 
@@ -86,6 +98,7 @@ struct BeaconSubsystem {
   NvmeNqnId nqn;
   std::list<BeaconListener>  listeners;
   std::list<BeaconNamespace> namespaces;
+  subsystem_change_t change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED;
 
   // Define the equality operator
   bool operator==(const BeaconSubsystem& other) const {
@@ -131,7 +144,9 @@ struct NvmeGwMonState {
   BlocklistData blocklist_data;
   //ceph entity address allocated for the GW-client that represents this GW-id
   entity_addrvec_t addr_vect;
-  uint16_t beacon_index = 0;
+  uint64_t beacon_sequence = 0;// sequence number of last beacon copied to GW state
+  bool beacon_sequence_ooo = false; // last beacon sequence was out of order;
+  uint16_t beacon_index = 0; // used for filter acks sent to the client as response to beacon
   /**
    * during redeploy action and maybe other emergency use-cases gw performs scenario
    * that we call fast-reboot. It quickly reboots(due to redeploy f.e) and sends the
@@ -170,6 +185,9 @@ struct NvmeGwMonState {
      // it expects it performed the full startup
     performed_full_startup = false;
   }
+  void reset_beacon_sequence(){
+    beacon_sequence = 0;
+  }
   void standby_state(NvmeAnaGrpId grpid) {
     sm_state[grpid]       = gw_states_per_group_t::GW_STANDBY_STATE;
   }
@@ -231,12 +249,16 @@ struct NvmeGwClientState {
   epoch_t gw_map_epoch;
   GwSubsystems subsystems;
   gw_availability_t availability;
-  NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available)
-    : group_id(id), gw_map_epoch(epoch), availability(available) {}
+  uint64_t last_beacon_seq_number;
+  bool last_beacon_seq_ooo; //out of order sequence
+  NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available,
+     uint64_t sequence, bool sequence_ooo)
+    : group_id(id), gw_map_epoch(epoch), availability(available),
+      last_beacon_seq_number(sequence), last_beacon_seq_ooo(sequence_ooo) {}
 
   NvmeGwClientState()
     : NvmeGwClientState(
-      REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE) {}
+      REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE, 0, 0) {}
 };
 
 struct Tmdata {
index e6600cda451ac52eacd2e5b647f9c7b3f533c17f..1d0a12cd752aa455a127d7ac32f16f29b10e692e 100644 (file)
@@ -897,7 +897,7 @@ Message *decode_message(CephContext *cct,
 
   case MSG_MNVMEOF_GW_BEACON:
     m = make_message<MNVMeofGwBeacon>();
-  break;
+    break;
 
   case MSG_MON_MGR_REPORT:
     m = make_message<MMonMgrReport>();
index c362bf21ca2a82020dd00884a269d5175b41f24d..30ce5dca1ffc6f2a4735db19b7cef4c5bd97be3a 100644 (file)
@@ -12,6 +12,7 @@
  */
 
 #include <boost/algorithm/string/replace.hpp>
+#include <fmt/format.h>
 
 #include "common/errno.h"
 #include "common/signal.h"
@@ -19,6 +20,7 @@
 #include "include/compat.h"
 
 #include "include/stringify.h"
+#include "include/ceph_features.h"
 #include "global/global_context.h"
 #include "global/signal_handler.h"
 
@@ -28,6 +30,7 @@
 #include "NVMeofGwMonitorClient.h"
 #include "NVMeofGwClient.h"
 #include "NVMeofGwMonitorGroupClient.h"
+#include "nvmeof/NVMeofGwUtils.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mon
@@ -41,6 +44,8 @@ NVMeofGwMonitorClient::NVMeofGwMonitorClient(int argc, const char **argv) :
   last_map_time(std::chrono::steady_clock::now()),
   reset_timestamp(std::chrono::steady_clock::now()),
   start_time(last_map_time),
+  cluster_features(0),
+  poolctx(),
   monc{g_ceph_context, poolctx},
   client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())),
   objecter{g_ceph_context, client_messenger.get(), &monc, poolctx},
@@ -211,7 +216,7 @@ void NVMeofGwMonitorClient::send_beacon()
 {
   ceph_assert(ceph_mutex_is_locked_by_me(beacon_lock));
   gw_availability_t gw_availability = gw_availability_t::GW_CREATED;
-  BeaconSubsystems subs;
+  BeaconSubsystems current_subsystems;
   NVMeofGwClient gw_client(
      grpc::CreateChannel(gateway_address, gw_creds()));
   subsystems_info gw_subsystems;
@@ -231,26 +236,45 @@ void NVMeofGwMonitorClient::send_beacon()
         BeaconListener bls = { ls.adrfam(), ls.traddr(), ls.trsvcid() };
         bsub.listeners.push_back(bls);
       }
-      subs.push_back(bsub);
+      current_subsystems.push_back(bsub);
     }
   }
 
+  // Determine change descriptors by comparing with previous beacon's subsystems
+  BeaconSubsystems subs = current_subsystems;
+  determine_subsystem_changes(prev_beacon_subsystems, subs);
+
   auto group_key = std::make_pair(pool, group);
   NvmeGwClientState old_gw_state;
   // if already got gateway state in the map
   if (first_beacon == false && get_gw_state("old map", map, group_key, name, old_gw_state))
     gw_availability = ok ? gw_availability_t::GW_AVAILABLE : gw_availability_t::GW_UNAVAILABLE;
-  dout(10) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability <<
+  dout(1) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability <<
     " osdmap_epoch " << osdmap_epoch << " gwmap_epoch " << gwmap_epoch << dendl;
+
+  // Check if NVMEOF_BEACON_DIFF feature is supported by the cluster
+  bool include_diff = HAVE_FEATURE(cluster_features, NVMEOF_BEACON_DIFF);
+
+  dout(10) << fmt::format("Cluster features: 0x{:x}, NVMEOF_BEACON_DIFF supported: {}",
+                          cluster_features, include_diff ? "yes" : "no") << dendl;
+
+  // Send beacon with appropriate version based on cluster features
   auto m = ceph::make_message<MNVMeofGwBeacon>(
       name,
       pool,
       group,
-      subs,
+      include_diff ? subs : current_subsystems,
       gw_availability,
       osdmap_epoch,
-      gwmap_epoch);
+      gwmap_epoch,
+      beacon_sequence,
+      include_diff  // Pass the feature flag directly to constructor
+  );
+  dout(10) << "sending beacon with diff support: " << (include_diff ? "enabled" : "disabled") << dendl;
+  
   monc.send_mon_message(std::move(m));
+  ++beacon_sequence;
+  prev_beacon_subsystems = std::move(current_subsystems);
 }
 
 void NVMeofGwMonitorClient::disconnect_panic()
@@ -352,6 +376,22 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t<MNVMeofGwMap> nmap)
   // ensure that the gateway state has not vanished
   ceph_assert(got_new_gw_state || !got_old_gw_state);
 
+  // Check if the last_beacon_seq_number in the received map doesn't match our last sent beacon_sequence
+  // beacon_sequence is incremented after sending, so we compare with (beacon_sequence - 1)
+  {
+    std::lock_guard bl(beacon_lock);
+    if (got_new_gw_state && new_gw_state.last_beacon_seq_ooo) {
+        //new_gw_state.last_beacon_seq_number != (beacon_sequence - 1)) {
+      dout(4) << "Beacon sequence mismatch detected. Expected: " << (beacon_sequence - 1)
+               << ", received: " << new_gw_state.last_beacon_seq_number
+               << ". Truncating previous subsystems list." << dendl;
+      beacon_sequence = new_gw_state.last_beacon_seq_number + 1;
+      prev_beacon_subsystems.clear();
+      dout(4) << "OOO map received, Ignore it" << dendl;
+      return;
+    }
+  }
+
   if (!got_old_gw_state) {
     if (!got_new_gw_state) {
       dout(10) << "Can not find new gw state" << dendl;
@@ -386,10 +426,29 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t<MNVMeofGwMap> nmap)
     }
   }
 
+  // Combined subsystems
+  const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0);
+  GwSubsystems combined_subsystems = new_gw_state.subsystems;
+  for (const auto& nqn_state_pair: old_gw_state.subsystems) {
+    const auto& nqn = nqn_state_pair.first;
+    auto& old_nqn_state = nqn_state_pair.second;
+
+    // The monitor might remove active subsystems from the new distributed GwSubsystems.
+    // In such cases, ensure an INACCESSIBLE state is generated for subsystems
+    // that were present in the old state but are now missing.
+    if (new_gw_state.subsystems.find(nqn) == new_gw_state.subsystems.end()) {
+      ana_state_t all_disabled(old_nqn_state.ana_state.size(), initial_ana_state);
+      dout(4) << "set all groups to Inacccessible stat for " << nqn << dendl;
+      NqnState    nqn_state(nqn, all_disabled);
+
+      combined_subsystems.insert({nqn, nqn_state});
+    }
+  }
+
   // Gather all state changes
   ana_info ai;
   epoch_t max_blocklist_epoch = 0;
-  for (const auto& nqn_state_pair: new_gw_state.subsystems) {
+  for (const auto& nqn_state_pair: combined_subsystems) {
     auto& sub = nqn_state_pair.second;
     const auto& nqn = nqn_state_pair.first;
     nqn_ana_states nas;
@@ -403,7 +462,6 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t<MNVMeofGwMap> nmap)
        sub.ana_state.size();
 
     for (NvmeAnaGrpId  ana_grp_index = 0; ana_grp_index < ana_state_size; ana_grp_index++) {
-      const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0);
       auto new_group_state = (ana_grp_index < sub.ana_state.size()) ?
        sub.ana_state[ana_grp_index] :
        initial_ana_state;
@@ -457,6 +515,17 @@ Dispatcher::dispatch_result_t NVMeofGwMonitorClient::ms_dispatch2(const ref_t<Me
   std::lock_guard l(lock);
   dout(10) << "got map type " << m->get_type() << dendl;
 
+  // print connection features for all incoming messages and update cluster features
+  if (m->get_connection()) {
+    uint64_t features = m->get_connection()->get_features();
+    dout(4) << fmt::format("Monitor connection features: 0x{:x}", features) << dendl;
+    
+    // Update cluster features with the union of all seen features
+    // This ensures we track the highest level of features supported by the cluster
+    cluster_features |= features;
+    dout(10) << fmt::format("Updated cluster features: 0x{:x}", cluster_features) << dendl;
+  }
+
   if (m->get_type() == MSG_MNVMEOF_GW_MAP) {
     handle_nvmeof_gw_map(ref_cast<MNVMeofGwMap>(m));
     return Dispatcher::HANDLED();
index b1ca5c94debb1b0a85b09de2b9234ab3d49a2aca..d62a4be22e0cfa99c1727768d73b6a2ebbd20ec2 100644 (file)
@@ -53,7 +53,9 @@ private:
 
   bool first_beacon = true;
   bool set_group_id = false;
-
+  uint64_t beacon_sequence = 0;
+  BeaconSubsystems prev_beacon_subsystems;
+  uint64_t cluster_features = 0;  // track cluster features for beacon encoding
   // init gw ssl opts
   void init_gw_ssl_opts();
 
@@ -76,7 +78,7 @@ protected:
 
   void send_config_beacon(); 
   void send_beacon();
+
 public:
   NVMeofGwMonitorClient(int argc, const char **argv);
   ~NVMeofGwMonitorClient() override;
diff --git a/src/nvmeof/NVMeofGwUtils.cc b/src/nvmeof/NVMeofGwUtils.cc
new file mode 100644 (file)
index 0000000..9beccc8
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "nvmeof/NVMeofGwUtils.h"
+
+void determine_subsystem_changes(const BeaconSubsystems& old_subsystems,
+                                BeaconSubsystems& new_subsystems) {
+  BeaconSubsystems result;
+
+  // for each subsystem in new_subsystems, check if it's added or changed
+  for (const auto& new_sub : new_subsystems) {
+    auto old_it = std::find_if(old_subsystems.begin(), old_subsystems.end(),
+                              [&](const BeaconSubsystem& s) { return s.nqn == new_sub.nqn; });
+    if (old_it == old_subsystems.end()) {
+      // Subsystem not found in old list - it's new
+      BeaconSubsystem added = new_sub;
+      added.change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED;
+      result.push_back(std::move(added));
+    } else {
+      // subsystem exists - check if it changed
+      if (!(*old_it == new_sub)) {
+        BeaconSubsystem changed = new_sub;
+        changed.change_descriptor = subsystem_change_t::SUBSYSTEM_CHANGED;
+        result.push_back(std::move(changed));
+      }
+      // else: unchanged, do not add
+    }
+  }
+
+  // for any subsystem in old_subsystems not present in new_subsystems, add as deleted
+  for (const auto& old_sub : old_subsystems) {
+    auto found = std::find_if(new_subsystems.begin(), new_subsystems.end(),
+                             [&](const BeaconSubsystem& s) { return s.nqn == old_sub.nqn; });
+    if (found == new_subsystems.end()) {
+      BeaconSubsystem deleted_sub = old_sub;
+      deleted_sub.change_descriptor = subsystem_change_t::SUBSYSTEM_DELETED;
+      result.push_back(std::move(deleted_sub));
+    }
+  }
+
+  new_subsystems = std::move(result);
+}
+
diff --git a/src/nvmeof/NVMeofGwUtils.h b/src/nvmeof/NVMeofGwUtils.h
new file mode 100644 (file)
index 0000000..a9c2531
--- /dev/null
@@ -0,0 +1,23 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef  __NVMEOFGWUTILS_H__
+#define  __NVMEOFGWUTILS_H__
+#include "mon/NVMeofGwTypes.h"
+#include <list>
+
+// utility for diffing nvmeof subsystems changes
+void determine_subsystem_changes(const BeaconSubsystems& old_subsystems,
+                                BeaconSubsystems& new_subsystems);
+
+#endif
index a22c7ccb236f07fc7618547cce7eeb9ff6a9e17d..9e190653b7fb29f2792c83d68c7800b314c43a27 100644 (file)
@@ -1043,3 +1043,11 @@ add_ceph_unittest(unittest_ceph_assert)
 target_link_libraries(unittest_ceph_assert ceph-common global)
 endif()
 
+add_executable(test_nvmeof_gw_utils
+  test_nvmeof_gw_utils.cc
+  ../nvmeof/NVMeofGwUtils.cc
+  )
+target_link_libraries(test_nvmeof_gw_utils
+  mon ceph-common global-static
+  )
+
diff --git a/src/test/test_nvmeof_gw_utils.cc b/src/test/test_nvmeof_gw_utils.cc
new file mode 100644 (file)
index 0000000..c7d2ccf
--- /dev/null
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "nvmeof/NVMeofGwUtils.h"
+#include "mon/NVMeofGwTypes.h"
+#include <iostream>
+#include <algorithm>
+#include "include/ceph_assert.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix *_dout
+
+void test_determine_subsystem_changes() {
+  std::cout << __func__ << "\n\n" << std::endl;
+  // Prepare old and new subsystems
+  BeaconSubsystem sub1_old = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystem sub2_old = { "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystem sub3_old = { "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystems old_subs = { sub1_old, sub2_old, sub3_old };
+
+  // sub1 unchanged, sub2 changed, sub4 added, sub3 deleted
+  BeaconSubsystem sub1_new = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystem sub2_new = { "nqn2", { {"IPv4", "1.2.3.4", "4420"} }, {}, subsystem_change_t::SUBSYSTEM_ADDED }; // changed listeners
+  BeaconSubsystem sub4_new = { "nqn4", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystems new_subs = { sub1_new, sub2_new, sub4_new };
+
+  determine_subsystem_changes(old_subs, new_subs);
+
+  // After call, new_subs should only contain changed, added, and deleted subsystems
+  // sub1 (unchanged) should be removed
+  // sub2 (changed) should be present with SUBSYSTEM_CHANGED
+  // sub4 (added) should be present with SUBSYSTEM_ADDED
+  // sub3 (deleted) should be present with SUBSYSTEM_DELETED
+  bool found_sub2 = false, found_sub3 = false, found_sub4 = false;
+  for (const auto& s : new_subs) {
+    if (s.nqn == "nqn2") {
+      found_sub2 = true;
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+    } else if (s.nqn == "nqn3") {
+      found_sub3 = true;
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED);
+    } else if (s.nqn == "nqn4") {
+      found_sub4 = true;
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+    } else {
+      ceph_assert(false && "Unexpected subsystem in result");
+    }
+  }
+  ceph_assert(found_sub2 && found_sub3 && found_sub4);
+  std::cout << "determine_subsystem_changes test passed" << std::endl;
+}
+
+int main(int argc, const char **argv) {
+  test_determine_subsystem_changes();
+  return 0;
+}
index 337d232eb3366ddfda2b2af9b1434586dcfd76e2..e03df6c852ef880b8a62a687c94fa1477ded6855 100644 (file)
@@ -36,12 +36,17 @@ void test_NVMeofGwMap() {
   std::string pool = "pool1";
   std::string group = "grp1";
   auto group_key = std::make_pair(pool, group);
-  pending_map.cfg_add_gw("GW1" ,group_key);
-  pending_map.cfg_add_gw("GW2" ,group_key);
-  pending_map.cfg_add_gw("GW3" ,group_key);
+  std::string nqn = "nqn-nqn";
+  BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
+  BeaconSubsystems subs = {sub};
+
+  pending_map.cfg_add_gw("GW1" ,group_key, true);
+  pending_map.cfg_add_gw("GW2" ,group_key, true);
+  pending_map.cfg_add_gw("GW3" ,group_key, true);
   NvmeNonceVector new_nonces = {"abc", "def","hij"};
   pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
   pending_map.created_gws[group_key]["GW1"].performed_full_startup = true;
+  pending_map.created_gws[group_key]["GW1"].subsystems = subs;
   int i = 0;
   for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){
     blklst_itr.second.osd_epoch = 2*(i++);
@@ -53,21 +58,25 @@ void test_NVMeofGwMap() {
   dout(0) << pending_map << dendl;
 
   ceph::buffer::list bl;
+  dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl;
+
   pending_map.encode(bl, CEPH_FEATURES_ALL);
   auto p = bl.cbegin();
   pending_map.decode(p);
   dout(0) << " == Dump map after Decode: == " <<dendl;
   dout(0) << pending_map << dendl;
+  dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl;
 }
 
 void test_MNVMeofGwMap() {
+  //test message to the Mon Client
   dout(0) << __func__ << "\n\n" << dendl;
   std::map<NvmeGroupKey, NvmeGwMonClientStates> map;
 
   std::string pool = "pool1";
   std::string group = "grp1";
   std::string gw_id = "GW1";
-  NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE);
+  NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE, 0, false);
   std::string nqn = "nqn";
   ana_state_t ana_state;
   NqnState nqn_state(nqn, ana_state);
@@ -82,9 +91,9 @@ void test_MNVMeofGwMap() {
   encode(map, bl, CEPH_FEATURES_ALL);
   dout(0) << "encoded: " << map << dendl;
   decode(map, bl);
-  dout(0) << "decode: " << map << dendl;
+  dout(0) << "decoded: " << map << dendl;
 
-  BeaconSubsystem sub = { nqn, {}, {} };
+  BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
   NVMeofGwMap pending_map;
   pending_map.epoch = 2;
   auto msg1 = make_message<MNVMeofGwMap>(pending_map);
@@ -95,12 +104,13 @@ void test_MNVMeofGwMap() {
   int epoch = msg1->get_gwmap_epoch();
   dout(0) << "after decode empty msg: " << *msg1 << " epoch " << epoch <<  dendl;
 
-  pending_map.cfg_add_gw("GW1" ,group_key);
-  pending_map.cfg_add_gw("GW2" ,group_key);
-  pending_map.cfg_add_gw("GW3" ,group_key);
+  pending_map.cfg_add_gw("GW1" ,group_key, true);
+  pending_map.cfg_add_gw("GW2" ,group_key, true);
+  pending_map.cfg_add_gw("GW3" ,group_key, true);
   NvmeNonceVector new_nonces = {"abc", "def","hij"};
   pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
   pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub);
+
   int i = 0;
   for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){
      blklst_itr.second.osd_epoch = 2*(i++);
@@ -116,7 +126,7 @@ void test_MNVMeofGwMap() {
   dout(0) << "after encode msg: " << *msg << dendl;
   msg->decode_payload();
   dout(0) << "after decode msg: " << *msg << dendl;
-
+  
   //dout(0)   << "\n == Test GW Delete ==" << dendl;
   //pending_map.cfg_delete_gw("GW1" ,group_key);
   //dout(0) << "deleted GW1 " << pending_map << dendl;
@@ -128,10 +138,10 @@ void test_MNVMeofGwMap() {
   //dout(0) << "deleted GW2 " << pending_map << dendl;
 
   //dout(0) << "delete of wrong gw id" << dendl;
-  //pending_map.cfg_delete_gw("wow" ,group_key);
+  //pending_map.cfg_delete_gw("wow" ,group_key, true);
 
-  pending_map.cfg_delete_gw("GW3" ,group_key);
-  dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl;
+  //pending_map.cfg_delete_gw("GW3" ,group_key);
+  //dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl;
 
 
 }
@@ -142,11 +152,13 @@ void test_MNVMeofGwBeacon() {
   std::string gw_group = "group";
   gw_availability_t availability = gw_availability_t::GW_AVAILABLE;
   std::string nqn = "nqn";
-  BeaconSubsystem sub = { nqn, {}, {} };
+  BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
   BeaconSubsystems subs = { sub };
   epoch_t osd_epoch = 17;
   epoch_t gwmap_epoch = 42;
-
+  uint64_t sequence = 12345;
+  
+  // Test legacy beacon (without diff support)
   auto msg = make_message<MNVMeofGwBeacon>(
       gw_id,
       gw_pool,
@@ -154,22 +166,87 @@ void test_MNVMeofGwBeacon() {
       subs,
       availability,
       osd_epoch,
-      gwmap_epoch);
-  msg->encode_payload(0);
+      gwmap_epoch
+      // sequence defaults to 0
+      // enable_diff defaults to false
+  );
+  msg->encode_payload(CEPH_FEATURES_ALL);
   msg->decode_payload();
-  dout(0) << "decode msg: " << *msg << dendl;
+  dout(0) << "decode msg (revision 1): " << *msg << dendl;
   ceph_assert(msg->get_gw_id() == gw_id);
   ceph_assert(msg->get_gw_pool() == gw_pool);
   ceph_assert(msg->get_gw_group() == gw_group);
   ceph_assert(msg->get_availability() == availability);
   ceph_assert(msg->get_last_osd_epoch() == osd_epoch);
   ceph_assert(msg->get_last_gwmap_epoch() == gwmap_epoch);
+  // Legacy beacons don't preserve sequence field - it gets reset to 0
+  ceph_assert(msg->get_sequence() == 0);
   const auto& dsubs = msg->get_subsystems();
   auto it = std::find_if(dsubs.begin(), dsubs.end(),
                            [&nqn](const auto& element) {
                                return element.nqn == nqn;
                            });
   ceph_assert(it != dsubs.end());
+  ceph_assert(it->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+  
+  // Test enhanced beacon (with diff support)
+  auto msg2 = make_message<MNVMeofGwBeacon>(
+      gw_id,
+      gw_pool,
+      gw_group,
+      subs,
+      availability,
+      osd_epoch,
+      gwmap_epoch,
+      sequence,
+      true  // enable_diff = true
+  );
+  msg2->encode_payload(CEPH_FEATURES_ALL);
+  msg2->decode_payload();
+  dout(0) << "decode msg (revision 2): " << *msg2 << dendl;
+  ceph_assert(msg2->get_gw_id() == gw_id);
+  ceph_assert(msg2->get_gw_pool() == gw_pool);
+  ceph_assert(msg2->get_gw_group() == gw_group);
+  ceph_assert(msg2->get_availability() == availability);
+  ceph_assert(msg2->get_last_osd_epoch() == osd_epoch);
+  ceph_assert(msg2->get_last_gwmap_epoch() == gwmap_epoch);
+  ceph_assert(msg2->get_sequence() == sequence);
+  const auto& dsubs2 = msg2->get_subsystems();
+  auto it2 = std::find_if(dsubs2.begin(), dsubs2.end(),
+                           [&nqn](const auto& element) {
+                               return element.nqn == nqn;
+                           });
+  ceph_assert(it2 != dsubs2.end());
+  ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+}
+
+void test_subsystem_change_descriptors() {
+  dout(0) << __func__ << "\n\n" << dendl;
+  // Test different change descriptors
+  BeaconSubsystem sub1 = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystem sub2 = { "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
+  BeaconSubsystem sub3 = { "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystems subs = { sub1, sub2, sub3 };
+  // Encode and decode
+  ceph::buffer::list bl;
+  encode(subs, bl, CEPH_FEATURES_ALL);
+  auto p = bl.cbegin();
+  BeaconSubsystems decoded_subs;
+  decode(decoded_subs, p);
+  // Verify change descriptors are preserved
+  auto it1 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+                          [](const auto& s) { return s.nqn == "nqn1"; });
+  auto it2 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+                          [](const auto& s) { return s.nqn == "nqn2"; });
+  auto it3 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+                          [](const auto& s) { return s.nqn == "nqn3"; });
+  ceph_assert(it1 != decoded_subs.end());
+  ceph_assert(it2 != decoded_subs.end());
+  ceph_assert(it3 != decoded_subs.end());
+  ceph_assert(it1->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+  ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+  ceph_assert(it3->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+  dout(0) << "Subsystem change descriptors test passed" << dendl;
 }
 
 void test_NVMeofGwTimers()
@@ -207,6 +284,7 @@ int main(int argc, const char **argv)
   test_NVMeofGwMap();
   test_MNVMeofGwMap();
   test_MNVMeofGwBeacon();
+  test_subsystem_change_descriptors();
   test_NVMeofGwTimers();
 }