return fut;
}
-std::tuple<bool, seastar::future<>>
+std::optional<seastar::future<>>
Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
bool dispatched = true;
return seastar::now();
}
});
- return {dispatched, seastar::now()};
+ return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
void Client::ms_handle_connect(crimson::net::ConnectionRef c)
void report();
private:
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef conn, Ref<Message> m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
void ms_handle_connect(crimson::net::ConnectionRef conn) final;
return !active_con;
}
-std::tuple<bool, seastar::future<>>
+std::optional<seastar::future<>>
Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
bool dispatched = true;
return seastar::now();
}
});
- return {dispatched, seastar::now()};
+ return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */)
private:
void tick();
- std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::ConnectionRef conn,
- MessageRef m) override;
+ std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef conn,
+ MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
seastar::future<> handle_monmap(crimson::net::ConnectionRef conn,
virtual ~Dispatcher() {}
// Dispatchers are put into a chain as described by chain-of-responsibility
- // pattern. If any of the dispatchers claims this message, it returns true
- // to prevent other dispatchers from processing it, and returns a future
- // to throttle the connection if it's too busy. Else, it returns false and
- // the second future is ignored.
- virtual std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
+ // pattern. If any of the dispatchers claims this message, it returns a valid
+ // future to prevent other dispatchers from processing it, and this is also
+ // used to throttle the connection if it's too busy.
+ virtual std::optional<seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
virtual void ms_handle_accept(ConnectionRef conn) {}
MessageRef m) {
try {
for (auto& dispatcher : dispatchers) {
- auto [dispatched, throttle_future] = dispatcher->ms_dispatch(conn, m);
- if (dispatched) {
- return std::move(throttle_future
+ auto dispatched = dispatcher->ms_dispatch(conn, m);
+ if (dispatched.has_value()) {
+ return std::move(*dispatched
).handle_exception([conn] (std::exception_ptr eptr) {
logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
*conn, eptr);
ceph_abort();
});
}
- assert(throttle_future.available());
}
} catch (...) {
logger().error("{} got unexpected exception in ms_dispatch() {}",
peers.erase(peer);
}
-std::tuple<bool, seastar::future<>>
+std::optional<seastar::future<>>
Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
bool dispatched = true;
return seastar::now();
}
});
- return {dispatched, seastar::now()};
+ return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
void set_require_authorizer(bool);
// Dispatcher methods
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef conn, MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
void ms_handle_connect(crimson::net::ConnectionRef conn) override;
});
}
-std::tuple<bool, seastar::future<>>
+std::optional<seastar::future<>>
OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
if (state.is_stopping()) {
- return {false, seastar::now()};
+ return {};
}
bool dispatched = true;
gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
return seastar::now();
}
});
- return {dispatched, seastar::now()};
+ return (dispatched ? std::make_optional(seastar::now()) : std::nullopt);
}
void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
OSDSuperblock superblock;
// Dispatcher methods
- std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final;
+ std::optional<seastar::future<>> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
if (verbose) {
logger().info("server got {}", *m);
}
// reply with a pong
std::ignore = c->send(make_message<MPing>());
- return {true, seastar::now()};
+ return {seastar::now()};
}
seastar::future<> init(const entity_name_t& name,
ceph_assert(added);
session->connected_time = mono_clock::now();
}
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
auto session = find_session(c);
++(session->count);
ceph_assert(found != pending_conns.end());
found->second.set_value();
}
- return {true, seastar::now()};
+ return {seastar::now()};
}
seastar::future<> init(const entity_name_t& name,
seastar::promise<> on_done; // satisfied when first dispatch unblocks
crimson::auth::DummyAuthClientServer dummy_auth;
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
switch (++count) {
case 1:
default:
throw std::runtime_error("unexpected count");
}
- return {true, seastar::now()};
+ return {seastar::now()};
}
seastar::future<> wait() { return on_done.get_future(); }
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
- return {true, seastar::now()};
+ return {seastar::now()};
}
seastar::future<> init(const entity_name_t& name,
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
std::ignore = c->send(make_message<MPing>());
- return {true, seastar::now()};
+ return {seastar::now()};
}
public:
bool stop_send = false;
seastar::promise<> stopped_send_promise;
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
- return {true, seastar::now()};
+ return {seastar::now()};
}
public:
unsigned pending_peer_receive = 0;
unsigned pending_receive = 0;
- std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
auto result = interceptor.find_result(c);
if (result == nullptr) {
logger().error("Untracked ms dispatched connection: {}", *c);
}
logger().info("[Test] got op, left {} ops -- [{}] {}",
pending_receive, result->index, *c);
- return {true, seastar::now()};
+ return {seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {
std::unique_ptr<FailoverSuite> test_suite;
- std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
switch (m->get_type()) {
case CEPH_MSG_PING:
ceph_assert(recv_pong);
logger().error("{} got unexpected msg from cmd server: {}", *c, *m);
ceph_abort();
}
- return {true, seastar::now()};
+ return {seastar::now()};
}
private:
ConnectionRef tracked_conn;
unsigned pending_send = 0;
- std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
logger().info("[TestPeer] got op from Test");
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
ceph_assert(tracked_conn == c);
std::ignore = op_callback();
- return {true, seastar::now()};
+ return {seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {
const entity_addr_t test_peer_addr;
std::unique_ptr<FailoverSuitePeer> test_suite;
- std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+ std::optional<seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
ceph_assert(cmd_conn == c);
switch (m->get_type()) {
case CEPH_MSG_PING:
logger().error("{} got unexpected msg from cmd client: {}", *c, m);
ceph_abort();
}
- return {true, seastar::now()};
+ return {seastar::now()};
}
void ms_handle_accept(ConnectionRef conn) override {
msg_data.append_zero(msg_len);
}
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
rep->write(0, msg_len, data);
rep->set_tid(m->get_tid());
std::ignore = c->send(std::move(rep));
- return {true, seastar::now()};
+ return {seastar::now()};
}
seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) {
void ms_handle_connect(crimson::net::ConnectionRef conn) override {
conn_stats.connected_time = mono_clock::now();
}
- std::tuple<bool, seastar::future<>> ms_dispatch(
+ std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
// server replies with MOSDOp to generate server-side write workload
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
++(conn_stats.received_count);
depth.signal(1);
- return {true, seastar::now()};
+ return {seastar::now()};
}
// should start messenger at this shard?