spin_unlock(&msgr->con_lock);
}
-
/*
* replace another connection
- * (old and new should be for the _same_ peer, and thus in the same pos in the radix tree)
+ * (old and new should be for the _same_ peer,
+ * and thus in the same pos in the radix tree)
*/
static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
-{
- spin_lock(&msgr->con_lock);
- list_add(&new->list_bucket, &old->list_bucket);
- list_del(&old->list_bucket);
- spin_unlock(&msgr->con_lock);
- put_connection(old); /* dec reference count */
-}
-
-static void replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
{
clear_bit(OPEN, &old->state);
+
+ /* take old connections message queue */
spin_lock(&old->out_queue_lock);
if (!list_empty(&old->out_queue)) {
list_splice_init(&new->out_queue, &old->out_queue);
- new->out_seq = old->out_seq;
}
spin_unlock(&old->out_queue_lock);
+
+ new->connect_seq = old->connect_seq;
+ new->out_seq = old->out_seq;
+
+ /* replace list entry */
+ spin_lock(&msgr->con_lock);
+ list_add(&new->list_bucket, &old->list_bucket);
+ list_del(&old->list_bucket);
+ spin_unlock(&msgr->con_lock);
+
set_bit(CLOSED, &old->state);
+ put_connection(old); /* dec reference count */
}
/*
{
con->out_kvec[0].iov_base = &msgr->inst.addr;
con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
- /* con->out32 = cpu_to_le32(con->out_connect_seq); */
con->out_connect_seq = cpu_to_le32(con->connect_seq);
- /* con->out_kvec[1].iov_base = &con->out32; */
con->out_kvec[1].iov_base = &con->out_connect_seq;
con->out_kvec[1].iov_len = 4;
con->out_kvec_left = 2;
{
con->out_kvec[0].iov_base = ptag;
con->out_kvec[0].iov_len = 1;
- /* con->out32 = cpu_to_le32(con->out_connect_seq); */
con->out_connect_seq = cpu_to_le32(con->connect_seq);
- /* con->out_kvec[1].iov_base = &con->out32; */
con->out_kvec[1].iov_base = &con->out_connect_seq;
con->out_kvec[1].iov_len = 4;
con->out_kvec_left = 2;
return ret; /* done */
}
+/*
+ * Reset a connection
+ */
+static void reset_connection(struct ceph_connection *con)
+{
+ /* reset connection, out_queue, msg_ and connect_seq */
+ /* discard existing out_queue and msg_seq */
+ while (!list_empty(&con->out_queue)) {
+ struct ceph_msg *m;
+ m = list_entry(con->out_queue.next, struct ceph_msg, list_head);
+ list_del(&m->list_head);
+ ceph_msg_put(m);
+ }
+ con->connect_seq = 0;
+ con->out_seq = 0;
+ con->out_msg = 0;
+ con->in_seq = 0;
+ con->in_msg = 0;
+}
+
static void process_connect(struct ceph_connection *con)
{
dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
case CEPH_MSGR_TAG_RESETSESSION:
dout(10, "process_connect got session RESET peers in_connect_seq %u\n",
le32_to_cpu(con->in_connect_seq));
- /* PW discard out_queue ? */
- con->out_seq = 0;
- con->in_seq = 0;
- con->connect_seq = 0;
+ reset_connection(con);
prepare_write_connect(con->msgr, con);
con->msgr->peer_reset(con);
ceph_queue_write(con);
if (existing) {
if (peer_cseq < existing->connect_seq) {
if (peer_cseq == 0) {
- /* reset existing connection, out_queue, msg_ and connect_seq */
- con->connect_seq = 0;
- con->out_connect_seq = 0;
- /* PW need to discard existing outqueue and msg_seq */
- msgr->peer_reset(con);
+ /* reset existing connection */
+ reset_connection(existing);
/* replace connection */
__replace_connection(msgr, existing, con);
+ msgr->peer_reset(con);
} else {
/* old attempt or peer didn't get the READY */
/* send retry with peers connect seq */
con->connect_seq = existing->connect_seq;
prepare_write_accept_retry(con, &tag_retry);
}
- } else if (peer_cseq == existing->connect_seq) {
+ } else if (peer_cseq == existing->connect_seq &&
+ (test_bit(CONNECTING, &existing->state) ||
+ test_bit(WAIT, &existing->state))) {
/* connection race */
- dout(20, "process_accept connection race state = %lu\n", con->state);
- if (ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)
- && (test_bit(CONNECTING, &existing->state) ||
- test_bit(WAIT, &existing->state))) {
+ dout(20, "process_accept connection race state = %lu\n",
+ con->state);
+ if (ceph_entity_addr_equal(&msgr->inst.addr,
+ &con->peer_addr)) {
/* incoming connection wins.. */
/* replace existing with new connection */
__replace_connection(msgr, existing, con);
- /* steal message queue */
- list_splice_init(&con->out_queue, &existing->out_queue);
- con->out_seq = existing->out_seq;
- con->connect_seq = existing->connect_seq;
set_bit(OPEN, &con->state);
clear_bit(ACCEPTING, &con->state);
- set_bit(CLOSED, &existing->state);
- clear_bit(OPEN, &existing->state);
prepare_write_accept_reply(con, &tag_ready);
} else {
- /* our existing outgoing connection wins.. */
- /* peer wait for our outgoing connection to go through */
+ /* our existing outgoing connection wins..
+ tell peer to wait for our outgoing
+ connection to go through */
prepare_write_accept_reply(con, &tag_wait);
- set_bit(WAIT, &con->state);
goto done;
}
} else if (existing->connect_seq == 0 &&
prepare_write_accept_reply(con, &tag_reset);
goto done;
} else {
- /* reconnect case */
- /* replace connection */
+ /* reconnect case, replace connection */
__replace_connection(msgr, existing, con);
}
put_connection(existing);
dout(5, "accepted connection \n");
new_con->in_tag = CEPH_MSGR_TAG_READY;
- new_con->out_connect_seq = 1;
+ new_con->connect_seq = 1;
set_bit(ACCEPTING, &new_con->state);
clear_bit(NEW,&new_con->state);
prepare_write_accept_announce(msgr, new_con);