From: Patience Warnick Date: Tue, 1 Apr 2008 18:45:46 +0000 (-0700) Subject: Fixed some potential bugs in message protocol changes X-Git-Tag: v0.2~229^2~3^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e41b4cc5283cd369c121197e97c3b4365c15e6cc;p=ceph.git Fixed some potential bugs in message protocol changes --- diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 9fd553156e5..31db6a0dec6 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -142,7 +142,7 @@ static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection /* inc ref count */ atomic_inc(&con->nref); - if (test_bit(ACCEPTING, &con->state)) { + if (test_and_clear_bit(ACCEPTING, &con->state)) { list_del(&con->list_bucket); put_connection(con); } else { @@ -223,35 +223,6 @@ static void remove_connection(struct ceph_messenger *msgr, struct ceph_connectio spin_unlock(&msgr->con_lock); } -/* - * replace another connection - * (old and new should be for the _same_ peer, - * and thus in the same pos in the radix tree) - */ -static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new) -{ - clear_bit(OPEN, &old->state); - - /* take old connections message queue */ - spin_lock(&old->out_queue_lock); - if (!list_empty(&old->out_queue)) { - list_splice_init(&new->out_queue, &old->out_queue); - } - spin_unlock(&old->out_queue_lock); - - new->connect_seq = old->connect_seq; - new->out_seq = old->out_seq; - - /* replace list entry */ - spin_lock(&msgr->con_lock); - list_add(&new->list_bucket, &old->list_bucket); - list_del(&old->list_bucket); - spin_unlock(&msgr->con_lock); - - set_bit(CLOSED, &old->state); - put_connection(old); /* dec reference count */ -} - /* * atomically queue read or write work on a connection. * bump reference to avoid races. @@ -857,6 +828,7 @@ static void process_connect(struct ceph_connection *con) con->actual_peer_addr.nonce); set_bit(CLOSED, &con->state); remove_connection(con->msgr, con); + return; } switch (con->in_tag) { case CEPH_MSGR_TAG_RESETSESSION: @@ -865,7 +837,6 @@ static void process_connect(struct ceph_connection *con) reset_connection(con); prepare_write_connect(con->msgr, con); con->msgr->peer_reset(con); - ceph_queue_write(con); break; case CEPH_MSGR_TAG_RETRY: dout(10, @@ -873,7 +844,6 @@ static void process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect_seq), le32_to_cpu(con->in_connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect_seq); prepare_write_connect(con->msgr, con); - ceph_queue_write(con); break; case CEPH_MSGR_TAG_WAIT: dout(10, "process_connect peer connecting WAIT\n"); @@ -892,7 +862,9 @@ static void process_connect(struct ceph_connection *con) con->delay = 10 * HZ; /* maybe use default.. */ ceph_send_fault(con); } - + if (test_bit(WRITE_PENDING, &con->state)) { + ceph_queue_write(con); + } } @@ -926,6 +898,39 @@ static int read_accept_partial(struct ceph_connection *con) return 1; /* done */ } +/* + * replace another connection + * (old and new should be for the _same_ peer, + * and thus in the same pos in the radix tree) + */ +static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new) +{ + clear_bit(OPEN, &old->state); + + /* take old connections message queue */ + spin_lock(&old->out_queue_lock); + if (!list_empty(&old->out_queue)) { + list_splice_init(&new->out_queue, &old->out_queue); + } + spin_unlock(&old->out_queue_lock); + + new->connect_seq = old->connect_seq; + new->out_seq = old->out_seq; + + /* replace list entry */ + spin_lock(&msgr->con_lock); + list_add(&new->list_bucket, &old->list_bucket); + list_del(&old->list_bucket); + spin_unlock(&msgr->con_lock); + + set_bit(CLOSED, &old->state); + put_connection(old); /* dec reference count */ + + set_bit(OPEN, &new->state); + clear_bit(ACCEPTING, &new->state); + prepare_write_accept_reply(new, &tag_ready); +} + /* * call after a new connection's handshake has completed */ @@ -964,9 +969,6 @@ static void process_accept(struct ceph_connection *con) /* incoming connection wins.. */ /* replace existing with new connection */ __replace_connection(msgr, existing, con); - set_bit(OPEN, &con->state); - clear_bit(ACCEPTING, &con->state); - prepare_write_accept_reply(con, &tag_ready); } else { /* our existing outgoing connection wins.. tell peer to wait for our outgoing @@ -991,7 +993,6 @@ static void process_accept(struct ceph_connection *con) dout(20, "process_accept no existing connection, connection now OPEN\n"); __add_connection(msgr, con); set_bit(OPEN, &con->state); - clear_bit(ACCEPTING, &con->state); prepare_write_accept_reply(con, &tag_ready); } done: