]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: rdlock_path_xlock_dentry supports returning auth target inode
authorZhansong Gao <zhsgao@hotmail.com>
Tue, 10 Jan 2023 13:40:12 +0000 (21:40 +0800)
committerVenky Shankar <vshankar@redhat.com>
Sat, 20 May 2023 10:51:53 +0000 (16:21 +0530)
`Server::handle_client_openc` will call `Server::handle_client_open`
if the target inode exists. `Server::handle_client_open requires`
the target inode to be auth if client sets `O_WRONLY` or `O_TRUNC`,
but `Server::rdlock_path_xlock_dentry` can not ensure that. The assertion
that the target inode is auth in `Server::handle_client_open` may fail
and it will crash mds. So add a parameter to to support returning
auth target inode if it exists and forward the request if necessary.
The tail dentry may not be auth anymore when an auth target inode
is provided as required by the caller, because we can't require
both of them to be auth in one mds.
This bug will only be triggered in kernel client, because ceph-fuse
client will try to lookup the target before sending a create request
to mds and it can send an open request instead if the target exists.
But kernel client will send the create request to mds without lookup
first because `atomic open` is supported to improve performance.

Fixes: https://tracker.ceph.com/issues/58411
Signed-off-by: Zhansong Gao <zhsgao@hotmail.com>
(cherry picked from commit cf5fdcd6e2d22f35b670d3c13ae8b7c9b9d2616d)

src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/Server.cc
src/mds/Server.h

index ec5c32468434f9e6ed0c672f17d178bf01cc6d6c..734b296e9bf33a23904ce720348921774ba10805 100644 (file)
@@ -8151,6 +8151,7 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
   bool forward = !discover;
   bool path_locked = (flags & MDS_TRAVERSE_PATH_LOCKED);
   bool want_dentry = (flags & MDS_TRAVERSE_WANT_DENTRY);
+  bool want_inode = (flags & MDS_TRAVERSE_WANT_INODE);
   bool want_auth = (flags & MDS_TRAVERSE_WANT_AUTH);
   bool rdlock_snap = (flags & (MDS_TRAVERSE_RDLOCK_SNAP | MDS_TRAVERSE_RDLOCK_SNAP2));
   bool rdlock_path = (flags & MDS_TRAVERSE_RDLOCK_PATH);
@@ -8210,7 +8211,9 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
   if (pin)
     *pin = cur;
 
+  CInode *target_inode = nullptr;
   MutationImpl::LockOpVec lov;
+  int r;
 
   for (unsigned depth = 0; depth < path.depth(); ) {
     dout(12) << "traverse: path seg depth " << depth << " '" << path[depth]
@@ -8243,6 +8246,8 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
          pdnvec->clear();   // do not confuse likes of rdlock_path_pin_ref();
        return -CEPHFS_ENOENT;
       }
+      if (depth == path.depth() - 1)
+       target_inode = cur;
       mdr->snapid = snapid;
       depth++;
       continue;
@@ -8288,18 +8293,11 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
     }
     */
 
-    if (want_auth && want_dentry && depth == path.depth() - 1) {
-      if (curdir->is_ambiguous_auth()) {
-       dout(10) << "waiting for single auth on " << *curdir << dendl;
-       curdir->add_waiter(CInode::WAIT_SINGLEAUTH, cf.build());
-       return 1;
-      }
-      if (!curdir->is_auth()) {
-       dout(10) << "fw to auth for " << *curdir << dendl;
-       request_forward(mdr, curdir->authority().first);
-       return 2;
-      }
-    }
+    // Defer the auth check until the target inode is determined not to exist
+    // if want_inode is true.
+    if (want_auth && want_dentry && !want_inode && depth == path.depth() - 1 &&
+        (r = maybe_request_forward_to_auth(mdr, cf, curdir)) != 0)
+      return r;
 
     // Before doing dirfrag->dn lookup, compare with DamageTable's
     // record of which dentries were unreadable
@@ -8315,9 +8313,19 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
       if (dn->state_test(CDentry::STATE_PURGING))
        return -CEPHFS_ENOENT;
 
+      CDentry::linkage_t *dnl = dn->get_projected_linkage();
+      // If an auth check was deferred before and the target inode is found
+      // not to exist now, do the auth check here if necessary.
+      if (want_auth && want_dentry && want_inode && depth == path.depth() - 1 &&
+         dnl->is_null() && (r = maybe_request_forward_to_auth(mdr, cf, dn)) != 0)
+       return r;
+
       if (rdlock_path) {
        lov.clear();
-       if (xlock_dentry && depth == path.depth() - 1) {
+       // do not xlock the tail dentry if target inode exists and caller wants it
+       if (xlock_dentry && (dnl->is_null() || !want_inode) &&
+           depth == path.depth() - 1) {
+         ceph_assert(dn->is_auth());
          if (depth > 0 || !mdr->lock_cache) {
            lov.add_wrlock(&cur->filelock);
            lov.add_wrlock(&cur->nestlock);
@@ -8350,7 +8358,6 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
       if (pdnvec)
        pdnvec->push_back(dn);
 
-      CDentry::linkage_t *dnl = dn->get_projected_linkage();
       // can we conclude CEPHFS_ENOENT?
       if (dnl->is_null()) {
        dout(10) << "traverse: null+readable dentry at " << *dn << dendl;
@@ -8390,7 +8397,7 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
 
       cur = in;
 
-      if (rdlock_snap && !(want_dentry && depth == path.depth() - 1)) {
+      if (rdlock_snap && !(want_dentry && !want_inode && depth == path.depth() - 1)) {
        lov.clear();
        lov.add_rdlock(&cur->snaplock);
        if (!mds->locker->acquire_locks(mdr, lov)) {
@@ -8399,6 +8406,9 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
        }
       }
 
+      if (depth == path.depth() - 1)
+       target_inode = cur;
+
       // add to trace, continue.
       touch_inode(cur);
       if (pin)
@@ -8484,7 +8494,6 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
       }
     } else {
       // dirfrag/dentry is not mine.
-      mds_authority_t dauth = curdir->authority();
 
       if (forward &&
          mdr && mdr->client_request &&
@@ -8505,39 +8514,35 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
       if (forward) {
         // forward
         dout(7) << "traverse: not auth for " << path << " in " << *curdir << dendl;
-       
-       if (curdir->is_ambiguous_auth()) {
-         // wait
-         dout(7) << "traverse: waiting for single auth in " << *curdir << dendl;
-         curdir->add_waiter(CDir::WAIT_SINGLEAUTH, cf.build());
-         return 1;
-       } 
 
-       dout(7) << "traverse: forwarding, not auth for " << *curdir << dendl;
+       r = maybe_request_forward_to_auth(mdr, cf, curdir);
+       ceph_assert(r != 0);
 
-        request_forward(mdr, dauth.first);
+       if (r == 2 && mds->logger)
+         mds->logger->inc(l_mds_traverse_forward);
 
-       if (mds->logger) mds->logger->inc(l_mds_traverse_forward);
-       return 2;
+       return r;
       }
     }
 
     ceph_abort();  // i shouldn't get here
   }
 
-  if (want_auth && !want_dentry) {
-    if (cur->is_ambiguous_auth()) {
-      dout(10) << "waiting for single auth on " << *cur << dendl;
-      cur->add_waiter(CInode::WAIT_SINGLEAUTH, cf.build());
-      return 1;
-    }
-    if (!cur->is_auth()) {
-      dout(10) << "fw to auth for " << *cur << dendl;
-      request_forward(mdr, cur->authority().first);
-      return 2;
+  if (path.depth() == 0) {
+    dout(7) << "no tail dentry, base " << *cur << dendl;
+    if (want_dentry && !want_inode) {
+      return -CEPHFS_ENOENT;
     }
+    target_inode = cur;
   }
-  
+
+  if (target_inode) {
+    dout(7) << "found target " << *target_inode << dendl;
+    if (want_auth && !(want_dentry && !want_inode) &&
+       (r = maybe_request_forward_to_auth(mdr, cf, target_inode)) != 0)
+      return r;
+  }
+
   // success.
   if (mds->logger) mds->logger->inc(l_mds_traverse_hit);
   dout(10) << "path_traverse finish on snapid " << snapid << dendl;
@@ -8555,6 +8560,22 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
   return 0;
 }
 
+int MDCache::maybe_request_forward_to_auth(MDRequestRef& mdr, MDSContextFactory& cf,
+                                          MDSCacheObject *p)
+{
+  if (p->is_ambiguous_auth()) {
+    dout(7) << "waiting for single auth on " << *p << dendl;
+    p->add_waiter(CInode::WAIT_SINGLEAUTH, cf.build());
+    return 1;
+  }
+  if (!p->is_auth()) {
+    dout(7) << "fw to auth for " << *p << dendl;
+    request_forward(mdr, p->authority().first);
+    return 2;
+  }
+  return 0;
+}
+
 CInode *MDCache::cache_traverse(const filepath& fp)
 {
   dout(10) << "cache_traverse " << fp << dendl;
index 56fe1164b4435d65cbafca1cbe888b7d0b8853c5..e214b84fdcbd1da4ea9c1e21ff2b6084d0ef6f00 100644 (file)
@@ -121,6 +121,7 @@ static const int MDS_TRAVERSE_RDLOCK_PATH   = (1 << 7);
 static const int MDS_TRAVERSE_XLOCK_DENTRY     = (1 << 8);
 static const int MDS_TRAVERSE_RDLOCK_AUTHLOCK  = (1 << 9);
 static const int MDS_TRAVERSE_CHECK_LOCKCACHE  = (1 << 10);
+static const int MDS_TRAVERSE_WANT_INODE       = (1 << 11);
 
 
 // flags for predirty_journal_parents()
@@ -796,8 +797,13 @@ class MDCache {
    * dentry is encountered.
    * MDS_TRAVERSE_WANT_DENTRY: Caller wants tail dentry. Add a null dentry if
    * tail dentry does not exist. return 0 even tail dentry is null.
+   * MDS_TRAVERSE_WANT_INODE: Caller only wants target inode if it exists, or
+   * wants tail dentry if target inode does not exist and MDS_TRAVERSE_WANT_DENTRY
+   * is also set.
    * MDS_TRAVERSE_WANT_AUTH: Always forward request to auth MDS of target inode
    * or auth MDS of tail dentry (MDS_TRAVERSE_WANT_DENTRY is set).
+   * MDS_TRAVERSE_XLOCK_DENTRY: Caller wants to xlock tail dentry if MDS_TRAVERSE_WANT_INODE
+   * is not set or (MDS_TRAVERSE_WANT_INODE is set but target inode does not exist)
    *
    * @param pdnvec Data return parameter -- on success, contains a
    * vector of dentries. On failure, is either empty or contains the
@@ -815,6 +821,9 @@ class MDCache {
                    const filepath& path, int flags,
                    vector<CDentry*> *pdnvec, CInode **pin=nullptr);
 
+  int maybe_request_forward_to_auth(MDRequestRef& mdr, MDSContextFactory& cf,
+                                   MDSCacheObject *p);
+
   CInode *cache_traverse(const filepath& path);
 
   void open_remote_dirfrag(CInode *diri, frag_t fg, MDSContext *fin);
index b8627e8e071c52a0b510bfc7af627b348c0507ad..ec99d24aabfddecba24f331f8773277a943b9394 100644 (file)
@@ -3581,12 +3581,18 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr,
 
 /** rdlock_path_xlock_dentry
  * traverse path to the directory that could/would contain dentry.
- * make sure i am auth for that dentry, forward as necessary.
- * create null dentry in place (or use existing if okexist).
+ * make sure i am auth for that dentry (or target inode if it exists and authexist),
+ * forward as necessary. create null dentry in place (or use existing if okexist).
  * get rdlocks on traversed dentries, xlock on new dentry.
+ *
+ * set authexist true if caller requires the target inode to be auth when it exists.
+ * the tail dentry is not always auth any more if authexist because it is impossible
+ * to ensure tail dentry and target inode are both auth in one mds. the tail dentry
+ * will not be xlocked too if authexist and the target inode exists.
  */
 CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
-                                         bool create, bool okexist, bool want_layout)
+                                         bool create, bool okexist, bool authexist,
+                                         bool want_layout)
 {
   const filepath& refpath = mdr->get_filepath();
   dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
@@ -3624,6 +3630,8 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
     flags |= MDS_TRAVERSE_CHECK_LOCKCACHE;
   if (create)
     flags |= MDS_TRAVERSE_RDLOCK_AUTHLOCK;
+  if (authexist)
+    flags |= MDS_TRAVERSE_WANT_INODE;
   if (want_layout)
     flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
   int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
@@ -4452,7 +4460,7 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   }
 
   bool excl = req->head.args.open.flags & CEPH_O_EXCL;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true);
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true, true);
   if (!dn)
     return;
 
@@ -4464,7 +4472,7 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   CDentry::linkage_t *dnl = dn->get_projected_linkage();
   if (!excl && !dnl->is_null()) {
     // it existed.
-    mds->locker->xlock_downgrade(&dn->lock, mdr.get());
+    ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
 
     MutationImpl::LockOpVec lov;
     lov.add_rdlock(&dnl->get_inode()->snaplock);
@@ -6736,7 +6744,7 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
     mode |= S_IFREG;
 
   mdr->disable_lock_cache();
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, S_ISREG(mode));
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, false, S_ISREG(mode));
   if (!dn)
     return;
 
index 91a01a19ee83a4af8b567a55030d14730f983af7..a1cc15304d16e4a135f392ae023949caacc7bae8 100644 (file)
@@ -191,7 +191,8 @@ public:
   CInode* rdlock_path_pin_ref(MDRequestRef& mdr, bool want_auth,
                              bool no_want_auth=false);
   CDentry* rdlock_path_xlock_dentry(MDRequestRef& mdr, bool create,
-                                   bool okexist=false, bool want_layout=false);
+                                   bool okexist=false, bool authexist=false,
+                                   bool want_layout=false);
   std::pair<CDentry*, CDentry*>
            rdlock_two_paths_xlock_destdn(MDRequestRef& mdr, bool xlock_srcdn);