con->out_msg_pos.page_pos);
while (con->out_msg_pos.page < con->out_msg->nr_pages) {
- struct page *page = msg->pages[con->out_msg_pos.page];
- void *kaddr = kmap(page);
+ struct page *page;
+ void *kaddr;
+ mutex_lock(&msg->page_mutex);
+ page = msg->pages[con->out_msg_pos.page];
+ if (page)
+ kaddr = kmap(page);
+ else {
+ derr(0, "using zero page\n");
+ kaddr = page_address(page);
+ page = 0;
+ }
kv.iov_base = kaddr + con->out_msg_pos.page_pos;
kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
(int)(data_len - con->out_msg_pos.data_pos));
ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len);
- kunmap(page);
+ if (page)
+ kunmap(page);
+ mutex_unlock(&msg->page_mutex);
if (ret <= 0)
goto out;
con->out_msg_pos.data_pos += ret;
/* done */
dout(30, "write_partial_msg_pages wrote all pages on %p\n", con);
- con->out_msg = 0;
ret = 1;
out:
return ret;
}
/* kvec data queued? */
+more_kvec:
if (con->out_kvec_left) {
ret = write_partial_kvec(con);
if (ret == 0)
}
}
+ /* msg footer */
+ if (con->out_msg) {
+ con->out_footer.aborted =
+ cpu_to_le32(con->out_msg->pages_revoked);
+ con->out_kvec[0].iov_base = &con->out_footer;
+ con->out_kvec_bytes = con->out_kvec[0].iov_len =
+ sizeof(con->out_footer);
+ con->out_kvec_left = 1;
+ con->out_kvec_cur = con->out_kvec;
+ con->out_msg = 0;
+ goto more_kvec;
+ }
+
/* anything else pending? */
spin_lock(&con->out_queue_lock);
if (!list_empty(&con->out_queue)) {
dout(20, "read_message_partial con %p msg %p\n", con, m);
/* header */
- while (con->in_base_pos < sizeof(struct ceph_msg_header)) {
- left = sizeof(struct ceph_msg_header) - con->in_base_pos;
+ while (con->in_base_pos < sizeof(m->hdr)) {
+ left = sizeof(m->hdr) - con->in_base_pos;
ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos,
left);
if (ret <= 0)
return ret;
con->in_base_pos += ret;
- if (con->in_base_pos == sizeof(struct ceph_msg_header))
- break;
}
/* front */
data_len = le32_to_cpu(m->hdr.data_len);
data_off = le32_to_cpu(m->hdr.data_off);
if (data_len == 0)
- goto done;
+ goto no_data;
if (m->nr_pages == 0) {
con->in_msg_pos.page = 0;
con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
}
}
-done:
+no_data:
+ /* footer */
+ dout(0, "reading footer, pos %d / %d\n",
+ con->in_base_pos, sizeof(m->hdr) + sizeof(m->footer));
+ while (con->in_base_pos < sizeof(m->hdr) + sizeof(m->footer)) {
+ left = sizeof(m->hdr) + sizeof(m->footer) - con->in_base_pos;
+ dout(0, "left %d\n", left);
+ ret = ceph_tcp_recvmsg(con->sock, &m->footer +
+ (con->in_base_pos - sizeof(m->hdr)),
+ left);
+ dout(0, "got %d\n", ret);
+ if (ret <= 0)
+ return ret;
+ con->in_base_pos += ret;
+ dout(0, "new pos %d\n", con->in_base_pos);
+ }
+
dout(20, "read_message_partial got msg %p\n", m);
/* did i learn my ip? */
INIT_LIST_HEAD(&msgr->con_accepting);
INIT_RADIX_TREE(&msgr->con_tree, GFP_KERNEL);
+ msgr->zero_page = alloc_page(GFP_KERNEL | __GFP_ZERO);
+ if (!msgr->zero_page) {
+ kfree(msgr);
+ return ERR_PTR(-ENOMEM);
+ }
+ kmap(msgr->zero_page);
+
/* pick listening address */
if (myaddr) {
msgr->inst.addr = *myaddr;
/* stop listener */
ceph_sock_release(msgr->listen_sock);
+ kunmap(msgr->zero_page);
+ __free_page(msgr->zero_page);
kfree(msgr);
}
if (m == NULL)
goto out;
atomic_set(&m->nref, 1);
+ mutex_init(&m->page_mutex);
+
m->hdr.type = cpu_to_le32(type);
m->hdr.front_len = cpu_to_le32(front_len);
m->hdr.data_len = cpu_to_le32(page_len);
/* pages */
m->nr_pages = calc_pages_for(page_off, page_len);
m->pages = pages;
+ m->pages_revoked = 0;
INIT_LIST_HEAD(&m->list_head);
dout(20, "ceph_msg_new %p\n", m);
#ifndef __FS_CEPH_MESSENGER_H
#define __FS_CEPH_MESSENGER_H
-#include <linux/uio.h>
+#include <linux/mutex.h>
#include <linux/net.h>
#include <linux/radix-tree.h>
-#include <linux/workqueue.h>
+#include <linux/uio.h>
#include <linux/version.h>
+#include <linux/workqueue.h>
#include "ceph_fs.h"
struct list_head con_all; /* all connections */
struct list_head con_accepting; /* accepting */
struct radix_tree_root con_tree; /* established */
+ struct page *zero_page;
};
struct ceph_msg {
struct ceph_msg_header hdr; /* header */
+ struct ceph_msg_footer footer; /* footer */
struct kvec front; /* first bit of message */
+ struct mutex page_mutex;
struct page **pages; /* data payload. NOT OWNER. */
+ int pages_revoked; /* if true, pages revoked before sent */
unsigned nr_pages; /* size of page array */
struct list_head list_head;
atomic_t nref;
*out_kvec_cur;
int out_kvec_left; /* kvec's left */
int out_kvec_bytes; /* bytes left */
+ struct ceph_msg_footer out_footer;
struct ceph_msg *out_msg;
struct ceph_msg_pos out_msg_pos;
dout(20) << "reader got data tail " << left << dendl;
}
}
-
- // unmarshall message
- dout(20) << "reader got " << front.length() << " + " << data.length() << " byte message from "
- << env.src << dendl;
- Message *m = decode_message(env, front, data);
+ // footer
+ ceph_msg_footer footer;
+ if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
+ return 0;
-
- return m;
+ dout(10) << "aborted = " << le32_to_cpu(footer.aborted) << dendl;
+ if (le32_to_cpu(footer.aborted)) {
+ dout(0) << "reader got " << front.length() << " + " << data.length()
+ << " byte message from " << env.src << ".. ABORTED" << dendl;
+ // MEH FIXME
+ Message *m = new MGenericMessage(CEPH_MSG_PING);
+ env.type = cpu_to_le32(CEPH_MSG_PING);
+ m->set_env(env);
+ return m;
+ }
+
+ dout(20) << "reader got " << front.length() << " + " << data.length()
+ << " byte message from " << env.src << dendl;
+ return decode_message(env, front, data);
}
<< " writing " << donow
<< dendl;
- if (msg.msg_iovlen >= IOV_MAX-1) {
+ if (msg.msg_iovlen >= IOV_MAX-2) {
if (do_sendmsg(sd, &msg, msglen))
return -1;
}
}
assert(left == 0);
-
+
+ // send footer
+ struct ceph_msg_footer f;
+ memset(&f, 0, sizeof(f));
+ msgvec[msg.msg_iovlen].iov_base = (void*)&f;
+ msgvec[msg.msg_iovlen].iov_len = sizeof(f);
+ msglen += sizeof(f);
+ msg.msg_iovlen++;
+
// send
if (do_sendmsg(sd, &msg, msglen))
return -1;