bool forward = !discover;
bool path_locked = (flags & MDS_TRAVERSE_PATH_LOCKED);
bool want_dentry = (flags & MDS_TRAVERSE_WANT_DENTRY);
+ bool want_inode = (flags & MDS_TRAVERSE_WANT_INODE);
bool want_auth = (flags & MDS_TRAVERSE_WANT_AUTH);
bool rdlock_snap = (flags & (MDS_TRAVERSE_RDLOCK_SNAP | MDS_TRAVERSE_RDLOCK_SNAP2));
bool rdlock_path = (flags & MDS_TRAVERSE_RDLOCK_PATH);
if (pin)
*pin = cur;
+ CInode *target_inode = nullptr;
MutationImpl::LockOpVec lov;
+ int r;
for (unsigned depth = 0; depth < path.depth(); ) {
dout(12) << "traverse: path seg depth " << depth << " '" << path[depth]
pdnvec->clear(); // do not confuse likes of rdlock_path_pin_ref();
return -CEPHFS_ENOENT;
}
+ if (depth == path.depth() - 1)
+ target_inode = cur;
mdr->snapid = snapid;
depth++;
continue;
}
*/
- if (want_auth && want_dentry && depth == path.depth() - 1) {
- if (curdir->is_ambiguous_auth()) {
- dout(10) << "waiting for single auth on " << *curdir << dendl;
- curdir->add_waiter(CInode::WAIT_SINGLEAUTH, cf.build());
- return 1;
- }
- if (!curdir->is_auth()) {
- dout(10) << "fw to auth for " << *curdir << dendl;
- request_forward(mdr, curdir->authority().first);
- return 2;
- }
- }
+ // Defer the auth check until the target inode is determined not to exist
+ // if want_inode is true.
+ if (want_auth && want_dentry && !want_inode && depth == path.depth() - 1 &&
+ (r = maybe_request_forward_to_auth(mdr, cf, curdir)) != 0)
+ return r;
// Before doing dirfrag->dn lookup, compare with DamageTable's
// record of which dentries were unreadable
if (dn->state_test(CDentry::STATE_PURGING))
return -CEPHFS_ENOENT;
+ CDentry::linkage_t *dnl = dn->get_projected_linkage();
+ // If an auth check was deferred before and the target inode is found
+ // not to exist now, do the auth check here if necessary.
+ if (want_auth && want_dentry && want_inode && depth == path.depth() - 1 &&
+ dnl->is_null() && (r = maybe_request_forward_to_auth(mdr, cf, dn)) != 0)
+ return r;
+
if (rdlock_path) {
lov.clear();
- if (xlock_dentry && depth == path.depth() - 1) {
+ // do not xlock the tail dentry if target inode exists and caller wants it
+ if (xlock_dentry && (dnl->is_null() || !want_inode) &&
+ depth == path.depth() - 1) {
+ ceph_assert(dn->is_auth());
if (depth > 0 || !mdr->lock_cache) {
lov.add_wrlock(&cur->filelock);
lov.add_wrlock(&cur->nestlock);
if (pdnvec)
pdnvec->push_back(dn);
- CDentry::linkage_t *dnl = dn->get_projected_linkage();
// can we conclude CEPHFS_ENOENT?
if (dnl->is_null()) {
dout(10) << "traverse: null+readable dentry at " << *dn << dendl;
cur = in;
- if (rdlock_snap && !(want_dentry && depth == path.depth() - 1)) {
+ if (rdlock_snap && !(want_dentry && !want_inode && depth == path.depth() - 1)) {
lov.clear();
lov.add_rdlock(&cur->snaplock);
if (!mds->locker->acquire_locks(mdr, lov)) {
}
}
+ if (depth == path.depth() - 1)
+ target_inode = cur;
+
// add to trace, continue.
touch_inode(cur);
if (pin)
}
} else {
// dirfrag/dentry is not mine.
- mds_authority_t dauth = curdir->authority();
if (forward &&
mdr && mdr->client_request &&
if (forward) {
// forward
dout(7) << "traverse: not auth for " << path << " in " << *curdir << dendl;
-
- if (curdir->is_ambiguous_auth()) {
- // wait
- dout(7) << "traverse: waiting for single auth in " << *curdir << dendl;
- curdir->add_waiter(CDir::WAIT_SINGLEAUTH, cf.build());
- return 1;
- }
- dout(7) << "traverse: forwarding, not auth for " << *curdir << dendl;
+ r = maybe_request_forward_to_auth(mdr, cf, curdir);
+ ceph_assert(r != 0);
- request_forward(mdr, dauth.first);
+ if (r == 2 && mds->logger)
+ mds->logger->inc(l_mds_traverse_forward);
- if (mds->logger) mds->logger->inc(l_mds_traverse_forward);
- return 2;
+ return r;
}
}
ceph_abort(); // i shouldn't get here
}
- if (want_auth && !want_dentry) {
- if (cur->is_ambiguous_auth()) {
- dout(10) << "waiting for single auth on " << *cur << dendl;
- cur->add_waiter(CInode::WAIT_SINGLEAUTH, cf.build());
- return 1;
- }
- if (!cur->is_auth()) {
- dout(10) << "fw to auth for " << *cur << dendl;
- request_forward(mdr, cur->authority().first);
- return 2;
+ if (path.depth() == 0) {
+ dout(7) << "no tail dentry, base " << *cur << dendl;
+ if (want_dentry && !want_inode) {
+ return -CEPHFS_ENOENT;
}
+ target_inode = cur;
}
-
+
+ if (target_inode) {
+ dout(7) << "found target " << *target_inode << dendl;
+ if (want_auth && !(want_dentry && !want_inode) &&
+ (r = maybe_request_forward_to_auth(mdr, cf, target_inode)) != 0)
+ return r;
+ }
+
// success.
if (mds->logger) mds->logger->inc(l_mds_traverse_hit);
dout(10) << "path_traverse finish on snapid " << snapid << dendl;
return 0;
}
+int MDCache::maybe_request_forward_to_auth(MDRequestRef& mdr, MDSContextFactory& cf,
+ MDSCacheObject *p)
+{
+ if (p->is_ambiguous_auth()) {
+ dout(7) << "waiting for single auth on " << *p << dendl;
+ p->add_waiter(CInode::WAIT_SINGLEAUTH, cf.build());
+ return 1;
+ }
+ if (!p->is_auth()) {
+ dout(7) << "fw to auth for " << *p << dendl;
+ request_forward(mdr, p->authority().first);
+ return 2;
+ }
+ return 0;
+}
+
CInode *MDCache::cache_traverse(const filepath& fp)
{
dout(10) << "cache_traverse " << fp << dendl;
static const int MDS_TRAVERSE_XLOCK_DENTRY = (1 << 8);
static const int MDS_TRAVERSE_RDLOCK_AUTHLOCK = (1 << 9);
static const int MDS_TRAVERSE_CHECK_LOCKCACHE = (1 << 10);
+static const int MDS_TRAVERSE_WANT_INODE = (1 << 11);
// flags for predirty_journal_parents()
* dentry is encountered.
* MDS_TRAVERSE_WANT_DENTRY: Caller wants tail dentry. Add a null dentry if
* tail dentry does not exist. return 0 even tail dentry is null.
+ * MDS_TRAVERSE_WANT_INODE: Caller only wants target inode if it exists, or
+ * wants tail dentry if target inode does not exist and MDS_TRAVERSE_WANT_DENTRY
+ * is also set.
* MDS_TRAVERSE_WANT_AUTH: Always forward request to auth MDS of target inode
* or auth MDS of tail dentry (MDS_TRAVERSE_WANT_DENTRY is set).
+ * MDS_TRAVERSE_XLOCK_DENTRY: Caller wants to xlock tail dentry if MDS_TRAVERSE_WANT_INODE
+ * is not set or (MDS_TRAVERSE_WANT_INODE is set but target inode does not exist)
*
* @param pdnvec Data return parameter -- on success, contains a
* vector of dentries. On failure, is either empty or contains the
const filepath& path, int flags,
std::vector<CDentry*> *pdnvec, CInode **pin=nullptr);
+ int maybe_request_forward_to_auth(MDRequestRef& mdr, MDSContextFactory& cf,
+ MDSCacheObject *p);
+
CInode *cache_traverse(const filepath& path);
void open_remote_dirfrag(CInode *diri, frag_t fg, MDSContext *fin);
/** rdlock_path_xlock_dentry
* traverse path to the directory that could/would contain dentry.
- * make sure i am auth for that dentry, forward as necessary.
- * create null dentry in place (or use existing if okexist).
+ * make sure i am auth for that dentry (or target inode if it exists and authexist),
+ * forward as necessary. create null dentry in place (or use existing if okexist).
* get rdlocks on traversed dentries, xlock on new dentry.
+ *
+ * set authexist true if caller requires the target inode to be auth when it exists.
+ * the tail dentry is not always auth any more if authexist because it is impossible
+ * to ensure tail dentry and target inode are both auth in one mds. the tail dentry
+ * will not be xlocked too if authexist and the target inode exists.
*/
CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr,
- bool create, bool okexist, bool want_layout)
+ bool create, bool okexist, bool authexist,
+ bool want_layout)
{
const filepath& refpath = mdr->get_filepath();
dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
flags |= MDS_TRAVERSE_CHECK_LOCKCACHE;
if (create)
flags |= MDS_TRAVERSE_RDLOCK_AUTHLOCK;
+ if (authexist)
+ flags |= MDS_TRAVERSE_WANT_INODE;
if (want_layout)
flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[0]);
}
bool excl = req->head.args.open.flags & CEPH_O_EXCL;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true);
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, true, !excl, true, true);
if (!dn)
return;
CDentry::linkage_t *dnl = dn->get_projected_linkage();
if (!excl && !dnl->is_null()) {
// it existed.
- mds->locker->xlock_downgrade(&dn->lock, mdr.get());
+ ceph_assert(mdr.get()->is_rdlocked(&dn->lock));
MutationImpl::LockOpVec lov;
lov.add_rdlock(&dnl->get_inode()->snaplock);
mode |= S_IFREG;
mdr->disable_lock_cache();
- CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, S_ISREG(mode));
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, false, S_ISREG(mode));
if (!dn)
return;