]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephtool: Slight rework and pub/sub log support.
authorJoao Eduardo Luis <joao.luis@inktank.com>
Thu, 17 May 2012 00:19:45 +0000 (17:19 -0700)
committerJoao Eduardo Luis <joao.luis@inktank.com>
Thu, 17 May 2012 23:15:25 +0000 (16:15 -0700)
We reworked the code a bit to accommodate the introduction for the log
monitor's publish/subscribe mechanisms. With this patch we no longer
depend on the observer's, and use instead the much broader approach of
subscribing to events. In our case, we will subscribe to log levels.
If the '-w'/'--watch' flag is defined, the tool will be subscribed to the
'log-info' level by default, unless one of the following flags are defined
(in which case the level will be changed accordingly): '--watch-debug',
'--watch-info', '--watch-sec', '--watch-warn' and '--watch-error'.

Signed-off-by: Joao Eduardo Luis <joao.luis@inktank.com>
src/tools/ceph.cc
src/tools/common.cc
src/tools/common.h

index 99adc20a9ecda748ed88b25001f80bec86378229..5da84792fce498d9d7dffb21998ee31645b8d466 100644 (file)
@@ -37,6 +37,8 @@
 
 using std::vector;
 
+void do_status(CephToolCtx *ctx, bool shutdown = false);
+
 static void usage()
 {
   cout << "usage: ceph [options] [commands]\n";
@@ -58,7 +60,8 @@ static void usage()
 static void parse_cmd_args(vector<const char*> &args,
                std::string *in_file, std::string *out_file,
                           ceph_tool_mode_t *mode, bool *concise,
-                          string *admin_socket, string *admin_socket_cmd)
+                          string *admin_socket, string *admin_socket_cmd,
+                          string *watch_level)
 {
   std::vector<const char*>::iterator i;
   std::string val;
@@ -75,9 +78,19 @@ static void parse_cmd_args(vector<const char*> &args,
        usage();
       *admin_socket_cmd = *i;
     } else if (ceph_argparse_flag(args, i, "-s", "--status", (char*)NULL)) {
-      *mode = CEPH_TOOL_MODE_ONE_SHOT_OBSERVER;
+      *mode = CEPH_TOOL_MODE_STATUS;
     } else if (ceph_argparse_flag(args, i, "-w", "--watch", (char*)NULL)) {
-      *mode = CEPH_TOOL_MODE_OBSERVER;
+      *mode = CEPH_TOOL_MODE_WATCH;
+    } else if (ceph_argparse_flag(args, i, "--watch-debug", (char*) NULL)) {
+      *watch_level = "log-debug";
+    } else if (ceph_argparse_flag(args, i, "--watch-info", (char*) NULL)) {
+      *watch_level = "log-info";
+    } else if (ceph_argparse_flag(args, i, "--watch-sec", (char*) NULL)) {
+      *watch_level = "log-sec";
+    } else if (ceph_argparse_flag(args, i, "--watch-warn", (char*) NULL)) {
+      *watch_level = "log-warn";
+    } else if (ceph_argparse_flag(args, i, "--watch-error", (char*) NULL)) {
+      *watch_level = "log-error";
     } else if (ceph_argparse_flag(args, i, "--concise", (char*)NULL)) {
       *concise = true;
     } else if (ceph_argparse_flag(args, i, "--verbose", (char*)NULL)) {
@@ -179,6 +192,7 @@ int do_admin_socket(string path, string cmd)
   return r;
 }
 
+
 int main(int argc, const char **argv)
 {
   std::string in_file, out_file;
@@ -195,7 +209,9 @@ int main(int argc, const char **argv)
   bool concise = true;
   string admin_socket;
   string admin_socket_cmd;
-  parse_cmd_args(args, &in_file, &out_file, &mode, &concise, &admin_socket, &admin_socket_cmd);
+  string watch_level = "log-info";
+  parse_cmd_args(args, &in_file, &out_file, &mode, &concise, 
+                &admin_socket, &admin_socket_cmd, &watch_level);
 
   // daemon admin socket?
   if (admin_socket.length()) {
@@ -222,10 +238,16 @@ int main(int argc, const char **argv)
   bufferlist outbl;
   int ret = 0;
   switch (mode) {
-    case CEPH_TOOL_MODE_ONE_SHOT_OBSERVER: // fall through
-    case CEPH_TOOL_MODE_OBSERVER: {
+    case CEPH_TOOL_MODE_STATUS:
+      do_status(ctx, true);
+      break;
+    case CEPH_TOOL_MODE_WATCH: {
+      do_status(ctx);
       ctx->lock.Lock();
-      send_observe_requests(ctx);
+      ctx->dispatcher->subs.name = watch_level;
+      ctx->dispatcher->subs.last_known_version = 0;
+      ctx->mc.sub_want(watch_level, 0, 0);
+      ctx->mc.renew_subs();
       ctx->lock.Unlock();
       break;
     }
index e2055d35ac84d7c030e7566daa93f55c790f00e2..bb2ab399bdad649b7d868ee5bb484d07ffa2e187 100644 (file)
@@ -20,6 +20,7 @@ using namespace std;
 #include "acconfig.h"
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
+#include "messages/MLog.h"
 #include "mon/MonClient.h"
 #include "mon/MonMap.h"
 #include "osd/OSDMap.h"
@@ -88,189 +89,6 @@ OSDMap *osdmap = 0;
 
 static set<int> registered, seen;
 
-version_t map_ver[PAXOS_NUM];
-
-static void handle_osd_map(CephToolCtx *ctx, MOSDMap *m)
-{
-  epoch_t e = m->get_first();
-  assert(m->maps.count(e));
-  ctx->lock.Lock();
-  delete osdmap;
-  osdmap = new OSDMap;
-  osdmap->decode(m->maps[e]);
-  cmd_cond.Signal();
-  ctx->lock.Unlock();
-  m->put();
-}
-
-static void handle_observe(CephToolCtx *ctx, MMonObserve *observe)
-{
-  dout(1) << observe->get_source() << " -> " << get_paxos_name(observe->machine_id)
-         << " registered" << dendl;
-  ctx->lock.Lock();
-  registered.insert(observe->machine_id);  
-  ctx->lock.Unlock();
-  observe->put();
-}
-
-static void handle_notify(CephToolCtx *ctx, MMonObserveNotify *notify)
-{
-  utime_t now = ceph_clock_now(g_ceph_context);
-
-  dout(1) << notify->get_source() << " -> " << get_paxos_name(notify->machine_id)
-         << " v" << notify->ver
-         << (notify->is_latest ? " (latest)" : "")
-         << dendl;
-  
-  if (notify->fsid != ctx->mc.monmap.fsid) {
-    dout(0) << notify->get_source_inst() << " notify fsid " << notify->fsid << " != "
-           << ctx->mc.monmap.fsid << dendl;
-    notify->put();
-    return;
-  }
-
-  if (map_ver[notify->machine_id] >= notify->ver)
-    return;
-
-  switch (notify->machine_id) {
-  case PAXOS_PGMAP:
-    {
-      bufferlist::iterator p = notify->bl.begin();
-      if (notify->is_latest) {
-       ctx->pgmap.decode(p);
-      } else {
-       PGMap::Incremental inc;
-       inc.decode(p);
-       ctx->pgmap.apply_incremental(inc);
-      }
-      *ctx->log << now << "    pg " << ctx->pgmap << std::endl;
-      ctx->updates |= PG_MON_UPDATE;
-      break;
-    }
-
-  case PAXOS_MDSMAP:
-    ctx->mdsmap.decode(notify->bl);
-    *ctx->log << now << "   mds " << ctx->mdsmap << std::endl;
-    ctx->updates |= MDS_MON_UPDATE;
-    break;
-
-  case PAXOS_OSDMAP:
-    {
-      if (notify->is_latest) {
-       ctx->osdmap.decode(notify->bl);
-      } else {
-       OSDMap::Incremental inc(notify->bl);
-       ctx->osdmap.apply_incremental(inc);
-      }
-      *ctx->log << now << "   osd " << ctx->osdmap << std::endl;
-    }
-    ctx->updates |= OSD_MON_UPDATE;
-    break;
-
-  case PAXOS_LOG:
-    {
-      bufferlist::iterator p = notify->bl.begin();
-      if (notify->is_latest) {
-       LogSummary summary;
-       ::decode(summary, p);
-       // show last log message
-       if (!summary.tail.empty())
-         *ctx->log << now << "   log " << summary.tail.back() << std::endl;
-      } else {
-       LogEntry le;
-       __u8 v;
-       ::decode(v, p);
-       while (!p.end()) {
-         le.decode(p);
-         *ctx->log << le << std::endl;
-       }
-      }
-      break;
-    }
-
-  case PAXOS_AUTH:
-    {
-#if 0
-      bufferlist::iterator p = notify->bl.begin();
-      if (notify->is_latest) {
-       KeyServerData ctx;
-       ::decode(ctx, p);
-       *ctx->log << now << "   auth " << std::endl;
-      } else {
-       while (!p.end()) {
-         AuthMonitor::Incremental inc;
-          inc.decode(p);
-         *ctx->log << now << "   auth " << inc.name.to_str() << std::endl;
-       }
-      }
-#endif
-      /* ignoring auth incremental.. don't want to decode it */
-      break;
-    }
-
-  case PAXOS_MONMAP:
-    {
-      ctx->mc.monmap.decode(notify->bl);
-      *ctx->log << now << "   mon " << ctx->mc.monmap << std::endl;
-    }
-    break;
-
-  default:
-    *ctx->log << now << "  ignoring unknown machine id " << notify->machine_id << std::endl;
-  }
-
-  map_ver[notify->machine_id] = notify->ver;
-
-  // have we seen them all?
-  seen.insert(notify->machine_id);
-  switch (ceph_tool_mode) {
-    case CEPH_TOOL_MODE_ONE_SHOT_OBSERVER:
-      if (seen.size() == PAXOS_NUM) {
-       messenger->shutdown();
-      }
-      break;
-    case CEPH_TOOL_MODE_GUI:
-      ctx->gui_cond.Signal();
-      break;
-    default:
-      // do nothing
-      break;
-  }
-
-  notify->put();
-}
-
-class C_ObserverRefresh : public Context {
-public:
-  bool newmon;
-  C_ObserverRefresh(bool n, CephToolCtx *ctx_) 
-    : newmon(n),
-      ctx(ctx_)
-  {
-  }
-  void finish(int r) {
-    send_observe_requests(ctx);
-  }
-private:
-  CephToolCtx *ctx;
-};
-
-void send_observe_requests(CephToolCtx *ctx)
-{
-  dout(1) << "send_observe_requests " << dendl;
-
-  for (int i=0; i<PAXOS_NUM; i++) {
-    MMonObserve *m = new MMonObserve(ctx->mc.monmap.fsid, i, map_ver[i]);
-    dout(1) << "mon" << " <- observe " << get_paxos_name(i) << dendl;
-    ctx->mc.send_mon_message(m);
-  }
-
-  registered.clear();
-  float seconds = g_conf->paxos_observer_timeout/2;
-  dout(1) << " refresh after " << seconds << " with same mon" << dendl;
-  ctx->timer.add_event_after(seconds, new C_ObserverRefresh(false, ctx));
-}
-
 static void handle_ack(CephToolCtx *ctx, MMonCommandAck *ack)
 {
   ctx->lock.Lock();
@@ -385,66 +203,6 @@ static void send_command(CephToolCtx *ctx)
   reply = true;
 }
 
-class Admin : public Dispatcher {
-public:
-  Admin(CephToolCtx *ctx_)
-    : Dispatcher(g_ceph_context),
-      ctx(ctx_)
-  {
-  }
-
-  bool ms_dispatch(Message *m) {
-    switch (m->get_type()) {
-    case MSG_MON_COMMAND_ACK:
-      handle_ack(ctx, (MMonCommandAck*)m);
-      break;
-    case MSG_COMMAND_REPLY:
-      handle_ack(ctx, (MCommandReply*)m);
-      break;
-    case MSG_MON_OBSERVE_NOTIFY:
-      handle_notify(ctx, (MMonObserveNotify *)m);
-      break;
-    case MSG_MON_OBSERVE:
-      handle_observe(ctx, (MMonObserve *)m);
-      break;
-    case CEPH_MSG_MON_MAP:
-      m->put();
-      break;
-    case CEPH_MSG_OSD_MAP:
-      handle_osd_map(ctx, (MOSDMap *)m);
-      break;
-    default:
-      return false;
-    }
-    return true;
-  }
-
-  void ms_handle_connect(Connection *con) {
-    if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
-      ctx->lock.Lock();
-      if (ceph_tool_mode != CEPH_TOOL_MODE_CLI_INPUT) {
-       send_observe_requests(ctx);
-      }
-      if (pending_cmd.size())
-       send_command(ctx);
-      ctx->lock.Unlock();
-    }
-  }
-  bool ms_handle_reset(Connection *con) { return false; }
-  void ms_handle_remote_reset(Connection *con) {}
-
-  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
-  {
-    if (dest_type == CEPH_ENTITY_TYPE_MON)
-      return true;
-    *authorizer = ctx->mc.auth->build_authorizer(dest_type);
-    return true;
-  }
-
-private:
-  CephToolCtx *ctx;
-};
-
 int do_command(CephToolCtx *ctx,
               vector<string>& cmd, bufferlist& bl, bufferlist& rbl)
 {
@@ -497,6 +255,17 @@ int do_command(CephToolCtx *ctx,
   return reply_rc;
 }
 
+void do_status(CephToolCtx *ctx, bool shutdown) {
+  vector<string> cmd;
+  cmd.push_back("status");
+  bufferlist bl;
+
+  do_command(ctx, cmd, bl, bl);
+
+  if (shutdown)
+    messenger->shutdown();
+}
+
 static const char *cli_prompt(EditLine *e)
 {
   return "ceph> ";
@@ -705,3 +474,72 @@ int ceph_tool_common_shutdown(CephToolCtx *ctx)
   
   return 0;
 }
+
+void Subscriptions::handle_log(MLog *m) 
+{
+  dout(10) << __func__ << " received log msg ver " << m->version << dendl;
+
+  if (last_known_version >= m->version) {
+    dout(10) << __func__ 
+           << " we have already received ver " << m->version 
+           << " (highest received: " << last_known_version << ") " << dendl;
+    return;
+  }
+  last_known_version = m->version;
+
+  std::deque<LogEntry>::iterator it = m->entries.begin();
+  for (; it != m->entries.end(); it++) {
+    LogEntry e = *it;
+    cout << e.stamp << " " << e.seq << " " << e.who 
+        << " " << e.type << " " << e.msg << std::endl;
+  }
+  version_t v = last_known_version+1;
+  dout(10) << __func__ << " wanting " << name << " ver " << v << dendl;
+  ctx->mc.sub_want(name, v, 0);
+}
+
+bool Admin::ms_dispatch(Message *m) {
+  switch (m->get_type()) {
+  case MSG_MON_COMMAND_ACK:
+    handle_ack(ctx, (MMonCommandAck*)m);
+    break;
+  case MSG_COMMAND_REPLY:
+    handle_ack(ctx, (MCommandReply*)m);
+    break;
+  case CEPH_MSG_MON_MAP:
+    m->put();
+    break;
+  case MSG_LOG:
+   {
+    MLog *mlog = (MLog*) m;
+    subs.handle_log(mlog);
+    break;
+   }
+  default:
+    return false;
+  }
+  return true;
+}
+
+void Admin::ms_handle_connect(Connection *con) {
+  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+    ctx->lock.Lock();
+    if (ceph_tool_mode != CEPH_TOOL_MODE_CLI_INPUT) {
+//     send_observe_requests(ctx);
+    }
+    if (pending_cmd.size())
+      send_command(ctx);
+    ctx->lock.Unlock();
+  }
+}
+
+bool Admin::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, 
+                             bool force_new)
+{
+  if (dest_type == CEPH_ENTITY_TYPE_MON)
+    return true;
+  *authorizer = ctx->mc.auth->build_authorizer(dest_type);
+  return true;
+}
+
index e2f15494cead4e36e8ade5444243f6e5cb51304f..a0bad8a30a369094430a5aa10532ce5467ad738e 100644 (file)
@@ -1,6 +1,10 @@
 #ifndef CEPH_TOOLS_COMMON_DOT_H
 #define CEPH_TOOLS_COMMON_DOT_H
 
+#include <iosfwd>
+#include <stdint.h>
+#include <map>
+
 #include "common/Cond.h"
 #include "common/Mutex.h"
 #include "mon/MonClient.h"
@@ -9,8 +13,15 @@
 #include "osd/OSDMap.h"
 #include "common/Timer.h"
 
-#include <iosfwd>
-#include <stdint.h>
+
+#include "common/LogEntry.h"
+#include "mon/mon_types.h"
+#include "messages/MOSDMap.h"
+#include "messages/MLog.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+#include "messages/MMonCommandAck.h"
+
 
 #define OSD_MON_UPDATE     (1<<0)
 #define MDS_MON_UPDATE     (1<<1)
 #define EVERYTHING_UPDATE   0xffffffff
 
 class CephContext;
+class CephToolCtx;
 
 enum ceph_tool_mode_t {
   CEPH_TOOL_MODE_CLI_INPUT = 0,
-  CEPH_TOOL_MODE_OBSERVER = 1,
-  CEPH_TOOL_MODE_ONE_SHOT_OBSERVER = 2,
+  CEPH_TOOL_MODE_WATCH = 1,
+  CEPH_TOOL_MODE_STATUS = 2,
   CEPH_TOOL_MODE_GUI = 3
 };
 
+struct Subscriptions {
+  CephToolCtx *ctx;
+  version_t last_known_version;
+  string name;
+
+  Subscriptions(CephToolCtx *c) : ctx(c) { }
+
+  void handle_log(MLog *m);
+};
+
+class Admin : public Dispatcher {
+private:
+  CephToolCtx *ctx;
+public:
+  Subscriptions subs;
+
+  Admin(CephToolCtx *ctx_)
+    : Dispatcher(g_ceph_context),
+      ctx(ctx_), subs(ctx_)
+  {
+  }
+
+  bool ms_dispatch(Message *m);
+  void ms_handle_connect(Connection *con);
+  bool ms_handle_reset(Connection *con) { return false; }
+  void ms_handle_remote_reset(Connection *con) {}
+
+  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
+};
+
+
 // tool/ceph.cc
 class CephToolCtx
 {
@@ -56,7 +99,7 @@ public:
 
   bool concise;
 
-  Dispatcher *dispatcher;
+  Admin *dispatcher;
 
   CephToolCtx(CephContext *cct_, bool concise_) :
     cct(cct_),
@@ -78,7 +121,6 @@ public:
 // tool/ceph.cc
 int ceph_tool_do_cli(CephToolCtx *data);
 int run_command(CephToolCtx *data, const char *line);
-void send_observe_requests(CephToolCtx *ctx);
 CephToolCtx* ceph_tool_common_init(ceph_tool_mode_t mode, bool concise);
 int do_command(CephToolCtx *ctx,
               vector<string>& cmd, bufferlist& bl, bufferlist& rbl);