From: Jason Dillaman Date: Tue, 4 Jun 2019 17:48:57 +0000 (-0400) Subject: msg/async: avoid unnecessary costly wakeups for outbound messages X-Git-Tag: v15.1.0~2511^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=294c41f18adada6ab762d5b3f9f45d7c8bed4316;p=ceph.git msg/async: avoid unnecessary costly wakeups for outbound messages If a wakeup for an outbound message has already been scheduled or is currently executing within the worker thread, avoid re-adding a wakeup. For small IO sizes under high queue depths, these extra syscalls start to add up. For larger IO sizes or small queue depths, it doesn't hurt performance. fio --ioengine=rbd results: IOPS pre-change post-change 4K: 84.9k 98.3k 32K: 58.4k 59.5k 256K: 12.1k 12.2k 4M: 803 802 Signed-off-by: Jason Dillaman --- diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index b9bfaec369cc..b410a2e6b79e 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -240,7 +240,8 @@ void ProtocolV1::send_message(Message *m) { out_q[m->get_priority()].emplace_back(std::move(bl), m); ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl; - if (can_write != WriteStatus::REPLACING) { + if (can_write != WriteStatus::REPLACING && !write_in_progress) { + write_in_progress = true; connection->center->dispatch_event_external(connection->write_handler); } } @@ -348,6 +349,7 @@ void ProtocolV1::write_event() { } else if (r > 0) break; } while (can_write == WriteStatus::CANWRITE); + write_in_progress = false; connection->write_lock.unlock(); // if r > 0 mean data still lefted, so no need _try_send. @@ -378,6 +380,7 @@ void ProtocolV1::write_event() { return; } } else { + write_in_progress = false; connection->write_lock.unlock(); connection->lock.lock(); connection->write_lock.lock(); @@ -1174,6 +1177,7 @@ ssize_t ProtocolV1::write_message(Message *m, bufferlist &bl, bool more) { } void ProtocolV1::requeue_sent() { + write_in_progress = false; if (sent.empty()) { return; } @@ -1233,6 +1237,7 @@ void ProtocolV1::discard_out_queue() { } } out_q.clear(); + write_in_progress = false; } void ProtocolV1::reset_recv_state() @@ -2305,6 +2310,7 @@ CtPtr ProtocolV1::replace(const AsyncConnectionRef& existing, << __func__ << " stop myself to swap existing" << dendl; exproto->can_write = WriteStatus::REPLACING; exproto->replacing = true; + exproto->write_in_progress = false; existing->state_offset = 0; // avoid previous thread modify event exproto->state = NONE; diff --git a/src/msg/async/ProtocolV1.h b/src/msg/async/ProtocolV1.h index 43256543cde3..72b707fe2a6c 100644 --- a/src/msg/async/ProtocolV1.h +++ b/src/msg/async/ProtocolV1.h @@ -108,6 +108,7 @@ protected: // priority queue for outbound msgs std::map>> out_q; bool keepalive; + bool write_in_progress = false; __u32 connect_seq, peer_global_seq; std::atomic in_seq{0}; diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index b8d506fb8086..fc7a004b3438 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -132,6 +132,7 @@ void ProtocolV2::discard_out_queue() { } } out_queue.clear(); + write_in_progress = false; } void ProtocolV2::reset_session() { @@ -181,6 +182,7 @@ void ProtocolV2::stop() { void ProtocolV2::fault() { _fault(); } void ProtocolV2::requeue_sent() { + write_in_progress = false; if (sent.empty()) { return; } @@ -424,7 +426,8 @@ void ProtocolV2::send_message(Message *m) { out_queue_entry_t{is_prepared, m}); ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl; - if ((!replacing && can_write) || state == STANDBY) { + if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) { + write_in_progress = true; connection->center->dispatch_event_external(connection->write_handler); } } @@ -629,6 +632,7 @@ void ProtocolV2::write_event() { } else if (r > 0) break; } while (can_write); + write_in_progress = false; // if r > 0 mean data still lefted, so no need _try_send. if (r == 0) { @@ -657,6 +661,7 @@ void ProtocolV2::write_event() { return; } } else { + write_in_progress = false; connection->write_lock.unlock(); connection->lock.lock(); connection->write_lock.lock(); @@ -2673,6 +2678,7 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing, connection->dispatch_queue->queue_reset(connection); exproto->can_write = false; + exproto->write_in_progress = false; exproto->reconnecting = reconnecting; exproto->replacing = true; std::swap(exproto->session_stream_handlers, session_stream_handlers); diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 070f6910b78e..e5544f987460 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -113,6 +113,7 @@ private: } pre_auth; bool keepalive; + bool write_in_progress = false; ostream &_conn_prefix(std::ostream *_dout); void run_continuation(Ct *pcontinuation);