/*
* message header
*/
-struct ceph_msg_header {
+struct ceph_msg_header_old {
__le64 seq; /* message seq# for this session */
__le64 tid; /* transaction id */
__le16 type; /* message type */
__le32 crc; /* header crc32c */
} __attribute__ ((packed));
+struct ceph_msg_header {
+ __le64 seq; /* message seq# for this session */
+ __le64 tid; /* transaction id */
+ __le16 type; /* message type */
+ __le16 priority; /* priority. higher value == higher priority */
+ __le16 version; /* version of message encoding */
+
+ __le32 front_len; /* bytes in main payload */
+ __le32 middle_len;/* bytes in middle payload */
+ __le32 data_len; /* bytes of data payload */
+ __le16 data_off; /* sender: include full offset;
+ receiver: mask against ~PAGE_MASK */
+
+ struct ceph_entity_name src;
+ __le32 reserved;
+ __le32 crc; /* header crc32c */
+} __attribute__ ((packed));
+
#define CEPH_MSG_PRIO_LOW 64
#define CEPH_MSG_PRIO_DEFAULT 127
#define CEPH_MSG_PRIO_HIGH 196
int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest)
{
// set envelope
- m->get_header().src = get_myinst();
- m->get_header().orig_src = m->get_header().src;
+ m->get_header().src = get_myname();
if (!m->get_priority()) m->set_priority(get_default_send_priority());
int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest)
{
// set envelope
- m->get_header().src = get_myinst();
- m->get_header().orig_src = m->get_header().src;
+ m->get_header().src = get_myname();
if (!m->get_priority()) m->set_priority(get_default_send_priority());
ceph_msg_header header;
ceph_msg_footer footer;
-
- if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
- return 0;
+ __u32 header_crc;
+
+ if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
+ if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
+ return 0;
+ header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
+ } else {
+ ceph_msg_header_old oldheader;
+ if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
+ return 0;
+ // this is fugly
+ memcpy(&header, &oldheader, sizeof(header));
+ header.src = oldheader.src.name;
+ header.reserved = oldheader.reserved;
+ header.crc = oldheader.crc;
+ header_crc = crc32c_le(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
+ }
dout(20) << "reader got envelope type=" << header.type
- << " src " << header.src
+ << " src " << entity_name_t(header.src)
<< " front=" << header.front_len
<< " data=" << header.data_len
<< " off " << header.data_off
<< dendl;
// verify header crc
- __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
if (header_crc != header.crc) {
dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
return 0;
}
- // ok, now it's safe to change the header..
- // munge source address?
- entity_addr_t srcaddr = header.src.addr;
- if (srcaddr.is_blank_addr()) {
- dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl;
- ceph_entity_addr enc_peer_addr = peer_addr;
- header.orig_src.addr.in_addr = header.src.addr.in_addr = enc_peer_addr.in_addr;
- }
-
// read front
bufferlist front;
int front_len = header.front_len;
dout(10) << "aborted = " << aborted << dendl;
if (aborted) {
dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
- << " byte message from " << header.src << ".. ABORTED" << dendl;
+ << " byte message.. ABORTED" << dendl;
// MEH FIXME
Message *m = new MGenericMessage(CEPH_MSG_PING);
header.type = CEPH_MSG_PING;
}
dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
- << " byte message from " << header.src << dendl;
+ << " byte message" << dendl;
return decode_message(header, footer, front, middle, data);
}
msg.msg_iovlen++;
// send envelope
- msgvec[msg.msg_iovlen].iov_base = (char*)&header;
- msgvec[msg.msg_iovlen].iov_len = sizeof(header);
- msglen += sizeof(header);
- msg.msg_iovlen++;
+ ceph_msg_header_old oldheader;
+ if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
+ msgvec[msg.msg_iovlen].iov_base = (char*)&header;
+ msgvec[msg.msg_iovlen].iov_len = sizeof(header);
+ msglen += sizeof(header);
+ msg.msg_iovlen++;
+ } else {
+ memcpy(&oldheader, &header, sizeof(header));
+ oldheader.src.name = header.src;
+ oldheader.src.addr = connection_state->get_peer_addr();
+ oldheader.orig_src = oldheader.src;
+ oldheader.reserved = header.reserved;
+ oldheader.crc = crc32c_le(0, (unsigned char*)&oldheader,
+ sizeof(oldheader) - sizeof(oldheader.crc));
+ msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
+ msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
+ msglen += sizeof(oldheader);
+ msg.msg_iovlen++;
+ }
// payload (front+data)
list<bufferptr>::const_iterator pb = blist.buffers().begin();