]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common,crimson: supporting admin-socket commands
authorRonen Friedman <rfriedma@redhat.com>
Sun, 8 Dec 2019 14:48:59 +0000 (16:48 +0200)
committerRonen Friedman <rfriedma@redhat.com>
Thu, 13 Feb 2020 15:52:12 +0000 (17:52 +0200)
- basic infrastructure:
  - the seastar::thread that handles commands arriving over the UNIX_domain socket
  - command dispatching
- the implementation of some commands.
- simplified locking scheme: access to the API blocks (the registration elements) is
  now under one course-grained RW-lock.
- OSD shutdown support;

Signed-off-by: Ronen Friedman <rfriedma@redhat.com>
src/common/admin_socket.h
src/crimson/CMakeLists.txt
src/crimson/admin/admin_socket.cc [new file with mode: 0644]
src/crimson/admin/admin_socket.h [new file with mode: 0644]
src/crimson/admin/osd_admin.cc [new file with mode: 0644]
src/crimson/admin/osd_admin.h [new file with mode: 0644]
src/crimson/osd/osd.cc
src/crimson/osd/osd.h

index ee68d3fcabca8ed92b545aceee8a7f31426bbe9f..607dfc257a99c5ad426fcd442401e3d71fd28193 100644 (file)
 #ifndef CEPH_COMMON_ADMIN_SOCKET_H
 #define CEPH_COMMON_ADMIN_SOCKET_H
 
+#ifdef WITH_SEASTAR
+#include "crimson/admin/admin_socket.h"
+#else
+
 #include <condition_variable>
 #include <mutex>
 #include <string>
@@ -211,3 +215,4 @@ private:
 };
 
 #endif
+#endif
index 3d2ffc066efb609e4b14aa4652f125b38f82352b..33a7ac44aa558e745e6fc619928825ff5638f237 100644 (file)
@@ -4,6 +4,8 @@ set_target_properties(crimson::cflags PROPERTIES
   INTERFACE_LINK_LIBRARIES Seastar::seastar)
 
 set(crimson_common_srcs
+  admin/admin_socket.cc
+  admin/osd_admin.cc
   common/buffer_io.cc
   common/config_proxy.cc
   common/perf_counters_collection.cc
@@ -14,7 +16,6 @@ set(crimson_common_srcs
 #  - the logging is sent to Seastar backend
 #  - and the template parameter of lock_policy is SINGLE
 add_library(crimson-common STATIC
-  ${PROJECT_SOURCE_DIR}/src/common/admin_socket.cc
   ${PROJECT_SOURCE_DIR}/src/common/admin_socket_client.cc
   ${PROJECT_SOURCE_DIR}/src/common/bit_str.cc
   ${PROJECT_SOURCE_DIR}/src/common/bloom_filter.cc
diff --git a/src/crimson/admin/admin_socket.cc b/src/crimson/admin/admin_socket.cc
new file mode 100644 (file)
index 0000000..2a85ac9
--- /dev/null
@@ -0,0 +1,526 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/admin/admin_socket.h"
+
+#include <boost/algorithm/string.hpp>
+#include <fmt/format.h>
+#include <seastar/net/api.hh>
+#include <seastar/net/inet_address.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/util/std-compat.hh>
+
+#include "common/version.h"
+#include "crimson/common/log.h"
+#include "crimson/net/Socket.h"
+
+/**
+ *  A Crimson-wise version of the admin socket - implementation file
+ *
+ *  \todo handle the unlinking of the admin socket. Note that 'cleanup_files'
+ *  at-exit functionality is not yet implemented in Crimson.
+ */
+
+namespace {
+seastar::logger& logger()
+{
+  return crimson::get_logger(ceph_subsys_osd);
+}
+}  // namespace
+
+namespace crimson::admin {
+
+// Hooks table - iterator support
+
+AdminHooksIter AdminSocket::begin() const
+{
+  AdminHooksIter it{ *this };
+  return it;
+}
+
+AdminHooksIter AdminSocket::end() const
+{
+  AdminHooksIter it{ *this, true };
+  return it;
+}
+
+/**
+ *  note that 'end-flag' is an optional parameter, and is only used internally
+ * (to signal the end() state).
+ */
+AdminHooksIter::AdminHooksIter(const AdminSocket& master, bool end_flag)
+    : m_master{ master }, m_end_marker{ end_flag }
+{
+  if (end_flag) {
+    // create the equivalent of servers.end();
+    m_miter = m_master.servers.cend();
+  } else {
+    m_miter = m_master.servers.cbegin();
+    m_siter = m_miter->second.m_hooks.begin();
+  }
+}
+
+AdminHooksIter& AdminHooksIter::operator++()
+{
+  if (!m_end_marker) {
+    ++m_siter;
+    if (m_siter == m_miter->second.m_hooks.end()) {
+      // move to the next server-block
+      m_miter++;
+      if (m_miter == m_master.servers.end()) {
+        m_end_marker = true;
+        return *this;
+      }
+
+      m_siter = m_miter->second.m_hooks.begin();
+    }
+  }
+  return *this;
+}
+
+seastar::future<AsokRegistrationRes> AdminSocket::register_server(
+  hook_server_tag server_tag, const std::vector<AsokServiceDef>& apis_served)
+{
+  return seastar::with_lock(
+    servers_tbl_rwlock,
+    [this, server_tag, &apis_served]() -> AsokRegistrationRes {
+      auto ne = servers.try_emplace(server_tag, apis_served);
+      //  was this server tag already registered?
+      if (!ne.second) {
+        return {};
+      }
+      logger().info("register_server(): pid:{} ^:{} (tag: {})", (int)getpid(),
+                   (uint64_t)(this), (uint64_t)(server_tag));
+      return this->shared_from_this();
+    });
+}
+
+seastar::future<> AdminSocket::unregister_server(hook_server_tag server_tag)
+{
+  logger().debug("{}: pid:{} server tag: {})", __func__, (int)getpid(),
+                 (uint64_t)(server_tag));
+
+  return seastar::with_lock(servers_tbl_rwlock, [this, server_tag] {
+    if (auto erased = servers.erase(server_tag); erased == 0) {
+      logger().warn("unregister_server(): unregistering a "
+                   "non-existing registration (tag: {})",
+                   (uint64_t)(server_tag));
+    }
+  });
+}
+
+seastar::future<> AdminSocket::unregister_server(hook_server_tag server_tag,
+                                                 AdminSocketRef&& server_ref)
+{
+  return unregister_server(server_tag).then([ref=std::move(server_ref)] {
+    // reducing the ref-count on us (the ASOK server) by discarding server_ref:
+  }).finally([server_tag] {
+    logger().debug("unregister_server: done {}", (uint64_t)(server_tag));
+  });
+}
+
+AdminSocket::maybe_service_def_t AdminSocket::locate_subcmd(
+  std::string match) const
+{
+  while (!match.empty()) {
+    // try locating this sub-sequence of the incoming command
+    for (auto& [tag, srv] : servers) {
+      for (auto& api : srv.m_hooks) {
+        if (api.command == match) {
+          logger().debug("{}: located {} w/ server {}", __func__, match, tag);
+          return &api;
+        }
+      }
+    }
+    // drop right-most word
+    size_t pos = match.rfind(' ');
+    if (pos == match.npos) {
+      match.clear();  // we fail
+      return {};
+    } else {
+      match.resize(pos);
+    }
+  }
+  return {};
+}
+
+/*
+ * Note: parse_cmd() is executed with servers_tbl_rwlock held as shared
+ */
+AdminSocket::maybe_parsed_t AdminSocket::parse_cmd(std::string cmd,
+                                                  ceph::bufferlist& out)
+{
+  // preliminaries:
+  //   - create the formatter specified by the cmd parameters
+  //   - locate the "op-code" string (the 'prefix' segment)
+  //   - prepare for command parameters extraction via cmdmap_t
+  cmdmap_t cmdmap;
+
+  try {
+    stringstream errss;
+    //  note that cmdmap_from_json() may throw on syntax issues
+    if (!cmdmap_from_json({cmd}, &cmdmap, errss)) {
+      logger().error("{}: incoming command error: {}", __func__, errss.str());
+      out.append("error:"s);
+      out.append(errss.str());
+      return maybe_parsed_t{ std::nullopt };
+    }
+  } catch (std::runtime_error& e) {
+    logger().error("{}: incoming command syntax: {}", __func__, cmd);
+    out.append("error: command syntax"s);
+    return maybe_parsed_t{ std::nullopt };
+  }
+
+  string format;
+  string match;
+  std::size_t full_command_seq;  // the full sequence, before we start chipping
+                                 // away the end
+  try {
+    cmd_getval(cmdmap, "format", format);
+    cmd_getval(cmdmap, "prefix", match);
+    full_command_seq = match.length();
+  } catch (const bad_cmd_get& e) {
+    logger().error("{}: invalid syntax: {}", __func__, cmd);
+    out.append("error: command syntax: missing 'prefix'"s);
+    return maybe_parsed_t{ std::nullopt };
+  }
+
+  if (match.empty()) {
+    // no command identified
+    out.append("error: no command identified"s);
+    return maybe_parsed_t{ std::nullopt };
+  }
+
+  // match the incoming op-code to one of the registered APIs
+  auto parsed_cmd = locate_subcmd(match);
+  if (!parsed_cmd.has_value()) {
+    return maybe_parsed_t{ std::nullopt };
+  }
+  return parsed_command_t{ match,
+                           cmdmap,
+                           format,
+                           parsed_cmd.value(),
+                           parsed_cmd.value()->hook,
+                           full_command_seq };
+}
+
+/*
+ * Note: validate_command() is executed with servers_tbl_rwlock held as shared
+ */
+bool AdminSocket::validate_command(const parsed_command_t& parsed,
+                                   const std::string& command_text,
+                                   ceph::bufferlist& out) const
+{
+  // did we receive any arguments apart from the command word(s)?
+  if (parsed.cmd_seq_len == parsed.cmd.length())
+    return true;
+
+  logger().info("{}: validating {} against:{}", __func__, command_text,
+                parsed.api->cmddesc);
+
+  stringstream os;  // for possible validation error messages
+  try {
+    // validate_cmd throws on some syntax errors
+    if (validate_cmd(nullptr, parsed.api->cmddesc, parsed.parameters, os)) {
+      return true;
+    }
+  } catch (std::exception& e) {
+    logger().error("{}: validation failure ({} : {}) {}", __func__,
+                   command_text, parsed.cmd, e.what());
+  }
+
+  os << "error: command validation failure ";
+  logger().error("{}: validation failure (incoming:{}) {}", __func__,
+                 command_text, os.str());
+  out.append(os);
+  return false;
+}
+
+seastar::future<> AdminSocket::finalize_response(
+  seastar::output_stream<char>& out, ceph::bufferlist&& msgs)
+{
+  string outbuf_cont = msgs.to_str();
+  if (outbuf_cont.empty()) {
+    outbuf_cont = " {} ";
+  }
+  uint32_t response_length = htonl(outbuf_cont.length());
+  logger().info("asok response length: {}", outbuf_cont.length());
+
+  return out.write((char*)&response_length, sizeof(uint32_t))
+    .then([&out, outbuf_cont] { return out.write(outbuf_cont.c_str()); });
+}
+
+seastar::future<> AdminSocket::execute_line(std::string cmdline,
+                                            seastar::output_stream<char>& out)
+{
+  return seastar::with_shared(servers_tbl_rwlock,
+                             [this, cmdline, &out]() mutable {
+    ceph::bufferlist err;
+    auto parsed = parse_cmd(cmdline, err);
+    if (!parsed.has_value() ||
+       !validate_command(*parsed, cmdline, err)) {
+      return finalize_response(out, std::move(err));
+    }
+    return parsed->hook->call(parsed->cmd,
+                             parsed->format,
+                             parsed->parameters).then(
+      [this, &out](auto result) {
+       // add 'failed' to the contents of out_buf? not what
+       // happens in the old code
+        return finalize_response(out, std::move(result));
+      });
+  });
+}
+
+// an input_stream consumer that reads buffer into a std::string up to the first
+// '\0' which indicates the end of command
+struct line_consumer {
+  using tmp_buf = seastar::temporary_buffer<char>;
+  using consumption_result_type =
+    typename seastar::input_stream<char>::consumption_result_type;
+
+  seastar::future<consumption_result_type> operator()(tmp_buf&& buf) {
+    size_t consumed = 0;
+    for (auto c : buf) {
+      consumed++;
+      if (c == '\0' || c == '\n') {
+       buf.trim_front(consumed);
+       return seastar::make_ready_future<consumption_result_type>(
+         consumption_result_type::stop_consuming_type(std::move(buf)));
+      } else {
+       line.push_back(c);
+      }
+    }
+    return seastar::make_ready_future<consumption_result_type>(
+      seastar::continue_consuming{});
+  }
+  std::string line;
+};
+
+seastar::future<> AdminSocket::handle_client(seastar::input_stream<char>& in,
+                                             seastar::output_stream<char>& out)
+{
+  auto consumer = seastar::make_shared<line_consumer>();
+  return in.consume(*consumer).then([consumer, &out, this] {
+    logger().debug("AdminSocket::handle_client: incoming asok string: {}",
+                   consumer->line);
+    return execute_line(consumer->line, out);
+  }).then([&out] {
+    return out.flush();
+  }).finally([&out] {
+    return out.close();
+  }).then([&in] {
+    return in.close();
+  }).handle_exception([](auto ep) {
+    logger().debug("exception on {}: {}", __func__, ep);
+    return seastar::make_ready_future<>();
+  }).discard_result();
+}
+
+/**
+ *  \brief continuously run a block of code "within a gate"
+ *
+ *  Neither gate closure, nor a failure of the code block we are running, will
+ *  cause an exception to be thrown by safe_action_gate_func()
+ */
+template <typename AsyncAction>
+seastar::future<> do_until_gate(seastar::gate& gt, AsyncAction action)
+{
+  auto stop_cond = [&gt] { return gt.is_closed(); };
+  auto safe_action{ [act = std::move(action), &gt]() mutable {
+    if (gt.is_closed())
+      return seastar::make_ready_future<>();
+    return with_gate(gt, [act = std::move(act)] {
+      return act().handle_exception([](auto e) {});
+    });
+  } };
+
+  return seastar::do_until(stop_cond, std::move(safe_action)).discard_result();
+}
+
+seastar::future<> AdminSocket::start(const std::string& path)
+{
+  if (path.empty()) {
+    logger().error(
+      "{}: Admin Socket socket path missing from the configuration", __func__);
+    return seastar::now();
+  }
+
+  logger().debug("{}: asok socket path={}", __func__, path);
+  auto sock_path = seastar::socket_address{ seastar::unix_domain_addr{ path } };
+
+  std::ignore = register_admin_hooks().then([this, sock_path](
+                                              AsokRegistrationRes reg_res) {
+    return seastar::do_with(
+      seastar::engine().listen(sock_path),
+      [this](seastar::server_socket& lstn) {
+        m_server_sock = &lstn;  // used for 'abort_accept()'
+        return do_until_gate(arrivals_gate, [&lstn, this] {
+          return lstn.accept().then(
+            [this](seastar::accept_result from_accept) {
+              seastar::connected_socket cn =
+               std::move(from_accept.connection);
+             return do_with(cn.input(), cn.output(), std::move(cn),
+                [this](auto& inp, auto& out, auto& cn) {
+                 return handle_client(inp, out).finally([] {
+                   ;  // left for debugging
+                  });
+               });
+           });
+          }).then([] {
+            logger().debug("AdminSocket::init(): admin-sock thread terminated");
+            return seastar::now();
+          });
+      });
+  });
+
+  return seastar::make_ready_future<>();
+}
+
+seastar::future<> AdminSocket::stop()
+{
+  if (m_server_sock && !arrivals_gate.is_closed()) {
+    // note that we check 'is_closed()' as if already closed - the server-sock
+    // may have already been discarded
+    m_server_sock->abort_accept();
+    m_server_sock = nullptr;
+  }
+
+  return seastar::futurize_apply([this] { return arrivals_gate.close(); })
+    .then_wrapped([](seastar::future<> res) {
+      if (res.failed()) {
+        std::ignore = res.handle_exception([](std::exception_ptr eptr) {
+          return seastar::make_ready_future<>();
+        });
+      }
+      return seastar::make_ready_future<>();
+    }).handle_exception(
+      [](std::exception_ptr eptr) { return seastar::make_ready_future<>();
+    }).finally([] { return seastar::make_ready_future<>(); });
+}
+
+/////////////////////////////////////////
+// the internal hooks
+/////////////////////////////////////////
+
+class VersionHook final : public AdminSocketHook {
+ public:
+  seastar::future<bufferlist> call(std::string_view,
+                                  std::string_view format,
+                                  const cmdmap_t&) const final
+  {
+    unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+    f->open_object_section("version");
+    f->dump_string("version", ceph_version_to_str());
+    f->dump_string("release", ceph_release_to_str());
+    f->dump_string("release_type", ceph_release_type());
+    f->close_section();
+    bufferlist out;
+    f->flush(out);
+    return seastar::make_ready_future<bufferlist>(std::move(out));
+  }
+};
+
+/**
+  Note that the git_version command is expected to return a 'version' JSON
+  segment.
+*/
+class GitVersionHook final : public AdminSocketHook {
+ public:
+  seastar::future<bufferlist> call(std::string_view command,
+                                  std::string_view format,
+                                  const cmdmap_t&) const final
+  {
+    unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+    f->open_object_section("version");
+    f->dump_string("git_version", git_version_to_str());
+    f->close_section();
+    ceph::bufferlist out;
+    f->flush(out);
+    return seastar::make_ready_future<bufferlist>(std::move(out));
+  }
+};
+
+class HelpHook final : public AdminSocketHook {
+  const AdminSocket& m_as;
+
+ public:
+  explicit HelpHook(const AdminSocket& as) : m_as{as} {}
+
+  seastar::future<bufferlist> call(std::string_view command,
+                                  std::string_view format,
+                                  const cmdmap_t& cmdmap) const final
+  {
+    return seastar::with_shared(m_as.servers_tbl_rwlock,
+                               [this, format] {
+      unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+      f->open_object_section("help");
+      for (const auto& hook : m_as) {
+        if (!hook->help.empty()) {
+          f->dump_string(hook->command.c_str(), hook->help);
+       }
+      }
+      f->close_section();
+      ceph::bufferlist out;
+      f->flush(out);
+      return seastar::make_ready_future<bufferlist>(std::move(out));
+    });
+  }
+};
+
+class GetdescsHook final : public AdminSocketHook {
+  const AdminSocket& m_as;
+
+ public:
+  explicit GetdescsHook(const AdminSocket& as) : m_as{ as } {}
+
+  seastar::future<bufferlist> call(std::string_view command,
+                                  std::string_view format,
+                                  const cmdmap_t& cmdmap) const final
+  {
+    unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+    return seastar::with_shared(m_as.servers_tbl_rwlock, [this, f=move(f)] {
+      int cmdnum = 0;
+      f->open_object_section("command_descriptions");
+      for (const auto& hook : m_as) {
+       auto secname = fmt::format("cmd {:>03}", cmdnum);
+        dump_cmd_and_help_to_json(f.get(), CEPH_FEATURES_ALL, secname,
+                                  hook->cmddesc, hook->help);
+        cmdnum++;
+      }
+      f->close_section();
+      ceph::bufferlist out;
+      f->flush(out);
+      return seastar::make_ready_future<bufferlist>(std::move(out));
+    });
+  }
+};
+
+/// the hooks that are served directly by the admin_socket server
+seastar::future<AsokRegistrationRes> AdminSocket::register_admin_hooks()
+{
+  version_hook = std::make_unique<VersionHook>();
+  git_ver_hook = std::make_unique<GitVersionHook>();
+  help_hook = std::make_unique<HelpHook>(*this);
+  getdescs_hook = std::make_unique<GetdescsHook>(*this);
+
+  // clang-format off
+  static const std::vector<AsokServiceDef> internal_hooks_tbl{
+      AsokServiceDef{"version",      "version",              version_hook.get(),     "get ceph version"}
+    , AsokServiceDef{"git_version",  "git_version",          git_ver_hook.get(),     "get git sha1"}
+    , AsokServiceDef{"help",         "help",                 help_hook.get(),        "list available commands"}
+    , AsokServiceDef{"get_command_descriptions", "get_command_descriptions",
+                                                             getdescs_hook.get(),    "list available commands"}
+  };
+  // clang-format on
+
+  // server_registration() returns a shared pointer to the AdminSocket
+  // server, i.e. to us. As we already have shared ownership of this object,
+  // we do not need it.
+  return register_server(AdminSocket::hook_server_tag{ this },
+                         internal_hooks_tbl);
+}
+
+}  // namespace crimson::admin
diff --git a/src/crimson/admin/admin_socket.h b/src/crimson/admin/admin_socket.h
new file mode 100644 (file)
index 0000000..e7d7eb0
--- /dev/null
@@ -0,0 +1,274 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+/**
+  A Crimson-wise version of the src/common/admin_socket.h
+
+  Note: assumed to be running on a single core.
+*/
+#include <map>
+#include <string>
+#include <string_view>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/iostream.hh>
+#include <seastar/core/shared_mutex.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/net/api.hh>
+
+#include "common/cmdparse.h"
+
+class CephContext;
+
+using namespace std::literals;
+
+inline constexpr auto CEPH_ADMIN_SOCK_VERSION = "2"sv;
+
+namespace crimson::admin {
+
+class AdminSocket;
+
+/**
+ * A specific hook must implement exactly one of the two interfaces:
+ * (1) call(command, cmdmap, format, out)
+ * or
+ * (2) exec_command(formatter, command, cmdmap, format, out)
+ *
+ * The default implementation of (1) above calls exec_command() after handling
+ * most of the boiler-plate choirs:
+ * - setting up the formatter, with an appropiate 'section' already opened;
+ * - handling possible failures (exceptions or future_exceptions) returned
+ *   by (2)
+ * - flushing the output to the outgoing bufferlist.
+ */
+class AdminSocketHook {
+ public:
+  /**
+   * \retval 'false' for hook execution errors
+   */
+  virtual seastar::future<ceph::bufferlist>
+  call(std::string_view command,
+       std::string_view format,
+       const cmdmap_t& cmdmap) const = 0;
+  virtual ~AdminSocketHook() {}
+};
+
+/**
+ * The details of a single API in a server's hooks block
+ */
+struct AsokServiceDef {
+  const std::string command;  ///< the sequence of words that should be used
+  const std::string cmddesc;  ///< the command syntax
+  const AdminSocketHook* hook;
+  const std::string help;  ///< help message
+};
+
+class AdminHooksIter;  ///< an iterator over all APIs in all server blocks
+
+/// a ref-count owner of the AdminSocket, used to guarantee its existence until
+/// all server-blocks are unregistered
+using AdminSocketRef = seastar::lw_shared_ptr<AdminSocket>;
+
+using AsokRegistrationRes =
+  std::optional<AdminSocketRef>;  // holding the server alive until after our
+                                  // unregistration
+
+class AdminSocket : public seastar::enable_lw_shared_from_this<AdminSocket> {
+ public:
+  AdminSocket() = default;
+  ~AdminSocket() = default;
+
+  AdminSocket(const AdminSocket&) = delete;
+  AdminSocket& operator=(const AdminSocket&) = delete;
+  AdminSocket(AdminSocket&&) = delete;
+  AdminSocket& operator=(AdminSocket&&) = delete;
+
+  using hook_server_tag = const void*;
+
+  /**
+   *  create the async Seastar thread that handles asok commands arriving
+   *  over the socket.
+   */
+  seastar::future<> start(const std::string& path);
+
+  seastar::future<> stop();
+
+  /**
+   * register an admin socket hooks server
+   *
+   * The server registers a set of APIs under a common hook_server_tag.
+   *
+   * Commands (APIs) are registered under a command string. Incoming
+   * commands are split by spaces and matched against the longest
+   * registered command. For example, if 'foo' and 'foo bar' are
+   * registered, and an incoming command is 'foo bar baz', it is
+   * matched with 'foo bar', while 'foo fud' will match 'foo'.
+   *
+   * The entire incoming command string is passed to the registered
+   * hook.
+   *
+   * \param server_tag  a tag identifying the server registering the hook
+   * \param apis_served a vector of the commands served by this server. Each
+   *        command registration includes its identifying command string, the
+   *        expected call syntax, and some help text.
+   *
+   * A note regarding the help text: if empty, command will not be
+   * included in 'help' output.
+   *
+   * \retval a shared ptr to the asok server itself, or nullopt if
+   *         a block with same tag is already registered.
+   */
+  seastar::future<AsokRegistrationRes> register_server(
+    hook_server_tag server_tag, const std::vector<AsokServiceDef>& apis_served);
+
+  /**
+   * unregister all hooks registered by this hooks-server. The caller
+   * gives up on its shared-ownership of the asok server once the
+   * deregistration is complete.
+   */
+  seastar::future<> unregister_server(hook_server_tag server_tag,
+                                      AdminSocketRef&& server_ref);
+
+ private:
+  /**
+   * the result of analyzing an incoming command, and locating it in
+   * the registered APIs collection.
+   */
+  struct parsed_command_t {
+    std::string cmd;
+    cmdmap_t parameters;
+    std::string format;
+    const AsokServiceDef* api;
+    const AdminSocketHook* hook;
+    /**
+     *  the length of the whole command-sequence under the 'prefix' header
+     */
+    std::size_t cmd_seq_len;
+  };
+  // and the shorthand:
+  using maybe_parsed_t = std::optional<AdminSocket::parsed_command_t>;
+
+  /**
+   * Registering the APIs that are served directly by the admin_socket server.
+   */
+  seastar::future<AsokRegistrationRes> register_admin_hooks();
+
+  /**
+   * unregister all hooks registered by this hooks-server
+   */
+  seastar::future<> unregister_server(hook_server_tag server_tag);
+
+  seastar::future<> handle_client(seastar::input_stream<char>& inp,
+                                  seastar::output_stream<char>& out);
+
+  seastar::future<> execute_line(std::string cmdline,
+                                 seastar::output_stream<char>& out);
+
+  seastar::future<> finalize_response(seastar::output_stream<char>& out,
+                                      ceph::bufferlist&& msgs);
+
+  bool validate_command(const parsed_command_t& parsed,
+                        const std::string& command_text,
+                        ceph::bufferlist& out) const;
+
+  /**
+   * Non-owning ptr to the UNIX-domain "server-socket".
+   * Named here to allow a call to abort_accept().
+   */
+  seastar::api_v2::server_socket* m_server_sock{ nullptr };
+
+  /**
+   * stopping incoming ASOK requests at shutdown
+   */
+  seastar::gate arrivals_gate;
+
+  std::unique_ptr<AdminSocketHook> version_hook;
+  std::unique_ptr<AdminSocketHook> git_ver_hook;
+  std::unique_ptr<AdminSocketHook> the0_hook;
+  std::unique_ptr<AdminSocketHook> help_hook;
+  std::unique_ptr<AdminSocketHook> getdescs_hook;
+  std::unique_ptr<AdminSocketHook> test_throw_hook;  // for dev unit-tests
+
+  /**
+   *  parse the incoming command line into the sequence of words that identifies
+   *  the API, and into its arguments. Locate the command string in the
+   *  registered blocks.
+   */
+  maybe_parsed_t parse_cmd(std::string command_text, bufferlist& out);
+
+  struct server_block {
+    server_block(const std::vector<AsokServiceDef>& hooks) : m_hooks{ hooks } {}
+    const std::vector<AsokServiceDef>& m_hooks;
+  };
+
+  /**
+   *  The servers table is protected by a rw-lock, to be acquired exclusively
+   *  only when registering or removing a server.
+   *  The lock is locked-shared when executing any hook.
+   */
+  mutable seastar::shared_mutex servers_tbl_rwlock;
+  std::map<hook_server_tag, server_block> servers;
+
+  using maybe_service_def_t = std::optional<const AsokServiceDef*>;
+
+  /**
+   * Find the longest subset of words in 'match' that is a registered API (in
+   * any of the servers' control blocks).
+   * locate_subcmd() is expected to be called with the servers table RW-lock
+   * held.
+   */
+  maybe_service_def_t locate_subcmd(std::string match) const;
+
+ public:
+  /**
+   * iterator support
+   */
+  AdminHooksIter begin() const;
+  AdminHooksIter end() const;
+
+  using ServersListIt = std::map<hook_server_tag, server_block>::const_iterator;
+  using ServerApiIt = std::vector<AsokServiceDef>::const_iterator;
+
+  friend class AdminSocketTest;
+  friend class HelpHook;
+  friend class GetdescsHook;
+  friend class AdminHooksIter;
+};
+
+/**
+ * An iterator over all registered APIs.
+ */
+struct AdminHooksIter
+    : public std::iterator<std::output_iterator_tag, AsokServiceDef> {
+ public:
+  explicit AdminHooksIter(const AdminSocket& master, bool end_flag = false);
+
+  ~AdminHooksIter() = default;
+
+  const AsokServiceDef* operator*() const
+  {
+    return &(*m_siter);
+  }
+
+  /**
+   * The (in)equality test is only used to compare to 'end'.
+   */
+  bool operator!=(const AdminHooksIter& other) const
+  {
+    return m_end_marker != other.m_end_marker;
+  }
+
+  AdminHooksIter& operator++();
+
+ private:
+  const AdminSocket& m_master;
+  AdminSocket::ServersListIt m_miter;
+  AdminSocket::ServerApiIt m_siter;
+  bool m_end_marker;
+
+  friend class AdminSocket;
+};
+
+}  // namespace crimson::admin
diff --git a/src/crimson/admin/osd_admin.cc b/src/crimson/admin/osd_admin.cc
new file mode 100644 (file)
index 0000000..7600041
--- /dev/null
@@ -0,0 +1,319 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/admin/osd_admin.h"
+
+#include <iostream>
+
+#include <boost/algorithm/string.hpp>
+#include <fmt/format.h>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/thread.hh>
+
+#include "common/config.h"
+#include "crimson/admin/admin_socket.h"
+#include "crimson/common/log.h"
+#include "crimson/osd/exceptions.h"
+#include "crimson/osd/osd.h"
+
+using crimson::osd::OSD;
+
+namespace {
+seastar::logger& logger()
+{
+  return crimson::get_logger(ceph_subsys_osd);
+}
+}  // namespace
+
+namespace crimson::admin {
+
+using crimson::common::local_conf;
+
+/**
+ * the hooks and states needed to handle OSD asok requests
+ */
+class OsdAdminImp {
+  friend class OsdAdmin;
+  friend class OsdAdminHookBase;
+
+  OSD* m_osd;
+
+  //  shared-ownership of the socket server itself, to guarantee its existence
+  //  until we have a chance to remove our registration:
+  AsokRegistrationRes m_socket_server;
+
+  /**
+   * Common code for all OSD admin hooks.
+   * Adds access to the owning OSD.
+   */
+  class OsdAdminHookBase : public AdminSocketHook {
+   protected:
+    explicit OsdAdminHookBase(OsdAdminImp& master) : m_osd_admin{ master } {}
+    struct tell_result_t {
+      int ret = 0;
+      string err;
+    };
+    /// the specific command implementation
+    virtual seastar::future<tell_result_t> tell(const cmdmap_t& cmdmap,
+                                               Formatter* f) const = 0;
+    seastar::future<bufferlist> call(std::string_view command,
+                                    std::string_view format,
+                                    const cmdmap_t& cmdmap) const final
+    {
+      unique_ptr<Formatter> f{Formatter::create(format, "json-pretty", "json-pretty")};
+      auto ret = seastar::do_with(std::move(f), [cmdmap, this] (auto& f) {
+       Formatter* formatter = f.get();
+       return tell(cmdmap, formatter).then([formatter](auto result) {
+         bufferlist out;
+         if (auto& [ret, err] = result; ret < 0) {
+           out.append(fmt::format("ERROR: {}\n", cpp_strerror(ret)));
+           out.append(err);
+         } else {
+           formatter->flush(out);
+         }
+         return seastar::make_ready_future<bufferlist>(std::move(out));
+        });
+      });
+      return ret;
+    }
+    OsdAdminImp& m_osd_admin;
+  };
+
+  /**
+   * An OSD admin hook: OSD status
+   */
+  class OsdStatusHook : public OsdAdminHookBase {
+   public:
+    explicit OsdStatusHook(OsdAdminImp& master) : OsdAdminHookBase(master){};
+    seastar::future<tell_result_t> tell(const cmdmap_t&,
+                                       Formatter* f) const final
+    {
+      f->open_object_section("status");
+      f->dump_stream("cluster_fsid")
+        << m_osd_admin.osd_superblock().cluster_fsid;
+      f->dump_stream("osd_fsid") << m_osd_admin.osd_superblock().osd_fsid;
+      f->dump_unsigned("whoami", m_osd_admin.osd_superblock().whoami);
+      f->dump_string("state", m_osd_admin.osd_state_name());
+      f->dump_unsigned("oldest_map", m_osd_admin.osd_superblock().oldest_map);
+      f->dump_unsigned("newest_map", m_osd_admin.osd_superblock().newest_map);
+      f->dump_unsigned("num_pgs", m_osd_admin.m_osd->pg_map.get_pgs().size());
+      f->close_section();
+      return seastar::make_ready_future<tell_result_t>();
+    }
+  };
+
+  /**
+   * An OSD admin hook: send beacon
+   */
+  class SendBeaconHook : public OsdAdminHookBase {
+   public:
+    explicit SendBeaconHook(OsdAdminImp& master) : OsdAdminHookBase(master){};
+    seastar::future<tell_result_t> tell(const cmdmap_t&, Formatter*) const final
+    {
+      if (m_osd_admin.get_osd_state().is_active()) {
+        return m_osd_admin.m_osd->send_beacon().then([] {
+          return seastar::make_ready_future<tell_result_t>();
+        });
+      } else {
+        return seastar::make_ready_future<tell_result_t>();
+      }
+    }
+  };
+
+  /**
+   * A CephContext admin hook: listing the configuration values
+   */
+  class ConfigShowHook : public OsdAdminHookBase {
+   public:
+    explicit ConfigShowHook(OsdAdminImp& master)
+        : OsdAdminHookBase(master) {}
+    seastar::future<tell_result_t> tell(const cmdmap_t&, Formatter* f) const final
+    {
+      f->open_object_section("config_show");
+      local_conf().show_config(f);
+      f->close_section();
+      return seastar::make_ready_future<tell_result_t>();
+    }
+  };
+
+  /**
+   * A CephContext admin hook: fetching the value of a specific
+   * configuration item
+   */
+  class ConfigGetHook : public OsdAdminHookBase {
+   public:
+    explicit ConfigGetHook(OsdAdminImp& master)
+        : OsdAdminHookBase(master) {}
+    seastar::future<tell_result_t> tell(const cmdmap_t& cmdmap,
+                                       ceph::Formatter* f) const final
+    {
+      std::string var;
+      if (!cmd_getval(cmdmap, "var", var)) {
+        // should have been caught by 'validate()'
+        return seastar::make_ready_future<tell_result_t>(
+         tell_result_t{-EINVAL, "syntax error: 'config get <var>'"});
+      }
+      try {
+       f->open_object_section("config_get");
+       std::string conf_val =
+         local_conf().get_val<std::string>(var);
+       f->dump_string(var.c_str(), conf_val.c_str());
+       f->close_section();
+       return seastar::make_ready_future<tell_result_t>();
+      } catch (const boost::bad_get&) {
+       return seastar::make_ready_future<tell_result_t>(
+         tell_result_t{-EINVAL, fmt::format("unrecognized option {}", var)});
+      }
+    }
+  };
+
+  /**
+   * A CephContext admin hook: setting the value of a specific configuration
+   * item (a real example: {"prefix": "config set", "var":"debug_osd", "val":
+   * ["30/20"]} )
+   */
+  class ConfigSetHook : public OsdAdminHookBase {
+   public:
+    explicit ConfigSetHook(OsdAdminImp& master)
+        : OsdAdminHookBase(master) {}
+
+    seastar::future<tell_result_t> tell(const cmdmap_t& cmdmap,
+                                       ceph::Formatter* f) const final
+    {
+      std::string var;
+      std::vector<std::string> new_val;
+      if (!cmd_getval(cmdmap, "var", var) ||
+          !cmd_getval(cmdmap, "val", new_val)) {
+       return seastar::make_ready_future<tell_result_t>(
+         tell_result_t{-EINVAL, "syntax error: 'config set <var> <value>'"});
+      }
+      // val may be multiple words
+      string valstr = str_join(new_val, " ");
+      return local_conf().set_val(var, valstr).then([f] {
+       f->open_object_section("config_set");
+        f->dump_string("success", "");
+       f->close_section();
+       return seastar::make_ready_future<tell_result_t>();
+      }).handle_exception_type([](std::invalid_argument& e) {
+       return seastar::make_ready_future<tell_result_t>(
+          tell_result_t{-EINVAL, e.what()});
+      });
+    }
+  };
+
+  /**
+   * A CephContext admin hook: calling assert (if allowed by
+   * 'debug_asok_assert_abort')
+   */
+  class AssertAlwaysHook : public OsdAdminHookBase {
+   public:
+    explicit AssertAlwaysHook(OsdAdminImp& master)
+        : OsdAdminHookBase(master) {}
+    seastar::future<tell_result_t> tell(const cmdmap_t&,
+                                       ceph::Formatter*) const final
+    {
+      if (local_conf().get_val<bool>("debug_asok_assert_abort")) {
+       ceph_assert_always(0);
+       return seastar::make_ready_future<tell_result_t>();
+      } else {
+       return seastar::make_ready_future<tell_result_t>(
+         tell_result_t{-EPERM, "configuration set to disallow asok assert"});
+      }
+    }
+  };
+
+  /**
+   * provide the hooks with access to OSD internals
+   */
+  const OSDSuperblock& osd_superblock() const
+  {
+    return m_osd->superblock;
+  }
+
+  OSDState get_osd_state() const
+  {
+    return m_osd->state;
+  }
+
+  string_view osd_state_name() const
+  {
+    return m_osd->state.to_string();
+  }
+
+  OsdStatusHook osd_status_hook{*this};
+  SendBeaconHook send_beacon_hook{*this};
+  ConfigShowHook config_show_hook{*this};
+  ConfigGetHook config_get_hook{*this};
+  ConfigSetHook config_set_hook{*this};
+  AssertAlwaysHook assert_hook{*this};
+
+ public:
+  OsdAdminImp(OSD* osd)
+      : m_osd{ osd }
+  {}
+
+  ~OsdAdminImp() = default;
+
+  seastar::future<> register_admin_commands()
+  {
+    static const std::vector<AsokServiceDef> hooks_tbl{
+      // clang-format off
+        AsokServiceDef{"status",      "status",        &osd_status_hook,
+                      "OSD status"}
+      , AsokServiceDef{"send_beacon", "send_beacon",   &send_beacon_hook,
+                      "send OSD beacon to mon immediately"}
+      , AsokServiceDef{"config show", "config show", &config_show_hook,
+                      "dump current config settings" }
+      , AsokServiceDef{"config get", "config get name=var,type=CephString",
+                      &config_get_hook,
+                      "config get <field>: get the config value" }
+      , AsokServiceDef{"config set",
+                      "config set name=var,type=CephString name=val,type=CephString,n=N",
+                      &config_set_hook,
+                      "config set <field> <val> [<val> ...]: set a config variable" }
+      , AsokServiceDef{ "assert", "assert", &assert_hook, "asserts" }
+      // clang-format on
+    };
+
+    return m_osd->asok
+      ->register_server(AdminSocket::hook_server_tag{ this }, hooks_tbl)
+      .then([this](AsokRegistrationRes res) { m_socket_server = res; });
+  }
+
+  seastar::future<> unregister_admin_commands()
+  {
+    if (!m_socket_server.has_value()) {
+      logger().warn("{}: OSD asok APIs already removed", __func__);
+      return seastar::now();
+    }
+
+    AdminSocketRef srv{ std::move(m_socket_server.value()) };
+    m_socket_server.reset();
+
+    return m_osd->asok->unregister_server(AdminSocket::hook_server_tag{ this },
+                                          std::move(srv));
+  }
+};
+
+//
+//  some PIMPL details:
+//
+OsdAdmin::OsdAdmin(OSD* osd)
+    : m_imp{ std::make_unique<crimson::admin::OsdAdminImp>(osd) }
+{}
+
+seastar::future<> OsdAdmin::register_admin_commands()
+{
+  return m_imp->register_admin_commands();
+}
+
+seastar::future<> OsdAdmin::unregister_admin_commands()
+{
+  return seastar::do_with(std::move(m_imp), [](auto& detached_imp) {
+    return detached_imp->unregister_admin_commands();
+  });
+}
+
+OsdAdmin::~OsdAdmin() = default;
+}  // namespace crimson::admin
diff --git a/src/crimson/admin/osd_admin.h b/src/crimson/admin/osd_admin.h
new file mode 100644 (file)
index 0000000..c40f38f
--- /dev/null
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+#pragma once
+
+#include <memory>
+
+#include "crimson/common/config_proxy.h"
+
+namespace crimson::osd {
+class OSD;
+}
+
+namespace crimson::admin {
+class OsdAdminImp;
+
+/**
+ * \brief implementation of the configuration-related 'admin_socket' API of
+ *        (Crimson) OSD
+ *
+ * Main functionality:
+ * - fetching OSD status data
+ * - ...
+ */
+class OsdAdmin {
+ public:
+  OsdAdmin(crimson::osd::OSD* osd);
+  ~OsdAdmin();
+  seastar::future<> register_admin_commands();
+  seastar::future<> unregister_admin_commands();
+ private:
+  std::unique_ptr<crimson::admin::OsdAdminImp> m_imp;
+};
+
+}  // namespace crimson::admin
index fcbfe47f83746390785a087b586e021a811a6179..3f53b353202ef71e9d64b5270a08b8024a0c84e7 100644 (file)
@@ -73,6 +73,8 @@ OSD::OSD(int id, uint32_t nonce,
     heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
     // do this in background
     heartbeat_timer{[this] { (void)update_heartbeat_peers(); }},
+    asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},
+    admin{std::make_unique<crimson::admin::OsdAdmin>(this)},
     osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
 {
   osdmaps[0] = boost::make_local_shared<OSDMap>();
@@ -273,6 +275,10 @@ seastar::future<> OSD::start()
   }).then([this] {
     return heartbeat->start(public_msgr.get_myaddrs(),
                             cluster_msgr.get_myaddrs());
+  }).then([this] {
+    // create the admin-socket server, and the objects that register
+    // to handle incoming commands
+    return start_asok_admin();
   }).then([this] {
     return start_boot();
   });
@@ -391,12 +397,62 @@ seastar::future<> OSD::_send_alive()
   }
 }
 
+/*
+  The OSD's Admin Socket object created here has two servers (i.e. - blocks of commands
+  to handle) registered to it:
+  - OSD's specific commands are handled by the OSD object;
+  - there are some common commands registered to be directly handled by the AdminSocket object
+    itself.
+*/
+seastar::future<> OSD::start_asok_admin()
+{
+  auto asok_path = local_conf().get_val<std::string>("admin_socket");
+
+  return asok->start(asok_path).then([this] {
+    // register OSD-specific admin-socket hooks
+    return admin->register_admin_commands();
+  });
+}
+
+/*
+  Note: stop_asok_admin() is executed as part of a destruction process,
+  and may occur even before 'start' is complete. We thus take nothing for granted,
+  and all interfaces are checked for existence.
+*/
+seastar::future<> OSD::stop_asok_admin()
+{
+  return ([this] {
+    if (admin) {
+      return admin->unregister_admin_commands();
+    } else {
+      return seastar::make_ready_future<>();
+    }
+  })().then([this] {
+    if (asok) {
+      return asok->stop().then([this]() {
+        asok.release();
+        return seastar::make_ready_future<>();
+      });
+    } else {
+      return seastar::make_ready_future<>();
+    }
+  }).handle_exception([](auto ep) {
+    logger().error("exception on admin-stop: {}", ep);
+    return seastar::make_ready_future<>();
+  }).finally([] {
+    logger().info("OSD::stop_asok_admin(): Admin-sock service destructed");
+  });
+}
+
 seastar::future<> OSD::stop()
 {
   logger().info("stop");
   // see also OSD::shutdown()
   state.set_stopping();
+
   return gate.close().then([this] {
+    return stop_asok_admin();
+  }).then([this] {
     return heartbeat->stop();
   }).then([this] {
     return monc->stop();
index 570017a8a9bb1c190e128ebea397e57792b0bba0..97c9ac456ab01a9cc9c3d46f09e720c4d5dc7a74 100644 (file)
@@ -15,6 +15,8 @@
 
 #include "crimson/common/type_helpers.h"
 #include "crimson/common/auth_handler.h"
+#include "crimson/admin/admin_socket.h"
+#include "crimson/admin/osd_admin.h"
 #include "crimson/common/simple_lru.h"
 #include "crimson/common/shared_lru.h"
 #include "crimson/mgr/client.h"
@@ -115,6 +117,10 @@ class OSD final : public crimson::net::Dispatcher,
   std::unique_ptr<Heartbeat> heartbeat;
   seastar::timer<seastar::lowres_clock> heartbeat_timer;
 
+  // admin-socket
+  seastar::lw_shared_ptr<crimson::admin::AdminSocket> asok;
+  std::unique_ptr<crimson::admin::OsdAdmin> admin;
+
 public:
   OSD(int id, uint32_t nonce,
       crimson::net::Messenger& cluster_msgr,
@@ -180,6 +186,9 @@ private:
 
   void check_osdmap_features();
 
+  seastar::future<> stop_asok_admin();
+  seastar::future<> start_asok_admin();
+
 public:
   OSDMapGate osdmap_gate;
 
@@ -208,6 +217,7 @@ public:
   seastar::future<> update_heartbeat_peers();
 
   friend class PGAdvanceMap;
+  friend class crimson::admin::OsdAdminImp;
 };
 
 }