]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: inode.c comments, cleanup
authorSage Weil <sage@newdream.net>
Wed, 15 Oct 2008 19:02:20 +0000 (12:02 -0700)
committerSage Weil <sage@newdream.net>
Wed, 15 Oct 2008 19:13:35 +0000 (12:13 -0700)
src/TODO
src/kernel/inode.c
src/kernel/mds_client.h
src/kernel/super.h

index 99d53edd7bb264c548810e0e05d2213dc99603d9..aa033d7501a395fcc81478c0411fe6aeaefd6a36 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -1,3 +1,6 @@
+kclient
+- fix chown metadata op args
+
 v0.5
 - debug restart, cosd reformat, etc.
 - finish btrfs ioctl interface
index f7fccb6a5f776c88d21d99dcfce85ab67605f3cd..676aedc23d6031099303190d68ff53457f387d09 100644 (file)
@@ -26,7 +26,6 @@ const struct inode_operations ceph_symlink_iops;
 struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
 {
        struct inode *inode;
-       struct ceph_inode_info *ci;
        ino_t t = ceph_vino_to_ino(vino);
 
        inode = iget5_locked(sb, t, ceph_ino_compare, ceph_set_ino_cb, &vino);
@@ -38,13 +37,14 @@ struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
                unlock_new_inode(inode);
        }
 
-       ci = ceph_inode(inode);
-
        dout(30, "get_inode on %lu=%llx.%llx got %p\n", inode->i_ino, vino.ino,
             vino.snap, inode);
        return inode;
 }
 
+/*
+ * get/constuct snapdir inode for a given directory
+ */
 struct inode *ceph_get_snapdir(struct inode *parent)
 {
        struct ceph_vino vino = {
@@ -73,15 +73,6 @@ const struct inode_operations ceph_file_iops = {
        .removexattr = ceph_removexattr,
 };
 
-const struct inode_operations ceph_special_iops = {
-       .setattr = ceph_setattr,
-       .getattr = ceph_getattr,
-       .setxattr = ceph_setxattr,
-       .getxattr = ceph_getxattr,
-       .listxattr = ceph_listxattr,
-       .removexattr = ceph_removexattr
-};
-
 
 /*
  * find/create a frag in the tree
@@ -90,10 +81,9 @@ struct ceph_inode_frag *__get_or_create_frag(struct ceph_inode_info *ci, u32 f)
 {
        struct rb_node **p;
        struct rb_node *parent = NULL;
-       struct ceph_inode_frag *frag, *newfrag = 0;
+       struct ceph_inode_frag *frag;
        int c;
 
-retry:
        p = &ci->i_fragtree.rb_node;
        while (*p) {
                parent = *p;
@@ -104,18 +94,15 @@ retry:
                else if (c > 0)
                        p = &(*p)->rb_right;
                else
-                       goto out;
+                       return frag;
        }
 
-       if (!newfrag) {
-               newfrag = kmalloc(sizeof(*frag), GFP_NOFS);
-               if (!newfrag)
-                       return ERR_PTR(-ENOMEM);
-               goto retry;
+       frag = kmalloc(sizeof(*frag), GFP_NOFS);
+       if (!frag) {
+               derr(0, "ENOMEM on %p %llx.%llx frag %x\n", &ci->vfs_inode,
+                    ceph_vinop(&ci->vfs_inode), f);
+               return ERR_PTR(-ENOMEM);
        }
-
-       frag = newfrag;
-       newfrag = 0;
        frag->frag = f;
        frag->split_by = 0;
        frag->mds = -1;
@@ -124,14 +111,17 @@ retry:
        rb_link_node(&frag->node, parent, p);
        rb_insert_color(&frag->node, &ci->i_fragtree);
 
-       dout(20, "get_frag added %llx.%llx frag %x\n",
+       dout(20, "get_or_create_frag added %llx.%llx frag %x\n",
             ceph_vinop(&ci->vfs_inode), f);
 
-out:
-       kfree(newfrag);
        return frag;
 }
 
+/*
+ * Choose frag containing the given value @v.  If @pfrag is
+ * specified, copy the frag delegation info to the caller if
+ * it is present.
+ */
 __u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
                       struct ceph_inode_frag *pfrag,
                       int *found)
@@ -152,8 +142,7 @@ __u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
                        break; /* t is a leaf */
                if (frag->split_by == 0) {
                        if (pfrag)
-                               memcpy(pfrag, frag, sizeof(struct ceph_inode_frag));
-
+                               memcpy(pfrag, frag, sizeof(*pfrag));
                        if (found)
                                *found = 1;
                        break;
@@ -180,9 +169,9 @@ __u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
 }
 
 /*
- * process dirfrag (delegation) info.  include leaf fragment in tree
- * ONLY if mds >= 0 || ndist > 0.  (otherwise, only branches/splits
- * are included in i_fragtree)
+ * Process dirfrag (delegation) info from the mds.  Include leaf
+ * fragment in tree ONLY if mds >= 0 || ndist > 0.  Otherwise, only
+ * branches/splits are included in i_fragtree)
  */
 static int ceph_fill_dirfrag(struct inode *inode,
                             struct ceph_mds_reply_dirfrag *dirinfo)
@@ -208,7 +197,7 @@ static int ceph_fill_dirfrag(struct inode *inode,
                        rb_erase(&frag->node, &ci->i_fragtree);
                        kfree(frag);
                } else {
-                       /* tree branch, keep */
+                       /* tree branch, keep and clear */
                        dout(20, "fill_dirfrag cleared %llx.%llx frag %x"
                             " referral\n", ceph_vinop(inode), id);
                        frag->mds = -1;
@@ -221,6 +210,8 @@ static int ceph_fill_dirfrag(struct inode *inode,
        /* find/add this frag to store mds delegation info */
        frag = __get_or_create_frag(ci, id);
        if (IS_ERR(frag)) {
+               /* this is not the end of the world; we can continue
+                  with bad/inaccurate delegation info */
                derr(0, "fill_dirfrag ENOMEM on mds ref %llx.%llx frag %x\n",
                     ceph_vinop(inode), le32_to_cpu(dirinfo->frag));
                err = -ENOMEM;
@@ -240,10 +231,12 @@ out:
 }
 
 /*
- * helper to fill in size, ctime, mtime, and atime.  we have to be
- * careful because the client or MDS may have more up to date info,
- * depending on which capabilities/were help, and on the time_warp_seq
- * (which we increment on utimes()).
+ * Helper to fill in size, ctime, mtime, and atime.  We have to be
+ * careful because either the client or MDS may have more up to date
+ * info, depending on which capabilities are held, and whether
+ * time_warp_seq or truncate_seq have increased.  Ordinarily, mtime
+ * and size are monotonically increasing, except when utimes() or
+ * truncate() increments the corresponding _seq values on the MDS.
  */
 void ceph_fill_file_bits(struct inode *inode, int issued,
                         u64 truncate_seq, u64 size,
@@ -263,6 +256,10 @@ void ceph_fill_file_bits(struct inode *inode, int issued,
        }
 
        if (issued & CEPH_CAP_EXCL) {
+               /*
+                * if we hold EXCL cap, we have the most up to date
+                * values for everything except possibly ctime.
+                */
                if (timespec_compare(ctime, &inode->i_ctime) > 0)
                        inode->i_ctime = *ctime;
                if (time_warp_seq > ci->i_time_warp_seq)
@@ -270,6 +267,7 @@ void ceph_fill_file_bits(struct inode *inode, int issued,
                             inode, time_warp_seq, ci->i_time_warp_seq);
        } else if (issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) {
                if (time_warp_seq > ci->i_time_warp_seq) {
+                       /* the MDS did a utimes() */
                        inode->i_ctime = *ctime;
                        inode->i_mtime = *mtime;
                        inode->i_atime = *atime;
@@ -284,6 +282,7 @@ void ceph_fill_file_bits(struct inode *inode, int issued,
                } else
                        warn = 1;
        } else {
+               /* we have no write caps; whatever the MDS says is true */
                if (time_warp_seq >= ci->i_time_warp_seq) {
                        inode->i_ctime = *ctime;
                        inode->i_mtime = *mtime;
@@ -292,7 +291,7 @@ void ceph_fill_file_bits(struct inode *inode, int issued,
                } else
                        warn = 1;
        }
-       if (warn)
+       if (warn) /* time_warp_seq shouldn't go backwards */
                dout(10, "%p mds time_warp_seq %llu < %llu\n",
                     inode, time_warp_seq, ci->i_time_warp_seq);
 }
@@ -308,19 +307,15 @@ int ceph_fill_inode(struct inode *inode,
        struct ceph_mds_reply_inode *info = iinfo->in;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int i;
-       int symlen;
        int issued;
        struct timespec mtime, atime, ctime;
-       u64 size = le64_to_cpu(info->size);
-       u32 su = le32_to_cpu(info->layout.fl_stripe_unit);
-       int blkbits = fls(su) - 1;
        u32 nsplits;
        void *xattr_data = 0;
        int err = 0;
 
-       dout(30, "fill_inode %p ino %llx by %d.%d sz=%llu mode 0%o nlink %d\n",
-            inode, info->ino, inode->i_uid, inode->i_gid,
-            inode->i_size, inode->i_mode, inode->i_nlink);
+       dout(30, "fill_inode %p ino %llx.%llx v %llu had %llu\n",
+            inode, ceph_vinop(inode), le64_to_cpu(info->version),
+            ci->i_version);
 
        /* prealloc xattr data, if it looks like we'll need it */
        if (iinfo->xattr_len && iinfo->xattr_len != ci->i_xattr_len) {
@@ -331,8 +326,7 @@ int ceph_fill_inode(struct inode *inode,
        }
 
        spin_lock(&inode->i_lock);
-       dout(30, " su %d, blkbits %d.  v %llu, had %llu\n",
-            su, blkbits, le64_to_cpu(info->version), ci->i_version);
+
        if (le64_to_cpu(info->version) > 0 &&
            ci->i_version == le64_to_cpu(info->version))
                goto no_change;
@@ -351,19 +345,15 @@ int ceph_fill_inode(struct inode *inode,
        ceph_decode_timespec(&mtime, &info->mtime);
        ceph_decode_timespec(&ctime, &info->ctime);
        issued = __ceph_caps_issued(ci, 0);
-
        ceph_fill_file_bits(inode, issued,
-                           le64_to_cpu(info->truncate_seq), size,
+                           le64_to_cpu(info->truncate_seq),
+                           le64_to_cpu(info->size),
                            le64_to_cpu(info->time_warp_seq),
                            &ctime, &mtime, &atime);
 
-       inode->i_blkbits = blkbits;
-
        ci->i_max_size = le64_to_cpu(info->max_size);
-
        ci->i_layout = info->layout;
-       kfree(ci->i_symlink);
-       ci->i_symlink = 0;
+       inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
        /* xattrs */
        if (iinfo->xattr_len) {
@@ -384,26 +374,24 @@ int ceph_fill_inode(struct inode *inode,
        inode->i_mapping->backing_dev_info = &ceph_client(inode->i_sb)->backing_dev_info;
 
 no_change:
+       spin_unlock(&inode->i_lock);
+
        /* populate frag tree */
        /* FIXME: move me up, if/when version reflects fragtree changes */
        nsplits = le32_to_cpu(info->fragtree.nsplits);
+       mutex_lock(&ci->i_fragtree_mutex);
        for (i = 0; i < nsplits; i++) {
                u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-               struct ceph_inode_frag *frag = __get_or_create_frag(ci,id);
-               if (IS_ERR(frag)) {
-                       derr(0,"ENOMEM on ino %llx.%llx frag %x\n",
-                            ceph_vinop(inode), id);
+               struct ceph_inode_frag *frag = __get_or_create_frag(ci, id);
+
+               if (IS_ERR(frag))
                        continue;
-               }
-               frag->split_by =
-                       le32_to_cpu(info->fragtree.splits[i].by);
-               dout(20, " frag %x split by %d\n", frag->frag,
-                    frag->split_by);
+               frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
+               dout(20, " frag %x split by %d\n", frag->frag, frag->split_by);
        }
+       mutex_unlock(&ci->i_fragtree_mutex);
 
-       spin_unlock(&inode->i_lock);
-
-       /* set dirinfo? */
+       /* update delegation info? */
        if (dirinfo)
                ceph_fill_dirfrag(inode, dirinfo);
 
@@ -413,7 +401,7 @@ no_change:
        case S_IFCHR:
        case S_IFSOCK:
                init_special_inode(inode, inode->i_mode, inode->i_rdev);
-               inode->i_op = &ceph_special_iops;
+               inode->i_op = &ceph_file_iops;
                break;
        case S_IFREG:
                inode->i_op = &ceph_file_iops;
@@ -421,14 +409,17 @@ no_change:
                break;
        case S_IFLNK:
                inode->i_op = &ceph_symlink_iops;
-               symlen = iinfo->symlink_len;
-               BUG_ON(symlen != ci->vfs_inode.i_size);
-               err = -ENOMEM;
-               ci->i_symlink = kmalloc(symlen+1, GFP_NOFS);
-               if (ci->i_symlink == NULL)
-                       goto out;
-               memcpy(ci->i_symlink, iinfo->symlink, symlen);
-               ci->i_symlink[symlen] = 0;
+               if (!ci->i_symlink) {
+                       int symlen = iinfo->symlink_len;
+
+                       BUG_ON(symlen != inode->i_size);
+                       err = -ENOMEM;
+                       ci->i_symlink = kmalloc(symlen+1, GFP_NOFS);
+                       if (!ci->i_symlink)
+                               goto out;
+                       memcpy(ci->i_symlink, iinfo->symlink, symlen);
+                       ci->i_symlink[symlen] = 0;
+               }
                break;
        case S_IFDIR:
                inode->i_op = &ceph_dir_iops;
@@ -441,13 +432,14 @@ no_change:
                ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
                ceph_decode_timespec(&ci->i_rctime, &info->rctime);
 
+               /* it may be better to set st_size in getattr instead? */
                if (ceph_client(inode->i_sb)->mount_args.flags &
                    CEPH_MOUNT_RBYTES)
                        inode->i_size = ci->i_rbytes;
                break;
        default:
-               derr(0, "BAD mode 0x%x S_IFMT 0x%x\n",
-                    inode->i_mode, inode->i_mode & S_IFMT);
+               derr(0, "BAD mode 0%o S_IFMT 0%o\n", inode->i_mode,
+                    inode->i_mode & S_IFMT);
                err = -EINVAL;
                goto out;
        }
@@ -480,8 +472,10 @@ static int update_inode_lease(struct inode *inode,
 
        spin_lock(&inode->i_lock);
        /*
-        * be careful: we can't remove lease from a different session
-        * without holding that other session's s_mutex.  so don't.
+        * be careful: we can't remove a lease from a different session
+        * without holding the other session's s_mutex.  and we only
+        * remember one lease per object.  so if one already exists,
+        * don't touch it.
         */
        if ((ci->i_lease_ttl == 0 || !time_before(ttl, ci->i_lease_ttl) ||
             ci->i_lease_gen != session->s_cap_gen) &&
@@ -497,11 +491,8 @@ static int update_inode_lease(struct inode *inode,
        } else
                mask = 0;
        spin_unlock(&inode->i_lock);
-       if (is_new) {
-               dout(10, "lease iget on %p\n", inode);
+       if (is_new)
                igrab(inode);
-       }
-
        return mask;
 }
 
@@ -624,7 +615,7 @@ fail_unlock:
 int ceph_dentry_lease_valid(struct dentry *dentry)
 {
        struct ceph_dentry_info *di;
-       struct ceph_mds_session *session;
+       struct ceph_mds_session *s;
        int valid = 0;
        u32 gen;
        unsigned long ttl;
@@ -632,11 +623,11 @@ int ceph_dentry_lease_valid(struct dentry *dentry)
        spin_lock(&dentry->d_lock);
        di = ceph_dentry(dentry);
        if (di) {
-               session = di->lease_session;
-               spin_lock(&session->s_cap_lock);
-               gen = session->s_cap_gen;
-               ttl = session->s_cap_ttl;
-               spin_unlock(&session->s_cap_lock);
+               s = di->lease_session;
+               spin_lock(&s->s_cap_lock);
+               gen = s->s_cap_gen;
+               ttl = s->s_cap_ttl;
+               spin_unlock(&s->s_cap_lock);
 
                if (di->lease_gen == gen &&
                    time_before(jiffies, dentry->d_time) &&
@@ -652,6 +643,10 @@ int ceph_dentry_lease_valid(struct dentry *dentry)
 /*
  * splice a dentry to an inode.
  * caller must hold directory i_mutex for this to be safe.
+ *
+ * we will only rehash the resulting dentry if @prehash is
+ * true; @prehash will be set to false (for the benefit of
+ * the caller) if we fail.
  */
 static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
                                    bool *prehash)
@@ -663,10 +658,8 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
                d_drop(dn);
        realdn = d_materialise_unique(dn, in);
        if (IS_ERR(realdn)) {
-               derr(0, "error splicing %p (%d) "
-                    "inode %p ino %llx.%llx\n",
-                    dn, atomic_read(&dn->d_count), in,
-                    ceph_vinop(in));
+               derr(0, "error splicing %p (%d) inode %p ino %llx.%llx\n",
+                    dn, atomic_read(&dn->d_count), in, ceph_vinop(in));
                if (prehash)
                        *prehash = false; /* don't rehash on error */
                goto out;
@@ -675,8 +668,7 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
                     "inode %p ino %llx.%llx\n",
                     dn, atomic_read(&dn->d_count),
                     realdn, atomic_read(&realdn->d_count),
-                    realdn->d_inode,
-                    ceph_vinop(realdn->d_inode));
+                    realdn->d_inode, ceph_vinop(realdn->d_inode));
                dput(dn);
                dn = realdn;
                ceph_init_dentry(dn);
@@ -690,12 +682,15 @@ out:
 }
 
 /*
- * assimilate a full trace of inodes and dentries, from the root to
- * the item relevant for this reply, into our cache.  make any dcache
+ * Assimilate a full trace of inodes and dentries, from the root to
+ * the item relevant for this reply, into our cache.  Make any dcache
  * changes needed to properly reflect the completed operation (e.g.,
- * call d_move).  make note of the distribution of metadata across the
+ * call d_move).  Make note of the distribution of metadata across the
  * mds cluster.
  *
+ * Care is taken to (attempt to) take i_mutex before adjusting dentry
+ * linkages or leases.
+ *
  * FIXME: we should check inode.version to avoid races between traces
  * from multiple MDSs after, say, a ancestor directory is renamed.
  */
@@ -712,8 +707,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        struct ceph_mds_reply_inode *ininfo;
        int d = 0;
        struct ceph_vino vino;
-       int have_icontent = 0;
-       bool have_lease = 0;
+       int have_icontent;
+       bool have_lease;
 
        if (rinfo->trace_numi == 0) {
                dout(10, "fill_trace reply has empty trace!\n");
@@ -724,7 +719,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        /*
         * if we resend completed ops to a recovering mds, we get no
         * trace.  pretend this is the case to ensure the 'no trace'
-        * handlers behave.
+        * handlers in the callers behave.
         */
        if (rinfo->head->op & CEPH_MDS_OP_WRITE) {
                dout(0, "fill_trace faking empty trace on %d %s\n",
@@ -741,16 +736,16 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        if (likely(dn)) {
                in = dn->d_inode;
                /* trace should start at root, or have only 1 dentry
-                * (if it is in mds stray dir) */
+                * (if it is in an mds stray dir) */
                WARN_ON(vino.ino != 1 && rinfo->trace_numd != 1);
        } else {
-               /* first reply (i.e. mount) */
+               /* first reply (i.e. we just mounted) */
                in = ceph_get_inode(sb, vino);
                if (IS_ERR(in))
                        return PTR_ERR(in);
                dn = d_alloc_root(in);
                if (dn == NULL) {
-                       derr(0, "d_alloc_root enomem badness on root dentry\n");
+                       derr(0, "d_alloc_root ENOMEM badness on root dentry\n");
                        return -ENOMEM;
                }
        }
@@ -764,10 +759,12 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                if (rinfo->trace_numd == 0)
                        update_inode_lease(in, rinfo->trace_ilease[0],
                                           session, req->r_from_time);
-               if (sb->s_root == NULL)
+               if (unlikely(sb->s_root == NULL))
                        sb->s_root = dn;
        }
 
+       have_icontent = 0;
+       have_lease = 0;
        dget(dn);
        for (d = 0; d < rinfo->trace_numd; d++) {
                dname.name = rinfo->trace_dname[d];
@@ -835,7 +832,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                if (!dn) {
                        dn = d_alloc(parent, &dname);
                        if (!dn) {
-                               derr(0, "d_alloc enomem\n");
+                               derr(0, "d_alloc ENOMEM\n");
                                err = -ENOMEM;
                                goto out_dir_no_inode;
                        }
@@ -927,19 +924,36 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                if (err < 0) {
                        derr(30, "ceph_fill_inode badness\n");
                        iput(in);
+                       in = NULL;
                        d_delete(dn);
                        dn = NULL;
-                       in = 0;
                        break;
                }
 
                dput(parent);
                parent = NULL;
 
-               if (d == rinfo->trace_numi-rinfo->trace_snapdirpos-1) {
+               /* do we diverge into a snap dir at this point in the trace? */
+               if (d == rinfo->trace_numi - rinfo->trace_snapdirpos - 1) {
                        struct inode *snapdir = ceph_get_snapdir(dn->d_inode);
                        dput(dn);
                        dn = d_find_alias(snapdir);
+                       if (!dn) {
+                               struct ceph_client *client =
+                                       ceph_sb_to_client(parent->d_sb);
+
+                               dname.name = client->mount_args.snapdir_name,
+                               dname.len = strlen(dname.name);
+                               dname.hash = full_name_hash(dname.name,
+                                                           dname.len);
+                               dn = d_alloc(parent, &dname);
+                               if (!dn) {
+                                       err = -ENOMEM;
+                                       iput(snapdir);
+                                       break;
+                               }
+                               d_add(dn, snapdir);
+                       }
                        iput(snapdir);
                        dout(10, " snapdir dentry is %p\n", dn);
                }
@@ -1010,11 +1024,10 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        if (parent)
                dput(parent);
 
-       if (in) {
+       if (in)
                update_inode_lease(dn->d_inode,
                                   rinfo->trace_ilease[d],
                                   session, req->r_from_time);
-       }
 
        dout(10, "fill_trace done err=%d, last dn %p in %p\n", err, dn, in);
        if (req->r_last_dentry)
@@ -1029,7 +1042,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 }
 
 /*
- * prepopulate cache with readdir results
+ * prepopulate cache with readdir results, leases, etc.
  */
 int ceph_readdir_prepopulate(struct ceph_mds_request *req)
 {
@@ -1038,13 +1051,12 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req)
        struct qstr dname;
        struct dentry *dn;
        struct inode *in;
-       int i;
+       int err = 0, i;
        struct inode *snapdir = 0;
 
        if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
                snapdir = ceph_get_snapdir(parent->d_inode);
                parent = d_find_alias(snapdir);
-               dput(parent);
                dout(10, "readdir_prepopulate %d items under SNAPDIR dn %p\n",
                     rinfo->dir_nr, parent);
        } else {
@@ -1056,7 +1068,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req)
 
        for (i = 0; i < rinfo->dir_nr; i++) {
                struct ceph_vino vino;
-               /* dentry */
+
                dname.name = rinfo->dir_dname[i];
                dname.len = le32_to_cpu(rinfo->dir_dname_len[i]);
                dname.hash = full_name_hash(dname.name, dname.len);
@@ -1066,16 +1078,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req)
 
 retry_lookup:
                dn = d_lookup(parent, &dname);
-               dout(30, "calling d_lookup on parent=%p name=%.*s"
-                    " returned %p\n", parent, dname.len, dname.name, dn);
+               dout(30, "d_lookup on parent=%p name=%.*s got %p\n",
+                    parent, dname.len, dname.name, dn);
 
                if (!dn) {
                        dn = d_alloc(parent, &dname);
-                       dout(40, "d_alloc %p '%.*s'\n", parent,
-                            dname.len, dname.name);
+                       dout(40, "d_alloc %p '%.*s' = %p\n", parent,
+                            dname.len, dname.name, dn);
                        if (dn == NULL) {
                                dout(30, "d_alloc badness\n");
-                               return -1;
+                               err = -ENOMEM;
+                               goto out;
                        }
                        ceph_init_dentry(dn);
                } else if (dn->d_inode &&
@@ -1097,7 +1110,8 @@ retry_lookup:
                                dout(30, "new_inode badness\n");
                                d_delete(dn);
                                dput(dn);
-                               return -ENOMEM;
+                               err = -ENOMEM;
+                               goto out;
                        }
                        dn = splice_dentry(dn, in, NULL);
                }
@@ -1114,14 +1128,16 @@ retry_lookup:
                dput(dn);
        }
 
-       iput(snapdir);
+out:
+       if (snapdir) {
+               iput(snapdir);
+               dput(parent);
+       }               
        dout(10, "readdir_prepopulate done\n");
-       return 0;
+       return err;
 }
 
 
-
-
 void ceph_inode_set_size(struct inode *inode, loff_t size)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
@@ -1131,6 +1147,7 @@ void ceph_inode_set_size(struct inode *inode, loff_t size)
        inode->i_size = size;
        inode->i_blocks = (size + (1 << 9) - 1) >> 9;
 
+       /* tell the MDS if we are approaching max_size */
        if ((size << 1) >= ci->i_max_size &&
            (ci->i_reported_size << 1) < ci->i_max_size) {
                spin_unlock(&inode->i_lock);
@@ -1139,6 +1156,11 @@ void ceph_inode_set_size(struct inode *inode, loff_t size)
                spin_unlock(&inode->i_lock);
 }
 
+/*
+ * Drop open file reference.  If we were the last open file,
+ * we may need to release capabilities to the MDS (or schedule
+ * their delayed release).
+ */
 void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
 {
        int last = 0;
@@ -1155,6 +1177,10 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
 }
 
 
+/*
+ * Write back inode data in a worker thread.  (This can't be done
+ * in the message handler context.)
+ */
 void ceph_inode_writeback(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
@@ -1166,29 +1192,10 @@ void ceph_inode_writeback(struct work_struct *work)
 }
 
 
-/*
- * called by setattr
- */
-static int apply_truncate(struct inode *inode, loff_t size)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int rc;
-       int check = 0;
-
-       rc = vmtruncate(inode, size);
-       spin_lock(&inode->i_lock);
-       if (rc == 0)
-               ci->i_reported_size = size;
-       if (ci->i_wrbuffer_ref == 0)
-               check = 1;
-       spin_unlock(&inode->i_lock);
-       if (check)
-               ceph_check_caps(ci, 0);
-       return rc;
-}
-
 /*
  * called by trunc_wq; take i_mutex ourselves
+ *
+ * We also truncation in a separate thread as well.
  */
 void ceph_vmtruncate_work(struct work_struct *work)
 {
@@ -1203,7 +1210,10 @@ void ceph_vmtruncate_work(struct work_struct *work)
 }
 
 /*
- * called with i_mutex held
+ * called with i_mutex held.
+ *
+ * Make sure any pending truncation is applied before doing anything
+ * that may depend on it.
  */
 void __ceph_do_pending_vmtruncate(struct inode *inode)
 {
@@ -1243,7 +1253,9 @@ const struct inode_operations ceph_symlink_iops = {
 
 
 /*
- * generics
+ * Prepare a setattr request.  If we know we have the file open (and
+ * thus hold at lease a PIN capability), generate the request without
+ * a path name.
  */
 static struct ceph_mds_request *prepare_setattr(struct ceph_mds_client *mdsc,
                                                struct dentry *dentry,
@@ -1287,6 +1299,7 @@ static int ceph_setattr_chown(struct dentry *dentry, struct iattr *attr)
        if (IS_ERR(req))
                return PTR_ERR(req);
        reqh = req->r_request->front.iov_base;
+       /* FIXME */
        if (ia_valid & ATTR_UID)
                reqh->args.chown.uid = cpu_to_le32(attr->ia_uid);
        else
@@ -1299,10 +1312,7 @@ static int ceph_setattr_chown(struct dentry *dentry, struct iattr *attr)
        err = ceph_mdsc_do_request(mdsc, req);
        ceph_mdsc_put_request(req);
        dout(10, "chown result %d\n", err);
-       if (err)
-               return err;
-
-       return 0;
+       return err;
 }
 
 static int ceph_setattr_chmod(struct dentry *dentry, struct iattr *attr)
@@ -1323,10 +1333,7 @@ static int ceph_setattr_chmod(struct dentry *dentry, struct iattr *attr)
        err = ceph_mdsc_do_request(mdsc, req);
        ceph_mdsc_put_request(req);
        dout(10, "chmod result %d\n", err);
-       if (err)
-               return err;
-
-       return 0;
+       return err;
 }
 
 static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
@@ -1347,7 +1354,8 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
                if (ia_valid & ATTR_ATIME)
                        inode->i_atime = attr->ia_atime;
                if (ia_valid & ATTR_MTIME)
-                       inode->i_ctime = inode->i_mtime = attr->ia_mtime;
+                       inode->i_mtime = attr->ia_mtime;
+               inode->i_ctime = CURRENT_TIME;
                return 0;
        }
 
@@ -1361,7 +1369,8 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
                if (ia_valid & ATTR_ATIME)
                        inode->i_atime = attr->ia_atime;
                if (ia_valid & ATTR_MTIME)
-                       inode->i_ctime = inode->i_mtime = attr->ia_mtime;
+                       inode->i_mtime = attr->ia_mtime;
+               inode->i_ctime = CURRENT_TIME;
                return 0;
        }
 
@@ -1369,7 +1378,7 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
        if (ceph_inode_lease_valid(inode, CEPH_LOCK_ICONTENT) &&
            !(((ia_valid & ATTR_ATIME) &&
               !timespec_equal(&inode->i_atime, &attr->ia_atime)) ||
-                       ((ia_valid & ATTR_MTIME) &&
+             ((ia_valid & ATTR_MTIME) &&
               !timespec_equal(&inode->i_mtime, &attr->ia_mtime)))) {
                dout(10, "lease indicates utimes is a no-op\n");
                return 0;
@@ -1392,10 +1401,7 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
        err = ceph_mdsc_do_request(mdsc, req);
        ceph_mdsc_put_request(req);
        dout(10, "utime result %d\n", err);
-       if (err)
-               return err;
-
-       return 0;
+       return err;
 }
 
 static int ceph_setattr_size(struct dentry *dentry, struct iattr *attr)
@@ -1414,10 +1420,14 @@ static int ceph_setattr_size(struct dentry *dentry, struct iattr *attr)
        if (ceph_caps_issued(ci) & CEPH_CAP_EXCL &&
            attr->ia_size > inode->i_size) {
                dout(10, "holding EXCL, doing truncate (fwd) locally\n");
-               inode->i_ctime = attr->ia_ctime;
-               err = apply_truncate(inode, attr->ia_size);
+               err = vmtruncate(inode, attr->ia_size);
                if (err)
                        return err;
+               spin_lock(&inode->i_lock);
+               inode->i_size = attr->ia_size;
+               inode->i_ctime = attr->ia_ctime;
+               ci->i_reported_size = attr->ia_size;
+               spin_unlock(&inode->i_lock);
                return 0;
        }
        if (ceph_inode_lease_valid(inode, CEPH_LOCK_ICONTENT) &&
@@ -1435,10 +1445,7 @@ static int ceph_setattr_size(struct dentry *dentry, struct iattr *attr)
        ceph_mdsc_put_request(req);
        dout(10, "truncate result %d\n", err);
        __ceph_do_pending_vmtruncate(inode);
-       if (err)
-               return err;
-
-       return 0;
+       return err;
 }
 
 
@@ -1485,37 +1492,21 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        if (ia_valid & ATTR_FILE)
                dout(10, "setattr: %p ATTR_FILE ... hrm!\n", inode);
 
-       /* chown */
-       if (ia_valid & (ATTR_UID|ATTR_GID)) {
+       if (ia_valid & (ATTR_UID|ATTR_GID))
                err = ceph_setattr_chown(dentry, attr);
-               if (err)
-                       return err;
-       }
-
-       /* chmod? */
-       if (ia_valid & ATTR_MODE) {
+       if (ia_valid & ATTR_MODE)
                err = ceph_setattr_chmod(dentry, attr);
-               if (err)
-                       return err;
-       }
-
-       /* utimes */
-       if (ia_valid & (ATTR_ATIME|ATTR_MTIME)) {
+       if (ia_valid & (ATTR_ATIME|ATTR_MTIME))
                err = ceph_setattr_time(dentry, attr);
-               if (err)
-                       return err;
-       }
-
-       /* truncate? */
-       if (ia_valid & ATTR_SIZE) {
+       if (ia_valid & ATTR_SIZE)
                err = ceph_setattr_size(dentry, attr);
-               if (err)
-                       return err;
-       }
-
-       return 0;
+       return err;
 }
 
+/*
+ * Verify that we have a lease on the given mask.  If not,
+ * do a getattr against an mds.
+ */
 int ceph_do_getattr(struct dentry *dentry, int mask)
 {
        int want_inode = 0;
@@ -1527,9 +1518,8 @@ int ceph_do_getattr(struct dentry *dentry, int mask)
                return 0;
        }
 
-       dout(30, "getattr dentry %p inode %p\n", dentry,
-            dentry->d_inode);
-
+       dout(30, "getattr dentry %p inode %p mask %d\n", dentry,
+            dentry->d_inode, mask);
        if (ceph_inode_lease_valid(dentry->d_inode, mask))
                return 0;
 
@@ -1542,7 +1532,7 @@ int ceph_do_getattr(struct dentry *dentry, int mask)
                if (ceph_get_cap_mds(dentry->d_inode) >= 0)
                        want_inode = 1;
                else
-                       derr(10, "WARNING: getattr on unhashed cap-less"
+                       derr(0, "WARNING: getattr on unhashed cap-less"
                             " dentry %p %.*s\n", dentry,
                             dentry->d_name.len, dentry->d_name.name);
        }
@@ -1557,6 +1547,10 @@ int ceph_do_getattr(struct dentry *dentry, int mask)
        return 0;
 }
 
+/*
+ * Get all attributes.  Hopefully somedata we'll have a statlite()
+ * and can limit the fields we require to be accurate.
+ */
 int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
                 struct kstat *stat)
 {
@@ -1575,62 +1569,76 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
        return err;
 }
 
+/*
+ * (virtual) xattrs
+ *
+ * These define virtual xattrs exposing the recursive directory statistics.
+ */
 struct _ceph_vir_xattr_cb {
        char *name;
-       size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val, size_t size);
+       size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
+                             size_t size);
 };
 
-static size_t _ceph_vir_xattrcb_entries(struct ceph_inode_info *ci, char *val, size_t size)
+static size_t _ceph_vir_xattrcb_entries(struct ceph_inode_info *ci, char *val,
+                                       size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs);
 }
 
-static size_t _ceph_vir_xattrcb_files(struct ceph_inode_info *ci, char *val, size_t size)
+static size_t _ceph_vir_xattrcb_files(struct ceph_inode_info *ci, char *val,
+                                     size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_files);
 }
 
-static size_t _ceph_vir_xattrcb_subdirs(struct ceph_inode_info *ci, char *val, size_t size)
+static size_t _ceph_vir_xattrcb_subdirs(struct ceph_inode_info *ci, char *val,
+                                       size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_subdirs);
 }
 
-static size_t _ceph_vir_xattrcb_rentries(struct ceph_inode_info *ci, char *val, size_t size)
+static size_t _ceph_vir_xattrcb_rentries(struct ceph_inode_info *ci, char *val,
+                                        size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs);
 }
 
-static size_t _ceph_vir_xattrcb_rfiles(struct ceph_inode_info *ci, char *val, size_t size)
+static size_t _ceph_vir_xattrcb_rfiles(struct ceph_inode_info *ci, char *val,
+                                      size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_rfiles);
 }
 
-static size_t _ceph_vir_xattrcb_rsubdirs(struct ceph_inode_info *ci, char *val, size_t size)
+static size_t _ceph_vir_xattrcb_rsubdirs(struct ceph_inode_info *ci, char *val,
+                                        size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_subdirs);
 }
 
-static size_t _ceph_vir_xattrcb_rbytes(struct ceph_inode_info *ci, char *val, size_t size)
+static size_t _ceph_vir_xattrcb_rbytes(struct ceph_inode_info *ci, char *val,
+                                      size_t size)
 {
        return snprintf(val, size, "%lld", ci->i_rbytes);
 }
 
-static size_t _ceph_vir_xattrcb_rctime(struct ceph_inode_info *ci, char *val, size_t size)
+static size_t _ceph_vir_xattrcb_rctime(struct ceph_inode_info *ci, char *val,
+                                      size_t size)
 {
        return snprintf(val, size, "%ld.%ld", (long)ci->i_rctime.tv_sec,
                                 (long)ci->i_rctime.tv_nsec);
 }
 
 static struct _ceph_vir_xattr_cb _ceph_vir_xattr_recs[] = {
-                               { "user.ceph.dir.entries", _ceph_vir_xattrcb_entries},
-                               { "user.ceph.dir.files", _ceph_vir_xattrcb_files},
-                               { "user.ceph.dir.subdirs", _ceph_vir_xattrcb_subdirs},
-                               { "user.ceph.dir.rentries", _ceph_vir_xattrcb_rentries},
-                               { "user.ceph.dir.rfiles", _ceph_vir_xattrcb_rfiles},
-                               { "user.ceph.dir.rsubdirs", _ceph_vir_xattrcb_rsubdirs},
-                               { "user.ceph.dir.rbytes", _ceph_vir_xattrcb_rbytes},
-                               { "user.ceph.dir.rctime", _ceph_vir_xattrcb_rctime},
-                               { NULL, NULL }
+       { "user.ceph.dir.entries", _ceph_vir_xattrcb_entries},
+       { "user.ceph.dir.files", _ceph_vir_xattrcb_files},
+       { "user.ceph.dir.subdirs", _ceph_vir_xattrcb_subdirs},
+       { "user.ceph.dir.rentries", _ceph_vir_xattrcb_rentries},
+       { "user.ceph.dir.rfiles", _ceph_vir_xattrcb_rfiles},
+       { "user.ceph.dir.rsubdirs", _ceph_vir_xattrcb_rsubdirs},
+       { "user.ceph.dir.rbytes", _ceph_vir_xattrcb_rbytes},
+       { "user.ceph.dir.rctime", _ceph_vir_xattrcb_rctime},
+       { NULL, NULL }
 };
 
 static struct _ceph_vir_xattr_cb *_ceph_match_vir_xattr(const char *name)
@@ -1659,26 +1667,24 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
 
        /* let's see if a virtual xattr was requested */
        vir_xattr = _ceph_match_vir_xattr(name);
-       if (vir_xattr) {
-               err = (vir_xattr->getxattr_cb)(ci, value, size);
-               return err;
-       }
+       if (vir_xattr)
+               return (vir_xattr->getxattr_cb)(ci, value, size);
 
+       /* get xattrs from mds (if we don't already have them) */
        err = ceph_do_getattr(dentry, CEPH_STAT_MASK_XATTR);
        if (err)
                return err;
 
        spin_lock(&inode->i_lock);
 
-       err = -ENODATA;
+       err = -ENODATA;  /* == ENOATTR */
        if (!ci->i_xattr_len)
                goto out;
 
-       /* find attr */
+       /* find attr name */
        p = ci->i_xattr_data;
        end = p + ci->i_xattr_len;
        ceph_decode_32_safe(&p, end, numattr, bad);
-       err = -ENODATA;  /* == ENOATTR */
        while (numattr--) {
                u32 len;
                int match;
@@ -1719,6 +1725,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
        void *p, *end;
        int err;
        u32 len;
+       int i;
 
        err = ceph_do_getattr(dentry, CEPH_STAT_MASK_XATTR);
        if (err)
@@ -1726,7 +1733,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
 
        spin_lock(&inode->i_lock);
 
-       /* measure len */
+       /* measure len of names */
        if (ci->i_xattr_len) {
                p = ci->i_xattr_data;
                end = p + ci->i_xattr_len;
@@ -1742,13 +1749,10 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
        } else
                namelen = 0;
 
-       if ((inode->i_mode & S_IFMT) == S_IFDIR) {
-               int i;
-
-               for (i=0;  _ceph_vir_xattr_recs[i].name; i++) {
+       /* include virtual dir xattrs */
+       if ((inode->i_mode & S_IFMT) == S_IFDIR)
+               for (i=0;  _ceph_vir_xattr_recs[i].name; i++)
                        namelen += strlen(_ceph_vir_xattr_recs[i].name) + 1;
-               }
-       }
 
        err = -ERANGE;
        if (size && namelen > size)
@@ -1773,16 +1777,12 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
        } else
                names[0] = 0;
 
-       /* now do all the virtual xattr stuff */
-
-       if ((inode->i_mode & S_IFMT) == S_IFDIR) {
-               int i;
-
+       /* virtual xattr names, too */
+       if ((inode->i_mode & S_IFMT) == S_IFDIR)
                for (i=0;  _ceph_vir_xattr_recs[i].name; i++) {
                        len = sprintf(names, _ceph_vir_xattr_recs[i].name);
                        names += len + 1;
                }
-       }
 
 out:
        spin_unlock(&inode->i_lock);
index 962fe55502f7df75e440c59b22ce9fe6e614c520..4d1097e9e048401d982be7ab25aa089326120ac1 100644 (file)
@@ -56,7 +56,6 @@ enum {
        CEPH_MDS_SESSION_OPENING = 2,
        CEPH_MDS_SESSION_OPEN = 3,
        CEPH_MDS_SESSION_CLOSING = 4,
-//     CEPH_MDS_SESSION_RESUMING = 5,
        CEPH_MDS_SESSION_RECONNECTING = 6
 };
 struct ceph_mds_session {
index 7e25140df7bdcea7f80ad865397eda1b5d50d5fd..7b930e9f0d0bb984be578a45aeb9b691f6c1d0af 100644 (file)
@@ -254,7 +254,8 @@ struct ceph_inode_info {
        struct rb_root i_fragtree;
        struct mutex i_fragtree_mutex;
 
-       /* (still encoded) xattr blob */
+       /* (still encoded) xattr blob. we avoid the overhead of parsing
+        * this until someone actually called getxattr, etc. */
        int i_xattr_len;
        char *i_xattr_data;
 
@@ -605,7 +606,7 @@ extern const char *ceph_msg_type_name(int type);
 
 /* inode.c */
 extern const struct inode_operations ceph_file_iops;
-extern const struct inode_operations ceph_special_iops;
+
 extern struct inode *ceph_get_inode(struct super_block *sb,
                                    struct ceph_vino vino);
 extern struct inode *ceph_get_snapdir(struct inode *parent);