From: Patrick Donnelly Date: Sun, 29 Jul 2018 19:27:04 +0000 (-0700) Subject: msg: dispatch intrusive_ptr Messages X-Git-Tag: v14.0.1~575^2~10 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1aeb2f7eab89c217891259a883865afe9a0e6c5c;p=ceph.git msg: dispatch intrusive_ptr Messages This codifies the giving of a reference to the Dispatcher and helps avoid memory leaks. Old-style dispatch is kept to allow older code to continue working. Signed-off-by: Patrick Donnelly --- diff --git a/src/client/Client.cc b/src/client/Client.cc index d54a23eea59b6..7f8041d8391c7 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -6033,7 +6033,7 @@ void Client::flush_cap_releases() if (cct->_conf->client_inject_release_failure) { ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl; } else { - session.con->send_message(session.release); + session.con->send_message2(session.release); } session.release.reset(); } diff --git a/src/msg/Connection.h b/src/msg/Connection.h index d4ed134b2714f..7d0f39385282f 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -19,23 +19,20 @@ #include #include -// Because intusive_ptr clobbers our assert... -#include "include/assert.h" - -#include "include/types.h" -#include "include/buffer.h" #include "common/RefCountedObj.h" - -#include "common/debug.h" #include "common/config.h" +#include "common/debug.h" +#include "include/assert.h" // Because intusive_ptr clobbers our assert... +#include "include/buffer.h" +#include "include/types.h" +#include "msg/MessageRef.h" // ====================================================== // abstract Connection, for keeping per-connection state -class Message; class Messenger; struct Connection : public RefCountedObject { @@ -109,7 +106,7 @@ public: */ virtual int send_message(Message *m) = 0; - int send_message(boost::intrusive_ptr m) + virtual int send_message2(MessageRef m) { return send_message(m.detach()); /* send_message(Message *m) consumes a reference */ } diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index 6755213e92f86..85616f68f9a56 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -36,7 +36,7 @@ double DispatchQueue::get_max_age(utime_t now) const { return (now - marrival.begin()->first); } -uint64_t DispatchQueue::pre_dispatch(Message *m) +uint64_t DispatchQueue::pre_dispatch(const Message::ref& m) { ldout(cct,1) << "<== " << m->get_source_inst() << " " << m->get_seq() @@ -54,50 +54,46 @@ uint64_t DispatchQueue::pre_dispatch(Message *m) return msize; } -void DispatchQueue::post_dispatch(Message *m, uint64_t msize) +void DispatchQueue::post_dispatch(const Message::ref& m, uint64_t msize) { dispatch_throttle_release(msize); ldout(cct,20) << "done calling dispatch on " << m << dendl; } -bool DispatchQueue::can_fast_dispatch(const Message *m) const +bool DispatchQueue::can_fast_dispatch(const Message::const_ref &m) const { return msgr->ms_can_fast_dispatch(m); } -void DispatchQueue::fast_dispatch(Message *m) +void DispatchQueue::fast_dispatch(const Message::ref& m) { uint64_t msize = pre_dispatch(m); msgr->ms_fast_dispatch(m); post_dispatch(m, msize); } -void DispatchQueue::fast_preprocess(Message *m) +void DispatchQueue::fast_preprocess(const Message::ref& m) { msgr->ms_fast_preprocess(m); } -void DispatchQueue::enqueue(Message *m, int priority, uint64_t id) +void DispatchQueue::enqueue(const Message::ref& m, int priority, uint64_t id) { - Mutex::Locker l(lock); if (stop) { - m->put(); return; } ldout(cct,20) << "queue " << m << " prio " << priority << dendl; add_arrival(m); if (priority >= CEPH_MSG_PRIO_LOW) { - mqueue.enqueue_strict( - id, priority, QueueItem(m)); + mqueue.enqueue_strict(id, priority, QueueItem(m)); } else { - mqueue.enqueue( - id, priority, m->get_cost(), QueueItem(m)); + mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m)); } cond.Signal(); } -void DispatchQueue::local_delivery(Message *m, int priority) +void DispatchQueue::local_delivery(const Message::ref& m, int priority) { m->set_recv_stamp(ceph_clock_now()); Mutex::Locker l(local_delivery_lock); @@ -117,11 +113,11 @@ void DispatchQueue::run_local_delivery() local_delivery_cond.Wait(local_delivery_lock); continue; } - pair mp = local_messages.front(); + auto p = local_messages.front(); local_messages.pop_front(); local_delivery_lock.Unlock(); - Message *m = mp.first; - int priority = mp.second; + const Message::ref& m = p.first; + int priority = p.second; fast_preprocess(m); if (can_fast_dispatch(m)) { fast_dispatch(m); @@ -192,10 +188,9 @@ void DispatchQueue::entry() ceph_abort(); } } else { - Message *m = qitem.get_message(); + const Message::ref& m = qitem.get_message(); if (stop) { ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl; - m->put(); } else { uint64_t msize = pre_dispatch(m); msgr->ms_deliver_dispatch(m); @@ -222,10 +217,9 @@ void DispatchQueue::discard_queue(uint64_t id) { i != removed.end(); ++i) { assert(!(i->is_code())); // We don't discard id 0, ever! - Message *m = i->get_message(); + const Message::ref& m = i->get_message(); remove_arrival(m); dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); } } @@ -245,11 +239,10 @@ void DispatchQueue::wait() void DispatchQueue::discard_local() { - for (list >::iterator p = local_messages.begin(); + for (list >::iterator p = local_messages.begin(); p != local_messages.end(); ++p) { ldout(cct,20) << __func__ << " " << p->first << dendl; - p->first->put(); } local_messages.clear(); } diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index e95712d5d7939..a99dc48e869e8 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -26,9 +26,10 @@ #include "common/Thread.h" #include "common/PrioritizedQueue.h" +#include "Message.h" + class CephContext; class Messenger; -class Message; struct Connection; /** @@ -41,9 +42,9 @@ class DispatchQueue { class QueueItem { int type; ConnectionRef con; - MessageRef m; + Message::ref m; public: - explicit QueueItem(Message *m) : type(-1), con(0), m(m) {} + explicit QueueItem(const Message::ref& m) : type(-1), con(0), m(m) {} QueueItem(int type, Connection *con) : type(type), con(con), m(0) {} bool is_code() const { return type != -1; @@ -52,9 +53,9 @@ class DispatchQueue { assert(is_code()); return type; } - Message *get_message() { + const Message::ref& get_message() { assert(!is_code()); - return m.get(); + return m; } Connection *get_connection() { assert(is_code()); @@ -69,9 +70,9 @@ class DispatchQueue { PrioritizedQueue mqueue; - set > marrival; - map >::iterator> marrival_map; - void add_arrival(Message *m) { + std::set> marrival; + map marrival_map; + void add_arrival(const Message::ref& m) { marrival_map.insert( make_pair( m, @@ -79,12 +80,11 @@ class DispatchQueue { ) ); } - void remove_arrival(Message *m) { - map >::iterator>::iterator i = - marrival_map.find(m); - assert(i != marrival_map.end()); - marrival.erase(i->second); - marrival_map.erase(i); + void remove_arrival(const Message::ref& m) { + auto it = marrival_map.find(m); + assert(it != marrival_map.end()); + marrival.erase(it->second); + marrival_map.erase(it); } std::atomic next_id; @@ -107,7 +107,7 @@ class DispatchQueue { Mutex local_delivery_lock; Cond local_delivery_cond; bool stop_local_delivery; - list > local_messages; + list> local_messages; class LocalDeliveryThread : public Thread { DispatchQueue *dq; public: @@ -118,8 +118,8 @@ class DispatchQueue { } } local_delivery_thread; - uint64_t pre_dispatch(Message *m); - void post_dispatch(Message *m, uint64_t msize); + uint64_t pre_dispatch(const Message::ref& m); + void post_dispatch(const Message::ref& m, uint64_t msize); public: @@ -127,7 +127,10 @@ class DispatchQueue { Throttle dispatch_throttler; bool stop; - void local_delivery(Message *m, int priority); + void local_delivery(const Message::ref& m, int priority); + void local_delivery(Message* m, int priority) { + return local_delivery(Message::ref(m, false), priority); /* consume ref */ + } void run_local_delivery(); double get_max_age(utime_t now) const; @@ -195,10 +198,16 @@ class DispatchQueue { cond.Signal(); } - bool can_fast_dispatch(const Message *m) const; - void fast_dispatch(Message *m); - void fast_preprocess(Message *m); - void enqueue(Message *m, int priority, uint64_t id); + bool can_fast_dispatch(const Message::const_ref &m) const; + void fast_dispatch(const Message::ref& m); + void fast_dispatch(Message* m) { + return fast_dispatch(Message::ref(m, false)); /* consume ref */ + } + void fast_preprocess(const Message::ref& m); + void enqueue(const Message::ref& m, int priority, uint64_t id); + void enqueue(Message* m, int priority, uint64_t id) { + return enqueue(Message::ref(m, false), priority, id); /* consume ref */ + } void discard_queue(uint64_t id); void discard_local(); uint64_t get_id() { diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index faab248d60ffa..c5f19736cda15 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -19,9 +19,9 @@ #include #include "include/buffer_fwd.h" #include "include/assert.h" +#include "msg/MessageRef.h" class Messenger; -class Message; class Connection; class AuthAuthorizer; class CryptoKey; @@ -60,7 +60,10 @@ public: * @param m The message we want to fast dispatch. * @returns True if the message can be fast dispatched; false otherwise. */ - virtual bool ms_can_fast_dispatch(const Message *m) const { return false;} + virtual bool ms_can_fast_dispatch(const Message *m) const { return false; } + virtual bool ms_can_fast_dispatch2(const MessageConstRef& m) const { + return ms_can_fast_dispatch(m.get()); + } /** * This function determines if a dispatcher is included in the * list of fast-dispatch capable Dispatchers. @@ -75,6 +78,13 @@ public: * @param m The Message to fast dispatch. */ virtual void ms_fast_dispatch(Message *m) { ceph_abort(); } + + /* ms_fast_dispatch2 because otherwise the child must define both */ + virtual void ms_fast_dispatch2(const MessageRef &m) { + /* allow old style dispatch handling that expects a Message * with a floating ref */ + return ms_fast_dispatch(MessageRef(m).detach()); /* XXX N.B. always consumes ref */ + } + /** * Let the Dispatcher preview a Message before it is dispatched. This * function is called on *every* Message, prior to the fast/regular dispatch @@ -91,13 +101,33 @@ public: * @param m A message which has been received */ virtual void ms_fast_preprocess(Message *m) {} + + /* ms_fast_preprocess2 because otherwise the child must define both */ + virtual void ms_fast_preprocess2(const MessageRef &m) { + /* allow old style dispatch handling that expects a Message* */ + return ms_fast_preprocess(m.get()); + } + /** * The Messenger calls this function to deliver a single message. * * @param m The message being delivered. You (the Dispatcher) * are given a single reference count on it. */ - virtual bool ms_dispatch(Message *m) = 0; + virtual bool ms_dispatch(Message *m) { + ceph_abort(); + } + + /* ms_dispatch2 because otherwise the child must define both */ + virtual bool ms_dispatch2(const MessageRef &m) { + /* allow old style dispatch handling that expects a Message * with a floating ref */ + MessageRef mr(m); + if (ms_dispatch(mr.get())) { + mr.detach(); /* dispatcher consumed ref */ + return true; + } + return false; + } /** * This function will be called whenever a Connection is newly-created diff --git a/src/msg/Message.h b/src/msg/Message.h index be3f599f18cbe..5a52b6370ecfc 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -18,22 +18,19 @@ #include #include -#include #include -// Because intrusive_ptr clobbers our assert... -#include "include/assert.h" -#include "include/types.h" -#include "include/buffer.h" +#include "common/RefCountedObj.h" #include "common/ThrottleInterface.h" +#include "common/config.h" +#include "common/debug.h" #include "common/zipkin_trace.h" -#include "msg_types.h" - -#include "common/RefCountedObj.h" +#include "include/assert.h" // Because intrusive_ptr clobbers our assert... +#include "include/buffer.h" +#include "include/types.h" #include "msg/Connection.h" - -#include "common/debug.h" -#include "common/config.h" +#include "msg/MessageRef.h" +#include "msg_types.h" // monitor internal #define MSG_MON_SCRUB 64 @@ -259,6 +256,9 @@ protected: bi::list_member_hook<> dispatch_q; public: + using ref = MessageRef; + using const_ref = MessageConstRef; + // zipkin tracing ZTracer::Trace trace; void encode_trace(bufferlist &bl, uint64_t features) const; @@ -501,7 +501,6 @@ public: void encode(uint64_t features, int crcflags); }; -typedef boost::intrusive_ptr MessageRef; extern Message *decode_message(CephContext *cct, int crcflags, ceph_msg_header &header, diff --git a/src/msg/MessageRef.h b/src/msg/MessageRef.h new file mode 100644 index 0000000000000..6dac3b8657e7d --- /dev/null +++ b/src/msg/MessageRef.h @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MESSAGEREF_H +#define CEPH_MESSAGEREF_H + +#include + +class Message; + +typedef boost::intrusive_ptr MessageRef; +typedef boost::intrusive_ptr MessageConstRef; + +#endif diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 491b3df830f0e..187223b03ed89 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -594,9 +594,9 @@ public: * * @param m The Message we are testing. */ - bool ms_can_fast_dispatch(const Message* m) { + bool ms_can_fast_dispatch(const Message::const_ref& m) { for (const auto &dispatcher : fast_dispatchers) { - if (dispatcher->ms_can_fast_dispatch(m)) + if (dispatcher->ms_can_fast_dispatch2(m)) return true; } return false; @@ -605,26 +605,28 @@ public: /** * Deliver a single Message via "fast dispatch". * - * @param m The Message we are fast dispatching. We take ownership - * of one reference to it. + * @param m The Message we are fast dispatching. * If none of our Dispatchers can handle it, ceph_abort(). */ - void ms_fast_dispatch(Message* m) { + void ms_fast_dispatch(const Message::ref &m) { m->set_dispatch_stamp(ceph_clock_now()); for (const auto &dispatcher : fast_dispatchers) { - if (dispatcher->ms_can_fast_dispatch(m)) { - dispatcher->ms_fast_dispatch(m); + if (dispatcher->ms_can_fast_dispatch2(m)) { + dispatcher->ms_fast_dispatch2(m); return; } } ceph_abort(); } + void ms_fast_dispatch(Message *m) { + return ms_fast_dispatch(Message::ref(m, false)); /* consume ref */ + } /** * */ - void ms_fast_preprocess(Message* m) { + void ms_fast_preprocess(const Message::ref &m) { for (const auto &dispatcher : fast_dispatchers) { - dispatcher->ms_fast_preprocess(m); + dispatcher->ms_fast_preprocess2(m); } } /** @@ -632,19 +634,20 @@ public: * in sequence until one of them handles it. * If none of our Dispatchers can handle it, assert(0). * - * @param m The Message to deliver. We take ownership of - * one reference to it. + * @param m The Message to deliver. */ - void ms_deliver_dispatch(Message* m) { + void ms_deliver_dispatch(const Message::ref &m) { m->set_dispatch_stamp(ceph_clock_now()); for (const auto &dispatcher : dispatchers) { - if (dispatcher->ms_dispatch(m)) + if (dispatcher->ms_dispatch2(m)) return; } lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " << m->get_source_inst() << dendl; assert(!cct->_conf->ms_die_on_unhandled_msg); - m->put(); + } + void ms_deliver_dispatch(Message *m) { + return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */ } /** * Notify each Dispatcher of a new Connection. Call diff --git a/src/msg/QueueStrategy.cc b/src/msg/QueueStrategy.cc index 8ad15b6d55347..0bbee4f3835cd 100644 --- a/src/msg/QueueStrategy.cc +++ b/src/msg/QueueStrategy.cc @@ -45,16 +45,15 @@ void QueueStrategy::ds_dispatch(Message *m) { void QueueStrategy::entry(QSThread *thrd) { - Message *m = NULL; for (;;) { + Message::ref m; lock.Lock(); for (;;) { if (! mqueue.empty()) { - m = &(mqueue.front()); + m = Message::ref(&mqueue.front(), false); mqueue.pop_front(); break; } - m = NULL; if (stop) break; disp_threads.push_front(*thrd); @@ -63,7 +62,6 @@ void QueueStrategy::entry(QSThread *thrd) lock.Unlock(); if (stop) { if (!m) break; - m->put(); continue; } get_messenger()->ms_deliver_dispatch(m);