]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mdsc: more locking cleanup (not done yet)
authorSage Weil <sage@newdream.net>
Tue, 22 Jan 2008 23:47:43 +0000 (15:47 -0800)
committerSage Weil <sage@newdream.net>
Tue, 22 Jan 2008 23:47:43 +0000 (15:47 -0800)
src/kernel/mds_client.c

index abc3ea18971c935f4d74f1190072b9792a25e049..8a15fe76dafb871701bf0d8e3d7f03898984413f 100644 (file)
@@ -37,6 +37,19 @@ static void put_request(struct ceph_mds_request *req)
        } 
 }
 
+static struct ceph_mds_request *find_request_and_lock(struct ceph_mds_client *mdsc, __u64 tid)
+{
+       struct ceph_mds_request *req;
+       spin_lock(&mdsc->lock);
+       req = radix_tree_lookup(&mdsc->request_tree, tid);
+       if (!req) {
+               spin_unlock(&mdsc->lock);
+               return NULL;
+       }
+       get_request(req);
+       return req;
+}
+
 static struct ceph_mds_request *new_request(struct ceph_msg *msg, int mds)
 {
        struct ceph_mds_request *req;
@@ -92,26 +105,16 @@ static int choose_mds(struct ceph_mds_client *mdsc, struct ceph_mds_request *req
        return ceph_mdsmap_get_random_mds(mdsc->mdsmap);
 }
 
-static void register_session(struct ceph_mds_client *mdsc, int mds)
+/*
+ * create+register a new session for given mds.  
+ *  drop locks for kmalloc, check for races.
+ */
+static struct ceph_mds_session *__register_session(struct ceph_mds_client *mdsc, int mds)
 {
        struct ceph_mds_session *s;
 
-       /* register */
-       dout(10, "register_session mds%d\n", mds);
-       if (mds >= mdsc->max_sessions) {
-               struct ceph_mds_session **sa;
-               /* realloc */
-               dout(50, "register_session realloc to %d\n", mds+1);
-               sa = kzalloc((mds+1) * sizeof(struct ceph_mds_session), GFP_KERNEL);
-               BUG_ON(sa == NULL);  /* i am lazy */
-               if (mdsc->sessions) {
-                       memcpy(sa, mdsc->sessions, 
-                              mdsc->max_sessions*sizeof(struct ceph_mds_session));
-                       kfree(mdsc->sessions);
-               }
-               mdsc->sessions = sa;
-               mdsc->max_sessions = mds+1;
-       }
+       spin_unlock(&mdsc->lock);
+
        s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL);
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
@@ -121,16 +124,49 @@ static void register_session(struct ceph_mds_client *mdsc, int mds)
        s->s_nr_caps = 0;
        atomic_set(&s->s_ref, 1);
        init_completion(&s->s_completion);
-       mdsc->sessions[mds] = s;
+
+       spin_lock(&mdsc->lock);
+
+       /* register */
+       dout(10, "register_session mds%d\n", mds);
+       if (mds >= mdsc->max_sessions) {
+               int newmax = 1 << get_count_order(mds+1);
+               struct ceph_mds_session **sa;
+
+               dout(50, "register_session realloc to %d\n", newmax);
+               spin_unlock(&mdsc->lock);
+               sa = kzalloc(newmax * sizeof(void*), GFP_KERNEL);
+               spin_lock(&mdsc->lock);
+               if (sa == NULL)
+                       return ERR_PTR(-ENOMEM);
+               if (mdsc->max_sessions < newmax) {
+                       if (mdsc->sessions) {
+                               memcpy(sa, mdsc->sessions, 
+                                      mdsc->max_sessions*sizeof(struct ceph_mds_session));
+                               kfree(mdsc->sessions);
+                       }
+                       mdsc->sessions = sa;
+                       mdsc->max_sessions = newmax;
+               } else {
+                       kfree(sa);  /* lost race */
+               }
+       }
+       if (mdsc->sessions[mds]) {
+               ceph_mdsc_put_session(s); /* lost race */
+               return mdsc->sessions[mds];
+       } else {
+               mdsc->sessions[mds] = s;
+               return s;
+       }
 }
 
-static struct ceph_mds_session *get_session(struct ceph_mds_client *mdsc, int mds)
+static struct ceph_mds_session *__get_session(struct ceph_mds_client *mdsc, int mds)
 {
        struct ceph_mds_session *session;
        
-       dout(10, "get_session %d max %d\n", mds, mdsc->max_sessions);
+       dout(10, "__get_session %d max %d\n", mds, mdsc->max_sessions);
        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == 0) 
-               register_session(mdsc, mds);
+               return NULL;
        session = mdsc->sessions[mds];
 
        atomic_inc(&session->s_ref);
@@ -195,19 +231,21 @@ static int open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *s
                }
        } 
        
+       session->s_state = CEPH_MDS_SESSION_OPENING;
+       spin_unlock(&mdsc->lock);
+
        /* send connect message */
        msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_cap_seq);
        if (IS_ERR(msg))
                return PTR_ERR(msg);  /* fixme */
-       session->s_state = CEPH_MDS_SESSION_OPENING;
        send_msg_mds(mdsc, msg, mds);
 
        /* wait for session to open (or fail, or close) */
-       spin_unlock(&mdsc->lock);
        dout(30, "open_session waiting on session %p\n", session);
        wait_for_completion(&session->s_completion);
        dout(30, "open_session done waiting on session %p, state %d\n", 
             session, session->s_state);
+
        spin_lock(&mdsc->lock);
        return 0;
 }
@@ -229,6 +267,10 @@ static int resume_session(struct ceph_mds_client *mdsc, struct ceph_mds_session
        }
        spin_unlock(&session->s_cap_lock);
        
+       /*
+        * fixme: drop mdsc->lock?
+        */
+
        /* send request_resume message */
        msg = create_session_msg(CEPH_SESSION_REQUEST_RESUME, session->s_cap_seq);
        if (IS_ERR(msg))
@@ -292,7 +334,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg
        seq = le64_to_cpu(h->seq);
        
        /* handle */
-       session = get_session(mdsc, mds);
+       session = __get_session(mdsc, mds);
        dout(1, "handle_session op %d seq %llu\n", op, seq);
        spin_lock(&mdsc->lock);
        switch (op) {
@@ -438,7 +480,9 @@ retry:
        dout(30, "do_request chose mds%d\n", mds);
 
        /* get session */
-       session = get_session(mdsc, mds);
+       session = __get_session(mdsc, mds);
+       if (!session)
+               session = __register_session(mdsc, mds);
        dout(30, "do_request session %p state %d\n", session, session->s_state);
 
        /* open? */
@@ -461,10 +505,10 @@ retry:
        rhead = req->r_request->front.iov_base;
        rhead->retry_attempt = cpu_to_le32(req->r_attempts-1);
        rhead->oldest_client_tid = cpu_to_le64(get_oldest_tid(mdsc));
-       send_msg_mds(mdsc, req->r_request, mds);
 
-       /* wait */
+       /* send and wait */
        spin_unlock(&mdsc->lock);
+       send_msg_mds(mdsc, req->r_request, mds);
        wait_for_completion(&req->r_completion);
        spin_lock(&mdsc->lock);
 
@@ -477,13 +521,14 @@ retry:
        __unregister_request(mdsc, req);
        spin_unlock(&mdsc->lock);
        put_request(req);
+
        if ((err = ceph_mdsc_parse_reply_info(reply, rinfo)) < 0) {
                ceph_mdsc_put_session(session);
                return err;
        }
-
        dout(30, "do_request done on %p result %d tracelen %d\n", msg, 
             rinfo->head->result, rinfo->trace_nr);
+
        if (psession)
                *psession = session;
        else
@@ -505,23 +550,18 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        tid = le64_to_cpu(head->tid);
 
        /* pass to blocked caller */
-       spin_lock(&mdsc->lock);
-       req = radix_tree_lookup(&mdsc->request_tree, tid);
+       req = find_request_and_lock(mdsc, tid);
        if (!req) {
-               spin_unlock(&mdsc->lock);
                dout(1, "got reply on unknown tid %llu\n", tid);
-               goto done;
+       } else {
+               BUG_ON(req->r_reply);
+               req->r_reply = msg;
+               spin_unlock(&mdsc->lock);
+               
+               ceph_msg_get(msg);
+               complete(&req->r_completion);
+               put_request(req);
        }
-       get_request(req);
-       spin_unlock(&mdsc->lock);
-
-       /* FIXME: locking on request? */
-       BUG_ON(req->r_reply);
-       req->r_reply = msg;
-       ceph_msg_get(msg);
-       complete(&req->r_completion);
-       put_request(req);
-       
 done:
        return;
 }
@@ -715,13 +755,9 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg
                goto bad;
 
        /* handle */
-       spin_lock(&mdsc->lock);
-       req = radix_tree_lookup(&mdsc->request_tree, tid);
-       if (req) get_request(req);
-       if (!req) {
-               spin_unlock(&mdsc->lock);
+       req = find_request_and_lock(mdsc, tid);
+       if (!req)
                return;  /* dup reply? */
-       }
        
        /* do we have a session with the dest mds? */
        if (next_mds < mdsc->max_sessions &&
@@ -781,6 +817,8 @@ void kick_requests(struct ceph_mds_client *mdsc, int mds)
 
 /*
  * send an MClientReconnect to a recovering mds
+ *
+ * FIXME: needs to drop mdsc->lock ...
  */
 void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
 {
@@ -799,7 +837,7 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
        dout(10, "send_mds_reconnect mds%d\n", mds);
 
        /* find session */
-       session = get_session(mdsc, mds);
+       session = __get_session(mdsc, mds);
        if (!session) {
                dout(20, "no session for mds%d, sending short reconnect\n", mds);
                reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, 0);
@@ -856,7 +894,7 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
                if (p + pathlen + 4 + sizeof(struct ceph_mds_cap_reconnect) > end) {
                        /* der, realloc front */
                        /*
-                        * this is broken (kmalloc under spinlock).  should probably drop
+                        * fixme: this is broken (kmalloc under spinlock).  should probably drop
                         * lock, and retry with higher initial estimate?
                         */
                        int off = end-p;
@@ -1054,7 +1092,7 @@ void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, struct ceph_msg *ms
        size = le64_to_cpu(h->seq);
 
        /* find session */
-       session = get_session(&client->mdsc, mds);
+       session = __get_session(&client->mdsc, mds);
        if (!session) {
                dout(10, "WTF, got filecap msg but no session for mds%d\n", mds);
                return;
@@ -1106,7 +1144,7 @@ int ceph_mdsc_update_cap_wanted(struct ceph_inode_info *ci, int wanted)
        for (i=0; i<ci->i_nr_caps; i++) {
                cap = &ci->i_caps[i];
 
-               session = get_session(mdsc, cap->mds);
+               session = __get_session(mdsc, cap->mds);
                BUG_ON(!session);
 
                cap->caps &= wanted;  /* drop caps we don't want */