git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
message protocol changes for connect.
author Patience Warnick <patience@cranium.pelton.net>
Mon, 17 Mar 2008 16:30:07 +0000 (09:30 -0700)
committer Patience Warnick <patience@cranium.pelton.net>
Mon, 17 Mar 2008 16:30:07 +0000 (09:30 -0700)
src/kernel/messenger.c

index aa9e7cec2c03e12bc09babd2bc6c5b07ebdd1f2d..b408f93416a0bd48d914fae2da7d0c0f2488f6b9 100644 (file)
@@ -8,17 +8,18 @@
 #include "messenger.h"
 #include "ktcp.h"
 
-int ceph_debug_msgr = 1;
+int ceph_debug_msgr = 50;
 #define DOUT_VAR ceph_debug_msgr
 #define DOUT_PREFIX "msgr: " 
 #include "super.h"
 
 /* static tag bytes */
 static char tag_ready = CEPH_MSGR_TAG_READY;
-static char tag_reject = CEPH_MSGR_TAG_REJECT;
+static char tag_reset = CEPH_MSGR_TAG_RESETSESSION;
+static char tag_retry = CEPH_MSGR_TAG_RETRY;
+static char tag_wait = CEPH_MSGR_TAG_WAIT;
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
-//static char tag_close = CEPH_MSGR_TAG_CLOSE;
 
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
 static void try_read(struct work_struct *);
@@ -291,23 +292,48 @@ static void ceph_send_fault(struct ceph_connection *con)
             con, con->state,
             IPQUADPORT(con->peer_addr.ipaddr));
 
-       if (!test_and_clear_bit(CONNECTING, &con->state)){
-               derr(1, "CONNECTING bit not set\n");
-               /* TBD: reset buffer correctly */
-               /* reset buffer */
-               spin_lock(&con->out_queue_lock);
-               list_splice_init(&con->out_sent, &con->out_queue);
-               spin_unlock(&con->out_queue_lock);
-               clear_bit(OPEN, &con->state);
+       /* PW if never get here remove */
+       if (test_bit(WAIT, &con->state)) {
+                derr(30, "ceph_send_fault received socket close during WAIT state\n");
+               return;
        }
 
-       /* retry with delay */
-       ceph_queue_delayed_write(con);
+       if (con->delay) {
+               derr(30, "ceph_send_fault tcp_close delay != 0\n");
+               if (con->sock)
+                       sock_release(con->sock);
+               con->sock = NULL;
+               set_bit(NEW, &con->state);
 
-       if (con->delay < MAX_DELAY_INTERVAL)
-               con->delay *= 2;
-       else
-               con->delay = MAX_DELAY_INTERVAL;
+               /* If there are no messages in the queue, place the connection 
+               * in a STANDBY state otherwise retry with delay */
+               if (list_empty(&con->out_queue)) {
+                       dout(10, "setting STANDBY bit\n");
+                       set_bit(STANDBY, &con->state);
+                       return;
+               }
+
+               if (!test_and_clear_bit(CONNECTING, &con->state)){
+                       derr(1, "CONNECTING bit not set\n");
+                       /* TBD: reset buffer correctly */
+                       /* reset buffer */
+                       spin_lock(&con->out_queue_lock);
+                       list_splice_init(&con->out_sent, &con->out_queue);
+                       spin_unlock(&con->out_queue_lock);
+                       clear_bit(OPEN, &con->state);
+               }
+
+               /* retry with delay */
+               ceph_queue_delayed_write(con);
+
+               if (con->delay < MAX_DELAY_INTERVAL)
+                       con->delay *= 2;
+               else
+                       con->delay = MAX_DELAY_INTERVAL;
+       } else {
+               dout(30, "ceph_send_fault tcp_close delay = 0\n");
+               remove_connection(con->msgr, con);
+       }
 }
 
 
@@ -495,31 +521,10 @@ static void try_write(void *arg)
 
        if (test_bit(CLOSED, &con->state)) {
                dout(5, "try_write closed\n");
-               remove_connection(msgr, con);
                goto done;
        }
 
        if (test_and_clear_bit(SOCK_CLOSE, &con->state)) {
-               dout(5, "try_write TCP_CLOSE received\n");
-               if (!con->delay) {
-                       dout(30, "try_write tcp_close delay = 0\n");
-                       remove_connection(msgr, con);
-                       goto done;
-               }
-               dout(30, "try_write tcp_close delay != 0\n");
-               if (con->sock)
-                       sock_release(con->sock);
-               con->sock = NULL;
-               set_bit(NEW, &con->state);
-
-               /* If there are no messages in the queue, place the connection 
-               * in a STANDBY state otherwise retry with delay */
-               if (list_empty(&con->out_queue)) {
-                       dout(10, "setting STANDBY bit\n");
-                       set_bit(STANDBY, &con->state);
-                       goto done;
-               }
-
                ceph_send_fault(con);
                goto done;
        }
@@ -528,11 +533,12 @@ more:
 
        /* initiate connect? */
        if (test_and_clear_bit(NEW, &con->state)) {
+               if (test_and_clear_bit(STANDBY, &con->state))
+                       con->connect_seq++;
                prepare_write_connect(msgr, con);
                set_bit(CONNECTING, &con->state);
                dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state);
                ret = ceph_tcp_connect(con);
-               dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state);
                if (ret < 0) {
                        derr(1, "try_write tcp connect error %d\n", ret);
                        remove_connection(msgr, con);
@@ -545,17 +551,20 @@ more:
                ret = write_partial_kvec(con);
                if (ret == 0)
                        goto done;
-               if (test_and_clear_bit(REJECTING, &con->state)) {
-                       dout(30, "try_write done rejecting, state %lu, closing\n", con->state);
-                       /* FIXME do something else here, pbly? */
-                       remove_connection(msgr, con);
-               }
                if (ret < 0) {
                        dout(30, "try_write write_partial_kvec returned error %d\n", ret);
                        goto done;
                }
        }
 
+       /* check if connect handshake finished.. if not requeue and return.. */
+/*
+       if (!test_bit(OPEN, &con->state)) {
+                dout(5, "try_write state = %lu, need to requeue and exit\n", con->state);
+                goto done;
+        }
+*/
+       
        /* msg pages? */
        if (con->out_msg) {
                ret = write_partial_msg_pages(con, con->out_msg);
@@ -788,44 +797,62 @@ static int read_connect_partial(struct ceph_connection *con)
        ret = 1;
 out:
        dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret);
-       dout(20, "read_connect_partial peer_connect_seq = %d\n", con->peer_connect_seq);
+       dout(20, "read_connect_partial peer_connect_seq = %u\n", con->peer_connect_seq);
        return ret; /* done */
 }
 
 static void process_connect(struct ceph_connection *con)
 {
        dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
-       clear_bit(CONNECTING, &con->state);
        if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) {
                derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, got %u.%u.%u.%u:%u/%d, wtf\n",
                     IPQUADPORT(con->peer_addr.ipaddr),
                     con->peer_addr.nonce,
                     IPQUADPORT(con->actual_peer_addr.ipaddr),
                     con->actual_peer_addr.nonce);
-               con->in_tag = CEPH_MSGR_TAG_REJECT;
-       }
-       if (con->in_tag == CEPH_MSGR_TAG_REJECT) {
-               dout(10, "process_connect got REJECT peer seq %u\n", con->peer_connect_seq);
                set_bit(CLOSED, &con->state);
+               remove_connection(con->msgr, con);
        }
-       if (con->in_tag == CEPH_MSGR_TAG_READY) {
+       switch (con->in_tag) {
+       case CEPH_MSGR_TAG_RESETSESSION:
+               dout(10, "process_connect got session RESET peer_connect_seq %u\n", 
+                    con->peer_connect_seq);
+               /* PW discard out_queue ? */
+               con->out_seq = 0;
+               con->in_seq = 0;
+               con->connect_seq = 0;
+               prepare_write_connect(con->msgr, con);
+               con->msgr->peer_reset(con);
+               ceph_queue_write(con);
+               break;
+       case CEPH_MSGR_TAG_RETRY:
+               dout(10, "process_connect got session RETRY connect_seq = %u, 
+                    peer_connect_seq = %u\n", con->connect_seq, con->peer_connect_seq);
+               con->connect_seq = con->peer_connect_seq;
+               prepare_write_connect(con->msgr, con);
+               ceph_queue_write(con);
+               break;
+       case CEPH_MSGR_TAG_WAIT:
+               dout(10, "process_connect peer connecting WAIT\n");
+               set_bit(WAIT, &con->state);
+               if (con->sock)
+                       sock_release(con->sock);
+               con->sock = NULL;
+               break;
+       case CEPH_MSGR_TAG_READY:
                dout(10, "process_connect got READY, now open\n");
+               clear_bit(CONNECTING, &con->state);
                set_bit(OPEN, &con->state);
-               if (test_bit(STANDBY, &con->state)) {
-                       dout(30, "process_connect peer_connect_seq = %d\n", 
-                            con->peer_connect_seq);
-                       dout(30, "process_connect connect_seq = %d\n", 
-                            con->connect_seq);
-/*
-                       if (con->peer_connect_seq > con->connect_seq)
-                               con->msgr->peer_reset(con);
-*/
-               }
+               break;
+       default:
+               derr(1, "process_connect protocol error, try connecting again in a bit\n");
+               con->delay = 10 * HZ;  /* maybe use default.. */
+               ceph_send_fault(con);
        }
+               
 }
 
 
-
 /*
  * read portion of accept-side handshake on a newly accepted connection
  */
@@ -870,7 +897,6 @@ static void process_accept(struct ceph_connection *con)
        existing = __get_connection(msgr, &con->peer_addr);
        if (existing) {
                dout(20, "process_accept we have existing connection\n"); 
-               //spin_lock(&existing->lock);
                /* replace existing connection? */
                if ((test_bit(CONNECTING, &existing->state) && 
                     ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) ||
@@ -892,7 +918,7 @@ static void process_accept(struct ceph_connection *con)
                        /* callback to mds */
                } else {
                        /* reject new connection */
-                       set_bit(REJECTING, &con->state);
+                       /* set_bit(REJECTING, &con->state); */
                        con->connect_seq = existing->connect_seq; /* send this with the reject */
                }
                //spin_unlock(&existing->lock);
@@ -908,9 +934,11 @@ static void process_accept(struct ceph_connection *con)
 
        /* the result? */
        clear_bit(ACCEPTING, &con->state);
+/*
        if (test_bit(REJECTING, &con->state))
                prepare_write_accept_reply(con, &tag_reject);
        else
+*/
                prepare_write_accept_reply(con, &tag_ready);
        /* queue write */
        ceph_queue_write(con);
@@ -1200,6 +1228,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned lo
                } else {
                        con = newcon;
                        con->peer_addr = msg->hdr.dst.addr;
+                       con->peer_name = msg->hdr.dst.name;
                        __add_connection(msgr, con);
                        dout(5, "ceph_msg_send new connection %p to peer %u.%u.%u.%u:%u\n", con,
                             IPQUADPORT(msg->hdr.dst.addr.ipaddr));