in_seq = message->get_seq();
ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
<< " " << message << " " << *message << dendl;
+
+ // if send_message always successfully send, it may have no
+ // opportunity to send seq ack. 10 is a experience value.
+ if (in_seq > in_seq_acked + 10) {
+ center->create_time_event(2, write_handler);
+ }
+
state = STATE_OPEN;
async_msgr->ms_fast_preprocess(message);
return ;
}
- shutdown_socket();
- center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ if (sd >= 0) {
+ shutdown_socket();
+ center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ }
open_write = false;
// requeue sent items
outcoming_bl.clear();
if (policy.lossy)
was_session_reset();
- center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ if (sd >= 0)
+ center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
open_write = false;
state = STATE_CLOSED;
}
bl.append((char*)&s, sizeof(s));
ldout(async_msgr->cct, 10) << __func__ << " try send msg ack" << dendl;
in_seq_acked = s;
- _try_send(bl, false);
+ _try_send(bl);
}
} else if (state != STATE_CONNECTING) {
r = _try_send(bl);