]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: add mds_command operation
authorJohn Spray <john.spray@redhat.com>
Thu, 18 Sep 2014 17:56:42 +0000 (18:56 +0100)
committerJohn Spray <john.spray@redhat.com>
Wed, 8 Oct 2014 10:58:19 +0000 (11:58 +0100)
akin to Objecter::osd_command

Signed-off-by: John Spray <john.spray@redhat.com>
src/client/Client.cc
src/client/Client.h

index b0092d8816eaf78202656baef1bbd86116d711ff..fa79829e8db7349a040b076f5f4520bfba2a6772 100644 (file)
@@ -47,6 +47,7 @@ using namespace std;
 #include "messages/MClientCapRelease.h"
 #include "messages/MClientLease.h"
 #include "messages/MClientSnap.h"
+#include "messages/MCommandReply.h"
 
 #include "messages/MGenericMessage.h"
 
@@ -54,7 +55,6 @@ using namespace std;
 
 #include "mon/MonClient.h"
 
-#include "mds/MDSMap.h"
 #include "osd/OSDMap.h"
 #include "mon/MonMap.h"
 
@@ -166,7 +166,8 @@ Client::Client(Messenger *m, MonClient *mc)
     objecter_finisher(m->cct),
     tick_event(NULL),
     monclient(mc), messenger(m), whoami(m->get_myname().num()),
-    initialized(false), mounted(false), unmounting(false),
+    initialized(false), authenticated(false),
+    mounted(false), unmounting(false),
     local_osd(-1), local_osd_epoch(0),
     unsafe_sync_write(0),
     client_lock("Client::client_lock")
@@ -2006,7 +2007,13 @@ bool Client::ms_dispatch(Message *m)
   case CEPH_MSG_CLIENT_LEASE:
     handle_lease(static_cast<MClientLease*>(m));
     break;
-
+  case MSG_COMMAND_REPLY:
+    if (m->get_source().type() == CEPH_ENTITY_TYPE_MDS) {
+      handle_command_reply(static_cast<MCommandReply*>(m));
+    } else {
+      return false;
+    }
+    break;
   default:
     return false;
   }
@@ -2045,6 +2052,31 @@ void Client::handle_mds_map(MMDSMap* m)
   mdsmap = new MDSMap;
   mdsmap->decode(m->get_encoded());
 
+  // Cancel any commands for missing or laggy GIDs
+  std::list<ceph_tid_t> cancel_ops;
+  for (std::map<ceph_tid_t, CommandOp>::iterator i = commands.begin();
+       i != commands.end(); ++i) {
+    const mds_gid_t op_mds_gid = i->second.mds_gid;
+    if (mdsmap->is_dne_gid(op_mds_gid) || mdsmap->is_laggy_gid(op_mds_gid)) {
+      ldout(cct, 1) << __func__ << ": cancelling command op " << i->first << dendl;
+      cancel_ops.push_back(i->first);
+      if (i->second.outs) {
+        std::ostringstream ss;
+        ss << "MDS " << op_mds_gid << " went away";
+        *(i->second.outs) = ss.str();
+      }
+      i->second.con->mark_down();
+      if (i->second.on_finish) {
+        i->second.on_finish->complete(-ETIMEDOUT);
+      }
+    }
+  }
+
+  for (std::list<ceph_tid_t>::iterator i = cancel_ops.begin();
+       i != cancel_ops.end(); ++i) {
+    commands.erase(*i);
+  }
+
   // reset session
   for (map<mds_rank_t,MetaSession*>::iterator p = mds_sessions.begin();
        p != mds_sessions.end();
@@ -4062,6 +4094,219 @@ inodeno_t Client::_get_inodeno(Inode *in)
   return in->ino;
 }
 
+
+/**
+ * Resolve an MDS spec to a list of MDS daemon GIDs.
+ *
+ * The spec is a string representing a GID, rank or name/id.  It may
+ * be * in which case it matches all GIDs.
+ *
+ * If no error is returned, the `targets` vector will be populated with at least
+ * one MDS.
+ */
+int Client::resolve_mds(
+    const std::string &mds_spec,
+    std::vector<mds_gid_t> *targets)
+{
+  std::string strtol_err;
+  long long rank_or_gid = strict_strtoll(mds_spec.c_str(), 10, &strtol_err);
+  if (strtol_err.empty()) {
+    // If it parses as an integer, it's GID or a rank
+    if (rank_or_gid >= 0 && rank_or_gid < MAX_MDS) {
+      const mds_rank_t mds_rank = mds_rank_t(rank_or_gid);
+
+      if (mdsmap->is_dne(mds_rank)) {
+        lderr(cct) << __func__ << ": MDS rank " << mds_rank << " does not exist" << dendl;
+        return -ENOENT;
+      }
+
+      if (!mdsmap->is_up(mds_rank)) {
+        lderr(cct) << __func__ << ": MDS rank " << mds_rank << " is not up" << dendl;
+        return -EAGAIN;
+      }
+
+      const mds_gid_t mds_gid = mdsmap->get_info(mds_rank).global_id;
+      ldout(cct, 10) << __func__ << ": resolved rank " << mds_rank << " to GID " << mds_gid << dendl;
+      targets->push_back(mds_gid);
+    } else {
+      const mds_gid_t mds_gid = mds_gid_t(rank_or_gid);
+      if (mdsmap->is_dne_gid(mds_gid)) {
+        lderr(cct) << __func__ << ": GID " << mds_gid << " not in MDS map" << dendl;
+        return -ENOENT;
+      } else {
+        ldout(cct, 10) << __func__ << ": validated GID " << mds_gid << dendl;
+        targets->push_back(mds_gid);
+      }
+    }
+  } else if (mds_spec == "*") {
+    // It is a wildcard: use all MDSs
+    const std::map<mds_gid_t, MDSMap::mds_info_t> &mds_info = mdsmap->get_mds_info();
+
+    if (mds_info.size() == 0) {
+      lderr(cct) << __func__ << ": * passed but no MDS daemons found" << dendl;
+      return -ENOENT;
+    }
+
+    for (std::map<mds_gid_t, MDSMap::mds_info_t>::const_iterator i = mds_info.begin();
+        i != mds_info.end(); ++i) {
+      targets->push_back(i->first);
+    }
+  } else {
+    // It did not parse as an integer, it is not a wildcard, it must be a name
+    const mds_gid_t mds_gid = mdsmap->find_mds_gid_by_name(mds_spec);
+    if (mds_gid == 0) {
+      lderr(cct) << "MDS ID '" << mds_spec << "' not found" << dendl;
+      return -ENOENT;
+    } else {
+      ldout(cct, 10) << __func__ << ": resolved ID '" << mds_spec << "' to GID " << mds_gid << dendl;
+      targets->push_back(mds_gid);
+    }
+  }
+
+  return 0;
+}
+
+
+/**
+ * Authenticate with mon and establish global ID
+ */
+int Client::authenticate()
+{
+  assert(client_lock.is_locked_by_me());
+
+  if (authenticated) {
+    return 0;
+  }
+
+  client_lock.Unlock();
+  int r = monclient->authenticate(cct->_conf->client_mount_timeout);
+  client_lock.Lock();
+  if (r < 0) {
+    return r;
+  }
+
+  whoami = monclient->get_global_id();
+  messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+  authenticated = true;
+
+  return 0;
+}
+
+
+/**
+ *
+ * @mds_spec one of ID, rank, GID, "*"
+ *
+ */
+int Client::mds_command(
+    const std::string &mds_spec,
+    const vector<string>& cmd,
+    const bufferlist& inbl,
+    bufferlist *outbl,
+    string *outs,
+    Context *onfinish)
+{
+  Mutex::Locker lock(client_lock);
+
+  assert(initialized);
+
+  int r;
+  r = authenticate();
+  if (r < 0) {
+    return r;
+  }
+
+  // Block until we have an MDSMap to resolve IDs
+  if (mdsmap->get_epoch() == 0) {
+    wait_on_list(waiting_for_mdsmap);
+  }
+
+  // Look up MDS target(s) of the command
+  std::vector<mds_gid_t> targets;
+  r = resolve_mds(mds_spec, &targets);
+  if (r < 0) {
+    return r;
+  }
+
+  // If daemons are laggy, we won't send them commands.  If all
+  // are laggy then we fail.
+  std::vector<mds_gid_t> non_laggy;
+  for (std::vector<mds_gid_t>::iterator target = targets.begin();
+      target != targets.end(); ++target) {
+    if (!mdsmap->is_laggy_gid(*target)) {
+      non_laggy.push_back(*target);
+    }
+  }
+  if (non_laggy.size() == 0) {
+    *outs = "All targeted MDS daemons are laggy";
+    return -ENOENT;
+  }
+
+  // Send commands to targets
+  C_GatherBuilder gather(cct, onfinish);
+  for (std::vector<mds_gid_t>::iterator target = non_laggy.begin();
+      target != non_laggy.end(); ++target) {
+    ceph_tid_t tid = ++last_tid;
+
+    // Open a connection to the target MDS
+    entity_inst_t inst = mdsmap->get_info_gid(*target).get_inst();
+    ConnectionRef conn = messenger->get_connection(inst);
+
+    // Generate CommandOp state
+    CommandOp op;
+    op.tid = tid;
+    op.on_finish = gather.new_sub();
+    op.outbl = outbl;
+    op.outs = outs;
+    op.mds_gid = *target;
+    op.con = conn;
+    commands[op.tid] = op;
+
+    ldout(cct, 4) << __func__ << ": new command op to " << *target
+      << " tid=" << op.tid << cmd << dendl;
+
+    // Construct and send MCommand
+    MCommand *m = new MCommand(monclient->get_fsid());
+    m->cmd = cmd;
+    m->set_data(inbl);
+    m->set_tid(tid);
+    conn->send_message(m);
+  }
+  gather.activate();
+
+  return 0;
+}
+
+void Client::handle_command_reply(MCommandReply *m)
+{
+  ceph_tid_t const tid = m->get_tid();
+
+  ldout(cct, 10) << __func__ << ": tid=" << m->get_tid() << dendl;
+
+  map<ceph_tid_t, CommandOp>::iterator opiter = commands.find(tid);
+  if (opiter == commands.end()) {
+    ldout(cct, 1) << __func__ << ": unknown tid " << tid << ", dropping" << dendl;
+    m->put();
+    return;
+  }
+
+  CommandOp const &op = opiter->second;
+  if (op.outbl) {
+    op.outbl->claim(m->get_data());
+  }
+  if (op.outs) {
+    *op.outs = m->rs;
+  }
+
+  op.con->mark_down();
+
+  if (op.on_finish) {
+    op.on_finish->complete(m->r);
+  }
+
+  m->put();
+}
+
 // -------------------
 // MOUNT
 
@@ -4074,14 +4319,10 @@ int Client::mount(const std::string &mount_root)
     return 0;
   }
 
-  client_lock.Unlock();
-  int r = monclient->authenticate(cct->_conf->client_mount_timeout);
-  client_lock.Lock();
-  if (r < 0)
+  int r = authenticate();
+  if (r < 0) {
     return r;
-  
-  whoami = monclient->get_global_id();
-  messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+  }
 
   mounted = true;
 
index 1e0bcc7f2de8ab170ab1a71d09840e0959462a97..eddf4c00a20f8921e6a14188783855aedce7e8db 100644 (file)
@@ -38,6 +38,7 @@ using std::fstream;
 //#include "barrier.h"
 
 #include "mds/mdstypes.h"
+#include "mds/MDSMap.h"
 
 #include "msg/Message.h"
 #include "msg/Dispatcher.h"
@@ -86,6 +87,16 @@ enum {
 };
 
 
+struct CommandOp
+{
+  ConnectionRef con;
+  mds_gid_t     mds_gid;
+  ceph_tid_t    tid;
+  Context      *on_finish;
+  bufferlist   *outbl;
+  std::string  *outs;
+};
+
 
 // ============================================
 // types for my local metadata cache
@@ -242,6 +253,13 @@ public:
   map<mds_rank_t, MetaSession*> mds_sessions;  // mds -> push seq
   list<Cond*> waiting_for_mdsmap;
 
+  // MDS command state
+  std::map<ceph_tid_t, CommandOp> commands;
+  void handle_command_reply(MCommandReply *m);
+  int resolve_mds(
+      const std::string &mds_spec,
+      std::vector<mds_gid_t> *targets);
+
   void get_session_metadata(std::map<std::string, std::string> *meta) const;
   bool have_open_session(mds_rank_t mds);
   void got_mds_push(MetaSession *s);
@@ -286,6 +304,7 @@ public:
   void handle_client_reply(MClientReply *reply);
 
   bool   initialized;
+  bool   authenticated;
   bool   mounted;
   bool   unmounting;
 
@@ -424,6 +443,7 @@ protected:
   void ms_handle_remote_reset(Connection *con);
   bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
 
+  int authenticate();
 
  public:
   void set_filer_flags(int flags);
@@ -670,6 +690,12 @@ public:
   int mount(const std::string &mount_root);
   void unmount();
 
+  int mds_command(
+    const std::string &mds_spec,
+    const std::vector<std::string>& cmd,
+    const bufferlist& inbl,
+    bufferlist *poutbl, std::string *prs, Context *onfinish);
+
   // these shoud (more or less) mirror the actual system calls.
   int statfs(const char *path, struct statvfs *stbuf);