From: Jason Dillaman Date: Tue, 4 Jun 2019 17:48:57 +0000 (-0400) Subject: msg/async: avoid unnecessary costly wakeups for outbound messages X-Git-Tag: v14.2.3~124^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=aba609e725d640d5aa940b50534361f4e2f8d9bf;p=ceph.git msg/async: avoid unnecessary costly wakeups for outbound messages If a wakeup for an outbound message has already been scheduled or is currently executing within the worker thread, avoid re-adding a wakeup. For small IO sizes under high queue depths, these extra syscalls start to add up. For larger IO sizes or small queue depths, it doesn't hurt performance. fio --ioengine=rbd results: IOPS pre-change post-change 4K: 84.9k 98.3k 32K: 58.4k 59.5k 256K: 12.1k 12.2k 4M: 803 802 Signed-off-by: Jason Dillaman (cherry picked from commit 294c41f18adada6ab762d5b3f9f45d7c8bed4316) --- diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 714eff74df8a7..cec0f0dae083f 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -246,7 +246,8 @@ void ProtocolV1::send_message(Message *m) { out_q[m->get_priority()].emplace_back(std::move(bl), m); ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl; - if (can_write != WriteStatus::REPLACING) { + if (can_write != WriteStatus::REPLACING && !write_in_progress) { + write_in_progress = true; connection->center->dispatch_event_external(connection->write_handler); } } @@ -352,6 +353,7 @@ void ProtocolV1::write_event() { } else if (r > 0) break; } while (can_write == WriteStatus::CANWRITE); + write_in_progress = false; connection->write_lock.unlock(); // if r > 0 mean data still lefted, so no need _try_send. @@ -382,6 +384,7 @@ void ProtocolV1::write_event() { return; } } else { + write_in_progress = false; connection->write_lock.unlock(); connection->lock.lock(); connection->write_lock.lock(); @@ -1175,6 +1178,7 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { } void ProtocolV1::requeue_sent() { + write_in_progress = false; if (sent.empty()) { return; } @@ -1234,6 +1238,7 @@ void ProtocolV1::discard_out_queue() { } } out_q.clear(); + write_in_progress = false; } void ProtocolV1::reset_recv_state() { @@ -2260,6 +2265,7 @@ CtPtr ProtocolV1::replace(AsyncConnectionRef existing, << __func__ << " stop myself to swap existing" << dendl; exproto->can_write = WriteStatus::REPLACING; exproto->replacing = true; + exproto->write_in_progress = false; existing->state_offset = 0; // avoid previous thread modify event exproto->state = NONE; diff --git a/src/msg/async/ProtocolV1.h b/src/msg/async/ProtocolV1.h index 6248a6f2e1992..b332b5ff7bf47 100644 --- a/src/msg/async/ProtocolV1.h +++ b/src/msg/async/ProtocolV1.h @@ -108,6 +108,7 @@ protected: // priority queue for outbound msgs std::map>> out_q; bool keepalive; + bool write_in_progress = false; __u32 connect_seq, peer_global_seq; std::atomic in_seq{0}; diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 15c786ce75443..ed2c83cbe1d7f 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -132,6 +132,7 @@ void ProtocolV2::discard_out_queue() { } } out_queue.clear(); + write_in_progress = false; } void ProtocolV2::reset_session() { @@ -181,6 +182,7 @@ void ProtocolV2::stop() { void ProtocolV2::fault() { _fault(); } void ProtocolV2::requeue_sent() { + write_in_progress = false; if (sent.empty()) { return; } @@ -428,7 +430,8 @@ void ProtocolV2::send_message(Message *m) { out_queue_entry_t{is_prepared, m}); ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl; - if ((!replacing && can_write) || state == STANDBY) { + if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) { + write_in_progress = true; connection->center->dispatch_event_external(connection->write_handler); } } @@ -622,6 +625,7 @@ void ProtocolV2::write_event() { } else if (r > 0) break; } while (can_write); + write_in_progress = false; // if r > 0 mean data still lefted, so no need _try_send. if (r == 0) { @@ -652,6 +656,7 @@ void ProtocolV2::write_event() { return; } } else { + write_in_progress = false; connection->write_lock.unlock(); connection->lock.lock(); connection->write_lock.lock(); @@ -2670,6 +2675,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, connection->dispatch_queue->queue_reset(connection); exproto->can_write = false; + exproto->write_in_progress = false; exproto->reconnecting = reconnecting; exproto->replacing = true; std::swap(exproto->session_stream_handlers, session_stream_handlers); diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 9ea40de8459e4..69bff1b90dd49 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -113,6 +113,7 @@ private: } pre_auth; bool keepalive; + bool write_in_progress = false; ostream &_conn_prefix(std::ostream *_dout); void run_continuation(Ct *pcontinuation);