fut.forward_to(std::move(h.promise));
}).then([this] {
// notify the dispatcher and allow them to reject the connection
- return seastar::with_gate(messenger.pending_dispatch, [this] {
- return dispatcher.ms_handle_connect(this);
- });
+ return dispatcher.ms_handle_connect(this);
}).then([this] {
execute_open();
}).handle_exception([this] (std::exception_ptr eptr) {
fut.forward_to(std::move(h.promise));
}).then([this] {
// notify the dispatcher and allow them to reject the connection
- return seastar::with_gate(messenger.pending_dispatch, [=] {
- return dispatcher.ms_handle_accept(this);
- });
+ return dispatcher.ms_handle_accept(this);
}).then([this] {
messenger.register_conn(this);
messenger.unaccept_conn(this);
return read_message()
.then([this] (MessageRef msg) {
// start dispatch, ignoring exceptions from the application layer
- seastar::with_gate(messenger.pending_dispatch, [this, msg = std::move(msg)] {
+ seastar::with_gate(pending_dispatch, [this, msg = std::move(msg)] {
return dispatcher.ms_dispatch(this, std::move(msg))
.handle_exception([] (std::exception_ptr eptr) {});
});
}).handle_exception_type([this] (const std::system_error& e) {
if (e.code() == error::connection_aborted ||
e.code() == error::connection_reset) {
- return seastar::with_gate(messenger.pending_dispatch, [this] {
- return dispatcher.ms_handle_reset(this);
- });
+ return dispatcher.ms_handle_reset(this);
} else if (e.code() == error::read_eof) {
- return seastar::with_gate(messenger.pending_dispatch, [this] {
- return dispatcher.ms_handle_remote_reset(this);
- });
+ return dispatcher.ms_handle_remote_reset(this);
} else {
throw e;
}
});
}).finally([this] {
ceph_assert(connections.empty());
- // closing connections will unblock any dispatchers that were waiting to
- // send(). wait for any pending calls to finish
- return pending_dispatch.close();
});
}
auth_proto_t protocol,
bufferlist& auth)
{
- return seastar::with_gate(pending_dispatch, [=, &auth] {
- return dispatcher->ms_verify_authorizer(peer_type, protocol, auth);
- });
+ return dispatcher->ms_verify_authorizer(peer_type, protocol, auth);
}
seastar::future<std::unique_ptr<AuthAuthorizer>>
SocketMessenger::get_authorizer(peer_type_t peer_type, bool force_new)
{
- return seastar::with_gate(pending_dispatch, [=] {
- return dispatcher->ms_get_authorizer(peer_type, force_new);
- });
+ return dispatcher->ms_get_authorizer(peer_type, force_new);
}
bool force_new) override;
public:
- // TODO: change to per-connection messenger gate
- seastar::gate pending_dispatch;
-
void set_default_policy(const SocketPolicy& p);
void set_policy(entity_type_t peer_type, const SocketPolicy& p);
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);