From ca1a0fa5e37f183fe0823cfec5cf4ba244d9cea1 Mon Sep 17 00:00:00 2001 From: Patience Warnick Date: Mon, 17 Mar 2008 09:30:07 -0700 Subject: [PATCH] message protocol changes for connect. --- src/kernel/messenger.c | 157 ++++++++++++++++++++++++----------------- 1 file changed, 93 insertions(+), 64 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index aa9e7cec2c03e..b408f93416a0b 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -8,17 +8,18 @@ #include "messenger.h" #include "ktcp.h" -int ceph_debug_msgr = 1; +int ceph_debug_msgr = 50; #define DOUT_VAR ceph_debug_msgr #define DOUT_PREFIX "msgr: " #include "super.h" /* static tag bytes */ static char tag_ready = CEPH_MSGR_TAG_READY; -static char tag_reject = CEPH_MSGR_TAG_REJECT; +static char tag_reset = CEPH_MSGR_TAG_RESETSESSION; +static char tag_retry = CEPH_MSGR_TAG_RETRY; +static char tag_wait = CEPH_MSGR_TAG_WAIT; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; -//static char tag_close = CEPH_MSGR_TAG_CLOSE; #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20) static void try_read(struct work_struct *); @@ -291,23 +292,48 @@ static void ceph_send_fault(struct ceph_connection *con) con, con->state, IPQUADPORT(con->peer_addr.ipaddr)); - if (!test_and_clear_bit(CONNECTING, &con->state)){ - derr(1, "CONNECTING bit not set\n"); - /* TBD: reset buffer correctly */ - /* reset buffer */ - spin_lock(&con->out_queue_lock); - list_splice_init(&con->out_sent, &con->out_queue); - spin_unlock(&con->out_queue_lock); - clear_bit(OPEN, &con->state); + /* PW if never get here remove */ + if (test_bit(WAIT, &con->state)) { + derr(30, "ceph_send_fault received socket close during WAIT state\n"); + return; } - /* retry with delay */ - ceph_queue_delayed_write(con); + if (con->delay) { + derr(30, "ceph_send_fault tcp_close delay != 0\n"); + if (con->sock) + sock_release(con->sock); + con->sock = NULL; + set_bit(NEW, &con->state); - if (con->delay < MAX_DELAY_INTERVAL) - con->delay *= 2; - else - con->delay = MAX_DELAY_INTERVAL; + /* If there are no messages in the queue, place the connection + * in a STANDBY state otherwise retry with delay */ + if (list_empty(&con->out_queue)) { + dout(10, "setting STANDBY bit\n"); + set_bit(STANDBY, &con->state); + return; + } + + if (!test_and_clear_bit(CONNECTING, &con->state)){ + derr(1, "CONNECTING bit not set\n"); + /* TBD: reset buffer correctly */ + /* reset buffer */ + spin_lock(&con->out_queue_lock); + list_splice_init(&con->out_sent, &con->out_queue); + spin_unlock(&con->out_queue_lock); + clear_bit(OPEN, &con->state); + } + + /* retry with delay */ + ceph_queue_delayed_write(con); + + if (con->delay < MAX_DELAY_INTERVAL) + con->delay *= 2; + else + con->delay = MAX_DELAY_INTERVAL; + } else { + dout(30, "ceph_send_fault tcp_close delay = 0\n"); + remove_connection(con->msgr, con); + } } @@ -495,31 +521,10 @@ static void try_write(void *arg) if (test_bit(CLOSED, &con->state)) { dout(5, "try_write closed\n"); - remove_connection(msgr, con); goto done; } if (test_and_clear_bit(SOCK_CLOSE, &con->state)) { - dout(5, "try_write TCP_CLOSE received\n"); - if (!con->delay) { - dout(30, "try_write tcp_close delay = 0\n"); - remove_connection(msgr, con); - goto done; - } - dout(30, "try_write tcp_close delay != 0\n"); - if (con->sock) - sock_release(con->sock); - con->sock = NULL; - set_bit(NEW, &con->state); - - /* If there are no messages in the queue, place the connection - * in a STANDBY state otherwise retry with delay */ - if (list_empty(&con->out_queue)) { - dout(10, "setting STANDBY bit\n"); - set_bit(STANDBY, &con->state); - goto done; - } - ceph_send_fault(con); goto done; } @@ -528,11 +533,12 @@ more: /* initiate connect? */ if (test_and_clear_bit(NEW, &con->state)) { + if (test_and_clear_bit(STANDBY, &con->state)) + con->connect_seq++; prepare_write_connect(msgr, con); set_bit(CONNECTING, &con->state); dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state); ret = ceph_tcp_connect(con); - dout(30, "try_write returned from connect ret = %d state = %lu\n", ret, con->state); if (ret < 0) { derr(1, "try_write tcp connect error %d\n", ret); remove_connection(msgr, con); @@ -545,17 +551,20 @@ more: ret = write_partial_kvec(con); if (ret == 0) goto done; - if (test_and_clear_bit(REJECTING, &con->state)) { - dout(30, "try_write done rejecting, state %lu, closing\n", con->state); - /* FIXME do something else here, pbly? */ - remove_connection(msgr, con); - } if (ret < 0) { dout(30, "try_write write_partial_kvec returned error %d\n", ret); goto done; } } + /* check if connect handshake finished.. if not requeue and return.. */ +/* + if (!test_bit(OPEN, &con->state)) { + dout(5, "try_write state = %lu, need to requeue and exit\n", con->state); + goto done; + } +*/ + /* msg pages? */ if (con->out_msg) { ret = write_partial_msg_pages(con, con->out_msg); @@ -788,44 +797,62 @@ static int read_connect_partial(struct ceph_connection *con) ret = 1; out: dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret); - dout(20, "read_connect_partial peer_connect_seq = %d\n", con->peer_connect_seq); + dout(20, "read_connect_partial peer_connect_seq = %u\n", con->peer_connect_seq); return ret; /* done */ } static void process_connect(struct ceph_connection *con) { dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag); - clear_bit(CONNECTING, &con->state); if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) { derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, got %u.%u.%u.%u:%u/%d, wtf\n", IPQUADPORT(con->peer_addr.ipaddr), con->peer_addr.nonce, IPQUADPORT(con->actual_peer_addr.ipaddr), con->actual_peer_addr.nonce); - con->in_tag = CEPH_MSGR_TAG_REJECT; - } - if (con->in_tag == CEPH_MSGR_TAG_REJECT) { - dout(10, "process_connect got REJECT peer seq %u\n", con->peer_connect_seq); set_bit(CLOSED, &con->state); + remove_connection(con->msgr, con); } - if (con->in_tag == CEPH_MSGR_TAG_READY) { + switch (con->in_tag) { + case CEPH_MSGR_TAG_RESETSESSION: + dout(10, "process_connect got session RESET peer_connect_seq %u\n", + con->peer_connect_seq); + /* PW discard out_queue ? */ + con->out_seq = 0; + con->in_seq = 0; + con->connect_seq = 0; + prepare_write_connect(con->msgr, con); + con->msgr->peer_reset(con); + ceph_queue_write(con); + break; + case CEPH_MSGR_TAG_RETRY: + dout(10, "process_connect got session RETRY connect_seq = %u, + peer_connect_seq = %u\n", con->connect_seq, con->peer_connect_seq); + con->connect_seq = con->peer_connect_seq; + prepare_write_connect(con->msgr, con); + ceph_queue_write(con); + break; + case CEPH_MSGR_TAG_WAIT: + dout(10, "process_connect peer connecting WAIT\n"); + set_bit(WAIT, &con->state); + if (con->sock) + sock_release(con->sock); + con->sock = NULL; + break; + case CEPH_MSGR_TAG_READY: dout(10, "process_connect got READY, now open\n"); + clear_bit(CONNECTING, &con->state); set_bit(OPEN, &con->state); - if (test_bit(STANDBY, &con->state)) { - dout(30, "process_connect peer_connect_seq = %d\n", - con->peer_connect_seq); - dout(30, "process_connect connect_seq = %d\n", - con->connect_seq); -/* - if (con->peer_connect_seq > con->connect_seq) - con->msgr->peer_reset(con); -*/ - } + break; + default: + derr(1, "process_connect protocol error, try connecting again in a bit\n"); + con->delay = 10 * HZ; /* maybe use default.. */ + ceph_send_fault(con); } + } - /* * read portion of accept-side handshake on a newly accepted connection */ @@ -870,7 +897,6 @@ static void process_accept(struct ceph_connection *con) existing = __get_connection(msgr, &con->peer_addr); if (existing) { dout(20, "process_accept we have existing connection\n"); - //spin_lock(&existing->lock); /* replace existing connection? */ if ((test_bit(CONNECTING, &existing->state) && ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) || @@ -892,7 +918,7 @@ static void process_accept(struct ceph_connection *con) /* callback to mds */ } else { /* reject new connection */ - set_bit(REJECTING, &con->state); + /* set_bit(REJECTING, &con->state); */ con->connect_seq = existing->connect_seq; /* send this with the reject */ } //spin_unlock(&existing->lock); @@ -908,9 +934,11 @@ static void process_accept(struct ceph_connection *con) /* the result? */ clear_bit(ACCEPTING, &con->state); +/* if (test_bit(REJECTING, &con->state)) prepare_write_accept_reply(con, &tag_reject); else +*/ prepare_write_accept_reply(con, &tag_ready); /* queue write */ ceph_queue_write(con); @@ -1200,6 +1228,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned lo } else { con = newcon; con->peer_addr = msg->hdr.dst.addr; + con->peer_name = msg->hdr.dst.name; __add_connection(msgr, con); dout(5, "ceph_msg_send new connection %p to peer %u.%u.%u.%u:%u\n", con, IPQUADPORT(msg->hdr.dst.addr.ipaddr)); -- 2.39.5