From: Jeff Layton Date: Wed, 27 Nov 2019 17:06:14 +0000 (-0500) Subject: ceph: attempt to do async create when possible X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=4fc56b484fc5dc805e26081efa2fd9fe63734a1f;p=ceph-client.git ceph: attempt to do async create when possible With the Octopus release, the MDS will hand out directoy create caps. If we have Fxc caps on the directory, and complete directory information or a known negative dentry, then we can return without waiting on the reply, allowing the open() call to return very quickly to userland. We use the normal ceph_fill_inode() routine to fill in the inode, so we have to gin up some reply inode information with what we'd expect a newly-created inode to have. The client assumes that it has a full set of caps on the new inode, and that the MDS will revoke them when there is conflicting access. This functionality is gated on the enable_async_dirops module option, along with async unlinks. Signed-off-by: Jeff Layton --- diff --git a/fs/ceph/file.c b/fs/ceph/file.c index d4d7a277faf1f..89e8b5c48975a 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -450,6 +450,141 @@ copy_file_layout(struct inode *dst, struct inode *src) spin_unlock(&cdst->i_ceph_lock); } +static bool get_caps_for_async_create(struct inode *dir, struct dentry *dentry) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + int ret, want, got; + + /* + * We can do an async create if we either have a valid negative dentry + * or the complete contents of the directory. Do a quick check without + * cap refs. + */ + if ((d_in_lookup(dentry) && !__ceph_dir_is_complete(ci)) || + !ceph_file_layout_is_valid(&ci->i_layout)) + return false; + + /* Try to get caps */ + want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE; + ret = ceph_try_get_caps(dir, 0, want, true, &got); + dout("Fx on %p ret=%d got=%d\n", dir, ret, got); + if (ret != 1) + return false; + if (got != want) { + ceph_put_cap_refs(ci, got); + return false; + } + + /* Check again, now that we hold cap refs */ + if ((d_in_lookup(dentry) && !__ceph_dir_is_complete(ci)) || + !ceph_file_layout_is_valid(&ci->i_layout)) { + ceph_put_cap_refs(ci, got); + return false; + } + + return true; +} + +static void ceph_async_create_cb(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) +{ + /* If we never sent anything then nothing to clean up */ + if (req->r_err == -ECHILD) + goto out; + + mapping_set_error(req->r_parent->i_mapping, req->r_err); + + if (req->r_target_inode) { + u64 ino = ceph_vino(req->r_target_inode).ino; + + if (req->r_deleg_ino != ino) + pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%lx target=0x%llx\n", + __func__, req->r_err, req->r_deleg_ino, ino); + mapping_set_error(req->r_target_inode->i_mapping, req->r_err); + } else { + pr_warn("%s: no req->r_target_inode for 0x%lx\n", __func__, + req->r_deleg_ino); + } +out: + ceph_put_cap_refs(ceph_inode(req->r_parent), + CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE); +} + +static int ceph_finish_async_open(struct inode *dir, struct dentry *dentry, + struct file *file, umode_t mode, + struct ceph_mds_request *req, + struct ceph_acl_sec_ctx *as_ctx) +{ + int ret; + struct ceph_mds_reply_inode in = { }; + struct ceph_mds_reply_info_in iinfo = { .in = &in }; + struct ceph_inode_info *ci = ceph_inode(dir); + struct inode *inode; + struct timespec64 now; + struct ceph_vino vino = { .ino = req->r_deleg_ino, + .snap = CEPH_NOSNAP }; + + ktime_get_real_ts64(&now); + + inode = ceph_get_inode(dentry->d_sb, vino); + if (IS_ERR(inode)) + return PTR_ERR(inode); + + /* If we can't get a buffer, just carry on */ + iinfo.xattr_data = kzalloc(4, GFP_NOFS); + if (iinfo.xattr_data) + iinfo.xattr_len = 4; + + iinfo.inline_version = CEPH_INLINE_NONE; + iinfo.change_attr = 1; + ceph_encode_timespec64(&iinfo.btime, &now); + + in.ino = cpu_to_le64(vino.ino); + in.snapid = cpu_to_le64(CEPH_NOSNAP); + in.version = cpu_to_le64(1); // ??? + in.cap.caps = in.cap.wanted = cpu_to_le32(CEPH_CAP_ALL_FILE); + in.cap.cap_id = cpu_to_le64(1); + in.cap.realm = cpu_to_le64(ci->i_snap_realm->ino); + in.cap.flags = CEPH_CAP_FLAG_AUTH; + in.ctime = in.mtime = in.atime = iinfo.btime; + in.mode = cpu_to_le32((u32)mode); + in.truncate_seq = cpu_to_le32(1); + in.truncate_size = cpu_to_le64(ci->i_truncate_size); + in.max_size = cpu_to_le64(ci->i_max_size); + in.xattr_version = cpu_to_le64(1); + in.uid = cpu_to_le32(from_kuid(&init_user_ns, current_fsuid())); + in.gid = cpu_to_le32(from_kgid(&init_user_ns, current_fsgid())); + in.nlink = cpu_to_le32(1); + + ceph_file_layout_to_legacy(&ci->i_layout, &in.layout); + + ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session, + req->r_fmode, NULL); + if (ret) { + dout("%s failed to fill inode: %d\n", __func__, ret); + if (inode->i_state & I_NEW) + discard_new_inode(inode); + } else { + struct dentry *dn; + + dout("%s d_adding new inode 0x%llx to 0x%lx/%s\n", __func__, + vino.ino, dir->i_ino, dentry->d_name.name); + ceph_dir_clear_ordered(dir); + ceph_init_inode_acls(inode, as_ctx); + if (inode->i_state & I_NEW) + unlock_new_inode(inode); + if (d_in_lookup(dentry) || d_really_is_negative(dentry)) { + if (!d_unhashed(dentry)) + d_drop(dentry); + dn = d_splice_alias(inode, dentry); + WARN_ON_ONCE(dn && dn != dentry); + } + file->f_mode |= FMODE_CREATED; + ret = finish_open(file, dentry, ceph_open); + } + return ret; +} + /* * Do a lookup + open with a single request. If we get a non-existent * file or symlink, return 1 so the VFS can retry. @@ -462,6 +597,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, struct ceph_mds_request *req; struct dentry *dn; struct ceph_acl_sec_ctx as_ctx = {}; + bool try_async = enable_async_dirops; int mask; int err; @@ -486,6 +622,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, return -ENOENT; } +retry: /* do the open */ req = prepare_open_request(dir->i_sb, flags, mode); if (IS_ERR(req)) { @@ -494,6 +631,12 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, } req->r_dentry = dget(dentry); req->r_num_caps = 2; + mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; + if (ceph_security_xattr_wanted(dir)) + mask |= CEPH_CAP_XATTR_SHARED; + req->r_args.open.mask = cpu_to_le32(mask); + req->r_parent = dir; + if (flags & O_CREAT) { req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; @@ -501,21 +644,37 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, req->r_pagelist = as_ctx.pagelist; as_ctx.pagelist = NULL; } + if (try_async && get_caps_for_async_create(dir, dentry)) { + set_bit(CEPH_MDS_R_DELEG_INO, &req->r_req_flags); + req->r_callback = ceph_async_create_cb; + err = ceph_mdsc_submit_request(mdsc, dir, req); + switch (err) { + case 0: + /* set up inode, dentry and return */ + err = ceph_finish_async_open(dir, dentry, file, + mode, req, &as_ctx); + goto out_req; + case -ECHILD: + /* do a sync create */ + try_async = false; + as_ctx.pagelist = req->r_pagelist; + req->r_pagelist = NULL; + ceph_mdsc_put_request(req); + goto retry; + default: + /* Hard error, give up */ + goto out_req; + } + } } - mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; - if (ceph_security_xattr_wanted(dir)) - mask |= CEPH_CAP_XATTR_SHARED; - req->r_args.open.mask = cpu_to_le32(mask); - - req->r_parent = dir; set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); err = ceph_mdsc_do_request(mdsc, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, req); err = ceph_handle_snapdir(req, dentry, err); if (err) - goto out_req; + goto out_fmode; if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); @@ -529,7 +688,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, dn = NULL; } if (err) - goto out_req; + goto out_fmode; if (dn || d_really_is_negative(dentry) || d_is_symlink(dentry)) { /* make vfs retry on splice, ENOENT, or symlink */ dout("atomic_open finish_no_open on dn %p\n", dn); @@ -545,9 +704,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, } err = finish_open(file, dentry, ceph_open); } -out_req: +out_fmode: if (!req->r_err && req->r_target_inode) ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); +out_req: ceph_mdsc_put_request(req); out_ctx: ceph_release_acl_sec_ctx(&as_ctx); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 9e7492b21b50b..c76d6e7f8136c 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2620,14 +2620,16 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, flags |= CEPH_MDS_FLAG_REPLAY; if (req->r_parent) flags |= CEPH_MDS_FLAG_WANT_DENTRY; - rhead->flags = cpu_to_le32(flags); - rhead->num_fwd = req->r_num_fwd; - rhead->num_retry = req->r_attempts - 1; - if (test_bit(CEPH_MDS_R_DELEG_INO, &req->r_req_flags)) + if (test_bit(CEPH_MDS_R_DELEG_INO, &req->r_req_flags)) { rhead->ino = cpu_to_le64(req->r_deleg_ino); - else + flags |= CEPH_MDS_FLAG_ASYNC; + } else { rhead->ino = 0; + } + rhead->flags = cpu_to_le32(flags); + rhead->num_fwd = req->r_num_fwd; + rhead->num_retry = req->r_attempts - 1; dout(" r_parent = %p\n", req->r_parent); return 0; } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index ec4d66d7c2618..0f754937deed1 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -136,7 +136,6 @@ struct ceph_fs_client { #endif }; - /* * File i/o capability. This tracks shared state with the metadata * server that allows us to cache or writeback attributes or to read diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index a099f60feb7bf..b127563e21a12 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -444,8 +444,9 @@ union ceph_mds_request_args { } __attribute__ ((packed)) lookupino; } __attribute__ ((packed)); -#define CEPH_MDS_FLAG_REPLAY 1 /* this is a replayed op */ -#define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */ +#define CEPH_MDS_FLAG_REPLAY 1 /* this is a replayed op */ +#define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */ +#define CEPH_MDS_FLAG_ASYNC 4 /* request is asynchronous */ struct ceph_mds_request_head { __le64 oldest_client_tid; @@ -658,6 +659,9 @@ int ceph_flags_to_mode(int flags); #define CEPH_CAP_ANY (CEPH_CAP_ANY_RD | CEPH_CAP_ANY_EXCL | \ CEPH_CAP_ANY_FILE_WR | CEPH_CAP_FILE_LAZYIO | \ CEPH_CAP_PIN) +#define CEPH_CAP_ALL_FILE (CEPH_CAP_PIN | CEPH_CAP_ANY_SHARED | \ + CEPH_CAP_AUTH_EXCL | CEPH_CAP_XATTR_EXCL | \ + CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR) #define CEPH_CAP_LOCKS (CEPH_LOCK_IFILE | CEPH_LOCK_IAUTH | CEPH_LOCK_ILINK | \ CEPH_LOCK_IXATTR)