]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: revalidate dentries while constructing paths.
authorSage Weil <sage@newdream.net>
Tue, 17 Jun 2008 04:04:28 +0000 (21:04 -0700)
committerSage Weil <sage@newdream.net>
Tue, 17 Jun 2008 18:18:26 +0000 (11:18 -0700)
src/TODO
src/kernel/dir.c
src/kernel/file.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/super.h

index 5701839f30366b9d6a44ca34790af775fa584bde..f3e5e64181066814ed13df53ff0309320b6b9493 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -1,6 +1,5 @@
 v0.3
 - client rsync bug
-- document nested accounting
 
 v0.4
 - ENOSPC
@@ -8,7 +7,7 @@ v0.4
 big items
 - ENOSPC
 - quotas
-  - accounting
+/  - accounting
   - enforcement
 - rados cow/snapshot infrastructure
 - mds snapshots
index 503b685220c07364d9b5ea1e096af5dff8d558f7..417719433eca5424e9c5eb889fd6d3483d4e6a12 100644 (file)
@@ -10,12 +10,24 @@ const struct inode_operations ceph_dir_iops;
 const struct file_operations ceph_dir_fops;
 struct dentry_operations ceph_dentry_ops;
 
+static int ceph_dentry_revalidate(struct dentry *dentry, struct nameidata *nd);
+
 /*
- * build a dentry's path, relative to sb root.  allocate on
- * heap; caller must kfree.
- * (based on build_path_from_dentry in fs/cifs/dir.c)
+ * build a dentry's path.  allocate on heap; caller must kfree.  based
+ * on build_path_from_dentry in fs/cifs/dir.c.
+ *
+ * stop path construction as soon as we hit a dentry we do not have a
+ * valid lease over.  races aside, this ensures we describe the
+ * operation relative to a base inode that is likely to be cached by
+ * the MDS, using a relative path that is known to be valid (e.g., not
+ * munged up by a directory rename on another client).
+ *
+ * this is, unfortunately, both racy and inefficient.  dentries are
+ * revalidated during path traversal, and revalidated _again_ when we
+ * reconstruct the reverse path.  lame.  unfortunately the VFS doesn't
+ * tell us the path it traversed, so i'm not sure we can do any better.
  */
-char *ceph_build_dentry_path(struct dentry *dentry, int *plen)
+char *ceph_build_dentry_path(struct dentry *dentry, int *plen, __u64 *base)
 {
        struct dentry *temp;
        char *path;
@@ -27,6 +39,9 @@ char *ceph_build_dentry_path(struct dentry *dentry, int *plen)
 retry:
        len = 0;
        for (temp = dentry; !IS_ROOT(temp);) {
+               if (temp->d_inode &&
+                   !ceph_dentry_revalidate(temp, 0))
+                       break;
                len += 1 + temp->d_name.len;
                temp = temp->d_parent;
                if (temp == NULL) {
@@ -42,7 +57,7 @@ retry:
                return ERR_PTR(-ENOMEM);
        pos = len;
        path[pos] = 0;  /* trailing null */
-       for (temp = dentry; !IS_ROOT(temp);) {
+       for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
                pos -= temp->d_name.len;
                if (pos < 0) {
                        break;
@@ -63,7 +78,7 @@ retry:
        }
        if (pos != 0) {
                derr(1, "did not end path lookup where expected, "
-                    "namelen is %d\n", len);
+                    "namelen is %d, pos is %d\n", len, pos);
                /* presumably this is only possible if racing with a
                   rename of one of the parent directories (we can not
                   lock the dentries above us to prevent this, but
@@ -72,9 +87,10 @@ retry:
                goto retry;
        }
 
-       dout(10, "build_path_dentry on %p %d build '%.*s'\n",
-            dentry, atomic_read(&dentry->d_count), len, path);
+       *base = ceph_ino(temp->d_inode);
        *plen = len;
+       dout(10, "build_path_dentry on %p %d built %llx '%.*s'\n",
+            dentry, atomic_read(&dentry->d_count), *base, len, path);
        return path;
 }
 
@@ -257,12 +273,12 @@ struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry,
                                               dentry, USE_CAP_MDS);
        } else {
                /* build path */
-               path = ceph_build_dentry_path(dentry, &pathlen);
+               u64 pathbase;
+               path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
                if (IS_ERR(path))
                        return ERR_PTR(PTR_ERR(path));
                req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LSTAT,
-                                              ceph_ino(sb->s_root->d_inode),
-                                              path, 0, 0,
+                                              pathbase, path, 0, 0,
                                               dentry, USE_ANY_MDS);
                kfree(path);
        }
@@ -327,16 +343,16 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        struct ceph_mds_request_head *rhead;
        char *path;
        int pathlen;
+       u64 pathbase;
        int err;
 
        dout(5, "dir_mknod in dir %p dentry %p mode 0%o rdev %d\n",
             dir, dentry, mode, rdev);
-       path = ceph_build_dentry_path(dentry, &pathlen);
+       path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
        if (IS_ERR(path))
                return PTR_ERR(path);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD,
-                                      ceph_ino(dir->i_sb->s_root->d_inode),
-                                      path, 0, 0,
+                                      pathbase, path, 0, 0,
                                       dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req)) {
@@ -394,15 +410,15 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
        struct ceph_mds_request *req;
        char *path;
        int pathlen;
+       u64 pathbase;
        int err;
 
        dout(5, "dir_symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
-       path = ceph_build_dentry_path(dentry, &pathlen);
+       path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
        if (IS_ERR(path))
                return PTR_ERR(path);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK,
-                                      ceph_ino(dir->i_sb->s_root->d_inode),
-                                      path, 0, dest,
+                                      pathbase, path, 0, dest,
                                       dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req)) {
@@ -428,15 +444,15 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode)
        struct ceph_mds_request_head *rhead;
        char *path;
        int pathlen;
+       u64 pathbase;
        int err;
 
        dout(5, "dir_mkdir in dir %p dentry %p mode 0%o\n", dir, dentry, mode);
-       path = ceph_build_dentry_path(dentry, &pathlen);
+       path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
        if (IS_ERR(path))
                return PTR_ERR(path);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKDIR,
-                                      ceph_ino(dir->i_sb->s_root->d_inode),
-                                      path, 0, 0,
+                                      pathbase, path, 0, 0,
                                       dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req)) {
@@ -464,23 +480,22 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
        struct ceph_mds_request *req;
        char *oldpath, *path;
        int oldpathlen, pathlen;
+       u64 oldpathbase, pathbase;
        int err;
 
        dout(5, "dir_link in dir %p old_dentry %p dentry %p\n", dir,
             old_dentry, dentry);
-       oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen);
+       oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen, &oldpathbase);
        if (IS_ERR(oldpath))
                return PTR_ERR(oldpath);
-       path = ceph_build_dentry_path(dentry, &pathlen);
+       path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
        if (IS_ERR(path)) {
                kfree(oldpath);
                return PTR_ERR(path);
        }
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK,
-                                      ceph_ino(dir->i_sb->s_root->d_inode),
-                                      path,
-                                      ceph_ino(dir->i_sb->s_root->d_inode),
-                                      oldpath,
+                                      pathbase, path,
+                                      oldpathbase, oldpath,
                                       dentry, USE_AUTH_MDS);
        kfree(oldpath);
        kfree(path);
@@ -518,18 +533,18 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
        struct ceph_mds_request *req;
        char *path;
        int pathlen;
+       u64 pathbase;
        int err;
        int op = ((dentry->d_inode->i_mode & S_IFMT) == S_IFDIR) ?
                CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
 
        dout(5, "dir_unlink/rmdir in dir %p dentry %p inode %p\n",
             dir, dentry, inode);
-       path = ceph_build_dentry_path(dentry, &pathlen);
+       path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
        if (IS_ERR(path))
                return PTR_ERR(path);
        req = ceph_mdsc_create_request(mdsc, op,
-                                      ceph_ino(dir->i_sb->s_root->d_inode),
-                                      path, 0, 0,
+                                      pathbase, path, 0, 0,
                                       dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req))
@@ -557,24 +572,24 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
        struct ceph_client *client = ceph_sb_to_client(old_dir->i_sb);
        struct ceph_mds_client *mdsc = &client->mdsc;
        struct ceph_mds_request *req;
-       struct dentry *root = old_dir->i_sb->s_root;
        char *oldpath, *newpath;
        int oldpathlen, newpathlen;
+       u64 oldpathbase, newpathbase;
        int err;
 
        dout(5, "dir_rename in dir %p dentry %p to dir %p dentry %p\n",
             old_dir, old_dentry, new_dir, new_dentry);
-       oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen);
+       oldpath = ceph_build_dentry_path(old_dentry, &oldpathlen, &oldpathbase);
        if (IS_ERR(oldpath))
                return PTR_ERR(oldpath);
-       newpath = ceph_build_dentry_path(new_dentry, &newpathlen);
+       newpath = ceph_build_dentry_path(new_dentry, &newpathlen, &newpathbase);
        if (IS_ERR(newpath)) {
                kfree(oldpath);
                return PTR_ERR(newpath);
        }
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME,
-                                      ceph_ino(root->d_inode), oldpath,
-                                      ceph_ino(root->d_inode), newpath,
+                                      oldpathbase, oldpath,
+                                      newpathbase, newpath,
                                       new_dentry, USE_AUTH_MDS);
        kfree(oldpath);
        kfree(newpath);
index 4269317e0ca0721e2ac7eb77f7a1e95b4d5e25df..d3aa24a0f98c7211a35232822ccacd7c6b83793c 100644 (file)
@@ -33,8 +33,7 @@ prepare_open_request(struct super_block *sb, struct dentry *dentry,
 
        dout(5, "prepare_open_request dentry %p name '%s' flags %d\n", dentry,
             dentry->d_name.name, flags);
-       pathbase = ceph_ino(sb->s_root->d_inode);
-       path = ceph_build_dentry_path(dentry, &pathlen);
+       path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
        if (IS_ERR(path))
                return ERR_PTR(PTR_ERR(path));
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_OPEN, pathbase, path,
index 0b8d896ef1e351edd635605c05607fc7b4ea7b1a..33e07bc088d32ab16a71fed369346a0a20794281 100644 (file)
@@ -613,6 +613,9 @@ int ceph_dentry_lease_valid(struct dentry *dentry)
  * changes needed to properly reflect the completed operation (e.g.,
  * call d_move).  make note of the distribution of metadata across the
  * mds cluster.
+ *
+ * FIXME: we should check inode.version to avoid races between traces
+ * from multiple MDSs after, say, a ancestor directory is renamed.
  */
 int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                    struct ceph_mds_session *session)
@@ -1696,7 +1699,7 @@ static struct ceph_mds_request *prepare_setattr(struct ceph_mds_client *mdsc,
        char *path;
        int pathlen;
        struct ceph_mds_request *req;
-       __u64 baseino = ceph_ino(dentry->d_inode->i_sb->s_root->d_inode);
+       u64 pathbase;
 
        if (ia_valid & ATTR_FILE) {
                dout(5, "prepare_setattr dentry %p (inode %llx)\n", dentry,
@@ -1707,10 +1710,10 @@ static struct ceph_mds_request *prepare_setattr(struct ceph_mds_client *mdsc,
                                               dentry, USE_CAP_MDS);
        } else {
                dout(5, "prepare_setattr dentry %p (full path)\n", dentry);
-               path = ceph_build_dentry_path(dentry, &pathlen);
+               path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
                if (IS_ERR(path))
                        return ERR_PTR(PTR_ERR(path));
-               req = ceph_mdsc_create_request(mdsc, op, baseino, path, 0, 0,
+               req = ceph_mdsc_create_request(mdsc, op, pathbase, path, 0, 0,
                                               dentry, USE_ANY_MDS);
                kfree(path);
        }
@@ -2135,6 +2138,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
        struct ceph_mds_request_head *rhead;
        char *path;
        int pathlen;
+       u64 pathbase;
        int err;
        int i, nr_pages;
        struct page **pages = 0;
@@ -2164,12 +2168,11 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
        }
 
        /* do request */
-       path = ceph_build_dentry_path(dentry, &pathlen);
+       path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
        if (IS_ERR(path))
                return PTR_ERR(path);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LSETXATTR,
-                                      ceph_ino(dentry->d_sb->s_root->d_inode),
-                                      path, 0, name,
+                                      pathbase, path, 0, name,
                                       dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req))
@@ -2203,18 +2206,18 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
        struct ceph_mds_request *req;
        char *path;
        int pathlen;
+       u64 pathbase;
        int err;
 
        /* only support user.* xattrs, for now */
        if (strncmp(name, "user.", 5) != 0)
                return -EOPNOTSUPP;
 
-       path = ceph_build_dentry_path(dentry, &pathlen);
+       path = ceph_build_dentry_path(dentry, &pathlen, &pathbase);
        if (IS_ERR(path))
                return PTR_ERR(path);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LRMXATTR,
-                                      ceph_ino(dentry->d_sb->s_root->d_inode),
-                                      path, 0, name,
+                                      pathbase, path, 0, name,
                                       dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req))
index a6656790b25a88a614fe7ec3edf6761495e0ac30..601b5f6e4a95e35cc9e61a76af3662757f86081b 100644 (file)
@@ -1274,6 +1274,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
        struct ceph_inode_cap *cap;
        char *path;
        int pathlen, err;
+       u64 pathbase;
        struct dentry *dentry;
        struct ceph_inode_info *ci;
        struct ceph_mds_cap_reconnect *rec;
@@ -1347,7 +1348,8 @@ retry:
 
                dentry = d_find_alias(&ci->vfs_inode);
                if (dentry) {
-                       path = ceph_build_dentry_path(dentry, &pathlen);
+                       path = ceph_build_dentry_path(dentry, &pathlen,
+                                                     &pathbase);
                        if (IS_ERR(path)) {
                                err = PTR_ERR(path);
                                BUG_ON(err);
index 8ea5dbb9c3b1fc626c2146a51f375b415e64f128..9693fa3b01aff2bbb1b890fe8422dc4139eeb025 100644 (file)
@@ -499,7 +499,7 @@ extern const struct inode_operations ceph_dir_iops;
 extern const struct file_operations ceph_dir_fops;
 extern struct dentry_operations ceph_dentry_ops;
 
-extern char *ceph_build_dentry_path(struct dentry *dentry, int *len);
+extern char *ceph_build_dentry_path(struct dentry *dn, int *len, __u64 *base);
 extern struct dentry *ceph_do_lookup(struct super_block *sb, 
                                     struct dentry *dentry, 
                                     int mask, int on_inode);