]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg: use MessageRef to manage pointer lifetime
authorPatrick Donnelly <pdonnell@redhat.com>
Tue, 19 Aug 2025 21:27:29 +0000 (17:27 -0400)
committerPatrick Donnelly <pdonnell@ibm.com>
Mon, 29 Dec 2025 22:34:46 +0000 (17:34 -0500)
To simplify reasoning about upcoming changes to incoming/pending
messages.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
13 files changed:
src/mon/Monitor.cc
src/msg/Connection.h
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h
src/msg/Dispatcher.h
src/msg/Messenger.h
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/Protocol.h
src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV1.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 38f5a95962511083466496bdc4d9db5ceba6360c..3a90987836de74c79517b1866239522c2e24b641 100644 (file)
@@ -4194,9 +4194,6 @@ void Monitor::forward_request_leader(MonOpRequestRef op)
 struct AnonConnection : public Connection {
   entity_addr_t socket_addr;
 
-  int send_message(Message *m) override {
-    ceph_abort_msg("send_message on anonymous connection");
-  }
   void send_keepalive() override {
     ceph_abort_msg("send_keepalive on anonymous connection");
   }
@@ -4214,6 +4211,11 @@ struct AnonConnection : public Connection {
     return socket_addr;
   }
 
+protected:
+  int send_msg(MessageRef&& m) override {
+    ceph_abort_msg("send_message on anonymous connection");
+  }
+
 private:
   FRIEND_MAKE_REF(AnonConnection);
   explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
index b8fcaeebfca04116026a1c4822110528d2fcdb9f..156200ac24c72002c40660eaaef400cd691e1dfc 100644 (file)
@@ -114,15 +114,19 @@ public:
    * on the Connection policy.
    *
    * @param m The Message to send. The Messenger consumes a single reference
-   * when you pass it in.
+   * (if a stupid pointer) when you pass it in.
    *
    * @return 0 on success, or -errno on failure.
    */
-  virtual int send_message(Message *m) = 0;
-
-  virtual int send_message2(MessageRef m)
-  {
-    return send_message(m.detach()); /* send_message(Message *m) consumes a reference */
+  int send_message(Message* _m) {
+    auto m = ceph::ref_t<Message>(_m, false); /* consume ref */
+    return send_msg(std::move(m));
+  }
+  int send_message2(const MessageRef& m) {
+    return send_msg(MessageRef(m));
+  }
+  int send_message2(MessageRef&& m) {
+    return send_msg(std::move(m));
   }
 
   /**
@@ -254,6 +258,8 @@ protected:
   {}
 
   ~Connection() override;
+
+  virtual int send_msg(MessageRef&& m) = 0;
 };
 
 using ConnectionRef = ceph::ref_t<Connection>;
index ad172590cb323f010d3883f25bbe2a5c93ab085e..45a51e6fd1bebc4817b4d695293cadf23fe11d08 100644 (file)
@@ -21,7 +21,6 @@
 #define dout_subsys ceph_subsys_ms
 #include "common/debug.h"
 
-using ceph::cref_t;
 using ceph::ref_t;
 
 /*******************
@@ -64,7 +63,7 @@ void DispatchQueue::post_dispatch(const ref_t<Message>& m, uint64_t msize)
   ldout(cct,20) << "done calling dispatch on " << m << dendl;
 }
 
-bool DispatchQueue::can_fast_dispatch(const cref_t<Message> &m) const
+bool DispatchQueue::can_fast_dispatch(const Message& m) const
 {
   return msgr->ms_can_fast_dispatch(m);
 }
@@ -81,14 +80,14 @@ void DispatchQueue::fast_preprocess(const ref_t<Message>& m)
   msgr->ms_fast_preprocess(m);
 }
 
-void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
+void DispatchQueue::enqueue(ref_t<Message>&& m, int priority, uint64_t id)
 {
   std::lock_guard l{lock};
   if (stop) {
     return;
   }
   ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
-  QueueItem item{m};
+  QueueItem item{std::move(m)};
   add_arrival(item);
   if (priority >= CEPH_MSG_PRIO_LOW) {
     mqueue.enqueue_strict(id, priority, std::move(item));
@@ -98,7 +97,7 @@ void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
   cond.notify_one();
 }
 
-void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
+void DispatchQueue::local_delivery(ref_t<Message>&& m, int priority)
 {
   auto local_delivery_stamp = ceph_clock_now();
   m->set_recv_stamp(local_delivery_stamp);
@@ -107,7 +106,7 @@ void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
   std::lock_guard l{local_delivery_lock};
   if (local_messages.empty())
     local_delivery_cond.notify_all();
-  local_messages.emplace(m, priority);
+  local_messages.emplace(std::move(m), priority);
   return;
 }
 
@@ -124,13 +123,13 @@ void DispatchQueue::run_local_delivery()
     auto p = std::move(local_messages.front());
     local_messages.pop();
     l.unlock();
-    const ref_t<Message>& m = p.first;
+    auto& m = p.first;
     int priority = p.second;
     fast_preprocess(m);
-    if (can_fast_dispatch(m)) {
+    if (can_fast_dispatch(*m)) {
       fast_dispatch(m);
     } else {
-      enqueue(m, priority, 0);
+      enqueue(std::move(m), priority, 0);
     }
     l.lock();
   }
index 1c1893a28140f164e58e752df44d2fa387e0d8e3..401b07374d4d3492eac9ca263416081d27f84d25 100644 (file)
@@ -43,12 +43,12 @@ class DispatchQueue {
   ArrivalSet marrival;
 
   class QueueItem {
-    int type;
+    int type = -1;
     ConnectionRef con;
     ceph::ref_t<Message> m;
   public:
-    explicit QueueItem(const ceph::ref_t<Message>& m) : type(-1), con(0), m(m) {}
-    QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
+    explicit QueueItem(ceph::ref_t<Message>&& m) : m(std::move(m)) {}
+    QueueItem(int type, Connection *con) : type(type), con(con) {}
     bool is_code() const {
       return type != -1;
     }
@@ -127,10 +127,7 @@ class DispatchQueue {
   Throttle dispatch_throttler;
 
   bool stop;
-  void local_delivery(const ceph::ref_t<Message>& m, int priority);
-  void local_delivery(Message* m, int priority) {
-    return local_delivery(ceph::ref_t<Message>(m, false), priority); /* consume ref */
-  }
+  void local_delivery(ceph::ref_t<Message>&& m, int priority);
   void run_local_delivery();
 
   double get_max_age(utime_t now) const;
@@ -198,16 +195,13 @@ class DispatchQueue {
     cond.notify_all();
   }
 
-  bool can_fast_dispatch(const ceph::cref_t<Message> &m) const;
+  bool can_fast_dispatch(const Message& m) const;
   void fast_dispatch(const ceph::ref_t<Message>& m);
   void fast_dispatch(Message* m) {
     return fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
   }
   void fast_preprocess(const ceph::ref_t<Message>& m);
-  void enqueue(const ceph::ref_t<Message>& m, int priority, uint64_t id);
-  void enqueue(Message* m, int priority, uint64_t id) {
-    return enqueue(ceph::ref_t<Message>(m, false), priority, id); /* consume ref */
-  }
+  void enqueue(ceph::ref_t<Message>&& m, int priority, uint64_t id);
   void discard_queue(uint64_t id);
   void discard_local();
   uint64_t get_id() {
index 58fc628f7683b658e707e4ccb25d580a41947afc..9dc34671f673dcaeba603f0152c24aae60658163 100644 (file)
@@ -67,9 +67,11 @@ public:
    * @param m The message we want to fast dispatch.
    * @returns True if the message can be fast dispatched; false otherwise.
    */
-  virtual bool ms_can_fast_dispatch(const Message *m) const { return false; }
-  virtual bool ms_can_fast_dispatch2(const MessageConstRef& m) const {
-    return ms_can_fast_dispatch(m.get());
+  virtual bool ms_can_fast_dispatch(const Message* m) const {
+    return false;
+  }
+  virtual bool ms_can_fast_dispatch2(const Message& m) const {
+    return ms_can_fast_dispatch(&m);
   }
   /**
    * This function determines if a dispatcher is included in the
index 8dc49e2483cd4fda72568b1e92420fd6277dd9d3..d43c978f2c4f0082c31cda5d04b3c07d603e7a67 100644 (file)
@@ -698,7 +698,7 @@ public:
    *
    * @param m The Message we are testing.
    */
-  bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
+  bool ms_can_fast_dispatch(const Message& m) {
     for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
       if (dispatcher->ms_can_fast_dispatch2(m)) {
         return true;
@@ -716,7 +716,7 @@ public:
   void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
     m->set_dispatch_stamp(ceph_clock_now());
     for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
-      if (dispatcher->ms_can_fast_dispatch2(m)) {
+      if (dispatcher->ms_can_fast_dispatch2(*m)) {
         dispatcher->ms_fast_dispatch2(m);
         return;
       }
@@ -758,9 +758,6 @@ public:
                         << m->get_source_inst() << dendl;
     ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
   }
-  void ms_deliver_dispatch(Message *m) {
-    return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
-  }
   /**
    * Notify each Dispatcher of a new Connection. Call
    * this function whenever a new Connection is initiated or
index 52d1330f24e1d6602fc50b6c08a17035ca492c31..5d812f962631e560a5f853fe71381e22e53625d0 100644 (file)
@@ -543,9 +543,10 @@ void AsyncConnection::accept(ConnectedSocket socket,
   center->dispatch_event_external(read_handler);
 }
 
-int AsyncConnection::send_message(Message *m)
+int AsyncConnection::send_msg(MessageRef&& m)
 {
   FUNCTRACE(async_msgr->cct);
+
   lgeneric_subdout(async_msgr->cct, ms,
                   1) << "-- " << async_msgr->get_myaddrs() << " --> "
                      << get_peer_addrs() << " -- "
@@ -556,7 +557,6 @@ int AsyncConnection::send_message(Message *m)
   if (is_blackhole()) {
     lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
       << " blackhole " << *m << dendl;
-    m->put();
     return 0;
   }
 
@@ -578,11 +578,10 @@ int AsyncConnection::send_message(Message *m)
     ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
     std::lock_guard<std::mutex> l(write_lock);
     if (protocol->is_connected()) {
-      dispatch_queue->local_delivery(m, m->get_priority());
+      dispatch_queue->local_delivery(std::move(m), m->get_priority());
     } else {
       ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
                                  << " Drop message " << m << dendl;
-      m->put();
     }
     return 0;
   }
@@ -591,7 +590,7 @@ int AsyncConnection::send_message(Message *m)
   // may disturb users
   logger->inc(l_msgr_send_messages);
 
-  protocol->send_message(m);
+  protocol->send_message(std::move(m));
   return 0;
 }
 
@@ -661,21 +660,22 @@ void AsyncConnection::shutdown_socket() {
 
 void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
 {
-  Message *m = nullptr;
+  MessageRef m;
   {
     std::lock_guard<std::mutex> l(delay_lock);
     register_time_events.erase(id);
     if (stop_dispatch)
-      return ;
+      return;
     if (delay_queue.empty())
-      return ;
-    m = delay_queue.front();
+      return;
+    m = std::move(delay_queue.front());
     delay_queue.pop_front();
   }
-  if (msgr->ms_can_fast_dispatch(m)) {
+  if (msgr->ms_can_fast_dispatch(*m)) {
     dispatch_queue->fast_dispatch(m);
   } else {
-    dispatch_queue->enqueue(m, m->get_priority(), conn_id);
+    auto p = m->get_priority();
+    dispatch_queue->enqueue(std::move(m), p, conn_id);
   }
 }
 
@@ -685,10 +685,11 @@ void AsyncConnection::DelayedDelivery::discard() {
                     [this]() mutable {
                       std::lock_guard<std::mutex> l(delay_lock);
                       while (!delay_queue.empty()) {
-                        Message *m = delay_queue.front();
-                        dispatch_queue->dispatch_throttle_release(
+                        {
+                          auto& m = delay_queue.front();
+                          dispatch_queue->dispatch_throttle_release(
                             m->get_dispatch_throttle_size());
-                        m->put();
+                        }
                         delay_queue.pop_front();
                       }
                       for (auto i : register_time_events)
@@ -704,14 +705,14 @@ void AsyncConnection::DelayedDelivery::flush() {
   center->submit_to(
       center->get_id(), [this] () mutable {
     std::lock_guard<std::mutex> l(delay_lock);
-    while (!delay_queue.empty()) {
-      Message *m = delay_queue.front();
-      if (msgr->ms_can_fast_dispatch(m)) {
+    for (; !delay_queue.empty(); delay_queue.pop_front()) {
+      auto& m = delay_queue.front();
+      if (msgr->ms_can_fast_dispatch(*m)) {
         dispatch_queue->fast_dispatch(m);
       } else {
-        dispatch_queue->enqueue(m, m->get_priority(), conn_id);
+        auto p = m->get_priority();
+        dispatch_queue->enqueue(std::move(m), p, conn_id);
       }
-      delay_queue.pop_front();
     }
     for (auto i : register_time_events)
       center->delete_time_event(i);
index ac11fd1ae9dc919c7bd8bcac5c167130912b191f..a109e4f86246a4431753187f4f558d6250aefa49 100644 (file)
@@ -80,7 +80,7 @@ class AsyncConnection : public Connection {
    */
   class DelayedDelivery : public EventCallback {
     std::set<uint64_t> register_time_events; // need to delete it if stop
-    std::deque<Message*> delay_queue;
+    std::deque<MessageRef> delay_queue;
     std::mutex delay_lock;
     AsyncMessenger *msgr;
     EventCenter *center;
@@ -99,9 +99,9 @@ class AsyncConnection : public Connection {
     }
     void set_center(EventCenter *c) { center = c; }
     void do_request(uint64_t id) override;
-    void queue(double delay_period, Message *m) {
+    void queue(double delay_period, MessageRef&& m) {
       std::lock_guard<std::mutex> l(delay_lock);
-      delay_queue.push_back(m);
+      delay_queue.push_back(std::move(m));
       register_time_events.insert(center->create_time_event(delay_period*1000000, this));
     }
     void discard();
@@ -129,7 +129,6 @@ public:
   void accept(ConnectedSocket socket,
              const entity_addr_t &listen_addr,
              const entity_addr_t &peer_addr);
-  int send_message(Message *m) override;
 
   void send_keepalive() override;
   void mark_down() override;
@@ -182,6 +181,8 @@ public:
   int port;
 public:
   Messenger::Policy policy;
+protected:
+  int send_msg(MessageRef&& m) override;
 private:
 
   DispatchQueue *dispatch_queue;
index e2a0fd559583c9164e276a314d7b30fc747e2f7a..e3d62c6b1f74c703b4c05104d28cf7f530991204 100644 (file)
@@ -124,7 +124,7 @@ public:
   // signal and handle connection failure
   virtual void fault() = 0;
   // send message
-  virtual void send_message(Message *m) = 0;
+  virtual void send_message(MessageRef&& m) = 0;
   // send keepalive
   virtual void send_keepalive() = 0;
 
index 00fab0a910e1c5a45a2df349024c8b0dfc355b67..fc4f2b483d76f3f2df7e0874a4facda0bc7278bf 100644 (file)
@@ -210,13 +210,13 @@ void ProtocolV1::fault() {
   }
 }
 
-void ProtocolV1::send_message(Message *m) {
+void ProtocolV1::send_message(MessageRef&& m) {
   ceph::buffer::list bl;
   uint64_t f = connection->get_features();
 
   // TODO: Currently not all messages supports reencode like MOSDMap, so here
   // only let fast dispatch support messages prepare message
-  bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
+  bool can_fast_prepare = messenger->ms_can_fast_dispatch(*m);
   bool is_prepared = false;
   if (can_fast_prepare && f) {
     prepare_send_message(f, m, bl);
@@ -236,14 +236,11 @@ void ProtocolV1::send_message(Message *m) {
   if (can_write == WriteStatus::CLOSED) {
     ldout(cct, 10) << __func__ << " connection closed."
                    << " Drop message " << m << dendl;
-    m->put();
   } else {
+    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << *m << dendl;
     m->queue_start = ceph::mono_clock::now();
     m->trace.event("async enqueueing message");
-    out_q[m->get_priority()].emplace_back(out_q_entry_t{
-      std::move(bl), m, is_prepared});
-    ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
-                   << dendl;
+    out_q[m->get_priority()].emplace_back(out_q_entry_t{std::move(bl), std::move(m), is_prepared});
     if (can_write != WriteStatus::REPLACING && !write_in_progress) {
       write_in_progress = true;
       connection->center->dispatch_event_external(connection->write_handler);
@@ -251,7 +248,7 @@ void ProtocolV1::send_message(Message *m) {
   }
 }
 
-void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
+void ProtocolV1::prepare_send_message(uint64_t features, const MessageRef& m,
                                       ceph::buffer::list &bl) {
   ldout(cct, 20) << __func__ << " m " << *m << dendl;
 
@@ -326,7 +323,7 @@ void ProtocolV1::write_event() {
       }
 
       const out_q_entry_t out_entry = _get_next_outgoing();
-      Message *m = out_entry.m;
+      auto& m = out_entry.m;
       ceph::buffer::list data = out_entry.bl;
 
       if (!m) {
@@ -336,7 +333,6 @@ void ProtocolV1::write_event() {
       if (!connection->policy.lossy) {
         // put on sent list
         sent.push_back(m);
-        m->get();
       }
       more = !out_q.empty();
       connection->write_lock.unlock();
@@ -617,21 +613,17 @@ CtPtr ProtocolV1::handle_tag_ack(char *buffer, int r) {
   static const int max_pending = 128;
   int i = 0;
   auto now = ceph::mono_clock::now();
-  Message *pending[max_pending];
+  std::array<MessageRef, max_pending> pending;
   connection->write_lock.lock();
   while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
-    Message *m = sent.front();
+    auto& m = pending[i++] = std::move(sent.front());
     sent.pop_front();
-    pending[i++] = m;
     ldout(cct, 10) << __func__ << " got ack seq " << seq
                    << " >= " << m->get_seq() << " on " << m << " " << *m
                    << dendl;
   }
   connection->write_lock.unlock();
   connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
-  for (int k = 0; k < i; k++) {
-    pending[k]->put();
-  }
 
   return CONTINUE(wait_message);
 }
@@ -942,13 +934,16 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " got " << front.length() << " + "
                  << middle.length() << " + " << data.length() << " byte message"
                  << dendl;
-  Message *message = decode_message(cct, messenger->crcflags, current_header,
+  Message *_message = decode_message(cct, messenger->crcflags, current_header,
                                     footer, front, middle, data, connection);
-  if (!message) {
+  if (!_message) {
     ldout(cct, 1) << __func__ << " decode message failed " << dendl;
     return _fault();
   }
 
+  auto message = ceph::ref_t<Message>(_message, false); /* consume ref */
+  _message = nullptr;
+
   //
   //  Check the signature if one should be present.  A zero return indicates
   //  success. PLR
@@ -957,9 +952,8 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
   if (session_security.get() == NULL) {
     ldout(cct, 10) << __func__ << " no session security set" << dendl;
   } else {
-    if (session_security->check_message_signature(message)) {
+    if (session_security->check_message_signature(message.get())) {
       ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
-      message->put();
       return _fault();
     }
   }
@@ -984,7 +978,6 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
     ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
                   << " <= " << cur_seq << " " << message << " " << *message
                   << ", discarding" << dendl;
-    message->put();
     if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
         cct->_conf->ms_die_on_old_message) {
       ceph_assert(0 == "old msgs despite reconnect_seq feature");
@@ -1033,7 +1026,6 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
 
   if (connection->is_blackhole()) {
     ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
-    message->put();
     goto out;
   }
 
@@ -1055,8 +1047,8 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
                     << (ceph_clock_now() + delay_period) << " on " << message
                     << " " << *message << dendl;
     }
-    connection->delay_state->queue(delay_period, message);
-  } else if (messenger->ms_can_fast_dispatch(message)) {
+    connection->delay_state->queue(delay_period, std::move(message));
+  } else if (messenger->ms_can_fast_dispatch(*message)) {
     connection->lock.unlock();
     connection->dispatch_queue->fast_dispatch(message);
     connection->recv_start_time = ceph::mono_clock::now();
@@ -1064,8 +1056,8 @@ CtPtr ProtocolV1::handle_message_footer(char *buffer, int r) {
                              connection->recv_start_time - fast_dispatch_time);
     connection->lock.lock();
   } else {
-    connection->dispatch_queue->enqueue(message, message->get_priority(),
-                                        connection->conn_id);
+    auto p = message->get_priority();
+    connection->dispatch_queue->enqueue(std::move(message), p, connection->conn_id);
   }
 
  out:
@@ -1120,7 +1112,7 @@ void ProtocolV1::randomize_out_seq() {
   }
 }
 
-ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) {
+ssize_t ProtocolV1::write_message(const MessageRef& m, ceph::buffer::list &bl, bool more) {
   FUNCTRACE(cct);
   ceph_assert(connection->center->in_thread());
   m->set_seq(++out_seq);
@@ -1141,7 +1133,7 @@ ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more)
   if (session_security.get() == NULL) {
     ldout(cct, 20) << __func__ << " no session security" << dendl;
   } else {
-    if (session_security->sign_message(m)) {
+    if (session_security->sign_message(m.get())) {
       ldout(cct, 20) << __func__ << " failed to sign m=" << m
                      << "): sig = " << footer.sig << dendl;
     } else {
@@ -1205,7 +1197,6 @@ ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more)
   else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
 #endif
-  m->put();
 
   return rc;
 }
@@ -1218,13 +1209,12 @@ void ProtocolV1::requeue_sent() {
 
   auto &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
   out_seq -= sent.size();
-  while (!sent.empty()) {
-    Message *m = sent.back();
-    sent.pop_back();
+  for (; !sent.empty(); sent.pop_back()) {
+    auto& m = sent.back();
     ldout(cct, 10) << __func__ << " " << *m << " for resend "
                    << " (" << m->get_seq() << ")" << dendl;
     m->clear_payload();
-    rq.push_front(out_q_entry_t{ceph::buffer::list(), m, false});
+    rq.push_front(out_q_entry_t{ceph::buffer::list(), std::move(m), false});
   }
 }
 
@@ -1237,14 +1227,12 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   }
   auto &rq = it->second;
   uint64_t count = out_seq;
-  while (!rq.empty()) {
-    Message* const m = rq.front().m;
+  for (; !rq.empty(); rq.pop_front()) {
+    auto& m = rq.front().m;
     if (m->get_seq() == 0 || m->get_seq() > seq) break;
     ldout(cct, 10) << __func__ << " " << *(m) << " for resend seq "
                    << m->get_seq() << " <= " << seq << ", discarding"
                    << dendl;
-    m->put();
-    rq.pop_front();
     count++;
   }
   if (rq.empty()) out_q.erase(it);
@@ -1258,16 +1246,13 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
 void ProtocolV1::discard_out_queue() {
   ldout(cct, 10) << __func__ << " started" << dendl;
 
-  for (Message *msg : sent) {
+  for (const auto& msg : sent) {
     ldout(cct, 20) << __func__ << " discard " << msg << dendl;
-    msg->put();
   }
   sent.clear();
-  for (auto& [ prio, entries ] : out_q) {
-    static_cast<void>(prio);
+  for ([[maybe_unused]] auto& [ prio, entries ] : out_q) {
     for (auto& entry : entries) {
       ldout(cct, 20) << __func__ << " discard " << entry.m << dendl;
-      entry.m->put();
     }
   }
   out_q.clear();
@@ -1338,12 +1323,14 @@ void ProtocolV1::reset_recv_state()
 
 ProtocolV1::out_q_entry_t ProtocolV1::_get_next_outgoing() {
   out_q_entry_t out_entry;
-  if (const auto it = out_q.begin(); it != out_q.end()) {
-    ceph_assert(!it->second.empty());
-    const auto p = it->second.begin();
-    out_entry = *p;
-    it->second.erase(p);
-    if (it->second.empty()) out_q.erase(it);
+  if (auto it = out_q.begin(); it != out_q.end()) {
+    auto& q = it->second;
+    ceph_assert(!q.empty());
+    out_entry = std::move(q.front());
+    q.pop_front();
+    if (q.empty()) {
+      out_q.erase(it);
+    }
   }
   return out_entry;
 }
index 2f7eb1e01eefa37267f0c9ab1c41690f40830b06..200d01ad35c66df8669cdc0360a70c58d2a58b50 100644 (file)
@@ -7,6 +7,8 @@
 #include "Protocol.h"
 #include "AsyncConnection.h"
 
+#include <deque>
+
 struct AuthSessionHandler;
 class ProtocolV1;
 using CtPtr = Ct<ProtocolV1>*;
@@ -106,11 +108,11 @@ protected:
 
   enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED };
   std::atomic<WriteStatus> can_write;
-  std::list<Message *> sent;  // the first ceph::buffer::list need to inject seq
+  std::deque<MessageRef> sent;  // the first ceph::buffer::list need to inject seq
   //struct for outbound msgs
   struct out_q_entry_t {
     ceph::buffer::list bl;
-    Message* m {nullptr};
+    MessageRef m;
     bool is_prepared {false};
   };
   // priority queue for outbound msgs
@@ -118,7 +120,7 @@ protected:
   /**
    * A queue for each priority value, highest priority first.
    */
-  std::map<int, std::list<out_q_entry_t>, std::greater<int>> out_q;
+  std::map<int, std::deque<out_q_entry_t>, std::greater<int>> out_q;
 
   bool keepalive;
   bool write_in_progress = false;
@@ -209,8 +211,8 @@ protected:
 
   out_q_entry_t _get_next_outgoing();
 
-  void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl);
-  ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more);
+  void prepare_send_message(uint64_t features, const MessageRef& m, ceph::buffer::list &bl);
+  ssize_t write_message(const MessageRef& m, ceph::buffer::list &bl, bool more);
 
   void requeue_sent();
   uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
@@ -230,7 +232,7 @@ public:
   virtual bool is_connected() override;
   virtual void stop() override;
   virtual void fault() override;
-  virtual void send_message(Message *m) override;
+  virtual void send_message(MessageRef&& m) override;
   virtual void send_keepalive() override;
 
   virtual void read_event() override;
index 53221aa9cf1c428dcf489a069a560addd86d5ae1..376477fa2eebbcd60a5ebc89dadeb5ed7d0182cd 100644 (file)
@@ -132,16 +132,14 @@ bool ProtocolV2::is_connected() { return can_write; }
 void ProtocolV2::discard_out_queue() {
   ldout(cct, 10) << __func__ << " started" << dendl;
 
-  for (Message *msg : sent) {
-    ldout(cct, 20) << __func__ << " discard " << msg << dendl;
-    msg->put();
+  for (auto& msg : sent) {
+    ldout(cct, 20) << __func__ << " discard " << *msg << dendl;
   }
   sent.clear();
   for (auto& [ prio, entries ] : out_queue) {
     static_cast<void>(prio);
     for (auto& entry : entries) {
       ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
-      entry.m->put();
     }
   }
   out_queue.clear();
@@ -202,14 +200,13 @@ void ProtocolV2::requeue_sent() {
 
   auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
   out_seq -= sent.size();
-  while (!sent.empty()) {
-    Message *m = sent.back();
-    sent.pop_back();
+  for (; !sent.empty(); sent.pop_back()) {
+    auto& m = sent.back();
     ldout(cct, 5) << __func__ << " requeueing message m=" << m
                   << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
                   << *m << dendl;
     m->clear_payload();
-    rq.emplace_front(out_queue_entry_t{false, m});
+    rq.emplace_front(out_queue_entry_t{false, std::move(m)});
   }
 }
 
@@ -222,14 +219,14 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   }
   auto& rq = it->second;
   uint64_t count = out_seq;
-  while (!rq.empty()) {
-    Message* const m = rq.front().m;
-    if (m->get_seq() == 0 || m->get_seq() > seq) break;
+  for (; !rq.empty(); rq.pop_front()) {
+    auto& m = rq.front().m;
+    if (m->get_seq() == 0 || m->get_seq() > seq) {
+      break;
+    }
     ldout(cct, 5) << __func__ << " discarding message m=" << m
                   << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
                   << *m << dendl;
-    m->put();
-    rq.pop_front();
     count++;
   }
   if (rq.empty()) out_queue.erase(it);
@@ -419,8 +416,7 @@ CtPtr ProtocolV2::_fault() {
   return nullptr;
 }
 
-void ProtocolV2::prepare_send_message(uint64_t features,
-                                     Message *m) {
+void ProtocolV2::prepare_send_message(uint64_t features, const MessageRef& m) {
   ldout(cct, 20) << __func__ << " m=" << *m << dendl;
 
   // associate message with Connection (for benefit of encode_payload)
@@ -431,12 +427,12 @@ void ProtocolV2::prepare_send_message(uint64_t features,
   m->encode(features, 0);
 }
 
-void ProtocolV2::send_message(Message *m) {
+void ProtocolV2::send_message(MessageRef&& m) {
   uint64_t f = connection->get_features();
 
   // TODO: Currently not all messages supports reencode like MOSDMap, so here
   // only let fast dispatch support messages prepare message
-  const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
+  const bool can_fast_prepare = messenger->ms_can_fast_dispatch(*m);
   bool is_prepared;
   if (can_fast_prepare && f) {
     prepare_send_message(f, m);
@@ -456,17 +452,13 @@ void ProtocolV2::send_message(Message *m) {
   }
   if (state == CLOSED) {
     ldout(cct, 10) << __func__ << " connection closed."
-                   << " Drop message " << m << dendl;
-    m->put();
+                   << " Drop message " << *m << dendl;
   } else {
-    ldout(cct, 5) << __func__ << " enqueueing message m=" << m
-                  << " type=" << m->get_type() << " " << *m << dendl;
+    ldout(cct, 5) << __func__ << " enqueueing message m=" << *m << dendl;
     m->queue_start = ceph::mono_clock::now();
     m->trace.event("async enqueueing message");
-    out_queue[m->get_priority()].emplace_back(
-      out_queue_entry_t{is_prepared, m});
-    ldout(cct, 15) << __func__ << " message queued for async transmission m=" << m
-                   << dendl;
+    auto& oq = out_queue[m->get_priority()];
+    oq.emplace_back(out_queue_entry_t{is_prepared, std::move(m)});
     if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
       write_in_progress = true;
       connection->center->dispatch_event_external(connection->write_handler);
@@ -525,7 +517,7 @@ ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
   return out_entry;
 }
 
-ssize_t ProtocolV2::write_message(Message *m, bool more) {
+ssize_t ProtocolV2::write_message(const MessageRef& m, bool more) {
   FUNCTRACE(cct);
   ceph_assert(connection->center->in_thread());
   m->set_seq(++out_seq);
@@ -585,7 +577,6 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
   else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
 #endif
-  m->put();
 
   return rc;
 }
@@ -616,22 +607,18 @@ void ProtocolV2::handle_message_ack(uint64_t seq) {
   // trim sent list
   static const int max_pending = 128;
   int i = 0;
-  Message *pending[max_pending];
+  std::array<MessageRef, max_pending> pending;
   auto now = ceph::mono_clock::now();
   connection->write_lock.lock();
   while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
-    Message *m = sent.front();
+    auto& m = pending[i++] = std::move(sent.front());
     sent.pop_front();
-    pending[i++] = m;
     ldout(cct, 10) << __func__ << " got ack seq " << seq
                    << " >= " << m->get_seq() << " on " << m << " " << *m
                    << dendl;
   }
   connection->write_lock.unlock();
   connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
-  for (int k = 0; k < i; k++) {
-    pending[k]->put();
-  }
 }
 
 void ProtocolV2::reset_compression() {
@@ -671,7 +658,7 @@ void ProtocolV2::write_event() {
        }
       }
 
-      const auto out_entry = _get_next_outgoing();
+      auto out_entry = _get_next_outgoing();
       if (!out_entry.m) {
         break;
       }
@@ -679,7 +666,6 @@ void ProtocolV2::write_event() {
       if (!connection->policy.lossy) {
         // put on sent list
         sent.push_back(out_entry.m);
-        out_entry.m->get();
       }
       more = !out_queue.empty();
       connection->write_lock.unlock();
@@ -1436,18 +1422,20 @@ CtPtr ProtocolV2::handle_message() {
   ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
                         ceph_le32(0), ceph_le64(0), current_header.flags};
 
-  Message *message = decode_message(cct, 0, header, footer,
+  Message* _m = decode_message(cct, 0, header, footer,
       msg_frame.front(),
       msg_frame.middle(),
       msg_frame.data(),
       connection);
-  if (!message) {
+  if (!_m) {
     ldout(cct, 1) << __func__ << " decode message failed " << dendl;
     return _fault();
-  } else {
-    state = READ_MESSAGE_COMPLETE;
   }
 
+  auto message = ceph::ref_t<Message>(_m, false); /* consume ref */
+  _m = nullptr;
+  state = READ_MESSAGE_COMPLETE;
+
   INTERCEPT(17);
 
   message->set_byte_throttler(connection->policy.throttler_bytes);
@@ -1471,7 +1459,6 @@ CtPtr ProtocolV2::handle_message() {
     ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
                   << " <= " << cur_seq << " " << message << " " << *message
                   << ", discarding" << dendl;
-    message->put();
     if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
         cct->_conf->ms_die_on_old_message) {
       ceph_assert(0 == "old msgs despite reconnect_seq feature");
@@ -1521,7 +1508,6 @@ CtPtr ProtocolV2::handle_message() {
 
   if (connection->is_blackhole()) {
     ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
-    message->put();
     goto out;
   }
 
@@ -1546,10 +1532,10 @@ CtPtr ProtocolV2::handle_message() {
                     << (ceph_clock_now() + delay_period) << " on " << message
                     << " " << *message << dendl;
     }
-    connection->delay_state->queue(delay_period, message);
-  } else if (messenger->ms_can_fast_dispatch(message)) {
+    connection->delay_state->queue(delay_period, std::move(message));
+  } else if (messenger->ms_can_fast_dispatch(*message)) {
     connection->lock.unlock();
-    connection->dispatch_queue->fast_dispatch(message);
+    connection->dispatch_queue->fast_dispatch(std::move(message));
     connection->recv_start_time = ceph::mono_clock::now();
     connection->logger->tinc(l_msgr_running_fast_dispatch_time,
                              connection->recv_start_time - fast_dispatch_time);
@@ -1561,8 +1547,8 @@ CtPtr ProtocolV2::handle_message() {
       return nullptr;
     }
   } else {
-    connection->dispatch_queue->enqueue(message, message->get_priority(),
-                                        connection->conn_id);
+    auto p = message->get_priority();
+    connection->dispatch_queue->enqueue(std::move(message), p, connection->conn_id);
   }
 
   handle_message_ack(current_header.ack_seq);
index e82989599dfb5a48e3b66dfdff9a11238d726bde..73c225e6b3199cd30fdd8627b01cadff3326b6e0 100644 (file)
@@ -11,6 +11,8 @@
 #include "compression_onwire.h"
 #include "frames_v2.h"
 
+#include <deque>
+
 class ProtocolV2 : public Protocol {
 private:
   enum State {
@@ -92,15 +94,15 @@ private:
   bool can_write;
   struct out_queue_entry_t {
     bool is_prepared {false};
-    Message* m {nullptr};
+    MessageRef m;
   };
 
   /**
    * A queue for each priority value, highest priority first.
    */
-  std::map<int, std::list<out_queue_entry_t>, std::greater<int>> out_queue;
+  std::map<int, std::deque<out_queue_entry_t>, std::greater<int>> out_queue;
 
-  std::list<Message *> sent;
+  std::deque<MessageRef> sent;
   std::atomic<uint64_t> out_seq{0};
   std::atomic<uint64_t> in_seq{0};
   std::atomic<uint64_t> ack_left{0};
@@ -154,9 +156,9 @@ private:
   Ct<ProtocolV2> *_fault();
   void discard_out_queue();
   void reset_session();
-  void prepare_send_message(uint64_t features, Message *m);
+  void prepare_send_message(uint64_t features, const MessageRef& m);
   out_queue_entry_t _get_next_outgoing();
-  ssize_t write_message(Message *m, bool more);
+  ssize_t write_message(const MessageRef& m, bool more);
   void handle_message_ack(uint64_t seq);
   void reset_compression();
 
@@ -217,7 +219,7 @@ public:
   virtual bool is_connected() override;
   virtual void stop() override;
   virtual void fault() override;
-  virtual void send_message(Message *m) override;
+  virtual void send_message(MessageRef&& m) override;
   virtual void send_keepalive() override;
 
   virtual void read_event() override;