#undef dout_prefix
#define dout_prefix *_dout << "rbd-wnbd: "
+#define DISK_STATUS_POLLING_INTERVAL_MS 500
+
int RbdMapping::init()
{
librbd::image_info_t info;
- int r = rados.ioctx_create(cfg.poolname.c_str(), io_ctx);
+ rados = client_cache.get_client(cfg.entity_name, cfg.cluster_name);
+ if (!rados) {
+ return -EINVAL;
+ }
+
+ int r = rados->ioctx_create(cfg.poolname.c_str(), io_ctx);
if (r < 0) {
derr << "rbd-wnbd: couldn't create IO context: " << cpp_strerror(r)
+ << ". Pool name: " << cfg.poolname
<< dendl;
return r;
}
// as the rbd pool or admin socket path.
// We're cleaning up the registry entry when the non-persistent mapping
// gets disconnected or when the ceph service restarts.
- r = save_config_to_registry(&cfg, command_line);
+ r = save_config_to_registry(&cfg);
if (r < 0)
return r;
{
std::unique_lock l{shutdown_lock};
+ dout(5) << __func__ << ": removing RBD mapping: " << cfg.devpath << dendl;
+
int r = 0;
if (!cfg.persistent) {
dout(5) << __func__ << ": cleaning up non-persistent mapping: "
image.close();
io_ctx.close();
- rados.shutdown();
}
int RbdMapping::start()
{
+ dout(10) << "initializing mapping" << dendl;
int r = init();
if (r < 0) {
return r;
}
+ dout(10) << "starting wnbd handler" << dendl;
r = handler->start();
if (r) {
return r == ERROR_ALREADY_EXISTS ? -EEXIST : -EINVAL;
}
+ dout(10) << "setting up watcher" << dendl;
watch_ctx = new WNBDWatchCtx(io_ctx, handler, image, initial_image_size);
r = image.update_watch(watch_ctx, &watch_handle);
if (r < 0) {
return r;
}
- // We're informing the parent processes that the initialization
- // was successful.
- int err = 0;
- if (!cfg.parent_pipe.empty()) {
- HANDLE parent_pipe_handle = CreateFile(
- cfg.parent_pipe.c_str(), GENERIC_WRITE, 0, NULL,
- OPEN_EXISTING, 0, NULL);
- if (parent_pipe_handle == INVALID_HANDLE_VALUE) {
- err = GetLastError();
- derr << "Could not open parent pipe: " << win32_strerror(err) << dendl;
- } else if (!WriteFile(parent_pipe_handle, "a", 1, NULL, NULL)) {
- // TODO: consider exiting in this case. The parent didn't wait for us,
- // maybe it was killed after a timeout.
- err = GetLastError();
- derr << "Failed to communicate with the parent: "
- << win32_strerror(err) << dendl;
- } else {
- dout(5) << __func__ << ": submitted parent notification." << dendl;
- }
-
- if (parent_pipe_handle != INVALID_HANDLE_VALUE)
- CloseHandle(parent_pipe_handle);
+ // Wait for the mapped disk to become available.
+ r = wait_mapped_disk(cfg);
+ if (r < 0) {
+ return r;
+ }
- global_init_postfork_finish(g_ceph_context);
+ if (disconnect_cbk) {
+ monitor_thread = std::thread([this]{
+ int ret = this->wait();
+ // Allow "this" to be destroyed by the disconnect callback.
+ this->monitor_thread.detach();
+ dout(5) << "finished waiting for: " << this->cfg.devpath
+ << ", ret: " << ret << dendl;
+ disconnect_cbk(this->cfg.devpath, ret);
+ });
}
return 0;
}
-int RbdMapping::wait() {
+// Wait until the image gets disconnected.
+int RbdMapping::wait()
+{
if (handler) {
return handler->wait();
}
return 0;
}
+
+RbdMapping::~RbdMapping()
+{
+ dout(10) << __func__ << ": cleaning up rbd mapping: "
+ << cfg.devpath << dendl;
+ shutdown();
+}
+
+// Wait for the mapped disk to become available.
+int wait_mapped_disk(Config& cfg)
+{
+ DWORD status = WnbdPollDiskNumber(
+ cfg.devpath.c_str(),
+ TRUE, // ExpectMapped
+ TRUE, // TryOpen
+ cfg.image_map_timeout * 1000,
+ DISK_STATUS_POLLING_INTERVAL_MS,
+ (PDWORD) &cfg.disk_number);
+ if (status) {
+ derr << "WNBD disk unavailable, error: "
+ << win32_strerror(status) << dendl;
+ return -EINVAL;
+ }
+ dout(0) << "Successfully mapped image: " << cfg.devpath
+ << ". Windows disk path: "
+ << "\\\\.\\PhysicalDrive" + std::to_string(cfg.disk_number)
+ << dendl;
+ return 0;
+}
+
+int RbdMappingDispatcher::create(Config& cfg)
+{
+ if (cfg.devpath.empty()) {
+ derr << "missing device identifier" << dendl;
+ return -EINVAL;
+ }
+
+ if (get_mapping(cfg.devpath)) {
+ derr << "already mapped: " << cfg.devpath << dendl;
+ return -EEXIST;
+ }
+
+ auto rbd_mapping = std::make_shared<RbdMapping>(
+ cfg, client_cache,
+ std::bind(
+ &RbdMappingDispatcher::disconnect_cbk,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2));
+
+ int r = rbd_mapping.get()->start();
+ if (!r) {
+ std::unique_lock l{map_mutex};
+ mappings.insert(std::make_pair(cfg.devpath, rbd_mapping));
+ }
+ return r;
+}
+
+std::shared_ptr<RbdMapping> RbdMappingDispatcher::get_mapping(
+ std::string& devpath)
+{
+ std::unique_lock l{map_mutex};
+
+ auto mapping_it = mappings.find(devpath);
+ if (mapping_it == mappings.end()) {
+ // not found
+ return std::shared_ptr<RbdMapping>();
+ } else {
+ return mapping_it->second;
+ }
+}
+
+void RbdMappingDispatcher::disconnect_cbk(std::string devpath, int ret)
+{
+ dout(10) << "RbdMappingDispatcher: cleaning up stopped mapping" << dendl;
+ if (ret) {
+ derr << "rbd mapping wait error: " << ret
+ << ", allowing cleanup to proceed"
+ << dendl;
+ }
+
+ auto mapping = get_mapping(devpath);
+ if (mapping) {
+ // This step can be fairly time consuming, especially when
+ // cumulated. For this reason, we'll ensure that multiple mappings
+ // can be cleaned up simultaneously.
+ mapping->shutdown();
+
+ std::unique_lock l{map_mutex};
+ mappings.erase(devpath);
+ }
+}
#include "wnbd_handler.h"
#include "wnbd_wmi.h"
#include "rbd_wnbd.h"
+#include "rados_client_cache.h"
#include <fstream>
#include <memory>
// Wait for wmi events up to two seconds
#define WMI_EVENT_TIMEOUT 2
-static WnbdHandler* handler = nullptr;
static ceph::mutex shutdown_lock = ceph::make_mutex("RbdWnbd::ShutdownLock");
+static RadosClientCache client_cache;
+static RbdMappingDispatcher mapping_dispatcher(client_cache);
+static RbdMapping* daemon_mapping = nullptr;
+
bool is_process_running(DWORD pid)
{
HANDLE process = OpenProcess(SYNCHRONIZE, FALSE, pid);
return reply.status;
}
-// Spawn a subprocess using the specified "rbd-wnbd" command
-// arguments. A pipe is passed to the child process,
-// which will allow it to communicate the mapping status
-int map_device_using_suprocess(std::string arguments, int timeout_ms)
+int map_device_using_same_process(std::string command_line)
{
- STARTUPINFOW si;
- PROCESS_INFORMATION pi;
- char ch;
- DWORD err = 0, status = 0;
- int exit_code = 0;
- std::ostringstream command_line;
- std::string exe_path;
- // Windows async IO context
- OVERLAPPED connect_o, read_o;
- HANDLE connect_event = NULL, read_event = NULL;
- // Used for waiting on multiple events that are going to be initialized later.
- HANDLE wait_events[2] = { INVALID_HANDLE_VALUE, INVALID_HANDLE_VALUE};
- DWORD bytes_read = 0;
- // We may get a command line containing an old pipe handle when
- // recreating mappings, so we'll have to replace it.
- std::regex pipe_pattern("([\'\"]?--pipe-name[\'\"]? +[\'\"]?[^ ]+[\'\"]?)");
-
- uuid_d uuid;
- uuid.generate_random();
- std::ostringstream pipe_name;
- pipe_name << "\\\\.\\pipe\\rbd-wnbd-" << uuid;
-
- // Create an unique named pipe to communicate with the child. */
- HANDLE pipe_handle = CreateNamedPipe(
- pipe_name.str().c_str(),
- PIPE_ACCESS_INBOUND | FILE_FLAG_FIRST_PIPE_INSTANCE |
- FILE_FLAG_OVERLAPPED,
- PIPE_WAIT,
- 1, // Only accept one instance
- SERVICE_PIPE_BUFFSZ,
- SERVICE_PIPE_BUFFSZ,
- SERVICE_PIPE_TIMEOUT_MS,
- NULL);
- if (pipe_handle == INVALID_HANDLE_VALUE) {
- err = GetLastError();
- derr << "CreateNamedPipe failed: " << win32_strerror(err) << dendl;
- exit_code = -ECHILD;
- goto finally;
- }
- connect_event = CreateEvent(0, TRUE, FALSE, NULL);
- read_event = CreateEvent(0, TRUE, FALSE, NULL);
- if (!connect_event || !read_event) {
- err = GetLastError();
- derr << "CreateEvent failed: " << win32_strerror(err) << dendl;
- exit_code = -ECHILD;
- goto finally;
- }
- connect_o.hEvent = connect_event;
- read_o.hEvent = read_event;
-
- status = ConnectNamedPipe(pipe_handle, &connect_o);
- err = GetLastError();
- if (status || err != ERROR_IO_PENDING) {
- if (status)
- err = status;
- derr << "ConnectNamedPipe failed: " << win32_strerror(err) << dendl;
- exit_code = -ECHILD;
- goto finally;
+ dout(5) << "Creating mapping using the same process. Command line: "
+ << command_line << dendl;
+
+ int argc;
+ // CommandLineToArgvW only has an UTF-16 variant.
+ LPWSTR* argv_w = CommandLineToArgvW(
+ to_wstring(command_line).c_str(), &argc);
+ if (!argv_w) {
+ DWORD err = GetLastError();
+ derr << "Couldn't parse args, error: "
+ << win32_strerror(err) << dendl;
+ return -EINVAL;
}
- err = 0;
- dout(5) << __func__ << ": command arguments: " << arguments << dendl;
-
- // We'll avoid running arbitrary commands, instead using the executable
- // path of this process (expected to be the full rbd-wnbd.exe path).
- err = get_exe_path(exe_path);
- if (err) {
- exit_code = -EINVAL;
- goto finally;
- }
- command_line << std::quoted(exe_path)
- << " " << std::regex_replace(arguments, pipe_pattern, "")
- << " --pipe-name " << pipe_name.str();
-
- dout(5) << __func__ << ": command line: " << command_line.str() << dendl;
-
- GetStartupInfoW(&si);
- // Create a detached child
- if (!CreateProcessW(
- NULL, const_cast<wchar_t*>(to_wstring(command_line.str()).c_str()),
- NULL, NULL, FALSE, DETACHED_PROCESS,
- NULL, NULL, &si, &pi)) {
- err = GetLastError();
- derr << "CreateProcess failed: " << win32_strerror(err) << dendl;
- exit_code = -ECHILD;
- goto finally;
+ std::vector<const char*> args;
+ std::vector<std::string> argv_sv;
+ // We're reserving the vector size in order to avoid resizes,
+ // which would invalidate our char* pointers.
+ argv_sv.reserve(argc);
+ args.reserve(argc);
+ for (int i = 0; i < argc; i++) {
+ argv_sv.push_back(to_string(argv_w[i]));
+ args.push_back(argv_sv[i].c_str());
}
+ LocalFree(argv_w);
- wait_events[0] = connect_event;
- wait_events[1] = pi.hProcess;
- status = WaitForMultipleObjects(2, wait_events, FALSE, timeout_ms);
- switch(status) {
- case WAIT_OBJECT_0:
- if (!GetOverlappedResult(pipe_handle, &connect_o, &bytes_read, TRUE)) {
- err = GetLastError();
- derr << "Couldn't establish a connection with the child process. "
- << "Error: " << win32_strerror(err) << dendl;
- exit_code = -ECHILD;
- goto clean_process;
- }
- // We have an incoming connection.
- break;
- case WAIT_OBJECT_0 + 1:
- // The process has exited prematurely.
- goto clean_process;
- case WAIT_TIMEOUT:
- derr << "Timed out waiting for child process connection." << dendl;
- goto clean_process;
- default:
- derr << "Failed waiting for child process. Status: " << status << dendl;
- goto clean_process;
- }
- // Block and wait for child to say it is ready.
- dout(5) << __func__ << ": waiting for child notification." << dendl;
- if (!ReadFile(pipe_handle, &ch, 1, NULL, &read_o)) {
- err = GetLastError();
- if (err != ERROR_IO_PENDING) {
- derr << "Receiving child process reply failed with: "
- << win32_strerror(err) << dendl;
- exit_code = -ECHILD;
- goto clean_process;
- }
+ Config cfg;
+ cfg.command_line = command_line;
+ Command parsed_cmd = None;
+ std::ostringstream err_msg;
+ int r = parse_args(args, &err_msg, &parsed_cmd, &cfg);
+ if (r) {
+ derr << "Couldn't parse args, error: " << r
+ << ". Error message: " << err_msg.str() << dendl;
+ return -EINVAL;
}
- wait_events[0] = read_event;
- wait_events[1] = pi.hProcess;
- // The RBD daemon is expected to write back right after opening the
- // pipe. We'll use the same timeout value for now.
- status = WaitForMultipleObjects(2, wait_events, FALSE, timeout_ms);
- switch(status) {
- case WAIT_OBJECT_0:
- if (!GetOverlappedResult(pipe_handle, &read_o, &bytes_read, TRUE)) {
- err = GetLastError();
- derr << "Receiving child process reply failed with: "
- << win32_strerror(err) << dendl;
- exit_code = -ECHILD;
- goto clean_process;
- }
- break;
- case WAIT_OBJECT_0 + 1:
- // The process has exited prematurely.
- goto clean_process;
- case WAIT_TIMEOUT:
- derr << "Timed out waiting for child process message." << dendl;
- goto clean_process;
- default:
- derr << "Failed waiting for child process. Status: " << status << dendl;
- goto clean_process;
+ if (parsed_cmd != Connect) {
+ derr << "Unexpected map command: " << parsed_cmd
+ << ", expecting: " << Connect << dendl;
+ return -EINVAL;
}
- dout(5) << __func__ << ": received child notification." << dendl;
- goto finally;
-
- clean_process:
- if (!is_process_running(pi.dwProcessId)) {
- GetExitCodeProcess(pi.hProcess, (PDWORD)&exit_code);
- if (!exit_code) {
- // Child terminated unexpectedly.
- exit_code = -ECHILD;
- } else if (exit_code > 0) {
- // Make sure to return a negative error code.
- exit_code = -exit_code;
- }
- derr << "Daemon failed with: " << cpp_strerror(exit_code) << dendl;
- } else {
- // The process closed the pipe without notifying us or exiting.
- // This is quite unlikely, but we'll terminate the process.
- dout(0) << "Terminating unresponsive process." << dendl;
- TerminateProcess(pi.hProcess, 1);
- exit_code = -EINVAL;
- }
+ if (construct_devpath_if_missing(&cfg)) {
+ return -EINVAL;
+ }
- finally:
- if (exit_code)
- derr << "Could not start RBD daemon." << dendl;
- if (pipe_handle)
- CloseHandle(pipe_handle);
- if (connect_event)
- CloseHandle(connect_event);
- if (read_event)
- CloseHandle(read_event);
- return exit_code;
+ return mapping_dispatcher.create(cfg);
}
BOOL WINAPI console_handler_routine(DWORD dwCtrlType)
dout(0) << "Received control signal: " << dwCtrlType
<< ". Exiting." << dendl;
- // TODO: shutdown all mappings
+ std::unique_lock l{shutdown_lock};
+ if (daemon_mapping) {
+ daemon_mapping->shutdown();
+ }
return true;
}
// We'll try to map all devices and return a non-zero value
// if any of them fails.
- int r = map_device_using_suprocess(cfg.command_line, time_left_ms);
+ int r = map_device_using_same_process(cfg.command_line);
if (r) {
err = r;
derr << "Could not create mapping: "
<< (char*)request->arguments << dendl;
// TODO: use the configured service map timeout.
// TODO: add ceph.conf options.
- return map_device_using_suprocess(
- (char*)request->arguments,
- DEFAULT_IMAGE_MAP_TIMEOUT * 1000);
+ return map_device_using_same_process(
+ std::string((char*) request->arguments));
default:
dout(1) << "Received unsupported command: "
<< request->command << dendl;
} else {
dout(0) << "Ignoring image remap failure." << dendl;
}
+ } else {
+ dout(0) << "successfully restarted mappings" << dendl;
}
if (adapter_monitoring_enabled) {
global_pre_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, code_env, flags);
// Avoid cluttering the console when spawning a mapping that will run
// in the background.
- if (g_conf()->daemonize && cfg->parent_pipe.empty()) {
+ if (g_conf()->daemonize) {
flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
}
auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
return cct;
}
-// Wait for the mapped disk to become available.
-static int wait_mapped_disk(Config *cfg)
-{
- DWORD status = WnbdPollDiskNumber(
- cfg->devpath.c_str(),
- TRUE, // ExpectMapped
- TRUE, // TryOpen
- cfg->image_map_timeout,
- DISK_STATUS_POLLING_INTERVAL_MS,
- (PDWORD) &cfg->disk_number);
- if (status) {
- derr << "WNBD disk unavailable, error: "
- << win32_strerror(status) << dendl;
- return -EINVAL;
- }
- dout(0) << "Successfully mapped image: " << cfg->devpath
- << ". Windows disk path: "
- << "\\\\.\\PhysicalDrive" + std::to_string(cfg->disk_number)
- << dendl;
- return 0;
-}
-
static int do_map(Config *cfg)
{
- if (g_conf()->daemonize && cfg->parent_pipe.empty()) {
- r = send_map_request(get_cli_args());
- if (r < 0) {
- return r;
- }
-
- return wait_mapped_disk(cfg);
- }
-
dout(0) << "Mapping RBD image: " << cfg->devpath << dendl;
- librados::Rados rados;
- int r = rados.init_with_context(g_ceph_context);
- if (r < 0) {
- derr << "rbd-wnbd: couldn't initialize rados: " << cpp_strerror(r)
- << dendl;
- return r;
- }
-
- RbdMapping rbd_mapping(*cfg, rados, get_cli_args());
- r = rbd_mapping.start();
+ RbdMapping rbd_mapping(*cfg, client_cache);
+ int r = rbd_mapping.start();
if (r) {
return r;
}
- // TODO: consider substracting the time it took to perform the
- // above operations from cfg->image_map_timeout in wait_mapped_disk().
- r = wait_mapped_disk(cfg);
- if (r < 0) {
- goto close_ret;
- }
+ daemon_mapping = &rbd_mapping;
dout(0) << "Successfully mapped RBD image: " << cfg->devpath << dendl;
return rbd_mapping.wait();
}
config.parse_env(CEPH_ENTITY_TYPE_CLIENT);
config.parse_argv(args);
+
+ cfg->cluster_name = string(config->cluster);
+ cfg->entity_name = config->name.to_str();
cfg->poolname = config.get_val<std::string>("rbd_default_pool");
std::vector<const char*>::iterator i;
std::ostringstream err;
+ // The parent pipe parameter has been deprecated since we're no longer
+ // using separate processes per mapping (unless "-f" is passed).
+ // TODO: remove this parameter eventually.
+ std::string parent_pipe;
// TODO: consider using boost::program_options like Device.cc does.
// This should simplify argument parsing. Also, some arguments must be tied
cfg->remap_failure_fatal = true;
} else if (ceph_argparse_flag(args, i, "--adapter-monitoring-enabled", (char *)NULL)) {
cfg->adapter_monitoring_enabled = true;
- } else if (ceph_argparse_witharg(args, i, &cfg->parent_pipe, err,
+ } else if (ceph_argparse_witharg(args, i, &parent_pipe, err,
"--pipe-name", (char *)NULL)) {
if (!err.str().empty()) {
*err_msg << "rbd-wnbd: " << err.str();
return -EINVAL;
}
+ std::cerr << "WARNING: '--pipe-name' has been deprecated and is currently ignored."
+ << std::endl;
} else if (ceph_argparse_witharg(args, i, (int*)&cfg->wnbd_log_level,
err, "--wnbd-log-level", (char *)NULL)) {
if (!err.str().empty()) {
static int rbd_wnbd(int argc, const char *argv[])
{
Config cfg;
+ cfg.command_line = get_cli_args();
auto args = argv_to_vec(argc, argv);
// Avoid using dout before calling "do_global_init"
if (construct_devpath_if_missing(&cfg)) {
return -EINVAL;
}
+ if (g_conf()->daemonize) {
+ r = send_map_request(cfg.command_line);
+ if (r < 0) {
+ return r;
+ }
+ return wait_mapped_disk(cfg);
+ }
+
r = do_map(&cfg);
if (r < 0)
return r;