case write_state_t::open: {
size_t num_msgs = conn.out_q.size();
// we must have something to write...
- ceph_assert(num_msgs || need_keepalive || keepalive_ack.has_value());
+ ceph_assert(is_queued());
MessageRef front_msg;
if (likely(num_msgs)) {
front_msg = conn.out_q.front();
// and the out_q was not reset during socket write
conn.out_q.erase(conn.out_q.begin(), conn.out_q.begin()+num_msgs);
}
- if (conn.out_q.empty() && !keepalive_ack.has_value()) {
+ if (!is_queued()) {
// good, we have nothing pending to send now.
return socket->flush().then([this] {
- if (conn.out_q.empty() && !need_keepalive && !keepalive_ack.has_value()) {
+ if (!is_queued()) {
// still nothing pending to send after flush,
// the dispatching can ONLY stop now
ceph_assert(write_dispatching);
void notify_keepalive_ack(utime_t keepalive_ack);
+ bool is_queued() const {
+ return (!conn.out_q.empty() ||
+ need_keepalive ||
+ keepalive_ack.has_value());
+ }
+
private:
write_state_t write_state = write_state_t::none;
// wait until current state changed