From 539781438c2b6196b7dd6c6d6a2cbbea6b16bdca Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 29 Nov 2007 05:28:29 +0000 Subject: [PATCH] fixed slab corruption, cleaned up msg get/put git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@2143 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/kernel/client.c | 6 +++--- trunk/ceph/kernel/ktcp.c | 8 +++---- trunk/ceph/kernel/mds_client.c | 4 ---- trunk/ceph/kernel/messenger.c | 39 +++++++++++++++++++--------------- trunk/ceph/kernel/osd_client.c | 1 - 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/trunk/ceph/kernel/client.c b/trunk/ceph/kernel/client.c index 8bedc6ed7fbd4..b64bda1124621 100644 --- a/trunk/ceph/kernel/client.c +++ b/trunk/ceph/kernel/client.c @@ -247,8 +247,7 @@ void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg) /* mds client */ case CEPH_MSG_MDS_MAP: - //ceph_mdsc_handle_map(&client->mdsc, msg); - ceph_msg_put(msg); + ceph_mdsc_handle_map(&client->mdsc, msg); break; case CEPH_MSG_CLIENT_REPLY: ceph_mdsc_handle_reply(&client->mdsc, msg); @@ -267,6 +266,7 @@ void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg) default: derr(1, "dispatch unknown message type %d\n", msg->hdr.type); - ceph_msg_put(msg); } + + ceph_msg_put(msg); } diff --git a/trunk/ceph/kernel/ktcp.c b/trunk/ceph/kernel/ktcp.c index bbd3ddae2dfb4..76b2e99b3a17c 100644 --- a/trunk/ceph/kernel/ktcp.c +++ b/trunk/ceph/kernel/ktcp.c @@ -48,11 +48,11 @@ static void ceph_state_change(struct sock *sk) dout(30, "ceph_state_change %p state = %u\n", con, con->state); if (sk->sk_state == TCP_ESTABLISHED) { - /*if (test_bit(CONNECTING, &con->state) || - test_bit(ACCEPTING, &con->state)) {*/ + if (test_bit(CONNECTING, &con->state) || + test_bit(ACCEPTING, &con->state)) { dout(30, "ceph_state_change %p socket established, queuing swork\n", con); queue_work(send_wq, &con->swork); - /*}*/ + } } } @@ -222,7 +222,7 @@ int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) struct msghdr msg = {.msg_flags = 0}; int rlen = 0; /* length read */ - //dout(30, "ceph_tcp_recvmsg %p len %d\n", sock, (int)len); + //dout(30, "ceph_tcp_recvmsg %p len %d %p-%p\n", sock, (int)len, buf, buf+len); msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; /* receive one kvec for now... */ rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); diff --git a/trunk/ceph/kernel/mds_client.c b/trunk/ceph/kernel/mds_client.c index 807783eb7b760..8604b0c22aa2e 100644 --- a/trunk/ceph/kernel/mds_client.c +++ b/trunk/ceph/kernel/mds_client.c @@ -212,7 +212,6 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg spin_unlock(&mdsc->lock); out: - ceph_msg_put(msg); return; bad: @@ -398,7 +397,6 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) put_request(req); done: - ceph_msg_put(msg); return; } @@ -624,7 +622,6 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg put_request(req); out: - ceph_msg_put(msg); return; bad: @@ -688,7 +685,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) complete(&mdsc->map_waiters); out: - ceph_msg_put(msg); return; bad: dout(1, "corrupt map\n"); diff --git a/trunk/ceph/kernel/messenger.c b/trunk/ceph/kernel/messenger.c index d99c30598070a..ebab2fe6c9170 100644 --- a/trunk/ceph/kernel/messenger.c +++ b/trunk/ceph/kernel/messenger.c @@ -13,7 +13,7 @@ static char tag_ready = CEPH_MSGR_TAG_READY; static char tag_reject = CEPH_MSGR_TAG_REJECT; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; -static char tag_close = CEPH_MSGR_TAG_CLOSE; +//static char tag_close = CEPH_MSGR_TAG_CLOSE; static void try_read(struct work_struct *); static void try_write(struct work_struct *); @@ -396,6 +396,8 @@ static void try_write(struct work_struct *work) msgr = con->msgr; more: + dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes); + /* kvec data queued? */ if (con->out_kvec_left) { ret = write_partial_kvec(con); @@ -444,17 +446,18 @@ done: */ static int prepare_read_message(struct ceph_connection *con) { + int err; BUG_ON(con->in_msg != NULL); con->in_tag = CEPH_MSGR_TAG_MSG; con->in_base_pos = 0; - con->in_msg = kzalloc(sizeof(*con->in_msg), GFP_KERNEL); - if (con->in_msg == NULL) { - /* TBD: we don't check for error in caller, handle error */ - derr(1, "kmalloc failure on incoming message\n"); - return -ENOMEM; + con->in_msg = ceph_msg_new(0, 0, 0, 0); + if (IS_ERR(con->in_msg)) { + /* TBD: we don't check for error in caller, handle error here? */ + err = PTR_ERR(con->in_msg); + con->in_msg = 0; + derr(1, "kmalloc failure on incoming message %d\n", err); + return err; } - - ceph_msg_get(con->in_msg); return 0; } @@ -582,7 +585,7 @@ static int read_connect_partial(struct ceph_connection *con) } /* in_tag */ - to = sizeof(con->actual_peer_addr) + 1; + to += 1; if (con->in_base_pos < to) { ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); if (ret <= 0) goto out; @@ -725,7 +728,10 @@ more: * TBD: maybe store error in ceph_connection */ - if (test_bit(CLOSED, &con->state)) goto done; + if (test_bit(CLOSED, &con->state)) { + dout(20, "try_read closed\n"); + goto done; + } if (test_bit(ACCEPTING, &con->state)) { dout(20, "try_read accepting\n"); ret = read_accept_partial(con); @@ -759,9 +765,7 @@ more: if (con->in_tag == CEPH_MSGR_TAG_MSG) { ret = read_message_partial(con); if (ret <= 0) goto done; - /* got a full message! */ - msgr->dispatch(con->msgr->parent, con->in_msg); - ceph_msg_put(con->in_msg); + msgr->dispatch(con->msgr->parent, con->in_msg); /* fixme: use a workqueue */ con->in_msg = 0; con->in_tag = CEPH_MSGR_TAG_READY; goto more; @@ -912,6 +916,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) dout(5, "ceph_msg_send connection %p state is %u\n", con, con->state); if (test_and_clear_bit(NEW, &con->state)) { set_bit(CONNECTING, &con->state); + prepare_write_connect(msgr, con); dout(5, "ceph_msg_send initiating connect on %p new state %u\n", con, con->state); ret = ceph_tcp_connect(con); if (ret < 0) { @@ -919,10 +924,8 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr), ntohs(msg->hdr.dst.addr.ipaddr.sin_port)); remove_connection(msgr, con); - put_connection(con); - return(ret); + goto out; } - prepare_write_connect(msgr, con); } /* queue */ @@ -932,6 +935,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg) ceph_msg_get(msg); list_add_tail(&msg->list_head, &con->out_queue); +out: spin_unlock(&con->lock); put_connection(con); return ret; @@ -959,10 +963,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_of if (m->front.iov_base == NULL) goto out2; dout(50, "ceph_msg_new front is %p len %d\n", m->front.iov_base, front_len); - m->front.iov_len = front_len; } else { m->front.iov_base = 0; } + m->front.iov_len = front_len; /* pages */ m->nr_pages = calc_pages_for(page_len, page_off); @@ -991,6 +995,7 @@ void ceph_msg_put(struct ceph_msg *m) int i; if (atomic_dec_and_test(&m->nref)) { dout(30, "ceph_msg_put last one on %p\n", m); + BUG_ON(!list_empty(&m->list_head)); if (m->pages) { for (i=0; inr_pages; i++) if (m->pages[i]) diff --git a/trunk/ceph/kernel/osd_client.c b/trunk/ceph/kernel/osd_client.c index f012e655ec8b5..aa5d5dcf9e7e9 100644 --- a/trunk/ceph/kernel/osd_client.c +++ b/trunk/ceph/kernel/osd_client.c @@ -314,7 +314,6 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) } out: - ceph_msg_put(msg); return; bad: -- 2.39.5