]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
admin_socket: string commands
authorSage Weil <sage@newdream.net>
Thu, 5 Jan 2012 21:57:58 +0000 (13:57 -0800)
committerSage Weil <sage@newdream.net>
Fri, 6 Jan 2012 17:31:11 +0000 (09:31 -0800)
Commands are strings.  Old __be32 works too.  'help' to list available
commands.

Signed-off-by: Sage Weil <sage@newdream.net>
src/common/admin_socket.cc
src/common/admin_socket.h
src/common/ceph_context.cc
src/common/ceph_context.h

index 568ec9a6a20eff1d6ba6048e2ac51d89f2e5655f..8d9ae60b0f2c124a766a56b28d5430da8243c279 100644 (file)
@@ -99,7 +99,8 @@ AdminSocket::AdminSocket(CephContext *cct)
   : m_cct(cct),
     m_sock_fd(-1),
     m_shutdown_rd_fd(-1),
-    m_shutdown_wr_fd(-1)
+    m_shutdown_wr_fd(-1),
+    m_lock("AdminSocket::m_lock")
 {
 }
 
@@ -246,76 +247,110 @@ bool AdminSocket::do_accept()
     return false;
   }
 
-  uint32_t request, request_raw;
-  ret = safe_read(connection_fd, &request_raw, sizeof(request_raw));
-  if (ret < 0) {
-    lderr(m_cct) << "AdminSocket: error reading request code: "
-                          << cpp_strerror(ret) << dendl;
-    close(connection_fd);
-    return false;
+  char cmd[80];
+  int pos = 0;
+  string c;
+  while (1) {
+    ret = safe_read(connection_fd, &cmd[pos], 1);
+    if (ret <= 0) {
+      lderr(m_cct) << "AdminSocket: error reading request code: "
+                  << cpp_strerror(ret) << dendl;
+      close(connection_fd);
+      return false;
+    }
+    //ldout(m_cct, 0) << "AdminSocket read byte " << (int)cmd[pos] << " pos " << pos << dendl;
+    if (cmd[0] == '\0') {
+      // old protocol: __be32
+      if (pos == 3 && cmd[0] == '\0') {
+       switch (cmd[3]) {
+       case 0:
+         c = "version";
+         break;
+       case 1:
+         c = "perfcounters_dump";
+         break;
+       case 2:
+         c = "perfcounters_schema";
+         break;
+       default:
+         c = "foo";
+         break;
+       }
+       break;
+      }
+    } else {
+      // new protocol: null or \n terminated string
+      if (cmd[pos] == '\n' || cmd[pos] == '\0') {
+       cmd[pos] = '\0';
+       c = cmd;
+       break;
+      }
+    }
+    pos++;
   }
-  request = ntohl(request_raw);
-  switch (request) {
-  case 0:
-    /* version request */
-    ret = handle_version_request(connection_fd);
-    break;
-  case 1:
-    /* data request */
-    ret = handle_json_request(connection_fd, false);
-    break;
-  case 2:
-    /* schema request */
-    ret = handle_json_request(connection_fd, true);
-    break;
-  default:
-    lderr(m_cct) << "AdminSocket: unknown request "
-                          << "code " << request << dendl;
-    ret = false;
-    break;
+
+  bool rval = false;
+
+  m_lock.Lock();
+  map<string,AdminSocketHook*>::iterator p = m_hooks.find(c);
+  bufferlist out;
+  if (p == m_hooks.end()) {
+    lderr(m_cct) << "AdminSocket: request '" << c << "' not defined" << dendl;
+  } else {
+    bool success = p->second->call(c, out);
+    if (!success) {
+      ldout(m_cct, 0) << "AdminSocket: request '" << c << "' to " << p->second << " failed" << dendl;
+      out.append("failed");
+    } else {
+      ldout(m_cct, 0) << "AdminSocket: request '" << c << "' to " << p->second
+                     << " returned " << out.length() << " bytes" << dendl;
+    }
+    uint32_t len = htonl(out.length());
+    int ret = safe_write(connection_fd, &len, sizeof(len));
+    if (ret < 0) {
+      lderr(m_cct) << "AdminSocket: error writing response length "
+                  << cpp_strerror(ret) << dendl;
+    } else {
+      ret = out.write_fd(connection_fd);
+      if (ret >= 0)
+       rval = true;
+    }
   }
+  m_lock.Unlock();
+
   TEMP_FAILURE_RETRY(close(connection_fd));
-  return ret;
+  return rval;
 }
 
-bool AdminSocket::handle_version_request(int connection_fd)
+int AdminSocket::register_command(std::string command, AdminSocketHook *hook, std::string help)
 {
-  uint32_t version_raw = htonl(CEPH_ADMIN_SOCK_VERSION);
-  int ret = safe_write(connection_fd, &version_raw, sizeof(version_raw));
-  if (ret < 0) {
-    lderr(m_cct) << "AdminSocket: error writing version_raw: "
-                          << cpp_strerror(ret) << dendl;
-    return false;
-  }
-  return true;
+  int ret;
+  m_lock.Lock();
+  if (m_hooks.count(command)) {
+    ret = -EEXIST;
+  } else {
+    m_hooks[command] = hook;
+    if (help.length())
+      m_help[command] = help;
+    ret = 0;
+  }  
+  m_lock.Unlock();
+  return ret;
 }
 
-bool AdminSocket::handle_json_request(int connection_fd, bool schema)
+int AdminSocket::unregister_command(std::string command)
 {
-  std::vector<char> buffer;
-  buffer.reserve(512);
-
-  PerfCountersCollection *coll = m_cct->get_perfcounters_collection();
-  if (coll) {
-    coll->write_json_to_buf(buffer, schema);
-  }
-
-  uint32_t len = htonl(buffer.size());
-  int ret = safe_write(connection_fd, &len, sizeof(len));
-  if (ret < 0) {
-    lderr(m_cct) << "AdminSocket: error writing message size: "
-                          << cpp_strerror(ret) << dendl;
-    return false;
-  }
-  ret = safe_write(connection_fd, &buffer[0], buffer.size());
-  if (ret < 0) {
-    lderr(m_cct) << "AdminSocket: error writing message: "
-                          << cpp_strerror(ret) << dendl;
-    return false;
-  }
-  ldout(m_cct, 30) << "AdminSocket: handle_json_request succeeded."
-                            << dendl;
-  return true;
+  int ret;
+  m_lock.Lock();
+  if (m_hooks.count(command)) {
+    m_hooks.erase(command);
+    m_help.erase(command);
+    ret = 0;
+  } else {
+    ret = -ENOENT;
+  }  
+  m_lock.Unlock();
+  return ret;
 }
 
 const char** AdminSocket::get_tracked_conf_keys() const
@@ -345,6 +380,42 @@ void AdminSocket::handle_conf_change(const md_config_t *conf,
   }
 }
 
+class VersionHook : public AdminSocketHook {
+public:
+  virtual bool call(std::string command, bufferlist& out) {
+    out.append(CEPH_ADMIN_SOCK_VERSION);
+    return true;
+  }
+};
+
+class HelpHook : public AdminSocketHook {
+  AdminSocket *m_as;
+public:
+  HelpHook(AdminSocket *as) : m_as(as) {}
+  bool call(string command, bufferlist& out) {
+    unsigned max = 0;
+    for (map<string,string>::iterator p = m_as->m_help.begin();
+        p != m_as->m_help.end();
+        ++p) {
+      if (p->first.length() > max)
+       max = p->first.length();
+    }
+    max += 1;
+    char spaces[max];
+    for (unsigned i=0; i<max; ++i)
+      spaces[i] = ' ';
+    for (map<string,string>::iterator p = m_as->m_help.begin();
+        p != m_as->m_help.end();
+        ++p) {
+      out.append(p->first);
+      out.append(spaces, max - p->first.length());
+      out.append(p->second);
+      out.append("\n");
+    }
+    return true;
+  }
+};
+
 bool AdminSocket::init(const std::string &path)
 {
   /* Set up things for the new thread */
@@ -369,6 +440,13 @@ bool AdminSocket::init(const std::string &path)
   m_shutdown_rd_fd = pipe_rd;
   m_shutdown_wr_fd = pipe_wr;
   m_path = path;
+
+  m_version_hook = new VersionHook;
+  register_command("version", m_version_hook, "get protocol version");
+  register_command("0", m_version_hook, "");
+  m_help_hook = new HelpHook(this);
+  register_command("help", m_help_hook, "list available commands");
+
   create();
   add_cleanup_file(m_path.c_str());
   return true;
@@ -391,6 +469,13 @@ void AdminSocket::shutdown()
     lderr(m_cct) << "AdminSocket::shutdown: failed to write "
       "to thread shutdown pipe: error " << ret << dendl;
   }
+
+  unregister_command("version");
+  unregister_command("0");
+  delete m_version_hook;
+  unregister_command("help");
+  delete m_help_hook;
+
   remove_cleanup_file(m_path.c_str());
   m_path.clear();
 }
index 32ecac73439e1fb7cb0a6e16347eac7b0665b73e..79f648b146a937a4736c1aeaab225c37dae62aa7 100644 (file)
 
 #include "common/config_obs.h"
 #include "common/Thread.h"
+#include "common/Mutex.h"
 
 #include <string>
+#include <map>
 #include "include/buffer.h"
 
 class AdminSocket;
 class CephContext;
 
-#define CEPH_ADMIN_SOCK_VERSION 1U
+#define CEPH_ADMIN_SOCK_VERSION "2"
+
+class AdminSocketHook {
+public:
+  virtual bool call(std::string command, bufferlist& out) = 0;
+  virtual ~AdminSocketHook() {};
+};
 
 class AdminSocket : public Thread, public md_config_obs_t
 {
@@ -34,6 +42,25 @@ public:
   virtual void handle_conf_change(const md_config_t *conf,
                                  const std::set <std::string> &changed);
 
+  /**
+   * register an admin socket command
+   *
+   * @param command command string
+   * @param hook implementaiton
+   * @param help help text.  if empty, command will not be included in 'help' output.
+   *
+   * @return 0 for success, -EEXIST if command already registered.
+   */
+  int register_command(std::string command, AdminSocketHook *hook, std::string help);
+
+  /**
+   * unregister an admin socket command
+   *
+   * @param command command string
+   * @return 0 on succest, -ENOENT if command dne.
+   */
+  int unregister_command(std::string command);
+  
 private:
   AdminSocket(const AdminSocket& rhs);
   AdminSocket& operator=(const AdminSocket &rhs);
@@ -47,16 +74,20 @@ private:
   void *entry();
   bool do_accept();
 
-  bool handle_version_request(int connection_fd);
-  bool handle_json_request(int connection_fd, bool schema);
-
   CephContext *m_cct;
   std::string m_path;
   int m_sock_fd;
   int m_shutdown_rd_fd;
   int m_shutdown_wr_fd;
 
+  Mutex m_lock;    // protects m_hooks, m_help
+  AdminSocketHook *m_version_hook, *m_help_hook;
+
+  std::map<std::string,AdminSocketHook*> m_hooks;
+  std::map<std::string,std::string> m_help;
+
   friend class AdminSocketTest;
+  friend class HelpHook;
 };
 
 
index d6608979a55902a14a98710a4779535f6adf1689..285d9852bdcba453f74fa6e582a5c4f6b9df12bc 100644 (file)
@@ -85,6 +85,31 @@ private:
   CephContext *_cct;
 };
 
+
+// perfcounter hooks
+
+class PerfCountersHook : public AdminSocketHook {
+  PerfCountersCollection *m_coll;
+
+public:
+  PerfCountersHook(PerfCountersCollection *c) : m_coll(c) {}
+
+  bool call(std::string command, bufferlist& out) {
+    std::vector<char> v;
+    if (command == "perfcounters_dump" ||
+       command == "1")
+      m_coll->write_json_to_buf(v, false);
+    else if (command == "perfcounters_schema" ||
+            command == "2")
+      m_coll->write_json_to_buf(v, true);
+    else 
+      assert(0 == "registered under wrong command?");    
+    out.append(&v[0], v.size());
+    return true;
+  }
+};
+
+
 CephContext::CephContext(uint32_t module_type_)
   : _conf(new md_config_t()),
     _doss(new DoutStreambuf <char, std::basic_string<char>::traits_type>()),
@@ -101,12 +126,24 @@ CephContext::CephContext(uint32_t module_type_)
   _admin_socket = new AdminSocket(this);
   _conf->add_observer(_admin_socket);
   _heartbeat_map = new HeartbeatMap(this);
+
+  _perf_counters_hook = new PerfCountersHook(_perf_counters_collection);
+  _admin_socket->register_command("perfcounters_dump", _perf_counters_hook, "dump perfcounters value");
+  _admin_socket->register_command("1", _perf_counters_hook, "");
+  _admin_socket->register_command("perfcounters_schema", _perf_counters_hook, "dump perfcounters schema");
+  _admin_socket->register_command("2", _perf_counters_hook, "");
 }
 
 CephContext::~CephContext()
 {
   join_service_thread();
 
+  _admin_socket->unregister_command("perfcounters_dump");
+  _admin_socket->unregister_command("1");
+  _admin_socket->unregister_command("perfcounters_schema");
+  _admin_socket->unregister_command("2");
+  delete _perf_counters_hook;
+
   delete _heartbeat_map;
 
   _conf->remove_observer(_admin_socket);
@@ -193,3 +230,5 @@ PerfCountersCollection *CephContext::get_perfcounters_collection()
 {
   return _perf_counters_collection;
 }
+
+
index ba99b1e87d059afcd9c2313f7112d4a031178ca8..8a8ad71925d72c7bad95a88aafd418696f65c0e5 100644 (file)
@@ -28,6 +28,7 @@ class DoutLocker;
 class PerfCountersCollection;
 class md_config_obs_t;
 class md_config_t;
+class PerfCountersHook;
 
 namespace ceph {
   class HeartbeatMap;
@@ -95,6 +96,8 @@ private:
 
   md_config_obs_t *_perf_counters_conf_obs;
 
+  PerfCountersHook *_perf_counters_hook;
+
   ceph::HeartbeatMap *_heartbeat_map;
 };