#pragma once
-#include <seastar/core/future.hh>
-#include <seastar/core/sharded.hh>
-#include <boost/intrusive/slist.hpp>
-
-#include "crimson/common/gated.h"
#include "Fwd.h"
class AuthAuthorizer;
namespace crimson::net {
-class Dispatcher : public boost::intrusive::slist_base_hook<
- boost::intrusive::link_mode<
- boost::intrusive::safe_link>> {
+class Dispatcher {
public:
virtual ~Dispatcher() {}
#pragma once
+#include <seastar/core/future.hh>
+#include <seastar/core/future-util.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sharded.hh>
class Connection;
using ConnectionRef = seastar::shared_ptr<Connection>;
-class Dispatcher;
+class ChainedDispatchers;
class Messenger;
using MessengerRef = seastar::shared_ptr<Messenger>;
#pragma once
-#include <seastar/core/future.hh>
+#include <list>
#include "Fwd.h"
#include "crimson/common/throttle.h"
-#include "crimson/net/chained_dispatchers.h"
#include "msg/Message.h"
#include "msg/Policy.h"
class Interceptor;
#endif
+class Dispatcher;
+
using Throttle = crimson::common::Throttle;
using SocketPolicy = ceph::net::Policy<Throttle>;
uint32_t min_port, uint32_t max_port) = 0;
/// start the messenger
- virtual seastar::future<> start(ChainedDispatchersRef) = 0;
+ virtual seastar::future<> start(const std::list<Dispatcher*>&) = 0;
+
+ seastar::future<> start(Dispatcher& dispatcher) {
+ std::list<Dispatcher*> dispatchers;
+ dispatchers.push_back(&dispatcher);
+ return start(dispatchers);
+ }
/// either return an existing connection to the peer,
/// or a new pending connection
// wait for messenger shutdown
virtual seastar::future<> wait() = 0;
- virtual void remove_dispatcher(Dispatcher&) = 0;
+ // stop dispatching events and messages
+ virtual void stop() = 0;
+
+ virtual bool is_started() const = 0;
- virtual bool dispatcher_chain_empty() const = 0;
- /// stop listenening and wait for all connections to close. safe to destruct
- /// after this future becomes available
+ // free internal resources before destruction, must be called after stopped,
+ // and must be called if is bound.
virtual seastar::future<> shutdown() = 0;
uint32_t get_crc_flags() const {
#include "crimson/common/log.h"
#include "crimson/net/Errors.h"
-#include "crimson/net/Dispatcher.h"
+#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Socket.h"
#include "crimson/net/SocketConnection.h"
#include "msg/Message.h"
namespace crimson::net {
Protocol::Protocol(proto_t type,
- ChainedDispatchersRef& dispatcher,
+ ChainedDispatchers& dispatchers,
SocketConnection& conn)
: proto_type(type),
- dispatcher(dispatcher),
+ dispatchers(dispatchers),
conn(conn),
auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}
{}
auto gate_closed = gate.close();
if (dispatch_reset) {
- dispatcher->ms_handle_reset(
+ dispatchers.ms_handle_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
is_replace);
}
virtual void print(std::ostream&) const = 0;
protected:
Protocol(proto_t type,
- ChainedDispatchersRef& dispatcher,
+ ChainedDispatchers& dispatchers,
SocketConnection& conn);
virtual void trigger_close() = 0;
SocketRef socket;
protected:
- ChainedDispatchersRef dispatcher;
+ ChainedDispatchers& dispatchers;
SocketConnection &conn;
AuthConnectionMetaRef auth_meta;
#include "crimson/auth/AuthClient.h"
#include "crimson/auth/AuthServer.h"
#include "crimson/common/log.h"
-#include "Dispatcher.h"
+#include "chained_dispatchers.h"
#include "Errors.h"
#include "Socket.h"
#include "SocketConnection.h"
namespace crimson::net {
-ProtocolV1::ProtocolV1(ChainedDispatchersRef& dispatcher,
+ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers,
SocketConnection& conn,
SocketMessenger& messenger)
- : Protocol(proto_t::v1, dispatcher, conn), messenger{messenger} {}
+ : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {}
ProtocolV1::~ProtocolV1() {}
set_write_state(write_state_t::open);
if (type == open_t::connected) {
- dispatcher->ms_handle_connect(
+ dispatchers.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
} else { // type == open_t::accepted
- dispatcher->ms_handle_accept(
+ dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
class ProtocolV1 final : public Protocol {
public:
- ProtocolV1(ChainedDispatchersRef& dispatcher,
+ ProtocolV1(ChainedDispatchers& dispatchers,
SocketConnection& conn,
SocketMessenger& messenger);
~ProtocolV1() override;
#include "crimson/auth/AuthServer.h"
#include "crimson/common/formatter.h"
-#include "Dispatcher.h"
+#include "chained_dispatchers.h"
#include "Errors.h"
#include "Socket.h"
#include "SocketConnection.h"
});
}
-ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher,
+ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
SocketConnection& conn,
SocketMessenger& messenger)
- : Protocol(proto_t::v2, dispatcher, conn),
+ : Protocol(proto_t::v2, dispatchers, conn),
messenger{messenger},
protocol_timer{conn}
{}
client_cookie = generate_client_cookie();
peer_global_seq = 0;
reset_write();
- dispatcher->ms_handle_remote_reset(
+ dispatchers.ms_handle_remote_reset(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
}
accept_me();
}
- dispatcher->ms_handle_accept(
+ dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
gated_execute("execute_establishing", [this] {
if (socket) {
socket->shutdown();
}
- dispatcher->ms_handle_accept(
+ dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
gate.dispatch_in_background("trigger_replacing", *this,
[this,
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
trigger_state(state_t::READY, write_state_t::open, false);
if (dispatch_connect) {
- dispatcher->ms_handle_connect(
+ dispatchers.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
#ifdef UNIT_TESTS_BUILT
class ProtocolV2 final : public Protocol {
public:
- ProtocolV2(ChainedDispatchersRef& dispatcher,
+ ProtocolV2(ChainedDispatchers& dispatchers,
SocketConnection& conn,
SocketMessenger& messenger);
~ProtocolV2() override;
using crimson::common::local_conf;
SocketConnection::SocketConnection(SocketMessenger& messenger,
- ChainedDispatchersRef& dispatcher,
+ ChainedDispatchers& dispatchers,
bool is_msgr2)
: messenger(messenger)
{
if (is_msgr2) {
- protocol = std::make_unique<ProtocolV2>(dispatcher, *this, messenger);
+ protocol = std::make_unique<ProtocolV2>(dispatchers, *this, messenger);
} else {
- protocol = std::make_unique<ProtocolV1>(dispatcher, *this, messenger);
+ protocol = std::make_unique<ProtocolV1>(dispatchers, *this, messenger);
}
#ifdef UNIT_TESTS_BUILT
if (messenger.interceptor) {
#include "msg/Policy.h"
#include "crimson/common/throttle.h"
-#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Socket.h"
namespace crimson::net {
-class Dispatcher;
class Protocol;
class SocketMessenger;
class SocketConnection;
public:
SocketConnection(SocketMessenger& messenger,
- ChainedDispatchersRef& dispatcher,
+ ChainedDispatchers& dispatchers,
bool is_msgr2);
~SocketConnection() override;
#include "auth/Auth.h"
#include "Errors.h"
-#include "Dispatcher.h"
#include "Socket.h"
namespace {
});
}
-seastar::future<> SocketMessenger::start(ChainedDispatchersRef chained_dispatchers) {
+seastar::future<> SocketMessenger::start(const std::list<Dispatcher*>& _dispatchers) {
assert(seastar::this_shard_id() == master_sid);
- dispatchers = chained_dispatchers;
+ dispatchers.assign(_dispatchers);
if (listener) {
// make sure we have already bound to a valid address
ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
{
assert(seastar::this_shard_id() == master_sid);
return seastar::futurize_invoke([this] {
- if (dispatchers) {
- assert(dispatchers->empty());
- }
+ assert(dispatchers.empty());
if (listener) {
auto d_listener = listener;
listener = nullptr;
#pragma once
+#include <list>
#include <map>
-#include <optional>
#include <set>
+#include <vector>
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
seastar::promise<> shutdown_promise;
FixedCPUServerSocket* listener = nullptr;
- // as we want to unregister a dispatcher from the messengers when stopping
- // that dispatcher, we have to use intrusive slist which, when used with
- // "boost::intrusive::linear<true>", can tolerate ongoing iteration of the
- // list when removing an element. However, the downside of this is that an
- // element can only be attached to one slist. So, as we need to make multiple
- // messenger reference the same set of dispatchers, we have to make them share
- // the same ChainedDispatchers, which means registering/unregistering an element
- // to one messenger will affect other messengers that share the same ChainedDispatchers.
- ChainedDispatchersRef dispatchers;
+ ChainedDispatchers dispatchers;
std::map<entity_addr_t, SocketConnectionRef> connections;
std::set<SocketConnectionRef> accepting_conns;
std::vector<SocketConnectionRef> closing_conns;
seastar::future<> try_bind(const entity_addrvec_t& addr,
uint32_t min_port, uint32_t max_port) override;
- seastar::future<> start(ChainedDispatchersRef dispatchers) override;
+ seastar::future<> start(const std::list<Dispatcher*>& dispatchers) override;
ConnectionRef connect(const entity_addr_t& peer_addr,
const entity_name_t& peer_name) override;
return shutdown_promise.get_future();
}
- void remove_dispatcher(Dispatcher& disp) override {
- dispatchers->erase(disp);
+ void stop() override {
+ dispatchers.clear();
}
- bool dispatcher_chain_empty() const override {
- return !dispatchers || dispatchers->empty();
+
+ bool is_started() const override {
+ return !dispatchers.empty();
}
+
seastar::future<> shutdown() override;
void print(ostream& out) const override {
#include "crimson/common/log.h"
#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
#include "msg/Message.h"
namespace {
}
}
+namespace crimson::net {
+
seastar::future<>
ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn,
MessageRef m) {
try {
for (auto& dispatcher : dispatchers) {
- auto [dispatched, throttle_future] = dispatcher.ms_dispatch(conn, m);
+ auto [dispatched, throttle_future] = dispatcher->ms_dispatch(conn, m);
if (dispatched) {
return std::move(throttle_future
).handle_exception([conn] (std::exception_ptr eptr) {
ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
try {
for (auto& dispatcher : dispatchers) {
- dispatcher.ms_handle_accept(conn);
+ dispatcher->ms_handle_accept(conn);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_accept() {}",
ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
try {
for(auto& dispatcher : dispatchers) {
- dispatcher.ms_handle_connect(conn);
+ dispatcher->ms_handle_connect(conn);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_connect() {}",
ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
try {
for (auto& dispatcher : dispatchers) {
- dispatcher.ms_handle_reset(conn, is_replace);
+ dispatcher->ms_handle_reset(conn, is_replace);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_reset() {}",
ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
try {
for (auto& dispatcher : dispatchers) {
- dispatcher.ms_handle_remote_reset(conn);
+ dispatcher->ms_handle_remote_reset(conn);
}
} catch (...) {
logger().error("{} got unexpected exception in ms_handle_remote_reset() {}",
ceph_abort();
}
}
+
+}
#pragma once
-#include <boost/intrusive/slist.hpp>
+#include <list>
-#include "crimson/net/Dispatcher.h"
+#include "Fwd.h"
#include "crimson/common/log.h"
-using crimson::net::Dispatcher;
+namespace crimson::net {
+
+class Dispatcher;
class ChainedDispatchers {
- boost::intrusive::slist<
- Dispatcher,
- boost::intrusive::linear<true>,
- boost::intrusive::cache_last<true>> dispatchers;
public:
- void push_front(Dispatcher& dispatcher) {
- dispatchers.push_front(dispatcher);
- }
- void push_back(Dispatcher& dispatcher) {
- dispatchers.push_back(dispatcher);
+ void assign(const std::list<Dispatcher*> _dispatchers) {
+ assert(empty());
+ assert(!_dispatchers.empty());
+ dispatchers = _dispatchers;
}
- void erase(Dispatcher& dispatcher) {
- dispatchers.erase(dispatchers.iterator_to(dispatcher));
+ void clear() {
+ dispatchers.clear();
}
bool empty() const {
return dispatchers.empty();
void ms_handle_connect(crimson::net::ConnectionRef conn);
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
+
+ private:
+ std::list<Dispatcher*> dispatchers;
};
-using ChainedDispatchersRef = seastar::lw_shared_ptr<ChainedDispatchers>;
+}
SocketPolicy::lossy_client(0));
back_msgr->set_policy(entity_name_t::TYPE_OSD,
SocketPolicy::lossy_client(0));
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
return seastar::when_all_succeed(start_messenger(*front_msgr,
- front_addrs,
- chained_dispatchers),
+ front_addrs),
start_messenger(*back_msgr,
- back_addrs,
- chained_dispatchers))
+ back_addrs))
.then_unpack([this] {
timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_heartbeat_interval));
seastar::future<>
Heartbeat::start_messenger(crimson::net::Messenger& msgr,
- const entity_addrvec_t& addrs,
- ChainedDispatchersRef chained_dispatchers)
+ const entity_addrvec_t& addrs)
{
return msgr.try_bind(addrs,
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([&msgr, chained_dispatchers]() mutable {
- return msgr.start(chained_dispatchers);
+ .then([this, &msgr]() mutable {
+ return msgr.start(*this);
});
}
{
logger().info("{}", __func__);
timer.cancel();
- if (!front_msgr->dispatcher_chain_empty())
- front_msgr->remove_dispatcher(*this);
- if (!back_msgr->dispatcher_chain_empty())
- back_msgr->remove_dispatcher(*this);
+ front_msgr->stop();
+ back_msgr->stop();
return gate.close().then([this] {
return seastar::when_all_succeed(front_msgr->shutdown(),
back_msgr->shutdown());
#include <cstdint>
#include <seastar/core/future.hh>
#include "common/ceph_time.h"
-#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Fwd.h"
void add_reporter_peers(int whoami);
seastar::future<> start_messenger(crimson::net::Messenger& msgr,
- const entity_addrvec_t& addrs,
- ChainedDispatchersRef);
+ const entity_addrvec_t& addrs);
private:
const osd_id_t whoami;
const crimson::osd::ShardServices& service;
cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
SocketPolicy::stateless_server(0));
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_front(*mgrc);
- chained_dispatchers->push_front(*monc);
- chained_dispatchers->push_front(*this);
+ std::list<Dispatcher*> dispatchers;
+ dispatchers.push_front(mgrc.get());
+ dispatchers.push_front(monc.get());
+ dispatchers.push_front(this);
return seastar::when_all_succeed(
cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this, chained_dispatchers]() mutable {
- return cluster_msgr->start(chained_dispatchers);
+ .then([this, dispatchers]() mutable {
+ return cluster_msgr->start(dispatchers);
}),
public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this, chained_dispatchers]() mutable {
- return public_msgr->start(chained_dispatchers);
+ .then([this, dispatchers]() mutable {
+ return public_msgr->start(dispatchers);
}));
}).then_unpack([this] {
return seastar::when_all_succeed(monc->start(),
return prepare_to_stop().then([this] {
state.set_stopping();
logger().debug("prepared to stop");
- if (!public_msgr->dispatcher_chain_empty()) {
- public_msgr->remove_dispatcher(*this);
- public_msgr->remove_dispatcher(*mgrc);
- public_msgr->remove_dispatcher(*monc);
- }
- if (!cluster_msgr->dispatcher_chain_empty()) {
- cluster_msgr->remove_dispatcher(*this);
- cluster_msgr->remove_dispatcher(*mgrc);
- cluster_msgr->remove_dispatcher(*monc);
- }
+ public_msgr->stop();
+ cluster_msgr->stop();
auto gate_close_fut = gate.close();
return asok->stop().then([this] {
return heartbeat->stop();
#include "crimson/common/type_helpers.h"
#include "crimson/common/auth_handler.h"
#include "crimson/common/gated.h"
-#include "crimson/net/chained_dispatchers.h"
#include "crimson/admin/admin_socket.h"
#include "crimson/common/simple_lru.h"
#include "crimson/common/shared_lru.h"
msgr->set_require_authorizer(false);
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
- return msgr->start(chained_dispatchers);
+ return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->start(*this);
});
}
seastar::future<> shutdown() {
ceph_assert(msgr);
- msgr->remove_dispatcher(*this);
+ msgr->stop();
return msgr->shutdown();
}
};
msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return msgr->start(chained_dispatchers);
+ return msgr->start(*this);
}
seastar::future<> shutdown() {
ceph_assert(msgr);
- msgr->remove_dispatcher(*this);
+ msgr->stop();
return msgr->shutdown();
}
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
- return msgr->start(chained_dispatchers);
+ return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->start(*this);
});
}
};
msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return msgr->start(chained_dispatchers);
+ return msgr->start(*this);
}
};
};
return server->wait();
}).finally([client] {
logger().info("client shutdown...");
- client->msgr->remove_dispatcher(*client);
+ client->msgr->stop();
return client->msgr->shutdown();
}).finally([server] {
logger().info("server shutdown...");
- server->msgr->remove_dispatcher(*server);
+ server->msgr->stop();
return server->msgr->shutdown();
}).finally([server, client] {
logger().info("test_concurrent_dispatch() done!\n");
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
- return msgr->start(chained_dispatchers);
+ return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->start(*this);
});
}
entity_addr_t get_addr() const {
return msgr->get_myaddr();
}
seastar::future<> shutdown() {
- msgr->remove_dispatcher(*this);
+ msgr->stop();
return msgr->shutdown();
}
};
msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return msgr->start(chained_dispatchers);
+ return msgr->start(*this);
}
void send_pings(const entity_addr_t& addr) {
auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD);
});
}
seastar::future<> shutdown() {
- msgr->remove_dispatcher(*this);
+ msgr->stop();
return msgr->shutdown().then([this] {
stop_send = true;
return stopped_send_promise.get_future();
test_msgr->set_auth_client(&dummy_auth);
test_msgr->set_auth_server(&dummy_auth);
test_msgr->interceptor = &interceptor;
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return test_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
- return test_msgr->start(chained_dispatchers);
+ return test_msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return test_msgr->start(*this);
});
}
}
seastar::future<> shutdown() {
- test_msgr->remove_dispatcher(*this);
+ test_msgr->stop();
return test_msgr->shutdown();
}
cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0));
cmd_msgr->set_auth_client(&dummy_auth);
cmd_msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return cmd_msgr->start(chained_dispatchers).then([this, cmd_peer_addr] {
+ return cmd_msgr->start(*this).then([this, cmd_peer_addr] {
logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
return pingpong();
return cmd_conn->send(m).then([] {
return seastar::sleep(200ms);
}).finally([this] {
- cmd_msgr->remove_dispatcher(*this);
+ cmd_msgr->stop();
return cmd_msgr->shutdown();
});
}
peer_msgr->set_default_policy(policy);
peer_msgr->set_auth_client(&dummy_auth);
peer_msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- return peer_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
- chained_dispatchers->push_back(*this);
- return peer_msgr->start(chained_dispatchers);
+ return peer_msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return peer_msgr->start(*this);
});
}
: peer_msgr(peer_msgr), op_callback(op_callback) { }
seastar::future<> shutdown() {
- peer_msgr->remove_dispatcher(*this);
+ peer_msgr->stop();
return peer_msgr->shutdown();
}
cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
cmd_msgr->set_auth_client(&dummy_auth);
cmd_msgr->set_auth_server(&dummy_auth);
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then(
- [this, chained_dispatchers]() mutable {
- return cmd_msgr->start(chained_dispatchers);
+ return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
+ return cmd_msgr->start(*this);
}).handle_exception_type([cmd_peer_addr](const std::system_error& e) {
if (e.code() == std::errc::address_in_use) {
logger().error("FailoverTestPeer::init({}) "
msgr->set_crc_data();
}
return msgr->bind(entity_addrvec_t{addr}).then([this] {
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return msgr->start(chained_dispatchers);
+ return msgr->start(*this);
});
});
}
logger().info("{} shutdown...", lname);
return seastar::smp::submit_to(msgr_sid, [this] {
ceph_assert(msgr);
+ msgr->stop();
return msgr->shutdown();
});
}
}
seastar::future<> init(bool v1_crc_enabled) {
- auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
- chained_dispatchers->push_back(*this);
- return container().invoke_on_all([v1_crc_enabled, chained_dispatchers] (auto& client) mutable {
+ return container().invoke_on_all([v1_crc_enabled] (auto& client) {
if (client.is_active()) {
client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
client.msgr->set_crc_header();
client.msgr->set_crc_data();
}
- return client.msgr->start(chained_dispatchers);
+ return client.msgr->start(client);
}
return seastar::now();
});
if (client.is_active()) {
logger().info("{} shutdown...", client.lname);
ceph_assert(client.msgr);
+ client.msgr->stop();
return client.msgr->shutdown().then([&client] {
return client.stop_dispatch_messages();
});