From 197f1aebd371f1fd78dc22b5df61e403dd1ed0ae Mon Sep 17 00:00:00 2001 From: Patience Warnick Date: Tue, 18 Mar 2008 12:54:07 -0700 Subject: [PATCH] Some process_accept changes for message protocol chnages.. --- src/kernel/messenger.c | 79 ++++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 41 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index bb98fc4ed4ae3..53529fc9f0d4a 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -894,61 +894,58 @@ static void process_accept(struct ceph_connection *con) struct ceph_connection *existing; struct ceph_messenger *msgr = con->msgr; - /* do we already have a connection for this peer? */ + /* do we have an existing connection for this peer? */ radix_tree_preload(GFP_KERNEL); spin_lock(&msgr->con_lock); existing = __get_connection(msgr, &con->peer_addr); if (existing) { - dout(20, "process_accept we have existing connection\n"); - /* replace existing connection? */ - if ((test_bit(CONNECTING, &existing->state) && - ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) || - (test_bit(OPEN, &existing->state) && - con->connect_seq == existing->connect_seq)) { - /* replace existing with new connection */ - __replace_connection(msgr, existing, con); - /* steal message queue */ - list_splice_init(&con->out_queue, &existing->out_queue); /* fixme order */ - con->out_seq = existing->out_seq; - set_bit(OPEN, &con->state); - set_bit(CLOSED, &existing->state); - clear_bit(OPEN, &existing->state); - } else if (test_bit(STANDBY, &existing->state) && - existing->connect_seq != con->connect_seq) { - dout(20, "process_accept connect_seq mismatchi con = %d, existing = %d\n", - con->connect_seq, existing->connect_seq); - msgr->peer_reset(con); - /* callback to mds */ - } else { - /* reject new connection */ - /* set_bit(REJECTING, &con->state); */ - con->connect_seq = existing->connect_seq; /* send this with the reject */ + if (existing->connect_seq > con->connect_seq) { + /* old attempt or peer didn't get the READY */ + /* send retry with peers connect seq */ + con->connect_seq = existing->connect_seq; + prepare_write_accept_reply(con, &tag_retry); + set_bit(RETRY, &con->state); + } else if (existing->connect_seq == con->connect_seq) { + /* connection race */ + /* PW not sure we need this.. */ + /* if (test_bit(CONNECTING, &existing->state) && */ + if (ceph_entity_addr_equal(&msgr->inst.addr, &con->peer_addr)) { + /* replace existing with new connection */ + __replace_connection(msgr, existing, con); + /* steal message queue */ + list_splice_init(&con->out_queue, &existing->out_queue); + con->out_seq = existing->out_seq; + set_bit(OPEN, &con->state); + clear_bit(ACCEPTING, &con->state); + set_bit(CLOSED, &existing->state); + clear_bit(OPEN, &existing->state); + prepare_write_accept_reply(con, &tag_ready); + } else { + /* peer wait for our outgoing connection to go through */ + prepare_write_accept_reply(con, &tag_wait); + set_bit(WAIT, &con->state); + } + } else if (con->peer_connect_seq == 0 && + test_bit(STANDBY, &existing->state)) { + /* peer reset opening new connection */ + msgr->peer_reset(con); + /* discard existing outqueue and msg_seq */ + } else if (existing->connect_seq < con->connect_seq) { + /* reconnect case */ } - //spin_unlock(&existing->lock); put_connection(existing); + } else if (con->peer_connect_seq > 0) { + prepare_write_accept_reply(con, &tag_reset); } else { - if (con->peer_connect_seq > con->connect_seq) - msgr->peer_reset(con); - dout(20, "process_accept no existing connection\n"); - __add_connection(msgr, con); - set_bit(OPEN, &con->state); + clear_bit(ACCEPTING, &con->state); + prepare_write_accept_reply(con, &tag_ready); } spin_unlock(&msgr->con_lock); - - /* the result? */ - clear_bit(ACCEPTING, &con->state); -/* - if (test_bit(REJECTING, &con->state)) - prepare_write_accept_reply(con, &tag_reject); - else -*/ - prepare_write_accept_reply(con, &tag_ready); /* queue write */ ceph_queue_write(con); put_connection(con); } - /* * worker function when data is available on the socket */ -- 2.39.5