if (!can_fast_prepare)
prepare_send_message(get_features(), m, bl);
logger->inc(l_msgr_send_messages_inline);
- if (write_message(m, bl) < 0) {
+ if (write_message(m, bl, false) < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
// we want to handle fault within internal thread
center->dispatch_event_external(write_handler);
bl.append(m->get_data());
}
-ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl)
+ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
{
assert(can_write == CANWRITE);
m->set_seq(out_seq.inc());
logger->inc(l_msgr_send_bytes, complete_bl.length());
ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
<< " " << m << dendl;
- ssize_t rc = _try_send(complete_bl);
+ ssize_t rc = _try_send(complete_bl, true, more);
if (rc < 0) {
ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(errno) << dendl;
}
ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl;
- _try_send(bl, false, true);
+ _try_send(bl, false);
}
void AsyncConnection::handle_write()
if (!data.length())
prepare_send_message(get_features(), m, data);
- r = write_message(m, data);
+ r = write_message(m, data, _has_next_outgoing());
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
write_lock.Unlock();
bl.append((char*)&s, sizeof(s));
ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
ack_left.sub(left);
- r = _try_send(bl, true, true);
+ left = ack_left.read();
+ r = _try_send(bl, true, left);
} else if (is_queued()) {
r = _try_send(bl);
}
int randomize_out_seq();
void handle_ack(uint64_t seq);
void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
- ssize_t write_message(Message *m, bufferlist& bl);
+ ssize_t write_message(Message *m, bufferlist& bl, bool more);
ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
bufferlist &authorizer_reply) {
bufferlist reply_bl;
}
return m;
}
+ bool _has_next_outgoing() {
+ assert(write_lock.is_locked());
+ return !out_q.empty();
+ }
public:
AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);