assert(!open_write);
open_write = true;
- MessageRef front_msg;
- if (likely(num_msgs)) {
- front_msg = conn.out_q.front();
- }
+ conn.pending_q.clear();
+ conn.pending_q.swap(conn.out_q);
// sweep all pending writes with the concrete Protocol
return socket->write(do_sweep_messages(
- conn.out_q, num_msgs, need_keepalive, keepalive_ack))
- .then([this, front_msg, num_msgs, prv_keepalive_ack=keepalive_ack] {
+ conn.pending_q, num_msgs, need_keepalive, keepalive_ack))
+ .then([this, prv_keepalive_ack=keepalive_ack] {
need_keepalive = false;
if (keepalive_ack == prv_keepalive_ack) {
keepalive_ack = std::nullopt;
}
- if (likely(num_msgs && front_msg == conn.out_q.front())) {
- // we have sent some messages successfully
- // and the out_q was not reset during socket write
- conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs);
- }
if (!is_queued()) {
// good, we have nothing pending to send now.
return socket->flush().then([this] {
// messages to be resent after connection gets reset
std::deque<MessageRef> out_q;
+ std::deque<MessageRef> pending_q;
// messages sent, but not yet acked by peer
std::deque<MessageRef> sent;