dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n",
con, con->state, IPQUADPORT(con->peer_addr.ipaddr));
+ ceph_debug_console = 1;
+ ceph_debug_msgr = 40;
+ ceph_debug = 40;
+
/* PW if never get here remove */
if (test_bit(WAIT, &con->state)) {
derr(30, "fault socket close during WAIT state\n");
set_bit(WRITE_PENDING, &con->state);
}
+static void prepare_read_connect(struct ceph_connection *con)
+{
+ con->in_base_pos = 0;
+}
+
static void prepare_write_connect(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
set_bit(WRITE_PENDING, &con->state);
}
+static void prepare_write_connect_retry(struct ceph_messenger *msgr,
+ struct ceph_connection *con)
+{
+ con->out_connect_seq = cpu_to_le32(con->connect_seq);
+ con->out_kvec[0].iov_base = &con->out_connect_seq;
+ con->out_kvec[0].iov_len = 4;
+ con->out_kvec_left = 1;
+ con->out_kvec_bytes = 4;
+ con->out_kvec_cur = con->out_kvec;
+ set_bit(WRITE_PENDING, &con->state);
+}
+
static void prepare_write_accept_announce(struct ceph_messenger *msgr,
struct ceph_connection *con)
{
if (test_and_clear_bit(STANDBY, &con->state))
con->connect_seq++;
prepare_write_connect(msgr, con);
+ prepare_read_connect(con);
set_bit(CONNECTING, &con->state);
con->in_tag = CEPH_MSGR_TAG_READY;
dout(5, "try_write initiating connect on %p new state %lu\n",
dout(10, "process_connect got RESET peer seq %u\n",
le32_to_cpu(con->in_connect_seq));
reset_connection(con);
- prepare_write_connect(con->msgr, con);
+ prepare_write_connect_retry(con->msgr, con);
con->msgr->peer_reset(con->msgr->parent, &con->peer_name);
break;
case CEPH_MSGR_TAG_RETRY:
le32_to_cpu(con->out_connect_seq),
le32_to_cpu(con->in_connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect_seq);
- prepare_write_connect(con->msgr, con);
+ prepare_write_connect_retry(con->msgr, con);
break;
case CEPH_MSGR_TAG_WAIT:
dout(10, "process_connect peer connecting WAIT\n");
new->out_seq = old->out_seq;
/* replace list entry */
- spin_lock(&msgr->con_lock);
list_add(&new->list_bucket, &old->list_bucket);
list_del_init(&old->list_bucket);
- spin_unlock(&msgr->con_lock);
set_bit(CLOSED, &old->state);
put_connection(old); /* dec reference count */
prepare_write_accept_reply(con, &tag_ready);
}
spin_unlock(&msgr->con_lock);
- /* queue write */
+
ceph_queue_write(con);
put_connection(con);
}
derr(1, "kmalloc failure accepting new connection\n");
goto done;
}
+
+ new_con->connect_seq = 1;
+ set_bit(ACCEPTING, &new_con->state);
+ clear_bit(NEW, &new_con->state);
+ new_con->in_tag = CEPH_MSGR_TAG_READY; /* eventually, hopefully */
+
if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
derr(1, "error accepting connection\n");
put_connection(new_con);
}
dout(5, "accepted connection \n");
- new_con->connect_seq = 1;
- set_bit(ACCEPTING, &new_con->state);
- clear_bit(NEW, &new_con->state);
prepare_write_accept_announce(msgr, new_con);
add_connection_accepting(msgr, new_con);