: m_cct(cct),
m_sock_fd(-1),
m_shutdown_rd_fd(-1),
- m_shutdown_wr_fd(-1)
+ m_shutdown_wr_fd(-1),
+ m_lock("AdminSocket::m_lock")
{
}
return false;
}
- uint32_t request, request_raw;
- ret = safe_read(connection_fd, &request_raw, sizeof(request_raw));
- if (ret < 0) {
- lderr(m_cct) << "AdminSocket: error reading request code: "
- << cpp_strerror(ret) << dendl;
- close(connection_fd);
- return false;
+ char cmd[80];
+ int pos = 0;
+ string c;
+ while (1) {
+ ret = safe_read(connection_fd, &cmd[pos], 1);
+ if (ret <= 0) {
+ lderr(m_cct) << "AdminSocket: error reading request code: "
+ << cpp_strerror(ret) << dendl;
+ close(connection_fd);
+ return false;
+ }
+ //ldout(m_cct, 0) << "AdminSocket read byte " << (int)cmd[pos] << " pos " << pos << dendl;
+ if (cmd[0] == '\0') {
+ // old protocol: __be32
+ if (pos == 3 && cmd[0] == '\0') {
+ switch (cmd[3]) {
+ case 0:
+ c = "version";
+ break;
+ case 1:
+ c = "perfcounters_dump";
+ break;
+ case 2:
+ c = "perfcounters_schema";
+ break;
+ default:
+ c = "foo";
+ break;
+ }
+ break;
+ }
+ } else {
+ // new protocol: null or \n terminated string
+ if (cmd[pos] == '\n' || cmd[pos] == '\0') {
+ cmd[pos] = '\0';
+ c = cmd;
+ break;
+ }
+ }
+ pos++;
}
- request = ntohl(request_raw);
- switch (request) {
- case 0:
- /* version request */
- ret = handle_version_request(connection_fd);
- break;
- case 1:
- /* data request */
- ret = handle_json_request(connection_fd, false);
- break;
- case 2:
- /* schema request */
- ret = handle_json_request(connection_fd, true);
- break;
- default:
- lderr(m_cct) << "AdminSocket: unknown request "
- << "code " << request << dendl;
- ret = false;
- break;
+
+ bool rval = false;
+
+ m_lock.Lock();
+ map<string,AdminSocketHook*>::iterator p = m_hooks.find(c);
+ bufferlist out;
+ if (p == m_hooks.end()) {
+ lderr(m_cct) << "AdminSocket: request '" << c << "' not defined" << dendl;
+ } else {
+ bool success = p->second->call(c, out);
+ if (!success) {
+ ldout(m_cct, 0) << "AdminSocket: request '" << c << "' to " << p->second << " failed" << dendl;
+ out.append("failed");
+ } else {
+ ldout(m_cct, 0) << "AdminSocket: request '" << c << "' to " << p->second
+ << " returned " << out.length() << " bytes" << dendl;
+ }
+ uint32_t len = htonl(out.length());
+ int ret = safe_write(connection_fd, &len, sizeof(len));
+ if (ret < 0) {
+ lderr(m_cct) << "AdminSocket: error writing response length "
+ << cpp_strerror(ret) << dendl;
+ } else {
+ ret = out.write_fd(connection_fd);
+ if (ret >= 0)
+ rval = true;
+ }
}
+ m_lock.Unlock();
+
TEMP_FAILURE_RETRY(close(connection_fd));
- return ret;
+ return rval;
}
-bool AdminSocket::handle_version_request(int connection_fd)
+int AdminSocket::register_command(std::string command, AdminSocketHook *hook, std::string help)
{
- uint32_t version_raw = htonl(CEPH_ADMIN_SOCK_VERSION);
- int ret = safe_write(connection_fd, &version_raw, sizeof(version_raw));
- if (ret < 0) {
- lderr(m_cct) << "AdminSocket: error writing version_raw: "
- << cpp_strerror(ret) << dendl;
- return false;
- }
- return true;
+ int ret;
+ m_lock.Lock();
+ if (m_hooks.count(command)) {
+ ret = -EEXIST;
+ } else {
+ m_hooks[command] = hook;
+ if (help.length())
+ m_help[command] = help;
+ ret = 0;
+ }
+ m_lock.Unlock();
+ return ret;
}
-bool AdminSocket::handle_json_request(int connection_fd, bool schema)
+int AdminSocket::unregister_command(std::string command)
{
- std::vector<char> buffer;
- buffer.reserve(512);
-
- PerfCountersCollection *coll = m_cct->get_perfcounters_collection();
- if (coll) {
- coll->write_json_to_buf(buffer, schema);
- }
-
- uint32_t len = htonl(buffer.size());
- int ret = safe_write(connection_fd, &len, sizeof(len));
- if (ret < 0) {
- lderr(m_cct) << "AdminSocket: error writing message size: "
- << cpp_strerror(ret) << dendl;
- return false;
- }
- ret = safe_write(connection_fd, &buffer[0], buffer.size());
- if (ret < 0) {
- lderr(m_cct) << "AdminSocket: error writing message: "
- << cpp_strerror(ret) << dendl;
- return false;
- }
- ldout(m_cct, 30) << "AdminSocket: handle_json_request succeeded."
- << dendl;
- return true;
+ int ret;
+ m_lock.Lock();
+ if (m_hooks.count(command)) {
+ m_hooks.erase(command);
+ m_help.erase(command);
+ ret = 0;
+ } else {
+ ret = -ENOENT;
+ }
+ m_lock.Unlock();
+ return ret;
}
const char** AdminSocket::get_tracked_conf_keys() const
}
}
+class VersionHook : public AdminSocketHook {
+public:
+ virtual bool call(std::string command, bufferlist& out) {
+ out.append(CEPH_ADMIN_SOCK_VERSION);
+ return true;
+ }
+};
+
+class HelpHook : public AdminSocketHook {
+ AdminSocket *m_as;
+public:
+ HelpHook(AdminSocket *as) : m_as(as) {}
+ bool call(string command, bufferlist& out) {
+ unsigned max = 0;
+ for (map<string,string>::iterator p = m_as->m_help.begin();
+ p != m_as->m_help.end();
+ ++p) {
+ if (p->first.length() > max)
+ max = p->first.length();
+ }
+ max += 1;
+ char spaces[max];
+ for (unsigned i=0; i<max; ++i)
+ spaces[i] = ' ';
+ for (map<string,string>::iterator p = m_as->m_help.begin();
+ p != m_as->m_help.end();
+ ++p) {
+ out.append(p->first);
+ out.append(spaces, max - p->first.length());
+ out.append(p->second);
+ out.append("\n");
+ }
+ return true;
+ }
+};
+
bool AdminSocket::init(const std::string &path)
{
/* Set up things for the new thread */
m_shutdown_rd_fd = pipe_rd;
m_shutdown_wr_fd = pipe_wr;
m_path = path;
+
+ m_version_hook = new VersionHook;
+ register_command("version", m_version_hook, "get protocol version");
+ register_command("0", m_version_hook, "");
+ m_help_hook = new HelpHook(this);
+ register_command("help", m_help_hook, "list available commands");
+
create();
add_cleanup_file(m_path.c_str());
return true;
lderr(m_cct) << "AdminSocket::shutdown: failed to write "
"to thread shutdown pipe: error " << ret << dendl;
}
+
+ unregister_command("version");
+ unregister_command("0");
+ delete m_version_hook;
+ unregister_command("help");
+ delete m_help_hook;
+
remove_cleanup_file(m_path.c_str());
m_path.clear();
}
#include "common/config_obs.h"
#include "common/Thread.h"
+#include "common/Mutex.h"
#include <string>
+#include <map>
#include "include/buffer.h"
class AdminSocket;
class CephContext;
-#define CEPH_ADMIN_SOCK_VERSION 1U
+#define CEPH_ADMIN_SOCK_VERSION "2"
+
+class AdminSocketHook {
+public:
+ virtual bool call(std::string command, bufferlist& out) = 0;
+ virtual ~AdminSocketHook() {};
+};
class AdminSocket : public Thread, public md_config_obs_t
{
virtual void handle_conf_change(const md_config_t *conf,
const std::set <std::string> &changed);
+ /**
+ * register an admin socket command
+ *
+ * @param command command string
+ * @param hook implementaiton
+ * @param help help text. if empty, command will not be included in 'help' output.
+ *
+ * @return 0 for success, -EEXIST if command already registered.
+ */
+ int register_command(std::string command, AdminSocketHook *hook, std::string help);
+
+ /**
+ * unregister an admin socket command
+ *
+ * @param command command string
+ * @return 0 on succest, -ENOENT if command dne.
+ */
+ int unregister_command(std::string command);
+
private:
AdminSocket(const AdminSocket& rhs);
AdminSocket& operator=(const AdminSocket &rhs);
void *entry();
bool do_accept();
- bool handle_version_request(int connection_fd);
- bool handle_json_request(int connection_fd, bool schema);
-
CephContext *m_cct;
std::string m_path;
int m_sock_fd;
int m_shutdown_rd_fd;
int m_shutdown_wr_fd;
+ Mutex m_lock; // protects m_hooks, m_help
+ AdminSocketHook *m_version_hook, *m_help_hook;
+
+ std::map<std::string,AdminSocketHook*> m_hooks;
+ std::map<std::string,std::string> m_help;
+
friend class AdminSocketTest;
+ friend class HelpHook;
};
CephContext *_cct;
};
+
+// perfcounter hooks
+
+class PerfCountersHook : public AdminSocketHook {
+ PerfCountersCollection *m_coll;
+
+public:
+ PerfCountersHook(PerfCountersCollection *c) : m_coll(c) {}
+
+ bool call(std::string command, bufferlist& out) {
+ std::vector<char> v;
+ if (command == "perfcounters_dump" ||
+ command == "1")
+ m_coll->write_json_to_buf(v, false);
+ else if (command == "perfcounters_schema" ||
+ command == "2")
+ m_coll->write_json_to_buf(v, true);
+ else
+ assert(0 == "registered under wrong command?");
+ out.append(&v[0], v.size());
+ return true;
+ }
+};
+
+
CephContext::CephContext(uint32_t module_type_)
: _conf(new md_config_t()),
_doss(new DoutStreambuf <char, std::basic_string<char>::traits_type>()),
_admin_socket = new AdminSocket(this);
_conf->add_observer(_admin_socket);
_heartbeat_map = new HeartbeatMap(this);
+
+ _perf_counters_hook = new PerfCountersHook(_perf_counters_collection);
+ _admin_socket->register_command("perfcounters_dump", _perf_counters_hook, "dump perfcounters value");
+ _admin_socket->register_command("1", _perf_counters_hook, "");
+ _admin_socket->register_command("perfcounters_schema", _perf_counters_hook, "dump perfcounters schema");
+ _admin_socket->register_command("2", _perf_counters_hook, "");
}
CephContext::~CephContext()
{
join_service_thread();
+ _admin_socket->unregister_command("perfcounters_dump");
+ _admin_socket->unregister_command("1");
+ _admin_socket->unregister_command("perfcounters_schema");
+ _admin_socket->unregister_command("2");
+ delete _perf_counters_hook;
+
delete _heartbeat_map;
_conf->remove_observer(_admin_socket);
{
return _perf_counters_collection;
}
+
+