#pragma once
+#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
#include <seastar/net/packet.hh>
#include "include/buffer.h"
#include "msg/msg_types.h"
+#include "crimson/common/log.h"
+#include "Errors.h"
+
#ifdef UNIT_TESTS_BUILT
#include "Interceptor.h"
#endif
namespace crimson::net {
class Socket;
-using SocketFRef = seastar::foreign_ptr<std::unique_ptr<Socket>>;
+using SocketRef = std::unique_ptr<Socket>;
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
class Socket
{
#endif
};
+class FixedCPUServerSocket
+ : public seastar::peering_sharded_service<FixedCPUServerSocket> {
+ const seastar::shard_id cpu;
+ entity_addr_t addr;
+ std::optional<seastar::server_socket> listener;
+ seastar::gate shutdown_gate;
+
+ using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
+ std::unique_ptr<sharded_service_t> service;
+
+ struct construct_tag {};
+
+ static seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_ms);
+ }
+
+ seastar::future<> reset() {
+ return container().invoke_on_all([] (auto& ss) {
+ assert(ss.shutdown_gate.is_closed());
+ ss.shutdown_gate = seastar::gate();
+ ss.addr = entity_addr_t();
+ ss.listener.reset();
+ });
+ }
+
+public:
+ FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
+ ~FixedCPUServerSocket() {
+ assert(!listener);
+ // detect whether user have called destroy() properly
+ ceph_assert(!service);
+ }
+
+ FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
+ FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
+ FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
+
+ seastar::future<> listen(entity_addr_t addr) {
+ assert(seastar::engine().cpu_id() == cpu);
+ logger().trace("FixedCPUServerSocket::listen({})...", addr);
+ return container().invoke_on_all([addr] (auto& ss) {
+ ss.addr = addr;
+ seastar::socket_address s_addr(addr.in4_addr());
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ lo.set_fixed_cpu(ss.cpu);
+ ss.listener = seastar::listen(s_addr, lo);
+ }).handle_exception_type([addr] (const std::system_error& e) {
+ if (e.code() == error::address_in_use) {
+ logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
+ throw;
+ } else {
+ logger().error("FixedCPUServerSocket::listen({}): "
+ "got unexpeted error {}", addr, e);
+ ceph_abort();
+ }
+ });
+ }
+
+ // fn_accept should be a nothrow function of type
+ // seastar::future<>(SocketRef, entity_addr_t)
+ template <typename Func>
+ seastar::future<> accept(Func&& fn_accept) {
+ assert(seastar::engine().cpu_id() == cpu);
+ logger().trace("FixedCPUServerSocket({})::accept()...", addr);
+ return container().invoke_on_all(
+ [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
+ assert(ss.listener);
+ // gate accepting
+ // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
+ // so ignore the returned future
+ std::ignore = seastar::with_gate(ss.shutdown_gate,
+ [&ss, fn_accept = std::move(fn_accept)] () mutable {
+ return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
+ return Socket::accept(*ss.listener
+ ).then([&ss, fn_accept = std::move(fn_accept)]
+ (auto socket, entity_addr_t peer_addr) mutable {
+ // assert seastar::listen_options::set_fixed_cpu() works
+ assert(seastar::engine().cpu_id() == ss.cpu);
+ SocketRef _socket = socket.release();
+ std::ignore = seastar::with_gate(ss.shutdown_gate,
+ [socket = std::move(_socket), peer_addr,
+ &ss, fn_accept = std::move(fn_accept)] () mutable {
+ logger().trace("FixedCPUServerSocket({})::accept(): "
+ "accepted peer {}", ss.addr, peer_addr);
+ return fn_accept(std::move(socket), peer_addr
+ ).handle_exception([&ss, peer_addr] (auto eptr) {
+ logger().error("FixedCPUServerSocket({})::accept(): "
+ "fn_accept(s, {}) got unexpected exception {}",
+ ss.addr, peer_addr, eptr);
+ ceph_abort();
+ });
+ });
+ });
+ }).handle_exception_type([&ss] (const std::system_error& e) {
+ if (e.code() == error::connection_aborted ||
+ e.code() == error::invalid_argument) {
+ logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
+ ss.addr, e);
+ } else {
+ throw;
+ }
+ }).handle_exception([&ss] (auto eptr) {
+ logger().error("FixedCPUServerSocket({})::accept(): "
+ "got unexpected exception {}", ss.addr, eptr);
+ ceph_abort();
+ });
+ });
+ });
+ }
+
+ seastar::future<> shutdown() {
+ assert(seastar::engine().cpu_id() == cpu);
+ logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
+ return container().invoke_on_all([] (auto& ss) {
+ if (ss.listener) {
+ ss.listener->abort_accept();
+ }
+ return ss.shutdown_gate.close();
+ }).then([this] {
+ return reset();
+ });
+ }
+
+ seastar::future<> destroy() {
+ assert(seastar::engine().cpu_id() == cpu);
+ return shutdown().then([this] {
+ // we should only construct/stop shards on #0
+ return container().invoke_on(0, [] (auto& ss) {
+ assert(ss.service);
+ return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
+ });
+ });
+ }
+
+ static seastar::future<FixedCPUServerSocket*> create() {
+ auto cpu = seastar::engine().cpu_id();
+ // we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [cpu] {
+ auto service = std::make_unique<sharded_service_t>();
+ return service->start(cpu, construct_tag{}
+ ).then([service = std::move(service)] () mutable {
+ auto p_shard = service.get();
+ p_shard->local().service = std::move(service);
+ return p_shard;
+ });
+ }).then([] (auto p_shard) {
+ return &p_shard->local();
+ });
+ }
+};
+
} // namespace crimson::net