coming according to new schema fixes for failover/failback/gw fast-reboot.
src/messages/MNVMeofGwBeacon.h: add sequence number
truncate prev_beacon_subsystems on beacon sequence mismatch
do not process OOO map
Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
nvmeofgw : monitor changes for the beacon-diff feature
process beacons by beacon-diff new schema
detect sequence out of order(ooo) condition and handle it
in case ooo detected send ack to the gw with the expected correct sequence
skip failovers for some interval when ooo detected
ignore all becons with incorrect sequences until gw sends expected one
Signed-off-by: Leonid Chernin <leonidc@il.ibm.com>
nvmeofgw: this commit adds upgrade rules for the beacon-diff
Signed-off-by: Leonid Chernin <leonidc@il.ibm.com>
nvmeofgw: CR change: use fmt::format() instead of std::hex stream manipulators
Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
nvmeofgw: CR change: use BEACON_SUBSYS_VERSION_ENHANCED instead of verson number
Signed-off-by: Leonid Chernin <leonidc@il.ibm.com>
mon: CR change: centralize beacon version constants to eliminate duplication
Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
Fixes: https://tracker.ceph.com/issues/72394
${nvmeof_monitor_grpc_hdrs}
ceph_nvmeof_monitor_client.cc
nvmeof/NVMeofGwClient.cc
+ nvmeof/NVMeofGwUtils.cc
nvmeof/NVMeofGwMonitorGroupClient.cc
nvmeof/NVMeofGwMonitorClient.cc)
add_executable(ceph-nvmeof-monitor-client ${ceph_nvmeof_monitor_client_srcs})
DEFINE_CEPH_FEATURE(44, 2, NVMEOFHA)
DEFINE_CEPH_FEATURE_RETIRED(45, 1, OSD_SET_ALLOC_HINT, JEWEL, LUMINOUS)
DEFINE_CEPH_FEATURE(45, 2, NVMEOFHAMAP)
+DEFINE_CEPH_FEATURE(51, 2, NVMEOF_BEACONDIFF)
// available
DEFINE_CEPH_FEATURE(46, 1, OSD_FADVISE_FLAGS)
DEFINE_CEPH_FEATURE_RETIRED(46, 1, OSD_REPOP, JEWEL, LUMINOUS) // overlap
DEFINE_CEPH_FEATURE_RETIRED(50, 1, MON_METADATA, MIMIC, OCTOPUS)
DEFINE_CEPH_FEATURE(50, 2, SERVER_TENTACLE);
DEFINE_CEPH_FEATURE_RETIRED(51, 1, OSD_BITWISE_HOBJ_SORT, MIMIC, OCTOPUS)
+DEFINE_CEPH_FEATURE(51, 2, NVMEOF_BEACON_DIFF)
// available
DEFINE_CEPH_FEATURE_RETIRED(52, 1, OSD_PROXY_WRITE_FEATURES, MIMIC, OCTOPUS)
// available
CEPH_FEATUREMASK_SERVER_REEF | \
CEPH_FEATUREMASK_SERVER_SQUID | \
CEPH_FEATUREMASK_SERVER_TENTACLE | \
+ CEPH_FEATUREMASK_NVMEOF_BEACON_DIFF | \
0ULL)
#define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL
#include "mon/MonCommand.h"
#include "mon/NVMeofGwMap.h"
#include "include/types.h"
+#include "mon/NVMeofGwBeaconConstants.h"
class MNVMeofGwBeacon final : public PaxosServiceMessage {
private:
- static constexpr int HEAD_VERSION = 1;
- static constexpr int COMPAT_VERSION = 1;
protected:
std::string gw_id;
gw_availability_t availability; // in absence of beacon heartbeat messages it becomes inavailable
epoch_t last_osd_epoch;
epoch_t last_gwmap_epoch;
+ uint64_t sequence = 0; // sequence number for each beacon message
public:
MNVMeofGwBeacon()
- : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}
+ : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, BEACON_VERSION_ENHANCED,
+ BEACON_VERSION_LEGACY}, sequence(0)
{
set_priority(CEPH_MSG_PRIO_HIGH);
}
const BeaconSubsystems& subsystems_,
const gw_availability_t& availability_,
const epoch_t& last_osd_epoch_,
- const epoch_t& last_gwmap_epoch_
- )
- : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION},
+ const epoch_t& last_gwmap_epoch_,
+ uint64_t sequence_ = 0, // default sequence for backward compatibility
+ bool enable_diff = false) // default to legacy behavior for backward compatibility
+ : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON,
+ static_cast<version_t>(enable_diff ? 1 : 0), // user_version: 1=enhanced, 0=legacy
+ enable_diff ? BEACON_VERSION_ENHANCED :
+ BEACON_VERSION_LEGACY, BEACON_VERSION_LEGACY},// Minimum compatible version
gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_),
- availability(availability_), last_osd_epoch(last_osd_epoch_), last_gwmap_epoch(last_gwmap_epoch_)
+ availability(availability_), last_osd_epoch(last_osd_epoch_),
+ last_gwmap_epoch(last_gwmap_epoch_), sequence(sequence_)
{
set_priority(CEPH_MSG_PRIO_HIGH);
}
const epoch_t& get_last_osd_epoch() const { return last_osd_epoch; }
const epoch_t& get_last_gwmap_epoch() const { return last_gwmap_epoch; }
const BeaconSubsystems& get_subsystems() const { return subsystems; };
+ uint64_t get_sequence() const { return sequence; }
private:
~MNVMeofGwBeacon() final {}
encode(gw_id, payload);
encode(gw_pool, payload);
encode(gw_group, payload);
- encode(subsystems, payload);
+ encode(subsystems, payload, features);
encode((uint32_t)availability, payload);
encode(last_osd_epoch, payload);
encode(last_gwmap_epoch, payload);
+ // Only encode sequence for enhanced beacons (HEAD_VERSION >= 2)
+ if (get_header().version >= BEACON_VERSION_ENHANCED) {
+ encode(sequence, payload);
+ }
}
void decode_payload() override {
availability = static_cast<gw_availability_t>(tmp);
decode(last_osd_epoch, p);
decode(last_gwmap_epoch, p);
+ // Only decode sequence for enhanced beacons (HEAD_VERSION >= 2)
+ if (get_header().version >= BEACON_VERSION_ENHANCED && !p.end()) {
+ decode(sequence, p);
+ } else {
+ sequence = 0; // Legacy beacons don't have sequence field
+ }
}
private:
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef CEPH_NVMEOFGWBEACONCONSTANTS_H
+#define CEPH_NVMEOFGWBEACONCONSTANTS_H
+
+// This header contains version constants used across multiple files
+// to avoid duplication and maintain consistency.
+
+// Beacon version constants
+#define BEACON_VERSION_LEGACY 1 // Legacy beacon format (no diff support)
+#define BEACON_VERSION_ENHANCED 2 // Enhanced beacon format (with diff support)
+
+#endif /* CEPH_NVMEOFGWBEACONCONSTANTS_H */
}
auto gw_state = NvmeGwClientState(
- gw_created.ana_grp_id, epoch, availability);
+ gw_created.ana_grp_id, epoch, availability, gw_created.beacon_sequence,
+ gw_created.beacon_sequence_ooo);
for (const auto& sub: gw_created.subsystems) {
gw_state.subsystems.insert({
sub.nqn,
}
int NVMeofGwMap::cfg_add_gw(
- const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
+ const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool test)
{
std::set<NvmeAnaGrpId> allocated;
- if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
+ if (test || HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
auto gw_epoch_it = gw_epoch.find(group_key);
if (gw_epoch_it == gw_epoch.end()) {
gw_epoch[group_key] = epoch;
}
int NVMeofGwMap::cfg_delete_gw(
- const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
+ const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool test)
{
+ if (test)
+ return do_delete_gw(gw_id, group_key);
+
if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHA)) {
dout(10) << " has NVMEOFHA: 1" << dendl;
for (auto& gws_states: created_gws[group_key]) {
if (gws_states.first == gw_id) {
auto& state = gws_states.second;
+ if (state.availability == gw_availability_t::GW_AVAILABLE) {
+ /*prevent failover because blocklisting right now cause IO errors */
+ dout(4) << "Delete GW: set skip-failovers for group " << gw_id
+ << " group " << group_key << dendl;
+ skip_failovers_for_group(group_key, 5);
+ }
state.availability = gw_availability_t::GW_DELETING;
dout(4) << " Deleting GW :"<< gw_id << " in state "
<< state.availability << " Resulting GW availability: "
<< state.availability << dendl;
- state.subsystems.clear();//ignore subsystems of this GW
utime_t now = ceph_clock_now();
mon->nvmegwmon()->gws_deleting_time[group_key][gw_id] = now;
return 0;
}
}
-void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key)
+void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key,
+ int interval_sec)
{
- const auto skip_failovers = g_conf().get_val<std::chrono::seconds>
- ("mon_nvmeofgw_skip_failovers_interval");
+ std::chrono::seconds skip_failovers;
+ if (interval_sec == 0) {
+ skip_failovers = g_conf().get_val<std::chrono::seconds>
+ ("mon_nvmeofgw_skip_failovers_interval");
+ } else {
+ skip_failovers = std::chrono::seconds(interval_sec);
+ }
for (auto& gw_created: created_gws[group_key]) {
gw_created.second.allow_failovers_ts = std::chrono::system_clock::now()
+ skip_failovers;
auto& st = gw_state->second;
st.set_unavailable_state();
st.set_last_gw_down_ts();
+ st.reset_beacon_sequence();
for (auto& state_itr: created_gws[group_key][gw_id].sm_state) {
fsm_handle_gw_down(
gw_id, group_key, state_itr.second,
// find_already_created_gw(gw_id, group_key);
NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
NvmeNonceVector nonces;
+
+ NvmeAnaNonceMap nonce_map;
+ for (const auto& sub: gw_map.subsystems) { // recreate nonce map from subsystems
+ for (const auto& ns: sub.namespaces) {
+ auto& nonce_vec = nonce_map[ns.anagrpid-1]; //Converting ana groups to offsets
+ if (std::find(nonce_vec.begin(), nonce_vec.end(), ns.nonce) == nonce_vec.end())
+ nonce_vec.push_back(ns.nonce);
+ }
+ }
for (auto& state_itr: gw_map.sm_state) {
// to make blocklist on all clusters of the failing GW
- nonces.insert(nonces.end(), gw_map.nonce_map[state_itr.first].begin(),
- gw_map.nonce_map[state_itr.first].end());
+ nonces.insert(nonces.end(), nonce_map[state_itr.first].begin(),
+ nonce_map[state_itr.first].end());
}
-
+ gw_map.subsystems.clear();
if (nonces.size() > 0) {
- NvmeNonceVector &nonce_vector = gw_map.nonce_map[grpid];;
+ NvmeNonceVector &nonce_vector = nonces;
std::string str = "[";
entity_addrvec_t addr_vect;
}
}
+bool NVMeofGwMap::put_gw_beacon_sequence_number(const NvmeGwId &gw_id,
+ int gw_version, const NvmeGroupKey& group_key,
+ uint64_t beacon_sequence, uint64_t& old_sequence)
+{
+ bool rc = true;
+ NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
+
+ if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) ||
+ (gw_version > 0) ) {
+ uint64_t seq_number = gw_map.beacon_sequence;
+ if ((beacon_sequence != seq_number+1) &&
+ !(beacon_sequence == 0 && seq_number == 0 )) {// new GW startup
+ rc = false;
+ old_sequence = seq_number;
+ dout(4) << "Warning: GW " << gw_id
+ << " sent beacon sequence out of order, expected "
+ << seq_number +1 << " received " << beacon_sequence << dendl;
+ gw_map.beacon_sequence_ooo = true;
+ } else {
+ gw_map.beacon_sequence = beacon_sequence;
+ }
+ }
+ return rc;
+}
+
+bool NVMeofGwMap::set_gw_beacon_sequence_number(const NvmeGwId &gw_id,
+ int gw_version, const NvmeGroupKey& group_key, uint64_t beacon_sequence)
+{
+ NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
+ if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) ||
+ (gw_version > 0)) {
+ gw_map.beacon_sequence = beacon_sequence;
+ gw_map.beacon_sequence_ooo = false;
+ dout(10) << gw_id << " set beacon_sequence " << beacon_sequence << dendl;
+ }
+ return true;
+}
+
+
void NVMeofGwMap::update_active_timers(bool &propose_pending)
{
const auto now = std::chrono::system_clock::now();
void to_gmap(std::map<NvmeGroupKey, NvmeGwMonClientStates>& Gmap) const;
void track_deleting_gws(const NvmeGroupKey& group_key,
const BeaconSubsystems& subs, bool &propose_pending);
- int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
- int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
void check_all_gws_in_deleting_state(const NvmeGwId &gw_id,
const NvmeGroupKey& group_key);
+ int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+ bool test = false);
+ int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+ bool test = false);
void process_gw_map_ka(
const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
epoch_t& last_osd_epoch, bool &propose_pending);
int process_gw_map_gw_down(
- const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+ const NvmeGwId &gw_id, const Nv/meGroupKey& group_key,
bool &propose_pending);
int process_gw_map_gw_no_subsys_no_listeners(
const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
const NvmeGroupKey& group_key, bool &propose_pending);
void set_addr_vect(const NvmeGwId &gw_id,
const NvmeGroupKey& group_key, const entity_addr_t &addr_vect);
- void skip_failovers_for_group(const NvmeGroupKey& group_key);
+ void skip_failovers_for_group(const NvmeGroupKey& group_key,
+ int interval_sec = 0);
+ bool put_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version,
+ const NvmeGroupKey& group_key, uint64_t beacon_sequence,
+ uint64_t& old_sequence);
+ bool set_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version,
+ const NvmeGroupKey& group_key, uint64_t beacon_sequence);
private:
int do_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
int do_erase_gw_id(const NvmeGwId &gw_id,
// force send ack after nearest beacon after leader re-election
gw_created_pair.second.beacon_index =
g_conf().get_val<uint64_t>("mon_nvmeofgw_beacons_till_ack");
+ // force send full beacon after leader election
+ gw_created_pair.second.beacon_sequence = 0;
+ gw_created_pair.second.beacon_sequence_ooo = true;
}
}
}
for (auto &[group_key, gws_states]: pending_map.created_gws) {
BeaconSubsystems *subsystems = &empty_subsystems;
for (auto& gw_state : gws_states) { // loop for GWs inside nqn group
- subsystems = &gw_state.second.subsystems;
+ if (gw_state.second.availability == gw_availability_t::GW_AVAILABLE) {
+ subsystems = &gw_state.second.subsystems;
+ }
if (subsystems->size()) { // Set subsystems to the valid value
break;
}
* function called to restore in pending map all data that is not serialized
* to paxos peons. Othervise it would be overriden in "pending_map = map"
* currently "allow_failovers_ts", "last_gw_down_ts",
- * "last_gw_map_epoch_valid" variables are restored
+ * "last_gw_map_epoch_valid", "beacon_sequence", "beacon_index" variables are restored
*/
void NVMeofGwMon::restore_pending_map_info(NVMeofGwMap & tmp_map) {
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
gw_created_pair.second.last_gw_down_ts;
pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
gw_created_pair.second.last_gw_map_epoch_valid;
+ pending_map.created_gws[group_key][gw_id].beacon_index =
+ gw_created_pair.second.beacon_index;
+ pending_map.created_gws[group_key][gw_id].beacon_sequence =
+ gw_created_pair.second.beacon_sequence;
+ pending_map.created_gws[group_key][gw_id].beacon_sequence_ooo =
+ gw_created_pair.second.beacon_sequence_ooo;
}
}
}
}
pending_map.encode(bl, features);
dout(10) << " has NVMEOFHA: " << HAVE_FEATURE(features, NVMEOFHA)
- << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP) << dendl;
+ << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP)
+ << " has BEACON_DIFF: " << HAVE_FEATURE(features, NVMEOF_BEACON_DIFF) << dendl;
put_version(t, pending_map.epoch, bl);
put_last_committed(t, pending_map.epoch);
void NVMeofGwMon::update_from_paxos(bool *need_bootstrap)
{
version_t version = get_last_committed();
-
+ uint64_t features = mon.get_quorum_con_features();
+ dout(20) << " has BEACON_DIFF: " << HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)
+ << dendl;
if (version != map.epoch) {
dout(10) << " NVMeGW loading version " << version
<< " " << map.epoch << dendl;
return rc;
}
+void NVMeofGwMon::do_send_map_ack(MonOpRequestRef op,
+ bool gw_created, bool gw_propose,
+ uint64_t stored_sequence, bool is_correct_sequence,
+ const NvmeGroupKey& group_key, const NvmeGwId &gw_id) {
+ /* always send beacon ack to gw in Created state,
+ * it should be temporary state
+ * if epoch-filter-bit: send ack to beacon in case no propose
+ * or if changed something not relevant to gw-epoch
+ */
+ NVMeofGwMap ack_map;
+ if (gw_created) {
+ NvmeGwMonState& pending_gw_map = pending_map.created_gws[group_key][gw_id];
+ // respond with a map slice correspondent to the same GW
+ ack_map.created_gws[group_key][gw_id] = (gw_propose) ? //avail = CREATED
+ pending_gw_map : map.created_gws[group_key][gw_id];
+ ack_map.created_gws[group_key][gw_id].beacon_sequence =
+ pending_gw_map.beacon_sequence;
+ if (!is_correct_sequence) {
+ dout(4) << " GW " << gw_id <<
+ " sending ACK due to receiving beacon_sequence out of order" << dendl;
+ ack_map.created_gws[group_key][gw_id].beacon_sequence = stored_sequence;
+ ack_map.created_gws[group_key][gw_id].beacon_sequence_ooo = true;
+ } else {
+ ack_map.created_gws[group_key][gw_id].beacon_sequence_ooo = false;
+ }
+ if (gw_propose) {
+ dout(10) << "GW in Created " << gw_id << " ack map " << ack_map << dendl;
+ }
+ }
+ ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
+ if (!gw_created)
+ dout(10) << "gw not created, ack map "
+ << ack_map << " epoch " << ack_map.epoch << dendl;
+ dout(20) << "ack_map " << ack_map <<dendl;
+ auto msg = make_message<MNVMeofGwMap>(ack_map);
+ mon.send_reply(op, msg.detach());
+}
+
+/*
+ * Any subsystem was added to the Beacon only if it( or it's encapsulated fields)
+ * was changed, added or deleted. If nothing changed the beacon did not
+ * encode any subsystem.
+ * New descriptor was added under the subsystem to describe
+ * the change : ADDED, DELETED, CHANGED
+ * rules for apply the subsystems to the map:
+ * Pass all subsystems in the beacon->sub list
+ * if descriptor is ADDED or CHANGED do the following
+ * look for subs nqn in the gw.subs list and if found - substitute,
+ * if not found - add
+ * if descriptor is DELETED do the following
+ * look for subs nqn in the gw.subs list and if found - delete
+ */
+int NVMeofGwMon::apply_beacon(const NvmeGwId &gw_id, int gw_version,
+ const NvmeGroupKey& group_key, void *msg,
+ const BeaconSubsystems& sub, gw_availability_t &avail,
+ bool &propose_pending)
+{
+ bool found = false;
+ bool changed = false;
+ BeaconSubsystems &gw_subs =
+ pending_map.created_gws[group_key][gw_id].subsystems;
+ auto &state = pending_map.created_gws[group_key][gw_id];
+
+ if (!HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOF_BEACON_DIFF) &&
+ gw_version == 0) {
+ if (gw_subs != sub) {
+ dout(10) << "BEACON_DIFF logic not applied."
+ "subsystems of GW changed, propose pending " << gw_id << dendl;
+ gw_subs = sub;
+ //rebuild nonce map.
+ state.nonce_map = ((MNVMeofGwBeacon *)msg)->get_nonce_map();
+ changed = true;
+ }
+ } else {
+ if (state.beacon_sequence_ooo) {
+ dout(10) << "Good sequence after out of order detection "
+ "sequence "<< state.beacon_sequence << " " << gw_id << dendl;
+ // need to clear subsystems for correct calculation of difference
+ state.subsystems.clear();
+ state.beacon_sequence_ooo = false;
+ propose_pending = true;
+ }
+
+ for (auto &subs_it: sub) {
+ if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED ||
+ subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED) {
+ found = false;
+ for (auto &gw_subs_it: gw_subs) {
+ if (gw_subs_it.nqn == subs_it.nqn) {
+ gw_subs_it = subs_it;
+ dout(10) << "subsystem changed " << subs_it.nqn << " change descr "
+ << (uint32_t)subs_it.change_descriptor << dendl;
+ found = true;
+ changed = true;
+ break;
+ }
+ }
+ if (!found) {
+ gw_subs.push_back(subs_it);
+ changed = true;
+ dout(10) << "subsystem added " << subs_it.nqn << " change descr "
+ << (uint32_t)subs_it.change_descriptor << dendl;
+ }
+ }
+ else
+ if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED)
+ {
+ auto it = std::find_if(gw_subs.begin(), gw_subs.end(),
+ [&](const auto& gw_subs_it) {
+ return gw_subs_it.nqn == subs_it.nqn;
+ });
+ if (it != gw_subs.end()) {
+ gw_subs.erase(it);
+ dout(10) << "subsystem deleted " << subs_it.nqn << " change descr "
+ << (uint32_t)subs_it.change_descriptor << dendl;
+ changed = true;
+ }
+ }
+ }
+ }
+ if (changed) {
+ avail = gw_availability_t::GW_AVAILABLE;
+ }
+ if (gw_subs.size() == 0) {
+ avail = gw_availability_t::GW_CREATED;
+ dout(10) << "No-subsystems condition detected for GW " << gw_id <<dendl;
+ } else {
+ bool listener_found = false;
+ for (auto &subs: gw_subs) {
+ if (subs.listeners.size()) {
+ listener_found = true;
+ break;
+ }
+ }
+ if (!listener_found) {
+ dout(10) << "No-listeners condition detected for GW " << gw_id << dendl;
+ avail = gw_availability_t::GW_CREATED;
+ }
+ }// for HA no-subsystems and no-listeners are same usecases
+ if (avail == gw_availability_t::GW_UNAVAILABLE) {
+ dout(4) << "Warning: UNAVAILABLE gw " << gw_id << dendl;
+ }
+ return (changed == true ? 1:0);
+}
+
bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
{
auto m = op->get_req<MNVMeofGwBeacon>();
-
- dout(20) << "availability " << m->get_availability()
+ uint64_t sequence = m->get_sequence();
+ int version = m->version;
+ dout(10) << "availability " << m->get_availability()
+ << " sequence " << sequence << " version " << version
<< " GW : " << m->get_gw_id()
<< " osdmap_epoch " << m->get_last_osd_epoch()
- << " subsystems " << m->get_subsystems() << dendl;
+ << " subsystems " << m->get_subsystems().size() << dendl;
+
ConnectionRef con = op->get_connection();
NvmeGwId gw_id = m->get_gw_id();
NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group());
+ //"avail" variable will be changed inside the function
+ // when it becomes CREATED for several reasons GW's load balance group
+ // is serviced by another GW
gw_availability_t avail = m->get_availability();
bool propose = false;
bool nonce_propose = false;
bool timer_propose = false;
bool gw_propose = false;
bool gw_created = true;
+ bool correct_sequence = true;
+ uint64_t stored_sequence;
NVMeofGwMap ack_map;
bool epoch_filter_enabled = HAVE_FEATURE(mon.get_quorum_con_features(),
NVMEOFHAMAP);
<< map.created_gws << dendl;
goto set_propose;
} else {
+ pending_map.created_gws[group_key][gw_id].subsystems.clear();
+ pending_map.set_gw_beacon_sequence_number(gw_id, version,
+ group_key, sequence);
dout(4) << "GW beacon: Created state - full startup done " << gw_id
<< " GW state in monitor data-base : "
<< pending_map.created_gws[group_key][gw_id].availability
} else { // first GW beacon should come with avail = Created
// if GW reports Avail/Unavail but in monitor's database it is Unavailable
if (gw != group_gws.end()) {
+ correct_sequence = pending_map.put_gw_beacon_sequence_number
+ (gw_id, version, group_key, sequence, stored_sequence);
// it means it did not perform "exit" after failover was set by
// NVMeofGWMon
if ((pending_map.created_gws[group_key][gw_id].availability ==
mon.send_reply(op, msg.detach());
goto false_return;
}
+ if (!correct_sequence) {
+ if (avail == gw_availability_t::GW_AVAILABLE) {
+ /*prevent failover - give GW a chance to send the expected sequence */
+ dout(4) << "sequence ooo: set skip-failovers for group " << gw_id
+ << " group " << group_key << dendl;
+ pending_map.skip_failovers_for_group(group_key, 7);
+ }
+ avail = gw_availability_t::GW_CREATED;
+ // availability would be set to Active and GW receive the full map
+ // when it sends the correct beacon-sequence,
+ goto check_availability;
+ }
}
}
// Beacon from GW in !Created state but it does not appear in the map
pending_map.set_addr_vect(gw_id, group_key, con->get_peer_addr());
gw_propose = true;
}
- // deep copy the whole nonce map of this GW
- if (m->get_nonce_map().size()) {
- if (pending_map.created_gws[group_key][gw_id].nonce_map !=
- m->get_nonce_map()) {
- dout(10) << "nonce map of GW changed , propose pending "
- << gw_id << dendl;
- pending_map.created_gws[group_key][gw_id].nonce_map = m->get_nonce_map();
- dout(10) << "nonce map of GW " << gw_id << " "
- << pending_map.created_gws[group_key][gw_id].nonce_map << dendl;
- nonce_propose = true;
- }
- } else {
- dout(10) << "Warning: received empty nonce map in the beacon of GW "
- << gw_id << " avail " << (int)avail << dendl;
- }
-
- if (sub.size() == 0) {
- avail = gw_availability_t::GW_CREATED;
- dout(20) << "No-subsystems condition detected for GW " << gw_id <<dendl;
- } else {
- bool listener_found = false;
- for (auto &subs: sub) {
- if (subs.listeners.size()) {
- listener_found = true;
- break;
- }
- }
- if (!listener_found) {
- dout(10) << "No-listeners condition detected for GW " << gw_id << dendl;
- avail = gw_availability_t::GW_CREATED;
- }
- }// for HA no-subsystems and no-listeners are same usecases
- if (pending_map.created_gws[group_key][gw_id].subsystems != sub) {
- dout(10) << "subsystems of GW changed, propose pending " << gw_id << dendl;
- pending_map.created_gws[group_key][gw_id].subsystems = sub;
- dout(20) << "subsystems of GW " << gw_id << " "
- << pending_map.created_gws[group_key][gw_id].subsystems << dendl;
+ if (apply_beacon(gw_id, version, group_key, (void *)m, sub, avail, propose) !=0) {
nonce_propose = true;
+ dout(10) << "subsystem(subs/listener/nonce/NM) of GW changed, propose pending "
+ << gw_id << " available " << avail << dendl;
+ dout(20) << "subsystems of GW " << gw_id << " "
+ << pending_map.created_gws[group_key][gw_id].subsystems << dendl;
}
pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
(get_ack_map_epoch(true, group_key) == m->get_last_gwmap_epoch());
<< " epoch " << get_ack_map_epoch(true, group_key)
<< " beacon_epoch " << m->get_last_gwmap_epoch() << dendl;
}
+
+check_availability:
if (avail == gw_availability_t::GW_AVAILABLE) {
// check pending_map.epoch vs m->get_version() -
// if different - drop the beacon
-
LastBeacon lb = {gw_id, group_key};
last_beacon[lb] = now;
epoch_t last_osd_epoch = m->get_last_osd_epoch();
// Periodic: check active FSM timers
pending_map.update_active_timers(timer_propose);
- set_propose:
+set_propose:
propose |= (timer_propose | gw_propose | nonce_propose);
- apply_ack_logic = (avail == gw_availability_t::GW_AVAILABLE) ? true : false;
+ apply_ack_logic = ((avail == gw_availability_t::GW_AVAILABLE)
+ && correct_sequence) ? true : false;
if ( (apply_ack_logic &&
((pending_map.created_gws[group_key][gw_id].beacon_index++
% beacons_till_ack) == 0))|| (!apply_ack_logic) ) {
if (send_ack && ((!gw_propose && epoch_filter_enabled) ||
(!propose && !epoch_filter_enabled) ||
(avail == gw_availability_t::GW_CREATED)) ) {
- /* always send beacon ack to gw in Created state,
- * it should be temporary state
- * if epoch-filter-bit: send ack to beacon in case no propose
- * or if changed something not relevant to gw-epoch
- */
- if (gw_created) {
- // respond with a map slice correspondent to the same GW
- ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];
- }
- ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
- if (!gw_created)
- dout(10) << "gw not created, ack map "
- << ack_map << " epoch " << ack_map.epoch << dendl;
- dout(20) << "ack_map " << ack_map <<dendl;
- auto msg = make_message<MNVMeofGwMap>(ack_map);
- mon.send_reply(op, msg.detach());
+ do_send_map_ack(op, gw_created, gw_propose, stored_sequence,
+ correct_sequence, group_key, gw_id);
} else {
mon.no_reply(op);
}
void recreate_gw_epoch();
void restore_pending_map_info(NVMeofGwMap & tmp_map);
void cleanup_pending_map();
+ int apply_beacon(const NvmeGwId &gw_id, int gw_version,
+ const NvmeGroupKey& group_key, void *msg,
+ const BeaconSubsystems& sub, gw_availability_t &avail, bool &propose_pending);
+ void do_send_map_ack(MonOpRequestRef op, bool gw_created, bool gw_propose,
+ uint64_t stored_sequence, bool is_correct_sequence,
+ const NvmeGroupKey& group_key, const NvmeGwId &gw_id);
};
#endif /* MON_NVMEGWMONITOR_H_ */
*/
#ifndef MON_NVMEOFGWSERIALIZE_H_
#define MON_NVMEOFGWSERIALIZE_H_
+
+#include "mon/NVMeofGwBeaconConstants.h"
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mon
#undef dout_prefix
}
inline std::ostream& operator<<(std::ostream& os, const BeaconSubsystem value) {
- os << "BeaconSubsystem( nqn:" << value.nqn << ", listeners [ ";
+ os << "BeaconSubsystem( nqn:" << value.nqn << " descr "
+ << (uint32_t)value.change_descriptor << ", listeners [ ";
for (const auto& list: value.listeners) os << list << " ";
os << "] namespaces [ ";
for (const auto& ns: value.namespaces) os << ns << " ";
std::ostream& os, const NvmeGwClientState value) {
os << "NvmeGwState { group id: " << value.group_id
<< " gw_map_epoch " << value.gw_map_epoch
- << " availablilty "<< value.availability
+ << " availablilty " << value.availability
+ << " sequence " << value.last_beacon_seq_number
+ << " sequence-ooo " << value.last_beacon_seq_ooo
<< " GwSubsystems: [ ";
for (const auto& sub: value.subsystems) {
os << sub.second << " ";
for (const auto& sub: subsystems) {
encode(sub.second.nqn, bl);
if (version == 1) {
- dout(20) << "encode ana_state vector version1 = " << version << dendl;
+ dout(20) << "encode ana_state vector version1 = " << (int)version << dendl;
/* Version 1 requires exactly 16 entries */
ana_state_t filled(sub.second.ana_state);
filled.resize(
0));
encode(filled, bl);
} else {
- dout(20) << "encode ana_state vector version2 = " << version << dendl;
+ dout(20) << "encode ana_state vector version2 = " << (int)version << dendl;
encode(sub.second.ana_state, bl);
}
}
}
inline void encode(const NvmeGwClientState& state, ceph::bufferlist &bl, uint64_t features) {
- ENCODE_START(1, 1, bl);
+ uint8_t version = 1;
+ if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+ version = BEACON_VERSION_ENHANCED;
+ }
+ ENCODE_START(version, version, bl);
encode(state.group_id, bl);
encode(state.gw_map_epoch, bl);
encode (state.subsystems, bl, features);
encode((uint32_t)state.availability, bl);
+ if (version >= 2) {
+ encode((uint64_t)state.last_beacon_seq_number, bl);
+ encode(state.last_beacon_seq_ooo, bl);
+ }
ENCODE_FINISH(bl);
}
inline void decode(
NvmeGwClientState& state, ceph::bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(state.group_id, bl);
decode(state.gw_map_epoch, bl);
decode(state.subsystems, bl);
uint32_t avail;
+ uint64_t last_beacon_seq_number;
decode(avail, bl);
state.availability = (gw_availability_t)avail;
+ if (struct_v >= 2) {
+ decode(last_beacon_seq_number, bl);
+ state.last_beacon_seq_number = last_beacon_seq_number;
+ decode(state.last_beacon_seq_ooo, bl);
+ }
DECODE_FINISH(bl);
}
uint64_t features) {
ENCODE_START(1, 1, bl);
encode((uint32_t)nonce_map.size(), bl);
+ dout(20) << "encode nonce map size " << nonce_map.size() << dendl;
for (auto& ana_group_nonces : nonce_map) {
// ana group id
encode(ana_group_nonces.first, bl);
version = 3;
}
ENCODE_START(version, version, bl);
+ dout(20) << "encode NvmeGwMonStates. struct_v: " << (int)version << dendl;
encode ((uint32_t)gws.size(), bl); // number of gws in the group
for (auto& gw : gws) {
encode(gw.first, bl);// GW_id
encode(gw.second.ana_grp_id, bl); // GW owns this group-id
+ dout(20) << "encode gw-id " << gw.first << dendl;
if (version >= 2) {
encode((uint32_t)gw.second.sm_state.size(), bl);
for (auto &state_it:gw.second.sm_state) {
encode((uint32_t)gw.second.availability, bl);
encode((uint16_t)gw.second.performed_full_startup, bl);
encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl);
- encode(gw.second.subsystems, bl);
+ dout(20) << "encode availability " << gw.second.availability
+ << " startup " << (int)gw.second.performed_full_startup << dendl;
+ encode(gw.second.subsystems, bl, features);
encode((uint32_t)gw.second.blocklist_data.size(), bl);
for (auto &blklst_itr: gw.second.blocklist_data) {
encode((uint32_t)gw.second.availability, bl);
encode((uint16_t)gw.second.performed_full_startup, bl);
encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl);
- encode(gw.second.subsystems, bl); // TODO reuse but put features - encode version
+ encode(gw.second.subsystems, bl, features);
Blocklist_data bl_data[MAX_SUPPORTED_ANA_GROUPS];
for (auto &blklst_itr: gw.second.blocklist_data) {
bl_data[blklst_itr.first].osd_epoch = blklst_itr.second.osd_epoch;
dout(20) << "decode addr_vect and beacon_index" << dendl;
gw_created.addr_vect.decode(bl);
decode(gw_created.beacon_index, bl);
+ dout(20) << "decoded beacon_index " << gw_created.beacon_index << dendl;
}
gws[gw_name] = gw_created;
}
inline void encode(const BeaconNamespace& ns, ceph::bufferlist &bl) {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(BEACON_VERSION_LEGACY, BEACON_VERSION_LEGACY, bl);
encode(ns.anagrpid, bl);
encode(ns.nonce, bl);
ENCODE_FINISH(bl);
}
inline void encode(const BeaconListener& ls, ceph::bufferlist &bl) {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(BEACON_VERSION_LEGACY, BEACON_VERSION_LEGACY, bl);
encode(ls.address_family, bl);
encode(ls.address, bl);
encode(ls.svcid, bl);
DECODE_FINISH(bl);
}
-inline void encode(const BeaconSubsystem& sub, ceph::bufferlist &bl) {
- ENCODE_START(1, 1, bl);
+inline void encode(const BeaconSubsystem& sub, ceph::bufferlist &bl, uint64_t features) {
+ uint8_t version = BEACON_VERSION_LEGACY; // Default to legacy version
+ if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+ version = BEACON_VERSION_ENHANCED; // Use enhanced version if feature supported
+ }
+ // For legacy encoding, skip deleted subsystems to maintain compatibility
+ if (version == BEACON_VERSION_LEGACY &&
+ sub.change_descriptor != subsystem_change_t::SUBSYSTEM_ADDED) {
+ dout(4) << "encode BeaconSubsystem: skipping subsystem " << sub.nqn
+ << " with change_descriptor " << (int)sub.change_descriptor
+ << " in legacy mode" << dendl;
+ return; // Skip encoding this subsystem entirely
+ }
+
+ ENCODE_START(version, version, bl);
encode(sub.nqn, bl);
+ dout(20) << "encode BeaconSubsystems " << sub.nqn <<" features " << features
+ << " version " << (int)version << dendl;
encode((uint32_t)sub.listeners.size(), bl);
for (const auto& ls: sub.listeners)
encode(ls, bl);
encode((uint32_t)sub.namespaces.size(), bl);
for (const auto& ns: sub.namespaces)
encode(ns, bl);
+ if (version >= BEACON_VERSION_ENHANCED) {
+ encode((uint32_t)sub.change_descriptor, bl);
+ dout(20) << "encode BeaconSubsystems change-descr: " << (uint32_t)sub.change_descriptor << dendl;
+ }
ENCODE_FINISH(bl);
}
inline void decode(BeaconSubsystem& sub, ceph::buffer::list::const_iterator &bl) {
- DECODE_START(1, bl);
- dout(20) << "decode BeaconSubsystems " << dendl;
+ DECODE_START(BEACON_VERSION_ENHANCED, bl); // Always decode with enhanced version support
decode(sub.nqn, bl);
+ dout(20) << "decode BeaconSubsystems " << sub.nqn << dendl;
uint32_t s;
sub.listeners.clear();
decode(s, bl);
decode(ns, bl);
sub.namespaces.push_back(ns);
}
+ if (struct_v >= BEACON_VERSION_ENHANCED) {
+ uint32_t change_desc;
+ decode(change_desc, bl);
+ sub.change_descriptor = static_cast<subsystem_change_t>(change_desc);
+ dout(20) << "decode BeaconSubsystems version >= " << BEACON_VERSION_ENHANCED << dendl;
+ }
DECODE_FINISH(bl);
}
#include <iomanip>
#include <map>
#include <iostream>
+#include <list>
+#include <vector>
+#include <cstdint>
+#include <chrono>
+#include "include/types.h"
+#include "msg/msg_types.h"
using NvmeGwId = std::string;
using NvmeGroupKey = std::pair<std::string, std::string>;
GW_DELETED
};
+enum class subsystem_change_t {
+ SUBSYSTEM_ADDED,
+ SUBSYSTEM_CHANGED,
+ SUBSYSTEM_DELETED
+};
+
#define REDUNDANT_GW_ANA_GROUP_ID 0xFF
using SmState = std::map < NvmeAnaGrpId, gw_states_per_group_t>;
NvmeNqnId nqn;
std::list<BeaconListener> listeners;
std::list<BeaconNamespace> namespaces;
+ subsystem_change_t change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED;
// Define the equality operator
bool operator==(const BeaconSubsystem& other) const {
BlocklistData blocklist_data;
//ceph entity address allocated for the GW-client that represents this GW-id
entity_addrvec_t addr_vect;
- uint16_t beacon_index = 0;
+ uint64_t beacon_sequence = 0;// sequence number of last beacon copied to GW state
+ bool beacon_sequence_ooo = false; // last beacon sequence was out of order;
+ uint16_t beacon_index = 0; // used for filter acks sent to the client as response to beacon
/**
* during redeploy action and maybe other emergency use-cases gw performs scenario
* that we call fast-reboot. It quickly reboots(due to redeploy f.e) and sends the
// it expects it performed the full startup
performed_full_startup = false;
}
+ void reset_beacon_sequence(){
+ beacon_sequence = 0;
+ }
void standby_state(NvmeAnaGrpId grpid) {
sm_state[grpid] = gw_states_per_group_t::GW_STANDBY_STATE;
}
epoch_t gw_map_epoch;
GwSubsystems subsystems;
gw_availability_t availability;
- NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available)
- : group_id(id), gw_map_epoch(epoch), availability(available) {}
+ uint64_t last_beacon_seq_number;
+ bool last_beacon_seq_ooo; //out of order sequence
+ NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available,
+ uint64_t sequence, bool sequence_ooo)
+ : group_id(id), gw_map_epoch(epoch), availability(available),
+ last_beacon_seq_number(sequence), last_beacon_seq_ooo(sequence_ooo) {}
NvmeGwClientState()
: NvmeGwClientState(
- REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE) {}
+ REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE, 0, 0) {}
};
struct Tmdata {
case MSG_MNVMEOF_GW_BEACON:
m = make_message<MNVMeofGwBeacon>();
- break;
+ break;
case MSG_MON_MGR_REPORT:
m = make_message<MMonMgrReport>();
*/
#include <boost/algorithm/string/replace.hpp>
+#include <fmt/format.h>
#include "common/errno.h"
#include "common/signal.h"
#include "include/compat.h"
#include "include/stringify.h"
+#include "include/ceph_features.h"
#include "global/global_context.h"
#include "global/signal_handler.h"
#include "NVMeofGwMonitorClient.h"
#include "NVMeofGwClient.h"
#include "NVMeofGwMonitorGroupClient.h"
+#include "nvmeof/NVMeofGwUtils.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mon
last_map_time(std::chrono::steady_clock::now()),
reset_timestamp(std::chrono::steady_clock::now()),
start_time(last_map_time),
+ cluster_features(0),
+ poolctx(),
monc{g_ceph_context, poolctx},
client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())),
objecter{g_ceph_context, client_messenger.get(), &monc, poolctx},
{
ceph_assert(ceph_mutex_is_locked_by_me(beacon_lock));
gw_availability_t gw_availability = gw_availability_t::GW_CREATED;
- BeaconSubsystems subs;
+ BeaconSubsystems current_subsystems;
NVMeofGwClient gw_client(
grpc::CreateChannel(gateway_address, gw_creds()));
subsystems_info gw_subsystems;
BeaconListener bls = { ls.adrfam(), ls.traddr(), ls.trsvcid() };
bsub.listeners.push_back(bls);
}
- subs.push_back(bsub);
+ current_subsystems.push_back(bsub);
}
}
+ // Determine change descriptors by comparing with previous beacon's subsystems
+ BeaconSubsystems subs = current_subsystems;
+ determine_subsystem_changes(prev_beacon_subsystems, subs);
+
auto group_key = std::make_pair(pool, group);
NvmeGwClientState old_gw_state;
// if already got gateway state in the map
if (first_beacon == false && get_gw_state("old map", map, group_key, name, old_gw_state))
gw_availability = ok ? gw_availability_t::GW_AVAILABLE : gw_availability_t::GW_UNAVAILABLE;
- dout(10) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability <<
+ dout(1) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability <<
" osdmap_epoch " << osdmap_epoch << " gwmap_epoch " << gwmap_epoch << dendl;
+
+ // Check if NVMEOF_BEACON_DIFF feature is supported by the cluster
+ bool include_diff = HAVE_FEATURE(cluster_features, NVMEOF_BEACON_DIFF);
+
+ dout(10) << fmt::format("Cluster features: 0x{:x}, NVMEOF_BEACON_DIFF supported: {}",
+ cluster_features, include_diff ? "yes" : "no") << dendl;
+
+ // Send beacon with appropriate version based on cluster features
auto m = ceph::make_message<MNVMeofGwBeacon>(
name,
pool,
group,
- subs,
+ include_diff ? subs : current_subsystems,
gw_availability,
osdmap_epoch,
- gwmap_epoch);
+ gwmap_epoch,
+ beacon_sequence,
+ include_diff // Pass the feature flag directly to constructor
+ );
+ dout(10) << "sending beacon with diff support: " << (include_diff ? "enabled" : "disabled") << dendl;
+
monc.send_mon_message(std::move(m));
+ ++beacon_sequence;
+ prev_beacon_subsystems = std::move(current_subsystems);
}
void NVMeofGwMonitorClient::disconnect_panic()
// ensure that the gateway state has not vanished
ceph_assert(got_new_gw_state || !got_old_gw_state);
+ // Check if the last_beacon_seq_number in the received map doesn't match our last sent beacon_sequence
+ // beacon_sequence is incremented after sending, so we compare with (beacon_sequence - 1)
+ {
+ std::lock_guard bl(beacon_lock);
+ if (got_new_gw_state && new_gw_state.last_beacon_seq_ooo) {
+ //new_gw_state.last_beacon_seq_number != (beacon_sequence - 1)) {
+ dout(4) << "Beacon sequence mismatch detected. Expected: " << (beacon_sequence - 1)
+ << ", received: " << new_gw_state.last_beacon_seq_number
+ << ". Truncating previous subsystems list." << dendl;
+ beacon_sequence = new_gw_state.last_beacon_seq_number + 1;
+ prev_beacon_subsystems.clear();
+ dout(4) << "OOO map received, Ignore it" << dendl;
+ return;
+ }
+ }
+
if (!got_old_gw_state) {
if (!got_new_gw_state) {
dout(10) << "Can not find new gw state" << dendl;
}
}
+ // Combined subsystems
+ const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0);
+ GwSubsystems combined_subsystems = new_gw_state.subsystems;
+ for (const auto& nqn_state_pair: old_gw_state.subsystems) {
+ const auto& nqn = nqn_state_pair.first;
+ auto& old_nqn_state = nqn_state_pair.second;
+
+ // The monitor might remove active subsystems from the new distributed GwSubsystems.
+ // In such cases, ensure an INACCESSIBLE state is generated for subsystems
+ // that were present in the old state but are now missing.
+ if (new_gw_state.subsystems.find(nqn) == new_gw_state.subsystems.end()) {
+ ana_state_t all_disabled(old_nqn_state.ana_state.size(), initial_ana_state);
+ dout(4) << "set all groups to Inacccessible stat for " << nqn << dendl;
+ NqnState nqn_state(nqn, all_disabled);
+
+ combined_subsystems.insert({nqn, nqn_state});
+ }
+ }
+
// Gather all state changes
ana_info ai;
epoch_t max_blocklist_epoch = 0;
- for (const auto& nqn_state_pair: new_gw_state.subsystems) {
+ for (const auto& nqn_state_pair: combined_subsystems) {
auto& sub = nqn_state_pair.second;
const auto& nqn = nqn_state_pair.first;
nqn_ana_states nas;
sub.ana_state.size();
for (NvmeAnaGrpId ana_grp_index = 0; ana_grp_index < ana_state_size; ana_grp_index++) {
- const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0);
auto new_group_state = (ana_grp_index < sub.ana_state.size()) ?
sub.ana_state[ana_grp_index] :
initial_ana_state;
std::lock_guard l(lock);
dout(10) << "got map type " << m->get_type() << dendl;
+ // print connection features for all incoming messages and update cluster features
+ if (m->get_connection()) {
+ uint64_t features = m->get_connection()->get_features();
+ dout(4) << fmt::format("Monitor connection features: 0x{:x}", features) << dendl;
+
+ // Update cluster features with the union of all seen features
+ // This ensures we track the highest level of features supported by the cluster
+ cluster_features |= features;
+ dout(10) << fmt::format("Updated cluster features: 0x{:x}", cluster_features) << dendl;
+ }
+
if (m->get_type() == MSG_MNVMEOF_GW_MAP) {
handle_nvmeof_gw_map(ref_cast<MNVMeofGwMap>(m));
return Dispatcher::HANDLED();
bool first_beacon = true;
bool set_group_id = false;
-
+ uint64_t beacon_sequence = 0;
+ BeaconSubsystems prev_beacon_subsystems;
+ uint64_t cluster_features = 0; // track cluster features for beacon encoding
// init gw ssl opts
void init_gw_ssl_opts();
void send_config_beacon();
void send_beacon();
-
+
public:
NVMeofGwMonitorClient(int argc, const char **argv);
~NVMeofGwMonitorClient() override;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "nvmeof/NVMeofGwUtils.h"
+
+void determine_subsystem_changes(const BeaconSubsystems& old_subsystems,
+ BeaconSubsystems& new_subsystems) {
+ BeaconSubsystems result;
+
+ // for each subsystem in new_subsystems, check if it's added or changed
+ for (const auto& new_sub : new_subsystems) {
+ auto old_it = std::find_if(old_subsystems.begin(), old_subsystems.end(),
+ [&](const BeaconSubsystem& s) { return s.nqn == new_sub.nqn; });
+ if (old_it == old_subsystems.end()) {
+ // Subsystem not found in old list - it's new
+ BeaconSubsystem added = new_sub;
+ added.change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED;
+ result.push_back(std::move(added));
+ } else {
+ // subsystem exists - check if it changed
+ if (!(*old_it == new_sub)) {
+ BeaconSubsystem changed = new_sub;
+ changed.change_descriptor = subsystem_change_t::SUBSYSTEM_CHANGED;
+ result.push_back(std::move(changed));
+ }
+ // else: unchanged, do not add
+ }
+ }
+
+ // for any subsystem in old_subsystems not present in new_subsystems, add as deleted
+ for (const auto& old_sub : old_subsystems) {
+ auto found = std::find_if(new_subsystems.begin(), new_subsystems.end(),
+ [&](const BeaconSubsystem& s) { return s.nqn == old_sub.nqn; });
+ if (found == new_subsystems.end()) {
+ BeaconSubsystem deleted_sub = old_sub;
+ deleted_sub.change_descriptor = subsystem_change_t::SUBSYSTEM_DELETED;
+ result.push_back(std::move(deleted_sub));
+ }
+ }
+
+ new_subsystems = std::move(result);
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef __NVMEOFGWUTILS_H__
+#define __NVMEOFGWUTILS_H__
+#include "mon/NVMeofGwTypes.h"
+#include <list>
+
+// utility for diffing nvmeof subsystems changes
+void determine_subsystem_changes(const BeaconSubsystems& old_subsystems,
+ BeaconSubsystems& new_subsystems);
+
+#endif
target_link_libraries(unittest_ceph_assert ceph-common global)
endif()
+add_executable(test_nvmeof_gw_utils
+ test_nvmeof_gw_utils.cc
+ ../nvmeof/NVMeofGwUtils.cc
+ )
+target_link_libraries(test_nvmeof_gw_utils
+ mon ceph-common global-static
+ )
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "nvmeof/NVMeofGwUtils.h"
+#include "mon/NVMeofGwTypes.h"
+#include <iostream>
+#include <algorithm>
+#include "include/ceph_assert.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix *_dout
+
+void test_determine_subsystem_changes() {
+ std::cout << __func__ << "\n\n" << std::endl;
+ // Prepare old and new subsystems
+ BeaconSubsystem sub1_old = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+ BeaconSubsystem sub2_old = { "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+ BeaconSubsystem sub3_old = { "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+ BeaconSubsystems old_subs = { sub1_old, sub2_old, sub3_old };
+
+ // sub1 unchanged, sub2 changed, sub4 added, sub3 deleted
+ BeaconSubsystem sub1_new = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+ BeaconSubsystem sub2_new = { "nqn2", { {"IPv4", "1.2.3.4", "4420"} }, {}, subsystem_change_t::SUBSYSTEM_ADDED }; // changed listeners
+ BeaconSubsystem sub4_new = { "nqn4", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+ BeaconSubsystems new_subs = { sub1_new, sub2_new, sub4_new };
+
+ determine_subsystem_changes(old_subs, new_subs);
+
+ // After call, new_subs should only contain changed, added, and deleted subsystems
+ // sub1 (unchanged) should be removed
+ // sub2 (changed) should be present with SUBSYSTEM_CHANGED
+ // sub4 (added) should be present with SUBSYSTEM_ADDED
+ // sub3 (deleted) should be present with SUBSYSTEM_DELETED
+ bool found_sub2 = false, found_sub3 = false, found_sub4 = false;
+ for (const auto& s : new_subs) {
+ if (s.nqn == "nqn2") {
+ found_sub2 = true;
+ ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+ } else if (s.nqn == "nqn3") {
+ found_sub3 = true;
+ ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED);
+ } else if (s.nqn == "nqn4") {
+ found_sub4 = true;
+ ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+ } else {
+ ceph_assert(false && "Unexpected subsystem in result");
+ }
+ }
+ ceph_assert(found_sub2 && found_sub3 && found_sub4);
+ std::cout << "determine_subsystem_changes test passed" << std::endl;
+}
+
+int main(int argc, const char **argv) {
+ test_determine_subsystem_changes();
+ return 0;
+}
std::string pool = "pool1";
std::string group = "grp1";
auto group_key = std::make_pair(pool, group);
- pending_map.cfg_add_gw("GW1" ,group_key);
- pending_map.cfg_add_gw("GW2" ,group_key);
- pending_map.cfg_add_gw("GW3" ,group_key);
+ std::string nqn = "nqn-nqn";
+ BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
+ BeaconSubsystems subs = {sub};
+
+ pending_map.cfg_add_gw("GW1" ,group_key, true);
+ pending_map.cfg_add_gw("GW2" ,group_key, true);
+ pending_map.cfg_add_gw("GW3" ,group_key, true);
NvmeNonceVector new_nonces = {"abc", "def","hij"};
pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
pending_map.created_gws[group_key]["GW1"].performed_full_startup = true;
+ pending_map.created_gws[group_key]["GW1"].subsystems = subs;
int i = 0;
for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){
blklst_itr.second.osd_epoch = 2*(i++);
dout(0) << pending_map << dendl;
ceph::buffer::list bl;
+ dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl;
+
pending_map.encode(bl, CEPH_FEATURES_ALL);
auto p = bl.cbegin();
pending_map.decode(p);
dout(0) << " == Dump map after Decode: == " <<dendl;
dout(0) << pending_map << dendl;
+ dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl;
}
void test_MNVMeofGwMap() {
+ //test message to the Mon Client
dout(0) << __func__ << "\n\n" << dendl;
std::map<NvmeGroupKey, NvmeGwMonClientStates> map;
std::string pool = "pool1";
std::string group = "grp1";
std::string gw_id = "GW1";
- NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE);
+ NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE, 0, false);
std::string nqn = "nqn";
ana_state_t ana_state;
NqnState nqn_state(nqn, ana_state);
encode(map, bl, CEPH_FEATURES_ALL);
dout(0) << "encoded: " << map << dendl;
decode(map, bl);
- dout(0) << "decode: " << map << dendl;
+ dout(0) << "decoded: " << map << dendl;
- BeaconSubsystem sub = { nqn, {}, {} };
+ BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
NVMeofGwMap pending_map;
pending_map.epoch = 2;
auto msg1 = make_message<MNVMeofGwMap>(pending_map);
int epoch = msg1->get_gwmap_epoch();
dout(0) << "after decode empty msg: " << *msg1 << " epoch " << epoch << dendl;
- pending_map.cfg_add_gw("GW1" ,group_key);
- pending_map.cfg_add_gw("GW2" ,group_key);
- pending_map.cfg_add_gw("GW3" ,group_key);
+ pending_map.cfg_add_gw("GW1" ,group_key, true);
+ pending_map.cfg_add_gw("GW2" ,group_key, true);
+ pending_map.cfg_add_gw("GW3" ,group_key, true);
NvmeNonceVector new_nonces = {"abc", "def","hij"};
pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub);
+
int i = 0;
for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){
blklst_itr.second.osd_epoch = 2*(i++);
dout(0) << "after encode msg: " << *msg << dendl;
msg->decode_payload();
dout(0) << "after decode msg: " << *msg << dendl;
-
+
//dout(0) << "\n == Test GW Delete ==" << dendl;
//pending_map.cfg_delete_gw("GW1" ,group_key);
//dout(0) << "deleted GW1 " << pending_map << dendl;
//dout(0) << "deleted GW2 " << pending_map << dendl;
//dout(0) << "delete of wrong gw id" << dendl;
- //pending_map.cfg_delete_gw("wow" ,group_key);
+ //pending_map.cfg_delete_gw("wow" ,group_key, true);
- pending_map.cfg_delete_gw("GW3" ,group_key);
- dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl;
+ //pending_map.cfg_delete_gw("GW3" ,group_key);
+ //dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl;
}
std::string gw_group = "group";
gw_availability_t availability = gw_availability_t::GW_AVAILABLE;
std::string nqn = "nqn";
- BeaconSubsystem sub = { nqn, {}, {} };
+ BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
BeaconSubsystems subs = { sub };
epoch_t osd_epoch = 17;
epoch_t gwmap_epoch = 42;
-
+ uint64_t sequence = 12345;
+
+ // Test legacy beacon (without diff support)
auto msg = make_message<MNVMeofGwBeacon>(
gw_id,
gw_pool,
subs,
availability,
osd_epoch,
- gwmap_epoch);
- msg->encode_payload(0);
+ gwmap_epoch
+ // sequence defaults to 0
+ // enable_diff defaults to false
+ );
+ msg->encode_payload(CEPH_FEATURES_ALL);
msg->decode_payload();
- dout(0) << "decode msg: " << *msg << dendl;
+ dout(0) << "decode msg (revision 1): " << *msg << dendl;
ceph_assert(msg->get_gw_id() == gw_id);
ceph_assert(msg->get_gw_pool() == gw_pool);
ceph_assert(msg->get_gw_group() == gw_group);
ceph_assert(msg->get_availability() == availability);
ceph_assert(msg->get_last_osd_epoch() == osd_epoch);
ceph_assert(msg->get_last_gwmap_epoch() == gwmap_epoch);
+ // Legacy beacons don't preserve sequence field - it gets reset to 0
+ ceph_assert(msg->get_sequence() == 0);
const auto& dsubs = msg->get_subsystems();
auto it = std::find_if(dsubs.begin(), dsubs.end(),
[&nqn](const auto& element) {
return element.nqn == nqn;
});
ceph_assert(it != dsubs.end());
+ ceph_assert(it->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+
+ // Test enhanced beacon (with diff support)
+ auto msg2 = make_message<MNVMeofGwBeacon>(
+ gw_id,
+ gw_pool,
+ gw_group,
+ subs,
+ availability,
+ osd_epoch,
+ gwmap_epoch,
+ sequence,
+ true // enable_diff = true
+ );
+ msg2->encode_payload(CEPH_FEATURES_ALL);
+ msg2->decode_payload();
+ dout(0) << "decode msg (revision 2): " << *msg2 << dendl;
+ ceph_assert(msg2->get_gw_id() == gw_id);
+ ceph_assert(msg2->get_gw_pool() == gw_pool);
+ ceph_assert(msg2->get_gw_group() == gw_group);
+ ceph_assert(msg2->get_availability() == availability);
+ ceph_assert(msg2->get_last_osd_epoch() == osd_epoch);
+ ceph_assert(msg2->get_last_gwmap_epoch() == gwmap_epoch);
+ ceph_assert(msg2->get_sequence() == sequence);
+ const auto& dsubs2 = msg2->get_subsystems();
+ auto it2 = std::find_if(dsubs2.begin(), dsubs2.end(),
+ [&nqn](const auto& element) {
+ return element.nqn == nqn;
+ });
+ ceph_assert(it2 != dsubs2.end());
+ ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+}
+
+void test_subsystem_change_descriptors() {
+ dout(0) << __func__ << "\n\n" << dendl;
+ // Test different change descriptors
+ BeaconSubsystem sub1 = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+ BeaconSubsystem sub2 = { "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
+ BeaconSubsystem sub3 = { "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+ BeaconSubsystems subs = { sub1, sub2, sub3 };
+ // Encode and decode
+ ceph::buffer::list bl;
+ encode(subs, bl, CEPH_FEATURES_ALL);
+ auto p = bl.cbegin();
+ BeaconSubsystems decoded_subs;
+ decode(decoded_subs, p);
+ // Verify change descriptors are preserved
+ auto it1 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+ [](const auto& s) { return s.nqn == "nqn1"; });
+ auto it2 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+ [](const auto& s) { return s.nqn == "nqn2"; });
+ auto it3 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+ [](const auto& s) { return s.nqn == "nqn3"; });
+ ceph_assert(it1 != decoded_subs.end());
+ ceph_assert(it2 != decoded_subs.end());
+ ceph_assert(it3 != decoded_subs.end());
+ ceph_assert(it1->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+ ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+ ceph_assert(it3->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+ dout(0) << "Subsystem change descriptors test passed" << dendl;
}
void test_NVMeofGwTimers()
test_NVMeofGwMap();
test_MNVMeofGwMap();
test_MNVMeofGwBeacon();
+ test_subsystem_change_descriptors();
test_NVMeofGwTimers();
}