struct AnonConnection : public Connection {
entity_addr_t socket_addr;
- int send_message(Message *m) override {
- ceph_abort_msg("send_message on anonymous connection");
- }
void send_keepalive() override {
ceph_abort_msg("send_keepalive on anonymous connection");
}
return socket_addr;
}
+protected:
+ int send_msg(MessageRef&& m) override {
+ ceph_abort_msg("send_message on anonymous connection");
+ }
+
private:
FRIEND_MAKE_REF(AnonConnection);
explicit AnonConnection(CephContext *cct, const entity_addr_t& sa)
* on the Connection policy.
*
* @param m The Message to send. The Messenger consumes a single reference
- * when you pass it in.
+ * (if a stupid pointer) when you pass it in.
*
* @return 0 on success, or -errno on failure.
*/
- virtual int send_message(Message *m) = 0;
-
- virtual int send_message2(MessageRef m)
- {
- return send_message(m.detach()); /* send_message(Message *m) consumes a reference */
+ int send_message(Message* _m) {
+ auto m = ceph::ref_t<Message>(_m, false); /* consume ref */
+ return send_msg(std::move(m));
+ }
+ int send_message2(const MessageRef& m) {
+ return send_msg(MessageRef(m));
+ }
+ int send_message2(MessageRef&& m) {
+ return send_msg(std::move(m));
}
/**
{}
~Connection() override;
+
+ virtual int send_msg(MessageRef&& m) = 0;
};
using ConnectionRef = ceph::ref_t<Connection>;
#define dout_subsys ceph_subsys_ms
#include "common/debug.h"
-using ceph::cref_t;
using ceph::ref_t;
/*******************
ldout(cct,20) << "done calling dispatch on " << m << dendl;
}
-bool DispatchQueue::can_fast_dispatch(const cref_t<Message> &m) const
+bool DispatchQueue::can_fast_dispatch(const Message& m) const
{
return msgr->ms_can_fast_dispatch(m);
}
msgr->ms_fast_preprocess(m);
}
-void DispatchQueue::enqueue(const ref_t<Message>& m, int priority, uint64_t id)
+void DispatchQueue::enqueue(ref_t<Message>&& m, int priority, uint64_t id)
{
std::lock_guard l{lock};
if (stop) {
return;
}
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
- QueueItem item{m};
+ QueueItem item{std::move(m)};
add_arrival(item);
if (priority >= CEPH_MSG_PRIO_LOW) {
mqueue.enqueue_strict(id, priority, std::move(item));
cond.notify_one();
}
-void DispatchQueue::local_delivery(const ref_t<Message>& m, int priority)
+void DispatchQueue::local_delivery(ref_t<Message>&& m, int priority)
{
auto local_delivery_stamp = ceph_clock_now();
m->set_recv_stamp(local_delivery_stamp);
std::lock_guard l{local_delivery_lock};
if (local_messages.empty())
local_delivery_cond.notify_all();
- local_messages.emplace(m, priority);
+ local_messages.emplace(std::move(m), priority);
return;
}
auto p = std::move(local_messages.front());
local_messages.pop();
l.unlock();
- const ref_t<Message>& m = p.first;
+ auto& m = p.first;
int priority = p.second;
fast_preprocess(m);
- if (can_fast_dispatch(m)) {
+ if (can_fast_dispatch(*m)) {
fast_dispatch(m);
} else {
- enqueue(m, priority, 0);
+ enqueue(std::move(m), priority, 0);
}
l.lock();
}
ArrivalSet marrival;
class QueueItem {
- int type;
+ int type = -1;
ConnectionRef con;
ceph::ref_t<Message> m;
public:
- explicit QueueItem(const ceph::ref_t<Message>& m) : type(-1), con(0), m(m) {}
- QueueItem(int type, Connection *con) : type(type), con(con), m(0) {}
+ explicit QueueItem(ceph::ref_t<Message>&& m) : m(std::move(m)) {}
+ QueueItem(int type, Connection *con) : type(type), con(con) {}
bool is_code() const {
return type != -1;
}
Throttle dispatch_throttler;
bool stop;
- void local_delivery(const ceph::ref_t<Message>& m, int priority);
- void local_delivery(Message* m, int priority) {
- return local_delivery(ceph::ref_t<Message>(m, false), priority); /* consume ref */
- }
+ void local_delivery(ceph::ref_t<Message>&& m, int priority);
void run_local_delivery();
double get_max_age(utime_t now) const;
cond.notify_all();
}
- bool can_fast_dispatch(const ceph::cref_t<Message> &m) const;
+ bool can_fast_dispatch(const Message& m) const;
void fast_dispatch(const ceph::ref_t<Message>& m);
void fast_dispatch(Message* m) {
return fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
}
void fast_preprocess(const ceph::ref_t<Message>& m);
- void enqueue(const ceph::ref_t<Message>& m, int priority, uint64_t id);
- void enqueue(Message* m, int priority, uint64_t id) {
- return enqueue(ceph::ref_t<Message>(m, false), priority, id); /* consume ref */
- }
+ void enqueue(ceph::ref_t<Message>&& m, int priority, uint64_t id);
void discard_queue(uint64_t id);
void discard_local();
uint64_t get_id() {
* @param m The message we want to fast dispatch.
* @returns True if the message can be fast dispatched; false otherwise.
*/
- virtual bool ms_can_fast_dispatch(const Message *m) const { return false; }
- virtual bool ms_can_fast_dispatch2(const MessageConstRef& m) const {
- return ms_can_fast_dispatch(m.get());
+ virtual bool ms_can_fast_dispatch(const Message* m) const {
+ return false;
+ }
+ virtual bool ms_can_fast_dispatch2(const Message& m) const {
+ return ms_can_fast_dispatch(&m);
}
/**
* This function determines if a dispatcher is included in the
*
* @param m The Message we are testing.
*/
- bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
+ bool ms_can_fast_dispatch(const Message& m) {
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
if (dispatcher->ms_can_fast_dispatch2(m)) {
return true;
void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
- if (dispatcher->ms_can_fast_dispatch2(m)) {
+ if (dispatcher->ms_can_fast_dispatch2(*m)) {
dispatcher->ms_fast_dispatch2(m);
return;
}
<< m->get_source_inst() << dendl;
ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
}
- void ms_deliver_dispatch(Message *m) {
- return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
- }
/**
* Notify each Dispatcher of a new Connection. Call
* this function whenever a new Connection is initiated or
center->dispatch_event_external(read_handler);
}
-int AsyncConnection::send_message(Message *m)
+int AsyncConnection::send_msg(MessageRef&& m)
{
FUNCTRACE(async_msgr->cct);
+
lgeneric_subdout(async_msgr->cct, ms,
1) << "-- " << async_msgr->get_myaddrs() << " --> "
<< get_peer_addrs() << " -- "
if (is_blackhole()) {
lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
<< " blackhole " << *m << dendl;
- m->put();
return 0;
}
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
std::lock_guard<std::mutex> l(write_lock);
if (protocol->is_connected()) {
- dispatch_queue->local_delivery(m, m->get_priority());
+ dispatch_queue->local_delivery(std::move(m), m->get_priority());
} else {
ldout(async_msgr->cct, 10) << __func__ << " loopback connection closed."
<< " Drop message " << m << dendl;
- m->put();
}
return 0;
}
// may disturb users
logger->inc(l_msgr_send_messages);
- protocol->send_message(m);
+ protocol->send_message(std::move(m));
return 0;
}
void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
{
- Message *m = nullptr;
+ MessageRef m;
{
std::lock_guard<std::mutex> l(delay_lock);
register_time_events.erase(id);
if (stop_dispatch)
- return ;
+ return;
if (delay_queue.empty())
- return ;
- m = delay_queue.front();
+ return;
+ m = std::move(delay_queue.front());
delay_queue.pop_front();
}
- if (msgr->ms_can_fast_dispatch(m)) {
+ if (msgr->ms_can_fast_dispatch(*m)) {
dispatch_queue->fast_dispatch(m);
} else {
- dispatch_queue->enqueue(m, m->get_priority(), conn_id);
+ auto p = m->get_priority();
+ dispatch_queue->enqueue(std::move(m), p, conn_id);
}
}
[this]() mutable {
std::lock_guard<std::mutex> l(delay_lock);
while (!delay_queue.empty()) {
- Message *m = delay_queue.front();
- dispatch_queue->dispatch_throttle_release(
+ {
+ auto& m = delay_queue.front();
+ dispatch_queue->dispatch_throttle_release(
m->get_dispatch_throttle_size());
- m->put();
+ }
delay_queue.pop_front();
}
for (auto i : register_time_events)
center->submit_to(
center->get_id(), [this] () mutable {
std::lock_guard<std::mutex> l(delay_lock);
- while (!delay_queue.empty()) {
- Message *m = delay_queue.front();
- if (msgr->ms_can_fast_dispatch(m)) {
+ for (; !delay_queue.empty(); delay_queue.pop_front()) {
+ auto& m = delay_queue.front();
+ if (msgr->ms_can_fast_dispatch(*m)) {
dispatch_queue->fast_dispatch(m);
} else {
- dispatch_queue->enqueue(m, m->get_priority(), conn_id);
+ auto p = m->get_priority();
+ dispatch_queue->enqueue(std::move(m), p, conn_id);
}
- delay_queue.pop_front();
}
for (auto i : register_time_events)
center->delete_time_event(i);
*/
class DelayedDelivery : public EventCallback {
std::set<uint64_t> register_time_events; // need to delete it if stop
- std::deque<Message*> delay_queue;
+ std::deque<MessageRef> delay_queue;
std::mutex delay_lock;
AsyncMessenger *msgr;
EventCenter *center;
}
void set_center(EventCenter *c) { center = c; }
void do_request(uint64_t id) override;
- void queue(double delay_period, Message *m) {
+ void queue(double delay_period, MessageRef&& m) {
std::lock_guard<std::mutex> l(delay_lock);
- delay_queue.push_back(m);
+ delay_queue.push_back(std::move(m));
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
void discard();
void accept(ConnectedSocket socket,
const entity_addr_t &listen_addr,
const entity_addr_t &peer_addr);
- int send_message(Message *m) override;
void send_keepalive() override;
void mark_down() override;
int port;
public:
Messenger::Policy policy;
+protected:
+ int send_msg(MessageRef&& m) override;
private:
DispatchQueue *dispatch_queue;
// signal and handle connection failure
virtual void fault() = 0;
// send message
- virtual void send_message(Message *m) = 0;
+ virtual void send_message(MessageRef&& m) = 0;
// send keepalive
virtual void send_keepalive() = 0;
}
}
-void ProtocolV1::send_message(Message *m) {
+void ProtocolV1::send_message(MessageRef&& m) {
ceph::buffer::list bl;
uint64_t f = connection->get_features();
// TODO: Currently not all messages supports reencode like MOSDMap, so here
// only let fast dispatch support messages prepare message
- bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
+ bool can_fast_prepare = messenger->ms_can_fast_dispatch(*m);
bool is_prepared = false;
if (can_fast_prepare && f) {
prepare_send_message(f, m, bl);
if (can_write == WriteStatus::CLOSED) {
ldout(cct, 10) << __func__ << " connection closed."
<< " Drop message " << m << dendl;
- m->put();
} else {
+ ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << *m << dendl;
m->queue_start = ceph::mono_clock::now();
m->trace.event("async enqueueing message");
- out_q[m->get_priority()].emplace_back(out_q_entry_t{
- std::move(bl), m, is_prepared});
- ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
- << dendl;
+ out_q[m->get_priority()].emplace_back(out_q_entry_t{std::move(bl), std::move(m), is_prepared});
if (can_write != WriteStatus::REPLACING && !write_in_progress) {
write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
}
}
-void ProtocolV1::prepare_send_message(uint64_t features, Message *m,
+void ProtocolV1::prepare_send_message(uint64_t features, const MessageRef& m,
ceph::buffer::list &bl) {
ldout(cct, 20) << __func__ << " m " << *m << dendl;
}
const out_q_entry_t out_entry = _get_next_outgoing();
- Message *m = out_entry.m;
+ auto& m = out_entry.m;
ceph::buffer::list data = out_entry.bl;
if (!m) {
if (!connection->policy.lossy) {
// put on sent list
sent.push_back(m);
- m->get();
}
more = !out_q.empty();
connection->write_lock.unlock();
static const int max_pending = 128;
int i = 0;
auto now = ceph::mono_clock::now();
- Message *pending[max_pending];
+ std::array<MessageRef, max_pending> pending;
connection->write_lock.lock();
while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
- Message *m = sent.front();
+ auto& m = pending[i++] = std::move(sent.front());
sent.pop_front();
- pending[i++] = m;
ldout(cct, 10) << __func__ << " got ack seq " << seq
<< " >= " << m->get_seq() << " on " << m << " " << *m
<< dendl;
}
connection->write_lock.unlock();
connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
- for (int k = 0; k < i; k++) {
- pending[k]->put();
- }
return CONTINUE(wait_message);
}
ldout(cct, 20) << __func__ << " got " << front.length() << " + "
<< middle.length() << " + " << data.length() << " byte message"
<< dendl;
- Message *message = decode_message(cct, messenger->crcflags, current_header,
+ Message *_message = decode_message(cct, messenger->crcflags, current_header,
footer, front, middle, data, connection);
- if (!message) {
+ if (!_message) {
ldout(cct, 1) << __func__ << " decode message failed " << dendl;
return _fault();
}
+ auto message = ceph::ref_t<Message>(_message, false); /* consume ref */
+ _message = nullptr;
+
//
// Check the signature if one should be present. A zero return indicates
// success. PLR
if (session_security.get() == NULL) {
ldout(cct, 10) << __func__ << " no session security set" << dendl;
} else {
- if (session_security->check_message_signature(message)) {
+ if (session_security->check_message_signature(message.get())) {
ldout(cct, 0) << __func__ << " Signature check failed" << dendl;
- message->put();
return _fault();
}
}
ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
<< " <= " << cur_seq << " " << message << " " << *message
<< ", discarding" << dendl;
- message->put();
if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
cct->_conf->ms_die_on_old_message) {
ceph_assert(0 == "old msgs despite reconnect_seq feature");
if (connection->is_blackhole()) {
ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
- message->put();
goto out;
}
<< (ceph_clock_now() + delay_period) << " on " << message
<< " " << *message << dendl;
}
- connection->delay_state->queue(delay_period, message);
- } else if (messenger->ms_can_fast_dispatch(message)) {
+ connection->delay_state->queue(delay_period, std::move(message));
+ } else if (messenger->ms_can_fast_dispatch(*message)) {
connection->lock.unlock();
connection->dispatch_queue->fast_dispatch(message);
connection->recv_start_time = ceph::mono_clock::now();
connection->recv_start_time - fast_dispatch_time);
connection->lock.lock();
} else {
- connection->dispatch_queue->enqueue(message, message->get_priority(),
- connection->conn_id);
+ auto p = message->get_priority();
+ connection->dispatch_queue->enqueue(std::move(message), p, connection->conn_id);
}
out:
}
}
-ssize_t ProtocolV1::write_message(Message *m, ceph::buffer::list &bl, bool more) {
+ssize_t ProtocolV1::write_message(const MessageRef& m, ceph::buffer::list &bl, bool more) {
FUNCTRACE(cct);
ceph_assert(connection->center->in_thread());
m->set_seq(++out_seq);
if (session_security.get() == NULL) {
ldout(cct, 20) << __func__ << " no session security" << dendl;
} else {
- if (session_security->sign_message(m)) {
+ if (session_security->sign_message(m.get())) {
ldout(cct, 20) << __func__ << " failed to sign m=" << m
<< "): sig = " << footer.sig << dendl;
} else {
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
#endif
- m->put();
return rc;
}
auto &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
out_seq -= sent.size();
- while (!sent.empty()) {
- Message *m = sent.back();
- sent.pop_back();
+ for (; !sent.empty(); sent.pop_back()) {
+ auto& m = sent.back();
ldout(cct, 10) << __func__ << " " << *m << " for resend "
<< " (" << m->get_seq() << ")" << dendl;
m->clear_payload();
- rq.push_front(out_q_entry_t{ceph::buffer::list(), m, false});
+ rq.push_front(out_q_entry_t{ceph::buffer::list(), std::move(m), false});
}
}
}
auto &rq = it->second;
uint64_t count = out_seq;
- while (!rq.empty()) {
- Message* const m = rq.front().m;
+ for (; !rq.empty(); rq.pop_front()) {
+ auto& m = rq.front().m;
if (m->get_seq() == 0 || m->get_seq() > seq) break;
ldout(cct, 10) << __func__ << " " << *(m) << " for resend seq "
<< m->get_seq() << " <= " << seq << ", discarding"
<< dendl;
- m->put();
- rq.pop_front();
count++;
}
if (rq.empty()) out_q.erase(it);
void ProtocolV1::discard_out_queue() {
ldout(cct, 10) << __func__ << " started" << dendl;
- for (Message *msg : sent) {
+ for (const auto& msg : sent) {
ldout(cct, 20) << __func__ << " discard " << msg << dendl;
- msg->put();
}
sent.clear();
- for (auto& [ prio, entries ] : out_q) {
- static_cast<void>(prio);
+ for ([[maybe_unused]] auto& [ prio, entries ] : out_q) {
for (auto& entry : entries) {
ldout(cct, 20) << __func__ << " discard " << entry.m << dendl;
- entry.m->put();
}
}
out_q.clear();
ProtocolV1::out_q_entry_t ProtocolV1::_get_next_outgoing() {
out_q_entry_t out_entry;
- if (const auto it = out_q.begin(); it != out_q.end()) {
- ceph_assert(!it->second.empty());
- const auto p = it->second.begin();
- out_entry = *p;
- it->second.erase(p);
- if (it->second.empty()) out_q.erase(it);
+ if (auto it = out_q.begin(); it != out_q.end()) {
+ auto& q = it->second;
+ ceph_assert(!q.empty());
+ out_entry = std::move(q.front());
+ q.pop_front();
+ if (q.empty()) {
+ out_q.erase(it);
+ }
}
return out_entry;
}
#include "Protocol.h"
#include "AsyncConnection.h"
+#include <deque>
+
struct AuthSessionHandler;
class ProtocolV1;
using CtPtr = Ct<ProtocolV1>*;
enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED };
std::atomic<WriteStatus> can_write;
- std::list<Message *> sent; // the first ceph::buffer::list need to inject seq
+ std::deque<MessageRef> sent; // the first ceph::buffer::list need to inject seq
//struct for outbound msgs
struct out_q_entry_t {
ceph::buffer::list bl;
- Message* m {nullptr};
+ MessageRef m;
bool is_prepared {false};
};
// priority queue for outbound msgs
/**
* A queue for each priority value, highest priority first.
*/
- std::map<int, std::list<out_q_entry_t>, std::greater<int>> out_q;
+ std::map<int, std::deque<out_q_entry_t>, std::greater<int>> out_q;
bool keepalive;
bool write_in_progress = false;
out_q_entry_t _get_next_outgoing();
- void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl);
- ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more);
+ void prepare_send_message(uint64_t features, const MessageRef& m, ceph::buffer::list &bl);
+ ssize_t write_message(const MessageRef& m, ceph::buffer::list &bl, bool more);
void requeue_sent();
uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
virtual bool is_connected() override;
virtual void stop() override;
virtual void fault() override;
- virtual void send_message(Message *m) override;
+ virtual void send_message(MessageRef&& m) override;
virtual void send_keepalive() override;
virtual void read_event() override;
void ProtocolV2::discard_out_queue() {
ldout(cct, 10) << __func__ << " started" << dendl;
- for (Message *msg : sent) {
- ldout(cct, 20) << __func__ << " discard " << msg << dendl;
- msg->put();
+ for (auto& msg : sent) {
+ ldout(cct, 20) << __func__ << " discard " << *msg << dendl;
}
sent.clear();
for (auto& [ prio, entries ] : out_queue) {
static_cast<void>(prio);
for (auto& entry : entries) {
ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
- entry.m->put();
}
}
out_queue.clear();
auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
out_seq -= sent.size();
- while (!sent.empty()) {
- Message *m = sent.back();
- sent.pop_back();
+ for (; !sent.empty(); sent.pop_back()) {
+ auto& m = sent.back();
ldout(cct, 5) << __func__ << " requeueing message m=" << m
<< " seq=" << m->get_seq() << " type=" << m->get_type() << " "
<< *m << dendl;
m->clear_payload();
- rq.emplace_front(out_queue_entry_t{false, m});
+ rq.emplace_front(out_queue_entry_t{false, std::move(m)});
}
}
}
auto& rq = it->second;
uint64_t count = out_seq;
- while (!rq.empty()) {
- Message* const m = rq.front().m;
- if (m->get_seq() == 0 || m->get_seq() > seq) break;
+ for (; !rq.empty(); rq.pop_front()) {
+ auto& m = rq.front().m;
+ if (m->get_seq() == 0 || m->get_seq() > seq) {
+ break;
+ }
ldout(cct, 5) << __func__ << " discarding message m=" << m
<< " seq=" << m->get_seq() << " ack_seq=" << seq << " "
<< *m << dendl;
- m->put();
- rq.pop_front();
count++;
}
if (rq.empty()) out_queue.erase(it);
return nullptr;
}
-void ProtocolV2::prepare_send_message(uint64_t features,
- Message *m) {
+void ProtocolV2::prepare_send_message(uint64_t features, const MessageRef& m) {
ldout(cct, 20) << __func__ << " m=" << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
m->encode(features, 0);
}
-void ProtocolV2::send_message(Message *m) {
+void ProtocolV2::send_message(MessageRef&& m) {
uint64_t f = connection->get_features();
// TODO: Currently not all messages supports reencode like MOSDMap, so here
// only let fast dispatch support messages prepare message
- const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
+ const bool can_fast_prepare = messenger->ms_can_fast_dispatch(*m);
bool is_prepared;
if (can_fast_prepare && f) {
prepare_send_message(f, m);
}
if (state == CLOSED) {
ldout(cct, 10) << __func__ << " connection closed."
- << " Drop message " << m << dendl;
- m->put();
+ << " Drop message " << *m << dendl;
} else {
- ldout(cct, 5) << __func__ << " enqueueing message m=" << m
- << " type=" << m->get_type() << " " << *m << dendl;
+ ldout(cct, 5) << __func__ << " enqueueing message m=" << *m << dendl;
m->queue_start = ceph::mono_clock::now();
m->trace.event("async enqueueing message");
- out_queue[m->get_priority()].emplace_back(
- out_queue_entry_t{is_prepared, m});
- ldout(cct, 15) << __func__ << " message queued for async transmission m=" << m
- << dendl;
+ auto& oq = out_queue[m->get_priority()];
+ oq.emplace_back(out_queue_entry_t{is_prepared, std::move(m)});
if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
return out_entry;
}
-ssize_t ProtocolV2::write_message(Message *m, bool more) {
+ssize_t ProtocolV2::write_message(const MessageRef& m, bool more) {
FUNCTRACE(cct);
ceph_assert(connection->center->in_thread());
m->set_seq(++out_seq);
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
#endif
- m->put();
return rc;
}
// trim sent list
static const int max_pending = 128;
int i = 0;
- Message *pending[max_pending];
+ std::array<MessageRef, max_pending> pending;
auto now = ceph::mono_clock::now();
connection->write_lock.lock();
while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
- Message *m = sent.front();
+ auto& m = pending[i++] = std::move(sent.front());
sent.pop_front();
- pending[i++] = m;
ldout(cct, 10) << __func__ << " got ack seq " << seq
<< " >= " << m->get_seq() << " on " << m << " " << *m
<< dendl;
}
connection->write_lock.unlock();
connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
- for (int k = 0; k < i; k++) {
- pending[k]->put();
- }
}
void ProtocolV2::reset_compression() {
}
}
- const auto out_entry = _get_next_outgoing();
+ auto out_entry = _get_next_outgoing();
if (!out_entry.m) {
break;
}
if (!connection->policy.lossy) {
// put on sent list
sent.push_back(out_entry.m);
- out_entry.m->get();
}
more = !out_queue.empty();
connection->write_lock.unlock();
ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
ceph_le32(0), ceph_le64(0), current_header.flags};
- Message *message = decode_message(cct, 0, header, footer,
+ Message* _m = decode_message(cct, 0, header, footer,
msg_frame.front(),
msg_frame.middle(),
msg_frame.data(),
connection);
- if (!message) {
+ if (!_m) {
ldout(cct, 1) << __func__ << " decode message failed " << dendl;
return _fault();
- } else {
- state = READ_MESSAGE_COMPLETE;
}
+ auto message = ceph::ref_t<Message>(_m, false); /* consume ref */
+ _m = nullptr;
+ state = READ_MESSAGE_COMPLETE;
+
INTERCEPT(17);
message->set_byte_throttler(connection->policy.throttler_bytes);
ldout(cct, 0) << __func__ << " got old message " << message->get_seq()
<< " <= " << cur_seq << " " << message << " " << *message
<< ", discarding" << dendl;
- message->put();
if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
cct->_conf->ms_die_on_old_message) {
ceph_assert(0 == "old msgs despite reconnect_seq feature");
if (connection->is_blackhole()) {
ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
- message->put();
goto out;
}
<< (ceph_clock_now() + delay_period) << " on " << message
<< " " << *message << dendl;
}
- connection->delay_state->queue(delay_period, message);
- } else if (messenger->ms_can_fast_dispatch(message)) {
+ connection->delay_state->queue(delay_period, std::move(message));
+ } else if (messenger->ms_can_fast_dispatch(*message)) {
connection->lock.unlock();
- connection->dispatch_queue->fast_dispatch(message);
+ connection->dispatch_queue->fast_dispatch(std::move(message));
connection->recv_start_time = ceph::mono_clock::now();
connection->logger->tinc(l_msgr_running_fast_dispatch_time,
connection->recv_start_time - fast_dispatch_time);
return nullptr;
}
} else {
- connection->dispatch_queue->enqueue(message, message->get_priority(),
- connection->conn_id);
+ auto p = message->get_priority();
+ connection->dispatch_queue->enqueue(std::move(message), p, connection->conn_id);
}
handle_message_ack(current_header.ack_seq);
#include "compression_onwire.h"
#include "frames_v2.h"
+#include <deque>
+
class ProtocolV2 : public Protocol {
private:
enum State {
bool can_write;
struct out_queue_entry_t {
bool is_prepared {false};
- Message* m {nullptr};
+ MessageRef m;
};
/**
* A queue for each priority value, highest priority first.
*/
- std::map<int, std::list<out_queue_entry_t>, std::greater<int>> out_queue;
+ std::map<int, std::deque<out_queue_entry_t>, std::greater<int>> out_queue;
- std::list<Message *> sent;
+ std::deque<MessageRef> sent;
std::atomic<uint64_t> out_seq{0};
std::atomic<uint64_t> in_seq{0};
std::atomic<uint64_t> ack_left{0};
Ct<ProtocolV2> *_fault();
void discard_out_queue();
void reset_session();
- void prepare_send_message(uint64_t features, Message *m);
+ void prepare_send_message(uint64_t features, const MessageRef& m);
out_queue_entry_t _get_next_outgoing();
- ssize_t write_message(Message *m, bool more);
+ ssize_t write_message(const MessageRef& m, bool more);
void handle_message_ack(uint64_t seq);
void reset_compression();
virtual bool is_connected() override;
virtual void stop() override;
virtual void fault() override;
- virtual void send_message(Message *m) override;
+ virtual void send_message(MessageRef&& m) override;
virtual void send_keepalive() override;
virtual void read_event() override;