From: Sage Weil Date: Mon, 30 Jun 2008 18:46:54 +0000 (-0700) Subject: kclient: fill_trace takes i_mutex, avoids dentry if no lease X-Git-Tag: v0.3~39^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b36601b52a27b5ba3343f8061d1eeee46f59a638;p=ceph.git kclient: fill_trace takes i_mutex, avoids dentry if no lease --- diff --git a/src/kernel/dir.c b/src/kernel/dir.c index 30af6571386f4..90779edbe147b 100644 --- a/src/kernel/dir.c +++ b/src/kernel/dir.c @@ -285,7 +285,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, * @on_inode indicates that we should stat the ino, and not a path * built from @dentry. */ -struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry, +struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry, int mask, int on_inode, int locked_dir) { struct ceph_client *client = ceph_sb_to_client(sb); @@ -375,10 +375,11 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, d_drop(dentry); return PTR_ERR(req); } - ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT); + req->r_locked_dir = dir; rhead = req->r_request->front.iov_base; rhead->args.mknod.mode = cpu_to_le32(mode); rhead->args.mknod.rdev = cpu_to_le32(rdev); + ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT); err = ceph_mdsc_do_request(mdsc, req); if (!err && req->r_reply_info.trace_numd == 0) { /* no trace. do lookup, in case we are called from create. */ @@ -442,6 +443,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, d_drop(dentry); return PTR_ERR(req); } + req->r_locked_dir = dir; ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT); err = ceph_mdsc_do_request(mdsc, req); ceph_mdsc_put_request(req); @@ -476,9 +478,14 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode) d_drop(dentry); return PTR_ERR(req); } - ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT); + + dget(dentry); /* to match put_request below */ + req->r_last_dentry = dentry; /* use this dentry in fill_trace */ + req->r_locked_dir = dir; rhead = req->r_request->front.iov_base; rhead->args.mkdir.mode = cpu_to_le32(mode); + + ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT); err = ceph_mdsc_do_request(mdsc, req); ceph_mdsc_put_request(req); if (err < 0) { @@ -524,6 +531,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, dget(dentry); /* to match put_request below */ req->r_last_dentry = dentry; /* use this dentry in fill_trace */ + req->r_locked_dir = old_dentry->d_inode; ceph_mdsc_lease_release(mdsc, dir, 0, CEPH_LOCK_ICONTENT); err = ceph_mdsc_do_request(mdsc, req); @@ -566,6 +574,9 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) kfree(path); if (IS_ERR(req)) return PTR_ERR(req); + + req->r_locked_dir = dir; + ceph_mdsc_lease_release(mdsc, dir, dentry, CEPH_LOCK_DN|CEPH_LOCK_ICONTENT); ceph_mdsc_lease_release(mdsc, inode, 0, CEPH_LOCK_ILINK); @@ -688,8 +699,8 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, cf->dir_info = kmalloc(1024, GFP_NOFS); if (!cf->dir_info) return -ENOMEM; - cf->dir_info_len = - sprintf(cf->dir_info, + cf->dir_info_len = + sprintf(cf->dir_info, "entries: %20lld\n" " files: %20lld\n" " subdirs: %20lld\n" diff --git a/src/kernel/inode.c b/src/kernel/inode.c index ac5cfde3381b0..23c21d5d89baa 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -213,7 +213,7 @@ static int ceph_fill_dirfrag(struct inode *inode, frag->dist[i] = le32_to_cpu(dirinfo->dist[i]); dout(20, "fill_dirfrag %llx frag %x referral mds %d ndist=%d\n", ceph_ino(inode), frag->frag, frag->mds, frag->ndist); - + out: spin_unlock(&inode->i_lock); return err; @@ -432,10 +432,10 @@ no_change: /* * caller must hold session s_mutex. */ -void ceph_update_inode_lease(struct inode *inode, - struct ceph_mds_reply_lease *lease, - struct ceph_mds_session *session, - unsigned long from_time) +static int update_inode_lease(struct inode *inode, + struct ceph_mds_reply_lease *lease, + struct ceph_mds_session *session, + unsigned long from_time) { struct ceph_inode_info *ci = ceph_inode(inode); int is_new = 0; @@ -447,7 +447,7 @@ void ceph_update_inode_lease(struct inode *inode, inode, mask, duration, ttl); if (mask == 0) - return; + return 0; spin_lock(&inode->i_lock); /* @@ -465,12 +465,15 @@ void ceph_update_inode_lease(struct inode *inode, is_new = 1; } list_move_tail(&ci->i_lease_item, &session->s_inode_leases); - } + } else + mask = 0; spin_unlock(&inode->i_lock); if (is_new) { dout(10, "lease iget on %p\n", inode); igrab(inode); } + + return mask; } /* @@ -519,10 +522,10 @@ int ceph_inode_lease_valid(struct inode *inode, int mask) /* * caller should hold session s_mutex. */ -void ceph_update_dentry_lease(struct dentry *dentry, - struct ceph_mds_reply_lease *lease, - struct ceph_mds_session *session, - unsigned long from_time) +static void update_dentry_lease(struct dentry *dentry, + struct ceph_mds_reply_lease *lease, + struct ceph_mds_session *session, + unsigned long from_time) { struct ceph_dentry_info *di; int is_new = 0; @@ -617,6 +620,36 @@ int ceph_dentry_lease_valid(struct dentry *dentry) } +/* + * splice a dentry to an inode. + * caller must hold directory i_mutex for this to be safe. + */ +static struct dentry *splice_dentry(struct dentry *dn, struct inode *in) +{ + struct dentry *realdn; + + /* dn must be unhashed */ + if (!d_unhashed(dn)) + d_drop(dn); + realdn = d_materialise_unique(dn, in); + if (realdn && !IS_ERR(realdn)) { + dout(10, "dn %p (%d) spliced with %p (%d) " + "inode %p ino %llx\n", + dn, atomic_read(&dn->d_count), + realdn, atomic_read(&realdn->d_count), + realdn->d_inode, + ceph_ino(realdn->d_inode)); + dput(dn); + dn = realdn; + ceph_init_dentry(dn); + } else + dout(10, "dn %p attached to %p ino %llx\n", + dn, dn->d_inode, ceph_ino(dn->d_inode)); + if (d_unhashed(dn)) + d_rehash(dn); + return dn; +} + /* * assimilate a full trace of inodes and dentries, from the root to * the item relevant for this reply, into our cache. make any dcache @@ -631,14 +664,15 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, struct ceph_mds_session *session) { struct ceph_mds_reply_info *rinfo = &req->r_reply_info; - int err = 0; + int err = 0, mask; struct qstr dname; - struct dentry *dn = sb->s_root, *realdn; + struct dentry *dn = sb->s_root; struct dentry *parent = NULL; struct inode *in; struct ceph_mds_reply_inode *ininfo; int d = 0; u64 ino; + int have_icontent = 0; if (rinfo->trace_numi == 0) { dout(10, "fill_trace reply has empty trace!\n"); @@ -685,8 +719,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, rinfo->trace_dir[0]:0); if (err < 0) return err; - ceph_update_inode_lease(in, rinfo->trace_ilease[0], session, - req->r_from_time); + mask = update_inode_lease(in, rinfo->trace_ilease[0], + session, req->r_from_time); + //have_icontent = mask & CEPH_LOCK_ICONTENT; if (sb->s_root == NULL) sb->s_root = dn; } @@ -696,19 +731,38 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, dname.name = rinfo->trace_dname[d]; dname.len = rinfo->trace_dname_len[d]; parent = dn; + dn = 0; - dout(10, "fill_trace %d/%d parent %p '%.*s' inode %p\n", - (d+1), rinfo->trace_numd, parent, - (int)dname.len, dname.name, parent->d_inode); + dout(10, "fill_trace %d/%d parent %p inode %p '%.*s'" + " ic %d dmask %d\n", + (d+1), rinfo->trace_numd, parent, parent->d_inode, + (int)dname.len, dname.name, + have_icontent, rinfo->trace_dlease[d]->mask); + + /* do we have a dn lease? */ + if (!have_icontent && + rinfo->trace_dlease[d]->mask == 0) { + dout(0, "fill_trace no icontent|dentry lease\n"); + goto no_dentry_lease; + } + + /* try to take dir i_mutex */ + if (req->r_locked_dir != parent->d_inode && + mutex_trylock(&parent->d_inode->i_mutex) == 0) { + dout(0, "fill_trace FAILED to take %p i_mutex\n", + parent->d_inode); + goto no_dentry_lease; + } + + dout(10, "fill_trace took %p i_mutex\n", parent->d_inode); - /* existing dentry? */ - dn = 0; dname.hash = full_name_hash(dname.name, dname.len); retry_lookup: + /* existing dentry? */ dn = d_lookup(parent, &dname); dout(10, "fill_trace d_lookup of '%.*s' got %p\n", (int)dname.len, dname.name, dn); - + /* use caller provided dentry? for simplicity, * - only if there is no existing dn, and * - only if parent is correct @@ -733,12 +787,12 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, if (!dn) { derr(0, "d_alloc enomem\n"); err = -ENOMEM; - break; + goto out_dir; } dout(10, "fill_trace d_alloc %p '%.*s'\n", dn, dn->d_name.len, dn->d_name.name); ceph_init_dentry(dn); - } + } BUG_ON(!dn); /* null dentry? */ @@ -755,9 +809,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, if (d_unhashed(dn)) d_rehash(dn); in = 0; - ceph_update_dentry_lease(dn, rinfo->trace_dlease[d], - session, req->r_from_time); - break; + update_dentry_lease(dn, rinfo->trace_dlease[d], + session, req->r_from_time); + goto out_dir; } /* rename? */ @@ -801,35 +855,20 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, d_delete(dn); dn = NULL; in = NULL; - break; - } - /* d_splice_alias wants dn unhashed */ - if (!d_unhashed(dn)) { - dout(20, "d_drop %p\n", dn); - d_drop(dn); + goto out_dir; } - realdn = d_materialise_unique(dn, in); - if (realdn) { - dout(10, "dn %p (%d) spliced with %p (%d) " - "inode %p ino %llx\n", - dn, atomic_read(&dn->d_count), - realdn, atomic_read(&realdn->d_count), - realdn->d_inode, - ceph_ino(realdn->d_inode)); - dput(dn); - dn = realdn; - ceph_init_dentry(dn); - } else - dout(10, "dn %p attached to %p ino %llx\n", - dn, dn->d_inode, ceph_ino(dn->d_inode)); - if (d_unhashed(dn)) - d_rehash(dn); + dn = splice_dentry(dn, in); } - BUG_ON(d_unhashed(dn)); BUG_ON(dn->d_parent != parent); - ceph_update_dentry_lease(dn, rinfo->trace_dlease[d], - session, req->r_from_time); + update_dentry_lease(dn, rinfo->trace_dlease[d], + session, req->r_from_time); + + /* done with dn update */ + if (req->r_locked_dir != parent->d_inode) + mutex_unlock(&parent->d_inode->i_mutex); + + update_inode: err = ceph_fill_inode(in, &rinfo->trace_in[d+1], rinfo->trace_numd <= d ? @@ -841,15 +880,77 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, dn = NULL; break; } - ceph_update_inode_lease(dn->d_inode, rinfo->trace_ilease[d+1], - session, req->r_from_time); + mask = update_inode_lease(dn->d_inode, + rinfo->trace_ilease[d+1], + session, req->r_from_time); + have_icontent = mask & CEPH_LOCK_ICONTENT; + dput(parent); parent = NULL; + continue; + + + out_dir: + /* drop i_mutex */ + if (req->r_locked_dir != parent->d_inode) + mutex_unlock(&parent->d_inode->i_mutex); + break; + + + no_dentry_lease: + /* + * we have no lease or i_mutex for this dir, so do + * not update or hash a dentry. + */ + if (d == rinfo->trace_numd-1 && req->r_last_dentry) { + dn = req->r_last_dentry; + dout(10, "fill_trace using provided dn %p\n", dn); + ceph_init_dentry(dn); + req->r_last_dentry = NULL; + } + + /* null dentry? */ + if (d+1 == rinfo->trace_numi) { + if (dn && dn->d_inode) + d_delete(dn); + break; + } + + /* find existing inode */ + ininfo = rinfo->trace_in[d+1].in; + in = ceph_get_inode(parent->d_sb, le64_to_cpu(ininfo->ino)); + if (IS_ERR(in)) { + derr(30, "ceph_get_inode badness\n"); + err = PTR_ERR(in); + in = NULL; + break; + } + struct dentry *existing = d_find_alias(in); + if (existing) { + if (dn) + dput(dn); + dn = existing; + dout(10, " using existing %p\n", dn); + } else { + if (dn && dn->d_inode == NULL) { + dout(10, " instantiating provided %p\n", dn); + d_instantiate(dn, in); + } else { + if (dn) { + dout(10, " ignoring provided dn %p\n", + dn); + dput(dn); + } + dn = d_alloc_anon(in); + dout(10, " d_alloc_anon new dn %p\n", dn); + } + } + goto update_inode; } if (parent) dput(parent); - dout(10, "fill_trace done, last dn %p in %p\n", dn, in); + dout(10, "fill_trace done err=%d, last dn %p in %p\n", err, dn, in); if (req->r_old_dentry) dput(req->r_old_dentry); if (req->r_last_dentry) @@ -915,7 +1016,6 @@ retry_lookup: if (dn->d_inode) in = dn->d_inode; else { - struct dentry *new; in = ceph_get_inode(parent->d_sb, rinfo->dir_in[i].in->ino); if (in == NULL) { @@ -924,34 +1024,18 @@ retry_lookup: dput(dn); return -ENOMEM; } - if (!d_unhashed(dn)) { - dout(40, "d_drop %p\n", dn); - d_drop(dn); - } - new = d_materialise_unique(dn, in); - if (new) { - dout(10, "dn %p (%d) spliced with %p (%d) " - "inode %p ino %llx\n", - dn, atomic_read(&dn->d_count), - new, atomic_read(&new->d_count), - new->d_inode, - ceph_ino(new->d_inode)); - dput(dn); - dn = new; - ceph_init_dentry(dn); - } + dn = splice_dentry(dn, in); } - BUG_ON(d_unhashed(dn)); if (ceph_fill_inode(in, &rinfo->dir_in[i], 0) < 0) { dout(0, "ceph_fill_inode badness on %p\n", in); dput(dn); continue; } - ceph_update_dentry_lease(dn, rinfo->dir_dlease[i], - req->r_session, req->r_from_time); - ceph_update_inode_lease(in, rinfo->dir_ilease[i], - req->r_session, req->r_from_time); + update_dentry_lease(dn, rinfo->dir_dlease[i], + req->r_session, req->r_from_time); + update_inode_lease(in, rinfo->dir_ilease[i], + req->r_session, req->r_from_time); dput(dn); } dout(10, "readdir_prepopulate done\n"); @@ -1628,7 +1712,7 @@ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got, */ int not = want & ~(have & need); int revoking = implemented & ~have; - dout(30, "get_cap_refs have %d but not %d (revoking %d)\n", + dout(30, "get_cap_refs have %d but not %d (revoking %d)\n", have, not, revoking); if ((revoking & not) == 0) { *got = need | (have & want); diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index 45c3982146824..2c2bd61de0fd3 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -391,6 +391,7 @@ static struct ceph_mds_request *new_request(struct ceph_msg *msg) req = kmalloc(sizeof(*req), GFP_NOFS); req->r_request = msg; req->r_reply = 0; + req->r_err = 0; req->r_direct_dentry = 0; req->r_direct_mode = USE_ANY_MDS; req->r_direct_hash = 0; @@ -1182,7 +1183,7 @@ done: mutex_unlock(&req->r_session->s_mutex); spin_lock(&mdsc->lock); if (err) { - req->r_reply = ERR_PTR(err); + req->r_err = err; } else { req->r_reply = msg; ceph_msg_get(msg); diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index f2daac9d16f12..70ab2509bf651 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -85,6 +85,7 @@ struct ceph_mds_request { struct ceph_msg *r_request; /* original request */ struct ceph_msg *r_reply; struct ceph_mds_reply_info r_reply_info; + int r_err; /* to direct request */ struct dentry *r_direct_dentry; diff --git a/src/kernel/super.h b/src/kernel/super.h index caee8f092e8dd..5dea66ef1498a 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -66,7 +66,11 @@ extern int ceph_debug_inode; atomic_read(&dentry->d_count)-1); \ dput(dentry); \ } while (0) - +#define d_drop(dentry) \ + do { \ + dout(20, "d_drop %p\n", dentry); \ + d_drop(dentry); \ + } while (0) /* * subtract jiffies @@ -443,14 +447,6 @@ extern int ceph_fill_trace(struct super_block *sb, struct ceph_mds_session *session); extern int ceph_readdir_prepopulate(struct ceph_mds_request *req); -extern void ceph_update_inode_lease(struct inode *inode, - struct ceph_mds_reply_lease *lease, - struct ceph_mds_session *seesion, - unsigned long from_time); -extern void ceph_update_dentry_lease(struct dentry *dentry, - struct ceph_mds_reply_lease *lease, - struct ceph_mds_session *session, - unsigned long from_time); extern int ceph_inode_lease_valid(struct inode *inode, int mask); extern int ceph_dentry_lease_valid(struct dentry *dentry);