From: Patrick Donnelly Date: Tue, 19 Aug 2025 21:27:29 +0000 (-0400) Subject: msg: use MessageRef to manage pointer lifetime X-Git-Tag: testing/wip-pdonnell-testing-20260126.152838~23 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6fe70cb5abf88730bee5de10d0e8de46f7906bee;p=ceph-ci.git msg: use MessageRef to manage pointer lifetime To simplify reasoning about upcoming changes to incoming/pending messages. Signed-off-by: Patrick Donnelly --- diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index d553f97a421..950d2d22a85 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -4214,9 +4214,6 @@ void Monitor::forward_request_leader(MonOpRequestRef op) struct AnonConnection : public Connection { entity_addr_t socket_addr; - int send_message(Message *m) override { - ceph_abort_msg("send_message on anonymous connection"); - } void send_keepalive() override { ceph_abort_msg("send_keepalive on anonymous connection"); } @@ -4234,6 +4231,11 @@ struct AnonConnection : public Connection { return socket_addr; } +protected: + int send_msg(MessageRef&& m) override { + ceph_abort_msg("send_message on anonymous connection"); + } + private: FRIEND_MAKE_REF(AnonConnection); explicit AnonConnection(CephContext *cct, const entity_addr_t& sa) diff --git a/src/msg/Connection.h b/src/msg/Connection.h index b8fcaeebfca..156200ac24c 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -114,15 +114,19 @@ public: * on the Connection policy. * * @param m The Message to send. The Messenger consumes a single reference - * when you pass it in. + * (if a stupid pointer) when you pass it in. * * @return 0 on success, or -errno on failure. */ - virtual int send_message(Message *m) = 0; - - virtual int send_message2(MessageRef m) - { - return send_message(m.detach()); /* send_message(Message *m) consumes a reference */ + int send_message(Message* _m) { + auto m = ceph::ref_t(_m, false); /* consume ref */ + return send_msg(std::move(m)); + } + int send_message2(const MessageRef& m) { + return send_msg(MessageRef(m)); + } + int send_message2(MessageRef&& m) { + return send_msg(std::move(m)); } /** @@ -254,6 +258,8 @@ protected: {} ~Connection() override; + + virtual int send_msg(MessageRef&& m) = 0; }; using ConnectionRef = ceph::ref_t; diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index ad172590cb3..45a51e6fd1b 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -21,7 +21,6 @@ #define dout_subsys ceph_subsys_ms #include "common/debug.h" -using ceph::cref_t; using ceph::ref_t; /******************* @@ -64,7 +63,7 @@ void DispatchQueue::post_dispatch(const ref_t& m, uint64_t msize) ldout(cct,20) << "done calling dispatch on " << m << dendl; } -bool DispatchQueue::can_fast_dispatch(const cref_t &m) const +bool DispatchQueue::can_fast_dispatch(const Message& m) const { return msgr->ms_can_fast_dispatch(m); } @@ -81,14 +80,14 @@ void DispatchQueue::fast_preprocess(const ref_t& m) msgr->ms_fast_preprocess(m); } -void DispatchQueue::enqueue(const ref_t& m, int priority, uint64_t id) +void DispatchQueue::enqueue(ref_t&& m, int priority, uint64_t id) { std::lock_guard l{lock}; if (stop) { return; } ldout(cct,20) << "queue " << m << " prio " << priority << dendl; - QueueItem item{m}; + QueueItem item{std::move(m)}; add_arrival(item); if (priority >= CEPH_MSG_PRIO_LOW) { mqueue.enqueue_strict(id, priority, std::move(item)); @@ -98,7 +97,7 @@ void DispatchQueue::enqueue(const ref_t& m, int priority, uint64_t id) cond.notify_one(); } -void DispatchQueue::local_delivery(const ref_t& m, int priority) +void DispatchQueue::local_delivery(ref_t&& m, int priority) { auto local_delivery_stamp = ceph_clock_now(); m->set_recv_stamp(local_delivery_stamp); @@ -107,7 +106,7 @@ void DispatchQueue::local_delivery(const ref_t& m, int priority) std::lock_guard l{local_delivery_lock}; if (local_messages.empty()) local_delivery_cond.notify_all(); - local_messages.emplace(m, priority); + local_messages.emplace(std::move(m), priority); return; } @@ -124,13 +123,13 @@ void DispatchQueue::run_local_delivery() auto p = std::move(local_messages.front()); local_messages.pop(); l.unlock(); - const ref_t& m = p.first; + auto& m = p.first; int priority = p.second; fast_preprocess(m); - if (can_fast_dispatch(m)) { + if (can_fast_dispatch(*m)) { fast_dispatch(m); } else { - enqueue(m, priority, 0); + enqueue(std::move(m), priority, 0); } l.lock(); } diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index 1c1893a2814..401b07374d4 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -43,12 +43,12 @@ class DispatchQueue { ArrivalSet marrival; class QueueItem { - int type; + int type = -1; ConnectionRef con; ceph::ref_t m; public: - explicit QueueItem(const ceph::ref_t& m) : type(-1), con(0), m(m) {} - QueueItem(int type, Connection *con) : type(type), con(con), m(0) {} + explicit QueueItem(ceph::ref_t&& m) : m(std::move(m)) {} + QueueItem(int type, Connection *con) : type(type), con(con) {} bool is_code() const { return type != -1; } @@ -127,10 +127,7 @@ class DispatchQueue { Throttle dispatch_throttler; bool stop; - void local_delivery(const ceph::ref_t& m, int priority); - void local_delivery(Message* m, int priority) { - return local_delivery(ceph::ref_t(m, false), priority); /* consume ref */ - } + void local_delivery(ceph::ref_t&& m, int priority); void run_local_delivery(); double get_max_age(utime_t now) const; @@ -198,16 +195,13 @@ class DispatchQueue { cond.notify_all(); } - bool can_fast_dispatch(const ceph::cref_t &m) const; + bool can_fast_dispatch(const Message& m) const; void fast_dispatch(const ceph::ref_t& m); void fast_dispatch(Message* m) { return fast_dispatch(ceph::ref_t(m, false)); /* consume ref */ } void fast_preprocess(const ceph::ref_t& m); - void enqueue(const ceph::ref_t& m, int priority, uint64_t id); - void enqueue(Message* m, int priority, uint64_t id) { - return enqueue(ceph::ref_t(m, false), priority, id); /* consume ref */ - } + void enqueue(ceph::ref_t&& m, int priority, uint64_t id); void discard_queue(uint64_t id); void discard_local(); uint64_t get_id() { diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 58fc628f768..9dc34671f67 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -67,9 +67,11 @@ public: * @param m The message we want to fast dispatch. * @returns True if the message can be fast dispatched; false otherwise. */ - virtual bool ms_can_fast_dispatch(const Message *m) const { return false; } - virtual bool ms_can_fast_dispatch2(const MessageConstRef& m) const { - return ms_can_fast_dispatch(m.get()); + virtual bool ms_can_fast_dispatch(const Message* m) const { + return false; + } + virtual bool ms_can_fast_dispatch2(const Message& m) const { + return ms_can_fast_dispatch(&m); } /** * This function determines if a dispatcher is included in the diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 8dc49e2483c..d43c978f2c4 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -698,7 +698,7 @@ public: * * @param m The Message we are testing. */ - bool ms_can_fast_dispatch(const ceph::cref_t& m) { + bool ms_can_fast_dispatch(const Message& m) { for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { if (dispatcher->ms_can_fast_dispatch2(m)) { return true; @@ -716,7 +716,7 @@ public: void ms_fast_dispatch(const ceph::ref_t &m) { m->set_dispatch_stamp(ceph_clock_now()); for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { - if (dispatcher->ms_can_fast_dispatch2(m)) { + if (dispatcher->ms_can_fast_dispatch2(*m)) { dispatcher->ms_fast_dispatch2(m); return; } @@ -758,9 +758,6 @@ public: << m->get_source_inst() << dendl; ceph_assert(!cct->_conf->ms_die_on_unhandled_msg); } - void ms_deliver_dispatch(Message *m) { - return ms_deliver_dispatch(ceph::ref_t(m, false)); /* consume ref */ - } /** * Notify each Dispatcher of a new Connection. Call * this function whenever a new Connection is initiated or diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 52d1330f24e..5d812f96263 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -543,9 +543,10 @@ void AsyncConnection::accept(ConnectedSocket socket, center->dispatch_event_external(read_handler); } -int AsyncConnection::send_message(Message *m) +int AsyncConnection::send_msg(MessageRef&& m) { FUNCTRACE(async_msgr->cct); + lgeneric_subdout(async_msgr->cct, ms, 1) << "-- " << async_msgr->get_myaddrs() << " --> " << get_peer_addrs() << " -- " @@ -556,7 +557,6 @@ int AsyncConnection::send_message(Message *m) if (is_blackhole()) { lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type) << " blackhole " << *m << dendl; - m->put(); return 0; } @@ -578,11 +578,10 @@ int AsyncConnection::send_message(Message *m) ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; std::lock_guard l(write_lock); if (protocol->is_connected()) { - dispatch_queue->local_delivery(m, m->get_priority()); + dispatch_queue->local_delivery(std::move(m), m->get_priority()); } else { ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed." << " Drop message " << m << dendl; - m->put(); } return 0; } @@ -591,7 +590,7 @@ int AsyncConnection::send_message(Message *m) // may disturb users logger->inc(l_msgr_send_messages); - protocol->send_message(m); + protocol->send_message(std::move(m)); return 0; } @@ -661,21 +660,22 @@ void AsyncConnection::shutdown_socket() { void AsyncConnection::DelayedDelivery::do_request(uint64_t id) { - Message *m = nullptr; + MessageRef m; { std::lock_guard l(delay_lock); register_time_events.erase(id); if (stop_dispatch) - return ; + return; if (delay_queue.empty()) - return ; - m = delay_queue.front(); + return; + m = std::move(delay_queue.front()); delay_queue.pop_front(); } - if (msgr->ms_can_fast_dispatch(m)) { + if (msgr->ms_can_fast_dispatch(*m)) { dispatch_queue->fast_dispatch(m); } else { - dispatch_queue->enqueue(m, m->get_priority(), conn_id); + auto p = m->get_priority(); + dispatch_queue->enqueue(std::move(m), p, conn_id); } } @@ -685,10 +685,11 @@ void AsyncConnection::DelayedDelivery::discard() { [this]() mutable { std::lock_guard l(delay_lock); while (!delay_queue.empty()) { - Message *m = delay_queue.front(); - dispatch_queue->dispatch_throttle_release( + { + auto& m = delay_queue.front(); + dispatch_queue->dispatch_throttle_release( m->get_dispatch_throttle_size()); - m->put(); + } delay_queue.pop_front(); } for (auto i : register_time_events) @@ -704,14 +705,14 @@ void AsyncConnection::DelayedDelivery::flush() { center->submit_to( center->get_id(), [this] () mutable { std::lock_guard l(delay_lock); - while (!delay_queue.empty()) { - Message *m = delay_queue.front(); - if (msgr->ms_can_fast_dispatch(m)) { + for (; !delay_queue.empty(); delay_queue.pop_front()) { + auto& m = delay_queue.front(); + if (msgr->ms_can_fast_dispatch(*m)) { dispatch_queue->fast_dispatch(m); } else { - dispatch_queue->enqueue(m, m->get_priority(), conn_id); + auto p = m->get_priority(); + dispatch_queue->enqueue(std::move(m), p, conn_id); } - delay_queue.pop_front(); } for (auto i : register_time_events) center->delete_time_event(i); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index ac11fd1ae9d..a109e4f8624 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -80,7 +80,7 @@ class AsyncConnection : public Connection { */ class DelayedDelivery : public EventCallback { std::set register_time_events; // need to delete it if stop - std::deque delay_queue; + std::deque delay_queue; std::mutex delay_lock; AsyncMessenger *msgr; EventCenter *center; @@ -99,9 +99,9 @@ class AsyncConnection : public Connection { } void set_center(EventCenter *c) { center = c; } void do_request(uint64_t id) override; - void queue(double delay_period, Message *m) { + void queue(double delay_period, MessageRef&& m) { std::lock_guard l(delay_lock); - delay_queue.push_back(m); + delay_queue.push_back(std::move(m)); register_time_events.insert(center->create_time_event(delay_period*1000000, this)); } void discard(); @@ -129,7 +129,6 @@ public: void accept(ConnectedSocket socket, const entity_addr_t &listen_addr, const entity_addr_t &peer_addr); - int send_message(Message *m) override; void send_keepalive() override; void mark_down() override; @@ -182,6 +181,8 @@ public: int port; public: Messenger::Policy policy; +protected: + int send_msg(MessageRef&& m) override; private: DispatchQueue *dispatch_queue; diff --git a/src/msg/async/Protocol.h b/src/msg/async/Protocol.h index e2a0fd55958..e3d62c6b1f7 100644 --- a/src/msg/async/Protocol.h +++ b/src/msg/async/Protocol.h @@ -124,7 +124,7 @@ public: // signal and handle connection failure virtual void fault() = 0; // send message - virtual void send_message(Message *m) = 0; + virtual void send_message(MessageRef&& m) = 0; // send keepalive virtual void send_keepalive() = 0; diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index 00fab0a910e..fc4f2b483d7 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -210,13 +210,13 @@ void ProtocolV1::fault() { } } -void ProtocolV1::send_message(Message *m) { +void ProtocolV1::send_message(MessageRef&& m) { ceph::buffer::list bl; uint64_t f = connection->get_features(); // TODO: Currently not all messages supports reencode like MOSDMap, so here // only let fast dispatch support messages prepare message - bool can_fast_prepare = messenger->ms_can_fast_dispatch(m); + bool can_fast_prepare = messenger->ms_can_fast_dispatch(*m); bool is_prepared = false; if (can_fast_prepare && f) { prepare_send_message(f, m, bl); @@ -236,14 +236,11 @@ void ProtocolV1::send_message(Message *m) { if (can_write == WriteStatus::CLOSED) { ldout(cct, 10) << __func__ << " connection closed." << " Drop message " << m << dendl; - m->put(); } else { + ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << *m << dendl; m->queue_start = ceph::mono_clock::now(); m->trace.event("async enqueueing message"); - out_q[m->get_priority()].emplace_back(out_q_entry_t{ - std::move(bl), m, is_prepared}); - ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m - << dendl; + out_q[m->get_priority()].emplace_back(out_q_entry_t{std::move(bl), std::move(m), is_prepared}); if (can_write != WriteStatus::REPLACING && !write_in_progress) { write_in_progress = true; connection->center->dispatch_event_external(connection->write_handler); @@ -251,7 +248,7 @@ void ProtocolV1::send_message(Message *m) { } } -void ProtocolV1::prepare_send_message(uint64_t features, Message *m, +void ProtocolV1::prepare_send_message(uint64_t features, const MessageRef& m, ceph::buffer::list &bl) { ldout(cct, 20) << __func__ << " m " << *m << dendl; @@ -326,7 +323,7 @@ void ProtocolV1::write_event() { } const out_q_entry_t out_entry = _get_next_outgoing(); - Message *m = out_entry.m; + auto& m = out_entry.m; ceph::buffer::list data = out_entry.bl; if (!m) { @@ -336,7 +333,6 @@ void ProtocolV1::write_event() { if (!connection->policy.lossy) { // put on sent list sent.push_back(m); - m->get(); } more = !out_q.empty(); connection->write_lock.unlock(); @@ -617,21 +613,17 @@ CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) { static const int max_pending = 128; int i = 0; auto now = ceph::mono_clock::now(); - Message *pending[max_pending]; + std::array pending; connection->write_lock.lock(); while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) { - Message *m = sent.front(); + auto& m = pending[i++] = std::move(sent.front()); sent.pop_front(); - pending[i++] = m; ldout(cct, 10) << __func__ << " got ack seq " << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; } connection->write_lock.unlock(); connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now); - for (int k = 0; k < i; k++) { - pending[k]->put(); - } return CONTINUE(wait_message); } @@ -942,13 +934,16 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) { ldout(cct, 20) << __func__ << " got " << front.length() << " + " << middle.length() << " + " << data.length() << " byte message" << dendl; - Message *message = decode_message(cct, messenger->crcflags, current_header, + Message *_message = decode_message(cct, messenger->crcflags, current_header, footer, front, middle, data, connection); - if (!message) { + if (!_message) { ldout(cct, 1) << __func__ << " decode message failed " << dendl; return _fault(); } + auto message = ceph::ref_t(_message, false); /* consume ref */ + _message = nullptr; + // // Check the signature if one should be present. A zero return indicates // success. PLR @@ -957,9 +952,8 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) { if (session_security.get() == NULL) { ldout(cct, 10) << __func__ << " no session security set" << dendl; } else { - if (session_security->check_message_signature(message)) { + if (session_security->check_message_signature(message.get())) { ldout(cct, 0) << __func__ << " Signature check failed" << dendl; - message->put(); return _fault(); } } @@ -984,7 +978,6 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) { ldout(cct, 0) << __func__ << " got old message " << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message << ", discarding" << dendl; - message->put(); if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) && cct->_conf->ms_die_on_old_message) { ceph_assert(0 == "old msgs despite reconnect_seq feature"); @@ -1033,7 +1026,6 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) { if (connection->is_blackhole()) { ldout(cct, 10) << __func__ << " blackhole " << *message << dendl; - message->put(); goto out; } @@ -1055,8 +1047,8 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) { << (ceph_clock_now() + delay_period) << " on " << message << " " << *message << dendl; } - connection->delay_state->queue(delay_period, message); - } else if (messenger->ms_can_fast_dispatch(message)) { + connection->delay_state->queue(delay_period, std::move(message)); + } else if (messenger->ms_can_fast_dispatch(*message)) { connection->lock.unlock(); connection->dispatch_queue->fast_dispatch(message); connection->recv_start_time = ceph::mono_clock::now(); @@ -1064,8 +1056,8 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) { connection->recv_start_time - fast_dispatch_time); connection->lock.lock(); } else { - connection->dispatch_queue->enqueue(message, message->get_priority(), - connection->conn_id); + auto p = message->get_priority(); + connection->dispatch_queue->enqueue(std::move(message), p, connection->conn_id); } out: @@ -1120,7 +1112,7 @@ void ProtocolV1::randomize_out_seq() { } } -ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) { +ssize_t ProtocolV1::write_message(const MessageRef& m, ceph::buffer::list &bl, bool more) { FUNCTRACE(cct); ceph_assert(connection->center->in_thread()); m->set_seq(++out_seq); @@ -1141,7 +1133,7 @@ ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) if (session_security.get() == NULL) { ldout(cct, 20) << __func__ << " no session security" << dendl; } else { - if (session_security->sign_message(m)) { + if (session_security->sign_message(m.get())) { ldout(cct, 20) << __func__ << " failed to sign m=" << m << "): sig = " << footer.sig << dendl; } else { @@ -1205,7 +1197,6 @@ ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) else if (m->get_type() == CEPH_MSG_OSD_OPREPLY) OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false); #endif - m->put(); return rc; } @@ -1218,13 +1209,12 @@ void ProtocolV1::requeue_sent() { auto &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; out_seq -= sent.size(); - while (!sent.empty()) { - Message *m = sent.back(); - sent.pop_back(); + for (; !sent.empty(); sent.pop_back()) { + auto& m = sent.back(); ldout(cct, 10) << __func__ << " " << *m << " for resend " << " (" << m->get_seq() << ")" << dendl; m->clear_payload(); - rq.push_front(out_q_entry_t{ceph::buffer::list(), m, false}); + rq.push_front(out_q_entry_t{ceph::buffer::list(), std::move(m), false}); } } @@ -1237,14 +1227,12 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { } auto &rq = it->second; uint64_t count = out_seq; - while (!rq.empty()) { - Message* const m = rq.front().m; + for (; !rq.empty(); rq.pop_front()) { + auto& m = rq.front().m; if (m->get_seq() == 0 || m->get_seq() > seq) break; ldout(cct, 10) << __func__ << " " << *(m) << " for resend seq " << m->get_seq() << " <= " << seq << ", discarding" << dendl; - m->put(); - rq.pop_front(); count++; } if (rq.empty()) out_q.erase(it); @@ -1258,16 +1246,13 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { void ProtocolV1::discard_out_queue() { ldout(cct, 10) << __func__ << " started" << dendl; - for (Message *msg : sent) { + for (const auto& msg : sent) { ldout(cct, 20) << __func__ << " discard " << msg << dendl; - msg->put(); } sent.clear(); - for (auto& [ prio, entries ] : out_q) { - static_cast(prio); + for ([[maybe_unused]] auto& [ prio, entries ] : out_q) { for (auto& entry : entries) { ldout(cct, 20) << __func__ << " discard " << entry.m << dendl; - entry.m->put(); } } out_q.clear(); @@ -1338,12 +1323,14 @@ void ProtocolV1::reset_recv_state() ProtocolV1::out_q_entry_t ProtocolV1::_get_next_outgoing() { out_q_entry_t out_entry; - if (const auto it = out_q.begin(); it != out_q.end()) { - ceph_assert(!it->second.empty()); - const auto p = it->second.begin(); - out_entry = *p; - it->second.erase(p); - if (it->second.empty()) out_q.erase(it); + if (auto it = out_q.begin(); it != out_q.end()) { + auto& q = it->second; + ceph_assert(!q.empty()); + out_entry = std::move(q.front()); + q.pop_front(); + if (q.empty()) { + out_q.erase(it); + } } return out_entry; } diff --git a/src/msg/async/ProtocolV1.h b/src/msg/async/ProtocolV1.h index 2f7eb1e01ee..200d01ad35c 100644 --- a/src/msg/async/ProtocolV1.h +++ b/src/msg/async/ProtocolV1.h @@ -7,6 +7,8 @@ #include "Protocol.h" #include "AsyncConnection.h" +#include + struct AuthSessionHandler; class ProtocolV1; using CtPtr = Ct*; @@ -106,11 +108,11 @@ protected: enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED }; std::atomic can_write; - std::list sent; // the first ceph::buffer::list need to inject seq + std::deque sent; // the first ceph::buffer::list need to inject seq //struct for outbound msgs struct out_q_entry_t { ceph::buffer::list bl; - Message* m {nullptr}; + MessageRef m; bool is_prepared {false}; }; // priority queue for outbound msgs @@ -118,7 +120,7 @@ protected: /** * A queue for each priority value, highest priority first. */ - std::map, std::greater> out_q; + std::map, std::greater> out_q; bool keepalive; bool write_in_progress = false; @@ -209,8 +211,8 @@ protected: out_q_entry_t _get_next_outgoing(); - void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl); - ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more); + void prepare_send_message(uint64_t features, const MessageRef& m, ceph::buffer::list &bl); + ssize_t write_message(const MessageRef& m, ceph::buffer::list &bl, bool more); void requeue_sent(); uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); @@ -230,7 +232,7 @@ public: virtual bool is_connected() override; virtual void stop() override; virtual void fault() override; - virtual void send_message(Message *m) override; + virtual void send_message(MessageRef&& m) override; virtual void send_keepalive() override; virtual void read_event() override; diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 53221aa9cf1..376477fa2ee 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -132,16 +132,14 @@ bool ProtocolV2::is_connected() { return can_write; } void ProtocolV2::discard_out_queue() { ldout(cct, 10) << __func__ << " started" << dendl; - for (Message *msg : sent) { - ldout(cct, 20) << __func__ << " discard " << msg << dendl; - msg->put(); + for (auto& msg : sent) { + ldout(cct, 20) << __func__ << " discard " << *msg << dendl; } sent.clear(); for (auto& [ prio, entries ] : out_queue) { static_cast(prio); for (auto& entry : entries) { ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl; - entry.m->put(); } } out_queue.clear(); @@ -202,14 +200,13 @@ void ProtocolV2::requeue_sent() { auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; out_seq -= sent.size(); - while (!sent.empty()) { - Message *m = sent.back(); - sent.pop_back(); + for (; !sent.empty(); sent.pop_back()) { + auto& m = sent.back(); ldout(cct, 5) << __func__ << " requeueing message m=" << m << " seq=" << m->get_seq() << " type=" << m->get_type() << " " << *m << dendl; m->clear_payload(); - rq.emplace_front(out_queue_entry_t{false, m}); + rq.emplace_front(out_queue_entry_t{false, std::move(m)}); } } @@ -222,14 +219,14 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { } auto& rq = it->second; uint64_t count = out_seq; - while (!rq.empty()) { - Message* const m = rq.front().m; - if (m->get_seq() == 0 || m->get_seq() > seq) break; + for (; !rq.empty(); rq.pop_front()) { + auto& m = rq.front().m; + if (m->get_seq() == 0 || m->get_seq() > seq) { + break; + } ldout(cct, 5) << __func__ << " discarding message m=" << m << " seq=" << m->get_seq() << " ack_seq=" << seq << " " << *m << dendl; - m->put(); - rq.pop_front(); count++; } if (rq.empty()) out_queue.erase(it); @@ -419,8 +416,7 @@ CtPtr ProtocolV2::_fault() { return nullptr; } -void ProtocolV2::prepare_send_message(uint64_t features, - Message *m) { +void ProtocolV2::prepare_send_message(uint64_t features, const MessageRef& m) { ldout(cct, 20) << __func__ << " m=" << *m << dendl; // associate message with Connection (for benefit of encode_payload) @@ -431,12 +427,12 @@ void ProtocolV2::prepare_send_message(uint64_t features, m->encode(features, 0); } -void ProtocolV2::send_message(Message *m) { +void ProtocolV2::send_message(MessageRef&& m) { uint64_t f = connection->get_features(); // TODO: Currently not all messages supports reencode like MOSDMap, so here // only let fast dispatch support messages prepare message - const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m); + const bool can_fast_prepare = messenger->ms_can_fast_dispatch(*m); bool is_prepared; if (can_fast_prepare && f) { prepare_send_message(f, m); @@ -456,17 +452,13 @@ void ProtocolV2::send_message(Message *m) { } if (state == CLOSED) { ldout(cct, 10) << __func__ << " connection closed." - << " Drop message " << m << dendl; - m->put(); + << " Drop message " << *m << dendl; } else { - ldout(cct, 5) << __func__ << " enqueueing message m=" << m - << " type=" << m->get_type() << " " << *m << dendl; + ldout(cct, 5) << __func__ << " enqueueing message m=" << *m << dendl; m->queue_start = ceph::mono_clock::now(); m->trace.event("async enqueueing message"); - out_queue[m->get_priority()].emplace_back( - out_queue_entry_t{is_prepared, m}); - ldout(cct, 15) << __func__ << " message queued for async transmission m=" << m - << dendl; + auto& oq = out_queue[m->get_priority()]; + oq.emplace_back(out_queue_entry_t{is_prepared, std::move(m)}); if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) { write_in_progress = true; connection->center->dispatch_event_external(connection->write_handler); @@ -525,7 +517,7 @@ ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() { return out_entry; } -ssize_t ProtocolV2::write_message(Message *m, bool more) { +ssize_t ProtocolV2::write_message(const MessageRef& m, bool more) { FUNCTRACE(cct); ceph_assert(connection->center->in_thread()); m->set_seq(++out_seq); @@ -585,7 +577,6 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { else if (m->get_type() == CEPH_MSG_OSD_OPREPLY) OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false); #endif - m->put(); return rc; } @@ -616,22 +607,18 @@ void ProtocolV2::handle_message_ack(uint64_t seq) { // trim sent list static const int max_pending = 128; int i = 0; - Message *pending[max_pending]; + std::array pending; auto now = ceph::mono_clock::now(); connection->write_lock.lock(); while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) { - Message *m = sent.front(); + auto& m = pending[i++] = std::move(sent.front()); sent.pop_front(); - pending[i++] = m; ldout(cct, 10) << __func__ << " got ack seq " << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; } connection->write_lock.unlock(); connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now); - for (int k = 0; k < i; k++) { - pending[k]->put(); - } } void ProtocolV2::reset_compression() { @@ -671,7 +658,7 @@ void ProtocolV2::write_event() { } } - const auto out_entry = _get_next_outgoing(); + auto out_entry = _get_next_outgoing(); if (!out_entry.m) { break; } @@ -679,7 +666,6 @@ void ProtocolV2::write_event() { if (!connection->policy.lossy) { // put on sent list sent.push_back(out_entry.m); - out_entry.m->get(); } more = !out_queue.empty(); connection->write_lock.unlock(); @@ -1436,18 +1422,20 @@ CtPtr ProtocolV2::handle_message() { ceph_msg_footer footer{ceph_le32(0), ceph_le32(0), ceph_le32(0), ceph_le64(0), current_header.flags}; - Message *message = decode_message(cct, 0, header, footer, + Message* _m = decode_message(cct, 0, header, footer, msg_frame.front(), msg_frame.middle(), msg_frame.data(), connection); - if (!message) { + if (!_m) { ldout(cct, 1) << __func__ << " decode message failed " << dendl; return _fault(); - } else { - state = READ_MESSAGE_COMPLETE; } + auto message = ceph::ref_t(_m, false); /* consume ref */ + _m = nullptr; + state = READ_MESSAGE_COMPLETE; + INTERCEPT(17); message->set_byte_throttler(connection->policy.throttler_bytes); @@ -1471,7 +1459,6 @@ CtPtr ProtocolV2::handle_message() { ldout(cct, 0) << __func__ << " got old message " << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message << ", discarding" << dendl; - message->put(); if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) && cct->_conf->ms_die_on_old_message) { ceph_assert(0 == "old msgs despite reconnect_seq feature"); @@ -1521,7 +1508,6 @@ CtPtr ProtocolV2::handle_message() { if (connection->is_blackhole()) { ldout(cct, 10) << __func__ << " blackhole " << *message << dendl; - message->put(); goto out; } @@ -1546,10 +1532,10 @@ CtPtr ProtocolV2::handle_message() { << (ceph_clock_now() + delay_period) << " on " << message << " " << *message << dendl; } - connection->delay_state->queue(delay_period, message); - } else if (messenger->ms_can_fast_dispatch(message)) { + connection->delay_state->queue(delay_period, std::move(message)); + } else if (messenger->ms_can_fast_dispatch(*message)) { connection->lock.unlock(); - connection->dispatch_queue->fast_dispatch(message); + connection->dispatch_queue->fast_dispatch(std::move(message)); connection->recv_start_time = ceph::mono_clock::now(); connection->logger->tinc(l_msgr_running_fast_dispatch_time, connection->recv_start_time - fast_dispatch_time); @@ -1561,8 +1547,8 @@ CtPtr ProtocolV2::handle_message() { return nullptr; } } else { - connection->dispatch_queue->enqueue(message, message->get_priority(), - connection->conn_id); + auto p = message->get_priority(); + connection->dispatch_queue->enqueue(std::move(message), p, connection->conn_id); } handle_message_ack(current_header.ack_seq); diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index e82989599df..73c225e6b31 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -11,6 +11,8 @@ #include "compression_onwire.h" #include "frames_v2.h" +#include + class ProtocolV2 : public Protocol { private: enum State { @@ -92,15 +94,15 @@ private: bool can_write; struct out_queue_entry_t { bool is_prepared {false}; - Message* m {nullptr}; + MessageRef m; }; /** * A queue for each priority value, highest priority first. */ - std::map, std::greater> out_queue; + std::map, std::greater> out_queue; - std::list sent; + std::deque sent; std::atomic out_seq{0}; std::atomic in_seq{0}; std::atomic ack_left{0}; @@ -154,9 +156,9 @@ private: Ct *_fault(); void discard_out_queue(); void reset_session(); - void prepare_send_message(uint64_t features, Message *m); + void prepare_send_message(uint64_t features, const MessageRef& m); out_queue_entry_t _get_next_outgoing(); - ssize_t write_message(Message *m, bool more); + ssize_t write_message(const MessageRef& m, bool more); void handle_message_ack(uint64_t seq); void reset_compression(); @@ -217,7 +219,7 @@ public: virtual bool is_connected() override; virtual void stop() override; virtual void fault() override; - virtual void send_message(Message *m) override; + virtual void send_message(MessageRef&& m) override; virtual void send_keepalive() override; virtual void read_event() override;