});
}
+ceph::bufferlist Protocol::sweep_messages_and_move_to_sent(
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> keepalive_ack,
+ bool require_ack)
+{
+ ceph::bufferlist bl = do_sweep_messages(conn.out_q,
+ num_msgs,
+ require_keepalive,
+ keepalive_ack,
+ require_ack);
+ if (!conn.policy.lossy) {
+ conn.sent.insert(conn.sent.end(),
+ conn.out_q.begin(),
+ conn.out_q.end());
+ }
+ conn.out_q.clear();
+ return bl;
+}
+
seastar::future<> Protocol::send(MessageRef msg)
{
if (write_state != write_state_t::drop) {
if (unlikely(!still_queued)) {
return try_exit_sweep();
}
- conn.pending_q.clear();
- conn.pending_q.swap(conn.out_q);
- if (!conn.policy.lossy) {
- conn.sent.insert(conn.sent.end(),
- conn.pending_q.begin(),
- conn.pending_q.end());
- }
auto acked = ack_left;
assert(acked == 0 || conn.in_seq > 0);
// sweep all pending writes with the concrete Protocol
- return socket->write(do_sweep_messages(
- conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
+ return socket->write(sweep_messages_and_move_to_sent(
+ num_msgs, need_keepalive, keepalive_ack, acked > 0)
).then([this, prv_keepalive_ack=keepalive_ack, acked] {
need_keepalive = false;
if (keepalive_ack == prv_keepalive_ack) {
virtual void notify_write() {};
virtual void on_closed() {}
+
+ private:
+ ceph::bufferlist sweep_messages_and_move_to_sent(
+ size_t num_msgs,
+ bool require_keepalive,
+ std::optional<utime_t> keepalive_ack,
+ bool require_ack);
public:
const proto_t proto_type;