]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: refactor command handling
authorJohn Spray <john.spray@redhat.com>
Wed, 8 Jun 2016 13:24:47 +0000 (14:24 +0100)
committerJohn Spray <john.spray@redhat.com>
Thu, 29 Sep 2016 16:26:52 +0000 (17:26 +0100)
Create a CommandTable structure for places
in Client, MgrClient, Objecter where we do
basically the same kind of thing for sending
and tracking MCommands.

Signed-off-by: John Spray <john.spray@redhat.com>
src/client/Client.cc
src/client/Client.h
src/common/CommandTable.h [new file with mode: 0644]

index f0a36e643eb5208cea6868c05af7c984f309649f..0da3fd9cd7ccd1d59de4f84ab2adf9b0a9d62b08 100644 (file)
@@ -2545,27 +2545,28 @@ void Client::handle_mds_map(MMDSMap* m)
 
   // Cancel any commands for missing or laggy GIDs
   std::list<ceph_tid_t> cancel_ops;
-  for (std::map<ceph_tid_t, CommandOp>::iterator i = commands.begin();
-       i != commands.end(); ++i) {
-    const mds_gid_t op_mds_gid = i->second.mds_gid;
+  auto commands = command_table.get_commands();
+  for (const auto &i : commands) {
+    const MDSCommandOp *op = i.second;
+    const mds_gid_t op_mds_gid = op->mds_gid;
     if (mdsmap->is_dne_gid(op_mds_gid) || mdsmap->is_laggy_gid(op_mds_gid)) {
-      ldout(cct, 1) << __func__ << ": cancelling command op " << i->first << dendl;
-      cancel_ops.push_back(i->first);
-      if (i->second.outs) {
+      ldout(cct, 1) << __func__ << ": cancelling command op " << i.first << dendl;
+      cancel_ops.push_back(i.first);
+      if (op->outs) {
         std::ostringstream ss;
         ss << "MDS " << op_mds_gid << " went away";
-        *(i->second.outs) = ss.str();
+        *(op->outs) = ss.str();
       }
-      i->second.con->mark_down();
-      if (i->second.on_finish) {
-        i->second.on_finish->complete(-ETIMEDOUT);
+      op->con->mark_down();
+      if (op->on_finish) {
+        op->on_finish->complete(-ETIMEDOUT);
       }
     }
   }
 
   for (std::list<ceph_tid_t>::iterator i = cancel_ops.begin();
        i != cancel_ops.end(); ++i) {
-    commands.erase(*i);
+    command_table.erase(*i);
   }
 
   // reset session
@@ -5498,31 +5499,28 @@ int Client::mds_command(
   // Send commands to targets
   C_GatherBuilder gather(cct, onfinish);
   for (const auto target_gid : non_laggy) {
-    ceph_tid_t tid = ++last_tid;
     const auto info = fsmap->get_info_gid(target_gid);
 
     // Open a connection to the target MDS
     entity_inst_t inst = info.get_inst();
     ConnectionRef conn = messenger->get_connection(inst);
 
-    // Generate CommandOp state
-    CommandOp op;
-    op.tid = tid;
-    op.on_finish = gather.new_sub();
-    op.outbl = outbl;
-    op.outs = outs;
-    op.mds_gid = target_gid;
-    op.con = conn;
-    commands[op.tid] = op;
+    // Generate MDSCommandOp state
+    MDSCommandOp *op = command_table.start_command();
+
+    op->on_finish = gather.new_sub();
+    op->cmd = cmd;
+    op->outbl = outbl;
+    op->outs = outs;
+    op->inbl = inbl;
+    op->mds_gid = target_gid;
+    op->con = conn;
 
     ldout(cct, 4) << __func__ << ": new command op to " << target_gid
-      << " tid=" << op.tid << cmd << dendl;
+      << " tid=" << op->tid << cmd << dendl;
 
     // Construct and send MCommand
-    MCommand *m = new MCommand(monclient->get_fsid());
-    m->cmd = cmd;
-    m->set_data(inbl);
-    m->set_tid(tid);
+    MCommand *m = op->get_message(monclient->get_fsid());
     conn->send_message(m);
   }
   gather.activate();
@@ -5536,25 +5534,26 @@ void Client::handle_command_reply(MCommandReply *m)
 
   ldout(cct, 10) << __func__ << ": tid=" << m->get_tid() << dendl;
 
-  map<ceph_tid_t, CommandOp>::iterator opiter = commands.find(tid);
-  if (opiter == commands.end()) {
+  MDSCommandOp *op = command_table.get_command(tid);
+  if (op == nullptr) {
     ldout(cct, 1) << __func__ << ": unknown tid " << tid << ", dropping" << dendl;
     m->put();
     return;
   }
 
-  CommandOp const &op = opiter->second;
-  if (op.outbl) {
-    op.outbl->claim(m->get_data());
+  if (op->outbl) {
+    op->outbl->claim(m->get_data());
   }
-  if (op.outs) {
-    *op.outs = m->rs;
+  if (op->outs) {
+    *op->outs = m->rs;
   }
 
-  if (op.on_finish) {
-    op.on_finish->complete(m->r);
+  if (op->on_finish) {
+    op->on_finish->complete(m->r);
   }
 
+  command_table.erase(tid);
+
   m->put();
 }
 
@@ -11720,8 +11719,7 @@ int Client::ll_read_block(Inode *in, uint64_t blockid,
   Mutex::Locker lock(client_lock);
   vinodeno_t vino = ll_get_vino(in);
   object_t oid = file_object_t(vino.ino, blockid);
-  int r = 0;
-  C_SaferCond cond;
+  C_SaferCond onfinish;
   bufferlist bl;
 
   objecter->read(oid,
@@ -11731,11 +11729,11 @@ int Client::ll_read_block(Inode *in, uint64_t blockid,
                 vino.snapid,
                 &bl,
                 CEPH_OSD_FLAG_READ,
-                &cond);
+                 &onfinish);
 
   client_lock.Unlock();
-  r = cond.wait();
-  client_lock.Lock(); // lock is going to unlock on exit.
+  int r = onfinish.wait();
+  client_lock.Lock();
 
   if (r >= 0) {
       bl.copy(0, bl.length(), buf);
index 4b365c1049ab29014d56db8800b95e8ae7d9de77..45f3b72fcaf622c977b00a28357ca68b516bfdf0 100644 (file)
@@ -42,6 +42,7 @@ using std::fstream;
 #include "common/Finisher.h"
 #include "common/compiler_extensions.h"
 #include "common/cmdparse.h"
+#include "common/CommandTable.h"
 
 #include "osdc/ObjectCacher.h"
 
@@ -83,14 +84,12 @@ enum {
 };
 
 
-struct CommandOp
+class MDSCommandOp : public CommandOp
 {
-  ConnectionRef con;
+  public:
   mds_gid_t     mds_gid;
-  ceph_tid_t    tid;
-  Context      *on_finish;
-  bufferlist   *outbl;
-  std::string  *outs;
+
+  MDSCommandOp(ceph_tid_t t) : CommandOp(t) {}
 };
 
 /* error code for ceph_fuse */
@@ -326,7 +325,7 @@ protected:
   FSMapUser *fsmap_user;
 
   // MDS command state
-  std::map<ceph_tid_t, CommandOp> commands;
+  CommandTable<MDSCommandOp> command_table;
   void handle_command_reply(MCommandReply *m);
   int fetch_fsmap(bool user);
   int resolve_mds(
diff --git a/src/common/CommandTable.h b/src/common/CommandTable.h
new file mode 100644 (file)
index 0000000..6be6757
--- /dev/null
@@ -0,0 +1,91 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef COMMAND_TABLE_H_
+#define COMMAND_TABLE_H_
+
+#include "messages/MCommand.h"
+
+class CommandOp
+{
+  public:
+  ConnectionRef con;
+  ceph_tid_t tid;
+
+  std::vector<std::string> cmd;
+  bufferlist    inbl;
+  Context      *on_finish;
+  bufferlist   *outbl;
+  std::string  *outs;
+
+  MCommand *get_message(const uuid_d &fsid) const
+  {
+    MCommand *m = new MCommand(fsid);
+    m->cmd = cmd;
+    m->set_data(inbl);
+    m->set_tid(tid);
+
+    return m;
+  }
+
+  CommandOp(const ceph_tid_t t) : tid(t) {}
+};
+
+template<typename T>
+class CommandTable
+{
+protected:
+  ceph_tid_t last_tid;
+  std::map<ceph_tid_t, T*> commands;
+
+public:
+
+  CommandTable()
+    : last_tid(0)
+  {}
+
+  T* start_command()
+  {
+    ceph_tid_t tid = last_tid++;
+    auto cmd = new T(tid);
+    commands[tid] = cmd;
+    
+    return cmd;
+  }
+
+  const std::map<ceph_tid_t, T*> &get_commands() const
+  {
+    return commands;
+  }
+
+  T* get_command(ceph_tid_t tid)
+  {
+    auto result = commands.find(tid);
+    if (result == commands.end()) {
+      return nullptr;
+    } else {
+      return result->second;
+    }
+  }
+
+  void erase(ceph_tid_t tid)
+  {
+    auto ptr = commands.at(tid);
+    delete ptr;
+    commands.erase(tid);
+  }
+};
+
+#endif
+