From fc1ba61adbb751eb11c91ca50ccc42d046930810 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 6 Jan 2009 15:01:26 -0800 Subject: [PATCH] kclient: handle safe reply, sync dir waits for safe reply --- src/kernel/dir.c | 16 ++++----- src/kernel/export.c | 2 +- src/kernel/file.c | 35 ++++++++++++++++--- src/kernel/inode.c | 21 ++++++++---- src/kernel/ioctl.c | 3 +- src/kernel/mds_client.c | 74 +++++++++++++++++++++++++++++++++++++---- src/kernel/mds_client.h | 6 ++++ src/kernel/super.c | 2 +- src/kernel/super.h | 3 ++ 9 files changed, 134 insertions(+), 28 deletions(-) diff --git a/src/kernel/dir.c b/src/kernel/dir.c index c9d029ca7f529..960e96ed47412 100644 --- a/src/kernel/dir.c +++ b/src/kernel/dir.c @@ -173,7 +173,7 @@ nextfrag: req->r_direct_is_hash = true; rhead = req->r_request->front.iov_base; rhead->args.readdir.frag = cpu_to_le32(frag); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, NULL, req); if (err < 0) { ceph_mdsc_put_request(req); return err; @@ -376,7 +376,7 @@ struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry, rhead->args.stat.mask = cpu_to_le32(mask); req->r_last_dentry = dget(dentry); /* try to use this in fill_trace */ req->r_locked_dir = dentry->d_parent->d_inode; /* by the VFS */ - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, NULL, req); dentry = ceph_finish_lookup(req, dentry, err); ceph_mdsc_put_request(req); /* will dput(dentry) */ dout(20, "do_lookup result=%p\n", dentry); @@ -438,7 +438,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, rhead->args.mknod.mode = cpu_to_le32(mode); rhead->args.mknod.rdev = cpu_to_le32(rdev); ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && req->r_reply_info.trace_numd == 0) { /* * no trace. 
do lookup, in case we are called from create @@ -512,7 +512,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, } req->r_locked_dir = dir; ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, dir, req); ceph_mdsc_put_request(req); if (err) d_drop(dentry); @@ -568,7 +568,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode) rhead->args.mkdir.mode = cpu_to_le32(mode); ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, dir, req); ceph_mdsc_put_request(req); if (err < 0) d_drop(dentry); @@ -614,7 +614,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_locked_dir = old_dentry->d_inode; ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, dir, req); if (err) { d_drop(dentry); } else if (req->r_reply_info.trace_numd == 0) { @@ -681,7 +681,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) ceph_mdsc_lease_release(mdsc, dir, dentry, CEPH_LOCK_DN); ceph_release_caps(inode, CEPH_CAP_LINK_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, dir, req); ceph_mdsc_put_request(req); return err; @@ -730,7 +730,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, CEPH_LOCK_DN); if (new_dentry->d_inode) ceph_release_caps(new_dentry->d_inode, CEPH_CAP_FILE_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, old_dir, req); if (!err && req->r_reply_info.trace_numd == 0) { /* * no trace diff --git a/src/kernel/export.c b/src/kernel/export.c index fe8b0bd507f59..956d2bc1c59bd 100644 --- a/src/kernel/export.c +++ b/src/kernel/export.c @@ -88,7 +88,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, NULL, USE_ANY_MDS); if (IS_ERR(req)) return ERR_PTR(PTR_ERR(req)); - err = 
ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, NULL, req); ceph_mdsc_put_request(req); inode = ceph_find_inode(sb, vino); diff --git a/src/kernel/file.c b/src/kernel/file.c index f1e809ddfc7e1..cb701878142f3 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -94,6 +94,7 @@ int ceph_open(struct inode *inode, struct file *file) struct dentry *dentry; struct ceph_mds_request *req; struct ceph_file_info *cf = file->private_data; + struct inode *parent_inode = file->f_dentry->d_parent->d_inode; int err; int flags, fmode, wantcaps; @@ -136,7 +137,7 @@ int ceph_open(struct inode *inode, struct file *file) err = PTR_ERR(req); goto out; } - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); if (!err) err = ceph_init_file(inode, file, req->r_fmode); ceph_mdsc_put_request(req); @@ -166,6 +167,7 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, struct ceph_client *client = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; struct file *file = nd->intent.open.file; + struct inode *parent_inode = file->f_dentry->d_parent->d_inode; struct ceph_mds_request *req; int err; int flags = nd->intent.open.flags - 1; /* silly vfs! 
*/ @@ -181,7 +183,7 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); req->r_last_dentry = dget(dentry); /* use this dentry in fill_trace */ req->r_locked_dir = dir; /* caller holds dir->i_mutex */ - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); dentry = ceph_finish_lookup(req, dentry, err); if (!err) err = ceph_init_file(req->r_last_inode, file, req->r_fmode); @@ -419,13 +421,38 @@ out: static int ceph_fsync(struct file *file, struct dentry *dentry, int datasync) { struct inode *inode = dentry->d_inode; - int ret; + int ret, err; + struct ceph_mds_request *req; + u64 nexttid = 0; dout(10, "fsync on inode %p\n", inode); ret = write_inode_now(inode, 1); if (ret < 0) return ret; + ret = 0; + if ((inode->i_mode & S_IFMT) == S_IFDIR) { + dout(0, "sync on directory\n"); + + do { + req = ceph_mdsc_get_listener_req(inode, nexttid); + + if (!req) + break; + nexttid = req->r_tid + 1; + + if (req->r_timeout) { + err = wait_for_completion_timeout(&req->r_safe_completion, + req->r_timeout); + if (err == 0) + ret = -EIO; /* timed out */ + } else { + wait_for_completion(&req->r_safe_completion); + } + ceph_mdsc_put_request(req); + } while (req); + } + /* * HMM: should we also ensure that caps are flushed to mds? * It's not strictly necessary, since with the data on the @@ -433,7 +460,7 @@ static int ceph_fsync(struct file *file, struct dentry *dentry, int datasync) * Not mtime, though. 
*/ - return 0; + return ret; } const struct file_operations ceph_file_fops = { diff --git a/src/kernel/inode.c b/src/kernel/inode.c index 55b6c692d2daa..68752261e14ca 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -293,6 +293,9 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_vmtruncate_to = -1; INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work); + INIT_RADIX_TREE(&ci->i_listener_tree, GFP_ATOMIC); /* inserts happen under i_listener_lock (a spinlock), so node allocation must not sleep */ + spin_lock_init(&ci->i_listener_lock); + return &ci->vfs_inode; } @@ -1388,6 +1391,7 @@ static int ceph_setattr_chown(struct dentry *dentry, struct iattr *attr) { struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); + struct inode *parent_inode = dentry->d_parent->d_inode; struct ceph_client *client = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; const unsigned int ia_valid = attr->ia_valid; @@ -1424,7 +1428,7 @@ static int ceph_setattr_chown(struct dentry *dentry, struct iattr *attr) } reqh->args.chown.mask = cpu_to_le32(mask); ceph_release_caps(inode, CEPH_CAP_AUTH_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); dout(10, "chown result %d\n", err); return err; @@ -1434,6 +1438,7 @@ static int ceph_setattr_chmod(struct dentry *dentry, struct iattr *attr) { struct inode *inode = dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); + struct inode *parent_inode = dentry->d_parent->d_inode; struct ceph_client *client = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; struct ceph_mds_request *req; @@ -1457,7 +1462,7 @@ static int ceph_setattr_chmod(struct dentry *dentry, struct iattr *attr) reqh = req->r_request->front.iov_base; reqh->args.chmod.mode = cpu_to_le32(attr->ia_mode); ceph_release_caps(inode, CEPH_CAP_AUTH_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); 
dout(10, "chmod result %d\n", err); return err; @@ -1466,6 +1471,7 @@ static int ceph_setattr_chmod(struct dentry *dentry, struct iattr *attr) static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr) { struct inode *inode = dentry->d_inode; + struct inode *parent_inode = dentry->d_parent->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_client *client = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; @@ -1526,7 +1532,7 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr) reqh->args.utime.mask |= cpu_to_le32(CEPH_UTIME_MTIME); ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); dout(10, "utime result %d\n", err); return err; @@ -1535,6 +1541,7 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr) static int ceph_setattr_size(struct dentry *dentry, struct iattr *attr) { struct inode *inode = dentry->d_inode; + struct inode *parent_inode = dentry->d_parent->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_client *client = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; @@ -1569,7 +1576,7 @@ static int ceph_setattr_size(struct dentry *dentry, struct iattr *attr) reqh = req->r_request->front.iov_base; reqh->args.truncate.length = cpu_to_le64(attr->ia_size); ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); dout(10, "truncate result %d\n", err); __ceph_do_pending_vmtruncate(inode); @@ -1933,6 +1940,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name, { struct ceph_client *client = ceph_client(dentry->d_sb); struct inode *inode = dentry->d_inode; + struct inode *parent_inode = dentry->d_parent->d_inode; struct ceph_mds_client *mdsc = &client->mdsc; struct 
ceph_mds_request *req; struct ceph_mds_request_head *rhead; @@ -1993,7 +2001,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name, req->r_request->hdr.data_off = cpu_to_le32(0); ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); out: @@ -2010,6 +2018,7 @@ int ceph_removexattr(struct dentry *dentry, const char *name) struct ceph_client *client = ceph_client(dentry->d_sb); struct ceph_mds_client *mdsc = &client->mdsc; struct inode *inode = dentry->d_inode; + struct inode *parent_inode = dentry->d_parent->d_inode; struct ceph_mds_request *req; char *path; int pathlen; @@ -2037,7 +2046,7 @@ int ceph_removexattr(struct dentry *dentry, const char *name) return PTR_ERR(req); ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); return err; } diff --git a/src/kernel/ioctl.c b/src/kernel/ioctl.c index 4b253a8c5417e..dda200ebb1783 100644 --- a/src/kernel/ioctl.c +++ b/src/kernel/ioctl.c @@ -28,6 +28,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) static long ceph_ioctl_set_layout(struct file *file, void __user *arg) { struct inode *inode = file->f_dentry->d_inode; + struct inode *parent_inode = file->f_dentry->d_parent->d_inode; struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc; char *path; int pathlen; @@ -52,7 +53,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) reqh = req->r_request->front.iov_base; reqh->args.setlayout.layout = layout; ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE); - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); return err; } diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index 82bace27376f3..aa3fb5c763e97 100644 --- 
a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -438,6 +438,7 @@ static struct ceph_mds_request *new_request(struct ceph_msg *msg) req->r_fmode = -1; atomic_set(&req->r_ref, 1); /* one for request_tree, one for caller */ init_completion(&req->r_completion); + init_completion(&req->r_safe_completion); return req; } @@ -447,24 +448,59 @@ static struct ceph_mds_request *new_request(struct ceph_msg *msg) * Called under mdsc->mutex. */ static void __register_request(struct ceph_mds_client *mdsc, + struct inode *listener, struct ceph_mds_request *req) { struct ceph_mds_request_head *head = req->r_request->front.iov_base; + struct ceph_inode_info *ci; req->r_tid = ++mdsc->last_tid; head->tid = cpu_to_le64(req->r_tid); dout(30, "__register_request %p tid %lld\n", req, req->r_tid); get_request(req); radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req); + req->r_listener = listener; + if (listener) { + ci = ceph_inode(listener); + spin_lock(&ci->i_listener_lock); + radix_tree_insert(&ci->i_listener_tree, req->r_tid, (void *)req); + spin_unlock(&ci->i_listener_lock); + } } static void __unregister_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { + struct ceph_inode_info *ci; dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid); radix_tree_delete(&mdsc->request_tree, req->r_tid); + if (req->r_listener) { + ci = ceph_inode(req->r_listener); + spin_lock(&ci->i_listener_lock); + radix_tree_delete(&ci->i_listener_tree, req->r_tid); + spin_unlock(&ci->i_listener_lock); + } ceph_mdsc_put_request(req); } +struct ceph_mds_request *ceph_mdsc_get_listener_req(struct inode *inode, + u64 tid) +{ + struct ceph_mds_request *req = NULL; /* returned unchanged (NULL) when nothing is pending */ + struct ceph_inode_info *ci = ceph_inode(inode); + int got; + + spin_lock(&ci->i_listener_lock); + got = radix_tree_gang_lookup(&ci->i_listener_tree, + (void **)&req, tid, 1); /* start at tid so the caller can walk the tree by passing r_tid + 1 */ + /* got is the number of entries found; take a reference only when one was returned */ + if (got > 0) { + get_request(req); + } + spin_unlock(&ci->i_listener_lock); + + return req; +} + static bool 
__have_session(struct ceph_mds_client *mdsc, int mds) { if (mds >= mdsc->max_sessions) @@ -472,7 +508,6 @@ static bool __have_session(struct ceph_mds_client *mdsc, int mds) return mdsc->sessions[mds]; } - /* * Choose mds to send request to next. If there is a hint set in * the request (e.g., due to a prior forward hint from the mds), use @@ -1107,17 +1142,19 @@ static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) * session setup, forwarding, retry details. */ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, + struct inode *listener, struct ceph_mds_request *req) { struct ceph_mds_session *session = NULL; struct ceph_mds_request_head *rhead; int err; int mds = -1; + int safe = 0; dout(30, "do_request on %p\n", req); mutex_lock(&mdsc->mutex); - __register_request(mdsc, req); + __register_request(mdsc, listener, req); retry: if (req->r_timeout && time_after_eq(jiffies, req->r_started + req->r_timeout)) { @@ -1207,12 +1244,20 @@ retry: if (!err) /* all is well, reply has been parsed. */ err = le32_to_cpu(req->r_reply_info.head->result); + if (req) + safe = req->r_reply_info.head->safe; finish: - __unregister_request(mdsc, req); + if (safe) { + complete(&req->r_safe_completion); + __unregister_request(mdsc, req); + } + mutex_unlock(&mdsc->mutex); - ceph_msg_put(req->r_request); - req->r_request = NULL; + if (safe) { + ceph_msg_put(req->r_request); + req->r_request = NULL; + } dout(30, "do_request %p done, result %d\n", req, err); return err; @@ -1254,8 +1299,22 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) dout(10, "handle_reply %p expected_cap=%p\n", req, req->r_expected_cap); mds = le32_to_cpu(msg->hdr.src.name.num); if (req->r_got_reply) { - derr(1, "got reply on %llu, mds%d got more than one reply\n", - tid, mds); + if (((typeof(req->r_reply_info.head))msg->front.iov_base)->safe) { /* test the reply that just arrived, not the one stored earlier */ + /* + We already handled the unsafe response, now do the cleanup. + Shouldn't we check the safe response to see if it matches + the unsafe one? 
+ */ + complete(&req->r_safe_completion); + __unregister_request(mdsc, req); + dout(10, "got another reply %llu, mds%d\n", + tid, mds); + ceph_msg_put(req->r_request); + req->r_request = NULL; + } else { + dout(0, "got another _unsafe_ reply %llu, mds%d\n", + tid, mds); + } mutex_unlock(&mdsc->mutex); ceph_mdsc_put_request(req); return; @@ -1324,6 +1383,7 @@ done: /* kick calling process */ complete(&req->r_completion); ceph_mdsc_put_request(req); + return; } diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index 3a2e0f58f8755..f54b6873e325e 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -152,6 +152,8 @@ struct ceph_mds_request { u32 r_direct_hash; /* choose dir frag based on this dentry hash */ bool r_direct_is_hash; /* true if r_direct_hash is valid */ + struct inode *r_listener; + /* references to the trailing dentry and inode from parsing the * mds response. also used to feed a VFS-provided dentry into * the reply handler */ @@ -170,6 +172,7 @@ struct ceph_mds_request { atomic_t r_ref; struct completion r_completion; + struct completion r_safe_completion; int r_got_reply; }; @@ -247,6 +250,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, u64 ino2, const char *path2, struct dentry *ref, int want_auth); extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, + struct inode *listener, struct ceph_mds_request *req); extern void ceph_mdsc_put_request(struct ceph_mds_request *req); @@ -256,5 +260,7 @@ extern void ceph_mdsc_handle_reset(struct ceph_mds_client *mdsc, int mds); extern void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); +extern struct ceph_mds_request *ceph_mdsc_get_listener_req(struct inode *inode, + u64 tid); #endif diff --git a/src/kernel/super.c b/src/kernel/super.c index 489c1eea05b3f..7e7e09828e0c7 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -727,7 +727,7 @@ static struct dentry *open_root_dentry(struct ceph_client *client, 
req->r_timeout = client->mount_args.mount_timeout * HZ; reqhead = req->r_request->front.iov_base; reqhead->args.stat.mask = CEPH_STAT_CAP_INODE; - err = ceph_mdsc_do_request(mdsc, req); + err = ceph_mdsc_do_request(mdsc, NULL, req); if (err == 0) { root = req->r_last_dentry; dget(root); diff --git a/src/kernel/super.h b/src/kernel/super.h index b4a8ca505d068..43fa69253c81a 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -274,6 +274,9 @@ struct ceph_inode_info { loff_t i_vmtruncate_to; /* delayed truncate work */ struct work_struct i_vmtruncate_work; + struct radix_tree_root i_listener_tree; /* requests we pend on */ + spinlock_t i_listener_lock; + struct inode vfs_inode; /* at end */ }; -- 2.39.5