]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: avoid unnecessary costly wakeups for outbound messages 29141/head
authorJason Dillaman <dillaman@redhat.com>
Tue, 4 Jun 2019 17:48:57 +0000 (13:48 -0400)
committerSage Weil <sage@redhat.com>
Fri, 19 Jul 2019 18:05:13 +0000 (13:05 -0500)
If a wakeup for an outbound message has already been scheduled or is
currently executing within the worker thread, avoid re-adding a wakeup.
For small IO sizes under high queue depths, these extra syscalls start
to add up. For larger IO sizes or small queue depths, it doesn't hurt
performance.

fio --ioengine=rbd results:

IOPS pre-change post-change
4K: 84.9k 98.3k
32K: 58.4k 59.5k
256K: 12.1k 12.2k
4M: 803 802

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 294c41f18adada6ab762d5b3f9f45d7c8bed4316)

src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV1.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 714eff74df8a775abd0dd03fb7b093a3a9271290..cec0f0dae083f55d960013d88b09c14cbfff8318 100644 (file)
@@ -246,7 +246,8 @@ void ProtocolV1::send_message(Message *m) {
     out_q[m->get_priority()].emplace_back(std::move(bl), m);
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                    << dendl;
-    if (can_write != WriteStatus::REPLACING) {
+    if (can_write != WriteStatus::REPLACING && !write_in_progress) {
+      write_in_progress = true;
       connection->center->dispatch_event_external(connection->write_handler);
     }
   }
@@ -352,6 +353,7 @@ void ProtocolV1::write_event() {
       } else if (r > 0)
         break;
     } while (can_write == WriteStatus::CANWRITE);
+    write_in_progress = false;
     connection->write_lock.unlock();
 
     // if r > 0 mean data still lefted, so no need _try_send.
@@ -382,6 +384,7 @@ void ProtocolV1::write_event() {
       return;
     }
   } else {
+    write_in_progress = false;
     connection->write_lock.unlock();
     connection->lock.lock();
     connection->write_lock.lock();
@@ -1175,6 +1178,7 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
 }
 
 void ProtocolV1::requeue_sent() {
+  write_in_progress = false;
   if (sent.empty()) {
     return;
   }
@@ -1234,6 +1238,7 @@ void ProtocolV1::discard_out_queue() {
     }
   }
   out_q.clear();
+  write_in_progress = false;
 }
 
 void ProtocolV1::reset_recv_state() {
@@ -2260,6 +2265,7 @@ CtPtr ProtocolV1::replace(AsyncConnectionRef existing,
         << __func__ << " stop myself to swap existing" << dendl;
     exproto->can_write = WriteStatus::REPLACING;
     exproto->replacing = true;
+    exproto->write_in_progress = false;
     existing->state_offset = 0;
     // avoid previous thread modify event
     exproto->state = NONE;
index 6248a6f2e199283095e56f7b04c9605d8bf616a8..b332b5ff7bf47d64374c900efa1bf5389df7d554 100644 (file)
@@ -108,6 +108,7 @@ protected:
   // priority queue for outbound msgs
   std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q;
   bool keepalive;
+  bool write_in_progress = false;
 
   __u32 connect_seq, peer_global_seq;
   std::atomic<uint64_t> in_seq{0};
index 15c786ce7544393b315a904c05c1e4e0281aec28..ed2c83cbe1d7fb4ee69d53aad991dc2c3567dc30 100644 (file)
@@ -132,6 +132,7 @@ void ProtocolV2::discard_out_queue() {
     }
   }
   out_queue.clear();
+  write_in_progress = false;
 }
 
 void ProtocolV2::reset_session() {
@@ -181,6 +182,7 @@ void ProtocolV2::stop() {
 void ProtocolV2::fault() { _fault(); }
 
 void ProtocolV2::requeue_sent() {
+  write_in_progress = false;
   if (sent.empty()) {
     return;
   }
@@ -428,7 +430,8 @@ void ProtocolV2::send_message(Message *m) {
       out_queue_entry_t{is_prepared, m});
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                    << dendl;
-    if ((!replacing && can_write) || state == STANDBY) {
+    if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
+      write_in_progress = true;
       connection->center->dispatch_event_external(connection->write_handler);
     }
   }
@@ -622,6 +625,7 @@ void ProtocolV2::write_event() {
       } else if (r > 0)
         break;
     } while (can_write);
+    write_in_progress = false;
 
     // if r > 0 mean data still lefted, so no need _try_send.
     if (r == 0) {
@@ -652,6 +656,7 @@ void ProtocolV2::write_event() {
       return;
     }
   } else {
+    write_in_progress = false;
     connection->write_lock.unlock();
     connection->lock.lock();
     connection->write_lock.lock();
@@ -2670,6 +2675,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   connection->dispatch_queue->queue_reset(connection);
 
   exproto->can_write = false;
+  exproto->write_in_progress = false;
   exproto->reconnecting = reconnecting;
   exproto->replacing = true;
   std::swap(exproto->session_stream_handlers, session_stream_handlers);
index 9ea40de8459e4391ca48c8e7d3339974c2cf6b11..69bff1b90dd4986ea46d55094e619d5ced2d0b71 100644 (file)
@@ -113,6 +113,7 @@ private:
   } pre_auth;
 
   bool keepalive;
+  bool write_in_progress = false;
 
   ostream &_conn_prefix(std::ostream *_dout);
   void run_continuation(Ct<ProtocolV2> *pcontinuation);