struct ceph_msg *m = con->in_msg;
void *p;
int ret;
- int want, left;
+ int to, want, left;
unsigned front_len, data_len, data_off;
dout(20, "read_message_partial con %p msg %p\n", con, m);
no_data:
/* footer */
- while (con->in_base_pos < sizeof(m->hdr) + sizeof(m->footer)) {
- left = sizeof(m->hdr) + sizeof(m->footer) - con->in_base_pos;
+ to = sizeof(m->hdr) + sizeof(m->footer);
+ while (con->in_base_pos < to) {
+ left = to - con->in_base_pos;
ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
(con->in_base_pos - sizeof(m->hdr)),
left);
return ret;
con->in_base_pos += ret;
}
-
dout(20, "read_message_partial got msg %p\n", m);
/* crc ok? */
- if (con->in_front_crc != con->in_msg->footer.front_crc) {
+ if (con->in_front_crc != m->footer.front_crc) {
derr(0, "read_message_partial %p front crc %u != expected %u\n",
con->in_msg,
- con->in_front_crc, con->in_msg->footer.front_crc);
+ con->in_front_crc, m->footer.front_crc);
return -EIO;
}
- if (con->in_data_crc != con->in_msg->footer.data_crc) {
+ if (con->in_data_crc != m->footer.data_crc) {
derr(0, "read_message_partial %p data crc %u != expected %u\n",
con->in_msg,
- con->in_data_crc, con->in_msg->footer.data_crc);
+ con->in_data_crc, m->footer.data_crc);
return -EIO;
}
<< " <== " << m->get_source_inst()
<< " ==== " << *m
<< " ==== " << m->get_payload().length() << "+" << m->get_data().length()
- << " (" << m->front_crc << " " << m->data_crc << ")"
+ << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")"
<< " " << m
<< dendl;
dispatch(m);
m->set_source_inst(_myinst);
m->set_orig_source_inst(_myinst);
m->set_dest_inst(dest);
+ m->calc_data_crc();
dout(1) << m->get_source()
<< " --> " << dest.name << " " << dest.addr
<< " -- " << *m
- << " -- " << m
- << dendl;
+ << " -- ?+" << m->get_data().length()
+ << " (? " << m->get_footer().data_crc << ")"
+ << " " << m
+ << dendl;
rank.submit_message(m, dest.addr);
// set envelope
m->set_source_inst(_myinst);
m->set_dest_inst(dest);
+ m->calc_data_crc();
dout(1) << m->get_source()
<< " **> " << dest.name << " " << dest.addr
<< " -- " << *m
- << " -- " << m
+ << " -- ?+" << m->get_data().length()
+ << " (? " << m->get_footer().data_crc << ")"
+ << " " << m
<< dendl;
rank.submit_message(m, dest.addr);
m->set_source_inst(_myinst);
m->set_orig_source_inst(_myinst);
m->set_dest_inst(dest);
+ m->calc_data_crc();
dout(1) << "lazy " << m->get_source()
<< " --> " << dest.name << " " << dest.addr
<< " -- " << *m
- << " -- " << m
+ << " -- ?+" << m->get_data().length()
+ << " (? " << m->get_footer().data_crc << ")"
+ << " " << m
<< dendl;
rank.submit_message(m, dest.addr, true);
// encode and copy out of *m
if (m->empty_payload())
m->encode_payload();
- bufferlist payload, data;
- payload.claim(m->get_payload());
- data.claim(m->get_data());
- ceph_msg_header hdr = m->get_header();
+ m->calc_front_crc();
lock.Lock();
sent.push_back(m); // move to sent list
lock.Unlock();
dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
- int rc = write_message(m, &hdr, payload, data);
+ int rc = write_message(m);
lock.Lock();
if (rc < 0) {
- derr(1) << "writer error sending " << m << " to " << hdr.dst << ", "
+ derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", "
<< errno << ": " << strerror(errno) << dendl;
fault();
}
// verify header crc
__u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
if (header_crc != header.crc) {
- dout(0) << "reader got bad header crc " << header_crc << " != " << header_crc << dendl;
+ dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
return 0;
}
}
-int Rank::Pipe::write_message(Message *m, ceph_msg_header *header,
- bufferlist &payload, bufferlist &data)
+int Rank::Pipe::write_message(Message *m)
{
- struct ceph_msg_footer f;
- memset(&f, 0, sizeof(f));
+ ceph_msg_header& header = m->get_header();
+ ceph_msg_footer& footer = m->get_footer();
// get envelope, buffers
- header->front_len = payload.length();
- header->data_len = data.length();
-
- // calculate header, footer crc
- header->crc = crc32c_le(0, (unsigned char*)header, sizeof(*header) - sizeof(header->crc));
- f.front_crc = payload.crc32c(0);
- f.data_crc = data.crc32c(0);
+ header.front_len = m->get_payload().length();
+ header.data_len = m->get_data().length();
+ footer.aborted = 0;
+ m->calc_header_crc();
- bufferlist blist = payload;
- blist.append(data);
+ bufferlist blist = m->get_payload();
+ blist.append(m->get_data());
- dout(20) << "write_message " << m << " to " << header->dst << dendl;
+ dout(20) << "write_message " << m << " to " << header.dst << dendl;
// set up msghdr and iovecs
struct msghdr msg;
msg.msg_iovlen++;
// send envelope
- msgvec[msg.msg_iovlen].iov_base = (char*)header;
- msgvec[msg.msg_iovlen].iov_len = sizeof(*header);
- msglen += sizeof(*header);
+ msgvec[msg.msg_iovlen].iov_base = (char*)&header;
+ msgvec[msg.msg_iovlen].iov_len = sizeof(header);
+ msglen += sizeof(header);
msg.msg_iovlen++;
// payload (front+data)
}
assert(left == 0);
- // send data footer
- msgvec[msg.msg_iovlen].iov_base = (void*)&f;
- msgvec[msg.msg_iovlen].iov_len = sizeof(f);
- msglen += sizeof(f);
+ // send footer
+ msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
+ msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
+ msglen += sizeof(footer);
msg.msg_iovlen++;
// send