From 266c95510d8b8ea8a6c2969bbc570c6ef8dc322f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 21 Apr 2008 10:56:19 -0700 Subject: [PATCH] kclient: fixed mds client session handling, request kicking --- src/kernel/mds_client.c | 130 +++++++++++++++++++++------------------- src/kernel/mds_client.h | 3 +- src/kernel/messenger.c | 28 +++++++++ src/kernel/messenger.h | 2 + src/kernel/osd_client.c | 28 +-------- 5 files changed, 99 insertions(+), 92 deletions(-) diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index e563b716d4f97..8aefa225b7b34 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -353,12 +353,16 @@ static void get_request(struct ceph_mds_request *req) atomic_inc(&req->r_ref); } -static void drop_request_session_attempt_refs(struct ceph_mds_request *req) +static void put_request_sessions(struct ceph_mds_request *req) { - int i; - for (i = 0; i < req->r_num_mds; i++) - put_session(req->r_mds[i]); - req->r_num_mds = 0; + if (req->r_session) { + put_session(req->r_session); + req->r_session = 0; + } + if (req->r_fwd_session) { + put_session(req->r_fwd_session); + req->r_fwd_session = 0; + } } void ceph_mdsc_put_request(struct ceph_mds_request *req) @@ -372,13 +376,11 @@ void ceph_mdsc_put_request(struct ceph_mds_request *req) ceph_msg_put(req->r_reply); destroy_reply_info(&req->r_reply_info); } - if (req->r_session) - put_session(req->r_session); if (req->r_last_inode) iput(req->r_last_inode); if (req->r_last_dentry) dput(req->r_last_dentry); - drop_request_session_attempt_refs(req); + put_request_sessions(req); kfree(req); } } @@ -411,7 +413,7 @@ static struct ceph_mds_request *new_request(struct ceph_msg *msg) req->r_fmode = 0; req->r_cap = 0; req->r_session = 0; - req->r_num_mds = 0; + req->r_fwd_session = 0; req->r_attempts = 0; req->r_num_fwd = 0; req->r_resend_mds = -1; @@ -748,6 +750,35 @@ static void wake_up_session_caps(struct ceph_mds_session *session) } } +/* + * kick outstanding requests + */ +void 
kick_requests(struct ceph_mds_client *mdsc, int mds, int all) +{ + struct ceph_mds_request *reqs[10]; + u64 nexttid = 0; + int i, got; + + dout(20, "kick_requests mds%d\n", mds); + while (nexttid < mdsc->last_tid) { + got = radix_tree_gang_lookup(&mdsc->request_tree, + (void **)&reqs, nexttid, 10); + if (got == 0) + break; + nexttid = reqs[got-1]->r_tid + 1; + for (i = 0; i < got; i++) { + if ((reqs[i]->r_session && + reqs[i]->r_session->s_mds == mds) || + (all && reqs[i]->r_fwd_session && + reqs[i]->r_fwd_session->s_mds == mds)) { + dout(10, " kicking req %llu\n", reqs[i]->r_tid); + put_request_sessions(reqs[i]); + complete(&reqs[i]->r_completion); + } + } + } +} + void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg) { @@ -783,19 +814,13 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, break; case CEPH_SESSION_CLOSE: - if (session->s_cap_seq == seq) { - dout(2, "session close from mds%d\n", mds); - complete(&session->s_completion); /* for good measure */ - __unregister_session(mdsc, mds); - } else { - dout(2, "ignoring session close from mds%d, " - "seq %llu < my seq %llu\n", - le32_to_cpu(msg->hdr.src.name.num), - seq, session->s_cap_seq); - } + dout(2, "session close from mds%d\n", mds); + complete(&session->s_completion); /* for good measure */ + __unregister_session(mdsc, mds); remove_session_caps(session); remove_session_leases(session); complete(&mdsc->session_close_waiters); + kick_requests(mdsc, mds, 0); /* cur only */ break; case CEPH_SESSION_RENEWCAPS: @@ -965,12 +990,12 @@ retry: } /* make request? 
*/ - if (req->r_num_mds == 0) { + if (req->r_from_time == 0) { req->r_from_time = jiffies; dout(30, "do_request from_time %lu\n", req->r_from_time); } - BUG_ON(req->r_num_mds >= 2); - req->r_mds[req->r_num_mds++] = session; + BUG_ON(req->r_session); + req->r_session = session; req->r_resend_mds = -1; /* forget any specific mds hint */ req->r_attempts++; rhead = req->r_request->front.iov_base; @@ -980,6 +1005,7 @@ retry: /* send and wait */ spin_unlock(&mdsc->lock); dout(10, "do_request %p r_expects_cap=%d\n", req, req->r_expects_cap); + req->r_request = ceph_msg_maybe_dup(req->r_request); ceph_msg_get(req->r_request); send_msg_mds(mdsc, req->r_request, mds); wait_for_completion(&req->r_completion); @@ -1001,7 +1027,6 @@ retry: ceph_msg_put(req->r_request); req->r_request = 0; - drop_request_session_attempt_refs(req); err = le32_to_cpu(req->r_reply_info.head->result); dout(30, "do_request done on %p result %d\n", req, err); @@ -1033,8 +1058,16 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) } dout(10, "handle_reply %p r_expects_cap=%d\n", req, req->r_expects_cap); mds = le32_to_cpu(msg->hdr.src.name.num); - req->r_session = __get_session(mdsc, mds); - BUG_ON(req->r_session == 0); + if (req->r_session && req->r_session->s_mds != mds) { + put_session(req->r_session); + req->r_session = __get_session(mdsc, mds); + } + if (req->r_session == 0) { + derr(1, "got reply on %llu, but no session for mds%d\n", + tid, mds); + spin_unlock(&mdsc->lock); + return; + } BUG_ON(req->r_reply); spin_unlock(&mdsc->lock); @@ -1107,7 +1140,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, int err = -EINVAL; void *p = msg->front.iov_base; void *end = p + msg->front.iov_len; - int frommds = le32_to_cpu(msg->hdr.src.name.num); + int from_mds = le32_to_cpu(msg->hdr.src.name.num); /* decode */ ceph_decode_need(&p, end, sizeof(__u64)+2*sizeof(__u32), bad); @@ -1128,16 +1161,16 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, if 
(fwd_seq > req->r_num_fwd) { req->r_num_fwd = fwd_seq; req->r_resend_mds = next_mds; - drop_request_session_attempt_refs(req); - req->r_num_mds = 1; - req->r_mds[0] = __get_session(mdsc, frommds); + put_request_sessions(req); + req->r_session = __get_session(mdsc, next_mds); + req->r_fwd_session = __get_session(mdsc, from_mds); } spin_unlock(&mdsc->lock); } else { /* no, resend. */ /* forward race not possible; mds would drop */ BUG_ON(fwd_seq <= req->r_num_fwd); - drop_request_session_attempt_refs(req); + put_request_sessions(req); req->r_resend_mds = next_mds; spin_unlock(&mdsc->lock); complete(&req->r_completion); @@ -1153,35 +1186,6 @@ bad: -/* - * kick outstanding requests - */ -void kick_requests(struct ceph_mds_client *mdsc, int mds) -{ - struct ceph_mds_request *reqs[10]; - u64 nexttid = 0; - int i, got; - - dout(20, "kick_requests mds%d\n", mds); - while (nexttid < mdsc->last_tid) { - got = radix_tree_gang_lookup(&mdsc->request_tree, - (void **)&reqs, nexttid, 10); - if (got == 0) - break; - nexttid = reqs[got-1]->r_tid + 1; - for (i = 0; i < got; i++) { - if ((reqs[i]->r_num_mds >= 1 && - reqs[i]->r_mds[0]->s_mds == mds) || - (reqs[i]->r_num_mds >= 2 && - reqs[i]->r_mds[1]->s_mds == mds)) { - dout(10, " kicking req %llu\n", reqs[i]->r_tid); - /* FIXME */ - complete(&reqs[i]->r_completion); - } - } - } -} - /* * send an reconnect to a recovering mds */ @@ -1247,6 +1251,9 @@ retry: list_for_each(cp, &session->s_caps) { cap = list_entry(cp, struct ceph_inode_cap, session_caps); ci = cap->ci; + dentry = d_find_alias(&ci->vfs_inode); + if (dentry == NULL) + continue; ceph_decode_need(&p, end, sizeof(u64) + sizeof(struct ceph_mds_cap_reconnect), needmore); @@ -1263,9 +1270,6 @@ retry: ceph_encode_timespec(&rec->mtime, &ci->vfs_inode.i_mtime); ceph_encode_timespec(&rec->atime, &ci->vfs_inode.i_atime); spin_unlock(&ci->vfs_inode.i_lock); - dentry = d_find_alias(&ci->vfs_inode); - if (dentry == NULL) - continue; path = ceph_build_dentry_path(dentry, &pathlen); if 
(IS_ERR(path)) { err = PTR_ERR(path); @@ -1363,7 +1367,7 @@ void check_new_map(struct ceph_mds_client *mdsc, __unregister_session(mdsc, i); break; case CEPH_MDS_SESSION_OPEN: - kick_requests(mdsc, i); + kick_requests(mdsc, i, 1); /* cur or forwarder */ break; } } diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index 7fcde726dbd35..5bd203e9441b2 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -81,8 +81,7 @@ struct ceph_mds_request { unsigned long r_from_time; struct ceph_inode_cap *r_cap; struct ceph_mds_session *r_session; - struct ceph_mds_session *r_mds[2]; - int r_num_mds; /* items in r_mds */ + struct ceph_mds_session *r_fwd_session; /* forwarded from */ int r_attempts; /* resend attempts */ int r_num_fwd; /* number of forward attempts */ diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 44ed2af88cfe0..f3401fe094c7d 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -1287,6 +1287,34 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, } +/* + * a single ceph_msg can't be queued for send twice, unless it's + * already been delivered (i.e. we have the only remaining reference). + * so, dup the message if there is more than one reference. + */ +struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old) +{ + struct ceph_msg *dup; + + if (atomic_read(&old->nref) == 1) + return old; /* we have only ref, all is well */ + + dup = ceph_msg_new(le32_to_cpu(old->hdr.type), + le32_to_cpu(old->hdr.front_len), + le32_to_cpu(old->hdr.data_len), + le32_to_cpu(old->hdr.data_off), + old->pages); + BUG_ON(!dup); + memcpy(dup->front.iov_base, old->front.iov_base, + le32_to_cpu(old->hdr.front_len)); + if (old->pages) + derr(0, "WARNING: unsafely referenced old pages for %p\n", + old); + ceph_msg_put(old); + return dup; +} + + /* * queue up an outgoing message. 
* diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 43aba7b554af7..47958f1a34df9 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -154,6 +154,8 @@ static inline void ceph_msg_put_list(struct list_head *head) } } +extern struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *msg); + extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned long timeout); diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 71c84d359822c..24cb35cd3e583 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -164,32 +164,6 @@ static int pick_osd(struct ceph_osd_client *osdc, return -1; } -/* - * a single ceph_msg can't be queued for send twice, unless it's - * already been delivered (i.e. we have the only remaining reference). - */ -static struct ceph_msg *redup_request(struct ceph_msg *old) -{ - struct ceph_msg *dup; - - if (atomic_read(&old->nref) == 1) - return old; /* we have only ref, all good */ - - dup = ceph_msg_new(le32_to_cpu(old->hdr.type), - le32_to_cpu(old->hdr.front_len), - le32_to_cpu(old->hdr.data_len), - le32_to_cpu(old->hdr.data_off), - old->pages); - BUG_ON(!dup); - memcpy(dup->front.iov_base, old->front.iov_base, - le32_to_cpu(old->hdr.front_len)); - if (old->pages) - derr(0, "WARNING: unsafely referenced old pages for %p\n", - old); - ceph_msg_put(old); - return dup; -} - /* * caller should hold map_sem (for read) */ @@ -291,7 +265,7 @@ more: dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd); get_request(req); spin_unlock(&osdc->request_lock); - req->r_request = redup_request(req->r_request); + req->r_request = ceph_msg_maybe_dup(req->r_request); send_request(osdc, req, osd); put_request(req); goto more; -- 2.39.5