{
ssize_t r = 0;
int prev_state = state;
- bool already_dispatch_writer = false;
+ bool need_dispatch_writer = false;
std::lock_guard<std::mutex> l(lock);
last_active = ceph::coarse_mono_clock::now();
do {
<< " " << *message << dendl;
ack_left.inc();
- // if send_message always send inline, it may have no
- // opportunity to send seq ack.
- if (!already_dispatch_writer) {
- center->dispatch_event_external(write_handler);
- already_dispatch_writer = true;
- }
-
+ need_dispatch_writer = true;
state = STATE_OPEN;
logger->inc(l_msgr_recv_messages);
}
} while (prev_state != state);
+ if (need_dispatch_writer && is_connected())
+ center->dispatch_event_external(write_handler);
return;
fail: