]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cleaned up connection spin_locks: READING/WRITING bits now used for mutual exclusion
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 3 Dec 2007 06:17:32 +0000 (06:17 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 3 Dec 2007 06:17:32 +0000 (06:17 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2172 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/kernel/client.c
trunk/ceph/kernel/ktcp.c
trunk/ceph/kernel/mds_client.c
trunk/ceph/kernel/mds_client.h
trunk/ceph/kernel/messenger.c
trunk/ceph/kernel/messenger.h

index 49181b39fd21dfbcae63a9e3d49a5c07325c4a29..9e16eed94ee504207c2afc72a267d9c9334e66d4 100644 (file)
@@ -144,7 +144,8 @@ trymount:
 static void handle_monmap(struct ceph_client *client, struct ceph_msg *msg)
 {
        int err;
-       
+       int first = (client->monc.monmap.epoch == 0);
+
        dout(1, "handle_monmap had epoch %d\n", client->monc.monmap.epoch);
 
        /* parse */
@@ -153,10 +154,11 @@ static void handle_monmap(struct ceph_client *client, struct ceph_msg *msg)
                                 msg->front.iov_base + msg->front.iov_len);
        if (err != 0)
                return;
-       
-       if (client->whoami < 0) {
+
+       if (first) {
                client->whoami = msg->hdr.dst.name.num;
                client->msgr->inst.name = msg->hdr.dst.name;
+               dout(1, "i am client%d\n", client->whoami);
        }
 }
 
@@ -262,6 +264,9 @@ void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg)
                if (!had && client->mdsc.mdsmap) 
                        got_first_map(client, 1);
                break;
+       case CEPH_MSG_CLIENT_SESSION:
+               ceph_mdsc_handle_session(&client->mdsc, msg);
+               break;
        case CEPH_MSG_CLIENT_REPLY:
                ceph_mdsc_handle_reply(&client->mdsc, msg);
                break;
index e750ef99bc848e83090ff161e971570bdb19ab89..9b0014a50a1fb087bc99eb407fc4469b1e36ff76 100644 (file)
@@ -29,8 +29,9 @@ static void ceph_data_ready(struct sock *sk, int count_unused)
 {
         struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
        if (con && (sk->sk_state != TCP_CLOSE_WAIT)) {
-               dout(30, "ceph_data_ready connection %p state = %u, queuing rwork\n",
+               dout(30, "ceph_data_ready on %p state = %u, queuing rwork\n",
                     con, con->state);
+               set_bit(READABLE, &con->state);
                queue_work(recv_wq, &con->rwork);
        }
 }
@@ -44,7 +45,8 @@ static void ceph_write_space(struct sock *sk)
        /* only queue to workqueue if not already queued */
         if (con && !work_pending(&con->swork) &&
            test_bit(WRITE_PENDING, &con->state)) {
-                dout(30, "ceph_write_space %p queueing write work\n", con);
+                dout(30, "ceph_write_space %p queuing write work\n", con);
+               set_bit(WRITEABLE, &con->state);
                 queue_work(send_wq, &con->swork);
         }
 }
index 1c7c26e5f03c6c9554f952da51a1151d1d783d97..38036f30a26dfc98918421d17b8afd6473f29ae8 100644 (file)
@@ -43,19 +43,18 @@ register_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds)
        struct ceph_mds_request *req;
 
        req = kmalloc(sizeof(*req), GFP_KERNEL);
-
+       req->r_tid = ++mdsc->last_tid;
        req->r_request = msg;
-       ceph_msg_get(msg);  /* grab reference */
        req->r_reply = 0;
        req->r_num_mds = 0;
        req->r_attempts = 0;
        req->r_num_fwd = 0;
        req->r_resend_mds = mds;
        atomic_set(&req->r_ref, 2);  /* one for request_tree, one for caller */
+       init_completion(&req->r_completion);
 
-       req->r_tid = ++mdsc->last_tid;
        radix_tree_insert(&mdsc->request_tree, req->r_tid, (void*)req);
-
+       ceph_msg_get(msg);  /* grab reference */
        return req;
 }
 
@@ -89,8 +88,8 @@ static void register_session(struct ceph_mds_client *mdsc, int mds)
        if (mds >= mdsc->max_sessions) {
                struct ceph_mds_session **sa;
                /* realloc */
-               dout(50, "mdsc register_session realloc to %d\n", mds);
-               sa = kzalloc(mds * sizeof(struct ceph_mds_session), GFP_KERNEL);
+               dout(50, "mdsc register_session realloc to %d\n", mds+1);
+               sa = kzalloc((mds+1) * sizeof(struct ceph_mds_session), GFP_KERNEL);
                BUG_ON(sa == NULL);  /* i am lazy */
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions, 
@@ -98,12 +97,13 @@ static void register_session(struct ceph_mds_client *mdsc, int mds)
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
+               mdsc->max_sessions = mds+1;
        }
        s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL);
        s->s_state = 0;
        s->s_cap_seq = 0;
-       init_completion(&s->s_completion);
        atomic_set(&s->s_ref, 1);
+       init_completion(&s->s_completion);
        mdsc->sessions[mds] = s;
 }
 
@@ -223,10 +223,12 @@ bad:
 
 static void wait_for_new_map(struct ceph_mds_client *mdsc)
 {
+       dout(30, "wait_for_new_map enter\n");
        if (mdsc->last_requested_map < mdsc->mdsmap->m_epoch)
                ceph_monc_request_mdsmap(&mdsc->client->monc, mdsc->mdsmap->m_epoch);
 
        wait_for_completion(&mdsc->map_waiters);
+       dout(30, "wait_for_new_map exit\n");
 }
 
 /* exported functions */
@@ -253,10 +255,12 @@ ceph_mdsc_create_request_msg(struct ceph_mds_client *mdsc, int op,
        struct ceph_msg *req;
        struct ceph_client_request_head *head;
        void *p, *end;
+       int pathlen = 2*(sizeof(ino1) + sizeof(__u32));
+       if (path1) pathlen += strlen(path1);
+       if (path2) pathlen += strlen(path2);
 
        req = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, 
-                          sizeof(struct ceph_client_request_head) +
-                          sizeof(ino1)*2 + sizeof(__u32)*2 + strlen(path1) + strlen(path2),
+                          sizeof(struct ceph_client_request_head) + pathlen,
                           0, 0);
        if (IS_ERR(req))
                return req;
@@ -328,16 +332,23 @@ retry:
        }
 
        /* wait */
+       dout(30, "mdsc_do_request 1\n");
        spin_unlock(&mdsc->lock);
+       dout(30, "mdsc_do_request 2\n");
        wait_for_completion(&req->r_completion);
+       dout(30, "mdsc_do_request 3\n");
 
        if (!req->r_reply) {
+               dout(30, "mdsc_do_request 4\n");
                spin_lock(&mdsc->lock);
+               dout(30, "mdsc_do_request 5\n");
                goto retry;
        }
        reply = req->r_reply;
 
+       dout(30, "mdsc_do_request 6\n");
        spin_lock(&mdsc->lock);
+       dout(30, "mdsc_do_request 7\n");
        unregister_request(mdsc, req);
        spin_unlock(&mdsc->lock);
 
@@ -354,17 +365,19 @@ int ceph_mdsc_do(struct ceph_mds_client *mdsc, int op,
        struct ceph_client_reply_head *head;
        int ret;
 
+       dout(30, "mdsc do 1\n");
        req = ceph_mdsc_create_request_msg(mdsc, op, ino1, path1, ino2, path2);
        if (IS_ERR(req)) 
                return PTR_ERR(req);
+       dout(30, "mdsc do 2\n");
 
        reply = ceph_mdsc_do_request(mdsc, req, -1);
        if (IS_ERR(reply))
                return PTR_ERR(reply);
-
+       dout(30, "mdsc do 3\n");
        head = reply->front.iov_base;
        ret = head->result;
-
+       dout(30, "mdsc do 4\n");
        ceph_msg_put(reply);
        return ret;
 }
index e1e29df87f1c9a7aeed5b69651418d145a3e8f8b..7d3cf97f179e437d6ae1b61b94a7671711985d78 100644 (file)
@@ -65,8 +65,9 @@ extern void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *cli
 struct ceph_msg *ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct ceph_msg *msg, int mds);
 int ceph_mdsc_do(struct ceph_mds_client *mdsc, int op, ceph_ino_t ino1, const char *path1, ceph_ino_t ino2, const char *path2);
 
+extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
+extern void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
 extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
 extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
-extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg);
 
 #endif
index 2825ad26b116fc37e0482b25f6552352ea9970e3..f6bca4da7bce7ffcea9b974d828836dbfb4ee260 100644 (file)
@@ -58,18 +58,19 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
                return NULL;
 
        con->msgr = msgr;
+       set_bit(NEW, &con->state);
+       atomic_set(&con->nref, 1);
 
        INIT_LIST_HEAD(&con->list_all);
        INIT_LIST_HEAD(&con->list_bucket);
+
+       spin_lock_init(&con->out_queue_lock);
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
 
-       spin_lock_init(&con->lock);
-       set_bit(NEW, &con->state);
-       INIT_WORK(&con->rwork, try_read);       /* setup work structure */
-       INIT_WORK(&con->swork, try_write);      /* setup work structure */
+       INIT_WORK(&con->rwork, try_read);
+       INIT_WORK(&con->swork, try_write);
 
-       atomic_inc(&con->nref);
        return con;
 }
 
@@ -392,16 +393,44 @@ static void try_write(struct work_struct *work)
        struct ceph_messenger *msgr;
        int ret = 1;
 
-       dout(30, "try_write start\n");
        con = container_of(work, struct ceph_connection, swork);
        msgr = con->msgr;
+       dout(30, "try_write start %p state %d\n", con, con->state);
+
+retry:
+       if (test_and_set_bit(WRITING, &con->state) != 0) {
+               dout(30, "try_write connection already writing\n");
+               return;
+       }
+       clear_bit(WRITEABLE, &con->state);
 
 more:
        dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
+       /* initiate connect? */
+       if (test_and_clear_bit(NEW, &con->state)) {
+               prepare_write_connect(msgr, con);
+               set_bit(CONNECTING, &con->state);
+               dout(5, "try_write initiating connect on %p new state %u\n", con, con->state);
+               ret = ceph_tcp_connect(con);
+               dout(5, "try_write initiated connect\n");
+               if (ret < 0) {
+                       /* fault */
+                       derr(1, "connect error, FIXME\n");
+                       goto done;
+               }
+       }
+       /*if (test_bit(CONNECTING, &con->state)) {
+               dout(30, "try_write still connecting, doing nothing for now\n");
+               goto done;
+       }
+       */
+
        /* kvec data queued? */
        if (con->out_kvec_left) {
                ret = write_partial_kvec(con);
+               if (ret == 0)
+                       goto done;
                if (test_and_clear_bit(REJECTING, &con->state)) {
                        dout(30, "try_write done rejecting, state %u, closing\n", con->state);
                        /* FIXME do something else here, pbly? */
@@ -409,7 +438,7 @@ more:
                        set_bit(CLOSED, &con->state);
                        put_connection(con);
                }
-               if (ret <= 0) {
+               if (ret < 0) {
                        /* TBD: handle error; return for now */
                        con->error = ret;
                        goto done; /* error */
@@ -424,21 +453,40 @@ more:
        }
        
        /* anything else pending? */
+       spin_lock(&con->out_queue_lock);
        if (con->in_seq > con->in_seq_acked) {
                prepare_write_ack(con);
-               goto more;
-       }
-       if (!list_empty(&con->out_queue)) {
+       } else if (!list_empty(&con->out_queue)) {
                prepare_write_message(con);
-               goto more;
+       } else {
+               /* hmm, nothing to do! No more writes pending? */
+               dout(30, "try_write nothing else to write.\n");
+               clear_bit(WRITING, &con->state);         /* clear this first */
+               clear_bit(WRITE_PENDING, &con->state);   /* and this second, to avoid a race. */
+               spin_unlock(&con->out_queue_lock);
+               return;
        }
-       
-       /* hmm, nothing to do! No more writes pending? */
-       dout(30, "try_write nothing else to write\n");
-       clear_bit(WRITE_PENDING, &con->state);
+       spin_unlock(&con->out_queue_lock);
+       goto more;
 
 done:
        dout(30, "try_write done\n");
+       clear_bit(WRITING, &con->state);
+
+       /*
+        * See if we became WRITEABLE again to avoid race against socket.  
+        * Otherwise, this would be bad:
+        *  A B
+        *  -   enter try_write, do some work
+        *  -   socket fills, we get -EAGAIN or whatever
+        *    - socket becomes writeable again, work is queued
+        *    - new try_write sees WRITING bit, exits
+        *  -   original try_write clears WRITING bit
+        */
+       if (test_bit(WRITEABLE, &con->state)) {
+               dout(30, "try_write writeable flag got set again, looping just in case\n");
+               goto retry;
+       }
        return;
 }
 
@@ -676,7 +724,7 @@ static void process_accept(struct ceph_connection *con)
        spin_lock(&con->msgr->con_lock);
        existing = get_connection(con->msgr, &con->peer_addr);
        if (existing) {
-               spin_lock(&existing->lock);
+               //spin_lock(&existing->lock);
                /* replace existing connection? */
                if ((test_bit(CONNECTING, &existing->state) && 
                     compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
@@ -695,7 +743,7 @@ static void process_accept(struct ceph_connection *con)
                        set_bit(REJECTING, &con->state);
                        con->connect_seq = existing->connect_seq; /* send this with the reject */
                }
-               spin_unlock(&existing->lock);
+               //spin_unlock(&existing->lock);
                put_connection(existing);
        } else {
                add_connection(con->msgr, con);
@@ -721,10 +769,16 @@ void try_read(struct work_struct *work)
        struct ceph_connection *con;
        struct ceph_messenger *msgr;
 
+       dout(20, "Entering try_read\n");
        con = container_of(work, struct ceph_connection, rwork);
-       spin_lock(&con->lock);
        msgr = con->msgr;
-       dout(20, "Entered try_read\n");
+
+retry:
+       if (test_and_set_bit(READING, &con->state)) {
+               dout(20, "try_read already reading\n");
+               return;
+       }
+       clear_bit(READABLE, &con->state);
 
 more:
        /*
@@ -739,8 +793,7 @@ more:
                dout(20, "try_read accepting\n");
                ret = read_accept_partial(con);
                if (ret <= 0) goto done;
-               /* accepted */
-               process_accept(con);
+               process_accept(con);            /* accepted */
                goto more;
        }
        if (test_bit(CONNECTING, &con->state)) {
@@ -785,8 +838,11 @@ more:
 bad:
        BUG_ON(1); /* shouldn't get here */
 done:
-       con->error = ret;
-       spin_unlock(&con->lock);
+       clear_bit(READING, &con->state);
+       if (test_bit(READABLE, &con->state)) {
+               dout(30, "try_read readable flag set again, looping\n");
+               goto retry;
+       }
        dout(20, "Exited try_read\n");
        return;
 }
@@ -914,40 +970,23 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg)
                     ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
        }                    
 
-       spin_lock(&con->lock);
-
-       /* initiate connect? */
-       dout(5, "ceph_msg_send connection %p state is %u\n", con, con->state);
-       if (test_bit(NEW, &con->state)) {
-               prepare_write_connect(msgr, con);
-               spin_unlock(&con->lock);   /* hrm */
-               dout(5, "ceph_msg_send initiating connect on %p new state %u\n", con, con->state);
-               ret = ceph_tcp_connect(con);
-               dout(5, "ceph_msg_send done initiating connect on %p new state %u\n", con, con->state);
-               spin_lock(&con->lock);
-               if (ret < 0) {
-                       derr(1, "connection failure to peer %x:%d\n",
-                            ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
-                            ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
-                       remove_connection(msgr, con);
-                       goto out;
-               }
-               set_bit(CONNECTING, &con->state);
-       }
-       
        /* queue */
+       spin_lock(&con->out_queue_lock);
        msg->hdr.seq = ++con->out_seq;
        dout(1, "ceph_msg_send queuing %p seq %u for %s%d on %p\n", msg, msg->hdr.seq,
             ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con);
        ceph_msg_get(msg);
        list_add_tail(&msg->list_head, &con->out_queue);
-
-       if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 
+       spin_unlock(&con->out_queue_lock);
+               
+       if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) {
+               dout(30, "ceph_msg_send queuing new swork on %p\n", con);
                queue_work(send_wq, &con->swork);
+               dout(30, "ceph_msg_send queued\n");
+       }
 
-out:
-       spin_unlock(&con->lock);
        put_connection(con);
+       dout(30, "ceph_msg_send done\n");
        return ret;
 }
 
index 0362cf73a523ad8357f2770afee684cc2d2e8ba6..8a6168a5381d089528f06b287bad4d3bea08a7c2 100644 (file)
@@ -50,14 +50,18 @@ struct ceph_msg_pos {
 };
 
 
-/* current state of connection */
-#define NEW            1
-#define CONNECTING     2
-#define ACCEPTING      3
-#define OPEN           4
-#define WRITE_PENDING  5
-#define REJECTING      6
-#define CLOSED         7
+/* ceph_connection state bit flags */
+#define NEW            0
+#define CONNECTING     1
+#define ACCEPTING      2
+#define OPEN           3
+#define WRITE_PENDING  4  /* we have data to send */
+#define WRITEABLE      5  /* set when socket becomes writeable */
+#define WRITING        6  /* provides mutual exclusion, protecting out_kvec, etc. */
+#define READABLE       7  /* set when socket gets new data */
+#define READING        8  /* provides mutual exclusion, protecting in_* */
+#define REJECTING      9
+#define CLOSED        10
 
 struct ceph_connection {
        struct ceph_messenger *msgr;
@@ -65,7 +69,6 @@ struct ceph_connection {
        __u32 state;            /* connection state */
        
        atomic_t nref;
-       spinlock_t lock;        /* connection lock */
 
        struct list_head list_all;   /* msgr->con_all */
        struct list_head list_bucket;  /* msgr->con_open or con_accepting */
@@ -80,7 +83,10 @@ struct ceph_connection {
        __u32 peer_connect_seq;
 
        /* out queue */
+       spinlock_t out_queue_lock;   /* protects out_queue, out_sent, out_seq */
        struct list_head out_queue;
+       struct list_head out_sent;   /* sending/sent but unacked; resend if connection drops */
+
        struct ceph_msg_header out_hdr;
        struct kvec out_kvec[4],
                *out_kvec_cur;
@@ -90,7 +96,6 @@ struct ceph_connection {
        struct ceph_msg *out_msg;
        struct ceph_msg_pos out_msg_pos;
 
-       struct list_head out_sent;   /* sending/sent but unacked; resend if connection drops */
 
        /* partially read message contents */
        char in_tag;       /* READY (accepting, or no in-progress read) or ACK or MSG */
@@ -224,11 +229,11 @@ static __inline__ int ceph_encode_32(void **p, void *end, __u32 v) {
 
 static __inline__ int ceph_encode_filepath(void **p, void *end, ceph_ino_t ino, const char *path)
 {
-       __u32 len = strlen(path);
+       __u32 len = path ? strlen(path):0;
        BUG_ON(*p + sizeof(ino) + sizeof(len) + len > end);
        ceph_encode_64(p, end, ino);
        ceph_encode_32(p, end, len);
-       memcpy(*p, path, len);
+       if (len) memcpy(*p, path, len);
        *p += len;
        return 0;
 }