From 4c6431bb0c554347b0581cf58d1254361e3c1d90 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 5 Mar 2008 13:29:03 -0800 Subject: [PATCH] client: refactor mds requests to put most processing in reply handler The goal is to fix race issues between open reply processing (previously in calling process's context) and subsequent file_cap messages. This just pushes generic mds reply handling into the reply handler, including the ceph_insert_trace (somewhat heavyweight, unfortunately) and the cap creation/update. This effectively pushes a lot more code into the single thread that's doing mds replies, but simplifies a lot of code. Until scaling issues arise, I think it's fine. --- src/kernel/client.c | 49 +-- src/kernel/dir.c | 197 +++++------- src/kernel/file.c | 58 ++-- src/kernel/inode.c | 68 ++--- src/kernel/mds_client.c | 642 ++++++++++++++++++++++------------------ src/kernel/mds_client.h | 83 +++--- src/kernel/super.h | 5 +- 7 files changed, 525 insertions(+), 577 deletions(-) diff --git a/src/kernel/client.c b/src/kernel/client.c index e2db970eefc7b..368a09b4d877e 100644 --- a/src/kernel/client.c +++ b/src/kernel/client.c @@ -45,46 +45,13 @@ static void put_client_counter(void) spin_unlock(&ceph_client_spinlock); } - -int parse_open_reply(struct ceph_msg *reply, struct inode *inode, struct ceph_mds_session *session) -{ - struct ceph_mds_reply_head *head; - struct ceph_mds_reply_info rinfo; - int frommds = session->s_mds; - int err; - struct ceph_inode_cap *cap; - - /* parse reply */ - head = reply->front.iov_base; - err = le32_to_cpu(head->result); - dout(30, "parse_open_reply mds%d reports %d\n", frommds, err); - if (err < 0) - return err; - if ((err = ceph_mdsc_parse_reply_info(reply, &rinfo)) < 0) - return err; - BUG_ON(rinfo.trace_nr == 0); - if ((err = ceph_fill_inode(inode, rinfo.trace_in[rinfo.trace_nr-1].in)) < 0) - return err; - - /* fill in cap */ - cap = ceph_add_cap(inode, session, - le32_to_cpu(head->file_caps), - le32_to_cpu(head->file_caps_seq)); - 
if (IS_ERR(cap)) - return PTR_ERR(cap); - - ceph_mdsc_destroy_reply_info(&rinfo); - return 0; -} - static int open_root_inode(struct ceph_client *client, struct ceph_mount_args *args, struct dentry **pmnt_root) { struct ceph_mds_client *mdsc = &client->mdsc; struct inode *root_inode, *mnt_inode = NULL; - struct ceph_msg *req = 0; + struct ceph_mds_request *req = 0; struct ceph_mds_request_head *reqhead; struct ceph_mds_reply_info rinfo; - struct ceph_mds_session *session; int frommds; int err; struct ceph_inode_cap *cap; @@ -96,15 +63,16 @@ static int open_root_inode(struct ceph_client *client, struct ceph_mount_args *a req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_OPEN, 1, args->path, 0, 0); if (IS_ERR(req)) return PTR_ERR(req); - reqhead = req->front.iov_base; + reqhead = req->r_request->front.iov_base; reqhead->args.open.flags = O_DIRECTORY; reqhead->args.open.mode = 0; - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, &session)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req)) < 0) return err; + rinfo = req->r_reply_info; err = le32_to_cpu(rinfo.head->result); if (err != 0) - return err; + goto out; if (rinfo.trace_nr == 0) { dout(10, "open_root_inode wtf, mds returns 0 but no trace\n"); err = -EINVAL; @@ -139,8 +107,8 @@ static int open_root_inode(struct ceph_client *client, struct ceph_mount_args *a } /* fill in cap */ - frommds = le32_to_cpu(rinfo.reply->hdr.src.name.num); - cap = ceph_add_cap(mnt_inode, session, + frommds = le32_to_cpu(req->r_reply->hdr.src.name.num); + cap = ceph_add_cap(mnt_inode, req->r_session, le32_to_cpu(rinfo.head->file_caps), le32_to_cpu(rinfo.head->file_caps_seq)); if (IS_ERR(cap)) { @@ -152,6 +120,7 @@ static int open_root_inode(struct ceph_client *client, struct ceph_mount_args *a ci->i_nr_by_mode[FILE_MODE_PIN]++; dout(30, "open_root_inode success, root dentry is %p.\n", client->sb->s_root); + ceph_mdsc_put_request(req); return 0; out2: @@ -160,7 +129,7 @@ out2: iput(root_inode); iput(mnt_inode); out: - 
ceph_mdsc_put_session(session); + ceph_mdsc_put_request(req); return err; } diff --git a/src/kernel/dir.c b/src/kernel/dir.c index 1f70e857b1bf5..689fda87c3f4d 100644 --- a/src/kernel/dir.c +++ b/src/kernel/dir.c @@ -104,16 +104,18 @@ static int ceph_dir_readdir(struct file *filp, void *dirent, filldir_t filldir) struct qstr dname; struct dentry *parent, *dn; struct inode *in; + struct ceph_mds_reply_info *rinfo; nextfrag: dout(5, "dir_readdir filp %p at frag %u off %u\n", filp, frag, off); - if (fi->frag != frag || fi->rinfo.reply == NULL) { - struct ceph_msg *req; + if (fi->frag != frag || fi->req == NULL) { + struct ceph_mds_request *req; struct ceph_mds_request_head *rhead; + struct ceph_mds_reply_info *rinfo; /* query mds */ - if (fi->rinfo.reply) - ceph_mdsc_destroy_reply_info(&fi->rinfo); + if (fi->req) + ceph_mdsc_put_request(fi->req); dout(10, "dir_readdir querying mds for ino %llx frag %u\n", ceph_ino(inode), frag); @@ -121,22 +123,27 @@ nextfrag: ceph_ino(inode), "", 0, 0); if (IS_ERR(req)) return PTR_ERR(req); - rhead = req->front.iov_base; + fi->req = req; + rhead = req->r_request->front.iov_base; rhead->args.readdir.frag = cpu_to_le32(frag); - err = ceph_mdsc_do_request(mdsc, req, &fi->rinfo, 0); + err = ceph_mdsc_do_request(mdsc, req); if (err < 0) return err; - err = le32_to_cpu(fi->rinfo.head->result); + rinfo = &fi->req->r_reply_info; + err = le32_to_cpu(rinfo->head->result); dout(10, "dir_readdir got and parsed readdir result=%d" " on frag %u\n", err, frag); - if (err < 0) + if (err < 0) { + ceph_mdsc_put_request(req); + fi->req = 0; return err; + } /* pre-populate dentry cache */ parent = filp->f_dentry; - for (i = 0; i < fi->rinfo.dir_nr; i++) { - dname.name = fi->rinfo.dir_dname[i]; - dname.len = fi->rinfo.dir_dname_len[i]; + for (i = 0; i < rinfo->dir_nr; i++) { + dname.name = rinfo->dir_dname[i]; + dname.len = rinfo->dir_dname_len[i]; dname.hash = full_name_hash(dname.name, dname.len); dn = d_lookup(parent, &dname); @@ -164,8 +171,8 @@ 
nextfrag: ceph_touch_dentry(dn); if (ceph_ino(in) != - le64_to_cpu(fi->rinfo.dir_in[i].in->ino)) { - if (ceph_fill_inode(in, fi->rinfo.dir_in[i].in) < 0) { + le64_to_cpu(rinfo->dir_in[i].in->ino)) { + if (ceph_fill_inode(in, rinfo->dir_in[i].in) < 0) { dout(30, "ceph_fill_inode badness\n"); iput(in); d_delete(dn); @@ -173,9 +180,9 @@ nextfrag: } d_add(dn, in); dout(10, "dir_readdir added dentry %p inode %llx %d/%d\n", - dn, ceph_ino(in), i, fi->rinfo.dir_nr); + dn, ceph_ino(in), i, rinfo->dir_nr); } else { - if (ceph_fill_inode(in, fi->rinfo.dir_in[i].in) < 0) { + if (ceph_fill_inode(in, rinfo->dir_in[i].in) < 0) { dout(30, "ceph_fill_inode badness\n"); break; } @@ -209,15 +216,16 @@ nextfrag: } else skew = -2; - while (off+skew < fi->rinfo.dir_nr) { + rinfo = &fi->req->r_reply_info; + while (off+skew < rinfo->dir_nr) { dout(10, "dir_readdir off %d -> %d / %d name '%s'\n", off, off+skew, - fi->rinfo.dir_nr, fi->rinfo.dir_dname[off+skew]); + rinfo->dir_nr, rinfo->dir_dname[off+skew]); if (filldir(dirent, - fi->rinfo.dir_dname[off+skew], - fi->rinfo.dir_dname_len[off+skew], + rinfo->dir_dname[off+skew], + rinfo->dir_dname_len[off+skew], make_fpos(frag, off), - le64_to_cpu(fi->rinfo.dir_in[off+skew].in->ino), - le32_to_cpu(fi->rinfo.dir_in[off+skew].in->mode >> 12)) < 0) { + le64_to_cpu(rinfo->dir_in[off+skew].in->ino), + le32_to_cpu(rinfo->dir_in[off+skew].in->mode >> 12)) < 0) { dout(20, "filldir stopping us...\n"); return 0; } @@ -247,14 +255,13 @@ const struct file_operations ceph_dir_fops = { }; -int ceph_request_lookup(struct super_block *sb, struct dentry *dentry, - struct ceph_mds_reply_info *prinfo) +int ceph_request_lookup(struct super_block *sb, struct dentry *dentry) { struct ceph_client *client = ceph_sb_to_client(sb); struct ceph_mds_client *mdsc = &client->mdsc; char *path; int pathlen; - struct ceph_msg *req; + struct ceph_mds_request *req; int err; /* regular lookup */ @@ -266,12 +273,12 @@ int ceph_request_lookup(struct super_block *sb, struct dentry 
*dentry, kfree(path); if (IS_ERR(req)) return PTR_ERR(req); - err = ceph_mdsc_do_request(mdsc, req, prinfo, 0); + err = ceph_mdsc_do_request(mdsc, req); if (err < 0) return err; - err = le32_to_cpu(prinfo->head->result); + err = le32_to_cpu(req->r_reply_info.head->result); + ceph_mdsc_put_request(req); dout(20, "dir_lookup result=%d\n", err); - return err; } @@ -284,57 +291,26 @@ void ceph_touch_dentry(struct dentry *dentry) static struct dentry *ceph_dir_lookup(struct inode *dir, struct dentry *dentry, struct nameidata *nd) { - struct ceph_mds_reply_info rinfo; - struct inode *inode; int err; - ino_t ino; - int found = 0; dout(5, "dir_lookup in dir %p dentry %p '%s'\n", dir, dentry, dentry->d_name.name); /* open(|create) intent? */ + /* if (nd->flags & LOOKUP_OPEN) { err = ceph_lookup_open(dir, dentry, nd); return ERR_PTR(err); } + */ - err = ceph_request_lookup(dir->i_sb, dentry, &rinfo); - + err = ceph_request_lookup(dir->i_sb, dentry); if (err == -ENOENT) { + dout(10, "ENOENT, adding a null dentry\n"); ceph_touch_dentry(dentry); d_add(dentry, NULL); } else if (err < 0) return ERR_PTR(err); - if ((!err) && (rinfo.trace_nr > 0)) { - ino = le64_to_cpu(rinfo.trace_in[rinfo.trace_nr-1].in->ino); - dout(10, "got and parsed stat result, ino %lu\n", ino); - - inode = ilookup(dir->i_sb, ino); - - if (!inode) - inode = new_inode(dir->i_sb); - else - found++; - - if (!inode) - return ERR_PTR(-EACCES); - - err = ceph_fill_inode(inode, - rinfo.trace_in[rinfo.trace_nr-1].in); - if (err < 0) - return ERR_PTR(err); - - ceph_touch_dentry(dentry); - d_add(dentry, inode); - - if (found) - iput(inode); - - } else { - dout(10, "no trace in reply? 
wtf.\n"); - } - return NULL; } @@ -342,10 +318,8 @@ static int ceph_dir_mknod(struct inode *dir, struct dentry *dentry, int mode, de { struct ceph_client *client = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; - struct inode *inode = NULL; - struct ceph_msg *req; + struct ceph_mds_request *req; struct ceph_mds_request_head *rhead; - struct ceph_mds_reply_info rinfo; char *path; int pathlen; int err; @@ -361,30 +335,26 @@ static int ceph_dir_mknod(struct inode *dir, struct dentry *dentry, int mode, de d_drop(dentry); return PTR_ERR(req); } - rhead = req->front.iov_base; + rhead = req->r_request->front.iov_base; rhead->args.mknod.mode = cpu_to_le32(mode); rhead->args.mknod.rdev = cpu_to_le32(rdev); - err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0); + err = ceph_mdsc_do_request(mdsc, req); if (err < 0) { d_drop(dentry); return err; } - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); if (err == 0) { - err = ceph_fill_trace(dir->i_sb, &rinfo, &inode, NULL); - - if (err < 0) - goto done; - - if (inode == NULL) { + if (req->r_last_inode == NULL) { /* TODO handle this one */ err = -ENOMEM; goto done; } - dout(10, "rinfo.dir_in=%p rinfo.trace_nr=%d\n", rinfo.trace_in, rinfo.trace_nr); + //dout(10, "rinfo.dir_in=%p rinfo.trace_nr=%d\n", rinfo.trace_in, rinfo.trace_nr); } done: + ceph_mdsc_put_request(req); return err; } @@ -392,9 +362,7 @@ static int ceph_dir_symlink(struct inode *dir, struct dentry *dentry, const char { struct ceph_client *client = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; - struct inode *inode = NULL; - struct ceph_msg *req; - struct ceph_mds_reply_info rinfo; + struct ceph_mds_request *req; char *path; int pathlen; int err; @@ -410,27 +378,22 @@ static int ceph_dir_symlink(struct inode *dir, struct dentry *dentry, const char d_drop(dentry); return PTR_ERR(req); } - err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0); + err = ceph_mdsc_do_request(mdsc, 
req); if (err < 0) { d_drop(dentry); return err; } - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); if (err == 0) { - err = ceph_fill_trace(dir->i_sb, &rinfo, &inode, NULL); - - if (err < 0) - goto done; - - if (inode == NULL) { + if (req->r_last_inode == NULL) { /* TODO handle this one */ err = -ENOMEM; goto done; } - dout(10, "rinfo.dir_in=%p rinfo.trace_nr=%d\n", rinfo.trace_in, rinfo.trace_nr); } done: + ceph_mdsc_put_request(req); return err; } @@ -438,10 +401,8 @@ static int ceph_dir_mkdir(struct inode *dir, struct dentry *dentry, int mode) { struct ceph_client *client = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; - struct inode *inode = NULL; - struct ceph_msg *req; + struct ceph_mds_request *req; struct ceph_mds_request_head *rhead; - struct ceph_mds_reply_info rinfo; char *path; int pathlen; int err; @@ -457,30 +418,24 @@ static int ceph_dir_mkdir(struct inode *dir, struct dentry *dentry, int mode) d_drop(dentry); return PTR_ERR(req); } - rhead = req->front.iov_base; + rhead = req->r_request->front.iov_base; rhead->args.mkdir.mode = cpu_to_le32(mode); - err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0); + err = ceph_mdsc_do_request(mdsc, req); if (err < 0) { d_drop(dentry); return err; } - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); if (err == 0) { -/* inode_dec_link_count(inode); */ - err = ceph_fill_trace(dir->i_sb, &rinfo, &inode, NULL); - - if (err < 0) - goto done_mkdir; - - if (inode == NULL) { + if (req->r_last_inode == NULL) { /* TODO handle this one */ err = -ENOMEM; goto done_mkdir; } - dout(10, "rinfo.dir_in=%p rinfo.trace_nr=%d\n", rinfo.trace_in, rinfo.trace_nr); } done_mkdir: + ceph_mdsc_put_request(req); return err; } @@ -490,8 +445,7 @@ static int ceph_dir_unlink(struct inode *dir, struct dentry *dentry) struct ceph_client *client = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; struct 
inode *inode = dentry->d_inode; - struct ceph_msg *req; - struct ceph_mds_reply_info rinfo; + struct ceph_mds_request *req; char *path; int pathlen; int err; @@ -507,15 +461,16 @@ static int ceph_dir_unlink(struct inode *dir, struct dentry *dentry) kfree(path); if (IS_ERR(req)) return PTR_ERR(req); - err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0); + err = ceph_mdsc_do_request(mdsc, req); if (err < 0) return err; - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); if (err == 0) { - inode_dec_link_count(inode); + inode_dec_link_count(req->r_last_inode); /* FIXME update dir mtime etc. from reply trace */ } + ceph_mdsc_put_request(req); return err; } @@ -524,8 +479,7 @@ static int ceph_dir_rename(struct inode *old_dir, struct dentry *old_dentry, { struct ceph_client *client = ceph_sb_to_client(old_dir->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; - struct ceph_msg *req; - struct ceph_mds_reply_info rinfo; + struct ceph_mds_request *req; struct dentry *root = old_dir->i_sb->s_root; char *oldpath, *newpath; int oldpathlen, newpathlen; @@ -548,14 +502,15 @@ static int ceph_dir_rename(struct inode *old_dir, struct dentry *old_dentry, kfree(newpath); if (IS_ERR(req)) return PTR_ERR(req); - err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0); + err = ceph_mdsc_do_request(mdsc, req); if (err < 0) return err; - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); if (err == 0) { /* FIXME update dir mtime etc. 
from reply trace */ } + ceph_mdsc_put_request(req); return err; } @@ -567,12 +522,9 @@ ceph_dir_create(struct inode *dir, struct dentry *dentry, int mode, ceph_ino_t pathbase; char *path; int pathlen; - struct ceph_msg *req; + struct ceph_mds_request *req; struct ceph_mds_request_head *rhead; - struct ceph_mds_reply_info rinfo; - struct ceph_mds_session *session; int err; - struct inode *inode; dout(5, "create in dir %p dentry %p name '%s' flags %d\n", dir, dentry, dentry->d_name.name, mode); pathbase = ceph_ino(dir->i_sb->s_root->d_inode); @@ -583,29 +535,24 @@ ceph_dir_create(struct inode *dir, struct dentry *dentry, int mode, kfree(path); if (IS_ERR(req)) return PTR_ERR(req); - rhead = req->front.iov_base; + rhead = req->r_request->front.iov_base; rhead->args.mknod.mode = cpu_to_le32(mode); rhead->args.mknod.rdev = 0; - err = ceph_mdsc_do_request(mdsc, req, &rinfo, &session); + err = ceph_mdsc_do_request(mdsc, req); if (err < 0) return err; dout(10, "create got and parsed result\n"); - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); if (err == 0) { - err = ceph_fill_trace(dir->i_sb, &rinfo, &inode, NULL); - - if (err < 0) - goto done_create; - - if (inode == NULL) { + if (req->r_last_inode == NULL) { err = -ENOMEM; goto done_create; } - dout(10, "rinfo.dir_in=%p rinfo.trace_nr=%d\n", rinfo.trace_in, rinfo.trace_nr); } done_create: + ceph_mdsc_put_request(req); return err; } @@ -641,6 +588,6 @@ const struct inode_operations ceph_dir_iops = { }; struct dentry_operations ceph_dentry_ops = { - .d_revalidate = ceph_d_revalidate, + .d_revalidate = ceph_d_revalidate, }; diff --git a/src/kernel/file.c b/src/kernel/file.c index eb90d48a99a38..b659d168f63ad 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -9,38 +9,33 @@ int ceph_debug_file = 50; /* * if err==0, caller is responsible for a put_session on *psession */ -int do_open_request(struct super_block *sb, struct dentry *dentry, - int flags, int create_mode, - 
struct ceph_mds_session **psession, - struct ceph_mds_reply_info *rinfo) +struct ceph_mds_request * +prepare_open_request(struct super_block *sb, struct dentry *dentry, + int flags, int create_mode) { struct ceph_client *client = ceph_sb_to_client(sb); struct ceph_mds_client *mdsc = &client->mdsc; u64 pathbase; char *path; int pathlen; - struct ceph_msg *req; + struct ceph_mds_request *req; struct ceph_mds_request_head *rhead; - int err; - dout(5, "open dentry %p name '%s' flags %d\n", dentry, + dout(5, "prepare_open_request dentry %p name '%s' flags %d\n", dentry, dentry->d_name.name, flags); pathbase = ceph_ino(sb->s_root->d_inode); path = ceph_build_dentry_path(dentry, &pathlen); if (IS_ERR(path)) - return PTR_ERR(path); + return ERR_PTR(PTR_ERR(path)); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_OPEN, pathbase, path, 0, 0); kfree(path); - if (IS_ERR(req)) - return PTR_ERR(req); - rhead = req->front.iov_base; - rhead->args.open.flags = cpu_to_le32(flags); - rhead->args.open.mode = cpu_to_le32(create_mode); - err = ceph_mdsc_do_request(mdsc, req, rinfo, psession); - if (err < 0) - return err; - return 0; + if (!IS_ERR(req)) { + rhead = req->r_request->front.iov_base; + rhead->args.open.flags = cpu_to_le32(flags); + rhead->args.open.mode = cpu_to_le32(create_mode); + } + return req; } /* @@ -95,10 +90,10 @@ static int ceph_open_init_private_data(struct inode *inode, struct file *file, i int ceph_open(struct inode *inode, struct file *file) { + struct ceph_client *client = ceph_sb_to_client(inode->i_sb); + struct ceph_mds_client *mdsc = &client->mdsc; struct dentry *dentry; - struct ceph_mds_reply_info rinfo; - struct ceph_mds_session *session; - struct ceph_inode_cap *cap = 0; + struct ceph_mds_request *req; struct ceph_file_info *cf = file->private_data; int err; @@ -114,18 +109,20 @@ int ceph_open(struct inode *inode, struct file *file) if (file->f_flags == O_DIRECTORY && ... 
) cap = ceph_find_cap(inode, 0); */ - if (!cap) { + //if (!cap) { dentry = list_entry(inode->i_dentry.next, struct dentry, d_alias); - err = do_open_request(inode->i_sb, dentry, file->f_flags, 0, - &session, &rinfo); + req = prepare_open_request(inode->i_sb, dentry, file->f_flags, 0); + if (IS_ERR(req)) + return PTR_ERR(req); + err = ceph_mdsc_do_request(mdsc, req); if (err < 0) return err; - err = proc_open_reply(inode, file, session, &rinfo); - ceph_mdsc_put_session(session); + err = le32_to_cpu(req->r_reply_info.head->result); + ceph_mdsc_put_request(req); if (err < 0) return err; - } +//} err = ceph_open_init_private_data(inode, file, file->f_flags); @@ -149,9 +146,11 @@ int ceph_lookup_open(struct inode *dir, struct dentry *dentry, dout(5, "ceph_lookup_open in dir %p dentry %p '%s'\n", dir, dentry, dentry->d_name.name); - err = do_open_request(dir->i_sb, dentry, nd->intent.open.flags, +/* fixme + err = prepare_open_request(dir->i_sb, dentry, nd->intent.open.flags, nd->intent.open.create_mode, &session, &rinfo); +*/ if (err < 0) return err; err = le32_to_cpu(rinfo.head->result); @@ -198,7 +197,7 @@ int ceph_lookup_open(struct inode *dir, struct dentry *dentry, if (err == 0) err = ceph_open_init_private_data(inode, file, nd->intent.open.flags); out: - ceph_mdsc_put_session(session); + //ceph_mdsc_put_session(session); return err; } @@ -230,8 +229,7 @@ int ceph_release(struct inode *inode, struct file *file) if (wanted != ci->i_cap_wanted) ceph_mdsc_update_cap_wanted(ci, wanted); - if (cf->rinfo.reply) - ceph_mdsc_destroy_reply_info(&cf->rinfo); + ceph_mdsc_put_request(cf->req); kfree(cf); return 0; diff --git a/src/kernel/inode.c b/src/kernel/inode.c index e8c1e71fd125a..a9d7764ff4ada 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -519,11 +519,11 @@ const struct inode_operations ceph_symlink_iops = { /* * generics */ -struct ceph_msg *prepare_setattr(struct ceph_mds_client *mdsc, struct dentry *dentry, int op) +struct ceph_mds_request 
*prepare_setattr(struct ceph_mds_client *mdsc, struct dentry *dentry, int op) { char *path; int pathlen; - struct ceph_msg *req; + struct ceph_mds_request *req; dout(5, "prepare_setattr dentry %p\n", dentry); path = ceph_build_dentry_path(dentry, &pathlen); @@ -541,9 +541,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) struct ceph_client *client = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; const unsigned int ia_valid = attr->ia_valid; - struct ceph_msg *req; + struct ceph_mds_request *req; struct ceph_mds_request_head *reqh; - struct ceph_mds_reply_info rinfo; int err; /* gratuitous debug output */ @@ -571,7 +570,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) req = prepare_setattr(mdsc, dentry, CEPH_MDS_OP_CHOWN); if (IS_ERR(req)) return PTR_ERR(req); - reqh = req->front.iov_base; + reqh = req->r_request->front.iov_base; if (ia_valid & ATTR_UID) reqh->args.chown.uid = cpu_to_le32(attr->ia_uid); else @@ -580,13 +579,13 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) reqh->args.chown.gid = cpu_to_le32(attr->ia_gid); else reqh->args.chown.gid = cpu_to_le32(-1); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req)) < 0) return err; - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); + ceph_mdsc_put_request(req); dout(10, "chown result %d\n", err); if (err) return err; - err = ceph_fill_trace(inode->i_sb, &rinfo, &inode, NULL); //if (err) return err; } @@ -595,16 +594,15 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) req = prepare_setattr(mdsc, dentry, CEPH_MDS_OP_CHMOD); if (IS_ERR(req)) return PTR_ERR(req); - reqh = req->front.iov_base; + reqh = req->r_request->front.iov_base; reqh->args.chmod.mode = cpu_to_le32(attr->ia_mode); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req)) < 0) return err; - err = 
le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); + ceph_mdsc_put_request(req); dout(10, "chmod result %d\n", err); if (err) return err; - err = ceph_fill_trace(inode->i_sb, &rinfo, &inode, NULL); - //if (err) return err; } /* utimes */ @@ -614,17 +612,16 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) req = prepare_setattr(mdsc, dentry, CEPH_MDS_OP_UTIME); if (IS_ERR(req)) return PTR_ERR(req); - reqh = req->front.iov_base; + reqh = req->r_request->front.iov_base; ceph_encode_timespec(&reqh->args.utime.mtime, &attr->ia_mtime); ceph_encode_timespec(&reqh->args.utime.atime, &attr->ia_atime); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req)) < 0) return err; - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); + ceph_mdsc_put_request(req); dout(10, "utime result %d\n", err); if (err) return err; - err = ceph_fill_trace(inode->i_sb, &rinfo, &inode, NULL); - //if (err) return err; } /* truncate? 
*/ @@ -639,16 +636,15 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) req = prepare_setattr(mdsc, dentry, CEPH_MDS_OP_TRUNCATE); if (IS_ERR(req)) return PTR_ERR(req); - reqh = req->front.iov_base; + reqh = req->r_request->front.iov_base; reqh->args.truncate.length = cpu_to_le64(attr->ia_size); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req)) < 0) return err; - err = le32_to_cpu(rinfo.head->result); + err = le32_to_cpu(req->r_reply_info.head->result); + ceph_mdsc_put_request(req); dout(10, "truncate result %d\n", err); if (err) return err; - err = ceph_fill_trace(inode->i_sb, &rinfo, &inode, NULL); - //if (err) return err; } return 0; @@ -657,40 +653,18 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) int ceph_inode_revalidate(struct dentry *dentry) { struct ceph_inode_info *ci; - struct ceph_mds_reply_info rinfo; - ino_t ino; - int err; if (dentry->d_inode == NULL) return -ENOENT; ci = ceph_inode(dentry->d_inode); - if (!ci) return -ENOENT; - if (ceph_lookup_cache && time_before(jiffies, ci->time+CACHE_HZ)) { + if (ceph_lookup_cache && time_before(jiffies, ci->time+CACHE_HZ)) return 0; - } - err = ceph_request_lookup(dentry->d_inode->i_sb, dentry, &rinfo); - - if (err < 0) - return err; - - if (rinfo.trace_nr > 0) { - ino = le64_to_cpu(rinfo.trace_in[rinfo.trace_nr-1].in->ino); - dout(10, "revalidate: got and parsed stat result, ino %lu\n", ino); - - err = ceph_fill_inode(dentry->d_inode, - rinfo.trace_in[rinfo.trace_nr-1].in); - if (err < 0) - return err; - } else { - dout(10, "no trace in reply? 
wtf.\n"); - } - - return err; + return ceph_request_lookup(dentry->d_inode->i_sb, dentry); } int ceph_inode_getattr(struct vfsmount *mnt, struct dentry *dentry, struct kstat *stat) diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index 4a04e015f0227..10badf1f34c12 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -49,87 +49,190 @@ static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int /* - * reference count request + * mds reply parsing */ -static void get_request(struct ceph_mds_request *req) +int parse_reply_info_in(void **p, void *end, struct ceph_mds_reply_info_in *info) { - atomic_inc(&req->r_ref); + int err; + info->in = *p; + *p += sizeof(struct ceph_mds_reply_inode) + + sizeof(__u32)*le32_to_cpu(info->in->fragtree.nsplits); + if ((err == ceph_decode_32(p, end, &info->symlink_len)) < 0) + return err; + info->symlink = *p; + *p += info->symlink_len; + if (unlikely(*p > end)) + return -EINVAL; + return 0; } -static void put_request(struct ceph_mds_request *req) +int parse_reply_info_trace(void **p, void *end, struct ceph_mds_reply_info *info) { - if (atomic_dec_and_test(&req->r_ref)) { - ceph_msg_put(req->r_request); - kfree(req); - } + __u32 numi; + int err = -EINVAL; + + if ((err = ceph_decode_32(p, end, &numi)) < 0) + goto bad; + if (numi == 0) + goto done; /* hrm, this shouldn't actually happen, but.. 
*/ + + /* alloc one longer shared array */ + info->trace_nr = numi; + info->trace_in = kmalloc(numi * (sizeof(*info->trace_in) + + sizeof(*info->trace_dir) + + sizeof(*info->trace_dname) + + sizeof(*info->trace_dname_len)), + GFP_KERNEL); + if (info->trace_in == NULL) + return -ENOMEM; + info->trace_dir = (void*)(info->trace_in + numi); + info->trace_dname = (void*)(info->trace_dir + numi); + info->trace_dname_len = (void*)(info->trace_dname + numi); + + while (1) { + /* inode */ + if ((err = parse_reply_info_in(p, end, &info->trace_in[numi-1])) < 0) + goto bad; + if (--numi == 0) + break; + /* dentry */ + if ((err == ceph_decode_32(p, end, &info->trace_dname_len[numi])) < 0) + goto bad; + info->trace_dname[numi] = *p; + *p += info->trace_dname_len[numi]; + if (*p > end) + goto bad; + /* dir */ + info->trace_dir[numi] = *p; + *p += sizeof(struct ceph_mds_reply_dirfrag) + + sizeof(__u32)*le32_to_cpu(info->trace_dir[numi]->ndist); + if (unlikely(*p > end)) + goto bad; + } + +done: + if (*p != end) + return -EINVAL; + return 0; + +bad: + derr(1, "problem parsing trace %d\n", err); + return err; } -static struct ceph_mds_request *find_request_and_lock(struct ceph_mds_client *mdsc, __u64 tid) +int parse_reply_info_dir(void **p, void *end, struct ceph_mds_reply_info *info) { - struct ceph_mds_request *req; - spin_lock(&mdsc->lock); - req = radix_tree_lookup(&mdsc->request_tree, tid); - if (!req) { - spin_unlock(&mdsc->lock); - return NULL; + __u32 num, i = 0; + int err = -EINVAL; + + info->dir_dir = *p; + if (*p + sizeof(*info->dir_dir) > end) + goto bad; + *p += sizeof(*info->dir_dir) + sizeof(__u32)*info->dir_dir->ndist; + if (*p > end) + goto bad; + + if ((err = ceph_decode_32(p, end, &num)) < 0) + goto bad; + if (num == 0) + goto done; + + /* alloc large array */ + info->dir_nr = num; + info->dir_in = kmalloc(num * (sizeof(*info->dir_in) + + sizeof(*info->dir_dname) + + sizeof(*info->dir_dname_len)), + GFP_KERNEL); + if (info->dir_in == NULL) + return -ENOMEM; + 
info->dir_dname = (void*)(info->dir_in + num); + info->dir_dname_len = (void*)(info->dir_dname + num); + + while (num) { + /* dentry, inode */ + if ((err == ceph_decode_32(p, end, &info->dir_dname_len[i])) < 0) + goto bad; + info->dir_dname[i] = *p; + *p += info->dir_dname_len[i]; + if (*p > end) + goto bad; + if ((err = parse_reply_info_in(p, end, &info->dir_in[i])) < 0) + goto bad; + i++; + num--; } - get_request(req); - return req; + +done: + return 0; + +bad: + derr(1, "problem parsing dir contents %d\n", err); + return err; } -static struct ceph_mds_request *new_request(struct ceph_msg *msg, int mds) +int parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info *info) { - struct ceph_mds_request *req; + void *p, *end; + __u32 len; + int err = -EINVAL; - req = kmalloc(sizeof(*req), GFP_KERNEL); - req->r_request = msg; - req->r_reply = 0; - req->r_num_mds = 0; - req->r_attempts = 0; - req->r_num_fwd = 0; - req->r_resend_mds = mds; - atomic_set(&req->r_ref, 1); /* one for request_tree, one for caller */ - init_completion(&req->r_completion); - ceph_msg_get(msg); /* grab reference */ + memset(info, 0, sizeof(*info)); + info->head = msg->front.iov_base; - return req; -} + /* trace */ + p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); + end = p + msg->front.iov_len; + if ((err = ceph_decode_32(&p, end, &len)) < 0) + goto bad; + if (len > 0 && + (p + len > end || + (err = parse_reply_info_trace(&p, p+len, info)) < 0)) + goto bad; + /* dir content */ + if ((err = ceph_decode_32(&p, end, &len)) < 0) + goto bad; + if (len > 0 && + (p + len > end || + (err = parse_reply_info_dir(&p, p+len, info)) < 0)) + goto bad; -/* - * register an in-flight request. 
- * fill in tid in msg request header - */ -void __register_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) -{ - struct ceph_mds_request_head *head = req->r_request->front.iov_base; - req->r_tid = head->tid = ++mdsc->last_tid; - dout(30, "__register_request %p tid %lld\n", req, req->r_tid); - get_request(req); - radix_tree_insert(&mdsc->request_tree, req->r_tid, (void*)req); + return 0; +bad: + derr(1, "parse_reply err %d\n", err); + return err; } -static void __unregister_request(struct ceph_mds_client *mdsc, - struct ceph_mds_request *req) +void destroy_reply_info(struct ceph_mds_reply_info *info) { - dout(30, "unregister_request %p tid %lld\n", req, req->r_tid); - radix_tree_delete(&mdsc->request_tree, req->r_tid); - put_request(req); + if (info->trace_in) kfree(info->trace_in); + if (info->dir_in) kfree(info->dir_in); } /* - * choose mds to send request to next + * sessions */ -static int choose_mds(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) + +static struct ceph_mds_session *__get_session(struct ceph_mds_client *mdsc, int mds) { - /* is there a specific mds we should try? 
*/ - if (req->r_resend_mds >= 0 && - ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0) - return req->r_resend_mds; + struct ceph_mds_session *session; + if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == 0) + return NULL; + session = mdsc->sessions[mds]; + atomic_inc(&session->s_ref); + return session; +} - /* pick one at random */ - return ceph_mdsmap_get_random_mds(mdsc->mdsmap); +void put_session(struct ceph_mds_session *s) +{ + BUG_ON(s == NULL); + dout(10, "put_session %p %d -> %d\n", s, + atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); + if (atomic_dec_and_test(&s->s_ref)) { + kfree(s); + s = NULL; + } } /* @@ -179,7 +282,7 @@ static struct ceph_mds_session *__register_session(struct ceph_mds_client *mdsc, } } if (mdsc->sessions[mds]) { - ceph_mdsc_put_session(s); /* lost race */ + put_session(s); /* lost race */ return mdsc->sessions[mds]; } else { mdsc->sessions[mds] = s; @@ -188,23 +291,126 @@ static struct ceph_mds_session *__register_session(struct ceph_mds_client *mdsc, } } -static struct ceph_mds_session *__get_session(struct ceph_mds_client *mdsc, int mds) +static void unregister_session(struct ceph_mds_client *mdsc, int mds) { - struct ceph_mds_session *session; - if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == 0) + dout(10, "unregister_session mds%d %p\n", mds, mdsc->sessions[mds]); + put_session(mdsc->sessions[mds]); + mdsc->sessions[mds] = 0; +} + + +/* + * requests + */ + +static void get_request(struct ceph_mds_request *req) +{ + atomic_inc(&req->r_ref); +} + +static void drop_request_session_attempt_refs(struct ceph_mds_request *req) +{ + int i; + for (i=0; ir_num_mds; i++) + put_session(req->r_mds[i]); + req->r_num_mds = 0; +} + +void ceph_mdsc_put_request(struct ceph_mds_request *req) +{ + dout(10, "put_request %p %d -> %d\n", req, + atomic_read(&req->r_ref), atomic_read(&req->r_ref)-1); + if (atomic_dec_and_test(&req->r_ref)) { + if (req->r_request) + ceph_msg_put(req->r_request); + if (req->r_reply) { + 
ceph_msg_put(req->r_reply); + destroy_reply_info(&req->r_reply_info); + } + if (req->r_session) + put_session(req->r_session); + drop_request_session_attempt_refs(req); + kfree(req); + } +} + +static struct ceph_mds_request *find_request_and_lock(struct ceph_mds_client *mdsc, __u64 tid) +{ + struct ceph_mds_request *req; + spin_lock(&mdsc->lock); + req = radix_tree_lookup(&mdsc->request_tree, tid); + if (!req) { + spin_unlock(&mdsc->lock); return NULL; - session = mdsc->sessions[mds]; - atomic_inc(&session->s_ref); - return session; + } + get_request(req); + return req; } -static void unregister_session(struct ceph_mds_client *mdsc, int mds) +static struct ceph_mds_request *new_request(struct ceph_msg *msg) { - dout(10, "unregister_session mds%d %p\n", mds, mdsc->sessions[mds]); - ceph_mdsc_put_session(mdsc->sessions[mds]); - mdsc->sessions[mds] = 0; + struct ceph_mds_request *req; + + req = kmalloc(sizeof(*req), GFP_KERNEL); + req->r_request = msg; + req->r_reply = 0; + req->r_last_inode = 0; + req->r_last_dentry = 0; + req->r_expects_cap = false; + req->r_cap = 0; + req->r_session = 0; + req->r_num_mds = 0; + req->r_attempts = 0; + req->r_num_fwd = 0; + req->r_resend_mds = -1; + atomic_set(&req->r_ref, 1); /* one for request_tree, one for caller */ + init_completion(&req->r_completion); + ceph_msg_get(msg); /* grab reference */ + + return req; } + +/* + * register an in-flight request. 
+ * fill in tid in msg request header + */ +void __register_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) +{ + struct ceph_mds_request_head *head = req->r_request->front.iov_base; + req->r_tid = head->tid = ++mdsc->last_tid; + dout(30, "__register_request %p tid %lld\n", req, req->r_tid); + get_request(req); + radix_tree_insert(&mdsc->request_tree, req->r_tid, (void*)req); +} + +static void __unregister_request(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) +{ + dout(30, "unregister_request %p tid %lld\n", req, req->r_tid); + radix_tree_delete(&mdsc->request_tree, req->r_tid); + ceph_mdsc_put_request(req); +} + + +/* + * choose mds to send request to next + */ +static int choose_mds(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) +{ + /* is there a specific mds we should try? */ + if (req->r_resend_mds >= 0 && + ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0) + return req->r_resend_mds; + + /* pick one at random */ + return ceph_mdsmap_get_random_mds(mdsc->mdsmap); +} + + +/* + * session messages + */ static struct ceph_msg *create_session_msg(__u32 op, __u64 seq) { struct ceph_msg *msg; @@ -411,7 +617,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg dout(0, "bad session op %d\n", op); BUG_ON(1); } - ceph_mdsc_put_session(session); + put_session(session); spin_unlock(&mdsc->lock); out: @@ -427,12 +633,13 @@ bad: /* exported functions */ -struct ceph_msg * +struct ceph_mds_request * ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, - ceph_ino_t ino1, const char *path1, + ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2) { - struct ceph_msg *req; + struct ceph_msg *msg; + struct ceph_mds_request *req; struct ceph_mds_request_head *head; void *p, *end; int pathlen; @@ -441,14 +648,19 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, if (path1) pathlen += strlen(path1); if (path2) pathlen += strlen(path2); - req = 
ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, + msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, sizeof(struct ceph_mds_request_head) + pathlen, 0, 0, 0); - if (IS_ERR(req)) + if (IS_ERR(msg)) + return ERR_PTR(PTR_ERR(msg)); + req = new_request(msg); + if (IS_ERR(req)) { + ceph_msg_put(msg); return req; - head = req->front.iov_base; - p = req->front.iov_base + sizeof(*head); - end = req->front.iov_base + req->front.iov_len; + } + head = msg->front.iov_base; + p = msg->front.iov_base + sizeof(*head); + end = msg->front.iov_base + msg->front.iov_len; /* encode head */ head->client_inst = mdsc->client->msgr->inst; @@ -471,7 +683,6 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, dout(10, "create_request path2 %llx/%s\n", ino2, path2); BUG_ON(p != end); - return req; } @@ -488,21 +699,16 @@ __u64 get_oldest_tid(struct ceph_mds_client *mdsc) return first->r_tid; } -int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, - struct ceph_mds_reply_info *rinfo, struct ceph_mds_session **psession) +int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { - struct ceph_mds_request *req; - struct ceph_mds_request_head *rhead; struct ceph_mds_session *session; - struct ceph_msg *reply = 0; + struct ceph_mds_request_head *rhead; int err; int mds = -1; - - dout(30, "do_request on %p\n", msg); - BUG_ON(le32_to_cpu(msg->hdr.type) != CEPH_MSG_CLIENT_REQUEST); - - req = new_request(msg, mds); - + + dout(30, "do_request on %p\n", req); + BUG_ON(le32_to_cpu(req->r_request->hdr.type) != CEPH_MSG_CLIENT_REQUEST); + radix_tree_preload(GFP_KERNEL); spin_lock(&mdsc->lock); __register_request(mdsc, req); @@ -518,7 +724,8 @@ retry: /* get session */ session = __get_session(mdsc, mds); - dout(30, "do_request __get_session returned %p state %d\n", session, (session ? session->s_state:0)); + dout(30, "do_request __get_session returned %p state %d\n", + session, (session ? 
session->s_state:0)); if (!session) session = __register_session(mdsc, mds); dout(30, "do_request session %p state %d\n", session, session->s_state); @@ -533,13 +740,13 @@ retry: if (session->s_state != CEPH_MDS_SESSION_OPEN) { dout(30, "do_request session %p not open, state=%d, waiting\n", session, session->s_state); - ceph_mdsc_put_session(session); + put_session(session); goto retry; } /* make request? */ BUG_ON(req->r_num_mds >= 2); - req->r_mds[req->r_num_mds++] = mds; + req->r_mds[req->r_num_mds++] = session; req->r_resend_mds = -1; /* forget any specific mds hint */ req->r_attempts++; rhead = req->r_request->front.iov_base; @@ -551,28 +758,20 @@ retry: send_msg_mds(mdsc, req->r_request, mds); wait_for_completion(&req->r_completion); spin_lock(&mdsc->lock); + if (!req->r_reply) + goto retry; /* clean up request, parse reply */ - if (!req->r_reply) { - ceph_mdsc_put_session(session); - goto retry; - } - reply = req->r_reply; __unregister_request(mdsc, req); spin_unlock(&mdsc->lock); - put_request(req); - if ((err = ceph_mdsc_parse_reply_info(reply, rinfo)) < 0) { - ceph_mdsc_put_session(session); - return err; - } - dout(30, "do_request done on %p result %d tracelen %d\n", msg, - rinfo->head->result, rinfo->trace_nr); + ceph_msg_put(req->r_request); + req->r_request = 0; + drop_request_session_attempt_refs(req); - if (psession) - *psession = session; - else - ceph_mdsc_put_session(session); + dout(30, "do_request done on %p result %d tracelen %d\n", req, + req->r_reply_info.head->result, + req->r_reply_info.trace_nr); return 0; } @@ -580,197 +779,65 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { struct ceph_mds_request *req; struct ceph_mds_reply_head *head = msg->front.iov_base; + struct ceph_mds_reply_info *rinfo; __u64 tid; + int err; + int mds; /* extract tid */ if (msg->front.iov_len < sizeof(*head)) { - dout(1, "got corrupt (short) reply\n"); - goto done; + dout(1, "handle_reply got corrupt (short) reply\n"); + 
return; } tid = le64_to_cpu(head->tid); /* pass to blocked caller */ req = find_request_and_lock(mdsc, tid); if (!req) { - dout(1, "got reply on unknown tid %llu\n", tid); - } else { - BUG_ON(req->r_reply); - req->r_reply = msg; - spin_unlock(&mdsc->lock); - - ceph_msg_get(msg); - complete(&req->r_completion); - put_request(req); - } -done: - return; -} + dout(1, "handle_reply on unknown tid %llu\n", tid); + return; + } -/* - * mds reply parsing - */ -int parse_reply_info_in(void **p, void *end, struct ceph_mds_reply_info_in *info) -{ - int err; - info->in = *p; - *p += sizeof(struct ceph_mds_reply_inode) + - sizeof(__u32)*le32_to_cpu(info->in->fragtree.nsplits); - if ((err == ceph_decode_32(p, end, &info->symlink_len)) < 0) - return err; - info->symlink = *p; - *p += info->symlink_len; - if (unlikely(*p > end)) - return -EINVAL; - return 0; -} + /* session */ + mds = le32_to_cpu(msg->hdr.src.name.num); + req->r_session = __get_session(mdsc, mds); + BUG_ON(req->r_session == 0); -int parse_reply_info_trace(void **p, void *end, struct ceph_mds_reply_info *info) -{ - __u32 numi; - int err = -EINVAL; + BUG_ON(req->r_reply); + req->r_reply = msg; + ceph_msg_get(msg); - if ((err = ceph_decode_32(p, end, &numi)) < 0) - goto bad; - if (numi == 0) - goto done; /* hrm, this shouldn't actually happen, but.. 
*/ + spin_unlock(&mdsc->lock); /* unlock */ - /* alloc one longer shared array */ - info->trace_nr = numi; - info->trace_in = kmalloc(numi * (sizeof(*info->trace_in) + - sizeof(*info->trace_dir) + - sizeof(*info->trace_dname) + - sizeof(*info->trace_dname_len)), - GFP_KERNEL); - if (info->trace_in == NULL) - return -ENOMEM; - info->trace_dir = (void*)(info->trace_in + numi); - info->trace_dname = (void*)(info->trace_dir + numi); - info->trace_dname_len = (void*)(info->trace_dname + numi); - - while (1) { - /* inode */ - if ((err = parse_reply_info_in(p, end, &info->trace_in[numi-1])) < 0) - goto bad; - if (--numi == 0) - break; - /* dentry */ - if ((err == ceph_decode_32(p, end, &info->trace_dname_len[numi])) < 0) - goto bad; - info->trace_dname[numi] = *p; - *p += info->trace_dname_len[numi]; - if (*p > end) - goto bad; - /* dir */ - info->trace_dir[numi] = *p; - *p += sizeof(struct ceph_mds_reply_dirfrag) + - sizeof(__u32)*le32_to_cpu(info->trace_dir[numi]->ndist); - if (unlikely(*p > end)) - goto bad; + /* parse */ + rinfo = &req->r_reply_info; + if ((err = parse_reply_info(msg, rinfo)) < 0) { + derr(0, "handle_reply got corrupt reply, resend?\n"); + BUG_ON(1); + /* hrm! retry? pass error up? 
FIXME */ } - -done: - if (*p != end) - return -EINVAL; - return 0; -bad: - derr(1, "problem parsing trace %d\n", err); - return err; -} - -int parse_reply_info_dir(void **p, void *end, struct ceph_mds_reply_info *info) -{ - __u32 num, i = 0; - int err = -EINVAL; - - info->dir_dir = *p; - if (*p + sizeof(*info->dir_dir) > end) - goto bad; - *p += sizeof(*info->dir_dir) + sizeof(__u32)*info->dir_dir->ndist; - if (*p > end) - goto bad; - - if ((err = ceph_decode_32(p, end, &num)) < 0) - goto bad; - if (num == 0) - goto done; - - /* alloc large array */ - info->dir_nr = num; - info->dir_in = kmalloc(num * (sizeof(*info->dir_in) + - sizeof(*info->dir_dname) + - sizeof(*info->dir_dname_len)), - GFP_KERNEL); - if (info->dir_in == NULL) - return -ENOMEM; - info->dir_dname = (void*)(info->dir_in + num); - info->dir_dname_len = (void*)(info->dir_dname + num); - - while (num) { - /* dentry, inode */ - if ((err == ceph_decode_32(p, end, &info->dir_dname_len[i])) < 0) - goto bad; - info->dir_dname[i] = *p; - *p += info->dir_dname_len[i]; - if (*p > end) - goto bad; - if ((err = parse_reply_info_in(p, end, &info->dir_in[i])) < 0) - goto bad; - i++; - num--; + err = le32_to_cpu(rinfo->head->result); + dout(10, "handle_reply tid %lld result %d\n", tid, err); + if (err == 0 && + mdsc->client->sb->s_root) { /* mounted? 
*/ + err = ceph_fill_trace(mdsc->client->sb, rinfo, + &req->r_last_inode, + &req->r_last_dentry); + + if (req->r_expects_cap) { + req->r_cap = ceph_add_cap(req->r_last_inode, req->r_session, + le32_to_cpu(rinfo->head->file_caps), + le32_to_cpu(rinfo->head->file_caps_seq)); + } } - -done: - return 0; - -bad: - derr(1, "problem parsing dir contents %d\n", err); - return err; -} - - -int ceph_mdsc_parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info *info) -{ - void *p, *end; - __u32 len; - int err = -EINVAL; - - memset(info, 0, sizeof(*info)); - info->head = msg->front.iov_base; - - /* trace */ - p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); - end = p + msg->front.iov_len; - if ((err = ceph_decode_32(&p, end, &len)) < 0) - goto bad; - if (len > 0 && - (p + len > end || - (err = parse_reply_info_trace(&p, p+len, info)) < 0)) - goto bad; - - /* dir content */ - if ((err = ceph_decode_32(&p, end, &len)) < 0) - goto bad; - if (len > 0 && - (p + len > end || - (err = parse_reply_info_dir(&p, p+len, info)) < 0)) - goto bad; - - info->reply = msg; - return 0; -bad: - derr(1, "parse_reply err %d\n", err); - ceph_msg_put(msg); - return err; + + /* kick calling process */ + complete(&req->r_completion); + ceph_mdsc_put_request(req); + return; } -void ceph_mdsc_destroy_reply_info(struct ceph_mds_reply_info *info) -{ - if (info->trace_in) kfree(info->trace_in); - if (info->dir_in) kfree(info->dir_in); - ceph_msg_put(info->reply); - info->reply = 0; -} /* @@ -785,7 +852,8 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg int err; void *p = msg->front.iov_base; void *end = p + msg->front.iov_len; - + int frommds = le32_to_cpu(msg->hdr.src.name.num); + /* decode */ if ((err = ceph_decode_64(&p, end, &tid)) != 0) goto bad; @@ -807,21 +875,22 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg if (fwd_seq > req->r_num_fwd) { req->r_num_fwd = fwd_seq; req->r_resend_mds = next_mds; + 
drop_request_session_attempt_refs(req); req->r_num_mds = 1; - req->r_mds[0] = le32_to_cpu(msg->hdr.src.name.num); + req->r_mds[0] = __get_session(mdsc, frommds); } spin_unlock(&mdsc->lock); } else { /* no, resend. */ BUG_ON(fwd_seq <= req->r_num_fwd); /* forward race not possible; mds would drop */ - - req->r_num_mds = 0; + drop_request_session_attempt_refs(req); req->r_resend_mds = next_mds; spin_unlock(&mdsc->lock); complete(&req->r_completion); } - put_request(req); + ceph_mdsc_put_request(req); + spin_unlock(&mdsc->lock); return; bad: @@ -846,9 +915,10 @@ void kick_requests(struct ceph_mds_client *mdsc, int mds) if (got == 0) break; nexttid = reqs[got-1]->r_tid + 1; for (i=0; i<got; i++) { - if ((reqs[i]->r_num_mds >= 1 && reqs[i]->r_mds[0] == mds) || - (reqs[i]->r_num_mds >= 2 && reqs[i]->r_mds[1] == mds)) { + if ((reqs[i]->r_num_mds >= 1 && reqs[i]->r_mds[0]->s_mds == mds) || + (reqs[i]->r_num_mds >= 2 && reqs[i]->r_mds[1]->s_mds == mds)) { dout(10, " kicking req %llu\n", reqs[i]->r_tid); + /* FIXME */ complete(&reqs[i]->r_completion); } } @@ -970,7 +1040,7 @@ send: } else { dout(0, "WARNING: reconnect on %p raced with somethign and lost?\n", session); } - ceph_mdsc_put_session(session); + put_session(session); } return; diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index dee3ddfabd3cd..bb84e61707862 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -11,6 +11,31 @@ struct ceph_client; +/* + * for mds reply parsing + */ +struct ceph_mds_reply_info_in { + struct ceph_mds_reply_inode *in; + __u32 symlink_len; + char *symlink; +}; + +struct ceph_mds_reply_info { + struct ceph_mds_reply_head *head; + + int trace_nr; + struct ceph_mds_reply_info_in *trace_in; + struct ceph_mds_reply_dirfrag **trace_dir; + char **trace_dname; + __u32 *trace_dname_len; + + struct ceph_mds_reply_dirfrag *dir_dir; + int dir_nr; + struct ceph_mds_reply_info_in *dir_in; + char **dir_dname; + __u32 *dir_dname_len; +}; + /* * state associated with each MDS<->client session */ @@
-40,8 +65,15 @@ struct ceph_mds_request { __u64 r_tid; struct ceph_msg * r_request; /* original request */ struct ceph_msg * r_reply; - - __u32 r_mds[2]; /* set of mds's with whom request may be outstanding */ + struct ceph_mds_reply_info r_reply_info; + struct inode * r_last_inode; + struct dentry * r_last_dentry; + bool r_expects_cap; + struct ceph_inode_cap * r_cap; + struct ceph_mds_session * r_session; + struct ceph_mds_session * r_mds[2]; + + //__u32 r_mds[2]; /* set of mds's with whom request may be outstanding */ int r_num_mds; /* items in r_mds */ int r_attempts; /* resend attempts */ @@ -68,32 +100,6 @@ struct ceph_mds_client { struct delayed_work delayed_work; /* delayed work */ }; -/* - * for mds reply parsing - */ -struct ceph_mds_reply_info_in { - struct ceph_mds_reply_inode *in; - __u32 symlink_len; - char *symlink; -}; - -struct ceph_mds_reply_info { - struct ceph_msg *reply; - struct ceph_mds_reply_head *head; - - int trace_nr; - struct ceph_mds_reply_info_in *trace_in; - struct ceph_mds_reply_dirfrag **trace_dir; - char **trace_dname; - __u32 *trace_dname_len; - - struct ceph_mds_reply_dirfrag *dir_dir; - int dir_nr; - struct ceph_mds_reply_info_in *dir_in; - char **dir_dname; - __u32 *dir_dname_len; -}; - extern const char* ceph_mds_op_name(int op); extern void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client); @@ -108,23 +114,8 @@ extern void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_ struct ceph_inode_info; extern int ceph_mdsc_update_cap_wanted(struct ceph_inode_info *ci, int wanted); -extern struct ceph_msg *ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2); -extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, - struct ceph_mds_reply_info *rinfo, struct ceph_mds_session **psession); - -static __inline__ void ceph_mdsc_put_session(struct ceph_mds_session *s) -{ - BUG_ON(s == 
NULL); - if (atomic_dec_and_test(&s->s_ref)) { - kfree(s); - s = NULL; - } -} - - -extern int ceph_mdsc_parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info *info); -extern void ceph_mdsc_destroy_reply_info(struct ceph_mds_reply_info *info); -extern void ceph_mdsc_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *i); - +extern struct ceph_mds_request *ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2); +extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req); +extern void ceph_mdsc_put_request(struct ceph_mds_request *req); #endif diff --git a/src/kernel/super.h b/src/kernel/super.h index 1355bc7bb6f61..f140681bf9595 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -249,7 +249,7 @@ static inline struct ceph_client *ceph_sb_to_client(struct super_block *sb) struct ceph_file_info { u32 frag; /* one frag at a time; screw seek_dir() on large dirs */ int mode; /* initialized on open */ - struct ceph_mds_reply_info rinfo; + struct ceph_mds_request *req; }; @@ -323,8 +323,7 @@ extern int ceph_fill_trace(struct super_block *sb, struct ceph_mds_reply_info *prinfo, struct inode **lastinode, struct dentry **lastdentry); -extern int ceph_request_lookup(struct super_block *sb, struct dentry *dentry, - struct ceph_mds_reply_info *prinfo); +extern int ceph_request_lookup(struct super_block *sb, struct dentry *dentry); extern void ceph_touch_dentry(struct dentry *dentry); /* proc.c */ -- 2.39.5