]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: answering all pending getattr/lookups targeting the same inode in one go
authorxuxuehan <xuxuehan@ceph10v.ops.corp.qihoo.net>
Thu, 27 Sep 2018 08:42:43 +0000 (16:42 +0800)
committerXuehan Xu <xxhdx1985126@163.com>
Tue, 20 Aug 2019 02:10:05 +0000 (10:10 +0800)
As for now, all getattr/lookup requests get processes one by one, which
is kind of wasting CPU resources. Actually, for those getattr/lookup
requests for the same inode, only one of them needs to be processes, the
results applies all others.

Resolves: http://tracker.ceph.com/issues/36608
Signed-off-by: Xuehan Xu <xuxuehan@360.cn>
src/mds/CDentry.h
src/mds/CInode.h
src/mds/MDCache.cc
src/mds/Migrator.cc
src/mds/Mutation.cc
src/mds/Mutation.h
src/mds/Server.cc
src/mds/Server.h

index 56aa58c561a9ca2f9ff933e2990f6f1b84608f25..f8e408ef4689473306cce654c337c0354eceb0e5 100644 (file)
@@ -41,8 +41,7 @@ class CDentry;
 class LogSegment;
 
 class Session;
-
-
+class BatchOp;
 
 // define an ordering
 bool operator<(const CDentry& l, const CDentry& r);
@@ -354,6 +353,7 @@ public:
   LocalLock versionlock; // FIXME referenced containers not in mempool
 
   mempool::mds_co::map<client_t,ClientLease*> client_lease_map;
+  std::map<int, BatchOp*> batch_ops;
 
 
 protected:
index 4fba93d466bfe376de819410c73b00a74379ce86..e8c4b8a1542856687f7538b6b6c6f9bf614f24ae 100644 (file)
@@ -58,6 +58,26 @@ struct cinode_lock_info_t {
   int wr_caps;
 };
 
+class BatchOp {
+public:
+  virtual void add_request(const MDRequestRef& mdr) = 0;
+  virtual void set_request(const MDRequestRef& mdr) = 0;
+  virtual void forward_all(mds_rank_t target) = 0;
+  virtual void respond_all(int r) = 0;
+
+  void finish(int r) {
+    respond_all(r);
+    delete this;
+  }
+
+  void forward_requests(mds_rank_t target) {
+    forward_all(target);
+    delete this;
+  }
+
+  virtual ~BatchOp() {}
+};
+
 /**
  * Base class for CInode, containing the backing store data and
  * serialization methods.  This exists so that we can read and
@@ -348,6 +368,8 @@ class CInode : public MDSCacheObject, public InodeStoreBase, public Counter<CIno
     ceph_assert(num_exporting_dirs == 0);
   }
 
+  std::map<int, BatchOp*> batch_ops;
+
   std::string_view pin_name(int p) const override;
 
   ostream& print_db_line_prefix(ostream& out) override;
index de4bef951b17ed6a8d3cf9c5916a0f0c66a12c5b..e703a1eeb6f94ecc2a527503f2484da6bcced7ff 100644 (file)
@@ -9312,7 +9312,44 @@ void MDCache::request_forward(MDRequestRef& mdr, mds_rank_t who, int port)
   if (mdr->client_request && mdr->client_request->get_source().is_client()) {
     dout(7) << "request_forward " << *mdr << " to mds." << who << " req "
             << *mdr->client_request << dendl;
-    mds->forward_message_mds(mdr->release_client_request(), who);
+    if (mdr->is_batch_head) {
+      BatchOp* bop = nullptr;
+      int mask = mdr->client_request->head.args.getattr.mask;
+
+      switch (mdr->client_request->get_op()) {
+       case CEPH_MDS_OP_GETATTR:
+         {
+           CInode* in = mdr->in[0];
+           if (in) {
+             auto it = in->batch_ops.find(mask);
+             if (it != in->batch_ops.end()) {
+               bop = it->second;
+             }
+           }
+           break;
+         }
+       case CEPH_MDS_OP_LOOKUP:
+         {
+           CDentry* dn = mdr->dn[0].back();
+           if (dn) {
+             auto it = dn->batch_ops.find(mask);
+             if (it != dn->batch_ops.end()) {
+               bop = it->second;
+             }
+           }
+           break;
+         }
+       default:
+         ceph_abort();
+      }
+      if (bop) {
+       dout(20) << __func__ << ": forward other batch ops(GETATTR/LOOKUP) to "
+         << who << ":" << port <<", too. " << *mdr << dendl;
+       bop->forward_requests(who);
+      }
+    } else {
+      mds->forward_message_mds(mdr->release_client_request(), who);
+    }
     if (mds->logger) mds->logger->inc(l_mds_forward);
   } else if (mdr->internal_op >= 0) {
     dout(10) << "request_forward on internal op; cancelling" << dendl;
index 0551f37b134ae3be01808e3b6e0a8d6e399799d7..ace31a3094631517e9674bef7929b045e7aa00e9 100644 (file)
@@ -1776,7 +1776,7 @@ uint64_t Migrator::encode_export_dir(bufferlist& exportbl,
     exportbl.append("I", 1);    // inode dentry
     
     encode_export_inode(in, exportbl, exported_client_map, exported_client_metadata_map);  // encode, and (update state for) export
-    
+
     // directory?
     auto&& dfs = in->get_dirfrags();
     for (const auto& t : dfs) {
index 1cfaeadab6a8082a0b576f204057d7d5a3b9332d..ec26e4df1cd7e76d12771139ab817b02cfdbb567 100644 (file)
@@ -357,6 +357,14 @@ bool MDRequestImpl::is_queued_for_replay() const
   return client_request ? client_request->is_queued_for_replay() : false;
 }
 
+bool MDRequestImpl::is_batch_op()
+{
+  return (client_request->get_op() == CEPH_MDS_OP_LOOKUP &&
+      client_request->get_filepath().depth() == 1) ||
+    (client_request->get_op() == CEPH_MDS_OP_GETATTR &&
+     client_request->get_filepath().depth() == 0);
+}
+
 cref_t<MClientRequest> MDRequestImpl::release_client_request()
 {
   msg_lock.lock();
@@ -378,7 +386,7 @@ void MDRequestImpl::reset_slave_request(const cref_t<MMDSSlaveRequest>& req)
 
 void MDRequestImpl::print(ostream &out) const
 {
-  out << "request(" << reqid;
+  out << "request(" << reqid << " nref=" << nref;
   //if (request) out << " " << *request;
   if (is_slave()) out << " slave_to mds." << slave_to_mds;
   if (client_request) out << " cr=" << client_request;
index 076be8b0b5f3d7175bbcc58a2d30c4ee0dfbf385..b1b22470b83358a974862a0edadba7f354df5a3e 100644 (file)
@@ -28,6 +28,7 @@
 #include "common/TrackedOp.h"
 #include "messages/MClientRequest.h"
 #include "messages/MMDSSlaveRequest.h"
+#include "messages/MClientReply.h"
 
 class LogSegment;
 class Capability;
@@ -281,6 +282,8 @@ struct MDRequestImpl : public MutationImpl {
   // indicates how may retries of request have been made
   int retry;
 
+  bool is_batch_head = false;
+
   // indicator for vxattr osdmap update
   bool waited_for_osdmap;
 
@@ -398,6 +401,7 @@ struct MDRequestImpl : public MutationImpl {
   void set_filepath(const filepath& fp);
   void set_filepath2(const filepath& fp);
   bool is_queued_for_replay() const;
+  bool is_batch_op();
 
   void print(ostream &out) const override;
   void dump(Formatter *f) const override;
@@ -407,6 +411,7 @@ struct MDRequestImpl : public MutationImpl {
 
   // TrackedOp stuff
   typedef boost::intrusive_ptr<MDRequestImpl> Ref;
+  std::vector<Ref> batch_reqs;
 protected:
   void _dump(Formatter *f) const override;
   void _dump_op_descriptor_unlocked(ostream& stream) const override;
index ebaa1fa6aa017c439f846276e3e20b2ffc22ce5d..df03426f1d02e16c3f6068d9f6fbcd8cee1105a8 100644 (file)
@@ -77,6 +77,43 @@ class ServerContext : public MDSContext {
   }
 };
 
+class Batch_Getattr_Lookup : public BatchOp {
+protected:
+  Server* server;
+  MDRequestRef mdr;
+  MDCache* mdcache;
+  int res = 0;
+public:
+  Batch_Getattr_Lookup(Server* s, MDRequestRef& r, MDCache* mdc) : server(s), mdr(r), mdcache(mdc) {}
+  void add_request(const MDRequestRef& m) override {
+    mdr->batch_reqs.push_back(m);
+  }
+  void set_request(const MDRequestRef& m) override {
+    mdr = m;
+  }
+  void forward_all(mds_rank_t t) override {
+    mdcache->mds->forward_message_mds(mdr->release_client_request(), t);
+    mdr->set_mds_stamp(ceph_clock_now());
+    for (auto m : mdr->batch_reqs) {
+      if (!m->killed)
+       mdcache->request_forward(m, t);
+    }
+    mdr->batch_reqs.clear();
+  }
+  void respond_all(int r) {
+    mdr->set_mds_stamp(ceph_clock_now());
+    for (auto m : mdr->batch_reqs) {
+      if (!m->killed) {
+       m->tracei = mdr->tracei;
+       m->tracedn = mdr->tracedn;
+       server->respond_to_request(m, r);
+      }
+    }
+    mdr->batch_reqs.clear();
+    server->reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
+  }
+};
+
 class ServerLogContext : public MDSLogContextBase {
 protected:
   Server *server;
@@ -1750,7 +1787,28 @@ void Server::submit_mdlog_entry(LogEvent *le, MDSLogContextBase *fin, MDRequestR
 void Server::respond_to_request(MDRequestRef& mdr, int r)
 {
   if (mdr->client_request) {
-    reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
+    if (mdr->is_batch_op() && mdr->is_batch_head) {
+      int mask = mdr->client_request->head.args.getattr.mask;
+
+      BatchOp *fin;
+      if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
+       dout(20) << __func__ << ": respond other getattr ops. " << *mdr << dendl;
+       fin = mdr->in[0]->batch_ops[mask];
+      } else {
+       dout(20) << __func__ << ": respond other lookup ops. " << *mdr << dendl;
+       fin = mdr->dn[0].back()->batch_ops[mask];
+      }
+
+      fin->finish(r);
+
+      if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
+       mdr->in[0]->batch_ops.erase(mask);
+      } else {
+       mdr->dn[0].back()->batch_ops.erase(mask);
+      }
+    } else {
+     reply_client_request(mdr, make_message<MClientReply>(*mdr->client_request, r));
+    }
   } else if (mdr->internal_op > -1) {
     dout(10) << "respond_to_request on internal request " << mdr << dendl;
     if (!mdr->internal_op_finish)
@@ -2267,6 +2325,22 @@ void Server::handle_osd_map()
     });
 }
 
+void Server::clear_batch_ops(const MDRequestRef& mdr)
+{
+  int mask = mdr->client_request->head.args.getattr.mask;
+  if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR && mdr->in[0]) {
+    auto it = mdr->in[0]->batch_ops.find(mask);
+    auto bocp = it->second;
+    mdr->in[0]->batch_ops.erase(it);
+    delete bocp;
+  } else if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP && mdr->dn[0].size()) {
+    auto it = mdr->dn[0].back()->batch_ops.find(mask);
+    auto bocp = it->second;
+    mdr->dn[0].back()->batch_ops.erase(it);
+    delete bocp;
+  }
+}
+
 void Server::dispatch_client_request(MDRequestRef& mdr)
 {
   // we shouldn't be waiting on anyone.
@@ -2274,7 +2348,44 @@ void Server::dispatch_client_request(MDRequestRef& mdr)
 
   if (mdr->killed) {
     dout(10) << "request " << *mdr << " was killed" << dendl;
-    return;
+    //if the mdr is a "batch_op" and it has followers, pick a follower as
+    //the new "head of the batch ops" and go on processing the new one.
+    if (mdr->is_batch_op() && mdr->is_batch_head ) {
+      if (!mdr->batch_reqs.empty()) {
+       MDRequestRef new_batch_head;
+       for (auto itr = mdr->batch_reqs.cbegin(); itr != mdr->batch_reqs.cend();) {
+         auto req = *itr;
+         itr = mdr->batch_reqs.erase(itr);
+         if (!req->killed) {
+           new_batch_head = req;
+           break;
+         }
+       }
+
+       if (!new_batch_head) {
+         clear_batch_ops(mdr);
+         return;
+       }
+
+       new_batch_head->batch_reqs = std::move(mdr->batch_reqs);
+
+       mdr = new_batch_head;
+       mdr->is_batch_head = true;
+       int mask = mdr->client_request->head.args.getattr.mask;
+       if (mdr->client_request->get_op() == CEPH_MDS_OP_GETATTR) {
+         auto fin = mdr->in[0]->batch_ops[mask];
+         fin->set_request(new_batch_head);
+       } else if (mdr->client_request->get_op() == CEPH_MDS_OP_LOOKUP) {
+         auto fin = mdr->dn[0].back()->batch_ops[mask];
+         fin->set_request(new_batch_head);
+       }
+      } else {
+       clear_batch_ops(mdr);
+       return;
+      }
+    } else {
+      return;
+    }
   } else if (mdr->aborted) {
     mdr->aborted = false;
     mdcache->request_kill(mdr);
@@ -3545,6 +3656,28 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
                                    !is_lookup);
   if (!ref) return;
 
+  mdr->getattr_caps = mask;
+
+  if (!mdr->is_batch_head && mdr->is_batch_op()) {
+    if (!is_lookup) {
+      if (ref->batch_ops.count(mask)) {
+       dout(20) << __func__ << ": GETATTR op, wait for previous same getattr ops to respond. " << *mdr << dendl;
+       ref->batch_ops[mask]->add_request(mdr);
+       return;
+      } else
+       ref->batch_ops[mask] = new Batch_Getattr_Lookup(this, mdr, mdcache);
+    } else {
+      CDentry* dn = mdr->dn[0].back();
+      if (dn->batch_ops.count(mask)) {
+       dout(20) << __func__ << ": LOOKUP op, wait for previous same getattr ops to respond. " << *mdr << dendl;
+       dn->batch_ops[mask]->add_request(mdr);
+       return;
+      } else
+       dn->batch_ops[mask] = new Batch_Getattr_Lookup(this, mdr, mdcache);
+    }
+  }
+  mdr->is_batch_head = true;
+
   /*
    * if client currently holds the EXCL cap on a field, do not rdlock
    * it; client's stat() will result in valid info if _either_ EXCL
index a5296a0a430f848ea9dccf1b4058fc22953d57d9..3b72d97caf51eda54814cbe219111d083ffcecdc 100644 (file)
@@ -110,6 +110,7 @@ private:
   friend class MDSContinuation;
   friend class ServerContext;
   friend class ServerLogContext;
+  friend class Batch_Getattr_Lookup;
 
 public:
   bool terminating_sessions;
@@ -348,6 +349,7 @@ public:
 private:
   void reply_client_request(MDRequestRef& mdr, const ref_t<MClientReply> &reply);
   void flush_session(Session *session, MDSGatherBuilder *gather);
+  void clear_batch_ops(const MDRequestRef& mdr);
 
   DecayCounter recall_throttle;
   time last_recall_state;