// 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead
const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
+const int ASYNC_COALESCE_THRESHOLD = 256;
class C_time_wakeup : public EventCallback {
AsyncConnectionRef conn;
// return the remaining bytes, it may larger than the length of ptr
// else return < 0 means error
-ssize_t AsyncConnection::_try_send(bufferlist &send_bl, bool send, bool more)
+ssize_t AsyncConnection::_try_send(bool send, bool more)
{
- ldout(async_msgr->cct, 20) << __func__ << " send bl length is " << send_bl.length() << dendl;
- if (send_bl.length()) {
- outcoming_bl.claim_append(send_bl);
- }
-
if (!send)
return 0;
<< "): sig = " << footer.sig << dendl;
}
}
+
+ unsigned original_bl_len = outcoming_bl.length();
- bufferlist complete_bl;
- // send tag
- char tag = CEPH_MSGR_TAG_MSG;
- complete_bl.append(&tag, sizeof(tag));
+ outcoming_bl.append(CEPH_MSGR_TAG_MSG);
if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
- complete_bl.append((char*)&header, sizeof(header));
+ outcoming_bl.append((char*)&header, sizeof(header));
} else {
ceph_msg_header_old oldheader;
memcpy(&oldheader, &header, sizeof(header));
oldheader.reserved = header.reserved;
oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
sizeof(oldheader) - sizeof(oldheader.crc));
- complete_bl.append((char*)&oldheader, sizeof(oldheader));
+ outcoming_bl.append((char*)&oldheader, sizeof(oldheader));
}
ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type
<< " data=" << header.data_len
<< " off " << header.data_off << dendl;
- complete_bl.claim_append(bl);
+ if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
+ std::list<buffer::ptr>::const_iterator pb;
+ for (pb = bl.buffers().begin(); pb != bl.buffers().end(); ++pb) {
+ outcoming_bl.append((char*)pb->c_str(), pb->length());
+ }
+ } else {
+ outcoming_bl.claim_append(bl);
+ }
// send footer; if receiver doesn't support signatures, use the old footer format
ceph_msg_footer_old old_footer;
if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
- complete_bl.append((char*)&footer, sizeof(footer));
+ outcoming_bl.append((char*)&footer, sizeof(footer));
} else {
if (msgr->crcflags & MSG_CRC_HEADER) {
old_footer.front_crc = footer.front_crc;
}
old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
old_footer.flags = footer.flags;
- complete_bl.append((char*)&old_footer, sizeof(old_footer));
+ outcoming_bl.append((char*)&old_footer, sizeof(old_footer));
}
- logger->inc(l_msgr_send_bytes, complete_bl.length());
+ logger->inc(l_msgr_send_bytes, outcoming_bl.length() - original_bl_len);
ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
<< " " << m << dendl;
- ssize_t rc = _try_send(complete_bl, true, more);
+ ssize_t rc = _try_send(true, more);
if (rc < 0) {
ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(errno) << dendl;
void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
{
assert(write_lock.is_locked());
- bufferlist bl;
utime_t t = ceph_clock_now(async_msgr->cct);
struct ceph_timespec ts;
if (ack) {
assert(tp);
tp->encode_timeval(&ts);
- bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
- bl.append((char*)&ts, sizeof(ts));
+ outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
+ outcoming_bl.append((char*)&ts, sizeof(ts));
} else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
struct ceph_timespec ts;
t.encode_timeval(&ts);
- bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
- bl.append((char*)&ts, sizeof(ts));
+ outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE2);
+ outcoming_bl.append((char*)&ts, sizeof(ts));
} else {
- bl.append(CEPH_MSGR_TAG_KEEPALIVE);
+ outcoming_bl.append(CEPH_MSGR_TAG_KEEPALIVE);
}
ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl;
- _try_send(bl, false);
+ _try_send(false);
}
void AsyncConnection::handle_write()
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
- bufferlist bl;
ssize_t r = 0;
write_lock.Lock();
if (left) {
ceph_le64 s;
s = in_seq.read();
- bl.append(CEPH_MSGR_TAG_ACK);
- bl.append((char*)&s, sizeof(s));
+ outcoming_bl.append(CEPH_MSGR_TAG_ACK);
+ outcoming_bl.append((char*)&s, sizeof(s));
ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
ack_left.sub(left);
left = ack_left.read();
- r = _try_send(bl, true, left);
+ r = _try_send(true, left);
} else if (is_queued()) {
- r = _try_send(bl);
+ r = _try_send();
}
write_lock.Unlock();
<< " policy.server is false" << dendl;
_connect();
} else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
- r = _try_send(bl);
+ r = _try_send();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
write_lock.Unlock();