From: Leonid Chernin
Date: Tue, 17 Oct 2023 13:25:07 +0000 (+0000)
Subject: mon: add NVMe-oF gateway monitor and HA
X-Git-Tag: v20.0.0~1370^2~4
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=5843c6b04bacf9a7c981fca5d874ab3400f855db;p=ceph.git

mon: add NVMe-oF gateway monitor and HA

- gateway submodule

Fixes: https://tracker.ceph.com/issues/64777

This PR adds high availability support for the nvmeof Ceph service.
High availability means that even when a given GW is down, the
initiator still has another available path and can continue its IO
through another GW. High availability is achieved by running an nvmeof
service consisting of at least 2 nvmeof GWs in the Ceph cluster. Every
GW is seen by the host (initiator) as a separate path to the nvme
namespaces (volumes).

The implementation consists of the following main modules:

- NVMeofGWMon - a PaxosService. A monitor service that tracks the
  status of the running nvmeof services and takes action when services
  fail or are restored.

- NVMeofGwMonitorClient - an agent that runs as part of each nvmeof
  GW. It sends beacons to the monitor to signal that the GW is alive.
  As part of the beacon, the client also sends information about the
  service; the monitor uses this information to make decisions and
  perform operations.

- MNVMeofGwBeacon - the message structure used by the client and the
  monitor to send/receive beacons.

- MNVMeofGwMap - the map tracks the status of the nvmeof GWs and
  defines the new role of every GW, so when GWs go down or are
  restored, the map reflects the role each GW takes as a result of
  these events. The map is distributed to the NVMeofGwMonitorClient on
  each GW, which applies the required changes to its GW.

It also adds 3 new mon commands:
- nvme-gw create
- nvme-gw delete
- nvme-gw show

The commands are used by cephadm to inform the monitor that a new GW
has been deployed. The monitor updates the map accordingly and tracks
the GW until it is deleted.

Signed-off-by: Leonid Chernin
Signed-off-by: Alexander Indenbaum
---

diff --git a/.gitmodules b/.gitmodules
index c4f68c6b2fa43..a44fccbedcfeb 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -78,4 +78,11 @@
 [submodule "src/BLAKE3"]
 	path = src/BLAKE3
 	url = https://github.com/BLAKE3-team/BLAKE3.git
-
+[submodule "src/boost_redis"]
+	path = src/boost_redis
+	url = https://github.com/boostorg/redis.git
+[submodule "src/nvmeof/gateway"]
+	path = src/nvmeof/gateway
+	url = https://github.com/ceph/ceph-nvmeof.git
+	fetchRecurseSubmodules = false
+	shallow = true
diff --git a/PendingReleaseNotes b/PendingReleaseNotes
index 25fcbb70db08b..391b8e69cfbbb 100644
--- a/PendingReleaseNotes
+++ b/PendingReleaseNotes
@@ -506,3 +506,11 @@ Relevant tracker: https://tracker.ceph.com/issues/57090
 set using the `fs set` command. This flag prevents using a standby for another
 file system (join_fs = X) when standby for the current filesystem is not available.
 Relevant tracker: https://tracker.ceph.com/issues/61599
+* mon: add NVMe-oF gateway monitor and HA
+  This adds high availability support for the nvmeof Ceph service. High availability
+means that even when a given GW is down, there is another available path through
+which the initiator can continue its IO via another GW.
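For orientation, a minimal sketch of the beacon flow this feature introduces. The MNVMeofGwBeacon constructor below is the one added by this patch; the MonClient wiring, the helper name, and the literal pool/group strings are illustrative assumptions, not part of the change:

  // Hypothetical gateway-side helper (a sketch, not part of this patch):
  // build a beacon describing this GW and hand it to the monitor session.
  // Assumes an already-initialized MonClient.
  #include "messages/MNVMeofGwBeacon.h"
  #include "mon/MonClient.h"

  void send_gw_beacon(MonClient& monc, const BeaconSubsystems& subsystems,
                      epoch_t last_osd_epoch, epoch_t last_gwmap_epoch)
  {
    auto beacon = ceph::make_message<MNVMeofGwBeacon>(
      "client.nvmeof.gw1",              // gw_id (assumed name)
      "rbd",                            // gw_pool (assumed pool)
      "group1",                         // gw_group (assumed group)
      subsystems,                       // subsystems and their FSM states
      gw_availability_t::GW_AVAILABLE,  // report this GW as alive
      last_osd_epoch, last_gwmap_epoch);
    monc.send_mon_message(std::move(beacon));
  }

In the patch itself this role is played by src/nvmeof/NVMeofGwMonitorClient.cc, which emits a beacon every nvmeof_mon_client_tick_period seconds (see the mon.yaml.in hunk below).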
+It is also adding 2 new mon commands, to notify monitor about the gateway creation/deletion: + - nvme-gw create + - nvme-gw delete +Relevant tracker: https://tracker.ceph.com/issues/64777 diff --git a/ceph.spec.in b/ceph.spec.in index fae1e390ebab7..686b9388c9427 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -250,6 +250,7 @@ BuildRequires: gperf BuildRequires: cmake > 3.5 BuildRequires: fuse-devel BuildRequires: git +BuildRequires: grpc-devel %if 0%{?fedora} || 0%{?suse_version} > 1500 || 0%{?rhel} == 9 || 0%{?openEuler} BuildRequires: gcc-c++ >= 11 %endif @@ -642,6 +643,17 @@ system. One or more instances of ceph-mon form a Paxos part-time parliament cluster that provides extremely reliable and durable storage of cluster membership, configuration, and state. +%package mon-client-nvmeof +Summary: Ceph NVMeoF Gateway Monitor Client +%if 0%{?suse_version} +Group: System/Filesystems +%endif +Provides: ceph-test:/usr/bin/ceph-nvmeof-monitor-client +Requires: librados2 = %{_epoch_prefix}%{version}-%{release} +%description mon-client-nvmeof +Ceph NVMeoF Gateway Monitor Client distributes Paxos ANA info +to NVMeoF Gateway and provides beacons to the monitor daemon + %package mgr Summary: Ceph Manager Daemon %if 0%{?suse_version} @@ -2077,6 +2089,9 @@ if [ $1 -ge 1 ] ; then fi fi +%files mon-client-nvmeof +%{_bindir}/ceph-nvmeof-monitor-client + %files fuse %{_bindir}/ceph-fuse %{_mandir}/man8/ceph-fuse.8* diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 79b45ef171f97..591ea5f357e1e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -305,6 +305,12 @@ endif(WITH_BLKIN) if(WITH_JAEGER) find_package(thrift 0.13.0 REQUIRED) + + if(EXISTS "/etc/redhat-release" OR EXISTS "/etc/fedora-release") + # absl is installed as grpc build dependency on RPM based systems + add_definitions(-DHAVE_ABSEIL) + endif() + include(BuildOpentelemetry) build_opentelemetry() add_library(jaeger_base INTERFACE) @@ -875,6 +881,112 @@ if(WITH_FUSE) install(PROGRAMS mount.fuse.ceph DESTINATION ${CMAKE_INSTALL_SBINDIR}) endif(WITH_FUSE) +# NVMEOF GATEWAY MONITOR CLIENT +# Supported on RPM-based platforms only, depends on grpc devel libraries/tools +if(EXISTS "/etc/redhat-release" OR EXISTS "/etc/fedora-release") + option(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT "build nvmeof gateway monitor client" ON) +else() + option(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT "build nvmeof gateway monitor client" OFF) +endif() + +if(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT) + + # Find Protobuf installation + # Looks for protobuf-config.cmake file installed by Protobuf's cmake installation. + option(protobuf_MODULE_COMPATIBLE TRUE) + find_package(Protobuf REQUIRED) + + set(_REFLECTION grpc++_reflection) + if(CMAKE_CROSSCOMPILING) + find_program(_PROTOBUF_PROTOC protoc) + else() + set(_PROTOBUF_PROTOC $) + endif() + + # Find gRPC installation + # Looks for gRPCConfig.cmake file installed by gRPC's cmake installation. 
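+  # (The config packages provide the imported targets consumed below:
+  # gRPC::grpc++ for linking, plus the protoc and grpc_cpp_plugin tools
+  # used to generate C++ sources from the proto files.)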
+ find_package(gRPC CONFIG REQUIRED) + message(STATUS "Using gRPC ${gRPC_VERSION}") + set(_GRPC_GRPCPP gRPC::grpc++) + if(CMAKE_CROSSCOMPILING) + find_program(_GRPC_CPP_PLUGIN_EXECUTABLE grpc_cpp_plugin) + else() + set(_GRPC_CPP_PLUGIN_EXECUTABLE $) + endif() + + # Gateway Proto file + get_filename_component(nvmeof_gateway_proto "nvmeof/gateway/control/proto/gateway.proto" ABSOLUTE) + get_filename_component(nvmeof_gateway_proto_path "${nvmeof_gateway_proto}" PATH) + + # Generated sources + set(nvmeof_gateway_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/gateway.pb.cc") + set(nvmeof_gateway_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/gateway.pb.h") + set(nvmeof_gateway_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/gateway.grpc.pb.cc") + set(nvmeof_gateway_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/gateway.grpc.pb.h") + + add_custom_command( + OUTPUT "${nvmeof_gateway_proto_srcs}" "${nvmeof_gateway_proto_hdrs}" "${nvmeof_gateway_grpc_srcs}" "${nvmeof_gateway_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${nvmeof_gateway_proto_path}" + --experimental_allow_proto3_optional + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${nvmeof_gateway_proto}" + DEPENDS "${nvmeof_gateway_proto}") + + + # Monitor Proto file + get_filename_component(nvmeof_monitor_proto "nvmeof/gateway/control/proto/monitor.proto" ABSOLUTE) + get_filename_component(nvmeof_monitor_proto_path "${nvmeof_monitor_proto}" PATH) + + # Generated sources + set(nvmeof_monitor_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/monitor.pb.cc") + set(nvmeof_monitor_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/monitor.pb.h") + set(nvmeof_monitor_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/monitor.grpc.pb.cc") + set(nvmeof_monitor_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/monitor.grpc.pb.h") + + add_custom_command( + OUTPUT "${nvmeof_monitor_proto_srcs}" "${nvmeof_monitor_proto_hdrs}" "${nvmeof_monitor_grpc_srcs}" "${nvmeof_monitor_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${nvmeof_monitor_proto_path}" + --experimental_allow_proto3_optional + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${nvmeof_monitor_proto}" + DEPENDS "${nvmeof_monitor_proto}") + + # Include generated *.pb.h files + include_directories("${CMAKE_CURRENT_BINARY_DIR}") + + set(ceph_nvmeof_monitor_client_srcs + ${nvmeof_gateway_proto_srcs} + ${nvmeof_gateway_proto_hdrs} + ${nvmeof_gateway_grpc_srcs} + ${nvmeof_gateway_grpc_hdrs} + ${nvmeof_monitor_proto_srcs} + ${nvmeof_monitor_proto_hdrs} + ${nvmeof_monitor_grpc_srcs} + ${nvmeof_monitor_grpc_hdrs} + ceph_nvmeof_monitor_client.cc + nvmeof/NVMeofGwClient.cc + nvmeof/NVMeofGwMonitorGroupClient.cc + nvmeof/NVMeofGwMonitorClient.cc) + add_executable(ceph-nvmeof-monitor-client ${ceph_nvmeof_monitor_client_srcs}) + add_dependencies(ceph-nvmeof-monitor-client ceph-common) + target_link_libraries(ceph-nvmeof-monitor-client + client + mon + global-static + ceph-common + ${_REFLECTION} + ${_GRPC_GRPCPP} + ) + install(TARGETS ceph-nvmeof-monitor-client DESTINATION bin) +endif() +# END OF NVMEOF GATEWAY MONITOR CLIENT + if(WITH_DOKAN) add_subdirectory(dokan) endif(WITH_DOKAN) diff --git a/src/ceph_nvmeof_monitor_client.cc b/src/ceph_nvmeof_monitor_client.cc new file mode 100644 index 0000000000000..05457998cb8ba --- /dev/null +++ b/src/ceph_nvmeof_monitor_client.cc @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 
sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM Inc + * + * Author: Alexander Indenbaum + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include + +#include "include/types.h" +#include "include/compat.h" +#include "common/config.h" +#include "common/ceph_argparse.h" +#include "common/errno.h" +#include "common/pick_address.h" +#include "global/global_init.h" + +#include "nvmeof/NVMeofGwMonitorClient.h" + +static void usage() +{ + std::cout << "usage: ceph-nvmeof-monitor-client\n" + " --gateway-name \n" + " --gateway-address \n" + " --gateway-pool \n" + " --gateway-group \n" + " --monitor-group-address \n" + " [flags]\n" + << std::endl; + generic_server_usage(); +} + +/** + * A short main() which just instantiates a Nvme and + * hands over control to that. + */ +int main(int argc, const char **argv) +{ + ceph_pthread_setname(pthread_self(), "ceph-nvmeof-monitor-client"); + + auto args = argv_to_vec(argc, argv); + if (args.empty()) { + std::cerr << argv[0] << ": -h or --help for usage" << std::endl; + exit(1); + } + if (ceph_argparse_need_usage(args)) { + usage(); + exit(0); + } + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, + CODE_ENVIRONMENT_UTILITY, // maybe later use CODE_ENVIRONMENT_DAEMON, + CINIT_FLAG_NO_DEFAULT_CONFIG_FILE); + + pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC); + + global_init_daemonize(g_ceph_context); + global_init_chdir(g_ceph_context); + common_init_finish(g_ceph_context); + + NVMeofGwMonitorClient gw_monitor_client(argc, argv); + int rc = gw_monitor_client.init(); + if (rc != 0) { + std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl; + return rc; + } + + return gw_monitor_client.main(args); +} + diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 1b355d6e03ada..b34e3c5a337ba 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -1755,6 +1755,13 @@ options: default: 500 services: - mon +- name: mon_max_nvmeof_epochs + type: int + level: advanced + desc: max number of nvmeof gateway maps to store + default: 500 + services: + - mon - name: mon_max_osd type: int level: advanced diff --git a/src/common/options/mon.yaml.in b/src/common/options/mon.yaml.in index 1ec9871b6a8ea..ab1634bc154bf 100644 --- a/src/common/options/mon.yaml.in +++ b/src/common/options/mon.yaml.in @@ -72,6 +72,25 @@ options: default: 30 services: - mon +- name: mon_nvmeofgw_beacon_grace + type: secs + level: advanced + desc: Period in seconds from last beacon to monitor marking a NVMeoF gateway as + failed + default: 10 + services: + - mon +- name: mon_nvmeofgw_set_group_id_retry + type: uint + level: advanced + desc: Retry wait time in microsecond for set group id between the monitor client + and gateway + long_desc: The monitor server determines the gateway's group ID. If the monitor client + receives a monitor group ID assignment before the gateway is fully up during + initialization, a retry is required. 
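+  # (value is in microseconds: 1000 us = 1 ms between retries)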
+ default: 1000 + services: + - mon - name: mon_mgr_inactive_grace type: int level: advanced @@ -1341,3 +1360,18 @@ options: with_legacy: true see_also: - osd_heartbeat_use_min_delay_socket +- name: nvmeof_mon_client_disconnect_panic + type: secs + level: advanced + desc: The duration, expressed in seconds, after which the nvmeof gateway + should trigger a panic if it loses connection to the monitor + default: 100 + services: + - mon +- name: nvmeof_mon_client_tick_period + type: secs + level: advanced + desc: Period in seconds of nvmeof gateway beacon messages to monitor + default: 2 + services: + - mon diff --git a/src/messages/MNVMeofGwBeacon.h b/src/messages/MNVMeofGwBeacon.h new file mode 100644 index 0000000000000..26fc8dcf3ac17 --- /dev/null +++ b/src/messages/MNVMeofGwBeacon.h @@ -0,0 +1,122 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_NVMEOFGWBEACON_H +#define CEPH_NVMEOFGWBEACON_H + +#include +#include +#include "messages/PaxosServiceMessage.h" +#include "mon/MonCommand.h" +#include "mon/NVMeofGwMap.h" +#include "include/types.h" + +class MNVMeofGwBeacon final : public PaxosServiceMessage { +private: + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + +protected: + std::string gw_id; + std::string gw_pool; + std::string gw_group; + BeaconSubsystems subsystems; // gateway susbsystem and their state machine states + gw_availability_t availability; // in absence of beacon heartbeat messages it becomes inavailable + epoch_t last_osd_epoch; + epoch_t last_gwmap_epoch; + +public: + MNVMeofGwBeacon() + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION} + { + set_priority(CEPH_MSG_PRIO_HIGH); + } + + MNVMeofGwBeacon(const std::string &gw_id_, + const std::string& gw_pool_, + const std::string& gw_group_, + const BeaconSubsystems& subsystems_, + const gw_availability_t& availability_, + const epoch_t& last_osd_epoch_, + const epoch_t& last_gwmap_epoch_ + ) + : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}, + gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_), + availability(availability_), last_osd_epoch(last_osd_epoch_), last_gwmap_epoch(last_gwmap_epoch_) + { + set_priority(CEPH_MSG_PRIO_HIGH); + } + + const std::string& get_gw_id() const { return gw_id; } + const std::string& get_gw_pool() const { return gw_pool; } + const std::string& get_gw_group() const { return gw_group; } + NvmeAnaNonceMap get_nonce_map() const { + NvmeAnaNonceMap nonce_map; + for (const auto& sub: subsystems) { + for (const auto& ns: sub.namespaces) { + auto& nonce_vec = nonce_map[ns.anagrpid-1];//Converting ana groups to offsets + if (std::find(nonce_vec.begin(), nonce_vec.end(), ns.nonce) == nonce_vec.end()) + nonce_vec.push_back(ns.nonce); + } + } + return nonce_map; + } + + const gw_availability_t& get_availability() const { return availability; } + const epoch_t& get_last_osd_epoch() const { return last_osd_epoch; } + const epoch_t& get_last_gwmap_epoch() const { return last_gwmap_epoch; } + const BeaconSubsystems& get_subsystems() const { return subsystems; }; + +private: + ~MNVMeofGwBeacon() final {} + +public: 
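+  // NOTE: the field order in encode_payload()/decode_payload() below must
+  // stay in sync; availability travels on the wire as a uint32_t and is
+  // cast back to gw_availability_t on decode.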
+ + std::string_view get_type_name() const override { return "nvmeofgwbeacon"; } + + void encode_payload(uint64_t features) override { + using ceph::encode; + paxos_encode(); + encode(gw_id, payload); + encode(gw_pool, payload); + encode(gw_group, payload); + encode(subsystems, payload); + encode((uint32_t)availability, payload); + encode(last_osd_epoch, payload); + encode(last_gwmap_epoch, payload); + } + + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + + paxos_decode(p); + decode(gw_id, p); + decode(gw_pool, p); + decode(gw_group, p); + decode(subsystems, p); + uint32_t tmp; + decode(tmp, p); + availability = static_cast(tmp); + decode(last_osd_epoch, p); + decode(last_gwmap_epoch, p); + } + +private: + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); +}; + + +#endif diff --git a/src/messages/MNVMeofGwMap.h b/src/messages/MNVMeofGwMap.h new file mode 100644 index 0000000000000..3affdd250dc08 --- /dev/null +++ b/src/messages/MNVMeofGwMap.h @@ -0,0 +1,70 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MNVMEOFGWMAP_H +#define CEPH_MNVMEOFGWMAP_H + +#include "msg/Message.h" +#include "mon/NVMeofGwMap.h" + +class MNVMeofGwMap final : public Message { +private: + static constexpr int VERSION = 1; + +protected: + std::map map; + epoch_t gwmap_epoch; + +public: + const std::map& get_map() {return map;} + const epoch_t& get_gwmap_epoch() {return gwmap_epoch;} + +private: + MNVMeofGwMap() : + Message{MSG_MNVMEOF_GW_MAP} {} + MNVMeofGwMap(const NVMeofGwMap &map_) : + Message{MSG_MNVMEOF_GW_MAP}, gwmap_epoch(map_.epoch) + { + map_.to_gmap(map); + } + ~MNVMeofGwMap() final {} + +public: + std::string_view get_type_name() const override { return "nvmeofgwmap"; } + + void decode_payload() override { + auto p = payload.cbegin(); + int version; + decode(version, p); + if (version > VERSION) + throw ::ceph::buffer::malformed_input(DECODE_ERR_OLDVERSION(__PRETTY_FUNCTION__, VERSION, version)); + decode(gwmap_epoch, p); + decode(map, p); + } + void encode_payload(uint64_t features) override { + using ceph::encode; + encode(VERSION, payload); + encode(gwmap_epoch, payload); + encode(map, payload); + } +private: + using RefCountedObject::put; + using RefCountedObject::get; + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); + template + friend MURef crimson::make_message(Args&&... 
args); +}; + +#endif diff --git a/src/mon/CMakeLists.txt b/src/mon/CMakeLists.txt index 4019f854c9913..c5bf64f8c153a 100644 --- a/src/mon/CMakeLists.txt +++ b/src/mon/CMakeLists.txt @@ -21,6 +21,8 @@ set(lib_mon_srcs ConnectionTracker.cc HealthMonitor.cc KVMonitor.cc + NVMeofGwMon.cc + NVMeofGwMap.cc ../mds/MDSAuthCaps.cc ../mgr/mgr_commands.cc ../osd/OSDCap.cc diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index e9025b05ef772..438cbcfd6d580 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -1378,8 +1378,25 @@ COMMAND("config generate-minimal-conf", "Generate a minimal ceph.conf file", "config", "r") +/* NVMeofGwMon*/ +COMMAND("nvme-gw create" + " name=id,type=CephString" + " name=pool,type=CephString" + " name=group,type=CephString", + "create nvmeof gateway id for (pool, group)", + "mgr", "rw") +COMMAND("nvme-gw delete" + " name=id,type=CephString" + " name=pool,type=CephString" + " name=group,type=CephString", + "delete nvmeof gateway id for (pool, group)", + "mgr", "rw") - +COMMAND("nvme-gw show" + " name=pool,type=CephString" + " name=group,type=CephString", + " show nvmeof gateways within (pool, group)", + "mon", "r") // these are tell commands that were implemented as CLI commands in // the broken pre-octopus way that we want to allow to work when a diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index a70bfbe33c9de..07e6bebab4971 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -84,6 +84,7 @@ #include "MgrStatMonitor.h" #include "ConfigMonitor.h" #include "KVMonitor.h" +#include "NVMeofGwMon.h" #include "mon/HealthMonitor.h" #include "common/config.h" #include "common/cmdparse.h" @@ -247,6 +248,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(*this, *paxos, "health")); paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(*this, *paxos, "config")); paxos_service[PAXOS_KV].reset(new KVMonitor(*this, *paxos, "kv")); + paxos_service[PAXOS_NVMEGW].reset(new NVMeofGwMon(*this, *paxos, "nvmeofgw")); bool r = mon_caps.parse("allow *", NULL); ceph_assert(r); @@ -3617,7 +3619,10 @@ void Monitor::handle_command(MonOpRequestRef op) mgrmon()->dispatch(op); return; } - + if (module == "nvme-gw"){ + nvmegwmon()->dispatch(op); + return; + } if (prefix == "fsid") { if (f) { f->open_object_section("fsid"); @@ -4551,6 +4556,7 @@ void Monitor::_ms_dispatch(Message *m) void Monitor::dispatch_op(MonOpRequestRef op) { op->mark_event("mon:dispatch_op"); + MonSession *s = op->get_session(); ceph_assert(s); if (s->closed) { @@ -4664,6 +4670,11 @@ void Monitor::dispatch_op(MonOpRequestRef op) paxos_service[PAXOS_MGR]->dispatch(op); return; + case MSG_MNVMEOF_GW_BEACON: + paxos_service[PAXOS_NVMEGW]->dispatch(op); + return; + + // MgrStat case MSG_MON_MGR_REPORT: case CEPH_MSG_STATFS: @@ -5351,6 +5362,9 @@ void Monitor::handle_subscribe(MonOpRequestRef op) } else if (p->first.find("kv:") == 0) { kvmon()->check_sub(s->sub_map[p->first]); } + else if (p->first == "NVMeofGw") { + nvmegwmon()->check_sub(s->sub_map[p->first]); + } } if (reply) { diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 13afacafde7dd..0f8481eea6dc9 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -712,6 +712,11 @@ public: return (class KVMonitor*) paxos_service[PAXOS_KV].get(); } + class NVMeofGwMon *nvmegwmon() { + return (class NVMeofGwMon*) paxos_service[PAXOS_NVMEGW].get(); + } + + friend class Paxos; friend class OSDMonitor; friend class MDSMonitor; diff --git a/src/mon/NVMeofGwMap.cc 
b/src/mon/NVMeofGwMap.cc new file mode 100755 index 0000000000000..9af9f81b7f3ed --- /dev/null +++ b/src/mon/NVMeofGwMap.cc @@ -0,0 +1,659 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include +#include "include/stringify.h" +#include "NVMeofGwMon.h" +#include "NVMeofGwMap.h" +#include "OSDMonitor.h" + +using std::map; +using std::make_pair; +using std::ostream; +using std::ostringstream; +using std::string; + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +void NVMeofGwMap::to_gmap(std::map& Gmap) const { + Gmap.clear(); + for (const auto& created_map_pair: created_gws) { + const auto& group_key = created_map_pair.first; + const NvmeGwMonStates& gw_created_map = created_map_pair.second; + for (const auto& gw_created_pair: gw_created_map) { + const auto& gw_id = gw_created_pair.first; + const auto& gw_created = gw_created_pair.second; + + auto gw_state = NvmeGwClientState(gw_created.ana_grp_id, epoch, gw_created.availability); + for (const auto& sub: gw_created.subsystems) { + gw_state.subsystems.insert({sub.nqn, NqnState(sub.nqn, gw_created.sm_state, gw_created )}); + } + Gmap[group_key][gw_id] = gw_state; + dout (20) << gw_id << " Gw-Client: " << gw_state << dendl; + } + } +} + +void NVMeofGwMap::add_grp_id(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const NvmeAnaGrpId grpid) +{ + Tmdata tm_data; + Blocklist_data blklist_data; + created_gws[group_key][gw_id].sm_state[grpid] = gw_states_per_group_t::GW_STANDBY_STATE; + fsm_timers[group_key][gw_id].data[grpid] = tm_data; + created_gws[group_key][gw_id].blocklist_data[grpid] = blklist_data; +} + +void NVMeofGwMap::remove_grp_id(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const NvmeAnaGrpId grpid) +{ + created_gws[group_key][gw_id].sm_state.erase(grpid); + created_gws[group_key][gw_id].blocklist_data.erase(grpid); + fsm_timers[group_key][gw_id].data.erase(grpid); +} + +int NVMeofGwMap::cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key) { + std::set allocated; + for (auto& itr: created_gws[group_key]) { + allocated.insert(itr.second.ana_grp_id); + if (itr.first == gw_id) { + dout(1) << __func__ << " ERROR create GW: already exists in map " << gw_id << dendl; + return -EEXIST ; + } + } + // Allocate the new group id + NvmeAnaGrpId i = 0; + bool was_allocated = false; + for (NvmeAnaGrpId elem: allocated) {// "allocated" is a sorted set (!),so if found any gap between numbers, it should be filled + if (i != elem) { + allocated.insert(i); + was_allocated = true; + break; + } + i++; + } + if (!was_allocated) allocated.insert(i); + dout(10) << "allocated ANA groupId " << i << " for GW " << gw_id << dendl; + for (auto& itr: created_gws[group_key]) { // add new allocated grp_id to maps of created gateways + add_grp_id(itr.first, group_key, i); + } + NvmeGwMonState gw_created(i); + created_gws[group_key][gw_id] = gw_created; + created_gws[group_key][gw_id].performed_full_startup = true; + for (NvmeAnaGrpId elem: allocated) { + add_grp_id(gw_id, group_key, elem); // add all existed grp_ids to newly created gateway + dout(4) << "adding group " << elem << " to gw " << gw_id 
<< dendl; + } + dout(10) << __func__ << " Created GWS: " << created_gws << dendl; + return 0; +} + +int NVMeofGwMap::cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key) { + int rc = 0; + for (auto& gws_states: created_gws[group_key]) { + + if (gws_states.first == gw_id) { + auto& state = gws_states.second; + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + bool modified; + fsm_handle_gw_delete(gw_id, group_key,state_itr.second , state_itr.first, modified); + } + dout(10) << " Delete GW :"<< gw_id << " ANA grpid: " << state.ana_grp_id << dendl; + for (auto& itr: created_gws[group_key]) { + remove_grp_id(itr.first, group_key, state.ana_grp_id);// Update state map and other maps + // of all created gateways. Removed key = anagrp + } + fsm_timers[group_key].erase(gw_id); + if (fsm_timers[group_key].size() == 0) + fsm_timers.erase(group_key); + + created_gws[group_key].erase(gw_id); + if (created_gws[group_key].size() == 0) + created_gws.erase(group_key); + return rc; + } + } + + return -EINVAL; +} + + +int NVMeofGwMap::process_gw_map_gw_down(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, + bool &propose_pending) { + int rc = 0; + auto& gws_states = created_gws[group_key]; + auto gw_state = gws_states.find(gw_id); + if (gw_state != gws_states.end()) { + dout(10) << "GW down " << gw_id << dendl; + auto& st = gw_state->second; + st.set_unavailable_state(); + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + fsm_handle_gw_down(gw_id, group_key, state_itr.second, state_itr.first, propose_pending); + state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE; + } + propose_pending = true; // map should reflect that gw becames unavailable + if (propose_pending) validate_gw_map(group_key); + } + else { + dout(1) << __FUNCTION__ << "ERROR GW-id was not found in the map " << gw_id << dendl; + rc = -EINVAL; + } + return rc; +} + + +void NVMeofGwMap::process_gw_map_ka(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, epoch_t& last_osd_epoch, bool &propose_pending) +{ + auto& gws_states = created_gws[group_key]; + auto gw_state = gws_states.find(gw_id); + auto& st = gw_state->second; + dout(20) << "KA beacon from the GW " << gw_id << " in state " << (int)st.availability << dendl; + + if (st.availability == gw_availability_t::GW_CREATED) { + // first time appears - allow IO traffic for this GW + st.availability = gw_availability_t::GW_AVAILABLE; + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE; + if (st.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID) { // not a redundand GW + st.active_state(st.ana_grp_id); + } + propose_pending = true; + } + else if (st.availability == gw_availability_t::GW_UNAVAILABLE) { + st.availability = gw_availability_t::GW_AVAILABLE; + if (st.ana_grp_id == REDUNDANT_GW_ANA_GROUP_ID) { + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE; + propose_pending = true; + } + else { + //========= prepare to Failback to this GW ========= + // find the GW that took over on the group st.ana_grp_id + find_failback_gw(gw_id, group_key, propose_pending); + } + } + else if (st.availability == gw_availability_t::GW_AVAILABLE) { + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + fsm_handle_gw_alive(gw_id, group_key, gw_state->second, state_itr.second, state_itr.first, last_osd_epoch, propose_pending); + } + } + if (propose_pending) validate_gw_map(group_key); +} + + +void 
NVMeofGwMap::handle_abandoned_ana_groups(bool& propose) +{ + propose = false; + for (auto& group_state: created_gws) { + auto& group_key = group_state.first; + auto& gws_states = group_state.second; + + for (auto& gw_state : gws_states) { // loop for GWs inside nqn group + auto& gw_id = gw_state.first; + NvmeGwMonState& state = gw_state.second; + + //1. Failover missed : is there is a GW in unavailable state? if yes, is its ANA group handled by some other GW? + if (state.availability == gw_availability_t::GW_UNAVAILABLE && state.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID) { + auto found_gw_for_ana_group = false; + for (auto& gw_state2 : gws_states) { + NvmeGwMonState& state2 = gw_state2.second; + if (state2.availability == gw_availability_t::GW_AVAILABLE && state2.sm_state[state.ana_grp_id] == gw_states_per_group_t::GW_ACTIVE_STATE) { + found_gw_for_ana_group = true; + break; + } + } + if (found_gw_for_ana_group == false) { //choose the GW for handle ana group + dout(10)<< "Was not found the GW " << " that handles ANA grp " << (int)state.ana_grp_id << " find candidate "<< dendl; + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + find_failover_candidate(gw_id, group_key, state_itr.first, propose); + } + } + } + + //2. Failback missed: Check this GW is Available and Standby and no other GW is doing Failback to it + else if (state.availability == gw_availability_t::GW_AVAILABLE + && state.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID && + state.sm_state[state.ana_grp_id] == gw_states_per_group_t::GW_STANDBY_STATE) + { + find_failback_gw(gw_id, group_key, propose); + } + } + if (propose) { + validate_gw_map(group_key); + } + } +} + + +void NVMeofGwMap::set_failover_gw_for_ANA_group(const NvmeGwId &failed_gw_id, const NvmeGroupKey& group_key, const NvmeGwId &gw_id, NvmeAnaGrpId ANA_groupid) +{ + NvmeGwMonState& gw_state = created_gws[group_key][gw_id]; + epoch_t epoch; + dout(10) << "Found failover GW " << gw_id << " for ANA group " << (int)ANA_groupid << dendl; + int rc = blocklist_gw(failed_gw_id, group_key, ANA_groupid, epoch, true); + if (rc) { + gw_state.active_state(ANA_groupid); //start failover even when nonces are empty ! 
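+      // blocklist_gw() did not issue an OSD blocklist (e.g. no nonces were
+      // reported for this ANA group), so ownership is taken immediately
+      // instead of waiting in GW_WAIT_BLOCKLIST_CMPL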
+ } + else{ + gw_state.sm_state[ANA_groupid] = gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL; + gw_state.blocklist_data[ANA_groupid].osd_epoch = epoch; + gw_state.blocklist_data[ANA_groupid].is_failover = true; + start_timer(gw_id, group_key, ANA_groupid, 30); //start Failover preparation timer + } +} + +void NVMeofGwMap::find_failback_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose) +{ + auto& gws_states = created_gws[group_key]; + auto& gw_state = created_gws[group_key][gw_id]; + bool do_failback = false; + + dout(10) << "Find failback GW for GW " << gw_id << dendl; + for (auto& gw_state_it: gws_states) { + auto& st = gw_state_it.second; + if (st.sm_state[gw_state.ana_grp_id] != gw_states_per_group_t::GW_STANDBY_STATE) {// some other gw owns or owned the desired ana-group + do_failback = true;// if candidate is in state ACTIVE for the desired ana-group, then failback starts immediately, otherwise need to wait + dout(10) << "Found some gw " << gw_state_it.first << " in state " << st.sm_state[gw_state.ana_grp_id] << dendl; + break; + } + } + + if (do_failback == false) { + // No other gw currently performs some activity with desired ana group of coming-up GW - so it just takes over on the group + dout(10) << "Failback GW candidate was not found, just set Optimized to group " << gw_state.ana_grp_id << " to GW " << gw_id << dendl; + gw_state.active_state(gw_state.ana_grp_id); + propose = true; + return; + } + //try to do_failback + for (auto& gw_state_it: gws_states) { + auto& failback_gw_id = gw_state_it.first; + auto& st = gw_state_it.second; + if (st.sm_state[gw_state.ana_grp_id] == gw_states_per_group_t::GW_ACTIVE_STATE) { + dout(10) << "Found Failback GW " << failback_gw_id << " that previously took over the ANAGRP " << gw_state.ana_grp_id << " of the available GW " << gw_id << dendl; + st.sm_state[gw_state.ana_grp_id] = gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED; + start_timer(failback_gw_id, group_key, gw_state.ana_grp_id, 3);// Add timestamp of start Failback preparation + gw_state.sm_state[gw_state.ana_grp_id] = gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED; + propose = true; + break; + } + } +} + +void NVMeofGwMap::find_failover_candidate(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &propose_pending) +{ + dout(10) <<__func__<< " " << gw_id << dendl; + #define ILLEGAL_GW_ID " " + #define MIN_NUM_ANA_GROUPS 0xFFF + int min_num_ana_groups_in_gw = 0; + int current_ana_groups_in_gw = 0; + NvmeGwId min_loaded_gw_id = ILLEGAL_GW_ID; + auto& gws_states = created_gws[group_key]; + auto gw_state = gws_states.find(gw_id); + + // this GW may handle several ANA groups and for each of them need to found the candidate GW + if (gw_state->second.sm_state[grpid] == gw_states_per_group_t::GW_ACTIVE_STATE || gw_state->second.ana_grp_id == grpid) { + + for (auto& found_gw_state: gws_states) { // for all the gateways of the subsystem + auto st = found_gw_state.second; + if (st.sm_state[grpid] == gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL) { // some GW already started failover/failback on this group + dout(4) << "Warning : Failover" << st.blocklist_data[grpid].is_failover << " already started for the group " << grpid << " by GW " << found_gw_state.first << dendl; + gw_state->second.standby_state(grpid); + return ; + } + } + // Find a GW that takes over the ANA group(s) + min_num_ana_groups_in_gw = MIN_NUM_ANA_GROUPS; + min_loaded_gw_id = ILLEGAL_GW_ID; + for (auto& found_gw_state: gws_states) { // for all the gateways of the 
subsystem + auto st = found_gw_state.second; + if (st.availability == gw_availability_t::GW_AVAILABLE) { + current_ana_groups_in_gw = 0; + for (auto& state_itr: created_gws[group_key][gw_id].sm_state ) { + NvmeAnaGrpId anagrp = state_itr.first; + if (st.sm_state[anagrp] == gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED || st.sm_state[anagrp] == gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED + || st.sm_state[anagrp] == gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL) { + current_ana_groups_in_gw = 0xFFFF; + break; // dont take into account GWs in the transitive state + } + else if (st.sm_state[anagrp] == gw_states_per_group_t::GW_ACTIVE_STATE) { + current_ana_groups_in_gw++; // how many ANA groups are handled by this GW + } + } + if (min_num_ana_groups_in_gw > current_ana_groups_in_gw) { + min_num_ana_groups_in_gw = current_ana_groups_in_gw; + min_loaded_gw_id = found_gw_state.first; + dout(10) << "choose: gw-id min_ana_groups " << min_loaded_gw_id << current_ana_groups_in_gw << " min " << min_num_ana_groups_in_gw << dendl; + } + } + } + if (min_loaded_gw_id != ILLEGAL_GW_ID) { + propose_pending = true; + set_failover_gw_for_ANA_group(gw_id, group_key, min_loaded_gw_id, grpid); + } + else { + if (gw_state->second.sm_state[grpid] == gw_states_per_group_t::GW_ACTIVE_STATE) {// not found candidate but map changed. + propose_pending = true; + dout(10) << "gw down: no candidate found " << dendl; + } + } + gw_state->second.standby_state(grpid); + } +} + +void NVMeofGwMap::fsm_handle_gw_alive(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeGwMonState & gw_state, gw_states_per_group_t state, NvmeAnaGrpId grpid, epoch_t& last_osd_epoch, bool &map_modified) +{ + switch (state) { + case gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL: + { + int timer_val = get_timer(gw_id, group_key, grpid); + NvmeGwMonState& gw_map = created_gws[group_key][gw_id]; + if (gw_map.blocklist_data[grpid].osd_epoch <= last_osd_epoch) { + dout(10) << "is-failover: " << gw_map.blocklist_data[grpid].is_failover << " osd epoch changed from " << gw_map.blocklist_data[grpid].osd_epoch << " to "<< last_osd_epoch + << " Ana-grp: " << grpid << " timer:" << timer_val << dendl; + gw_state.active_state(grpid); // Failover Gw still alive and guaranteed that + cancel_timer(gw_id, group_key, grpid); // ana group wouldnt be taken back during blocklist wait period + map_modified = true; + } + else{ + dout(20) << "osd epoch not changed from " << gw_map.blocklist_data[grpid].osd_epoch << " to "<< last_osd_epoch + << " Ana-grp: " << grpid << " timer:" << timer_val << dendl; + } + } + break; + + default: + break; + } +} + + void NVMeofGwMap::fsm_handle_gw_down(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, gw_states_per_group_t state, NvmeAnaGrpId grpid, bool &map_modified) + { + switch (state) + { + case gw_states_per_group_t::GW_STANDBY_STATE: + case gw_states_per_group_t::GW_IDLE_STATE: + // nothing to do + break; + + case gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL: + { + cancel_timer(gw_id, group_key, grpid); + map_modified = true; + }break; + + case gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED: + cancel_timer(gw_id, group_key, grpid); + map_modified = true; + for (auto& gw_st: created_gws[group_key]) { + auto& st = gw_st.second; + if (st.sm_state[grpid] == gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED) { // found GW that was intended for Failback for this ana grp + dout(4) << "Warning: Outgoing Failback when GW is down back - to rollback it" <<" GW " <mon->osdmon()->is_writeable() << dendl; + if 
(m->mon->osdmon()->is_writeable()) { + epoch_t epoch = m->mon->osdmon()->blocklist(addr_vect, expires); + dout(10) << "epoch " << epoch <mon->nvmegwmon()->request_proposal(m->mon->osdmon()); + } + else { + m->mon->osdmon()->wait_for_writeable_ctx( new CMonRequestProposal(m, addr_vect, expires)); + } + } +}; + +int NVMeofGwMap::blocklist_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, epoch_t &epoch, bool failover) +{ + NvmeGwMonState& gw_map = created_gws[group_key][gw_id]; //find_already_created_gw(gw_id, group_key); + + if (gw_map.nonce_map[grpid].size() > 0) { + NvmeNonceVector &nonce_vector = gw_map.nonce_map[grpid];; + std::string str = "["; + entity_addrvec_t addr_vect; + + double d = g_conf().get_val("mon_osd_blocklist_default_expire"); + utime_t expires = ceph_clock_now(); + expires += d; + dout(10) << " blocklist timestamp " << expires << dendl; + for (auto &it: nonce_vector ) { + if (str != "[") str += ","; + str += it; + } + str += "]"; + bool rc = addr_vect.parse(&str[0]); + dout(10) << str << " rc " << rc << " network vector: " << addr_vect << " " << addr_vect.size() << dendl; + if (rc) + return 1; + + if (!mon->osdmon()->is_writeable()) { + dout(10) << "osdmon is not writable, waiting, epoch = " << epoch << dendl; + mon->osdmon()->wait_for_writeable_ctx( new CMonRequestProposal(this, addr_vect, expires ));// return false; + } + else { + epoch = mon->osdmon()->blocklist(addr_vect, expires); + if (!mon->osdmon()->is_writeable()) { + dout(10) << "osdmon is not writable after blocklist is done, waiting, epoch = " << epoch << dendl; + mon->osdmon()->wait_for_writeable_ctx( new CMonRequestProposal(this, addr_vect, expires ));// return false; + } + else{ + mon->nvmegwmon()->request_proposal(mon->osdmon()); + } + } + dout(10) << str << " mon->osdmon()->blocklist: epoch : " << epoch << " address vector: " << addr_vect << " " << addr_vect.size() << dendl; + } + else{ + dout(4) << "Error: No nonces context present for gw: " <= to.data[to_itr.first].end_time) { + fsm_handle_to_expired(gw_id, std::make_pair(pool, group), to_itr.first, propose_pending); + } + } + } + } +} + +void NVMeofGwMap::start_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid, uint8_t value_sec) { + fsm_timers[group_key][gw_id].data[anagrpid].timer_started = 1; + fsm_timers[group_key][gw_id].data[anagrpid].timer_value = value_sec; + dout(10) << "start timer for ana " << anagrpid << " gw " << gw_id << "value sec " << (int)value_sec << dendl; + const auto now = std::chrono::system_clock::now(); + fsm_timers[group_key][gw_id].data[anagrpid].end_time = now + std::chrono::seconds(value_sec); +} + +int NVMeofGwMap::get_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid) { + auto timer = fsm_timers[group_key][gw_id].data[anagrpid].timer_value; + return timer; +} + +void NVMeofGwMap::cancel_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid) { + fsm_timers[group_key][gw_id].data[anagrpid].timer_started = 0; +} diff --git a/src/mon/NVMeofGwMap.h b/src/mon/NVMeofGwMap.h new file mode 100755 index 0000000000000..2390176305b2e --- /dev/null +++ b/src/mon/NVMeofGwMap.h @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef MON_NVMEOFGWMAP_H_ +#define MON_NVMEOFGWMAP_H_ +#include +#include +#include "include/encoding.h" +#include "include/utime.h" +#include "common/Formatter.h" +#include "common/ceph_releases.h" +#include "common/version.h" +#include "common/options.h" +#include "common/Clock.h" +#include "msg/Message.h" +#include "common/ceph_time.h" +#include "NVMeofGwTypes.h" + +using ceph::coarse_mono_clock; +class Monitor; +/*-------------------*/ +class NVMeofGwMap +{ +public: + Monitor* mon = NULL; + epoch_t epoch = 0; // epoch is for Paxos synchronization mechanizm + bool delay_propose = false; + + std::map created_gws; + std::map fsm_timers;// map that handles timers started by all Gateway FSMs + void to_gmap(std::map& Gmap) const; + + int cfg_add_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key); + int cfg_delete_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key); + void process_gw_map_ka (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, epoch_t& last_osd_epoch, bool &propose_pending); + int process_gw_map_gw_down (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose_pending); + void update_active_timers (bool &propose_pending); + void handle_abandoned_ana_groups (bool &propose_pending); + void handle_removed_subsystems (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const std::vector ¤t_subsystems, bool &propose_pending); + void start_timer (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid, uint8_t value); +private: + void add_grp_id (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const NvmeAnaGrpId grpid); + void remove_grp_id(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, const NvmeAnaGrpId grpid); + void fsm_handle_gw_down (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, gw_states_per_group_t state, NvmeAnaGrpId grpid, bool &map_modified); + void fsm_handle_gw_delete (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, gw_states_per_group_t state, NvmeAnaGrpId grpid, bool &map_modified); + void fsm_handle_gw_alive (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeGwMonState & gw_state, gw_states_per_group_t state, + NvmeAnaGrpId grpid, epoch_t& last_osd_epoch, bool &map_modified); + void fsm_handle_to_expired (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &map_modified); + + void find_failover_candidate(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, bool &propose_pending); + void find_failback_gw (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, bool &propose_pending); + void set_failover_gw_for_ANA_group (const NvmeGwId &failed_gw_id, const NvmeGroupKey& group_key, const NvmeGwId &gw_id, + NvmeAnaGrpId groupid); + + + int get_timer (const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid); + void cancel_timer(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId anagrpid); + void validate_gw_map(const NvmeGroupKey& group_key); + +public: + int blocklist_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key, NvmeAnaGrpId ANA_groupid, epoch_t &epoch, bool failover); + void encode(ceph::buffer::list &bl) const { + using ceph::encode; + ENCODE_START(1, 1, bl); + encode(epoch, bl);// global map epoch + + encode(created_gws, bl); //Encode created GWs + encode(fsm_timers, 
bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator &bl) { + using ceph::decode; + DECODE_START(1, bl); + decode(epoch, bl); + + decode(created_gws, bl); + decode(fsm_timers, bl); + DECODE_FINISH(bl); + } +}; + +#include "NVMeofGwSerialize.h" + +#endif /* SRC_MON_NVMEOFGWMAP_H_ */ diff --git a/src/mon/NVMeofGwMon.cc b/src/mon/NVMeofGwMon.cc new file mode 100644 index 0000000000000..6111f76f425d6 --- /dev/null +++ b/src/mon/NVMeofGwMon.cc @@ -0,0 +1,532 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include +#include "include/stringify.h" +#include "NVMeofGwMon.h" +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +using std::string; + +void NVMeofGwMon::init() { + dout(10) << "called " << dendl; +} + +void NVMeofGwMon::on_restart() { + dout(10) << "called " << dendl; + last_beacon.clear(); + last_tick = ceph::coarse_mono_clock::now(); + synchronize_last_beacon(); +} + + +void NVMeofGwMon::synchronize_last_beacon() { + dout(10) << "called, is leader : " << mon.is_leader() <<" active " << is_active() << dendl; + // Initialize last_beacon to identify transitions of available GWs to unavailable state + for (const auto& created_map_pair: map.created_gws) { + const auto& group_key = created_map_pair.first; + const NvmeGwMonStates& gw_created_map = created_map_pair.second; + for (const auto& gw_created_pair: gw_created_map) { + const auto& gw_id = gw_created_pair.first; + if (gw_created_pair.second.availability == gw_availability_t::GW_AVAILABLE) { + dout(10) << "synchronize last_beacon for GW :" << gw_id << dendl; + LastBeacon lb = {gw_id, group_key}; + last_beacon[lb] = last_tick; + } + } + } +} + +void NVMeofGwMon::on_shutdown() { + dout(10) << "called " << dendl; +} + +void NVMeofGwMon::tick() { + if (!is_active() || !mon.is_leader()) { + dout(10) << "NVMeofGwMon leader : " << mon.is_leader() << "active : " << is_active() << dendl; + return; + } + bool _propose_pending = false; + + const auto now = ceph::coarse_mono_clock::now(); + const auto nvmegw_beacon_grace = g_conf().get_val("mon_nvmeofgw_beacon_grace"); + dout(15) << "NVMeofGwMon leader got a tick, pending epoch "<< pending_map.epoch << dendl; + + const auto client_tick_period = g_conf().get_val("nvmeof_mon_client_tick_period"); + //handle exception of tick overdued in order to avoid false detection of overdued beacons, like it done in MgrMonitor::tick + if (last_tick != ceph::coarse_mono_clock::zero() + && (now - last_tick > (nvmegw_beacon_grace - client_tick_period))) { + // This case handles either local slowness (calls being delayed + // for whatever reason) or cluster election slowness (a long gap + // between calls while an election happened) + dout(10) << ": resetting beacon timeouts due to mon delay " + "(slow election?) 
of " << now - last_tick << " seconds" << dendl; + for (auto &i : last_beacon) { + i.second = now; + } + } + + last_tick = now; + bool propose = false; + + pending_map.update_active_timers(propose); // Periodic: check active FSM timers + _propose_pending |= propose; + + const auto cutoff = now - nvmegw_beacon_grace; + for (auto &itr : last_beacon) {// Pass over all the stored beacons + auto& lb = itr.first; + auto last_beacon_time = itr.second; + if (last_beacon_time < cutoff) { + dout(10) << "beacon timeout for GW " << lb.gw_id << dendl; + pending_map.process_gw_map_gw_down(lb.gw_id, lb.group_key, propose); + _propose_pending |= propose; + last_beacon.erase(lb); + } + else { + dout(20) << "beacon live for GW key: " << lb.gw_id << dendl; + } + } + + pending_map.handle_abandoned_ana_groups(propose); // Periodic: take care of not handled ANA groups + _propose_pending |= propose; + + if (_propose_pending) { + dout(10) << "propose pending " <("mon_max_nvmeof_epochs"); + if (map.epoch > max) { + return map.epoch - max; + } + return 0; +} + +void NVMeofGwMon::create_pending() { + + pending_map = map;// deep copy of the object + pending_map.epoch++; + dout(10) << " pending " << pending_map << dendl; +} + +void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t) { + + dout(10) << dendl; + ceph_assert(get_last_committed() + 1 == pending_map.epoch); + bufferlist bl; + pending_map.encode(bl); + put_version(t, pending_map.epoch, bl); + put_last_committed(t, pending_map.epoch); +} + +void NVMeofGwMon::update_from_paxos(bool *need_bootstrap) { + version_t version = get_last_committed(); + + if (version != map.epoch) { + dout(10) << " NVMeGW loading version " << version << " " << map.epoch << dendl; + + bufferlist bl; + int err = get_version(version, bl); + ceph_assert(err == 0); + + auto p = bl.cbegin(); + map.decode(p); + if (!mon.is_leader()) { + dout(10) << "leader map: " << map << dendl; + } + check_subs(true); + } +} + +void NVMeofGwMon::check_sub(Subscription *sub) +{ + dout(10) << "sub->next , map-epoch " << sub->next << " " << map.epoch << dendl; + if (sub->next <= map.epoch) + { + dout(10) << "Sending map to subscriber " << sub->session->con << " " << sub->session->con->get_peer_addr() << dendl; + sub->session->con->send_message2(make_message(map)); + + if (sub->onetime) { + mon.session_map.remove_sub(sub); + } else { + sub->next = map.epoch + 1; + } + } +} + +void NVMeofGwMon::check_subs(bool t) +{ + const std::string type = "NVMeofGw"; + dout(10) << "count " << mon.session_map.subs.count(type) << dendl; + + if (mon.session_map.subs.count(type) == 0) { + return; + } + for (auto sub : *(mon.session_map.subs[type])) { + check_sub(sub); + } +} + +bool NVMeofGwMon::preprocess_query(MonOpRequestRef op) { + dout(20) << dendl; + + auto m = op->get_req(); + switch (m->get_type()) { + case MSG_MNVMEOF_GW_BEACON: + return preprocess_beacon(op); + + case MSG_MON_COMMAND: + try { + return preprocess_command(op); + } catch (const bad_cmd_get& e) { + bufferlist bl; + mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed()); + return true; + } + + default: + mon.no_reply(op); + derr << "Unhandled message type " << m->get_type() << dendl; + return true; + } + return false; +} + +bool NVMeofGwMon::prepare_update(MonOpRequestRef op) { + auto m = op->get_req(); + switch (m->get_type()) { + case MSG_MNVMEOF_GW_BEACON: + return prepare_beacon(op); + + case MSG_MON_COMMAND: + try { + return prepare_command(op); + } catch (const bad_cmd_get& e) { + bufferlist bl; + mon.reply_command(op, -EINVAL, 
e.what(), bl, get_last_committed()); + return false; /* nothing to propose! */ + } + + default: + mon.no_reply(op); + dout(1) << "Unhandled message type " << m->get_type() << dendl; + return false; /* nothing to propose! */ + } +} + +bool NVMeofGwMon::preprocess_command(MonOpRequestRef op) +{ + dout(10) << dendl; + auto m = op->get_req(); + std::stringstream sstrm; + bufferlist rdata; + string rs; + int err = 0; + cmdmap_t cmdmap; + if (!cmdmap_from_json(m->cmd, &cmdmap, sstrm)) + { + string rs = sstrm.str(); + dout(4) << "Error : Invalid command " << m->cmd << "Error " << rs << dendl; + mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + string prefix; + cmd_getval(cmdmap, "prefix", prefix); + dout(10) << "MonCommand : "<< prefix << dendl; + string format = cmd_getval_or(cmdmap, "format", "plain"); + boost::scoped_ptr f(Formatter::create(format)); + if (prefix == "nvme-gw show") { + std::string pool, group; + if (!f) { + f.reset(Formatter::create(format, "json-pretty", "json-pretty")); + } + cmd_getval(cmdmap, "pool", pool); + cmd_getval(cmdmap, "group", group); + auto group_key = std::make_pair(pool, group); + dout(10) <<"nvme-gw show pool "<< pool << " group "<< group << dendl; + + if (map.created_gws[group_key].size()) { + f->open_object_section("common"); + f->dump_unsigned("epoch", map.epoch); + f->dump_string("pool", pool); + f->dump_string("group", group); + f->dump_unsigned("num gws", map.created_gws[group_key].size()); + sstrm <<"[ "; + NvmeGwId gw_id; + for (auto& gw_created_pair: map.created_gws[group_key]) { + gw_id = gw_created_pair.first; + auto& st = gw_created_pair.second; + sstrm << st.ana_grp_id+1 << " "; + } + sstrm << "]"; + f->dump_string("Anagrp list", sstrm.str()); + f->close_section(); + + for (auto& gw_created_pair: map.created_gws[group_key]) { + auto& gw_id = gw_created_pair.first; + auto& state = gw_created_pair.second; + f->open_object_section("stat"); + f->dump_string("gw-id", gw_id); + f->dump_unsigned("anagrp-id",state.ana_grp_id+1); + f->dump_unsigned("performed-full-startup", state.performed_full_startup); + std::stringstream sstrm1; + sstrm1 << state.availability; + f->dump_string("Availability", sstrm1.str()); + sstrm1.str(""); + for (auto &state_itr: map.created_gws[group_key][gw_id].sm_state) { + sstrm1 << " " << state_itr.first + 1 << ": " << state.sm_state[state_itr.first] << ","; + } + f->dump_string("ana states", sstrm1.str()); + f->close_section(); + } + f->flush(rdata); + sstrm.str(""); + } + else { + sstrm << "num_gws 0"; + } + getline(sstrm, rs); + mon.reply_command(op, err, rs, rdata, get_last_committed()); + return true; + } + return false; +} + +bool NVMeofGwMon::prepare_command(MonOpRequestRef op) +{ + dout(10) << dendl; + auto m = op->get_req(); + int rc; + std::stringstream sstrm; + bufferlist rdata; + string rs; + int err = 0; + cmdmap_t cmdmap; + bool response = false; + + if (!cmdmap_from_json(m->cmd, &cmdmap, sstrm)) + { + string rs = sstrm.str(); + mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + string format = cmd_getval_or(cmdmap, "format", "plain"); + boost::scoped_ptr f(Formatter::create(format)); + + const auto prefix = cmd_getval_or(cmdmap, "prefix", string{}); + + dout(10) << "MonCommand : "<< prefix << dendl; + if (prefix == "nvme-gw create" || prefix == "nvme-gw delete") { + std::string id, pool, group; + + cmd_getval(cmdmap, "id", id); + cmd_getval(cmdmap, "pool", pool); + cmd_getval(cmdmap, "group", group); + auto group_key = std::make_pair(pool, 
group); + dout(10) << " id "<< id <<" pool "<< pool << " group "<< group << dendl; + if (prefix == "nvme-gw create") { + rc = pending_map.cfg_add_gw(id, group_key); + if (rc == -EINVAL) { + err = rc; + dout (4) << "Error: GW cannot be created " << id << " " << pool << " " << group << " rc " << rc << dendl; + sstrm.str(""); + } + } + else{ + rc = pending_map.cfg_delete_gw(id, group_key); + if (rc == -EINVAL) { + dout (4) << "Error: GW not found in the database " << id << " " << pool << " " << group << " rc " << rc << dendl; + err = 0; + sstrm.str(""); + } + } + if ((rc != -EEXIST) && (rc != -EINVAL)) //propose pending would be generated by the PaxosService + response = true; + } + + getline(sstrm, rs); + if (response == false) { + if (err < 0 && rs.length() == 0) + { + rs = cpp_strerror(err); + dout(10) << "Error command err : "<< err << " rs-len: " << rs.length() << dendl; + } + mon.reply_command(op, err, rs, rdata, get_last_committed()); + } + else + wait_for_commit(op, new Monitor::C_Command(mon, op, 0, rs, + get_last_committed() + 1)); + return response; +} + + +bool NVMeofGwMon::preprocess_beacon(MonOpRequestRef op) { + auto m = op->get_req(); + const BeaconSubsystems& sub = m->get_subsystems(); + dout(15) << "beacon from " << m->get_type() << " GW : " << m->get_gw_id() << " num subsystems " << sub.size() << dendl; + + return false; // allways return false to call leader's prepare beacon +} + + +bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op) { + auto m = op->get_req(); + + dout(20) << "availability " << m->get_availability() << " GW : " << m->get_gw_id() << + " osdmap_epoch " << m->get_last_osd_epoch() << " subsystems " << m->get_subsystems() << dendl; + + NvmeGwId gw_id = m->get_gw_id(); + NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group()); + gw_availability_t avail = m->get_availability(); + bool propose = false; + bool nonce_propose = false; + bool timer_propose = false; + bool gw_created = true; + NVMeofGwMap ack_map; + auto& group_gws = map.created_gws[group_key]; + auto gw = group_gws.find(gw_id); + const BeaconSubsystems& sub = m->get_subsystems(); + + if (avail == gw_availability_t::GW_CREATED) { + if (gw == group_gws.end()) { + gw_created = false; + dout(10) << "Warning: GW " << gw_id << " group_key " << group_key << " was not found in the map.Created_gws "<< map.created_gws <(ack_map); + mon.send_reply(op, msg.detach()); + goto false_return; + } + } + + // At this stage the gw has to be in the Created_gws + if (gw == group_gws.end()) { + dout(4) << "Administratively deleted GW sends beacon " << gw_id <get_nonce_map().size()) { + if (pending_map.created_gws[group_key][gw_id].nonce_map != m->get_nonce_map()) + { + dout(10) << "nonce map of GW changed , propose pending " << gw_id << dendl; + pending_map.created_gws[group_key][gw_id].nonce_map = m->get_nonce_map(); + dout(10) << "nonce map of GW " << gw_id << " "<< pending_map.created_gws[group_key][gw_id].nonce_map << dendl; + nonce_propose = true; + } + } + else { + dout(10) << "Warning: received empty nonce map in the beacon of GW " << gw_id << " "<< dendl; + } + + if (sub.size() == 0) { + avail = gw_availability_t::GW_UNAVAILABLE; + } + if (pending_map.created_gws[group_key][gw_id].subsystems != sub) + { + dout(10) << "subsystems of GW changed, propose pending " << gw_id << dendl; + pending_map.created_gws[group_key][gw_id].subsystems = sub; + dout(20) << "subsystems of GW " << gw_id << " "<< pending_map.created_gws[group_key][gw_id].subsystems << dendl; + nonce_propose = true; + } + 
+  pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
+    (map.epoch == m->get_last_gwmap_epoch());
+  if (pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid == false) {
+    dout(20) << "map epoch of gw is not up-to-date " << gw_id << " epoch " << map.epoch
+             << " beacon_epoch " << m->get_last_gwmap_epoch() << dendl;
+  }
+  if (avail == gw_availability_t::GW_AVAILABLE)
+  {
+    auto now = ceph::coarse_mono_clock::now();
+    // check pending_map.epoch vs m->get_version() - if different - drop the beacon
+
+    LastBeacon lb = {gw_id, group_key};
+    last_beacon[lb] = now;
+    epoch_t last_osd_epoch = m->get_last_osd_epoch();
+    pending_map.process_gw_map_ka(gw_id, group_key, last_osd_epoch, propose);
+  }
+  else if (avail == gw_availability_t::GW_UNAVAILABLE) { // state set by GW client application
+    LastBeacon lb = {gw_id, group_key};
+
+    auto it = last_beacon.find(lb);
+    if (it != last_beacon.end()) {
+      last_beacon.erase(lb);
+      pending_map.process_gw_map_gw_down(gw_id, group_key, propose);
+    }
+  }
+  pending_map.update_active_timers(timer_propose); // Periodic: check active FSM timers
+  propose |= timer_propose;
+  propose |= nonce_propose;
+
+set_propose:
+  if (!propose) {
+    if (gw_created) {
+      // respond with the map slice corresponding to this same GW
+      ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];
+    }
+    ack_map.epoch = map.epoch;
+    dout(20) << "ack_map " << ack_map << dendl;
+    auto msg = make_message<MNVMeofGwMap>(ack_map);
+    mon.send_reply(op, msg.detach());
+  }
+  else {
+    mon.no_reply(op);
+  }
+false_return:
+  if (propose) {
+    dout(10) << "decision in prepare_beacon" << dendl;
+  }
+  return propose;
+}
+
+  std::map<LastBeacon, ceph::coarse_mono_clock::time_point> last_beacon;
+  ceph::coarse_mono_clock::time_point last_tick;
+
+public:
+  NVMeofGwMon(Monitor &mn, Paxos &p, const std::string& service_name):
+    PaxosService(mn, p, service_name) { map.mon = &mn; }
+  ~NVMeofGwMon() override {}
+
+
+  // config observer
+  const char** get_tracked_conf_keys() const override;
+  void handle_conf_change(const ConfigProxy& conf, const std::set<std::string> &changed) override {};
+
+  // 3 pure virtual methods of the PaxosService
+  void create_initial() override {};
+  void create_pending() override;
+  void encode_pending(MonitorDBStore::TransactionRef t) override;
+
+  void init() override;
+  void on_shutdown() override;
+  void on_restart() override;
+  void update_from_paxos(bool *need_bootstrap) override;
+
+  version_t get_trim_to() const override;
+
+  bool preprocess_query(MonOpRequestRef op) override;
+  bool prepare_update(MonOpRequestRef op) override;
+
+  bool preprocess_command(MonOpRequestRef op);
+  bool prepare_command(MonOpRequestRef op);
+
+  void encode_full(MonitorDBStore::TransactionRef t) override { }
+
+  bool preprocess_beacon(MonOpRequestRef op);
+  bool prepare_beacon(MonOpRequestRef op);
+
+  void tick() override;
+  void print_summary(ceph::Formatter *f, std::ostream *ss) const;
+
+  void check_subs(bool type);
+  void check_sub(Subscription *sub);
+
+private:
+  void synchronize_last_beacon();
+
+};
+
+#endif /* MON_NVMEGWMONITOR_H_ */
diff --git a/src/mon/NVMeofGwSerialize.h b/src/mon/NVMeofGwSerialize.h
new file mode 100755
index 0000000000000..cd70554137275
--- /dev/null
+++ b/src/mon/NVMeofGwSerialize.h
@@ -0,0 +1,610 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+#ifndef MON_NVMEOFGWSERIALIZE_H_
+#define MON_NVMEOFGWSERIALIZE_H_
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define MODULE_PREFFIX "nvmeofgw "
+#define dout_prefix *_dout << MODULE_PREFFIX << __PRETTY_FUNCTION__ << " "
+
+inline std::ostream& operator<<(std::ostream& os, const gw_exported_states_per_group_t value) {
+  switch (value) {
+    case gw_exported_states_per_group_t::GW_EXPORTED_OPTIMIZED_STATE: os << "OPTIMIZED "; break;
+    case gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE: os << "INACCESSIBLE "; break;
+    default: os << "Invalid " << (int)value << " ";
+  }
+  return os;
+}
+
+inline std::ostream& operator<<(std::ostream& os, const gw_states_per_group_t value) {
+  switch (value) {
+    case gw_states_per_group_t::GW_IDLE_STATE: os << "IDLE "; break;
+    case gw_states_per_group_t::GW_STANDBY_STATE: os << "STANDBY "; break;
+    case gw_states_per_group_t::GW_ACTIVE_STATE: os << "ACTIVE "; break;
+    case gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED: os << "OWNER_FAILBACK_PREPARED "; break;
+    case gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED: os << "WAIT_FAILBACK_PREPARED "; break;
+    case gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL: os << "WAIT_BLOCKLIST_CMPL "; break;
+    default: os << "Invalid " << (int)value << " ";
+  }
+  return os;
+}
+
+inline std::ostream& operator<<(std::ostream& os, const gw_availability_t value) {
+  switch (value) {
+    case gw_availability_t::GW_CREATED: os << "CREATED"; break;
+    case gw_availability_t::GW_AVAILABLE: os << "AVAILABLE"; break;
+    case gw_availability_t::GW_UNAVAILABLE: os << "UNAVAILABLE"; break;
+    default: os << "Invalid " << (int)value << " ";
+  }
+  return os;
+}
+
+inline std::ostream& operator<<(std::ostream& os, const SmState value) {
+  os << "SM_STATE [ ";
+  for (auto& state_itr: value)
+    os << value.at(state_itr.first);
+  os << "]";
+  return os;
+}
+
+inline std::ostream& operator<<(std::ostream& os, const BeaconNamespace value) {
+  os << "BeaconNamespace( anagrpid:" << value.anagrpid << ", nonce:" << value.nonce << " )";
+  return os;
+}
+
+inline std::ostream& operator<<(std::ostream& os, const BeaconListener value) {
+  os << "BeaconListener( addrfam:" << value.address_family
+     << ", addr:" << value.address
+     << ", svcid:" << value.svcid << " )";
+  return os;
+}
+
+inline std::ostream& operator<<(std::ostream& os, const BeaconSubsystem value) {
+  os << "BeaconSubsystem( nqn:" << value.nqn << ", listeners [ ";
+  for (const auto& list: value.listeners) os << list << " ";
+  os << "] namespaces [ ";
+  for (const auto& ns: value.namespaces) os << ns << " ";
+  os << "] )";
+  return os;
+}
+
+inline std::ostream& operator<<(std::ostream& os, const NqnState value) {
+  os << "NqnState( nqn: " << value.nqn << ", " << value.ana_state << " )";
+  return os;
+}
+
+inline std::ostream& operator<<(std::ostream& os, const NvmeGwClientState value) {
+  os << "NvmeGwState { group id: " << value.group_id << " gw_map_epoch " << value.gw_map_epoch << " availability " << value.availability
+     << " GwSubsystems: [ ";
+  for (const auto& sub: value.subsystems) os << sub.second << " ";
+  os << " ] }";
+
+  return os;
+};
+
+inline std::ostream& operator<<(std::ostream& os, const NvmeGroupKey value) {
+  os << "NvmeGroupKey {" << value.first << "," << value.second << "}";
+  return os;
+};
+
+inline std::ostream& operator<<(std::ostream& os, const NvmeGwMonClientStates value) {
+  os << "NvmeGwMap ";
+  for (auto& gw_state: value) {
+    os << "\n" << MODULE_PREFFIX << " { == 
gw_id: " << gw_state.first << " -> " << gw_state.second << "}"; + } + os << "}"; + + return os; +}; + +inline std::ostream& operator<<(std::ostream& os, const NvmeNonceVector value) { + for (auto & nonces : value) { + os << nonces << " "; + } + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const NvmeAnaNonceMap value) { + if(value.size()) os << "\n" << MODULE_PREFFIX; + for (auto &nonce_map : value) { + os << " ana_grp: " << nonce_map.first << " [ " << nonce_map.second << "]\n"<< MODULE_PREFFIX ; + } + return os; +} + +inline std::ostream& print_gw_created_t(std::ostream& os, const NvmeGwMonState value, size_t num_ana_groups) { + os << "==Internal map ==NvmeGwCreated { ana_group_id " << value.ana_grp_id << " osd_epochs: "; + for (auto& blklst_itr: value.blocklist_data) + { + os << " " << blklst_itr.first <<": " << blklst_itr.second.osd_epoch << ":" < { " << group_gws.second << " }"; + } + os << "]"; + return os; +} + +inline void encode(const ana_state_t& st, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)st.size(), bl); + for (const auto& gr: st) { + encode((uint32_t)gr.first, bl); + encode((uint32_t)gr.second, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(ana_state_t& st, ceph::buffer::list::const_iterator &bl) { + uint32_t n; + DECODE_START(1, bl); + decode(n, bl); + st.resize(n); + for (uint32_t i = 0; i < n; i++) { + uint32_t a, b; + decode(a, bl); + decode(b, bl); + st[i].first = (gw_exported_states_per_group_t)a; + st[i].second = (epoch_t)b; + } + DECODE_FINISH(bl); +} + +inline void encode(const GwSubsystems& subsystems, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)subsystems.size(), bl); + for (const auto& sub: subsystems) { + encode(sub.second.nqn, bl); + encode(sub.second.ana_state, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(GwSubsystems& subsystems, ceph::bufferlist::const_iterator& bl) { + uint32_t num_subsystems; + DECODE_START(1, bl); + decode(num_subsystems, bl); + subsystems.clear(); + for (uint32_t i=0; i(endtime.time_since_epoch()).count(); + encode(millisecondsSinceEpoch , bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(NvmeGwTimerState& state, ceph::bufferlist::const_iterator& bl) { + uint32_t size; + DECODE_START(1, bl); + decode(size, bl); + for (uint32_t i = 0; i (duration); + state.data[tm_key] = tm; + } + DECODE_FINISH(bl); +} + +inline void encode(const NvmeAnaNonceMap& nonce_map, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode((uint32_t)nonce_map.size(), bl); + for (auto& ana_group_nonces : nonce_map) { + encode(ana_group_nonces.first, bl); // ana group id + encode ((uint32_t)ana_group_nonces.second.size(), bl); // encode the vector size + for (auto& nonce: ana_group_nonces.second) encode(nonce, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(NvmeAnaNonceMap& nonce_map, ceph::buffer::list::const_iterator &bl) { + uint32_t map_size; + NvmeAnaGrpId ana_grp_id; + uint32_t vector_size; + std::string nonce; + DECODE_START(1, bl); + decode(map_size, bl); + for (uint32_t i = 0; i& created_gws, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode ((uint32_t)created_gws.size(), bl); // number of groups + for (auto& group_gws: created_gws) { + auto& group_key = group_gws.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + auto& gws = group_gws.second; + encode (gws, bl); // encode group gws + } + ENCODE_FINISH(bl); +} + +inline void decode(std::map& created_gws, ceph::buffer::list::const_iterator &bl) { + 
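+  // Mirrors encode() above: a group count first, then for each group the
+  // pool name, the group name, and that group's map of created gateways.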
created_gws.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for (uint32_t i = 0; i& gmap, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode ((uint32_t)gmap.size(), bl); // number of groups + for (auto& group_state: gmap) { + auto& group_key = group_state.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + encode(group_state.second, bl); + } + ENCODE_FINISH(bl); +} +// Start decode NvmeGroupKey, NvmeGwMap +inline void decode(std::map& gmap, ceph::buffer::list::const_iterator &bl) { + gmap.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for (uint32_t i = 0; i& gmetadata, ceph::bufferlist &bl) { + ENCODE_START(1, 1, bl); + encode ((uint32_t)gmetadata.size(), bl); // number of groups + for (auto& group_md: gmetadata) { + auto& group_key = group_md.first; + encode(group_key.first, bl); // pool + encode(group_key.second, bl); // group + + encode(group_md.second, bl); + } + ENCODE_FINISH(bl); +} + +inline void decode(std::map& gmetadata, ceph::buffer::list::const_iterator &bl) { + gmetadata.clear(); + uint32_t ngroups; + DECODE_START(1, bl); + decode(ngroups, bl); + for (uint32_t i = 0; i +#include +#include +#include + +using NvmeGwId = std::string; +using NvmeGroupKey = std::pair; +using NvmeNqnId = std::string; +using NvmeAnaGrpId = uint32_t; + + +enum class gw_states_per_group_t { + GW_IDLE_STATE = 0, //invalid state + GW_STANDBY_STATE, + GW_ACTIVE_STATE, + GW_OWNER_WAIT_FAILBACK_PREPARED, + GW_WAIT_FAILBACK_PREPARED, + GW_WAIT_BLOCKLIST_CMPL +}; + +enum class gw_exported_states_per_group_t { + GW_EXPORTED_OPTIMIZED_STATE = 0, + GW_EXPORTED_INACCESSIBLE_STATE +}; + +enum class gw_availability_t { + GW_CREATED = 0, + GW_AVAILABLE, + GW_UNAVAILABLE, + GW_DELETED +}; + +#define REDUNDANT_GW_ANA_GROUP_ID 0xFF +using SmState = std::map < NvmeAnaGrpId, gw_states_per_group_t>; + +using ana_state_t = std::vector>; + +struct BeaconNamespace { + NvmeAnaGrpId anagrpid; + std::string nonce; + + // Define the equality operator + bool operator==(const BeaconNamespace& other) const { + return anagrpid == other.anagrpid && + nonce == other.nonce; + } +}; + +// Beacon Listener represents an NVME Subsystem listener, +// which generally does not have to use TCP/IP. +// It is derived from the SPDK listener JSON RPC representation. +// For more details, see https://spdk.io/doc/jsonrpc.html#rpc_nvmf_listen_address. 
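+// Purely illustrative (hypothetical values): a TCP/IPv4 listener would carry
+// address_family "ipv4", address "192.168.0.100" and svcid "4420".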
+struct BeaconListener {
+  std::string address_family; // IPv4 or IPv6
+  std::string address;        //
+  std::string svcid;          // port
+
+  // Define the equality operator
+  bool operator==(const BeaconListener& other) const {
+    return address_family == other.address_family &&
+           address == other.address &&
+           svcid == other.svcid;
+  }
+};
+
+struct BeaconSubsystem {
+  NvmeNqnId nqn;
+  std::list<BeaconListener> listeners;
+  std::list<BeaconNamespace> namespaces;
+
+  // Define the equality operator
+  bool operator==(const BeaconSubsystem& other) const {
+    return nqn == other.nqn &&
+           listeners == other.listeners &&
+           namespaces == other.namespaces;
+  }
+};
+
+using BeaconSubsystems = std::list<BeaconSubsystem>;
+
+using NvmeNonceVector = std::vector<std::string>;
+using NvmeAnaNonceMap = std::map<NvmeAnaGrpId, NvmeNonceVector>;
+
+struct Blocklist_data {
+  epoch_t osd_epoch;
+  bool is_failover;
+  Blocklist_data() {
+    osd_epoch = 0;
+    is_failover = true;
+  };
+  Blocklist_data(epoch_t epoch, bool failover): osd_epoch(epoch), is_failover(failover) {};
+};
+
+using BlocklistData = std::map<NvmeAnaGrpId, Blocklist_data>;
+
+struct NvmeGwMonState {
+  NvmeAnaGrpId ana_grp_id;        // ana-group-id allocated for this GW, GW owns this group-id
+  gw_availability_t availability; // in the absence of beacon heartbeat messages it becomes unavailable
+  bool last_gw_map_epoch_valid;   // "true" if the last epoch seen by the gw-client is up-to-date
+  bool performed_full_startup;    // in order to identify gws that did not exit upon failover
+  BeaconSubsystems subsystems;    // gateway subsystems and their state-machine states
+  NvmeAnaNonceMap nonce_map;
+  SmState sm_state;               // state machine states per ANA group
+  BlocklistData blocklist_data;
+
+  NvmeGwMonState(): ana_grp_id(REDUNDANT_GW_ANA_GROUP_ID) {};
+
+  NvmeGwMonState(NvmeAnaGrpId id): ana_grp_id(id), availability(gw_availability_t::GW_CREATED), last_gw_map_epoch_valid(false),
+    performed_full_startup(false) {};
+  void set_unavailable_state() {
+    availability = gw_availability_t::GW_UNAVAILABLE;
+    performed_full_startup = false; // after this state is set, the next time the monitor sees the GW it expects it to have performed the full startup
+  }
+  void standby_state(NvmeAnaGrpId grpid) {
+    sm_state[grpid] = gw_states_per_group_t::GW_STANDBY_STATE;
+  };
+  void active_state(NvmeAnaGrpId grpid) {
+    sm_state[grpid] = gw_states_per_group_t::GW_ACTIVE_STATE;
+    blocklist_data[grpid].osd_epoch = 0;
+  };
+};
+
+struct NqnState {
+  std::string nqn;       // subsystem NQN
+  ana_state_t ana_state; // subsystem's ANA state
+
+  // constructors
+  NqnState(const std::string& _nqn, const ana_state_t& _ana_state):
+    nqn(_nqn), ana_state(_ana_state) {}
+  NqnState(const std::string& _nqn, const SmState& sm_state, const NvmeGwMonState& gw_created): nqn(_nqn) {
+    uint32_t i = 0;
+    for (auto& state_itr: sm_state) {
+      if (state_itr.first > i) {
+        // fill any gap in the group-id range with inaccessible default entries
+        uint32_t num_to_add = state_itr.first - i;
+        for (uint32_t j = 0; j < num_to_add; j++) {
+          std::pair<gw_exported_states_per_group_t, epoch_t> state_pair;
+          state_pair.first = gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE;
+          state_pair.second = 0;
+          ana_state.push_back(state_pair);
+        }
+        i += num_to_add;
+      }
+      std::pair<gw_exported_states_per_group_t, epoch_t> state_pair;
+      state_pair.first = (sm_state.at(state_itr.first) == gw_states_per_group_t::GW_ACTIVE_STATE
+                          || sm_state.at(state_itr.first) == gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL)
+        ? 
gw_exported_states_per_group_t::GW_EXPORTED_OPTIMIZED_STATE + : gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE; + state_pair.second = gw_created.blocklist_data.at(state_itr.first).osd_epoch; + ana_state.push_back(state_pair); + i ++; + } + } +}; + +typedef std::map GwSubsystems; + +struct NvmeGwClientState { + NvmeAnaGrpId group_id; + epoch_t gw_map_epoch; + GwSubsystems subsystems; + gw_availability_t availability; + NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available): + group_id(id), + gw_map_epoch(epoch), + availability(available) + {}; + + NvmeGwClientState() : NvmeGwClientState(REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE) {}; +}; + + +struct Tmdata{ + uint32_t timer_started; // statemachine timer(timestamp) set in some state + uint8_t timer_value; + std::chrono::system_clock::time_point end_time; + Tmdata() { + timer_started = 0; + timer_value = 0; + } +}; + +using TmData = std::map < NvmeAnaGrpId, Tmdata>; + +struct NvmeGwTimerState { + TmData data; + NvmeGwTimerState() {}; +}; + +using NvmeGwMonClientStates = std::map; +using NvmeGwTimers = std::map; +using NvmeGwMonStates = std::map; + +#endif /* SRC_MON_NVMEOFGWTYPES_H_ */ diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index 3429a8e999162..9dd2797852d41 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -36,6 +36,7 @@ enum { PAXOS_HEALTH, PAXOS_CONFIG, PAXOS_KV, + PAXOS_NVMEGW, PAXOS_NUM }; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 22208d2d1f428..f649e0f3d3ee2 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -219,6 +219,9 @@ #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" + #ifdef WITH_BLKIN #include "Messenger.h" #endif @@ -885,6 +888,10 @@ Message *decode_message(CephContext *cct, m = make_message(); break; + case MSG_MNVMEOF_GW_BEACON: + m = make_message(); + break; + case MSG_MON_MGR_REPORT: m = make_message(); break; @@ -944,6 +951,9 @@ Message *decode_message(CephContext *cct, m = make_message(); break; + case MSG_MNVMEOF_GW_MAP: + m = make_message(); + break; // -- simple messages without payload -- case CEPH_MSG_SHUTDOWN: diff --git a/src/msg/Message.h b/src/msg/Message.h index 15eb3feadcede..78557f90e48f0 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -242,6 +242,12 @@ // *** ceph-mgr <-> MON daemons *** #define MSG_MGR_UPDATE 0x70b +// *** nvmeof mon -> gw daemons *** +#define MSG_MNVMEOF_GW_MAP 0x800 + +// *** gw daemons -> nvmeof mon *** +#define MSG_MNVMEOF_GW_BEACON 0x801 + // ====================================================== // abstract Message class diff --git a/src/nvmeof/NVMeofGwClient.cc b/src/nvmeof/NVMeofGwClient.cc new file mode 100644 index 0000000000000..c82423de51588 --- /dev/null +++ b/src/nvmeof/NVMeofGwClient.cc @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#include "NVMeofGwClient.h" + +bool NVMeofGwClient::get_subsystems(subsystems_info& reply) { + get_subsystems_req request; + ClientContext context; + + Status status = stub_->get_subsystems(&context, request, &reply); + + return status.ok(); +} + +bool NVMeofGwClient::set_ana_state(const ana_info& info) { + req_status reply; + ClientContext context; + + Status status = stub_->set_ana_state(&context, info, &reply); + + return status.ok() && reply.status(); +} diff --git a/src/nvmeof/NVMeofGwClient.h b/src/nvmeof/NVMeofGwClient.h new file mode 100644 index 0000000000000..022485251d6ba --- /dev/null +++ b/src/nvmeof/NVMeofGwClient.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef __NVMEOFGWCLIENT_H__ +#define __NVMEOFGWCLIENT_H__ +#include +#include +#include + +#include + +#include "gateway.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class NVMeofGwClient { + public: + NVMeofGwClient(std::shared_ptr channel) + : stub_(Gateway::NewStub(channel)) {} + + bool get_subsystems(subsystems_info& reply); + bool set_ana_state(const ana_info& info); + + private: + std::unique_ptr stub_; +}; +#endif diff --git a/src/nvmeof/NVMeofGwMonitorClient.cc b/src/nvmeof/NVMeofGwMonitorClient.cc new file mode 100644 index 0000000000000..fc4358f07d4d7 --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorClient.cc @@ -0,0 +1,451 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023,2024 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + +#include + +#include "common/errno.h" +#include "common/signal.h" +#include "common/ceph_argparse.h" +#include "include/compat.h" + +#include "include/stringify.h" +#include "global/global_context.h" +#include "global/signal_handler.h" + + +#include "messages/MNVMeofGwBeacon.h" +#include "messages/MNVMeofGwMap.h" +#include "NVMeofGwMonitorClient.h" +#include "NVMeofGwClient.h" +#include "NVMeofGwMonitorGroupClient.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout << "nvmeofgw " << __PRETTY_FUNCTION__ << " " + +NVMeofGwMonitorClient::NVMeofGwMonitorClient(int argc, const char **argv) : + Dispatcher(g_ceph_context), + osdmap_epoch(0), + gwmap_epoch(0), + last_map_time(std::chrono::steady_clock::now()), + monc{g_ceph_context, poolctx}, + client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())), + objecter{g_ceph_context, client_messenger.get(), &monc, poolctx}, + client{client_messenger.get(), &monc, &objecter}, + timer(g_ceph_context, lock), + orig_argc(argc), + orig_argv(argv) +{ +} + +NVMeofGwMonitorClient::~NVMeofGwMonitorClient() = default; + +const char** NVMeofGwMonitorClient::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + NULL + }; + return KEYS; +} + +std::string read_file(const std::string& filename) { + std::ifstream file(filename); + std::string content((std::istreambuf_iterator(file)), std::istreambuf_iterator()); + return content; +} + +void NVMeofGwMonitorClient::init_gw_ssl_opts() +{ + if (server_cert.empty() && client_key.empty() && client_cert.empty()) + return; + + // load the certificates content + // create SSL/TLS credentials + gw_ssl_opts.pem_root_certs = read_file(server_cert); + gw_ssl_opts.pem_private_key = read_file(client_key); + gw_ssl_opts.pem_cert_chain = read_file(client_cert); +} + +std::shared_ptr NVMeofGwMonitorClient::gw_creds() +{ + // use insecure channel if no keys/certs defined + if (server_cert.empty() && client_key.empty() && client_cert.empty()) + return grpc::InsecureChannelCredentials(); + else + return grpc::SslCredentials(gw_ssl_opts); +} + +int NVMeofGwMonitorClient::init() +{ + dout(10) << dendl; + std::string val; + auto args = argv_to_vec(orig_argc, orig_argv); + + for (std::vector::iterator i = args.begin(); i != args.end(); ) { + if (ceph_argparse_double_dash(args, i)) { + break; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-name", (char*)NULL)) { + name = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-pool", (char*)NULL)) { + pool = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-group", (char*)NULL)) { + group = val; + } else if (ceph_argparse_witharg(args, i, &val, "--gateway-address", (char*)NULL)) { + gateway_address = val; + } else if (ceph_argparse_witharg(args, i, &val, "--monitor-group-address", (char*)NULL)) { + monitor_address = val; + } else if (ceph_argparse_witharg(args, i, &val, "--server-cert", (char*)NULL)) { + server_cert = val; + } else if (ceph_argparse_witharg(args, i, &val, "--client-key", (char*)NULL)) { + client_key = val; + } else if (ceph_argparse_witharg(args, i, &val, "--client-cert", (char*)NULL)) { + client_cert = val; + } else { + ++i; + } + } + + dout(10) << "gateway name: " << name << + " pool:" << pool << + " group:" << group << + " address: " << gateway_address << dendl; + ceph_assert(name != "" && pool != "" && gateway_address != "" && monitor_address != ""); + + // ensures that either all are empty 
or all are non-empty. + ceph_assert((server_cert.empty() == client_key.empty()) && (client_key.empty() == client_cert.empty())); + init_gw_ssl_opts(); + + init_async_signal_handler(); + register_async_signal_handler(SIGHUP, sighup_handler); + + std::lock_guard l(lock); + + // Initialize Messenger + client_messenger->add_dispatcher_tail(this); + client_messenger->add_dispatcher_head(&objecter); + client_messenger->add_dispatcher_tail(&client); + client_messenger->start(); + + poolctx.start(2); + + // Initialize MonClient + if (monc.build_initial_monmap() < 0) { + client_messenger->shutdown(); + client_messenger->wait(); + return -1; + } + + monc.sub_want("NVMeofGw", 0, 0); + monc.set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD + |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR); + monc.set_messenger(client_messenger.get()); + + // We must register our config callback before calling init(), so + // that we see the initial configuration message + monc.register_config_callback([this](const std::string &k, const std::string &v){ + // leaving this for debugging purposes + dout(10) << "nvmeof config_callback: " << k << " : " << v << dendl; + + return false; + }); + monc.register_config_notify_callback([this]() { + dout(4) << "nvmeof monc config notify callback" << dendl; + }); + dout(4) << "nvmeof Registered monc callback" << dendl; + + int r = monc.init(); + if (r < 0) { + monc.shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + dout(10) << "nvmeof Registered monc callback" << dendl; + + r = monc.authenticate(); + if (r < 0) { + derr << "Authentication failed, did you specify an ID with a valid keyring?" << dendl; + monc.shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + dout(10) << "monc.authentication done" << dendl; + monc.set_passthrough_monmap(); + + client_t whoami = monc.get_global_id(); + client_messenger->set_myname(entity_name_t::MGR(whoami.v)); + objecter.set_client_incarnation(0); + objecter.init(); + objecter.enable_blocklist_events(); + objecter.start(); + client.init(); + timer.init(); + + tick(); + + dout(10) << "Complete." 
<< dendl; + return 0; +} + +static bool get_gw_state(const char* desc, const std::map& m, const NvmeGroupKey& group_key, const NvmeGwId& gw_id, NvmeGwClientState& out) +{ + auto gw_group = m.find(group_key); + if (gw_group == m.end()) { + dout(10) << "can not find group (" << group_key.first << "," << group_key.second << ") " << desc << " map: " << m << dendl; + return false; + } + auto gw_state = gw_group->second.find(gw_id); + if (gw_state == gw_group->second.end()) { + dout(10) << "can not find gw id: " << gw_id << " in " << desc << "group: " << gw_group->second << dendl; + return false; + } + out = gw_state->second; + return true; +} + +void NVMeofGwMonitorClient::send_beacon() +{ + ceph_assert(ceph_mutex_is_locked_by_me(lock)); + gw_availability_t gw_availability = gw_availability_t::GW_CREATED; + BeaconSubsystems subs; + NVMeofGwClient gw_client( + grpc::CreateChannel(gateway_address, gw_creds())); + subsystems_info gw_subsystems; + bool ok = gw_client.get_subsystems(gw_subsystems); + if (ok) { + for (int i = 0; i < gw_subsystems.subsystems_size(); i++) { + const subsystem& sub = gw_subsystems.subsystems(i); + BeaconSubsystem bsub; + bsub.nqn = sub.nqn(); + for (int j = 0; j < sub.namespaces_size(); j++) { + const auto& ns = sub.namespaces(j); + BeaconNamespace bns = {ns.anagrpid(), ns.nonce()}; + bsub.namespaces.push_back(bns); + } + for (int k = 0; k < sub.listen_addresses_size(); k++) { + const auto& ls = sub.listen_addresses(k); + BeaconListener bls = { ls.adrfam(), ls.traddr(), ls.trsvcid() }; + bsub.listeners.push_back(bls); + } + subs.push_back(bsub); + } + } + + auto group_key = std::make_pair(pool, group); + NvmeGwClientState old_gw_state; + // if already got gateway state in the map + if (get_gw_state("old map", map, group_key, name, old_gw_state)) + gw_availability = ok ? gw_availability_t::GW_AVAILABLE : gw_availability_t::GW_UNAVAILABLE; + dout(10) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability << + " osdmap_epoch " << osdmap_epoch << " gwmap_epoch " << gwmap_epoch << dendl; + auto m = ceph::make_message( + name, + pool, + group, + subs, + gw_availability, + osdmap_epoch, + gwmap_epoch); + monc.send_mon_message(std::move(m)); +} + +void NVMeofGwMonitorClient::disconnect_panic() +{ + auto disconnect_panic_duration = g_conf().get_val("nvmeof_mon_client_disconnect_panic").count(); + auto now = std::chrono::steady_clock::now(); + auto elapsed_seconds = std::chrono::duration_cast(now - last_map_time).count(); + if (elapsed_seconds > disconnect_panic_duration) { + dout(4) << "Triggering a panic upon disconnection from the monitor, elapsed " << elapsed_seconds << ", configured disconnect panic duration " << disconnect_panic_duration << dendl; + throw std::runtime_error("Lost connection to the monitor (beacon timeout)."); + } +} + +void NVMeofGwMonitorClient::tick() +{ + dout(10) << dendl; + + disconnect_panic(); + send_beacon(); + + timer.add_event_after( + g_conf().get_val("nvmeof_mon_client_tick_period").count(), + new LambdaContext([this](int r){ + tick(); + } + )); +} + +void NVMeofGwMonitorClient::shutdown() +{ + std::lock_guard l(lock); + + dout(4) << "nvmeof Shutting down" << dendl; + + + // stop sending beacon first, I use monc to talk with monitors + timer.shutdown(); + // client uses monc and objecter + client.shutdown(); + // Stop asio threads, so leftover events won't call into shut down + // monclient/objecter. 
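+  // (the pool was started with two threads in init(), see poolctx.start(2))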
+ poolctx.finish(); + // stop monc + monc.shutdown(); + + // objecter is used by monc + objecter.shutdown(); + // client_messenger is used by all of them, so stop it in the end + client_messenger->shutdown(); +} + +void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t nmap) +{ + last_map_time = std::chrono::steady_clock::now(); // record time of last monitor message + + auto &new_map = nmap->get_map(); + gwmap_epoch = nmap->get_gwmap_epoch(); + auto group_key = std::make_pair(pool, group); + dout(10) << "handle nvmeof gw map: " << new_map << dendl; + + NvmeGwClientState old_gw_state; + auto got_old_gw_state = get_gw_state("old map", map, group_key, name, old_gw_state); + NvmeGwClientState new_gw_state; + auto got_new_gw_state = get_gw_state("new map", new_map, group_key, name, new_gw_state); + + // ensure that the gateway state has not vanished + ceph_assert(got_new_gw_state || !got_old_gw_state); + + if (!got_old_gw_state) { + if (!got_new_gw_state) { + dout(10) << "Can not find new gw state" << dendl; + return; + } + bool set_group_id = false; + while (!set_group_id) { + NVMeofGwMonitorGroupClient monitor_group_client( + grpc::CreateChannel(monitor_address, gw_creds())); + dout(10) << "GRPC set_group_id: " << new_gw_state.group_id << dendl; + set_group_id = monitor_group_client.set_group_id( new_gw_state.group_id); + if (!set_group_id) { + dout(10) << "GRPC set_group_id failed" << dendl; + auto retry_timeout = g_conf().get_val("mon_nvmeofgw_set_group_id_retry"); + usleep(retry_timeout); + } + } + } + + if (got_old_gw_state && got_new_gw_state) { + dout(10) << "got_old_gw_state: " << old_gw_state << "got_new_gw_state: " << new_gw_state << dendl; + // Make sure we do not get out of order state changes from the monitor + ceph_assert(new_gw_state.gw_map_epoch >= old_gw_state.gw_map_epoch); + + // If the monitor previously identified this gateway as accessible but now + // flags it as unavailable, it suggests that the gateway lost connection + // to the monitor. + if (old_gw_state.availability == gw_availability_t::GW_AVAILABLE && + new_gw_state.availability == gw_availability_t::GW_UNAVAILABLE) { + dout(4) << "Triggering a panic upon disconnection from the monitor, gw state - unavailable" << dendl; + throw std::runtime_error("Lost connection to the monitor (gw map unavailable)."); + } + } + + // Gather all state changes + ana_info ai; + epoch_t max_blocklist_epoch = 0; + for (const auto& nqn_state_pair: new_gw_state.subsystems) { + auto& sub = nqn_state_pair.second; + const auto& nqn = nqn_state_pair.first; + nqn_ana_states nas; + nas.set_nqn(nqn); + const auto& old_nqn_state_pair = old_gw_state.subsystems.find(nqn); + auto found_old_nqn_state = (old_nqn_state_pair != old_gw_state.subsystems.end()); + + // old and new ana group id ranges could be different + auto ana_state_size = (found_old_nqn_state) ? + std::max(old_nqn_state_pair->second.ana_state.size(), sub.ana_state.size()) : + sub.ana_state.size(); + + for (NvmeAnaGrpId ana_grp_index = 0; ana_grp_index < ana_state_size; ana_grp_index++) { + const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0); + auto new_group_state = (ana_grp_index < sub.ana_state.size()) ? + sub.ana_state[ana_grp_index] : + initial_ana_state; + auto old_group_state = (got_old_gw_state && found_old_nqn_state && ana_grp_index < old_nqn_state_pair->second.ana_state.size()) ? 
+ old_nqn_state_pair->second.ana_state[ana_grp_index] : + initial_ana_state; + + // if no state change detected for this nqn, group id + if (new_group_state.first == old_group_state.first) { + continue; + } + ana_group_state gs; + gs.set_grp_id(ana_grp_index + 1); // offset by 1, index 0 is ANAGRP1 + const auto& new_agroup_state = new_group_state.first; + const epoch_t& blocklist_epoch = new_group_state.second; + + if (new_agroup_state == gw_exported_states_per_group_t::GW_EXPORTED_OPTIMIZED_STATE && + blocklist_epoch != 0) { + if (blocklist_epoch > max_blocklist_epoch) max_blocklist_epoch = blocklist_epoch; + } + gs.set_state(new_agroup_state == gw_exported_states_per_group_t::GW_EXPORTED_OPTIMIZED_STATE ? OPTIMIZED : INACCESSIBLE); // Set the ANA state + nas.mutable_states()->Add(std::move(gs)); + dout(10) << " grpid " << (ana_grp_index + 1) << " state: " << new_gw_state << dendl; + } + if (nas.states_size()) ai.mutable_states()->Add(std::move(nas)); + } + + // if there is state change, notify the gateway + if (ai.states_size()) { + bool set_ana_state = false; + while (!set_ana_state) { + NVMeofGwClient gw_client( + grpc::CreateChannel(gateway_address, gw_creds())); + set_ana_state = gw_client.set_ana_state(ai); + if (!set_ana_state) { + dout(10) << "GRPC set_ana_state failed" << dendl; + usleep(1000); // TODO conf option + } + } + // Update latest accepted osdmap epoch, for beacons + if (max_blocklist_epoch > osdmap_epoch) { + osdmap_epoch = max_blocklist_epoch; + dout(10) << "Ready for blocklist osd map epoch: " << osdmap_epoch << dendl; + } + } + map = new_map; +} + +bool NVMeofGwMonitorClient::ms_dispatch2(const ref_t& m) +{ + std::lock_guard l(lock); + dout(10) << "got map type " << m->get_type() << dendl; + + if (m->get_type() == MSG_MNVMEOF_GW_MAP) { + handle_nvmeof_gw_map(ref_cast(m)); + } + bool handled = false; + return handled; +} + +int NVMeofGwMonitorClient::main(std::vector args) +{ + client_messenger->wait(); + + // Disable signal handlers + unregister_async_signal_handler(SIGHUP, sighup_handler); + shutdown_async_signal_handler(); + + return 0; +} diff --git a/src/nvmeof/NVMeofGwMonitorClient.h b/src/nvmeof/NVMeofGwMonitorClient.h new file mode 100644 index 0000000000000..5bcca91eb4a0a --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorClient.h @@ -0,0 +1,97 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023,2024 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ */ + + +#ifndef NVMEOFGWMONITORCLIENT_H_ +#define NVMEOFGWMONITORCLIENT_H_ + +#include "auth/Auth.h" +#include "common/async/context_pool.h" +#include "common/Finisher.h" +#include "common/Timer.h" +#include "common/LogClient.h" + +#include "client/Client.h" +#include "mon/MonClient.h" +#include "osdc/Objecter.h" +#include "messages/MNVMeofGwMap.h" + +#include +#include + +class NVMeofGwMonitorClient: public Dispatcher, + public md_config_obs_t { +private: + std::string name; + std::string pool; + std::string group; + std::string gateway_address; + std::string monitor_address; + std::string server_cert; + std::string client_key; + std::string client_cert; + grpc::SslCredentialsOptions + gw_ssl_opts; // gateway grpc ssl options + epoch_t osdmap_epoch; // last awaited osdmap_epoch + epoch_t gwmap_epoch; // last received gw map epoch + std::chrono::time_point + last_map_time; // used to panic on disconnect + + // init gw ssl opts + void init_gw_ssl_opts(); + + // returns gateway grpc credentials + std::shared_ptr gw_creds(); + +protected: + ceph::async::io_context_pool poolctx; + MonClient monc; + std::unique_ptr client_messenger; + Objecter objecter; + Client client; + std::map map; + ceph::mutex lock = ceph::make_mutex("NVMeofGw::lock"); + SafeTimer timer; + + int orig_argc; + const char **orig_argv; + + void send_config_beacon(); + void send_beacon(); + +public: + NVMeofGwMonitorClient(int argc, const char **argv); + ~NVMeofGwMonitorClient() override; + + // Dispatcher interface + bool ms_dispatch2(const ceph::ref_t& m) override; + bool ms_handle_reset(Connection *con) override { return false; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; }; + + // config observer bits + const char** get_tracked_conf_keys() const override; + void handle_conf_change(const ConfigProxy& conf, + const std::set &changed) override {}; + + int init(); + void shutdown(); + int main(std::vector args); + void tick(); + void disconnect_panic(); + + void handle_nvmeof_gw_map(ceph::ref_t m); +}; + +#endif + diff --git a/src/nvmeof/NVMeofGwMonitorGroupClient.cc b/src/nvmeof/NVMeofGwMonitorGroupClient.cc new file mode 100644 index 0000000000000..27ed7b134816a --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorGroupClient.cc @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "NVMeofGwMonitorGroupClient.h" + +bool NVMeofGwMonitorGroupClient::set_group_id(const uint32_t& id) { + group_id_req request; + request.set_id(id); + google::protobuf::Empty reply; + ClientContext context; + + Status status = stub_->group_id(&context, request, &reply); + + return status.ok(); +} diff --git a/src/nvmeof/NVMeofGwMonitorGroupClient.h b/src/nvmeof/NVMeofGwMonitorGroupClient.h new file mode 100644 index 0000000000000..805e182c15c13 --- /dev/null +++ b/src/nvmeof/NVMeofGwMonitorGroupClient.h @@ -0,0 +1,39 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM, Inc. 
+ * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef __NVMEOFGWMONITORGROUPCLIENT_H__ +#define __NVMEOFGWMONITORGROUPCLIENT_H__ +#include +#include +#include + +#include + +#include "monitor.grpc.pb.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class NVMeofGwMonitorGroupClient { + public: + NVMeofGwMonitorGroupClient(std::shared_ptr channel) + : stub_(MonitorGroup::NewStub(channel)) {} + + bool set_group_id(const uint32_t& id); + + private: + std::unique_ptr stub_; +}; +#endif diff --git a/src/nvmeof/gateway b/src/nvmeof/gateway new file mode 160000 index 0000000000000..322a86f7348af --- /dev/null +++ b/src/nvmeof/gateway @@ -0,0 +1 @@ +Subproject commit 322a86f7348af1bc173f01e6cc4b64e9a8075727 diff --git a/src/pybind/mgr/cephadm/services/nvmeof.py b/src/pybind/mgr/cephadm/services/nvmeof.py index ac258887f6a51..9f9ba94557b34 100644 --- a/src/pybind/mgr/cephadm/services/nvmeof.py +++ b/src/pybind/mgr/cephadm/services/nvmeof.py @@ -21,6 +21,9 @@ class NvmeofService(CephService): def config(self, spec: NvmeofServiceSpec) -> None: # type: ignore assert self.TYPE == spec.service_type assert spec.pool + self.pool = spec.pool + assert spec.group is not None + self.group = spec.group self.mgr._check_pool_exists(spec.pool, spec.service_name()) def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: @@ -77,8 +80,36 @@ class NvmeofService(CephService): daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) daemon_spec.deps = [] + if not hasattr(self, 'gws'): + self.gws = {} # id -> name map of gateways for this service. 
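+        # For example (hypothetical values): a daemon id like
+        # 'mypool.mygroup.host1.xyzab' maps to the gateway name that is later
+        # passed as 'id' to the 'nvme-gw create'/'nvme-gw delete' monitor
+        # commands.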
+ self.gws[nvmeof_gw_id] = name # add to map of service's gateway names return daemon_spec + def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None: + """ Overrides the daemon_check_post to add nvmeof gateways safely + """ + self.mgr.log.info(f"nvmeof daemon_check_post {daemon_descrs}") + # Assert configured + assert self.pool + assert self.group is not None + for dd in daemon_descrs: + self.mgr.log.info(f"nvmeof daemon_descr {dd}") + assert dd.daemon_id in self.gws + name = self.gws[dd.daemon_id] + self.mgr.log.info(f"nvmeof daemon name={name}") + # Notify monitor about this gateway creation + cmd = { + 'prefix': 'nvme-gw create', + 'id': name, + 'group': self.group, + 'pool': self.pool + } + self.mgr.log.info(f"create gateway: monitor command {cmd}") + _, _, err = self.mgr.mon_command(cmd) + if err: + self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}") + super().daemon_check_post(daemon_descrs) + def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None: def get_set_cmd_dicts(out: str) -> List[dict]: gateways = json.loads(out)['gateways'] @@ -151,10 +182,41 @@ class NvmeofService(CephService): if not ret: logger.info(f'{daemon.hostname} removed from nvmeof gateways dashboard config') - # and any certificates being used for mTLS + # Assert configured + assert self.pool + assert self.group is not None + assert daemon.daemon_id in self.gws + name = self.gws[daemon.daemon_id] + self.gws.pop(daemon.daemon_id) + # Notify monitor about this gateway deletion + cmd = { + 'prefix': 'nvme-gw delete', + 'id': name, + 'group': self.group, + 'pool': self.pool + } + self.mgr.log.info(f"delete gateway: monitor command {cmd}") + _, _, err = self.mgr.mon_command(cmd) + if err: + self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}") def purge(self, service_name: str) -> None: - """Removes configuration + """Make sure no zombie gateway is left behind """ - # TODO: what should we purge in this case (if any)? 
-        pass
+        # Assert configured
+        assert self.pool
+        assert self.group is not None
+        # Copy the ids first: popping entries while iterating over the dict
+        # itself would raise "dictionary changed size during iteration"
+        for daemon_id in list(self.gws):
+            name = self.gws.pop(daemon_id)
+            # Notify monitor about this gateway deletion
+            cmd = {
+                'prefix': 'nvme-gw delete',
+                'id': name,
+                'group': self.group,
+                'pool': self.pool
+            }
+            self.mgr.log.info(f"purge delete gateway: monitor command {cmd}")
+            _, _, err = self.mgr.mon_command(cmd)
+            if err:
+                self.mgr.log.error(f"Unable to send monitor command {cmd}, error {err}")
diff --git a/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2 b/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2
index 18786f95bbe8d..644ca586ba93f 100644
--- a/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2
+++ b/src/pybind/mgr/cephadm/templates/services/nvmeof/ceph-nvmeof.conf.j2
@@ -1,7 +1,7 @@
 # {{ cephadm_managed }}
 [gateway]
 name = {{ name }}
-group = {{ spec.group if spec.group is not none else '' }}
+group = {{ spec.group }}
 addr = {{ addr }}
 port = {{ spec.port }}
 enable_auth = {{ spec.enable_auth }}
diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py
index 4b88cf8044269..1664c4de74eca 100644
--- a/src/python-common/ceph/deployment/service_spec.py
+++ b/src/python-common/ceph/deployment/service_spec.py
@@ -1355,7 +1355,7 @@ class NvmeofServiceSpec(ServiceSpec):
                  max_log_directory_backups: Optional[int] = 10,
                  log_directory: Optional[str] = '/var/log/ceph/',
                  monitor_timeout: Optional[float] = 1.0,
-                 enable_monitor_client: bool = False,
+                 enable_monitor_client: bool = True,
                  placement: Optional[PlacementSpec] = None,
                  unmanaged: bool = False,
                  preview_only: bool = False,
@@ -1381,7 +1381,7 @@ class NvmeofServiceSpec(ServiceSpec):
         #: ``name`` name of the nvmeof gateway
         self.name = name
         #: ``group`` name of the nvmeof gateway
-        self.group = group
+        self.group = group or ''
         #: ``enable_auth`` enables user authentication on nvmeof gateway
         self.enable_auth = enable_auth
         #: ``state_update_notify`` enables automatic update from OMAP in nvmeof gateway
diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt
index 2e756eeb58380..6272b3b1ed676 100644
--- a/src/test/CMakeLists.txt
+++ b/src/test/CMakeLists.txt
@@ -1008,3 +1008,11 @@ add_ceph_unittest(unittest_weighted_shuffle)
 add_executable(unittest_intarith test_intarith.cc)
 add_ceph_unittest(unittest_intarith)
 #make check ends here
+
+# test_nvmeof_mon_encoding
+add_executable(test_nvmeof_mon_encoding
+  test_nvmeof_mon_encoding.cc
+  )
+target_link_libraries(test_nvmeof_mon_encoding
+  mon ceph-common global-static
+  )
diff --git a/src/test/test_nvmeof_mon_encoding.cc b/src/test/test_nvmeof_mon_encoding.cc
new file mode 100644
index 0000000000000..8cd2381fa784a
--- /dev/null
+++ b/src/test/test_nvmeof_mon_encoding.cc
@@ -0,0 +1,200 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2024 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ * + */ + +#include +#include "common/ceph_argparse.h" +#include "common/debug.h" +#include "include/ceph_assert.h" +#include "global/global_init.h" +#include "mon/NVMeofGwMon.h" +#include "messages/MNVMeofGwMap.h" +#include "messages/MNVMeofGwBeacon.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix *_dout + +using namespace std; + +void test_NVMeofGwMap() { + dout(0) << __func__ << "\n\n" << dendl; + + NVMeofGwMap pending_map; + std::string pool = "pool1"; + std::string group = "grp1"; + auto group_key = std::make_pair(pool, group); + pending_map.cfg_add_gw("GW1" ,group_key); + pending_map.cfg_add_gw("GW2" ,group_key); + pending_map.cfg_add_gw("GW3" ,group_key); + NvmeNonceVector new_nonces = {"abc", "def","hij"}; + pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; + pending_map.created_gws[group_key]["GW1"].performed_full_startup = true; + int i = 0; + for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){ + blklst_itr.second.osd_epoch = 2*(i++); + blklst_itr.second.is_failover = false; + } + + pending_map.created_gws[group_key]["GW2"].nonce_map[2] = new_nonces; + dout(0) << " == Dump map before Encode : == " < map; + + std::string pool = "pool1"; + std::string group = "grp1"; + std::string gw_id = "GW1"; + NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE); + std::string nqn = "nqn"; + ana_state_t ana_state; + NqnState nqn_state(nqn, ana_state); + state.subsystems.insert({nqn, nqn_state}); + + auto group_key = std::make_pair(pool, group); + map[group_key][gw_id] = state; + + + + ceph::buffer::list bl; + encode(map, bl); + dout(0) << "encode: " << map << dendl; + decode(map, bl); + dout(0) << "decode: " << map << dendl; + + BeaconSubsystem sub = { nqn, {}, {} }; + NVMeofGwMap pending_map; + pending_map.cfg_add_gw("GW1" ,group_key); + pending_map.cfg_add_gw("GW2" ,group_key); + pending_map.cfg_add_gw("GW3" ,group_key); + NvmeNonceVector new_nonces = {"abc", "def","hij"}; + pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces; + pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub); + int i = 0; + for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){ + blklst_itr.second.osd_epoch = 2*(i++); + blklst_itr.second.is_failover = false; + } + + pending_map.created_gws[group_key]["GW2"].nonce_map[2] = new_nonces; + dout(0) << "False pending map: " << pending_map << dendl; + + auto msg = make_message(pending_map); + msg->encode_payload(0); + msg->decode_payload(); + dout(0) << "decode msg: " << *msg << dendl; + + dout(0) << "\n == Test GW Delete ==" << dendl; + pending_map.cfg_delete_gw("GW1" ,group_key); + dout(0) << "deleted GW1 " << pending_map << dendl; + + pending_map.cfg_delete_gw("GW1" ,group_key); + dout(0) << "duplicated delete of GW1 " << pending_map << dendl; + + pending_map.cfg_delete_gw("GW2" ,group_key); + dout(0) << "deleted GW2 " << pending_map << dendl; + + dout(0) << "delete of wrong gw id" << dendl; + pending_map.cfg_delete_gw("wow" ,group_key); + + pending_map.cfg_delete_gw("GW3" ,group_key); + dout(0) << "deleted GW3 . 
we should see the empty map " << pending_map << dendl; + + +} + +void test_MNVMeofGwBeacon() { + std::string gw_id = "GW"; + std::string gw_pool = "pool"; + std::string gw_group = "group"; + gw_availability_t availability = gw_availability_t::GW_AVAILABLE; + std::string nqn = "nqn"; + BeaconSubsystem sub = { nqn, {}, {} }; + BeaconSubsystems subs = { sub }; + epoch_t osd_epoch = 17; + epoch_t gwmap_epoch = 42; + + auto msg = make_message( + gw_id, + gw_pool, + gw_group, + subs, + availability, + osd_epoch, + gwmap_epoch); + msg->encode_payload(0); + msg->decode_payload(); + dout(0) << "decode msg: " << *msg << dendl; + ceph_assert(msg->get_gw_id() == gw_id); + ceph_assert(msg->get_gw_pool() == gw_pool); + ceph_assert(msg->get_gw_group() == gw_group); + ceph_assert(msg->get_availability() == availability); + ceph_assert(msg->get_last_osd_epoch() == osd_epoch); + ceph_assert(msg->get_last_gwmap_epoch() == gwmap_epoch); + const auto& dsubs = msg->get_subsystems(); + auto it = std::find_if(dsubs.begin(), dsubs.end(), + [&nqn](const auto& element) { + return element.nqn == nqn; + }); + ceph_assert(it != dsubs.end()); +} + +void test_NVMeofGwTimers() +{ + NVMeofGwMap pending_map; + //pending_map.Gmetadata; + const NvmeGroupKey group_key = std::make_pair("a","b"); + std::string gwid = "GW1"; + NvmeAnaGrpId grpid = 2; + pending_map.start_timer(gwid, group_key, grpid, 30); + auto end_time = pending_map.fsm_timers[group_key][gwid].data[grpid].end_time; + uint64_t millisecondsSinceEpoch = std::chrono::duration_cast(end_time.time_since_epoch()).count(); + dout(0) << "Metadata milliseconds " << millisecondsSinceEpoch << " " << (int)pending_map.fsm_timers[group_key][gwid].data[grpid].timer_value << dendl; + ceph::buffer::list bl; + pending_map.encode(bl); + auto p = bl.cbegin(); + pending_map.decode(p); + + end_time = pending_map.fsm_timers[group_key][gwid].data[2].end_time; + millisecondsSinceEpoch = std::chrono::duration_cast(end_time.time_since_epoch()).count(); + dout(0) << "After encode decode Metadata milliseconds " << millisecondsSinceEpoch << " " << (int)pending_map.fsm_timers[group_key][gwid].data[grpid].timer_value< +using namespace std; +#include "include/ceph_features.h" + +#define TYPE(t) +#define TYPE_STRAYDATA(t) +#define TYPE_NONDETERMINISTIC(t) +#define TYPE_FEATUREFUL(t) +#define TYPE_FEATUREFUL_STRAYDATA(t) +#define TYPE_FEATUREFUL_NONDETERMINISTIC(t) +#define TYPE_FEATUREFUL_NOCOPY(t) +#define TYPE_NOCOPY(t) +#define MESSAGE(t) +#include "nvmeof_types.h" +#undef TYPE +#undef TYPE_STRAYDATA +#undef TYPE_NONDETERMINISTIC +#undef TYPE_NOCOPY +#undef TYPE_FEATUREFUL +#undef TYPE_FEATUREFUL_STRAYDATA +#undef TYPE_FEATUREFUL_NONDETERMINISTIC +#undef TYPE_FEATUREFUL_NOCOPY +#undef MESSAGE + +#include "denc_plugin.h" + +DENC_API void register_dencoders(DencoderPlugin* plugin) +{ +#include "nvmeof_types.h" +} + +DENC_API void unregister_dencoders(DencoderPlugin* plugin) +{ + plugin->unregister_dencoders(); +} diff --git a/src/tools/ceph-dencoder/nvmeof_types.h b/src/tools/ceph-dencoder/nvmeof_types.h new file mode 100644 index 0000000000000..96cff7353b634 --- /dev/null +++ b/src/tools/ceph-dencoder/nvmeof_types.h @@ -0,0 +1,174 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2024 IBM, Inc. 
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_NVMEOF_TYPES_H
+#define CEPH_NVMEOF_TYPES_H
+
+#ifdef WITH_NVMEOF_GATEWAY_MONITOR_CLIENT
+#include "mon/NVMeofGwMon.h"
+#include "messages/MNVMeofGwMap.h"
+#include "messages/MNVMeofGwBeacon.h"
+TYPE(NVMeofGwMap)
+// Implement the dencoder interface
+class NVMeofGwMapDencoder {
+ private:
+  NVMeofGwMap m;
+ public:
+  NVMeofGwMapDencoder() = default;
+  explicit NVMeofGwMapDencoder(const NVMeofGwMap& m) : m(m) {}
+
+  void encode(bufferlist& bl) const {
+    using ceph::encode;
+    encode(m, bl);
+  }
+  void decode(bufferlist::const_iterator &p) {
+    using ceph::decode;
+    decode(m, p);
+  }
+  void dump(Formatter* f) {
+    f->dump_stream("NVMeofGwMap") << m;
+  }
+
+  static void generate_test_instances(std::list<NVMeofGwMapDencoder*>& ls) {
+    NVMeofGwMap m;
+    std::string pool = "pool1";
+    std::string group = "grp1";
+    auto group_key = std::make_pair(pool, group);
+    m.cfg_add_gw("GW1", group_key);
+    m.cfg_add_gw("GW2", group_key);
+    m.cfg_add_gw("GW3", group_key);
+    NvmeNonceVector new_nonces = {"abc", "def", "hij"};
+    m.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
+    m.created_gws[group_key]["GW1"].performed_full_startup = true;
+    for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) {
+      m.created_gws[group_key]["GW1"].blocklist_data[i].osd_epoch = i * 2;
+      m.created_gws[group_key]["GW1"].blocklist_data[i].is_failover = false;
+    }
+
+    m.created_gws[group_key]["GW2"].nonce_map[2] = new_nonces;
+
+    ls.push_back(new NVMeofGwMapDencoder(m));
+  }
+};
+WRITE_CLASS_ENCODER(NVMeofGwMapDencoder)
+
+TYPE(MNVMeofGwMap)
+// Implement the dencoder interface
+class MNVMeofGwMapDencoder {
+ private:
+  MNVMeofGwMap m;
+ public:
+  MNVMeofGwMapDencoder() = default;
+  explicit MNVMeofGwMapDencoder(const MNVMeofGwMap& m) : m(m) {}
+
+  void encode(bufferlist& bl) const {
+    using ceph::encode;
+    encode(m, bl);
+  }
+  void decode(bufferlist::const_iterator &p) {
+    using ceph::decode;
+    decode(m, p);
+  }
+  void dump(Formatter* f) {
+    f->dump_stream("MNVMeofGwMap") << m;
+  }
+
+  static void generate_test_instances(std::list<MNVMeofGwMapDencoder*>& ls) {
+    std::map<NvmeGroupKey, NvmeGwMonClientStates> map;
+    std::string pool = "pool1";
+    std::string group = "grp1";
+    std::string gw_id = "GW1";
+    NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE);
+    std::string nqn = "nqn";
+    ana_state_t ana_state;
+    NqnState nqn_state(nqn, ana_state);
+    state.subsystems.insert({nqn, nqn_state});
+
+    auto group_key = std::make_pair(pool, group);
+    map[group_key][gw_id] = state;
+    BeaconSubsystem sub = { nqn, {}, {} };
+    NVMeofGwMap pending_map;
+    pending_map.cfg_add_gw("GW1", group_key);
+    pending_map.cfg_add_gw("GW2", group_key);
+    pending_map.cfg_add_gw("GW3", group_key);
+    NvmeNonceVector new_nonces = {"abc", "def", "hij"};
+    pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
+    pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub);
+    for (int i = 0; i < MAX_SUPPORTED_ANA_GROUPS; i++) {
+      pending_map.created_gws[group_key]["GW1"].blocklist_data[i].osd_epoch = i * 2;
+      pending_map.created_gws[group_key]["GW1"].blocklist_data[i].is_failover = false;
+    }
+
+    pending_map.created_gws[group_key]["GW2"].nonce_map[2] = new_nonces;
+    pending_map.start_timer(gw_id, group_key, 1, 30); // ana group id 1
+
+    MNVMeofGwMap m(pending_map);
+    ls.push_back(new MNVMeofGwMapDencoder(m));
+  }
+};
+WRITE_CLASS_ENCODER(MNVMeofGwMapDencoder)
+
+TYPE(MNVMeofGwBeacon)
+// Implement the dencoder interface
+class MNVMeofGwBeaconDencoder {
+ private:
+  MNVMeofGwBeacon m;
+ public:
+  MNVMeofGwBeaconDencoder() = default;
+  explicit MNVMeofGwBeaconDencoder(const MNVMeofGwBeacon& m) : m(m) {}
+
+  void encode(bufferlist& bl) const {
+    using ceph::encode;
+    encode(m, bl);
+  }
+  void decode(bufferlist::const_iterator &p) {
+    using ceph::decode;
+    decode(m, p);
+  }
+  void dump(Formatter* f) {
+    f->dump_stream("MNVMeofGwBeacon") << m;
+  }
+
+  static void generate_test_instances(std::list<MNVMeofGwBeaconDencoder*>& ls) {
+    std::string gw_id = "GW";
+    std::string gw_pool = "pool";
+    std::string gw_group = "group";
+    gw_availability_t availability = gw_availability_t::GW_AVAILABLE;
+    std::string nqn = "nqn";
+    BeaconSubsystem sub = { nqn, {}, {} };
+    BeaconSubsystems subs = { sub };
+    epoch_t osd_epoch = 17;
+    epoch_t gwmap_epoch = 42;
+    MNVMeofGwBeacon m(
+      gw_id,
+      gw_pool,
+      gw_group,
+      subs,
+      availability,
+      osd_epoch,
+      gwmap_epoch);
+
+    ls.push_back(new MNVMeofGwBeaconDencoder(m));
+  }
+};
+WRITE_CLASS_ENCODER(MNVMeofGwBeaconDencoder)
+
+
+#endif // WITH_NVMEOF_GATEWAY_MONITOR_CLIENT
+
+#endif // CEPH_NVMEOF_TYPES_H
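
For reference, a minimal sketch of exercising the new monitor commands by hand
(cephadm drives the same commands from NvmeofService above). Arguments are
placeholders, and the exact argument order is defined by the monitor's command
table, which this diff does not show:

  ceph nvme-gw create <id> <pool> <group>
  ceph nvme-gw delete <id> <pool> <group>
  ceph nvme-gw show <pool> <group>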