From: Leonid Chernin Date: Mon, 15 Sep 2025 11:04:04 +0000 (+0300) Subject: nvmeofgw: beacon diff implementation in the monitor and in the MonClient. X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3555a28e45c5b44289f12abe2fc843e21c7ebf87;p=ceph.git nvmeofgw: beacon diff implementation in the monitor and in the MonClient. -monclient encodes subsystems by beacon-diff rules if BEACON_DIFF bit is enabled by quorum -monitor processes beacons by beacon-diff new schema -monitor detects sequence out-of-order (ooo) condition and handles it -in case ooo is detected monitor sends ack to the gw with the expected correct sequence -monitor skips failovers for some interval when ooo is detected -monitor ignores all beacons with incorrect sequences until gw sends expected one -coding upgrade rules Signed-off-by: Leonid Chernin Fixes: https://tracker.ceph.com/issues/72394 --- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1051d6b74650..85071cd93984 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1084,6 +1084,7 @@ if(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT) ${nvmeof_monitor_grpc_hdrs} ceph_nvmeof_monitor_client.cc nvmeof/NVMeofGwClient.cc + nvmeof/NVMeofGwUtils.cc nvmeof/NVMeofGwMonitorGroupClient.cc nvmeof/NVMeofGwMonitorClient.cc) add_executable(ceph-nvmeof-monitor-client ${ceph_nvmeof_monitor_client_srcs}) diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h index 5d8b38fc0a1d..5a607ef4b5a4 100644 --- a/src/include/ceph_features.h +++ b/src/include/ceph_features.h @@ -161,7 +161,7 @@ DEFINE_CEPH_FEATURE_RETIRED(49, 1, OSD_PROXY_FEATURES, JEWEL, LUMINOUS) // overl DEFINE_CEPH_FEATURE(49, 2, SERVER_SQUID); DEFINE_CEPH_FEATURE_RETIRED(50, 1, MON_METADATA, MIMIC, OCTOPUS) DEFINE_CEPH_FEATURE(50, 2, SERVER_TENTACLE); -DEFINE_CEPH_FEATURE_RETIRED(51, 1, OSD_BITWISE_HOBJ_SORT, MIMIC, OCTOPUS) +DEFINE_CEPH_FEATURE(51, 2, NVMEOF_BEACON_DIFF) // available DEFINE_CEPH_FEATURE_RETIRED(52, 1, OSD_PROXY_WRITE_FEATURES, MIMIC, 
OCTOPUS) // available @@ -258,6 +258,7 @@ DEFINE_CEPH_FEATURE_RETIRED(63, 1, RESERVED_BROKEN, LUMINOUS, QUINCY) // client- CEPH_FEATUREMASK_SERVER_REEF | \ CEPH_FEATUREMASK_SERVER_SQUID | \ CEPH_FEATUREMASK_SERVER_TENTACLE | \ + CEPH_FEATUREMASK_NVMEOF_BEACON_DIFF | \ 0ULL) #define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL diff --git a/src/messages/MNVMeofGwBeacon.h b/src/messages/MNVMeofGwBeacon.h index 61daa5269bf6..239410f49da5 100644 --- a/src/messages/MNVMeofGwBeacon.h +++ b/src/messages/MNVMeofGwBeacon.h @@ -22,11 +22,9 @@ #include "mon/MonCommand.h" #include "mon/NVMeofGwMap.h" #include "include/types.h" +#include "mon/NVMeofGwBeaconConstants.h" class MNVMeofGwBeacon final : public PaxosServiceMessage { -private: - static constexpr int HEAD_VERSION = 1; - static constexpr int COMPAT_VERSION = 1; protected: std::string gw_id; @@ -36,10 +34,13 @@ protected: gw_availability_t availability; // in absence of beacon heartbeat messages it becomes inavailable epoch_t last_osd_epoch; epoch_t last_gwmap_epoch; + uint64_t sequence = 0; // sequence number for each beacon message + uint64_t affected_features = 0; public: MNVMeofGwBeacon() - : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION} + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, BEACON_VERSION_ENHANCED, + BEACON_VERSION_ENHANCED}, sequence(0) { set_priority(CEPH_MSG_PRIO_HIGH); } @@ -50,11 +51,17 @@ public: const BeaconSubsystems& subsystems_, const gw_availability_t& availability_, const epoch_t& last_osd_epoch_, - const epoch_t& last_gwmap_epoch_ - ) - : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}, + const epoch_t& last_gwmap_epoch_, + uint64_t sequence_ = 0, // default sequence for backward compatibility + uint64_t features = 0) + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, + 0, + features ? BEACON_VERSION_ENHANCED : BEACON_VERSION_LEGACY, + features ? 
BEACON_VERSION_ENHANCED : BEACON_VERSION_LEGACY}, gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_), - availability(availability_), last_osd_epoch(last_osd_epoch_), last_gwmap_epoch(last_gwmap_epoch_) + availability(availability_), last_osd_epoch(last_osd_epoch_), + last_gwmap_epoch(last_gwmap_epoch_), sequence(sequence_), + affected_features(features) { set_priority(CEPH_MSG_PRIO_HIGH); } @@ -78,6 +85,7 @@ public: const epoch_t& get_last_osd_epoch() const { return last_osd_epoch; } const epoch_t& get_last_gwmap_epoch() const { return last_gwmap_epoch; } const BeaconSubsystems& get_subsystems() const { return subsystems; }; + uint64_t get_sequence() const { return sequence; } private: ~MNVMeofGwBeacon() final {} @@ -92,10 +100,14 @@ public: encode(gw_id, payload); encode(gw_pool, payload); encode(gw_group, payload); - encode(subsystems, payload); + encode(subsystems, payload, affected_features); encode((uint32_t)availability, payload); encode(last_osd_epoch, payload); encode(last_gwmap_epoch, payload); + // Only encode sequence for enhanced beacons (version >= 2) + if (get_header().version >= 2) { + encode(sequence, payload); + } } void decode_payload() override { @@ -112,6 +124,12 @@ public: availability = static_cast(tmp); decode(last_osd_epoch, p); decode(last_gwmap_epoch, p); + // Only decode sequence for enhanced beacons (version >= 2) + if (get_header().version >= 2 && !p.end()) { + decode(sequence, p); + } else { + sequence = 0; // Legacy beacons don't have sequence field + } } private: diff --git a/src/mon/NVMeofGwBeaconConstants.h b/src/mon/NVMeofGwBeaconConstants.h new file mode 100644 index 000000000000..7453905012f4 --- /dev/null +++ b/src/mon/NVMeofGwBeaconConstants.h @@ -0,0 +1,24 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2025 IBM, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef CEPH_NVMEOFGWBEACONCONSTANTS_H +#define CEPH_NVMEOFGWBEACONCONSTANTS_H + +// This header contains version constants used across multiple files +// to avoid duplication and maintain consistency. + +// Beacon version constants +#define BEACON_VERSION_LEGACY 1 // Legacy beacon format (no diff support) +#define BEACON_VERSION_ENHANCED 2 // Enhanced beacon format (with diff support) + +#endif /* CEPH_NVMEOFGWBEACONCONSTANTS_H */ diff --git a/src/mon/NVMeofGwMap.cc b/src/mon/NVMeofGwMap.cc index f6461ebac41d..5d1a42089e47 100755 --- a/src/mon/NVMeofGwMap.cc +++ b/src/mon/NVMeofGwMap.cc @@ -49,7 +49,8 @@ void NVMeofGwMap::to_gmap( } auto gw_state = NvmeGwClientState( - gw_created.ana_grp_id, epoch, availability); + gw_created.ana_grp_id, epoch, availability, gw_created.beacon_sequence, + gw_created.beacon_sequence_ooo); for (const auto& sub: gw_created.subsystems) { gw_state.subsystems.insert({ sub.nqn, @@ -82,10 +83,10 @@ void NVMeofGwMap::remove_grp_id( } int NVMeofGwMap::cfg_add_gw( - const NvmeGwId &gw_id, const NvmeGroupKey& group_key) + const NvmeGwId &gw_id, const NvmeGroupKey& group_key, uint64_t features) { std::set allocated; - if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) { + if (HAVE_FEATURE(features, NVMEOFHAMAP)) { auto gw_epoch_it = gw_epoch.find(group_key); if (gw_epoch_it == gw_epoch.end()) { gw_epoch[group_key] = epoch; @@ -176,11 +177,16 @@ int NVMeofGwMap::cfg_delete_gw( for (auto& gws_states: created_gws[group_key]) { if (gws_states.first == gw_id) { auto& state = gws_states.second; + if (state.availability == gw_availability_t::GW_AVAILABLE) { + /*prevent failover because blocklisting right now cause IO errors */ + dout(4) << "Delete GW: set skip-failovers for group " << gw_id + << " group " << group_key 
<< dendl; + skip_failovers_for_group(group_key, 5); + } state.availability = gw_availability_t::GW_DELETING; dout(4) << " Deleting GW :"<< gw_id << " in state " << state.availability << " Resulting GW availability: " << state.availability << dendl; - state.subsystems.clear();//ignore subsystems of this GW utime_t now = ceph_clock_now(); mon->nvmegwmon()->gws_deleting_time[group_key][gw_id] = now; return 0; @@ -360,10 +366,16 @@ void NVMeofGwMap::track_deleting_gws(const NvmeGroupKey& group_key, } } -void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key) +void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key, + int interval_sec) { - const auto skip_failovers = g_conf().get_val - ("mon_nvmeofgw_skip_failovers_interval"); + std::chrono::seconds skip_failovers; + if (interval_sec == 0) { + skip_failovers = g_conf().get_val + ("mon_nvmeofgw_skip_failovers_interval"); + } else { + skip_failovers = std::chrono::seconds(interval_sec); + } for (auto& gw_created: created_gws[group_key]) { gw_created.second.allow_failovers_ts = std::chrono::system_clock::now() + skip_failovers; @@ -408,6 +420,7 @@ int NVMeofGwMap::process_gw_map_gw_down( auto& st = gw_state->second; st.set_unavailable_state(); st.set_last_gw_down_ts(); + st.reset_beacon_sequence(); for (auto& state_itr: created_gws[group_key][gw_id].sm_state) { fsm_handle_gw_down( gw_id, group_key, state_itr.second, @@ -1063,14 +1076,23 @@ int NVMeofGwMap::blocklist_gw( // find_already_created_gw(gw_id, group_key); NvmeGwMonState& gw_map = created_gws[group_key][gw_id]; NvmeNonceVector nonces; + + NvmeAnaNonceMap nonce_map; + for (const auto& sub: gw_map.subsystems) { // recreate nonce map from subsystems + for (const auto& ns: sub.namespaces) { + auto& nonce_vec = nonce_map[ns.anagrpid-1]; //Converting ana groups to offsets + if (std::find(nonce_vec.begin(), nonce_vec.end(), ns.nonce) == nonce_vec.end()) + nonce_vec.push_back(ns.nonce); + } + } for (auto& state_itr: gw_map.sm_state) { // 
to make blocklist on all clusters of the failing GW - nonces.insert(nonces.end(), gw_map.nonce_map[state_itr.first].begin(), - gw_map.nonce_map[state_itr.first].end()); + nonces.insert(nonces.end(), nonce_map[state_itr.first].begin(), + nonce_map[state_itr.first].end()); } - + gw_map.subsystems.clear(); if (nonces.size() > 0) { - NvmeNonceVector &nonce_vector = gw_map.nonce_map[grpid];; + NvmeNonceVector &nonce_vector = nonces; std::string str = "["; entity_addrvec_t addr_vect; @@ -1144,6 +1166,45 @@ void NVMeofGwMap::validate_gw_map(const NvmeGroupKey& group_key) } } +bool NVMeofGwMap::put_gw_beacon_sequence_number(const NvmeGwId &gw_id, + int gw_version, const NvmeGroupKey& group_key, + uint64_t beacon_sequence, uint64_t& old_sequence) +{ + bool rc = true; + NvmeGwMonState& gw_map = created_gws[group_key][gw_id]; + + if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) || + (gw_version > 0) ) { + uint64_t seq_number = gw_map.beacon_sequence; + if ((beacon_sequence != seq_number+1) && + !(beacon_sequence == 0 && seq_number == 0 )) {// new GW startup + rc = false; + old_sequence = seq_number; + dout(4) << "Warning: GW " << gw_id + << " sent beacon sequence out of order, expected " + << seq_number +1 << " received " << beacon_sequence << dendl; + gw_map.beacon_sequence_ooo = true; + } else { + gw_map.beacon_sequence = beacon_sequence; + } + } + return rc; +} + +bool NVMeofGwMap::set_gw_beacon_sequence_number(const NvmeGwId &gw_id, + int gw_version, const NvmeGroupKey& group_key, uint64_t beacon_sequence) +{ + NvmeGwMonState& gw_map = created_gws[group_key][gw_id]; + if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) || + (gw_version > 0)) { + gw_map.beacon_sequence = beacon_sequence; + gw_map.beacon_sequence_ooo = false; + dout(10) << gw_id << " set beacon_sequence " << beacon_sequence << dendl; + } + return true; +} + + void NVMeofGwMap::update_active_timers(bool &propose_pending) { const auto now = 
std::chrono::system_clock::now(); diff --git a/src/mon/NVMeofGwMap.h b/src/mon/NVMeofGwMap.h index f01a3d740c3e..bfc45a009854 100755 --- a/src/mon/NVMeofGwMap.h +++ b/src/mon/NVMeofGwMap.h @@ -65,10 +65,11 @@ public: void to_gmap(std::map& Gmap) const; void track_deleting_gws(const NvmeGroupKey& group_key, const BeaconSubsystems& subs, bool &propose_pending); - int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key); - int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key); void check_all_gws_in_deleting_state(const NvmeGwId &gw_id, const NvmeGroupKey& group_key); + int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, + uint64_t features); + int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key); void process_gw_map_ka( const NvmeGwId &gw_id, const NvmeGroupKey& group_key, epoch_t& last_osd_epoch, bool &propose_pending); @@ -92,7 +93,13 @@ public: const NvmeGroupKey& group_key, bool &propose_pending); void set_addr_vect(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const entity_addr_t &addr_vect); - void skip_failovers_for_group(const NvmeGroupKey& group_key); + void skip_failovers_for_group(const NvmeGroupKey& group_key, + int interval_sec = 0); + bool put_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version, + const NvmeGroupKey& group_key, uint64_t beacon_sequence, + uint64_t& old_sequence); + bool set_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version, + const NvmeGroupKey& group_key, uint64_t beacon_sequence); private: int do_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key); int do_erase_gw_id(const NvmeGwId &gw_id, diff --git a/src/mon/NVMeofGwMon.cc b/src/mon/NVMeofGwMon.cc index f8e6f3c488b8..9809b9eff610 100644 --- a/src/mon/NVMeofGwMon.cc +++ b/src/mon/NVMeofGwMon.cc @@ -71,6 +71,9 @@ void NVMeofGwMon::synchronize_last_beacon() // force send ack after nearest beacon after leader re-election gw_created_pair.second.beacon_index = 
g_conf().get_val("mon_nvmeofgw_beacons_till_ack"); + // force send full beacon after leader election + gw_created_pair.second.beacon_sequence = 0; + gw_created_pair.second.beacon_sequence_ooo = true; } } } @@ -138,7 +141,9 @@ void NVMeofGwMon::tick() for (auto &[group_key, gws_states]: pending_map.created_gws) { BeaconSubsystems *subsystems = &empty_subsystems; for (auto& gw_state : gws_states) { // loop for GWs inside nqn group - subsystems = &gw_state.second.subsystems; + if (gw_state.second.availability == gw_availability_t::GW_AVAILABLE) { + subsystems = &gw_state.second.subsystems; + } if (subsystems->size()) { // Set subsystems to the valid value break; } @@ -172,7 +177,7 @@ version_t NVMeofGwMon::get_trim_to() const * function called to restore in pending map all data that is not serialized * to paxos peons. Othervise it would be overriden in "pending_map = map" * currently "allow_failovers_ts", "last_gw_down_ts", - * "last_gw_map_epoch_valid" variables are restored + * "last_gw_map_epoch_valid", "beacon_sequence", "beacon_index" variables are restored */ void NVMeofGwMon::restore_pending_map_info(NVMeofGwMap & tmp_map) { std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); @@ -196,6 +201,12 @@ void NVMeofGwMon::restore_pending_map_info(NVMeofGwMap & tmp_map) { gw_created_pair.second.last_gw_down_ts; pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid = gw_created_pair.second.last_gw_map_epoch_valid; + pending_map.created_gws[group_key][gw_id].beacon_index = + gw_created_pair.second.beacon_index; + pending_map.created_gws[group_key][gw_id].beacon_sequence = + gw_created_pair.second.beacon_sequence; + pending_map.created_gws[group_key][gw_id].beacon_sequence_ooo = + gw_created_pair.second.beacon_sequence_ooo; } } } @@ -234,7 +245,8 @@ void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t) } pending_map.encode(bl, features); dout(10) << " has NVMEOFHA: " << HAVE_FEATURE(features, NVMEOFHA) - << " has 
NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP) << dendl; + << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP) + << " has BEACON_DIFF: " << HAVE_FEATURE(features, NVMEOF_BEACON_DIFF) << dendl; put_version(t, pending_map.epoch, bl); put_last_committed(t, pending_map.epoch); @@ -247,7 +259,9 @@ void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t) void NVMeofGwMon::update_from_paxos(bool *need_bootstrap) { version_t version = get_last_committed(); - + uint64_t features = mon.get_quorum_con_features(); + dout(20) << " has BEACON_DIFF: " << HAVE_FEATURE(features, NVMEOF_BEACON_DIFF) + << dendl; if (version != map.epoch) { dout(10) << " NVMeGW loading version " << version << " " << map.epoch << dendl; @@ -627,7 +641,8 @@ bool NVMeofGwMon::prepare_command(MonOpRequestRef op) auto group_key = std::make_pair(pool, group); dout(10) << " id "<< id <<" pool "<< pool << " group "<< group << dendl; if (prefix == "nvme-gw create") { - rc = pending_map.cfg_add_gw(id, group_key); + rc = pending_map.cfg_add_gw(id, group_key, + mon.get_quorum_con_features()); if (rc == -EINVAL) { err = rc; dout (4) << "Error: GW cannot be created " << id @@ -712,23 +727,176 @@ epoch_t NVMeofGwMon::get_ack_map_epoch(bool gw_created, return rc; } +void NVMeofGwMon::do_send_map_ack(MonOpRequestRef op, + bool gw_created, bool gw_propose, + uint64_t stored_sequence, bool is_correct_sequence, + const NvmeGroupKey& group_key, const NvmeGwId &gw_id) { + /* always send beacon ack to gw in Created state, + * it should be temporary state + * if epoch-filter-bit: send ack to beacon in case no propose + * or if changed something not relevant to gw-epoch + */ + NVMeofGwMap ack_map; + if (gw_created) { + NvmeGwMonState& pending_gw_map = pending_map.created_gws[group_key][gw_id]; + // respond with a map slice correspondent to the same GW + ack_map.created_gws[group_key][gw_id] = (gw_propose) ? 
//avail = CREATED + pending_gw_map : map.created_gws[group_key][gw_id]; + ack_map.created_gws[group_key][gw_id].beacon_sequence = + pending_gw_map.beacon_sequence; + if (!is_correct_sequence) { + dout(4) << " GW " << gw_id << + " sending ACK due to receiving beacon_sequence out of order" << dendl; + ack_map.created_gws[group_key][gw_id].beacon_sequence = stored_sequence; + ack_map.created_gws[group_key][gw_id].beacon_sequence_ooo = true; + } else { + ack_map.created_gws[group_key][gw_id].beacon_sequence_ooo = false; + } + if (gw_propose) { + dout(10) << "GW in Created " << gw_id << " ack map " << ack_map << dendl; + } + } + ack_map.epoch = get_ack_map_epoch(gw_created, group_key); + if (!gw_created) + dout(10) << "gw not created, ack map " + << ack_map << " epoch " << ack_map.epoch << dendl; + dout(20) << "ack_map " << ack_map <(ack_map); + mon.send_reply(op, msg.detach()); +} + +/* + * Any subsystem was added to the Beacon only if it( or it's encapsulated fields) + * was changed, added or deleted. If nothing changed the beacon did not + * encode any subsystem. 
+ * New descriptor was added under the subsystem to describe + * the change : ADDED, DELETED, CHANGED + * rules for apply the subsystems to the map: + * Pass all subsystems in the beacon->sub list + * if descriptor is ADDED or CHANGED do the following + * look for subs nqn in the gw.subs list and if found - substitute, + * if not found - add + * if descriptor is DELETED do the following + * look for subs nqn in the gw.subs list and if found - delete + */ +int NVMeofGwMon::apply_beacon(const NvmeGwId &gw_id, int gw_version, + const NvmeGroupKey& group_key, void *msg, + const BeaconSubsystems& sub, gw_availability_t &avail, + bool &propose_pending) +{ + bool found = false; + bool changed = false; + BeaconSubsystems &gw_subs = + pending_map.created_gws[group_key][gw_id].subsystems; + auto &state = pending_map.created_gws[group_key][gw_id]; + + if (!HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOF_BEACON_DIFF) && + gw_version == 0) { + if (gw_subs != sub) { + dout(10) << "BEACON_DIFF logic not applied." + "subsystems of GW changed, propose pending " << gw_id << dendl; + gw_subs = sub; + //rebuild nonce map. 
+ state.nonce_map = ((MNVMeofGwBeacon *)msg)->get_nonce_map(); + changed = true; + } + } else { + if (state.beacon_sequence_ooo) { + dout(10) << "Good sequence after out of order detection " + "sequence "<< state.beacon_sequence << " " << gw_id << dendl; + // need to clear subsystems for correct calculation of difference + state.subsystems.clear(); + state.beacon_sequence_ooo = false; + propose_pending = true; + } + + for (auto &subs_it: sub) { + if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED || + subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED) { + found = false; + for (auto &gw_subs_it: gw_subs) { + if (gw_subs_it.nqn == subs_it.nqn) { + gw_subs_it = subs_it; + dout(10) << "subsystem changed " << subs_it.nqn << " change descr " + << (uint32_t)subs_it.change_descriptor << dendl; + found = true; + changed = true; + break; + } + } + if (!found) { + gw_subs.push_back(subs_it); + changed = true; + dout(10) << "subsystem added " << subs_it.nqn << " change descr " + << (uint32_t)subs_it.change_descriptor << dendl; + } + } + else + if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED) + { + auto it = std::find_if(gw_subs.begin(), gw_subs.end(), + [&](const auto& gw_subs_it) { + return gw_subs_it.nqn == subs_it.nqn; + }); + if (it != gw_subs.end()) { + gw_subs.erase(it); + dout(10) << "subsystem deleted " << subs_it.nqn << " change descr " + << (uint32_t)subs_it.change_descriptor << dendl; + changed = true; + } + } + } + } + if (changed) { + avail = gw_availability_t::GW_AVAILABLE; + } + if (gw_subs.size() == 0) { + avail = gw_availability_t::GW_CREATED; + dout(10) << "No-subsystems condition detected for GW " << gw_id <get_req(); - - dout(20) << "availability " << m->get_availability() + uint64_t sequence = m->get_sequence(); + int version = m->version; + dout(10) << "availability " << m->get_availability() + << " sequence " << sequence << " version " << version << " GW : " << m->get_gw_id() << " osdmap_epoch " 
<< m->get_last_osd_epoch() - << " subsystems " << m->get_subsystems() << dendl; + << " subsystems " << m->get_subsystems().size() << dendl; + ConnectionRef con = op->get_connection(); NvmeGwId gw_id = m->get_gw_id(); NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group()); + //"avail" variable will be changed inside the function + // when it becomes CREATED for several reasons GW's load balance group + // is serviced by another GW gw_availability_t avail = m->get_availability(); bool propose = false; bool nonce_propose = false; bool timer_propose = false; bool gw_propose = false; bool gw_created = true; + bool correct_sequence = true; + uint64_t stored_sequence; NVMeofGwMap ack_map; bool epoch_filter_enabled = HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOFHAMAP); @@ -749,6 +917,9 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) << map.created_gws << dendl; goto set_propose; } else { + pending_map.created_gws[group_key][gw_id].subsystems.clear(); + pending_map.set_gw_beacon_sequence_number(gw_id, version, + group_key, sequence); dout(4) << "GW beacon: Created state - full startup done " << gw_id << " GW state in monitor data-base : " << pending_map.created_gws[group_key][gw_id].availability @@ -777,6 +948,8 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) } else { // first GW beacon should come with avail = Created // if GW reports Avail/Unavail but in monitor's database it is Unavailable if (gw != group_gws.end()) { + correct_sequence = pending_map.put_gw_beacon_sequence_number + (gw_id, version, group_key, sequence, stored_sequence); // it means it did not perform "exit" after failover was set by // NVMeofGWMon if ((pending_map.created_gws[group_key][gw_id].availability == @@ -794,6 +967,18 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) mon.send_reply(op, msg.detach()); goto false_return; } + if (!correct_sequence) { + if (avail == gw_availability_t::GW_AVAILABLE) { + /*prevent failover - give GW a chance to send the 
expected sequence */ + dout(4) << "sequence ooo: set skip-failovers for group " << gw_id + << " group " << group_key << dendl; + pending_map.skip_failovers_for_group(group_key, 7); + } + avail = gw_availability_t::GW_CREATED; + // availability would be set to Active and GW receive the full map + // when it sends the correct beacon-sequence, + goto check_availability; + } } } // Beacon from GW in !Created state but it does not appear in the map @@ -819,44 +1004,12 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) pending_map.set_addr_vect(gw_id, group_key, con->get_peer_addr()); gw_propose = true; } - // deep copy the whole nonce map of this GW - if (m->get_nonce_map().size()) { - if (pending_map.created_gws[group_key][gw_id].nonce_map != - m->get_nonce_map()) { - dout(10) << "nonce map of GW changed , propose pending " - << gw_id << dendl; - pending_map.created_gws[group_key][gw_id].nonce_map = m->get_nonce_map(); - dout(10) << "nonce map of GW " << gw_id << " " - << pending_map.created_gws[group_key][gw_id].nonce_map << dendl; - nonce_propose = true; - } - } else { - dout(10) << "Warning: received empty nonce map in the beacon of GW " - << gw_id << " avail " << (int)avail << dendl; - } - - if (sub.size() == 0) { - avail = gw_availability_t::GW_CREATED; - dout(20) << "No-subsystems condition detected for GW " << gw_id <get_last_gwmap_epoch()); @@ -866,10 +1019,11 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) << " epoch " << get_ack_map_epoch(true, group_key) << " beacon_epoch " << m->get_last_gwmap_epoch() << dendl; } + +check_availability: if (avail == gw_availability_t::GW_AVAILABLE) { // check pending_map.epoch vs m->get_version() - // if different - drop the beacon - LastBeacon lb = {gw_id, group_key}; last_beacon[lb] = now; epoch_t last_osd_epoch = m->get_last_osd_epoch(); @@ -882,9 +1036,10 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) // Periodic: check active FSM timers pending_map.update_active_timers(timer_propose); - 
set_propose: +set_propose: propose |= (timer_propose | gw_propose | nonce_propose); - apply_ack_logic = (avail == gw_availability_t::GW_AVAILABLE) ? true : false; + apply_ack_logic = ((avail == gw_availability_t::GW_AVAILABLE) + && correct_sequence) ? true : false; if ( (apply_ack_logic && ((pending_map.created_gws[group_key][gw_id].beacon_index++ % beacons_till_ack) == 0))|| (!apply_ack_logic) ) { @@ -899,22 +1054,8 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) if (send_ack && ((!gw_propose && epoch_filter_enabled) || (!propose && !epoch_filter_enabled) || (avail == gw_availability_t::GW_CREATED)) ) { - /* always send beacon ack to gw in Created state, - * it should be temporary state - * if epoch-filter-bit: send ack to beacon in case no propose - * or if changed something not relevant to gw-epoch - */ - if (gw_created) { - // respond with a map slice correspondent to the same GW - ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id]; - } - ack_map.epoch = get_ack_map_epoch(gw_created, group_key); - if (!gw_created) - dout(10) << "gw not created, ack map " - << ack_map << " epoch " << ack_map.epoch << dendl; - dout(20) << "ack_map " << ack_map <(ack_map); - mon.send_reply(op, msg.detach()); + do_send_map_ack(op, gw_created, gw_propose, stored_sequence, + correct_sequence, group_key, gw_id); } else { mon.no_reply(op); } diff --git a/src/mon/NVMeofGwMon.h b/src/mon/NVMeofGwMon.h index a27eedc27816..ce8b9473d6dd 100644 --- a/src/mon/NVMeofGwMon.h +++ b/src/mon/NVMeofGwMon.h @@ -102,6 +102,12 @@ private: void restore_pending_map_info(NVMeofGwMap & tmp_map); void cleanup_pending_map(); void get_gw_listeners(ceph::Formatter *f, std::pair& group_key); + int apply_beacon(const NvmeGwId &gw_id, int gw_version, + const NvmeGroupKey& group_key, void *msg, + const BeaconSubsystems& sub, gw_availability_t &avail, bool &propose_pending); + void do_send_map_ack(MonOpRequestRef op, bool gw_created, bool gw_propose, + uint64_t stored_sequence, bool 
is_correct_sequence, + const NvmeGroupKey& group_key, const NvmeGwId &gw_id); }; #endif /* MON_NVMEGWMONITOR_H_ */ diff --git a/src/mon/NVMeofGwSerialize.h b/src/mon/NVMeofGwSerialize.h index b259d7b87466..4ddbe18c9f57 100755 --- a/src/mon/NVMeofGwSerialize.h +++ b/src/mon/NVMeofGwSerialize.h @@ -13,6 +13,7 @@ */ #ifndef MON_NVMEOFGWSERIALIZE_H_ #define MON_NVMEOFGWSERIALIZE_H_ + #define dout_context g_ceph_context #define dout_subsys ceph_subsys_mon #undef dout_prefix @@ -107,7 +108,8 @@ inline std::ostream& operator<<(std::ostream& os, const BeaconListener value) { } inline std::ostream& operator<<(std::ostream& os, const BeaconSubsystem value) { - os << "BeaconSubsystem( nqn:" << value.nqn << ", listeners [ "; + os << "BeaconSubsystem( nqn:" << value.nqn << " descr " + << (uint32_t)value.change_descriptor << ", listeners [ "; for (const auto& list: value.listeners) os << list << " "; os << "] namespaces [ "; for (const auto& ns: value.namespaces) os << ns << " "; @@ -124,7 +126,9 @@ inline std::ostream& operator<<( std::ostream& os, const NvmeGwClientState value) { os << "NvmeGwState { group id: " << value.group_id << " gw_map_epoch " << value.gw_map_epoch - << " availablilty "<< value.availability + << " availablilty " << value.availability + << " sequence " << value.last_beacon_seq_number + << " sequence-ooo " << value.last_beacon_seq_ooo << " GwSubsystems: [ "; for (const auto& sub: value.subsystems) { os << sub.second << " "; @@ -274,7 +278,7 @@ inline void encode( for (const auto& sub: subsystems) { encode(sub.second.nqn, bl); if (version == 1) { - dout(20) << "encode ana_state vector version1 = " << version << dendl; + dout(20) << "encode ana_state vector version1 = " << (int)version << dendl; /* Version 1 requires exactly 16 entries */ ana_state_t filled(sub.second.ana_state); filled.resize( @@ -284,7 +288,7 @@ inline void encode( 0)); encode(filled, bl); } else { - dout(20) << "encode ana_state vector version2 = " << version << dendl; + dout(20) << 
"encode ana_state vector version2 = " << (int)version << dendl; encode(sub.second.ana_state, bl); } } @@ -308,23 +312,35 @@ inline void decode( } inline void encode(const NvmeGwClientState& state, ceph::bufferlist &bl, uint64_t features) { - ENCODE_START(1, 1, bl); + uint8_t version = 1; + if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) { + version = 2; + } + ENCODE_START(version, version, bl); encode(state.group_id, bl); encode(state.gw_map_epoch, bl); encode (state.subsystems, bl, features); encode((uint32_t)state.availability, bl); + if (version >= 2) { + encode((uint64_t)state.last_beacon_seq_number, bl); + encode(state.last_beacon_seq_ooo, bl); + } ENCODE_FINISH(bl); } inline void decode( NvmeGwClientState& state, ceph::bufferlist::const_iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); decode(state.group_id, bl); decode(state.gw_map_epoch, bl); decode(state.subsystems, bl); uint32_t avail; decode(avail, bl); state.availability = (gw_availability_t)avail; + if (struct_v >= 2) { + decode(state.last_beacon_seq_number, bl); + decode(state.last_beacon_seq_ooo, bl); + } DECODE_FINISH(bl); } @@ -418,6 +434,7 @@ inline void encode(const NvmeAnaNonceMap& nonce_map, ceph::bufferlist &bl, uint64_t features) { ENCODE_START(1, 1, bl); encode((uint32_t)nonce_map.size(), bl); + dout(20) << "encode nonce map size " << nonce_map.size() << dendl; for (auto& ana_group_nonces : nonce_map) { // ana group id encode(ana_group_nonces.first, bl); @@ -458,10 +475,12 @@ inline void encode(const NvmeGwMonStates& gws, ceph::bufferlist &bl, version = 3; } ENCODE_START(version, version, bl); + dout(20) << "encode NvmeGwMonStates. 
struct_v: " << (int)version << dendl; encode ((uint32_t)gws.size(), bl); // number of gws in the group for (auto& gw : gws) { encode(gw.first, bl);// GW_id encode(gw.second.ana_grp_id, bl); // GW owns this group-id + dout(20) << "encode gw-id " << gw.first << dendl; if (version >= 2) { encode((uint32_t)gw.second.sm_state.size(), bl); for (auto &state_it:gw.second.sm_state) { @@ -471,7 +490,9 @@ inline void encode(const NvmeGwMonStates& gws, ceph::bufferlist &bl, encode((uint32_t)gw.second.availability, bl); encode((uint16_t)gw.second.performed_full_startup, bl); encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl); - encode(gw.second.subsystems, bl); + dout(20) << "encode availability " << gw.second.availability + << " startup " << (int)gw.second.performed_full_startup << dendl; + encode(gw.second.subsystems, bl, features); encode((uint32_t)gw.second.blocklist_data.size(), bl); for (auto &blklst_itr: gw.second.blocklist_data) { @@ -488,7 +509,7 @@ inline void encode(const NvmeGwMonStates& gws, ceph::bufferlist &bl, encode((uint32_t)gw.second.availability, bl); encode((uint16_t)gw.second.performed_full_startup, bl); encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl); - encode(gw.second.subsystems, bl); // TODO reuse but put features - encode version + encode(gw.second.subsystems, bl, features); Blocklist_data bl_data[MAX_SUPPORTED_ANA_GROUPS]; for (auto &blklst_itr: gw.second.blocklist_data) { bl_data[blklst_itr.first].osd_epoch = blklst_itr.second.osd_epoch; @@ -590,6 +611,7 @@ inline void decode( dout(20) << "decode addr_vect and beacon_index" << dendl; gw_created.addr_vect.decode(bl); decode(gw_created.beacon_index, bl); + dout(20) << "decoded beacon_index " << gw_created.beacon_index << dendl; } gws[gw_name] = gw_created; @@ -824,22 +846,41 @@ inline void decode(BeaconListener& ls, ceph::buffer::list::const_iterator &bl) { DECODE_FINISH(bl); } -inline void encode(const BeaconSubsystem& sub, ceph::bufferlist &bl) { - ENCODE_START(1, 1, bl); +inline void 
encode(const BeaconSubsystem& sub, ceph::bufferlist &bl, uint64_t features) { + uint8_t version = 1; + if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) { + version = 2; + } + // For legacy encoding, skip deleted subsystems to maintain compatibility + if (version == 1 && + sub.change_descriptor != subsystem_change_t::SUBSYSTEM_ADDED) { + dout(4) << "encode BeaconSubsystem: skipping subsystem " << sub.nqn + << " with change_descriptor " << (int)sub.change_descriptor + << " in legacy mode" << dendl; + return; // Skip encoding this subsystem entirely + } + + ENCODE_START(version, version, bl); encode(sub.nqn, bl); + dout(20) << "encode BeaconSubsystems " << sub.nqn <<" features " << features + << " version " << (int)version << dendl; encode((uint32_t)sub.listeners.size(), bl); for (const auto& ls: sub.listeners) encode(ls, bl); encode((uint32_t)sub.namespaces.size(), bl); for (const auto& ns: sub.namespaces) encode(ns, bl); + if (version >= 2) { + encode((uint32_t)sub.change_descriptor, bl); + dout(20) << "encode BeaconSubsystems change-descr: " << (uint32_t)sub.change_descriptor << dendl; + } ENCODE_FINISH(bl); } inline void decode(BeaconSubsystem& sub, ceph::buffer::list::const_iterator &bl) { - DECODE_START(1, bl); - dout(20) << "decode BeaconSubsystems " << dendl; + DECODE_START(2, bl); decode(sub.nqn, bl); + dout(20) << "decode BeaconSubsystems " << sub.nqn << dendl; uint32_t s; sub.listeners.clear(); decode(s, bl); @@ -856,6 +897,12 @@ inline void decode(BeaconSubsystem& sub, ceph::buffer::list::const_iterator &bl) decode(ns, bl); sub.namespaces.push_back(ns); } + if (struct_v >= 2) { + uint32_t change_desc; + decode(change_desc, bl); + sub.change_descriptor = static_cast(change_desc); + dout(20) << "decode BeaconSubsystems version >= " << 2 << dendl; + } DECODE_FINISH(bl); } diff --git a/src/mon/NVMeofGwTypes.h b/src/mon/NVMeofGwTypes.h index 6a2f9506e2e3..cd22dcbc4fe6 100755 --- a/src/mon/NVMeofGwTypes.h +++ b/src/mon/NVMeofGwTypes.h @@ -18,6 +18,12 @@ #include 
#include #include +#include +#include +#include +#include +#include "include/types.h" +#include "msg/msg_types.h" using NvmeGwId = std::string; using NvmeGroupKey = std::pair; @@ -47,6 +53,12 @@ enum class gw_availability_t { GW_DELETED }; +enum class subsystem_change_t { + SUBSYSTEM_ADDED, + SUBSYSTEM_CHANGED, + SUBSYSTEM_DELETED +}; + #define REDUNDANT_GW_ANA_GROUP_ID 0xFF using SmState = std::map < NvmeAnaGrpId, gw_states_per_group_t>; @@ -86,6 +98,7 @@ struct BeaconSubsystem { NvmeNqnId nqn; std::list listeners; std::list namespaces; + subsystem_change_t change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED; // Define the equality operator bool operator==(const BeaconSubsystem& other) const { @@ -131,7 +144,9 @@ struct NvmeGwMonState { BlocklistData blocklist_data; //ceph entity address allocated for the GW-client that represents this GW-id entity_addrvec_t addr_vect; - uint16_t beacon_index = 0; + uint64_t beacon_sequence = 0;// sequence number of last beacon copied to GW state + bool beacon_sequence_ooo = false; // last beacon sequence was out of order; + uint16_t beacon_index = 0; // used for filter acks sent to the client as response to beacon /** * during redeploy action and maybe other emergency use-cases gw performs scenario * that we call fast-reboot. 
It quickly reboots(due to redeploy f.e) and sends the @@ -170,6 +185,9 @@ struct NvmeGwMonState { // it expects it performed the full startup performed_full_startup = false; } + void reset_beacon_sequence(){ + beacon_sequence = 0; + } void standby_state(NvmeAnaGrpId grpid) { sm_state[grpid] = gw_states_per_group_t::GW_STANDBY_STATE; } @@ -231,12 +249,16 @@ struct NvmeGwClientState { epoch_t gw_map_epoch; GwSubsystems subsystems; gw_availability_t availability; - NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available) - : group_id(id), gw_map_epoch(epoch), availability(available) {} + uint64_t last_beacon_seq_number; + bool last_beacon_seq_ooo; //out of order sequence + NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available, + uint64_t sequence, bool sequence_ooo) + : group_id(id), gw_map_epoch(epoch), availability(available), + last_beacon_seq_number(sequence), last_beacon_seq_ooo(sequence_ooo) {} NvmeGwClientState() : NvmeGwClientState( - REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE) {} + REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE, 0, 0) {} }; struct Tmdata { diff --git a/src/nvmeof/NVMeofGwMonitorClient.cc b/src/nvmeof/NVMeofGwMonitorClient.cc index 5d37924573a3..3cd31a389696 100644 --- a/src/nvmeof/NVMeofGwMonitorClient.cc +++ b/src/nvmeof/NVMeofGwMonitorClient.cc @@ -12,6 +12,7 @@ */ #include +#include #include "common/errno.h" #include "common/signal.h" @@ -19,6 +20,7 @@ #include "include/compat.h" #include "include/stringify.h" +#include "include/ceph_features.h" #include "global/global_context.h" #include "global/signal_handler.h" @@ -28,6 +30,7 @@ #include "NVMeofGwMonitorClient.h" #include "NVMeofGwClient.h" #include "NVMeofGwMonitorGroupClient.h" +#include "nvmeof/NVMeofGwUtils.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_mon @@ -41,6 +44,8 @@ NVMeofGwMonitorClient::NVMeofGwMonitorClient(int argc, const char **argv) : 
last_map_time(std::chrono::steady_clock::now()), reset_timestamp(std::chrono::steady_clock::now()), start_time(last_map_time), + cluster_beacon_diff_included(0), + poolctx(), monc{g_ceph_context, poolctx}, client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())), objecter{g_ceph_context, client_messenger.get(), &monc, poolctx}, @@ -214,7 +219,7 @@ void NVMeofGwMonitorClient::send_beacon() { ceph_assert(ceph_mutex_is_locked_by_me(beacon_lock)); gw_availability_t gw_availability = gw_availability_t::GW_CREATED; - BeaconSubsystems subs; + BeaconSubsystems current_subsystems; NVMeofGwClient gw_client( grpc::CreateChannel(gateway_address, gw_creds())); subsystems_info gw_subsystems; @@ -234,26 +239,43 @@ void NVMeofGwMonitorClient::send_beacon() BeaconListener bls = { ls.adrfam(), ls.traddr(), ls.trsvcid() }; bsub.listeners.push_back(bls); } - subs.push_back(bsub); + current_subsystems.push_back(bsub); } } + // Determine change descriptors by comparing with previous beacon's subsystems + BeaconSubsystems subsystem_diff = current_subsystems; + determine_subsystem_changes(prev_beacon_subsystems, subsystem_diff); + auto group_key = std::make_pair(pool, group); NvmeGwClientState old_gw_state; // if already got gateway state in the map if (first_beacon == false && get_gw_state("old map", map, group_key, name, old_gw_state)) gw_availability = ok ? gw_availability_t::GW_AVAILABLE : gw_availability_t::GW_UNAVAILABLE; - dout(10) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability << + dout(1) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability << " osdmap_epoch " << osdmap_epoch << " gwmap_epoch " << gwmap_epoch << dendl; + + // Check if NVMEOF_BEACON_DIFF feature is supported by the cluster + dout(10) << fmt::format("NVMEOF_BEACON_DIFF supported: {}", cluster_beacon_diff_included ? 
"yes" : "no") << dendl; + + // Send beacon with appropriate version based on cluster features auto m = ceph::make_message( name, pool, group, - subs, + cluster_beacon_diff_included ? subsystem_diff : current_subsystems, gw_availability, osdmap_epoch, - gwmap_epoch); + gwmap_epoch, + beacon_sequence, + // Pass affected features to the constructor + cluster_beacon_diff_included ? CEPH_FEATUREMASK_NVMEOF_BEACON_DIFF : 0 + ); + dout(10) << "sending beacon with diff support: " << (cluster_beacon_diff_included ? "enabled" : "disabled") << dendl; + monc.send_mon_message(std::move(m)); + ++beacon_sequence; + prev_beacon_subsystems = std::move(current_subsystems); } void NVMeofGwMonitorClient::disconnect_panic() @@ -391,6 +413,22 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t nmap) // ensure that the gateway state has not vanished ceph_assert(got_new_gw_state || !got_old_gw_state); + // Check if the last_beacon_seq_number in the received map doesn't match our last sent beacon_sequence + // beacon_sequence is incremented after sending, so we compare with (beacon_sequence - 1) + { + std::lock_guard bl(beacon_lock); + if (got_new_gw_state && new_gw_state.last_beacon_seq_ooo) { + //new_gw_state.last_beacon_seq_number != (beacon_sequence - 1)) { + dout(4) << "Beacon sequence mismatch detected. Expected: " << (beacon_sequence - 1) + << ", received: " << new_gw_state.last_beacon_seq_number + << ". Truncating previous subsystems list." 
<< dendl; + beacon_sequence = new_gw_state.last_beacon_seq_number + 1; + prev_beacon_subsystems.clear(); + dout(4) << "OOO map received, Ignore it" << dendl; + return; + } + } + if (!got_old_gw_state) { if (!got_new_gw_state) { dout(10) << "Can not find new gw state" << dendl; @@ -425,10 +463,29 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t nmap) } } + // Combined subsystems + const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0); + GwSubsystems combined_subsystems = new_gw_state.subsystems; + for (const auto& nqn_state_pair: old_gw_state.subsystems) { + const auto& nqn = nqn_state_pair.first; + auto& old_nqn_state = nqn_state_pair.second; + + // The monitor might remove active subsystems from the new distributed GwSubsystems. + // In such cases, ensure an INACCESSIBLE state is generated for subsystems + // that were present in the old state but are now missing. + if (new_gw_state.subsystems.find(nqn) == new_gw_state.subsystems.end()) { + ana_state_t all_disabled(old_nqn_state.ana_state.size(), initial_ana_state); + dout(4) << "set all groups to Inacccessible stat for " << nqn << dendl; + NqnState nqn_state(nqn, all_disabled); + + combined_subsystems.insert({nqn, nqn_state}); + } + } + // Gather all state changes ana_info ai; epoch_t max_blocklist_epoch = 0; - for (const auto& nqn_state_pair: new_gw_state.subsystems) { + for (const auto& nqn_state_pair: combined_subsystems) { auto& sub = nqn_state_pair.second; const auto& nqn = nqn_state_pair.first; nqn_ana_states nas; @@ -442,7 +499,6 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t nmap) sub.ana_state.size(); for (NvmeAnaGrpId ana_grp_index = 0; ana_grp_index < ana_state_size; ana_grp_index++) { - const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0); auto new_group_state = (ana_grp_index < sub.ana_state.size()) ? 
sub.ana_state[ana_grp_index] : initial_ana_state; @@ -496,6 +552,12 @@ Dispatcher::dispatch_result_t NVMeofGwMonitorClient::ms_dispatch2(const ref_tget_type() << dendl; + // print connection features for all incoming messages and update cluster features + if (m->get_connection()) { + cluster_beacon_diff_included = monc.get_monmap_required_features().contains_all(ceph::features::mon::FEATURE_NVMEOF_BEACON_DIFF); + dout(10) << fmt::format("Updated cluster features: 0x{:x}", cluster_beacon_diff_included) << dendl; + } + if (m->get_type() == MSG_MNVMEOF_GW_MAP) { handle_nvmeof_gw_map(ref_cast(m)); return Dispatcher::HANDLED(); diff --git a/src/nvmeof/NVMeofGwMonitorClient.h b/src/nvmeof/NVMeofGwMonitorClient.h index 65ab9bb5eb7c..217a70441470 100644 --- a/src/nvmeof/NVMeofGwMonitorClient.h +++ b/src/nvmeof/NVMeofGwMonitorClient.h @@ -54,7 +54,9 @@ private: bool first_beacon = true; bool set_group_id = false; - + uint64_t beacon_sequence = 0; + BeaconSubsystems prev_beacon_subsystems; + bool cluster_beacon_diff_included = 0; // track cluster features for beacon encoding // init gw ssl opts void init_gw_ssl_opts(); diff --git a/src/nvmeof/NVMeofGwUtils.cc b/src/nvmeof/NVMeofGwUtils.cc new file mode 100644 index 000000000000..9beccc8668c9 --- /dev/null +++ b/src/nvmeof/NVMeofGwUtils.cc @@ -0,0 +1,53 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2025 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#include "nvmeof/NVMeofGwUtils.h" + +void determine_subsystem_changes(const BeaconSubsystems& old_subsystems, + BeaconSubsystems& new_subsystems) { + BeaconSubsystems result; + + // for each subsystem in new_subsystems, check if it's added or changed + for (const auto& new_sub : new_subsystems) { + auto old_it = std::find_if(old_subsystems.begin(), old_subsystems.end(), + [&](const BeaconSubsystem& s) { return s.nqn == new_sub.nqn; }); + if (old_it == old_subsystems.end()) { + // Subsystem not found in old list - it's new + BeaconSubsystem added = new_sub; + added.change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED; + result.push_back(std::move(added)); + } else { + // subsystem exists - check if it changed + if (!(*old_it == new_sub)) { + BeaconSubsystem changed = new_sub; + changed.change_descriptor = subsystem_change_t::SUBSYSTEM_CHANGED; + result.push_back(std::move(changed)); + } + // else: unchanged, do not add + } + } + + // for any subsystem in old_subsystems not present in new_subsystems, add as deleted + for (const auto& old_sub : old_subsystems) { + auto found = std::find_if(new_subsystems.begin(), new_subsystems.end(), + [&](const BeaconSubsystem& s) { return s.nqn == old_sub.nqn; }); + if (found == new_subsystems.end()) { + BeaconSubsystem deleted_sub = old_sub; + deleted_sub.change_descriptor = subsystem_change_t::SUBSYSTEM_DELETED; + result.push_back(std::move(deleted_sub)); + } + } + + new_subsystems = std::move(result); +} + diff --git a/src/nvmeof/NVMeofGwUtils.h b/src/nvmeof/NVMeofGwUtils.h new file mode 100644 index 000000000000..a9c25317d81f --- /dev/null +++ b/src/nvmeof/NVMeofGwUtils.h @@ -0,0 +1,23 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2025 IBM, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef __NVMEOFGWUTILS_H__ +#define __NVMEOFGWUTILS_H__ +#include "mon/NVMeofGwTypes.h" +#include + +// utility for diffing nvmeof subsystems changes +void determine_subsystem_changes(const BeaconSubsystems& old_subsystems, + BeaconSubsystems& new_subsystems); + +#endif diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 37d957e1be5b..fbec0c9ba3fd 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -1081,3 +1081,11 @@ add_ceph_unittest(unittest_ceph_assert) target_link_libraries(unittest_ceph_assert ceph-common global) endif() +add_executable(test_nvmeof_gw_utils + test_nvmeof_gw_utils.cc + ../nvmeof/NVMeofGwUtils.cc + ) +target_link_libraries(test_nvmeof_gw_utils + mon ceph-common global-static + ) + diff --git a/src/test/test_nvmeof_gw_utils.cc b/src/test/test_nvmeof_gw_utils.cc new file mode 100644 index 000000000000..c7d2ccfa561e --- /dev/null +++ b/src/test/test_nvmeof_gw_utils.cc @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2025 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "nvmeof/NVMeofGwUtils.h" +#include "mon/NVMeofGwTypes.h" +#include +#include +#include "include/ceph_assert.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout + +void test_determine_subsystem_changes() { + std::cout << __func__ << "\n\n" << std::endl; + // Prepare old and new subsystems + BeaconSubsystem sub1_old = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }; + BeaconSubsystem sub2_old = { "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }; + BeaconSubsystem sub3_old = { "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }; + BeaconSubsystems old_subs = { sub1_old, sub2_old, sub3_old }; + + // sub1 unchanged, sub2 changed, sub4 added, sub3 deleted + BeaconSubsystem sub1_new = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }; + BeaconSubsystem sub2_new = { "nqn2", { {"IPv4", "1.2.3.4", "4420"} }, {}, subsystem_change_t::SUBSYSTEM_ADDED }; // changed listeners + BeaconSubsystem sub4_new = { "nqn4", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }; + BeaconSubsystems new_subs = { sub1_new, sub2_new, sub4_new }; + + determine_subsystem_changes(old_subs, new_subs); + + // After call, new_subs should only contain changed, added, and deleted subsystems + // sub1 (unchanged) should be removed + // sub2 (changed) should be present with SUBSYSTEM_CHANGED + // sub4 (added) should be present with SUBSYSTEM_ADDED + // sub3 (deleted) should be present with SUBSYSTEM_DELETED + bool found_sub2 = false, found_sub3 = false, found_sub4 = false; + for (const auto& s : new_subs) { + if (s.nqn == "nqn2") { + found_sub2 = true; + ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED); + } else if (s.nqn == "nqn3") { + found_sub3 = true; + ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED); + } else if (s.nqn == "nqn4") { + found_sub4 = true; + ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED); + } else { + 
ceph_assert(false && "Unexpected subsystem in result"); + } + } + ceph_assert(found_sub2 && found_sub3 && found_sub4); + std::cout << "determine_subsystem_changes test passed" << std::endl; +} + +int main(int argc, const char **argv) { + test_determine_subsystem_changes(); + return 0; +} diff --git a/src/test/test_nvmeof_mon_encoding.cc b/src/test/test_nvmeof_mon_encoding.cc index 337d232eb336..9d936b8021d8 100644 --- a/src/test/test_nvmeof_mon_encoding.cc +++ b/src/test/test_nvmeof_mon_encoding.cc @@ -36,12 +36,17 @@ void test_NVMeofGwMap() { std::string pool = "pool1"; std::string group = "grp1"; auto group_key = std::make_pair(pool, group); - pending_map.cfg_add_gw("GW1" ,group_key); - pending_map.cfg_add_gw("GW2" ,group_key); - pending_map.cfg_add_gw("GW3" ,group_key); + std::string nqn = "nqn-nqn"; + BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED }; + BeaconSubsystems subs = {sub}; + + pending_map.cfg_add_gw("GW1" ,group_key, CEPH_FEATURES_ALL); + pending_map.cfg_add_gw("GW2" ,group_key, CEPH_FEATURES_ALL); + pending_map.cfg_add_gw("GW3" ,group_key, CEPH_FEATURES_ALL); NvmeNonceVector new_nonces = {"abc", "def","hij"}; pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; pending_map.created_gws[group_key]["GW1"].performed_full_startup = true; + pending_map.created_gws[group_key]["GW1"].subsystems = subs; int i = 0; for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){ blklst_itr.second.osd_epoch = 2*(i++); @@ -53,21 +58,25 @@ void test_NVMeofGwMap() { dout(0) << pending_map << dendl; ceph::buffer::list bl; + dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl; + pending_map.encode(bl, CEPH_FEATURES_ALL); auto p = bl.cbegin(); pending_map.decode(p); dout(0) << " == Dump map after Decode: == " < map; std::string pool = "pool1"; std::string group = "grp1"; std::string gw_id = "GW1"; - NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE); + NvmeGwClientState 
state(1, 32, gw_availability_t::GW_UNAVAILABLE, 0, false); std::string nqn = "nqn"; ana_state_t ana_state; NqnState nqn_state(nqn, ana_state); @@ -82,9 +91,9 @@ void test_MNVMeofGwMap() { encode(map, bl, CEPH_FEATURES_ALL); dout(0) << "encoded: " << map << dendl; decode(map, bl); - dout(0) << "decode: " << map << dendl; + dout(0) << "decoded: " << map << dendl; - BeaconSubsystem sub = { nqn, {}, {} }; + BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }; NVMeofGwMap pending_map; pending_map.epoch = 2; auto msg1 = make_message(pending_map); @@ -95,12 +104,13 @@ void test_MNVMeofGwMap() { int epoch = msg1->get_gwmap_epoch(); dout(0) << "after decode empty msg: " << *msg1 << " epoch " << epoch << dendl; - pending_map.cfg_add_gw("GW1" ,group_key); - pending_map.cfg_add_gw("GW2" ,group_key); - pending_map.cfg_add_gw("GW3" ,group_key); + pending_map.cfg_add_gw("GW1" ,group_key, CEPH_FEATURES_ALL); + pending_map.cfg_add_gw("GW2" ,group_key, CEPH_FEATURES_ALL); + pending_map.cfg_add_gw("GW3" ,group_key, CEPH_FEATURES_ALL); NvmeNonceVector new_nonces = {"abc", "def","hij"}; pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub); + int i = 0; for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){ blklst_itr.second.osd_epoch = 2*(i++); @@ -116,7 +126,7 @@ void test_MNVMeofGwMap() { dout(0) << "after encode msg: " << *msg << dendl; msg->decode_payload(); dout(0) << "after decode msg: " << *msg << dendl; - + //dout(0) << "\n == Test GW Delete ==" << dendl; //pending_map.cfg_delete_gw("GW1" ,group_key); //dout(0) << "deleted GW1 " << pending_map << dendl; @@ -128,10 +138,10 @@ void test_MNVMeofGwMap() { //dout(0) << "deleted GW2 " << pending_map << dendl; //dout(0) << "delete of wrong gw id" << dendl; - //pending_map.cfg_delete_gw("wow" ,group_key); + //pending_map.cfg_delete_gw("wow" ,group_key, true); - pending_map.cfg_delete_gw("GW3" 
,group_key); - dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl; + //pending_map.cfg_delete_gw("GW3" ,group_key); + //dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl; } @@ -142,11 +152,13 @@ void test_MNVMeofGwBeacon() { std::string gw_group = "group"; gw_availability_t availability = gw_availability_t::GW_AVAILABLE; std::string nqn = "nqn"; - BeaconSubsystem sub = { nqn, {}, {} }; + BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED }; BeaconSubsystems subs = { sub }; epoch_t osd_epoch = 17; epoch_t gwmap_epoch = 42; - + uint64_t sequence = 12345; + + // Test legacy beacon (without diff support) auto msg = make_message( gw_id, gw_pool, @@ -154,22 +166,87 @@ void test_MNVMeofGwBeacon() { subs, availability, osd_epoch, - gwmap_epoch); - msg->encode_payload(0); + gwmap_epoch + // sequence defaults to 0 + // enable_diff defaults to false + ); + msg->encode_payload(CEPH_FEATURES_ALL); msg->decode_payload(); - dout(0) << "decode msg: " << *msg << dendl; + dout(0) << "decode msg (revision 1): " << *msg << dendl; ceph_assert(msg->get_gw_id() == gw_id); ceph_assert(msg->get_gw_pool() == gw_pool); ceph_assert(msg->get_gw_group() == gw_group); ceph_assert(msg->get_availability() == availability); ceph_assert(msg->get_last_osd_epoch() == osd_epoch); ceph_assert(msg->get_last_gwmap_epoch() == gwmap_epoch); + // Legacy beacons don't preserve sequence field - it gets reset to 0 + ceph_assert(msg->get_sequence() == 0); const auto& dsubs = msg->get_subsystems(); auto it = std::find_if(dsubs.begin(), dsubs.end(), [&nqn](const auto& element) { return element.nqn == nqn; }); ceph_assert(it != dsubs.end()); + ceph_assert(it->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED); + + // Test enhanced beacon (with diff support) + auto msg2 = make_message( + gw_id, + gw_pool, + gw_group, + subs, + availability, + osd_epoch, + gwmap_epoch, + sequence, + true // enable_diff = true + ); + 
msg2->encode_payload(CEPH_FEATURES_ALL); + msg2->decode_payload(); + dout(0) << "decode msg (revision 2): " << *msg2 << dendl; + ceph_assert(msg2->get_gw_id() == gw_id); + ceph_assert(msg2->get_gw_pool() == gw_pool); + ceph_assert(msg2->get_gw_group() == gw_group); + ceph_assert(msg2->get_availability() == availability); + ceph_assert(msg2->get_last_osd_epoch() == osd_epoch); + ceph_assert(msg2->get_last_gwmap_epoch() == gwmap_epoch); + ceph_assert(msg2->get_sequence() == sequence); + const auto& dsubs2 = msg2->get_subsystems(); + auto it2 = std::find_if(dsubs2.begin(), dsubs2.end(), + [&nqn](const auto& element) { + return element.nqn == nqn; + }); + ceph_assert(it2 != dsubs2.end()); + ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED); +} + +void test_subsystem_change_descriptors() { + dout(0) << __func__ << "\n\n" << dendl; + // Test different change descriptors + BeaconSubsystem sub1 = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }; + BeaconSubsystem sub2 = { "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED }; + BeaconSubsystem sub3 = { "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }; + BeaconSubsystems subs = { sub1, sub2, sub3 }; + // Encode and decode + ceph::buffer::list bl; + encode(subs, bl, CEPH_FEATURES_ALL); + auto p = bl.cbegin(); + BeaconSubsystems decoded_subs; + decode(decoded_subs, p); + // Verify change descriptors are preserved + auto it1 = std::find_if(decoded_subs.begin(), decoded_subs.end(), + [](const auto& s) { return s.nqn == "nqn1"; }); + auto it2 = std::find_if(decoded_subs.begin(), decoded_subs.end(), + [](const auto& s) { return s.nqn == "nqn2"; }); + auto it3 = std::find_if(decoded_subs.begin(), decoded_subs.end(), + [](const auto& s) { return s.nqn == "nqn3"; }); + ceph_assert(it1 != decoded_subs.end()); + ceph_assert(it2 != decoded_subs.end()); + ceph_assert(it3 != decoded_subs.end()); + ceph_assert(it1->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED); + 
ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED); + ceph_assert(it3->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED); + dout(0) << "Subsystem change descriptors test passed" << dendl; } void test_NVMeofGwTimers() @@ -207,6 +284,7 @@ int main(int argc, const char **argv) test_NVMeofGwMap(); test_MNVMeofGwMap(); test_MNVMeofGwBeacon(); + test_subsystem_change_descriptors(); test_NVMeofGwTimers(); }