]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: set MSG_MORE intelligently
authorPiotr Dałek <piotr.dalek@ts.fujitsu.com>
Fri, 5 Feb 2016 14:38:32 +0000 (15:38 +0100)
committerPiotr Dałek <piotr.dalek@ts.fujitsu.com>
Fri, 12 Feb 2016 13:25:45 +0000 (14:25 +0100)
In write_message, set "more" flag according to ack queue, if there are
messages to acknowledge, set it. Unset it otherwise. Same goes for queued
messages.

Signed-off-by: Piotr Dałek <piotr.dalek@ts.fujitsu.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 04e37018fbaf3a5d6304e665c860e30cd1bc1ab1..aafe894552ce81077a596a1fd0517613f1542b4f 100644 (file)
@@ -2061,7 +2061,7 @@ int AsyncConnection::send_message(Message *m)
     if (!can_fast_prepare)
       prepare_send_message(get_features(), m, bl);
     logger->inc(l_msgr_send_messages_inline);
-    if (write_message(m, bl) < 0) {
+    if (write_message(m, bl, false) < 0) {
       ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
       // we want to handle fault within internal thread
       center->dispatch_event_external(write_handler);
@@ -2305,7 +2305,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer
   bl.append(m->get_data());
 }
 
-ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl)
+ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
 {
   assert(can_write == CANWRITE);
   m->set_seq(out_seq.inc());
@@ -2386,7 +2386,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl)
   logger->inc(l_msgr_send_bytes, complete_bl.length());
   ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
                              << " " << m << dendl;
-  ssize_t rc = _try_send(complete_bl);
+  ssize_t rc = _try_send(complete_bl, true, more);
   if (rc < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
                               << cpp_strerror(errno) << dendl;
@@ -2455,7 +2455,7 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
   }
 
   ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl;
-  _try_send(bl, false, true);
+  _try_send(bl, false);
 }
 
 void AsyncConnection::handle_write()
@@ -2481,7 +2481,7 @@ void AsyncConnection::handle_write()
       if (!data.length())
         prepare_send_message(get_features(), m, data);
 
-      r = write_message(m, data);
+      r = write_message(m, data, _has_next_outgoing());
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
         write_lock.Unlock();
@@ -2499,7 +2499,8 @@ void AsyncConnection::handle_write()
       bl.append((char*)&s, sizeof(s));
       ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
       ack_left.sub(left);
-      r = _try_send(bl, true, true);
+      left = ack_left.read();
+      r = _try_send(bl, true, left);
     } else if (is_queued()) {
       r = _try_send(bl);
     }
index c1c7051c1907ce2982ab49c193af81a686862f4f..e3d3d5901a5f95115f0dee70ae1f5d7b6d631fd1 100644 (file)
@@ -74,7 +74,7 @@ class AsyncConnection : public Connection {
   int randomize_out_seq();
   void handle_ack(uint64_t seq);
   void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
-  ssize_t write_message(Message *m, bufferlist& bl);
+  ssize_t write_message(Message *m, bufferlist& bl, bool more);
   ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
                     bufferlist &authorizer_reply) {
     bufferlist reply_bl;
@@ -117,6 +117,10 @@ class AsyncConnection : public Connection {
     }
     return m;
   }
+  bool _has_next_outgoing() {
+    assert(write_lock.is_locked());
+    return !out_q.empty();
+  }
 
  public:
   AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);