]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-wnbd: use one daemon process per host
authorLucian Petrut <lpetrut@cloudbasesolutions.com>
Mon, 12 Jun 2023 13:16:39 +0000 (13:16 +0000)
committerLucian Petrut <lpetrut@cloudbasesolutions.com>
Fri, 1 Mar 2024 17:38:53 +0000 (17:38 +0000)
We're currently using one rbd-wnbd process per image mapping.
Since OSD connections aren't shared across those processes,
we end up with an excessive amount of TCP sessions, potentially
exceeding Windows limits:
https://ask.cloudbase.it/question/3598/ceph-for-windows-tcp-session-count/

In order to improve rbd-wnbd's scalability, we're going to use
a single process per host (unless "-f" is passed when mapping the
image, in which case the daemon will run as part of the same
process). This allows OSD sessions to be shared across image
mappings.

Another advantage is that the "ceph-rbd" service starts faster,
especially when having a large number of image mappings.

Signed-off-by: Lucian Petrut <lpetrut@cloudbasesolutions.com>
src/tools/rbd_wnbd/CMakeLists.txt
src/tools/rbd_wnbd/rados_client_cache.cc [new file with mode: 0644]
src/tools/rbd_wnbd/rados_client_cache.h [new file with mode: 0644]
src/tools/rbd_wnbd/rbd_mapping.cc
src/tools/rbd_wnbd/rbd_mapping.h
src/tools/rbd_wnbd/rbd_mapping_config.cc
src/tools/rbd_wnbd/rbd_mapping_config.h
src/tools/rbd_wnbd/rbd_wnbd.cc
src/tools/rbd_wnbd/rbd_wnbd.h
src/tools/rbd_wnbd/wnbd_handler.cc

index e757c71601eb7c609b5584527d5d139e6c71fc18..12a54986fba785767400193e023ba38e250b0c2e 100644 (file)
@@ -1,5 +1,6 @@
 add_executable(
     rbd-wnbd
+    rados_client_cache.cc
     rbd_mapping.cc rbd_mapping_config.cc
     rbd_wnbd.cc wnbd_handler.cc wnbd_wmi.cc
     ../../common/win32/code_page.rc)
diff --git a/src/tools/rbd_wnbd/rados_client_cache.cc b/src/tools/rbd_wnbd/rados_client_cache.cc
new file mode 100644 (file)
index 0000000..4c1acab
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Cloudbase Solutions
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "rados_client_cache.h"
+
+#include "common/errno.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd-wnbd: "
+
+std::shared_ptr<librados::Rados> RadosClientCache::init_client(
+  std::string& entity_name, std::string& cluster_name)
+{
+  auto rados = std::make_shared<librados::Rados>();
+
+  int r = rados->init2(entity_name.c_str(), cluster_name.c_str(), 0);
+  if (r < 0) {
+    derr << "couldn't initialize rados: " << cpp_strerror(r)
+         << dendl;
+    return std::shared_ptr<librados::Rados>();
+  }
+
+  r = rados->conf_read_file(nullptr);
+  if (r < 0) {
+    derr << "couldn't read conf file: " << cpp_strerror(r)
+         << dendl;
+    return std::shared_ptr<librados::Rados>();
+  }
+
+  r = rados->connect();
+  if (r < 0) {
+    derr << "couldn't establish rados connection: "
+         << cpp_strerror(r) << dendl;
+    return std::shared_ptr<librados::Rados>();
+  } else {
+    dout(1) << "successfully initialized rados connection" << dendl;
+  }
+
+  return rados;
+}
+
+std::shared_ptr<librados::Rados> RadosClientCache::get_client(
+  std::string& entity_name, std::string& cluster_name)
+{
+  std::unique_lock l{cache_lock};
+
+  remove_expired();
+
+  std::string key = entity_name + "@" + cluster_name;
+  auto cached_client_weak = cache.find(key);
+  if (cached_client_weak != cache.end()) {
+    if (auto cached_client = cached_client_weak->second.lock()) {
+      dout(1) << "reusing cached rados client: " << key << dendl;
+      return cached_client;
+    } else {
+      dout(5) << "cleaning up expired rados ref: "
+              << cached_client_weak->first << dendl;
+      cache.erase(cached_client_weak);
+    }
+  }
+
+  dout(1) << "creating new rados client: " << key << dendl;
+  auto client = init_client(entity_name, cluster_name);
+  cache.insert(std::pair{key, client});
+  return client;
+}
+
+void RadosClientCache::remove_expired()
+{
+  auto i = cache.begin();
+  while (i != cache.end()) {
+    if (i->second.expired()) {
+      dout(5) << "removing expired rados ref: "
+              << i->first << dendl;
+      i = cache.erase(i);
+      continue;
+    }
+    i++;
+  }
+}
diff --git a/src/tools/rbd_wnbd/rados_client_cache.h b/src/tools/rbd_wnbd/rados_client_cache.h
new file mode 100644 (file)
index 0000000..15841b0
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2023 Cloudbase Solutions
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "common/debug.h"
+#include "common/dout.h"
+
+#include "global/global_init.h"
+
+#include "include/rados/librados.hpp"
+
+// In order to re-use OSD connections, we're caching one rados client
+// per cluster.
+class RadosClientCache
+{
+private:
+  std::map<std::string, std::weak_ptr<librados::Rados>> cache;
+  ceph::mutex cache_lock = ceph::make_mutex("RadosClientCache::MapLock");
+
+  // Remove deleted objects from the map.
+  void remove_expired();
+
+  std::shared_ptr<librados::Rados> init_client(
+    std::string& entity_name, std::string& cluster_name);
+
+public:
+  std::shared_ptr<librados::Rados> get_client(
+    std::string& entity_name, std::string& cluster_name);
+};
index db136dbf6b92caa7bafb50e9fbbbfece62942079..b2d7cff93bc04c31f043505d3e3f13425cb38796 100644 (file)
 #undef dout_prefix
 #define dout_prefix *_dout << "rbd-wnbd: "
 
+#define DISK_STATUS_POLLING_INTERVAL_MS 500
+
 
 int RbdMapping::init()
 {
   librbd::image_info_t info;
 
-  int r = rados.ioctx_create(cfg.poolname.c_str(), io_ctx);
+  rados = client_cache.get_client(cfg.entity_name, cfg.cluster_name);
+  if (!rados) {
+    return -EINVAL;
+  }
+
+  int r = rados->ioctx_create(cfg.poolname.c_str(), io_ctx);
   if (r < 0) {
     derr << "rbd-wnbd: couldn't create IO context: " << cpp_strerror(r)
+         << ". Pool name: " << cfg.poolname
          << dendl;
     return r;
   }
@@ -73,7 +81,7 @@ int RbdMapping::init()
   // as the rbd pool or admin socket path.
   // We're cleaning up the registry entry when the non-persistent mapping
   // gets disconnected or when the ceph service restarts.
-  r = save_config_to_registry(&cfg, command_line);
+  r = save_config_to_registry(&cfg);
   if (r < 0)
     return r;
 
@@ -91,6 +99,8 @@ void RbdMapping::shutdown()
 {
   std::unique_lock l{shutdown_lock};
 
+  dout(5) << __func__ << ": removing RBD mapping: " << cfg.devpath << dendl;
+
   int r = 0;
   if (!cfg.persistent) {
     dout(5) << __func__ << ": cleaning up non-persistent mapping: "
@@ -120,21 +130,23 @@ void RbdMapping::shutdown()
 
   image.close();
   io_ctx.close();
-  rados.shutdown();
 }
 
 int RbdMapping::start()
 {
+  dout(10) << "initializing mapping" << dendl;
   int r = init();
   if (r < 0) {
     return r;
   }
 
+  dout(10) << "starting wnbd handler" << dendl;
   r = handler->start();
   if (r) {
     return r == ERROR_ALREADY_EXISTS ? -EEXIST : -EINVAL;
   }
 
+  dout(10) << "setting up watcher" << dendl;
   watch_ctx = new WNBDWatchCtx(io_ctx, handler, image, initial_image_size);
   r = image.update_watch(watch_ctx, &watch_handle);
   if (r < 0) {
@@ -143,38 +155,123 @@ int RbdMapping::start()
     return r;
   }
 
-  // We're informing the parent processes that the initialization
-  // was successful.
-  int err = 0;
-  if (!cfg.parent_pipe.empty()) {
-    HANDLE parent_pipe_handle = CreateFile(
-      cfg.parent_pipe.c_str(), GENERIC_WRITE, 0, NULL,
-      OPEN_EXISTING, 0, NULL);
-    if (parent_pipe_handle == INVALID_HANDLE_VALUE) {
-      err = GetLastError();
-      derr << "Could not open parent pipe: " << win32_strerror(err) << dendl;
-    } else if (!WriteFile(parent_pipe_handle, "a", 1, NULL, NULL)) {
-      // TODO: consider exiting in this case. The parent didn't wait for us,
-      // maybe it was killed after a timeout.
-      err = GetLastError();
-      derr << "Failed to communicate with the parent: "
-           << win32_strerror(err) << dendl;
-    } else {
-      dout(5) << __func__ << ": submitted parent notification." << dendl;
-    }
-
-    if (parent_pipe_handle != INVALID_HANDLE_VALUE)
-      CloseHandle(parent_pipe_handle);
+  // Wait for the mapped disk to become available.
+  r = wait_mapped_disk(cfg);
+  if (r < 0) {
+    return r;
+  }
 
-    global_init_postfork_finish(g_ceph_context);
+  if (disconnect_cbk) {
+    monitor_thread = std::thread([this]{
+      int ret = this->wait();
+      // Allow "this" to be destroyed by the disconnect callback.
+      this->monitor_thread.detach();
+      dout(5) << "finished waiting for: " << this->cfg.devpath
+              << ", ret: " << ret << dendl;
+      disconnect_cbk(this->cfg.devpath, ret);
+    });
   }
 
   return 0;
 }
 
-int RbdMapping::wait() {
+// Wait until the image gets disconnected.
+int RbdMapping::wait()
+{
   if (handler) {
     return handler->wait();
   }
   return 0;
 }
+
+RbdMapping::~RbdMapping()
+{
+  dout(10) << __func__ << ": cleaning up rbd mapping: "
+           << cfg.devpath << dendl;
+  shutdown();
+}
+
+// Wait for the mapped disk to become available.
+int wait_mapped_disk(Config& cfg)
+{
+  DWORD status = WnbdPollDiskNumber(
+    cfg.devpath.c_str(),
+    TRUE, // ExpectMapped
+    TRUE, // TryOpen
+    cfg.image_map_timeout * 1000,
+    DISK_STATUS_POLLING_INTERVAL_MS,
+    (PDWORD) &cfg.disk_number);
+  if (status) {
+    derr << "WNBD disk unavailable, error: "
+         << win32_strerror(status) << dendl;
+    return -EINVAL;
+  }
+  dout(0) << "Successfully mapped image: " << cfg.devpath
+          << ". Windows disk path: "
+          << "\\\\.\\PhysicalDrive" + std::to_string(cfg.disk_number)
+          << dendl;
+  return 0;
+}
+
+int RbdMappingDispatcher::create(Config& cfg)
+{
+  if (cfg.devpath.empty()) {
+    derr << "missing device identifier" << dendl;
+    return -EINVAL;
+  }
+
+  if (get_mapping(cfg.devpath)) {
+    derr << "already mapped: " << cfg.devpath << dendl;
+    return -EEXIST;
+  }
+
+  auto rbd_mapping = std::make_shared<RbdMapping>(
+    cfg, client_cache,
+    std::bind(
+      &RbdMappingDispatcher::disconnect_cbk,
+      this,
+      std::placeholders::_1,
+      std::placeholders::_2));
+
+  int r = rbd_mapping.get()->start();
+  if (!r) {
+    std::unique_lock l{map_mutex};
+    mappings.insert(std::make_pair(cfg.devpath, rbd_mapping));
+  }
+  return r;
+}
+
+std::shared_ptr<RbdMapping> RbdMappingDispatcher::get_mapping(
+  std::string& devpath)
+{
+  std::unique_lock l{map_mutex};
+
+  auto mapping_it = mappings.find(devpath);
+  if (mapping_it == mappings.end()) {
+    // not found
+    return std::shared_ptr<RbdMapping>();
+  } else {
+    return mapping_it->second;
+  }
+}
+
+void RbdMappingDispatcher::disconnect_cbk(std::string devpath, int ret)
+{
+  dout(10) << "RbdMappingDispatcher: cleaning up stopped mapping" << dendl;
+  if (ret) {
+    derr << "rbd mapping wait error: " << ret
+         << ", allowing cleanup to proceed"
+         << dendl;
+  }
+
+  auto mapping = get_mapping(devpath);
+  if (mapping) {
+    // This step can be fairly time consuming, especially when
+    // cumulated. For this reason, we'll ensure that multiple mappings
+    // can be cleaned up simultaneously.
+    mapping->shutdown();
+
+    std::unique_lock l{map_mutex};
+    mappings.erase(devpath);
+  }
+}
index ce96beeec9e30a30fe8c6b268d3499401e2e5850..52404ed503f297c80b5ba1cc92bb76de8cb8217b 100644 (file)
@@ -12,6 +12,7 @@
 
 #pragma once
 
+#include "rados_client_cache.h"
 #include "rbd_mapping_config.h"
 #include "wnbd_handler.h"
 
@@ -45,6 +46,7 @@ public:
   }
 };
 
+typedef std::function<void(std::string devpath, int ret)> disconnect_cbk_t;
 
 class RbdMapping
 {
@@ -52,8 +54,8 @@ private:
   Config cfg;
   // We're sharing the rados object across mappings in order to
   // reuse the OSD connections.
-  librados::Rados &rados;
-  std::string command_line;
+  RadosClientCache& client_cache;
+  std::shared_ptr<librados::Rados> rados;
 
   librbd::RBD rbd;
   librados::IoCtx io_ctx;
@@ -63,26 +65,54 @@ private:
   WnbdHandler* handler = nullptr;
   uint64_t watch_handle;
   WNBDWatchCtx* watch_ctx = nullptr;
+  disconnect_cbk_t disconnect_cbk;
 
   ceph::mutex shutdown_lock = ceph::make_mutex("RbdMapping::ShutdownLock");
+  std::thread monitor_thread;
 
   int init();
-  void shutdown();
 
 public:
-  RbdMapping(Config& _cfg, librados::Rados& _rados,
-             std::string _command_line)
+  RbdMapping(Config& _cfg,
+             RadosClientCache& _client_cache)
     : cfg(_cfg)
-    , rados(_rados)
-    , command_line(_command_line)
-  {
-  }
+    , client_cache(_client_cache)
+  {}
 
-  ~RbdMapping()
-  {
-      shutdown();
-  }
+  RbdMapping(Config& _cfg,
+             RadosClientCache& _client_cache,
+             disconnect_cbk_t _disconnect_cbk)
+    : cfg(_cfg)
+    , client_cache(_client_cache)
+    , disconnect_cbk(_disconnect_cbk)
+  {}
+
+  ~RbdMapping();
 
   int start();
+  // Wait until the image gets disconnected.
   int wait();
+  void shutdown();
+};
+
+// Wait for the mapped disk to become available.
+int wait_mapped_disk(Config& cfg);
+
+class RbdMappingDispatcher
+{
+private:
+  RadosClientCache& client_cache;
+
+  std::map<std::string, std::shared_ptr<RbdMapping>> mappings;
+  ceph::mutex map_mutex = ceph::make_mutex("RbdMappingDispatcher::MapMutex");
+
+  void disconnect_cbk(std::string devpath, int ret);
+
+public:
+  RbdMappingDispatcher(RadosClientCache& _client_cache)
+    : client_cache(_client_cache)
+  {}
+
+  int create(Config& cfg);
+  std::shared_ptr<RbdMapping> get_mapping(std::string& devpath);
 };
index c78920ea8d32ac72a3f99eee5ddc6cf5b73a8e3d..3cdd7f6dda5137e4b8d5e43c4881067073bb464d 100644 (file)
@@ -52,7 +52,7 @@ int construct_devpath_if_missing(Config* cfg)
   return 0;
 }
 
-int save_config_to_registry(Config* cfg, std::string command_line)
+int save_config_to_registry(Config* cfg)
 {
   std::string strKey{ SERVICE_REG_KEY };
   strKey.append("\\");
@@ -73,7 +73,7 @@ int save_config_to_registry(Config* cfg, std::string command_line)
       reg_key.set("nsname", cfg->nsname) ||
       reg_key.set("imgname", cfg->imgname) ||
       reg_key.set("snapname", cfg->snapname) ||
-      reg_key.set("command_line", command_line) ||
+      reg_key.set("command_line", cfg->command_line) ||
       reg_key.set("persistent", cfg->persistent) ||
       reg_key.set("admin_sock_path", g_conf()->admin_socket) ||
       reg_key.flush()) {
index 60e3fa20a45e47c3e585ef6d8789146979838cb1..55262f6d02bb6dda5ef76832b908c26048192a7d 100644 (file)
@@ -31,7 +31,8 @@ struct Config {
   bool exclusive = false;
   bool readonly = false;
 
-  std::string parent_pipe;
+  std::string cluster_name;
+  std::string entity_name;
 
   std::string poolname;
   std::string nsname;
@@ -72,6 +73,6 @@ struct Config {
 };
 
 int construct_devpath_if_missing(Config* cfg);
-int save_config_to_registry(Config* cfg, std::string command_line);
+int save_config_to_registry(Config* cfg);
 int remove_config_from_registry(Config* cfg);
 int load_mapping_config_from_registry(std::string devpath, Config* cfg);
index 08cb7ca9359d155fc274cf1eaf0895eebc68488d..b3a42285b2c1886b11de8b5d961c97a434a0261d 100644 (file)
@@ -29,6 +29,7 @@
 #include "wnbd_handler.h"
 #include "wnbd_wmi.h"
 #include "rbd_wnbd.h"
+#include "rados_client_cache.h"
 
 #include <fstream>
 #include <memory>
@@ -72,9 +73,12 @@ using namespace std;
 // Wait for wmi events up to two seconds
 #define WMI_EVENT_TIMEOUT 2
 
-static WnbdHandler* handler = nullptr;
 static ceph::mutex shutdown_lock = ceph::make_mutex("RbdWnbd::ShutdownLock");
 
+static RadosClientCache client_cache;
+static RbdMappingDispatcher mapping_dispatcher(client_cache);
+static RbdMapping* daemon_mapping = nullptr;
+
 bool is_process_running(DWORD pid)
 {
   HANDLE process = OpenProcess(SYNCHRONIZE, FALSE, pid);
@@ -326,192 +330,55 @@ int send_map_request(std::string arguments) {
   return reply.status;
 }
 
-// Spawn a subprocess using the specified "rbd-wnbd" command
-// arguments. A pipe is passed to the child process,
-// which will allow it to communicate the mapping status
-int map_device_using_suprocess(std::string arguments, int timeout_ms)
+int map_device_using_same_process(std::string command_line)
 {
-  STARTUPINFOW si;
-  PROCESS_INFORMATION pi;
-  char ch;
-  DWORD err = 0, status = 0;
-  int exit_code = 0;
-  std::ostringstream command_line;
-  std::string exe_path;
-  // Windows async IO context
-  OVERLAPPED connect_o, read_o;
-  HANDLE connect_event = NULL, read_event = NULL;
-  // Used for waiting on multiple events that are going to be initialized later.
-  HANDLE wait_events[2] = { INVALID_HANDLE_VALUE, INVALID_HANDLE_VALUE};
-  DWORD bytes_read = 0;
-  // We may get a command line containing an old pipe handle when
-  // recreating mappings, so we'll have to replace it.
-  std::regex pipe_pattern("([\'\"]?--pipe-name[\'\"]? +[\'\"]?[^ ]+[\'\"]?)");
-
-  uuid_d uuid;
-  uuid.generate_random();
-  std::ostringstream pipe_name;
-  pipe_name << "\\\\.\\pipe\\rbd-wnbd-" << uuid;
-
-  // Create an unique named pipe to communicate with the child. */
-  HANDLE pipe_handle = CreateNamedPipe(
-    pipe_name.str().c_str(),
-    PIPE_ACCESS_INBOUND | FILE_FLAG_FIRST_PIPE_INSTANCE |
-      FILE_FLAG_OVERLAPPED,
-    PIPE_WAIT,
-    1, // Only accept one instance
-    SERVICE_PIPE_BUFFSZ,
-    SERVICE_PIPE_BUFFSZ,
-    SERVICE_PIPE_TIMEOUT_MS,
-    NULL);
-  if (pipe_handle == INVALID_HANDLE_VALUE) {
-    err = GetLastError();
-    derr << "CreateNamedPipe failed: " << win32_strerror(err) << dendl;
-    exit_code = -ECHILD;
-    goto finally;
-  }
-  connect_event = CreateEvent(0, TRUE, FALSE, NULL);
-  read_event = CreateEvent(0, TRUE, FALSE, NULL);
-  if (!connect_event || !read_event) {
-    err = GetLastError();
-    derr << "CreateEvent failed: " << win32_strerror(err) << dendl;
-    exit_code = -ECHILD;
-    goto finally;
-  }
-  connect_o.hEvent = connect_event;
-  read_o.hEvent = read_event;
-
-  status = ConnectNamedPipe(pipe_handle, &connect_o);
-  err = GetLastError();
-  if (status || err != ERROR_IO_PENDING) {
-    if (status)
-      err = status;
-    derr << "ConnectNamedPipe failed: " << win32_strerror(err) << dendl;
-    exit_code = -ECHILD;
-    goto finally;
+  dout(5) << "Creating mapping using the same process. Command line: "
+          << command_line << dendl;
+
+  int argc;
+  // CommandLineToArgvW only has an UTF-16 variant.
+  LPWSTR* argv_w = CommandLineToArgvW(
+    to_wstring(command_line).c_str(), &argc);
+  if (!argv_w) {
+    DWORD err = GetLastError();
+    derr << "Couldn't parse args, error: "
+         << win32_strerror(err) << dendl;
+    return -EINVAL;
   }
-  err = 0;
 
-  dout(5) << __func__ << ": command arguments: " << arguments << dendl;
-
-  // We'll avoid running arbitrary commands, instead using the executable
-  // path of this process (expected to be the full rbd-wnbd.exe path).
-  err = get_exe_path(exe_path);
-  if (err) {
-    exit_code = -EINVAL;
-    goto finally;
-  }
-  command_line << std::quoted(exe_path)
-               << " " << std::regex_replace(arguments, pipe_pattern, "")
-               << " --pipe-name " << pipe_name.str();
-
-  dout(5) << __func__ << ": command line: " << command_line.str() << dendl;
-
-  GetStartupInfoW(&si);
-  // Create a detached child
-  if (!CreateProcessW(
-      NULL, const_cast<wchar_t*>(to_wstring(command_line.str()).c_str()),
-      NULL, NULL, FALSE, DETACHED_PROCESS,
-      NULL, NULL, &si, &pi)) {
-    err = GetLastError();
-    derr << "CreateProcess failed: " << win32_strerror(err) << dendl;
-    exit_code = -ECHILD;
-    goto finally;
+  std::vector<const char*> args;
+  std::vector<std::string> argv_sv;
+  // We're reserving the vector size in order to avoid resizes,
+  // which would invalidate our char* pointers.
+  argv_sv.reserve(argc);
+  args.reserve(argc);
+  for (int i = 0; i < argc; i++) {
+    argv_sv.push_back(to_string(argv_w[i]));
+    args.push_back(argv_sv[i].c_str());
   }
+  LocalFree(argv_w);
 
-  wait_events[0] = connect_event;
-  wait_events[1] = pi.hProcess;
-  status = WaitForMultipleObjects(2, wait_events, FALSE, timeout_ms);
-  switch(status) {
-    case WAIT_OBJECT_0:
-      if (!GetOverlappedResult(pipe_handle, &connect_o, &bytes_read, TRUE)) {
-        err = GetLastError();
-        derr << "Couldn't establish a connection with the child process. "
-             << "Error: " << win32_strerror(err) << dendl;
-        exit_code = -ECHILD;
-        goto clean_process;
-      }
-      // We have an incoming connection.
-      break;
-    case WAIT_OBJECT_0 + 1:
-      // The process has exited prematurely.
-      goto clean_process;
-    case WAIT_TIMEOUT:
-      derr << "Timed out waiting for child process connection." << dendl;
-      goto clean_process;
-    default:
-      derr << "Failed waiting for child process. Status: " << status << dendl;
-      goto clean_process;
-  }
-  // Block and wait for child to say it is ready.
-  dout(5) << __func__ << ": waiting for child notification." << dendl;
-  if (!ReadFile(pipe_handle, &ch, 1, NULL, &read_o)) {
-    err = GetLastError();
-    if (err != ERROR_IO_PENDING) {
-      derr << "Receiving child process reply failed with: "
-           << win32_strerror(err) << dendl;
-      exit_code = -ECHILD;
-      goto clean_process;
-    }
+  Config cfg;
+  cfg.command_line = command_line;
+  Command parsed_cmd = None;
+  std::ostringstream err_msg;
+  int r = parse_args(args, &err_msg, &parsed_cmd, &cfg);
+  if (r) {
+    derr << "Couldn't parse args, error: " << r
+         << ". Error message: " << err_msg.str() << dendl;
+    return -EINVAL;
   }
-  wait_events[0] = read_event;
-  wait_events[1] = pi.hProcess;
-  // The RBD daemon is expected to write back right after opening the
-  // pipe. We'll use the same timeout value for now.
-  status = WaitForMultipleObjects(2, wait_events, FALSE, timeout_ms);
-  switch(status) {
-    case WAIT_OBJECT_0:
-      if (!GetOverlappedResult(pipe_handle, &read_o, &bytes_read, TRUE)) {
-        err = GetLastError();
-        derr << "Receiving child process reply failed with: "
-             << win32_strerror(err) << dendl;
-        exit_code = -ECHILD;
-        goto clean_process;
-      }
-      break;
-    case WAIT_OBJECT_0 + 1:
-      // The process has exited prematurely.
-      goto clean_process;
-    case WAIT_TIMEOUT:
-      derr << "Timed out waiting for child process message." << dendl;
-      goto clean_process;
-    default:
-      derr << "Failed waiting for child process. Status: " << status << dendl;
-      goto clean_process;
+  if (parsed_cmd != Connect) {
+    derr << "Unexpected map command: " << parsed_cmd
+         << ", expecting: " << Connect << dendl;
+    return -EINVAL;
   }
 
-  dout(5) << __func__ << ": received child notification." << dendl;
-  goto finally;
-
-  clean_process:
-    if (!is_process_running(pi.dwProcessId)) {
-      GetExitCodeProcess(pi.hProcess, (PDWORD)&exit_code);
-      if (!exit_code) {
-        // Child terminated unexpectedly.
-        exit_code = -ECHILD;
-      } else if (exit_code > 0) {
-        // Make sure to return a negative error code.
-        exit_code = -exit_code;
-      }
-      derr << "Daemon failed with: " << cpp_strerror(exit_code) << dendl;
-    } else {
-      // The process closed the pipe without notifying us or exiting.
-      // This is quite unlikely, but we'll terminate the process.
-      dout(0) << "Terminating unresponsive process." << dendl;
-      TerminateProcess(pi.hProcess, 1);
-      exit_code = -EINVAL;
-    }
+  if (construct_devpath_if_missing(&cfg)) {
+    return -EINVAL;
+  }
 
-  finally:
-    if (exit_code)
-      derr << "Could not start RBD daemon." << dendl;
-    if (pipe_handle)
-      CloseHandle(pipe_handle);
-    if (connect_event)
-      CloseHandle(connect_event);
-    if (read_event)
-      CloseHandle(read_event);
-  return exit_code;
+  return mapping_dispatcher.create(cfg);
 }
 
 BOOL WINAPI console_handler_routine(DWORD dwCtrlType)
@@ -519,7 +386,10 @@ BOOL WINAPI console_handler_routine(DWORD dwCtrlType)
   dout(0) << "Received control signal: " << dwCtrlType
           << ". Exiting." << dendl;
 
-  // TODO: shutdown all mappings
+  std::unique_lock l{shutdown_lock};
+  if (daemon_mapping) {
+    daemon_mapping->shutdown();
+  }
 
   return true;
 }
@@ -591,7 +461,7 @@ int restart_registered_mappings(
 
         // We'll try to map all devices and return a non-zero value
         // if any of them fails.
-        int r = map_device_using_suprocess(cfg.command_line, time_left_ms);
+        int r = map_device_using_same_process(cfg.command_line);
         if (r) {
           err = r;
           derr << "Could not create mapping: "
@@ -724,9 +594,8 @@ class RBDService : public ServiceBase {
                   << (char*)request->arguments << dendl;
           // TODO: use the configured service map timeout.
           // TODO: add ceph.conf options.
-          return map_device_using_suprocess(
-            (char*)request->arguments,
-            DEFAULT_IMAGE_MAP_TIMEOUT * 1000);
+          return map_device_using_same_process(
+            std::string((char*) request->arguments));
         default:
           dout(1) << "Received unsupported command: "
                   << request->command << dendl;
@@ -939,6 +808,8 @@ exit:
         } else {
           dout(0) << "Ignoring image remap failure." << dendl;
         }
+      } else {
+        dout(0) << "successfully restarted mappings" << dendl;
       }
 
       if (adapter_monitoring_enabled) {
@@ -1070,7 +941,7 @@ boost::intrusive_ptr<CephContext> do_global_init(
   global_pre_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, code_env, flags);
   // Avoid cluttering the console when spawning a mapping that will run
   // in the background.
-  if (g_conf()->daemonize && cfg->parent_pipe.empty()) {
+  if (g_conf()->daemonize) {
     flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
   }
   auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
@@ -1083,61 +954,17 @@ boost::intrusive_ptr<CephContext> do_global_init(
   return cct;
 }
 
-// Wait for the mapped disk to become available.
-static int wait_mapped_disk(Config *cfg)
-{
-  DWORD status = WnbdPollDiskNumber(
-    cfg->devpath.c_str(),
-    TRUE, // ExpectMapped
-    TRUE, // TryOpen
-    cfg->image_map_timeout,
-    DISK_STATUS_POLLING_INTERVAL_MS,
-    (PDWORD) &cfg->disk_number);
-  if (status) {
-    derr << "WNBD disk unavailable, error: "
-         << win32_strerror(status) << dendl;
-    return -EINVAL;
-  }
-  dout(0) << "Successfully mapped image: " << cfg->devpath
-          << ". Windows disk path: "
-          << "\\\\.\\PhysicalDrive" + std::to_string(cfg->disk_number)
-          << dendl;
-  return 0;
-}
-
 static int do_map(Config *cfg)
 {
-  if (g_conf()->daemonize && cfg->parent_pipe.empty()) {
-    r = send_map_request(get_cli_args());
-    if (r < 0) {
-      return r;
-    }
-
-    return wait_mapped_disk(cfg);
-  }
-
   dout(0) << "Mapping RBD image: " << cfg->devpath << dendl;
 
-  librados::Rados rados;
-  int r = rados.init_with_context(g_ceph_context);
-  if (r < 0) {
-    derr << "rbd-wnbd: couldn't initialize rados: " << cpp_strerror(r)
-         << dendl;
-    return r;
-  }
-
-  RbdMapping rbd_mapping(*cfg, rados, get_cli_args());
-  r = rbd_mapping.start();
+  RbdMapping rbd_mapping(*cfg, client_cache);
+  int r = rbd_mapping.start();
   if (r) {
     return r;
   }
 
-  // TODO: consider substracting the time it took to perform the
-  // above operations from cfg->image_map_timeout in wait_mapped_disk().
-  r = wait_mapped_disk(cfg);
-  if (r < 0) {
-    goto close_ret;
-  }
+  daemon_mapping = &rbd_mapping;
 
   dout(0) << "Successfully mapped RBD image: " << cfg->devpath << dendl;
   return rbd_mapping.wait();
@@ -1371,10 +1198,17 @@ static int parse_args(std::vector<const char*>& args,
   }
   config.parse_env(CEPH_ENTITY_TYPE_CLIENT);
   config.parse_argv(args);
+
+  cfg->cluster_name = string(config->cluster);
+  cfg->entity_name = config->name.to_str();
   cfg->poolname = config.get_val<std::string>("rbd_default_pool");
 
   std::vector<const char*>::iterator i;
   std::ostringstream err;
+  // The parent pipe parameter has been deprecated since we're no longer
+  // using separate processes per mapping (unless "-f" is passed).
+  // TODO: remove this parameter eventually.
+  std::string parent_pipe;
 
   // TODO: consider using boost::program_options like Device.cc does.
   // This should simplify argument parsing. Also, some arguments must be tied
@@ -1400,12 +1234,14 @@ static int parse_args(std::vector<const char*>& args,
       cfg->remap_failure_fatal = true;
     } else if (ceph_argparse_flag(args, i, "--adapter-monitoring-enabled", (char *)NULL)) {
       cfg->adapter_monitoring_enabled = true;
-    } else if (ceph_argparse_witharg(args, i, &cfg->parent_pipe, err,
+    } else if (ceph_argparse_witharg(args, i, &parent_pipe, err,
                                      "--pipe-name", (char *)NULL)) {
       if (!err.str().empty()) {
         *err_msg << "rbd-wnbd: " << err.str();
         return -EINVAL;
       }
+      std::cerr << "WARNING: '--pipe-name' has been deprecated and is currently ignored."
+                << std::endl;
     } else if (ceph_argparse_witharg(args, i, (int*)&cfg->wnbd_log_level,
                                      err, "--wnbd-log-level", (char *)NULL)) {
       if (!err.str().empty()) {
@@ -1547,6 +1383,7 @@ static int parse_args(std::vector<const char*>& args,
 static int rbd_wnbd(int argc, const char *argv[])
 {
   Config cfg;
+  cfg.command_line = get_cli_args();
   auto args = argv_to_vec(argc, argv);
 
   // Avoid using dout before calling "do_global_init"
@@ -1578,6 +1415,14 @@ static int rbd_wnbd(int argc, const char *argv[])
       if (construct_devpath_if_missing(&cfg)) {
         return -EINVAL;
       }
+      if (g_conf()->daemonize) {
+        r = send_map_request(cfg.command_line);
+        if (r < 0) {
+          return r;
+        }
+        return wait_mapped_disk(cfg);
+      }
+
       r = do_map(&cfg);
       if (r < 0)
         return r;
index ba6280031a364ca3089b864e54b90908c5bef328..6ec4851e8a7c04ffecc6001fa03b510893a0bed9 100644 (file)
@@ -27,8 +27,6 @@
 #define SERVICE_PIPE_TIMEOUT_MS 5000
 #define SERVICE_PIPE_BUFFSZ 4096
 
-#define DISK_STATUS_POLLING_INTERVAL_MS 500
-
 #define HELP_INFO 1
 #define VERSION_INFO 2
 
@@ -64,16 +62,16 @@ int disconnect_all_mappings(
   int worker_count);
 int restart_registered_mappings(
   int worker_count, int total_timeout, int image_map_timeout);
-int map_device_using_suprocess(std::string command_line);
+int map_device_using_same_process(std::string command_line);
 
 BOOL WINAPI console_handler_routine(DWORD dwCtrlType);
 
 static int parse_args(std::vector<const char*>& args,
                       std::ostream *err_msg,
                       Command *command, Config *cfg);
+static int do_map(Config *cfg);
 static int do_unmap(Config *cfg, bool unregister);
 
-
 class BaseIterator {
   public:
     virtual ~BaseIterator() {};
index ba53b872697c07d09283e83c2758c491f4c70882..d5f4356ad1778ddaa3d5a596a76673c73c67daf8 100644 (file)
@@ -50,12 +50,15 @@ int WnbdHandler::wait()
 {
   int err = 0;
   if (started && wnbd_disk) {
-    dout(10) << __func__ << ": waiting" << dendl;
+    dout(10) << "waiting for WNBD mapping: " << instance_name << dendl;
 
     err = WnbdWaitDispatcher(wnbd_disk);
     if (err) {
-      derr << __func__ << " failed waiting for dispatcher to stop: "
-           << err << dendl;
+      derr << __func__ << ": failed waiting for dispatcher to stop: "
+           << instance_name
+           << ". Error: " << err << dendl;
+    } else {
+      dout(10) << "WNBD mapping disconnected: " << instance_name << dendl;
     }
   }