#ifndef CEPH_COMMON_ADMIN_SOCKET_H
#define CEPH_COMMON_ADMIN_SOCKET_H
+#ifdef WITH_SEASTAR
+#include "crimson/admin/admin_socket.h"
+#else
+
#include <condition_variable>
#include <mutex>
#include <string>
};
#endif
+#endif
INTERFACE_LINK_LIBRARIES Seastar::seastar)
set(crimson_common_srcs
+ admin/admin_socket.cc
+ admin/osd_admin.cc
common/buffer_io.cc
common/config_proxy.cc
common/perf_counters_collection.cc
# - the logging is sent to Seastar backend
# - and the template parameter of lock_policy is SINGLE
add_library(crimson-common STATIC
- ${PROJECT_SOURCE_DIR}/src/common/admin_socket.cc
${PROJECT_SOURCE_DIR}/src/common/admin_socket_client.cc
${PROJECT_SOURCE_DIR}/src/common/bit_str.cc
${PROJECT_SOURCE_DIR}/src/common/bloom_filter.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/admin/admin_socket.h"
+
+#include <boost/algorithm/string.hpp>
+#include <fmt/format.h>
+#include <seastar/net/api.hh>
+#include <seastar/net/inet_address.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/util/std-compat.hh>
+
+#include "common/version.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Socket.h"
+
+/**
+ * A Crimson-wise version of the admin socket - implementation file
+ *
+ * \todo handle the unlinking of the admin socket. Note that 'cleanup_files'
+ * at-exit functionality is not yet implemented in Crimson.
+ */
+
+namespace {
+seastar::logger& logger()
+{
+ return crimson::get_logger(ceph_subsys_osd);
+}
+} // namespace
+
+namespace crimson::admin {
+
+// Hooks table - iterator support
+
+AdminHooksIter AdminSocket::begin() const
+{
+ AdminHooksIter it{ *this };
+ return it;
+}
+
+AdminHooksIter AdminSocket::end() const
+{
+ AdminHooksIter it{ *this, true };
+ return it;
+}
+
+/**
+ * note that 'end-flag' is an optional parameter, and is only used internally
+ * (to signal the end() state).
+ */
+AdminHooksIter::AdminHooksIter(const AdminSocket& master, bool end_flag)
+ : m_master{ master }, m_end_marker{ end_flag }
+{
+ if (end_flag) {
+ // create the equivalent of servers.end();
+ m_miter = m_master.servers.cend();
+ } else {
+ m_miter = m_master.servers.cbegin();
+ m_siter = m_miter->second.m_hooks.begin();
+ }
+}
+
+AdminHooksIter& AdminHooksIter::operator++()
+{
+ if (!m_end_marker) {
+ ++m_siter;
+ if (m_siter == m_miter->second.m_hooks.end()) {
+ // move to the next server-block
+ m_miter++;
+ if (m_miter == m_master.servers.end()) {
+ m_end_marker = true;
+ return *this;
+ }
+
+ m_siter = m_miter->second.m_hooks.begin();
+ }
+ }
+ return *this;
+}
+
+seastar::future<AsokRegistrationRes> AdminSocket::register_server(
+ hook_server_tag server_tag, const std::vector<AsokServiceDef>& apis_served)
+{
+ return seastar::with_lock(
+ servers_tbl_rwlock,
+ [this, server_tag, &apis_served]() -> AsokRegistrationRes {
+ auto ne = servers.try_emplace(server_tag, apis_served);
+ // was this server tag already registered?
+ if (!ne.second) {
+ return {};
+ }
+ logger().info("register_server(): pid:{} ^:{} (tag: {})", (int)getpid(),
+ (uint64_t)(this), (uint64_t)(server_tag));
+ return this->shared_from_this();
+ });
+}
+
+seastar::future<> AdminSocket::unregister_server(hook_server_tag server_tag)
+{
+ logger().debug("{}: pid:{} server tag: {})", __func__, (int)getpid(),
+ (uint64_t)(server_tag));
+
+ return seastar::with_lock(servers_tbl_rwlock, [this, server_tag] {
+ if (auto erased = servers.erase(server_tag); erased == 0) {
+ logger().warn("unregister_server(): unregistering a "
+ "non-existing registration (tag: {})",
+ (uint64_t)(server_tag));
+ }
+ });
+}
+
+seastar::future<> AdminSocket::unregister_server(hook_server_tag server_tag,
+ AdminSocketRef&& server_ref)
+{
+ return unregister_server(server_tag).then([ref=std::move(server_ref)] {
+ // reducing the ref-count on us (the ASOK server) by discarding server_ref:
+ }).finally([server_tag] {
+ logger().debug("unregister_server: done {}", (uint64_t)(server_tag));
+ });
+}
+
+AdminSocket::maybe_service_def_t AdminSocket::locate_subcmd(
+ std::string match) const
+{
+ while (!match.empty()) {
+ // try locating this sub-sequence of the incoming command
+ for (auto& [tag, srv] : servers) {
+ for (auto& api : srv.m_hooks) {
+ if (api.command == match) {
+ logger().debug("{}: located {} w/ server {}", __func__, match, tag);
+ return &api;
+ }
+ }
+ }
+ // drop right-most word
+ size_t pos = match.rfind(' ');
+ if (pos == match.npos) {
+ match.clear(); // we fail
+ return {};
+ } else {
+ match.resize(pos);
+ }
+ }
+ return {};
+}
+
+/*
+ * Note: parse_cmd() is executed with servers_tbl_rwlock held as shared
+ */
+AdminSocket::maybe_parsed_t AdminSocket::parse_cmd(std::string cmd,
+ ceph::bufferlist& out)
+{
+ // preliminaries:
+ // - create the formatter specified by the cmd parameters
+ // - locate the "op-code" string (the 'prefix' segment)
+ // - prepare for command parameters extraction via cmdmap_t
+ cmdmap_t cmdmap;
+
+ try {
+ stringstream errss;
+ // note that cmdmap_from_json() may throw on syntax issues
+ if (!cmdmap_from_json({cmd}, &cmdmap, errss)) {
+ logger().error("{}: incoming command error: {}", __func__, errss.str());
+ out.append("error:"s);
+ out.append(errss.str());
+ return maybe_parsed_t{ std::nullopt };
+ }
+ } catch (std::runtime_error& e) {
+ logger().error("{}: incoming command syntax: {}", __func__, cmd);
+ out.append("error: command syntax"s);
+ return maybe_parsed_t{ std::nullopt };
+ }
+
+ string format;
+ string match;
+ std::size_t full_command_seq; // the full sequence, before we start chipping
+ // away the end
+ try {
+ cmd_getval(cmdmap, "format", format);
+ cmd_getval(cmdmap, "prefix", match);
+ full_command_seq = match.length();
+ } catch (const bad_cmd_get& e) {
+ logger().error("{}: invalid syntax: {}", __func__, cmd);
+ out.append("error: command syntax: missing 'prefix'"s);
+ return maybe_parsed_t{ std::nullopt };
+ }
+
+ if (match.empty()) {
+ // no command identified
+ out.append("error: no command identified"s);
+ return maybe_parsed_t{ std::nullopt };
+ }
+
+ // match the incoming op-code to one of the registered APIs
+ auto parsed_cmd = locate_subcmd(match);
+ if (!parsed_cmd.has_value()) {
+ return maybe_parsed_t{ std::nullopt };
+ }
+ return parsed_command_t{ match,
+ cmdmap,
+ format,
+ parsed_cmd.value(),
+ parsed_cmd.value()->hook,
+ full_command_seq };
+}
+
+/*
+ * Note: validate_command() is executed with servers_tbl_rwlock held as shared
+ */
+bool AdminSocket::validate_command(const parsed_command_t& parsed,
+ const std::string& command_text,
+ ceph::bufferlist& out) const
+{
+ // did we receive any arguments apart from the command word(s)?
+ if (parsed.cmd_seq_len == parsed.cmd.length())
+ return true;
+
+ logger().info("{}: validating {} against:{}", __func__, command_text,
+ parsed.api->cmddesc);
+
+ stringstream os; // for possible validation error messages
+ try {
+ // validate_cmd throws on some syntax errors
+ if (validate_cmd(nullptr, parsed.api->cmddesc, parsed.parameters, os)) {
+ return true;
+ }
+ } catch (std::exception& e) {
+ logger().error("{}: validation failure ({} : {}) {}", __func__,
+ command_text, parsed.cmd, e.what());
+ }
+
+ os << "error: command validation failure ";
+ logger().error("{}: validation failure (incoming:{}) {}", __func__,
+ command_text, os.str());
+ out.append(os);
+ return false;
+}
+
+seastar::future<> AdminSocket::finalize_response(
+ seastar::output_stream<char>& out, ceph::bufferlist&& msgs)
+{
+ string outbuf_cont = msgs.to_str();
+ if (outbuf_cont.empty()) {
+ outbuf_cont = " {} ";
+ }
+ uint32_t response_length = htonl(outbuf_cont.length());
+ logger().info("asok response length: {}", outbuf_cont.length());
+
+ return out.write((char*)&response_length, sizeof(uint32_t))
+ .then([&out, outbuf_cont] { return out.write(outbuf_cont.c_str()); });
+}
+
+seastar::future<> AdminSocket::execute_line(std::string cmdline,
+ seastar::output_stream<char>& out)
+{
+ return seastar::with_shared(servers_tbl_rwlock,
+ [this, cmdline, &out]() mutable {
+ ceph::bufferlist err;
+ auto parsed = parse_cmd(cmdline, err);
+ if (!parsed.has_value() ||
+ !validate_command(*parsed, cmdline, err)) {
+ return finalize_response(out, std::move(err));
+ }
+ return parsed->hook->call(parsed->cmd,
+ parsed->format,
+ parsed->parameters).then(
+ [this, &out](auto result) {
+ // add 'failed' to the contents of out_buf? not what
+ // happens in the old code
+ return finalize_response(out, std::move(result));
+ });
+ });
+}
+
+// an input_stream consumer that reads buffer into a std::string up to the first
+// '\0' which indicates the end of command
+struct line_consumer {
+ using tmp_buf = seastar::temporary_buffer<char>;
+ using consumption_result_type =
+ typename seastar::input_stream<char>::consumption_result_type;
+
+ seastar::future<consumption_result_type> operator()(tmp_buf&& buf) {
+ size_t consumed = 0;
+ for (auto c : buf) {
+ consumed++;
+ if (c == '\0' || c == '\n') {
+ buf.trim_front(consumed);
+ return seastar::make_ready_future<consumption_result_type>(
+ consumption_result_type::stop_consuming_type(std::move(buf)));
+ } else {
+ line.push_back(c);
+ }
+ }
+ return seastar::make_ready_future<consumption_result_type>(
+ seastar::continue_consuming{});
+ }
+ std::string line;
+};
+
+seastar::future<> AdminSocket::handle_client(seastar::input_stream<char>& in,
+ seastar::output_stream<char>& out)
+{
+ auto consumer = seastar::make_shared<line_consumer>();
+ return in.consume(*consumer).then([consumer, &out, this] {
+ logger().debug("AdminSocket::handle_client: incoming asok string: {}",
+ consumer->line);
+ return execute_line(consumer->line, out);
+ }).then([&out] {
+ return out.flush();
+ }).finally([&out] {
+ return out.close();
+ }).then([&in] {
+ return in.close();
+ }).handle_exception([](auto ep) {
+ logger().debug("exception on {}: {}", __func__, ep);
+ return seastar::make_ready_future<>();
+ }).discard_result();
+}
+
+/**
+ * \brief continuously run a block of code "within a gate"
+ *
+ * Neither gate closure, nor a failure of the code block we are running, will
+ * cause an exception to be thrown by safe_action_gate_func()
+ */
+template <typename AsyncAction>
+seastar::future<> do_until_gate(seastar::gate& gt, AsyncAction action)
+{
+ auto stop_cond = [>] { return gt.is_closed(); };
+ auto safe_action{ [act = std::move(action), >]() mutable {
+ if (gt.is_closed())
+ return seastar::make_ready_future<>();
+ return with_gate(gt, [act = std::move(act)] {
+ return act().handle_exception([](auto e) {});
+ });
+ } };
+
+ return seastar::do_until(stop_cond, std::move(safe_action)).discard_result();
+}
+
+seastar::future<> AdminSocket::start(const std::string& path)
+{
+ if (path.empty()) {
+ logger().error(
+ "{}: Admin Socket socket path missing from the configuration", __func__);
+ return seastar::now();
+ }
+
+ logger().debug("{}: asok socket path={}", __func__, path);
+ auto sock_path = seastar::socket_address{ seastar::unix_domain_addr{ path } };
+
+ std::ignore = register_admin_hooks().then([this, sock_path](
+ AsokRegistrationRes reg_res) {
+ return seastar::do_with(
+ seastar::engine().listen(sock_path),
+ [this](seastar::server_socket& lstn) {
+ m_server_sock = &lstn; // used for 'abort_accept()'
+ return do_until_gate(arrivals_gate, [&lstn, this] {
+ return lstn.accept().then(
+ [this](seastar::accept_result from_accept) {
+ seastar::connected_socket cn =
+ std::move(from_accept.connection);
+ return do_with(cn.input(), cn.output(), std::move(cn),
+ [this](auto& inp, auto& out, auto& cn) {
+ return handle_client(inp, out).finally([] {
+ ; // left for debugging
+ });
+ });
+ });
+ }).then([] {
+ logger().debug("AdminSocket::init(): admin-sock thread terminated");
+ return seastar::now();
+ });
+ });
+ });
+
+ return seastar::make_ready_future<>();
+}
+
+seastar::future<> AdminSocket::stop()
+{
+ if (m_server_sock && !arrivals_gate.is_closed()) {
+ // note that we check 'is_closed()' as if already closed - the server-sock
+ // may have already been discarded
+ m_server_sock->abort_accept();
+ m_server_sock = nullptr;
+ }
+
+ return seastar::futurize_apply([this] { return arrivals_gate.close(); })
+ .then_wrapped([](seastar::future<> res) {
+ if (res.failed()) {
+ std::ignore = res.handle_exception([](std::exception_ptr eptr) {
+ return seastar::make_ready_future<>();
+ });
+ }
+ return seastar::make_ready_future<>();
+ }).handle_exception(
+ [](std::exception_ptr eptr) { return seastar::make_ready_future<>();
+ }).finally([] { return seastar::make_ready_future<>(); });
+}
+
+/////////////////////////////////////////
+// the internal hooks
+/////////////////////////////////////////
+
+class VersionHook final : public AdminSocketHook {
+ public:
+ seastar::future<bufferlist> call(std::string_view,
+ std::string_view format,
+ const cmdmap_t&) const final
+ {
+ unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+ f->open_object_section("version");
+ f->dump_string("version", ceph_version_to_str());
+ f->dump_string("release", ceph_release_to_str());
+ f->dump_string("release_type", ceph_release_type());
+ f->close_section();
+ bufferlist out;
+ f->flush(out);
+ return seastar::make_ready_future<bufferlist>(std::move(out));
+ }
+};
+
+/**
+ Note that the git_version command is expected to return a 'version' JSON
+ segment.
+*/
+class GitVersionHook final : public AdminSocketHook {
+ public:
+ seastar::future<bufferlist> call(std::string_view command,
+ std::string_view format,
+ const cmdmap_t&) const final
+ {
+ unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+ f->open_object_section("version");
+ f->dump_string("git_version", git_version_to_str());
+ f->close_section();
+ ceph::bufferlist out;
+ f->flush(out);
+ return seastar::make_ready_future<bufferlist>(std::move(out));
+ }
+};
+
+class HelpHook final : public AdminSocketHook {
+ const AdminSocket& m_as;
+
+ public:
+ explicit HelpHook(const AdminSocket& as) : m_as{as} {}
+
+ seastar::future<bufferlist> call(std::string_view command,
+ std::string_view format,
+ const cmdmap_t& cmdmap) const final
+ {
+ return seastar::with_shared(m_as.servers_tbl_rwlock,
+ [this, format] {
+ unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+ f->open_object_section("help");
+ for (const auto& hook : m_as) {
+ if (!hook->help.empty()) {
+ f->dump_string(hook->command.c_str(), hook->help);
+ }
+ }
+ f->close_section();
+ ceph::bufferlist out;
+ f->flush(out);
+ return seastar::make_ready_future<bufferlist>(std::move(out));
+ });
+ }
+};
+
+class GetdescsHook final : public AdminSocketHook {
+ const AdminSocket& m_as;
+
+ public:
+ explicit GetdescsHook(const AdminSocket& as) : m_as{ as } {}
+
+ seastar::future<bufferlist> call(std::string_view command,
+ std::string_view format,
+ const cmdmap_t& cmdmap) const final
+ {
+ unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+ return seastar::with_shared(m_as.servers_tbl_rwlock, [this, f=move(f)] {
+ int cmdnum = 0;
+ f->open_object_section("command_descriptions");
+ for (const auto& hook : m_as) {
+ auto secname = fmt::format("cmd {:>03}", cmdnum);
+ dump_cmd_and_help_to_json(f.get(), CEPH_FEATURES_ALL, secname,
+ hook->cmddesc, hook->help);
+ cmdnum++;
+ }
+ f->close_section();
+ ceph::bufferlist out;
+ f->flush(out);
+ return seastar::make_ready_future<bufferlist>(std::move(out));
+ });
+ }
+};
+
+/// the hooks that are served directly by the admin_socket server
+seastar::future<AsokRegistrationRes> AdminSocket::register_admin_hooks()
+{
+ version_hook = std::make_unique<VersionHook>();
+ git_ver_hook = std::make_unique<GitVersionHook>();
+ help_hook = std::make_unique<HelpHook>(*this);
+ getdescs_hook = std::make_unique<GetdescsHook>(*this);
+
+ // clang-format off
+ static const std::vector<AsokServiceDef> internal_hooks_tbl{
+ AsokServiceDef{"version", "version", version_hook.get(), "get ceph version"}
+ , AsokServiceDef{"git_version", "git_version", git_ver_hook.get(), "get git sha1"}
+ , AsokServiceDef{"help", "help", help_hook.get(), "list available commands"}
+ , AsokServiceDef{"get_command_descriptions", "get_command_descriptions",
+ getdescs_hook.get(), "list available commands"}
+ };
+ // clang-format on
+
+ // server_registration() returns a shared pointer to the AdminSocket
+ // server, i.e. to us. As we already have shared ownership of this object,
+ // we do not need it.
+ return register_server(AdminSocket::hook_server_tag{ this },
+ internal_hooks_tbl);
+}
+
+} // namespace crimson::admin
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+/**
+ A Crimson-wise version of the src/common/admin_socket.h
+
+ Note: assumed to be running on a single core.
+*/
+#include <map>
+#include <string>
+#include <string_view>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/iostream.hh>
+#include <seastar/core/shared_mutex.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/net/api.hh>
+
+#include "common/cmdparse.h"
+
+class CephContext;
+
+using namespace std::literals;
+
+inline constexpr auto CEPH_ADMIN_SOCK_VERSION = "2"sv;
+
+namespace crimson::admin {
+
+class AdminSocket;
+
+/**
+ * A specific hook must implement exactly one of the two interfaces:
+ * (1) call(command, cmdmap, format, out)
+ * or
+ * (2) exec_command(formatter, command, cmdmap, format, out)
+ *
+ * The default implementation of (1) above calls exec_command() after handling
+ * most of the boiler-plate choirs:
+ * - setting up the formatter, with an appropiate 'section' already opened;
+ * - handling possible failures (exceptions or future_exceptions) returned
+ * by (2)
+ * - flushing the output to the outgoing bufferlist.
+ */
+class AdminSocketHook {
+ public:
+ /**
+ * \retval 'false' for hook execution errors
+ */
+ virtual seastar::future<ceph::bufferlist>
+ call(std::string_view command,
+ std::string_view format,
+ const cmdmap_t& cmdmap) const = 0;
+ virtual ~AdminSocketHook() {}
+};
+
+/**
+ * The details of a single API in a server's hooks block
+ */
+struct AsokServiceDef {
+ const std::string command; ///< the sequence of words that should be used
+ const std::string cmddesc; ///< the command syntax
+ const AdminSocketHook* hook;
+ const std::string help; ///< help message
+};
+
+class AdminHooksIter; ///< an iterator over all APIs in all server blocks
+
+/// a ref-count owner of the AdminSocket, used to guarantee its existence until
+/// all server-blocks are unregistered
+using AdminSocketRef = seastar::lw_shared_ptr<AdminSocket>;
+
+using AsokRegistrationRes =
+ std::optional<AdminSocketRef>; // holding the server alive until after our
+ // unregistration
+
+class AdminSocket : public seastar::enable_lw_shared_from_this<AdminSocket> {
+ public:
+ AdminSocket() = default;
+ ~AdminSocket() = default;
+
+ AdminSocket(const AdminSocket&) = delete;
+ AdminSocket& operator=(const AdminSocket&) = delete;
+ AdminSocket(AdminSocket&&) = delete;
+ AdminSocket& operator=(AdminSocket&&) = delete;
+
+ using hook_server_tag = const void*;
+
+ /**
+ * create the async Seastar thread that handles asok commands arriving
+ * over the socket.
+ */
+ seastar::future<> start(const std::string& path);
+
+ seastar::future<> stop();
+
+ /**
+ * register an admin socket hooks server
+ *
+ * The server registers a set of APIs under a common hook_server_tag.
+ *
+ * Commands (APIs) are registered under a command string. Incoming
+ * commands are split by spaces and matched against the longest
+ * registered command. For example, if 'foo' and 'foo bar' are
+ * registered, and an incoming command is 'foo bar baz', it is
+ * matched with 'foo bar', while 'foo fud' will match 'foo'.
+ *
+ * The entire incoming command string is passed to the registered
+ * hook.
+ *
+ * \param server_tag a tag identifying the server registering the hook
+ * \param apis_served a vector of the commands served by this server. Each
+ * command registration includes its identifying command string, the
+ * expected call syntax, and some help text.
+ *
+ * A note regarding the help text: if empty, command will not be
+ * included in 'help' output.
+ *
+ * \retval a shared ptr to the asok server itself, or nullopt if
+ * a block with same tag is already registered.
+ */
+ seastar::future<AsokRegistrationRes> register_server(
+ hook_server_tag server_tag, const std::vector<AsokServiceDef>& apis_served);
+
+ /**
+ * unregister all hooks registered by this hooks-server. The caller
+ * gives up on its shared-ownership of the asok server once the
+ * deregistration is complete.
+ */
+ seastar::future<> unregister_server(hook_server_tag server_tag,
+ AdminSocketRef&& server_ref);
+
+ private:
+ /**
+ * the result of analyzing an incoming command, and locating it in
+ * the registered APIs collection.
+ */
+ struct parsed_command_t {
+ std::string cmd;
+ cmdmap_t parameters;
+ std::string format;
+ const AsokServiceDef* api;
+ const AdminSocketHook* hook;
+ /**
+ * the length of the whole command-sequence under the 'prefix' header
+ */
+ std::size_t cmd_seq_len;
+ };
+ // and the shorthand:
+ using maybe_parsed_t = std::optional<AdminSocket::parsed_command_t>;
+
+ /**
+ * Registering the APIs that are served directly by the admin_socket server.
+ */
+ seastar::future<AsokRegistrationRes> register_admin_hooks();
+
+ /**
+ * unregister all hooks registered by this hooks-server
+ */
+ seastar::future<> unregister_server(hook_server_tag server_tag);
+
+ seastar::future<> handle_client(seastar::input_stream<char>& inp,
+ seastar::output_stream<char>& out);
+
+ seastar::future<> execute_line(std::string cmdline,
+ seastar::output_stream<char>& out);
+
+ seastar::future<> finalize_response(seastar::output_stream<char>& out,
+ ceph::bufferlist&& msgs);
+
+ bool validate_command(const parsed_command_t& parsed,
+ const std::string& command_text,
+ ceph::bufferlist& out) const;
+
+ /**
+ * Non-owning ptr to the UNIX-domain "server-socket".
+ * Named here to allow a call to abort_accept().
+ */
+ seastar::api_v2::server_socket* m_server_sock{ nullptr };
+
+ /**
+ * stopping incoming ASOK requests at shutdown
+ */
+ seastar::gate arrivals_gate;
+
+ std::unique_ptr<AdminSocketHook> version_hook;
+ std::unique_ptr<AdminSocketHook> git_ver_hook;
+ std::unique_ptr<AdminSocketHook> the0_hook;
+ std::unique_ptr<AdminSocketHook> help_hook;
+ std::unique_ptr<AdminSocketHook> getdescs_hook;
+ std::unique_ptr<AdminSocketHook> test_throw_hook; // for dev unit-tests
+
+ /**
+ * parse the incoming command line into the sequence of words that identifies
+ * the API, and into its arguments. Locate the command string in the
+ * registered blocks.
+ */
+ maybe_parsed_t parse_cmd(std::string command_text, bufferlist& out);
+
+ struct server_block {
+ server_block(const std::vector<AsokServiceDef>& hooks) : m_hooks{ hooks } {}
+ const std::vector<AsokServiceDef>& m_hooks;
+ };
+
+ /**
+ * The servers table is protected by a rw-lock, to be acquired exclusively
+ * only when registering or removing a server.
+ * The lock is locked-shared when executing any hook.
+ */
+ mutable seastar::shared_mutex servers_tbl_rwlock;
+ std::map<hook_server_tag, server_block> servers;
+
+ using maybe_service_def_t = std::optional<const AsokServiceDef*>;
+
+ /**
+ * Find the longest subset of words in 'match' that is a registered API (in
+ * any of the servers' control blocks).
+ * locate_subcmd() is expected to be called with the servers table RW-lock
+ * held.
+ */
+ maybe_service_def_t locate_subcmd(std::string match) const;
+
+ public:
+ /**
+ * iterator support
+ */
+ AdminHooksIter begin() const;
+ AdminHooksIter end() const;
+
+ using ServersListIt = std::map<hook_server_tag, server_block>::const_iterator;
+ using ServerApiIt = std::vector<AsokServiceDef>::const_iterator;
+
+ friend class AdminSocketTest;
+ friend class HelpHook;
+ friend class GetdescsHook;
+ friend class AdminHooksIter;
+};
+
+/**
+ * An iterator over all registered APIs.
+ */
+struct AdminHooksIter
+ : public std::iterator<std::output_iterator_tag, AsokServiceDef> {
+ public:
+ explicit AdminHooksIter(const AdminSocket& master, bool end_flag = false);
+
+ ~AdminHooksIter() = default;
+
+ const AsokServiceDef* operator*() const
+ {
+ return &(*m_siter);
+ }
+
+ /**
+ * The (in)equality test is only used to compare to 'end'.
+ */
+ bool operator!=(const AdminHooksIter& other) const
+ {
+ return m_end_marker != other.m_end_marker;
+ }
+
+ AdminHooksIter& operator++();
+
+ private:
+ const AdminSocket& m_master;
+ AdminSocket::ServersListIt m_miter;
+ AdminSocket::ServerApiIt m_siter;
+ bool m_end_marker;
+
+ friend class AdminSocket;
+};
+
+} // namespace crimson::admin
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/admin/osd_admin.h"
+
+#include <iostream>
+
+#include <boost/algorithm/string.hpp>
+#include <fmt/format.h>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/thread.hh>
+
+#include "common/config.h"
+#include "crimson/admin/admin_socket.h"
+#include "crimson/common/log.h"
+#include "crimson/osd/exceptions.h"
+#include "crimson/osd/osd.h"
+
+using crimson::osd::OSD;
+
+namespace {
+seastar::logger& logger()
+{
+ return crimson::get_logger(ceph_subsys_osd);
+}
+} // namespace
+
+namespace crimson::admin {
+
+using crimson::common::local_conf;
+
+/**
+ * the hooks and states needed to handle OSD asok requests
+ */
+class OsdAdminImp {
+ friend class OsdAdmin;
+ friend class OsdAdminHookBase;
+
+ OSD* m_osd;
+
+ // shared-ownership of the socket server itself, to guarantee its existence
+ // until we have a chance to remove our registration:
+ AsokRegistrationRes m_socket_server;
+
+ /**
+ * Common code for all OSD admin hooks.
+ * Adds access to the owning OSD.
+ */
+ class OsdAdminHookBase : public AdminSocketHook {
+ protected:
+ explicit OsdAdminHookBase(OsdAdminImp& master) : m_osd_admin{ master } {}
+ struct tell_result_t {
+ int ret = 0;
+ string err;
+ };
+ /// the specific command implementation
+ virtual seastar::future<tell_result_t> tell(const cmdmap_t& cmdmap,
+ Formatter* f) const = 0;
+ seastar::future<bufferlist> call(std::string_view command,
+ std::string_view format,
+ const cmdmap_t& cmdmap) const final
+ {
+ unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+ auto ret = seastar::do_with(std::move(f), [cmdmap, this] (auto& f) {
+ Formatter* formatter = f.get();
+ return tell(cmdmap, formatter).then([formatter](auto result) {
+ bufferlist out;
+ if (auto& [ret, err] = result; ret < 0) {
+ out.append(fmt::format("ERROR: {}\n", cpp_strerror(ret)));
+ out.append(err);
+ } else {
+ formatter->flush(out);
+ }
+ return seastar::make_ready_future<bufferlist>(std::move(out));
+ });
+ });
+ return ret;
+ }
+ OsdAdminImp& m_osd_admin;
+ };
+
+ /**
+ * An OSD admin hook: OSD status
+ */
+ class OsdStatusHook : public OsdAdminHookBase {
+ public:
+ explicit OsdStatusHook(OsdAdminImp& master) : OsdAdminHookBase(master){};
+ seastar::future<tell_result_t> tell(const cmdmap_t&,
+ Formatter* f) const final
+ {
+ f->open_object_section("status");
+ f->dump_stream("cluster_fsid")
+ << m_osd_admin.osd_superblock().cluster_fsid;
+ f->dump_stream("osd_fsid") << m_osd_admin.osd_superblock().osd_fsid;
+ f->dump_unsigned("whoami", m_osd_admin.osd_superblock().whoami);
+ f->dump_string("state", m_osd_admin.osd_state_name());
+ f->dump_unsigned("oldest_map", m_osd_admin.osd_superblock().oldest_map);
+ f->dump_unsigned("newest_map", m_osd_admin.osd_superblock().newest_map);
+ f->dump_unsigned("num_pgs", m_osd_admin.m_osd->pg_map.get_pgs().size());
+ f->close_section();
+ return seastar::make_ready_future<tell_result_t>();
+ }
+ };
+
+ /**
+ * An OSD admin hook: send beacon
+ */
+ class SendBeaconHook : public OsdAdminHookBase {
+ public:
+ explicit SendBeaconHook(OsdAdminImp& master) : OsdAdminHookBase(master){};
+ seastar::future<tell_result_t> tell(const cmdmap_t&, Formatter*) const final
+ {
+ if (m_osd_admin.get_osd_state().is_active()) {
+ return m_osd_admin.m_osd->send_beacon().then([] {
+ return seastar::make_ready_future<tell_result_t>();
+ });
+ } else {
+ return seastar::make_ready_future<tell_result_t>();
+ }
+ }
+ };
+
+ /**
+ * A CephContext admin hook: listing the configuration values
+ */
+ class ConfigShowHook : public OsdAdminHookBase {
+ public:
+ explicit ConfigShowHook(OsdAdminImp& master)
+ : OsdAdminHookBase(master) {}
+ seastar::future<tell_result_t> tell(const cmdmap_t&, Formatter* f) const final
+ {
+ f->open_object_section("config_show");
+ local_conf().show_config(f);
+ f->close_section();
+ return seastar::make_ready_future<tell_result_t>();
+ }
+ };
+
+ /**
+ * A CephContext admin hook: fetching the value of a specific
+ * configuration item
+ */
+ class ConfigGetHook : public OsdAdminHookBase {
+ public:
+ explicit ConfigGetHook(OsdAdminImp& master)
+ : OsdAdminHookBase(master) {}
+ seastar::future<tell_result_t> tell(const cmdmap_t& cmdmap,
+ ceph::Formatter* f) const final
+ {
+ std::string var;
+ if (!cmd_getval(cmdmap, "var", var)) {
+ // should have been caught by 'validate()'
+ return seastar::make_ready_future<tell_result_t>(
+ tell_result_t{-EINVAL, "syntax error: 'config get <var>'"});
+ }
+ try {
+ f->open_object_section("config_get");
+ std::string conf_val =
+ local_conf().get_val<std::string>(var);
+ f->dump_string(var.c_str(), conf_val.c_str());
+ f->close_section();
+ return seastar::make_ready_future<tell_result_t>();
+ } catch (const boost::bad_get&) {
+ return seastar::make_ready_future<tell_result_t>(
+ tell_result_t{-EINVAL, fmt::format("unrecognized option {}", var)});
+ }
+ }
+ };
+
+ /**
+ * A CephContext admin hook: setting the value of a specific configuration
+ * item (a real example: {"prefix": "config set", "var":"debug_osd", "val":
+ * ["30/20"]} )
+ */
+ class ConfigSetHook : public OsdAdminHookBase {
+ public:
+ explicit ConfigSetHook(OsdAdminImp& master)
+ : OsdAdminHookBase(master) {}
+
+ seastar::future<tell_result_t> tell(const cmdmap_t& cmdmap,
+ ceph::Formatter* f) const final
+ {
+ std::string var;
+ std::vector<std::string> new_val;
+ if (!cmd_getval(cmdmap, "var", var) ||
+ !cmd_getval(cmdmap, "val", new_val)) {
+ return seastar::make_ready_future<tell_result_t>(
+ tell_result_t{-EINVAL, "syntax error: 'config set <var> <value>'"});
+ }
+ // val may be multiple words
+ string valstr = str_join(new_val, " ");
+ return local_conf().set_val(var, valstr).then([f] {
+ f->open_object_section("config_set");
+ f->dump_string("success", "");
+ f->close_section();
+ return seastar::make_ready_future<tell_result_t>();
+ }).handle_exception_type([](std::invalid_argument& e) {
+ return seastar::make_ready_future<tell_result_t>(
+ tell_result_t{-EINVAL, e.what()});
+ });
+ }
+ };
+
+ /**
+ * A CephContext admin hook: calling assert (if allowed by
+ * 'debug_asok_assert_abort')
+ */
+ class AssertAlwaysHook : public OsdAdminHookBase {
+ public:
+ explicit AssertAlwaysHook(OsdAdminImp& master)
+ : OsdAdminHookBase(master) {}
+ seastar::future<tell_result_t> tell(const cmdmap_t&,
+ ceph::Formatter*) const final
+ {
+ if (local_conf().get_val<bool>("debug_asok_assert_abort")) {
+ ceph_assert_always(0);
+ return seastar::make_ready_future<tell_result_t>();
+ } else {
+ return seastar::make_ready_future<tell_result_t>(
+ tell_result_t{-EPERM, "configuration set to disallow asok assert"});
+ }
+ }
+ };
+
+ /**
+ * provide the hooks with access to OSD internals
+ */
+ const OSDSuperblock& osd_superblock() const
+ {
+ return m_osd->superblock;
+ }
+
+ OSDState get_osd_state() const
+ {
+ return m_osd->state;
+ }
+
+ string_view osd_state_name() const
+ {
+ return m_osd->state.to_string();
+ }
+
+ OsdStatusHook osd_status_hook{*this};
+ SendBeaconHook send_beacon_hook{*this};
+ ConfigShowHook config_show_hook{*this};
+ ConfigGetHook config_get_hook{*this};
+ ConfigSetHook config_set_hook{*this};
+ AssertAlwaysHook assert_hook{*this};
+
+ public:
+ OsdAdminImp(OSD* osd)
+ : m_osd{ osd }
+ {}
+
+ ~OsdAdminImp() = default;
+
+ seastar::future<> register_admin_commands()
+ {
+ static const std::vector<AsokServiceDef> hooks_tbl{
+ // clang-format off
+ AsokServiceDef{"status", "status", &osd_status_hook,
+ "OSD status"}
+ , AsokServiceDef{"send_beacon", "send_beacon", &send_beacon_hook,
+ "send OSD beacon to mon immediately"}
+ , AsokServiceDef{"config show", "config show", &config_show_hook,
+ "dump current config settings" }
+ , AsokServiceDef{"config get", "config get name=var,type=CephString",
+ &config_get_hook,
+ "config get <field>: get the config value" }
+ , AsokServiceDef{"config set",
+ "config set name=var,type=CephString name=val,type=CephString,n=N",
+ &config_set_hook,
+ "config set <field> <val> [<val> ...]: set a config variable" }
+ , AsokServiceDef{ "assert", "assert", &assert_hook, "asserts" }
+ // clang-format on
+ };
+
+ return m_osd->asok
+ ->register_server(AdminSocket::hook_server_tag{ this }, hooks_tbl)
+ .then([this](AsokRegistrationRes res) { m_socket_server = res; });
+ }
+
+ seastar::future<> unregister_admin_commands()
+ {
+ if (!m_socket_server.has_value()) {
+ logger().warn("{}: OSD asok APIs already removed", __func__);
+ return seastar::now();
+ }
+
+ AdminSocketRef srv{ std::move(m_socket_server.value()) };
+ m_socket_server.reset();
+
+ return m_osd->asok->unregister_server(AdminSocket::hook_server_tag{ this },
+ std::move(srv));
+ }
+};
+
+//
+// some PIMPL details:
+//
+OsdAdmin::OsdAdmin(OSD* osd)
+ : m_imp{ std::make_unique<crimson::admin::OsdAdminImp>(osd) }
+{}
+
+seastar::future<> OsdAdmin::register_admin_commands()
+{
+ return m_imp->register_admin_commands();
+}
+
+seastar::future<> OsdAdmin::unregister_admin_commands()
+{
+ return seastar::do_with(std::move(m_imp), [](auto& detached_imp) {
+ return detached_imp->unregister_admin_commands();
+ });
+}
+
+OsdAdmin::~OsdAdmin() = default;
+} // namespace crimson::admin
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <memory>
+
+#include "crimson/common/config_proxy.h"
+
+namespace crimson::osd {
+class OSD;
+}
+
+namespace crimson::admin {
+class OsdAdminImp;
+
+/**
+ * \brief implementation of the configuration-related 'admin_socket' API of
+ * (Crimson) OSD
+ *
+ * Main functionality:
+ * - fetching OSD status data
+ * - ...
+ */
+class OsdAdmin {
+ public:
+ OsdAdmin(crimson::osd::OSD* osd);
+ ~OsdAdmin();
+ seastar::future<> register_admin_commands();
+ seastar::future<> unregister_admin_commands();
+ private:
+ std::unique_ptr<crimson::admin::OsdAdminImp> m_imp;
+};
+
+} // namespace crimson::admin
heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
// do this in background
heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
+ asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
+ admin{std::make_unique<crimson::admin::OsdAdmin>(this)},
osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
{
osdmaps[0] = boost::make_local_shared<OSDMap>();
}).then([this] {
return heartbeat->start(public_msgr.get_myaddrs(),
cluster_msgr.get_myaddrs());
+ }).then([this] {
+ // create the admin-socket server, and the objects that register
+ // to handle incoming commands
+ return start_asok_admin();
}).then([this] {
return start_boot();
});
}
}
+/*
+ The OSD's Admin Socket object created here has two servers (i.e. - blocks of commands
+ to handle) registered to it:
+ - OSD's specific commands are handled by the OSD object;
+ - there are some common commands registered to be directly handled by the AdminSocket object
+ itself.
+*/
+seastar::future<> OSD::start_asok_admin()
+{
+ auto asok_path = local_conf().get_val<std::string>("admin_socket");
+
+ return asok->start(asok_path).then([this] {
+ // register OSD-specific admin-socket hooks
+ return admin->register_admin_commands();
+ });
+}
+
+/*
+ Note: stop_asok_admin() is executed as part of a destruction process,
+ and may occur even before 'start' is complete. We thus take nothing for granted,
+ and all interfaces are checked for existence.
+*/
+seastar::future<> OSD::stop_asok_admin()
+{
+ return ([this] {
+ if (admin) {
+ return admin->unregister_admin_commands();
+ } else {
+ return seastar::make_ready_future<>();
+ }
+ })().then([this] {
+ if (asok) {
+ return asok->stop().then([this]() {
+ asok.release();
+ return seastar::make_ready_future<>();
+ });
+ } else {
+ return seastar::make_ready_future<>();
+ }
+ }).handle_exception([](auto ep) {
+ logger().error("exception on admin-stop: {}", ep);
+ return seastar::make_ready_future<>();
+ }).finally([] {
+ logger().info("OSD::stop_asok_admin(): Admin-sock service destructed");
+ });
+}
+
seastar::future<> OSD::stop()
{
logger().info("stop");
// see also OSD::shutdown()
state.set_stopping();
+
return gate.close().then([this] {
+ return stop_asok_admin();
+ }).then([this] {
return heartbeat->stop();
}).then([this] {
return monc->stop();
#include "crimson/common/type_helpers.h"
#include "crimson/common/auth_handler.h"
+#include "crimson/admin/admin_socket.h"
+#include "crimson/admin/osd_admin.h"
#include "crimson/common/simple_lru.h"
#include "crimson/common/shared_lru.h"
#include "crimson/mgr/client.h"
std::unique_ptr<Heartbeat> heartbeat;
seastar::timer<seastar::lowres_clock> heartbeat_timer;
+ // admin-socket
+ seastar::lw_shared_ptr<crimson::admin::AdminSocket> asok;
+ std::unique_ptr<crimson::admin::OsdAdmin> admin;
+
public:
OSD(int id, uint32_t nonce,
crimson::net::Messenger& cluster_msgr,
void check_osdmap_features();
+ seastar::future<> stop_asok_admin();
+ seastar::future<> start_asok_admin();
+
public:
OSDMapGate osdmap_gate;
seastar::future<> update_heartbeat_peers();
friend class PGAdvanceMap;
+ friend class crimson::admin::OsdAdminImp;
};
}