class Messenger;
using MessengerRef = seastar::shared_ptr<Messenger>;
+using MessageFRef = seastar::foreign_ptr<MessageURef>;
+
} // namespace crimson::net
return (messenger.get_myaddr() > peer_addr || policy.server);
}
-seastar::future<> SocketConnection::send(MessageURef msg)
+seastar::future<> SocketConnection::send(MessageURef _msg)
{
+ MessageFRef msg = seastar::make_foreign(std::move(_msg));
return seastar::smp::submit_to(
io_handler->get_shard_id(),
[this, msg=std::move(msg)]() mutable {
virtual bool is_connected() const = 0;
- virtual seastar::future<> send(MessageURef) = 0;
+ virtual seastar::future<> send(MessageFRef) = 0;
virtual seastar::future<> send_keepalive() = 0;
std::for_each(
out_pending_msgs.begin(),
out_pending_msgs.begin()+num_msgs,
- [this, &bl](const MessageURef& msg) {
+ [this, &bl](const MessageFRef& msg) {
// set priority
msg->get_header().src = conn.messenger.get_myname();
return bl;
}
-seastar::future<> IOHandler::send(MessageURef msg)
+seastar::future<> IOHandler::send(MessageFRef msg)
{
ceph_assert_always(seastar::this_shard_id() == sid);
if (io_state != io_state_t::drop) {
out_seq -= out_sent_msgs.size();
logger().debug("{} requeue {} items, revert out_seq to {}",
conn, out_sent_msgs.size(), out_seq);
- for (MessageURef& msg : out_sent_msgs) {
+ for (MessageFRef& msg : out_sent_msgs) {
msg->clear_payload();
msg->set_seq(0);
}
return protocol_is_connected;
}
- seastar::future<> send(MessageURef msg) final;
+ seastar::future<> send(MessageFRef msg) final;
seastar::future<> send_keepalive() final;
seq_num_t out_seq = 0;
// messages to be resent after connection gets reset
- std::deque<MessageURef> out_pending_msgs;
+ std::deque<MessageFRef> out_pending_msgs;
// messages sent, but not yet acked by peer
- std::deque<MessageURef> out_sent_msgs;
+ std::deque<MessageFRef> out_sent_msgs;
bool need_keepalive = false;