]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: some msgr cleanups
authorSage Weil <sage@newdream.net>
Tue, 29 Apr 2008 23:23:02 +0000 (16:23 -0700)
committerSage Weil <sage@newdream.net>
Tue, 29 Apr 2008 23:23:02 +0000 (16:23 -0700)
src/kernel/messenger.c

index df121f90afed3b18c3275d74c1cd9d4ca78601c6..25b75f24c0145ca2b86eda3711a4ed25707f07b3 100644 (file)
@@ -441,7 +441,7 @@ static void prepare_write_message(struct ceph_connection *con)
        /* move to sending/sent list */
        list_del_init(&m->list_head);
        list_add_tail(&m->list_head, &con->out_sent);
-       con->out_msg = m;  /* FIXME: do we want to take a reference here? */
+       con->out_msg = m;  /* we dont bother taking a reference here. */
 
        /* encode header */
        dout(20, "prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n",
@@ -567,6 +567,7 @@ more:
                        con->connect_seq++;
                prepare_write_connect(msgr, con);
                set_bit(CONNECTING, &con->state);
+               con->in_tag = CEPH_MSGR_TAG_READY;
                dout(5, "try_write initiating connect on %p new state %lu\n",
                     con, con->state);
                ret = ceph_tcp_connect(con);
@@ -635,7 +636,6 @@ static int prepare_read_message(struct ceph_connection *con)
 {
        int err;
        BUG_ON(con->in_msg != NULL);
-       con->in_tag = CEPH_MSGR_TAG_MSG;
        con->in_base_pos = 0;
        con->in_msg = ceph_msg_new(0, 0, 0, 0, 0);
        if (IS_ERR(con->in_msg)) {
@@ -760,13 +760,34 @@ no_data:
        return 1; /* done! */
 }
 
+static void process_message(struct ceph_connection *con)
+{
+       /* if first message, set peer_name */
+       if (con->peer_name.type == 0)
+               con->peer_name = con->in_msg->hdr.src.name;
+       
+       spin_lock(&con->out_queue_lock);
+       con->in_seq++;
+       spin_unlock(&con->out_queue_lock);
+       ceph_queue_write(con);
+       
+       dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n",
+            con->in_msg, le32_to_cpu(con->in_msg->hdr.seq),
+            ENTITY_NAME(con->in_msg->hdr.src.name),
+            le32_to_cpu(con->in_msg->hdr.type),
+            ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)),
+            le32_to_cpu(con->in_msg->hdr.front_len),
+            le32_to_cpu(con->in_msg->hdr.data_len));
+       con->msgr->dispatch(con->msgr->parent, con->in_msg);
+       con->in_msg = 0;
+       con->in_tag = CEPH_MSGR_TAG_READY;
+}
 
 /*
  * prepare to read an ack
  */
 static void prepare_read_ack(struct ceph_connection *con)
 {
-       con->in_tag = CEPH_MSGR_TAG_ACK;
        con->in_base_pos = 0;
 }
 
@@ -787,10 +808,12 @@ static int read_ack_partial(struct ceph_connection *con)
        return 1; /* done */
 }
 
-static void process_ack(struct ceph_connection *con, __u32 ack)
+static void process_ack(struct ceph_connection *con)
 {
        struct ceph_msg *m;
-       __u64 seq;
+       u32 ack = le32_to_cpu(con->in_partial_ack);
+       u64 seq;
+
        while (!list_empty(&con->out_sent)) {
                m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
                seq = le64_to_cpu(m->hdr.seq);
@@ -801,6 +824,7 @@ static void process_ack(struct ceph_connection *con, __u32 ack)
                list_del_init(&m->list_head);
                ceph_msg_put(m);
        }
+       con->in_tag = CEPH_MSGR_TAG_READY;
 }
 
 
@@ -921,7 +945,7 @@ static void process_connect(struct ceph_connection *con)
        default:
                derr(1, "process_connect protocol error, will retry\n");
                con->delay = BASE_DELAY_INTERVAL;
-               con->error_msg = "protocol error";
+               con->error_msg = "protocol error, garbage tag during connect";
                ceph_fault(con);
        }
        if (test_bit(WRITE_PENDING, &con->state))
@@ -1107,10 +1131,7 @@ more:
                if (ret <= 0)
                        goto done;
                process_connect(con);
-               if (con->in_tag == CEPH_MSGR_TAG_RETRY)
-                       goto more;
-               if (test_bit(CLOSED, &con->state))
-                       goto done;
+               goto more;
        }
 
        if (con->in_base_pos < 0) {
@@ -1128,58 +1149,40 @@ more:
                if (ret <= 0)
                        goto done;
                dout(30, "try_read got tag %d\n", (int)con->in_tag);
-               if (con->in_tag == CEPH_MSGR_TAG_MSG)
+               switch (con->in_tag) {
+               case CEPH_MSGR_TAG_MSG:
                        prepare_read_message(con);
-               else if (con->in_tag == CEPH_MSGR_TAG_ACK)
+                       break;
+               case CEPH_MSGR_TAG_ACK:
                        prepare_read_ack(con);
-               else if (con->in_tag == CEPH_MSGR_TAG_CLOSE) {
+                       break;
+               case CEPH_MSGR_TAG_CLOSE:
                        set_bit(CLOSED, &con->state);   /* fixme */
                        goto done;
-               } else {
-                       derr(2, "try_read got bad tag %d\n", (int)con->in_tag);
-                       ret = -EINVAL;
-                       goto bad;
+               default:
+                       goto bad_tag;
                }
-               goto more;
        }
        if (con->in_tag == CEPH_MSGR_TAG_MSG) {
                ret = read_message_partial(con);
                if (ret <= 0)
                        goto done;
-
-               /* if first message, set peer_name */
-               if (con->peer_name.type == 0)
-                       con->peer_name = con->in_msg->hdr.src.name;
-               
-               spin_lock(&con->out_queue_lock);
-               con->in_seq++;
-               spin_unlock(&con->out_queue_lock);
-               ceph_queue_write(con);
-               
-               dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n",
-                    con->in_msg, le32_to_cpu(con->in_msg->hdr.seq),
-                    ENTITY_NAME(con->in_msg->hdr.src.name),
-                    le32_to_cpu(con->in_msg->hdr.type),
-                    ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)),
-                    le32_to_cpu(con->in_msg->hdr.front_len),
-                    le32_to_cpu(con->in_msg->hdr.data_len));
-               msgr->dispatch(con->msgr->parent, con->in_msg);
-               con->in_msg = 0;
-               con->in_tag = CEPH_MSGR_TAG_READY;
+               process_message(con);
                goto more;
        }
        if (con->in_tag == CEPH_MSGR_TAG_ACK) {
                ret = read_ack_partial(con);
                if (ret <= 0)
                        goto done;
-               /* got an ack */
-               process_ack(con, con->in_partial_ack);
-               con->in_tag = CEPH_MSGR_TAG_READY;
+               process_ack(con);
                goto more;
        }
+
+bad_tag:
        derr(2, "try_read bad con->in_tag = %d\n", (int)con->in_tag);
-bad:
-       BUG_ON(1); /* shouldn't get here */
+       con->error_msg = "protocol error, garbage tag";
+       ceph_fault(con);
+
 done:
        clear_bit(READING, &con->state);
        if (test_bit(READABLE, &con->state)) {
@@ -1209,7 +1212,7 @@ static void try_accept(struct work_struct *work)
        /* initialize the msgr connection */
        new_con = new_connection(msgr);
        if (new_con == NULL) {
-               derr(1, "malloc failure\n");
+               derr(1, "kmalloc failure accepting new connection\n");
                goto done;
        }
        if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
@@ -1219,7 +1222,6 @@ static void try_accept(struct work_struct *work)
        }
        dout(5, "accepted connection \n");
 
-       new_con->in_tag = CEPH_MSGR_TAG_READY;
        new_con->connect_seq = 1;
        set_bit(ACCEPTING, &new_con->state);
        clear_bit(NEW, &new_con->state);