]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: fix up dcache readdir
authorSage Weil <sage@newdream.net>
Wed, 15 Apr 2009 20:58:55 +0000 (13:58 -0700)
committerSage Weil <sage@newdream.net>
Wed, 15 Apr 2009 20:59:16 +0000 (13:59 -0700)
Code should behave with frags and seeks, though both are untested :).

src/kernel/dir.c
src/kernel/file.c
src/kernel/inode.c
src/kernel/super.h

index f830396aed3a9ec888d24cef53012e4213dc2c49..9f2a63a858a3e44faf697ed6d0dfe11c73e6ab92 100644 (file)
@@ -33,10 +33,6 @@ static int ceph_dentry_revalidate(struct dentry *dentry, struct nameidata *nd);
  * for readdir, encoding the directory frag and offset within that frag
  * into f_pos.
  */
-static loff_t make_fpos(unsigned frag, unsigned off)
-{
-       return ((loff_t)frag << 32) | (loff_t)off;
-}
 static unsigned fpos_frag(loff_t p)
 {
        return p >> 32;
@@ -46,52 +42,37 @@ static unsigned fpos_off(loff_t p)
        return p & 0xffffffff;
 }
 
-static int ceph_dcache_readdir(struct inode *dir, struct file *filp,
-                              void *dirent, filldir_t filldir)
+static int __dcache_readdir(struct file *filp,
+                           void *dirent, filldir_t filldir)
 {
        struct ceph_file_info *fi = filp->private_data;
        struct dentry *parent = filp->f_dentry;
-       struct inode *inode = parent->d_inode;
+       struct inode *dir = parent->d_inode;
        struct list_head *p;
-       struct dentry *dentry, *last = 0;
-
-       if (filp->f_pos == 0) {
-               dout(10, "readdir off 0 -> '.'\n");
-               if (filldir(dirent, ".", 1, make_fpos(0, 0),
-                           inode->i_ino, inode->i_mode >> 12) < 0)
-                       return 0;
-               filp->f_pos++;
-       }
+       struct dentry *dentry, *last;
+       struct ceph_dentry_info *di;
+       int err = 0;
 
-       if (filp->f_pos == 1) {
-               dout(10, "readdir off 1 -> '..'\n");
-               if (filldir(dirent, "..", 2, make_fpos(0, 1),
-                           parent->d_parent->d_inode->i_ino,
-                           inode->i_mode >> 12) < 0)
-                       return 0;
-               filp->f_pos++;
-       }
+       last = fi->dentry;
+       fi->dentry = NULL;
+       dout(10, "__dcache_readdir %p at %llu (last %p)\n", dir, filp->f_pos,
+            last);
 
        spin_lock(&dcache_lock);
-       if (filp->f_pos == 2) {
-               dentry = list_entry(parent->d_subdirs.next,
-                                   struct dentry,
-                                   d_u.d_child);
-               p = &dentry->d_u.d_child;
-               dout(10, " initial dentry %p\n", dentry);
-               goto skip_unhashed;
+       if (filp->f_pos == 2 || (last &&
+                                filp->f_pos < ceph_dentry(last)->offset)) {
+               if (list_empty(&parent->d_subdirs))
+                       goto out_unlock;
+               p = parent->d_subdirs.prev;
+               dout(10, " initial p %p/%p\n", p->prev, p->next);
+       } else {
+               p = last->d_u.d_child.prev;
+               filp->f_pos++;
        }
 
-       last = fi->dentry;
-       fi->dentry = NULL;
-       p = &last->d_u.d_child;
-
-next_entry:
-       p = p->next;
+more:
        dentry = list_entry(p, struct dentry, d_u.d_child);
-       filp->f_pos++;
-
-skip_unhashed:
+       di = ceph_dentry(dentry);
        while (1) {
                dout(10, " p %p/%p d_subdirs %p/%p\n", p->prev, p->next,
                     parent->d_subdirs.prev, parent->d_subdirs.next);
@@ -99,40 +80,51 @@ skip_unhashed:
                        fi->at_end = 1;
                        goto out_unlock;
                }
-               if (!d_unhashed(dentry) && dentry->d_inode)
+               if (!d_unhashed(dentry) && dentry->d_inode &&
+                   filp->f_pos <= di->offset)
                        break;
-               dout(10, " %p is unhashed, skipping\n", dentry);
+               dout(10, " skipping %p at %llu (%llu)\n", dentry, di->offset,
+                       filp->f_pos);
                p = p->next;
                dentry = list_entry(p, struct dentry, d_u.d_child);
+               di = ceph_dentry(dentry);
        }
+
        atomic_inc(&dentry->d_count);
        spin_unlock(&dcache_lock);
-       
+
        if (last) {
                dput(last);
                last = NULL;
        }
 
-       dout(10, " dentry %p %.*s %p\n", dentry, dentry->d_name.len,
-            dentry->d_name.name, dentry->d_inode);
+       dout(10, " %llu (%llu) dentry %p %.*s %p\n", di->offset, filp->f_pos,
+            dentry, dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
+       filp->f_pos = di->offset;
        if (filldir(dirent, dentry->d_name.name,
-                   dentry->d_name.len, filp->f_pos,
+                   dentry->d_name.len, di->offset,
                    dentry->d_inode->i_ino,
                    dentry->d_inode->i_mode >> 12) < 0) {
                fi->dentry = dentry;
-               goto out;
+               goto out_unlock;
        }
 
        last = dentry;
        spin_lock(&dcache_lock);
-       goto next_entry;
+       p = p->prev;
+       filp->f_pos++;
+
+       /* make sure a dentry wasn't dropped while we didn't have dcache_lock */
+       if ((ceph_inode(dir)->i_ceph_flags & CEPH_I_COMPLETE))
+               goto more;
+       dout(20, " lost I_COMPLETE on %p; falling back to mds\n", dir);
+       err = -EAGAIN;
 
 out_unlock:
        spin_unlock(&dcache_lock);
-out:
        if (last)
                dput(last);
-       return 0;
+       return err;
 }
 
 static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
@@ -150,27 +142,48 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
        int complete = 0, len;
        int max_entries = 1024;
 
+       dout(5, "readdir %p filp %p frag %u off %u\n", inode, filp, frag, off);
        if (fi->at_end)
                return 0;
 
+       if (filp->f_pos == 0) {
+               /* set I_READDIR at start of readdir */
+               ceph_i_set(inode, CEPH_I_READDIR);
+
+               dout(10, "readdir off 0 -> '.'\n");
+               if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0),
+                           inode->i_ino, inode->i_mode >> 12) < 0)
+                       return 0;
+               filp->f_pos++;
+               off++;
+       }
+       if (filp->f_pos == 1) {
+               dout(10, "readdir off 1 -> '..'\n");
+               if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1),
+                           filp->f_dentry->d_parent->d_inode->i_ino,
+                           inode->i_mode >> 12) < 0)
+                       return 0;
+               filp->f_pos++;
+               off++;
+       }
+
        /* can we use the dcache? */
        spin_lock(&inode->i_lock);
-       if ((filp->f_pos == 0 || fi->dentry) &&
+       if ((filp->f_pos == 2 || fi->dentry) &&
            (ci->i_ceph_flags & CEPH_I_COMPLETE) &&
            (__ceph_caps_issued(ci, NULL) & CEPH_CAP_FILE_RDCACHE)) {
+               err = __dcache_readdir(filp, dirent, filldir);
                spin_unlock(&inode->i_lock);
-               return ceph_dcache_readdir(inode, filp, dirent, filldir);
+               if (err != -EAGAIN)
+                       return err;
        }
        spin_unlock(&inode->i_lock);
-
-
-       /* set I_READDIR at start of readdir */
-       if (filp->f_pos == 0)
-               ceph_i_set(inode, CEPH_I_READDIR);
+       if (fi->dentry) {
+               dput(fi->dentry);
+               fi->dentry = 0;
+       }
 
 more:
-       dout(5, "readdir %p filp %p frag %u off %u\n", inode, filp, frag, off);
-
        /* do we have the correct frag content buffered? */
        if (fi->frag != frag || off < fi->off || fi->last_readdir == NULL) {
                struct ceph_mds_request *req;
@@ -235,30 +248,10 @@ more:
                fi->last_readdir = req;
        }
 
-       /* include . and .. with first piece of fragment */
-       if (frag_is_leftmost(frag)) {
-               switch (off) {
-               case 0:
-                       dout(10, "readdir off 0 -> '.'\n");
-                       if (filldir(dirent, ".", 1, make_fpos(0, 0),
-                                   inode->i_ino, inode->i_mode >> 12) < 0)
-                               return 0;
-                       off++;
-                       filp->f_pos++;
-               case 1:
-                       dout(10, "readdir off 1 -> '..'\n");
-                       if (filp->f_dentry->d_parent != NULL &&
-                           filldir(dirent, "..", 2, make_fpos(0, 1),
-                                   filp->f_dentry->d_parent->d_inode->i_ino,
-                                   inode->i_mode >> 12) < 0)
-                               return 0;
-                       off++;
-                       filp->f_pos++;
-               }
+       if (frag_is_leftmost(frag))
                skew = -2 - fi->off;  /* compensate for . and .. */
-       } else {
+       else
                skew = 0 - fi->off;
-       }
 
        rinfo = &fi->last_readdir->r_reply_info;
        dout(10, "readdir frag %x num %d off %d fragoff %d skew %d\n", frag,
@@ -272,7 +265,7 @@ more:
                if (filldir(dirent,
                            rinfo->dir_dname[off+skew],
                            rinfo->dir_dname_len[off+skew],
-                           make_fpos(frag, off),
+                           ceph_make_fpos(frag, off),
                            le64_to_cpu(rinfo->dir_in[off+skew].in->ino),
                            ftype) < 0) {
                        dout(20, "filldir stopping us...\n");
@@ -292,7 +285,7 @@ more:
        if (!frag_is_rightmost(frag)) {
                frag = frag_next(frag);
                off = 0;
-               filp->f_pos = make_fpos(frag, off);
+               filp->f_pos = ceph_make_fpos(frag, off);
                dout(10, "readdir next frag is %x\n", frag);
                goto more;
        }
@@ -358,7 +351,6 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
        return retval;
 }
 
-
 /*
  * Process result of a lookup/open request.
  *
index 16d739028c5eed10ef91872d8c1c01169810bf45..02f2668a5ba647981898e35af9e003efeb29fbe3 100644 (file)
@@ -243,6 +243,7 @@ int ceph_release(struct inode *inode, struct file *file)
                ceph_mdsc_put_request(cf->last_readdir);
        kfree(cf->last_name);
        kfree(cf->dir_info);
+       dput(cf->dentry);
        kfree(cf);
        return 0;
 }
index bf34873faab13e7c894379ddda1c6bc21051b8fe..8affefcdaade61781f8ac28d4315191bf68f534d 100644 (file)
@@ -1039,6 +1039,9 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        struct inode *in;
        int err = 0, i;
        struct inode *snapdir = NULL;
+       struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
+       u64 frag = le32_to_cpu(rhead->args.readdir.frag);
+       struct ceph_dentry_info *di;
 
        if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
                snapdir = ceph_get_snapdir(parent->d_inode);
@@ -1091,11 +1094,15 @@ retry_lookup:
                        /* reorder parent's d_subdirs */
                        spin_lock(&dn->d_lock);
                        spin_lock(&dcache_lock);
-                       list_move_tail(&dn->d_u.d_child, &parent->d_subdirs);
+                       list_move(&dn->d_u.d_child, &parent->d_subdirs);
                        spin_unlock(&dcache_lock);
                        spin_unlock(&dn->d_lock);
                }
 
+               di = dn->d_fsdata;
+               di->offset = ceph_make_fpos(frag,
+                                      i + (frag_is_leftmost(frag) ? 2 : 0));
+
                /* inode */
                if (dn->d_inode) {
                        in = dn->d_inode;
@@ -1132,7 +1139,6 @@ out:
        return err;
 }
 
-
 void ceph_inode_set_size(struct inode *inode, loff_t size)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
index 96aea13246be8138172a63723ab889ce90711ed7..8228abd4ce32c4284ab608c7933c5a6d17e00caf 100644 (file)
@@ -394,6 +394,7 @@ struct ceph_dentry_info {
        struct list_head lru;
        struct dentry *dentry;
        u64 time;
+       u64 offset;
 };
 
 static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)
@@ -401,6 +402,10 @@ static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)
        return (struct ceph_dentry_info *)dentry->d_fsdata;
 }
 
+static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
+{
+       return ((loff_t)frag << 32) | (loff_t)off;
+}
 
 /*
  * ino_t is <64 bits on many architectures, blech.