req->r_direct_is_hash = true;
rhead = req->r_request->front.iov_base;
rhead->args.readdir.frag = cpu_to_le32(frag);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, NULL, req);
if (err < 0) {
ceph_mdsc_put_request(req);
return err;
rhead->args.stat.mask = cpu_to_le32(mask);
req->r_last_dentry = dget(dentry); /* try to use this in fill_trace */
req->r_locked_dir = dentry->d_parent->d_inode; /* by the VFS */
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, NULL, req);
dentry = ceph_finish_lookup(req, dentry, err);
ceph_mdsc_put_request(req); /* will dput(dentry) */
dout(20, "do_lookup result=%p\n", dentry);
rhead->args.mknod.mode = cpu_to_le32(mode);
rhead->args.mknod.rdev = cpu_to_le32(rdev);
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, dir, req);
if (!err && req->r_reply_info.trace_numd == 0) {
/*
* no trace. do lookup, in case we are called from create
}
req->r_locked_dir = dir;
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, dir, req);
ceph_mdsc_put_request(req);
if (err)
d_drop(dentry);
rhead->args.mkdir.mode = cpu_to_le32(mode);
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, dir, req);
ceph_mdsc_put_request(req);
if (err < 0)
d_drop(dentry);
req->r_locked_dir = old_dentry->d_inode;
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, dir, req);
if (err) {
d_drop(dentry);
} else if (req->r_reply_info.trace_numd == 0) {
ceph_mdsc_lease_release(mdsc, dir, dentry,
CEPH_LOCK_DN);
ceph_release_caps(inode, CEPH_CAP_LINK_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, dir, req);
ceph_mdsc_put_request(req);
return err;
CEPH_LOCK_DN);
if (new_dentry->d_inode)
ceph_release_caps(new_dentry->d_inode, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, old_dir, req);
if (!err && req->r_reply_info.trace_numd == 0) {
/*
* no trace
NULL, USE_ANY_MDS);
if (IS_ERR(req))
return ERR_PTR(PTR_ERR(req));
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, NULL, req);
ceph_mdsc_put_request(req);
inode = ceph_find_inode(sb, vino);
struct dentry *dentry;
struct ceph_mds_request *req;
struct ceph_file_info *cf = file->private_data;
+ struct inode *parent_inode = file->f_dentry->d_parent->d_inode;
int err;
int flags, fmode, wantcaps;
err = PTR_ERR(req);
goto out;
}
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
if (!err)
err = ceph_init_file(inode, file, req->r_fmode);
ceph_mdsc_put_request(req);
struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
struct file *file = nd->intent.open.file;
+ struct inode *parent_inode = file->f_dentry->d_parent->d_inode;
struct ceph_mds_request *req;
int err;
int flags = nd->intent.open.flags - 1; /* silly vfs! */
ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
req->r_last_dentry = dget(dentry); /* use this dentry in fill_trace */
req->r_locked_dir = dir; /* caller holds dir->i_mutex */
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
dentry = ceph_finish_lookup(req, dentry, err);
if (!err)
err = ceph_init_file(req->r_last_inode, file, req->r_fmode);
static int ceph_fsync(struct file *file, struct dentry *dentry, int datasync)
{
struct inode *inode = dentry->d_inode;
- int ret;
+ int ret, err;
+ struct ceph_mds_request *req;
+ /* next request tid to look for; advanced as we walk the listener tree */
+ u64 nexttid = 0;
dout(10, "fsync on inode %p\n", inode);
ret = write_inode_now(inode, 1);
if (ret < 0)
return ret;
+ ret = 0;
+ /*
+ * For a directory, also wait until every in-flight MDS request
+ * registered ("listening") on this inode has received its safe
+ * (committed) reply, so namespace operations are durable before
+ * fsync returns.
+ */
+ if ((inode->i_mode & S_IFMT) == S_IFDIR) {
+ dout(0, "sync on directory\n");
+
+ do {
+ /*
+ * NOTE(review): this relies on the lookup starting at
+ * nexttid — confirm ceph_mdsc_get_listener_req honors
+ * its tid argument, or this loop can respin on the
+ * first entry.
+ */
+ req = ceph_mdsc_get_listener_req(inode, nexttid);
+
+ if (!req)
+ break;
+ nexttid = req->r_tid + 1;
+
+ if (req->r_timeout) {
+ err = wait_for_completion_timeout(&req->r_safe_completion,
+ req->r_timeout);
+ /* wait_for_completion_timeout returns 0 on timeout */
+ if (err == 0)
+ ret = -EIO; /* timed out */
+ } else {
+ wait_for_completion(&req->r_safe_completion);
+ }
+ ceph_mdsc_put_request(req);
+ } while (req);
+ }
+
/*
* HMM: should we also ensure that caps are flushed to mds?
* It's not strictly necessary, since with the data on the
* Not mtime, though.
*/
- return 0;
+ return ret;
}
const struct file_operations ceph_file_fops = {
ci->i_vmtruncate_to = -1;
INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
+ INIT_RADIX_TREE(&ci->i_listener_tree, GFP_NOFS);
+ spin_lock_init(&ci->i_listener_lock);
+
return &ci->vfs_inode;
}
{
struct inode *inode = dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
+ struct inode *parent_inode = dentry->d_parent->d_inode;
struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
const unsigned int ia_valid = attr->ia_valid;
}
reqh->args.chown.mask = cpu_to_le32(mask);
ceph_release_caps(inode, CEPH_CAP_AUTH_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
dout(10, "chown result %d\n", err);
return err;
{
struct inode *inode = dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
+ struct inode *parent_inode = dentry->d_parent->d_inode;
struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
struct ceph_mds_request *req;
reqh = req->r_request->front.iov_base;
reqh->args.chmod.mode = cpu_to_le32(attr->ia_mode);
ceph_release_caps(inode, CEPH_CAP_AUTH_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
dout(10, "chmod result %d\n", err);
return err;
static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
{
struct inode *inode = dentry->d_inode;
+ struct inode *parent_inode = dentry->d_parent->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
reqh->args.utime.mask |= cpu_to_le32(CEPH_UTIME_MTIME);
ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
dout(10, "utime result %d\n", err);
return err;
static int ceph_setattr_size(struct dentry *dentry, struct iattr *attr)
{
struct inode *inode = dentry->d_inode;
+ struct inode *parent_inode = dentry->d_parent->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
reqh = req->r_request->front.iov_base;
reqh->args.truncate.length = cpu_to_le64(attr->ia_size);
ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
dout(10, "truncate result %d\n", err);
__ceph_do_pending_vmtruncate(inode);
{
struct ceph_client *client = ceph_client(dentry->d_sb);
struct inode *inode = dentry->d_inode;
+ struct inode *parent_inode = dentry->d_parent->d_inode;
struct ceph_mds_client *mdsc = &client->mdsc;
struct ceph_mds_request *req;
struct ceph_mds_request_head *rhead;
req->r_request->hdr.data_off = cpu_to_le32(0);
ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
out:
struct ceph_client *client = ceph_client(dentry->d_sb);
struct ceph_mds_client *mdsc = &client->mdsc;
struct inode *inode = dentry->d_inode;
+ struct inode *parent_inode = dentry->d_parent->d_inode;
struct ceph_mds_request *req;
char *path;
int pathlen;
return PTR_ERR(req);
ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
return err;
}
static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
{
struct inode *inode = file->f_dentry->d_inode;
+ struct inode *parent_inode = file->f_dentry->d_parent->d_inode;
struct ceph_mds_client *mdsc = &ceph_sb_to_client(inode->i_sb)->mdsc;
char *path;
int pathlen;
reqh = req->r_request->front.iov_base;
reqh->args.setlayout.layout = layout;
ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, parent_inode, req);
ceph_mdsc_put_request(req);
return err;
}
req->r_fmode = -1;
atomic_set(&req->r_ref, 1); /* one for request_tree, one for caller */
init_completion(&req->r_completion);
+ init_completion(&req->r_safe_completion);
return req;
}
* Called under mdsc->mutex.
*/
static void __register_request(struct ceph_mds_client *mdsc,
+ struct inode *listener,
struct ceph_mds_request *req)
{
struct ceph_mds_request_head *head = req->r_request->front.iov_base;
+ struct ceph_inode_info *ci;
req->r_tid = ++mdsc->last_tid;
head->tid = cpu_to_le64(req->r_tid);
dout(30, "__register_request %p tid %lld\n", req, req->r_tid);
get_request(req);
radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req);
+ /*
+ * Also index the request by tid on the listener inode (typically the
+ * parent directory) so ceph_fsync can find and wait on in-flight
+ * requests affecting that inode.
+ */
+ req->r_listener = listener;
+ if (listener) {
+ ci = ceph_inode(listener);
+ spin_lock(&ci->i_listener_lock);
+ radix_tree_insert(&ci->i_listener_tree, req->r_tid, (void *)req);
+ spin_unlock(&ci->i_listener_lock);
+ }
}
static void __unregister_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req)
{
+ struct ceph_inode_info *ci;
dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid);
radix_tree_delete(&mdsc->request_tree, req->r_tid);
+ /* drop the per-inode listener index entry added by __register_request */
+ if (req->r_listener) {
+ ci = ceph_inode(req->r_listener);
+ spin_lock(&ci->i_listener_lock);
+ radix_tree_delete(&ci->i_listener_tree, req->r_tid);
+ spin_unlock(&ci->i_listener_lock);
+ }
ceph_mdsc_put_request(req);
}
+/*
+ * Find the next in-flight MDS request listening on @inode with
+ * r_tid >= @tid.  Returns the request with a reference held (caller
+ * must drop it with ceph_mdsc_put_request), or NULL if none remain.
+ */
+struct ceph_mds_request *ceph_mdsc_get_listener_req(struct inode *inode,
+ u64 tid)
+{
+ struct ceph_mds_request *req = NULL;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int got;
+
+ spin_lock(&ci->i_listener_lock);
+ /*
+ * Start the lookup at @tid, not 0, so callers iterating with an
+ * advancing tid (e.g. ceph_fsync) walk the tree forward instead of
+ * refetching the lowest-tid entry every time.
+ */
+ got = radix_tree_gang_lookup(&ci->i_listener_tree,
+ (void **)&req, tid, 1);
+ /*
+ * gang_lookup returns the number of items found (never negative);
+ * only take a reference when we actually found one — testing
+ * "got >= 0" would dereference a NULL req on an empty tree.
+ */
+ if (got > 0)
+ get_request(req);
+ spin_unlock(&ci->i_listener_lock);
+
+ return req;
+}
+
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
if (mds >= mdsc->max_sessions)
return mdsc->sessions[mds];
}
-
/*
* Choose mds to send request to next. If there is a hint set in
* the request (e.g., due to a prior forward hint from the mds), use
* session setup, forwarding, retry details.
*/
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+ struct inode *listener,
struct ceph_mds_request *req)
{
struct ceph_mds_session *session = NULL;
struct ceph_mds_request_head *rhead;
int err;
int mds = -1;
+ int safe = 0;
dout(30, "do_request on %p\n", req);
mutex_lock(&mdsc->mutex);
- __register_request(mdsc, req);
+ __register_request(mdsc, listener, req);
retry:
if (req->r_timeout &&
time_after_eq(jiffies, req->r_started + req->r_timeout)) {
if (!err)
/* all is well, reply has been parsed. */
err = le32_to_cpu(req->r_reply_info.head->result);
+ if (req)
+ safe = req->r_reply_info.head->safe;
finish:
- __unregister_request(mdsc, req);
+ if (safe) {
+ complete(&req->r_safe_completion);
+ __unregister_request(mdsc, req);
+ }
+
mutex_unlock(&mdsc->mutex);
- ceph_msg_put(req->r_request);
- req->r_request = NULL;
+ if (safe) {
+ ceph_msg_put(req->r_request);
+ req->r_request = NULL;
+ }
dout(30, "do_request %p done, result %d\n", req, err);
return err;
dout(10, "handle_reply %p expected_cap=%p\n", req, req->r_expected_cap);
mds = le32_to_cpu(msg->hdr.src.name.num);
if (req->r_got_reply) {
- derr(1, "got reply on %llu, mds%d got more than one reply\n",
- tid, mds);
+ if (req->r_reply_info.head->safe) {
+ /*
+ We already handled the unsafe response, now do the cleanup.
+ Shouldn't we check the safe response to see if it matches
+ the unsafe one?
+ */
+ complete(&req->r_safe_completion);
+ __unregister_request(mdsc, req);
+ dout(10, "got another reply %llu, mds%d\n",
+ tid, mds);
+ ceph_msg_put(req->r_request);
+ req->r_request = NULL;
+ } else {
+ dout(0, "got another _unsafe_ reply %llu, mds%d\n",
+ tid, mds);
+ }
mutex_unlock(&mdsc->mutex);
ceph_mdsc_put_request(req);
return;
/* kick calling process */
complete(&req->r_completion);
ceph_mdsc_put_request(req);
+
return;
}
u32 r_direct_hash; /* choose dir frag based on this dentry hash */
bool r_direct_is_hash; /* true if r_direct_hash is valid */
+ struct inode *r_listener;
+
/* references to the trailing dentry and inode from parsing the
* mds response. also used to feed a VFS-provided dentry into
* the reply handler */
atomic_t r_ref;
struct completion r_completion;
+ struct completion r_safe_completion;
int r_got_reply;
};
u64 ino2, const char *path2,
struct dentry *ref, int want_auth);
extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+ struct inode *listener,
struct ceph_mds_request *req);
extern void ceph_mdsc_put_request(struct ceph_mds_request *req);
extern void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
+extern struct ceph_mds_request *ceph_mdsc_get_listener_req(struct inode *inode,
+ u64 tid);
#endif
req->r_timeout = client->mount_args.mount_timeout * HZ;
reqhead = req->r_request->front.iov_base;
reqhead->args.stat.mask = CEPH_STAT_CAP_INODE;
- err = ceph_mdsc_do_request(mdsc, req);
+ err = ceph_mdsc_do_request(mdsc, NULL, req);
if (err == 0) {
root = req->r_last_dentry;
dget(root);
loff_t i_vmtruncate_to; /* delayed truncate work */
struct work_struct i_vmtruncate_work;
+ struct radix_tree_root i_listener_tree; /* requests we pend on */
+ spinlock_t i_listener_lock;
+
struct inode vfs_inode; /* at end */
};