]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
common/admin_socket: use pipe for general wakeup
authorSage Weil <sage@redhat.com>
Thu, 5 Sep 2019 19:23:27 +0000 (14:23 -0500)
committerSage Weil <sage@redhat.com>
Tue, 1 Oct 2019 21:30:52 +0000 (16:30 -0500)
Use the pipe to wake up the thread.  Use a separate bool to signal a
shutdown.

Signed-off-by: Sage Weil <sage@redhat.com>
src/common/admin_socket.cc
src/common/admin_socket.h

index 58075b054e72ad0f60fa520919ce0af0f5273caf..ff71c02b9ee1dfbe1fe3338616b2065a1995afdf 100644 (file)
@@ -99,18 +99,18 @@ AdminSocket::~AdminSocket()
  * It only handles one connection at a time at the moment. All I/O is nonblocking,
  * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
  *
- * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
- * pipe, the thread terminates itself gracefully, allowing the
- * AdminSocketConfigObs class to join() it.
+ * This thread also listens to m_wakeup_rd_fd. If there is any data sent to this
+ * pipe, the thread wakes up.  If m_shutdown is set, the thread terminates
+ * itself gracefully, allowing the AdminSocketConfigObs class to join() it.
  */
 
-std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
+std::string AdminSocket::create_wakeup_pipe(int *pipe_rd, int *pipe_wr)
 {
   int pipefd[2];
-  if (pipe_cloexec(pipefd, 0) < 0) {
+  if (pipe_cloexec(pipefd, O_NONBLOCK) < 0) {
     int e = errno;
     ostringstream oss;
-    oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(e);
+    oss << "AdminSocket::create_wakeup_pipe error: " << cpp_strerror(e);
     return oss.str();
   }
   
@@ -119,15 +119,15 @@ std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
   return "";
 }
 
-std::string AdminSocket::destroy_shutdown_pipe()
+std::string AdminSocket::destroy_wakeup_pipe()
 {
-  // Send a byte to the shutdown pipe that the thread is listening to
+  // Send a byte to the wakeup pipe that the thread is listening to
   char buf[1] = { 0x0 };
-  int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
+  int ret = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
 
   // Close write end
-  retry_sys_call(::close, m_shutdown_wr_fd);
-  m_shutdown_wr_fd = -1;
+  retry_sys_call(::close, m_wakeup_wr_fd);
+  m_wakeup_wr_fd = -1;
 
   if (ret != 0) {
     ostringstream oss;
@@ -140,8 +140,8 @@ std::string AdminSocket::destroy_shutdown_pipe()
 
   // Close read end. Doing this before join() blocks the listenter and prevents
   // joining.
-  retry_sys_call(::close, m_shutdown_rd_fd);
-  m_shutdown_rd_fd = -1;
+  retry_sys_call(::close, m_wakeup_rd_fd);
+  m_wakeup_rd_fd = -1;
 
   return "";
 }
@@ -222,9 +222,10 @@ void AdminSocket::entry() noexcept
     memset(fds, 0, sizeof(fds));
     fds[0].fd = m_sock_fd;
     fds[0].events = POLLIN | POLLRDBAND;
-    fds[1].fd = m_shutdown_rd_fd;
+    fds[1].fd = m_wakeup_rd_fd;
     fds[1].events = POLLIN | POLLRDBAND;
 
+    ldout(m_cct,20) << __func__ << " waiting" << dendl;
     int ret = poll(fds, 2, -1);
     if (ret < 0) {
       int err = errno;
@@ -235,12 +236,18 @@ void AdminSocket::entry() noexcept
                   << cpp_strerror(err) << dendl;
       return;
     }
+    ldout(m_cct,20) << __func__ << " awake" << dendl;
 
     if (fds[0].revents & POLLIN) {
       // Send out some data
       do_accept();
     }
     if (fds[1].revents & POLLIN) {
+      // read off one byte
+      char buf;
+      ::read(m_wakeup_rd_fd, &buf, 1);
+    }
+    if (m_shutdown) {
       // Parent wants us to shut down
       return;
     }
@@ -592,7 +599,7 @@ bool AdminSocket::init(const std::string& path)
   /* Set up things for the new thread */
   std::string err;
   int pipe_rd = -1, pipe_wr = -1;
-  err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
+  err = create_wakeup_pipe(&pipe_rd, &pipe_wr);
   if (!err.empty()) {
     lderr(m_cct) << "AdminSocketConfigObs::init: error: " << err << dendl;
     return false;
@@ -608,8 +615,8 @@ bool AdminSocket::init(const std::string& path)
 
   /* Create new thread */
   m_sock_fd = sock_fd;
-  m_shutdown_rd_fd = pipe_rd;
-  m_shutdown_wr_fd = pipe_wr;
+  m_wakeup_rd_fd = pipe_rd;
+  m_wakeup_wr_fd = pipe_wr;
   m_path = path;
 
   version_hook = std::make_unique<VersionHook>();
@@ -634,12 +641,13 @@ void AdminSocket::shutdown()
   // Under normal operation this is unlikely to occur.  However for some unit
   // tests, some object members are not initialized and so cannot be deleted
   // without fault.
-  if (m_shutdown_wr_fd < 0)
+  if (m_wakeup_wr_fd < 0)
     return;
 
   ldout(m_cct, 5) << "shutdown" << dendl;
+  m_shutdown = true;
 
-  auto err = destroy_shutdown_pipe();
+  auto err = destroy_wakeup_pipe();
   if (!err.empty()) {
     lderr(m_cct) << "AdminSocket::shutdown: error: " << err << dendl;
   }
index 688db292739b4389253f51355575f5e10ea9178b..3591e1711d40a8c5449411e1aa4edf89fa5e8201 100644 (file)
@@ -100,8 +100,8 @@ private:
 
   void shutdown();
 
-  std::string create_shutdown_pipe(int *pipe_rd, int *pipe_wr);
-  std::string destroy_shutdown_pipe();
+  std::string create_wakeup_pipe(int *pipe_rd, int *pipe_wr);
+  std::string destroy_wakeup_pipe();
   std::string bind_and_listen(const std::string &sock_path, int *fd);
 
   std::thread th;
@@ -114,8 +114,9 @@ private:
   CephContext *m_cct;
   std::string m_path;
   int m_sock_fd = -1;
-  int m_shutdown_rd_fd = -1;
-  int m_shutdown_wr_fd = -1;
+  int m_wakeup_rd_fd = -1;
+  int m_wakeup_wr_fd = -1;
+  bool m_shutdown = false;
 
   bool in_hook = false;
   std::condition_variable in_hook_cond;