* Send a message this may return after partial send
*/
int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
- size_t kvlen, size_t len)
+ size_t kvlen, size_t len, int more)
{
struct msghdr msg = {.msg_flags = 0};
int rlen = 0;
- msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
+ msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
+ if (more)
+ msg.msg_flags |= MSG_MORE;
+ else
+ msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */
+
/*printk(KERN_DEBUG "before sendmsg %d\n", len);*/
rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
/*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/
int ceph_tcp_listen(struct ceph_messenger *);
int ceph_tcp_accept(struct socket *, struct ceph_connection *);
int ceph_tcp_recvmsg(struct socket *, void *, size_t );
-int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t);
+int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t, int more);
void ceph_sock_release(struct socket *);
int ceph_workqueue_init(void);
void ceph_workqueue_shutdown(void);
dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes);
while (con->out_kvec_bytes > 0) {
ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
- con->out_kvec_left, con->out_kvec_bytes);
+ con->out_kvec_left, con->out_kvec_bytes,
+ con->out_more);
if (ret <= 0)
goto out;
con->out_kvec_bytes -= ret;
while (con->out_msg_pos.page < con->out_msg->nr_pages) {
struct page *page;
void *kaddr;
+
mutex_lock(&msg->page_mutex);
if (msg->pages) {
page = msg->pages[con->out_msg_pos.page];
kv.iov_base = kaddr + con->out_msg_pos.page_pos;
kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(data_len - con->out_msg_pos.data_pos));
- ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len);
+ ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len, 1);
if (msg->pages)
kunmap(page);
mutex_unlock(&msg->page_mutex);
con->out_kvec_left = 1;
con->out_kvec_cur = con->out_kvec;
con->out_msg = 0;
+ con->out_more = 0; /* end of message */
ret = 1;
out:
con->out_kvec_left = v;
con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len;
con->out_kvec_cur = con->out_kvec;
+ con->out_more = le32_to_cpu(m->hdr.data_len); /* data? */
/* pages */
con->out_msg_pos.page = 0;
con->out_kvec_left = 2;
con->out_kvec_bytes = 1 + 4;
con->out_kvec_cur = con->out_kvec;
+ con->out_more = 1; /* more will follow.. eventually.. */
set_bit(WRITE_PENDING, &con->state);
}
con->out_kvec_left = 2;
con->out_kvec_bytes = sizeof(msgr->inst.addr) + 4;
con->out_kvec_cur = con->out_kvec;
+ con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
con->out_kvec_left = 1;
con->out_kvec_bytes = 4;
con->out_kvec_cur = con->out_kvec;
+ con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
con->out_kvec_left = 1;
con->out_kvec_bytes = sizeof(msgr->inst.addr);
con->out_kvec_cur = con->out_kvec;
+ con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
con->out_kvec_left = 1;
con->out_kvec_bytes = 1;
con->out_kvec_cur = con->out_kvec;
+ con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}
con->out_kvec_left = 2;
con->out_kvec_bytes = 1 + 4;
con->out_kvec_cur = con->out_kvec;
+ con->out_more = 0;
set_bit(WRITE_PENDING, &con->state);
}