if (cct->_conf->client_inject_release_failure) {
ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl;
} else {
- session.con->send_message(session.release);
+ session.con->send_message2(session.release);
}
session.release.reset();
}
#include <ostream>
#include <boost/intrusive_ptr.hpp>
-// Because intusive_ptr clobbers our assert...
-#include "include/assert.h"
-
-#include "include/types.h"
-#include "include/buffer.h"
#include "common/RefCountedObj.h"
-
-#include "common/debug.h"
#include "common/config.h"
+#include "common/debug.h"
+#include "include/assert.h" // Because intusive_ptr clobbers our assert...
+#include "include/buffer.h"
+#include "include/types.h"
+#include "msg/MessageRef.h"
// ======================================================
// abstract Connection, for keeping per-connection state
-class Message;
class Messenger;
struct Connection : public RefCountedObject {
*/
virtual int send_message(Message *m) = 0;
- int send_message(boost::intrusive_ptr<Message> m)
+ virtual int send_message2(MessageRef m)
{
return send_message(m.detach()); /* send_message(Message *m) consumes a reference */
}
return (now - marrival.begin()->first);
}
-uint64_t DispatchQueue::pre_dispatch(Message *m)
+uint64_t DispatchQueue::pre_dispatch(const Message::ref& m)
{
ldout(cct,1) << "<== " << m->get_source_inst()
<< " " << m->get_seq()
return msize;
}
-void DispatchQueue::post_dispatch(Message *m, uint64_t msize)
+void DispatchQueue::post_dispatch(const Message::ref& m, uint64_t msize)
{
dispatch_throttle_release(msize);
ldout(cct,20) << "done calling dispatch on " << m << dendl;
}
-bool DispatchQueue::can_fast_dispatch(const Message *m) const
+bool DispatchQueue::can_fast_dispatch(const Message::const_ref &m) const
{
return msgr->ms_can_fast_dispatch(m);
}
-void DispatchQueue::fast_dispatch(Message *m)
+void DispatchQueue::fast_dispatch(const Message::ref& m)
{
uint64_t msize = pre_dispatch(m);
msgr->ms_fast_dispatch(m);
post_dispatch(m, msize);
}
-void DispatchQueue::fast_preprocess(Message *m)
+void DispatchQueue::fast_preprocess(const Message::ref& m)
{
msgr->ms_fast_preprocess(m);
}
-void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
+void DispatchQueue::enqueue(const Message::ref& m, int priority, uint64_t id)
{
-
Mutex::Locker l(lock);
if (stop) {
- m->put();
return;
}
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
add_arrival(m);
if (priority >= CEPH_MSG_PRIO_LOW) {
- mqueue.enqueue_strict(
- id, priority, QueueItem(m));
+ mqueue.enqueue_strict(id, priority, QueueItem(m));
} else {
- mqueue.enqueue(
- id, priority, m->get_cost(), QueueItem(m));
+ mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
}
cond.Signal();
}
-void DispatchQueue::local_delivery(Message *m, int priority)
+void DispatchQueue::local_delivery(const Message::ref& m, int priority)
{
m->set_recv_stamp(ceph_clock_now());
Mutex::Locker l(local_delivery_lock);
local_delivery_cond.Wait(local_delivery_lock);
continue;
}
- pair<Message *, int> mp = local_messages.front();
+ auto p = local_messages.front();
local_messages.pop_front();
local_delivery_lock.Unlock();
- Message *m = mp.first;
- int priority = mp.second;
+ const Message::ref& m = p.first;
+ int priority = p.second;
fast_preprocess(m);
if (can_fast_dispatch(m)) {
fast_dispatch(m);
ceph_abort();
}
} else {
- Message *m = qitem.get_message();
+ const Message::ref& m = qitem.get_message();
if (stop) {
ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
- m->put();
} else {
uint64_t msize = pre_dispatch(m);
msgr->ms_deliver_dispatch(m);
i != removed.end();
++i) {
assert(!(i->is_code())); // We don't discard id 0, ever!
- Message *m = i->get_message();
+ const Message::ref& m = i->get_message();
remove_arrival(m);
dispatch_throttle_release(m->get_dispatch_throttle_size());
- m->put();
}
}
void DispatchQueue::discard_local()
{
- for (list<pair<Message *, int> >::iterator p = local_messages.begin();
+ for (list<pair<Message::ref, int> >::iterator p = local_messages.begin();
p != local_messages.end();
++p) {
ldout(cct,20) << __func__ << " " << p->first << dendl;
- p->first->put();
}
local_messages.clear();
}
#include "common/Thread.h"
#include "common/PrioritizedQueue.h"
+#include "Message.h"
+
class CephContext;
class Messenger;
-class Message;
struct Connection;
/**
class QueueItem {
int type;
ConnectionRef con;
- MessageRef m;
+ Message::ref m;
public:
- explicit QueueItem(Message *m) : type(-1), con(0), m(m) {}
+ explicit QueueItem(const Message::ref& m) : type(-1), con(0), m(m) {}
QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
bool is_code() const {
return type != -1;
assert(is_code());
return type;
}
- Message *get_message() {
+ const Message::ref& get_message() {
assert(!is_code());
- return m.get();
+ return m;
}
Connection *get_connection() {
assert(is_code());
PrioritizedQueue<QueueItem, uint64_t> mqueue;
- set<pair<double, Message*> > marrival;
- map<Message *, set<pair<double, Message*> >::iterator> marrival_map;
- void add_arrival(Message *m) {
+ std::set<pair<double, Message::ref>> marrival;
+ map<Message::ref, decltype(marrival)::iterator> marrival_map;
+ void add_arrival(const Message::ref& m) {
marrival_map.insert(
make_pair(
m,
)
);
}
- void remove_arrival(Message *m) {
- map<Message *, set<pair<double, Message*> >::iterator>::iterator i =
- marrival_map.find(m);
- assert(i != marrival_map.end());
- marrival.erase(i->second);
- marrival_map.erase(i);
+ void remove_arrival(const Message::ref& m) {
+ auto it = marrival_map.find(m);
+ assert(it != marrival_map.end());
+ marrival.erase(it->second);
+ marrival_map.erase(it);
}
std::atomic<uint64_t> next_id;
Mutex local_delivery_lock;
Cond local_delivery_cond;
bool stop_local_delivery;
- list<pair<Message *, int> > local_messages;
+ list<pair<Message::ref, int>> local_messages;
class LocalDeliveryThread : public Thread {
DispatchQueue *dq;
public:
}
} local_delivery_thread;
- uint64_t pre_dispatch(Message *m);
- void post_dispatch(Message *m, uint64_t msize);
+ uint64_t pre_dispatch(const Message::ref& m);
+ void post_dispatch(const Message::ref& m, uint64_t msize);
public:
Throttle dispatch_throttler;
bool stop;
- void local_delivery(Message *m, int priority);
+ void local_delivery(const Message::ref& m, int priority);
+ void local_delivery(Message* m, int priority) {
+ return local_delivery(Message::ref(m, false), priority); /* consume ref */
+ }
void run_local_delivery();
double get_max_age(utime_t now) const;
cond.Signal();
}
- bool can_fast_dispatch(const Message *m) const;
- void fast_dispatch(Message *m);
- void fast_preprocess(Message *m);
- void enqueue(Message *m, int priority, uint64_t id);
+ bool can_fast_dispatch(const Message::const_ref &m) const;
+ void fast_dispatch(const Message::ref& m);
+ void fast_dispatch(Message* m) {
+ return fast_dispatch(Message::ref(m, false)); /* consume ref */
+ }
+ void fast_preprocess(const Message::ref& m);
+ void enqueue(const Message::ref& m, int priority, uint64_t id);
+ void enqueue(Message* m, int priority, uint64_t id) {
+ return enqueue(Message::ref(m, false), priority, id); /* consume ref */
+ }
void discard_queue(uint64_t id);
void discard_local();
uint64_t get_id() {
#include <memory>
#include "include/buffer_fwd.h"
#include "include/assert.h"
+#include "msg/MessageRef.h"
class Messenger;
-class Message;
class Connection;
class AuthAuthorizer;
class CryptoKey;
* @param m The message we want to fast dispatch.
* @returns True if the message can be fast dispatched; false otherwise.
*/
- virtual bool ms_can_fast_dispatch(const Message *m) const { return false;}
+ virtual bool ms_can_fast_dispatch(const Message *m) const { return false; }
+ virtual bool ms_can_fast_dispatch2(const MessageConstRef& m) const {
+ return ms_can_fast_dispatch(m.get());
+ }
/**
* This function determines if a dispatcher is included in the
* list of fast-dispatch capable Dispatchers.
* @param m The Message to fast dispatch.
*/
virtual void ms_fast_dispatch(Message *m) { ceph_abort(); }
+
+ /* ms_fast_dispatch2 because otherwise the child must define both */
+ virtual void ms_fast_dispatch2(const MessageRef &m) {
+ /* allow old style dispatch handling that expects a Message * with a floating ref */
+ return ms_fast_dispatch(MessageRef(m).detach()); /* XXX N.B. always consumes ref */
+ }
+
/**
* Let the Dispatcher preview a Message before it is dispatched. This
* function is called on *every* Message, prior to the fast/regular dispatch
* @param m A message which has been received
*/
virtual void ms_fast_preprocess(Message *m) {}
+
+ /* ms_fast_preprocess2 because otherwise the child must define both */
+ virtual void ms_fast_preprocess2(const MessageRef &m) {
+ /* allow old style dispatch handling that expects a Message* */
+ return ms_fast_preprocess(m.get());
+ }
+
/**
* The Messenger calls this function to deliver a single message.
*
* @param m The message being delivered. You (the Dispatcher)
* are given a single reference count on it.
*/
- virtual bool ms_dispatch(Message *m) = 0;
+ virtual bool ms_dispatch(Message *m) {
+ ceph_abort();
+ }
+
+ /* ms_dispatch2 because otherwise the child must define both */
+ virtual bool ms_dispatch2(const MessageRef &m) {
+ /* allow old style dispatch handling that expects a Message * with a floating ref */
+ MessageRef mr(m);
+ if (ms_dispatch(mr.get())) {
+ mr.detach(); /* dispatcher consumed ref */
+ return true;
+ }
+ return false;
+ }
/**
* This function will be called whenever a Connection is newly-created
#include <stdlib.h>
#include <ostream>
-#include <boost/intrusive_ptr.hpp>
#include <boost/intrusive/list.hpp>
-// Because intrusive_ptr clobbers our assert...
-#include "include/assert.h"
-#include "include/types.h"
-#include "include/buffer.h"
+#include "common/RefCountedObj.h"
#include "common/ThrottleInterface.h"
+#include "common/config.h"
+#include "common/debug.h"
#include "common/zipkin_trace.h"
-#include "msg_types.h"
-
-#include "common/RefCountedObj.h"
+#include "include/assert.h" // Because intrusive_ptr clobbers our assert...
+#include "include/buffer.h"
+#include "include/types.h"
#include "msg/Connection.h"
-
-#include "common/debug.h"
-#include "common/config.h"
+#include "msg/MessageRef.h"
+#include "msg_types.h"
// monitor internal
#define MSG_MON_SCRUB 64
bi::list_member_hook<> dispatch_q;
public:
+ using ref = MessageRef;
+ using const_ref = MessageConstRef;
+
// zipkin tracing
ZTracer::Trace trace;
void encode_trace(bufferlist &bl, uint64_t features) const;
void encode(uint64_t features, int crcflags);
};
-typedef boost::intrusive_ptr<Message> MessageRef;
extern Message *decode_message(CephContext *cct, int crcflags,
ceph_msg_header &header,
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc. <contact@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MESSAGEREF_H
+#define CEPH_MESSAGEREF_H
+
+#include <boost/intrusive_ptr.hpp>
+
+class Message;
+
+typedef boost::intrusive_ptr<Message> MessageRef;
+typedef boost::intrusive_ptr<Message const> MessageConstRef;
+
+#endif
*
* @param m The Message we are testing.
*/
- bool ms_can_fast_dispatch(const Message* m) {
+ bool ms_can_fast_dispatch(const Message::const_ref& m) {
for (const auto &dispatcher : fast_dispatchers) {
- if (dispatcher->ms_can_fast_dispatch(m))
+ if (dispatcher->ms_can_fast_dispatch2(m))
return true;
}
return false;
/**
* Deliver a single Message via "fast dispatch".
*
- * @param m The Message we are fast dispatching. We take ownership
- * of one reference to it.
+ * @param m The Message we are fast dispatching.
* If none of our Dispatchers can handle it, ceph_abort().
*/
- void ms_fast_dispatch(Message* m) {
+ void ms_fast_dispatch(const Message::ref &m) {
m->set_dispatch_stamp(ceph_clock_now());
for (const auto &dispatcher : fast_dispatchers) {
- if (dispatcher->ms_can_fast_dispatch(m)) {
- dispatcher->ms_fast_dispatch(m);
+ if (dispatcher->ms_can_fast_dispatch2(m)) {
+ dispatcher->ms_fast_dispatch2(m);
return;
}
}
ceph_abort();
}
+ void ms_fast_dispatch(Message *m) {
+ return ms_fast_dispatch(Message::ref(m, false)); /* consume ref */
+ }
/**
*
*/
- void ms_fast_preprocess(Message* m) {
+ void ms_fast_preprocess(const Message::ref &m) {
for (const auto &dispatcher : fast_dispatchers) {
- dispatcher->ms_fast_preprocess(m);
+ dispatcher->ms_fast_preprocess2(m);
}
}
/**
* in sequence until one of them handles it.
* If none of our Dispatchers can handle it, assert(0).
*
- * @param m The Message to deliver. We take ownership of
- * one reference to it.
+ * @param m The Message to deliver.
*/
- void ms_deliver_dispatch(Message* m) {
+ void ms_deliver_dispatch(const Message::ref &m) {
m->set_dispatch_stamp(ceph_clock_now());
for (const auto &dispatcher : dispatchers) {
- if (dispatcher->ms_dispatch(m))
+ if (dispatcher->ms_dispatch2(m))
return;
}
lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
<< m->get_source_inst() << dendl;
assert(!cct->_conf->ms_die_on_unhandled_msg);
- m->put();
+ }
+ void ms_deliver_dispatch(Message *m) {
+ return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */
}
/**
* Notify each Dispatcher of a new Connection. Call
void QueueStrategy::entry(QSThread *thrd)
{
- Message *m = NULL;
for (;;) {
+ Message::ref m;
lock.Lock();
for (;;) {
if (! mqueue.empty()) {
- m = &(mqueue.front());
+ m = Message::ref(&mqueue.front(), false);
mqueue.pop_front();
break;
}
- m = NULL;
if (stop)
break;
disp_threads.push_front(*thrd);
lock.Unlock();
if (stop) {
if (!m) break;
- m->put();
continue;
}
get_messenger()->ms_deliver_dispatch(m);