From: Piotr Dałek Date: Fri, 5 Feb 2016 14:38:32 +0000 (+0100) Subject: msg/async: set MSG_MORE intelligently X-Git-Tag: v10.1.0~330^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=153f71904f4dd2cff22687f48bbdadfafe221765;p=ceph.git msg/async: set MSG_MORE intelligently In write_message, set "more" flag according to ack queue, if there are messages to acknowledge, set it. Unset it otherwise. Same goes for queued messages. Signed-off-by: Piotr Dałek --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 04e37018fbaf..aafe894552ce 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2061,7 +2061,7 @@ int AsyncConnection::send_message(Message *m) if (!can_fast_prepare) prepare_send_message(get_features(), m, bl); logger->inc(l_msgr_send_messages_inline); - if (write_message(m, bl) < 0) { + if (write_message(m, bl, false) < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; // we want to handle fault within internal thread center->dispatch_event_external(write_handler); @@ -2305,7 +2305,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer bl.append(m->get_data()); } -ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl) +ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) { assert(can_write == CANWRITE); m->set_seq(out_seq.inc()); @@ -2386,7 +2386,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl) logger->inc(l_msgr_send_bytes, complete_bl.length()); ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq() << " " << m << dendl; - ssize_t rc = _try_send(complete_bl); + ssize_t rc = _try_send(complete_bl, true, more); if (rc < 0) { ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", " << cpp_strerror(errno) << dendl; @@ -2455,7 +2455,7 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) } ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl; - _try_send(bl, false, true); + _try_send(bl, false); } void AsyncConnection::handle_write() @@ -2481,7 +2481,7 @@ void AsyncConnection::handle_write() if (!data.length()) prepare_send_message(get_features(), m, data); - r = write_message(m, data); + r = write_message(m, data, _has_next_outgoing()); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; write_lock.Unlock(); @@ -2499,7 +2499,8 @@ void AsyncConnection::handle_write() bl.append((char*)&s, sizeof(s)); ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; ack_left.sub(left); - r = _try_send(bl, true, true); + left = ack_left.read(); + r = _try_send(bl, true, left); } else if (is_queued()) { r = _try_send(bl); } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index c1c7051c1907..e3d3d5901a5f 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -74,7 +74,7 @@ class AsyncConnection : public Connection { int randomize_out_seq(); void handle_ack(uint64_t seq); void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL); - ssize_t write_message(Message *m, bufferlist& bl); + ssize_t write_message(Message *m, bufferlist& bl, bool more); ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply, bufferlist &authorizer_reply) { bufferlist reply_bl; @@ -117,6 +117,10 @@ class AsyncConnection : public Connection { } return m; } + bool _has_next_outgoing() { + assert(write_lock.is_locked()); + return !out_q.empty(); + } public: AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);