From: Leonid Chernin Date: Tue, 17 Oct 2023 13:25:07 +0000 (+0000) Subject: mon: add NVMe-oF gateway monitor and HA X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=18f5b8754ec6ab1c7f4da477d7e71ea15bf059c2;p=ceph-ci.git mon: add NVMe-oF gateway monitor and HA - gateway submodule Fixes: https://tracker.ceph.com/issues/64777 This PR adds high availability support for the nvmeof Ceph service. High availability means that even in the case that a certain GW is down, there will be another available path for the initiator to be able to continue the IO through another GW. High availability is achieved by running an nvmeof service consisting of at least 2 nvmeof GWs in the Ceph cluster. Every GW will be seen by the host (initiator) as a separate path to the nvme namespaces (volumes). The implementation consists of the following main modules: - NVMeofGWMon - a PaxosService. It is a monitor that tracks the status of the running nvmeof services, and takes actions in case services fail, and in case services are restored. - NVMeofGwMonitorClient – It is an agent that runs as a part of each nvmeof GW. It sends beacons to the monitor to signal that the GW is alive. As a part of the beacon, the client also sends information about the service. This information is used by the monitor to make decisions and perform some operations. - MNVMeofGwBeacon – It is a structure used by the client and the monitor to send/recv the beacons. - MNVMeofGwMap – The map tracks the status of the nvmeof GWs. It also defines what the new role of every GW should be. So in the event that GWs go down or GWs are restored, the map will reflect the new role of each GW resulting from these events. The map is distributed to the NVMeofGwMonitorClient on each GW, and it knows to update the GW with the required changes. 
It is also adding 3 new mon commands: - nvme-gw create - nvme-gw delete - nvme-gw show The commands are used by cephadm to update the monitor that a new GW is deployed. The monitor will update the map accordingly and will start tracking this GW until it is deleted. Resolves rhbz#2272661 Signed-off-by: Leonid Chernin Signed-off-by: Alexander Indenbaum (cherry picked from commit bf9505fb569e9b95a78f9700ed8c4bd20508ef55) additional commits: (cherry picked from commit 4f543c150503fc9ee3c4d840b6eb5ed43e15905b) (cherry picked from commit 7af4d96530531b1458e5d02bd9f706bb8c30c2ea) (cherry picked from commit 249951b5545d857861562ef7a90f876cb8f3d778) (cherry picked from commit f704d0c84257701fcbf8891dd5f285c890343219) --- diff --git a/PendingReleaseNotes b/PendingReleaseNotes index 20b090562f8..9efc063fb37 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -571,3 +571,11 @@ Relevant tracker: https://tracker.ceph.com/issues/57090 set using the `fs set` command. This flag prevents using a standby for another file system (join_fs = X) when standby for the current filesystem is not available. Relevant tracker: https://tracker.ceph.com/issues/61599 +* mon: add NVMe-oF gateway monitor and HA + This PR adds high availability support for the nvmeof Ceph service. High availability +means that even in the case that a certain GW is down, there will be another available +path for the initiator to be able to continue the IO through another GW. 
+It is also adding 2 new mon commands, to notify monitor about the gateway creation/deletion: + - nvme-gw create + - nvme-gw delete +Relevant tracker: https://tracker.ceph.com/issues/64777 diff --git a/ceph.spec.in b/ceph.spec.in index 3a7538fb5e3..6cdd0a7f1ff 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -212,6 +212,7 @@ BuildRequires: gperf BuildRequires: cmake > 3.5 BuildRequires: fuse-devel BuildRequires: git +BuildRequires: grpc-devel %if 0%{?fedora} || 0%{?suse_version} > 1500 || 0%{?rhel} == 9 || 0%{?openEuler} BuildRequires: gcc-c++ >= 11 %endif @@ -586,6 +587,17 @@ system. One or more instances of ceph-mon form a Paxos part-time parliament cluster that provides extremely reliable and durable storage of cluster membership, configuration, and state. +%package mon-client-nvmeof +Summary: Ceph NVMeoF Gateway Monitor Client +%if 0%{?suse_version} +Group: System/Filesystems +%endif +Provides: ceph-test:/usr/bin/ceph-nvmeof-monitor-client +Requires: librados2 = %{_epoch_prefix}%{version}-%{release} +%description mon-client-nvmeof +Ceph NVMeoF Gateway Monitor Client distributes Paxos ANA info +to NVMeoF Gateway and provides beacons to the monitor daemon + %package mgr Summary: Ceph Manager Daemon %if 0%{?suse_version} @@ -1993,6 +2005,9 @@ if [ $1 -ge 1 ] ; then fi fi +%files mon-client-nvmeof +%{_bindir}/ceph-nvmeof-monitor-client + %files fuse %{_bindir}/ceph-fuse %{_mandir}/man8/ceph-fuse.8* diff --git a/doc/nvmeof/beacon.puml b/doc/nvmeof/beacon.puml new file mode 100644 index 00000000000..c4279b77e75 --- /dev/null +++ b/doc/nvmeof/beacon.puml @@ -0,0 +1,63 @@ +@startuml + +package "MNVMeofGwBeacon" #DDDDDD { + + class Beacon { + gw_id: String + gw_pool: String + gw_group: String + availability: Availability + subsystems: Subsystem[*] + } + + note top + NVMeoF gateways regularly transmit + a Beacon message to the monitor, + conveying their current state. 
+ end note + + class Subsystem { + nqn: String + listeners: Listener[*] + namespaces: Namespace[*] + } + + class Listener { + address: String + family: String + service: Decimal + + } + + class Namespace { + nonce: String + ana_grp_id: Decimal + } + + note right of Namespace::nonce + RADOS connection + end note + + enum Availability { + CREATED + AVAILABLE + INACCESSIBLE + } + + note right of Availability::CREATED + The initial state, + until a MNVMeofGwMap + that contains the entry for + this gateway + end note + + Beacon::availability --> "1" Availability + Beacon::subsystems "1" *-- "n" Subsystem : contains + + Subsystem::listeners "1" *-- "n" Listener : contains + Subsystem::namespaces "1" *-- "n" Namespace : contains +} +@enduml + + + diff --git a/doc/nvmeof/gateway-state.puml b/doc/nvmeof/gateway-state.puml new file mode 100644 index 00000000000..dbfb6143ba7 --- /dev/null +++ b/doc/nvmeof/gateway-state.puml @@ -0,0 +1,41 @@ +@startuml + +package "MNVMeofGwMap" #DDDDDD { + + class GatewayState { + group_id: Decimal + ana_info: SubsystemAnaInfo[*] + } + + note top + The data structure distributed by the Monitors to NVMEoF gateways includes + a mapping of the GatewayState objects. + + NVMeoF gateways are segmented into groups identified + by (pool, group) pair, with each gateway uniquely + identified by a gateway ID. + end note + + note right of GatewayState::group_id + Identification of this gateway within its group + end note + + class SubsystemAnaInfo { + nqn: String + ana_state: AnaState[0..n] + } + + note right of SubsystemAnaInfo + Denotes the condition of the ANA groups + of the subsystem identified by nqn. 
+ end note + + enum AnaState { + OPTIMIZED + INACCESSIBLE + } + + GatewayState "1" *-- "many" SubsystemAnaInfo : contains + SubsystemAnaInfo::ana_state "1" *-- "n" AnaState : contains +} +@enduml diff --git a/install-deps.sh b/install-deps.sh index c0117175711..bb452ff4678 100755 --- a/install-deps.sh +++ b/install-deps.sh @@ -572,6 +572,11 @@ else $SUDO rpm --import /etc/pki/rpm-gpg/RPM-GPG-KEY-EPEL-$MAJOR_VERSION $SUDO rm -f /etc/yum.repos.d/dl.fedoraproject.org* if test $ID = centos -a $MAJOR_VERSION = 8 ; then + # for grpc-devel + # See https://copr.fedorainfracloud.org/coprs/ceph/grpc/ + # epel is enabled for all major versions couple of lines above + $SUDO dnf copr enable -y ceph/grpc + # Enable 'powertools' or 'PowerTools' repo $SUDO dnf config-manager --set-enabled $(dnf repolist --all 2>/dev/null|gawk 'tolower($0) ~ /^powertools\s/{print $1}') dts_ver=11 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f91dd1abe7f..484a38484ff 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -860,6 +860,112 @@ if(WITH_FUSE) install(PROGRAMS mount.fuse.ceph DESTINATION ${CMAKE_INSTALL_SBINDIR}) endif(WITH_FUSE) +# NVMEOF GATEWAY MONITOR CLIENT +# Supported on RPM-based platforms only, depends on grpc devel libraries/tools +if(EXISTS "/etc/redhat-release" OR EXISTS "/etc/fedora-release") + option(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT "build nvmeof gateway monitor client" ON) +else() + option(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT "build nvmeof gateway monitor client" OFF) +endif() + +if(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT) + + # Find Protobuf installation + # Looks for protobuf-config.cmake file installed by Protobuf's cmake installation. 
+ option(protobuf_MODULE_COMPATIBLE TRUE) + find_package(Protobuf REQUIRED) + + set(_REFLECTION grpc++_reflection) + if(CMAKE_CROSSCOMPILING) + find_program(_PROTOBUF_PROTOC protoc) + else() + set(_PROTOBUF_PROTOC $) + endif() + + # Find gRPC installation + # Looks for gRPCConfig.cmake file installed by gRPC's cmake installation. + find_package(gRPC CONFIG REQUIRED) + message(STATUS "Using gRPC ${gRPC_VERSION}") + set(_GRPC_GRPCPP gRPC::grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() + + # Gateway Proto file + get_filename_component(nvmeof_gateway_proto "nvmeof/gateway/control/proto/gateway.proto" ABSOLUTE) + get_filename_component(nvmeof_gateway_proto_path "${nvmeof_gateway_proto}" PATH) + + # Generated sources + set(nvmeof_gateway_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/gateway.pb.cc") + set(nvmeof_gateway_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/gateway.pb.h") + set(nvmeof_gateway_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/gateway.grpc.pb.cc") + set(nvmeof_gateway_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/gateway.grpc.pb.h") + + add_custom_command( + OUTPUT "${nvmeof_gateway_proto_srcs}" "${nvmeof_gateway_proto_hdrs}" "${nvmeof_gateway_grpc_srcs}" "${nvmeof_gateway_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${nvmeof_gateway_proto_path}" + --experimental_allow_proto3_optional + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${nvmeof_gateway_proto}" + DEPENDS "${nvmeof_gateway_proto}") + + + # Monitor Proto file + get_filename_component(nvmeof_monitor_proto "nvmeof/gateway/control/proto/monitor.proto" ABSOLUTE) + get_filename_component(nvmeof_monitor_proto_path "${nvmeof_monitor_proto}" PATH) + + # Generated sources + set(nvmeof_monitor_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/monitor.pb.cc") + set(nvmeof_monitor_proto_hdrs 
"${CMAKE_CURRENT_BINARY_DIR}/monitor.pb.h") + set(nvmeof_monitor_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/monitor.grpc.pb.cc") + set(nvmeof_monitor_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/monitor.grpc.pb.h") + + add_custom_command( + OUTPUT "${nvmeof_monitor_proto_srcs}" "${nvmeof_monitor_proto_hdrs}" "${nvmeof_monitor_grpc_srcs}" "${nvmeof_monitor_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${nvmeof_monitor_proto_path}" + --experimental_allow_proto3_optional + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${nvmeof_monitor_proto}" + DEPENDS "${nvmeof_monitor_proto}") + + # Include generated *.pb.h files + include_directories("${CMAKE_CURRENT_BINARY_DIR}") + + set(ceph_nvmeof_monitor_client_srcs + ${nvmeof_gateway_proto_srcs} + ${nvmeof_gateway_proto_hdrs} + ${nvmeof_gateway_grpc_srcs} + ${nvmeof_gateway_grpc_hdrs} + ${nvmeof_monitor_proto_srcs} + ${nvmeof_monitor_proto_hdrs} + ${nvmeof_monitor_grpc_srcs} + ${nvmeof_monitor_grpc_hdrs} + ceph_nvmeof_monitor_client.cc + nvmeof/NVMeofGwClient.cc + nvmeof/NVMeofGwMonitorGroupClient.cc + nvmeof/NVMeofGwMonitorClient.cc) + add_executable(ceph-nvmeof-monitor-client ${ceph_nvmeof_monitor_client_srcs}) + add_dependencies(ceph-nvmeof-monitor-client ceph-common) + target_link_libraries(ceph-nvmeof-monitor-client + client + mon + global-static + ceph-common + ${_REFLECTION} + ${_GRPC_GRPCPP} + ) + install(TARGETS ceph-nvmeof-monitor-client DESTINATION bin) +endif() +# END OF NVMEOF GATEWAY MONITOR CLIENT + if(WITH_DOKAN) add_subdirectory(dokan) endif(WITH_DOKAN) diff --git a/src/ceph_nvmeof_monitor_client.cc b/src/ceph_nvmeof_monitor_client.cc new file mode 100644 index 00000000000..05457998cb8 --- /dev/null +++ b/src/ceph_nvmeof_monitor_client.cc @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * 
Copyright (C) 2023 IBM Inc + * + * Author: Alexander Indenbaum + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include + +#include "include/types.h" +#include "include/compat.h" +#include "common/config.h" +#include "common/ceph_argparse.h" +#include "common/errno.h" +#include "common/pick_address.h" +#include "global/global_init.h" + +#include "nvmeof/NVMeofGwMonitorClient.h" + +static void usage() +{ + std::cout << "usage: ceph-nvmeof-monitor-client\n" + " --gateway-name \n" + " --gateway-address \n" + " --gateway-pool \n" + " --gateway-group \n" + " --monitor-group-address \n" + " [flags]\n" + << std::endl; + generic_server_usage(); +} + +/** + * A short main() which just instantiates a Nvme and + * hands over control to that. + */ +int main(int argc, const char **argv) +{ + ceph_pthread_setname(pthread_self(), "ceph-nvmeof-monitor-client"); + + auto args = argv_to_vec(argc, argv); + if (args.empty()) { + std::cerr << argv[0] << ": -h or --help for usage" << std::endl; + exit(1); + } + if (ceph_argparse_need_usage(args)) { + usage(); + exit(0); + } + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, // maybe later use CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + + pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC); + + global_init_daemonize(g_ceph_context); + global_init_chdir(g_ceph_context); + common_init_finish(g_ceph_context); + + NVMeofGwMonitorClient gw_monitor_client(argc, argv); + int rc = gw_monitor_client.init(); + if (rc != 0) { + std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl; + return rc; + } + + return gw_monitor_client.main(args); +} + diff --git a/src/common/options/mon.yaml.in b/src/common/options/mon.yaml.in index 8300e31d022..7ff915903eb 100644 --- 
a/src/common/options/mon.yaml.in +++ b/src/common/options/mon.yaml.in @@ -72,6 +72,14 @@ options: default: 30 services: - mon +- name: mon_nvmeofgw_beacon_grace + type: secs + level: advanced + desc: Period in seconds from last beacon to monitor marking a NVMeoF gateway as + failed + default: 10 + services: + - mon - name: mon_mgr_inactive_grace type: int level: advanced @@ -1337,3 +1345,10 @@ options: with_legacy: true see_also: - osd_heartbeat_use_min_delay_socket +- name: nvmeof_mon_client_tick_period + type: secs + level: advanced + desc: Period in seconds of nvmeof gateway beacon messages to monitor + default: 2 + services: + - mon diff --git a/src/messages/MNVMeofGwBeacon.h b/src/messages/MNVMeofGwBeacon.h new file mode 100644 index 00000000000..202d77a78b8 --- /dev/null +++ b/src/messages/MNVMeofGwBeacon.h @@ -0,0 +1,132 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_NVMEOFGWBEACON_H +#define CEPH_NVMEOFGWBEACON_H + +#include +#include +#include "messages/PaxosServiceMessage.h" +#include "mon/MonCommand.h" +#include "mon/NVMeofGwMap.h" +#include "include/types.h" + +class MNVMeofGwBeacon final : public PaxosServiceMessage { +private: + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + +protected: + std::string gw_id; + std::string gw_pool; + std::string gw_group; + BeaconSubsystems subsystems; // gateway susbsystem and their state machine states + GW_AVAILABILITY_E availability; // in absence of beacon heartbeat messages it becomes inavailable + epoch_t last_osd_epoch; + epoch_t last_gwmap_epoch; + +public: + MNVMeofGwBeacon() + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION} + { + set_priority(CEPH_MSG_PRIO_HIGH); + } + + MNVMeofGwBeacon(const std::string &gw_id_, + const std::string& gw_pool_, + const std::string& gw_group_, + const BeaconSubsystems& subsystems_, + const GW_AVAILABILITY_E& availability_, + const epoch_t& last_osd_epoch_, + const epoch_t& last_gwmap_epoch_ + ) + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}, + gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_), + availability(availability_), last_osd_epoch(last_osd_epoch_), last_gwmap_epoch(last_gwmap_epoch_) + { + set_priority(CEPH_MSG_PRIO_HIGH); + } + + const std::string& get_gw_id() const { return gw_id; } + const std::string& get_gw_pool() const { return gw_pool; } + const std::string& get_gw_group() const { return gw_group; } + NvmeAnaNonceMap get_nonce_map() const { + NvmeAnaNonceMap nonce_map; + for (const auto& sub: subsystems) { + for (const auto& ns: sub.namespaces) { + auto& nonce_vec = nonce_map[ns.anagrpid-1];//Converting ana groups to offsets + if (std::find(nonce_vec.begin(), nonce_vec.end(), ns.nonce) == nonce_vec.end()) + nonce_vec.push_back(ns.nonce); + } + } + return nonce_map; + } + + const 
GW_AVAILABILITY_E& get_availability() const { return availability; } + const epoch_t& get_last_osd_epoch() const { return last_osd_epoch; } + const epoch_t& get_last_gwmap_epoch() const { return last_gwmap_epoch; } + const BeaconSubsystems& get_subsystems() const { return subsystems; }; + +private: + ~MNVMeofGwBeacon() final {} + +public: + + std::string_view get_type_name() const override { return "nvmeofgwbeacon"; } + + void encode_payload(uint64_t features) override { + using ceph::encode; + paxos_encode(); + encode(gw_id, payload); + encode(gw_pool, payload); + encode(gw_group, payload); + encode((uint32_t)subsystems.size(), payload); + for (const auto& st: subsystems) { + encode(st, payload); + } + encode((uint32_t)availability, payload); + encode(last_osd_epoch, payload); + encode(last_gwmap_epoch, payload); + } + + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + + paxos_decode(p); + decode(gw_id, p); + decode(gw_pool, p); + decode(gw_group, p); + uint32_t n; + decode(n, p); + subsystems.clear(); + for (uint32_t i = 0; i < n; i++) { + BeaconSubsystem sub; + decode(sub, p); + subsystems.push_back(sub); + } + uint32_t tmp; + decode(tmp, p); + availability = static_cast(tmp); + decode(last_osd_epoch, p); + decode(last_gwmap_epoch, p); + } + +private: + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); +}; + + +#endif diff --git a/src/messages/MNVMeofGwMap.h b/src/messages/MNVMeofGwMap.h new file mode 100644 index 00000000000..add2554b137 --- /dev/null +++ b/src/messages/MNVMeofGwMap.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_MNVMEOFGWMAP_H +#define CEPH_MNVMEOFGWMAP_H + +#include "msg/Message.h" +#include "mon/NVMeofGwMap.h" + +class MNVMeofGwMap final : public Message { +protected: + std::map map; + epoch_t gwmap_epoch; + +public: + const std::map& get_map() {return map;} + const epoch_t& get_gwmap_epoch() {return gwmap_epoch;} + +private: + MNVMeofGwMap() : + Message{MSG_MNVMEOF_GW_MAP} {} + MNVMeofGwMap(const NVMeofGwMap &map_) : + Message{MSG_MNVMEOF_GW_MAP}, gwmap_epoch(map_.epoch) + { + map_.to_gmap(map); + } + ~MNVMeofGwMap() final {} + +public: + std::string_view get_type_name() const override { return "nvmeofgwmap"; } + + void decode_payload() override { + auto p = payload.cbegin(); + decode(gwmap_epoch, p); + decode(map, p); + } + void encode_payload(uint64_t features) override { + using ceph::encode; + encode(gwmap_epoch, payload); + encode(map, payload); + } +private: + using RefCountedObject::put; + using RefCountedObject::get; + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); + template + friend MURef crimson::make_message(Args&&... 
args); +}; + +#endif diff --git a/src/mon/CMakeLists.txt b/src/mon/CMakeLists.txt index 784b4c3ee0b..35e27d35e85 100644 --- a/src/mon/CMakeLists.txt +++ b/src/mon/CMakeLists.txt @@ -21,6 +21,8 @@ set(lib_mon_srcs ConnectionTracker.cc HealthMonitor.cc KVMonitor.cc + NVMeofGwMon.cc + NVMeofGwMap.cc ../mds/MDSAuthCaps.cc ../mgr/mgr_commands.cc ../osd/OSDCap.cc diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index 8dafbd397af..3f3e5ba9a47 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -1394,8 +1394,25 @@ COMMAND("config generate-minimal-conf", "Generate a minimal ceph.conf file", "config", "r") +/* NVMeofGwMon*/ +COMMAND("nvme-gw create" + " name=id,type=CephString" + " name=pool,type=CephString" + " name=group,type=CephString", + "create nvmeof gateway id for (pool, group)", + "mgr", "rw") +COMMAND("nvme-gw delete" + " name=id,type=CephString" + " name=pool,type=CephString" + " name=group,type=CephString", + "delete nvmeof gateway id for (pool, group)", + "mgr", "rw") - +COMMAND("nvme-gw show" + " name=pool,type=CephString" + " name=group,type=CephString", + " show nvmeof gateways within (pool, group)", + "mgr", "rw") // these are tell commands that were implemented as CLI commands in // the broken pre-octopus way that we want to allow to work when a diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 8d759285d1f..1d96397b9e1 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -84,6 +84,7 @@ #include "MgrStatMonitor.h" #include "ConfigMonitor.h" #include "KVMonitor.h" +#include "NVMeofGwMon.h" #include "mon/HealthMonitor.h" #include "common/config.h" #include "common/cmdparse.h" @@ -247,6 +248,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(*this, *paxos, "health")); paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(*this, *paxos, "config")); paxos_service[PAXOS_KV].reset(new KVMonitor(*this, *paxos, "kv")); + paxos_service[PAXOS_NVMEGW].reset(new 
NVMeofGwMon(*this, *paxos, "nvmeofgw")); bool r = mon_caps.parse("allow *", NULL); ceph_assert(r); @@ -3632,7 +3634,11 @@ void Monitor::handle_command(MonOpRequestRef op) mgrmon()->dispatch(op); return; } - + if (module == "nvme-gw"){ + nvmegwmon()->dispatch(op); + dout(10) << " Dispatching module " << module << " to NVMeofGwMon" << dendl; + return; + } if (prefix == "fsid") { if (f) { f->open_object_section("fsid"); @@ -4461,6 +4467,7 @@ void Monitor::_ms_dispatch(Message *m) } MonOpRequestRef op = op_tracker.create_request(m); + dout(10) << "Received message: " << op->get_req()->get_type() << dendl; bool src_is_mon = op->is_src_mon(); op->mark_event("mon:_ms_dispatch"); MonSession *s = op->get_session(); @@ -4566,6 +4573,9 @@ void Monitor::_ms_dispatch(Message *m) void Monitor::dispatch_op(MonOpRequestRef op) { op->mark_event("mon:dispatch_op"); + + dout(10) << "Received message: " << op->get_req()->get_type() << dendl; + MonSession *s = op->get_session(); ceph_assert(s); if (s->closed) { @@ -4679,6 +4689,11 @@ void Monitor::dispatch_op(MonOpRequestRef op) paxos_service[PAXOS_MGR]->dispatch(op); return; + case MSG_MNVMEOF_GW_BEACON: + paxos_service[PAXOS_NVMEGW]->dispatch(op); + return; + + // MgrStat case MSG_MON_MGR_REPORT: case CEPH_MSG_STATFS: @@ -5366,6 +5381,10 @@ void Monitor::handle_subscribe(MonOpRequestRef op) } else if (p->first.find("kv:") == 0) { kvmon()->check_sub(s->sub_map[p->first]); } + else if (p->first == "NVMeofGw") { + dout(10) << "NVMeofGw->check_sub " << dendl; + nvmegwmon()->check_sub(s->sub_map[p->first]); + } } if (reply) { diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 629846ad490..66baaa67a65 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -713,6 +713,11 @@ public: return (class KVMonitor*) paxos_service[PAXOS_KV].get(); } + class NVMeofGwMon *nvmegwmon() { + return (class NVMeofGwMon*) paxos_service[PAXOS_NVMEGW].get(); + } + + friend class Paxos; friend class OSDMonitor; friend class MDSMonitor; diff --git 
a/src/mon/NVMeofGwMap.cc b/src/mon/NVMeofGwMap.cc new file mode 100755 index 00000000000..869c3784ca1 --- /dev/null +++ b/src/mon/NVMeofGwMap.cc @@ -0,0 +1,598 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include +#include "include/stringify.h" +#include "NVMeofGwMon.h" +#include "NVMeofGwMap.h" +#include "OSDMonitor.h" + +using std::map; +using std::make_pair; +using std::ostream; +using std::ostringstream; +using std::string; + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +void NVMeofGwMap::to_gmap(std::map& Gmap) const { + Gmap.clear(); + for (const auto& created_map_pair: Created_gws) { + const auto& group_key = created_map_pair.first; + const NvmeGwCreatedMap& gw_created_map = created_map_pair.second; + for (const auto& gw_created_pair: gw_created_map) { + const auto& gw_id = gw_created_pair.first; + const auto& gw_created = gw_created_pair.second; + + auto gw_state = NvmeGwState(gw_created.ana_grp_id, epoch); + for (const auto& sub: gw_created.subsystems) { + gw_state.subsystems.insert({sub.nqn, NqnState(sub.nqn, gw_created.sm_state, gw_created )}); + } + Gmap[group_key][gw_id] = gw_state; + } + } +} + +int NVMeofGwMap::cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key) { + // Calculate allocated group bitmask + bool allocated[MAX_SUPPORTED_ANA_GROUPS] = {false}; + for (auto& itr: Created_gws[group_key]) { + allocated[itr.second.ana_grp_id] = true; + if(itr.first == gw_id) { + dout(1) << __func__ << " ERROR create GW: already exists in map " << gw_id << dendl; + return -EEXIST ; + } + } + + // 
Allocate the new group id + for(int i=0; i<=MAX_SUPPORTED_ANA_GROUPS; i++) { + if (allocated[i] == false) { + NvmeGwCreated gw_created(i); + Created_gws[group_key][gw_id] = gw_created; + dout(4) << __func__ << "Created GWS: " << Created_gws << dendl; + return 0; + } + } + dout(1) << __func__ << " ERROR create GW: " << gw_id << " ANA groupId was not allocated " << dendl; + return -EINVAL; +} + +int NVMeofGwMap::cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key) { + int rc = 0; + for (auto& gws_states: Created_gws[group_key]) { + + if (gws_states.first == gw_id) { + auto& state = gws_states.second; + for(int i=0; isecond; + st.availability = GW_AVAILABILITY_E::GW_UNAVAILABLE; + for (NvmeAnaGrpId i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i ++) { + fsm_handle_gw_down (gw_id, group_key, st.sm_state[i], i, propose_pending); + st.standby_state(i); + } + } + else { + dout(1) << __FUNCTION__ << "ERROR GW-id was not found in the map " << gw_id << dendl; + rc = -EINVAL; + } + return rc; +} + + +void NVMeofGwMap::process_gw_map_ka(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, epoch_t& last_osd_epoch, bool &propose_pending) +{ + auto& gws_states = Created_gws[group_key]; + auto gw_state = gws_states.find(gw_id); + ceph_assert (gw_state != gws_states.end()); + auto& st = gw_state->second; + dout(20) << "KA beacon from the GW " << gw_id << " in state " << (int)st.availability << dendl; + + if (st.availability == GW_AVAILABILITY_E::GW_CREATED) { + // first time appears - allow IO traffic for this GW + st.availability = GW_AVAILABILITY_E::GW_AVAILABLE; + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) st.sm_state[i] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + if (st.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID) { // not a redundand GW + st.active_state(st.ana_grp_id); + } + propose_pending = true; + } + else if (st.availability == GW_AVAILABILITY_E::GW_UNAVAILABLE) { + st.availability = GW_AVAILABILITY_E::GW_AVAILABLE; + if (st.ana_grp_id == 
REDUNDANT_GW_ANA_GROUP_ID) { + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) st.sm_state[i] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + propose_pending = true; //TODO try to find the 1st GW overloaded by ANA groups and start failback for ANA group that it is not an owner of + } + else { + //========= prepare to Failback to this GW ========= + // find the GW that took over on the group st.ana_grp_id + find_failback_gw(gw_id, group_key, propose_pending); + } + } + else if (st.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { + for(int i=0; isecond, st.sm_state[i], i, last_osd_epoch, propose_pending); + } + } +} + + +void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose) +{ + propose = false; + for (auto& group_state: Created_gws) { + auto& group_key = group_state.first; + auto& gws_states = group_state.second; + + for (auto& gw_state : gws_states) { // loop for GWs inside nqn group + auto& gw_id = gw_state.first; + NvmeGwCreated& state = gw_state.second; + + //1. Failover missed : is there is a GW in unavailable state? if yes, is its ANA group handled by some other GW? + if (state.availability == GW_AVAILABILITY_E::GW_UNAVAILABLE && state.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID) { + auto found_gw_for_ana_group = false; + for (auto& gw_state2 : gws_states) { + NvmeGwCreated& state2 = gw_state2.second; + if (state2.availability == GW_AVAILABILITY_E::GW_AVAILABLE && state2.sm_state[state.ana_grp_id] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + found_gw_for_ana_group = true; // dout(4) << "Found GW " << ptr2.first << " that handles ANA grp " << (int)state->optimized_ana_group_id << dendl; + break; + } + } + if (found_gw_for_ana_group == false) { //choose the GW for handle ana group + dout(4)<< "Was not found the GW " << " that handles ANA grp " << (int)state.ana_grp_id << " find candidate "<< dendl; + + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) + find_failover_candidate( gw_id, group_key, i, propose ); + } + } + + //2. 
Failback missed: Check this GW is Available and Standby and no other GW is doing Failback to it + else if (state.availability == GW_AVAILABILITY_E::GW_AVAILABLE + && state.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID && + state.sm_state[state.ana_grp_id] == GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE) + { + find_failback_gw(gw_id, group_key, propose); + } + } + } +} + + +void NVMeofGwMap::set_failover_gw_for_ANA_group(const NvmeGwId &failed_gw_id, const NvmeGroupKey& group_key, const NvmeGwId &gw_id, NvmeAnaGrpId ANA_groupid) +{ + NvmeGwCreated& gw_state = Created_gws[group_key][gw_id]; + gw_state.failover_peer[ANA_groupid] = failed_gw_id; + epoch_t epoch; + dout(4) << "Found failower GW " << gw_id << " for ANA group " << (int)ANA_groupid << dendl; + int rc = blocklist_gw (failed_gw_id, group_key, ANA_groupid, epoch, true); + if(rc){ + gw_state.active_state(ANA_groupid); //TODO check whether it is valid to start failover when nonces are empty ! + //ceph_assert(false); + } + else{ + gw_state.sm_state[ANA_groupid] = GW_STATES_PER_AGROUP_E::GW_WAIT_BLOCKLIST_CMPL; + gw_state.blocklist_data[ANA_groupid].osd_epoch = epoch; + gw_state.blocklist_data[ANA_groupid].is_failover = true; + start_timer(gw_id, group_key, ANA_groupid, 30); //start Failover preparation timer + } +} + +void NVMeofGwMap::find_failback_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose) +{ + auto& gws_states = Created_gws[group_key]; + auto& gw_state = Created_gws[group_key][gw_id]; + bool do_failback = false; + + dout(4) << "Find failback GW for GW " << gw_id << dendl; + for (auto& gw_state_it: gws_states) { + auto& st = gw_state_it.second; + if (st.sm_state[gw_state.ana_grp_id] != GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE) {// some other gw owns or owned the desired ana-group + do_failback = true;// if candidate is in state ACTIVE for the desired ana-group, then failback starts immediately, otherwise need to wait + dout(4) << "Found some gw " << gw_state_it.first << " in state " << 
st.sm_state[gw_state.ana_grp_id] << dendl; + break; + } + } + + if (do_failback == false) { + // No other gw currently performs some activity with desired ana group of coming-up GW - so it just takes over on the group + dout(4) << "Failback GW candidate was not found, just set Optimized to group " << gw_state.ana_grp_id << " to GW " << gw_id << dendl; + gw_state.active_state(gw_state.ana_grp_id); + propose = true; + return; + } + //try to do_failback + for (auto& gw_state_it: gws_states) { + auto& failback_gw_id = gw_state_it.first; + auto& st = gw_state_it.second; + if (st.sm_state[gw_state.ana_grp_id] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) { + ceph_assert(st.failover_peer[gw_state.ana_grp_id] == gw_id); + + dout(4) << "Found Failback GW " << failback_gw_id << " that previously took over the ANAGRP " << gw_state.ana_grp_id << " of the available GW " << gw_id << dendl; + st.sm_state[gw_state.ana_grp_id] = GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED; + start_timer(failback_gw_id, group_key, gw_state.ana_grp_id, 3);// Add timestamp of start Failback preparation + gw_state.sm_state[gw_state.ana_grp_id] = GW_STATES_PER_AGROUP_E::GW_OWNER_WAIT_FAILBACK_PREPARED; + propose = true; + break; + } + } +} + + +// TODO When decision to change ANA state of group is prepared, need to consider that last seen FSM state is "approved" - means it was returned in beacon alone with map version +void NVMeofGwMap::find_failover_candidate(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &propose_pending) +{ + dout(4) <<__func__<< " " << gw_id << dendl; + #define ILLEGAL_GW_ID " " + #define MIN_NUM_ANA_GROUPS 0xFFF + int min_num_ana_groups_in_gw = 0; + int current_ana_groups_in_gw = 0; + NvmeGwId min_loaded_gw_id = ILLEGAL_GW_ID; + + auto& gws_states = Created_gws[group_key]; + + auto gw_state = gws_states.find(gw_id); + ceph_assert(gw_state != gws_states.end()); + + // this GW may handle several ANA groups and for each of them need to found the 
candidate GW + if (gw_state->second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE || gw_state->second.ana_grp_id == grpid) { + + for (auto& found_gw_state: gws_states) { // for all the gateways of the subsystem + auto st = found_gw_state.second; + if(st.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_WAIT_BLOCKLIST_CMPL){ // some GW already started failover/failback on this group + dout(4) << "Failover" << st.blocklist_data[grpid].is_failover << " already started for the group " << grpid << " by GW " << found_gw_state.first << dendl; + gw_state->second.standby_state(grpid); + return ; + } + } + + // Find a GW that takes over the ANA group(s) + min_num_ana_groups_in_gw = MIN_NUM_ANA_GROUPS; + min_loaded_gw_id = ILLEGAL_GW_ID; + for (auto& found_gw_state: gws_states) { // for all the gateways of the subsystem + auto st = found_gw_state.second; + if (st.availability == GW_AVAILABILITY_E::GW_AVAILABLE) { + current_ana_groups_in_gw = 0; + for (int j = 0; j < MAX_SUPPORTED_ANA_GROUPS; j++) { + if (st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_OWNER_WAIT_FAILBACK_PREPARED || st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED + || st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_WAIT_BLOCKLIST_CMPL){ + current_ana_groups_in_gw = 0xFFFF; + break; // dont take into account GWs in the transitive state + } + else if (st.sm_state[j] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE) + //dout(4) << " process GW down " << current_ana_groups_in_gw << dendl; + current_ana_groups_in_gw++; // how many ANA groups are handled by this GW + } + + if (min_num_ana_groups_in_gw > current_ana_groups_in_gw) { + min_num_ana_groups_in_gw = current_ana_groups_in_gw; + min_loaded_gw_id = found_gw_state.first; + dout(4) << "choose: gw-id min_ana_groups " << min_loaded_gw_id << current_ana_groups_in_gw << " min " << min_num_ana_groups_in_gw << dendl; + } + } + } + if (min_loaded_gw_id != ILLEGAL_GW_ID) { + propose_pending = true; + set_failover_gw_for_ANA_group(gw_id, group_key, 
min_loaded_gw_id, grpid); + } + else { + if (gw_state->second.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE){// not found candidate but map changed. + propose_pending = true; + dout(4) << "gw down no candidate found " << dendl; + } + } + gw_state->second.standby_state(grpid); + } +} + +void NVMeofGwMap::fsm_handle_gw_alive (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeGwCreated & gw_state, GW_STATES_PER_AGROUP_E state, NvmeAnaGrpId grpid, epoch_t& last_osd_epoch, bool &map_modified) +{ + switch (state) { + case GW_STATES_PER_AGROUP_E::GW_WAIT_BLOCKLIST_CMPL: + { + int timer_val = get_timer(gw_id, group_key, grpid); + NvmeGwCreated& gw_map = Created_gws[group_key][gw_id]; + if(gw_map.blocklist_data[grpid].osd_epoch <= last_osd_epoch){ + dout(4) << "is-failover: " << gw_map.blocklist_data[grpid].is_failover << " osd epoch changed from " << gw_map.blocklist_data[grpid].osd_epoch << " to "<< last_osd_epoch + << " Ana-grp: " << grpid << " timer:" << timer_val << dendl; + gw_state.active_state(grpid); // Failover Gw still alive and guaranteed that + cancel_timer(gw_id, group_key, grpid); // ana group wouldnt be taken back during blocklist wait period + map_modified = true; + } + else{ + dout(20) << "osd epoch not changed from " << gw_map.blocklist_data[grpid].osd_epoch << " to "<< last_osd_epoch + << " Ana-grp: " << grpid << " timer:" << timer_val << dendl; + } + } + break; + + default: + break; + } +} + + void NVMeofGwMap::fsm_handle_gw_down(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, GW_STATES_PER_AGROUP_E state, NvmeAnaGrpId grpid, bool &map_modified) + { + switch (state) + { + case GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE: + case GW_STATES_PER_AGROUP_E::GW_IDLE_STATE: + // nothing to do + break; + + case GW_STATES_PER_AGROUP_E::GW_WAIT_BLOCKLIST_CMPL: + { + cancel_timer(gw_id, group_key, grpid); + }break; + + case GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED: + cancel_timer(gw_id, group_key, grpid); + + for (auto& gw_st: 
Created_gws[group_key]) { + auto& st = gw_st.second; + if (st.sm_state[grpid] == GW_STATES_PER_AGROUP_E::GW_OWNER_WAIT_FAILBACK_PREPARED) { // found GW that was intended for Failback for this ana grp + dout(4) << "Warning: Outgoing Failback when GW is down back - to rollback it" <<" GW " <second; +} + +struct CMonRequestProposal : public Context { + NVMeofGwMap *m; + CMonRequestProposal(NVMeofGwMap *mon) : m(mon) {} + void finish(int r) { + dout(4) << "osdmon is writable? " << m->mon->osdmon()->is_writeable() << dendl; + if(m->mon->osdmon()->is_writeable()){ + m->mon->nvmegwmon()->request_proposal(m->mon->osdmon()); + } + else { + m->mon->osdmon()->wait_for_writeable_ctx( new CMonRequestProposal(m)); + } + } +}; + +int NVMeofGwMap::blocklist_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, epoch_t &epoch, bool failover) +{ + NvmeGwCreated& gw_map = Created_gws[group_key][gw_id]; //find_already_created_gw(gw_id, group_key); + + if (gw_map.nonce_map[grpid].size() > 0){ + NvmeNonceVector &nonce_vector = gw_map.nonce_map[grpid];; + std::string str = "["; + entity_addrvec_t addr_vect; + + double d = g_conf().get_val("mon_osd_blocklist_default_expire"); + utime_t expires = ceph_clock_now(); + expires += d; + dout(4) << " blocklist timestamp " << expires << dendl; + for(auto &it: nonce_vector ){ + if(str != "[") str += ","; + str += it; + } + str += "]"; + bool rc = addr_vect.parse(&str[0]); + dout(10) << str << " rc " << rc << " network vector: " << addr_vect << " " << addr_vect.size() << dendl; + ceph_assert(rc); + epoch = mon->osdmon()->blocklist(addr_vect, expires); + + if (!mon->osdmon()->is_writeable()) { + dout(4) << "osdmon is not writable, waiting " << dendl; + mon->osdmon()->wait_for_writeable_ctx( new CMonRequestProposal(this ));// return false; + } + else mon->nvmegwmon()->request_proposal(mon->osdmon()); + dout(4) << str << " mon->osdmon()->blocklist: " << epoch << " address vector: " << addr_vect << " " << addr_vect.size() << 
dendl; + } + else{ + dout(1) << "Error: No nonces context present for gw: " <= md.data[ana_grpid].end_time){ + fsm_handle_to_expired (gw_id, std::make_pair(pool, group), ana_grpid, propose_pending); + } + } + } + } +} + + +void NVMeofGwMap::start_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid, uint8_t value_sec) { + Gmetadata[group_key][gw_id].data[anagrpid].timer_started = 1; + Gmetadata[group_key][gw_id].data[anagrpid].timer_value = value_sec; + dout(4) << "start timer for ana " << anagrpid << " gw " << gw_id << "value sec " << (int)value_sec << dendl; + const auto now = std::chrono::system_clock::now(); + Gmetadata[group_key][gw_id].data[anagrpid].end_time = now + std::chrono::seconds(value_sec); +} + +int NVMeofGwMap::get_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid) { + auto timer = Gmetadata[group_key][gw_id].data[anagrpid].timer_value; + return timer; +} + +void NVMeofGwMap::cancel_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid) { + Gmetadata[group_key][gw_id].data[anagrpid].timer_started = 0; +} diff --git a/src/mon/NVMeofGwMap.h b/src/mon/NVMeofGwMap.h new file mode 100755 index 00000000000..bf46b31dc07 --- /dev/null +++ b/src/mon/NVMeofGwMap.h @@ -0,0 +1,92 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#ifndef MON_NVMEOFGWMAP_H_ +#define MON_NVMEOFGWMAP_H_ +#include +#include +#include "include/encoding.h" +#include "include/utime.h" +#include "common/Formatter.h" +#include "common/ceph_releases.h" +#include "common/version.h" +#include "common/options.h" +#include "common/Clock.h" +#include "msg/Message.h" +#include "common/ceph_time.h" +#include "NVMeofGwTypes.h" + +using ceph::coarse_mono_clock; +class Monitor; +/*-------------------*/ +class NVMeofGwMap +{ +public: + Monitor* mon = NULL; + epoch_t epoch = 0; // epoch is for Paxos synchronization mechanizm + bool delay_propose = false; + + std::map Created_gws; + std::map Gmetadata; + void to_gmap(std::map& Gmap) const; + + int cfg_add_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key); + int cfg_delete_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key); + void process_gw_map_ka (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, epoch_t& last_osd_epoch, bool &propose_pending); + int process_gw_map_gw_down (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose_pending); + void update_active_timers (bool &propose_pending); + void handle_abandoned_ana_groups (bool &propose_pending); + void handle_removed_subsystems (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const std::vector ¤t_subsystems, bool &propose_pending); + void start_timer (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid, uint8_t value); +private: + NvmeGwCreated& find_already_created_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key); + void fsm_handle_gw_down (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, GW_STATES_PER_AGROUP_E state, NvmeAnaGrpId grpid, bool &map_modified); + void fsm_handle_gw_delete (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, GW_STATES_PER_AGROUP_E state, NvmeAnaGrpId grpid, bool &map_modified); + void fsm_handle_gw_alive (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeGwCreated & gw_state, GW_STATES_PER_AGROUP_E 
state, + NvmeAnaGrpId grpid, epoch_t& last_osd_epoch, bool &map_modified); + void fsm_handle_to_expired (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &map_modified); + + void find_failover_candidate(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &propose_pending); + void find_failback_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose_pending); + void set_failover_gw_for_ANA_group (const NvmeGwId &failed_gw_id, const NvmeGroupKey& group_key, const NvmeGwId &gw_id, + NvmeAnaGrpId groupid); + int blocklist_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId ANA_groupid, epoch_t &epoch, bool failover); + + int get_timer (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid); + void cancel_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid); + +public: + void encode(ceph::buffer::list &bl) const { + using ceph::encode; + ENCODE_START(1, 1, bl); + encode(epoch, bl);// global map epoch + + encode(Created_gws, bl); //Encode created GWs + encode(Gmetadata, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator &bl) { + using ceph::decode; + DECODE_START(1, bl); + decode(epoch, bl); + + decode(Created_gws, bl); + decode(Gmetadata, bl); + DECODE_FINISH(bl); + } +}; + +#include "NVMeofGwSerialize.h" + +#endif /* SRC_MON_NVMEOFGWMAP_H_ */ diff --git a/src/mon/NVMeofGwMon.cc b/src/mon/NVMeofGwMon.cc new file mode 100644 index 00000000000..bdbdec716da --- /dev/null +++ b/src/mon/NVMeofGwMon.cc @@ -0,0 +1,546 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. 
See file COPYING. + */ + +#include +#include "include/stringify.h" +#include "NVMeofGwMon.h" +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +using std::string; + +void NVMeofGwMon::init(){ + dout(4) << "called " << dendl; + g_conf().add_observer(this); +} + +void NVMeofGwMon::on_restart(){ + dout(4) << "called " << dendl; + last_beacon.clear(); + last_tick = ceph::coarse_mono_clock::now(); +} + + +void NVMeofGwMon::synchronize_last_beacon(){ + dout(4) << "called " << dendl; + last_beacon.clear(); + last_tick = ceph::coarse_mono_clock::now(); + // Initialize last_beacon to identify transitions of available GWs to unavailable state + for (const auto& created_map_pair: pending_map.Created_gws) { + const auto& group_key = created_map_pair.first; + const NvmeGwCreatedMap& gw_created_map = created_map_pair.second; + for (const auto& gw_created_pair: gw_created_map) { + const auto& gw_id = gw_created_pair.first; + if (gw_created_pair.second.availability == GW_AVAILABILITY_E::GW_AVAILABLE){ + dout(4) << "synchronize last_beacon for GW :" << gw_id << dendl; + LastBeacon lb = {gw_id, group_key}; + last_beacon[lb] = last_tick; + } + } + } +} + +void NVMeofGwMon::on_shutdown() { + g_conf().remove_observer(this); +} + +void NVMeofGwMon::tick(){ + if (!is_active() || !mon.is_leader()){ + dout(10) << "NVMeofGwMon leader : " << mon.is_leader() << "active : " << is_active() << dendl; + last_leader = false; + return; + } + bool _propose_pending = false; + + const auto now = ceph::coarse_mono_clock::now(); + const auto nvmegw_beacon_grace = g_conf().get_val("mon_nvmeofgw_beacon_grace"); + dout(15) << "NVMeofGwMon leader got a real tick, pending epoch "<< pending_map.epoch << dendl; + + const auto mgr_tick_period = g_conf().get_val("mgr_tick_period"); + + if (last_tick != 
ceph::coarse_mono_clock::zero() + && (now - last_tick > (nvmegw_beacon_grace - mgr_tick_period))) { + // This case handles either local slowness (calls being delayed + // for whatever reason) or cluster election slowness (a long gap + // between calls while an election happened) + dout(4) << ": resetting beacon timeouts due to mon delay " + "(slow election?) of " << now - last_tick << " seconds" << dendl; + for (auto &i : last_beacon) { + i.second = now; + } + } + + last_tick = now; + bool propose = false; + + pending_map.update_active_timers(propose); // Periodic: check active FSM timers + _propose_pending |= propose; + + + //handle exception of tick overdued in order to avoid false detection of overdued beacons , see MgrMonitor::tick + const auto cutoff = now - nvmegw_beacon_grace; + for(auto &itr : last_beacon){// Pass over all the stored beacons + auto& lb = itr.first; + auto last_beacon_time = itr.second; + if(last_beacon_time < cutoff){ + dout(4) << "beacon timeout for GW " << lb.gw_id << dendl; + pending_map.process_gw_map_gw_down( lb.gw_id, lb.group_key, propose); + _propose_pending |= propose; + last_beacon.erase(lb); + } + else { + dout(20) << "beacon live for GW key: " << lb.gw_id << dendl; + } + } + + pending_map.handle_abandoned_ana_groups(propose); // Periodic: take care of not handled ANA groups + _propose_pending |= propose; + + if(_propose_pending){ + //pending_map.delay_propose = true; // not to send map to clients immediately in "update_from_paxos" + dout(4) << "propose pending " < &changed) +{ + dout(4) << "changed " << changed << dendl; +} + +void NVMeofGwMon::create_pending(){ + + pending_map = map;// deep copy of the object + // TODO since "pending_map" can be reset each time during paxos re-election even in the middle of the changes ... 
+ pending_map.epoch++; + dout(4) << " pending " << pending_map << dendl; + if(last_leader == false){ // peon becomes leader and gets updated map , need to synchronize the last_beacon + synchronize_last_beacon(); + last_leader = true; + } +} + +void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t){ + + dout(10) << dendl; + ceph_assert(get_last_committed() + 1 == pending_map.epoch); + bufferlist bl; + pending_map.encode(bl); + put_version(t, pending_map.epoch, bl); + put_last_committed(t, pending_map.epoch); +} + +void NVMeofGwMon::update_from_paxos(bool *need_bootstrap){ + version_t version = get_last_committed(); + + //dout(4) << MY_MON_PREFFIX << __func__ << " version " << version << " map.epoch " << map.epoch << dendl; + + if (version != map.epoch) { + dout(4) << " NVMeGW loading version " << version << " " << map.epoch << dendl; + + bufferlist bl; + int err = get_version(version, bl); + ceph_assert(err == 0); + + auto p = bl.cbegin(); + map.decode(p); + if(!mon.is_leader()) { + dout(4) << "leader map: " << map << dendl; + } + check_subs(true); + } +} + +void NVMeofGwMon::check_sub(Subscription *sub) +{ + /* MgrMonitor::check_sub*/ + //if (sub->type == "NVMeofGw") { + dout(10) << "sub->next , map-epoch " << sub->next << " " << map.epoch << dendl; + if (sub->next <= map.epoch) + { + dout(4) << "Sending map to subscriber " << sub->session->con << " " << sub->session->con->get_peer_addr() << dendl; + sub->session->con->send_message2(make_message(map)); + + if (sub->onetime) { + mon.session_map.remove_sub(sub); + } else { + sub->next = map.epoch + 1; + } + } +} + +void NVMeofGwMon::check_subs(bool t) +{ + const std::string type = "NVMeofGw"; + dout(4) << "count " << mon.session_map.subs.count(type) << dendl; + + if (mon.session_map.subs.count(type) == 0){ + return; + } + for (auto sub : *(mon.session_map.subs[type])){ + check_sub(sub); + } +} + +bool NVMeofGwMon::preprocess_query(MonOpRequestRef op){ + dout(20) << dendl; + + auto m = op->get_req(); + 
switch (m->get_type()) { + case MSG_MNVMEOF_GW_BEACON: + return preprocess_beacon(op); + + case MSG_MON_COMMAND: + try { + return preprocess_command(op); + } catch (const bad_cmd_get& e) { + bufferlist bl; + mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed()); + return true; + } + + default: + mon.no_reply(op); + derr << "Unhandled message type " << m->get_type() << dendl; + return true; + } + return false; +} + +bool NVMeofGwMon::prepare_update(MonOpRequestRef op){ + auto m = op->get_req(); + switch (m->get_type()) { + case MSG_MNVMEOF_GW_BEACON: + return prepare_beacon(op); + + case MSG_MON_COMMAND: + try { + return prepare_command(op); + } catch (const bad_cmd_get& e) { + bufferlist bl; + mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed()); + return false; /* nothing to propose! */ + } + + default: + mon.no_reply(op); + dout(1) << "Unhandled message type " << m->get_type() << dendl; + return false; /* nothing to propose! */ + } + return true; +} + +bool NVMeofGwMon::preprocess_command(MonOpRequestRef op) +{ + dout(4) << dendl; + auto m = op->get_req(); + std::stringstream ss; + bufferlist rdata; + + cmdmap_t cmdmap; + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) + { + string rs = ss.str(); + dout(1) << "Invalid command " << m->cmd << "Error " << rs << dendl; + mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + string prefix; + cmd_getval(cmdmap, "prefix", prefix); + dout(4) << "MonCommand : "<< prefix << dendl; + + /* MonSession *session = op->get_session(); + if (!session) + { + dout(4) << "MonCommand : "<< prefix << " access denied due to lack of session" << dendl; + mon.reply_command(op, -EACCES, "access denied", rdata, + get_last_committed()); + return true; + } + */ + string format = cmd_getval_or(cmdmap, "format", "plain"); + boost::scoped_ptr f(Formatter::create(format)); + + // TODO need to check formatter per preffix - if f is NULL + + return false; +} + +bool 
NVMeofGwMon::prepare_command(MonOpRequestRef op) +{ + dout(4) << dendl; + auto m = op->get_req(); + int rc; + std::stringstream ss; + bufferlist rdata; + string rs; + int err = 0; + cmdmap_t cmdmap; + + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) + { + string rs = ss.str(); + mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + MonSession *session = op->get_session(); + if (!session) + { + mon.reply_command(op, -EACCES, "access denied", rdata, get_last_committed()); + return true; + } + + string format = cmd_getval_or(cmdmap, "format", "plain"); + boost::scoped_ptr f(Formatter::create(format)); + + const auto prefix = cmd_getval_or(cmdmap, "prefix", string{}); + + dout(4) << "MonCommand : "<< prefix << dendl; + if( prefix == "nvme-gw create" || prefix == "nvme-gw delete" ) { + std::string id, pool, group; + + cmd_getval(cmdmap, "id", id); + cmd_getval(cmdmap, "pool", pool); + cmd_getval(cmdmap, "group", group); + auto group_key = std::make_pair(pool, group); + dout(4) << " id "<< id <<" pool "<< pool << " group "<< group << dendl; + if(prefix == "nvme-gw create"){ + rc = pending_map.cfg_add_gw(id, group_key); + ceph_assert(rc!= -EINVAL); + } + else{ + rc = pending_map.cfg_delete_gw(id, group_key); + if(rc== -EINVAL){ + dout (1) << "Error: GW not found in the database " << id << " " << pool << " " << group << " rc " << rc << dendl; + err = rc; + ss.str(""); + } + } + if((rc != -EEXIST) && (rc != -EINVAL)){ + //propose pending would be generated by the PaxosService + goto update; + } + else { + goto reply_no_propose; + } + } + else if ( prefix == "nvme-gw show" ){ + std::string pool, group; + if (!f) { + f.reset(Formatter::create(format, "json-pretty", "json-pretty")); + } + cmd_getval(cmdmap, "pool", pool); + cmd_getval(cmdmap, "group", group); + auto group_key = std::make_pair(pool, group); + dout(4) <<"nvme-gw show pool "<< pool << " group "<< group << dendl; + + if( map.Created_gws[group_key].size()){ + 
f->open_object_section("common"); + f->dump_string("pool", pool); + f->dump_string("group", group); + f->dump_unsigned("num gws", map.Created_gws[group_key].size()); + ss <<"[ "; + NvmeAnaGrpId anas[MAX_SUPPORTED_ANA_GROUPS]; + int i = 0; + for (auto& gw_created_pair: map.Created_gws[group_key]) { + auto& st = gw_created_pair.second; + ss << st.ana_grp_id+1 << " "; + anas[i++] = st.ana_grp_id; + } + ss << "]"; + f->dump_string("Anagrp list", ss.str()); + f->close_section(); + + for (auto& gw_created_pair: map.Created_gws[group_key]) { + auto& gw_id = gw_created_pair.first; + auto& state = gw_created_pair.second; + f->open_object_section("stat"); + f->dump_string("gw-id", gw_id); + f->dump_unsigned("anagrp-id",state.ana_grp_id+1); + f->dump_unsigned("last-gw_map-epoch-valid",state.last_gw_map_epoch_valid); + std::stringstream ss1; + ss1 << state.availability; + f->dump_string("Availability", ss1.str()); + ss1.str(""); + for (size_t i = 0; i < map.Created_gws[group_key].size(); i++) { + ss1 << " " << anas[i]+1 <<": " << state.sm_state[anas[i]] << ","; + } + f->dump_string("ana states", ss1.str()); + f->close_section(); + } + f->flush(rdata); + ss.str(""); + } + else { + ss << "num_gws 0"; + } + } + + reply_no_propose: + getline(ss, rs); + if (err < 0 && rs.length() == 0) + { + rs = cpp_strerror(err); + dout(1) << "Error command err : "<< err << " rs-len: " << rs.length() << dendl; + } + mon.reply_command(op, err, rs, rdata, get_last_committed()); + return false; /* nothing to propose */ + + update: + getline(ss, rs); + wait_for_commit(op, new Monitor::C_Command(mon, op, 0, rs, + get_last_committed() + 1)); + return true; +} + + +bool NVMeofGwMon::preprocess_beacon(MonOpRequestRef op){ + //dout(4) << dendl; + auto m = op->get_req(); + const BeaconSubsystems& sub = m->get_subsystems(); + //mon.no_reply(op); // we never reply to beacons + dout(15) << "beacon from " << m->get_type() << " GW : " << m->get_gw_id() << " num subsystems " << sub.size() << dendl; + MonSession 
*session = op->get_session(); + if (!session){ + dout(4) << "beacon no session " << dendl; + return true; + } + + return false; // allways return false to call leader's prepare beacon +} + + +//#define BYPASS_GW_CREATE_CLI + +bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op){ + //dout(4) << dendl; + auto m = op->get_req(); + + dout(20) << "availability " << m->get_availability() << " GW : " << m->get_gw_id() << + " osdmap_epoch " << m->get_last_osd_epoch() << " subsystems " << m->get_subsystems() << dendl; + + NvmeGwId gw_id = m->get_gw_id(); + NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group()); + GW_AVAILABILITY_E avail = m->get_availability(); + bool propose = false; + bool nonce_propose = false; + bool timer_propose = false; + bool gw_created = true; + NVMeofGwMap ack_map; + auto& group_gws = map.Created_gws[group_key]; + auto gw = group_gws.find(gw_id); + const BeaconSubsystems& sub = m->get_subsystems(); + + if (avail == GW_AVAILABILITY_E::GW_CREATED){ + if (gw == group_gws.end()) { + gw_created = false; + dout(4) << "Warning: GW " << gw_id << " group_key " << group_key << " was not found in the map.Created_gws "<< map.Created_gws <get_nonce_map().size()) { + if(pending_map.Created_gws[group_key][gw_id].nonce_map != m->get_nonce_map()) + { + dout(4) << "nonce map of GW changed , propose pending " << gw_id << dendl; + pending_map.Created_gws[group_key][gw_id].nonce_map = m->get_nonce_map(); + dout(4) << "nonce map of GW " << gw_id << " "<< pending_map.Created_gws[group_key][gw_id].nonce_map << dendl; + nonce_propose = true; + } + } + else { + dout(4) << "Warning: received empty nonce map in the beacon of GW " << gw_id << " "<< dendl; + } + + //pending_map.handle_removed_subsystems(gw_id, group_key, configured_subsystems, propose); + + //if no subsystem configured set gw as avail = GW_AVAILABILITY_E::GW_UNAVAILABLE + + if(sub.size() == 0) { + avail = GW_AVAILABILITY_E::GW_UNAVAILABLE; + } + 
pending_map.Created_gws[group_key][gw_id].subsystems = sub; + pending_map.Created_gws[group_key][gw_id].last_gw_map_epoch_valid = ( map.epoch == m->get_last_gwmap_epoch() ); + if( pending_map.Created_gws[group_key][gw_id].last_gw_map_epoch_valid == false ){ + dout(1) << "map epoch of gw is not up-to-date " << gw_id << " epoch " << map.epoch << " beacon_epoch " << m->get_last_gwmap_epoch() << dendl; + } + if(avail == GW_AVAILABILITY_E::GW_AVAILABLE) + { + //dout(4) <<"subsystems from beacon " << pending_map.Created_gws << dendl; + auto now = ceph::coarse_mono_clock::now(); + // check pending_map.epoch vs m->get_version() - if different - drop the beacon + + LastBeacon lb = {gw_id, group_key}; + last_beacon[lb] = now; + epoch_t last_osd_epoch = m->get_last_osd_epoch(); + pending_map.process_gw_map_ka(gw_id, group_key, last_osd_epoch, propose); + } + else if(avail == GW_AVAILABILITY_E::GW_UNAVAILABLE){ // state set by GW client application + // TODO: remove from last_beacon if found . if gw was found in last_beacon call process_gw_map_gw_down + + LastBeacon lb = {gw_id, group_key}; + + auto it = last_beacon.find(lb); + if (it != last_beacon.end()){ + last_beacon.erase(lb); + pending_map.process_gw_map_gw_down(gw_id, group_key, propose); + } + } + pending_map.update_active_timers(timer_propose); // Periodic: check active FSM timers + propose |= timer_propose; + propose |= nonce_propose; + +set_propose: + if(!propose) { + if(gw_created){ + ack_map.Created_gws[group_key][gw_id] = map.Created_gws[group_key][gw_id];// respond with a map slice correspondent to the same GW + } + ack_map.epoch = map.epoch; + dout(20) << "ack_map " << ack_map <(ack_map); + mon.send_reply(op, msg.detach()); + } +false_return: + if (propose){ + dout(4) << "decision in prepare_beacon" < last_beacon; + + // when the mon was not updating us for some period (e.g. 
during slow + // election) to reset last_beacon timeouts + ceph::coarse_mono_clock::time_point last_tick; + + std::vector command_descs; + std::vector pending_command_descs; + +public: + NVMeofGwMon(Monitor &mn, Paxos &p, const std::string& service_name): PaxosService(mn, p, service_name) {map.mon = &mn; last_leader = false;} + ~NVMeofGwMon() override {} + + + // config observer + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, const std::set &changed) override; + + // 3 pure virtual methods of the paxosService + void create_initial()override{}; + void create_pending()override ; + void encode_pending(MonitorDBStore::TransactionRef t)override ; + + void init() override; + void on_shutdown() override; + void on_restart() override; + void update_from_paxos(bool *need_bootstrap) override; + + + bool preprocess_query(MonOpRequestRef op) override; + bool prepare_update(MonOpRequestRef op) override; + + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); + + void encode_full(MonitorDBStore::TransactionRef t) override { } + + bool preprocess_beacon(MonOpRequestRef op); + bool prepare_beacon(MonOpRequestRef op); + + void tick() override; + void print_summary(ceph::Formatter *f, std::ostream *ss) const; + + void check_subs(bool type); + void check_sub(Subscription *sub); + +private: + bool last_leader; + void synchronize_last_beacon(); + +}; + +#endif /* MON_NVMEGWMONITOR_H_ */ diff --git a/src/mon/NVMeofGwSerialize.h b/src/mon/NVMeofGwSerialize.h new file mode 100755 index 00000000000..0c8414e6b7f --- /dev/null +++ b/src/mon/NVMeofGwSerialize.h @@ -0,0 +1,602 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ +#ifndef MON_NVMEOFGWSERIALIZE_H_ +#define MON_NVMEOFGWSERIALIZE_H_ +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define MODULE_PREFFIX "nvmeofgw " +#define dout_prefix *_dout << MODULE_PREFFIX << __PRETTY_FUNCTION__ << " " + +inline std::ostream& operator<<(std::ostream& os, const GW_EXPORTED_STATES_PER_AGROUP_E value) { + switch (value) { + case GW_EXPORTED_STATES_PER_AGROUP_E::GW_EXPORTED_OPTIMIZED_STATE: os << "OPTIMIZED "; break; + case GW_EXPORTED_STATES_PER_AGROUP_E::GW_EXPORTED_INACCESSIBLE_STATE: os << "INACCESSIBLE "; break; + default: os << "Invalid " << (int)value << " "; + } + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const GW_STATES_PER_AGROUP_E value) { + switch (value) { + case GW_STATES_PER_AGROUP_E::GW_IDLE_STATE: os << "IDLE "; break; + case GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE: os << "STANDBY "; break; + case GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE: os << "ACTIVE "; break; + case GW_STATES_PER_AGROUP_E::GW_OWNER_WAIT_FAILBACK_PREPARED: os << "OWNER_FAILBACK_PREPARED "; break; + case GW_STATES_PER_AGROUP_E::GW_WAIT_FAILBACK_PREPARED: os << "WAIT_FAILBACK_PREPARED "; break; + case GW_STATES_PER_AGROUP_E::GW_WAIT_BLOCKLIST_CMPL: os << "WAIT_BLOCKLIST_CMPL "; break; + default: os << "Invalid " << (int)value << " "; + } + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const GW_AVAILABILITY_E value) { + switch (value) { + + case GW_AVAILABILITY_E::GW_CREATED: os << "CREATED"; break; + case GW_AVAILABILITY_E::GW_AVAILABLE: os << "AVAILABLE"; break; + case GW_AVAILABILITY_E::GW_UNAVAILABLE: os << "UNAVAILABLE"; break; + + default: os << "Invalid " << (int)value << " "; + } + return os; +} + +inline std::ostream& operator<<(std::ostream& 
os, const SM_STATE value) { + os << "SM_STATE [ "; + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) os << value[i]; + os << "]"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const BeaconNamespace value) { + os << "BeaconNamespace( anagrpid:" << value.anagrpid << ", nonce:" << value.nonce <<" )"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const BeaconListener value) { + os << "BeaconListener( addrfam:" << value.address_family + << ", addr:" << value.address + << ", svcid:" << value.svcid << " )"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const BeaconSubsystem value) { + os << "BeaconSubsystem( nqn:" << value.nqn << ", listeners [ "; + for (const auto& list: value.listeners) os << list << " "; + os << "] namespaces [ "; + for (const auto& ns: value.namespaces) os << ns << " "; + os << "] )"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const NqnState value) { + os << "NqnState( nqn: " << value.nqn << ", " << value.ana_state << " )"; + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const NvmeGwState value) { + os << "NvmeGwState { group id: " << value.group_id << " gw_map_epoch " << value.gw_map_epoch + << " GwSubsystems: [ "; + for (const auto& sub: value.subsystems) os << sub.second << " "; + os << " ] }"; + + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NvmeGroupKey value) { + os << "NvmeGroupKey {" << value.first << "," << value.second << "}"; + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NvmeGwMap value) { + os << "NvmeGwMap "; + for (auto& gw_state: value) { + os << "\n" << MODULE_PREFFIX <<" { == gw_id: " << gw_state.first << " -> " << gw_state.second << "}"; + } + os << "}"; + + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NvmeNonceVector value) { + for (auto & nonces : value) { + os << nonces << " "; + } + return os; +} + +inline std::ostream& 
operator<<(std::ostream& os, const NvmeAnaNonceMap value) { + if(value.size()) os << "\n" << MODULE_PREFFIX; + for (auto &nonce_map : value) { + os << " ana_grp: " << nonce_map.first << " [ " << nonce_map.second << "]\n"<< MODULE_PREFFIX ; + } + return os; +} + +inline std::ostream& print_gw_created_t(std::ostream& os, const NvmeGwCreated value, size_t num_ana_groups, NvmeAnaGrpId *anas) { + os << "==Internal map ==NvmeGwCreated { ana_group_id " << value.ana_grp_id << " osd_epochs: "; + for(size_t i = 0; i < num_ana_groups; i ++){ + os << " " << anas[i] <<": " << value.blocklist_data[anas[i]].osd_epoch << ":" < { " << group_gws.second << " }"; + } + os << "]"; + return os; +} + +inline void encode(const ANA_STATE& st, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)st.size(), bl); + for (const auto& gr: st){ + encode((uint32_t)gr.first, bl); + encode((uint32_t)gr.second, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(ANA_STATE& st, ceph::buffer::list::const_iterator &bl) { + uint32_t n; + DECODE_START(1, bl); + decode(n, bl); + st.resize(n); + for (uint32_t i = 0; i < n; i++) { + uint32_t a, b; + decode(a, bl); + decode(b, bl); + st[i].first = (GW_EXPORTED_STATES_PER_AGROUP_E)a; + st[i].second = (epoch_t)b; + } + DECODE_FINISH(bl); +} + +inline void encode(const GwSubsystems& subsystems, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)subsystems.size(), bl); + for (const auto& sub: subsystems){ + encode(sub.second.nqn, bl); + encode(sub.second.ana_state, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(GwSubsystems& subsystems, ceph::bufferlist::const_iterator& bl) { + uint32_t num_subsystems; + DECODE_START(1, bl); + decode(num_subsystems, bl); + subsystems.clear(); + for (uint32_t i=0; i(endtime.time_since_epoch()).count(); + encode(millisecondsSinceEpoch , bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(NvmeGwMetaData& state, ceph::bufferlist::const_iterator& bl) { + uint32_t s; + + DECODE_START(1, 
bl); + decode(s, bl); + ceph_assert(s == (uint32_t)MAX_SUPPORTED_ANA_GROUPS); + for(int i = 0; i (duration); + } + DECODE_FINISH(bl); +} + +inline void encode(const NvmeAnaNonceMap& nonce_map, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)nonce_map.size(), bl); + for (auto& ana_group_nonces : nonce_map) { + encode(ana_group_nonces.first, bl); // ana group id + encode ((uint32_t)ana_group_nonces.second.size(), bl); // encode the vector size + for (auto& nonce: ana_group_nonces.second) encode(nonce, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(NvmeAnaNonceMap& nonce_map, ceph::buffer::list::const_iterator &bl) { + uint32_t map_size; + NvmeAnaGrpId ana_grp_id; + uint32_t vector_size; + std::string nonce; + DECODE_START(1, bl); + decode(map_size, bl); + for(uint32_t i = 0; i& created_gws, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode ((uint32_t)created_gws.size(), bl); // number of groups + for (auto& group_gws: created_gws) { + auto& group_key = group_gws.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + auto& gws = group_gws.second; + encode (gws, bl); // encode group gws + } + ENCODE_FINISH(bl); +} + +inline void decode(std::map& created_gws, ceph::buffer::list::const_iterator &bl) { + created_gws.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for(uint32_t i = 0; i& gmap, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode ((uint32_t)gmap.size(), bl); // number of groups + for (auto& group_state: gmap) { + auto& group_key = group_state.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + encode(group_state.second, bl); + } + ENCODE_FINISH(bl); +} +// Start decode NvmeGroupKey, NvmeGwMap +inline void decode(std::map& gmap, ceph::buffer::list::const_iterator &bl) { + gmap.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for(uint32_t i = 0; i& gmetadata, ceph::bufferlist &bl) { + 
ENCODE_START(1, 1, bl); + encode ((uint32_t)gmetadata.size(), bl); // number of groups + for (auto& group_md: gmetadata) { + auto& group_key = group_md.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + encode(group_md.second, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(std::map& gmetadata, ceph::buffer::list::const_iterator &bl) { + gmetadata.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for(uint32_t i = 0; i +#include +#include +#include + +using NvmeGwId = std::string; +using NvmeGroupKey = std::pair; +using NvmeNqnId = std::string; +using NvmeAnaGrpId = uint32_t; + + +enum class GW_STATES_PER_AGROUP_E { + GW_IDLE_STATE = 0, //invalid state + GW_STANDBY_STATE, + GW_ACTIVE_STATE, + GW_OWNER_WAIT_FAILBACK_PREPARED, + GW_WAIT_FAILBACK_PREPARED, + GW_WAIT_BLOCKLIST_CMPL +}; + +enum class GW_EXPORTED_STATES_PER_AGROUP_E { + GW_EXPORTED_OPTIMIZED_STATE = 0, + GW_EXPORTED_INACCESSIBLE_STATE +}; + +enum class GW_AVAILABILITY_E { + GW_CREATED = 0, + GW_AVAILABLE, + GW_UNAVAILABLE, + GW_DELETED +}; + +#define MAX_SUPPORTED_ANA_GROUPS 16 +#define REDUNDANT_GW_ANA_GROUP_ID 0xFF + +typedef GW_STATES_PER_AGROUP_E SM_STATE [MAX_SUPPORTED_ANA_GROUPS]; + +using ANA_STATE = std::vector>; + +struct BeaconNamespace { + NvmeAnaGrpId anagrpid; + std::string nonce; +}; + +struct BeaconListener { + std::string address_family; // IPv4 or IPv6 + std::string address; // + std::string svcid; // port +}; + +struct BeaconSubsystem { + NvmeNqnId nqn; + std::list listeners; + std::list namespaces; +}; + +using BeaconSubsystems = std::list; + +using NvmeNonceVector = std::vector; +using NvmeAnaNonceMap = std::map ; + +struct NvmeGwCreated { + NvmeAnaGrpId ana_grp_id; // ana-group-id allocated for this GW, GW owns this group-id + GW_AVAILABILITY_E availability; // in absence of beacon heartbeat messages it becomes inavailable + bool last_gw_map_epoch_valid; // "true" if the last epoch seen by the gw-client is 
up-to-date + BeaconSubsystems subsystems; // gateway susbsystem and their state machine states + NvmeAnaNonceMap nonce_map; + NvmeAnaNonceMap copied_nonce_map; + SM_STATE sm_state; // state machine states per ANA group + NvmeGwId failover_peer[MAX_SUPPORTED_ANA_GROUPS]; + struct{ + epoch_t osd_epoch; + bool is_failover; + }blocklist_data[MAX_SUPPORTED_ANA_GROUPS]; + + NvmeGwCreated(): ana_grp_id(REDUNDANT_GW_ANA_GROUP_ID) {}; + + NvmeGwCreated(NvmeAnaGrpId id): ana_grp_id(id), availability(GW_AVAILABILITY_E::GW_CREATED), last_gw_map_epoch_valid(false) + { + for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++){ + sm_state[i] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + failover_peer[i] = ""; + blocklist_data[i].osd_epoch = 0; + blocklist_data[i].is_failover = true; + } + }; + + void standby_state(NvmeAnaGrpId grpid) { + sm_state[grpid] = GW_STATES_PER_AGROUP_E::GW_STANDBY_STATE; + failover_peer[grpid] = ""; + }; + void active_state(NvmeAnaGrpId grpid) { + sm_state[grpid] = GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE; + blocklist_data[grpid].osd_epoch = 0; + }; +}; + +struct NqnState { + std::string nqn; // subsystem NQN + ANA_STATE ana_state; // subsystem's ANA state + + // constructors + NqnState(const std::string& _nqn, const ANA_STATE& _ana_state): + nqn(_nqn), ana_state(_ana_state) {} + NqnState(const std::string& _nqn, const SM_STATE& sm_state, const NvmeGwCreated & gw_created) : nqn(_nqn) { + for (int i=0; i < MAX_SUPPORTED_ANA_GROUPS; i++){ + std::pair state_pair; + state_pair.first = ( sm_state[i] == GW_STATES_PER_AGROUP_E::GW_ACTIVE_STATE + || sm_state[i] == GW_STATES_PER_AGROUP_E::GW_WAIT_BLOCKLIST_CMPL) + ? 
GW_EXPORTED_STATES_PER_AGROUP_E::GW_EXPORTED_OPTIMIZED_STATE + : GW_EXPORTED_STATES_PER_AGROUP_E::GW_EXPORTED_INACCESSIBLE_STATE; + state_pair.second = gw_created.blocklist_data[i].osd_epoch; + ana_state.push_back(state_pair); + } + } +}; + +typedef std::map GwSubsystems; + +struct NvmeGwState { + NvmeAnaGrpId group_id; + epoch_t gw_map_epoch; + GwSubsystems subsystems; + + NvmeGwState(NvmeAnaGrpId id, epoch_t epoch): + group_id(id), + gw_map_epoch(epoch) + {}; + + NvmeGwState() : NvmeGwState(REDUNDANT_GW_ANA_GROUP_ID, 0) {}; +}; + +struct NvmeGwMetaData { + struct{ + uint32_t timer_started; // statemachine timer(timestamp) set in some state + uint8_t timer_value; + std::chrono::system_clock::time_point end_time; + } data[MAX_SUPPORTED_ANA_GROUPS]; + + NvmeGwMetaData() { + for (int i=0; i; +using NvmeGwMetaDataMap = std::map; +using NvmeGwCreatedMap = std::map; + +#endif /* SRC_MON_NVMEOFGWTYPES_H_ */ diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index 02aad5a0b87..8d774fc54d4 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -36,6 +36,7 @@ enum { PAXOS_HEALTH, PAXOS_CONFIG, PAXOS_KV, + PAXOS_NVMEGW, PAXOS_NUM }; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 348546abdf0..024f167ae72 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -217,6 +217,9 @@ #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" + #ifdef WITH_BLKIN #include "Messenger.h" #endif @@ -866,6 +869,10 @@ Message *decode_message(CephContext *cct, m = make_message(); break; + case MSG_MNVMEOF_GW_BEACON: + m = make_message(); + break; + case MSG_MON_MGR_REPORT: m = make_message(); break; @@ -925,6 +932,9 @@ Message *decode_message(CephContext *cct, m = make_message(); break; + case MSG_MNVMEOF_GW_MAP: + m = make_message(); + break; // -- simple messages without payload -- case CEPH_MSG_SHUTDOWN: diff --git a/src/msg/Message.h 
b/src/msg/Message.h index f27c5448ea2..f62e27ff4f3 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -231,7 +231,7 @@ #define MSG_MGR_MAP 0x704 // *** ceph-mon(MgrMonitor) -> ceph-mgr -#define MSG_MGR_DIGEST 0x705 +#define MSG_MGR_DIGEST 0x705 // *** cephmgr -> ceph-mon #define MSG_MON_MGR_REPORT 0x706 #define MSG_SERVICE_MAP 0x707 @@ -241,7 +241,13 @@ #define MSG_MGR_COMMAND_REPLY 0x70a // *** ceph-mgr <-> MON daemons *** -#define MSG_MGR_UPDATE 0x70b +#define MSG_MGR_UPDATE 0x70b + +// *** nvmeof mon -> gw daemons *** +#define MSG_MNVMEOF_GW_MAP 0x800 + +// *** gw daemons -> nvmeof mon *** +#define MSG_MNVMEOF_GW_BEACON 0x801 // ====================================================== diff --git a/src/nvmeof/NVMeofGwClient.cc b/src/nvmeof/NVMeofGwClient.cc new file mode 100644 index 00000000000..c82423de515 --- /dev/null +++ b/src/nvmeof/NVMeofGwClient.cc @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#include "NVMeofGwClient.h" + +bool NVMeofGwClient::get_subsystems(subsystems_info& reply) { + get_subsystems_req request; + ClientContext context; + + Status status = stub_->get_subsystems(&context, request, &reply); + + return status.ok(); +} + +bool NVMeofGwClient::set_ana_state(const ana_info& info) { + req_status reply; + ClientContext context; + + Status status = stub_->set_ana_state(&context, info, &reply); + + return status.ok() && reply.status(); +} diff --git a/src/nvmeof/NVMeofGwClient.h b/src/nvmeof/NVMeofGwClient.h new file mode 100644 index 00000000000..022485251d6 --- /dev/null +++ b/src/nvmeof/NVMeofGwClient.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef __NVMEOFGWCLIENT_H__ +#define __NVMEOFGWCLIENT_H__ +#include +#include +#include + +#include + +#include "gateway.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class NVMeofGwClient { + public: + NVMeofGwClient(std::shared_ptr channel) + : stub_(Gateway::NewStub(channel)) {} + + bool get_subsystems(subsystems_info& reply); + bool set_ana_state(const ana_info& info); + + private: + std::unique_ptr stub_; +}; +#endif diff --git a/src/nvmeof/NVMeofGwMonitorClient.cc b/src/nvmeof/NVMeofGwMonitorClient.cc new file mode 100644 index 00000000000..28330e432ea --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorClient.cc @@ -0,0 +1,384 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023,2024 IBM, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include + +#include "common/errno.h" +#include "common/signal.h" +#include "common/ceph_argparse.h" +#include "include/compat.h" + +#include "include/stringify.h" +#include "global/global_context.h" +#include "global/signal_handler.h" + + +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" +#include "NVMeofGwMonitorClient.h" +#include "NVMeofGwClient.h" +#include "NVMeofGwMonitorGroupClient.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +NVMeofGwMonitorClient::NVMeofGwMonitorClient(int argc, const char **argv) : + Dispatcher(g_ceph_context), + osdmap_epoch(0), + gwmap_epoch(0), + monc{g_ceph_context, poolctx}, + client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())), + objecter{g_ceph_context, client_messenger.get(), &monc, poolctx}, + client{client_messenger.get(), &monc, &objecter}, + timer(g_ceph_context, lock), + orig_argc(argc), + orig_argv(argv) +{ +} + +NVMeofGwMonitorClient::~NVMeofGwMonitorClient() = default; + +const char** NVMeofGwMonitorClient::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + NULL + }; + return KEYS; +} + +int NVMeofGwMonitorClient::init() +{ + dout(0) << dendl; + std::string val; + auto args = argv_to_vec(orig_argc, orig_argv); + + for (std::vector::iterator i = args.begin(); i != args.end(); ) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-name", (char*)NULL)) { + name = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-pool", (char*)NULL)) { + pool = val; + } else if (ceph_argparse_witharg(args, i, &val, 
"--gateway-group", (char*)NULL)) { + group = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-address", (char*)NULL)) { + gateway_address = val; + } else if (ceph_argparse_witharg(args, i, &val, "--monitor-group-address", (char*)NULL)) { + monitor_address = val; + } else if (ceph_argparse_witharg(args, i, &val, "--server-key", (char*)NULL)) { + server_key = val; + } else if (ceph_argparse_witharg(args, i, &val, "--server-cert", (char*)NULL)) { + server_cert = val; + } else if (ceph_argparse_witharg(args, i, &val, "--client-cert", (char*)NULL)) { + client_cert = val; + } else { + ++i; + } + } + + dout(0) << "gateway name: " << name << + " pool:" << pool << + " group:" << group << + " address: " << gateway_address << dendl; + ceph_assert(name != "" && pool != "" && gateway_address != "" && monitor_address != ""); + + // todo + ceph_assert(server_key == "" && server_cert == "" && client_cert == ""); + + init_async_signal_handler(); + register_async_signal_handler(SIGHUP, sighup_handler); + + std::lock_guard l(lock); + + // Initialize Messenger + client_messenger->add_dispatcher_tail(this); + client_messenger->add_dispatcher_head(&objecter); + client_messenger->add_dispatcher_tail(&client); + client_messenger->start(); + + poolctx.start(2); + + // Initialize MonClient + if (monc.build_initial_monmap() < 0) { + client_messenger->shutdown(); + client_messenger->wait(); + return -1; + } + + monc.sub_want("NVMeofGw", 0, 0); + monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD + |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR); + monc.set_messenger(client_messenger.get()); + + // We must register our config callback before calling init(), so + // that we see the initial configuration message + monc.register_config_callback([this](const std::string &k, const std::string &v){ + // leaving this for debugging purposes + dout(10) << "nvmeof config_callback: " << k << " : " << v << dendl; + + return false; + }); + 
monc.register_config_notify_callback([this]() { + dout(4) << "nvmeof monc config notify callback" << dendl; + }); + dout(4) << "nvmeof Registered monc callback" << dendl; + + int r = monc.init(); + if (r < 0) { + monc.shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + dout(0) << "nvmeof Registered monc callback" << dendl; + + r = monc.authenticate(); + if (r < 0) { + derr << "Authentication failed, did you specify an ID with a valid keyring?" << dendl; + monc.shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + dout(0) << "monc.authentication done" << dendl; + monc.set_passthrough_monmap(); + + client_t whoami = monc.get_global_id(); + client_messenger->set_myname(entity_name_t::MGR(whoami.v)); + objecter.set_client_incarnation(0); + objecter.init(); + objecter.enable_blocklist_events(); + objecter.start(); + client.init(); + timer.init(); + + tick(); + + dout(0) << "Complete." << dendl; + return 0; +} + +static bool get_gw_state(const char* desc, const std::map& m, const NvmeGroupKey& group_key, const NvmeGwId& gw_id, NvmeGwState& out) +{ + auto gw_group = m.find(group_key); + if (gw_group == m.end()) { + dout(0) << "can not find group (" << group_key.first << "," << group_key.second << ") " << desc << " map: " << m << dendl; + return false; + } + auto gw_state = gw_group->second.find(gw_id); + if (gw_state == gw_group->second.end()) { + dout(0) << "can not find gw id: " << gw_id << " in " << desc << "group: " << gw_group->second << dendl; + return false; + } + out = gw_state->second; + return true; +} + +void NVMeofGwMonitorClient::send_beacon() +{ + ceph_assert(ceph_mutex_is_locked_by_me(lock)); + //dout(0) << "sending beacon as gid " << monc.get_global_id() << dendl; + GW_AVAILABILITY_E gw_availability = GW_AVAILABILITY_E::GW_CREATED; + BeaconSubsystems subs; + NVMeofGwClient gw_client( + grpc::CreateChannel(gateway_address, grpc::InsecureChannelCredentials())); + subsystems_info 
gw_subsystems; + bool ok = gw_client.get_subsystems(gw_subsystems); + if (ok) { + for (int i = 0; i < gw_subsystems.subsystems_size(); i++) { + const subsystem& sub = gw_subsystems.subsystems(i); + BeaconSubsystem bsub; + bsub.nqn = sub.nqn(); + for (int j = 0; j < sub.namespaces_size(); j++) { + const auto& ns = sub.namespaces(j); + BeaconNamespace bns = {ns.anagrpid(), ns.nonce()}; + bsub.namespaces.push_back(bns); + } + for (int k = 0; k < sub.listen_addresses_size(); k++) { + const auto& ls = sub.listen_addresses(k); + BeaconListener bls = { ls.adrfam(), ls.traddr(), ls.trsvcid() }; + bsub.listeners.push_back(bls); + } + subs.push_back(bsub); + } + } + + auto group_key = std::make_pair(pool, group); + NvmeGwState old_gw_state; + // if already got gateway state in the map + if (get_gw_state("old map", map, group_key, name, old_gw_state)) + gw_availability = ok ? GW_AVAILABILITY_E::GW_AVAILABLE : GW_AVAILABILITY_E::GW_UNAVAILABLE; + dout(0) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability << + " osdmap_epoch " << osdmap_epoch << " gwmap_epoch " << gwmap_epoch << dendl; + auto m = ceph::make_message( + name, + pool, + group, + subs, + gw_availability, + osdmap_epoch, + gwmap_epoch); + monc.send_mon_message(std::move(m)); +} + +void NVMeofGwMonitorClient::tick() +{ + dout(0) << dendl; + send_beacon(); + + timer.add_event_after( + g_conf().get_val("nvmeof_mon_client_tick_period").count(), + new LambdaContext([this](int r){ + tick(); + } + )); +} + +void NVMeofGwMonitorClient::shutdown() +{ + std::lock_guard l(lock); + + dout(4) << "nvmeof Shutting down" << dendl; + + + // stop sending beacon first, I use monc to talk with monitors + timer.shutdown(); + // client uses monc and objecter + client.shutdown(); + // Stop asio threads, so leftover events won't call into shut down + // monclient/objecter. 
+ poolctx.finish(); + // stop monc + monc.shutdown(); + + // objecter is used by monc + objecter.shutdown(); + // client_messenger is used by all of them, so stop it in the end + client_messenger->shutdown(); +} + +void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t nmap) +{ + auto &new_map = nmap->get_map(); + gwmap_epoch = nmap->get_gwmap_epoch(); + auto group_key = std::make_pair(pool, group); + dout(0) << "handle nvmeof gw map: " << new_map << dendl; + + NvmeGwState old_gw_state; + auto got_old_gw_state = get_gw_state("old map", map, group_key, name, old_gw_state); + NvmeGwState new_gw_state; + auto got_new_gw_state = get_gw_state("new map", new_map, group_key, name, new_gw_state); + + if (!got_old_gw_state) { + if (!got_new_gw_state) { + dout(0) << "Can not find new gw state" << dendl; + return; + } + bool set_group_id = false; + while (!set_group_id) { + NVMeofGwMonitorGroupClient monitor_group_client( + grpc::CreateChannel(monitor_address, grpc::InsecureChannelCredentials())); + dout(0) << "GRPC set_group_id: " << new_gw_state.group_id << dendl; + set_group_id = monitor_group_client.set_group_id( new_gw_state.group_id); + if (!set_group_id) { + dout(0) << "GRPC set_group_id failed" << dendl; + usleep(1000); // TODO: conf options + } + } + } + + // Make sure we do not get out of order state changes from the monitor + if (got_old_gw_state && got_new_gw_state) { + dout(0) << "got_old_gw_state: " << old_gw_state << "got_new_gw_state: " << new_gw_state << dendl; + ceph_assert(new_gw_state.gw_map_epoch >= old_gw_state.gw_map_epoch); + } + + // Gather all state changes + ana_info ai; + epoch_t max_blocklist_epoch = 0; + for (const auto& nqn_state_pair: new_gw_state.subsystems) { + auto& sub = nqn_state_pair.second; + const auto& nqn = nqn_state_pair.first; + nqn_ana_states nas; + nas.set_nqn(nqn); + for (NvmeAnaGrpId ana_grp_index = 0; ana_grp_index < sub.ana_state.size(); ana_grp_index++) { + const auto& old_nqn_state_pair = 
old_gw_state.subsystems.find(nqn); + auto found_old_nqn_state = (old_nqn_state_pair != old_gw_state.subsystems.end()); + auto new_group_state = sub.ana_state[ana_grp_index]; + + // if no state change detected for this nqn, group id + if (got_old_gw_state && found_old_nqn_state && + new_group_state == old_nqn_state_pair->second.ana_state[ana_grp_index]) { + continue; + } + ana_group_state gs; + gs.set_grp_id(ana_grp_index + 1); // offset by 1, index 0 is ANAGRP1 + const auto& new_agroup_state = new_group_state.first; + const epoch_t& blocklist_epoch = new_group_state.second; + + if (new_agroup_state == GW_EXPORTED_STATES_PER_AGROUP_E::GW_EXPORTED_OPTIMIZED_STATE && + blocklist_epoch != 0) { + if (blocklist_epoch > max_blocklist_epoch) max_blocklist_epoch = blocklist_epoch; + } + gs.set_state(new_agroup_state == GW_EXPORTED_STATES_PER_AGROUP_E::GW_EXPORTED_OPTIMIZED_STATE ? OPTIMIZED : INACCESSIBLE); // Set the ANA state + nas.mutable_states()->Add(std::move(gs)); + dout(0) << " grpid " << (ana_grp_index + 1) << " state: " << new_gw_state << dendl; + } + if (nas.states_size()) ai.mutable_states()->Add(std::move(nas)); + } + + // if there is state change, notify the gateway + if (ai.states_size()) { + bool set_ana_state = false; + while (!set_ana_state) { + NVMeofGwClient gw_client( + grpc::CreateChannel(gateway_address, grpc::InsecureChannelCredentials())); + set_ana_state = gw_client.set_ana_state(ai); + if (!set_ana_state) { + dout(0) << "GRPC set_ana_state failed" << dendl; + usleep(1000); // TODO conf option + } + } + // Update latest accepted osdmap epoch, for beacons + if (max_blocklist_epoch > osdmap_epoch) { + osdmap_epoch = max_blocklist_epoch; + dout(0) << "Ready for blocklist osd map epoch: " << osdmap_epoch << dendl; + } + } + map = new_map; +} + +bool NVMeofGwMonitorClient::ms_dispatch2(const ref_t& m) +{ + std::lock_guard l(lock); + dout(0) << "got map type " << m->get_type() << dendl; + + if (m->get_type() == MSG_MNVMEOF_GW_MAP) { + 
handle_nvmeof_gw_map(ref_cast(m)); + } + bool handled = false; + return handled; +} + +int NVMeofGwMonitorClient::main(std::vector args) +{ + client_messenger->wait(); + + // Disable signal handlers + unregister_async_signal_handler(SIGHUP, sighup_handler); + shutdown_async_signal_handler(); + + return 0; +} diff --git a/src/nvmeof/NVMeofGwMonitorClient.h b/src/nvmeof/NVMeofGwMonitorClient.h new file mode 100644 index 00000000000..1c4c58f5d2f --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorClient.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023,2024 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef NVMEOFGWMONITORCLIENT_H_ +#define NVMEOFGWMONITORCLIENT_H_ + +#include "auth/Auth.h" +#include "common/async/context_pool.h" +#include "common/Finisher.h" +#include "common/Timer.h" +#include "common/LogClient.h" + +#include "client/Client.h" +#include "mon/MonClient.h" +#include "osdc/Objecter.h" +#include "messages/MNVMeofGwMap.h" + +class NVMeofGwMonitorClient: public Dispatcher, + public md_config_obs_t { +private: + std::string name; + std::string pool; + std::string group; + std::string gateway_address; + std::string monitor_address; + std::string server_key; + std::string server_cert; + std::string client_cert; + epoch_t osdmap_epoch; // last awaited osdmap_epoch + epoch_t gwmap_epoch; // last received gw map epoch + +protected: + ceph::async::io_context_pool poolctx; + MonClient monc; + std::unique_ptr client_messenger; + Objecter objecter; + Client client; + std::map map; + ceph::mutex lock = ceph::make_mutex("NVMeofGw::lock"); + SafeTimer timer; + + int orig_argc; + const char **orig_argv; + + void send_config_beacon(); + void 
send_beacon(); + +public: + NVMeofGwMonitorClient(int argc, const char **argv); + ~NVMeofGwMonitorClient() override; + + // Dispatcher interface + bool ms_dispatch2(const ceph::ref_t& m) override; + bool ms_handle_reset(Connection *con) override { return false; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; }; + + // config observer bits + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) override {}; + + int init(); + void shutdown(); + int main(std::vector args); + void tick(); + + void handle_nvmeof_gw_map(ceph::ref_t m); +}; + +#endif + diff --git a/src/nvmeof/NVMeofGwMonitorGroupClient.cc b/src/nvmeof/NVMeofGwMonitorGroupClient.cc new file mode 100644 index 00000000000..27ed7b13481 --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorGroupClient.cc @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#include "NVMeofGwMonitorGroupClient.h" + +bool NVMeofGwMonitorGroupClient::set_group_id(const uint32_t& id) { + group_id_req request; + request.set_id(id); + google::protobuf::Empty reply; + ClientContext context; + + Status status = stub_->group_id(&context, request, &reply); + + return status.ok(); +} diff --git a/src/nvmeof/NVMeofGwMonitorGroupClient.h b/src/nvmeof/NVMeofGwMonitorGroupClient.h new file mode 100644 index 00000000000..805e182c15c --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorGroupClient.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef __NVMEOFGWMONITORGROUPCLIENT_H__ +#define __NVMEOFGWMONITORGROUPCLIENT_H__ +#include +#include +#include + +#include + +#include "monitor.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class NVMeofGwMonitorGroupClient { + public: + NVMeofGwMonitorGroupClient(std::shared_ptr channel) + : stub_(MonitorGroup::NewStub(channel)) {} + + bool set_group_id(const uint32_t& id); + + private: + std::unique_ptr stub_; +}; +#endif diff --git a/src/nvmeof/gateway/control/proto/gateway.proto b/src/nvmeof/gateway/control/proto/gateway.proto new file mode 100644 index 00000000000..3b44515eeb5 --- /dev/null +++ b/src/nvmeof/gateway/control/proto/gateway.proto @@ -0,0 +1,488 @@ +// +// Copyright (c) 2021 International Business Machines +// All rights reserved. 
+// +// SPDX-License-Identifier: MIT +// +// Authors: anita.shekar@ibm.com, sandy.kaur@ibm.com +// + + +syntax = "proto3"; + +enum AddressFamily { + ipv4 = 0; + ipv6 = 1; +} + +enum LogLevel { + ERROR = 0; + WARNING = 1; + NOTICE = 2; + INFO = 3; + DEBUG = 4; +} + +enum GwLogLevel { + notset = 0; + debug = 10; + info = 20; + warning = 30; + error = 40; + critical = 50; +} + +service Gateway { + // Creates a namespace from an RBD image + rpc namespace_add(namespace_add_req) returns (nsid_status) {} + + // Creates a subsystem + rpc create_subsystem(create_subsystem_req) returns(req_status) {} + + // Deletes a subsystem + rpc delete_subsystem(delete_subsystem_req) returns(req_status) {} + + // List namespaces + rpc list_namespaces(list_namespaces_req) returns(namespaces_info) {} + + // Resizes a namespace + rpc namespace_resize(namespace_resize_req) returns (req_status) {} + + // Gets namespace's IO stats + rpc namespace_get_io_stats(namespace_get_io_stats_req) returns (namespace_io_stats_info) {} + + // Sets namespace's qos limits + rpc namespace_set_qos_limits(namespace_set_qos_req) returns (req_status) {} + + // Changes namespace's load balancing group + rpc namespace_change_load_balancing_group(namespace_change_load_balancing_group_req) returns (req_status) {} + + // Deletes a namespace + rpc namespace_delete(namespace_delete_req) returns (req_status) {} + + // Adds a host to a subsystem + rpc add_host(add_host_req) returns (req_status) {} + + // Removes a host from a subsystem + rpc remove_host(remove_host_req) returns (req_status) {} + + // List hosts + rpc list_hosts(list_hosts_req) returns(hosts_info) {} + + // List connections + rpc list_connections(list_connections_req) returns(connections_info) {} + + // Creates a listener for a subsystem at a given IP/Port + rpc create_listener(create_listener_req) returns(req_status) {} + + // Deletes a listener from a subsystem at a given IP/Port + rpc delete_listener(delete_listener_req) returns(req_status) {} + + // 
List listeners + rpc list_listeners(list_listeners_req) returns(listeners_info) {} + + // List subsystems + rpc list_subsystems(list_subsystems_req) returns(subsystems_info_cli) {} + + // Gets subsystems + rpc get_subsystems(get_subsystems_req) returns(subsystems_info) {} + + // Set gateway ANA states + rpc set_ana_state(ana_info) returns(req_status) {} + + // Gets spdk nvmf log flags and level + rpc get_spdk_nvmf_log_flags_and_level(get_spdk_nvmf_log_flags_and_level_req) returns(spdk_nvmf_log_flags_and_level_info) {} + + // Disables spdk nvmf logs + rpc disable_spdk_nvmf_logs(disable_spdk_nvmf_logs_req) returns(req_status) {} + + // Set spdk nvmf logs + rpc set_spdk_nvmf_logs(set_spdk_nvmf_logs_req) returns(req_status) {} + + // Get gateway info + rpc get_gateway_info(get_gateway_info_req) returns(gateway_info) {} + + // Get gateway log level + rpc get_gateway_log_level(get_gateway_log_level_req) returns(gateway_log_level_info) {} + + // Set gateway log level + rpc set_gateway_log_level(set_gateway_log_level_req) returns(req_status) {} +} + +// Request messages + +message namespace_add_req { + string rbd_pool_name = 1; + string rbd_image_name = 2; + string subsystem_nqn = 3; + optional uint32 nsid = 4; + uint32 block_size = 5; + optional string uuid = 6; + optional int32 anagrpid = 7; + optional bool create_image = 8; + optional uint64 size = 9; + optional bool force = 10; +} + +message namespace_resize_req { + string subsystem_nqn = 1; + optional uint32 nsid = 2; + optional string uuid = 3; + uint64 new_size = 4; +} + +message namespace_get_io_stats_req { + string subsystem_nqn = 1; + optional uint32 nsid = 2; + optional string uuid = 3; +} + +message namespace_set_qos_req { + string subsystem_nqn = 1; + optional uint32 nsid = 2; + optional string uuid = 3; + optional uint64 rw_ios_per_second = 4; + optional uint64 rw_mbytes_per_second = 5; + optional uint64 r_mbytes_per_second = 6; + optional uint64 w_mbytes_per_second = 7; +} + +message 
namespace_change_load_balancing_group_req { + string subsystem_nqn = 1; + optional uint32 nsid = 2; + optional string uuid = 3; + int32 anagrpid = 4; +} + +message namespace_delete_req { + string subsystem_nqn = 1; + optional uint32 nsid = 2; + optional string uuid = 3; +} + +message create_subsystem_req { + string subsystem_nqn = 1; + string serial_number = 2; + optional uint32 max_namespaces = 3; + bool enable_ha = 4; +} + +message delete_subsystem_req { + string subsystem_nqn = 1; + optional bool force = 2; +} + +message list_namespaces_req { + string subsystem = 1; + optional uint32 nsid = 2; + optional string uuid = 3; +} + +message add_host_req { + string subsystem_nqn = 1; + string host_nqn = 2; +} + +message remove_host_req { + string subsystem_nqn = 1; + string host_nqn = 2; +} + +message list_hosts_req { + string subsystem = 1; +} + +message list_connections_req { + string subsystem = 1; +} + +message create_listener_req { + string nqn = 1; + string host_name = 2; + string traddr = 3; + optional AddressFamily adrfam = 5; + optional uint32 trsvcid = 6; +} + +message delete_listener_req { + string nqn = 1; + string host_name = 2; + string traddr = 3; + optional AddressFamily adrfam = 5; + optional uint32 trsvcid = 6; + optional bool force = 7; +} + +message list_listeners_req { + string subsystem = 1; +} + +message list_subsystems_req { + optional string subsystem_nqn = 1; + optional string serial_number = 2; +} + +message get_subsystems_req { +} + +message get_spdk_nvmf_log_flags_and_level_req { +} + +message disable_spdk_nvmf_logs_req { +} + +message set_spdk_nvmf_logs_req { + optional LogLevel log_level = 1; + optional LogLevel print_level = 2; +} + +message get_gateway_info_req { + optional string cli_version = 1; +} + +message get_gateway_log_level_req { +} + +message set_gateway_log_level_req { + GwLogLevel log_level = 1; +} + +// From https://nvmexpress.org/wp-content/uploads/NVM-Express-1_4-2019.06.10-Ratified.pdf page 138 +// Asymmetric Namespace 
Access state for all namespaces in this ANA +// Group when accessed through this controller. +// Value Description Reference +// 01h ANA Optimized state 8.20.3.1 +// 02h ANA Non-Optimized state 8.20.3.2 +// 03h ANA Inaccessible state 8.20.3.3 +// 04h ANA Persistent Loss state 8.20.3.4 +// 0Fh ANA Change state 8.20.3.5 +// All others Reserved +enum ana_state { + UNSET = 0; + OPTIMIZED = 1; + NON_OPTIMIZED = 2; + INACCESSIBLE = 3; +} + +message ana_group_state { + uint32 grp_id = 1; // groupd id + ana_state state = 2; // ANA state +} + +message nqn_ana_states { + string nqn = 1; // subsystem nqn + repeated ana_group_state states = 2; // list of group states +} + +message ana_info { + repeated nqn_ana_states states = 1; // list of nqn states +} + +// Return messages + +message req_status { + int32 status = 1; + string error_message = 2; +} + +message nsid_status { + int32 status = 1; + string error_message = 2; + uint32 nsid = 3; +} + +message subsystems_info { + repeated subsystem subsystems = 1; +} + +message subsystem { + string nqn = 1; + string subtype = 2; + repeated listen_address listen_addresses = 3; + repeated host hosts = 4; + bool allow_any_host = 5; + optional string serial_number = 6; + optional string model_number = 7; + optional uint32 max_namespaces = 8; + optional uint32 min_cntlid = 9; + optional uint32 max_cntlid = 10; + repeated namespace namespaces = 11; +} + +message listen_address { + string trtype = 1; + string adrfam = 2; + string traddr = 3; + string trsvcid = 4; + optional string transport = 5; +} + +message namespace { + uint32 nsid = 1; + string name = 2; + optional string bdev_name = 3; + optional string nguid = 4; + optional string uuid = 5; + optional uint32 anagrpid = 6; + optional string nonce = 7; +} + +message subsystems_info_cli { + int32 status = 1; + string error_message = 2; + repeated subsystem_cli subsystems = 3; +} + +message subsystem_cli { + string nqn = 1; + bool enable_ha = 2; + string serial_number = 3; + string 
model_number = 4; + uint32 min_cntlid = 5; + uint32 max_cntlid = 6; + uint32 namespace_count = 7; + string subtype = 8; + uint32 max_namespaces = 9; +} + +message gateway_info { + string cli_version = 1; + string version = 2; + string name = 3; + string group = 4; + string addr = 5; + string port = 6; + bool bool_status = 7; + int32 status = 8; + string error_message = 9; + optional string spdk_version = 10; + uint32 load_balancing_group = 11; + string hostname = 12; +} + +message cli_version { + int32 status = 1; + string error_message = 2; + string version = 3; +} + +message gw_version { + int32 status = 1; + string error_message = 2; + string version = 3; +} + +message listener_info { + string host_name = 1; + string trtype = 2; + AddressFamily adrfam = 3; + string traddr = 4; + uint32 trsvcid = 5; +} + +message listeners_info { + int32 status = 1; + string error_message = 2; + repeated listener_info listeners = 3; +} + +message host { + string nqn = 1; +} + +message hosts_info { + int32 status = 1; + string error_message = 2; + bool allow_any_host = 3; + string subsystem_nqn = 4; + repeated host hosts = 5; +} + +message connection { + string nqn = 1; + string traddr = 2; + uint32 trsvcid = 3; + string trtype = 4; + AddressFamily adrfam = 5; + bool connected = 6; + int32 qpairs_count = 7; + int32 controller_id = 8; +} + +message connections_info { + int32 status = 1; + string error_message = 2; + string subsystem_nqn = 3; + repeated connection connections = 4; +} + +message namespace_cli { + uint32 nsid = 1; + string bdev_name = 2; + string rbd_image_name = 3; + string rbd_pool_name = 4; + uint32 load_balancing_group = 5; + uint32 block_size = 6; + uint64 rbd_image_size = 7; + string uuid = 8; + uint64 rw_ios_per_second = 9; + uint64 rw_mbytes_per_second = 10; + uint64 r_mbytes_per_second = 11; + uint64 w_mbytes_per_second = 12; +} + +message namespaces_info { + int32 status = 1; + string error_message = 2; + string subsystem_nqn = 3; + repeated namespace_cli 
namespaces = 4; +} + +message namespace_io_stats_info { + int32 status = 1; + string error_message = 2; + string subsystem_nqn = 3; + uint32 nsid = 4; + string uuid = 5; + string bdev_name = 6; + uint64 tick_rate = 7; + uint64 ticks = 8; + uint64 bytes_read = 9; + uint64 num_read_ops = 10; + uint64 bytes_written = 11; + uint64 num_write_ops = 12; + uint64 bytes_unmapped = 13; + uint64 num_unmap_ops = 14; + uint64 read_latency_ticks = 15; + uint64 max_read_latency_ticks = 16; + uint64 min_read_latency_ticks = 17; + uint64 write_latency_ticks = 18; + uint64 max_write_latency_ticks = 19; + uint64 min_write_latency_ticks = 20; + uint64 unmap_latency_ticks = 21; + uint64 max_unmap_latency_ticks = 22; + uint64 min_unmap_latency_ticks = 23; + uint64 copy_latency_ticks = 24; + uint64 max_copy_latency_ticks = 25; + uint64 min_copy_latency_ticks = 26; + repeated uint32 io_error = 27; +} + +message spdk_log_flag_info { + string name = 1; + bool enabled = 2; +} + +message spdk_nvmf_log_flags_and_level_info { + int32 status = 1; + string error_message = 2; + repeated spdk_log_flag_info nvmf_log_flags = 3; + LogLevel log_level = 4; + LogLevel log_print_level = 5; +} + +message gateway_log_level_info { + int32 status = 1; + string error_message = 2; + GwLogLevel log_level = 3; +} diff --git a/src/nvmeof/gateway/control/proto/monitor.proto b/src/nvmeof/gateway/control/proto/monitor.proto new file mode 100644 index 00000000000..0882141cba7 --- /dev/null +++ b/src/nvmeof/gateway/control/proto/monitor.proto @@ -0,0 +1,19 @@ +// +// Copyright (c) 2023 International Business Machines +// All rights reserved. 
+// +// SPDX-License-Identifier: MIT +// + +syntax = "proto3"; +import "google/protobuf/empty.proto"; + +service MonitorGroup { + // Called by the monitor client to set the gateway's group id + rpc group_id(group_id_req) returns (google.protobuf.Empty) {} +} + +// Request messages +message group_id_req { + uint32 id = 1; +} diff --git a/src/pybind/mgr/cephadm/services/nvmeof.py b/src/pybind/mgr/cephadm/services/nvmeof.py index af54240b842..454f5b8c11a 100644 --- a/src/pybind/mgr/cephadm/services/nvmeof.py +++ b/src/pybind/mgr/cephadm/services/nvmeof.py @@ -20,6 +20,9 @@ class NvmeofService(CephService): def config(self, spec: NvmeofServiceSpec) -> None: # type: ignore assert self.TYPE == spec.service_type assert spec.pool + self.pool = spec.pool + assert spec.group is not None + self.group = spec.group self.mgr._check_pool_exists(spec.pool, spec.service_name()) def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: @@ -53,6 +56,19 @@ class NvmeofService(CephService): daemon_spec.extra_files = {'ceph-nvmeof.conf': gw_conf} daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) daemon_spec.deps = [] + # Notify monitor about this gateway creation + cmd = { + 'prefix': 'nvme-gw create', + 'id': name, + 'group': spec.group, + 'pool': spec.pool + } + _, _, err = self.mgr.mon_command(cmd) + # if send command failed, raise assertion exception, failing the daemon creation + assert not err, f"Unable to send monitor command {cmd}, error {err}" + if not hasattr(self, 'gws'): + self.gws = {} # id -> name map of gateways for this service. 
+ self.gws[nvmeof_gw_id] = name # add to map of service's gateway names return daemon_spec def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None: @@ -86,8 +102,31 @@ class NvmeofService(CephService): logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}') # to clean the keyring up super().post_remove(daemon, is_failed_deploy=is_failed_deploy) - # TODO: remove config for dashboard nvmeof gateways if any - # and any certificates being used for mTLS + + # remove config for dashboard nvmeof gateways if any + ret, out, err = self.mgr.mon_command({ + 'prefix': 'dashboard nvmeof-gateway-rm', + 'name': daemon.hostname, + }) + if not ret: + logger.info(f'{daemon.hostname} removed from nvmeof gateways dashboard config') + + # Assert configured + assert self.pool + assert self.group is not None + assert daemon.daemon_id in self.gws + name = self.gws[daemon.daemon_id] + self.gws.pop(daemon.daemon_id) + # Notify monitor about this gateway deletion + cmd = { + 'prefix': 'nvme-gw delete', + 'id': name, + 'group': self.group, + 'pool': self.pool + } + _, _, err = self.mgr.mon_command(cmd) + if err: + self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}") def purge(self, service_name: str) -> None: """Removes configuration diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index 6cf481feacf..45e858fe20f 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -1269,7 +1269,7 @@ class NvmeofServiceSpec(ServiceSpec): #: ``name`` name of the nvmeof gateway self.name = name #: ``group`` name of the nvmeof gateway - self.group = group + self.group = group or '' #: ``enable_auth`` enables user authentication on nvmeof gateway self.enable_auth = enable_auth #: ``server_key`` gateway server key diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 841b7ffa6ca..53f34ef2ca6 100644 --- 
a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -981,3 +981,11 @@ add_ceph_unittest(unittest_weighted_shuffle) add_executable(unittest_intarith test_intarith.cc) add_ceph_unittest(unittest_intarith) #make check ends here + +# test_nvmeof_mon_encoding +add_executable(test_nvmeof_mon_encoding + test_nvmeof_mon_encoding.cc + ) +target_link_libraries(test_nvmeof_mon_encoding + mon ceph-common global-static + ) diff --git a/src/test/test_nvmeof_mon_encoding.cc b/src/test/test_nvmeof_mon_encoding.cc new file mode 100644 index 00000000000..9d84b58397b --- /dev/null +++ b/src/test/test_nvmeof_mon_encoding.cc @@ -0,0 +1,197 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2024 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include +#include "common/ceph_argparse.h" +#include "common/debug.h" +#include "include/ceph_assert.h" +#include "global/global_init.h" +#include "mon/NVMeofGwMon.h" +#include "messages/MNVMeofGwMap.h" +#include "messages/MNVMeofGwBeacon.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout + +using namespace std; + +void test_NVMeofGwMap() { + dout(0) << __func__ << "\n\n" << dendl; + + NVMeofGwMap pending_map; + std::string pool = "pool1"; + std::string group = "grp1"; + auto group_key = std::make_pair(pool, group); + pending_map.cfg_add_gw("GW1" ,group_key); + pending_map.cfg_add_gw("GW2" ,group_key); + pending_map.cfg_add_gw("GW3" ,group_key); + NvmeNonceVector new_nonces = {"abc", "def","hij"}; + pending_map.Created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; + for(int i=0; i< MAX_SUPPORTED_ANA_GROUPS; i++){ + pending_map.Created_gws[group_key]["GW1"].blocklist_data[i].osd_epoch = i*2; + pending_map.Created_gws[group_key]["GW1"].blocklist_data[i].is_failover = false; + } + + pending_map.Created_gws[group_key]["GW2"].nonce_map[2] = new_nonces; + dout(0) << pending_map << dendl; + + ceph::buffer::list bl; + pending_map.encode(bl); + auto p = bl.cbegin(); + pending_map.decode(p); + dout(0) << "Dump map after decode encode:" < map; + + std::string pool = "pool1"; + std::string group = "grp1"; + std::string gw_id = "GW1"; + NvmeGwState state(1, 32); + std::string nqn = "nqn"; + ANA_STATE ana_state; + NqnState nqn_state(nqn, ana_state); + state.subsystems.insert({nqn, nqn_state}); + + auto group_key = std::make_pair(pool, group); + map[group_key][gw_id] = state; + + + + ceph::buffer::list bl; + encode(map, bl); + dout(0) << "encode: " << map << dendl; + decode(map, bl); + dout(0) << "decode: " << map << dendl; + + BeaconSubsystem sub = { nqn, {}, {} }; + NVMeofGwMap pending_map; + pending_map.cfg_add_gw("GW1" ,group_key); + pending_map.cfg_add_gw("GW2" ,group_key); + 
pending_map.cfg_add_gw("GW3" ,group_key); + NvmeNonceVector new_nonces = {"abc", "def","hij"}; + pending_map.Created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; + pending_map.Created_gws[group_key]["GW1"].copied_nonce_map[1] = new_nonces; + pending_map.Created_gws[group_key]["GW1"].subsystems.push_back(sub); + for(int i=0; i< MAX_SUPPORTED_ANA_GROUPS; i++){ + pending_map.Created_gws[group_key]["GW1"].blocklist_data[i].osd_epoch = i*2; + pending_map.Created_gws[group_key]["GW1"].blocklist_data[i].is_failover = false; + } + + pending_map.Created_gws[group_key]["GW2"].nonce_map[2] = new_nonces; + dout(0) << "False pending map: " << pending_map << dendl; + + auto msg = make_message(pending_map); + msg->encode_payload(0); + msg->decode_payload(); + dout(0) << "decode msg: " << *msg << dendl; + + dout(0) << "\n == Test GW Delete ==" << dendl; + pending_map.cfg_delete_gw("GW1" ,group_key); + dout(0) << "deleted GW1 " << pending_map << dendl; + + pending_map.cfg_delete_gw("GW1" ,group_key); + dout(0) << "duplicated delete of GW1 " << pending_map << dendl; + + pending_map.cfg_delete_gw("GW2" ,group_key); + dout(0) << "deleted GW2 " << pending_map << dendl; + + dout(0) << "delete of wrong gw id" << dendl; + pending_map.cfg_delete_gw("wow" ,group_key); + + pending_map.cfg_delete_gw("GW3" ,group_key); + dout(0) << "deleted GW3 . 
we should see the empty map " << pending_map << dendl; + + +} + +void test_MNVMeofGwBeacon() { + std::string gw_id = "GW"; + std::string gw_pool = "pool"; + std::string gw_group = "group"; + GW_AVAILABILITY_E availability = GW_AVAILABILITY_E::GW_AVAILABLE; + std::string nqn = "nqn"; + BeaconSubsystem sub = { nqn, {}, {} }; + BeaconSubsystems subs = { sub }; + epoch_t osd_epoch = 17; + epoch_t gwmap_epoch = 42; + + auto msg = make_message( + gw_id, + gw_pool, + gw_group, + subs, + availability, + osd_epoch, + gwmap_epoch); + msg->encode_payload(0); + msg->decode_payload(); + dout(0) << "decode msg: " << *msg << dendl; + ceph_assert(msg->get_gw_id() == gw_id); + ceph_assert(msg->get_gw_pool() == gw_pool); + ceph_assert(msg->get_gw_group() == gw_group); + ceph_assert(msg->get_availability() == availability); + ceph_assert(msg->get_last_osd_epoch() == osd_epoch); + ceph_assert(msg->get_last_gwmap_epoch() == gwmap_epoch); + const auto& dsubs = msg->get_subsystems(); + auto it = std::find_if(dsubs.begin(), dsubs.end(), + [&nqn](const auto& element) { + return element.nqn == nqn; + }); + ceph_assert(it != dsubs.end()); +} + +void test_NVMeofGwTimers() +{ + NVMeofGwMap pending_map; + //pending_map.Gmetadata; + const NvmeGroupKey group_key = std::make_pair("a","b"); + std::string gwid = "GW1"; + NvmeAnaGrpId grpid = 2; + pending_map.start_timer(gwid, group_key, grpid, 30); + auto end_time = pending_map.Gmetadata[group_key][gwid].data[grpid].end_time; + uint64_t millisecondsSinceEpoch = std::chrono::duration_cast(end_time.time_since_epoch()).count(); + dout(0) << "Metadata milliseconds " << millisecondsSinceEpoch << " " << (int)pending_map.Gmetadata[group_key][gwid].data[grpid].timer_value << dendl; + ceph::buffer::list bl; + pending_map.encode(bl); + auto p = bl.cbegin(); + pending_map.decode(p); + + end_time = pending_map.Gmetadata[group_key][gwid].data[2].end_time; + millisecondsSinceEpoch = std::chrono::duration_cast(end_time.time_since_epoch()).count(); + dout(0) << 
"After encode decode Metadata milliseconds " << millisecondsSinceEpoch << " " << (int)pending_map.Gmetadata[group_key][gwid].data[grpid].timer_value<