]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Fixed some potential bugs in message protocol changes
authorPatience Warnick <patience@cranium.pelton.net>
Tue, 1 Apr 2008 18:45:46 +0000 (11:45 -0700)
committerPatience Warnick <patience@cranium.pelton.net>
Tue, 1 Apr 2008 18:45:46 +0000 (11:45 -0700)
src/kernel/messenger.c

index 9fd553156e58f6af063eb902230f3db9144a415a..31db6a0dec683d53aee24e3451cf18cc68b3f69c 100644 (file)
@@ -142,7 +142,7 @@ static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection
        /* inc ref count */
        atomic_inc(&con->nref);
 
-       if (test_bit(ACCEPTING, &con->state)) {
+       if (test_and_clear_bit(ACCEPTING, &con->state)) {
                list_del(&con->list_bucket);
                put_connection(con);
        } else {
@@ -223,35 +223,6 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio
        spin_unlock(&msgr->con_lock);
 }
 
-/*
- * replace another connection
- *  (old and new should be for the _same_ peer, 
- *   and thus in the same pos in the radix tree)
- */
-static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
-{
-       clear_bit(OPEN, &old->state);
-
-       /* take old connections message queue */
-       spin_lock(&old->out_queue_lock);
-       if (!list_empty(&old->out_queue)) {
-               list_splice_init(&new->out_queue, &old->out_queue);
-       }
-       spin_unlock(&old->out_queue_lock);
-
-       new->connect_seq = old->connect_seq;
-       new->out_seq = old->out_seq;
-
-       /* replace list entry */
-       spin_lock(&msgr->con_lock);
-       list_add(&new->list_bucket, &old->list_bucket);
-       list_del(&old->list_bucket);
-       spin_unlock(&msgr->con_lock);
-
-       set_bit(CLOSED, &old->state);
-       put_connection(old); /* dec reference count */
-}
-
 /*
  * atomically queue read or write work on a connection.
  * bump reference to avoid races.
@@ -857,6 +828,7 @@ static void process_connect(struct ceph_connection *con)
                     con->actual_peer_addr.nonce);
                set_bit(CLOSED, &con->state);
                remove_connection(con->msgr, con);
+               return;
        }
        switch (con->in_tag) {
        case CEPH_MSGR_TAG_RESETSESSION:
@@ -865,7 +837,6 @@ static void process_connect(struct ceph_connection *con)
                reset_connection(con);
                prepare_write_connect(con->msgr, con);
                con->msgr->peer_reset(con);
-               ceph_queue_write(con);
                break;
        case CEPH_MSGR_TAG_RETRY:
                dout(10, 
@@ -873,7 +844,6 @@ static void process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->out_connect_seq), le32_to_cpu(con->in_connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect_seq);
                prepare_write_connect(con->msgr, con); 
-               ceph_queue_write(con);
                break;
        case CEPH_MSGR_TAG_WAIT:
                dout(10, "process_connect peer connecting WAIT\n");
@@ -892,7 +862,9 @@ static void process_connect(struct ceph_connection *con)
                con->delay = 10 * HZ;  /* maybe use default.. */
                ceph_send_fault(con);
        }
-               
+       if (test_bit(WRITE_PENDING, &con->state)) {
+               ceph_queue_write(con);
+       }
 }
 
 
@@ -926,6 +898,39 @@ static int read_accept_partial(struct ceph_connection *con)
        return 1; /* done */
 }
 
+/*
+ * replace another connection
+ *  (old and new should be for the _same_ peer, 
+ *   and thus in the same pos in the radix tree)
+ */
+static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
+{
+       clear_bit(OPEN, &old->state);
+
+       /* take old connections message queue */
+       spin_lock(&old->out_queue_lock);
+       if (!list_empty(&old->out_queue)) {
+               list_splice_init(&new->out_queue, &old->out_queue);
+       }
+       spin_unlock(&old->out_queue_lock);
+
+       new->connect_seq = old->connect_seq;
+       new->out_seq = old->out_seq;
+
+       /* replace list entry */
+       spin_lock(&msgr->con_lock);
+       list_add(&new->list_bucket, &old->list_bucket);
+       list_del(&old->list_bucket);
+       spin_unlock(&msgr->con_lock);
+
+       set_bit(CLOSED, &old->state);
+       put_connection(old); /* dec reference count */
+
+       set_bit(OPEN, &new->state);
+       clear_bit(ACCEPTING, &new->state);
+       prepare_write_accept_reply(new, &tag_ready);
+}
+
 /*
  * call after a new connection's handshake has completed
  */
@@ -964,9 +969,6 @@ static void process_accept(struct ceph_connection *con)
                                /* incoming connection wins.. */
                                /* replace existing with new connection */
                                __replace_connection(msgr, existing, con);
-                               set_bit(OPEN, &con->state);
-                               clear_bit(ACCEPTING, &con->state);
-                               prepare_write_accept_reply(con, &tag_ready);
                        } else {
                                /* our existing outgoing connection wins..
                                   tell peer to wait for our outgoing 
@@ -991,7 +993,6 @@ static void process_accept(struct ceph_connection *con)
                dout(20, "process_accept no existing connection, connection now OPEN\n");
                __add_connection(msgr, con);
                 set_bit(OPEN, &con->state);
-               clear_bit(ACCEPTING, &con->state);
                prepare_write_accept_reply(con, &tag_ready);
        }
 done: