]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mds: introduce Server::rdlock_two_paths_xlock_destdn()
authorYan, Zheng <zyan@redhat.com>
Thu, 12 Sep 2019 07:34:12 +0000 (15:34 +0800)
committerYan, Zheng <zyan@redhat.com>
Thu, 12 Dec 2019 18:04:11 +0000 (02:04 +0800)
The helper function is for requests that operate on two paths. It
ensures that the two paths get locks in proper order. The rule is:

 1. Lock directory inodes or dentries according to which trees they
    are under. Lock objects under fs root before objects under mdsdir.
 2. Lock directory inodes or dentries according to their depth, in
    ascending order.
 3. Lock directory inodes or dentries according to inode numbers or
    dentries' parent inode numbers, in ascending order.
 4. Lock dentries in the same directory in order of their keys.
 5. Lock non-directory inodes according to inode numbers, in ascending
    order.

This patch also makes handle_client_link() and handle_client_rename()
to use this helper function.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/mds/Locker.cc
src/mds/MDCache.cc
src/mds/Mutation.cc
src/mds/Mutation.h
src/mds/Server.cc
src/mds/Server.h
src/mds/StrayManager.cc

index eb05bf3d446fe9e0ef0105278cb8efbc604d710d..d1754524a5d4a684d60158b4c1e1c79a761d53e5 100644 (file)
@@ -141,6 +141,8 @@ bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
 {
   dout(10) << __func__ << " " << *mdr << " " << *in << dendl;
   // rdlock ancestor snaps
+  inodeno_t root;
+  int depth = -1;
   bool found_locked = false;
   bool found_layout = false;
 
@@ -151,6 +153,7 @@ bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
 
   CInode *t = in;
   while (true) {
+    ++depth;
     if (!found_locked && mdr->is_rdlocked(&t->snaplock))
       found_locked = true;
 
@@ -179,11 +182,15 @@ bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
       }
     }
     CDentry* pdn = t->get_projected_parent_dn();
-    if (!pdn)
+    if (!pdn) {
+      root = t->ino();
       break;
+    }
     t = pdn->get_dir()->get_inode();
   }
 
+  mdr->dir_root[n] = root;
+  mdr->dir_depth[n] = depth;
   return true;
 
 failed:
index fb24905785a9e9566a1c6edbda2097ad434e88d2..a829f6af5c32a9dfd0921519b33b1a75e4ddcddf 100644 (file)
@@ -8087,14 +8087,20 @@ int MDCache::path_traverse(MDRequestRef& mdr, MDSContextFactory& cf,
 
   dout(7) << "traverse: opening base ino " << path.get_ino() << " snap " << snapid << dendl;
   CInode *cur = get_inode(path.get_ino());
-  if (cur == NULL) {
-    if (MDS_INO_IS_MDSDIR(path.get_ino())) 
+  if (!cur) {
+    if (MDS_INO_IS_MDSDIR(path.get_ino())) {
       open_foreign_mdsdir(path.get_ino(), cf.build());
-    else {
-      //ceph_abort();  // hrm.. broken
-      return -ESTALE;
+      return 1;
     }
-    return 1;
+    if (MDS_INO_IS_STRAY(path.get_ino())) {
+      mds_rank_t rank = MDS_INO_STRAY_OWNER(path.get_ino());
+      unsigned idx = MDS_INO_STRAY_INDEX(path.get_ino());
+      filepath path(strays[idx]->get_parent_dn()->get_name(),
+                   MDS_INO_MDSDIR(rank));
+      MDRequestRef null_ref;
+      return path_traverse(null_ref, cf, path, MDS_TRAVERSE_DISCOVER, nullptr);
+    }
+    return -ESTALE;
   }
   if (cur->state_test(CInode::STATE_PURGING))
     return -ESTALE;
index 2bdaad584bdc841f5005c9be1a44bd66d3192c8c..b14d6983758240878509b6b03aaf42d95e1b55f8 100644 (file)
@@ -413,6 +413,19 @@ bool MDRequestImpl::is_batch_op()
      client_request->get_filepath().depth() == 0);
 }
 
+int MDRequestImpl::compare_paths()
+{
+  if (dir_root[0] < dir_root[1])
+    return -1;
+  if (dir_root[0] > dir_root[1])
+    return 1;
+  if (dir_depth[0] < dir_depth[1])
+    return -1;
+  if (dir_depth[0] > dir_depth[1])
+    return 1;
+  return 0;
+}
+
 cref_t<MClientRequest> MDRequestImpl::release_client_request()
 {
   msg_lock.lock();
index 456779a1752ca6fa6a6fd3981ae7e9d2e2180325..d8227e0e1bb30a5acaef17256c4503813744d147 100644 (file)
@@ -285,7 +285,11 @@ struct MDRequestImpl : public MutationImpl {
   // -- i am a client (master) request
   cref_t<MClientRequest> client_request; // client request (if any)
 
+  // tree and depth info of path1 and path2
+  inodeno_t dir_root[2] = {0, 0};
+  int dir_depth[2] = {-1, -1};
   file_layout_t dir_layout;
+
   // store up to two sets of dn vectors, inode pointers, for request path1 and path2.
   vector<CDentry*> dn[2];
   CInode *in[2];
@@ -441,6 +445,7 @@ struct MDRequestImpl : public MutationImpl {
   void set_filepath2(const filepath& fp);
   bool is_queued_for_replay() const;
   bool is_batch_op();
+  int compare_paths();
 
   void print(ostream &out) const override;
   void dump(Formatter *f) const override;
index 3f1b365f717e57a2fee86aedaa570109700ed3e8..2c51079d7c954fff061825d12e5baf8001b26b60 100644 (file)
@@ -3287,7 +3287,7 @@ public:
 
 class CF_MDS_MDRContextFactory : public MDSContextFactory {
 public:
-  CF_MDS_MDRContextFactory(MDCache *cache, MDRequestRef &mdr, bool dl=false) :
+  CF_MDS_MDRContextFactory(MDCache *cache, MDRequestRef &mdr, bool dl) :
     mdcache(cache), mdr(mdr), drop_locks(dl) {}
   MDSContext *build() {
     if (drop_locks) {
@@ -3466,6 +3466,164 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
   return dn;
 }
 
+/** rdlock_two_paths_xlock_destdn
+ * traverse two paths and lock the two paths in proper order.
+ * The order of taking locks is:
+ * 1. Lock directory inodes or dentries according to which trees they
+ *    are under. Lock objects under fs root before objects under mdsdir.
+ * 2. Lock directory inodes or dentries according to their depth, in
+ *    ascending order.
+ * 3. Lock directory inodes or dentries according to inode numbers or
+ *    dentries' parent inode numbers, in ascending order.
+ * 4. Lock dentries in the same directory in order of their keys.
+ * 5. Lock non-directory inodes according to inode numbers, in ascending
+ *    order.
+ */
+std::pair<CDentry*, CDentry*>
+Server::rdlock_two_paths_xlock_destdn(MDRequestRef& mdr, bool xlock_srcdn)
+{
+
+  const filepath& refpath = mdr->get_filepath();
+  const filepath& refpath2 = mdr->get_filepath2();
+
+  dout(10) << "rdlock_two_paths_xlock_destdn " << *mdr << " " << refpath << " " << refpath2 << dendl;
+
+  if (mdr->locking_state & MutationImpl::PATH_LOCKED)
+    return std::make_pair(mdr->dn[0].back(), mdr->dn[1].back());
+
+  if (refpath.depth() != 1 || refpath2.depth() != 1) {
+    respond_to_request(mdr, -EINVAL);
+    return std::pair<CDentry*, CDentry*>(nullptr, nullptr);
+  }
+
+  if (refpath.is_last_snap() || refpath2.is_last_snap()) {
+    respond_to_request(mdr, -EROFS);
+    return std::make_pair(nullptr, nullptr);
+  }
+
+  // traverse to parent dir
+  CF_MDS_MDRContextFactory cf(mdcache, mdr, true);
+  int flags = MDS_TRAVERSE_RDLOCK_SNAP |  MDS_TRAVERSE_WANT_DENTRY | MDS_TRAVERSE_WANT_AUTH;
+  int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
+  if (r != 0) {
+    if (r == -ESTALE) {
+      dout(10) << "ESTALE on path, attempting recovery" << dendl;
+      mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+    } else if (r < 0) {
+      respond_to_request(mdr, r);
+    }
+    return std::make_pair(nullptr, nullptr);
+  }
+
+  flags = MDS_TRAVERSE_RDLOCK_SNAP2 | MDS_TRAVERSE_WANT_DENTRY | MDS_TRAVERSE_DISCOVER;
+  r = mdcache->path_traverse(mdr, cf, refpath2, flags, &mdr->dn[1]);
+  if (r != 0) {
+    if (r == -ESTALE) {
+      dout(10) << "ESTALE on path2, attempting recovery" << dendl;
+      mdcache->find_ino_peers(refpath2.get_ino(), new C_MDS_TryFindInode(this, mdr));
+    } else if (r < 0) {
+      respond_to_request(mdr, r);
+    }
+    return std::make_pair(nullptr, nullptr);
+  }
+
+  CDentry *srcdn = mdr->dn[1].back();
+  CDir *srcdir = srcdn->get_dir();
+  CDentry *destdn = mdr->dn[0].back();
+  CDir *destdir = destdn->get_dir();
+
+  if (!mdr->reqid.name.is_mds()) {
+    if ((srcdir->get_inode()->is_system() && !srcdir->get_inode()->is_root()) ||
+       (destdir->get_inode()->is_system() && !destdir->get_inode()->is_root())) {
+      respond_to_request(mdr, -EROFS);
+      return std::make_pair(nullptr, nullptr);
+    }
+  }
+
+  if (!destdir->get_inode()->is_base() &&
+      destdir->get_inode()->get_projected_parent_dir()->inode->is_stray()) {
+    respond_to_request(mdr, -ENOENT);
+    return std::make_pair(nullptr, nullptr);
+  }
+
+  MutationImpl::LockOpVec lov;
+  if (srcdir->get_inode() == destdir->get_inode()) {
+    lov.add_wrlock(&destdir->inode->filelock);
+    lov.add_wrlock(&destdir->inode->nestlock);
+    if (xlock_srcdn && srcdir != destdir) {
+      mds_rank_t srcdir_auth = srcdir->authority().first;
+      if (srcdir_auth != mds->get_nodeid()) {
+       lov.add_remote_wrlock(&srcdir->inode->filelock, srcdir_auth);
+       lov.add_remote_wrlock(&srcdir->inode->nestlock, srcdir_auth);
+      }
+    }
+
+    if (srcdn->get_name() > destdn->get_name())
+      lov.add_xlock(&destdn->lock);
+
+    if (xlock_srcdn)
+      lov.add_xlock(&srcdn->lock);
+    else
+      lov.add_rdlock(&srcdn->lock);
+
+    if (srcdn->get_name() < destdn->get_name())
+      lov.add_xlock(&destdn->lock);
+  } else {
+    int cmp = mdr->compare_paths();
+    bool lock_destdir_first =
+      (cmp < 0 || (cmp == 0 && destdir->ino() < srcdir->ino()));
+
+    if (lock_destdir_first) {
+      lov.add_wrlock(&destdir->inode->filelock);
+      lov.add_wrlock(&destdir->inode->nestlock);
+      lov.add_xlock(&destdn->lock);
+    }
+
+    if (xlock_srcdn) {
+      mds_rank_t srcdir_auth = srcdir->authority().first;
+      if (srcdir_auth == mds->get_nodeid()) {
+       lov.add_wrlock(&srcdir->inode->filelock);
+       lov.add_wrlock(&srcdir->inode->nestlock);
+      } else {
+       lov.add_remote_wrlock(&srcdir->inode->filelock, srcdir_auth);
+       lov.add_remote_wrlock(&srcdir->inode->nestlock, srcdir_auth);
+      }
+      lov.add_xlock(&srcdn->lock);
+    } else {
+      lov.add_rdlock(&srcdn->lock);
+    }
+
+    if (!lock_destdir_first) {
+      lov.add_wrlock(&destdir->inode->filelock);
+      lov.add_wrlock(&destdir->inode->nestlock);
+      lov.add_xlock(&destdn->lock);
+    }
+  }
+
+  CInode *auth_pin_freeze = nullptr;
+  // XXX any better way to do this?
+  if (xlock_srcdn && !srcdn->is_auth()) {
+    CDentry::linkage_t *srcdnl = srcdn->get_projected_linkage();
+    auth_pin_freeze = srcdnl->is_primary() ? srcdnl->get_inode() : nullptr;
+  }
+  if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
+    return std::make_pair(nullptr, nullptr);
+
+  if (srcdn->get_projected_linkage()->is_null()) {
+    respond_to_request(mdr, -ENOENT);
+    return std::make_pair(nullptr, nullptr);
+  }
+
+  if (destdn->get_projected_linkage()->is_null()) {
+    snapid_t next_snap = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+    destdn->first = std::max(destdn->first, next_snap);
+  }
+
+  mdr->locking_state |= MutationImpl::PATH_LOCKED;
+
+  return std::make_pair(destdn, srcdn);
+}
+
 /**
  * try_open_auth_dirfrag -- open dirfrag, or forward to dirfrag auth
  *
@@ -5900,40 +6058,36 @@ void Server::handle_client_link(MDRequestRef& mdr)
          << " to " << req->get_filepath2()
          << dendl;
 
-  MutationImpl::LockOpVec lov;
+  auto [destdn, srcdn] = rdlock_two_paths_xlock_destdn(mdr, false);
+  if (!destdn)
+    return;
 
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false);
-  if (!dn) return;
-  // FIXME
-  // CInode *targeti = rdlock_path_pin_ref(mdr, 1, lov, false);
-  CInode *targeti = nullptr;
-  if (!targeti) return;
-  if (mdr->snapid != CEPH_NOSNAP) {
-    respond_to_request(mdr, -EROFS);
+  if (!destdn->get_projected_linkage()->is_null()) {
+    respond_to_request(mdr, -EEXIST);
     return;
   }
 
-  CDir *dir = dn->get_dir();
-  dout(7) << "handle_client_link link " << dn->get_name() << " in " << *dir << dendl;
-  dout(7) << "target is " << *targeti << dendl;
+  CInode *targeti = srcdn->get_projected_linkage()->get_inode();
   if (targeti->is_dir()) {
-    // if srcdn is replica, need to make sure its linkage is correct
-    vector<CDentry*>& trace = mdr->dn[1];
-    if (trace.empty() ||
-       trace.back()->is_auth() ||
-       trace.back()->lock.can_read(mdr->get_client())) {
-      dout(7) << "target is a dir, failing..." << dendl;
-      respond_to_request(mdr, -EINVAL);
-      return;
-    }
+    dout(7) << "target is a dir, failing..." << dendl;
+    respond_to_request(mdr, -EINVAL);
+    return;
   }
 
-  lov.erase_rdlock(&targeti->snaplock);
-  lov.add_xlock(&targeti->snaplock);
-  lov.add_xlock(&targeti->linklock);
+  CDir *dir = destdn->get_dir();
+  dout(7) << "handle_client_link link " << destdn->get_name() << " in " << *dir << dendl;
+  dout(7) << "target is " << *targeti << dendl;
 
-  if (!mds->locker->acquire_locks(mdr, lov))
-    return;
+  if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+    MutationImpl::LockOpVec lov;
+    lov.add_xlock(&targeti->snaplock);
+    lov.add_xlock(&targeti->linklock);
+
+    if (!mds->locker->acquire_locks(mdr, lov))
+      return;
+
+    mdr->locking_state |= MutationImpl::ALL_LOCKED;
+  }
 
   if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
     if (!check_access(mdr, targeti, MAY_WRITE))
@@ -5951,9 +6105,9 @@ void Server::handle_client_link(MDRequestRef& mdr)
 
   // local or remote?
   if (targeti->is_auth()) 
-    _link_local(mdr, dn, targeti);
+    _link_local(mdr, destdn, targeti);
   else 
-    _link_remote(mdr, true, dn, targeti);
+    _link_remote(mdr, true, destdn, targeti);
   mds->balancer->maybe_fragment(dir, false);  
 }
 
@@ -6601,22 +6755,25 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
   }
 
   // lock
+  if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+    MutationImpl::LockOpVec lov;
 
-  MutationImpl::LockOpVec lov;
+    lov.add_xlock(&in->linklock);
+    lov.add_xlock(&in->snaplock);
+    if (in->is_dir())
+      lov.add_rdlock(&in->filelock);   // to verify it's empty
 
-  lov.add_xlock(&in->linklock);
-  lov.add_xlock(&in->snaplock);
-  if (in->is_dir())
-    lov.add_rdlock(&in->filelock);   // to verify it's empty
+    if (straydn) {
+      lov.add_wrlock(&straydn->get_dir()->inode->filelock);
+      lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
+      lov.add_xlock(&straydn->lock);
+    }
 
-  if (straydn) {
-    lov.add_wrlock(&straydn->get_dir()->inode->filelock);
-    lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
-    lov.add_xlock(&straydn->lock);
-  }
+    if (!mds->locker->acquire_locks(mdr, lov))
+      return;
 
-  if (!mds->locker->acquire_locks(mdr, lov))
-    return;
+    mdr->locking_state |= MutationImpl::ALL_LOCKED;
+  }
 
   if (in->is_dir() &&
       _dir_is_nonempty(mdr, in)) {
@@ -6908,7 +7065,7 @@ void Server::handle_slave_rmdir_prep(MDRequestRef& mdr)
   filepath srcpath(mdr->slave_request->srcdnpath);
   dout(10) << " src " << srcpath << dendl;
   CInode *in;
-  CF_MDS_MDRContextFactory cf(mdcache, mdr);
+  CF_MDS_MDRContextFactory cf(mdcache, mdr, false);
   int r = mdcache->path_traverse(mdr, cf, srcpath,
                                 MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_LAST_XLOCKED,
                                 &trace, &in);
@@ -7279,60 +7436,51 @@ void Server::handle_client_rename(MDRequestRef& mdr)
 
   filepath destpath = req->get_filepath();
   filepath srcpath = req->get_filepath2();
-  if (destpath.depth() == 0 || srcpath.depth() == 0) {
-    respond_to_request(mdr, -EINVAL);
-    return;
-  }
   if (srcpath.is_last_dot_or_dotdot() || destpath.is_last_dot_or_dotdot()) {
     respond_to_request(mdr, -EBUSY);
     return;
   }
 
-  std::string_view destname = destpath.last_dentry();
-
-  vector<CDentry*>& srctrace = mdr->dn[1];
-  vector<CDentry*>& desttrace = mdr->dn[0];
-
-  MutationImpl::LockOpVec lov;
+  auto [destdn, srcdn] = rdlock_two_paths_xlock_destdn(mdr, true);
+  if (!destdn)
+    return;
 
-  CDentry *destdn = rdlock_path_xlock_dentry(mdr, 0, lov, true, true);
-  if (!destdn) return;
   dout(10) << " destdn " << *destdn << dendl;
-  if (mdr->snapid != CEPH_NOSNAP) {
-    respond_to_request(mdr, -EROFS);
-    return;
-  }
-  CDentry::linkage_t *destdnl = destdn->get_projected_linkage();
   CDir *destdir = destdn->get_dir();
   ceph_assert(destdir->is_auth());
+  CDentry::linkage_t *destdnl = destdn->get_projected_linkage();
 
-  CF_MDS_MDRContextFactory cf(mdcache, mdr);
-  int r = mdcache->path_traverse(mdr, cf, srcpath, MDS_TRAVERSE_DISCOVER, &srctrace);
-  if (r > 0)
-    return; // delayed
-  if (r < 0) {
-    if (r == -ESTALE) {
-      dout(10) << "FAIL on ESTALE but attempting recovery" << dendl;
-      mdcache->find_ino_peers(srcpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
-    } else {
-      dout(10) << "FAIL on error " << r << dendl;
-      respond_to_request(mdr, r);
-    }
-    return;
-
-  }
-  ceph_assert(!srctrace.empty());
-  CDentry *srcdn = srctrace.back();
   dout(10) << " srcdn " << *srcdn << dendl;
-  if (srcdn->last != CEPH_NOSNAP) {
-    respond_to_request(mdr, -EROFS);
-    return;
-  }
   CDir *srcdir = srcdn->get_dir();
   CDentry::linkage_t *srcdnl = srcdn->get_projected_linkage();
   CInode *srci = srcdnl->get_inode();
   dout(10) << " srci " << *srci << dendl;
 
+  // -- some sanity checks --
+  if (destdn == srcdn) {
+    dout(7) << "rename src=dest, noop" << dendl;
+    respond_to_request(mdr, 0);
+    return;
+  }
+
+  // dest a child of src?
+  // e.g. mv /usr /usr/foo
+  if (srci->is_dir() && srci->is_projected_ancestor_of(destdir->get_inode())) {
+    dout(7) << "cannot rename item to be a child of itself" << dendl;
+    respond_to_request(mdr, -EINVAL);
+    return;
+  }
+
+  // is this a stray migration, reintegration or merge? (sanity checks!)
+  if (mdr->reqid.name.is_mds() &&
+      !(MDS_INO_IS_STRAY(srcpath.get_ino()) &&
+       MDS_INO_IS_STRAY(destpath.get_ino())) &&
+      !(destdnl->is_remote() &&
+       destdnl->get_remote_ino() == srci->ino())) {
+    respond_to_request(mdr, -EINVAL);  // actually, this won't reply, but whatev.
+    return;
+  }
+
   CInode *oldin = 0;
   if (!destdnl->is_null()) {
     //dout(10) << "dest dn exists " << *destdn << dendl;
@@ -7346,32 +7494,28 @@ void Server::handle_client_rename(MDRequestRef& mdr)
       return;
     }
 
-    // if srcdn is replica, need to make sure its linkage is correct
-    if (srcdn->is_auth() ||
-       srcdn->lock.can_read(mdr->get_client()) ||
-       (srcdn->lock.is_xlocked() && srcdn->lock.get_xlock_by() == mdr)) {
-      // mv /some/thing /to/some/existing_other_thing
-      if (oldin->is_dir() && !srci->is_dir()) {
-       respond_to_request(mdr, -EISDIR);
-       return;
-      }
-      if (!oldin->is_dir() && srci->is_dir()) {
-       respond_to_request(mdr, -ENOTDIR);
-       return;
-      }
-      if (srci == oldin && !srcdir->inode->is_stray()) {
-       respond_to_request(mdr, 0);  // no-op.  POSIX makes no sense.
-       return;
-      }
+    // mv /some/thing /to/some/existing_other_thing
+    if (oldin->is_dir() && !srci->is_dir()) {
+      respond_to_request(mdr, -EISDIR);
+      return;
+    }
+    if (!oldin->is_dir() && srci->is_dir()) {
+      respond_to_request(mdr, -ENOTDIR);
+      return;
+    }
+    if (srci == oldin && !srcdir->inode->is_stray()) {
+      respond_to_request(mdr, 0);  // no-op.  POSIX makes no sense.
+      return;
     }
   }
 
-  // -- some sanity checks --
+  vector<CDentry*>& srctrace = mdr->dn[1];
+  vector<CDentry*>& desttrace = mdr->dn[0];
 
   // src+dest traces _must_ share a common ancestor for locking to prevent orphans
   if (destpath.get_ino() != srcpath.get_ino() &&
       !(req->get_source().is_mds() &&
-       MDS_INO_IS_MDSDIR(srcpath.get_ino()))) {  // <-- mds 'rename' out of stray dir is ok!
+       MDS_INO_IS_STRAY(srcpath.get_ino()))) {  // <-- mds 'rename' out of stray dir is ok!
     CInode *srcbase = srctrace[0]->get_dir()->get_inode();
     CInode *destbase = desttrace[0]->get_dir()->get_inode();
     // ok, extend srctrace toward root until it is an ancestor of desttrace.
@@ -7387,41 +7531,12 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     while (destbase != srcbase) {
       CDentry *pdn = destbase->get_projected_parent_dn();
       desttrace.insert(desttrace.begin(), pdn);
-      lov.add_rdlock(&pdn->lock);
       dout(10) << "rename prepending desttrace with " << *pdn << dendl;
       destbase = pdn->get_dir()->get_inode();
     }
     dout(10) << "rename src and dest traces now share common ancestor " << *destbase << dendl;
   }
 
-  // src == dest?
-  if (srcdir == destdir && srcdn->get_name() == destname) {
-    dout(7) << "rename src=dest, noop" << dendl;
-    respond_to_request(mdr, 0);
-    return;
-  }
-
-  // dest a child of src?
-  // e.g. mv /usr /usr/foo
-  CDentry *pdn = destdir->inode->get_projected_parent_dn();
-  while (pdn) {
-    if (pdn == srcdn) {
-      dout(7) << "cannot rename item to be a child of itself" << dendl;
-      respond_to_request(mdr, -EINVAL);
-      return;
-    }
-    pdn = pdn->get_dir()->inode->parent;
-  }
-
-  // is this a stray migration, reintegration or merge? (sanity checks!)
-  if (mdr->reqid.name.is_mds() &&
-      !(MDS_INO_IS_MDSDIR(srcpath.get_ino()) &&
-       MDS_INO_IS_MDSDIR(destpath.get_ino())) &&
-      !(destdnl->is_remote() &&
-       destdnl->get_remote_ino() == srci->ino())) {
-    respond_to_request(mdr, -EINVAL);  // actually, this won't reply, but whatev.
-    return;
-  }
 
   bool linkmerge = srcdnl->get_inode() == destdnl->get_inode();
   if (linkmerge)
@@ -7439,94 +7554,48 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     mdr->straydn = NULL;
   }
 
-  // -- prepare witness list --
-  /*
-   * NOTE: we use _all_ replicas as witnesses.
-   * this probably isn't totally necessary (esp for file renames),
-   * but if/when we change that, we have to make sure rejoin is
-   * sufficiently robust to handle strong rejoins from survivors
-   * with totally wrong dentry->inode linkage.
-   * (currently, it can ignore rename effects, because the resolve
-   * stage will sort them out.)
-   */
-  set<mds_rank_t> witnesses = mdr->more()->extra_witnesses;
-  if (srcdn->is_auth())
-    srcdn->list_replicas(witnesses);
-  else
-    witnesses.insert(srcdn->authority().first);
-  if (srcdnl->is_remote() && !srci->is_auth())
-    witnesses.insert(srci->authority().first);
-  destdn->list_replicas(witnesses);
-  if (destdnl->is_remote() && !oldin->is_auth())
-    witnesses.insert(oldin->authority().first);
-  dout(10) << " witnesses " << witnesses << ", have " << mdr->more()->witnessed << dendl;
-
 
   // -- locks --
+  if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+    MutationImpl::LockOpVec lov;
 
-  // srctrace items.  this mirrors locks taken in rdlock_path_xlock_dentry
-  for (int i=0; i<(int)srctrace.size(); i++) 
-    lov.add_rdlock(&srctrace[i]->lock);
-  lov.add_xlock(&srcdn->lock);
-  mds_rank_t srcdirauth = srcdir->authority().first;
-  if (srcdirauth != mds->get_nodeid()) {
-    dout(10) << " will remote_wrlock srcdir scatterlocks on mds." << srcdirauth << dendl;
-    lov.add_remote_wrlock(&srcdir->inode->filelock, srcdirauth);
-    lov.add_remote_wrlock(&srcdir->inode->nestlock, srcdirauth);
-    if (srci->is_dir())
-      lov.add_rdlock(&srci->dirfragtreelock);
-  } else {
-    lov.add_wrlock(&srcdir->inode->filelock);
-    lov.add_wrlock(&srcdir->inode->nestlock);
-  }
-  // FIXME
-  // mds->locker->include_snap_rdlocks(srcdir->inode, lov);
-
-  // straydn?
-  if (straydn) {
-    lov.add_wrlock(&straydn->get_dir()->inode->filelock);
-    lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
-    lov.add_xlock(&straydn->lock);
-  }
+    // we need to update srci's ctime.  xlock its least contended lock to do that...
+    lov.add_xlock(&srci->linklock);
+    lov.add_xlock(&srci->snaplock);
 
-  // xlock versionlock on dentries if there are witnesses.
-  //  replicas can't see projected dentry linkages, and will get
-  //  confused if we try to pipeline things.
-  if (!witnesses.empty()) {
-    // take xlock on all projected ancestor dentries for srcdn and destdn.
-    // this ensures the srcdn and destdn can be traversed to by the witnesses.
-    for (int i= 0; i<(int)srctrace.size(); i++) {
-      if (srctrace[i]->is_auth() && srctrace[i]->is_projected())
-         lov.add_xlock(&srctrace[i]->versionlock);
-    }
-    for (int i=0; i<(int)desttrace.size(); i++) {
-      if (desttrace[i]->is_auth() && desttrace[i]->is_projected())
-         lov.add_xlock(&desttrace[i]->versionlock);
-    }
-    // xlock srci and oldin's primary dentries, so witnesses can call
-    // open_remote_ino() with 'want_locked=true' when the srcdn or destdn
-    // is traversed.
-    if (srcdnl->is_remote())
-      lov.add_xlock(&srci->get_projected_parent_dn()->lock);
-    if (destdnl->is_remote())
-      lov.add_xlock(&oldin->get_projected_parent_dn()->lock);
-  }
-
-  // we need to update srci's ctime.  xlock its least contended lock to do that...
-  lov.add_xlock(&srci->linklock);
-  lov.add_xlock(&srci->snaplock);
-
-  if (oldin) {
-    // xlock oldin (for nlink--)
-    lov.add_xlock(&oldin->linklock);
-    lov.add_xlock(&oldin->snaplock);
-    if (oldin->is_dir())
+    if (oldin) {
+      // xlock oldin (for nlink--)
+      lov.add_xlock(&oldin->linklock);
+      lov.add_xlock(&oldin->snaplock);
+      if (oldin->is_dir()) {
+       ceph_assert(srci->is_dir());
        lov.add_rdlock(&oldin->filelock);   // to verify it's empty
-  }
 
-  CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : nullptr;
-  if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
-    return;
+       // adjust locking order?
+       int cmp = mdr->compare_paths();
+       if (cmp < 0 || (cmp == 0 && oldin->ino() < srci->ino()))
+         std::reverse(lov.begin(), lov.end());
+      } else {
+       ceph_assert(!srci->is_dir());
+       // adjust locking order;
+       if (srci->ino() > oldin->ino())
+         std::reverse(lov.begin(), lov.end());
+      }
+    }
+
+    // straydn?
+    if (straydn) {
+      lov.add_wrlock(&straydn->get_dir()->inode->filelock);
+      lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
+      lov.add_xlock(&straydn->lock);
+    }
+
+    CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : nullptr;
+    if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
+      return;
+
+    mdr->locking_state |= MutationImpl::ALL_LOCKED;
+  }
 
   if (linkmerge)
     ceph_assert(srcdir->inode->is_stray() && srcdnl->is_primary() && destdnl->is_remote());
@@ -7648,6 +7717,58 @@ void Server::handle_client_rename(MDRequestRef& mdr)
 
   // -- prepare witnesses --
 
+  /*
+   * NOTE: we use _all_ replicas as witnesses.
+   * this probably isn't totally necessary (esp for file renames),
+   * but if/when we change that, we have to make sure rejoin is
+   * sufficiently robust to handle strong rejoins from survivors
+   * with totally wrong dentry->inode linkage.
+   * (currently, it can ignore rename effects, because the resolve
+   * stage will sort them out.)
+   */
+  set<mds_rank_t> witnesses = mdr->more()->extra_witnesses;
+  if (srcdn->is_auth())
+    srcdn->list_replicas(witnesses);
+  else
+    witnesses.insert(srcdn->authority().first);
+  if (srcdnl->is_remote() && !srci->is_auth())
+    witnesses.insert(srci->authority().first);
+  destdn->list_replicas(witnesses);
+  if (destdnl->is_remote() && !oldin->is_auth())
+    witnesses.insert(oldin->authority().first);
+  dout(10) << " witnesses " << witnesses << ", have " << mdr->more()->witnessed << dendl;
+
+  if (!witnesses.empty()) {
+    // Replicas can't see projected dentry linkages and will get confused.
+    // We have taken snaplocks on ancestor inodes. Later rename/rmdir requests
+    // can't project these inodes' linkages.
+    bool need_flush = false;
+    for (auto& dn : srctrace) {
+      if (dn->is_projected()) {
+       need_flush = true;
+       break;
+      }
+    }
+    if (!need_flush) {
+      CDentry *dn = destdn;
+      do {
+       if (dn->is_projected()) {
+         need_flush = true;
+         break;
+       }
+       CInode *diri = dn->get_dir()->get_inode();
+       dn = diri->get_projected_parent_dn();
+      } while (dn);
+    }
+    if (need_flush) {
+      mdlog->wait_for_safe(
+         new MDSInternalContextWrapper(mds,
+           new C_MDS_RetryRequest(mdcache, mdr)));
+      mdlog->flush();
+      return;
+    }
+  }
+
   // do srcdn auth last
   mds_rank_t last = MDS_RANK_NONE;
   if (!srcdn->is_auth()) {
@@ -8465,7 +8586,7 @@ void Server::handle_slave_rename_prep(MDRequestRef& mdr)
   filepath destpath(mdr->slave_request->destdnpath);
   dout(10) << " dest " << destpath << dendl;
   vector<CDentry*> trace;
-  CF_MDS_MDRContextFactory cf(mdcache, mdr);
+  CF_MDS_MDRContextFactory cf(mdcache, mdr, false);
   int r = mdcache->path_traverse(mdr, cf, destpath,
                                 MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_LAST_XLOCKED | MDS_TRAVERSE_WANT_DENTRY,
                                 &trace);
index 8c2b8f6e48793326cd1183a1bb77c5b17907df72..c1eddd25977ba3c4df14592e196aa3412a134405 100644 (file)
@@ -218,6 +218,8 @@ public:
                              bool no_want_auth=false, bool want_layout=false);
   CDentry* rdlock_path_xlock_dentry(MDRequestRef& mdr, bool create,
                                    bool okexist=false, bool want_layout=false);
+  std::pair<CDentry*, CDentry*>
+           rdlock_two_paths_xlock_destdn(MDRequestRef& mdr, bool xlock_srcdn);
 
   CDir* try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequestRef& mdr);
 
index efc4fe352734d51a0bbf5e2a8f82d05deb91cc6e..6ff5558741bc41b26ce891f930edf4bbe0b98263 100644 (file)
@@ -649,15 +649,13 @@ void StrayManager::_eval_stray_remote(CDentry *stray_dn, CDentry *remote_dn)
 
 void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn)
 {
-  dout(10) << __func__ << " " << *straydn << " into " << *rdn << dendl;
+  dout(10) << __func__ << " " << *straydn << " to " << *rdn << dendl;
 
   logger->inc(l_mdc_strays_reintegrated);
   
-  // rename it to another mds.
-  filepath src;
-  straydn->make_path(src);
-  filepath dst;
-  rdn->make_path(dst);
+  // rename it to remote linkage .
+  filepath src(straydn->get_name(), straydn->get_dir()->ino());
+  filepath dst(rdn->get_name(), rdn->get_dir()->ino());
 
   auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
   req->set_filepath(dst);
@@ -669,24 +667,16 @@ void StrayManager::reintegrate_stray(CDentry *straydn, CDentry *rdn)
  
 void StrayManager::migrate_stray(CDentry *dn, mds_rank_t to)
 {
-  CInode *in = dn->get_projected_linkage()->get_inode();
-  ceph_assert(in);
-  CInode *diri = dn->dir->get_inode();
-  ceph_assert(diri->is_stray());
-  dout(10) << "migrate_stray from mds." << MDS_INO_STRAY_OWNER(diri->inode.ino)
-          << " to mds." << to
-          << " " << *dn << " " << *in << dendl;
+  dout(10) << __func__ << " " << *dn << " to mds." << to << dendl;
 
   logger->inc(l_mdc_strays_migrated);
 
   // rename it to another mds.
-  filepath src;
-  dn->make_path(src);
-  ceph_assert(src.depth() == 2);
+  inodeno_t dirino = dn->get_dir()->ino();
+  ceph_assert(MDS_INO_IS_STRAY(dirino));
 
-  filepath dst(MDS_INO_MDSDIR(to));
-  dst.push_dentry(src[0]);
-  dst.push_dentry(src[1]);
+  filepath src(dn->get_name(), dirino);
+  filepath dst(dn->get_name(), MDS_INO_STRAY(to, MDS_INO_STRAY_INDEX(dirino)));
 
   auto req = make_message<MClientRequest>(CEPH_MDS_OP_RENAME);
   req->set_filepath(dst);