]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
admin_socket: refactor
authorSage Weil <sage@newdream.net>
Thu, 5 Jan 2012 18:46:31 +0000 (10:46 -0800)
committerSage Weil <sage@newdream.net>
Thu, 5 Jan 2012 18:46:31 +0000 (10:46 -0800)
Combine AdminSocketConfigObs with AdminSocket so that we can interact
with it via the cct.  Simpler class structure.  Less pointer indirection.

Signed-off-by: Sage Weil <sage@newdream.net>
src/common/admin_socket.cc
src/common/admin_socket.h
src/common/ceph_context.cc
src/common/ceph_context.h

index ff8c5de7115314743d0b11148feb32bf740d1a65..568ec9a6a20eff1d6ba6048e2ac51d89f2e5655f 100644 (file)
@@ -94,12 +94,26 @@ static void add_cleanup_file(const char *file)
   pthread_mutex_unlock(&cleanup_lock);
 }
 
+
+AdminSocket::AdminSocket(CephContext *cct)
+  : m_cct(cct),
+    m_sock_fd(-1),
+    m_shutdown_rd_fd(-1),
+    m_shutdown_wr_fd(-1)
+{
+}
+
+AdminSocket::~AdminSocket()
+{
+  shutdown();
+}
+
 /*
  * This thread listens on the UNIX domain socket for incoming connections.
  * It only handles one connection at a time at the moment. All I/O is nonblocking,
  * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
  *
- * This thread also listens to m_shutdown_fd. If there is any data sent to this
+ * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
  * pipe, the thread terminates itself gracefully, allowing the
  * AdminSocketConfigObs class to join() it.
  */
@@ -107,258 +121,215 @@ static void add_cleanup_file(const char *file)
 #define PFL_SUCCESS ((void*)(intptr_t)0)
 #define PFL_FAIL ((void*)(intptr_t)1)
 
-class AdminSocket : public Thread
+std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
 {
-public:
-  static std::string create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
-  {
-    int pipefd[2];
-    int ret = pipe_cloexec(pipefd);
-    if (ret < 0) {
-      ostringstream oss;
-      oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(ret);
-      return oss.str();
-    }
-
-    *pipe_rd = pipefd[0];
-    *pipe_wr = pipefd[1];
-    return "";
+  int pipefd[2];
+  int ret = pipe_cloexec(pipefd);
+  if (ret < 0) {
+    ostringstream oss;
+    oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(ret);
+    return oss.str();
   }
+  
+  *pipe_rd = pipefd[0];
+  *pipe_wr = pipefd[1];
+  return "";
+}
 
-  static std::string bind_and_listen(const std::string &sock_path, int *fd)
-  {
-    struct sockaddr_un address;
-    if (sock_path.size() > sizeof(address.sun_path) - 1) {
-      ostringstream oss;
-      oss << "AdminSocket::bind_and_listen: "
-         << "The UNIX domain socket path " << sock_path << " is too long! The "
-         << "maximum length on this system is "
-         << (sizeof(address.sun_path) - 1);
-      return oss.str();
-    }
-    int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
-    if (sock_fd < 0) {
-      int err = errno;
-      ostringstream oss;
-      oss << "AdminSocket::bind_and_listen: "
-         << "failed to create socket: " << cpp_strerror(err);
-      return oss.str();
-    }
-    fcntl(sock_fd, F_SETFD, FD_CLOEXEC);
-    memset(&address, 0, sizeof(struct sockaddr_un));
-    address.sun_family = AF_UNIX;
-    snprintf(address.sun_path, sizeof(address.sun_path),
-            "%s", sock_path.c_str());
-    if (bind(sock_fd, (struct sockaddr*)&address,
-              sizeof(struct sockaddr_un)) != 0) {
-      int err = errno;
-      if (err == EADDRINUSE) {
-       // The old UNIX domain socket must still be there.
-       // Let's unlink it and try again.
-       TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
-       if (bind(sock_fd, (struct sockaddr*)&address,
-                  sizeof(struct sockaddr_un)) == 0) {
-         err = 0;
-       }
-       else {
-         err = errno;
-       }
+std::string AdminSocket::bind_and_listen(const std::string &sock_path, int *fd)
+{
+  struct sockaddr_un address;
+  if (sock_path.size() > sizeof(address.sun_path) - 1) {
+    ostringstream oss;
+    oss << "AdminSocket::bind_and_listen: "
+       << "The UNIX domain socket path " << sock_path << " is too long! The "
+       << "maximum length on this system is "
+       << (sizeof(address.sun_path) - 1);
+    return oss.str();
+  }
+  int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
+  if (sock_fd < 0) {
+    int err = errno;
+    ostringstream oss;
+    oss << "AdminSocket::bind_and_listen: "
+       << "failed to create socket: " << cpp_strerror(err);
+    return oss.str();
+  }
+  fcntl(sock_fd, F_SETFD, FD_CLOEXEC);
+  memset(&address, 0, sizeof(struct sockaddr_un));
+  address.sun_family = AF_UNIX;
+  snprintf(address.sun_path, sizeof(address.sun_path),
+          "%s", sock_path.c_str());
+  if (bind(sock_fd, (struct sockaddr*)&address,
+          sizeof(struct sockaddr_un)) != 0) {
+    int err = errno;
+    if (err == EADDRINUSE) {
+      // The old UNIX domain socket must still be there.
+      // Let's unlink it and try again.
+      TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
+      if (bind(sock_fd, (struct sockaddr*)&address,
+              sizeof(struct sockaddr_un)) == 0) {
+       err = 0;
       }
-      if (err != 0) {
-       ostringstream oss;
-       oss << "AdminSocket::bind_and_listen: "
-           << "failed to bind the UNIX domain socket to '" << sock_path
-           << "': " << cpp_strerror(err);
-       close(sock_fd);
-       return oss.str();
+      else {
+       err = errno;
       }
     }
-    if (listen(sock_fd, 5) != 0) {
-      int err = errno;
+    if (err != 0) {
       ostringstream oss;
       oss << "AdminSocket::bind_and_listen: "
-         << "failed to listen to socket: " << cpp_strerror(err);
+         << "failed to bind the UNIX domain socket to '" << sock_path
+         << "': " << cpp_strerror(err);
       close(sock_fd);
-      TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
       return oss.str();
     }
-    *fd = sock_fd;
-    return "";
   }
-
-  AdminSocket(int sock_fd, int shutdown_fd, AdminSocketConfigObs *parent)
-    : m_sock_fd(sock_fd),
-      m_shutdown_fd(shutdown_fd),
-      m_parent(parent)
-  {
-  }
-
-  virtual ~AdminSocket()
-  {
-    if (m_sock_fd != -1)
-      close(m_sock_fd);
-    if (m_shutdown_fd != -1)
-      close(m_shutdown_fd);
-  }
-
-  virtual void* entry()
-  {
-    while (true) {
-      struct pollfd fds[2];
-      memset(fds, 0, sizeof(fds));
-      fds[0].fd = m_sock_fd;
-      fds[0].events = POLLIN | POLLRDBAND;
-      fds[1].fd = m_shutdown_fd;
-      fds[1].events = POLLIN | POLLRDBAND;
-
-      int ret = poll(fds, 2, -1);
-      if (ret < 0) {
-       int err = errno;
-       if (err == EINTR) {
-         continue;
-       }
-       lderr(m_parent->m_cct) << "AdminSocket: poll(2) error: '"
-           << cpp_strerror(err) << dendl;
-       return PFL_FAIL;
-      }
-
-      if (fds[0].revents & POLLIN) {
-       // Send out some data
-       do_accept();
-      }
-      if (fds[1].revents & POLLIN) {
-       // Parent wants us to shut down
-       return PFL_SUCCESS;
-      }
-    }
+  if (listen(sock_fd, 5) != 0) {
+    int err = errno;
+    ostringstream oss;
+    oss << "AdminSocket::bind_and_listen: "
+         << "failed to listen to socket: " << cpp_strerror(err);
+    close(sock_fd);
+    TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
+    return oss.str();
   }
+  *fd = sock_fd;
+  return "";
+}
 
-
-private:
-  bool do_accept()
-  {
-    int ret;
-    struct sockaddr_un address;
-    socklen_t address_length = sizeof(address);
-    ldout(m_parent->m_cct, 30) << "AdminSocket: calling accept" << dendl;
-    int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
-                                  &address_length);
-    ldout(m_parent->m_cct, 30) << "AdminSocket: finished accept" << dendl;
-    if (connection_fd < 0) {
+void* AdminSocket::entry()
+{
+  while (true) {
+    struct pollfd fds[2];
+    memset(fds, 0, sizeof(fds));
+    fds[0].fd = m_sock_fd;
+    fds[0].events = POLLIN | POLLRDBAND;
+    fds[1].fd = m_shutdown_rd_fd;
+    fds[1].events = POLLIN | POLLRDBAND;
+
+    int ret = poll(fds, 2, -1);
+    if (ret < 0) {
       int err = errno;
-      lderr(m_parent->m_cct) << "AdminSocket: do_accept error: '"
-         << cpp_strerror(err) << dendl;
-      return false;
+      if (err == EINTR) {
+       continue;
+      }
+      lderr(m_cct) << "AdminSocket: poll(2) error: '"
+                  << cpp_strerror(err) << dendl;
+      return PFL_FAIL;
     }
 
-    uint32_t request, request_raw;
-    ret = safe_read(connection_fd, &request_raw, sizeof(request_raw));
-    if (ret < 0) {
-      lderr(m_parent->m_cct) << "AdminSocket: error reading request code: "
-         << cpp_strerror(ret) << dendl;
-      close(connection_fd);
-      return false;
-    }
-    request = ntohl(request_raw);
-    switch (request) {
-      case 0:
-       /* version request */
-       ret = handle_version_request(connection_fd);
-       break;
-      case 1:
-       /* data request */
-       ret = handle_json_request(connection_fd, false);
-       break;
-      case 2:
-       /* schema request */
-       ret = handle_json_request(connection_fd, true);
-       break;
-      default:
-       lderr(m_parent->m_cct) << "AdminSocket: unknown request "
-           << "code " << request << dendl;
-       ret = false;
-       break;
+    if (fds[0].revents & POLLIN) {
+      // Send out some data
+      do_accept();
     }
-    TEMP_FAILURE_RETRY(close(connection_fd));
-    return ret;
-  }
-
-  bool handle_version_request(int connection_fd)
-  {
-    uint32_t version_raw = htonl(CEPH_ADMIN_SOCK_VERSION);
-    int ret = safe_write(connection_fd, &version_raw, sizeof(version_raw));
-    if (ret < 0) {
-      lderr(m_parent->m_cct) << "AdminSocket: error writing version_raw: "
-         << cpp_strerror(ret) << dendl;
-      return false;
+    if (fds[1].revents & POLLIN) {
+      // Parent wants us to shut down
+      return PFL_SUCCESS;
     }
-    return true;
   }
+}
 
-  bool handle_json_request(int connection_fd, bool schema)
-  {
-    std::vector<char> buffer;
-    buffer.reserve(512);
-
-    PerfCountersCollection *coll = m_parent->m_cct->get_perfcounters_collection();
-    if (coll) {
-      coll->write_json_to_buf(buffer, schema);
-    }
 
-    uint32_t len = htonl(buffer.size());
-    int ret = safe_write(connection_fd, &len, sizeof(len));
-    if (ret < 0) {
-      lderr(m_parent->m_cct) << "AdminSocket: error writing message size: "
-         << cpp_strerror(ret) << dendl;
-      return false;
-    }
-    ret = safe_write(connection_fd, &buffer[0], buffer.size());
-    if (ret < 0) {
-      lderr(m_parent->m_cct) << "AdminSocket: error writing message: "
-         << cpp_strerror(ret) << dendl;
-      return false;
-    }
-    ldout(m_parent->m_cct, 30) << "AdminSocket: handle_json_request succeeded."
-        << dendl;
-    return true;
+bool AdminSocket::do_accept()
+{
+  int ret;
+  struct sockaddr_un address;
+  socklen_t address_length = sizeof(address);
+  ldout(m_cct, 30) << "AdminSocket: calling accept" << dendl;
+  int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
+                            &address_length);
+  ldout(m_cct, 30) << "AdminSocket: finished accept" << dendl;
+  if (connection_fd < 0) {
+    int err = errno;
+    lderr(m_cct) << "AdminSocket: do_accept error: '"
+                          << cpp_strerror(err) << dendl;
+    return false;
   }
 
-  AdminSocket(AdminSocket &rhs);
-  const AdminSocket &operator=(const AdminSocket &rhs);
-
-  int m_sock_fd;
-  int m_shutdown_fd;
-  AdminSocketConfigObs *m_parent;
-};
+  uint32_t request, request_raw;
+  ret = safe_read(connection_fd, &request_raw, sizeof(request_raw));
+  if (ret < 0) {
+    lderr(m_cct) << "AdminSocket: error reading request code: "
+                          << cpp_strerror(ret) << dendl;
+    close(connection_fd);
+    return false;
+  }
+  request = ntohl(request_raw);
+  switch (request) {
+  case 0:
+    /* version request */
+    ret = handle_version_request(connection_fd);
+    break;
+  case 1:
+    /* data request */
+    ret = handle_json_request(connection_fd, false);
+    break;
+  case 2:
+    /* schema request */
+    ret = handle_json_request(connection_fd, true);
+    break;
+  default:
+    lderr(m_cct) << "AdminSocket: unknown request "
+                          << "code " << request << dendl;
+    ret = false;
+    break;
+  }
+  TEMP_FAILURE_RETRY(close(connection_fd));
+  return ret;
+}
 
-/*
- * The AdminSocketConfigObs receives callbacks from the configuration
- * management system. It will create the AdminSocket thread when the
- * appropriate configuration is set.
- */
-AdminSocketConfigObs::AdminSocketConfigObs(CephContext *cct)
-  : m_cct(cct),
-    m_thread(NULL),
-    m_shutdown_fd(-1)
+bool AdminSocket::handle_version_request(int connection_fd)
 {
+  uint32_t version_raw = htonl(CEPH_ADMIN_SOCK_VERSION);
+  int ret = safe_write(connection_fd, &version_raw, sizeof(version_raw));
+  if (ret < 0) {
+    lderr(m_cct) << "AdminSocket: error writing version_raw: "
+                          << cpp_strerror(ret) << dendl;
+    return false;
+  }
+  return true;
 }
 
-AdminSocketConfigObs::~AdminSocketConfigObs()
+bool AdminSocket::handle_json_request(int connection_fd, bool schema)
 {
-  shutdown();
+  std::vector<char> buffer;
+  buffer.reserve(512);
+
+  PerfCountersCollection *coll = m_cct->get_perfcounters_collection();
+  if (coll) {
+    coll->write_json_to_buf(buffer, schema);
+  }
+
+  uint32_t len = htonl(buffer.size());
+  int ret = safe_write(connection_fd, &len, sizeof(len));
+  if (ret < 0) {
+    lderr(m_cct) << "AdminSocket: error writing message size: "
+                          << cpp_strerror(ret) << dendl;
+    return false;
+  }
+  ret = safe_write(connection_fd, &buffer[0], buffer.size());
+  if (ret < 0) {
+    lderr(m_cct) << "AdminSocket: error writing message: "
+                          << cpp_strerror(ret) << dendl;
+    return false;
+  }
+  ldout(m_cct, 30) << "AdminSocket: handle_json_request succeeded."
+                            << dendl;
+  return true;
 }
 
-const char** AdminSocketConfigObs::get_tracked_conf_keys() const
+const char** AdminSocket::get_tracked_conf_keys() const
 {
-  static const char *KEYS[] = { "admin_socket",
-         "internal_safe_to_start_threads",
-         NULL
+  static const char *KEYS[] = {
+    "admin_socket",
+    "internal_safe_to_start_threads",
+    NULL
   };
   return KEYS;
 }
 
-void AdminSocketConfigObs::handle_conf_change(const md_config_t *conf,
-                                             const std::set <std::string> &changed)
+void AdminSocket::handle_conf_change(const md_config_t *conf,
+                                    const std::set <std::string> &changed)
 {
   if (!conf->internal_safe_to_start_threads) {
     // We can't do anything until it's safe to start threads.
@@ -374,18 +345,18 @@ void AdminSocketConfigObs::handle_conf_change(const md_config_t *conf,
   }
 }
 
-bool AdminSocketConfigObs::init(const std::string &path)
+bool AdminSocket::init(const std::string &path)
 {
   /* Set up things for the new thread */
   std::string err;
   int pipe_rd = -1, pipe_wr = -1;
-  err = AdminSocket::create_shutdown_pipe(&pipe_rd, &pipe_wr);
+  err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
   if (!err.empty()) {
     lderr(m_cct) << "AdminSocketConfigObs::init: error: " << err << dendl;
     return false;
   }
   int sock_fd;
-  err = AdminSocket::bind_and_listen(path, &sock_fd);
+  err = bind_and_listen(path, &sock_fd);
   if (!err.empty()) {
     lderr(m_cct) << "AdminSocketConfigObs::init: failed: " << err << dendl;
     close(pipe_rd);
@@ -394,42 +365,32 @@ bool AdminSocketConfigObs::init(const std::string &path)
   }
 
   /* Create new thread */
-  m_thread = new (std::nothrow) AdminSocket(sock_fd, pipe_rd, this);
-  if (!m_thread) {
-    lderr(m_cct) << "AdminSocketConfigObs::init: failed: " << err << dendl;
-    TEMP_FAILURE_RETRY(unlink(path.c_str()));
-    close(sock_fd);
-    close(pipe_rd);
-    close(pipe_wr);
-    return false;
-  }
+  m_sock_fd = sock_fd;
+  m_shutdown_rd_fd = pipe_rd;
+  m_shutdown_wr_fd = pipe_wr;
   m_path = path;
-  m_thread->create();
-  m_shutdown_fd = pipe_wr;
+  create();
   add_cleanup_file(m_path.c_str());
   return true;
 }
 
-void AdminSocketConfigObs::shutdown()
+void AdminSocket::shutdown()
 {
-  if (!m_thread)
+  if (m_shutdown_wr_fd < 0)
     return;
+
   // Send a byte to the shutdown pipe that the thread is listening to
   char buf[1] = { 0x0 };
-  int ret = safe_write(m_shutdown_fd, buf, sizeof(buf));
-  TEMP_FAILURE_RETRY(close(m_shutdown_fd));
-  m_shutdown_fd = -1;
+  int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
+  TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
+  m_shutdown_wr_fd = -1;
 
   if (ret == 0) {
-    // Join and delete the thread
-    m_thread->join();
-    delete m_thread;
-  }
-  else {
-    lderr(m_cct) << "AdminSocketConfigObs::shutdown: failed to write "
-           "to thread shutdown pipe: error " << ret << dendl;
+    join();
+  } else {
+    lderr(m_cct) << "AdminSocket::shutdown: failed to write "
+      "to thread shutdown pipe: error " << ret << dendl;
   }
-  m_thread = NULL;
   remove_cleanup_file(m_path.c_str());
   m_path.clear();
 }
index 6b48d5eebe9a43c1ba87f66aa3f6d7eeb5bdc259..32ecac73439e1fb7cb0a6e16347eac7b0665b73e 100644 (file)
  */
 
 #include "common/config_obs.h"
+#include "common/Thread.h"
 
 #include <string>
+#include "include/buffer.h"
 
 class AdminSocket;
 class CephContext;
 
 #define CEPH_ADMIN_SOCK_VERSION 1U
 
-class AdminSocketConfigObs : public md_config_obs_t
+class AdminSocket : public Thread, public md_config_obs_t
 {
 public:
-  AdminSocketConfigObs(CephContext *cct);
-  ~AdminSocketConfigObs();
+  AdminSocket(CephContext *cct);
+  virtual ~AdminSocket();
+
+  // md_config_obs_t
   virtual const char** get_tracked_conf_keys() const;
   virtual void handle_conf_change(const md_config_t *conf,
-                         const std::set <std::string> &changed);
+                                 const std::set <std::string> &changed);
+
 private:
-  AdminSocketConfigObs(const AdminSocketConfigObs& rhs);
-  AdminSocketConfigObs& operator=(const AdminSocketConfigObs &rhs);
+  AdminSocket(const AdminSocket& rhs);
+  AdminSocket& operator=(const AdminSocket &rhs);
+
   bool init(const std::string &path);
   void shutdown();
 
+  std::string create_shutdown_pipe(int *pipe_rd, int *pipe_wr);
+  std::string bind_and_listen(const std::string &sock_path, int *fd);
+
+  void *entry();
+  bool do_accept();
+
+  bool handle_version_request(int connection_fd);
+  bool handle_json_request(int connection_fd, bool schema);
+
   CephContext *m_cct;
-  AdminSocket* m_thread;
   std::string m_path;
-  int m_shutdown_fd;
+  int m_sock_fd;
+  int m_shutdown_rd_fd;
+  int m_shutdown_wr_fd;
 
-  friend class AdminSocket;
   friend class AdminSocketTest;
 };
+
+
index 4634339c33b2f528b9e9f432f2f92444932f7f76..d6608979a55902a14a98710a4779535f6adf1689 100644 (file)
@@ -91,15 +91,15 @@ CephContext::CephContext(uint32_t module_type_)
     _dout(_doss),
     _module_type(module_type_),
     _service_thread(NULL),
-    _admin_socket_config_obs(NULL),
+    _admin_socket(NULL),
     _perf_counters_collection(NULL),
     _heartbeat_map(NULL)
 {
   pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);
   _perf_counters_collection = new PerfCountersCollection(this);
   _conf->add_observer(_doss);
-  _admin_socket_config_obs = new AdminSocketConfigObs(this);
-  _conf->add_observer(_admin_socket_config_obs);
+  _admin_socket = new AdminSocket(this);
+  _conf->add_observer(_admin_socket);
   _heartbeat_map = new HeartbeatMap(this);
 }
 
@@ -109,7 +109,7 @@ CephContext::~CephContext()
 
   delete _heartbeat_map;
 
-  _conf->remove_observer(_admin_socket_config_obs);
+  _conf->remove_observer(_admin_socket);
   _conf->remove_observer(_doss);
 
   delete _perf_counters_collection;
index 583d5418a44fe53de3ef291f7fd30c47b0bb9e8d..ba99b1e87d059afcd9c2313f7112d4a031178ca8 100644 (file)
@@ -22,7 +22,7 @@
 template <typename T, typename U>
 class DoutStreambuf;
 
-class AdminSocketConfigObs;
+class AdminSocket;
 class CephContextServiceThread;
 class DoutLocker;
 class PerfCountersCollection;
@@ -85,7 +85,7 @@ private:
   CephContextServiceThread *_service_thread;
 
   /* The collection of profiling loggers associated with this context */
-  AdminSocketConfigObs *_admin_socket_config_obs;
+  AdminSocket *_admin_socket;
 
   /* lock which protects service thread creation, destruction, etc. */
   pthread_spinlock_t _service_thread_lock;