int AsyncConnection::send_message(Message *m)
{
- ldout(async_msgr->cct, 10) << __func__ << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " m=" << m << dendl;
if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
logger->inc(l_msgr_send_messages);
bufferlist bl;
- Mutex::Locker l(write_lock);
- m->set_seq(out_seq.inc());
- m->get_header().src = async_msgr->get_myname();
+ uint64_t f = get_features();
+
+ // optimistic think it's ok to encode(actually may broken now)
if (!m->get_priority())
m->set_priority(async_msgr->get_default_send_priority());
- if (can_write == 1)
- prepare_send_message(m, bl);
+ prepare_send_message(f, m, bl);
+
+ Mutex::Locker l(write_lock);
+ m->set_req(out_seq.inc());
+ // "features" changes will change the payload encoding
+ if (can_write == 0 || get_features() != f) {
+ // ensure the correctness of message encoding
+ bl.clear();
+ ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous "
+ << f << " != " << get_features() << dedl;
+ } else {
+ inject_msg_header_crc(m, bl);
+ }
if (!is_queued() && can_write == 1) {
- ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
- int r = write_message(m, bl);
- if (r < 0) {
+ if (write_message(m, bl) < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
// we want to handle fault within internal thread
center->dispatch_event_external(write_handler);
center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}
-void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl)
+void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
{
assert(write_lock.is_locked());
ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
+ m->get_header().src = async_msgr->get_myname();
m->set_connection(this);
- uint64_t features = get_features();
if (m->empty_payload())
ldout(async_msgr->cct, 20) << __func__ << " encoding features "
<< features << " " << m << " " << *m << dendl;
oldheader.src.addr = get_peer_addr();
oldheader.orig_src = oldheader.src;
oldheader.reserved = header.reserved;
- if (msgr->crcflags & MSG_CRC_HEADER) {
- oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
- sizeof(oldheader) - sizeof(oldheader.crc));
- } else {
- oldheader.crc = 0;
- }
+ // delay crc calculate to "inject_msg_header_crc"
+ oldheader.crc = 0;
bl.append((char*)&oldheader, sizeof(oldheader));
}
break;
// send_message may not encode message
- if (!data.length())
- prepare_send_message(m, data);
+ if (!data.length()) {
+ prepare_send_message(get_features(), m, data);
+ inject_msg_header_crc(m, bl);
+ }
- ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
r = write_message(m, data);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
// the main usage is avoid error happen outside messenger threads
int _try_send(bufferlist &bl, bool send=true);
int _send(Message *m);
- void prepare_send_message(Message *m, bufferlist &bl);
+ void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
+#define HEADER_CRC_OFF (sizeof(char)+offset(ceph_msg_header, crc))
+ void inject_msg_header_crc(m, bl) {
+ if (msgr->crcflags & MSG_CRC_HEADER) {
+ if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
+ __le32 *header_crc = static_cast<__le32*>(&bl[HEADER_CRC_OFF]);
+ m->calc_header_crc();
+ *header_crc = m->get_header().crc;
+ } else {
+ ceph_msg_header_old *oldheader = static_cast<ceph_msg_header_old*>(&bl[sizeof(char)]);
+ oldheader->crc = ceph_crc32c(0, (unsigned char*)oldheader,
+ sizeof(*oldheader) - sizeof(oldheader->crc));
+ }
+ }
+ }
int read_until(uint64_t needed, char *p);
int _process_connection();
void _connect();