From fddd4f5db0a5f63344fd446f4c8fdf2753d0bef5 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 17 Jan 2008 16:01:36 -0800 Subject: [PATCH] link caps into a per-mds-session list --- src/include/ceph_fs.h | 1 + src/kernel/client.c | 33 ++++++++++++---------- src/kernel/dir.c | 14 +++++----- src/kernel/file.c | 6 ++-- src/kernel/inode.c | 30 ++++++++++++++++---- src/kernel/mds_client.c | 62 +++++++++++++++++++++++++---------------- src/kernel/mds_client.h | 10 ++++++- src/kernel/messenger.h | 2 +- src/kernel/super.h | 3 +- src/mds/Server.h | 2 +- 10 files changed, 107 insertions(+), 56 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 4e48400b0626e..2ad3f846b21c9 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -454,6 +454,7 @@ struct ceph_mds_cap_reconnect { __le64 size; struct ceph_timeval mtime, atime; } __attribute__ ((packed)); +/* followed by encoded string */ diff --git a/src/kernel/client.c b/src/kernel/client.c index 8985c484e28c4..f992a154ab2b3 100644 --- a/src/kernel/client.c +++ b/src/kernel/client.c @@ -46,11 +46,11 @@ static void put_client_counter(void) } -int parse_open_reply(struct ceph_msg *reply, struct inode *inode) +int parse_open_reply(struct ceph_msg *reply, struct inode *inode, struct ceph_mds_session *session) { struct ceph_mds_reply_head *head; struct ceph_mds_reply_info rinfo; - int frommds = reply->hdr.src.name.num; + int frommds = session->s_mds; int err; struct ceph_inode_cap *cap; @@ -67,7 +67,7 @@ int parse_open_reply(struct ceph_msg *reply, struct inode *inode) return err; /* fill in cap */ - cap = ceph_add_cap(inode, frommds, + cap = ceph_add_cap(inode, session, le32_to_cpu(head->file_caps), le32_to_cpu(head->file_caps_seq)); if (IS_ERR(cap)) @@ -85,6 +85,7 @@ static int open_root_inode(struct ceph_client *client, struct ceph_mount_args *a struct ceph_msg *req = 0; struct ceph_mds_request_head *reqhead; struct ceph_mds_reply_info rinfo; + struct ceph_mds_session *session; int frommds; 
int err; struct ceph_inode_cap *cap; @@ -98,12 +99,12 @@ static int open_root_inode(struct ceph_client *client, struct ceph_mount_args *a reqhead = req->front.iov_base; reqhead->args.open.flags = O_DIRECTORY; reqhead->args.open.mode = 0; - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, &session)) < 0) return err; err = le32_to_cpu(rinfo.head->result); - if (err != 0) - return err; + if (err != 0) + goto out; if (rinfo.trace_nr == 0) { dout(10, "open_root_inode wtf, mds returns 0 but no trace\n"); err = -EINVAL; @@ -111,22 +112,24 @@ static int open_root_inode(struct ceph_client *client, struct ceph_mount_args *a /* create root inode */ inode = iget_locked(client->sb, rinfo.trace_in[rinfo.trace_nr-1].in->ino); - if (inode == NULL) - return -ENOMEM; + if (inode == NULL) { + err = -ENOMEM; + goto out; + } if (inode->i_state & I_NEW) unlock_new_inode(inode); if ((err = ceph_fill_inode(inode, rinfo.trace_in[rinfo.trace_nr-1].in)) < 0) - goto out; + goto out2; /* fill in cap */ frommds = rinfo.reply->hdr.src.name.num; - cap = ceph_add_cap(inode, frommds, + cap = ceph_add_cap(inode, session, le32_to_cpu(rinfo.head->file_caps), le32_to_cpu(rinfo.head->file_caps_seq)); if (IS_ERR(cap)) { err = PTR_ERR(cap); - goto out; + goto out2; } ci = ceph_inode(inode); ci->i_nr_by_mode[FILE_MODE_PIN]++; @@ -135,16 +138,18 @@ static int open_root_inode(struct ceph_client *client, struct ceph_mount_args *a if (root == NULL) { err = -ENOMEM; /* fixme: also close? 
*/ - goto out; + goto out2; } client->sb->s_root = root; - dout(30, "open_root_inode success.\n"); + dout(30, "open_root_inode success, root d is %p.\n", root); return 0; -out: +out2: dout(30, "open_root_inode failure %d\n", err); iput(inode); +out: + ceph_mdsc_put_session(session); return err; } diff --git a/src/kernel/dir.c b/src/kernel/dir.c index f8932c718f012..e3db23c83d77f 100644 --- a/src/kernel/dir.c +++ b/src/kernel/dir.c @@ -117,7 +117,7 @@ nextfrag: return PTR_ERR(req); rhead = req->front.iov_base; rhead->args.readdir.frag = cpu_to_le32(frag); - if ((err = ceph_mdsc_do_request(mdsc, req, &fi->rinfo, -1)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req, &fi->rinfo, 0)) < 0) return err; err = le32_to_cpu(fi->rinfo.head->result); dout(10, "dir_readdir got and parsed readdir result=%d on frag %u\n", err, frag); @@ -251,7 +251,7 @@ static struct dentry *ceph_dir_lookup(struct inode *dir, struct dentry *dentry, kfree(path); if (IS_ERR(req)) return ERR_PTR(PTR_ERR(req)); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) return ERR_PTR(err); err = le32_to_cpu(rinfo.head->result); dout(20, "dir_lookup result=%d\n", err); @@ -407,7 +407,7 @@ static int ceph_dir_mknod(struct inode *dir, struct dentry *dentry, int mode, de rhead = req->front.iov_base; rhead->args.mknod.mode = cpu_to_le32(mode); rhead->args.mknod.rdev = cpu_to_le32(rdev); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) { + if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) { d_drop(dentry); return err; } @@ -453,7 +453,7 @@ static int ceph_dir_symlink(struct inode *dir, struct dentry *dentry, const char d_drop(dentry); return PTR_ERR(req); } - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) { + if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) { d_drop(dentry); return err; } @@ -502,7 +502,7 @@ static int ceph_dir_mkdir(struct inode *dir, struct dentry *dentry, int mode) } rhead 
= req->front.iov_base; rhead->args.mkdir.mode = cpu_to_le32(mode); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) { + if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) { d_drop(dentry); return err; } @@ -549,7 +549,7 @@ static int ceph_dir_unlink(struct inode *dir, struct dentry *dentry) kfree(path); if (IS_ERR(req)) return PTR_ERR(req); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) return err; err = le32_to_cpu(rinfo.head->result); @@ -589,7 +589,7 @@ static int ceph_dir_rename(struct inode *old_dir, struct dentry *old_dentry, kfree(newpath); if (IS_ERR(req)) return PTR_ERR(req); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, 0)) < 0) return err; err = le32_to_cpu(rinfo.head->result); diff --git a/src/kernel/file.c b/src/kernel/file.c index f6c87636851a8..7839e8d20f54c 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -16,6 +16,7 @@ struct ceph_inode_cap *ceph_do_open(struct inode *inode, struct file *file) struct ceph_msg *req; struct ceph_mds_request_head *rhead; struct ceph_mds_reply_info rinfo; + struct ceph_mds_session *session; struct dentry *dentry; int frommds; struct ceph_inode_cap *cap; @@ -34,14 +35,15 @@ struct ceph_inode_cap *ceph_do_open(struct inode *inode, struct file *file) return ERR_PTR(PTR_ERR(req)); rhead = req->front.iov_base; rhead->args.open.flags = cpu_to_le32(flags); - if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0) + if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, &session)) < 0) return ERR_PTR(err); dout(10, "open got and parsed result\n"); frommds = rinfo.reply->hdr.src.name.num; - cap = ceph_add_cap(inode, frommds, + cap = ceph_add_cap(inode, session, le32_to_cpu(rinfo.head->file_caps), le32_to_cpu(rinfo.head->file_caps_seq)); + ceph_mdsc_put_session(session); return cap; } diff --git a/src/kernel/inode.c b/src/kernel/inode.c index 
207cf764d336f..623a2e2481b23 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -152,8 +152,9 @@ static struct ceph_inode_cap *get_cap_for_mds(struct inode *inode, int mds) } -struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 seq) +struct ceph_inode_cap *ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, u32 cap, u32 seq) { + int mds = session->s_mds; struct ceph_inode_info *ci = ceph_inode(inode); int i; @@ -174,12 +175,18 @@ struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 s kfree(o); ci->i_max_caps *= 2; } - + ci->i_caps[i].ci = ci; ci->i_caps[i].caps = 0; ci->i_caps[i].mds = mds; ci->i_caps[i].seq = 0; ci->i_caps[i].flags = 0; ci->i_nr_caps++; + + ci->i_caps[i].session = session; + spin_lock(&session->s_cap_lock); + list_add(&ci->i_caps[i].session_caps, &session->s_caps); + session->s_nr_caps++; + spin_unlock(&session->s_cap_lock); } dout(10, "add_cap inode %p (%lu) got cap %d %xh now %xh seq %d from %d\n", @@ -200,11 +207,24 @@ int ceph_get_caps(struct ceph_inode_info *ci) return have; } +void __remove_cap(struct ceph_inode_cap *cap) +{ + /* remove from session list */ + struct ceph_mds_session *session = cap->session; + spin_lock(&session->s_cap_lock); + list_del(&cap->session_caps); + session->s_nr_caps--; + spin_unlock(&session->s_cap_lock); + cap->session = 0; +} + void ceph_remove_caps(struct ceph_inode_info *ci) { - dout(10, "remove_caps on %p nr %d i_caps %p\n", &ci->vfs_inode, - ci->i_nr_caps, ci->i_caps); + int i; + dout(10, "remove_caps on %p nr %d\n", &ci->vfs_inode, ci->i_nr_caps); if (ci->i_nr_caps) { + for (i=0; i<ci->i_nr_caps; i++) + __remove_cap(&ci->i_caps[i]); iput(&ci->vfs_inode); ci->i_nr_caps = 0; if (ci->i_caps != ci->i_caps_static) { @@ -243,7 +263,7 @@ int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_file_caps *grant, dout(10, "2\n"); if (!cap) { dout(10, "adding new cap inode %p for mds%d\n", inode, mds); - cap = ceph_add_cap(inode, mds, 
le32_to_cpu(grant->caps), le32_to_cpu(grant->seq)); + cap = ceph_add_cap(inode, session, le32_to_cpu(grant->caps), le32_to_cpu(grant->seq)); return 0; } diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index ffb24a4aea0c1..5c17ee8978e00 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -112,6 +112,7 @@ static void register_session(struct ceph_mds_client *mdsc, int mds) s->s_mds = mds; s->s_state = CEPH_MDS_SESSION_NEW; s->s_cap_seq = 0; + spin_lock_init(&s->s_cap_lock); INIT_LIST_HEAD(&s->s_caps); s->s_nr_caps = 0; atomic_set(&s->s_ref, 1); @@ -132,16 +133,10 @@ static struct ceph_mds_session *get_session(struct ceph_mds_client *mdsc, int md return session; } -static void put_session(struct ceph_mds_session *s) -{ - if (atomic_dec_and_test(&s->s_ref)) - kfree(s); -} - static void unregister_session(struct ceph_mds_client *mdsc, int mds) { dout(10, "unregister_session mds%d %p\n", mds, mdsc->sessions[mds]); - put_session(mdsc->sessions[mds]); + ceph_mdsc_put_session(mdsc->sessions[mds]); mdsc->sessions[mds] = 0; } @@ -234,7 +229,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg session = get_session(mdsc, from); session->s_state = CEPH_MDS_SESSION_OPEN; complete(&session->s_completion); - put_session(session); + ceph_mdsc_put_session(session); break; case CEPH_SESSION_CLOSE: @@ -247,7 +242,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg dout(1, "ignoring session close from mds%d, seq %llu < my seq %llu\n", msg->hdr.src.name.num, seq, session->s_cap_seq); } - put_session(session); + ceph_mdsc_put_session(session); break; default: @@ -345,13 +340,14 @@ __u64 get_oldest_tid(struct ceph_mds_client *mdsc) } int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, - struct ceph_mds_reply_info *rinfo, int mds) + struct ceph_mds_reply_info *rinfo, struct ceph_mds_session **psession) { struct ceph_mds_request *req; struct ceph_mds_request_head 
*rhead; struct ceph_mds_session *session; struct ceph_msg *reply = 0; int err; + int mds = -1; dout(30, "do_request on %p type %d\n", msg, msg->hdr.type); @@ -379,10 +375,9 @@ retry: } if (session->s_state != CEPH_MDS_SESSION_OPEN) { dout(30, "do_request session %p not open, %d\n", session, session->s_state); - put_session(session); + ceph_mdsc_put_session(session); goto retry; } - put_session(session); /* make request? */ BUG_ON(req->r_num_mds >= 2); @@ -400,17 +395,25 @@ retry: spin_lock(&mdsc->lock); /* clean up request, parse reply */ - if (!req->r_reply) + if (!req->r_reply) { + ceph_mdsc_put_session(session); goto retry; + } reply = req->r_reply; unregister_request(mdsc, req); spin_unlock(&mdsc->lock); put_request(req); - if ((err = ceph_mdsc_parse_reply_info(reply, rinfo)) < 0) + if ((err = ceph_mdsc_parse_reply_info(reply, rinfo)) < 0) { + ceph_mdsc_put_session(session); return err; + } dout(30, "do_request done on %p result %d tracelen %d\n", msg, rinfo->head->result, rinfo->trace_nr); + if (psession) + *psession = session; + else + ceph_mdsc_put_session(session); return 0; } @@ -716,6 +719,7 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) char *path; int pathlen, err; struct dentry *dentry; + struct ceph_inode_info *ci; dout(10, "send_mds_reconnect mds%d\n", mds); @@ -735,8 +739,8 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) dout(10, "session %p state %d\n", session, session->s_state); /* estimate needed space */ - len += session->s_nr_caps * sizeof(struct ceph_mds_cap_reconnect); - len += session->s_nr_caps * (100); /* ugly hack */ + len += (session->s_nr_caps+1) * sizeof(struct ceph_mds_cap_reconnect); + len += (session->s_nr_caps+1) * (100); /* ugly hack */ dout(40, "estimating i need %d bytes for %d caps\n", len, session->s_nr_caps); /* build reply */ @@ -745,19 +749,27 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) return; p = reply->front.iov_base; end = p + len; - + /* traverse this 
session's caps */ ceph_encode_8(&p, end, 0); + dout(10, "locking\n"); + spin_lock(&session->s_cap_lock); + dout(10, "locked\n"); ceph_encode_32(&p, end, session->s_nr_caps); + dout(10, "iterating %p %p\n", session->s_caps.next, session->s_caps.prev); list_for_each(cp, &session->s_caps) { + dout(10, "cp is %p .. %p %p\n", cp, cp->next, cp->prev); cap = list_entry(cp, struct ceph_inode_cap, session_caps); - dout(10, " adding cap %p on ino %lx\n", cap, cap->ci->vfs_inode.i_ino); - ceph_encode_32(&p, end, ceph_caps_wanted(cap->ci)); - ceph_encode_32(&p, end, ceph_caps_issued(cap->ci)); - ceph_encode_64(&p, end, cap->ci->i_wr_size); - ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_mtime); //i_wr_mtime - ceph_encode_timespec(&p, end, &cap->ci->vfs_inode.i_atime); /* atime.. fixme */ - dentry = list_entry(&cap->ci->vfs_inode.i_dentry, struct dentry, d_alias); + ci = cap->ci; + dout(10, "cap is %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode); + dout(10, " adding cap %p on ino %lx\n", cap, ci->vfs_inode.i_ino); + ceph_encode_64(&p, end, ci->vfs_inode.i_ino); + ceph_encode_32(&p, end, ceph_caps_wanted(ci)); + ceph_encode_32(&p, end, ceph_caps_issued(ci)); + ceph_encode_64(&p, end, ci->i_wr_size); + ceph_encode_timespec(&p, end, &ci->vfs_inode.i_mtime); //i_wr_mtime + ceph_encode_timespec(&p, end, &ci->vfs_inode.i_atime); /* atime.. 
fixme */ + dentry = d_find_alias(&ci->vfs_inode); path = ceph_build_dentry_path(dentry, &pathlen); if (IS_ERR(path)) { err = PTR_ERR(path); @@ -779,6 +791,8 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) ceph_encode_string(&p, end, path, pathlen); kfree(path); } + dout(10, "unlocking\n"); + spin_unlock(&session->s_cap_lock); len = p - reply->front.iov_base; reply->hdr.front_len = reply->front.iov_len = len; diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index d4943738eb7fa..10e30c0942357 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -24,6 +24,7 @@ struct ceph_mds_session { int s_mds; int s_state; __u64 s_cap_seq; /* cap message count/seq from mds */ + spinlock_t s_cap_lock; struct list_head s_caps; int s_nr_caps; atomic_t s_ref; @@ -105,7 +106,14 @@ extern int ceph_mdsc_update_cap_wanted(struct ceph_inode_info *ci, int wanted); extern struct ceph_msg *ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2); extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, - struct ceph_mds_reply_info *rinfo, int mds); + struct ceph_mds_reply_info *rinfo, struct ceph_mds_session **psession); + +static __inline__ void ceph_mdsc_put_session(struct ceph_mds_session *s) +{ + if (atomic_dec_and_test(&s->s_ref)) + kfree(s); +} + extern int ceph_mdsc_parse_reply_info(struct ceph_msg *msg, struct ceph_mds_reply_info *info); extern void ceph_mdsc_destroy_reply_info(struct ceph_mds_reply_info *info); diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 948afd8b43f87..3cbe178548fee 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -270,7 +270,7 @@ static __inline__ int ceph_encode_filepath(void **p, void *end, ceph_ino_t ino, static __inline__ int ceph_encode_string(void **p, void *end, const char *s, __u32 len) { - BUG_ON(*p + sizeof(len) > end); + BUG_ON(*p + sizeof(len) + len > end); 
ceph_encode_32(p, end, len); if (len) memcpy(*p, s, len); *p += len; diff --git a/src/kernel/super.h b/src/kernel/super.h index d268c11cf45fc..6df3004567397 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -93,6 +93,7 @@ struct ceph_inode_cap { u64 seq; int flags; /* stale, etc.? */ struct ceph_inode_info *ci; + struct ceph_mds_session *session; struct list_head session_caps; /* per-session caplist */ }; @@ -215,7 +216,7 @@ extern int ceph_mount(struct ceph_client *client, struct ceph_mount_args *args); /* inode.c */ extern int ceph_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info); extern struct ceph_inode_cap *ceph_find_cap(struct inode *inode, int want); -extern struct ceph_inode_cap *ceph_add_cap(struct inode *inode, int mds, u32 cap, u32 seq); +extern struct ceph_inode_cap *ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, u32 cap, u32 seq); extern void ceph_remove_caps(struct ceph_inode_info *ci); extern int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_file_caps *grant, struct ceph_mds_session *session); diff --git a/src/mds/Server.h b/src/mds/Server.h index d2252f33df7bc..664d22e13ad7f 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -62,7 +62,7 @@ public: void terminate_sessions(); void reconnect_clients(); void handle_client_reconnect(class MClientReconnect *m); - void process_reconnect_cap(CInode *in, int from, inode_caps_reconnect_t& capinfo); + void process_reconnect_cap(CInode *in, int from, ceph_mds_cap_reconnect& capinfo); void add_reconnected_cap_inode(CInode *in) { reconnected_caps.insert(in); } -- 2.39.5