From: Sage Weil Date: Wed, 30 Apr 2008 03:49:01 +0000 (-0700) Subject: kclient: use MSG_MORE flag appropriately with sendmsg X-Git-Tag: v0.2~73 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1ec7f0f4de60f0a02ee7e9c1866309d6321f8cf2;p=ceph.git kclient: use MSG_MORE flag appropriately with sendmsg --- diff --git a/src/TODO b/src/TODO index 308d262bc3eb..175613780719 100644 --- a/src/TODO +++ b/src/TODO @@ -62,7 +62,6 @@ pgmon - watch osd utilization; adjust overload in cluster map mon -- use standby mds on mds failure.. not just mds boot - paxos need to clean up old states. - some sort of tester for PaxosService... - osdmon needs to lower-bound old osdmap versions it keeps around? diff --git a/src/kernel/ktcp.c b/src/kernel/ktcp.c index cd31c930a164..5220025ba0d2 100644 --- a/src/kernel/ktcp.c +++ b/src/kernel/ktcp.c @@ -285,12 +285,17 @@ int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) * Send a message this may return after partial send */ int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, - size_t kvlen, size_t len) + size_t kvlen, size_t len, int more) { struct msghdr msg = {.msg_flags = 0}; int rlen = 0; - msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; + msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL; + if (more) + msg.msg_flags |= MSG_MORE; + else + msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ + /*printk(KERN_DEBUG "before sendmsg %d\n", len);*/ rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len); /*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/ diff --git a/src/kernel/ktcp.h b/src/kernel/ktcp.h index 95e99f6e1121..a783a6e8a385 100644 --- a/src/kernel/ktcp.h +++ b/src/kernel/ktcp.h @@ -9,7 +9,7 @@ int ceph_tcp_connect(struct ceph_connection *); int ceph_tcp_listen(struct ceph_messenger *); int ceph_tcp_accept(struct socket *, struct ceph_connection *); int ceph_tcp_recvmsg(struct socket *, void *, size_t ); -int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t); +int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t, int more); void ceph_sock_release(struct socket *); int ceph_workqueue_init(void); void ceph_workqueue_shutdown(void); diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 24ff5270dda5..8e453f643cb6 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -334,7 +334,8 @@ static int write_partial_kvec(struct ceph_connection *con) dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes); while (con->out_kvec_bytes > 0) { ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, - con->out_kvec_left, con->out_kvec_bytes); + con->out_kvec_left, con->out_kvec_bytes, + con->out_more); if (ret <= 0) goto out; con->out_kvec_bytes -= ret; @@ -375,6 +376,7 @@ static int write_partial_msg_pages(struct ceph_connection *con, while (con->out_msg_pos.page < con->out_msg->nr_pages) { struct page *page; void *kaddr; + mutex_lock(&msg->page_mutex); if (msg->pages) { page = msg->pages[con->out_msg_pos.page]; @@ -386,7 +388,7 @@ static int write_partial_msg_pages(struct ceph_connection *con, kv.iov_base = kaddr + con->out_msg_pos.page_pos; kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), (int)(data_len - con->out_msg_pos.data_pos)); - ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len); + ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len, 1); if (msg->pages) kunmap(page); mutex_unlock(&msg->page_mutex); @@ -410,6 +412,7 @@ static int write_partial_msg_pages(struct ceph_connection *con, con->out_kvec_left = 1; con->out_kvec_cur = con->out_kvec; con->out_msg = 0; + con->out_more = 0; /* end of message */ ret = 1; out: @@ -459,6 +462,7 @@ static void prepare_write_message(struct ceph_connection *con) con->out_kvec_left = v; con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len; con->out_kvec_cur = con->out_kvec; + con->out_more = le32_to_cpu(m->hdr.data_len); /* data? */ /* pages */ con->out_msg_pos.page = 0; @@ -485,6 +489,7 @@ static void prepare_write_ack(struct ceph_connection *con) con->out_kvec_left = 2; con->out_kvec_bytes = 1 + 4; con->out_kvec_cur = con->out_kvec; + con->out_more = 1; /* more will follow.. eventually.. */ set_bit(WRITE_PENDING, &con->state); } @@ -504,6 +509,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr, con->out_kvec_left = 2; con->out_kvec_bytes = sizeof(msgr->inst.addr) + 4; con->out_kvec_cur = con->out_kvec; + con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } @@ -516,6 +522,7 @@ static void prepare_write_connect_retry(struct ceph_messenger *msgr, con->out_kvec_left = 1; con->out_kvec_bytes = 4; con->out_kvec_cur = con->out_kvec; + con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } @@ -527,6 +534,7 @@ static void prepare_write_accept_announce(struct ceph_messenger *msgr, con->out_kvec_left = 1; con->out_kvec_bytes = sizeof(msgr->inst.addr); con->out_kvec_cur = con->out_kvec; + con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } @@ -537,6 +545,7 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag) con->out_kvec_left = 1; con->out_kvec_bytes = 1; con->out_kvec_cur = con->out_kvec; + con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } @@ -550,6 +559,7 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag) con->out_kvec_left = 2; con->out_kvec_bytes = 1 + 4; con->out_kvec_cur = con->out_kvec; + con->out_more = 0; set_bit(WRITE_PENDING, &con->state); } diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 59fb4204bf1f..356b8faf9228 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -113,6 +113,7 @@ struct ceph_connection { *out_kvec_cur; int out_kvec_left; /* kvec's left */ int out_kvec_bytes; /* bytes left */ + int out_more; /* there is more data after this kvec */ struct ceph_msg_footer out_footer; struct ceph_msg *out_msg; struct ceph_msg_pos out_msg_pos;