From 6094787028d1f796bab02f7bb8bd7b8433ebba7f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 29 Apr 2008 17:58:16 -0700 Subject: [PATCH] kclient: fixed various msgr bugs on incoming connections --- src/TODO | 1 + src/kernel/client.c | 1 + src/kernel/messenger.c | 39 +++++++++++++++++++++++++++++++-------- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/TODO b/src/TODO index 175613780719f..308d262bc3ebb 100644 --- a/src/TODO +++ b/src/TODO @@ -62,6 +62,7 @@ pgmon - watch osd utilization; adjust overload in cluster map mon +- use standby mds on mds failure.. not just mds boot - paxos need to clean up old states. - some sort of tester for PaxosService... - osdmon needs to lower-bound old osdmap versions it keeps around? diff --git a/src/kernel/client.c b/src/kernel/client.c index 5aed7698ed27d..455a7e1dad956 100644 --- a/src/kernel/client.c +++ b/src/kernel/client.c @@ -343,6 +343,7 @@ const char *ceph_msg_type_name(int type) case CEPH_MSG_PING: return "ping"; case CEPH_MSG_PING_ACK: return "ping_ack"; case CEPH_MSG_MON_MAP: return "mon_map"; + case CEPH_MSG_MON_GET_MAP: return "mon_get_map"; case CEPH_MSG_CLIENT_MOUNT: return "client_mount"; case CEPH_MSG_CLIENT_UNMOUNT: return "client_unmount"; case CEPH_MSG_STATFS: return "statfs"; diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 1306249b66b54..207dd41e64014 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -267,6 +267,10 @@ static void ceph_fault(struct ceph_connection *con) dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n", con, con->state, IPQUADPORT(con->peer_addr.ipaddr)); + ceph_debug_console = 1; + ceph_debug_msgr = 40; + ceph_debug = 40; + /* PW if never get here remove */ if (test_bit(WAIT, &con->state)) { derr(30, "fault socket close during WAIT state\n"); @@ -488,6 +492,11 @@ static void prepare_write_ack(struct ceph_connection *con) set_bit(WRITE_PENDING, &con->state); } +static void prepare_read_connect(struct ceph_connection *con) +{ + con->in_base_pos = 0; +} + static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con) { @@ -502,6 +511,18 @@ static void prepare_write_connect(struct ceph_messenger *msgr, set_bit(WRITE_PENDING, &con->state); } +static void prepare_write_connect_retry(struct ceph_messenger *msgr, + struct ceph_connection *con) +{ + con->out_connect_seq = cpu_to_le32(con->connect_seq); + con->out_kvec[0].iov_base = &con->out_connect_seq; + con->out_kvec[0].iov_len = 4; + con->out_kvec_left = 1; + con->out_kvec_bytes = 4; + con->out_kvec_cur = con->out_kvec; + set_bit(WRITE_PENDING, &con->state); +} + static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con) { @@ -566,6 +587,7 @@ more: if (test_and_clear_bit(STANDBY, &con->state)) con->connect_seq++; prepare_write_connect(msgr, con); + prepare_read_connect(con); set_bit(CONNECTING, &con->state); con->in_tag = CEPH_MSGR_TAG_READY; dout(5, "try_write initiating connect on %p new state %lu\n", @@ -924,7 +946,7 @@ static void process_connect(struct ceph_connection *con) dout(10, "process_connect got RESET peer seq %u\n", le32_to_cpu(con->in_connect_seq)); reset_connection(con); - prepare_write_connect(con->msgr, con); + prepare_write_connect_retry(con->msgr, con); con->msgr->peer_reset(con->msgr->parent, &con->peer_name); break; case CEPH_MSGR_TAG_RETRY: @@ -933,7 +955,7 @@ static void process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect_seq), le32_to_cpu(con->in_connect_seq)); con->connect_seq = le32_to_cpu(con->in_connect_seq); - prepare_write_connect(con->msgr, con); + prepare_write_connect_retry(con->msgr, con); break; case CEPH_MSGR_TAG_WAIT: dout(10, "process_connect peer connecting WAIT\n"); @@ -1013,10 +1035,8 @@ static void __replace_connection(struct ceph_messenger *msgr, new->out_seq = old->out_seq; /* replace list entry */ - spin_lock(&msgr->con_lock); list_add(&new->list_bucket, &old->list_bucket); list_del_init(&old->list_bucket); - spin_unlock(&msgr->con_lock); set_bit(CLOSED, &old->state); put_connection(old); /* dec reference count */ @@ -1091,7 +1111,7 @@ static void process_accept(struct ceph_connection *con) prepare_write_accept_reply(con, &tag_ready); } spin_unlock(&msgr->con_lock); - /* queue write */ + ceph_queue_write(con); put_connection(con); } @@ -1219,6 +1239,12 @@ static void try_accept(struct work_struct *work) derr(1, "kmalloc failure accepting new connection\n"); goto done; } + + new_con->connect_seq = 1; + set_bit(ACCEPTING, &new_con->state); + clear_bit(NEW, &new_con->state); + new_con->in_tag = CEPH_MSGR_TAG_READY; /* eventually, hopefully */ + if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) { derr(1, "error accepting connection\n"); put_connection(new_con); @@ -1226,9 +1252,6 @@ static void try_accept(struct work_struct *work) } dout(5, "accepted connection \n"); - new_con->connect_seq = 1; - set_bit(ACCEPTING, &new_con->state); - clear_bit(NEW, &new_con->state); prepare_write_accept_announce(msgr, new_con); add_connection_accepting(msgr, new_con); -- 2.39.5