From bee75a94f3071096df577ea4ae7e2282bc334196 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 29 Apr 2008 16:23:02 -0700 Subject: [PATCH] kclient: some msgr cleanups --- src/kernel/messenger.c | 92 +++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index df121f90afed3..25b75f24c0145 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -441,7 +441,7 @@ static void prepare_write_message(struct ceph_connection *con) /* move to sending/sent list */ list_del_init(&m->list_head); list_add_tail(&m->list_head, &con->out_sent); - con->out_msg = m; /* FIXME: do we want to take a reference here? */ + con->out_msg = m; /* we dont bother taking a reference here. */ /* encode header */ dout(20, "prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n", @@ -567,6 +567,7 @@ more: con->connect_seq++; prepare_write_connect(msgr, con); set_bit(CONNECTING, &con->state); + con->in_tag = CEPH_MSGR_TAG_READY; dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state); ret = ceph_tcp_connect(con); @@ -635,7 +636,6 @@ static int prepare_read_message(struct ceph_connection *con) { int err; BUG_ON(con->in_msg != NULL); - con->in_tag = CEPH_MSGR_TAG_MSG; con->in_base_pos = 0; con->in_msg = ceph_msg_new(0, 0, 0, 0, 0); if (IS_ERR(con->in_msg)) { @@ -760,13 +760,34 @@ no_data: return 1; /* done! */ } +static void process_message(struct ceph_connection *con) +{ + /* if first message, set peer_name */ + if (con->peer_name.type == 0) + con->peer_name = con->in_msg->hdr.src.name; + + spin_lock(&con->out_queue_lock); + con->in_seq++; + spin_unlock(&con->out_queue_lock); + ceph_queue_write(con); + + dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n", + con->in_msg, le32_to_cpu(con->in_msg->hdr.seq), + ENTITY_NAME(con->in_msg->hdr.src.name), + le32_to_cpu(con->in_msg->hdr.type), + ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)), + le32_to_cpu(con->in_msg->hdr.front_len), + le32_to_cpu(con->in_msg->hdr.data_len)); + con->msgr->dispatch(con->msgr->parent, con->in_msg); + con->in_msg = 0; + con->in_tag = CEPH_MSGR_TAG_READY; +} /* * prepare to read an ack */ static void prepare_read_ack(struct ceph_connection *con) { - con->in_tag = CEPH_MSGR_TAG_ACK; con->in_base_pos = 0; } @@ -787,10 +808,12 @@ static int read_ack_partial(struct ceph_connection *con) return 1; /* done */ } -static void process_ack(struct ceph_connection *con, __u32 ack) +static void process_ack(struct ceph_connection *con) { struct ceph_msg *m; - __u64 seq; + u32 ack = le32_to_cpu(con->in_partial_ack); + u64 seq; + while (!list_empty(&con->out_sent)) { m = list_entry(con->out_sent.next, struct ceph_msg, list_head); seq = le64_to_cpu(m->hdr.seq); @@ -801,6 +824,7 @@ static void process_ack(struct ceph_connection *con, __u32 ack) list_del_init(&m->list_head); ceph_msg_put(m); } + con->in_tag = CEPH_MSGR_TAG_READY; } @@ -921,7 +945,7 @@ static void process_connect(struct ceph_connection *con) default: derr(1, "process_connect protocol error, will retry\n"); con->delay = BASE_DELAY_INTERVAL; - con->error_msg = "protocol error"; + con->error_msg = "protocol error, garbage tag during connect"; ceph_fault(con); } if (test_bit(WRITE_PENDING, &con->state)) @@ -1107,10 +1131,7 @@ more: if (ret <= 0) goto done; process_connect(con); - if (con->in_tag == CEPH_MSGR_TAG_RETRY) - goto more; - if (test_bit(CLOSED, &con->state)) - goto done; + goto more; } if (con->in_base_pos < 0) { @@ -1128,58 +1149,40 @@ more: if (ret <= 0) goto done; dout(30, "try_read got tag %d\n", (int)con->in_tag); - if (con->in_tag == CEPH_MSGR_TAG_MSG) + switch (con->in_tag) { + case CEPH_MSGR_TAG_MSG: prepare_read_message(con); - else if (con->in_tag == CEPH_MSGR_TAG_ACK) + break; + case CEPH_MSGR_TAG_ACK: prepare_read_ack(con); - else if (con->in_tag == CEPH_MSGR_TAG_CLOSE) { + break; + case CEPH_MSGR_TAG_CLOSE: set_bit(CLOSED, &con->state); /* fixme */ goto done; - } else { - derr(2, "try_read got bad tag %d\n", (int)con->in_tag); - ret = -EINVAL; - goto bad; + default: + goto bad_tag; } - goto more; } if (con->in_tag == CEPH_MSGR_TAG_MSG) { ret = read_message_partial(con); if (ret <= 0) goto done; - - /* if first message, set peer_name */ - if (con->peer_name.type == 0) - con->peer_name = con->in_msg->hdr.src.name; - - spin_lock(&con->out_queue_lock); - con->in_seq++; - spin_unlock(&con->out_queue_lock); - ceph_queue_write(con); - - dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n", - con->in_msg, le32_to_cpu(con->in_msg->hdr.seq), - ENTITY_NAME(con->in_msg->hdr.src.name), - le32_to_cpu(con->in_msg->hdr.type), - ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)), - le32_to_cpu(con->in_msg->hdr.front_len), - le32_to_cpu(con->in_msg->hdr.data_len)); - msgr->dispatch(con->msgr->parent, con->in_msg); - con->in_msg = 0; - con->in_tag = CEPH_MSGR_TAG_READY; + process_message(con); goto more; } if (con->in_tag == CEPH_MSGR_TAG_ACK) { ret = read_ack_partial(con); if (ret <= 0) goto done; - /* got an ack */ - process_ack(con, con->in_partial_ack); - con->in_tag = CEPH_MSGR_TAG_READY; + process_ack(con); goto more; } + +bad_tag: derr(2, "try_read bad con->in_tag = %d\n", (int)con->in_tag); -bad: - BUG_ON(1); /* shouldn't get here */ + con->error_msg = "protocol error, garbage tag"; + ceph_fault(con); + done: clear_bit(READING, &con->state); if (test_bit(READABLE, &con->state)) { @@ -1209,7 +1212,7 @@ static void try_accept(struct work_struct *work) /* initialize the msgr connection */ new_con = new_connection(msgr); if (new_con == NULL) { - derr(1, "malloc failure\n"); + derr(1, "kmalloc failure accepting new connection\n"); goto done; } if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) { @@ -1219,7 +1222,6 @@ static void try_accept(struct work_struct *work) } dout(5, "accepted connection \n"); - new_con->in_tag = CEPH_MSGR_TAG_READY; new_con->connect_seq = 1; set_bit(ACCEPTING, &new_con->state); clear_bit(NEW, &new_con->state); -- 2.39.5