]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephtool: ability to send commands directly to osds
authorSage Weil <sage@newdream.net>
Wed, 12 Oct 2011 17:38:23 +0000 (10:38 -0700)
committerSage Weil <sage@newdream.net>
Sat, 15 Oct 2011 03:43:23 +0000 (20:43 -0700)
This makes commands beginning with 'tell <target>' magic in that they go
to the given target instead of to the monitor.  This is slightly odd, but
I think it gives the most natural interface for the user, with the tool
Doing The Right Thing for you.  E.g.,

 ceph tell <someone> something      (direct to some daemon)
 ceph do something                  (goes to monitor to do X)

Signed-off-by: Sage Weil <sage@newdream.net>
src/tools/common.cc

index ebd781e627904b5751b50c6c7c6254ec456b32bb..3a0ace022e74902fd18834cbd7084faf309b9891 100644 (file)
@@ -22,6 +22,7 @@ using namespace std;
 #include "messages/MMonCommandAck.h"
 #include "mon/MonClient.h"
 #include "mon/MonMap.h"
+#include "osd/OSDMap.h"
 #include "msg/SimpleMessenger.h"
 #include "tools/common.h"
 
@@ -52,6 +53,9 @@ static SimpleMessenger *messenger = 0;
 static Tokenizer *tok;
 
 // sync command
+bool pending_tell;  // is tell, vs monitor command
+uint64_t pending_tid = 0;
+EntityName pending_target;
 vector<string> pending_cmd;
 bufferlist pending_bl;
 bool reply;
@@ -61,7 +65,7 @@ bufferlist reply_bl;
 entity_inst_t reply_from;
 Context *resend_event = 0;
 
-
+OSDMap *osdmap = 0;
 
 // observe (push)
 #include "mon/PGMap.h"
@@ -73,12 +77,28 @@ Context *resend_event = 0;
 
 #include "messages/MMonObserve.h"
 #include "messages/MMonObserveNotify.h"
+#include "messages/MOSDMap.h"
 
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
 
 static set<int> registered, seen;
 
 version_t map_ver[PAXOS_NUM];
 
+static void handle_osd_map(CephToolCtx *ctx, MOSDMap *m)
+{
+  epoch_t e = m->get_first();
+  assert(m->maps.count(e));
+  ctx->lock.Lock();
+  delete osdmap;
+  osdmap = new OSDMap;
+  osdmap->decode(m->maps[e]);
+  cmd_cond.Signal();
+  ctx->lock.Unlock();
+  m->put();
+}
+
 static void handle_observe(CephToolCtx *ctx, MMonObserve *observe)
 {
   dout(1) << observe->get_source() << " -> " << get_paxos_name(observe->machine_id)
@@ -264,16 +284,76 @@ static void handle_ack(CephToolCtx *ctx, MMonCommandAck *ack)
   ack->put();
 }
 
+static void handle_ack(CephToolCtx *ctx, MCommandReply *ack)
+{
+  ctx->lock.Lock();
+  if (ack->get_tid() == pending_tid) {
+    reply = true;
+    reply_from = ack->get_source_inst();
+    reply_rs = ack->rs;
+    reply_rc = ack->r;
+    reply_bl = ack->get_data();
+    cmd_cond.Signal();
+    if (resend_event) {
+      ctx->timer.cancel_event(resend_event);
+      resend_event = 0;
+    }
+  }
+  ctx->lock.Unlock();
+  ack->put();
+}
+
 static void send_command(CephToolCtx *ctx)
 {
-  version_t last_seen_version = 0;
-  MMonCommand *m = new MMonCommand(ctx->mc.monmap.fsid, last_seen_version);
+  if (!pending_tell) {
+    version_t last_seen_version = 0;
+    MMonCommand *m = new MMonCommand(ctx->mc.monmap.fsid, last_seen_version);
+    m->cmd = pending_cmd;
+    m->set_data(pending_bl);
+
+    if (!ctx->concise)
+      *ctx->log << ceph_clock_now(g_ceph_context) << " mon" << " <- " << pending_cmd << std::endl;
+
+    ctx->mc.send_mon_message(m);
+    return;
+  }
+
+  if (!ctx->concise)
+    *ctx->log << ceph_clock_now(g_ceph_context) << " " << pending_target << " <- " << pending_cmd << std::endl;
+
+  MCommand *m = new MCommand(ctx->mc.monmap.fsid);
   m->cmd = pending_cmd;
   m->set_data(pending_bl);
+  m->set_tid(++pending_tid);
+  
+  if (pending_target.get_type() == (int)entity_name_t::TYPE_OSD) {
+    if (!osdmap) {
+      ctx->mc.sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
+      ctx->mc.renew_subs();
+      while (!osdmap)
+       cmd_cond.Wait(ctx->lock);
+    }
 
-  if (!ctx->concise)
-    *ctx->log << ceph_clock_now(g_ceph_context) << " mon" << " <- " << pending_cmd << std::endl;
-  ctx->mc.send_mon_message(m);
+    const char *start = pending_target.get_id().c_str();
+    char *end;
+    int n = strtoll(start, &end, 10);
+    if (end <= start) {
+      reply_rc = -EINVAL;
+      reply = true;
+      return;
+    }
+    
+    if (!osdmap->is_up(n)) {
+      reply_rc = -ENOENT;
+      reply = true;
+    }
+
+    messenger->send_message(m, osdmap->get_inst(n));
+    return;
+  }
+
+  reply_rc = -EINVAL;
+  reply = true;
 }
 
 class Admin : public Dispatcher {
@@ -289,6 +369,9 @@ public:
     case MSG_MON_COMMAND_ACK:
       handle_ack(ctx, (MMonCommandAck*)m);
       break;
+    case MSG_COMMAND_REPLY:
+      handle_ack(ctx, (MCommandReply*)m);
+      break;
     case MSG_MON_OBSERVE_NOTIFY:
       handle_notify(ctx, (MMonObserveNotify *)m);
       break;
@@ -298,6 +381,9 @@ public:
     case CEPH_MSG_MON_MAP:
       m->put();
       break;
+    case CEPH_MSG_OSD_MAP:
+      handle_osd_map(ctx, (MOSDMap *)m);
+      break;
     default:
       return false;
     }
@@ -318,6 +404,14 @@ public:
   bool ms_handle_reset(Connection *con) { return false; }
   void ms_handle_remote_reset(Connection *con) {}
 
+  bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
+  {
+    if (dest_type == CEPH_ENTITY_TYPE_MON)
+      return true;
+    *authorizer = ctx->mc.auth->build_authorizer(dest_type);
+    return true;
+  }
+
 private:
   CephToolCtx *ctx;
 };
@@ -327,10 +421,25 @@ int do_command(CephToolCtx *ctx,
 {
   Mutex::Locker l(ctx->lock);
 
+  pending_target = EntityName();
   pending_cmd = cmd;
   pending_bl = bl;
+  pending_tell = false;
   reply = false;
   
+  if (cmd.size() > 0 && cmd[0] == "tell") {
+    if (cmd.size() == 1) {
+      cerr << "no tell target specified" << std::endl;
+      return -EINVAL;
+    }
+    if (!pending_target.from_str(cmd[1])) {
+      cerr << "tell target '" << cmd[1] << "' not a valid entity name" << std::endl;
+      return -EINVAL;
+    }
+    pending_cmd.erase(pending_cmd.begin(), pending_cmd.begin() + 2);
+    pending_tell = true;
+  }
+
   send_command(ctx);
 
   while (!reply)
@@ -516,6 +625,9 @@ CephToolCtx* ceph_tool_common_init(ceph_tool_mode_t mode, bool concise)
   ctx->mc.set_messenger(messenger);
   ctx->mc.init();
 
+  // in case we 'tell ...'
+  ctx->mc.set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD);
+
   if (ctx->mc.authenticate() < 0) {
     derr << "unable to authenticate as " << g_conf->name << dendl;
     ceph_tool_messenger_shutdown();