From: Yingxin Cheng Date: Thu, 25 Apr 2019 19:50:00 +0000 (+0800) Subject: crimson/net: gather message buffers when send X-Git-Tag: v15.1.0~2798^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b5e9c8906571fb73a66f8f40b7a19fda91ae264b;p=ceph-ci.git crimson/net: gather message buffers when send Gather buffers from pending messages/keepalive and send them together. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 1248bbc9335..3e8538fa06e 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -72,7 +72,7 @@ seastar::future<> Protocol::close() seastar::future<> Protocol::send(MessageRef msg) { if (write_state != write_state_t::drop) { - conn.out_q.push(std::move(msg)); + conn.out_q.push_back(std::move(msg)); write_event(); } return seastar::now(); @@ -98,46 +98,49 @@ void Protocol::notify_keepalive_ack() seastar::future Protocol::do_write_dispatch_sweep() { switch (write_state) { - case write_state_t::open: - return seastar::futurize_apply([this] { - if (need_keepalive) { - return do_keepalive() - .then([this] { need_keepalive = false; }); - } - return seastar::now(); - }).then([this] { - if (need_keepalive_ack) { - return do_keepalive_ack() - .then([this] { need_keepalive_ack = false; }); + case write_state_t::open: { + size_t num_msgs = conn.out_q.size(); + // we must have something to write... + ceph_assert(num_msgs || need_keepalive || need_keepalive_ack); + Message* msg_ptr = nullptr; + if (likely(num_msgs)) { + msg_ptr = conn.out_q.front().get(); + } + // sweep all pending writes with the concrete Protocol + return socket->write(do_sweep_messages( + conn.out_q, num_msgs, need_keepalive, need_keepalive_ack)) + .then([this, msg_ptr, num_msgs] { + need_keepalive = false; + need_keepalive_ack = false; + if (likely(num_msgs && msg_ptr == conn.out_q.front().get())) { + // we have sent some messages successfully + // and the out_q was not reset during socket write + conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs); } - return seastar::now(); - }).then([this] { - if (!conn.out_q.empty()){ - MessageRef msg = conn.out_q.front(); - return write_message(msg) - .then([this, msg] { - if (msg == conn.out_q.front()) { - conn.out_q.pop(); - } - return stop_t::no; - }); - } else { + if (conn.out_q.empty()) { + // good, we have nothing pending to send now. return socket->flush().then([this] { - if (!conn.out_q.empty()) { - return stop_t::no; - } else { - // the dispatching can only stop when out_q is empty + if (conn.out_q.empty() && !need_keepalive && !need_keepalive_ack) { + // still nothing pending to send after flush, + // the dispatching can ONLY stop now ceph_assert(write_dispatching); write_dispatching = false; - return stop_t::yes; + return seastar::make_ready_future(stop_t::yes); + } else { + // something is pending to send during flushing + return seastar::make_ready_future(stop_t::no); } }); + } else { + // messages were enqueued during socket write + return seastar::make_ready_future(stop_t::no); } }).handle_exception([this] (std::exception_ptr eptr) { logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr); close(); - return stop_t::no; + return seastar::make_ready_future(stop_t::no); }); + } case write_state_t::delay: { // delay dispatching writes until open return state_changed.get_shared_future() diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 73c43b6f695..78ffcb6f0df 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -44,12 +44,11 @@ class Protocol { virtual void trigger_close() = 0; - // encode/write a message - virtual seastar::future<> write_message(MessageRef msg) = 0; - - virtual seastar::future<> do_keepalive() = 0; - - virtual seastar::future<> do_keepalive_ack() = 0; + virtual ceph::bufferlist do_sweep_messages( + const std::deque& msgs, + size_t num_msgs, + bool require_keepalive, + bool require_keepalive_ack) = 0; public: const proto_t proto_type; diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 408f7f0d0fe..5225de1f0fa 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -129,12 +129,12 @@ uint32_t get_proto_version(entity_type_t peer_type, bool connect) } } -void discard_up_to(std::queue* queue, +void discard_up_to(std::deque* queue, ceph::net::seq_num_t seq) { while (!queue->empty() && queue->front()->get_seq() < seq) { - queue->pop(); + queue->pop_front(); } } @@ -651,62 +651,75 @@ void ProtocolV1::start_accept(SocketFRef&& sock, // open state -seastar::future<> ProtocolV1::write_message(MessageRef msg) +ceph::bufferlist ProtocolV1::do_sweep_messages( + const std::deque& msgs, + size_t num_msgs, + bool require_keepalive, + bool require_keepalive_ack) { - msg->set_seq(++conn.out_seq); - auto& header = msg->get_header(); - header.src = messenger.get_myname(); - msg->encode(conn.features, messenger.get_crc_flags()); - if (session_security) { - session_security->sign_message(msg.get()); - } - bufferlist bl; - bl.append(CEPH_MSGR_TAG_MSG); - bl.append((const char*)&header, sizeof(header)); - bl.append(msg->get_payload()); - bl.append(msg->get_middle()); - bl.append(msg->get_data()); - auto& footer = msg->get_footer(); - if (HAVE_FEATURE(conn.features, MSG_AUTH)) { - bl.append((const char*)&footer, sizeof(footer)); - } else { - ceph_msg_footer_old old_footer; - if (messenger.get_crc_flags() & MSG_CRC_HEADER) { - old_footer.front_crc = footer.front_crc; - old_footer.middle_crc = footer.middle_crc; - } else { - old_footer.front_crc = old_footer.middle_crc = 0; - } - if (messenger.get_crc_flags() & MSG_CRC_DATA) { - old_footer.data_crc = footer.data_crc; + static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) + + sizeof(ceph_msg_header) + + sizeof(ceph_msg_footer); + static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) + + sizeof(ceph_msg_header) + + sizeof(ceph_msg_footer_old); + + ceph::bufferlist bl; + if (likely(num_msgs)) { + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + bl.reserve(num_msgs * RESERVE_MSG_SIZE); } else { - old_footer.data_crc = 0; + bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD); } - old_footer.flags = footer.flags; - bl.append((const char*)&old_footer, sizeof(old_footer)); } - // write as a seastar::net::packet - return socket->write(std::move(bl)); - // TODO: lossless policy - // .then([this, msg = std::move(msg)] { - // if (!policy.lossy) { - // sent.push(std::move(msg)); - // } - // }); -} -seastar::future<> ProtocolV1::do_keepalive() -{ - k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( - ceph::coarse_real_clock::now()); - logger().debug("{} write keepalive2 {}", conn, k.req.stamp.tv_sec); - return socket->write(make_static_packet(k.req)); -} + if (unlikely(require_keepalive)) { + k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( + ceph::coarse_real_clock::now()); + logger().debug("{} write keepalive2 {}", conn, k.req.stamp.tv_sec); + bl.append(create_static(k.req)); + } -seastar::future<> ProtocolV1::do_keepalive_ack() -{ - logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec); - return socket->write(make_static_packet(k.ack)); + if (unlikely(require_keepalive_ack)) { + logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec); + bl.append(create_static(k.ack)); + } + + std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { + msg->set_seq(++conn.out_seq); + auto& header = msg->get_header(); + header.src = messenger.get_myname(); + msg->encode(conn.features, messenger.get_crc_flags()); + if (session_security) { + session_security->sign_message(msg.get()); + } + bl.append(CEPH_MSGR_TAG_MSG); + bl.append((const char*)&header, sizeof(header)); + bl.append(msg->get_payload()); + bl.append(msg->get_middle()); + bl.append(msg->get_data()); + auto& footer = msg->get_footer(); + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + bl.append((const char*)&footer, sizeof(footer)); + } else { + ceph_msg_footer_old old_footer; + if (messenger.get_crc_flags() & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = 0; + } + if (messenger.get_crc_flags() & MSG_CRC_DATA) { + old_footer.data_crc = footer.data_crc; + } else { + old_footer.data_crc = 0; + } + old_footer.flags = footer.flags; + bl.append((const char*)&old_footer, sizeof(old_footer)); + } + }); + + return bl; } seastar::future<> ProtocolV1::handle_keepalive2_ack() diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h index ac65c8f9957..5f6a75da781 100644 --- a/src/crimson/net/ProtocolV1.h +++ b/src/crimson/net/ProtocolV1.h @@ -26,11 +26,11 @@ class ProtocolV1 final : public Protocol { void trigger_close() override; - seastar::future<> write_message(MessageRef msg) override; - - seastar::future<> do_keepalive() override; - - seastar::future<> do_keepalive_ack() override; + ceph::bufferlist do_sweep_messages( + const std::deque& msgs, + size_t num_msgs, + bool require_keepalive, + bool require_keepalive_ack) override; private: SocketMessenger &messenger; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index e127a8a23c7..877a07b6471 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1342,46 +1342,54 @@ seastar::future<> ProtocolV2::send_reconnect_ok() // READY state -seastar::future<> ProtocolV2::write_message(MessageRef msg) +ceph::bufferlist ProtocolV2::do_sweep_messages( + const std::deque& msgs, + size_t num_msgs, + bool require_keepalive, + bool require_keepalive_ack) { - // TODO: move to common code - // set priority - msg->get_header().src = messenger.get_myname(); - - msg->encode(conn.features, 0); - - msg->set_seq(++conn.out_seq); - uint64_t ack_seq = conn.in_seq; - // ack_left = 0; - - ceph_msg_header &header = msg->get_header(); - ceph_msg_footer &footer = msg->get_footer(); - - ceph_msg_header2 header2{header.seq, header.tid, - header.type, header.priority, - header.version, - 0, header.data_off, - ack_seq, - footer.flags, header.compat_version, - header.reserved}; - - auto message = MessageFrame::Encode(header2, - msg->get_payload(), msg->get_middle(), msg->get_data()); - logger().debug("{} write msg type={} off={} seq={}", - conn, header2.type, header2.data_off, header2.seq); - return write_frame(message, false); -} + ceph::bufferlist bl; -seastar::future<> ProtocolV2::do_keepalive() -{ - auto keepalive_frame = KeepAliveFrame::Encode(); - return write_frame(keepalive_frame, false); -} + if (unlikely(require_keepalive)) { + auto keepalive_frame = KeepAliveFrame::Encode(); + bl.append(keepalive_frame.get_buffer(session_stream_handlers)); + } -seastar::future<> ProtocolV2::do_keepalive_ack() -{ - auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send); - return write_frame(keepalive_ack_frame, false); + if (unlikely(require_keepalive_ack)) { + auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send); + bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); + } + + std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { + // TODO: move to common code + // set priority + msg->get_header().src = messenger.get_myname(); + + msg->encode(conn.features, 0); + + msg->set_seq(++conn.out_seq); + uint64_t ack_seq = conn.in_seq; + // ack_left = 0; + + ceph_msg_header &header = msg->get_header(); + ceph_msg_footer &footer = msg->get_footer(); + + ceph_msg_header2 header2{header.seq, header.tid, + header.type, header.priority, + header.version, + 0, header.data_off, + ack_seq, + footer.flags, header.compat_version, + header.reserved}; + + auto message = MessageFrame::Encode(header2, + msg->get_payload(), msg->get_middle(), msg->get_data()); + logger().debug("{} write msg type={} off={} seq={}", + conn, header2.type, header2.data_off, header2.seq); + bl.append(message.get_buffer(session_stream_handlers)); + }); + + return bl; } void ProtocolV2::handle_message_ack(seq_num_t seq) { diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index e176dc343fa..eb1bc65005d 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -25,11 +25,11 @@ class ProtocolV2 final : public Protocol { void trigger_close() override; - seastar::future<> write_message(MessageRef msg) override; - - seastar::future<> do_keepalive() override; - - seastar::future<> do_keepalive_ack() override; + ceph::bufferlist do_sweep_messages( + const std::deque& msgs, + size_t num_msgs, + bool require_keepalive, + bool require_keepalive_ack) override; private: SocketMessenger &messenger; diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index dc35c66a05e..a8f5cac1cb3 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -81,8 +81,8 @@ void SocketConnection::requeue_sent() out_seq -= sent.size(); while (!sent.empty()) { auto m = sent.front(); - sent.pop(); - out_q.push(std::move(m)); + sent.pop_front(); + out_q.push_back(std::move(m)); } } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index e24c2633e1f..482cbd723d1 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -58,9 +58,9 @@ class SocketConnection : public Connection { bool update_rx_seq(seq_num_t seq); // messages to be resent after connection gets reset - std::queue out_q; + std::deque out_q; // messages sent, but not yet acked by peer - std::queue sent; + std::deque sent; // which of the peer_addrs we're connecting to (as client) // or should reconnect to (as peer) @@ -115,7 +115,7 @@ class SocketConnection : public Connection { /// move all messages in the sent list back into the queue void requeue_sent(); - std::tuple> get_out_queue() { + std::tuple> get_out_queue() { return {out_seq, std::move(out_q)}; }