From aed0fe4605f34e25fb44b890cb0f0cbdefbbaba1 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 5 Sep 2019 14:23:27 -0500 Subject: [PATCH] common/admin_socket: use pipe for general wakeup Use the pipe to wake up the thread. Use a separate bool to signal a shutdown. Signed-off-by: Sage Weil --- src/common/admin_socket.cc | 46 ++++++++++++++++++++++---------------- src/common/admin_socket.h | 9 ++++---- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/src/common/admin_socket.cc b/src/common/admin_socket.cc index 58075b054e7..ff71c02b9ee 100644 --- a/src/common/admin_socket.cc +++ b/src/common/admin_socket.cc @@ -99,18 +99,18 @@ AdminSocket::~AdminSocket() * It only handles one connection at a time at the moment. All I/O is nonblocking, * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking] * - * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this - * pipe, the thread terminates itself gracefully, allowing the - * AdminSocketConfigObs class to join() it. + * This thread also listens to m_wakeup_rd_fd. If there is any data sent to this + * pipe, the thread wakes up. If m_shutdown is set, the thread terminates + * itself gracefully, allowing the AdminSocketConfigObs class to join() it. */ -std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr) +std::string AdminSocket::create_wakeup_pipe(int *pipe_rd, int *pipe_wr) { int pipefd[2]; - if (pipe_cloexec(pipefd, 0) < 0) { + if (pipe_cloexec(pipefd, O_NONBLOCK) < 0) { int e = errno; ostringstream oss; - oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(e); + oss << "AdminSocket::create_wakeup_pipe error: " << cpp_strerror(e); return oss.str(); } @@ -119,15 +119,15 @@ std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr) return ""; } -std::string AdminSocket::destroy_shutdown_pipe() +std::string AdminSocket::destroy_wakeup_pipe() { - // Send a byte to the shutdown pipe that the thread is listening to + // Send a byte to the wakeup pipe that the thread is listening to char buf[1] = { 0x0 }; - int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf)); + int ret = safe_write(m_wakeup_wr_fd, buf, sizeof(buf)); // Close write end - retry_sys_call(::close, m_shutdown_wr_fd); - m_shutdown_wr_fd = -1; + retry_sys_call(::close, m_wakeup_wr_fd); + m_wakeup_wr_fd = -1; if (ret != 0) { ostringstream oss; @@ -140,8 +140,8 @@ std::string AdminSocket::destroy_shutdown_pipe() // Close read end. Doing this before join() blocks the listenter and prevents // joining. - retry_sys_call(::close, m_shutdown_rd_fd); - m_shutdown_rd_fd = -1; + retry_sys_call(::close, m_wakeup_rd_fd); + m_wakeup_rd_fd = -1; return ""; } @@ -222,9 +222,10 @@ void AdminSocket::entry() noexcept memset(fds, 0, sizeof(fds)); fds[0].fd = m_sock_fd; fds[0].events = POLLIN | POLLRDBAND; - fds[1].fd = m_shutdown_rd_fd; + fds[1].fd = m_wakeup_rd_fd; fds[1].events = POLLIN | POLLRDBAND; + ldout(m_cct,20) << __func__ << " waiting" << dendl; int ret = poll(fds, 2, -1); if (ret < 0) { int err = errno; @@ -235,12 +236,18 @@ void AdminSocket::entry() noexcept << cpp_strerror(err) << dendl; return; } + ldout(m_cct,20) << __func__ << " awake" << dendl; if (fds[0].revents & POLLIN) { // Send out some data do_accept(); } if (fds[1].revents & POLLIN) { + // read off one byte + char buf; + ::read(m_wakeup_rd_fd, &buf, 1); + } + if (m_shutdown) { // Parent wants us to shut down return; } @@ -592,7 +599,7 @@ bool AdminSocket::init(const std::string& path) /* Set up things for the new thread */ std::string err; int pipe_rd = -1, pipe_wr = -1; - err = create_shutdown_pipe(&pipe_rd, &pipe_wr); + err = create_wakeup_pipe(&pipe_rd, &pipe_wr); if (!err.empty()) { lderr(m_cct) << "AdminSocketConfigObs::init: error: " << err << dendl; return false; @@ -608,8 +615,8 @@ bool AdminSocket::init(const std::string& path) /* Create new thread */ m_sock_fd = sock_fd; - m_shutdown_rd_fd = pipe_rd; - m_shutdown_wr_fd = pipe_wr; + m_wakeup_rd_fd = pipe_rd; + m_wakeup_wr_fd = pipe_wr; m_path = path; version_hook = std::make_unique(); @@ -634,12 +641,13 @@ void AdminSocket::shutdown() // Under normal operation this is unlikely to occur. However for some unit // tests, some object members are not initialized and so cannot be deleted // without fault. - if (m_shutdown_wr_fd < 0) + if (m_wakeup_wr_fd < 0) return; ldout(m_cct, 5) << "shutdown" << dendl; + m_shutdown = true; - auto err = destroy_shutdown_pipe(); + auto err = destroy_wakeup_pipe(); if (!err.empty()) { lderr(m_cct) << "AdminSocket::shutdown: error: " << err << dendl; } diff --git a/src/common/admin_socket.h b/src/common/admin_socket.h index 688db292739..3591e1711d4 100644 --- a/src/common/admin_socket.h +++ b/src/common/admin_socket.h @@ -100,8 +100,8 @@ private: void shutdown(); - std::string create_shutdown_pipe(int *pipe_rd, int *pipe_wr); - std::string destroy_shutdown_pipe(); + std::string create_wakeup_pipe(int *pipe_rd, int *pipe_wr); + std::string destroy_wakeup_pipe(); std::string bind_and_listen(const std::string &sock_path, int *fd); std::thread th; @@ -114,8 +114,9 @@ private: CephContext *m_cct; std::string m_path; int m_sock_fd = -1; - int m_shutdown_rd_fd = -1; - int m_shutdown_wr_fd = -1; + int m_wakeup_rd_fd = -1; + int m_wakeup_wr_fd = -1; + bool m_shutdown = false; bool in_hook = false; std::condition_variable in_hook_cond; -- 2.39.5