]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: avoid unnecessary costly wakeups for outbound messages 28388/head
authorJason Dillaman <dillaman@redhat.com>
Tue, 4 Jun 2019 17:48:57 +0000 (13:48 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 4 Jun 2019 19:01:16 +0000 (15:01 -0400)
If a wakeup for an outbound message has already been scheduled or is
currently executing within the worker thread, avoid re-adding a wakeup.
For small IO sizes under high queue depths, these extra syscalls start
to add up. For larger IO sizes or small queue depths, it doesn't hurt
performance.

fio --ioengine=rbd results:

IOPS pre-change post-change
4K: 84.9k 98.3k
32K: 58.4k 59.5k
256K: 12.1k 12.2k
4M: 803 802

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV1.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index b9bfaec369cccaa02caa2c4f4dcafb0e6f4382ed..b410a2e6b79e9da17d80668503cf8065dce4204b 100644 (file)
@@ -240,7 +240,8 @@ void ProtocolV1::send_message(Message *m) {
     out_q[m->get_priority()].emplace_back(std::move(bl), m);
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                    << dendl;
-    if (can_write != WriteStatus::REPLACING) {
+    if (can_write != WriteStatus::REPLACING && !write_in_progress) {
+      write_in_progress = true;
       connection->center->dispatch_event_external(connection->write_handler);
     }
   }
@@ -348,6 +349,7 @@ void ProtocolV1::write_event() {
       } else if (r > 0)
         break;
     } while (can_write == WriteStatus::CANWRITE);
+    write_in_progress = false;
     connection->write_lock.unlock();
 
     // if r > 0 mean data still lefted, so no need _try_send.
@@ -378,6 +380,7 @@ void ProtocolV1::write_event() {
       return;
     }
   } else {
+    write_in_progress = false;
     connection->write_lock.unlock();
     connection->lock.lock();
     connection->write_lock.lock();
@@ -1174,6 +1177,7 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) {
 }
 
 void ProtocolV1::requeue_sent() {
+  write_in_progress = false;
   if (sent.empty()) {
     return;
   }
@@ -1233,6 +1237,7 @@ void ProtocolV1::discard_out_queue() {
     }
   }
   out_q.clear();
+  write_in_progress = false;
 }
 
 void ProtocolV1::reset_recv_state()
@@ -2305,6 +2310,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing,
         << __func__ << " stop myself to swap existing" << dendl;
     exproto->can_write = WriteStatus::REPLACING;
     exproto->replacing = true;
+    exproto->write_in_progress = false;
     existing->state_offset = 0;
     // avoid previous thread modify event
     exproto->state = NONE;
index 43256543cde322cbd2392e455a6ae72e996af228..72b707fe2a6cc1dc11431dfe8909a8aca452103b 100644 (file)
@@ -108,6 +108,7 @@ protected:
   // priority queue for outbound msgs
   std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q;
   bool keepalive;
+  bool write_in_progress = false;
 
   __u32 connect_seq, peer_global_seq;
   std::atomic<uint64_t> in_seq{0};
index b8d506fb8086b3fbd3b70bcf49de04e2e434ed15..fc7a004b3438d96a81e11636393b12da85db4d1f 100644 (file)
@@ -132,6 +132,7 @@ void ProtocolV2::discard_out_queue() {
     }
   }
   out_queue.clear();
+  write_in_progress = false;
 }
 
 void ProtocolV2::reset_session() {
@@ -181,6 +182,7 @@ void ProtocolV2::stop() {
 void ProtocolV2::fault() { _fault(); }
 
 void ProtocolV2::requeue_sent() {
+  write_in_progress = false;
   if (sent.empty()) {
     return;
   }
@@ -424,7 +426,8 @@ void ProtocolV2::send_message(Message *m) {
       out_queue_entry_t{is_prepared, m});
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                    << dendl;
-    if ((!replacing && can_write) || state == STANDBY) {
+    if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
+      write_in_progress = true;
       connection->center->dispatch_event_external(connection->write_handler);
     }
   }
@@ -629,6 +632,7 @@ void ProtocolV2::write_event() {
       } else if (r > 0)
         break;
     } while (can_write);
+    write_in_progress = false;
 
     // if r > 0 mean data still lefted, so no need _try_send.
     if (r == 0) {
@@ -657,6 +661,7 @@ void ProtocolV2::write_event() {
       return;
     }
   } else {
+    write_in_progress = false;
     connection->write_lock.unlock();
     connection->lock.lock();
     connection->write_lock.lock();
@@ -2673,6 +2678,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
   connection->dispatch_queue->queue_reset(connection);
 
   exproto->can_write = false;
+  exproto->write_in_progress = false;
   exproto->reconnecting = reconnecting;
   exproto->replacing = true;
   std::swap(exproto->session_stream_handlers, session_stream_handlers);
index 070f6910b78edf4c1484c2f5ed40272764350bd0..e5544f987460114bffc3c841f03166d12262892e 100644 (file)
@@ -113,6 +113,7 @@ private:
   } pre_auth;
 
   bool keepalive;
+  bool write_in_progress = false;
 
   ostream &_conn_prefix(std::ostream *_dout);
   void run_continuation(Ct<ProtocolV2> *pcontinuation);