From 8e356bf4034d49660e21b2e8fec7cc0b79f4cc85 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 28 Aug 2009 14:27:18 -0700 Subject: [PATCH] kclient: make con_close, con_reopen work --- src/kernel/messenger.c | 108 +++++++++++++++++++++++------------------ src/kernel/messenger.h | 3 ++ 2 files changed, 63 insertions(+), 48 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 1f42b110fb99f..56c00c3a6dcc1 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -232,6 +232,25 @@ static int con_close_socket(struct ceph_connection *con) return rc; } +/* + * Reset a connection. Discard all incoming and outgoing messages + * and clear *_seq state. + */ +static void reset_connection(struct ceph_connection *con) +{ + /* reset connection, out_queue, msg_ and connect_seq */ + /* discard existing out_queue and msg_seq */ + spin_lock(&con->out_queue_lock); + ceph_msg_put_list(&con->out_queue); + ceph_msg_put_list(&con->out_sent); + + con->connect_seq = 0; + con->out_seq = 0; + con->out_msg = NULL; + con->in_seq = 0; + spin_unlock(&con->out_queue_lock); +} + /* * mark a peer down. drop any open connections. */ @@ -240,6 +259,7 @@ void ceph_con_close(struct ceph_connection *con) dout("close %p peer %u.%u.%u.%u:%u\n", con, IPQUADPORT(con->peer_addr.ipaddr)); set_bit(CLOSED, &con->state); /* in case there's queued work */ + reset_connection(con); queue_con(con); } @@ -249,12 +269,24 @@ void ceph_con_close(struct ceph_connection *con) void ceph_con_destroy(struct ceph_connection *con) { dout("con_destroy %p destroying\n", con); - ceph_msg_put_list(&con->out_queue); - ceph_msg_put_list(&con->out_sent); - set_bit(CLOSED, &con->state); + reset_connection(con); + set_bit(DEAD, &con->state); con_close_socket(con); /* silently ignore errors */ } +/* + * Reopen a closed connection, with a new peer address. + */ +void ceph_con_reopen(struct ceph_connection *con, struct ceph_entity_addr *addr) +{ + dout("con_reopen %p %u.%u.%u.%u:%u\n", con, IPQUADPORT(addr->ipaddr)); + BUG_ON(!test_bit(CLOSED, &con->state)); + set_bit(REOPEN, &con->state); + clear_bit(CLOSED, &con->state); + memcpy(&con->peer_addr, addr, sizeof(*addr)); + queue_con(con); +} + /* * generic get/put */ @@ -727,31 +759,6 @@ static int verify_hello(struct ceph_connection *con) return 0; } -/* - * Reset a connection. Discard all incoming and outgoing messages - * and clear *_seq state. - */ -static void reset_connection(struct ceph_connection *con) -{ - pr_err("ceph %s%d %u.%u.%u.%u:%u connection reset\n", - ENTITY_NAME(con->peer_name), - IPQUADPORT(con->peer_addr.ipaddr)); - - /* reset connection, out_queue, msg_ and connect_seq */ - /* discard existing out_queue and msg_seq */ - spin_lock(&con->out_queue_lock); - ceph_msg_put_list(&con->out_queue); - ceph_msg_put_list(&con->out_sent); - - con->connect_seq = 0; - con->out_seq = 0; - con->out_msg = NULL; - con->in_seq = 0; - con->in_msg = NULL; - spin_unlock(&con->out_queue_lock); -} - - static int process_connect(struct ceph_connection *con) { dout("process_connect on %p tag %d\n", con, (int)con->in_tag); @@ -791,6 +798,9 @@ static int process_connect(struct ceph_connection *con) */ dout("process_connect got RESET peer seq %u\n", le32_to_cpu(con->in_connect.connect_seq)); + pr_err("ceph %s%d %u.%u.%u.%u:%u connection reset\n", + ENTITY_NAME(con->peer_name), + IPQUADPORT(con->peer_addr.ipaddr)); reset_connection(con); prepare_write_connect_retry(con->msgr, con); prepare_read_connect(con); @@ -1123,24 +1133,27 @@ no_data: */ static void process_message(struct ceph_connection *con) { + struct ceph_msg *msg = con->in_msg; + + con->in_msg = NULL; + /* if first message, set peer_name */ if (con->peer_name.type == 0) - con->peer_name = con->in_msg->hdr.src.name; + con->peer_name = msg->hdr.src.name; spin_lock(&con->out_queue_lock); con->in_seq++; spin_unlock(&con->out_queue_lock); dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n", - con->in_msg, le64_to_cpu(con->in_msg->hdr.seq), - ENTITY_NAME(con->in_msg->hdr.src.name), - le16_to_cpu(con->in_msg->hdr.type), - ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)), - le32_to_cpu(con->in_msg->hdr.front_len), - le32_to_cpu(con->in_msg->hdr.data_len), + msg, le64_to_cpu(msg->hdr.seq), + ENTITY_NAME(msg->hdr.src.name), + le16_to_cpu(msg->hdr.type), + ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), + le32_to_cpu(msg->hdr.front_len), + le32_to_cpu(msg->hdr.data_len), con->in_front_crc, con->in_middle_crc, con->in_data_crc); - con->ops->dispatch(con, con->in_msg); - con->in_msg = NULL; + con->ops->dispatch(con, msg); prepare_read_tag(con); } @@ -1276,7 +1289,7 @@ more: ret = read_partial_connect(con); if (ret <= 0) goto done; - if (process_connect(con) < 0) { + if (process_connect(con) < 0 || test_bit(WAIT, &con->state)) { ret = -1; goto out; } @@ -1384,8 +1397,8 @@ bad_tag: static void queue_con(struct ceph_connection *con) { if (test_bit(WAIT, &con->state) || - test_bit(CLOSED, &con->state)) { - dout("queue_con %p ignoring: WAIT|CLOSED\n", + test_bit(DEAD, &con->state)) { + dout("queue_con %p ignoring: WAIT|DEAD\n", con); return; } @@ -1424,15 +1437,14 @@ more: if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ dout("con_work CLOSED\n"); - ceph_con_destroy(con); - if (test_and_clear_bit(REOPEN, &con->state)) { - /* reopen w/ new peer */ - dout("con_work REPOPEN\n"); - clear_bit(CLOSED, &con->state); - goto more; - } + con_close_socket(con); goto done; } + if (test_and_clear_bit(REOPEN, &con->state)) { + /* reopen w/ new peer */ + dout("con_work REOPEN\n"); + con_close_socket(con); + } if (test_bit(WAIT, &con->state)) { /* we are a zombie */ dout("con_work WAIT\n"); goto done; @@ -1544,6 +1556,7 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr) dout("create ip not specified, initially INADDR_ANY\n"); msgr->inst.addr.ipaddr.sin_addr.s_addr = htonl(INADDR_ANY); msgr->inst.addr.ipaddr.sin_port = htons(0); /* any port */ + msgr->inst.addr.nonce = get_random_int(); } msgr->inst.addr.ipaddr.sin_family = AF_INET; @@ -1773,7 +1786,6 @@ void ceph_msg_kfree(struct ceph_msg *m) else kfree(m->front.iov_base); kfree(m); - dout("msg_kfree %p done\n", m); } /* diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 051b9411bce78..465999c5f6b25 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -135,6 +135,7 @@ struct ceph_msg_pos { #define SOCK_CLOSED 11 /* socket state changed to closed */ #define REGISTERED 12 /* connection appears in con_tree */ #define REOPEN 13 /* reopen connection w/ new peer */ +#define DEAD 14 /* dead, about to kfree */ /* * A single connection with another host. @@ -223,6 +224,8 @@ extern void ceph_con_destroy(struct ceph_connection *con); extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg); extern void ceph_con_keepalive(struct ceph_connection *con); extern void ceph_con_close(struct ceph_connection *con); +extern void ceph_con_reopen(struct ceph_connection *con, + struct ceph_entity_addr *addr); extern struct ceph_connection *ceph_con_get(struct ceph_connection *con); extern void ceph_con_put(struct ceph_connection *con); -- 2.39.5