]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: add maximum fragment size constraint 9789/head
authorPatrick Donnelly <pdonnell@redhat.com>
Fri, 17 Jun 2016 15:53:32 +0000 (11:53 -0400)
committerPatrick Donnelly <pdonnell@redhat.com>
Wed, 22 Jun 2016 20:41:47 +0000 (16:41 -0400)
This commit adds a new config option

    mds_bal_fragment_size_max = 10000*10

which is an order of magnitude larger than mds_bal_split_size.

This limit prevents a fragment from getting too large which results in large
omap directories.

Right now the limit is enforced only in the RPC paths and in stray directory
entry creation.

Fixes http://tracker.ceph.com/issues/16164

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/common/config_opts.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Server.cc
src/mds/Server.h

index 9df991ee6f370bbfd2701fa58dcff240f18e98a0..ea2397f4dcde2eb9853c036855f0efe925b71775 100644 (file)
@@ -487,6 +487,7 @@ OPTION(mds_bal_merge_rd, OPT_FLOAT, 1000)
 OPTION(mds_bal_merge_wr, OPT_FLOAT, 1000)
 OPTION(mds_bal_interval, OPT_INT, 10)           // seconds
 OPTION(mds_bal_fragment_interval, OPT_INT, 5)      // seconds
+OPTION(mds_bal_fragment_size_max, OPT_INT, 10000*10) // order of magnitude higher than split size
 OPTION(mds_bal_idle_threshold, OPT_FLOAT, 0)
 OPTION(mds_bal_max, OPT_INT, -1)
 OPTION(mds_bal_max_until, OPT_INT, -1)
index dbda0c681935a1b20eb385ceb5fb4a1954fdde8a..170cdf6eff5b2625ee967b1fe4a8e4300b5ac90e 100644 (file)
@@ -718,7 +718,7 @@ void MDCache::open_foreign_mdsdir(inodeno_t ino, MDSInternalContextBase *fin)
   discover_base_ino(ino, fin, mds_rank_t(ino & (MAX_MDS-1)));
 }
 
-CDentry *MDCache::get_or_create_stray_dentry(CInode *in)
+CDir *MDCache::get_stray_dir(CInode *in)
 {
   string straydname;
   in->name_stray_dentry(straydname);
@@ -728,6 +728,14 @@ CDentry *MDCache::get_or_create_stray_dentry(CInode *in)
   frag_t fg = strayi->pick_dirfrag(straydname);
   CDir *straydir = strayi->get_dirfrag(fg);
   assert(straydir);
+  return straydir;
+}
+
+CDentry *MDCache::get_or_create_stray_dentry(CInode *in)
+{
+  CDir *straydir = get_stray_dir(in);
+  string straydname;
+  in->name_stray_dentry(straydname);
   CDentry *straydn = straydir->lookup(straydname);
   if (!straydn) {
     straydn = straydir->add_null_dentry(straydname);
index ff3115e62270126acd33c22e6630d4b8d4a63829..df4085e199695823687e8111c0ca4882f04c8043 100644 (file)
@@ -829,6 +829,7 @@ public:
                                   version_t dpv, MDSInternalContextBase *fin);
 
   void open_foreign_mdsdir(inodeno_t ino, MDSInternalContextBase *c);
+  CDir *get_stray_dir(CInode *in);
   CDentry *get_or_create_stray_dentry(CInode *in);
 
   MDSInternalContextBase *_get_waiter(MDRequestRef& mdr, Message *req, MDSInternalContextBase *fin);
index d804a8ac68c24d0db023244faa9e7ead5ded4bb8..1f625deb4914ce3023e5bd6f799098f1cf31e437 100644 (file)
@@ -2155,6 +2155,23 @@ bool Server::check_access(MDRequestRef& mdr, CInode *in, unsigned mask)
   return true;
 }
 
+/**
+ * check whether fragment has reached maximum size
+ *
+ */
+bool Server::check_fragment_space(MDRequestRef &mdr, CDir *in)
+{
+  const auto size = in->get_frag_size();
+  if (size >= g_conf->mds_bal_fragment_size_max) {
+    dout(10) << "fragment " << *in << " size exceeds " << g_conf->mds_bal_fragment_size_max << " (ENOSPC)" << dendl;
+    respond_to_request(mdr, -ENOSPC);
+    return false;
+  }
+
+  return true;
+}
+
+
 /** validate_dentry_dir
  *
  * verify that the dir exists and would own the dname.
@@ -2239,15 +2256,20 @@ CDentry* Server::prepare_stray_dentry(MDRequestRef& mdr, CInode *in)
 {
   CDentry *straydn = mdr->straydn;
   if (straydn) {
-    string name;
-    in->name_stray_dentry(name);
-    if (straydn->get_name() == name)
+    string straydname;
+    in->name_stray_dentry(straydname);
+    if (straydn->get_name() == straydname)
       return straydn;
 
     assert(!mdr->done_locking);
     mdr->unpin(straydn);
   }
 
+  CDir *straydir = mdcache->get_stray_dir(in);
+
+  if (!check_fragment_space(mdr, straydir))
+    return NULL;
+
   straydn = mdcache->get_or_create_stray_dentry(in);
   mdr->straydn = straydn;
   mdr->pin(straydn);
@@ -3186,7 +3208,8 @@ void Server::handle_client_openc(MDRequestRef& mdr)
     return;
   }
 
-  CInode *diri = dn->get_dir()->get_inode();
+  CDir *dir = dn->get_dir();
+  CInode *diri = dir->get_inode();
   rdlocks.insert(&diri->authlock);
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
@@ -3194,6 +3217,9 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   if (!check_access(mdr, diri, access))
     return;
 
+  if (!check_fragment_space(mdr, dir))
+    return;
+
   CDentry::linkage_t *dnl = dn->get_projected_linkage();
 
   if (!dnl->is_null()) {
@@ -4601,6 +4627,9 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
   if (!check_access(mdr, diri, MAY_WRITE))
     return;
 
+  if (!check_fragment_space(mdr, dn->get_dir()))
+    return;
+
   unsigned mode = req->head.args.mknod.mode;
   if ((mode & S_IFMT) == 0)
     mode |= S_IFREG;
@@ -4684,7 +4713,8 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
     respond_to_request(mdr, -EROFS);
     return;
   }
-  CInode *diri = dn->get_dir()->get_inode();
+  CDir *dir = dn->get_dir();
+  CInode *diri = dir->get_inode();
   rdlocks.insert(&diri->authlock);
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
@@ -4693,6 +4723,9 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
   if (!check_access(mdr, diri, MAY_WRITE))
     return;
 
+  if (!check_fragment_space(mdr, dir))
+    return;
+
   // new inode
   SnapRealm *realm = dn->get_dir()->inode->find_snaprealm();
   snapid_t follows = realm->get_newest_seq();
@@ -4764,7 +4797,8 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
     respond_to_request(mdr, -EROFS);
     return;
   }
-  CInode *diri = dn->get_dir()->get_inode();
+  CDir *dir = dn->get_dir();
+  CInode *diri = dir->get_inode();
   rdlocks.insert(&diri->authlock);
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
@@ -4772,6 +4806,9 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
   if (!check_access(mdr, diri, MAY_WRITE))
    return;
 
+  if (!check_fragment_space(mdr, dir))
+    return;
+
   unsigned mode = S_IFLNK | 0777;
   CInode *newi = prepare_new_inode(mdr, dn->get_dir(), inodeno_t(req->head.ino), mode);
   assert(newi);
@@ -4845,6 +4882,9 @@ void Server::handle_client_link(MDRequestRef& mdr)
   if (!check_access(mdr, dir->get_inode(), MAY_WRITE))
     return;
 
+  if (!check_fragment_space(mdr, dir))
+    return;
+
   // go!
   assert(g_conf->mds_kill_link_at != 1);
 
@@ -5416,6 +5456,8 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
   CDentry *straydn = NULL;
   if (dnl->is_primary()) {
     straydn = prepare_stray_dentry(mdr, dnl->get_inode());
+    if (!straydn)
+      return;
     dout(10) << " straydn is " << *straydn << dendl;
   } else if (mdr->straydn) {
     mdr->unpin(mdr->straydn);
@@ -6196,6 +6238,8 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   CDentry *straydn = NULL;
   if (destdnl->is_primary() && !linkmerge) {
     straydn = prepare_stray_dentry(mdr, destdnl->get_inode());
+    if (!straydn)
+      return;
     dout(10) << " straydn is " << *straydn << dendl;
   } else if (mdr->straydn) {
     mdr->unpin(mdr->straydn);
@@ -6306,6 +6350,9 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   if (!check_access(mdr, destdn->get_dir()->get_inode(), MAY_WRITE))
     return;
 
+  if (!check_fragment_space(mdr, destdn->get_dir()))
+    return;
+
   if (!check_access(mdr, srci, MAY_WRITE))
     return;
 
index 66aa6b9e4449de9efdbfe15103fa9c6d2f6e5c22..0e871031cd2f001bbf64894b655ef8444d051804 100644 (file)
@@ -134,6 +134,7 @@ public:
   void handle_slave_auth_pin_ack(MDRequestRef& mdr, MMDSSlaveRequest *ack);
 
   // some helpers
+  bool check_fragment_space(MDRequestRef& mdr, CDir *in);
   bool check_access(MDRequestRef& mdr, CInode *in, unsigned mask);
   bool _check_access(Session *session, CInode *in, unsigned mask, int caller_uid, int caller_gid, int setattr_uid, int setattr_gid);
   CDir *validate_dentry_dir(MDRequestRef& mdr, CInode *diri, const string& dname);