// return the remaining bytes, it may larger than the length of ptr
// else return < 0 means error
-ssize_t AsyncConnection::_try_send(bufferlist &send_bl, bool send)
+ssize_t AsyncConnection::_try_send(bufferlist &send_bl, bool send, bool more)
{
ldout(async_msgr->cct, 20) << __func__ << " send bl length is " << send_bl.length() << dendl;
if (send_bl.length()) {
size--;
}
- ssize_t r = do_sendmsg(msg, msglen, left_pbrs);
+ ssize_t r = do_sendmsg(msg, msglen, left_pbrs || more);
if (r < 0)
return r;
}
ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl;
- _try_send(bl, false);
+ _try_send(bl, false, true);
}
void AsyncConnection::handle_write()
bl.append((char*)&s, sizeof(s));
ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
ack_left.sub(left);
- r = _try_send(bl);
+ r = _try_send(bl, true, true);
} else if (is_queued()) {
r = _try_send(bl);
}
void suppress_sigpipe();
void restore_sigpipe();
ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
- ssize_t try_send(bufferlist &bl, bool send=true) {
+ ssize_t try_send(bufferlist &bl, bool send=true, bool more=false) {
Mutex::Locker l(write_lock);
- return _try_send(bl, send);
+ return _try_send(bl, send, more);
}
// if "send" is false, it will only append bl to send buffer
// the main usage is avoid error happen outside messenger threads
- ssize_t _try_send(bufferlist &bl, bool send=true);
+ ssize_t _try_send(bufferlist &bl, bool send=true, bool more=false);
ssize_t _send(Message *m);
void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
ssize_t read_until(unsigned needed, char *p);