{
int ret;
+ dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes);
while (con->out_kvec_bytes > 0) {
ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
con->out_kvec_left, con->out_kvec_bytes);
int ret;
unsigned data_len = le32_to_cpu(msg->hdr.data_len);
- dout(30, "write_partial_msg_pages %p on %d/%d offset %d\n",
- con, con->out_msg_pos.page, con->out_msg->nr_pages,
+ dout(30, "write_partial_msg_pages con %p msg %p on %d/%d offset %d\n",
+ con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
con->out_msg_pos.page_pos);
while (con->out_msg_pos.page < con->out_msg->nr_pages) {
/* done */
dout(30, "write_partial_msg_pages wrote all pages on %p\n", con);
+
+ con->out_footer.aborted =
+ cpu_to_le32(con->out_msg->pages_revoked);
+ con->out_kvec[0].iov_base = &con->out_footer;
+ con->out_kvec_bytes = con->out_kvec[0].iov_len =
+ sizeof(con->out_footer);
+ con->out_kvec_left = 1;
+ con->out_kvec_cur = con->out_kvec;
+ con->out_msg = 0;
+
ret = 1;
out:
return ret;
con->out_msg = m; /* FIXME: do we want to take a reference here? */
/* encode header */
- dout(20, "prepare_write_message %p seq %lld type %d len %d+%d\n",
+ dout(20, "prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n",
m, le64_to_cpu(m->hdr.seq), le32_to_cpu(m->hdr.type),
- le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len));
+ le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len),
+ m->nr_pages);
BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
/* tag + hdr + front */
}
/* msg pages? */
- if (con->out_msg) {
+ if (con->out_msg && con->out_msg->nr_pages) {
ret = write_partial_msg_pages(con, con->out_msg);
+ if (ret == 1)
+ goto more_kvec;
if (ret == 0)
goto done;
if (ret < 0) {
goto done;
}
}
-
- /* msg footer */
- if (con->out_msg) {
- con->out_footer.aborted =
- cpu_to_le32(con->out_msg->pages_revoked);
- con->out_kvec[0].iov_base = &con->out_footer;
- con->out_kvec_bytes = con->out_kvec[0].iov_len =
- sizeof(con->out_footer);
- con->out_kvec_left = 1;
- con->out_kvec_cur = con->out_kvec;
- con->out_msg = 0;
- goto more_kvec;
- }
+ con->out_msg = 0; /* done with this message. */
/* anything else pending? */
spin_lock(&con->out_queue_lock);
}
}
-no_data:
/* footer */
dout(0, "reading footer, pos %d / %d\n",
con->in_base_pos, sizeof(m->hdr) + sizeof(m->footer));
dout(0, "new pos %d\n", con->in_base_pos);
}
+no_data:
dout(20, "read_message_partial got msg %p\n", m);
/* did i learn my ip? */
ceph_msg_type_name(le32_to_cpu(msg->hdr.type)),
le32_to_cpu(msg->hdr.front_len),
le32_to_cpu(msg->hdr.data_len));
- dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p\n", msg,
- le64_to_cpu(msg->hdr.seq), ENTITY_NAME(msg->hdr.dst.name), con);
+ dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p pgs %d\n", msg,
+ le64_to_cpu(msg->hdr.seq), ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages);
list_add_tail(&msg->list_head, &con->out_queue);
spin_unlock(&con->out_queue_lock);
m->pages_revoked = 0;
INIT_LIST_HEAD(&m->list_head);
- dout(20, "ceph_msg_new %p\n", m);
+ dout(20, "ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, m->nr_pages);
return m;
out2:
data.push_back(bp);
dout(20) << "reader got data tail " << left << dendl;
}
- }
- // footer
- ceph_msg_footer footer;
- if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
- return 0;
+ // footer
+ ceph_msg_footer footer;
+ if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
+ return 0;
- dout(10) << "aborted = " << le32_to_cpu(footer.aborted) << dendl;
- if (le32_to_cpu(footer.aborted)) {
- dout(0) << "reader got " << front.length() << " + " << data.length()
- << " byte message from " << env.src << ".. ABORTED" << dendl;
- // MEH FIXME
- Message *m = new MGenericMessage(CEPH_MSG_PING);
- env.type = cpu_to_le32(CEPH_MSG_PING);
- m->set_env(env);
- return m;
- }
+ dout(10) << "aborted = " << le32_to_cpu(footer.aborted) << dendl;
+ if (le32_to_cpu(footer.aborted)) {
+ dout(0) << "reader got " << front.length() << " + " << data.length()
+ << " byte message from " << env.src << ".. ABORTED" << dendl;
+ // MEH FIXME
+ Message *m = new MGenericMessage(CEPH_MSG_PING);
+ env.type = cpu_to_le32(CEPH_MSG_PING);
+ m->set_env(env);
+ return m;
+ }
+ }
dout(20) << "reader got " << front.length() << " + " << data.length()
<< " byte message from " << env.src << dendl;
}
assert(left == 0);
- // send footer
- struct ceph_msg_footer f;
- memset(&f, 0, sizeof(f));
- msgvec[msg.msg_iovlen].iov_base = (void*)&f;
- msgvec[msg.msg_iovlen].iov_len = sizeof(f);
- msglen += sizeof(f);
- msg.msg_iovlen++;
+ if (data.length()) {
+ // send data footer
+ struct ceph_msg_footer f;
+ memset(&f, 0, sizeof(f));
+ msgvec[msg.msg_iovlen].iov_base = (void*)&f;
+ msgvec[msg.msg_iovlen].iov_len = sizeof(f);
+ msglen += sizeof(f);
+ msg.msg_iovlen++;
+ }
// send
if (do_sendmsg(sd, &msg, msglen))