return fut;
}
-seastar::future<> Client::ms_dispatch(crimson::net::Connection* conn,
- MessageRef m)
+std::tuple<bool, seastar::future<>>
+Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
{
- return gate.dispatch(__func__, *this, [this, conn, &m] {
+ bool dispatched = true;
+ gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
switch(m->get_type()) {
case MSG_MGR_MAP:
return handle_mgr_map(conn, boost::static_pointer_cast<MMgrMap>(m));
case MSG_MGR_CONFIGURE:
return handle_mgr_conf(conn, boost::static_pointer_cast<MMgrConfigure>(m));
default:
+ dispatched = false;
return seastar::now();
}
});
+ return {dispatched, seastar::now()};
}
void Client::ms_handle_connect(crimson::net::ConnectionRef c)
void report();
private:
- seastar::future<> ms_dispatch(crimson::net::Connection* conn,
- Ref<Message> m) override;
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* conn, Ref<Message> m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
void ms_handle_connect(crimson::net::ConnectionRef conn) final;
seastar::future<> handle_mgr_map(crimson::net::Connection* conn,
return !active_con;
}
-seastar::future<>
+std::tuple<bool, seastar::future<>>
Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
{
- return gate.dispatch(__func__, *this, [this, conn, &m] {
+ bool dispatched = true;
+ gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
// we only care about these message types
switch (m->get_type()) {
case CEPH_MSG_MON_MAP:
return handle_config(
boost::static_pointer_cast<MConfig>(m));
default:
+ dispatched = false;
return seastar::now();
}
});
+ return {dispatched, seastar::now()};
}
void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */)
private:
void tick();
- seastar::future<> ms_dispatch(crimson::net::Connection* conn,
- MessageRef m) override;
+ std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::Connection* conn,
+ MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
seastar::future<> handle_monmap(crimson::net::Connection* conn,
public:
virtual ~Dispatcher() {}
- virtual seastar::future<> ms_dispatch(Connection* conn, MessageRef m) {
- return seastar::make_ready_future<>();
+ // Dispatchers are put into a chain as described by chain-of-responsibility
+ // pattern. If any of the dispatchers claims this message, it returns true
+ // to prevent other dispatchers from processing it, and returns a future
+ // to throttle the connection if it's too busy. Else, it returns false and
+ // the second future is ignored.
+ virtual std::tuple<bool, seastar::future<>> ms_dispatch(Connection* conn, MessageRef m) {
+ return {false, seastar::now<>()};
}
+
virtual void ms_handle_accept(ConnectionRef conn) {}
virtual void ms_handle_connect(ConnectionRef conn) {}
ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn,
MessageRef m) {
try {
- return seastar::do_for_each(dispatchers, [conn, m](Dispatcher& dispatcher) {
- return dispatcher.ms_dispatch(conn, m);
- }).handle_exception([conn] (std::exception_ptr eptr) {
- logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
- *conn, eptr);
- ceph_abort();
- });
+ for (auto& dispatcher : dispatchers) {
+ auto [dispatched, throttle_future] = dispatcher.ms_dispatch(conn, m);
+ if (dispatched) {
+ return std::move(throttle_future
+ ).handle_exception([conn] (std::exception_ptr eptr) {
+ logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
+ *conn, eptr);
+ ceph_abort();
+ });
+ }
+ assert(throttle_future.available());
+ }
} catch (...) {
logger().error("{} got unexpected exception in ms_dispatch() {}",
*conn, std::current_exception());
ceph_abort();
- return seastar::now();
}
+ if (!dispatchers.empty()) {
+ logger().error("ms_dispatch unhandled message {}", *m);
+ }
+ return seastar::now();
}
void
using crimson::net::Dispatcher;
-// in existing Messenger, dispatchers are put into a chain as described by
-// chain-of-responsibility pattern. we could do the same to stop processing
-// the message once any of the dispatchers claims this message, and prevent
-// other dispatchers from reading it. but this change is more involved as
-// it requires changing the ms_ methods to return a bool. so as an intermediate
-// solution, we are using an observer dispatcher to notify all the interested
-// or unintersted parties.
class ChainedDispatchers {
boost::intrusive::slist<
Dispatcher,
peers.erase(peer);
}
-seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn,
- MessageRef m)
+std::tuple<bool, seastar::future<>>
+Heartbeat::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
{
- return gate.dispatch(__func__, *this, [this, conn, &m] {
+ bool dispatched = true;
+ gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
switch (m->get_type()) {
case MSG_OSD_PING:
return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
default:
+ dispatched = false;
return seastar::now();
}
});
+ return {dispatched, seastar::now()};
}
void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
void set_require_authorizer(bool);
// Dispatcher methods
- seastar::future<> ms_dispatch(crimson::net::Connection* conn,
- MessageRef m) override;
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* conn, MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
void ms_handle_connect(crimson::net::ConnectionRef conn) override;
void ms_handle_accept(crimson::net::ConnectionRef conn) override;
});
}
-seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
+std::tuple<bool, seastar::future<>>
+OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
{
- return gate.dispatch(__func__, *this, [this, conn, &m] {
- if (state.is_stopping()) {
- return seastar::now();
- }
+ if (state.is_stopping()) {
+ return {false, seastar::now()};
+ }
+ bool dispatched = true;
+ gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
switch (m->get_type()) {
case CEPH_MSG_OSD_MAP:
return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
case MSG_OSD_SCRUB2:
return handle_scrub(conn, boost::static_pointer_cast<MOSDScrub2>(m));
default:
- logger().info("ms_dispatch unhandled message {}", *m);
+ dispatched = false;
return seastar::now();
}
});
+ return {dispatched, seastar::now()};
}
void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
OSDSuperblock superblock;
// Dispatcher methods
- seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) final;
+ std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::Connection*, MessageRef) final;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
- seastar::future<> ms_dispatch(crimson::net::Connection* c,
- MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* c, MessageRef m) override {
if (verbose) {
logger().info("server got {}", *m);
}
// reply with a pong
- return c->send(make_message<MPing>());
+ std::ignore = c->send(make_message<MPing>());
+ return {true, seastar::now()};
}
seastar::future<> init(const entity_name_t& name,
ceph_assert(added);
session->connected_time = mono_clock::now();
}
- seastar::future<> ms_dispatch(crimson::net::Connection* c,
- MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* c, MessageRef m) override {
auto session = find_session(c);
++(session->count);
if (verbose) {
ceph_assert(found != pending_conns.end());
found->second.set_value();
}
- return seastar::now();
+ return {true, seastar::now()};
}
seastar::future<> init(const entity_name_t& name,
seastar::promise<> on_done; // satisfied when first dispatch unblocks
crimson::auth::DummyAuthClientServer dummy_auth;
- seastar::future<> ms_dispatch(crimson::net::Connection* c,
- MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* c, MessageRef m) override {
switch (++count) {
case 1:
// block on the first request until we reenter with the second
- return on_second.get_future().then([this] {
- on_done.set_value();
- });
+ std::ignore = on_second.get_future().then([this] { on_done.set_value(); });
+ break;
case 2:
on_second.set_value();
- return seastar::now();
+ break;
default:
throw std::runtime_error("unexpected count");
}
+ return {true, seastar::now()};
}
seastar::future<> wait() { return on_done.get_future(); }
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
- seastar::future<> ms_dispatch(crimson::net::Connection* c,
- MessageRef m) override {
- return c->send(make_message<MPing>());
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* c, MessageRef m) override {
+ std::ignore = c->send(make_message<MPing>());
+ return {true, seastar::now()};
}
public:
bool stop_send = false;
seastar::promise<> stopped_send_promise;
- seastar::future<> ms_dispatch(crimson::net::Connection* c,
- MessageRef m) override {
- return seastar::now();
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* c, MessageRef m) override {
+ return {true, seastar::now()};
}
public:
unsigned pending_peer_receive = 0;
unsigned pending_receive = 0;
- seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
auto result = interceptor.find_result(c->shared_from_this());
if (result == nullptr) {
logger().error("Untracked ms dispatched connection: {}", *c);
}
logger().info("[Test] got op, left {} ops -- [{}] {}",
pending_receive, result->index, *c);
- return seastar::now();
+ return {true, seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {
std::unique_ptr<FailoverSuite> test_suite;
- seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
switch (m->get_type()) {
case CEPH_MSG_PING:
ceph_assert(recv_pong);
recv_pong->set_value();
recv_pong = std::nullopt;
- return seastar::now();
+ break;
case MSG_COMMAND_REPLY:
ceph_assert(recv_cmdreply);
recv_cmdreply->set_value();
recv_cmdreply = std::nullopt;
- return seastar::now();
+ break;
case MSG_COMMAND: {
auto m_cmd = boost::static_pointer_cast<MCommand>(m);
ceph_assert(static_cast<cmd_t>(m_cmd->cmd[0][0]) == cmd_t::suite_recv_op);
ceph_assert(test_suite);
test_suite->notify_peer_reply();
- return seastar::now();
+ break;
}
default:
logger().error("{} got unexpected msg from cmd server: {}", *c, *m);
ceph_abort();
}
+ return {true, seastar::now()};
}
private:
ConnectionRef tracked_conn;
unsigned pending_send = 0;
- seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
logger().info("[TestPeer] got op from Test");
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
ceph_assert(tracked_conn == c->shared_from_this());
- return op_callback();
+ std::ignore = op_callback();
+ return {true, seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {
const entity_addr_t test_peer_addr;
std::unique_ptr<FailoverSuitePeer> test_suite;
- seastar::future<> ms_dispatch(Connection* c, MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
ceph_assert(cmd_conn == c->shared_from_this());
switch (m->get_type()) {
case CEPH_MSG_PING:
- return c->send(make_message<MPing>());
+ std::ignore = c->send(make_message<MPing>());
+ break;
case MSG_COMMAND: {
auto m_cmd = boost::static_pointer_cast<MCommand>(m);
auto cmd = static_cast<cmd_t>(m_cmd->cmd[0][0]);
if (cmd == cmd_t::shutdown) {
logger().info("CmdSrv shutdown...");
// forwarded to FailoverTestPeer::wait()
- cmd_msgr->remove_dispatcher(*this);
- (void) cmd_msgr->shutdown();
- return seastar::now();
+ cmd_msgr->stop();
+ std::ignore = cmd_msgr->shutdown();
+ } else {
+ std::ignore = handle_cmd(cmd, m_cmd).then([c] {
+ return c->send(make_message<MCommandReply>());
+ });
}
- return handle_cmd(cmd, m_cmd).then([c] {
- return c->send(make_message<MCommandReply>());
- });
+ break;
}
default:
logger().error("{} got unexpected msg from cmd client: {}", *c, m);
ceph_abort();
}
+ return {true, seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {
msg_data.append_zero(msg_len);
}
- seastar::future<> ms_dispatch(crimson::net::Connection* c,
- MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* c, MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
// server replies with MOSDOp to generate server-side write workload
bufferlist data(msg_data);
rep->write(0, msg_len, data);
rep->set_tid(m->get_tid());
- return c->send(std::move(rep));
+ std::ignore = c->send(std::move(rep));
+ return {true, seastar::now()};
}
seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) {
void ms_handle_connect(crimson::net::ConnectionRef conn) override {
conn_stats.connected_time = mono_clock::now();
}
- seastar::future<> ms_dispatch(crimson::net::Connection* c,
- MessageRef m) override {
+ std::tuple<bool, seastar::future<>> ms_dispatch(
+ crimson::net::Connection* c, MessageRef m) override {
// server replies with MOSDOp to generate server-side write workload
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
++(conn_stats.received_count);
depth.signal(1);
- return seastar::now();
+ return {true, seastar::now()};
}
// should start messenger at this shard?