]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: dispatch intrusive_ptr Messages
authorPatrick Donnelly <pdonnell@redhat.com>
Sun, 29 Jul 2018 19:27:04 +0000 (12:27 -0700)
committerPatrick Donnelly <pdonnell@redhat.com>
Wed, 15 Aug 2018 04:20:55 +0000 (21:20 -0700)
This codifies the giving of a reference to the Dispatcher and helps avoid
memory leaks. Old-style dispatch is kept to allow older code to continue
working.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/client/Client.cc
src/msg/Connection.h
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h
src/msg/Dispatcher.h
src/msg/Message.h
src/msg/MessageRef.h [new file with mode: 0644]
src/msg/Messenger.h
src/msg/QueueStrategy.cc

index d54a23eea59b6dea9fa7ad9c1f49d00d86198f9d..7f8041d8391c7e4c5173ce64502c863dbf479575 100644 (file)
@@ -6033,7 +6033,7 @@ void Client::flush_cap_releases()
       if (cct->_conf->client_inject_release_failure) {
         ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl;
       } else {
-        session.con->send_message(session.release);
+        session.con->send_message2(session.release);
       }
       session.release.reset();
     }
index d4ed134b2714ff9d63e97c19bebd7f66de3da7f9..7d0f39385282ff1f310317709b86c742f7f8e328 100644 (file)
 #include <ostream>
 
 #include <boost/intrusive_ptr.hpp>
-// Because intusive_ptr clobbers our assert...
-#include "include/assert.h"
-
-#include "include/types.h"
-#include "include/buffer.h"
 
 #include "common/RefCountedObj.h"
-
-#include "common/debug.h"
 #include "common/config.h"
+#include "common/debug.h"
+#include "include/assert.h" // Because intusive_ptr clobbers our assert...
+#include "include/buffer.h"
+#include "include/types.h"
+#include "msg/MessageRef.h"
 
 
 // ======================================================
 
 // abstract Connection, for keeping per-connection state
 
-class Message;
 class Messenger;
 
 struct Connection : public RefCountedObject {
@@ -109,7 +106,7 @@ public:
    */
   virtual int send_message(Message *m) = 0;
 
-  int send_message(boost::intrusive_ptr<Message> m)
+  virtual int send_message2(MessageRef m)
   {
     return send_message(m.detach()); /* send_message(Message *m) consumes a reference */
   }
index 6755213e92f861475a668d89a85f603dd65d2a0d..85616f68f9a56a8554e5df259f78509c8c653bd6 100644 (file)
@@ -36,7 +36,7 @@ double DispatchQueue::get_max_age(utime_t now) const {
     return (now - marrival.begin()->first);
 }
 
-uint64_t DispatchQueue::pre_dispatch(Message *m)
+uint64_t DispatchQueue::pre_dispatch(const Message::ref& m)
 {
   ldout(cct,1) << "<== " << m->get_source_inst()
               << " " << m->get_seq()
@@ -54,50 +54,46 @@ uint64_t DispatchQueue::pre_dispatch(Message *m)
   return msize;
 }
 
-void DispatchQueue::post_dispatch(Message *m, uint64_t msize)
+void DispatchQueue::post_dispatch(const Message::ref& m, uint64_t msize)
 {
   dispatch_throttle_release(msize);
   ldout(cct,20) << "done calling dispatch on " << m << dendl;
 }
 
-bool DispatchQueue::can_fast_dispatch(const Message *m) const
+bool DispatchQueue::can_fast_dispatch(const Message::const_ref &m) const
 {
   return msgr->ms_can_fast_dispatch(m);
 }
 
-void DispatchQueue::fast_dispatch(Message *m)
+void DispatchQueue::fast_dispatch(const Message::ref& m)
 {
   uint64_t msize = pre_dispatch(m);
   msgr->ms_fast_dispatch(m);
   post_dispatch(m, msize);
 }
 
-void DispatchQueue::fast_preprocess(Message *m)
+void DispatchQueue::fast_preprocess(const Message::ref& m)
 {
   msgr->ms_fast_preprocess(m);
 }
 
-void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
+void DispatchQueue::enqueue(const Message::ref& m, int priority, uint64_t id)
 {
-
   Mutex::Locker l(lock);
   if (stop) {
-    m->put();
     return;
   }
   ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
   add_arrival(m);
   if (priority >= CEPH_MSG_PRIO_LOW) {
-    mqueue.enqueue_strict(
-        id, priority, QueueItem(m));
+    mqueue.enqueue_strict(id, priority, QueueItem(m));
   } else {
-    mqueue.enqueue(
-        id, priority, m->get_cost(), QueueItem(m));
+    mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
   }
   cond.Signal();
 }
 
-void DispatchQueue::local_delivery(Message *m, int priority)
+void DispatchQueue::local_delivery(const Message::ref& m, int priority)
 {
   m->set_recv_stamp(ceph_clock_now());
   Mutex::Locker l(local_delivery_lock);
@@ -117,11 +113,11 @@ void DispatchQueue::run_local_delivery()
       local_delivery_cond.Wait(local_delivery_lock);
       continue;
     }
-    pair<Message *, int> mp = local_messages.front();
+    auto p = local_messages.front();
     local_messages.pop_front();
     local_delivery_lock.Unlock();
-    Message *m = mp.first;
-    int priority = mp.second;
+    const Message::ref& m = p.first;
+    int priority = p.second;
     fast_preprocess(m);
     if (can_fast_dispatch(m)) {
       fast_dispatch(m);
@@ -192,10 +188,9 @@ void DispatchQueue::entry()
          ceph_abort();
        }
       } else {
-       Message *m = qitem.get_message();
+       const Message::ref& m = qitem.get_message();
        if (stop) {
          ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
-         m->put();
        } else {
          uint64_t msize = pre_dispatch(m);
          msgr->ms_deliver_dispatch(m);
@@ -222,10 +217,9 @@ void DispatchQueue::discard_queue(uint64_t id) {
        i != removed.end();
        ++i) {
     assert(!(i->is_code())); // We don't discard id 0, ever!
-    Message *m = i->get_message();
+    const Message::ref& m = i->get_message();
     remove_arrival(m);
     dispatch_throttle_release(m->get_dispatch_throttle_size());
-    m->put();
   }
 }
 
@@ -245,11 +239,10 @@ void DispatchQueue::wait()
 
 void DispatchQueue::discard_local()
 {
-  for (list<pair<Message *, int> >::iterator p = local_messages.begin();
+  for (list<pair<Message::ref, int> >::iterator p = local_messages.begin();
        p != local_messages.end();
        ++p) {
     ldout(cct,20) << __func__ << " " << p->first << dendl;
-    p->first->put();
   }
   local_messages.clear();
 }
index e95712d5d79398af7cc2aa9a520881d67d3428d7..a99dc48e869e8b709b7063fa5ea538f74cfdde77 100644 (file)
 #include "common/Thread.h"
 #include "common/PrioritizedQueue.h"
 
+#include "Message.h"
+
 class CephContext;
 class Messenger;
-class Message;
 struct Connection;
 
 /**
@@ -41,9 +42,9 @@ class DispatchQueue {
   class QueueItem {
     int type;
     ConnectionRef con;
-    MessageRef m;
+    Message::ref m;
   public:
-    explicit QueueItem(Message *m) : type(-1), con(0), m(m) {}
+    explicit QueueItem(const Message::ref& m) : type(-1), con(0), m(m) {}
     QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
     bool is_code() const {
       return type != -1;
@@ -52,9 +53,9 @@ class DispatchQueue {
       assert(is_code());
       return type;
     }
-    Message *get_message() {
+    const Message::ref& get_message() {
       assert(!is_code());
-      return m.get();
+      return m;
     }
     Connection *get_connection() {
       assert(is_code());
@@ -69,9 +70,9 @@ class DispatchQueue {
 
   PrioritizedQueue<QueueItem, uint64_t> mqueue;
 
-  set<pair<double, Message*> > marrival;
-  map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
-  void add_arrival(Message *m) {
+  std::set<pair<double, Message::ref>> marrival;
+  map<Message::ref, decltype(marrival)::iterator> marrival_map;
+  void add_arrival(const Message::ref& m) {
     marrival_map.insert(
       make_pair(
        m,
@@ -79,12 +80,11 @@ class DispatchQueue {
        )
       );
   }
-  void remove_arrival(Message *m) {
-    map<Message *, set<pair<double, Message*> >::iterator>::iterator i =
-      marrival_map.find(m);
-    assert(i != marrival_map.end());
-    marrival.erase(i->second);
-    marrival_map.erase(i);
+  void remove_arrival(const Message::ref& m) {
+    auto it = marrival_map.find(m);
+    assert(it != marrival_map.end());
+    marrival.erase(it->second);
+    marrival_map.erase(it);
   }
 
   std::atomic<uint64_t> next_id;
@@ -107,7 +107,7 @@ class DispatchQueue {
   Mutex local_delivery_lock;
   Cond local_delivery_cond;
   bool stop_local_delivery;
-  list<pair<Message *, int> > local_messages;
+  list<pair<Message::ref, int>> local_messages;
   class LocalDeliveryThread : public Thread {
     DispatchQueue *dq;
   public:
@@ -118,8 +118,8 @@ class DispatchQueue {
     }
   } local_delivery_thread;
 
-  uint64_t pre_dispatch(Message *m);
-  void post_dispatch(Message *m, uint64_t msize);
+  uint64_t pre_dispatch(const Message::ref& m);
+  void post_dispatch(const Message::ref& m, uint64_t msize);
 
  public:
 
@@ -127,7 +127,10 @@ class DispatchQueue {
   Throttle dispatch_throttler;
 
   bool stop;
-  void local_delivery(Message *m, int priority);
+  void local_delivery(const Message::ref& m, int priority);
+  void local_delivery(Message* m, int priority) {
+    return local_delivery(Message::ref(m, false), priority); /* consume ref */
+  }
   void run_local_delivery();
 
   double get_max_age(utime_t now) const;
@@ -195,10 +198,16 @@ class DispatchQueue {
     cond.Signal();
   }
 
-  bool can_fast_dispatch(const Message *m) const;
-  void fast_dispatch(Message *m);
-  void fast_preprocess(Message *m);
-  void enqueue(Message *m, int priority, uint64_t id);
+  bool can_fast_dispatch(const Message::const_ref &m) const;
+  void fast_dispatch(const Message::ref& m);
+  void fast_dispatch(Message* m) {
+    return fast_dispatch(Message::ref(m, false)); /* consume ref */
+  }
+  void fast_preprocess(const Message::ref& m);
+  void enqueue(const Message::ref& m, int priority, uint64_t id);
+  void enqueue(Message* m, int priority, uint64_t id) {
+    return enqueue(Message::ref(m, false), priority, id); /* consume ref */
+  }
   void discard_queue(uint64_t id);
   void discard_local();
   uint64_t get_id() {
index faab248d60ffa796b2f54ab86cd500a60c3159fd..c5f19736cda154d198376910d17dd6b494f14fc8 100644 (file)
@@ -19,9 +19,9 @@
 #include <memory>
 #include "include/buffer_fwd.h"
 #include "include/assert.h"
+#include "msg/MessageRef.h"
 
 class Messenger;
-class Message;
 class Connection;
 class AuthAuthorizer;
 class CryptoKey;
@@ -60,7 +60,10 @@ public:
    * @param m The message we want to fast dispatch.
    * @returns True if the message can be fast dispatched; false otherwise.
    */
-  virtual bool ms_can_fast_dispatch(const Message *m) const { return false;}
+  virtual bool ms_can_fast_dispatch(const Message *m) const { return false; }
+  virtual bool ms_can_fast_dispatch2(const MessageConstRef& m) const {
+    return ms_can_fast_dispatch(m.get());
+  }
   /**
    * This function determines if a dispatcher is included in the
    * list of fast-dispatch capable Dispatchers.
@@ -75,6 +78,13 @@ public:
    * @param m The Message to fast dispatch.
    */
   virtual void ms_fast_dispatch(Message *m) { ceph_abort(); }
+
+  /* ms_fast_dispatch2 because otherwise the child must define both */
+  virtual void ms_fast_dispatch2(const MessageRef &m) {
+    /* allow old style dispatch handling that expects a Message * with a floating ref */
+    return ms_fast_dispatch(MessageRef(m).detach()); /* XXX N.B. always consumes ref */
+  }
+
   /**
    * Let the Dispatcher preview a Message before it is dispatched. This
    * function is called on *every* Message, prior to the fast/regular dispatch
@@ -91,13 +101,33 @@ public:
    * @param m A message which has been received
    */
   virtual void ms_fast_preprocess(Message *m) {}
+
+  /* ms_fast_preprocess2 because otherwise the child must define both */
+  virtual void ms_fast_preprocess2(const MessageRef &m) {
+    /* allow old style dispatch handling that expects a Message* */
+    return ms_fast_preprocess(m.get());
+  }
+
   /**
    * The Messenger calls this function to deliver a single message.
    *
    * @param m The message being delivered. You (the Dispatcher)
    * are given a single reference count on it.
    */
-  virtual bool ms_dispatch(Message *m) = 0;
+  virtual bool ms_dispatch(Message *m) {
+    ceph_abort();
+  }
+
+  /* ms_dispatch2 because otherwise the child must define both */
+  virtual bool ms_dispatch2(const MessageRef &m) {
+    /* allow old style dispatch handling that expects a Message * with a floating ref */
+    MessageRef mr(m);
+    if (ms_dispatch(mr.get())) {
+      mr.detach(); /* dispatcher consumed ref */
+      return true;
+    }
+    return false;
+  }
 
   /**
    * This function will be called whenever a Connection is newly-created
index be3f599f18cbec1c46fe9df96a1e98feb1427801..5a52b6370ecfc1633b54dd20ec24471f6dc86748 100644 (file)
 #include <stdlib.h>
 #include <ostream>
 
-#include <boost/intrusive_ptr.hpp>
 #include <boost/intrusive/list.hpp>
-// Because intrusive_ptr clobbers our assert...
-#include "include/assert.h"
 
-#include "include/types.h"
-#include "include/buffer.h"
+#include "common/RefCountedObj.h"
 #include "common/ThrottleInterface.h"
+#include "common/config.h"
+#include "common/debug.h"
 #include "common/zipkin_trace.h"
-#include "msg_types.h"
-
-#include "common/RefCountedObj.h"
+#include "include/assert.h" // Because intrusive_ptr clobbers our assert...
+#include "include/buffer.h"
+#include "include/types.h"
 #include "msg/Connection.h"
-
-#include "common/debug.h"
-#include "common/config.h"
+#include "msg/MessageRef.h"
+#include "msg_types.h"
 
 // monitor internal
 #define MSG_MON_SCRUB              64
@@ -259,6 +256,9 @@ protected:
   bi::list_member_hook<> dispatch_q;
 
 public:
+  using ref = MessageRef;
+  using const_ref = MessageConstRef;
+
   // zipkin tracing
   ZTracer::Trace trace;
   void encode_trace(bufferlist &bl, uint64_t features) const;
@@ -501,7 +501,6 @@ public:
 
   void encode(uint64_t features, int crcflags);
 };
-typedef boost::intrusive_ptr<Message> MessageRef;
 
 extern Message *decode_message(CephContext *cct, int crcflags,
                               ceph_msg_header &header,
diff --git a/src/msg/MessageRef.h b/src/msg/MessageRef.h
new file mode 100644 (file)
index 0000000..6dac3b8
--- /dev/null
@@ -0,0 +1,25 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc. <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_MESSAGEREF_H
+#define CEPH_MESSAGEREF_H
+#include <boost/intrusive_ptr.hpp>
+
+class Message;
+
+typedef boost::intrusive_ptr<Message> MessageRef;
+typedef boost::intrusive_ptr<Message const> MessageConstRef;
+
+#endif
index 491b3df830f0e8a76d6b65e1828497b7dd138638..187223b03ed896e8145eae190646d27482d32c57 100644 (file)
@@ -594,9 +594,9 @@ public:
    *
    * @param m The Message we are testing.
    */
-  bool ms_can_fast_dispatch(const Message* m) {
+  bool ms_can_fast_dispatch(const Message::const_ref& m) {
     for (const auto &dispatcher : fast_dispatchers) {
-      if (dispatcher->ms_can_fast_dispatch(m))
+      if (dispatcher->ms_can_fast_dispatch2(m))
        return true;
     }
     return false;
@@ -605,26 +605,28 @@ public:
   /**
    * Deliver a single Message via "fast dispatch".
    *
-   * @param m The Message we are fast dispatching. We take ownership
-   * of one reference to it.
+   * @param m The Message we are fast dispatching.
    * If none of our Dispatchers can handle it, ceph_abort().
    */
-  void ms_fast_dispatch(Message* m) {
+  void ms_fast_dispatch(const Message::ref &m) {
     m->set_dispatch_stamp(ceph_clock_now());
     for (const auto &dispatcher : fast_dispatchers) {
-      if (dispatcher->ms_can_fast_dispatch(m)) {
-       dispatcher->ms_fast_dispatch(m);
+      if (dispatcher->ms_can_fast_dispatch2(m)) {
+       dispatcher->ms_fast_dispatch2(m);
        return;
       }
     }
     ceph_abort();
   }
+  void ms_fast_dispatch(Message *m) {
+    return ms_fast_dispatch(Message::ref(m, false)); /* consume ref */
+  }
   /**
    *
    */
-  void ms_fast_preprocess(Message* m) {
+  void ms_fast_preprocess(const Message::ref &m) {
     for (const auto &dispatcher : fast_dispatchers) {
-      dispatcher->ms_fast_preprocess(m);
+      dispatcher->ms_fast_preprocess2(m);
     }
   }
   /**
@@ -632,19 +634,20 @@ public:
    *  in sequence until one of them handles it.
    *  If none of our Dispatchers can handle it, assert(0).
    *
-   *  @param m The Message to deliver. We take ownership of
-   *  one reference to it.
+   *  @param m The Message to deliver.
    */
-  void ms_deliver_dispatch(Message* m) {
+  void ms_deliver_dispatch(const Message::ref &m) {
     m->set_dispatch_stamp(ceph_clock_now());
     for (const auto &dispatcher : dispatchers) {
-      if (dispatcher->ms_dispatch(m))
+      if (dispatcher->ms_dispatch2(m))
        return;
     }
     lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
                         << m->get_source_inst() << dendl;
     assert(!cct->_conf->ms_die_on_unhandled_msg);
-    m->put();
+  }
+  void ms_deliver_dispatch(Message *m) {
+    return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */
   }
   /**
    * Notify each Dispatcher of a new Connection. Call
index 8ad15b6d5534724f5109dce34fde9d75e76afea4..0bbee4f3835cdee4857c27e6f08f8accb726c5c9 100644 (file)
@@ -45,16 +45,15 @@ void QueueStrategy::ds_dispatch(Message *m) {
 
 void QueueStrategy::entry(QSThread *thrd)
 {
-  Message *m = NULL;
   for (;;) {
+    Message::ref m;
     lock.Lock();
     for (;;) {
       if (! mqueue.empty()) {
-       m = &(mqueue.front());
+       m = Message::ref(&mqueue.front(), false);
        mqueue.pop_front();
        break;
       }
-      m = NULL;
       if (stop)
        break;
       disp_threads.push_front(*thrd);
@@ -63,7 +62,6 @@ void QueueStrategy::entry(QSThread *thrd)
     lock.Unlock();
     if (stop) {
        if (!m) break;
-       m->put();
        continue;
     }
     get_messenger()->ms_deliver_dispatch(m);