/* inc ref count */
atomic_inc(&con->nref);
- if (test_bit(ACCEPTING, &con->state)) {
+ if (test_and_clear_bit(ACCEPTING, &con->state)) {
list_del(&con->list_bucket);
put_connection(con);
} else {
spin_unlock(&msgr->con_lock);
}
-/*
- * replace another connection
- * (old and new should be for the _same_ peer,
- * and thus in the same pos in the radix tree)
- */
-static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
-{
- clear_bit(OPEN, &old->state);
-
- /* take old connections message queue */
- spin_lock(&old->out_queue_lock);
- if (!list_empty(&old->out_queue)) {
- list_splice_init(&new->out_queue, &old->out_queue);
- }
- spin_unlock(&old->out_queue_lock);
-
- new->connect_seq = old->connect_seq;
- new->out_seq = old->out_seq;
-
- /* replace list entry */
- spin_lock(&msgr->con_lock);
- list_add(&new->list_bucket, &old->list_bucket);
- list_del(&old->list_bucket);
- spin_unlock(&msgr->con_lock);
-
- set_bit(CLOSED, &old->state);
- put_connection(old); /* dec reference count */
-}
-
/*
* atomically queue read or write work on a connection.
* bump reference to avoid races.
con->actual_peer_addr.nonce);
set_bit(CLOSED, &con->state);
remove_connection(con->msgr, con);
+ return;
}
switch (con->in_tag) {
case CEPH_MSGR_TAG_RESETSESSION:
reset_connection(con);
prepare_write_connect(con->msgr, con);
con->msgr->peer_reset(con);
- ceph_queue_write(con);
break;
case CEPH_MSGR_TAG_RETRY:
dout(10,
le32_to_cpu(con->out_connect_seq), le32_to_cpu(con->in_connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect_seq);
prepare_write_connect(con->msgr, con);
- ceph_queue_write(con);
break;
case CEPH_MSGR_TAG_WAIT:
dout(10, "process_connect peer connecting WAIT\n");
con->delay = 10 * HZ; /* maybe use default.. */
ceph_send_fault(con);
}
-
+ if (test_bit(WRITE_PENDING, &con->state)) {
+ ceph_queue_write(con);
+ }
}
return 1; /* done */
}
+/*
+ * replace another connection
+ * (old and new should be for the _same_ peer,
+ * and thus in the same pos in the radix tree)
+ */
+static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
+{
+ clear_bit(OPEN, &old->state);
+
+ /* take old connections message queue */
+ spin_lock(&old->out_queue_lock);
+ if (!list_empty(&old->out_queue)) {
+ list_splice_init(&new->out_queue, &old->out_queue);
+ }
+ spin_unlock(&old->out_queue_lock);
+
+ new->connect_seq = old->connect_seq;
+ new->out_seq = old->out_seq;
+
+ /* replace list entry */
+ spin_lock(&msgr->con_lock);
+ list_add(&new->list_bucket, &old->list_bucket);
+ list_del(&old->list_bucket);
+ spin_unlock(&msgr->con_lock);
+
+ set_bit(CLOSED, &old->state);
+ put_connection(old); /* dec reference count */
+
+ set_bit(OPEN, &new->state);
+ clear_bit(ACCEPTING, &new->state);
+ prepare_write_accept_reply(new, &tag_ready);
+}
+
/*
* call after a new connection's handshake has completed
*/
/* incoming connection wins.. */
/* replace existing with new connection */
__replace_connection(msgr, existing, con);
- set_bit(OPEN, &con->state);
- clear_bit(ACCEPTING, &con->state);
- prepare_write_accept_reply(con, &tag_ready);
} else {
/* our existing outgoing connection wins..
tell peer to wait for our outgoing
dout(20, "process_accept no existing connection, connection now OPEN\n");
__add_connection(msgr, con);
set_bit(OPEN, &con->state);
- clear_bit(ACCEPTING, &con->state);
prepare_write_accept_reply(con, &tag_ready);
}
done: