git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: fill_trace takes i_mutex, avoids dentry if no lease
author Sage Weil <sage@newdream.net>
Mon, 30 Jun 2008 18:46:54 +0000 (11:46 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 30 Jun 2008 18:46:54 +0000 (11:46 -0700)
src/kernel/dir.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/super.h

index 30af6571386f4b54634a89c7d879e4ae9557501d..90779edbe147ba8ee6d274398110d94307d6d4c0 100644 (file)
@@ -285,7 +285,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
  * @on_inode indicates that we should stat the ino, and not a path
  * built from @dentry.
  */
-struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry, 
+struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry,
                              int mask, int on_inode, int locked_dir)
 {
        struct ceph_client *client = ceph_sb_to_client(sb);
@@ -375,10 +375,11 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
                d_drop(dentry);
                return PTR_ERR(req);
        }
-       ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT);
+       req->r_locked_dir = dir;
        rhead = req->r_request->front.iov_base;
        rhead->args.mknod.mode = cpu_to_le32(mode);
        rhead->args.mknod.rdev = cpu_to_le32(rdev);
+       ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT);
        err = ceph_mdsc_do_request(mdsc, req);
        if (!err && req->r_reply_info.trace_numd == 0) {
                /* no trace.  do lookup, in case we are called from create. */
@@ -442,6 +443,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
                d_drop(dentry);
                return PTR_ERR(req);
        }
+       req->r_locked_dir = dir;
        ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT);
        err = ceph_mdsc_do_request(mdsc, req);
        ceph_mdsc_put_request(req);
@@ -476,9 +478,14 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode)
                d_drop(dentry);
                return PTR_ERR(req);
        }
-       ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT);
+
+       dget(dentry);                /* to match put_request below */
+       req->r_last_dentry = dentry; /* use this dentry in fill_trace */
+       req->r_locked_dir = dir;
        rhead = req->r_request->front.iov_base;
        rhead->args.mkdir.mode = cpu_to_le32(mode);
+
+       ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT);
        err = ceph_mdsc_do_request(mdsc, req);
        ceph_mdsc_put_request(req);
        if (err < 0) {
@@ -524,6 +531,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
 
        dget(dentry);                /* to match put_request below */
        req->r_last_dentry = dentry; /* use this dentry in fill_trace */
+       req->r_locked_dir = old_dentry->d_inode;
 
        ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT);
        err = ceph_mdsc_do_request(mdsc, req);
@@ -566,6 +574,9 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
        kfree(path);
        if (IS_ERR(req))
                return PTR_ERR(req);
+
+       req->r_locked_dir = dir;
+
        ceph_mdsc_lease_release(mdsc, dir, dentry,
                                CEPH_LOCK_DN|CEPH_LOCK_ICONTENT);
        ceph_mdsc_lease_release(mdsc, inode, 0, CEPH_LOCK_ILINK);
@@ -688,8 +699,8 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
                cf->dir_info = kmalloc(1024, GFP_NOFS);
                if (!cf->dir_info)
                        return -ENOMEM;
-               cf->dir_info_len = 
-                       sprintf(cf->dir_info, 
+               cf->dir_info_len =
+                       sprintf(cf->dir_info,
                                "entries:   %20lld\n"
                                " files:    %20lld\n"
                                " subdirs:  %20lld\n"
index ac5cfde3381b0674bcd9c091b0c399a14a6da155..23c21d5d89baacb82a8835b354d3f8d4edc9a5f2 100644 (file)
@@ -213,7 +213,7 @@ static int ceph_fill_dirfrag(struct inode *inode,
                frag->dist[i] = le32_to_cpu(dirinfo->dist[i]);
        dout(20, "fill_dirfrag %llx frag %x referral mds %d ndist=%d\n",
             ceph_ino(inode), frag->frag, frag->mds, frag->ndist);
-       
+
 out:
        spin_unlock(&inode->i_lock);
        return err;
@@ -432,10 +432,10 @@ no_change:
 /*
  * caller must hold session s_mutex.
  */
-void ceph_update_inode_lease(struct inode *inode,
-                            struct ceph_mds_reply_lease *lease,
-                            struct ceph_mds_session *session,
-                            unsigned long from_time)
+static int update_inode_lease(struct inode *inode,
+                             struct ceph_mds_reply_lease *lease,
+                             struct ceph_mds_session *session,
+                             unsigned long from_time)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int is_new = 0;
@@ -447,7 +447,7 @@ void ceph_update_inode_lease(struct inode *inode,
             inode, mask, duration, ttl);
 
        if (mask == 0)
-               return;
+               return 0;
 
        spin_lock(&inode->i_lock);
        /*
@@ -465,12 +465,15 @@ void ceph_update_inode_lease(struct inode *inode,
                        is_new = 1;
                }
                list_move_tail(&ci->i_lease_item, &session->s_inode_leases);
-       }
+       } else
+               mask = 0;
        spin_unlock(&inode->i_lock);
        if (is_new) {
                dout(10, "lease iget on %p\n", inode);
                igrab(inode);
        }
+
+       return mask;
 }
 
 /*
@@ -519,10 +522,10 @@ int ceph_inode_lease_valid(struct inode *inode, int mask)
 /*
  * caller should hold session s_mutex.
  */
-void ceph_update_dentry_lease(struct dentry *dentry,
-                             struct ceph_mds_reply_lease *lease,
-                             struct ceph_mds_session *session,
-                             unsigned long from_time)
+static void update_dentry_lease(struct dentry *dentry,
+                               struct ceph_mds_reply_lease *lease,
+                               struct ceph_mds_session *session,
+                               unsigned long from_time)
 {
        struct ceph_dentry_info *di;
        int is_new = 0;
@@ -617,6 +620,36 @@ int ceph_dentry_lease_valid(struct dentry *dentry)
 }
 
 
+/*
+ * splice a dentry to an inode; returns the dentry to use afterwards (may not be @dn).
+ * caller must hold directory i_mutex for this to be safe.
+ */
+static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
+{
+       struct dentry *realdn;
+
+       /* dn must be unhashed before d_materialise_unique(); drop it if hashed */
+       if (!d_unhashed(dn))
+               d_drop(dn);
+       realdn = d_materialise_unique(dn, in);
+       if (realdn && !IS_ERR(realdn)) {   /* a dentry was handed back: switch to it */
+               dout(10, "dn %p (%d) spliced with %p (%d) "
+                    "inode %p ino %llx\n",
+                    dn, atomic_read(&dn->d_count),
+                    realdn, atomic_read(&realdn->d_count),
+                    realdn->d_inode,
+                    ceph_ino(realdn->d_inode));
+               dput(dn);                  /* release the ref on the dn we were given */
+               dn = realdn;
+               ceph_init_dentry(dn);
+       } else  /* NULL or ERR_PTR -- NOTE(review): error not reported; confirm dn->d_inode is set */
+               dout(10, "dn %p attached to %p ino %llx\n",
+                    dn, dn->d_inode, ceph_ino(dn->d_inode));
+       if (d_unhashed(dn))                /* leave the returned dentry hashed */
+               d_rehash(dn);
+       return dn;
+}
+
 /*
  * assimilate a full trace of inodes and dentries, from the root to
  * the item relevant for this reply, into our cache.  make any dcache
@@ -631,14 +664,15 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                    struct ceph_mds_session *session)
 {
        struct ceph_mds_reply_info *rinfo = &req->r_reply_info;
-       int err = 0;
+       int err = 0, mask;
        struct qstr dname;
-       struct dentry *dn = sb->s_root, *realdn;
+       struct dentry *dn = sb->s_root;
        struct dentry *parent = NULL;
        struct inode *in;
        struct ceph_mds_reply_inode *ininfo;
        int d = 0;
        u64 ino;
+       int have_icontent = 0;
 
        if (rinfo->trace_numi == 0) {
                dout(10, "fill_trace reply has empty trace!\n");
@@ -685,8 +719,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                                      rinfo->trace_dir[0]:0);
                if (err < 0)
                        return err;
-               ceph_update_inode_lease(in, rinfo->trace_ilease[0], session,
-                                       req->r_from_time);
+               mask = update_inode_lease(in, rinfo->trace_ilease[0],
+                                        session, req->r_from_time);
+               //have_icontent = mask & CEPH_LOCK_ICONTENT;
                if (sb->s_root == NULL)
                        sb->s_root = dn;
        }
@@ -696,19 +731,38 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                dname.name = rinfo->trace_dname[d];
                dname.len = rinfo->trace_dname_len[d];
                parent = dn;
+               dn = 0;
 
-               dout(10, "fill_trace %d/%d parent %p '%.*s' inode %p\n",
-                    (d+1), rinfo->trace_numd, parent,
-                    (int)dname.len, dname.name, parent->d_inode);
+               dout(10, "fill_trace %d/%d parent %p inode %p '%.*s'"
+                    " ic %d dmask %d\n",
+                    (d+1), rinfo->trace_numd, parent, parent->d_inode,
+                    (int)dname.len, dname.name,
+                    have_icontent, rinfo->trace_dlease[d]->mask);
+
+               /* do we have a dn lease? */
+               if (!have_icontent &&
+                   rinfo->trace_dlease[d]->mask == 0) {
+                       dout(0, "fill_trace  no icontent|dentry lease\n");
+                       goto no_dentry_lease;
+               }
+
+               /* try to take dir i_mutex */
+               if (req->r_locked_dir != parent->d_inode &&
+                   mutex_trylock(&parent->d_inode->i_mutex) == 0) {
+                       dout(0, "fill_trace  FAILED to take %p i_mutex\n",
+                            parent->d_inode);
+                       goto no_dentry_lease;
+               }
+
+               dout(10, "fill_trace  took %p i_mutex\n", parent->d_inode);
 
-               /* existing dentry? */
-               dn = 0;
                dname.hash = full_name_hash(dname.name, dname.len);
        retry_lookup:
+               /* existing dentry? */
                dn = d_lookup(parent, &dname);
                dout(10, "fill_trace d_lookup of '%.*s' got %p\n",
                     (int)dname.len, dname.name, dn);
-               
+
                /* use caller provided dentry?  for simplicity,
                 *  - only if there is no existing dn, and
                 *  - only if parent is correct
@@ -733,12 +787,12 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        if (!dn) {
                                derr(0, "d_alloc enomem\n");
                                err = -ENOMEM;
-                               break;
+                               goto out_dir;
                        }
                        dout(10, "fill_trace d_alloc %p '%.*s'\n", dn,
                             dn->d_name.len, dn->d_name.name);
                        ceph_init_dentry(dn);
-               }                               
+               }
                BUG_ON(!dn);
 
                /* null dentry? */
@@ -755,9 +809,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        if (d_unhashed(dn))
                                d_rehash(dn);
                        in = 0;
-                       ceph_update_dentry_lease(dn, rinfo->trace_dlease[d],
-                                                session, req->r_from_time);
-                       break;
+                       update_dentry_lease(dn, rinfo->trace_dlease[d],
+                                           session, req->r_from_time);
+                       goto out_dir;
                }
 
                /* rename? */
@@ -801,35 +855,20 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                                d_delete(dn);
                                dn = NULL;
                                in = NULL;
-                               break;
-                       }
-                       /* d_splice_alias wants dn unhashed */
-                       if (!d_unhashed(dn)) {
-                               dout(20, "d_drop %p\n", dn);
-                               d_drop(dn);
+                               goto out_dir;
                        }
-                       realdn = d_materialise_unique(dn, in);
-                       if (realdn) {
-                               dout(10, "dn %p (%d) spliced with %p (%d) "
-                                    "inode %p ino %llx\n",
-                                    dn, atomic_read(&dn->d_count),
-                                    realdn, atomic_read(&realdn->d_count), 
-                                    realdn->d_inode,
-                                    ceph_ino(realdn->d_inode));
-                               dput(dn);
-                               dn = realdn;
-                               ceph_init_dentry(dn);
-                       } else
-                               dout(10, "dn %p attached to %p ino %llx\n",
-                                    dn, dn->d_inode, ceph_ino(dn->d_inode));
-                       if (d_unhashed(dn))
-                               d_rehash(dn);
+                       dn = splice_dentry(dn, in);
                }
-               BUG_ON(d_unhashed(dn));
                BUG_ON(dn->d_parent != parent);
 
-               ceph_update_dentry_lease(dn, rinfo->trace_dlease[d],
-                                        session, req->r_from_time);
+               update_dentry_lease(dn, rinfo->trace_dlease[d],
+                                   session, req->r_from_time);
+
+               /* done with dn update */
+               if (req->r_locked_dir != parent->d_inode)
+                       mutex_unlock(&parent->d_inode->i_mutex);
+
+       update_inode:
                err = ceph_fill_inode(in,
                                      &rinfo->trace_in[d+1],
                                      rinfo->trace_numd <= d ?
@@ -841,15 +880,77 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        dn = NULL;
                        break;
                }
-               ceph_update_inode_lease(dn->d_inode, rinfo->trace_ilease[d+1],
-                                       session, req->r_from_time);
+               mask = update_inode_lease(dn->d_inode,
+                                        rinfo->trace_ilease[d+1],
+                                        session, req->r_from_time);
+               have_icontent = mask & CEPH_LOCK_ICONTENT;
+
                dput(parent);
                parent = NULL;
+               continue;
+
+
+       out_dir:
+               /* drop i_mutex */
+               if (req->r_locked_dir != parent->d_inode)
+                       mutex_unlock(&parent->d_inode->i_mutex);
+               break;
+
+
+       no_dentry_lease:
+               /*
+                * we have no lease or i_mutex for this dir, so do
+                * not update or hash a dentry.
+                */
+               if (d == rinfo->trace_numd-1 && req->r_last_dentry) {
+                       dn = req->r_last_dentry;
+                       dout(10, "fill_trace using provided dn %p\n", dn);
+                       ceph_init_dentry(dn);
+                       req->r_last_dentry = NULL;
+               }
+
+               /* null dentry? */
+               if (d+1 == rinfo->trace_numi) {
+                       if (dn && dn->d_inode)
+                               d_delete(dn);
+                       break;
+               }
+
+               /* find existing inode */
+               ininfo = rinfo->trace_in[d+1].in;
+               in = ceph_get_inode(parent->d_sb, le64_to_cpu(ininfo->ino));
+               if (IS_ERR(in)) {
+                       derr(30, "ceph_get_inode badness\n");
+                       err = PTR_ERR(in);
+                       in = NULL;
+                       break;
+               }
+               struct dentry *existing = d_find_alias(in);
+               if (existing) {
+                       if (dn)
+                               dput(dn);
+                       dn = existing;
+                       dout(10, " using existing %p\n", dn);
+               } else {
+                       if (dn && dn->d_inode == NULL) {
+                               dout(10, " instantiating provided %p\n", dn);
+                               d_instantiate(dn, in);
+                       } else {
+                               if (dn) {
+                                       dout(10, " ignoring provided dn %p\n",
+                                            dn);
+                                       dput(dn);
+                               }
+                               dn = d_alloc_anon(in);
+                               dout(10, " d_alloc_anon new dn %p\n", dn);
+                       }
+               }
+               goto update_inode;
        }
        if (parent)
                dput(parent);
 
-       dout(10, "fill_trace done, last dn %p in %p\n", dn, in);
+       dout(10, "fill_trace done err=%d, last dn %p in %p\n", err, dn, in);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_last_dentry)
@@ -915,7 +1016,6 @@ retry_lookup:
                if (dn->d_inode)
                        in = dn->d_inode;
                else {
-                       struct dentry *new;
                        in = ceph_get_inode(parent->d_sb,
                                            rinfo->dir_in[i].in->ino);
                        if (in == NULL) {
@@ -924,34 +1024,18 @@ retry_lookup:
                                dput(dn);
                                return -ENOMEM;
                        }
-                       if (!d_unhashed(dn)) {
-                               dout(40, "d_drop %p\n", dn);
-                               d_drop(dn);
-                       }
-                       new = d_materialise_unique(dn, in);
-                       if (new) {
-                               dout(10, "dn %p (%d) spliced with %p (%d) "
-                                    "inode %p ino %llx\n",
-                                    dn, atomic_read(&dn->d_count),
-                                    new, atomic_read(&new->d_count), 
-                                    new->d_inode,
-                                    ceph_ino(new->d_inode));
-                               dput(dn);
-                               dn = new;
-                               ceph_init_dentry(dn);
-                       }
+                       dn = splice_dentry(dn, in);
                }
-               BUG_ON(d_unhashed(dn));
 
                if (ceph_fill_inode(in, &rinfo->dir_in[i], 0) < 0) {
                        dout(0, "ceph_fill_inode badness on %p\n", in);
                        dput(dn);
                        continue;
                }
-               ceph_update_dentry_lease(dn, rinfo->dir_dlease[i],
-                                        req->r_session, req->r_from_time);
-               ceph_update_inode_lease(in, rinfo->dir_ilease[i],
-                                       req->r_session, req->r_from_time);
+               update_dentry_lease(dn, rinfo->dir_dlease[i],
+                                   req->r_session, req->r_from_time);
+               update_inode_lease(in, rinfo->dir_ilease[i],
+                                  req->r_session, req->r_from_time);
                dput(dn);
        }
        dout(10, "readdir_prepopulate done\n");
@@ -1628,7 +1712,7 @@ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
                 */
                int not = want & ~(have & need);
                int revoking = implemented & ~have;
-               dout(30, "get_cap_refs have %d but not %d (revoking %d)\n", 
+               dout(30, "get_cap_refs have %d but not %d (revoking %d)\n",
                     have, not, revoking);
                if ((revoking & not) == 0) {
                        *got = need | (have & want);
index 45c398214682413a954ed45e3ec6f13088197cae..2c2bd61de0fd3639f9a78c74801a532600512e0b 100644 (file)
@@ -391,6 +391,7 @@ static struct ceph_mds_request *new_request(struct ceph_msg *msg)
        req = kmalloc(sizeof(*req), GFP_NOFS);
        req->r_request = msg;
        req->r_reply = 0;
+       req->r_err = 0;
        req->r_direct_dentry = 0;
        req->r_direct_mode = USE_ANY_MDS;
        req->r_direct_hash = 0;
@@ -1182,7 +1183,7 @@ done:
        mutex_unlock(&req->r_session->s_mutex);
        spin_lock(&mdsc->lock);
        if (err) {
-               req->r_reply = ERR_PTR(err);
+               req->r_err = err;
        } else {
                req->r_reply = msg;
                ceph_msg_get(msg);
index f2daac9d16f12d8ffbe385802d5ece51f2633307..70ab2509bf65166e564f5a670d2e730ccfb4d430 100644 (file)
@@ -85,6 +85,7 @@ struct ceph_mds_request {
        struct ceph_msg  *r_request;  /* original request */
        struct ceph_msg  *r_reply;
        struct ceph_mds_reply_info r_reply_info;
+       int r_err;
 
        /* to direct request */
        struct dentry *r_direct_dentry;
index caee8f092e8ddc1e7e91d6ed79fea7178a30c435..5dea66ef1498ab950eb74efe9e757a5235e0530a 100644 (file)
@@ -66,7 +66,11 @@ extern int ceph_debug_inode;
                     atomic_read(&dentry->d_count)-1); \
                dput(dentry);                          \
        } while (0)
-
+#define d_drop(dentry)  /* logging wrapper; 'dentry' appears twice in the expansion */ \
+       do {                                           \
+               dout(20, "d_drop %p\n", dentry);       \
+               d_drop(dentry); /* a macro is not re-expanded inside itself: this is the real d_drop() */ \
+       } while (0)
 
 /*
  * subtract jiffies
@@ -443,14 +447,6 @@ extern int ceph_fill_trace(struct super_block *sb,
                           struct ceph_mds_session *session);
 extern int ceph_readdir_prepopulate(struct ceph_mds_request *req);
 
-extern void ceph_update_inode_lease(struct inode *inode,
-                                   struct ceph_mds_reply_lease *lease,
-                                   struct ceph_mds_session *seesion,
-                                   unsigned long from_time);
-extern void ceph_update_dentry_lease(struct dentry *dentry,
-                                    struct ceph_mds_reply_lease *lease,
-                                    struct ceph_mds_session *session,
-                                    unsigned long from_time);
 extern int ceph_inode_lease_valid(struct inode *inode, int mask);
 extern int ceph_dentry_lease_valid(struct dentry *dentry);