using std::vector;
+void do_status(CephToolCtx *ctx, bool shutdown = false);
+
static void usage()
{
cout << "usage: ceph [options] [commands]\n";
static void parse_cmd_args(vector<const char*> &args,
std::string *in_file, std::string *out_file,
ceph_tool_mode_t *mode, bool *concise,
- string *admin_socket, string *admin_socket_cmd)
+ string *admin_socket, string *admin_socket_cmd,
+ string *watch_level)
{
std::vector<const char*>::iterator i;
std::string val;
usage();
*admin_socket_cmd = *i;
} else if (ceph_argparse_flag(args, i, "-s", "--status", (char*)NULL)) {
- *mode = CEPH_TOOL_MODE_ONE_SHOT_OBSERVER;
+ *mode = CEPH_TOOL_MODE_STATUS;
} else if (ceph_argparse_flag(args, i, "-w", "--watch", (char*)NULL)) {
- *mode = CEPH_TOOL_MODE_OBSERVER;
+ *mode = CEPH_TOOL_MODE_WATCH;
+ } else if (ceph_argparse_flag(args, i, "--watch-debug", (char*) NULL)) {
+ *watch_level = "log-debug";
+ } else if (ceph_argparse_flag(args, i, "--watch-info", (char*) NULL)) {
+ *watch_level = "log-info";
+ } else if (ceph_argparse_flag(args, i, "--watch-sec", (char*) NULL)) {
+ *watch_level = "log-sec";
+ } else if (ceph_argparse_flag(args, i, "--watch-warn", (char*) NULL)) {
+ *watch_level = "log-warn";
+ } else if (ceph_argparse_flag(args, i, "--watch-error", (char*) NULL)) {
+ *watch_level = "log-error";
} else if (ceph_argparse_flag(args, i, "--concise", (char*)NULL)) {
*concise = true;
} else if (ceph_argparse_flag(args, i, "--verbose", (char*)NULL)) {
return r;
}
+
int main(int argc, const char **argv)
{
std::string in_file, out_file;
bool concise = true;
string admin_socket;
string admin_socket_cmd;
- parse_cmd_args(args, &in_file, &out_file, &mode, &concise, &admin_socket, &admin_socket_cmd);
+ string watch_level = "log-info";
+ parse_cmd_args(args, &in_file, &out_file, &mode, &concise,
+ &admin_socket, &admin_socket_cmd, &watch_level);
// daemon admin socket?
if (admin_socket.length()) {
bufferlist outbl;
int ret = 0;
switch (mode) {
- case CEPH_TOOL_MODE_ONE_SHOT_OBSERVER: // fall through
- case CEPH_TOOL_MODE_OBSERVER: {
+ case CEPH_TOOL_MODE_STATUS:
+ do_status(ctx, true);
+ break;
+ case CEPH_TOOL_MODE_WATCH: {
+ do_status(ctx);
ctx->lock.Lock();
- send_observe_requests(ctx);
+ ctx->dispatcher->subs.name = watch_level;
+ ctx->dispatcher->subs.last_known_version = 0;
+ ctx->mc.sub_want(watch_level, 0, 0);
+ ctx->mc.renew_subs();
ctx->lock.Unlock();
break;
}
#include "acconfig.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
+#include "messages/MLog.h"
#include "mon/MonClient.h"
#include "mon/MonMap.h"
#include "osd/OSDMap.h"
static set<int> registered, seen;
-version_t map_ver[PAXOS_NUM];
-
-static void handle_osd_map(CephToolCtx *ctx, MOSDMap *m)
-{
- epoch_t e = m->get_first();
- assert(m->maps.count(e));
- ctx->lock.Lock();
- delete osdmap;
- osdmap = new OSDMap;
- osdmap->decode(m->maps[e]);
- cmd_cond.Signal();
- ctx->lock.Unlock();
- m->put();
-}
-
-static void handle_observe(CephToolCtx *ctx, MMonObserve *observe)
-{
- dout(1) << observe->get_source() << " -> " << get_paxos_name(observe->machine_id)
- << " registered" << dendl;
- ctx->lock.Lock();
- registered.insert(observe->machine_id);
- ctx->lock.Unlock();
- observe->put();
-}
-
-static void handle_notify(CephToolCtx *ctx, MMonObserveNotify *notify)
-{
- utime_t now = ceph_clock_now(g_ceph_context);
-
- dout(1) << notify->get_source() << " -> " << get_paxos_name(notify->machine_id)
- << " v" << notify->ver
- << (notify->is_latest ? " (latest)" : "")
- << dendl;
-
- if (notify->fsid != ctx->mc.monmap.fsid) {
- dout(0) << notify->get_source_inst() << " notify fsid " << notify->fsid << " != "
- << ctx->mc.monmap.fsid << dendl;
- notify->put();
- return;
- }
-
- if (map_ver[notify->machine_id] >= notify->ver)
- return;
-
- switch (notify->machine_id) {
- case PAXOS_PGMAP:
- {
- bufferlist::iterator p = notify->bl.begin();
- if (notify->is_latest) {
- ctx->pgmap.decode(p);
- } else {
- PGMap::Incremental inc;
- inc.decode(p);
- ctx->pgmap.apply_incremental(inc);
- }
- *ctx->log << now << " pg " << ctx->pgmap << std::endl;
- ctx->updates |= PG_MON_UPDATE;
- break;
- }
-
- case PAXOS_MDSMAP:
- ctx->mdsmap.decode(notify->bl);
- *ctx->log << now << " mds " << ctx->mdsmap << std::endl;
- ctx->updates |= MDS_MON_UPDATE;
- break;
-
- case PAXOS_OSDMAP:
- {
- if (notify->is_latest) {
- ctx->osdmap.decode(notify->bl);
- } else {
- OSDMap::Incremental inc(notify->bl);
- ctx->osdmap.apply_incremental(inc);
- }
- *ctx->log << now << " osd " << ctx->osdmap << std::endl;
- }
- ctx->updates |= OSD_MON_UPDATE;
- break;
-
- case PAXOS_LOG:
- {
- bufferlist::iterator p = notify->bl.begin();
- if (notify->is_latest) {
- LogSummary summary;
- ::decode(summary, p);
- // show last log message
- if (!summary.tail.empty())
- *ctx->log << now << " log " << summary.tail.back() << std::endl;
- } else {
- LogEntry le;
- __u8 v;
- ::decode(v, p);
- while (!p.end()) {
- le.decode(p);
- *ctx->log << le << std::endl;
- }
- }
- break;
- }
-
- case PAXOS_AUTH:
- {
-#if 0
- bufferlist::iterator p = notify->bl.begin();
- if (notify->is_latest) {
- KeyServerData ctx;
- ::decode(ctx, p);
- *ctx->log << now << " auth " << std::endl;
- } else {
- while (!p.end()) {
- AuthMonitor::Incremental inc;
- inc.decode(p);
- *ctx->log << now << " auth " << inc.name.to_str() << std::endl;
- }
- }
-#endif
- /* ignoring auth incremental.. don't want to decode it */
- break;
- }
-
- case PAXOS_MONMAP:
- {
- ctx->mc.monmap.decode(notify->bl);
- *ctx->log << now << " mon " << ctx->mc.monmap << std::endl;
- }
- break;
-
- default:
- *ctx->log << now << " ignoring unknown machine id " << notify->machine_id << std::endl;
- }
-
- map_ver[notify->machine_id] = notify->ver;
-
- // have we seen them all?
- seen.insert(notify->machine_id);
- switch (ceph_tool_mode) {
- case CEPH_TOOL_MODE_ONE_SHOT_OBSERVER:
- if (seen.size() == PAXOS_NUM) {
- messenger->shutdown();
- }
- break;
- case CEPH_TOOL_MODE_GUI:
- ctx->gui_cond.Signal();
- break;
- default:
- // do nothing
- break;
- }
-
- notify->put();
-}
-
-class C_ObserverRefresh : public Context {
-public:
- bool newmon;
- C_ObserverRefresh(bool n, CephToolCtx *ctx_)
- : newmon(n),
- ctx(ctx_)
- {
- }
- void finish(int r) {
- send_observe_requests(ctx);
- }
-private:
- CephToolCtx *ctx;
-};
-
-void send_observe_requests(CephToolCtx *ctx)
-{
- dout(1) << "send_observe_requests " << dendl;
-
- for (int i=0; i<PAXOS_NUM; i++) {
- MMonObserve *m = new MMonObserve(ctx->mc.monmap.fsid, i, map_ver[i]);
- dout(1) << "mon" << " <- observe " << get_paxos_name(i) << dendl;
- ctx->mc.send_mon_message(m);
- }
-
- registered.clear();
- float seconds = g_conf->paxos_observer_timeout/2;
- dout(1) << " refresh after " << seconds << " with same mon" << dendl;
- ctx->timer.add_event_after(seconds, new C_ObserverRefresh(false, ctx));
-}
-
static void handle_ack(CephToolCtx *ctx, MMonCommandAck *ack)
{
ctx->lock.Lock();
reply = true;
}
-class Admin : public Dispatcher {
-public:
- Admin(CephToolCtx *ctx_)
- : Dispatcher(g_ceph_context),
- ctx(ctx_)
- {
- }
-
- bool ms_dispatch(Message *m) {
- switch (m->get_type()) {
- case MSG_MON_COMMAND_ACK:
- handle_ack(ctx, (MMonCommandAck*)m);
- break;
- case MSG_COMMAND_REPLY:
- handle_ack(ctx, (MCommandReply*)m);
- break;
- case MSG_MON_OBSERVE_NOTIFY:
- handle_notify(ctx, (MMonObserveNotify *)m);
- break;
- case MSG_MON_OBSERVE:
- handle_observe(ctx, (MMonObserve *)m);
- break;
- case CEPH_MSG_MON_MAP:
- m->put();
- break;
- case CEPH_MSG_OSD_MAP:
- handle_osd_map(ctx, (MOSDMap *)m);
- break;
- default:
- return false;
- }
- return true;
- }
-
- void ms_handle_connect(Connection *con) {
- if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
- ctx->lock.Lock();
- if (ceph_tool_mode != CEPH_TOOL_MODE_CLI_INPUT) {
- send_observe_requests(ctx);
- }
- if (pending_cmd.size())
- send_command(ctx);
- ctx->lock.Unlock();
- }
- }
- bool ms_handle_reset(Connection *con) { return false; }
- void ms_handle_remote_reset(Connection *con) {}
-
- bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
- {
- if (dest_type == CEPH_ENTITY_TYPE_MON)
- return true;
- *authorizer = ctx->mc.auth->build_authorizer(dest_type);
- return true;
- }
-
-private:
- CephToolCtx *ctx;
-};
-
int do_command(CephToolCtx *ctx,
vector<string>& cmd, bufferlist& bl, bufferlist& rbl)
{
return reply_rc;
}
+void do_status(CephToolCtx *ctx, bool shutdown) {
+ vector<string> cmd;
+ cmd.push_back("status");
+ bufferlist bl;
+
+ do_command(ctx, cmd, bl, bl);
+
+ if (shutdown)
+ messenger->shutdown();
+}
+
static const char *cli_prompt(EditLine *e)
{
return "ceph> ";
return 0;
}
+
+void Subscriptions::handle_log(MLog *m)
+{
+ dout(10) << __func__ << " received log msg ver " << m->version << dendl;
+
+ if (last_known_version >= m->version) {
+ dout(10) << __func__
+ << " we have already received ver " << m->version
+ << " (highest received: " << last_known_version << ") " << dendl;
+ return;
+ }
+ last_known_version = m->version;
+
+ std::deque<LogEntry>::iterator it = m->entries.begin();
+ for (; it != m->entries.end(); it++) {
+ LogEntry e = *it;
+ cout << e.stamp << " " << e.seq << " " << e.who
+ << " " << e.type << " " << e.msg << std::endl;
+ }
+
+ version_t v = last_known_version+1;
+ dout(10) << __func__ << " wanting " << name << " ver " << v << dendl;
+ ctx->mc.sub_want(name, v, 0);
+}
+
+bool Admin::ms_dispatch(Message *m) {
+ switch (m->get_type()) {
+ case MSG_MON_COMMAND_ACK:
+ handle_ack(ctx, (MMonCommandAck*)m);
+ break;
+ case MSG_COMMAND_REPLY:
+ handle_ack(ctx, (MCommandReply*)m);
+ break;
+ case CEPH_MSG_MON_MAP:
+ m->put();
+ break;
+ case MSG_LOG:
+ {
+ MLog *mlog = (MLog*) m;
+ subs.handle_log(mlog);
+ break;
+ }
+ default:
+ return false;
+ }
+ return true;
+}
+
+void Admin::ms_handle_connect(Connection *con) {
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ ctx->lock.Lock();
+ if (ceph_tool_mode != CEPH_TOOL_MODE_CLI_INPUT) {
+// send_observe_requests(ctx);
+ }
+ if (pending_cmd.size())
+ send_command(ctx);
+ ctx->lock.Unlock();
+ }
+}
+
+bool Admin::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+ bool force_new)
+{
+ if (dest_type == CEPH_ENTITY_TYPE_MON)
+ return true;
+ *authorizer = ctx->mc.auth->build_authorizer(dest_type);
+ return true;
+}
+
#ifndef CEPH_TOOLS_COMMON_DOT_H
#define CEPH_TOOLS_COMMON_DOT_H
+#include <iosfwd>
+#include <stdint.h>
+#include <map>
+
#include "common/Cond.h"
#include "common/Mutex.h"
#include "mon/MonClient.h"
#include "osd/OSDMap.h"
#include "common/Timer.h"
-#include <iosfwd>
-#include <stdint.h>
+
+#include "common/LogEntry.h"
+#include "mon/mon_types.h"
+#include "messages/MOSDMap.h"
+#include "messages/MLog.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+#include "messages/MMonCommandAck.h"
+
#define OSD_MON_UPDATE (1<<0)
#define MDS_MON_UPDATE (1<<1)
#define EVERYTHING_UPDATE 0xffffffff
class CephContext;
+class CephToolCtx;
enum ceph_tool_mode_t {
CEPH_TOOL_MODE_CLI_INPUT = 0,
- CEPH_TOOL_MODE_OBSERVER = 1,
- CEPH_TOOL_MODE_ONE_SHOT_OBSERVER = 2,
+ CEPH_TOOL_MODE_WATCH = 1,
+ CEPH_TOOL_MODE_STATUS = 2,
CEPH_TOOL_MODE_GUI = 3
};
+struct Subscriptions {
+ CephToolCtx *ctx;
+ version_t last_known_version;
+ string name;
+
+ Subscriptions(CephToolCtx *c) : ctx(c) { }
+
+ void handle_log(MLog *m);
+};
+
+class Admin : public Dispatcher {
+private:
+ CephToolCtx *ctx;
+public:
+ Subscriptions subs;
+
+ Admin(CephToolCtx *ctx_)
+ : Dispatcher(g_ceph_context),
+ ctx(ctx_), subs(ctx_)
+ {
+ }
+
+ bool ms_dispatch(Message *m);
+ void ms_handle_connect(Connection *con);
+ bool ms_handle_reset(Connection *con) { return false; }
+ void ms_handle_remote_reset(Connection *con) {}
+
+ bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
+};
+
+
// tool/ceph.cc
class CephToolCtx
{
bool concise;
- Dispatcher *dispatcher;
+ Admin *dispatcher;
CephToolCtx(CephContext *cct_, bool concise_) :
cct(cct_),
// tool/ceph.cc
int ceph_tool_do_cli(CephToolCtx *data);
int run_command(CephToolCtx *data, const char *line);
-void send_observe_requests(CephToolCtx *ctx);
CephToolCtx* ceph_tool_common_init(ceph_tool_mode_t mode, bool concise);
int do_command(CephToolCtx *ctx,
vector<string>& cmd, bufferlist& bl, bufferlist& rbl);