auto gate_closed = gate.close();
if (dispatch_reset) {
- try {
- dispatcher->ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
- is_replace);
- } catch (...) {
- logger().error("{} got unexpected exception in ms_handle_reset() {}",
- conn, std::current_exception());
- }
+ dispatcher->ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
}
// asynchronous operations
return;
}
- // start dispatch, ignoring exceptions from the application layer
- gate.dispatch_in_background("ms_dispatch", *this, [this, msg = std::move(msg_ref)] {
- logger().debug("{} <== #{} === {} ({})",
- conn, msg->get_seq(), *msg, msg->get_type());
- return dispatcher->ms_dispatch(&conn, std::move(msg));
- });
+ logger().debug("{} <== #{} === {} ({})",
+ conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type());
+ std::ignore = dispatcher->ms_dispatch(&conn, std::move(msg_ref));
});
}
set_write_state(write_state_t::open);
if (type == open_t::connected) {
- gate.dispatch_in_background("ms_handle_connect", *this, [this] {
- return dispatcher->ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- });
+ dispatcher->ms_handle_connect(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
} else { // type == open_t::accepted
- gate.dispatch_in_background("ms_handle_accept", *this, [this] {
- return dispatcher->ms_handle_accept(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
- });
+ dispatcher->ms_handle_accept(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
}
gate.dispatch_in_background("execute_open", *this, [this] {
+#include "crimson/common/log.h"
#include "crimson/net/chained_dispatchers.h"
#include "crimson/net/Connection.h"
#include "msg/Message.h"
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+}
+
seastar::future<>
ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn,
MessageRef m) {
- return seastar::do_for_each(dispatchers, [conn, m](Dispatcher& dispatcher) {
- return dispatcher.ms_dispatch(conn, m);
- });
+ try {
+ return seastar::do_for_each(dispatchers, [conn, m](Dispatcher& dispatcher) {
+ return dispatcher.ms_dispatch(conn, m);
+ }).handle_exception([conn] (std::exception_ptr eptr) {
+ logger().error("{} got unexpected exception in ms_dispatch() throttling {}",
+ *conn, eptr);
+ ceph_abort();
+ });
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_dispatch() {}",
+ *conn, std::current_exception());
+ ceph_abort();
+ return seastar::now();
+ }
}
void
ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
- for (auto& dispatcher : dispatchers) {
- dispatcher.ms_handle_accept(conn);
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher.ms_handle_accept(conn);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_accept() {}",
+ *conn, std::current_exception());
+ ceph_abort();
}
}
void
ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
- for(auto& dispatcher : dispatchers) {
- dispatcher.ms_handle_connect(conn);
+ try {
+ for(auto& dispatcher : dispatchers) {
+ dispatcher.ms_handle_connect(conn);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_connect() {}",
+ *conn, std::current_exception());
+ ceph_abort();
}
}
void
ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
- for (auto& dispatcher : dispatchers) {
- dispatcher.ms_handle_reset(conn, is_replace);
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher.ms_handle_reset(conn, is_replace);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_reset() {}",
+ *conn, std::current_exception());
+ ceph_abort();
}
}
void
ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
- for (auto& dispatcher : dispatchers) {
- dispatcher.ms_handle_remote_reset(conn);
+ try {
+ for (auto& dispatcher : dispatchers) {
+ dispatcher.ms_handle_remote_reset(conn);
+ }
+ } catch (...) {
+ logger().error("{} got unexpected exception in ms_handle_remote_reset() {}",
+ *conn, std::current_exception());
+ ceph_abort();
}
}