From: Yingxin Cheng Date: Sun, 19 Jan 2020 06:53:24 +0000 (+0800) Subject: crimson/net: implement FixedCPUServerSocket X-Git-Tag: v15.1.1~394^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d12c84a971f3b86b79562679cdbe80058a593235;p=ceph.git crimson/net: implement FixedCPUServerSocket Adopt the policy load_balancing_algorithm::fixed to instruct seastar (posix-stack) to allocate connected sockets on the designated core, so we are not able to move them later in our application, which is not supported by seastar and result in undefined behaviors. Seastar requires server_socket to accept/listen on all available cores. We encapsulate the related implementations in the new FixedCPUServerSocket, so we can have a simpler shard-local crimson messenger. Even though FixedCPUServerSocket is a sharded service, only the service located on the shard where the socket is originally created will be serving incoming connections. it's not allowed to perform i/o with that connected socket on other cores. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index ef734a84ac9..95201dfcabf 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -3,6 +3,7 @@ #pragma once +#include #include #include #include @@ -10,6 +11,9 @@ #include "include/buffer.h" #include "msg/msg_types.h" +#include "crimson/common/log.h" +#include "Errors.h" + #ifdef UNIT_TESTS_BUILT #include "Interceptor.h" #endif @@ -17,7 +21,8 @@ namespace crimson::net { class Socket; -using SocketFRef = seastar::foreign_ptr>; +using SocketRef = std::unique_ptr; +using SocketFRef = seastar::foreign_ptr; class Socket { @@ -139,4 +144,156 @@ class Socket #endif }; +class FixedCPUServerSocket + : public seastar::peering_sharded_service { + const seastar::shard_id cpu; + entity_addr_t addr; + std::optional listener; + seastar::gate shutdown_gate; + + using sharded_service_t = seastar::sharded; + std::unique_ptr service; + + struct construct_tag {}; + + static seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_ms); + } + + seastar::future<> reset() { + return container().invoke_on_all([] (auto& ss) { + assert(ss.shutdown_gate.is_closed()); + ss.shutdown_gate = seastar::gate(); + ss.addr = entity_addr_t(); + ss.listener.reset(); + }); + } + +public: + FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {} + ~FixedCPUServerSocket() { + assert(!listener); + // detect whether user have called destroy() properly + ceph_assert(!service); + } + + FixedCPUServerSocket(FixedCPUServerSocket&&) = delete; + FixedCPUServerSocket(const FixedCPUServerSocket&) = delete; + FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete; + + seastar::future<> listen(entity_addr_t addr) { + assert(seastar::engine().cpu_id() == cpu); + logger().trace("FixedCPUServerSocket::listen({})...", addr); + return container().invoke_on_all([addr] (auto& ss) { + ss.addr = addr; + seastar::socket_address s_addr(addr.in4_addr()); + seastar::listen_options lo; + lo.reuse_address = true; + lo.set_fixed_cpu(ss.cpu); + ss.listener = seastar::listen(s_addr, lo); + }).handle_exception_type([addr] (const std::system_error& e) { + if (e.code() == error::address_in_use) { + logger().trace("FixedCPUServerSocket::listen({}): address in use", addr); + throw; + } else { + logger().error("FixedCPUServerSocket::listen({}): " + "got unexpeted error {}", addr, e); + ceph_abort(); + } + }); + } + + // fn_accept should be a nothrow function of type + // seastar::future<>(SocketRef, entity_addr_t) + template + seastar::future<> accept(Func&& fn_accept) { + assert(seastar::engine().cpu_id() == cpu); + logger().trace("FixedCPUServerSocket({})::accept()...", addr); + return container().invoke_on_all( + [fn_accept = std::move(fn_accept)] (auto& ss) mutable { + assert(ss.listener); + // gate accepting + // FixedCPUServerSocket::shutdown() will drain the continuations in the gate + // so ignore the returned future + std::ignore = seastar::with_gate(ss.shutdown_gate, + [&ss, fn_accept = std::move(fn_accept)] () mutable { + return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable { + return Socket::accept(*ss.listener + ).then([&ss, fn_accept = std::move(fn_accept)] + (auto socket, entity_addr_t peer_addr) mutable { + // assert seastar::listen_options::set_fixed_cpu() works + assert(seastar::engine().cpu_id() == ss.cpu); + SocketRef _socket = socket.release(); + std::ignore = seastar::with_gate(ss.shutdown_gate, + [socket = std::move(_socket), peer_addr, + &ss, fn_accept = std::move(fn_accept)] () mutable { + logger().trace("FixedCPUServerSocket({})::accept(): " + "accepted peer {}", ss.addr, peer_addr); + return fn_accept(std::move(socket), peer_addr + ).handle_exception([&ss, peer_addr] (auto eptr) { + logger().error("FixedCPUServerSocket({})::accept(): " + "fn_accept(s, {}) got unexpected exception {}", + ss.addr, peer_addr, eptr); + ceph_abort(); + }); + }); + }); + }).handle_exception_type([&ss] (const std::system_error& e) { + if (e.code() == error::connection_aborted || + e.code() == error::invalid_argument) { + logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})", + ss.addr, e); + } else { + throw; + } + }).handle_exception([&ss] (auto eptr) { + logger().error("FixedCPUServerSocket({})::accept(): " + "got unexpected exception {}", ss.addr, eptr); + ceph_abort(); + }); + }); + }); + } + + seastar::future<> shutdown() { + assert(seastar::engine().cpu_id() == cpu); + logger().trace("FixedCPUServerSocket({})::shutdown()...", addr); + return container().invoke_on_all([] (auto& ss) { + if (ss.listener) { + ss.listener->abort_accept(); + } + return ss.shutdown_gate.close(); + }).then([this] { + return reset(); + }); + } + + seastar::future<> destroy() { + assert(seastar::engine().cpu_id() == cpu); + return shutdown().then([this] { + // we should only construct/stop shards on #0 + return container().invoke_on(0, [] (auto& ss) { + assert(ss.service); + return ss.service->stop().finally([cleanup = std::move(ss.service)] {}); + }); + }); + } + + static seastar::future create() { + auto cpu = seastar::engine().cpu_id(); + // we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [cpu] { + auto service = std::make_unique(); + return service->start(cpu, construct_tag{} + ).then([service = std::move(service)] () mutable { + auto p_shard = service.get(); + p_shard->local().service = std::move(service); + return p_shard; + }); + }).then([] (auto p_shard) { + return &p_shard->local(); + }); + } +}; + } // namespace crimson::net