#include "messenger.h"
#include "ktcp.h"
-int ceph_debug_msgr = 1;
+int ceph_debug_msgr = 50;
#define DOUT_VAR ceph_debug_msgr
#define DOUT_PREFIX "msgr: "
#include "super.h"
/* static tag bytes */
static char tag_ready = CEPH_MSGR_TAG_READY;
-static char tag_reject = CEPH_MSGR_TAG_REJECT;
+static char tag_reset = CEPH_MSGR_TAG_RESETSESSION;
+static char tag_retry = CEPH_MSGR_TAG_RETRY;
+static char tag_wait = CEPH_MSGR_TAG_WAIT;
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
-//static char tag_close = CEPH_MSGR_TAG_CLOSE;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
static void try_read(struct work_struct *);
con, con->state,
IPQUADPORT(con->peer_addr.ipaddr));
- if (!test_and_clear_bit(CONNECTING, &con->state)){
- derr(1, "CONNECTING bit not set\n");
- /* TBD: reset buffer correctly */
- /* reset buffer */
- spin_lock(&con->out_queue_lock);
- list_splice_init(&con->out_sent, &con->out_queue);
- spin_unlock(&con->out_queue_lock);
- clear_bit(OPEN, &con->state);
+ /* PW if never get here remove */
+ if (test_bit(WAIT, &con->state)) {
+ derr(30, "ceph_send_fault received socket close during WAIT state\n");
+ return;
}
- /* retry with delay */
- ceph_queue_delayed_write(con);
+ if (con->delay) {
+ derr(30, "ceph_send_fault tcp_close delay != 0\n");
+ if (con->sock)
+ sock_release(con->sock);
+ con->sock = NULL;
+ set_bit(NEW, &con->state);
- if (con->delay < MAX_DELAY_INTERVAL)
- con->delay *= 2;
- else
- con->delay = MAX_DELAY_INTERVAL;
+ /* If there are no messages in the queue, place the connection
+ * in a STANDBY state otherwise retry with delay */
+ if (list_empty(&con->out_queue)) {
+ dout(10, "setting STANDBY bit\n");
+ set_bit(STANDBY, &con->state);
+ return;
+ }
+
+ if (!test_and_clear_bit(CONNECTING, &con->state)){
+ derr(1, "CONNECTING bit not set\n");
+ /* TBD: reset buffer correctly */
+ /* reset buffer */
+ spin_lock(&con->out_queue_lock);
+ list_splice_init(&con->out_sent, &con->out_queue);
+ spin_unlock(&con->out_queue_lock);
+ clear_bit(OPEN, &con->state);
+ }
+
+ /* retry with delay */
+ ceph_queue_delayed_write(con);
+
+ if (con->delay < MAX_DELAY_INTERVAL)
+ con->delay *= 2;
+ else
+ con->delay = MAX_DELAY_INTERVAL;
+ } else {
+ dout(30, "ceph_send_fault tcp_close delay = 0\n");
+ remove_connection(con->msgr, con);
+ }
}
if (test_bit(CLOSED, &con->state)) {
dout(5, "try_write closed\n");
- remove_connection(msgr, con);
goto done;
}
if (test_and_clear_bit(SOCK_CLOSE, &con->state)) {
- dout(5, "try_write TCP_CLOSE received\n");
- if (!con->delay) {
- dout(30, "try_write tcp_close delay = 0\n");
- remove_connection(msgr, con);
- goto done;
- }
- dout(30, "try_write tcp_close delay != 0\n");
- if (con->sock)
- sock_release(con->sock);
- con->sock = NULL;
- set_bit(NEW, &con->state);
-
- /* If there are no messages in the queue, place the connection
- * in a STANDBY state otherwise retry with delay */
- if (list_empty(&con->out_queue)) {
- dout(10, "setting STANDBY bit\n");
- set_bit(STANDBY, &con->state);
- goto done;
- }
-
ceph_send_fault(con);
goto done;
}
/* initiate connect? */
if (test_and_clear_bit(NEW, &con->state)) {
+ if (test_and_clear_bit(STANDBY, &con->state))
+ con->connect_seq++;
prepare_write_connect(msgr, con);
set_bit(CONNECTING, &con->state);
dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state);
ret = ceph_tcp_connect(con);
- dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state);
if (ret < 0) {
derr(1, "try_write tcp connect error %d\n", ret);
remove_connection(msgr, con);
ret = write_partial_kvec(con);
if (ret == 0)
goto done;
- if (test_and_clear_bit(REJECTING, &con->state)) {
- dout(30, "try_write done rejecting, state %lu, closing\n", con->state);
- /* FIXME do something else here, pbly? */
- remove_connection(msgr, con);
- }
if (ret < 0) {
dout(30, "try_write write_partial_kvec returned error %d\n", ret);
goto done;
}
}
+ /* check if connect handshake finished.. if not requeue and return.. */
+/*
+ if (!test_bit(OPEN, &con->state)) {
+ dout(5, "try_write state = %lu, need to requeue and exit\n", con->state);
+ goto done;
+ }
+*/
+
/* msg pages? */
if (con->out_msg) {
ret = write_partial_msg_pages(con, con->out_msg);
ret = 1;
out:
dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret);
- dout(20, "read_connect_partial peer_connect_seq = %d\n", con->peer_connect_seq);
+ dout(20, "read_connect_partial peer_connect_seq = %u\n", con->peer_connect_seq);
return ret; /* done */
}
static void process_connect(struct ceph_connection *con)
{
dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
- clear_bit(CONNECTING, &con->state);
if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) {
derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, got %u.%u.%u.%u:%u/%d, wtf\n",
IPQUADPORT(con->peer_addr.ipaddr),
con->peer_addr.nonce,
IPQUADPORT(con->actual_peer_addr.ipaddr),
con->actual_peer_addr.nonce);
- con->in_tag = CEPH_MSGR_TAG_REJECT;
- }
- if (con->in_tag == CEPH_MSGR_TAG_REJECT) {
- dout(10, "process_connect got REJECT peer seq %u\n", con->peer_connect_seq);
set_bit(CLOSED, &con->state);
+ remove_connection(con->msgr, con);
}
- if (con->in_tag == CEPH_MSGR_TAG_READY) {
+ switch (con->in_tag) {
+ case CEPH_MSGR_TAG_RESETSESSION:
+ dout(10, "process_connect got session RESET peer_connect_seq %u\n",
+ con->peer_connect_seq);
+ /* PW discard out_queue ? */
+ con->out_seq = 0;
+ con->in_seq = 0;
+ con->connect_seq = 0;
+ prepare_write_connect(con->msgr, con);
+ con->msgr->peer_reset(con);
+ ceph_queue_write(con);
+ break;
+ case CEPH_MSGR_TAG_RETRY:
+ dout(10, "process_connect got session RETRY connect_seq = %u,
+ peer_connect_seq = %u\n", con->connect_seq, con->peer_connect_seq);
+ con->connect_seq = con->peer_connect_seq;
+ prepare_write_connect(con->msgr, con);
+ ceph_queue_write(con);
+ break;
+ case CEPH_MSGR_TAG_WAIT:
+ dout(10, "process_connect peer connecting WAIT\n");
+ set_bit(WAIT, &con->state);
+ if (con->sock)
+ sock_release(con->sock);
+ con->sock = NULL;
+ break;
+ case CEPH_MSGR_TAG_READY:
dout(10, "process_connect got READY, now open\n");
+ clear_bit(CONNECTING, &con->state);
set_bit(OPEN, &con->state);
- if (test_bit(STANDBY, &con->state)) {
- dout(30, "process_connect peer_connect_seq = %d\n",
- con->peer_connect_seq);
- dout(30, "process_connect connect_seq = %d\n",
- con->connect_seq);
-/*
- if (con->peer_connect_seq > con->connect_seq)
- con->msgr->peer_reset(con);
-*/
- }
+ break;
+ default:
+ derr(1, "process_connect protocol error, try connecting again in a bit\n");
+ con->delay = 10 * HZ; /* maybe use default.. */
+ ceph_send_fault(con);
}
+
}
-
/*
* read portion of accept-side handshake on a newly accepted connection
*/
existing = __get_connection(msgr, &con->peer_addr);
if (existing) {
dout(20, "process_accept we have existing connection\n");
- //spin_lock(&existing->lock);
/* replace existing connection? */
if ((test_bit(CONNECTING, &existing->state) &&
ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) ||
/* callback to mds */
} else {
/* reject new connection */
- set_bit(REJECTING, &con->state);
+ /* set_bit(REJECTING, &con->state); */
con->connect_seq = existing->connect_seq; /* send this with the reject */
}
//spin_unlock(&existing->lock);
/* the result? */
clear_bit(ACCEPTING, &con->state);
+/*
if (test_bit(REJECTING, &con->state))
prepare_write_accept_reply(con, &tag_reject);
else
+*/
prepare_write_accept_reply(con, &tag_ready);
/* queue write */
ceph_queue_write(con);
} else {
con = newcon;
con->peer_addr = msg->hdr.dst.addr;
+ con->peer_name = msg->hdr.dst.name;
__add_connection(msgr, con);
dout(5, "ceph_msg_send new connection %p to peer %u.%u.%u.%u:%u\n", con,
IPQUADPORT(msg->hdr.dst.addr.ipaddr));