]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: async mds requests
authorSage Weil <sage@newdream.net>
Wed, 18 Feb 2009 22:13:49 +0000 (14:13 -0800)
committerSage Weil <sage@newdream.net>
Thu, 19 Feb 2009 19:58:25 +0000 (11:58 -0800)
Restructure the mds client to handle mds request asynchronously, so
that we can handle requests that are not managed by a blocking
thread.

src/TODO
src/kernel/caps.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/snap.c
src/kernel/super.c
src/kernel/super.h

index cbddec519c2aacfe4bb980009eced0cbfbb1132d..b916dcd76fa9269dc678b3a1f1dbdf5f05f8ed3b 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -44,6 +44,9 @@ timer
 - each SafeTimer should just be its own thread.
 
 kernel client
+- async mds requests
+- pass dentries directly into mds requests; rebuild messages on message retry
+- optional or no fill_trace?
 - flock, fnctl locks
 - async xattrs
 - avoid pinning inodes with expireable caps?
index 99da64d104ef9d1f0a7b6a1f049b73c089116ef3..25340319b68231f8235476e7299a8680dc2e38fe 100644 (file)
@@ -446,7 +446,7 @@ void ceph_remove_cap(struct ceph_cap *cap)
 static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
                                struct ceph_inode_info *ci)
 {
-       ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5);
+       ci->i_hold_caps_until = round_jiffies(jiffies + CEPH_CAP_DELAY);
        dout(10, "__cap_delay_requeue %p at %lu\n", &ci->vfs_inode,
             ci->i_hold_caps_until);
        spin_lock(&mdsc->cap_delay_lock);
@@ -691,7 +691,7 @@ retry:
                if (!session) {
                        spin_unlock(&inode->i_lock);
                        mutex_lock(&mdsc->mutex);
-                       session = __ceph_get_mds_session(mdsc, mds);
+                       session = __ceph_lookup_mds_session(mdsc, mds);
                        mutex_unlock(&mdsc->mutex);
                        if (session) {
                                dout(10, "inverting session/ino locks on %p\n",
@@ -1625,7 +1625,7 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
 
        /* find session */
        mutex_lock(&mdsc->mutex);
-       session = __ceph_get_mds_session(mdsc, mds);
+       session = __ceph_lookup_mds_session(mdsc, mds);
        mutex_unlock(&mdsc->mutex);
        if (!session) {
                dout(10, "WTF, got cap but no session for mds%d\n", mds);
index 60624985a8fc909154a89ff9f0ca519a8cf769b5..6cb37ccdfc1edd911a2fe8f4ef3e3d4959d962ac 100644 (file)
@@ -13,6 +13,8 @@ int ceph_debug_mdsc = -1;
 #include "messenger.h"
 #include "decode.h"
 
+static void __wake_requests(struct ceph_mds_client *mdsc,
+                           struct list_head *head);
 
 /*
  * address and send message to a given mds
@@ -278,30 +280,39 @@ static const char *session_state_name(int s)
        }
 }
 
+static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
+{
+       dout(30, "get_session %p %d -> %d\n", s,
+            atomic_read(&s->s_ref), atomic_read(&s->s_ref)+1);
+       atomic_inc(&s->s_ref);
+       return s;
+}
+
+void ceph_put_mds_session(struct ceph_mds_session *s)
+{
+       dout(30, "put_session %p %d -> %d\n", s,
+            atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
+       if (atomic_dec_and_test(&s->s_ref))
+               kfree(s);
+}
+
 /*
  * called under mdsc->mutex
  */
-struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *mdsc,
-                                               int mds)
+struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
+                                                  int mds)
 {
        struct ceph_mds_session *session;
 
        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
                return NULL;
        session = mdsc->sessions[mds];
-       dout(30, "get_mds_session %p %d -> %d\n", session,
+       dout(30, "lookup_mds_session %p %d -> %d\n", session,
             atomic_read(&session->s_ref), atomic_read(&session->s_ref)+1);
-       atomic_inc(&session->s_ref);
+       get_session(session);
        return session;
 }
 
-void ceph_put_mds_session(struct ceph_mds_session *s)
-{
-       dout(30, "put_mds_session %p %d -> %d\n", s,
-            atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
-       if (atomic_dec_and_test(&s->s_ref))
-               kfree(s);
-}
 
 /*
  * create+register a new session for given mds.
@@ -326,7 +337,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        INIT_LIST_HEAD(&s->s_rdcaps);
        s->s_nr_caps = 0;
        atomic_set(&s->s_ref, 1);
-       init_completion(&s->s_completion);
+       INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
 
        dout(10, "register_session mds%d\n", mds);
@@ -610,43 +621,25 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
 }
 
 /*
- * Register request with mon_client for a new mds map.  Wait until
- * we get one (or time out).
+ * Register request with mon_client for a new mds map. 
  *
- * called under mdsc->mutex (dropped while we wait)
+ * called under mdsc->mutex.
  */
-static int wait_for_new_map(struct ceph_mds_client *mdsc,
-                            unsigned long timeout)
+static void request_new_map(struct ceph_mds_client *mdsc)
 {
-       u32 have;
-       int err = 0;
-
-       dout(30, "wait_for_new_map enter\n");
-       have = mdsc->mdsmap->m_epoch;
+       dout(30, "request_new_map enter\n");
        mutex_unlock(&mdsc->mutex);
-       ceph_monc_request_mdsmap(&mdsc->client->monc, have+1);
-       if (timeout) {
-               err = wait_for_completion_timeout(&mdsc->map_waiters, timeout);
-               if (err > 0)
-                       err = 0;
-               else if (err == 0)
-                       err = -EIO;
-       } else {
-               wait_for_completion(&mdsc->map_waiters);
-       }
+       ceph_monc_request_mdsmap(&mdsc->client->monc, mdsc->mdsmap->m_epoch+1);
        mutex_lock(&mdsc->mutex);
-       dout(30, "wait_for_new_map err %d\n", err);
-       return err;
 }
 
 /*
- * open a new session with the given mds, and wait for mds ack.  the
- * timeout is optional.
+ * send session open request.
  *
  * called under mdsc->mutex
  */
-static int open_session(struct ceph_mds_client *mdsc,
-                       struct ceph_mds_session *session, unsigned long timeout)
+static int __open_session(struct ceph_mds_client *mdsc,
+                         struct ceph_mds_session *session)
 {
        struct ceph_msg *msg;
        int mstate;
@@ -656,45 +649,21 @@ static int open_session(struct ceph_mds_client *mdsc,
        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout(10, "open_session to mds%d, state %d\n", mds, mstate);
-       if (mstate < CEPH_MDS_STATE_ACTIVE) {
-               err = wait_for_new_map(mdsc, timeout);
-               if (err)
-                       return err;
-               mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
-               if (mstate < CEPH_MDS_STATE_ACTIVE) {
-                       dout(30, "open_session mds%d now %d still not active\n",
-                            mds, mstate);
-                       return -EAGAIN;  /* hrm, try again? */
-               }
-       }
-
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;
        mutex_unlock(&mdsc->mutex);
 
        /* send connect message */
        msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
-       if (IS_ERR(msg))
-               return PTR_ERR(msg);
-       ceph_send_msg_mds(mdsc, msg, mds);
-
-       /* wait for session to open (or fail, or close) */
-       dout(30, "open_session waiting on session %p\n", session);
-       if (timeout) {
-               err = wait_for_completion_timeout(&session->s_completion,
-                                                 timeout);
-               if (err > 0)
-                       err = 0;
-               else if (err == 0)
-                       err = -EIO;
-       } else {
-               wait_for_completion(&session->s_completion);
+       if (IS_ERR(msg)) {
+               err = PTR_ERR(msg);
+               goto out;
        }
-       dout(30, "open_session done waiting on session %p, state %d\n",
-            session, session->s_state);
+       ceph_send_msg_mds(mdsc, msg, mds);
 
+out:
        mutex_lock(&mdsc->mutex);
-       return err;
+       return 0;
 }
 
 /*
@@ -751,38 +720,6 @@ static void wake_up_session_caps(struct ceph_mds_session *session)
        }
 }
 
-/*
- * Wake up threads with requests pending for @mds, so that they can
- * resubmit their requests to a possibly different mds.  If @all is set,
- * wake up if their requests has been forwarded to @mds, too.
- */
-static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
-{
-       struct ceph_mds_request *reqs[10];
-       u64 nexttid = 0;
-       int i, got;
-
-       dout(20, "kick_requests mds%d\n", mds);
-       while (nexttid < mdsc->last_tid) {
-               got = radix_tree_gang_lookup(&mdsc->request_tree,
-                                            (void **)&reqs, nexttid, 10);
-               if (got == 0)
-                       break;
-               nexttid = reqs[got-1]->r_tid + 1;
-               for (i = 0; i < got; i++) {
-                       if (!reqs[i]->r_got_unsafe &&
-                           ((reqs[i]->r_session &&
-                             reqs[i]->r_session->s_mds == mds) ||
-                            (all && reqs[i]->r_fwd_session &&
-                             reqs[i]->r_fwd_session->s_mds == mds))) {
-                               dout(10, " kicking tid %llu\n", reqs[i]->r_tid);
-                               put_request_sessions(reqs[i]);
-                               complete(&reqs[i]->r_completion);
-                       }
-               }
-       }
-}
-
 /*
  * Send periodic message to MDS renewing all currently held caps.  The
  * ack will reset the expiration for all caps from this session.
@@ -928,94 +865,6 @@ void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc,
 }
 
 
-/*
- * handle a mds session control message
- */
-void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
-                             struct ceph_msg *msg)
-{
-       u32 op;
-       u64 seq;
-       struct ceph_mds_session *session = NULL;
-       int mds;
-       struct ceph_mds_session_head *h = msg->front.iov_base;
-
-       if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
-               return;
-       mds = le32_to_cpu(msg->hdr.src.name.num);
-
-       /* decode */
-       if (msg->front.iov_len != sizeof(*h))
-               goto bad;
-       op = le32_to_cpu(h->op);
-       seq = le64_to_cpu(h->seq);
-
-       mutex_lock(&mdsc->mutex);
-       session = __ceph_get_mds_session(mdsc, mds);
-       if (session && mdsc->mdsmap)
-               /* FIXME: this ttl calculation is generous */
-               session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
-       mutex_unlock(&mdsc->mutex);
-
-       if (!session) {
-               if (op != CEPH_SESSION_OPEN) {
-                       dout(10, "handle_session no session for mds%d\n", mds);
-                       return;
-               }
-               dout(10, "handle_session creating session for mds%d\n", mds);
-               session = register_session(mdsc, mds);
-       }
-
-       mutex_lock(&session->s_mutex);
-
-       dout(2, "handle_session mds%d %s %p state %s seq %llu\n",
-            mds, ceph_session_op_name(op), session,
-            session_state_name(session->s_state), seq);
-       switch (op) {
-       case CEPH_SESSION_OPEN:
-               session->s_state = CEPH_MDS_SESSION_OPEN;
-               renewed_caps(mdsc, session, 0);
-               complete(&session->s_completion);
-               if (mdsc->stopping)
-                       __close_session(mdsc, session);
-               break;
-
-       case CEPH_SESSION_RENEWCAPS:
-               renewed_caps(mdsc, session, 1);
-               break;
-
-       case CEPH_SESSION_CLOSE:
-               unregister_session(mdsc, mds);
-               remove_session_caps(session);
-               complete(&session->s_completion); /* for good measure */
-               complete(&mdsc->session_close_waiters);
-               kick_requests(mdsc, mds, 0);      /* cur only */
-               break;
-
-       case CEPH_SESSION_STALE:
-               dout(1, "mds%d caps went stale, renewing\n", session->s_mds);
-               spin_lock(&session->s_cap_lock);
-               session->s_cap_gen++;
-               session->s_cap_ttl = 0;
-               spin_unlock(&session->s_cap_lock);
-               send_renew_caps(mdsc, session);
-               break;
-
-       default:
-               derr(0, "bad session op %d from mds%d\n", op, mds);
-               WARN_ON(1);
-       }
-
-       mutex_unlock(&session->s_mutex);
-       ceph_put_mds_session(session);
-       return;
-
-bad:
-       derr(1, "corrupt mds%d session message, len %d, expected %d\n", mds,
-            (int)msg->front.iov_len, (int)sizeof(*h));
-       return;
-}
-
 
 /*
  * create an mds request and message.
@@ -1136,27 +985,20 @@ static void __prepare_send_request(struct ceph_mds_client *mdsc,
 }
 
 /*
- * Synchrously perform an mds request.  Take care of all of the
- * session setup, forwarding, retry details.
+ * send request, or put it on the appropriate wait list.
  */
-int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
-                        struct inode *listener,
-                        struct ceph_mds_request *req)
+static int __do_request(struct ceph_mds_client *mdsc,
+                       struct ceph_mds_request *req)
 {
        struct ceph_mds_session *session = NULL;
-       int err;
        int mds = -1;
-       int safe = 0;
+       int err = -EAGAIN;
 
-       dout(30, "do_request on %p\n", req);
+       if (req->r_reply)
+               goto out;
 
-       mutex_lock(&mdsc->mutex);
-       __register_request(mdsc, listener, req);
-retry:
        if (req->r_timeout &&
            time_after_eq(jiffies, req->r_started + req->r_timeout)) {
-               if (session && session->s_state == CEPH_MDS_SESSION_OPENING)
-                       unregister_session(mdsc, mds);
                dout(10, "do_request timed out\n");
                err = -EIO;
                goto finish;
@@ -1166,82 +1008,146 @@ retry:
        if (mds < 0 ||
            ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
                dout(30, "do_request no mds or not active, waiting for map\n");
-               err = wait_for_new_map(mdsc, req->r_timeout);
-               if (err)
-                       goto finish;
-               goto retry;
+               list_add(&req->r_wait, &mdsc->waiting_for_map);
+               request_new_map(mdsc);
+               goto out;
        }
 
-       /* get session */
-       session = __ceph_get_mds_session(mdsc, mds);
+       /* get, open session */
+       session = __ceph_lookup_mds_session(mdsc, mds);
        if (!session)
                session = register_session(mdsc, mds);
        dout(30, "do_request mds%d session %p state %s\n", mds, session,
             session_state_name(session->s_state));
-
-       /* open? */
-       err = 0;
-       if (session->s_state == CEPH_MDS_SESSION_NEW ||
-           session->s_state == CEPH_MDS_SESSION_CLOSING)
-               err = open_session(mdsc, session, req->r_timeout);
-       if (session->s_state != CEPH_MDS_SESSION_OPEN ||
-           err == -EAGAIN) {
-               dout(30, "do_request session %p not open, state=%s\n",
-                    session, session_state_name(session->s_state));
-               ceph_put_mds_session(session);
-               goto retry;
+       if (session->s_state != CEPH_MDS_SESSION_OPEN) {
+               if (session->s_state == CEPH_MDS_SESSION_NEW ||
+                   session->s_state == CEPH_MDS_SESSION_CLOSING)
+                       __open_session(mdsc, session);
+               list_add(&req->r_wait, &session->s_waiting);
+               request_new_map(mdsc);
+               goto out_session;
        }
 
-       BUG_ON(req->r_session);
-       req->r_session = session; /* request now owns the session ref */
+       /* send request */
+       req->r_session = get_session(session);
        req->r_resend_mds = -1;   /* forget any previous mds hint */
 
        if (req->r_request_started == 0)   /* note request start time */
                req->r_request_started = jiffies;
 
        __prepare_send_request(mdsc, req);
-       mutex_unlock(&mdsc->mutex);
 
+       mutex_unlock(&mdsc->mutex);
        ceph_msg_get(req->r_request);
        ceph_send_msg_mds(mdsc, req->r_request, mds);
+       mutex_lock(&mdsc->mutex);
 
-       if (req->r_timeout) {
-               err = wait_for_completion_timeout(&req->r_completion,
-                                                 req->r_timeout);
-               if (err > 0)
-                       err = 0;
-               else if (err == 0)
-                       err = -EIO;  /* timed out */
-       } else {
-               err = 0;
-               wait_for_completion(&req->r_completion);
+       err = 0;
+out_session:
+       ceph_put_mds_session(session);
+out:
+       return err;
+
+finish:
+       req->r_reply = ERR_PTR(err);
+       complete(&req->r_completion);
+       goto out;
+}
+
+static void __wake_requests(struct ceph_mds_client *mdsc,
+                           struct list_head *head)
+{
+       struct list_head *p, *n;
+
+       list_for_each_safe(p, n, head) {
+               struct ceph_mds_request *req =
+                       list_entry(p, struct ceph_mds_request, r_wait);
+               list_del_init(&req->r_wait);
+               __do_request(mdsc, req);
        }
+}
+
+/*
+ * Wake up threads with requests pending for @mds, so that they can
+ * resubmit their requests to a possibly different mds.  If @all is set,
+ * wake up if their requests has been forwarded to @mds, too.
+ */
+static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
+{
+       struct ceph_mds_request *reqs[10];
+       u64 nexttid = 0;
+       int i, got;
+
+       dout(20, "kick_requests mds%d\n", mds);
+       while (nexttid < mdsc->last_tid) {
+               got = radix_tree_gang_lookup(&mdsc->request_tree,
+                                            (void **)&reqs, nexttid, 10);
+               if (got == 0)
+                       break;
+               nexttid = reqs[got-1]->r_tid + 1;
+               for (i = 0; i < got; i++) {
+                       if (!reqs[i]->r_got_unsafe &&
+                           ((reqs[i]->r_session &&
+                             reqs[i]->r_session->s_mds == mds) ||
+                            (all && reqs[i]->r_fwd_session &&
+                             reqs[i]->r_fwd_session->s_mds == mds))) {
+                               dout(10, " kicking tid %llu\n", reqs[i]->r_tid);
+                               put_request_sessions(reqs[i]);
+                               __do_request(mdsc, reqs[i]);
+                       }
+               }
+       }
+}
+
+
+/*
+ * Synchrously perform an mds request.  Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+                        struct inode *listener,
+                        struct ceph_mds_request *req)
+{
+       int err;
+       int safe = 0;
+
+       dout(30, "do_request on %p\n", req);
+
        mutex_lock(&mdsc->mutex);
-       if (req->r_reply == NULL && !err) {
-               put_request_sessions(req);
-               goto retry;
+       __register_request(mdsc, listener, req);
+       __do_request(mdsc, req);
+
+       if (!req->r_reply) {
+               mutex_unlock(&mdsc->mutex);
+               if (req->r_timeout) {
+                       err = wait_for_completion_timeout(&req->r_completion,
+                                                         req->r_timeout);
+                       if (err > 0)
+                               err = 0;
+                       else if (err == 0)
+                               req->r_reply = ERR_PTR(-EIO);
+               } else {
+                       wait_for_completion(&req->r_completion);
+               }
+               mutex_lock(&mdsc->mutex);
        }
+
        if (IS_ERR(req->r_reply)) {
                err = PTR_ERR(req->r_reply);
                req->r_reply = NULL;
-       }
-       if (!err)
-               /* all is well, reply has been parsed. */
+               safe = 1;
+       } else {
                err = le32_to_cpu(req->r_reply_info.head->result);
-       if (req)
                safe = req->r_reply_info.head->safe;
-finish:
-       if (safe) {
-               complete(&req->r_safe_completion);
-               __unregister_request(mdsc, req);
        }
 
-       mutex_unlock(&mdsc->mutex);
-
        if (safe) {
+               complete(&req->r_safe_completion);
+               __unregister_request(mdsc, req);
                ceph_msg_put(req->r_request);
                req->r_request = NULL;
        }
+       mutex_unlock(&mdsc->mutex);
 
        dout(30, "do_request %p done, result %d\n", req, err);
        return err;
@@ -1313,7 +1219,7 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 
        if (req->r_session && req->r_session->s_mds != mds) {
                ceph_put_mds_session(req->r_session);
-               req->r_session = __ceph_get_mds_session(mdsc, mds);
+               req->r_session = __ceph_lookup_mds_session(mdsc, mds);
        }
        if (req->r_session == NULL) {
                derr(1, "got reply on %llu, but no session for mds%d\n",
@@ -1433,8 +1339,8 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
                     from_mds);
                req->r_num_fwd = fwd_seq;
                put_request_sessions(req);
-               req->r_session = __ceph_get_mds_session(mdsc, next_mds);
-               req->r_fwd_session = __ceph_get_mds_session(mdsc, from_mds);
+               req->r_session = __ceph_lookup_mds_session(mdsc, next_mds);
+               req->r_fwd_session = __ceph_lookup_mds_session(mdsc, from_mds);
        } else {
                /* no, resend. */
                /* forward race not possible; mds would drop */
@@ -1442,7 +1348,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
                req->r_num_fwd = fwd_seq;
                req->r_resend_mds = next_mds;
                put_request_sessions(req);
-               complete(&req->r_completion);  /* wake up do_request */
+               __do_request(mdsc, req);
        }
        ceph_mdsc_put_request(req);
 out:
@@ -1453,6 +1359,100 @@ bad:
        derr(0, "problem decoding message, err=%d\n", err);
 }
 
+/*
+ * handle a mds session control message
+ */
+void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
+                             struct ceph_msg *msg)
+{
+       u32 op;
+       u64 seq;
+       struct ceph_mds_session *session = NULL;
+       int mds;
+       struct ceph_mds_session_head *h = msg->front.iov_base;
+       int wake = 0;
+
+       if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+               return;
+       mds = le32_to_cpu(msg->hdr.src.name.num);
+
+       /* decode */
+       if (msg->front.iov_len != sizeof(*h))
+               goto bad;
+       op = le32_to_cpu(h->op);
+       seq = le64_to_cpu(h->seq);
+
+       mutex_lock(&mdsc->mutex);
+       session = __ceph_lookup_mds_session(mdsc, mds);
+       if (session && mdsc->mdsmap)
+               /* FIXME: this ttl calculation is generous */
+               session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
+       mutex_unlock(&mdsc->mutex);
+
+       if (!session) {
+               if (op != CEPH_SESSION_OPEN) {
+                       dout(10, "handle_session no session for mds%d\n", mds);
+                       return;
+               }
+               dout(10, "handle_session creating session for mds%d\n", mds);
+               session = register_session(mdsc, mds);
+       }
+
+       mutex_lock(&session->s_mutex);
+
+       dout(2, "handle_session mds%d %s %p state %s seq %llu\n",
+            mds, ceph_session_op_name(op), session,
+            session_state_name(session->s_state), seq);
+       switch (op) {
+       case CEPH_SESSION_OPEN:
+               session->s_state = CEPH_MDS_SESSION_OPEN;
+               renewed_caps(mdsc, session, 0);
+               wake = 1;
+               if (mdsc->stopping)
+                       __close_session(mdsc, session);
+               break;
+
+       case CEPH_SESSION_RENEWCAPS:
+               renewed_caps(mdsc, session, 1);
+               break;
+
+       case CEPH_SESSION_CLOSE:
+               unregister_session(mdsc, mds);
+               remove_session_caps(session);
+               wake = 1; /* for good measure */
+               complete(&mdsc->session_close_waiters);
+               kick_requests(mdsc, mds, 0);      /* cur only */
+               break;
+
+       case CEPH_SESSION_STALE:
+               dout(1, "mds%d caps went stale, renewing\n", session->s_mds);
+               spin_lock(&session->s_cap_lock);
+               session->s_cap_gen++;
+               session->s_cap_ttl = 0;
+               spin_unlock(&session->s_cap_lock);
+               send_renew_caps(mdsc, session);
+               break;
+
+       default:
+               derr(0, "bad session op %d from mds%d\n", op, mds);
+               WARN_ON(1);
+       }
+
+       mutex_unlock(&session->s_mutex);
+       if (wake) {
+               mutex_lock(&mdsc->mutex);
+               __wake_requests(mdsc, &session->s_waiting);
+               mutex_unlock(&mdsc->mutex);
+       }
+       ceph_put_mds_session(session);
+       return;
+
+bad:
+       derr(1, "corrupt mds%d session message, len %d, expected %d\n", mds,
+            (int)msg->front.iov_len, (int)sizeof(*h));
+       return;
+}
+
 
 /*
  * called under session->mutex.
@@ -1507,7 +1507,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
        dout(1, "reconnect to recovering mds%d\n", mds);
 
        /* find session */
-       session = __ceph_get_mds_session(mdsc, mds);
+       session = __ceph_lookup_mds_session(mdsc, mds);
        mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
 
        if (session) {
@@ -1654,7 +1654,7 @@ send:
 
        if (session) {
                session->s_state = CEPH_MDS_SESSION_OPEN;
-               complete(&session->s_completion);
+               __wake_requests(mdsc, &session->s_waiting);
        }
 
 out:
@@ -1730,7 +1730,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                        if (s->s_state == CEPH_MDS_SESSION_OPENING) {
                                /* the session never opened, just close it
                                 * out now */
-                               complete(&s->s_completion);
+                               __wake_requests(mdsc, &s->s_waiting);
                                unregister_session(mdsc, i);
                        }
 
@@ -1792,7 +1792,7 @@ void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 
        /* find session */
        mutex_lock(&mdsc->mutex);
-       session = __ceph_get_mds_session(mdsc, mds);
+       session = __ceph_lookup_mds_session(mdsc, mds);
        mutex_unlock(&mdsc->mutex);
        if (!session) {
                derr(0, "WTF, got lease but no session for mds%d\n", mds);
@@ -1946,13 +1946,14 @@ static void delayed_work(struct work_struct *work)
                mdsc->last_renew_caps = jiffies;
 
        for (i = 0; i < mdsc->max_sessions; i++) {
-               struct ceph_mds_session *s = __ceph_get_mds_session(mdsc, i);
+               struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
                if (s == NULL)
                        continue;
                if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
                        dout(10, "resending session close request for mds%d\n",
                             s->s_mds);
                        request_close_session(mdsc, s);
+                       ceph_put_mds_session(s);
                        continue;
                }
                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
@@ -1990,8 +1991,8 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        mdsc->client = client;
        mutex_init(&mdsc->mutex);
        mdsc->mdsmap = NULL;            /* none yet */
-       init_completion(&mdsc->map_waiters);
        init_completion(&mdsc->session_close_waiters);
+       INIT_LIST_HEAD(&mdsc->waiting_for_map);
        mdsc->sessions = NULL;
        mdsc->max_sessions = 0;
        mdsc->stopping = 0;
@@ -2019,7 +2020,7 @@ static void drop_leases(struct ceph_mds_client *mdsc)
        dout(10, "drop_leases\n");
        mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
-               struct ceph_mds_session *s = __ceph_get_mds_session(mdsc, i);
+               struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
                if (!s)
                        continue;
                mutex_unlock(&mdsc->mutex);
@@ -2049,7 +2050,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
        struct ceph_mds_session *session;
        int i;
        int n;
-       unsigned long started, timeout = 60 * HZ;
+       unsigned long started, timeout = CEPH_MOUNT_TIMEOUT;
        struct ceph_client *client = mdsc->client;
 
        dout(10, "close_sessions\n");
@@ -2085,7 +2086,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
                dout(10, "closing sessions\n");
                n = 0;
                for (i = 0; i < mdsc->max_sessions; i++) {
-                       session = __ceph_get_mds_session(mdsc, i);
+                       session = __ceph_lookup_mds_session(mdsc, i);
                        if (!session)
                                continue;
                        mutex_unlock(&mdsc->mutex);
@@ -2109,6 +2110,21 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
                mutex_lock(&mdsc->mutex);
        }
 
+       /* tear down remaining sessions */
+       for (i = 0; i < mdsc->max_sessions; i++) {
+               if (mdsc->sessions[i]) {
+                       session = get_session(mdsc->sessions[i]);
+                       unregister_session(mdsc, i);
+                       mutex_unlock(&mdsc->mutex);
+                       mutex_lock(&session->s_mutex);
+                       remove_session_caps(session);
+                       mutex_unlock(&session->s_mutex);
+                       ceph_put_mds_session(session);
+                       mutex_lock(&mdsc->mutex);                       
+               }
+       }
+
+
        WARN_ON(!list_empty(&mdsc->cap_delay_list));
 
        mutex_unlock(&mdsc->mutex);
@@ -2197,9 +2213,10 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
                mdsc->mdsmap = newmap;  /* first mds map */
        }
 
+       __wake_requests(mdsc, &mdsc->waiting_for_map);
+
        mutex_unlock(&mdsc->mutex);
        schedule_delayed(mdsc);
-       complete(&mdsc->map_waiters);
        return;
 
 bad_unlock:
index 7f93791fe7305d1bc61c4cd73e4a8619597c3464..b65bb7a8506d6e31b0e45de6e91ee501de0fdfcc 100644 (file)
@@ -120,7 +120,7 @@ struct ceph_mds_session {
        struct list_head  s_rdcaps;   /* just the readonly caps */
        int               s_nr_caps;
        atomic_t          s_ref;
-       struct completion s_completion;
+       struct list_head  s_waiting;  /* waiting requests */
        struct list_head  s_unsafe;   /* unsafe requests */
 };
 
@@ -149,6 +149,8 @@ struct ceph_mds_request {
        unsigned long r_request_started; /* start time for mds request only,
                                            used to measure lease durations */
 
+       struct list_head r_wait;
+
        /* for choosing which mds to send this request to */
        struct dentry *r_direct_dentry;
        int r_direct_mode;
@@ -189,7 +191,8 @@ struct ceph_mds_client {
        struct mutex            mutex;         /* all nested structures */
 
        struct ceph_mdsmap      *mdsmap;
-       struct completion       map_waiters, session_close_waiters;
+       struct completion       session_close_waiters;
+       struct list_head        waiting_for_map;
 
        struct ceph_mds_session **sessions;    /* NULL for mds if no session */
        int                     max_sessions;  /* len of s_mds_sessions */
@@ -219,7 +222,7 @@ struct ceph_mds_client {
 
 extern const char *ceph_mds_op_name(int op);
 
-extern struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *, int mds);
+extern struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
 
 inline static struct ceph_mds_session *
 ceph_get_mds_session(struct ceph_mds_session *s)
index 89e94f8666b13705c5aef89ea90cc942b27c07bf..888e5772d1b68c227ac73f98e8691f5e0661c2cb 100644 (file)
@@ -718,7 +718,7 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
 
        /* find session */
        mutex_lock(&mdsc->mutex);
-       session = __ceph_get_mds_session(mdsc, mds);
+       session = __ceph_lookup_mds_session(mdsc, mds);
        mutex_unlock(&mdsc->mutex);
        if (!session) {
                dout(10, "WTF, got snap but no session for mds%d\n", mds);
index 9560c004184f753c5d345ad34a4ff36077424b5f..5e2d4415d3fadc963718c88660107c0f177ecbd5 100644 (file)
@@ -1050,6 +1050,7 @@ static int ceph_get_sb(struct file_system_type *fs_type,
        return 0;
 
 out_splat:
+       ceph_mdsc_close_sessions(&client->mdsc);
        up_write(&sb->s_umount);
        deactivate_super(sb);
        goto out_final;
index 8251811dd7f30c796df7d9abddb8763bc919e917..136ed6e183d504d72c0d571bd3f31dce44d35983 100644 (file)
@@ -26,6 +26,9 @@
 #define CEPH_BLOCK_SHIFT   20  /* 1 MB */
 #define CEPH_BLOCK         (1 << CEPH_BLOCK_SHIFT)
 
+#define CEPH_MOUNT_TIMEOUT  (60*HZ)
+#define CEPH_CAP_DELAY      (5*HZ)  /* cap release delay */
+
 /*
  * subtract jiffies
  */
@@ -131,7 +134,7 @@ static inline struct ceph_client *ceph_client(struct super_block *sb)
  * capabilities.
  *
  * Each cap is referenced by the inode's i_caps tree and by a per-mds
- * session capability list.
+ * session capability list(s).
  */
 struct ceph_cap {
        struct ceph_inode_info *ci;