From: Haomai Wang Date: Thu, 28 May 2015 14:39:24 +0000 (+0800) Subject: AsyncConnection: Allow msg encode without write_lock holding X-Git-Tag: v9.0.2~23^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=947c39d2707630ffa81a9dde031185fccf2a5b93;p=ceph.git AsyncConnection: Allow msg encode without write_lock holding Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 146424dfe150..6d2b8e89b4c4 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1937,7 +1937,7 @@ void AsyncConnection::accept(int incoming) int AsyncConnection::send_message(Message *m) { - ldout(async_msgr->cct, 10) << __func__ << dendl; + ldout(async_msgr->cct, 10) << __func__ << " m=" << m << dendl; if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; @@ -1952,18 +1952,27 @@ int AsyncConnection::send_message(Message *m) logger->inc(l_msgr_send_messages); bufferlist bl; - Mutex::Locker l(write_lock); - m->set_seq(out_seq.inc()); - m->get_header().src = async_msgr->get_myname(); + uint64_t f = get_features(); + + // optimistic think it's ok to encode(actually may broken now) if (!m->get_priority()) m->set_priority(async_msgr->get_default_send_priority()); - if (can_write == 1) - prepare_send_message(m, bl); + prepare_send_message(f, m, bl); + + Mutex::Locker l(write_lock); + m->set_req(out_seq.inc()); + // "features" changes will change the payload encoding + if (can_write == 0 || get_features() != f) { + // ensure the correctness of message encoding + bl.clear(); + ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous " + << f << " != " << get_features() << dedl; + } else { + inject_msg_header_crc(m, bl); + } if (!is_queued() && can_write == 1) { - ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl; - int r = write_message(m, bl); - if (r < 0) { + if (write_message(m, bl) < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; // we want to handle fault within internal thread center->dispatch_event_external(write_handler); @@ -2185,14 +2194,14 @@ void AsyncConnection::_stop() center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this))); } -void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl) +void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl) { assert(write_lock.is_locked()); ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl; // associate message with Connection (for benefit of encode_payload) + m->get_header().src = async_msgr->get_myname(); m->set_connection(this); - uint64_t features = get_features(); if (m->empty_payload()) ldout(async_msgr->cct, 20) << __func__ << " encoding features " << features << " " << m << " " << *m << dendl; @@ -2243,12 +2252,8 @@ void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl) oldheader.src.addr = get_peer_addr(); oldheader.orig_src = oldheader.src; oldheader.reserved = header.reserved; - if (msgr->crcflags & MSG_CRC_HEADER) { - oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader, - sizeof(oldheader) - sizeof(oldheader.crc)); - } else { - oldheader.crc = 0; - } + // delay crc calculate to "inject_msg_header_crc" + oldheader.crc = 0; bl.append((char*)&oldheader, sizeof(oldheader)); } @@ -2380,10 +2385,11 @@ void AsyncConnection::handle_write() break; // send_message may not encode message - if (!data.length()) - prepare_send_message(m, data); + if (!data.length()) { + prepare_send_message(get_features(), m, data); + inject_msg_header_crc(m, bl); + } - ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl; r = write_message(m, data); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index bf9df82863cb..11850856c129 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -54,7 +54,21 @@ class AsyncConnection : public Connection { // the main usage is avoid error happen outside messenger threads int _try_send(bufferlist &bl, bool send=true); int _send(Message *m); - void prepare_send_message(Message *m, bufferlist &bl); + void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); +#define HEADER_CRC_OFF (sizeof(char)+offset(ceph_msg_header, crc)) + void inject_msg_header_crc(m, bl) { + if (msgr->crcflags & MSG_CRC_HEADER) { + if (has_feature(CEPH_FEATURE_NOSRCADDR)) { + __le32 *header_crc = static_cast<__le32*>(&bl[HEADER_CRC_OFF]); + m->calc_header_crc(); + *header_crc = m->get_header().crc; + } else { + ceph_msg_header_old *oldheader = static_cast(&bl[sizeof(char)]); + oldheader->crc = ceph_crc32c(0, (unsigned char*)oldheader, + sizeof(*oldheader) - sizeof(oldheader->crc)); + } + } + } int read_until(uint64_t needed, char *p); int _process_connection(); void _connect();