class CF_MDS_MDRContextFactory : public MDSContextFactory {
public:
- CF_MDS_MDRContextFactory(MDCache *cache, MDRequestRef &mdr, bool dl=false) :
+ CF_MDS_MDRContextFactory(MDCache *cache, MDRequestRef &mdr, bool dl) :
mdcache(cache), mdr(mdr), drop_locks(dl) {}
MDSContext *build() {
if (drop_locks) {
return dn;
}
+/** rdlock_two_paths_xlock_destdn
+ * traverse two paths and lock the two paths in proper order.
+ * The order of taking locks is:
+ * 1. Lock directory inodes or dentries according to which trees they
+ * are under. Lock objects under fs root before objects under mdsdir.
+ * 2. Lock directory inodes or dentries according to their depth, in
+ * ascending order.
+ * 3. Lock directory inodes or dentries according to inode numbers or
+ * dentries' parent inode numbers, in ascending order.
+ * 4. Lock dentries in the same directory in order of their keys.
+ * 5. Lock non-directory inodes according to inode numbers, in ascending
+ * order.
+ */
+std::pair<CDentry*, CDentry*>
+Server::rdlock_two_paths_xlock_destdn(MDRequestRef& mdr, bool xlock_srcdn)
+{
+
+ const filepath& refpath = mdr->get_filepath();
+ const filepath& refpath2 = mdr->get_filepath2();
+
+ dout(10) << "rdlock_two_paths_xlock_destdn " << *mdr << " " << refpath << " " << refpath2 << dendl;
+
+ if (mdr->locking_state & MutationImpl::PATH_LOCKED)
+ return std::make_pair(mdr->dn[0].back(), mdr->dn[1].back());
+
+ if (refpath.depth() != 1 || refpath2.depth() != 1) {
+ respond_to_request(mdr, -EINVAL);
+ return std::pair<CDentry*, CDentry*>(nullptr, nullptr);
+ }
+
+ if (refpath.is_last_snap() || refpath2.is_last_snap()) {
+ respond_to_request(mdr, -EROFS);
+ return std::make_pair(nullptr, nullptr);
+ }
+
+ // traverse to parent dir
+ CF_MDS_MDRContextFactory cf(mdcache, mdr, true);
+ int flags = MDS_TRAVERSE_RDLOCK_SNAP | MDS_TRAVERSE_WANT_DENTRY | MDS_TRAVERSE_WANT_AUTH;
+ int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
+ if (r != 0) {
+ if (r == -ESTALE) {
+ dout(10) << "ESTALE on path, attempting recovery" << dendl;
+ mdcache->find_ino_peers(refpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
+ } else if (r < 0) {
+ respond_to_request(mdr, r);
+ }
+ return std::make_pair(nullptr, nullptr);
+ }
+
+ flags = MDS_TRAVERSE_RDLOCK_SNAP2 | MDS_TRAVERSE_WANT_DENTRY | MDS_TRAVERSE_DISCOVER;
+ r = mdcache->path_traverse(mdr, cf, refpath2, flags, &mdr->dn[1]);
+ if (r != 0) {
+ if (r == -ESTALE) {
+ dout(10) << "ESTALE on path2, attempting recovery" << dendl;
+ mdcache->find_ino_peers(refpath2.get_ino(), new C_MDS_TryFindInode(this, mdr));
+ } else if (r < 0) {
+ respond_to_request(mdr, r);
+ }
+ return std::make_pair(nullptr, nullptr);
+ }
+
+ CDentry *srcdn = mdr->dn[1].back();
+ CDir *srcdir = srcdn->get_dir();
+ CDentry *destdn = mdr->dn[0].back();
+ CDir *destdir = destdn->get_dir();
+
+ if (!mdr->reqid.name.is_mds()) {
+ if ((srcdir->get_inode()->is_system() && !srcdir->get_inode()->is_root()) ||
+ (destdir->get_inode()->is_system() && !destdir->get_inode()->is_root())) {
+ respond_to_request(mdr, -EROFS);
+ return std::make_pair(nullptr, nullptr);
+ }
+ }
+
+ if (!destdir->get_inode()->is_base() &&
+ destdir->get_inode()->get_projected_parent_dir()->inode->is_stray()) {
+ respond_to_request(mdr, -ENOENT);
+ return std::make_pair(nullptr, nullptr);
+ }
+
+ MutationImpl::LockOpVec lov;
+ if (srcdir->get_inode() == destdir->get_inode()) {
+ lov.add_wrlock(&destdir->inode->filelock);
+ lov.add_wrlock(&destdir->inode->nestlock);
+ if (xlock_srcdn && srcdir != destdir) {
+ mds_rank_t srcdir_auth = srcdir->authority().first;
+ if (srcdir_auth != mds->get_nodeid()) {
+ lov.add_remote_wrlock(&srcdir->inode->filelock, srcdir_auth);
+ lov.add_remote_wrlock(&srcdir->inode->nestlock, srcdir_auth);
+ }
+ }
+
+ if (srcdn->get_name() > destdn->get_name())
+ lov.add_xlock(&destdn->lock);
+
+ if (xlock_srcdn)
+ lov.add_xlock(&srcdn->lock);
+ else
+ lov.add_rdlock(&srcdn->lock);
+
+ if (srcdn->get_name() < destdn->get_name())
+ lov.add_xlock(&destdn->lock);
+ } else {
+ int cmp = mdr->compare_paths();
+ bool lock_destdir_first =
+ (cmp < 0 || (cmp == 0 && destdir->ino() < srcdir->ino()));
+
+ if (lock_destdir_first) {
+ lov.add_wrlock(&destdir->inode->filelock);
+ lov.add_wrlock(&destdir->inode->nestlock);
+ lov.add_xlock(&destdn->lock);
+ }
+
+ if (xlock_srcdn) {
+ mds_rank_t srcdir_auth = srcdir->authority().first;
+ if (srcdir_auth == mds->get_nodeid()) {
+ lov.add_wrlock(&srcdir->inode->filelock);
+ lov.add_wrlock(&srcdir->inode->nestlock);
+ } else {
+ lov.add_remote_wrlock(&srcdir->inode->filelock, srcdir_auth);
+ lov.add_remote_wrlock(&srcdir->inode->nestlock, srcdir_auth);
+ }
+ lov.add_xlock(&srcdn->lock);
+ } else {
+ lov.add_rdlock(&srcdn->lock);
+ }
+
+ if (!lock_destdir_first) {
+ lov.add_wrlock(&destdir->inode->filelock);
+ lov.add_wrlock(&destdir->inode->nestlock);
+ lov.add_xlock(&destdn->lock);
+ }
+ }
+
+ CInode *auth_pin_freeze = nullptr;
+ // XXX any better way to do this?
+ if (xlock_srcdn && !srcdn->is_auth()) {
+ CDentry::linkage_t *srcdnl = srcdn->get_projected_linkage();
+ auth_pin_freeze = srcdnl->is_primary() ? srcdnl->get_inode() : nullptr;
+ }
+ if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
+ return std::make_pair(nullptr, nullptr);
+
+ if (srcdn->get_projected_linkage()->is_null()) {
+ respond_to_request(mdr, -ENOENT);
+ return std::make_pair(nullptr, nullptr);
+ }
+
+ if (destdn->get_projected_linkage()->is_null()) {
+ snapid_t next_snap = mdcache->get_global_snaprealm()->get_newest_seq() + 1;
+ destdn->first = std::max(destdn->first, next_snap);
+ }
+
+ mdr->locking_state |= MutationImpl::PATH_LOCKED;
+
+ return std::make_pair(destdn, srcdn);
+}
+
/**
* try_open_auth_dirfrag -- open dirfrag, or forward to dirfrag auth
*
<< " to " << req->get_filepath2()
<< dendl;
- MutationImpl::LockOpVec lov;
+ auto [destdn, srcdn] = rdlock_two_paths_xlock_destdn(mdr, false);
+ if (!destdn)
+ return;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false);
- if (!dn) return;
- // FIXME
- // CInode *targeti = rdlock_path_pin_ref(mdr, 1, lov, false);
- CInode *targeti = nullptr;
- if (!targeti) return;
- if (mdr->snapid != CEPH_NOSNAP) {
- respond_to_request(mdr, -EROFS);
+ if (!destdn->get_projected_linkage()->is_null()) {
+ respond_to_request(mdr, -EEXIST);
return;
}
- CDir *dir = dn->get_dir();
- dout(7) << "handle_client_link link " << dn->get_name() << " in " << *dir << dendl;
- dout(7) << "target is " << *targeti << dendl;
+ CInode *targeti = srcdn->get_projected_linkage()->get_inode();
if (targeti->is_dir()) {
- // if srcdn is replica, need to make sure its linkage is correct
- vector<CDentry*>& trace = mdr->dn[1];
- if (trace.empty() ||
- trace.back()->is_auth() ||
- trace.back()->lock.can_read(mdr->get_client())) {
- dout(7) << "target is a dir, failing..." << dendl;
- respond_to_request(mdr, -EINVAL);
- return;
- }
+ dout(7) << "target is a dir, failing..." << dendl;
+ respond_to_request(mdr, -EINVAL);
+ return;
}
- lov.erase_rdlock(&targeti->snaplock);
- lov.add_xlock(&targeti->snaplock);
- lov.add_xlock(&targeti->linklock);
+ CDir *dir = destdn->get_dir();
+ dout(7) << "handle_client_link link " << destdn->get_name() << " in " << *dir << dendl;
+ dout(7) << "target is " << *targeti << dendl;
- if (!mds->locker->acquire_locks(mdr, lov))
- return;
+ if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+ MutationImpl::LockOpVec lov;
+ lov.add_xlock(&targeti->snaplock);
+ lov.add_xlock(&targeti->linklock);
+
+ if (!mds->locker->acquire_locks(mdr, lov))
+ return;
+
+ mdr->locking_state |= MutationImpl::ALL_LOCKED;
+ }
if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
if (!check_access(mdr, targeti, MAY_WRITE))
// local or remote?
if (targeti->is_auth())
- _link_local(mdr, dn, targeti);
+ _link_local(mdr, destdn, targeti);
else
- _link_remote(mdr, true, dn, targeti);
+ _link_remote(mdr, true, destdn, targeti);
mds->balancer->maybe_fragment(dir, false);
}
}
// lock
+ if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+ MutationImpl::LockOpVec lov;
- MutationImpl::LockOpVec lov;
+ lov.add_xlock(&in->linklock);
+ lov.add_xlock(&in->snaplock);
+ if (in->is_dir())
+ lov.add_rdlock(&in->filelock); // to verify it's empty
- lov.add_xlock(&in->linklock);
- lov.add_xlock(&in->snaplock);
- if (in->is_dir())
- lov.add_rdlock(&in->filelock); // to verify it's empty
+ if (straydn) {
+ lov.add_wrlock(&straydn->get_dir()->inode->filelock);
+ lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
+ lov.add_xlock(&straydn->lock);
+ }
- if (straydn) {
- lov.add_wrlock(&straydn->get_dir()->inode->filelock);
- lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
- lov.add_xlock(&straydn->lock);
- }
+ if (!mds->locker->acquire_locks(mdr, lov))
+ return;
- if (!mds->locker->acquire_locks(mdr, lov))
- return;
+ mdr->locking_state |= MutationImpl::ALL_LOCKED;
+ }
if (in->is_dir() &&
_dir_is_nonempty(mdr, in)) {
filepath srcpath(mdr->slave_request->srcdnpath);
dout(10) << " src " << srcpath << dendl;
CInode *in;
- CF_MDS_MDRContextFactory cf(mdcache, mdr);
+ CF_MDS_MDRContextFactory cf(mdcache, mdr, false);
int r = mdcache->path_traverse(mdr, cf, srcpath,
MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_LAST_XLOCKED,
&trace, &in);
filepath destpath = req->get_filepath();
filepath srcpath = req->get_filepath2();
- if (destpath.depth() == 0 || srcpath.depth() == 0) {
- respond_to_request(mdr, -EINVAL);
- return;
- }
if (srcpath.is_last_dot_or_dotdot() || destpath.is_last_dot_or_dotdot()) {
respond_to_request(mdr, -EBUSY);
return;
}
- std::string_view destname = destpath.last_dentry();
-
- vector<CDentry*>& srctrace = mdr->dn[1];
- vector<CDentry*>& desttrace = mdr->dn[0];
-
- MutationImpl::LockOpVec lov;
+ auto [destdn, srcdn] = rdlock_two_paths_xlock_destdn(mdr, true);
+ if (!destdn)
+ return;
- CDentry *destdn = rdlock_path_xlock_dentry(mdr, 0, lov, true, true);
- if (!destdn) return;
dout(10) << " destdn " << *destdn << dendl;
- if (mdr->snapid != CEPH_NOSNAP) {
- respond_to_request(mdr, -EROFS);
- return;
- }
- CDentry::linkage_t *destdnl = destdn->get_projected_linkage();
CDir *destdir = destdn->get_dir();
ceph_assert(destdir->is_auth());
+ CDentry::linkage_t *destdnl = destdn->get_projected_linkage();
- CF_MDS_MDRContextFactory cf(mdcache, mdr);
- int r = mdcache->path_traverse(mdr, cf, srcpath, MDS_TRAVERSE_DISCOVER, &srctrace);
- if (r > 0)
- return; // delayed
- if (r < 0) {
- if (r == -ESTALE) {
- dout(10) << "FAIL on ESTALE but attempting recovery" << dendl;
- mdcache->find_ino_peers(srcpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
- } else {
- dout(10) << "FAIL on error " << r << dendl;
- respond_to_request(mdr, r);
- }
- return;
-
- }
- ceph_assert(!srctrace.empty());
- CDentry *srcdn = srctrace.back();
dout(10) << " srcdn " << *srcdn << dendl;
- if (srcdn->last != CEPH_NOSNAP) {
- respond_to_request(mdr, -EROFS);
- return;
- }
CDir *srcdir = srcdn->get_dir();
CDentry::linkage_t *srcdnl = srcdn->get_projected_linkage();
CInode *srci = srcdnl->get_inode();
dout(10) << " srci " << *srci << dendl;
+ // -- some sanity checks --
+ if (destdn == srcdn) {
+ dout(7) << "rename src=dest, noop" << dendl;
+ respond_to_request(mdr, 0);
+ return;
+ }
+
+ // dest a child of src?
+ // e.g. mv /usr /usr/foo
+ if (srci->is_dir() && srci->is_projected_ancestor_of(destdir->get_inode())) {
+ dout(7) << "cannot rename item to be a child of itself" << dendl;
+ respond_to_request(mdr, -EINVAL);
+ return;
+ }
+
+ // is this a stray migration, reintegration or merge? (sanity checks!)
+ if (mdr->reqid.name.is_mds() &&
+ !(MDS_INO_IS_STRAY(srcpath.get_ino()) &&
+ MDS_INO_IS_STRAY(destpath.get_ino())) &&
+ !(destdnl->is_remote() &&
+ destdnl->get_remote_ino() == srci->ino())) {
+ respond_to_request(mdr, -EINVAL); // actually, this won't reply, but whatev.
+ return;
+ }
+
CInode *oldin = 0;
if (!destdnl->is_null()) {
//dout(10) << "dest dn exists " << *destdn << dendl;
return;
}
- // if srcdn is replica, need to make sure its linkage is correct
- if (srcdn->is_auth() ||
- srcdn->lock.can_read(mdr->get_client()) ||
- (srcdn->lock.is_xlocked() && srcdn->lock.get_xlock_by() == mdr)) {
- // mv /some/thing /to/some/existing_other_thing
- if (oldin->is_dir() && !srci->is_dir()) {
- respond_to_request(mdr, -EISDIR);
- return;
- }
- if (!oldin->is_dir() && srci->is_dir()) {
- respond_to_request(mdr, -ENOTDIR);
- return;
- }
- if (srci == oldin && !srcdir->inode->is_stray()) {
- respond_to_request(mdr, 0); // no-op. POSIX makes no sense.
- return;
- }
+ // mv /some/thing /to/some/existing_other_thing
+ if (oldin->is_dir() && !srci->is_dir()) {
+ respond_to_request(mdr, -EISDIR);
+ return;
+ }
+ if (!oldin->is_dir() && srci->is_dir()) {
+ respond_to_request(mdr, -ENOTDIR);
+ return;
+ }
+ if (srci == oldin && !srcdir->inode->is_stray()) {
+ respond_to_request(mdr, 0); // no-op. POSIX makes no sense.
+ return;
}
}
- // -- some sanity checks --
+ vector<CDentry*>& srctrace = mdr->dn[1];
+ vector<CDentry*>& desttrace = mdr->dn[0];
// src+dest traces _must_ share a common ancestor for locking to prevent orphans
if (destpath.get_ino() != srcpath.get_ino() &&
!(req->get_source().is_mds() &&
- MDS_INO_IS_MDSDIR(srcpath.get_ino()))) { // <-- mds 'rename' out of stray dir is ok!
+ MDS_INO_IS_STRAY(srcpath.get_ino()))) { // <-- mds 'rename' out of stray dir is ok!
CInode *srcbase = srctrace[0]->get_dir()->get_inode();
CInode *destbase = desttrace[0]->get_dir()->get_inode();
// ok, extend srctrace toward root until it is an ancestor of desttrace.
while (destbase != srcbase) {
CDentry *pdn = destbase->get_projected_parent_dn();
desttrace.insert(desttrace.begin(), pdn);
- lov.add_rdlock(&pdn->lock);
dout(10) << "rename prepending desttrace with " << *pdn << dendl;
destbase = pdn->get_dir()->get_inode();
}
dout(10) << "rename src and dest traces now share common ancestor " << *destbase << dendl;
}
- // src == dest?
- if (srcdir == destdir && srcdn->get_name() == destname) {
- dout(7) << "rename src=dest, noop" << dendl;
- respond_to_request(mdr, 0);
- return;
- }
-
- // dest a child of src?
- // e.g. mv /usr /usr/foo
- CDentry *pdn = destdir->inode->get_projected_parent_dn();
- while (pdn) {
- if (pdn == srcdn) {
- dout(7) << "cannot rename item to be a child of itself" << dendl;
- respond_to_request(mdr, -EINVAL);
- return;
- }
- pdn = pdn->get_dir()->inode->parent;
- }
-
- // is this a stray migration, reintegration or merge? (sanity checks!)
- if (mdr->reqid.name.is_mds() &&
- !(MDS_INO_IS_MDSDIR(srcpath.get_ino()) &&
- MDS_INO_IS_MDSDIR(destpath.get_ino())) &&
- !(destdnl->is_remote() &&
- destdnl->get_remote_ino() == srci->ino())) {
- respond_to_request(mdr, -EINVAL); // actually, this won't reply, but whatev.
- return;
- }
bool linkmerge = srcdnl->get_inode() == destdnl->get_inode();
if (linkmerge)
mdr->straydn = NULL;
}
- // -- prepare witness list --
- /*
- * NOTE: we use _all_ replicas as witnesses.
- * this probably isn't totally necessary (esp for file renames),
- * but if/when we change that, we have to make sure rejoin is
- * sufficiently robust to handle strong rejoins from survivors
- * with totally wrong dentry->inode linkage.
- * (currently, it can ignore rename effects, because the resolve
- * stage will sort them out.)
- */
- set<mds_rank_t> witnesses = mdr->more()->extra_witnesses;
- if (srcdn->is_auth())
- srcdn->list_replicas(witnesses);
- else
- witnesses.insert(srcdn->authority().first);
- if (srcdnl->is_remote() && !srci->is_auth())
- witnesses.insert(srci->authority().first);
- destdn->list_replicas(witnesses);
- if (destdnl->is_remote() && !oldin->is_auth())
- witnesses.insert(oldin->authority().first);
- dout(10) << " witnesses " << witnesses << ", have " << mdr->more()->witnessed << dendl;
-
// -- locks --
+ if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+ MutationImpl::LockOpVec lov;
- // srctrace items. this mirrors locks taken in rdlock_path_xlock_dentry
- for (int i=0; i<(int)srctrace.size(); i++)
- lov.add_rdlock(&srctrace[i]->lock);
- lov.add_xlock(&srcdn->lock);
- mds_rank_t srcdirauth = srcdir->authority().first;
- if (srcdirauth != mds->get_nodeid()) {
- dout(10) << " will remote_wrlock srcdir scatterlocks on mds." << srcdirauth << dendl;
- lov.add_remote_wrlock(&srcdir->inode->filelock, srcdirauth);
- lov.add_remote_wrlock(&srcdir->inode->nestlock, srcdirauth);
- if (srci->is_dir())
- lov.add_rdlock(&srci->dirfragtreelock);
- } else {
- lov.add_wrlock(&srcdir->inode->filelock);
- lov.add_wrlock(&srcdir->inode->nestlock);
- }
- // FIXME
- // mds->locker->include_snap_rdlocks(srcdir->inode, lov);
-
- // straydn?
- if (straydn) {
- lov.add_wrlock(&straydn->get_dir()->inode->filelock);
- lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
- lov.add_xlock(&straydn->lock);
- }
+ // we need to update srci's ctime. xlock its least contended lock to do that...
+ lov.add_xlock(&srci->linklock);
+ lov.add_xlock(&srci->snaplock);
- // xlock versionlock on dentries if there are witnesses.
- // replicas can't see projected dentry linkages, and will get
- // confused if we try to pipeline things.
- if (!witnesses.empty()) {
- // take xlock on all projected ancestor dentries for srcdn and destdn.
- // this ensures the srcdn and destdn can be traversed to by the witnesses.
- for (int i= 0; i<(int)srctrace.size(); i++) {
- if (srctrace[i]->is_auth() && srctrace[i]->is_projected())
- lov.add_xlock(&srctrace[i]->versionlock);
- }
- for (int i=0; i<(int)desttrace.size(); i++) {
- if (desttrace[i]->is_auth() && desttrace[i]->is_projected())
- lov.add_xlock(&desttrace[i]->versionlock);
- }
- // xlock srci and oldin's primary dentries, so witnesses can call
- // open_remote_ino() with 'want_locked=true' when the srcdn or destdn
- // is traversed.
- if (srcdnl->is_remote())
- lov.add_xlock(&srci->get_projected_parent_dn()->lock);
- if (destdnl->is_remote())
- lov.add_xlock(&oldin->get_projected_parent_dn()->lock);
- }
-
- // we need to update srci's ctime. xlock its least contended lock to do that...
- lov.add_xlock(&srci->linklock);
- lov.add_xlock(&srci->snaplock);
-
- if (oldin) {
- // xlock oldin (for nlink--)
- lov.add_xlock(&oldin->linklock);
- lov.add_xlock(&oldin->snaplock);
- if (oldin->is_dir())
+ if (oldin) {
+ // xlock oldin (for nlink--)
+ lov.add_xlock(&oldin->linklock);
+ lov.add_xlock(&oldin->snaplock);
+ if (oldin->is_dir()) {
+ ceph_assert(srci->is_dir());
lov.add_rdlock(&oldin->filelock); // to verify it's empty
- }
- CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : nullptr;
- if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
- return;
+ // adjust locking order?
+ int cmp = mdr->compare_paths();
+ if (cmp < 0 || (cmp == 0 && oldin->ino() < srci->ino()))
+ std::reverse(lov.begin(), lov.end());
+ } else {
+ ceph_assert(!srci->is_dir());
+ // adjust locking order;
+ if (srci->ino() > oldin->ino())
+ std::reverse(lov.begin(), lov.end());
+ }
+ }
+
+ // straydn?
+ if (straydn) {
+ lov.add_wrlock(&straydn->get_dir()->inode->filelock);
+ lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
+ lov.add_xlock(&straydn->lock);
+ }
+
+ CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : nullptr;
+ if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
+ return;
+
+ mdr->locking_state |= MutationImpl::ALL_LOCKED;
+ }
if (linkmerge)
ceph_assert(srcdir->inode->is_stray() && srcdnl->is_primary() && destdnl->is_remote());
// -- prepare witnesses --
+ /*
+ * NOTE: we use _all_ replicas as witnesses.
+ * this probably isn't totally necessary (esp for file renames),
+ * but if/when we change that, we have to make sure rejoin is
+ * sufficiently robust to handle strong rejoins from survivors
+ * with totally wrong dentry->inode linkage.
+ * (currently, it can ignore rename effects, because the resolve
+ * stage will sort them out.)
+ */
+ set<mds_rank_t> witnesses = mdr->more()->extra_witnesses;
+ if (srcdn->is_auth())
+ srcdn->list_replicas(witnesses);
+ else
+ witnesses.insert(srcdn->authority().first);
+ if (srcdnl->is_remote() && !srci->is_auth())
+ witnesses.insert(srci->authority().first);
+ destdn->list_replicas(witnesses);
+ if (destdnl->is_remote() && !oldin->is_auth())
+ witnesses.insert(oldin->authority().first);
+ dout(10) << " witnesses " << witnesses << ", have " << mdr->more()->witnessed << dendl;
+
+ if (!witnesses.empty()) {
+ // Replicas can't see projected dentry linkages and will get confused.
+ // We have taken snaplocks on ancestor inodes. Later rename/rmdir requests
+ // can't project these inodes' linkages.
+ bool need_flush = false;
+ for (auto& dn : srctrace) {
+ if (dn->is_projected()) {
+ need_flush = true;
+ break;
+ }
+ }
+ if (!need_flush) {
+ CDentry *dn = destdn;
+ do {
+ if (dn->is_projected()) {
+ need_flush = true;
+ break;
+ }
+ CInode *diri = dn->get_dir()->get_inode();
+ dn = diri->get_projected_parent_dn();
+ } while (dn);
+ }
+ if (need_flush) {
+ mdlog->wait_for_safe(
+ new MDSInternalContextWrapper(mds,
+ new C_MDS_RetryRequest(mdcache, mdr)));
+ mdlog->flush();
+ return;
+ }
+ }
+
// do srcdn auth last
mds_rank_t last = MDS_RANK_NONE;
if (!srcdn->is_auth()) {
filepath destpath(mdr->slave_request->destdnpath);
dout(10) << " dest " << destpath << dendl;
vector<CDentry*> trace;
- CF_MDS_MDRContextFactory cf(mdcache, mdr);
+ CF_MDS_MDRContextFactory cf(mdcache, mdr, false);
int r = mdcache->path_traverse(mdr, cf, destpath,
MDS_TRAVERSE_DISCOVER | MDS_TRAVERSE_LAST_XLOCKED | MDS_TRAVERSE_WANT_DENTRY,
&trace);