${nvmeof_monitor_grpc_hdrs}
ceph_nvmeof_monitor_client.cc
nvmeof/NVMeofGwClient.cc
+ nvmeof/NVMeofGwUtils.cc
nvmeof/NVMeofGwMonitorGroupClient.cc
nvmeof/NVMeofGwMonitorClient.cc)
add_executable(ceph-nvmeof-monitor-client ${ceph_nvmeof_monitor_client_srcs})
DEFINE_CEPH_FEATURE(49, 2, SERVER_SQUID);
DEFINE_CEPH_FEATURE_RETIRED(50, 1, MON_METADATA, MIMIC, OCTOPUS)
DEFINE_CEPH_FEATURE(50, 2, SERVER_TENTACLE);
-DEFINE_CEPH_FEATURE_RETIRED(51, 1, OSD_BITWISE_HOBJ_SORT, MIMIC, OCTOPUS)
+DEFINE_CEPH_FEATURE(51, 2, NVMEOF_BEACON_DIFF)
// available
DEFINE_CEPH_FEATURE_RETIRED(52, 1, OSD_PROXY_WRITE_FEATURES, MIMIC, OCTOPUS)
// available
CEPH_FEATUREMASK_SERVER_REEF | \
CEPH_FEATUREMASK_SERVER_SQUID | \
CEPH_FEATUREMASK_SERVER_TENTACLE | \
+ CEPH_FEATUREMASK_NVMEOF_BEACON_DIFF | \
0ULL)
#define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL
#include "mon/MonCommand.h"
#include "mon/NVMeofGwMap.h"
#include "include/types.h"
+#include "mon/NVMeofGwBeaconConstants.h"
class MNVMeofGwBeacon final : public PaxosServiceMessage {
-private:
- static constexpr int HEAD_VERSION = 1;
- static constexpr int COMPAT_VERSION = 1;
protected:
std::string gw_id;
gw_availability_t availability; // in absence of beacon heartbeat messages it becomes inavailable
epoch_t last_osd_epoch;
epoch_t last_gwmap_epoch;
+ uint64_t sequence = 0; // sequence number for each beacon message
+ uint64_t affected_features = 0;
public:
MNVMeofGwBeacon()
- : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}
+ : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, BEACON_VERSION_ENHANCED,
+ BEACON_VERSION_ENHANCED}, sequence(0)
{
set_priority(CEPH_MSG_PRIO_HIGH);
}
const BeaconSubsystems& subsystems_,
const gw_availability_t& availability_,
const epoch_t& last_osd_epoch_,
- const epoch_t& last_gwmap_epoch_
- )
- : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION},
+ const epoch_t& last_gwmap_epoch_,
+ uint64_t sequence_ = 0, // default sequence for backward compatibility
+ uint64_t features = 0)
+ : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON,
+ 0,
+ features ? BEACON_VERSION_ENHANCED : BEACON_VERSION_LEGACY,
+ features ? BEACON_VERSION_ENHANCED : BEACON_VERSION_LEGACY},
gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_),
- availability(availability_), last_osd_epoch(last_osd_epoch_), last_gwmap_epoch(last_gwmap_epoch_)
+ availability(availability_), last_osd_epoch(last_osd_epoch_),
+ last_gwmap_epoch(last_gwmap_epoch_), sequence(sequence_),
+ affected_features(features)
{
set_priority(CEPH_MSG_PRIO_HIGH);
}
const epoch_t& get_last_osd_epoch() const { return last_osd_epoch; }
const epoch_t& get_last_gwmap_epoch() const { return last_gwmap_epoch; }
const BeaconSubsystems& get_subsystems() const { return subsystems; };
+ uint64_t get_sequence() const { return sequence; }
private:
~MNVMeofGwBeacon() final {}
encode(gw_id, payload);
encode(gw_pool, payload);
encode(gw_group, payload);
- encode(subsystems, payload);
+ encode(subsystems, payload, affected_features);
encode((uint32_t)availability, payload);
encode(last_osd_epoch, payload);
encode(last_gwmap_epoch, payload);
+ // Only encode sequence for enhanced beacons (version >= 2)
+ if (get_header().version >= 2) {
+ encode(sequence, payload);
+ }
}
void decode_payload() override {
availability = static_cast<gw_availability_t>(tmp);
decode(last_osd_epoch, p);
decode(last_gwmap_epoch, p);
+ // Only decode sequence for enhanced beacons (version >= 2)
+ if (get_header().version >= 2 && !p.end()) {
+ decode(sequence, p);
+ } else {
+ sequence = 0; // Legacy beacons don't have sequence field
+ }
}
private:
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef CEPH_NVMEOFGWBEACONCONSTANTS_H
+#define CEPH_NVMEOFGWBEACONCONSTANTS_H
+
+// This header contains version constants used across multiple files
+// to avoid duplication and maintain consistency.
+
+// Beacon version constants, used as the head/compat version of MNVMeofGwBeacon
+#define BEACON_VERSION_LEGACY 1 // Legacy beacon format (no diff support, no sequence field)
+#define BEACON_VERSION_ENHANCED 2 // Enhanced beacon format (with diff support, sequence field appended)
+
+#endif /* CEPH_NVMEOFGWBEACONCONSTANTS_H */
}
auto gw_state = NvmeGwClientState(
- gw_created.ana_grp_id, epoch, availability);
+ gw_created.ana_grp_id, epoch, availability, gw_created.beacon_sequence,
+ gw_created.beacon_sequence_ooo);
for (const auto& sub: gw_created.subsystems) {
gw_state.subsystems.insert({
sub.nqn,
}
int NVMeofGwMap::cfg_add_gw(
- const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
+ const NvmeGwId &gw_id, const NvmeGroupKey& group_key, uint64_t features)
{
std::set<NvmeAnaGrpId> allocated;
- if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
+ if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
auto gw_epoch_it = gw_epoch.find(group_key);
if (gw_epoch_it == gw_epoch.end()) {
gw_epoch[group_key] = epoch;
for (auto& gws_states: created_gws[group_key]) {
if (gws_states.first == gw_id) {
auto& state = gws_states.second;
+ if (state.availability == gw_availability_t::GW_AVAILABLE) {
+ /*prevent failover because blocklisting right now cause IO errors */
+ dout(4) << "Delete GW: set skip-failovers for group " << gw_id
+ << " group " << group_key << dendl;
+ skip_failovers_for_group(group_key, 5);
+ }
state.availability = gw_availability_t::GW_DELETING;
dout(4) << " Deleting GW :"<< gw_id << " in state "
<< state.availability << " Resulting GW availability: "
<< state.availability << dendl;
- state.subsystems.clear();//ignore subsystems of this GW
utime_t now = ceph_clock_now();
mon->nvmegwmon()->gws_deleting_time[group_key][gw_id] = now;
return 0;
}
}
-void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key)
+void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key,
+ int interval_sec)
{
- const auto skip_failovers = g_conf().get_val<std::chrono::seconds>
- ("mon_nvmeofgw_skip_failovers_interval");
+ std::chrono::seconds skip_failovers;
+ if (interval_sec == 0) {
+ skip_failovers = g_conf().get_val<std::chrono::seconds>
+ ("mon_nvmeofgw_skip_failovers_interval");
+ } else {
+ skip_failovers = std::chrono::seconds(interval_sec);
+ }
for (auto& gw_created: created_gws[group_key]) {
gw_created.second.allow_failovers_ts = std::chrono::system_clock::now()
+ skip_failovers;
auto& st = gw_state->second;
st.set_unavailable_state();
st.set_last_gw_down_ts();
+ st.reset_beacon_sequence();
for (auto& state_itr: created_gws[group_key][gw_id].sm_state) {
fsm_handle_gw_down(
gw_id, group_key, state_itr.second,
// find_already_created_gw(gw_id, group_key);
NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
NvmeNonceVector nonces;
+
+ NvmeAnaNonceMap nonce_map;
+ for (const auto& sub: gw_map.subsystems) { // recreate nonce map from subsystems
+ for (const auto& ns: sub.namespaces) {
+ auto& nonce_vec = nonce_map[ns.anagrpid-1]; //Converting ana groups to offsets
+ if (std::find(nonce_vec.begin(), nonce_vec.end(), ns.nonce) == nonce_vec.end())
+ nonce_vec.push_back(ns.nonce);
+ }
+ }
for (auto& state_itr: gw_map.sm_state) {
// to make blocklist on all clusters of the failing GW
- nonces.insert(nonces.end(), gw_map.nonce_map[state_itr.first].begin(),
- gw_map.nonce_map[state_itr.first].end());
+ nonces.insert(nonces.end(), nonce_map[state_itr.first].begin(),
+ nonce_map[state_itr.first].end());
}
-
+ gw_map.subsystems.clear();
if (nonces.size() > 0) {
- NvmeNonceVector &nonce_vector = gw_map.nonce_map[grpid];;
+ NvmeNonceVector &nonce_vector = nonces;
std::string str = "[";
entity_addrvec_t addr_vect;
}
}
+/*
+ * Check the sequence number of a received beacon against the one stored for
+ * the GW and, when it is acceptable, store it.
+ *
+ * The check runs only if the quorum has NVMEOF_BEACON_DIFF or gw_version > 0;
+ * otherwise the call succeeds without touching the stored sequence.
+ * A beacon is accepted when its sequence is exactly stored + 1, or when both
+ * the stored and the received sequence are 0 (new GW startup).
+ *
+ * old_sequence always receives the sequence that was stored before this call,
+ * so a caller that passes an uninitialized variable never reads it unassigned.
+ *
+ * Returns true if the beacon was accepted (or the check was skipped).
+ * Returns false on an out-of-order beacon; in that case beacon_sequence_ooo is
+ * set on the GW state and the stored sequence is left unchanged.
+ */
+bool NVMeofGwMap::put_gw_beacon_sequence_number(const NvmeGwId &gw_id,
+  int gw_version, const NvmeGroupKey& group_key,
+  uint64_t beacon_sequence, uint64_t& old_sequence)
+{
+  bool rc = true;
+  NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
+  // Define the out-parameter on every path, not only on failure.
+  old_sequence = gw_map.beacon_sequence;
+
+  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) ||
+      (gw_version > 0)) {
+    const uint64_t seq_number = gw_map.beacon_sequence;
+    const bool in_order = (beacon_sequence == seq_number + 1);
+    const bool gw_startup = (beacon_sequence == 0 && seq_number == 0);
+    if (!in_order && !gw_startup) {
+      rc = false;
+      dout(4) << "Warning: GW " << gw_id
+        << " sent beacon sequence out of order, expected "
+        << seq_number +1 << " received " << beacon_sequence << dendl;
+      gw_map.beacon_sequence_ooo = true;
+    } else {
+      gw_map.beacon_sequence = beacon_sequence;
+    }
+  }
+  return rc;
+}
+
+/*
+ * Overwrite the stored beacon sequence of a GW with beacon_sequence and clear
+ * its out-of-order flag. Nothing is changed unless the quorum has
+ * NVMEOF_BEACON_DIFF or gw_version > 0. Always returns true.
+ */
+bool NVMeofGwMap::set_gw_beacon_sequence_number(const NvmeGwId &gw_id,
+  int gw_version, const NvmeGroupKey& group_key, uint64_t beacon_sequence)
+{
+  NvmeGwMonState& gw_state = created_gws[group_key][gw_id];
+  const bool quorum_has_diff =
+    HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF);
+  if (!quorum_has_diff && gw_version <= 0) {
+    return true;  // sequence tracking not in effect for this GW
+  }
+  gw_state.beacon_sequence = beacon_sequence;
+  gw_state.beacon_sequence_ooo = false;
+  dout(10) << gw_id << " set beacon_sequence " << beacon_sequence << dendl;
+  return true;
+}
+
+
void NVMeofGwMap::update_active_timers(bool &propose_pending)
{
const auto now = std::chrono::system_clock::now();
void to_gmap(std::map<NvmeGroupKey, NvmeGwMonClientStates>& Gmap) const;
void track_deleting_gws(const NvmeGroupKey& group_key,
const BeaconSubsystems& subs, bool &propose_pending);
- int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
- int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
void check_all_gws_in_deleting_state(const NvmeGwId &gw_id,
const NvmeGroupKey& group_key);
+ int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+ uint64_t features);
+ int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
void process_gw_map_ka(
const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
epoch_t& last_osd_epoch, bool &propose_pending);
const NvmeGroupKey& group_key, bool &propose_pending);
void set_addr_vect(const NvmeGwId &gw_id,
const NvmeGroupKey& group_key, const entity_addr_t &addr_vect);
- void skip_failovers_for_group(const NvmeGroupKey& group_key);
+ void skip_failovers_for_group(const NvmeGroupKey& group_key,
+ int interval_sec = 0);
+ bool put_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version,
+ const NvmeGroupKey& group_key, uint64_t beacon_sequence,
+ uint64_t& old_sequence);
+ bool set_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version,
+ const NvmeGroupKey& group_key, uint64_t beacon_sequence);
private:
int do_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
int do_erase_gw_id(const NvmeGwId &gw_id,
// force send ack after nearest beacon after leader re-election
gw_created_pair.second.beacon_index =
g_conf().get_val<uint64_t>("mon_nvmeofgw_beacons_till_ack");
+ // force send full beacon after leader election
+ gw_created_pair.second.beacon_sequence = 0;
+ gw_created_pair.second.beacon_sequence_ooo = true;
}
}
}
for (auto &[group_key, gws_states]: pending_map.created_gws) {
BeaconSubsystems *subsystems = &empty_subsystems;
for (auto& gw_state : gws_states) { // loop for GWs inside nqn group
- subsystems = &gw_state.second.subsystems;
+ if (gw_state.second.availability == gw_availability_t::GW_AVAILABLE) {
+ subsystems = &gw_state.second.subsystems;
+ }
if (subsystems->size()) { // Set subsystems to the valid value
break;
}
* function called to restore in pending map all data that is not serialized
* to paxos peons. Otherwise it would be overridden in "pending_map = map"
* currently "allow_failovers_ts", "last_gw_down_ts",
- * "last_gw_map_epoch_valid" variables are restored
+ * "last_gw_map_epoch_valid", "beacon_sequence", "beacon_index" variables are restored
*/
void NVMeofGwMon::restore_pending_map_info(NVMeofGwMap & tmp_map) {
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
gw_created_pair.second.last_gw_down_ts;
pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
gw_created_pair.second.last_gw_map_epoch_valid;
+ pending_map.created_gws[group_key][gw_id].beacon_index =
+ gw_created_pair.second.beacon_index;
+ pending_map.created_gws[group_key][gw_id].beacon_sequence =
+ gw_created_pair.second.beacon_sequence;
+ pending_map.created_gws[group_key][gw_id].beacon_sequence_ooo =
+ gw_created_pair.second.beacon_sequence_ooo;
}
}
}
}
pending_map.encode(bl, features);
dout(10) << " has NVMEOFHA: " << HAVE_FEATURE(features, NVMEOFHA)
- << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP) << dendl;
+ << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP)
+ << " has BEACON_DIFF: " << HAVE_FEATURE(features, NVMEOF_BEACON_DIFF) << dendl;
put_version(t, pending_map.epoch, bl);
put_last_committed(t, pending_map.epoch);
void NVMeofGwMon::update_from_paxos(bool *need_bootstrap)
{
version_t version = get_last_committed();
-
+ uint64_t features = mon.get_quorum_con_features();
+ dout(20) << " has BEACON_DIFF: " << HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)
+ << dendl;
if (version != map.epoch) {
dout(10) << " NVMeGW loading version " << version
<< " " << map.epoch << dendl;
auto group_key = std::make_pair(pool, group);
dout(10) << " id "<< id <<" pool "<< pool << " group "<< group << dendl;
if (prefix == "nvme-gw create") {
- rc = pending_map.cfg_add_gw(id, group_key);
+ rc = pending_map.cfg_add_gw(id, group_key,
+ mon.get_quorum_con_features());
if (rc == -EINVAL) {
err = rc;
dout (4) << "Error: GW cannot be created " << id
return rc;
}
+/*
+ * Build the map that acknowledges a beacon and send it as the reply to op.
+ *
+ * Always send beacon ack to gw in Created state, it should be a temporary
+ * state. With the epoch-filter bit: send ack to beacon in case of no propose
+ * or if something changed that is not relevant to the gw-epoch.
+ *
+ * When gw_created is true the ack carries a map slice for this GW only:
+ * copied from pending_map if gw_propose is set, from the committed map
+ * otherwise. Its beacon sequence is the pending one for an in-order beacon,
+ * or stored_sequence with the out-of-order flag raised when
+ * is_correct_sequence is false.
+ */
+void NVMeofGwMon::do_send_map_ack(MonOpRequestRef op,
+  bool gw_created, bool gw_propose,
+  uint64_t stored_sequence, bool is_correct_sequence,
+  const NvmeGroupKey& group_key, const NvmeGwId &gw_id) {
+  NVMeofGwMap ack_map;
+  if (gw_created) {
+    auto& pending_gw = pending_map.created_gws[group_key][gw_id];
+    // respond with a map slice correspondent to the same GW
+    auto& ack_gw = ack_map.created_gws[group_key][gw_id];
+    if (gw_propose) { //avail = CREATED
+      ack_gw = pending_gw;
+    } else {
+      ack_gw = map.created_gws[group_key][gw_id];
+    }
+    if (is_correct_sequence) {
+      ack_gw.beacon_sequence = pending_gw.beacon_sequence;
+      ack_gw.beacon_sequence_ooo = false;
+    } else {
+      dout(4) << " GW " << gw_id <<
+        " sending ACK due to receiving beacon_sequence out of order" << dendl;
+      ack_gw.beacon_sequence = stored_sequence;
+      ack_gw.beacon_sequence_ooo = true;
+    }
+    if (gw_propose) {
+      dout(10) << "GW in Created " << gw_id << " ack map " << ack_map << dendl;
+    }
+  }
+  ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
+  if (!gw_created) {
+    dout(10) << "gw not created, ack map "
+      << ack_map << " epoch " << ack_map.epoch << dendl;
+  }
+  dout(20) << "ack_map " << ack_map <<dendl;
+  auto reply = make_message<MNVMeofGwMap>(ack_map);
+  mon.send_reply(op, reply.detach());
+}
+
+/*
+ * Apply the subsystem information carried by a GW beacon to the GW's entry
+ * in pending_map and adjust the availability used for this beacon.
+ *
+ * Legacy path (quorum lacks NVMEOF_BEACON_DIFF and gw_version == 0):
+ *   the beacon's subsystem list is compared with the stored one; if they
+ *   differ, the stored list and the nonce map are replaced.
+ * Diff path (otherwise):
+ *   A subsystem is added to the beacon only if it (or one of its encapsulated
+ *   fields) was changed, added or deleted. If nothing changed, the beacon
+ *   does not encode any subsystem.
+ *   A descriptor under the subsystem describes the change:
+ *   ADDED, DELETED, CHANGED.
+ *   Rules for applying the subsystems to the map, for every subsystem in sub:
+ *    if the descriptor is ADDED or CHANGED:
+ *     look for the nqn in the GW's list; if found - substitute,
+ *     if not found - add
+ *    if the descriptor is DELETED:
+ *     look for the nqn in the GW's list; if found - delete
+ *
+ * msg is cast to MNVMeofGwBeacon* and read only on the legacy path.
+ * avail is in/out: it becomes GW_AVAILABLE when something changed, and is
+ * then forced to GW_CREATED when the resulting list has no subsystems or no
+ * subsystem has a listener.
+ * propose_pending is set to true only when a pending out-of-order flag is
+ * cleared; it is never reset here.
+ * Returns 1 if the stored subsystems were modified, 0 otherwise.
+ */
+int NVMeofGwMon::apply_beacon(const NvmeGwId &gw_id, int gw_version,
+ const NvmeGroupKey& group_key, void *msg,
+ const BeaconSubsystems& sub, gw_availability_t &avail,
+ bool &propose_pending)
+{
+ bool found = false;
+ bool changed = false; // becomes true once gw_subs is modified; drives the return value
+ BeaconSubsystems &gw_subs =
+ pending_map.created_gws[group_key][gw_id].subsystems;
+ auto &state = pending_map.created_gws[group_key][gw_id]; // same entry that owns gw_subs
+
+ if (!HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOF_BEACON_DIFF) &&
+ gw_version == 0) { // legacy path: whole-list comparison
+ if (gw_subs != sub) {
+ dout(10) << "BEACON_DIFF logic not applied."
+ "subsystems of GW changed, propose pending " << gw_id << dendl;
+ gw_subs = sub;
+ // rebuild the nonce map from the beacon message
+ state.nonce_map = ((MNVMeofGwBeacon *)msg)->get_nonce_map();
+ changed = true;
+ }
+ } else { // diff path: apply per-subsystem change descriptors
+ if (state.beacon_sequence_ooo) {
+ dout(10) << "Good sequence after out of order detection "
+ "sequence "<< state.beacon_sequence << " " << gw_id << dendl;
+ // need to clear subsystems for correct calculation of difference
+ state.subsystems.clear();
+ state.beacon_sequence_ooo = false;
+ propose_pending = true;
+ }
+
+ for (auto &subs_it: sub) {
+ if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED ||
+ subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED) {
+ found = false;
+ for (auto &gw_subs_it: gw_subs) { // substitute the stored subsystem with the same nqn
+ if (gw_subs_it.nqn == subs_it.nqn) {
+ gw_subs_it = subs_it;
+ dout(10) << "subsystem changed " << subs_it.nqn << " change descr "
+ << (uint32_t)subs_it.change_descriptor << dendl;
+ found = true;
+ changed = true;
+ break;
+ }
+ }
+ if (!found) { // nqn not stored yet: append it
+ gw_subs.push_back(subs_it);
+ changed = true;
+ dout(10) << "subsystem added " << subs_it.nqn << " change descr "
+ << (uint32_t)subs_it.change_descriptor << dendl;
+ }
+ }
+ else
+ if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED)
+ {
+ auto it = std::find_if(gw_subs.begin(), gw_subs.end(),
+ [&](const auto& gw_subs_it) {
+ return gw_subs_it.nqn == subs_it.nqn;
+ });
+ if (it != gw_subs.end()) { // deleting an nqn that is not stored is a no-op
+ gw_subs.erase(it);
+ dout(10) << "subsystem deleted " << subs_it.nqn << " change descr "
+ << (uint32_t)subs_it.change_descriptor << dendl;
+ changed = true;
+ }
+ }
+ }
+ }
+ if (changed) {
+ avail = gw_availability_t::GW_AVAILABLE; // may still be downgraded to GW_CREATED below
+ }
+ if (gw_subs.size() == 0) {
+ avail = gw_availability_t::GW_CREATED;
+ dout(10) << "No-subsystems condition detected for GW " << gw_id <<dendl;
+ } else {
+ bool listener_found = false;
+ for (auto &subs: gw_subs) { // one subsystem with a listener is enough
+ if (subs.listeners.size()) {
+ listener_found = true;
+ break;
+ }
+ }
+ if (!listener_found) {
+ dout(10) << "No-listeners condition detected for GW " << gw_id << dendl;
+ avail = gw_availability_t::GW_CREATED;
+ }
+ }// for HA, no-subsystems and no-listeners are the same use case
+ if (avail == gw_availability_t::GW_UNAVAILABLE) {
+ dout(4) << "Warning: UNAVAILABLE gw " << gw_id << dendl;
+ }
+ return (changed == true ? 1:0);
+}
+
bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
{
auto m = op->get_req<MNVMeofGwBeacon>();
-
- dout(20) << "availability " << m->get_availability()
+ uint64_t sequence = m->get_sequence();
+ int version = m->version;
+ dout(10) << "availability " << m->get_availability()
+ << " sequence " << sequence << " version " << version
<< " GW : " << m->get_gw_id()
<< " osdmap_epoch " << m->get_last_osd_epoch()
- << " subsystems " << m->get_subsystems() << dendl;
+ << " subsystems " << m->get_subsystems().size() << dendl;
+
ConnectionRef con = op->get_connection();
NvmeGwId gw_id = m->get_gw_id();
NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group());
+ //"avail" variable will be changed inside the function
+ // when it becomes CREATED for several reasons GW's load balance group
+ // is serviced by another GW
gw_availability_t avail = m->get_availability();
bool propose = false;
bool nonce_propose = false;
bool timer_propose = false;
bool gw_propose = false;
bool gw_created = true;
+ bool correct_sequence = true;
+ uint64_t stored_sequence;
NVMeofGwMap ack_map;
bool epoch_filter_enabled = HAVE_FEATURE(mon.get_quorum_con_features(),
NVMEOFHAMAP);
<< map.created_gws << dendl;
goto set_propose;
} else {
+ pending_map.created_gws[group_key][gw_id].subsystems.clear();
+ pending_map.set_gw_beacon_sequence_number(gw_id, version,
+ group_key, sequence);
dout(4) << "GW beacon: Created state - full startup done " << gw_id
<< " GW state in monitor data-base : "
<< pending_map.created_gws[group_key][gw_id].availability
} else { // first GW beacon should come with avail = Created
// if GW reports Avail/Unavail but in monitor's database it is Unavailable
if (gw != group_gws.end()) {
+ correct_sequence = pending_map.put_gw_beacon_sequence_number
+ (gw_id, version, group_key, sequence, stored_sequence);
// it means it did not perform "exit" after failover was set by
// NVMeofGWMon
if ((pending_map.created_gws[group_key][gw_id].availability ==
mon.send_reply(op, msg.detach());
goto false_return;
}
+ if (!correct_sequence) {
+ if (avail == gw_availability_t::GW_AVAILABLE) {
+ /*prevent failover - give GW a chance to send the expected sequence */
+ dout(4) << "sequence ooo: set skip-failovers for group " << gw_id
+ << " group " << group_key << dendl;
+ pending_map.skip_failovers_for_group(group_key, 7);
+ }
+ avail = gw_availability_t::GW_CREATED;
+ // availability would be set to Active and GW receive the full map
+ // when it sends the correct beacon-sequence,
+ goto check_availability;
+ }
}
}
// Beacon from GW in !Created state but it does not appear in the map
pending_map.set_addr_vect(gw_id, group_key, con->get_peer_addr());
gw_propose = true;
}
- // deep copy the whole nonce map of this GW
- if (m->get_nonce_map().size()) {
- if (pending_map.created_gws[group_key][gw_id].nonce_map !=
- m->get_nonce_map()) {
- dout(10) << "nonce map of GW changed , propose pending "
- << gw_id << dendl;
- pending_map.created_gws[group_key][gw_id].nonce_map = m->get_nonce_map();
- dout(10) << "nonce map of GW " << gw_id << " "
- << pending_map.created_gws[group_key][gw_id].nonce_map << dendl;
- nonce_propose = true;
- }
- } else {
- dout(10) << "Warning: received empty nonce map in the beacon of GW "
- << gw_id << " avail " << (int)avail << dendl;
- }
-
- if (sub.size() == 0) {
- avail = gw_availability_t::GW_CREATED;
- dout(20) << "No-subsystems condition detected for GW " << gw_id <<dendl;
- } else {
- bool listener_found = false;
- for (auto &subs: sub) {
- if (subs.listeners.size()) {
- listener_found = true;
- break;
- }
- }
- if (!listener_found) {
- dout(10) << "No-listeners condition detected for GW " << gw_id << dendl;
- avail = gw_availability_t::GW_CREATED;
- }
- }// for HA no-subsystems and no-listeners are same usecases
- if (pending_map.created_gws[group_key][gw_id].subsystems != sub) {
- dout(10) << "subsystems of GW changed, propose pending " << gw_id << dendl;
- pending_map.created_gws[group_key][gw_id].subsystems = sub;
- dout(20) << "subsystems of GW " << gw_id << " "
- << pending_map.created_gws[group_key][gw_id].subsystems << dendl;
+ if (apply_beacon(gw_id, version, group_key, (void *)m, sub, avail, propose) !=0) {
nonce_propose = true;
+ dout(10) << "subsystem(subs/listener/nonce/NM) of GW changed, propose pending "
+ << gw_id << " available " << avail << dendl;
+ dout(20) << "subsystems of GW " << gw_id << " "
+ << pending_map.created_gws[group_key][gw_id].subsystems << dendl;
}
pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
(get_ack_map_epoch(true, group_key) == m->get_last_gwmap_epoch());
<< " epoch " << get_ack_map_epoch(true, group_key)
<< " beacon_epoch " << m->get_last_gwmap_epoch() << dendl;
}
+
+check_availability:
if (avail == gw_availability_t::GW_AVAILABLE) {
// check pending_map.epoch vs m->get_version() -
// if different - drop the beacon
-
LastBeacon lb = {gw_id, group_key};
last_beacon[lb] = now;
epoch_t last_osd_epoch = m->get_last_osd_epoch();
// Periodic: check active FSM timers
pending_map.update_active_timers(timer_propose);
- set_propose:
+set_propose:
propose |= (timer_propose | gw_propose | nonce_propose);
- apply_ack_logic = (avail == gw_availability_t::GW_AVAILABLE) ? true : false;
+ apply_ack_logic = ((avail == gw_availability_t::GW_AVAILABLE)
+ && correct_sequence) ? true : false;
if ( (apply_ack_logic &&
((pending_map.created_gws[group_key][gw_id].beacon_index++
% beacons_till_ack) == 0))|| (!apply_ack_logic) ) {
if (send_ack && ((!gw_propose && epoch_filter_enabled) ||
(!propose && !epoch_filter_enabled) ||
(avail == gw_availability_t::GW_CREATED)) ) {
- /* always send beacon ack to gw in Created state,
- * it should be temporary state
- * if epoch-filter-bit: send ack to beacon in case no propose
- * or if changed something not relevant to gw-epoch
- */
- if (gw_created) {
- // respond with a map slice correspondent to the same GW
- ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];
- }
- ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
- if (!gw_created)
- dout(10) << "gw not created, ack map "
- << ack_map << " epoch " << ack_map.epoch << dendl;
- dout(20) << "ack_map " << ack_map <<dendl;
- auto msg = make_message<MNVMeofGwMap>(ack_map);
- mon.send_reply(op, msg.detach());
+ do_send_map_ack(op, gw_created, gw_propose, stored_sequence,
+ correct_sequence, group_key, gw_id);
} else {
mon.no_reply(op);
}
void restore_pending_map_info(NVMeofGwMap & tmp_map);
void cleanup_pending_map();
void get_gw_listeners(ceph::Formatter *f, std::pair<std::string, std::string>& group_key);
+ int apply_beacon(const NvmeGwId &gw_id, int gw_version,
+ const NvmeGroupKey& group_key, void *msg,
+ const BeaconSubsystems& sub, gw_availability_t &avail, bool &propose_pending);
+ void do_send_map_ack(MonOpRequestRef op, bool gw_created, bool gw_propose,
+ uint64_t stored_sequence, bool is_correct_sequence,
+ const NvmeGroupKey& group_key, const NvmeGwId &gw_id);
};
#endif /* MON_NVMEGWMONITOR_H_ */
*/
#ifndef MON_NVMEOFGWSERIALIZE_H_
#define MON_NVMEOFGWSERIALIZE_H_
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mon
#undef dout_prefix
}
inline std::ostream& operator<<(std::ostream& os, const BeaconSubsystem value) {
- os << "BeaconSubsystem( nqn:" << value.nqn << ", listeners [ ";
+ os << "BeaconSubsystem( nqn:" << value.nqn << " descr "
+ << (uint32_t)value.change_descriptor << ", listeners [ ";
for (const auto& list: value.listeners) os << list << " ";
os << "] namespaces [ ";
for (const auto& ns: value.namespaces) os << ns << " ";
std::ostream& os, const NvmeGwClientState value) {
os << "NvmeGwState { group id: " << value.group_id
<< " gw_map_epoch " << value.gw_map_epoch
- << " availablilty "<< value.availability
+ << " availablilty " << value.availability
+ << " sequence " << value.last_beacon_seq_number
+ << " sequence-ooo " << value.last_beacon_seq_ooo
<< " GwSubsystems: [ ";
for (const auto& sub: value.subsystems) {
os << sub.second << " ";
for (const auto& sub: subsystems) {
encode(sub.second.nqn, bl);
if (version == 1) {
- dout(20) << "encode ana_state vector version1 = " << version << dendl;
+ dout(20) << "encode ana_state vector version1 = " << (int)version << dendl;
/* Version 1 requires exactly 16 entries */
ana_state_t filled(sub.second.ana_state);
filled.resize(
0));
encode(filled, bl);
} else {
- dout(20) << "encode ana_state vector version2 = " << version << dendl;
+ dout(20) << "encode ana_state vector version2 = " << (int)version << dendl;
encode(sub.second.ana_state, bl);
}
}
}
inline void encode(const NvmeGwClientState& state, ceph::bufferlist &bl, uint64_t features) {
- ENCODE_START(1, 1, bl);
+ uint8_t version = 1; // layout without the beacon sequence fields
+ if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+ version = 2; // receiver advertises NVMEOF_BEACON_DIFF: append the sequence fields
+ }
+ ENCODE_START(version, version, bl); // the chosen version is passed as both version arguments
encode(state.group_id, bl);
encode(state.gw_map_epoch, bl);
encode (state.subsystems, bl, features);
encode((uint32_t)state.availability, bl);
+ if (version >= 2) { // v2 tail: last beacon sequence number and its out-of-order flag
+ encode((uint64_t)state.last_beacon_seq_number, bl);
+ encode(state.last_beacon_seq_ooo, bl);
+ }
ENCODE_FINISH(bl);
}
inline void decode(
NvmeGwClientState& state, ceph::bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(state.group_id, bl);
decode(state.gw_map_epoch, bl);
decode(state.subsystems, bl);
uint32_t avail;
decode(avail, bl);
state.availability = (gw_availability_t)avail;
+ if (struct_v >= 2) {
+ decode(state.last_beacon_seq_number, bl);
+ decode(state.last_beacon_seq_ooo, bl);
+ }
DECODE_FINISH(bl);
}
uint64_t features) {
ENCODE_START(1, 1, bl);
encode((uint32_t)nonce_map.size(), bl);
+ dout(20) << "encode nonce map size " << nonce_map.size() << dendl;
for (auto& ana_group_nonces : nonce_map) {
// ana group id
encode(ana_group_nonces.first, bl);
version = 3;
}
ENCODE_START(version, version, bl);
+ dout(20) << "encode NvmeGwMonStates. struct_v: " << (int)version << dendl;
encode ((uint32_t)gws.size(), bl); // number of gws in the group
for (auto& gw : gws) {
encode(gw.first, bl);// GW_id
encode(gw.second.ana_grp_id, bl); // GW owns this group-id
+ dout(20) << "encode gw-id " << gw.first << dendl;
if (version >= 2) {
encode((uint32_t)gw.second.sm_state.size(), bl);
for (auto &state_it:gw.second.sm_state) {
encode((uint32_t)gw.second.availability, bl);
encode((uint16_t)gw.second.performed_full_startup, bl);
encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl);
- encode(gw.second.subsystems, bl);
+ dout(20) << "encode availability " << gw.second.availability
+ << " startup " << (int)gw.second.performed_full_startup << dendl;
+ encode(gw.second.subsystems, bl, features);
encode((uint32_t)gw.second.blocklist_data.size(), bl);
for (auto &blklst_itr: gw.second.blocklist_data) {
encode((uint32_t)gw.second.availability, bl);
encode((uint16_t)gw.second.performed_full_startup, bl);
encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl);
- encode(gw.second.subsystems, bl); // TODO reuse but put features - encode version
+ encode(gw.second.subsystems, bl, features);
Blocklist_data bl_data[MAX_SUPPORTED_ANA_GROUPS];
for (auto &blklst_itr: gw.second.blocklist_data) {
bl_data[blklst_itr.first].osd_epoch = blklst_itr.second.osd_epoch;
dout(20) << "decode addr_vect and beacon_index" << dendl;
gw_created.addr_vect.decode(bl);
decode(gw_created.beacon_index, bl);
+ dout(20) << "decoded beacon_index " << gw_created.beacon_index << dendl;
}
gws[gw_name] = gw_created;
DECODE_FINISH(bl);
}
-inline void encode(const BeaconSubsystem& sub, ceph::bufferlist &bl) {
- ENCODE_START(1, 1, bl);
+inline void encode(const BeaconSubsystem& sub, ceph::bufferlist &bl, uint64_t features) {
+ uint8_t version = 1; // layout without change_descriptor
+ if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+ version = 2; // receiver advertises NVMEOF_BEACON_DIFF: append change_descriptor
+ }
+ // For legacy encoding, skip every subsystem whose descriptor is not SUBSYSTEM_ADDED (i.e. CHANGED or DELETED)
+ if (version == 1 &&
+ sub.change_descriptor != subsystem_change_t::SUBSYSTEM_ADDED) {
+ dout(4) << "encode BeaconSubsystem: skipping subsystem " << sub.nqn
+ << " with change_descriptor " << (int)sub.change_descriptor
+ << " in legacy mode" << dendl;
+ return; // Skip encoding this subsystem entirely. NOTE(review): nothing is written for this entry - confirm the container encoder does not count it
+ }
+
+ ENCODE_START(version, version, bl); // the chosen version is passed as both version arguments
encode(sub.nqn, bl);
+ dout(20) << "encode BeaconSubsystems " << sub.nqn <<" features " << features
+ << " version " << (int)version << dendl;
encode((uint32_t)sub.listeners.size(), bl);
for (const auto& ls: sub.listeners)
encode(ls, bl);
encode((uint32_t)sub.namespaces.size(), bl);
for (const auto& ns: sub.namespaces)
encode(ns, bl);
+ if (version >= 2) { // v2 tail: change descriptor, written as uint32_t
+ encode((uint32_t)sub.change_descriptor, bl);
+ dout(20) << "encode BeaconSubsystems change-descr: " << (uint32_t)sub.change_descriptor << dendl;
+ }
ENCODE_FINISH(bl);
}
inline void decode(BeaconSubsystem& sub, ceph::buffer::list::const_iterator &bl) {
- DECODE_START(1, bl);
- dout(20) << "decode BeaconSubsystems " << dendl;
+ DECODE_START(2, bl);
decode(sub.nqn, bl);
+ dout(20) << "decode BeaconSubsystems " << sub.nqn << dendl;
uint32_t s;
sub.listeners.clear();
decode(s, bl);
decode(ns, bl);
sub.namespaces.push_back(ns);
}
+ if (struct_v >= 2) {
+ uint32_t change_desc;
+ decode(change_desc, bl);
+ sub.change_descriptor = static_cast<subsystem_change_t>(change_desc);
+ dout(20) << "decode BeaconSubsystems version >= " << 2 << dendl;
+ }
DECODE_FINISH(bl);
}
#include <iomanip>
#include <map>
#include <iostream>
+#include <list>
+#include <vector>
+#include <cstdint>
+#include <chrono>
+#include "include/types.h"
+#include "msg/msg_types.h"
using NvmeGwId = std::string;
using NvmeGroupKey = std::pair<std::string, std::string>;
GW_DELETED
};
+// Kind of change a BeaconSubsystem entry reports relative to the previous beacon.
+// The numeric value is serialized (the BeaconSubsystem encoder writes it as a
+// uint32_t), so the values are spelled out explicitly: do not renumber or
+// reorder existing entries, only append new ones.
+enum class subsystem_change_t {
+  SUBSYSTEM_ADDED = 0,
+  SUBSYSTEM_CHANGED = 1,
+  SUBSYSTEM_DELETED = 2
+};
+
#define REDUNDANT_GW_ANA_GROUP_ID 0xFF
using SmState = std::map < NvmeAnaGrpId, gw_states_per_group_t>;
NvmeNqnId nqn;
std::list<BeaconListener> listeners;
std::list<BeaconNamespace> namespaces;
+ subsystem_change_t change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED;
// Define the equality operator
bool operator==(const BeaconSubsystem& other) const {
BlocklistData blocklist_data;
//ceph entity address allocated for the GW-client that represents this GW-id
entity_addrvec_t addr_vect;
- uint16_t beacon_index = 0;
+ uint64_t beacon_sequence = 0;// sequence number of last beacon copied to GW state
+ bool beacon_sequence_ooo = false; // last beacon sequence was out of order;
+ uint16_t beacon_index = 0; // used for filter acks sent to the client as response to beacon
/**
* during redeploy action and maybe other emergency use-cases gw performs scenario
* that we call fast-reboot. It quickly reboots(due to redeploy f.e) and sends the
// it expects it performed the full startup
performed_full_startup = false;
}
+ void reset_beacon_sequence(){ // forget the last recorded beacon sequence number
+ beacon_sequence = 0; // only the counter is cleared; beacon_sequence_ooo is left as it is
+ }
void standby_state(NvmeAnaGrpId grpid) {
sm_state[grpid] = gw_states_per_group_t::GW_STANDBY_STATE;
}
epoch_t gw_map_epoch;
GwSubsystems subsystems;
gw_availability_t availability;
- NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available)
- : group_id(id), gw_map_epoch(epoch), availability(available) {}
+  // Sequence number of the last beacon, as carried in this state.
+  uint64_t last_beacon_seq_number;
+  // True when that beacon's sequence number was flagged as out of order.
+  bool last_beacon_seq_ooo;
+  // Builds a state with the group id, map epoch, availability and both beacon
+  // sequence fields taken from the arguments.
+  NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available,
+                    uint64_t sequence, bool sequence_ooo)
+    : group_id(id), gw_map_epoch(epoch), availability(available),
+      last_beacon_seq_number(sequence), last_beacon_seq_ooo(sequence_ooo) {}
NvmeGwClientState()
: NvmeGwClientState(
- REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE) {}
+      // Default state: redundant ANA group, epoch 0, unavailable,
+      // sequence 0 and not out of order.
+      REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE, 0, false) {}
};
struct Tmdata {
*/
#include <boost/algorithm/string/replace.hpp>
+#include <fmt/format.h>
#include "common/errno.h"
#include "common/signal.h"
#include "include/compat.h"
#include "include/stringify.h"
+#include "include/ceph_features.h"
#include "global/global_context.h"
#include "global/signal_handler.h"
#include "NVMeofGwMonitorClient.h"
#include "NVMeofGwClient.h"
#include "NVMeofGwMonitorGroupClient.h"
+#include "nvmeof/NVMeofGwUtils.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mon
last_map_time(std::chrono::steady_clock::now()),
reset_timestamp(std::chrono::steady_clock::now()),
start_time(last_map_time),
+ cluster_beacon_diff_included(0),
+ poolctx(),
monc{g_ceph_context, poolctx},
client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())),
objecter{g_ceph_context, client_messenger.get(), &monc, poolctx},
{
ceph_assert(ceph_mutex_is_locked_by_me(beacon_lock));
gw_availability_t gw_availability = gw_availability_t::GW_CREATED;
- BeaconSubsystems subs;
+ BeaconSubsystems current_subsystems;
NVMeofGwClient gw_client(
grpc::CreateChannel(gateway_address, gw_creds()));
subsystems_info gw_subsystems;
BeaconListener bls = { ls.adrfam(), ls.traddr(), ls.trsvcid() };
bsub.listeners.push_back(bls);
}
- subs.push_back(bsub);
+ current_subsystems.push_back(bsub);
}
}
+ // Determine change descriptors by comparing with previous beacon's subsystems
+ BeaconSubsystems subsystem_diff = current_subsystems;
+ determine_subsystem_changes(prev_beacon_subsystems, subsystem_diff);
+
auto group_key = std::make_pair(pool, group);
NvmeGwClientState old_gw_state;
// if already got gateway state in the map
if (first_beacon == false && get_gw_state("old map", map, group_key, name, old_gw_state))
gw_availability = ok ? gw_availability_t::GW_AVAILABLE : gw_availability_t::GW_UNAVAILABLE;
- dout(10) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability <<
+ dout(1) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability <<
" osdmap_epoch " << osdmap_epoch << " gwmap_epoch " << gwmap_epoch << dendl;
+
+ // Check if NVMEOF_BEACON_DIFF feature is supported by the cluster
+ dout(10) << fmt::format("NVMEOF_BEACON_DIFF supported: {}", cluster_beacon_diff_included ? "yes" : "no") << dendl;
+
+ // Send beacon with appropriate version based on cluster features
auto m = ceph::make_message<MNVMeofGwBeacon>(
name,
pool,
group,
- subs,
+ cluster_beacon_diff_included ? subsystem_diff : current_subsystems,
gw_availability,
osdmap_epoch,
- gwmap_epoch);
+ gwmap_epoch,
+ beacon_sequence,
+ // Pass affected features to the constructor
+ cluster_beacon_diff_included ? CEPH_FEATUREMASK_NVMEOF_BEACON_DIFF : 0
+ );
+ dout(10) << "sending beacon with diff support: " << (cluster_beacon_diff_included ? "enabled" : "disabled") << dendl;
+
monc.send_mon_message(std::move(m));
+ ++beacon_sequence;
+ prev_beacon_subsystems = std::move(current_subsystems);
}
void NVMeofGwMonitorClient::disconnect_panic()
// ensure that the gateway state has not vanished
ceph_assert(got_new_gw_state || !got_old_gw_state);
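+  // The received map flags our last beacon as out of order (last_beacon_seq_ooo).
+  // Resynchronize instead of applying the map: resume numbering right after the
+  // sequence the map reports and clear the cached subsystem list, so the next
+  // diff is computed against an empty baseline.
+  // beacon_sequence is incremented after sending, so the last sent value is (beacon_sequence - 1).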
+ {
+ std::lock_guard bl(beacon_lock);
+ if (got_new_gw_state && new_gw_state.last_beacon_seq_ooo) {
+ //new_gw_state.last_beacon_seq_number != (beacon_sequence - 1)) {
+ dout(4) << "Beacon sequence mismatch detected. Expected: " << (beacon_sequence - 1)
+ << ", received: " << new_gw_state.last_beacon_seq_number
+ << ". Truncating previous subsystems list." << dendl;
+ beacon_sequence = new_gw_state.last_beacon_seq_number + 1;
+ prev_beacon_subsystems.clear();
+ dout(4) << "OOO map received, Ignore it" << dendl;
+ return;
+ }
+ }
+
if (!got_old_gw_state) {
if (!got_new_gw_state) {
dout(10) << "Can not find new gw state" << dendl;
}
}
+ // Combined subsystems
+ const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0);
+ GwSubsystems combined_subsystems = new_gw_state.subsystems;
+ for (const auto& nqn_state_pair: old_gw_state.subsystems) {
+ const auto& nqn = nqn_state_pair.first;
+ auto& old_nqn_state = nqn_state_pair.second;
+
+ // The monitor might remove active subsystems from the new distributed GwSubsystems.
+ // In such cases, ensure an INACCESSIBLE state is generated for subsystems
+ // that were present in the old state but are now missing.
+ if (new_gw_state.subsystems.find(nqn) == new_gw_state.subsystems.end()) {
+ ana_state_t all_disabled(old_nqn_state.ana_state.size(), initial_ana_state);
+ dout(4) << "set all groups to Inacccessible stat for " << nqn << dendl;
+ NqnState nqn_state(nqn, all_disabled);
+
+ combined_subsystems.insert({nqn, nqn_state});
+ }
+ }
+
// Gather all state changes
ana_info ai;
epoch_t max_blocklist_epoch = 0;
- for (const auto& nqn_state_pair: new_gw_state.subsystems) {
+ for (const auto& nqn_state_pair: combined_subsystems) {
auto& sub = nqn_state_pair.second;
const auto& nqn = nqn_state_pair.first;
nqn_ana_states nas;
sub.ana_state.size();
for (NvmeAnaGrpId ana_grp_index = 0; ana_grp_index < ana_state_size; ana_grp_index++) {
- const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0);
auto new_group_state = (ana_grp_index < sub.ana_state.size()) ?
sub.ana_state[ana_grp_index] :
initial_ana_state;
std::lock_guard l(lock);
dout(10) << "got map type " << m->get_type() << dendl;
+ // print connection features for all incoming messages and update cluster features
+ if (m->get_connection()) {
+ cluster_beacon_diff_included = monc.get_monmap_required_features().contains_all(ceph::features::mon::FEATURE_NVMEOF_BEACON_DIFF);
+ dout(10) << fmt::format("Updated cluster features: 0x{:x}", cluster_beacon_diff_included) << dendl;
+ }
+
if (m->get_type() == MSG_MNVMEOF_GW_MAP) {
handle_nvmeof_gw_map(ref_cast<MNVMeofGwMap>(m));
return Dispatcher::HANDLED();
bool first_beacon = true;
bool set_group_id = false;
-
+  uint64_t beacon_sequence = 0; // sequence number passed with the next beacon; incremented after each send
+  BeaconSubsystems prev_beacon_subsystems; // subsystems gathered for the previous beacon; baseline for the next diff
+  bool cluster_beacon_diff_included = false; // true when the monmap's required features include NVMEOF_BEACON_DIFF
// init gw ssl opts
void init_gw_ssl_opts();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "nvmeof/NVMeofGwUtils.h"
+
+#include <algorithm>
+#include <utility>
+
+// Rewrites new_subsystems in place as the diff against old_subsystems.
+//
+// On return new_subsystems holds, in this order:
+//   - every entry of the incoming new list whose NQN is absent from
+//     old_subsystems, tagged SUBSYSTEM_ADDED;
+//   - interleaved with those (original order kept), every entry whose NQN is
+//     present in old_subsystems but which does not compare equal to the old
+//     entry, tagged SUBSYSTEM_CHANGED;
+//   - then every old entry whose NQN is absent from the new list, tagged
+//     SUBSYSTEM_DELETED.
+// Entries that compare equal to their old counterpart are dropped.
+void determine_subsystem_changes(const BeaconSubsystems& old_subsystems,
+                                 BeaconSubsystems& new_subsystems) {
+  // First entry of `subs` carrying the given NQN, or subs.end() when absent.
+  const auto find_by_nqn = [](const BeaconSubsystems& subs, const auto& nqn) {
+    return std::find_if(subs.begin(), subs.end(),
+                        [&nqn](const BeaconSubsystem& s) { return s.nqn == nqn; });
+  };
+
+  BeaconSubsystems diff;
+
+  // Pass 1: classify everything currently present as added, changed or unchanged.
+  for (const auto& current : new_subsystems) {
+    const auto previous = find_by_nqn(old_subsystems, current.nqn);
+    const bool is_new = (previous == old_subsystems.end());
+    if (!is_new && *previous == current) {
+      continue;  // unchanged: not part of the diff
+    }
+    diff.push_back(current);
+    diff.back().change_descriptor = is_new
+      ? subsystem_change_t::SUBSYSTEM_ADDED
+      : subsystem_change_t::SUBSYSTEM_CHANGED;
+  }
+
+  // Pass 2: whatever existed before but is gone now is reported as deleted.
+  for (const auto& previous : old_subsystems) {
+    if (find_by_nqn(new_subsystems, previous.nqn) != new_subsystems.end()) {
+      continue;  // still present; already handled in pass 1
+    }
+    diff.push_back(previous);
+    diff.back().change_descriptor = subsystem_change_t::SUBSYSTEM_DELETED;
+  }
+
+  new_subsystems = std::move(diff);
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef __NVMEOFGWUTILS_H__
+#define __NVMEOFGWUTILS_H__
+#include "mon/NVMeofGwTypes.h"
+#include <list>
+
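+// Computes the difference between two NVMe-oF subsystem lists.
+// new_subsystems is both input and output: on return it holds only the entries
+// that are new (SUBSYSTEM_ADDED), that differ from the old entry with the same
+// NQN (SUBSYSTEM_CHANGED), or that exist only in old_subsystems
+// (SUBSYSTEM_DELETED). Unchanged entries are removed.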
+void determine_subsystem_changes(const BeaconSubsystems& old_subsystems,
+ BeaconSubsystems& new_subsystems);
+
+#endif
target_link_libraries(unittest_ceph_assert ceph-common global)
endif()
+add_executable(test_nvmeof_gw_utils
+ test_nvmeof_gw_utils.cc
+ ../nvmeof/NVMeofGwUtils.cc
+ )
+target_link_libraries(test_nvmeof_gw_utils
+ mon ceph-common global-static
+ )
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "nvmeof/NVMeofGwUtils.h"
+#include "mon/NVMeofGwTypes.h"
+#include <iostream>
+#include <algorithm>
+#include "include/ceph_assert.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix *_dout
+
+// Exercises determine_subsystem_changes() on one scenario that covers all four
+// outcomes: nqn1 unchanged, nqn2 changed (gains a listener), nqn3 deleted and
+// nqn4 added.
+void test_determine_subsystem_changes() {
+  std::cout << __func__ << "\n\n" << std::endl;
+
+  // Builds a subsystem with the given NQN and listeners, and no namespaces.
+  const auto make_sub = [](const char* nqn, std::list<BeaconListener> listeners = {}) {
+    return BeaconSubsystem{ nqn, std::move(listeners), {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  };
+
+  // Previous beacon: nqn1, nqn2, nqn3.
+  BeaconSubsystems old_subs = { make_sub("nqn1"), make_sub("nqn2"), make_sub("nqn3") };
+
+  // Current state: nqn1 as before, nqn2 with a listener, nqn4 is new, nqn3 is gone.
+  BeaconSubsystems new_subs = {
+    make_sub("nqn1"),
+    make_sub("nqn2", { {"IPv4", "1.2.3.4", "4420"} }),
+    make_sub("nqn4")
+  };
+
+  determine_subsystem_changes(old_subs, new_subs);
+
+  // new_subs now holds the diff only: nqn2 as CHANGED, nqn3 as DELETED and
+  // nqn4 as ADDED. Anything else (notably the unchanged nqn1) is an error.
+  bool found_sub2 = false;
+  bool found_sub3 = false;
+  bool found_sub4 = false;
+  for (const auto& s : new_subs) {
+    if (s.nqn == "nqn2") {
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+      found_sub2 = true;
+      continue;
+    }
+    if (s.nqn == "nqn3") {
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED);
+      found_sub3 = true;
+      continue;
+    }
+    if (s.nqn == "nqn4") {
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+      found_sub4 = true;
+      continue;
+    }
+    ceph_assert(false && "Unexpected subsystem in result");
+  }
+  ceph_assert(found_sub2 && found_sub3 && found_sub4);
+  std::cout << "determine_subsystem_changes test passed" << std::endl;
+}
+
+int main(int argc, const char **argv) { // standalone test driver; argc and argv are not used
+ test_determine_subsystem_changes(); // the checks inside use ceph_assert
+ return 0;
+}
std::string pool = "pool1";
std::string group = "grp1";
auto group_key = std::make_pair(pool, group);
- pending_map.cfg_add_gw("GW1" ,group_key);
- pending_map.cfg_add_gw("GW2" ,group_key);
- pending_map.cfg_add_gw("GW3" ,group_key);
+ std::string nqn = "nqn-nqn";
+ BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
+ BeaconSubsystems subs = {sub};
+
+ pending_map.cfg_add_gw("GW1" ,group_key, CEPH_FEATURES_ALL);
+ pending_map.cfg_add_gw("GW2" ,group_key, CEPH_FEATURES_ALL);
+ pending_map.cfg_add_gw("GW3" ,group_key, CEPH_FEATURES_ALL);
NvmeNonceVector new_nonces = {"abc", "def","hij"};
pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
pending_map.created_gws[group_key]["GW1"].performed_full_startup = true;
+ pending_map.created_gws[group_key]["GW1"].subsystems = subs;
int i = 0;
for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){
blklst_itr.second.osd_epoch = 2*(i++);
dout(0) << pending_map << dendl;
ceph::buffer::list bl;
+ dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl;
+
pending_map.encode(bl, CEPH_FEATURES_ALL);
auto p = bl.cbegin();
pending_map.decode(p);
dout(0) << " == Dump map after Decode: == " <<dendl;
dout(0) << pending_map << dendl;
+ dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl;
}
void test_MNVMeofGwMap() {
+ //test message to the Mon Client
dout(0) << __func__ << "\n\n" << dendl;
std::map<NvmeGroupKey, NvmeGwMonClientStates> map;
std::string pool = "pool1";
std::string group = "grp1";
std::string gw_id = "GW1";
- NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE);
+ NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE, 0, false);
std::string nqn = "nqn";
ana_state_t ana_state;
NqnState nqn_state(nqn, ana_state);
encode(map, bl, CEPH_FEATURES_ALL);
dout(0) << "encoded: " << map << dendl;
decode(map, bl);
- dout(0) << "decode: " << map << dendl;
+ dout(0) << "decoded: " << map << dendl;
- BeaconSubsystem sub = { nqn, {}, {} };
+ BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
NVMeofGwMap pending_map;
pending_map.epoch = 2;
auto msg1 = make_message<MNVMeofGwMap>(pending_map);
int epoch = msg1->get_gwmap_epoch();
dout(0) << "after decode empty msg: " << *msg1 << " epoch " << epoch << dendl;
- pending_map.cfg_add_gw("GW1" ,group_key);
- pending_map.cfg_add_gw("GW2" ,group_key);
- pending_map.cfg_add_gw("GW3" ,group_key);
+ pending_map.cfg_add_gw("GW1" ,group_key, CEPH_FEATURES_ALL);
+ pending_map.cfg_add_gw("GW2" ,group_key, CEPH_FEATURES_ALL);
+ pending_map.cfg_add_gw("GW3" ,group_key, CEPH_FEATURES_ALL);
NvmeNonceVector new_nonces = {"abc", "def","hij"};
pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub);
+
int i = 0;
for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){
blklst_itr.second.osd_epoch = 2*(i++);
dout(0) << "after encode msg: " << *msg << dendl;
msg->decode_payload();
dout(0) << "after decode msg: " << *msg << dendl;
-
+
//dout(0) << "\n == Test GW Delete ==" << dendl;
//pending_map.cfg_delete_gw("GW1" ,group_key);
//dout(0) << "deleted GW1 " << pending_map << dendl;
//dout(0) << "deleted GW2 " << pending_map << dendl;
//dout(0) << "delete of wrong gw id" << dendl;
- //pending_map.cfg_delete_gw("wow" ,group_key);
+ //pending_map.cfg_delete_gw("wow" ,group_key, true);
- pending_map.cfg_delete_gw("GW3" ,group_key);
- dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl;
+ //pending_map.cfg_delete_gw("GW3" ,group_key);
+ //dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl;
}
std::string gw_group = "group";
gw_availability_t availability = gw_availability_t::GW_AVAILABLE;
std::string nqn = "nqn";
- BeaconSubsystem sub = { nqn, {}, {} };
+ BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
BeaconSubsystems subs = { sub };
epoch_t osd_epoch = 17;
epoch_t gwmap_epoch = 42;
-
+ uint64_t sequence = 12345;
+
+ // Test legacy beacon (without diff support)
auto msg = make_message<MNVMeofGwBeacon>(
gw_id,
gw_pool,
subs,
availability,
osd_epoch,
- gwmap_epoch);
- msg->encode_payload(0);
+ gwmap_epoch
+ // sequence defaults to 0
+ // enable_diff defaults to false
+ );
+ msg->encode_payload(CEPH_FEATURES_ALL);
msg->decode_payload();
- dout(0) << "decode msg: " << *msg << dendl;
+ dout(0) << "decode msg (revision 1): " << *msg << dendl;
ceph_assert(msg->get_gw_id() == gw_id);
ceph_assert(msg->get_gw_pool() == gw_pool);
ceph_assert(msg->get_gw_group() == gw_group);
ceph_assert(msg->get_availability() == availability);
ceph_assert(msg->get_last_osd_epoch() == osd_epoch);
ceph_assert(msg->get_last_gwmap_epoch() == gwmap_epoch);
+ // Legacy beacons don't preserve sequence field - it gets reset to 0
+ ceph_assert(msg->get_sequence() == 0);
const auto& dsubs = msg->get_subsystems();
auto it = std::find_if(dsubs.begin(), dsubs.end(),
[&nqn](const auto& element) {
return element.nqn == nqn;
});
ceph_assert(it != dsubs.end());
+ ceph_assert(it->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+
+ // Test enhanced beacon (with diff support)
+ auto msg2 = make_message<MNVMeofGwBeacon>(
+ gw_id,
+ gw_pool,
+ gw_group,
+ subs,
+ availability,
+ osd_epoch,
+ gwmap_epoch,
+ sequence,
+ true // enable_diff = true
+ );
+ msg2->encode_payload(CEPH_FEATURES_ALL);
+ msg2->decode_payload();
+ dout(0) << "decode msg (revision 2): " << *msg2 << dendl;
+ ceph_assert(msg2->get_gw_id() == gw_id);
+ ceph_assert(msg2->get_gw_pool() == gw_pool);
+ ceph_assert(msg2->get_gw_group() == gw_group);
+ ceph_assert(msg2->get_availability() == availability);
+ ceph_assert(msg2->get_last_osd_epoch() == osd_epoch);
+ ceph_assert(msg2->get_last_gwmap_epoch() == gwmap_epoch);
+ ceph_assert(msg2->get_sequence() == sequence);
+ const auto& dsubs2 = msg2->get_subsystems();
+ auto it2 = std::find_if(dsubs2.begin(), dsubs2.end(),
+ [&nqn](const auto& element) {
+ return element.nqn == nqn;
+ });
+ ceph_assert(it2 != dsubs2.end());
+ ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+}
+
+// Round-trips a BeaconSubsystems list through encode()/decode() with
+// CEPH_FEATURES_ALL and checks that each entry's change_descriptor survives.
+void test_subsystem_change_descriptors() {
+  dout(0) << __func__ << "\n\n" << dendl;
+
+  // Three subsystems without listeners or namespaces; only nqn2 carries a
+  // descriptor other than SUBSYSTEM_ADDED.
+  BeaconSubsystems subs = {
+    BeaconSubsystem{ "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED },
+    BeaconSubsystem{ "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED },
+    BeaconSubsystem{ "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED }
+  };
+
+  // Encode with all features, then decode into a fresh list.
+  ceph::buffer::list bl;
+  encode(subs, bl, CEPH_FEATURES_ALL);
+  BeaconSubsystems decoded_subs;
+  auto p = bl.cbegin();
+  decode(decoded_subs, p);
+
+  // Locates the decoded entry with the given NQN (end() if it is missing).
+  const auto lookup = [&decoded_subs](const char* nqn) {
+    return std::find_if(decoded_subs.begin(), decoded_subs.end(),
+                        [nqn](const auto& s) { return s.nqn == nqn; });
+  };
+  const auto it1 = lookup("nqn1");
+  const auto it2 = lookup("nqn2");
+  const auto it3 = lookup("nqn3");
+
+  // All three entries must be present before their descriptors are inspected.
+  ceph_assert(it1 != decoded_subs.end());
+  ceph_assert(it2 != decoded_subs.end());
+  ceph_assert(it3 != decoded_subs.end());
+  ceph_assert(it1->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+  ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+  ceph_assert(it3->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+  dout(0) << "Subsystem change descriptors test passed" << dendl;
}
void test_NVMeofGwTimers()
test_NVMeofGwMap();
test_MNVMeofGwMap();
test_MNVMeofGwBeacon();
+ test_subsystem_change_descriptors();
test_NVMeofGwTimers();
}