]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
more message protocol changes..
authorPatience Warnick <patience@cranium.pelton.net>
Mon, 31 Mar 2008 23:43:16 +0000 (16:43 -0700)
committerPatience Warnick <patience@cranium.pelton.net>
Mon, 31 Mar 2008 23:43:16 +0000 (16:43 -0700)
src/kernel/messenger.c

index 0d38189b29a7dabe954fc258d62ba45e3ec2dcd0..9fd553156e58f6af063eb902230f3db9144a415a 100644 (file)
@@ -223,30 +223,33 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio
        spin_unlock(&msgr->con_lock);
 }
 
-
 /*
  * replace another connection
- *  (old and new should be for the _same_ peer, and thus in the same pos in the radix tree)
+ *  (old and new should be for the _same_ peer, 
+ *   and thus in the same pos in the radix tree)
  */
 static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
-{
-       spin_lock(&msgr->con_lock);
-       list_add(&new->list_bucket, &old->list_bucket);
-       list_del(&old->list_bucket);
-       spin_unlock(&msgr->con_lock);
-       put_connection(old); /* dec reference count */
-}
-
-static void replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
 {
        clear_bit(OPEN, &old->state);
+
+       /* take old connections message queue */
        spin_lock(&old->out_queue_lock);
        if (!list_empty(&old->out_queue)) {
                list_splice_init(&new->out_queue, &old->out_queue);
-               new->out_seq = old->out_seq;
        }
        spin_unlock(&old->out_queue_lock);
+
+       new->connect_seq = old->connect_seq;
+       new->out_seq = old->out_seq;
+
+       /* replace list entry */
+       spin_lock(&msgr->con_lock);
+       list_add(&new->list_bucket, &old->list_bucket);
+       list_del(&old->list_bucket);
+       spin_unlock(&msgr->con_lock);
+
        set_bit(CLOSED, &old->state);
+       put_connection(old); /* dec reference count */
 }
 
 /*
@@ -475,9 +478,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_conne
 {
        con->out_kvec[0].iov_base = &msgr->inst.addr;
        con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
-       /* con->out32 = cpu_to_le32(con->out_connect_seq); */
        con->out_connect_seq = cpu_to_le32(con->connect_seq);
-       /* con->out_kvec[1].iov_base = &con->out32; */
        con->out_kvec[1].iov_base = &con->out_connect_seq;
        con->out_kvec[1].iov_len = 4;
        con->out_kvec_left = 2;
@@ -510,9 +511,7 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag)
 {
        con->out_kvec[0].iov_base = ptag;
        con->out_kvec[0].iov_len = 1;
-       /* con->out32 = cpu_to_le32(con->out_connect_seq); */
        con->out_connect_seq = cpu_to_le32(con->connect_seq);
-       /* con->out_kvec[1].iov_base = &con->out32; */
        con->out_kvec[1].iov_base = &con->out_connect_seq;
        con->out_kvec[1].iov_len = 4;
        con->out_kvec_left = 2;
@@ -827,6 +826,26 @@ out:
        return ret; /* done */
 }
 
+/*
+ * Reset a connection
+ */
+static void reset_connection(struct ceph_connection *con)
+{
+       /* reset connection, out_queue, msg_ and connect_seq */
+       /* discard existing out_queue and msg_seq */
+       while (!list_empty(&con->out_queue)) {
+               struct ceph_msg *m;
+               m = list_entry(con->out_queue.next, struct ceph_msg, list_head);
+               list_del(&m->list_head);
+               ceph_msg_put(m);
+       }
+       con->connect_seq = 0;
+       con->out_seq = 0;
+       con->out_msg = 0;
+       con->in_seq = 0;
+       con->in_msg = 0;
+}
+
 static void process_connect(struct ceph_connection *con)
 {
        dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -843,10 +862,7 @@ static void process_connect(struct ceph_connection *con)
        case CEPH_MSGR_TAG_RESETSESSION:
                dout(10, "process_connect got session RESET peers in_connect_seq %u\n", 
                     le32_to_cpu(con->in_connect_seq));
-               /* PW discard out_queue ? */
-               con->out_seq = 0;
-               con->in_seq = 0;
-               con->connect_seq = 0;
+               reset_connection(con);
                prepare_write_connect(con->msgr, con);
                con->msgr->peer_reset(con);
                ceph_queue_write(con);
@@ -926,42 +942,36 @@ static void process_accept(struct ceph_connection *con)
        if (existing) {
                if (peer_cseq < existing->connect_seq) {
                        if (peer_cseq == 0) {
-                               /* reset existing connection, out_queue, msg_ and connect_seq */
-                               con->connect_seq = 0;
-                               con->out_connect_seq = 0;
-                               /* PW need to discard existing outqueue and msg_seq */
-                               msgr->peer_reset(con);
+                               /* reset existing connection */
+                               reset_connection(existing);
                                /* replace connection */
                                __replace_connection(msgr, existing, con);
+                               msgr->peer_reset(con);
                        } else {
                                /* old attempt or peer didn't get the READY */
                                /* send retry with peers connect seq */
                                con->connect_seq = existing->connect_seq;
                                prepare_write_accept_retry(con, &tag_retry);
                        }
-               } else if (peer_cseq == existing->connect_seq) {
+               } else if (peer_cseq == existing->connect_seq &&
+                          (test_bit(CONNECTING, &existing->state) || 
+                           test_bit(WAIT, &existing->state))) {
                        /* connection race */
-                       dout(20, "process_accept connection race state = %lu\n", con->state);
-                       if (ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)
-                           && (test_bit(CONNECTING, &existing->state) || 
-                               test_bit(WAIT, &existing->state))) {
+                       dout(20, "process_accept connection race state = %lu\n",
+                            con->state);
+                       if (ceph_entity_addr_equal(&msgr->inst.addr, 
+                                                  &con->peer_addr)) {
                                /* incoming connection wins.. */
                                /* replace existing with new connection */
                                __replace_connection(msgr, existing, con);
-                               /* steal message queue */
-                               list_splice_init(&con->out_queue, &existing->out_queue);
-                               con->out_seq = existing->out_seq;
-                               con->connect_seq = existing->connect_seq;
                                set_bit(OPEN, &con->state);
                                clear_bit(ACCEPTING, &con->state);
-                               set_bit(CLOSED, &existing->state);
-                               clear_bit(OPEN, &existing->state);
                                prepare_write_accept_reply(con, &tag_ready);
                        } else {
-                               /* our existing outgoing connection wins.. */
-                               /* peer wait for our outgoing connection to go through */
+                               /* our existing outgoing connection wins..
+                                  tell peer to wait for our outgoing 
+                                  connection to go through */
                                prepare_write_accept_reply(con, &tag_wait);
-                               set_bit(WAIT, &con->state);
                                goto done;
                        }
                } else if (existing->connect_seq == 0 && 
@@ -970,8 +980,7 @@ static void process_accept(struct ceph_connection *con)
                        prepare_write_accept_reply(con, &tag_reset);
                        goto done;
                } else {
-                       /* reconnect case */
-                       /* replace connection */
+                       /* reconnect case, replace connection */
                        __replace_connection(msgr, existing, con);
                }
                put_connection(existing);
@@ -1139,7 +1148,7 @@ static void try_accept(void *arg)
        dout(5, "accepted connection \n");
 
        new_con->in_tag = CEPH_MSGR_TAG_READY;
-       new_con->out_connect_seq = 1;
+       new_con->connect_seq = 1;
        set_bit(ACCEPTING, &new_con->state);
        clear_bit(NEW,&new_con->state);
        prepare_write_accept_announce(msgr, new_con);