]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: fixed mds client session handling, request kicking
authorSage Weil <sage@newdream.net>
Mon, 21 Apr 2008 17:56:19 +0000 (10:56 -0700)
committerSage Weil <sage@newdream.net>
Mon, 21 Apr 2008 17:56:19 +0000 (10:56 -0700)
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/osd_client.c

index e563b716d4f971aa8650e5f3df5e5c18521378f9..8aefa225b7b34717c05511f9aff91e5623cd027c 100644 (file)
@@ -353,12 +353,16 @@ static void get_request(struct ceph_mds_request *req)
        atomic_inc(&req->r_ref);
 }
 
-static void drop_request_session_attempt_refs(struct ceph_mds_request *req)
+static void put_request_sessions(struct ceph_mds_request *req)
 {
-       int i;
-       for (i = 0; i < req->r_num_mds; i++)
-               put_session(req->r_mds[i]);
-       req->r_num_mds = 0;
+       if (req->r_session) {
+               put_session(req->r_session);
+               req->r_session = 0;
+       }
+       if (req->r_fwd_session) {
+               put_session(req->r_fwd_session);
+               req->r_fwd_session = 0;
+       }
 }
 
 void ceph_mdsc_put_request(struct ceph_mds_request *req)
@@ -372,13 +376,11 @@ void ceph_mdsc_put_request(struct ceph_mds_request *req)
                        ceph_msg_put(req->r_reply);
                        destroy_reply_info(&req->r_reply_info);
                }
-               if (req->r_session)
-                       put_session(req->r_session);
                if (req->r_last_inode)
                        iput(req->r_last_inode);
                if (req->r_last_dentry)
                        dput(req->r_last_dentry);
-               drop_request_session_attempt_refs(req);
+               put_request_sessions(req);
                kfree(req);
        }
 }
@@ -411,7 +413,7 @@ static struct ceph_mds_request *new_request(struct ceph_msg *msg)
        req->r_fmode = 0;
        req->r_cap = 0;
        req->r_session = 0;
-       req->r_num_mds = 0;
+       req->r_fwd_session = 0;
        req->r_attempts = 0;
        req->r_num_fwd = 0;
        req->r_resend_mds = -1;
@@ -748,6 +750,35 @@ static void wake_up_session_caps(struct ceph_mds_session *session)
        }
 }
 
+/*
+ * kick outstanding requests
+ */
+void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
+{
+       struct ceph_mds_request *reqs[10];
+       u64 nexttid = 0;
+       int i, got;
+
+       dout(20, "kick_requests mds%d\n", mds);
+       while (nexttid < mdsc->last_tid) {
+               got = radix_tree_gang_lookup(&mdsc->request_tree,
+                                            (void **)&reqs, nexttid, 10);
+               if (got == 0)
+                       break;
+               nexttid = reqs[got-1]->r_tid + 1;
+               for (i = 0; i < got; i++) {
+                       if ((reqs[i]->r_session &&
+                            reqs[i]->r_session->s_mds == mds) ||
+                           (all && reqs[i]->r_fwd_session &&
+                            reqs[i]->r_fwd_session->s_mds == mds)) {
+                               dout(10, " kicking req %llu\n", reqs[i]->r_tid);
+                               put_request_sessions(reqs[i]);
+                               complete(&reqs[i]->r_completion);
+                       }
+               }
+       }
+}
+
 void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
                              struct ceph_msg *msg)
 {
@@ -783,19 +814,13 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
                break;
 
        case CEPH_SESSION_CLOSE:
-               if (session->s_cap_seq == seq) {
-                       dout(2, "session close from mds%d\n", mds);
-                       complete(&session->s_completion); /* for good measure */
-                       __unregister_session(mdsc, mds);
-               } else {
-                       dout(2, "ignoring session close from mds%d, "
-                            "seq %llu < my seq %llu\n",
-                            le32_to_cpu(msg->hdr.src.name.num),
-                            seq, session->s_cap_seq);
-               }
+               dout(2, "session close from mds%d\n", mds);
+               complete(&session->s_completion); /* for good measure */
+               __unregister_session(mdsc, mds);
                remove_session_caps(session);
                remove_session_leases(session);
                complete(&mdsc->session_close_waiters);
+               kick_requests(mdsc, mds, 0); /* cur only */
                break;
 
        case CEPH_SESSION_RENEWCAPS:
@@ -965,12 +990,12 @@ retry:
        }
 
        /* make request? */
-       if (req->r_num_mds == 0) {
+       if (req->r_from_time == 0) {
                req->r_from_time = jiffies;
                dout(30, "do_request from_time %lu\n", req->r_from_time);
        }
-       BUG_ON(req->r_num_mds >= 2);
-       req->r_mds[req->r_num_mds++] = session;
+       BUG_ON(req->r_session);
+       req->r_session = session;
        req->r_resend_mds = -1;  /* forget any specific mds hint */
        req->r_attempts++;
        rhead = req->r_request->front.iov_base;
@@ -980,6 +1005,7 @@ retry:
        /* send and wait */
        spin_unlock(&mdsc->lock);
        dout(10, "do_request %p r_expects_cap=%d\n", req, req->r_expects_cap);
+       req->r_request = ceph_msg_maybe_dup(req->r_request);  
        ceph_msg_get(req->r_request);
        send_msg_mds(mdsc, req->r_request, mds);
        wait_for_completion(&req->r_completion);
@@ -1001,7 +1027,6 @@ retry:
 
        ceph_msg_put(req->r_request);
        req->r_request = 0;
-       drop_request_session_attempt_refs(req);
 
        err = le32_to_cpu(req->r_reply_info.head->result);
        dout(30, "do_request done on %p result %d\n", req, err);
@@ -1033,8 +1058,16 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        }
        dout(10, "handle_reply %p r_expects_cap=%d\n", req, req->r_expects_cap);
        mds = le32_to_cpu(msg->hdr.src.name.num);
-       req->r_session = __get_session(mdsc, mds);
-       BUG_ON(req->r_session == 0);
+       if (req->r_session && req->r_session->s_mds != mds) {
+               put_session(req->r_session);
+               req->r_session = __get_session(mdsc, mds);
+       }
+       if (req->r_session == 0) {
+               derr(1, "got reply on %llu, but no session for mds%d\n",
+                    tid, mds);
+               spin_unlock(&mdsc->lock);
+               return;
+       }
        BUG_ON(req->r_reply);
        spin_unlock(&mdsc->lock);
 
@@ -1107,7 +1140,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
        int err = -EINVAL;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
-       int frommds = le32_to_cpu(msg->hdr.src.name.num);
+       int from_mds = le32_to_cpu(msg->hdr.src.name.num);
 
        /* decode */
        ceph_decode_need(&p, end, sizeof(__u64)+2*sizeof(__u32), bad);
@@ -1128,16 +1161,16 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
                if (fwd_seq > req->r_num_fwd) {
                        req->r_num_fwd = fwd_seq;
                        req->r_resend_mds = next_mds;
-                       drop_request_session_attempt_refs(req);
-                       req->r_num_mds = 1;
-                       req->r_mds[0] = __get_session(mdsc, frommds);
+                       put_request_sessions(req);
+                       req->r_session = __get_session(mdsc, next_mds);
+                       req->r_fwd_session = __get_session(mdsc, from_mds);
                }
                spin_unlock(&mdsc->lock);
        } else {
                /* no, resend. */
                /* forward race not possible; mds would drop */
                BUG_ON(fwd_seq <= req->r_num_fwd);
-               drop_request_session_attempt_refs(req);
+               put_request_sessions(req);
                req->r_resend_mds = next_mds;
                spin_unlock(&mdsc->lock);
                complete(&req->r_completion);
@@ -1153,35 +1186,6 @@ bad:
 
 
 
-/*
- * kick outstanding requests
- */
-void kick_requests(struct ceph_mds_client *mdsc, int mds)
-{
-       struct ceph_mds_request *reqs[10];
-       u64 nexttid = 0;
-       int i, got;
-
-       dout(20, "kick_requests mds%d\n", mds);
-       while (nexttid < mdsc->last_tid) {
-               got = radix_tree_gang_lookup(&mdsc->request_tree,
-                                            (void **)&reqs, nexttid, 10);
-               if (got == 0)
-                       break;
-               nexttid = reqs[got-1]->r_tid + 1;
-               for (i = 0; i < got; i++) {
-                       if ((reqs[i]->r_num_mds >= 1 &&
-                            reqs[i]->r_mds[0]->s_mds == mds) ||
-                           (reqs[i]->r_num_mds >= 2 &&
-                            reqs[i]->r_mds[1]->s_mds == mds)) {
-                               dout(10, " kicking req %llu\n", reqs[i]->r_tid);
-                               /* FIXME */
-                               complete(&reqs[i]->r_completion);
-                       }
-               }
-       }
-}
-
 /*
  * send an reconnect to a recovering mds
  */
@@ -1247,6 +1251,9 @@ retry:
        list_for_each(cp, &session->s_caps) {
                cap = list_entry(cp, struct ceph_inode_cap, session_caps);
                ci = cap->ci;
+               dentry = d_find_alias(&ci->vfs_inode);
+               if (dentry == NULL)
+                       continue;
                ceph_decode_need(&p, end, sizeof(u64) +
                                 sizeof(struct ceph_mds_cap_reconnect),
                                 needmore);
@@ -1263,9 +1270,6 @@ retry:
                ceph_encode_timespec(&rec->mtime, &ci->vfs_inode.i_mtime);
                ceph_encode_timespec(&rec->atime, &ci->vfs_inode.i_atime);
                spin_unlock(&ci->vfs_inode.i_lock);
-               dentry = d_find_alias(&ci->vfs_inode);
-               if (dentry == NULL)
-                       continue;
                path = ceph_build_dentry_path(dentry, &pathlen);
                if (IS_ERR(path)) {
                        err = PTR_ERR(path);
@@ -1363,7 +1367,7 @@ void check_new_map(struct ceph_mds_client *mdsc,
                        __unregister_session(mdsc, i);
                        break;
                case CEPH_MDS_SESSION_OPEN:
-                       kick_requests(mdsc, i);
+                       kick_requests(mdsc, i, 1); /* cur or forwarder */
                        break;
                }
        }
index 7fcde726dbd359d1e6f6bd8469f1c4f6b5daed99..5bd203e9441b2226f8b51039aaf13ceb35064a30 100644 (file)
@@ -81,8 +81,7 @@ struct ceph_mds_request {
        unsigned long           r_from_time;
        struct ceph_inode_cap  *r_cap;
        struct ceph_mds_session *r_session;
-       struct ceph_mds_session *r_mds[2];
-       int               r_num_mds;    /* items in r_mds */
+       struct ceph_mds_session *r_fwd_session;  /* forwarded from */
 
        int               r_attempts;   /* resend attempts */
        int               r_num_fwd;    /* number of forward attempts */
index 44ed2af88cfe0192da724edf712316bb0991ad58..f3401fe094c7d36890c758afa6771a65272bcd28 100644 (file)
@@ -1287,6 +1287,34 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr,
 }
 
 
+/*
+ * a single ceph_msg can't be queued for send twice, unless it's
+ * already been delivered (i.e. we have the only remaining reference).
+ * so, dup the message if there is more than once reference.
+ */
+struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *old)
+{
+       struct ceph_msg *dup;
+
+       if (atomic_read(&old->nref) == 1)
+               return old;  /* we have only ref, all is well */
+       
+       dup = ceph_msg_new(le32_to_cpu(old->hdr.type), 
+                          le32_to_cpu(old->hdr.front_len),
+                          le32_to_cpu(old->hdr.data_len), 
+                          le32_to_cpu(old->hdr.data_off),
+                          old->pages);
+       BUG_ON(!dup);
+       memcpy(dup->front.iov_base, old->front.iov_base,
+              le32_to_cpu(old->hdr.front_len));
+       if (old->pages)
+               derr(0, "WARNING: unsafely referenced old pages for %p\n",
+                    old);
+       ceph_msg_put(old);
+       return dup;
+}
+
+
 /*
  * queue up an outgoing message.
  *
index 43aba7b554af7f9dc582f4d6eed1d7d509782dc3..47958f1a34df93644ca42c0c7ce25a14ee28e252 100644 (file)
@@ -154,6 +154,8 @@ static inline void ceph_msg_put_list(struct list_head *head)
        }
 }
 
+extern struct ceph_msg *ceph_msg_maybe_dup(struct ceph_msg *msg);
+
 extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                         unsigned long timeout);
 
index 71c84d359822c8f6f44980923daff7ed64256020..24cb35cd3e583964f413eb92589153f11a6bf8f6 100644 (file)
@@ -164,32 +164,6 @@ static int pick_osd(struct ceph_osd_client *osdc,
        return -1;
 }
 
-/*
- * a single ceph_msg can't be queued for send twice, unless it's
- * already been delivered (i.e. we have the only remaining reference).
- */
-static struct ceph_msg *redup_request(struct ceph_msg *old)
-{
-       struct ceph_msg *dup;
-
-       if (atomic_read(&old->nref) == 1)
-               return old;  /* we have only ref, all good */
-       
-       dup = ceph_msg_new(le32_to_cpu(old->hdr.type), 
-                          le32_to_cpu(old->hdr.front_len),
-                          le32_to_cpu(old->hdr.data_len), 
-                          le32_to_cpu(old->hdr.data_off),
-                          old->pages);
-       BUG_ON(!dup);
-       memcpy(dup->front.iov_base, old->front.iov_base,
-              le32_to_cpu(old->hdr.front_len));
-       if (old->pages)
-               derr(0, "WARNING: unsafely referenced old pages for %p\n",
-                    old);
-       ceph_msg_put(old);
-       return dup;
-}
-
 /*
  * caller should hold map_sem (for read)
  */
@@ -291,7 +265,7 @@ more:
                dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd);
                get_request(req);
                spin_unlock(&osdc->request_lock);
-               req->r_request = redup_request(req->r_request);  
+               req->r_request = ceph_msg_maybe_dup(req->r_request);  
                send_request(osdc, req, osd);
                put_request(req);
                goto more;