]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: use MSG_MORE flag appropriately with sendmsg
authorSage Weil <sage@newdream.net>
Wed, 30 Apr 2008 03:49:01 +0000 (20:49 -0700)
committerSage Weil <sage@newdream.net>
Wed, 30 Apr 2008 03:49:01 +0000 (20:49 -0700)
src/TODO
src/kernel/ktcp.c
src/kernel/ktcp.h
src/kernel/messenger.c
src/kernel/messenger.h

index 308d262bc3ebbc10363c4b1c9341747674d59a7d..175613780719f8405ccb6b0dc6b68383cefd470d 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -62,7 +62,6 @@ pgmon
 - watch osd utilization; adjust overload in cluster map
 
 mon
-- use standby mds on mds failure.. not just mds boot
 - paxos need to clean up old states.
 - some sort of tester for PaxosService...
 - osdmon needs to lower-bound old osdmap versions it keeps around?
index cd31c930a1645bee2fa84404fee424c308521190..5220025ba0d20bb7ee770d32ef233baebf501da1 100644 (file)
@@ -285,12 +285,17 @@ int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
  * Send a message this may return after partial send
  */
 int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
-                    size_t kvlen, size_t len)
+                    size_t kvlen, size_t len, int more)
 {
        struct msghdr msg = {.msg_flags = 0};
        int rlen = 0;
 
-       msg.msg_flags |=  MSG_DONTWAIT | MSG_NOSIGNAL;
+       msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
+       if (more)
+               msg.msg_flags |= MSG_MORE;
+       else
+               msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
+
        /*printk(KERN_DEBUG "before sendmsg %d\n", len);*/
        rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
        /*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/
index 95e99f6e112120ba3b8db056291ce242b4589279..a783a6e8a385acc00fc17527f8d29d828f5ea777 100644 (file)
@@ -9,7 +9,7 @@ int ceph_tcp_connect(struct ceph_connection *);
 int ceph_tcp_listen(struct ceph_messenger *);
 int ceph_tcp_accept(struct socket *, struct ceph_connection *);
 int ceph_tcp_recvmsg(struct socket *, void *, size_t );
-int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t);
+int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t, int more);
 void ceph_sock_release(struct socket *);
 int ceph_workqueue_init(void);
 void ceph_workqueue_shutdown(void);
index 24ff5270dda50ac659094f97e440fc2e8bdfb186..8e453f643cb612d4711726316fa8e87becfbec7d 100644 (file)
@@ -334,7 +334,8 @@ static int write_partial_kvec(struct ceph_connection *con)
        dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes);
        while (con->out_kvec_bytes > 0) {
                ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
-                                      con->out_kvec_left, con->out_kvec_bytes);
+                                      con->out_kvec_left, con->out_kvec_bytes,
+                                      con->out_more);
                if (ret <= 0)
                        goto out;
                con->out_kvec_bytes -= ret;
@@ -375,6 +376,7 @@ static int write_partial_msg_pages(struct ceph_connection *con,
        while (con->out_msg_pos.page < con->out_msg->nr_pages) {
                struct page *page;
                void *kaddr;
+
                mutex_lock(&msg->page_mutex);
                if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
@@ -386,7 +388,7 @@ static int write_partial_msg_pages(struct ceph_connection *con,
                kv.iov_base = kaddr + con->out_msg_pos.page_pos;
                kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
                                 (int)(data_len - con->out_msg_pos.data_pos));
-               ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len);
+               ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len, 1);
                if (msg->pages)
                        kunmap(page);
                mutex_unlock(&msg->page_mutex);
@@ -410,6 +412,7 @@ static int write_partial_msg_pages(struct ceph_connection *con,
        con->out_kvec_left = 1;
        con->out_kvec_cur = con->out_kvec;
        con->out_msg = 0;
+       con->out_more = 0;  /* end of message */
        
        ret = 1;
 out:
@@ -459,6 +462,7 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_kvec_left = v;
        con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len;
        con->out_kvec_cur = con->out_kvec;
+       con->out_more = le32_to_cpu(m->hdr.data_len);  /* data? */
 
        /* pages */
        con->out_msg_pos.page = 0;
@@ -485,6 +489,7 @@ static void prepare_write_ack(struct ceph_connection *con)
        con->out_kvec_left = 2;
        con->out_kvec_bytes = 1 + 4;
        con->out_kvec_cur = con->out_kvec;
+       con->out_more = 1;  /* more will follow.. eventually.. */
        set_bit(WRITE_PENDING, &con->state);
 }
 
@@ -504,6 +509,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        con->out_kvec_left = 2;
        con->out_kvec_bytes = sizeof(msgr->inst.addr) + 4;
        con->out_kvec_cur = con->out_kvec;
+       con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 }
 
@@ -516,6 +522,7 @@ static void prepare_write_connect_retry(struct ceph_messenger *msgr,
        con->out_kvec_left = 1;
        con->out_kvec_bytes = 4;
        con->out_kvec_cur = con->out_kvec;
+       con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 }
 
@@ -527,6 +534,7 @@ static void prepare_write_accept_announce(struct ceph_messenger *msgr,
        con->out_kvec_left = 1;
        con->out_kvec_bytes = sizeof(msgr->inst.addr);
        con->out_kvec_cur = con->out_kvec;
+       con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 }
 
@@ -537,6 +545,7 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag)
        con->out_kvec_left = 1;
        con->out_kvec_bytes = 1;
        con->out_kvec_cur = con->out_kvec;
+       con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 }
 
@@ -550,6 +559,7 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag)
        con->out_kvec_left = 2;
        con->out_kvec_bytes = 1 + 4;
        con->out_kvec_cur = con->out_kvec;
+       con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 }
 
index 59fb4204bf1f9038890c7b7c139caa6addc54d23..356b8faf9228308b238aa3342b1e526e66055b00 100644 (file)
@@ -113,6 +113,7 @@ struct ceph_connection {
                *out_kvec_cur;
        int out_kvec_left;   /* kvec's left */
        int out_kvec_bytes;  /* bytes left */
+       int out_more;        /* there is more data after this kvec */
        struct ceph_msg_footer out_footer;
        struct ceph_msg *out_msg;
        struct ceph_msg_pos out_msg_pos;