entity_addr_t addr;
std::optional<seastar::server_socket> listener;
seastar::gate shutdown_gate;
+ using accept_func_t =
+ std::function<seastar::future<>(SocketRef, entity_addr_t)>;
+ accept_func_t fn_accept;
using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
std::unique_ptr<sharded_service_t> service;
listen_ertr::future<> listen(entity_addr_t addr);
- // fn_accept should be a nothrow function of type
- // seastar::future<>(SocketRef, entity_addr_t)
- template <typename Func>
- seastar::future<> accept(Func&& fn_accept) {
+ seastar::future<> accept(accept_func_t &&_fn_accept) {
assert(seastar::this_shard_id() == cpu);
- logger().trace("FixedCPUServerSocket({})::accept()...", addr);
- return container().invoke_on_all(
- [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
+ logger().debug("FixedCPUServerSocket({})::accept()...", addr);
+ return container().invoke_on_all([_fn_accept](auto &ss) {
assert(ss.listener);
+ ss.fn_accept = _fn_accept;
// gate accepting
// FixedCPUServerSocket::shutdown() will drain the continuations in the gate
// so ignore the returned future
- std::ignore = seastar::with_gate(ss.shutdown_gate,
- [&ss, fn_accept = std::move(fn_accept)] () mutable {
- return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
- return ss.listener->accept().then(
- [&ss, fn_accept = std::move(fn_accept)]
- (seastar::accept_result accept_result) mutable {
+ std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
+ return seastar::keep_doing([&ss] {
+ return ss.listener->accept(
+ ).then([&ss](seastar::accept_result accept_result) {
// assert seastar::listen_options::set_fixed_cpu() works
assert(seastar::this_shard_id() == ss.cpu);
auto [socket, paddr] = std::move(accept_result);
SocketRef _socket = std::make_unique<Socket>(
std::move(socket), Socket::side_t::acceptor,
peer_addr.get_port(), Socket::construct_tag{});
- std::ignore = seastar::with_gate(ss.shutdown_gate,
- [socket = std::move(_socket), peer_addr,
- &ss, fn_accept = std::move(fn_accept)] () mutable {
- logger().trace("FixedCPUServerSocket({})::accept(): "
- "accepted peer {}", ss.addr, peer_addr);
- return fn_accept(std::move(socket), peer_addr
- ).handle_exception([&ss, peer_addr] (auto eptr) {
+ logger().debug("FixedCPUServerSocket({})::accept(): "
+ "accepted peer {}, socket {}",
+ ss.addr, peer_addr, fmt::ptr(_socket));
+ std::ignore = seastar::with_gate(
+ ss.shutdown_gate,
+ [socket=std::move(_socket), peer_addr, &ss]() mutable {
+ return ss.fn_accept(std::move(socket), peer_addr
+ ).handle_exception([&ss, peer_addr](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
logger().error("FixedCPUServerSocket({})::accept(): "
"fn_accept(s, {}) got unexpected exception {}",
- ss.addr, peer_addr, eptr);
+ ss.addr, peer_addr, e_what);
ceph_abort();
});
});
});
- }).handle_exception_type([&ss] (const std::system_error& e) {
+ }).handle_exception_type([&ss](const std::system_error& e) {
if (e.code() == std::errc::connection_aborted ||
e.code() == std::errc::invalid_argument) {
- logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
- ss.addr, e);
+ logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})",
+ ss.addr, e.what());
} else {
throw;
}
- }).handle_exception([&ss] (auto eptr) {
+ }).handle_exception([&ss](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
logger().error("FixedCPUServerSocket({})::accept(): "
- "got unexpected exception {}", ss.addr, eptr);
+ "got unexpected exception {}", ss.addr, e_what);
ceph_abort();
});
});