*
* always include at least @min dentry(ies), or else paths for
* namespace operations (link, rename, etc.) are meaningless.
+ *
+ * encode hidden .snap dirs as a double /, i.e.
+ * foo/.snap/bar -> foo//bar
*/
-char *ceph_build_dentry_path(struct dentry *dentry, int *plen, __u64 *base,
- int min)
+char *ceph_build_path(struct dentry *dentry, int *plen, __u64 *base, int min)
{
struct dentry *temp;
char *path;
/*
- * build fpos from fragment id and offset within that fragment.
+ * for readdir, encoding the directory frag and offset within that frag
+ * into f_pos.
*/
static loff_t make_fpos(unsigned frag, unsigned off)
{
struct ceph_mds_reply_info *rinfo;
nextfrag:
- dout(5, "dir_readdir filp %p at frag %u off %u\n", filp, frag, off);
+ dout(5, "readdir filp %p at frag %u off %u\n", filp, frag, off);
+
+ /* do we have the correct frag content buffered? */
if (fi->frag != frag || fi->last_readdir == NULL) {
struct ceph_mds_request *req;
struct ceph_mds_request_head *rhead;
int op = ceph_snap(inode) == CEPH_SNAPDIR ?
CEPH_MDS_OP_LSSNAP:CEPH_MDS_OP_READDIR;
+ /* discard old result, if any */
+ if (fi->last_readdir)
+ ceph_mdsc_put_request(fi->last_readdir);
+
+ /* requery frag tree, as the frag topology may have changed */
frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL);
- /* query mds */
- dout(10, "dir_readdir querying mds for ino %llx.%llx frag %x\n",
+ dout(10, "readdir querying mds for ino %llx.%llx frag %x\n",
ceph_vinop(inode), frag);
- path = ceph_build_dentry_path(filp->f_dentry, &pathlen,
- &pathbase, 1);
+ path = ceph_build_path(filp->f_dentry, &pathlen, &pathbase, 1);
req = ceph_mdsc_create_request(mdsc, op,
pathbase, path, 0, NULL,
filp->f_dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req))
return PTR_ERR(req);
+ /* hints to request -> mds selection code */
req->r_direct_hash = frag_value(frag);
req->r_direct_is_hash = true;
rhead = req->r_request->front.iov_base;
ceph_mdsc_put_request(req);
return err;
}
- dout(10, "dir_readdir got and parsed readdir result=%d"
+ dout(10, "readdir got and parsed readdir result=%d"
" on frag %x\n", err, frag);
- if (fi->last_readdir)
- ceph_mdsc_put_request(fi->last_readdir);
fi->last_readdir = req;
}
if (frag_is_leftmost(frag)) {
switch (off) {
case 0:
- dout(10, "dir_readdir off 0 -> '.'\n");
+ dout(10, "readdir off 0 -> '.'\n");
if (filldir(dirent, ".", 1, make_fpos(0, 0),
inode->i_ino, inode->i_mode >> 12) < 0)
return 0;
off++;
filp->f_pos++;
case 1:
- dout(10, "dir_readdir off 1 -> '..'\n");
+ dout(10, "readdir off 1 -> '..'\n");
if (filp->f_dentry->d_parent != NULL &&
filldir(dirent, "..", 2, make_fpos(0, 1),
filp->f_dentry->d_parent->d_inode->i_ino,
off++;
filp->f_pos++;
}
- skew = -2;
+ skew = -2; /* compensate for . and .. */
} else
skew = 0;
rinfo = &fi->last_readdir->r_reply_info;
- dout(10, "dir_readdir frag %x num %d off %d skew %d\n", frag,
+ dout(10, "readdir frag %x num %d off %d skew %d\n", frag,
rinfo->dir_nr, off, skew);
while (off+skew < rinfo->dir_nr) {
- dout(10, "dir_readdir off %d -> %d / %d name '%.*s'\n",
+ dout(10, "readdir off %d -> %d / %d name '%.*s'\n",
off, off+skew,
rinfo->dir_nr, rinfo->dir_dname_len[off+skew],
rinfo->dir_dname[off+skew]);
}
/* more frags? */
- if (frag_value(frag) != frag_mask(frag)) {
+ if (!frag_is_rightmost(frag)) {
frag = frag_next(frag);
off = 0;
filp->f_pos = make_fpos(frag, off);
- dout(10, "dir_readdir next frag is %x\n", frag);
+ dout(10, "readdir next frag is %x\n", frag);
goto nextfrag;
}
- dout(20, "dir_readdir done.\n");
+ dout(20, "readdir done.\n");
return 0;
}
file->f_version = 0;
}
retval = offset;
+ /*
+ * discard buffered readdir content on seekdir(0)
+ */
if (offset == 0 && fi->last_readdir) {
- dout(10, "llseek dropping %p readdir content\n", file);
+ dout(10, "dir_llseek dropping %p content\n", file);
ceph_mdsc_put_request(fi->last_readdir);
fi->last_readdir = NULL;
}
return retval;
}
+
+/*
+ * Process result of a lookup/open request.
+ *
+ * Mainly, make sure that return r_last_dentry (the dentry
+ * the MDS trace ended on) in place of the original VFS-provided
+ * dentry, if they differ.
+ *
+ * Gracefully handle the case where the MDS replies with -ENOENT and
+ * no trace (which it may do, at its discretion, e.g., if it doesn't
+ * care to issue a lease on the negative dentry).
+ */
struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
struct dentry *dentry, int err)
{
dentry, dentry->d_inode);
ceph_init_dentry(dentry);
if (dentry->d_inode) {
- dout(40, "d_drop %p\n", dentry);
d_drop(dentry);
req->r_last_dentry = d_alloc(dentry->d_parent,
&dentry->d_name);
if (err)
dentry = ERR_PTR(err);
else if (dentry != req->r_last_dentry) {
- dentry = req->r_last_dentry; /* we got spliced */
- dget(dentry);
+ dentry = dget(req->r_last_dentry); /* we got spliced */
} else
dentry = NULL;
return dentry;
}
/*
- * do a lookup / lstat (same thing).
- * @on_inode indicates that we should stat the ino, and not a path
- * built from @dentry.
+ * Do a lookup / lstat (same thing, modulo the metadata @mask).
+ * @on_inode indicates that we should stat the ino directly, and not a
+ * path built from @dentry.
*/
struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry,
int mask, int on_inode, int locked_dir)
} else {
/* build path */
u64 pathbase;
- path = ceph_build_dentry_path(dentry, &pathlen, &pathbase, 1);
+ path = ceph_build_path(dentry, &pathlen, &pathbase, 1);
if (IS_ERR(path))
return ERR_PTR(PTR_ERR(path));
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LSTAT,
return ERR_PTR(PTR_ERR(req));
rhead = req->r_request->front.iov_base;
rhead->args.stat.mask = cpu_to_le32(mask);
- dget(dentry); /* to match put_request below */
- req->r_last_dentry = dentry; /* use this dentry in fill_trace */
- req->r_locked_dir = dentry->d_parent->d_inode;
+ req->r_last_dentry = dget(dentry); /* try to use this in fill_trace */
+ req->r_locked_dir = dentry->d_parent->d_inode; /* by the VFS */
err = ceph_mdsc_do_request(mdsc, req);
dentry = ceph_finish_lookup(req, dentry, err);
ceph_mdsc_put_request(req); /* will dput(dentry) */
return dentry;
}
+/*
+ * Try to do a lookup+open, if possible.
+ */
static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
struct nameidata *nd)
{
- dout(5, "dir_lookup in dir %p dentry %p '%.*s'\n",
+ dout(5, "lookup in dir %p dentry %p '%.*s'\n",
dir, dentry, dentry->d_name.len, dentry->d_name.name);
/* open (but not create!) intent? */
if (ceph_snap(dir) != CEPH_NOSNAP)
return -EROFS;
- dout(5, "dir_mknod in dir %p dentry %p mode 0%o rdev %d\n",
+ dout(5, "mknod in dir %p dentry %p mode 0%o rdev %d\n",
dir, dentry, mode, rdev);
- path = ceph_build_dentry_path(dentry, &pathlen, &pathbase, 1);
+ path = ceph_build_path(dentry, &pathlen, &pathbase, 1);
if (IS_ERR(path))
return PTR_ERR(path);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD,
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req)) {
- dout(40, "d_drop %p\n", dentry);
d_drop(dentry);
return PTR_ERR(req);
}
ceph_mdsc_lease_release(mdsc, dir, NULL, CEPH_LOCK_ICONTENT);
err = ceph_mdsc_do_request(mdsc, req);
if (!err && req->r_reply_info.trace_numd == 0) {
- /* no trace. do lookup, in case we are called from create. */
+ /*
+ * no trace. do lookup, in case we are called from create
+ * and the VFS needs a valid dentry.
+ */
struct dentry *d;
d = ceph_do_lookup(dir->i_sb, dentry, CEPH_STAT_MASK_INODE_ALL,
0, 0);
if (d) {
- /* ick. this is untested... */
+ /* ick. this is untested, and inherently racey... i
+ suppose we _did_ create the file, but it has since
+ been deleted? hrm. */
dput(d);
err = -ESTALE;
dentry = NULL;
}
}
ceph_mdsc_put_request(req);
- if (err) {
- dout(40, "d_drop %p\n", dentry);
+ if (err)
d_drop(dentry);
- }
return err;
}
if (ceph_snap(dir) != CEPH_NOSNAP)
return -EROFS;
- dout(5, "dir_symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
- path = ceph_build_dentry_path(dentry, &pathlen, &pathbase, 1);
+ dout(5, "symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
+ path = ceph_build_path(dentry, &pathlen, &pathbase, 1);
if (IS_ERR(path))
return PTR_ERR(path);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK,
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req)) {
- dout(40, "d_drop %p\n", dentry);
d_drop(dentry);
return PTR_ERR(req);
}
ceph_mdsc_lease_release(mdsc, dir, NULL, CEPH_LOCK_ICONTENT);
err = ceph_mdsc_do_request(mdsc, req);
ceph_mdsc_put_request(req);
- if (err) {
- dout(40, "d_drop %p\n", dentry);
+ if (err)
d_drop(dentry);
- }
return err;
}
int op = CEPH_MDS_OP_MKDIR;
if (ceph_snap(dir) == CEPH_SNAPDIR) {
+ /* mkdir .snap/foo is a MKSNAP */
op = CEPH_MDS_OP_MKSNAP;
snaplen = dentry->d_name.len;
snap = kmalloc(snaplen + 1, GFP_NOFS);
memcpy(snap, dentry->d_name.name, snaplen);
snap[snaplen] = 0;
pathdentry = d_find_alias(dir);
- dout(5, "dir_mksnap dir %p '%s' dn %p\n", dir, snap, dentry);
+ dout(5, "mksnap dir %p snap '%s' dn %p\n", dir, snap, dentry);
} else if (ceph_snap(dir) != CEPH_NOSNAP)
return -EROFS;
else
- dout(5, "dir_mkdir dir %p dn %p mode 0%o\n", dir, dentry, mode);
- path = ceph_build_dentry_path(pathdentry, &pathlen, &pathbase, 1);
+ dout(5, "mkdir dir %p dn %p mode 0%o\n", dir, dentry, mode);
+ path = ceph_build_path(pathdentry, &pathlen, &pathbase, 1);
if (pathdentry != dentry)
dput(pathdentry);
if (IS_ERR(path))
dentry, USE_AUTH_MDS);
kfree(path);
if (IS_ERR(req)) {
- dout(40, "d_drop %p\n", dentry);
d_drop(dentry);
return PTR_ERR(req);
}
- dget(dentry); /* to match put_request below */
- req->r_last_dentry = dentry; /* use this dentry in fill_trace */
+ req->r_last_dentry = dget(dentry); /* use this dentry in fill_trace */
req->r_locked_dir = dir;
rhead = req->r_request->front.iov_base;
rhead->args.mkdir.mode = cpu_to_le32(mode);
ceph_mdsc_lease_release(mdsc, dir, NULL, CEPH_LOCK_ICONTENT);
err = ceph_mdsc_do_request(mdsc, req);
ceph_mdsc_put_request(req);
- if (err < 0) {
- dout(40, "d_drop %p\n", dentry);
+ if (err < 0)
d_drop(dentry);
- }
return err;
}
if (ceph_snap(dir) != CEPH_NOSNAP)
return -EROFS;
- dout(5, "dir_link in dir %p old_dentry %p dentry %p\n", dir,
+ dout(5, "link in dir %p old_dentry %p dentry %p\n", dir,
old_dentry, dentry);
- oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen, &oldpathbase,
- 1);
+ oldpath = ceph_build_path(old_dentry, &oldpathlen, &oldpathbase, 1);
if (IS_ERR(oldpath))
return PTR_ERR(oldpath);
- path = ceph_build_dentry_path(dentry, &pathlen, &pathbase, 1);
+ path = ceph_build_path(dentry, &pathlen, &pathbase, 1);
if (IS_ERR(path)) {
kfree(oldpath);
return PTR_ERR(path);
kfree(oldpath);
kfree(path);
if (IS_ERR(req)) {
- dout(40, "d_drop %p\n", dentry);
d_drop(dentry);
return PTR_ERR(req);
}
- dget(dentry); /* to match put_request below */
- req->r_last_dentry = dentry; /* use this dentry in fill_trace */
+ req->r_last_dentry = dget(dentry); /* use this dentry in fill_trace */
req->r_locked_dir = old_dentry->d_inode;
ceph_mdsc_lease_release(mdsc, dir, NULL, CEPH_LOCK_ICONTENT);
err = ceph_mdsc_do_request(mdsc, req);
ceph_mdsc_put_request(req);
- if (err) {
- dout(40, "d_drop %p\n", dentry);
+ if (err)
d_drop(dentry);
- } else if (req->r_reply_info.trace_numd == 0) {
+ else if (req->r_reply_info.trace_numd == 0) {
/* no trace */
struct inode *inode = old_dentry->d_inode;
inc_nlink(inode);
return err;
}
+/*
+ * rmdir and unlink are differ only by the metadata op code
+ */
static int ceph_unlink(struct inode *dir, struct dentry *dentry)
{
struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
struct dentry *pathdentry = dentry;
if (ceph_snap(dir) == CEPH_SNAPDIR) {
+ /* rmdir .snap/foo is RMSNAP */
op = CEPH_MDS_OP_RMSNAP;
snaplen = dentry->d_name.len;
snap = kmalloc(snaplen + 1, GFP_NOFS);
memcpy(snap, dentry->d_name.name, snaplen);
snap[snaplen] = 0;
pathdentry = d_find_alias(dir);
- dout(5, "dir_rmsnap dir %p '%s' dn %p\n", dir, snap, dentry);
+ dout(5, "rmsnap dir %p '%s' dn %p\n", dir, snap, dentry);
} else if (ceph_snap(dir) != CEPH_NOSNAP)
return -EROFS;
else
- dout(5, "dir_unlink/rmdir dir %p dn %p inode %p\n",
+ dout(5, "unlink/rmdir dir %p dn %p inode %p\n",
dir, dentry, inode);
- path = ceph_build_dentry_path(pathdentry, &pathlen, &pathbase, 1);
+ path = ceph_build_path(pathdentry, &pathlen, &pathbase, 1);
if (pathdentry != dentry)
dput(pathdentry);
if (IS_ERR(path))
if (IS_ERR(req))
return PTR_ERR(req);
- req->r_locked_dir = dir;
+ req->r_locked_dir = dir; /* by VFS */
ceph_mdsc_lease_release(mdsc, dir, dentry,
CEPH_LOCK_DN|CEPH_LOCK_ICONTENT);
err = ceph_mdsc_do_request(mdsc, req);
ceph_mdsc_put_request(req);
- if (err == -ENOENT)
- dout(10, "HMMM!\n");
- else if (req->r_reply_info.trace_numd == 0) {
+ if (req->r_reply_info.trace_numd == 0) {
/* no trace */
- drop_nlink(dentry->d_inode);
+ if (err == -ENOENT)
+ d_drop(dentry);
dput(dentry);
}
dout(5, "dir_rename in dir %p dentry %p to dir %p dentry %p\n",
old_dir, old_dentry, new_dir, new_dentry);
- oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen, &oldpathbase,
- 1);
+ oldpath = ceph_build_path(old_dentry, &oldpathlen, &oldpathbase, 1);
if (IS_ERR(oldpath))
return PTR_ERR(oldpath);
- newpath = ceph_build_dentry_path(new_dentry, &newpathlen, &newpathbase,
- 1);
+ newpath = ceph_build_path(new_dentry, &newpathlen, &newpathbase, 1);
if (IS_ERR(newpath)) {
kfree(oldpath);
return PTR_ERR(newpath);
kfree(newpath);
if (IS_ERR(req))
return PTR_ERR(req);
- dget(old_dentry);
- req->r_old_dentry = old_dentry;
- dget(new_dentry);
- req->r_last_dentry = new_dentry;
+ req->r_old_dentry = dget(old_dentry);
+ req->r_last_dentry = dget(new_dentry);
ceph_mdsc_lease_release(mdsc, old_dir, old_dentry,
CEPH_LOCK_DN|CEPH_LOCK_ICONTENT);
if (new_dentry->d_inode)
CEPH_LOCK_ILINK);
err = ceph_mdsc_do_request(mdsc, req);
if (!err && req->r_reply_info.trace_numd == 0) {
- /* no trace */
+ /*
+ * no trace
+ *
+ * Normally d_move() is done by fill_trace (called by
+ * do_request, above). If there is no trace, we need
+ * to do it here.
+ */
if (new_dentry->d_inode)
dput(new_dentry);
d_move(old_dentry, new_dentry);
/*
- * check if dentry lease, or parent directory inode lease or cap says
+ * check if dentry lease, or parent directory inode lease/cap says
* this dentry is still valid
*/
static int ceph_dentry_revalidate(struct dentry *dentry, struct nameidata *nd)
{
struct inode *dir = dentry->d_parent->d_inode;
- /* always validate snapped metadata, for now */
+ /* always trust cached snapped metadata... for now */
if (ceph_snap(dir) != CEPH_NOSNAP) {
dout(10, "d_revalidate %p '%.*s' inode %p is SNAPPED\n", dentry,
dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
dout(10, "d_revalidate %p '%.*s' inode %p\n", dentry,
dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
- if (ceph_ino(dir) != 1 && /* ICONTENT useless on root inode */
+ if (ceph_ino(dir) != 1 && /* ICONTENT is meaningless on root inode */
ceph_inode(dir)->i_version == dentry->d_time &&
ceph_inode_lease_valid(dir, CEPH_LOCK_ICONTENT)) {
dout(20, "dentry_revalidate %p %lu ICONTENT on dir %p %llu\n",
}
dout(20, "dentry_revalidate %p no lease\n", dentry);
- dout(40, "d_drop %p\n", dentry);
d_drop(dentry);
return 0;
}
static int ceph_snapdir_dentry_revalidate(struct dentry *dentry,
struct nameidata *nd)
{
- return 1; /* FIXME */
+ /*
+ * Eventually, we'll want to revalidate snapped metadata
+ * too... probably.
+ */
+ return 1;
}
+
+
/*
- * reading from a dir
+ * read() on a dir. This weird interface hack only works if mounted
+ * with '-o dirstat'.
*/
static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
loff_t *ppos)