]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mds: move 'traverse to auth' logic into MDCache::path_traverse
authorYan, Zheng <zyan@redhat.com>
Thu, 12 Sep 2019 01:31:48 +0000 (09:31 +0800)
committerYan, Zheng <zyan@redhat.com>
Mon, 11 Nov 2019 02:58:24 +0000 (10:58 +0800)
simplfy code and reduce duplication

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Server.cc
src/mds/Server.h

index 7a722b19ab6596036ee8e749a2468063be58b31d..2012f75af7508d597582b103fed90ca1fbdcfb97 100644 (file)
@@ -8056,6 +8056,7 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
   bool forward = !discover;
   bool last_xlocked = (flags & MDS_TRAVERSE_LAST_XLOCKED);
   bool want_dentry = (flags & MDS_TRAVERSE_WANT_DENTRY);
+  bool want_auth = (flags & MDS_TRAVERSE_WANT_AUTH);
 
   if (forward)
     ceph_assert(mdr);  // forward requires a request
@@ -8181,6 +8182,19 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
     }
     */
 
+    if (want_auth && want_dentry && depth == path.depth() - 1) {
+      if (curdir->is_ambiguous_auth()) {
+       dout(10) << "waiting for single auth on " << *curdir << dendl;
+       curdir->add_waiter(CInode::WAIT_SINGLEAUTH, cf.build());
+       return 1;
+      }
+      if (!curdir->is_auth()) {
+       dout(10) << "fw to auth for " << *curdir << dendl;
+       request_forward(mdr, curdir->authority().first);
+       return 2;
+      }
+    }
+
     // Before doing dirfrag->dn lookup, compare with DamageTable's
     // record of which dentries were unreadable
     if (mds->damage_table.is_dentry_damaged(curdir, path[depth], snapid)) {
@@ -8242,7 +8256,7 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
                             (last_xlocked && depth == path.depth() - 1));
          if (mds->logger) mds->logger->inc(l_mds_traverse_remote_ino);
           return 1;
-        }        
+        }
       }
 
       cur = in;
@@ -8355,6 +8369,19 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
 
     ceph_abort();  // i shouldn't get here
   }
+
+  if (want_auth && !want_dentry) {
+    if (cur->is_ambiguous_auth()) {
+      dout(10) << "waiting for single auth on " << *cur << dendl;
+      cur->add_waiter(CInode::WAIT_SINGLEAUTH, cf.build());
+      return 1;
+    }
+    if (!cur->is_auth()) {
+      dout(10) << "fw to auth for " << *cur << dendl;
+      request_forward(mdr, cur->authority().first);
+      return 2;
+    }
+  }
   
   // success.
   if (mds->logger) mds->logger->inc(l_mds_traverse_hit);
index 0e3f781d5f315c5818212000e6dabca7ad7733c8..0027a7e18eef907c7599c759f1c3c71d9dcdd8f4 100644 (file)
@@ -114,6 +114,8 @@ enum {
 static const int MDS_TRAVERSE_DISCOVER         = (1 << 0);
 static const int MDS_TRAVERSE_LAST_XLOCKED     = (1 << 1);
 static const int MDS_TRAVERSE_WANT_DENTRY      = (1 << 2);
+static const int MDS_TRAVERSE_WANT_AUTH                = (1 << 3);
+
 
 // flags for predirty_journal_parents()
 static const int PREDIRTY_PRIMARY = 1; // primary dn, adjust nested accounting
@@ -774,6 +776,8 @@ class MDCache {
    * dentry is encountered.
    * MDS_TRAVERSE_WANT_DENTRY: Caller wants tail dentry. Add a null dentry if
    * tail dentry does not exist. return 0 even tail dentry is null.
+   * MDS_TRAVERSE_WANT_AUTH: Always forward request to auth MDS of target inode
+   * or auth MDS of tail dentry (MDS_TRAVERSE_WANT_DENTRY is set).
    *
    * @param pdnvec Data return parameter -- on success, contains a
    * vector of dentries. On failure, is either empty or contains the
index 4012a13b2bd86b3fb696a72c80c847cba7edab5b..edaf03791a5427cdca9e1d7671913c342b4e704d 100644 (file)
@@ -3093,87 +3093,6 @@ bool Server::check_fragment_space(MDRequestRef &mdr, CDir *in)
   return true;
 }
 
-
-/** validate_dentry_dir
- *
- * verify that the dir exists and would own the dname.
- * do not check if the dentry exists.
- */
-CDir *Server::validate_dentry_dir(MDRequestRef& mdr, CInode *diri, std::string_view dname)
-{
-  // make sure parent is a dir?
-  if (!diri->is_dir()) {
-    dout(7) << "validate_dentry_dir: not a dir" << dendl;
-    respond_to_request(mdr, -ENOTDIR);
-    return NULL;
-  }
-
-  // which dirfrag?
-  frag_t fg = diri->pick_dirfrag(dname);
-  CDir *dir = try_open_auth_dirfrag(diri, fg, mdr);
-  if (!dir)
-    return 0;
-
-  // frozen?
-  if (dir->is_frozen()) {
-    dout(7) << "dir is frozen " << *dir << dendl;
-    dir->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
-    return NULL;
-  }
-
-  return dir;
-}
-
-
-/** prepare_null_dentry
- * prepare a null (or existing) dentry in given dir. 
- * wait for any dn lock.
- */
-CDentry* Server::prepare_null_dentry(MDRequestRef& mdr, CDir *dir, std::string_view dname, bool okexist)
-{
-  dout(10) << "prepare_null_dentry " << dname << " in " << *dir << dendl;
-  ceph_assert(dir->is_auth());
-
-  client_t client = mdr->get_client();
-
-  // does it already exist?
-  CDentry *dn = dir->lookup(dname);
-  if (dn) {
-    /*
-    if (dn->lock.is_xlocked_by_other(mdr)) {
-      dout(10) << "waiting on xlocked dentry " << *dn << dendl;
-      dn->lock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
-      return 0;
-    }
-    */
-    if (!dn->get_linkage(client, mdr)->is_null()) {
-      // name already exists
-      dout(10) << "dentry " << dname << " exists in " << *dir << dendl;
-      if (!okexist) {
-        respond_to_request(mdr, -EEXIST);
-        return 0;
-      }
-    } else {
-      snapid_t next_snap = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
-      dn->first = std::max(dn->first, next_snap);
-    }
-    return dn;
-  }
-
-  // make sure dir is complete
-  if (!dir->is_complete() && (!dir->has_bloom() || dir->is_in_bloom(dname))) {
-    dout(7) << " incomplete dir contents for " << *dir << ", fetching" << dendl;
-    dir->fetch(new C_MDS_RetryRequest(mdcache, mdr));
-    return 0;
-  }
-  
-  // create
-  dn = dir->add_null_dentry(dname, mdcache->get_global_snaprealm()->get_newest_seq() + 1);
-  dn->mark_new();
-  dout(10) << "prepare_null_dentry added " << *dn << dendl;
-  return dn;
-}
-
 CDentry* Server::prepare_stray_dentry(MDRequestRef& mdr, CInode *in)
 {
   CDentry *straydn = mdr->straydn;
@@ -3371,43 +3290,6 @@ private:
   MDRequestRef mdr;
 };
 
-CDir *Server::traverse_to_auth_dir(MDRequestRef& mdr, vector<CDentry*> &trace, filepath refpath)
-{
-  // figure parent dir vs dname
-  if (refpath.depth() == 0) {
-    dout(7) << "can't do that to root" << dendl;
-    respond_to_request(mdr, -EINVAL);
-    return 0;
-  }
-  string dname = refpath.last_dentry();
-  refpath.pop_dentry();
-  
-  dout(10) << "traverse_to_auth_dir dirpath " << refpath << " dname " << dname << dendl;
-
-  // traverse to parent dir
-  CInode *diri;
-  CF_MDS_MDRContextFactory cf(mdcache, mdr);
-  int r = mdcache->path_traverse(mdr, cf, refpath, 0, &trace, &diri);
-  if (r > 0) return 0; // delayed
-  if (r < 0) {
-    if (r == -ESTALE) {
-      dout(10) << "FAIL on ESTALE but attempting recovery" << dendl;
-      mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
-      return 0;
-    }
-    respond_to_request(mdr, r);
-    return 0;
-  }
-
-  // is it an auth dir?
-  CDir *dir = validate_dentry_dir(mdr, diri, dname);
-  if (!dir)
-    return 0; // forwarded or waiting for freeze
-
-  dout(10) << "traverse_to_auth_dir " << *dir << dendl;
-  return dir;
-}
-
 /* If this returns null, the request has been handled
  * as appropriate: forwarded on, or the client's been replied to */
 CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
@@ -3424,11 +3306,17 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
   if (mdr->done_locking)
     return mdr->in[n];
 
+  if (!no_want_auth && refpath.is_last_snap())
+    want_auth = true;
+
   // traverse
   CF_MDS_MDRContextFactory cf(mdcache, mdr);
-  int r = mdcache->path_traverse(mdr, cf, refpath, 0, &mdr->dn[n], &mdr->in[n]);
+  int flags = 0;
+  if (want_auth)
+    flags |= MDS_TRAVERSE_WANT_AUTH;
+  int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[n], &mdr->in[n]);
   if (r > 0)
-    return NULL; // delayed
+    return nullptr; // delayed
   if (r < 0) {  // error
     if (r == -ENOENT && n == 0 && !mdr->dn[n].empty()) {
       if (!no_lookup) {
@@ -3448,22 +3336,7 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
   CInode *ref = mdr->in[n];
   dout(10) << "ref is " << *ref << dendl;
 
-  // fw to inode auth?
-  if (mdr->snapid != CEPH_NOSNAP && !no_want_auth)
-    want_auth = true;
-
   if (want_auth) {
-    if (ref->is_ambiguous_auth()) {
-      dout(10) << "waiting for single auth on " << *ref << dendl;
-      ref->add_waiter(CInode::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
-      return 0;
-    }
-    if (!ref->is_auth()) {
-      dout(10) << "fw to auth for " << *ref << dendl;
-      mdcache->request_forward(mdr, ref->authority().first);
-      return 0;
-    }
-
     // auth_pin?
     //   do NOT proceed if freezing, as cap release may defer in that case, and
     //   we could deadlock when we try to lock @ref.
@@ -3513,68 +3386,65 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
  */
 CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
                                          MutationImpl::LockOpVec& lov,
-                                         bool okexist, bool mustexist, bool alwaysxlock,
+                                         bool okexist, bool alwaysxlock,
                                          file_layout_t **layout)
 {
   const filepath& refpath = n ? mdr->get_filepath2() : mdr->get_filepath();
 
   dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
-
-  client_t client = mdr->get_client();
-
   if (mdr->done_locking)
     return mdr->dn[n].back();
 
-  CDir *dir = traverse_to_auth_dir(mdr, mdr->dn[n], refpath);
-  if (!dir) return 0;
+  CF_MDS_MDRContextFactory cf(mdcache, mdr);
+  int flags = MDS_TRAVERSE_WANT_DENTRY | MDS_TRAVERSE_WANT_AUTH;
+  int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[n]);
+  if (r > 0)
+    return nullptr; // delayed
+  if (r < 0) {
+    if (r == -ESTALE) {
+      dout(10) << "FAIL on ESTALE but attempting recovery" << dendl;
+      mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+      return nullptr;
+    }
+    respond_to_request(mdr, r);
+    return nullptr;
+  }
 
+  CDentry *dn = mdr->dn[0].back();
+  CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
+
   if (!mdr->reqid.name.is_mds()) {
     if (diri->is_system() && !diri->is_root()) {
       respond_to_request(mdr, -EROFS);
       return 0;
     }
   }
+
   if (!diri->is_base() && diri->get_projected_parent_dir()->inode->is_stray()) {
     respond_to_request(mdr, -ENOENT);
     return 0;
   }
 
-  // make a null dentry?
-  std::string_view dname = refpath.last_dentry();
-  CDentry *dn;
-  if (mustexist) {
-    dn = dir->lookup(dname);
-
-    // make sure dir is complete
-    if (!dn && !dir->is_complete() &&
-        (!dir->has_bloom() || dir->is_in_bloom(dname))) {
-      dout(7) << " incomplete dir contents for " << *dir << ", fetching" << dendl;
-      dir->fetch(new C_MDS_RetryRequest(mdcache, mdr));
-      return 0;
-    }
+  client_t client = mdr->get_client();
+  if (dn && !dn->lock.can_read(client) && dn->lock.get_xlock_by() != mdr) {
+    dout(10) << "waiting on xlocked dentry " << *dn << dendl;
+    dn->lock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
+    return 0;
+  }
 
-    // readable?
-    if (dn && !dn->lock.can_read(client) && dn->lock.get_xlock_by() != mdr) {
-      dout(10) << "waiting on xlocked dentry " << *dn << dendl;
-      dn->lock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
+  CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
+  if (!dnl->is_null()) {
+    // name already exists
+    dout(10) << "dentry " << dn->get_name() << " exists in " << *dir << dendl;
+    if (!okexist) {
+      respond_to_request(mdr, -EEXIST);
       return 0;
     }
-      
-    // exists?
-    if (!dn || dn->get_linkage(client, mdr)->is_null()) {
-      dout(7) << "dentry " << dname << " dne in " << *dir << dendl;
-      respond_to_request(mdr, -ENOENT);
-      return 0;
-    }    
   } else {
-    dn = prepare_null_dentry(mdr, dir, dname, okexist);
-    if (!dn) 
-      return 0;
+    snapid_t next_snap = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+    dn->first = std::max(dn->first, next_snap);
   }
-
-  mdr->dn[n].push_back(dn);
-  CDentry::linkage_t *dnl = dn->get_linkage(client, mdr);
   mdr->in[n] = dnl->get_inode();
 
   // -- lock --
@@ -4220,7 +4090,7 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   MutationImpl::LockOpVec lov;
   file_layout_t *dir_layout = nullptr;
   CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov,
-                                         !excl, false, false, &dir_layout);
+                                         !excl, false, &dir_layout);
   if (!dn) return;
   if (mdr->snapid != CEPH_NOSNAP) {
     respond_to_request(mdr, -EROFS);
@@ -5840,7 +5710,7 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
   client_t client = mdr->get_client();
   MutationImpl::LockOpVec lov;
   file_layout_t *dir_layout = nullptr;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false,
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false,
                                         &dir_layout);
   if (!dn) return;
   if (mdr->snapid != CEPH_NOSNAP) {
@@ -5940,7 +5810,7 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
   }
 
   MutationImpl::LockOpVec lov;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false);
   if (!dn) return;
   if (mdr->snapid != CEPH_NOSNAP) {
     respond_to_request(mdr, -EROFS);
@@ -6027,7 +5897,7 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
 {
   const cref_t<MClientRequest> &req = mdr->client_request;
   MutationImpl::LockOpVec lov;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false);
   if (!dn) return;
   if (mdr->snapid != CEPH_NOSNAP) {
     respond_to_request(mdr, -EROFS);
@@ -6090,7 +5960,7 @@ void Server::handle_client_link(MDRequestRef& mdr)
 
   MutationImpl::LockOpVec lov;
 
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false);
   if (!dn) return;
   CInode *targeti = rdlock_path_pin_ref(mdr, 1, lov, false);
   if (!targeti) return;
@@ -7520,7 +7390,7 @@ void Server::handle_client_rename(MDRequestRef& mdr)
 
   MutationImpl::LockOpVec lov;
 
-  CDentry *destdn = rdlock_path_xlock_dentry(mdr, 0, lov, true, false, true);
+  CDentry *destdn = rdlock_path_xlock_dentry(mdr, 0, lov, true, true);
   if (!destdn) return;
   dout(10) << " destdn " << *destdn << dendl;
   if (mdr->snapid != CEPH_NOSNAP) {
index d6e97ef0fd126e04664a0ce434ae6507c15992b5..3b49c47dda560b4cd1ca7f0e00aaae8cc152e553 100644 (file)
@@ -208,9 +208,6 @@ public:
   bool check_fragment_space(MDRequestRef& mdr, CDir *in);
   bool check_access(MDRequestRef& mdr, CInode *in, unsigned mask);
   bool _check_access(Session *session, CInode *in, unsigned mask, int caller_uid, int caller_gid, int setattr_uid, int setattr_gid);
-  CDir *validate_dentry_dir(MDRequestRef& mdr, CInode *diri, std::string_view dname);
-  CDir *traverse_to_auth_dir(MDRequestRef& mdr, vector<CDentry*> &trace, filepath refpath);
-  CDentry *prepare_null_dentry(MDRequestRef& mdr, CDir *dir, std::string_view dname, bool okexist=false);
   CDentry *prepare_stray_dentry(MDRequestRef& mdr, CInode *in);
   CInode* prepare_new_inode(MDRequestRef& mdr, CDir *dir, inodeno_t useino, unsigned mode,
                            file_layout_t *layout=NULL);
@@ -223,7 +220,7 @@ public:
                              bool no_lookup=false);
   CDentry* rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
                                    MutationImpl::LockOpVec& lov,
-                                   bool okexist, bool mustexist, bool alwaysxlock,
+                                   bool okexist, bool alwaysxlock,
                                    file_layout_t **layout=nullptr);
 
   CDir* try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequestRef& mdr);