return crimson::get_logger(ceph_subsys_ms);
}
+using tmp_buf = Socket::tmp_buf;
+using packet = Socket::packet;
+
// an input_stream consumer that reads buffer segments into a bufferlist up to
// the given number of remaining bytes
struct bufferlist_consumer {
bufferlist_consumer(bufferlist& bl, size_t& remaining)
: bl(bl), remaining(remaining) {}
- using tmp_buf = seastar::temporary_buffer<char>;
using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
// consume some or all of a buffer segment
};
};
+seastar::future<> inject_delay()
+{
+ if (float delay_period = local_conf()->ms_inject_internal_delays;
+ delay_period) {
+ logger().debug("Socket::inject_delay: sleep for {}", delay_period);
+ return seastar::sleep(
+ std::chrono::milliseconds((int)(delay_period * 1000.0)));
+ }
+ return seastar::now();
+}
+
+void inject_failure()
+{
+ if (local_conf()->ms_inject_socket_failures) {
+ uint64_t rand =
+ ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
+ if (rand % local_conf()->ms_inject_socket_failures == 0) {
+ logger().warn("Socket::inject_failure: injecting socket failure");
+ throw std::system_error(make_error_code(
+ error::negotiation_failure));
+ }
+ }
+}
+
} // anonymous namespace
-seastar::future<bufferlist> Socket::read(size_t bytes)
+Socket::Socket(
+ seastar::connected_socket &&_socket,
+ side_t _side,
+ uint16_t e_port,
+ construct_tag)
+ : sid{seastar::this_shard_id()},
+ socket(std::move(_socket)),
+ in(socket.input()),
+ // the default buffer size 8192 is too small that may impact our write
+ // performance. see seastar::net::connected_socket::output()
+ out(socket.output(65536)),
+ socket_is_shutdown(false),
+ side(_side),
+ ephemeral_port(e_port)
+{
+}
+
+Socket::~Socket()
+{
+#ifndef NDEBUG
+ assert(closed);
+#endif
+}
+
+seastar::future<bufferlist>
+Socket::read(size_t bytes)
{
#ifdef UNIT_TESTS_BUILT
return try_trap_pre(next_trap_read).then([bytes, this] {
});
});
#ifdef UNIT_TESTS_BUILT
- }).then([this] (auto buf) {
+ }).then([this](auto buf) {
return try_trap_post(next_trap_read
- ).then([buf = std::move(buf)] () mutable {
+ ).then([buf = std::move(buf)]() mutable {
return std::move(buf);
});
});
}
inject_failure();
return inject_delay(
- ).then([buf = std::move(buf)] () mutable {
+ ).then([buf = std::move(buf)]() mutable {
return seastar::make_ready_future<tmp_buf>(std::move(buf));
});
});
#ifdef UNIT_TESTS_BUILT
- }).then([this] (auto buf) {
+ }).then([this](auto buf) {
return try_trap_post(next_trap_read
- ).then([buf = std::move(buf)] () mutable {
+ ).then([buf = std::move(buf)]() mutable {
return std::move(buf);
});
});
#endif
}
+seastar::future<>
+Socket::write(packet &&buf)
+{
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write
+ ).then([buf = std::move(buf), this]() mutable {
+#endif
+ inject_failure();
+ return inject_delay(
+ ).then([buf = std::move(buf), this]() mutable {
+ return out.write(std::move(buf));
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+}
+
+seastar::future<>
+Socket::flush()
+{
+ inject_failure();
+ return inject_delay().then([this] {
+ return out.flush();
+ });
+}
+
+seastar::future<>
+Socket::write_flush(packet &&buf)
+{
+#ifdef UNIT_TESTS_BUILT
+ return try_trap_pre(next_trap_write
+ ).then([buf = std::move(buf), this]() mutable {
+#endif
+ inject_failure();
+ return inject_delay(
+ ).then([buf = std::move(buf), this]() mutable {
+ return out.write(std::move(buf)
+ ).then([this] {
+ return out.flush();
+ });
+ });
+#ifdef UNIT_TESTS_BUILT
+ }).then([this] {
+ return try_trap_post(next_trap_write);
+ });
+#endif
+}
+
void Socket::shutdown() {
socket_is_shutdown = true;
socket.shutdown_input();
static inline seastar::future<>
close_and_handle_errors(seastar::output_stream<char>& out)
{
- return out.close().handle_exception_type([] (const std::system_error& e) {
+ return out.close().handle_exception_type([](const std::system_error& e) {
if (e.code() != std::errc::broken_pipe &&
e.code() != std::errc::connection_reset) {
- logger().error("Socket::close(): unexpected error {}", e);
+ logger().error("Socket::close(): unexpected error {}", e.what());
ceph_abort();
}
// can happen when out is already shutdown, ignore
});
}
-seastar::future<> Socket::close() {
+seastar::future<>
+Socket::close() {
#ifndef NDEBUG
ceph_assert(!closed);
closed = true;
close_and_handle_errors(out)
).then_unpack([] {
return seastar::make_ready_future<>();
- }).handle_exception([] (auto eptr) {
- logger().error("Socket::close(): unexpected exception {}", eptr);
+ }).handle_exception([](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("Socket::close(): unexpected exception {}", e_what);
ceph_abort();
});
}
-seastar::future<> Socket::inject_delay () {
- if (float delay_period = local_conf()->ms_inject_internal_delays;
- delay_period) {
- logger().debug("Socket::inject_delay: sleep for {}", delay_period);
- return seastar::sleep(
- std::chrono::milliseconds((int)(delay_period * 1000.0)));
- }
- return seastar::now();
+seastar::future<SocketRef>
+Socket::connect(const entity_addr_t &peer_addr)
+{
+ inject_failure();
+ return inject_delay(
+ ).then([peer_addr] {
+ return seastar::connect(peer_addr.in4_addr());
+ }).then([peer_addr](seastar::connected_socket socket) {
+ auto ret = std::make_unique<Socket>(
+ std::move(socket), side_t::connector, 0, construct_tag{});
+ logger().debug("Socket::connect(): connected to {}, socket {}",
+ peer_addr, fmt::ptr(ret));
+ return ret;
+ });
}
-void Socket::inject_failure()
-{
- if (local_conf()->ms_inject_socket_failures) {
- uint64_t rand =
- ceph::util::generate_random_number<uint64_t>(1, RAND_MAX);
- if (rand % local_conf()->ms_inject_socket_failures == 0) {
- if (true) {
- logger().warn("Socket::inject_failure: injecting socket failure");
- throw std::system_error(make_error_code(
- crimson::net::error::negotiation_failure));
- }
+#ifdef UNIT_TESTS_BUILT
+void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
+ blocker = blocker_;
+ if (type == bp_type_t::READ) {
+ ceph_assert(next_trap_read == bp_action_t::CONTINUE);
+ next_trap_read = action;
+ } else { // type == bp_type_t::WRITE
+ if (next_trap_write == bp_action_t::CONTINUE) {
+ next_trap_write = action;
+ } else if (next_trap_write == bp_action_t::FAULT) {
+ // do_sweep_messages() may combine multiple write events into one socket write
+ ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
+ } else {
+ ceph_abort();
}
}
}
-#ifdef UNIT_TESTS_BUILT
-seastar::future<> Socket::try_trap_pre(bp_action_t& trap) {
+seastar::future<>
+Socket::try_trap_pre(bp_action_t& trap) {
auto action = trap;
trap = bp_action_t::CONTINUE;
switch (action) {
break;
case bp_action_t::FAULT:
logger().info("[Test] got FAULT");
- throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
+ throw std::system_error(make_error_code(error::negotiation_failure));
case bp_action_t::BLOCK:
logger().info("[Test] got BLOCK");
return blocker->block();
return seastar::make_ready_future<>();
}
-seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
+seastar::future<>
+Socket::try_trap_post(bp_action_t& trap) {
auto action = trap;
trap = bp_action_t::CONTINUE;
switch (action) {
}
return seastar::make_ready_future<>();
}
+#endif
-void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
- blocker = blocker_;
- if (type == bp_type_t::READ) {
- ceph_assert(next_trap_read == bp_action_t::CONTINUE);
- next_trap_read = action;
- } else { // type == bp_type_t::WRITE
- if (next_trap_write == bp_action_t::CONTINUE) {
- next_trap_write = action;
- } else if (next_trap_write == bp_action_t::FAULT) {
- // do_sweep_messages() may combine multiple write events into one socket write
- ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
- } else {
- ceph_abort();
- }
- }
+FixedCPUServerSocket::FixedCPUServerSocket(
+ seastar::shard_id cpu,
+ construct_tag)
+ : fixed_cpu{cpu}
+{
}
-#endif
-crimson::net::listen_ertr::future<>
+FixedCPUServerSocket::~FixedCPUServerSocket()
+{
+ assert(!listener);
+ // detect whether user have called destroy() properly
+ ceph_assert(!service);
+}
+
+listen_ertr::future<>
FixedCPUServerSocket::listen(entity_addr_t addr)
{
- assert(seastar::this_shard_id() == cpu);
- logger().trace("FixedCPUServerSocket::listen({})...", addr);
- return container().invoke_on_all([addr] (auto& ss) {
- ss.addr = addr;
+ assert(seastar::this_shard_id() == fixed_cpu);
+ logger().debug("FixedCPUServerSocket({})::listen()...", addr);
+ return container().invoke_on_all([addr](auto& ss) {
+ ss.listen_addr = addr;
seastar::socket_address s_addr(addr.in4_addr());
seastar::listen_options lo;
lo.reuse_address = true;
- lo.set_fixed_cpu(ss.cpu);
+ lo.set_fixed_cpu(ss.fixed_cpu);
ss.listener = seastar::listen(s_addr, lo);
}).then([] {
return listen_ertr::now();
}).handle_exception_type(
- [addr] (const std::system_error& e) -> listen_ertr::future<> {
+ [addr](const std::system_error& e) -> listen_ertr::future<> {
if (e.code() == std::errc::address_in_use) {
- logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
+ logger().debug("FixedCPUServerSocket({})::listen(): address in use", addr);
return crimson::ct_error::address_in_use::make();
} else if (e.code() == std::errc::address_not_available) {
- logger().trace("FixedCPUServerSocket::listen({}): address not available",
+ logger().debug("FixedCPUServerSocket({})::listen(): address not available",
addr);
return crimson::ct_error::address_not_available::make();
}
- logger().error("FixedCPUServerSocket::listen({}): "
- "got unexpeted error {}", addr, e);
+ logger().error("FixedCPUServerSocket({})::listen(): "
+ "got unexpeted error {}", addr, e.what());
ceph_abort();
});
}
-seastar::future<> FixedCPUServerSocket::shutdown()
+seastar::future<>
+FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
+{
+ assert(seastar::this_shard_id() == fixed_cpu);
+ logger().debug("FixedCPUServerSocket({})::accept()...", listen_addr);
+ return container().invoke_on_all([_fn_accept](auto &ss) {
+ assert(ss.listener);
+ ss.fn_accept = _fn_accept;
+ // gate accepting
+ // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
+ // so ignore the returned future
+ std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
+ return seastar::keep_doing([&ss] {
+ return ss.listener->accept(
+ ).then([&ss](seastar::accept_result accept_result) {
+ // assert seastar::listen_options::set_fixed_cpu() works
+ assert(seastar::this_shard_id() == ss.fixed_cpu);
+ auto [socket, paddr] = std::move(accept_result);
+ entity_addr_t peer_addr;
+ peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+ peer_addr.set_type(ss.listen_addr.get_type());
+ SocketRef _socket = std::make_unique<Socket>(
+ std::move(socket), Socket::side_t::acceptor,
+ peer_addr.get_port(), Socket::construct_tag{});
+ logger().debug("FixedCPUServerSocket({})::accept(): "
+ "accepted peer {}, socket {}",
+ ss.listen_addr, peer_addr, fmt::ptr(_socket));
+ std::ignore = seastar::with_gate(
+ ss.shutdown_gate,
+ [socket=std::move(_socket), peer_addr, &ss]() mutable {
+ return ss.fn_accept(std::move(socket), peer_addr
+ ).handle_exception([&ss, peer_addr](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("FixedCPUServerSocket({})::accept(): "
+ "fn_accept(s, {}) got unexpected exception {}",
+ ss.listen_addr, peer_addr, e_what);
+ ceph_abort();
+ });
+ });
+ });
+ }).handle_exception_type([&ss](const std::system_error& e) {
+ if (e.code() == std::errc::connection_aborted ||
+ e.code() == std::errc::invalid_argument) {
+ logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})",
+ ss.listen_addr, e.what());
+ } else {
+ throw;
+ }
+ }).handle_exception([&ss](auto eptr) {
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+ logger().error("FixedCPUServerSocket({})::accept(): "
+ "got unexpected exception {}", ss.listen_addr, e_what);
+ ceph_abort();
+ });
+ });
+ });
+}
+
+seastar::future<>
+FixedCPUServerSocket::shutdown_destroy()
{
- assert(seastar::this_shard_id() == cpu);
- logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
- return container().invoke_on_all([] (auto& ss) {
+ assert(seastar::this_shard_id() == fixed_cpu);
+ logger().debug("FixedCPUServerSocket({})::shutdown_destroy()...", listen_addr);
+ // shutdown shards
+ return container().invoke_on_all([](auto& ss) {
if (ss.listener) {
ss.listener->abort_accept();
}
return ss.shutdown_gate.close();
}).then([this] {
- return reset();
- });
-}
-
-seastar::future<> FixedCPUServerSocket::destroy()
-{
- assert(seastar::this_shard_id() == cpu);
- return shutdown().then([this] {
- // we should only construct/stop shards on #0
- return container().invoke_on(0, [] (auto& ss) {
+ // destroy shards
+ return container().invoke_on_all([](auto& ss) {
+ assert(ss.shutdown_gate.is_closed());
+ ss.listen_addr = entity_addr_t();
+ ss.listener.reset();
+ });
+ }).then([this] {
+ // stop the sharded service: we should only construct/stop shards on #0
+ return container().invoke_on(0, [](auto& ss) {
assert(ss.service);
return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
});
});
}
-seastar::future<FixedCPUServerSocket*> FixedCPUServerSocket::create()
+seastar::future<FixedCPUServerSocket*>
+FixedCPUServerSocket::create()
{
- auto cpu = seastar::this_shard_id();
- // we should only construct/stop shards on #0
- return seastar::smp::submit_to(0, [cpu] {
+ auto fixed_cpu = seastar::this_shard_id();
+ // start the sharded service: we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [fixed_cpu] {
auto service = std::make_unique<sharded_service_t>();
- return service->start(cpu, construct_tag{}
- ).then([service = std::move(service)] () mutable {
+ return service->start(fixed_cpu, construct_tag{}
+ ).then([service = std::move(service)]() mutable {
auto p_shard = service.get();
p_shard->local().service = std::move(service);
return p_shard;
});
- }).then([] (auto p_shard) {
+ }).then([](auto p_shard) {
return &p_shard->local();
});
}
class Socket;
using SocketRef = std::unique_ptr<Socket>;
-class Socket
-{
+class Socket {
struct construct_tag {};
- public:
+public:
// if acceptor side, peer is using a different port (ephemeral_port)
// if connector side, I'm using a different port (ephemeral_port)
enum class side_t {
acceptor,
connector
};
+ Socket(seastar::connected_socket &&, side_t, uint16_t e_port, construct_tag);
- Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag)
- : sid{seastar::this_shard_id()},
- socket(std::move(_socket)),
- in(socket.input()),
- // the default buffer size 8192 is too small that may impact our write
- // performance. see seastar::net::connected_socket::output()
- out(socket.output(65536)),
- socket_is_shutdown(false),
- side(_side),
- ephemeral_port(e_port) {}
-
- ~Socket() {
-#ifndef NDEBUG
- assert(closed);
-#endif
- }
+ ~Socket();
Socket(Socket&& o) = delete;
- static seastar::future<SocketRef>
- connect(const entity_addr_t& peer_addr) {
- inject_failure();
- return inject_delay(
- ).then([peer_addr] {
- return seastar::connect(peer_addr.in4_addr());
- }).then([] (seastar::connected_socket socket) {
- return std::make_unique<Socket>(
- std::move(socket), side_t::connector, 0, construct_tag{});
- });
+ side_t get_side() const {
+ return side;
+ }
+
+ uint16_t get_ephemeral_port() const {
+ return ephemeral_port;
+ }
+
+ seastar::socket_address get_local_address() const {
+ return socket.local_address();
+ }
+
+ bool is_shutdown() const {
+ return socket_is_shutdown;
+ }
+
+ // learn my ephemeral_port as connector.
+ // unfortunately, there's no way to identify which port I'm using as
+ // connector with current seastar interface.
+ void learn_ephemeral_port_as_connector(uint16_t port) {
+ assert(side == side_t::connector &&
+ (ephemeral_port == 0 || ephemeral_port == port));
+ ephemeral_port = port;
}
/// read the requested number of bytes into a bufferlist
seastar::future<bufferlist> read(size_t bytes);
+
using tmp_buf = seastar::temporary_buffer<char>;
using packet = seastar::net::packet;
seastar::future<tmp_buf> read_exactly(size_t bytes);
- seastar::future<> write(packet&& buf) {
-#ifdef UNIT_TESTS_BUILT
- return try_trap_pre(next_trap_write
- ).then([buf = std::move(buf), this] () mutable {
-#endif
- inject_failure();
- return inject_delay(
- ).then([buf = std::move(buf), this] () mutable {
- return out.write(std::move(buf));
- });
-#ifdef UNIT_TESTS_BUILT
- }).then([this] {
- return try_trap_post(next_trap_write);
- });
-#endif
- }
- seastar::future<> flush() {
- inject_failure();
- return inject_delay().then([this] {
- return out.flush();
- });
- }
- seastar::future<> write_flush(packet&& buf) {
-#ifdef UNIT_TESTS_BUILT
- return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable {
-#endif
- inject_failure();
- return inject_delay(
- ).then([buf = std::move(buf), this] () mutable {
- return out.write(std::move(buf)).then([this] { return out.flush(); });
- });
-#ifdef UNIT_TESTS_BUILT
- }).then([this] {
- return try_trap_post(next_trap_write);
- });
-#endif
- }
+ seastar::future<> write(packet &&buf);
- bool is_shutdown() const {
- return socket_is_shutdown;
- }
+ seastar::future<> flush();
+
+ seastar::future<> write_flush(packet &&buf);
// preemptively disable further reads or writes, can only be shutdown once.
void shutdown();
/// Socket can only be closed once.
seastar::future<> close();
- static seastar::future<> inject_delay();
+ static seastar::future<SocketRef>
+ connect(const entity_addr_t& peer_addr);
- static void inject_failure();
+ /*
+ * test interfaces
+ */
// shutdown for tests
void force_shutdown() {
socket.shutdown_output();
}
- side_t get_side() const {
- return side;
- }
-
- uint16_t get_ephemeral_port() const {
- return ephemeral_port;
- }
-
- // learn my ephemeral_port as connector.
- // unfortunately, there's no way to identify which port I'm using as
- // connector with current seastar interface.
- void learn_ephemeral_port_as_connector(uint16_t port) {
- assert(side == side_t::connector &&
- (ephemeral_port == 0 || ephemeral_port == port));
- ephemeral_port = port;
- }
-
- seastar::socket_address get_local_address() const {
- return socket.local_address();
- }
-
- private:
+private:
const seastar::shard_id sid;
seastar::connected_socket socket;
seastar::input_stream<char> in;
} r;
#ifdef UNIT_TESTS_BUILT
- public:
+public:
void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_);
- private:
+private:
+ seastar::future<> try_trap_pre(bp_action_t& trap);
+
+ seastar::future<> try_trap_post(bp_action_t& trap);
+
bp_action_t next_trap_read = bp_action_t::CONTINUE;
bp_action_t next_trap_write = bp_action_t::CONTINUE;
socket_blocker* blocker = nullptr;
- seastar::future<> try_trap_pre(bp_action_t& trap);
- seastar::future<> try_trap_post(bp_action_t& trap);
#endif
friend class FixedCPUServerSocket;
class FixedCPUServerSocket
: public seastar::peering_sharded_service<FixedCPUServerSocket> {
- const seastar::shard_id cpu;
- entity_addr_t addr;
- std::optional<seastar::server_socket> listener;
- seastar::gate shutdown_gate;
- using accept_func_t =
- std::function<seastar::future<>(SocketRef, entity_addr_t)>;
- accept_func_t fn_accept;
-
- using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
- std::unique_ptr<sharded_service_t> service;
-
struct construct_tag {};
- static seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_ms);
- }
-
- seastar::future<> reset() {
- return container().invoke_on_all([] (auto& ss) {
- assert(ss.shutdown_gate.is_closed());
- ss.addr = entity_addr_t();
- ss.listener.reset();
- });
- }
-
public:
- FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
- ~FixedCPUServerSocket() {
- assert(!listener);
- // detect whether user have called destroy() properly
- ceph_assert(!service);
- }
+ FixedCPUServerSocket(seastar::shard_id cpu, construct_tag);
+
+ ~FixedCPUServerSocket();
FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
listen_ertr::future<> listen(entity_addr_t addr);
- seastar::future<> accept(accept_func_t &&_fn_accept) {
- assert(seastar::this_shard_id() == cpu);
- logger().debug("FixedCPUServerSocket({})::accept()...", addr);
- return container().invoke_on_all([_fn_accept](auto &ss) {
- assert(ss.listener);
- ss.fn_accept = _fn_accept;
- // gate accepting
- // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
- // so ignore the returned future
- std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
- return seastar::keep_doing([&ss] {
- return ss.listener->accept(
- ).then([&ss](seastar::accept_result accept_result) {
- // assert seastar::listen_options::set_fixed_cpu() works
- assert(seastar::this_shard_id() == ss.cpu);
- auto [socket, paddr] = std::move(accept_result);
- entity_addr_t peer_addr;
- peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- peer_addr.set_type(ss.addr.get_type());
- SocketRef _socket = std::make_unique<Socket>(
- std::move(socket), Socket::side_t::acceptor,
- peer_addr.get_port(), Socket::construct_tag{});
- logger().debug("FixedCPUServerSocket({})::accept(): "
- "accepted peer {}, socket {}",
- ss.addr, peer_addr, fmt::ptr(_socket));
- std::ignore = seastar::with_gate(
- ss.shutdown_gate,
- [socket=std::move(_socket), peer_addr, &ss]() mutable {
- return ss.fn_accept(std::move(socket), peer_addr
- ).handle_exception([&ss, peer_addr](auto eptr) {
- const char *e_what;
- try {
- std::rethrow_exception(eptr);
- } catch (std::exception &e) {
- e_what = e.what();
- }
- logger().error("FixedCPUServerSocket({})::accept(): "
- "fn_accept(s, {}) got unexpected exception {}",
- ss.addr, peer_addr, e_what);
- ceph_abort();
- });
- });
- });
- }).handle_exception_type([&ss](const std::system_error& e) {
- if (e.code() == std::errc::connection_aborted ||
- e.code() == std::errc::invalid_argument) {
- logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})",
- ss.addr, e.what());
- } else {
- throw;
- }
- }).handle_exception([&ss](auto eptr) {
- const char *e_what;
- try {
- std::rethrow_exception(eptr);
- } catch (std::exception &e) {
- e_what = e.what();
- }
- logger().error("FixedCPUServerSocket({})::accept(): "
- "got unexpected exception {}", ss.addr, e_what);
- ceph_abort();
- });
- });
- });
- }
+ using accept_func_t =
+ std::function<seastar::future<>(SocketRef, entity_addr_t)>;
+ seastar::future<> accept(accept_func_t &&_fn_accept);
+
+ seastar::future<> shutdown_destroy();
- seastar::future<> shutdown();
- seastar::future<> destroy();
static seastar::future<FixedCPUServerSocket*> create();
+
+private:
+ const seastar::shard_id fixed_cpu;
+ entity_addr_t listen_addr;
+ std::optional<seastar::server_socket> listener;
+ seastar::gate shutdown_gate;
+ accept_func_t fn_accept;
+
+ using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
+ std::unique_ptr<sharded_service_t> service;
};
} // namespace crimson::net