From c5a6b6a5ae801323fe89434367964db7d07c1e64 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Piotr=20Da=C5=82ek?= Date: Thu, 11 Feb 2016 09:38:46 +0100 Subject: [PATCH] msg/async: cut the middle-man MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Get rid of complete_bl and let messenger write directly to outcoming_bl of a connection. Also, if message bufferlist is small enough, append its contents to outcoming_bl directly, so we'll use less iovecs and in best case, pack entire message (together with header and footer added in write_message()) in single bufferptr. Signed-off-by: Piotr Dałek --- src/msg/async/AsyncConnection.cc | 60 ++++++++++++++++---------------- src/msg/async/AsyncConnection.h | 5 +-- 2 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index aafe894552ce..cd913c9df370 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -45,6 +45,7 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { // 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512; +const int ASYNC_COALESCE_THRESHOLD = 256; class C_time_wakeup : public EventCallback { AsyncConnectionRef conn; @@ -344,13 +345,8 @@ ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more) // return the remaining bytes, it may larger than the length of ptr // else return < 0 means error -ssize_t AsyncConnection::_try_send(bufferlist &send_bl, bool send, bool more) +ssize_t AsyncConnection::_try_send(bool send, bool more) { - ldout(async_msgr->cct, 20) << __func__ << " send bl length is " << send_bl.length() << dendl; - if (send_bl.length()) { - outcoming_bl.claim_append(send_bl); - } - if (!send) return 0; @@ -2338,14 +2334,13 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) << "): sig = " << footer.sig << dendl; } } + + unsigned original_bl_len = outcoming_bl.length(); - bufferlist complete_bl; - // send tag - char tag = CEPH_MSGR_TAG_MSG; - complete_bl.append(&tag, sizeof(tag)); + outcoming_bl.append(CEPH_MSGR_TAG_MSG); if (has_feature(CEPH_FEATURE_NOSRCADDR)) { - complete_bl.append((char*)&header, sizeof(header)); + outcoming_bl.append((char*)&header, sizeof(header)); } else { ceph_msg_header_old oldheader; memcpy(&oldheader, &header, sizeof(header)); @@ -2355,7 +2350,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) oldheader.reserved = header.reserved; oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); - complete_bl.append((char*)&oldheader, sizeof(oldheader)); + outcoming_bl.append((char*)&oldheader, sizeof(oldheader)); } ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type @@ -2364,12 +2359,19 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) << " data=" << header.data_len << " off " << header.data_off << dendl; - complete_bl.claim_append(bl); + if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) { + std::list::const_iterator pb; + for (pb = bl.buffers().begin(); pb != bl.buffers().end(); ++pb) { + outcoming_bl.append((char*)pb->c_str(), pb->length()); + } + } else { + outcoming_bl.claim_append(bl); + } // send footer; if receiver doesn't support signatures, use the old footer format ceph_msg_footer_old old_footer; if (has_feature(CEPH_FEATURE_MSG_AUTH)) { - complete_bl.append((char*)&footer, sizeof(footer)); + outcoming_bl.append((char*)&footer, sizeof(footer)); } else { if (msgr->crcflags & MSG_CRC_HEADER) { old_footer.front_crc = footer.front_crc; @@ -2380,13 +2382,13 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) } old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0; old_footer.flags = footer.flags; - complete_bl.append((char*)&old_footer, sizeof(old_footer)); + outcoming_bl.append((char*)&old_footer, sizeof(old_footer)); } - logger->inc(l_msgr_send_bytes, complete_bl.length()); + logger->inc(l_msgr_send_bytes, outcoming_bl.length() - original_bl_len); ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq() << " " << m << dendl; - ssize_t rc = _try_send(complete_bl, true, more); + ssize_t rc = _try_send(true, more); if (rc < 0) { ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", " << cpp_strerror(errno) << dendl; @@ -2435,7 +2437,6 @@ void AsyncConnection::mark_down() void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { assert(write_lock.is_locked()); - bufferlist bl; utime_t t = ceph_clock_now(async_msgr->cct); struct ceph_timespec ts; @@ -2443,25 +2444,24 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) if (ack) { assert(tp); tp->encode_timeval(&ts); - bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); - bl.append((char*)&ts, sizeof(ts)); + outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); + outcoming_bl.append((char*)&ts, sizeof(ts)); } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { struct ceph_timespec ts; t.encode_timeval(&ts); - bl.append(CEPH_MSGR_TAG_KEEPALIVE2); - bl.append((char*)&ts, sizeof(ts)); + outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2); + outcoming_bl.append((char*)&ts, sizeof(ts)); } else { - bl.append(CEPH_MSGR_TAG_KEEPALIVE); + outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE); } ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl; - _try_send(bl, false); + _try_send(false); } void AsyncConnection::handle_write() { ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; - bufferlist bl; ssize_t r = 0; write_lock.Lock(); @@ -2495,14 +2495,14 @@ void AsyncConnection::handle_write() if (left) { ceph_le64 s; s = in_seq.read(); - bl.append(CEPH_MSGR_TAG_ACK); - bl.append((char*)&s, sizeof(s)); + outcoming_bl.append(CEPH_MSGR_TAG_ACK); + outcoming_bl.append((char*)&s, sizeof(s)); ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; ack_left.sub(left); left = ack_left.read(); - r = _try_send(bl, true, left); + r = _try_send(true, left); } else if (is_queued()) { - r = _try_send(bl); + r = _try_send(); } write_lock.Unlock(); @@ -2519,7 +2519,7 @@ void AsyncConnection::handle_write() << " policy.server is false" << dendl; _connect(); } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) { - r = _try_send(bl); + r = _try_send(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; write_lock.Unlock(); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index e3d3d5901a5f..4a76a7c1b241 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -53,11 +53,12 @@ class AsyncConnection : public Connection { ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more); ssize_t try_send(bufferlist &bl, bool send=true, bool more=false) { Mutex::Locker l(write_lock); - return _try_send(bl, send, more); + outcoming_bl.claim_append(bl); + return _try_send(send, more); } // if "send" is false, it will only append bl to send buffer // the main usage is avoid error happen outside messenger threads - ssize_t _try_send(bufferlist &bl, bool send=true, bool more=false); + ssize_t _try_send(bool send=true, bool more=false); ssize_t _send(Message *m); void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); ssize_t read_until(unsigned needed, char *p); -- 2.47.3