return seastar::keep_doing([=] {
return conn->read_message()
.then([=] (MessageRef msg) {
- if (msg) {
- return dispatcher->ms_dispatch(conn, std::move(msg));
- } else {
- return seastar::now();
- }
+ // start dispatch, ignoring exceptions from the application layer
+ seastar::with_gate(pending_dispatch, [=, msg = std::move(msg)] {
+ return dispatcher->ms_dispatch(conn, std::move(msg))
+ .handle_exception([] (std::exception_ptr eptr) {});
+ });
+ // return immediately to start on the next message
+ return seastar::now();
});
}).handle_exception_type([=] (const std::system_error& e) {
if (e.code() == error::connection_aborted ||
if (listener) {
listener->abort_accept();
}
+ // close all connections
return seastar::parallel_for_each(connections.begin(), connections.end(),
[this] (auto conn) {
return conn.second->close();
- }).finally([this] { connections.clear(); });
+ }).finally([this] {
+ connections.clear();
+ // closing connections will unblock any dispatchers that were waiting to
+ // send(). wait for any pending calls to finish
+ return pending_dispatch.close();
+ });
}
void SocketMessenger::set_default_policy(const SocketPolicy& p)
#include <map>
#include <optional>
+#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include "msg/Policy.h"
std::map<entity_addr_t, ConnectionRef> connections;
using Throttle = ceph::thread::Throttle;
ceph::net::PolicySet<Throttle> policy_set;
+ seastar::gate pending_dispatch;
seastar::future<> dispatch(ConnectionRef conn);