Gather buffers from pending messages/keepalive and send them together.
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
seastar::future<> Protocol::send(MessageRef msg)
{
if (write_state != write_state_t::drop) {
- conn.out_q.push(std::move(msg));
+ conn.out_q.push_back(std::move(msg));
write_event();
}
return seastar::now();
seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
{
switch (write_state) {
- case write_state_t::open:
- return seastar::futurize_apply([this] {
- if (need_keepalive) {
- return do_keepalive()
- .then([this] { need_keepalive = false; });
- }
- return seastar::now();
- }).then([this] {
- if (need_keepalive_ack) {
- return do_keepalive_ack()
- .then([this] { need_keepalive_ack = false; });
+ case write_state_t::open: {
+ size_t num_msgs = conn.out_q.size();
+ // we must have something to write...
+ ceph_assert(num_msgs || need_keepalive || need_keepalive_ack);
+ Message* msg_ptr = nullptr;
+ if (likely(num_msgs)) {
+ msg_ptr = conn.out_q.front().get();
+ }
+ // sweep all pending writes with the concrete Protocol
+ return socket->write(do_sweep_messages(
+ conn.out_q, num_msgs, need_keepalive, need_keepalive_ack))
+ .then([this, msg_ptr, num_msgs] {
+ need_keepalive = false;
+ need_keepalive_ack = false;
+ if (likely(num_msgs && msg_ptr == conn.out_q.front().get())) {
+ // we have sent some messages successfully
+ // and the out_q was not reset during socket write
+ conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs);
}
- return seastar::now();
- }).then([this] {
- if (!conn.out_q.empty()){
- MessageRef msg = conn.out_q.front();
- return write_message(msg)
- .then([this, msg] {
- if (msg == conn.out_q.front()) {
- conn.out_q.pop();
- }
- return stop_t::no;
- });
- } else {
+ if (conn.out_q.empty()) {
+ // good, we have nothing pending to send now.
return socket->flush().then([this] {
- if (!conn.out_q.empty()) {
- return stop_t::no;
- } else {
- // the dispatching can only stop when out_q is empty
+ if (conn.out_q.empty() && !need_keepalive && !need_keepalive_ack) {
+ // still nothing pending to send after flush,
+ // the dispatching can ONLY stop now
ceph_assert(write_dispatching);
write_dispatching = false;
- return stop_t::yes;
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ } else {
+ // something is pending to send during flushing
+ return seastar::make_ready_future<stop_t>(stop_t::no);
}
});
+ } else {
+ // messages were enqueued during socket write
+ return seastar::make_ready_future<stop_t>(stop_t::no);
}
}).handle_exception([this] (std::exception_ptr eptr) {
logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr);
close();
- return stop_t::no;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
});
+ }
case write_state_t::delay: {
// delay dispatching writes until open
return state_changed.get_shared_future()
virtual void trigger_close() = 0;
- // encode/write a message
- virtual seastar::future<> write_message(MessageRef msg) = 0;
-
- virtual seastar::future<> do_keepalive() = 0;
-
- virtual seastar::future<> do_keepalive_ack() = 0;
+ virtual ceph::bufferlist do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ bool require_keepalive_ack) = 0;
public:
const proto_t proto_type;
}
}
-void discard_up_to(std::queue<MessageRef>* queue,
+void discard_up_to(std::deque<MessageRef>* queue,
ceph::net::seq_num_t seq)
{
while (!queue->empty() &&
queue->front()->get_seq() < seq) {
- queue->pop();
+ queue->pop_front();
}
}
// open state
-seastar::future<> ProtocolV1::write_message(MessageRef msg)
+ceph::bufferlist ProtocolV1::do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ bool require_keepalive_ack)
{
- msg->set_seq(++conn.out_seq);
- auto& header = msg->get_header();
- header.src = messenger.get_myname();
- msg->encode(conn.features, messenger.get_crc_flags());
- if (session_security) {
- session_security->sign_message(msg.get());
- }
- bufferlist bl;
- bl.append(CEPH_MSGR_TAG_MSG);
- bl.append((const char*)&header, sizeof(header));
- bl.append(msg->get_payload());
- bl.append(msg->get_middle());
- bl.append(msg->get_data());
- auto& footer = msg->get_footer();
- if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
- bl.append((const char*)&footer, sizeof(footer));
- } else {
- ceph_msg_footer_old old_footer;
- if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
- old_footer.front_crc = footer.front_crc;
- old_footer.middle_crc = footer.middle_crc;
- } else {
- old_footer.front_crc = old_footer.middle_crc = 0;
- }
- if (messenger.get_crc_flags() & MSG_CRC_DATA) {
- old_footer.data_crc = footer.data_crc;
+ static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) +
+ sizeof(ceph_msg_header) +
+ sizeof(ceph_msg_footer);
+ static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) +
+ sizeof(ceph_msg_header) +
+ sizeof(ceph_msg_footer_old);
+
+ ceph::bufferlist bl;
+ if (likely(num_msgs)) {
+ if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+ bl.reserve(num_msgs * RESERVE_MSG_SIZE);
} else {
- old_footer.data_crc = 0;
+ bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD);
}
- old_footer.flags = footer.flags;
- bl.append((const char*)&old_footer, sizeof(old_footer));
}
- // write as a seastar::net::packet
- return socket->write(std::move(bl));
- // TODO: lossless policy
- // .then([this, msg = std::move(msg)] {
- // if (!policy.lossy) {
- // sent.push(std::move(msg));
- // }
- // });
-}
-seastar::future<> ProtocolV1::do_keepalive()
-{
- k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
- ceph::coarse_real_clock::now());
- logger().debug("{} write keepalive2 {}", conn, k.req.stamp.tv_sec);
- return socket->write(make_static_packet(k.req));
-}
+ if (unlikely(require_keepalive)) {
+ k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+ ceph::coarse_real_clock::now());
+ logger().debug("{} write keepalive2 {}", conn, k.req.stamp.tv_sec);
+ bl.append(create_static(k.req));
+ }
-seastar::future<> ProtocolV1::do_keepalive_ack()
-{
- logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec);
- return socket->write(make_static_packet(k.ack));
+ if (unlikely(require_keepalive_ack)) {
+ logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec);
+ bl.append(create_static(k.ack));
+ }
+
+ std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+ msg->set_seq(++conn.out_seq);
+ auto& header = msg->get_header();
+ header.src = messenger.get_myname();
+ msg->encode(conn.features, messenger.get_crc_flags());
+ if (session_security) {
+ session_security->sign_message(msg.get());
+ }
+ bl.append(CEPH_MSGR_TAG_MSG);
+ bl.append((const char*)&header, sizeof(header));
+ bl.append(msg->get_payload());
+ bl.append(msg->get_middle());
+ bl.append(msg->get_data());
+ auto& footer = msg->get_footer();
+ if (HAVE_FEATURE(conn.features, MSG_AUTH)) {
+ bl.append((const char*)&footer, sizeof(footer));
+ } else {
+ ceph_msg_footer_old old_footer;
+ if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
+ old_footer.front_crc = footer.front_crc;
+ old_footer.middle_crc = footer.middle_crc;
+ } else {
+ old_footer.front_crc = old_footer.middle_crc = 0;
+ }
+ if (messenger.get_crc_flags() & MSG_CRC_DATA) {
+ old_footer.data_crc = footer.data_crc;
+ } else {
+ old_footer.data_crc = 0;
+ }
+ old_footer.flags = footer.flags;
+ bl.append((const char*)&old_footer, sizeof(old_footer));
+ }
+ });
+
+ return bl;
}
seastar::future<> ProtocolV1::handle_keepalive2_ack()
void trigger_close() override;
- seastar::future<> write_message(MessageRef msg) override;
-
- seastar::future<> do_keepalive() override;
-
- seastar::future<> do_keepalive_ack() override;
+ ceph::bufferlist do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ bool require_keepalive_ack) override;
private:
SocketMessenger &messenger;
// READY state
-seastar::future<> ProtocolV2::write_message(MessageRef msg)
+ceph::bufferlist ProtocolV2::do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ bool require_keepalive_ack)
{
- // TODO: move to common code
- // set priority
- msg->get_header().src = messenger.get_myname();
-
- msg->encode(conn.features, 0);
-
- msg->set_seq(++conn.out_seq);
- uint64_t ack_seq = conn.in_seq;
- // ack_left = 0;
-
- ceph_msg_header &header = msg->get_header();
- ceph_msg_footer &footer = msg->get_footer();
-
- ceph_msg_header2 header2{header.seq, header.tid,
- header.type, header.priority,
- header.version,
- 0, header.data_off,
- ack_seq,
- footer.flags, header.compat_version,
- header.reserved};
-
- auto message = MessageFrame::Encode(header2,
- msg->get_payload(), msg->get_middle(), msg->get_data());
- logger().debug("{} write msg type={} off={} seq={}",
- conn, header2.type, header2.data_off, header2.seq);
- return write_frame(message, false);
-}
+ ceph::bufferlist bl;
-seastar::future<> ProtocolV2::do_keepalive()
-{
- auto keepalive_frame = KeepAliveFrame::Encode();
- return write_frame(keepalive_frame, false);
-}
+ if (unlikely(require_keepalive)) {
+ auto keepalive_frame = KeepAliveFrame::Encode();
+ bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+ }
-seastar::future<> ProtocolV2::do_keepalive_ack()
-{
- auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send);
- return write_frame(keepalive_ack_frame, false);
+ if (unlikely(require_keepalive_ack)) {
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(last_keepalive_ack_to_send);
+ bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+ }
+
+ std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+ // TODO: move to common code
+ // set priority
+ msg->get_header().src = messenger.get_myname();
+
+ msg->encode(conn.features, 0);
+
+ msg->set_seq(++conn.out_seq);
+ uint64_t ack_seq = conn.in_seq;
+ // ack_left = 0;
+
+ ceph_msg_header &header = msg->get_header();
+ ceph_msg_footer &footer = msg->get_footer();
+
+ ceph_msg_header2 header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version,
+ 0, header.data_off,
+ ack_seq,
+ footer.flags, header.compat_version,
+ header.reserved};
+
+ auto message = MessageFrame::Encode(header2,
+ msg->get_payload(), msg->get_middle(), msg->get_data());
+ logger().debug("{} write msg type={} off={} seq={}",
+ conn, header2.type, header2.data_off, header2.seq);
+ bl.append(message.get_buffer(session_stream_handlers));
+ });
+
+ return bl;
}
void ProtocolV2::handle_message_ack(seq_num_t seq) {
void trigger_close() override;
- seastar::future<> write_message(MessageRef msg) override;
-
- seastar::future<> do_keepalive() override;
-
- seastar::future<> do_keepalive_ack() override;
+ ceph::bufferlist do_sweep_messages(
+ const std::deque<MessageRef>& msgs,
+ size_t num_msgs,
+ bool require_keepalive,
+ bool require_keepalive_ack) override;
private:
SocketMessenger &messenger;
out_seq -= sent.size();
while (!sent.empty()) {
auto m = sent.front();
- sent.pop();
- out_q.push(std::move(m));
+ sent.pop_front();
+ out_q.push_back(std::move(m));
}
}
bool update_rx_seq(seq_num_t seq);
// messages to be resent after connection gets reset
- std::queue<MessageRef> out_q;
+ std::deque<MessageRef> out_q;
// messages sent, but not yet acked by peer
- std::queue<MessageRef> sent;
+ std::deque<MessageRef> sent;
// which of the peer_addrs we're connecting to (as client)
// or should reconnect to (as peer)
/// move all messages in the sent list back into the queue
void requeue_sent();
- std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() {
+ std::tuple<seq_num_t, std::deque<MessageRef>> get_out_queue() {
return {out_seq, std::move(out_q)};
}