]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: implement FixedCPUServerSocket
authorYingxin Cheng <yingxin.cheng@intel.com>
Sun, 19 Jan 2020 06:53:24 +0000 (14:53 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 12 Feb 2020 02:46:34 +0000 (10:46 +0800)
Adopt the policy load_balancing_algorithm::fixed to instruct seastar
(posix-stack) to allocate connected sockets on the designated core, so
we are not able to move them later in our application, which is not
supported by seastar and result in undefined behaviors.

Seastar requires server_socket to accept/listen on all available cores.
We encapsulate the related implementations in the new
FixedCPUServerSocket, so we can have a simpler shard-local crimson
messenger.

Even though FixedCPUServerSocket is a sharded service, only the service
located on the shard where the socket is originally created will be
serving incoming connections. it's not allowed to perform i/o with that
connected socket on other cores.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Socket.h

index ef734a84ac96f0c1ace7061ab21511c3ea214af0..95201dfcabfb28950181acd121ed01790c781ecd 100644 (file)
@@ -3,6 +3,7 @@
 
 #pragma once
 
+#include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sharded.hh>
 #include <seastar/net/packet.hh>
@@ -10,6 +11,9 @@
 #include "include/buffer.h"
 #include "msg/msg_types.h"
 
+#include "crimson/common/log.h"
+#include "Errors.h"
+
 #ifdef UNIT_TESTS_BUILT
 #include "Interceptor.h"
 #endif
@@ -17,7 +21,8 @@
 namespace crimson::net {
 
 class Socket;
-using SocketFRef = seastar::foreign_ptr<std::unique_ptr<Socket>>;
+using SocketRef = std::unique_ptr<Socket>;
+using SocketFRef = seastar::foreign_ptr<SocketRef>;
 
 class Socket
 {
@@ -139,4 +144,156 @@ class Socket
 #endif
 };
 
+class FixedCPUServerSocket
+    : public seastar::peering_sharded_service<FixedCPUServerSocket> {
+  const seastar::shard_id cpu;
+  entity_addr_t addr;
+  std::optional<seastar::server_socket> listener;
+  seastar::gate shutdown_gate;
+
+  using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
+  std::unique_ptr<sharded_service_t> service;
+
+  struct construct_tag {};
+
+  static seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_ms);
+  }
+
+  seastar::future<> reset() {
+    return container().invoke_on_all([] (auto& ss) {
+      assert(ss.shutdown_gate.is_closed());
+      ss.shutdown_gate = seastar::gate();
+      ss.addr = entity_addr_t();
+      ss.listener.reset();
+    });
+  }
+
+public:
+  FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {}
+  ~FixedCPUServerSocket() {
+    assert(!listener);
+    // detect whether user have called destroy() properly
+    ceph_assert(!service);
+  }
+
+  FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
+  FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
+  FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
+
+  seastar::future<> listen(entity_addr_t addr) {
+    assert(seastar::engine().cpu_id() == cpu);
+    logger().trace("FixedCPUServerSocket::listen({})...", addr);
+    return container().invoke_on_all([addr] (auto& ss) {
+      ss.addr = addr;
+      seastar::socket_address s_addr(addr.in4_addr());
+      seastar::listen_options lo;
+      lo.reuse_address = true;
+      lo.set_fixed_cpu(ss.cpu);
+      ss.listener = seastar::listen(s_addr, lo);
+    }).handle_exception_type([addr] (const std::system_error& e) {
+      if (e.code() == error::address_in_use) {
+        logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
+        throw;
+      } else {
+        logger().error("FixedCPUServerSocket::listen({}): "
+                       "got unexpeted error {}", addr, e);
+        ceph_abort();
+      }
+    });
+  }
+
+  // fn_accept should be a nothrow function of type
+  // seastar::future<>(SocketRef, entity_addr_t)
+  template <typename Func>
+  seastar::future<> accept(Func&& fn_accept) {
+    assert(seastar::engine().cpu_id() == cpu);
+    logger().trace("FixedCPUServerSocket({})::accept()...", addr);
+    return container().invoke_on_all(
+        [fn_accept = std::move(fn_accept)] (auto& ss) mutable {
+      assert(ss.listener);
+      // gate accepting
+      // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
+      // so ignore the returned future
+      std::ignore = seastar::with_gate(ss.shutdown_gate,
+          [&ss, fn_accept = std::move(fn_accept)] () mutable {
+        return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable {
+          return Socket::accept(*ss.listener
+          ).then([&ss, fn_accept = std::move(fn_accept)]
+                 (auto socket, entity_addr_t peer_addr) mutable {
+            // assert seastar::listen_options::set_fixed_cpu() works
+            assert(seastar::engine().cpu_id() == ss.cpu);
+            SocketRef _socket = socket.release();
+            std::ignore = seastar::with_gate(ss.shutdown_gate,
+                [socket = std::move(_socket), peer_addr,
+                 &ss, fn_accept = std::move(fn_accept)] () mutable {
+              logger().trace("FixedCPUServerSocket({})::accept(): "
+                             "accepted peer {}", ss.addr, peer_addr);
+              return fn_accept(std::move(socket), peer_addr
+              ).handle_exception([&ss, peer_addr] (auto eptr) {
+                logger().error("FixedCPUServerSocket({})::accept(): "
+                               "fn_accept(s, {}) got unexpected exception {}",
+                               ss.addr, peer_addr, eptr);
+                ceph_abort();
+              });
+            });
+          });
+        }).handle_exception_type([&ss] (const std::system_error& e) {
+          if (e.code() == error::connection_aborted ||
+              e.code() == error::invalid_argument) {
+            logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})",
+                           ss.addr, e);
+          } else {
+            throw;
+          }
+        }).handle_exception([&ss] (auto eptr) {
+          logger().error("FixedCPUServerSocket({})::accept(): "
+                         "got unexpected exception {}", ss.addr, eptr);
+          ceph_abort();
+        });
+      });
+    });
+  }
+
+  seastar::future<> shutdown() {
+    assert(seastar::engine().cpu_id() == cpu);
+    logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
+    return container().invoke_on_all([] (auto& ss) {
+      if (ss.listener) {
+        ss.listener->abort_accept();
+      }
+      return ss.shutdown_gate.close();
+    }).then([this] {
+      return reset();
+    });
+  }
+
+  seastar::future<> destroy() {
+    assert(seastar::engine().cpu_id() == cpu);
+    return shutdown().then([this] {
+      // we should only construct/stop shards on #0
+      return container().invoke_on(0, [] (auto& ss) {
+        assert(ss.service);
+        return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
+      });
+    });
+  }
+
+  static seastar::future<FixedCPUServerSocket*> create() {
+    auto cpu = seastar::engine().cpu_id();
+    // we should only construct/stop shards on #0
+    return seastar::smp::submit_to(0, [cpu] {
+      auto service = std::make_unique<sharded_service_t>();
+      return service->start(cpu, construct_tag{}
+      ).then([service = std::move(service)] () mutable {
+        auto p_shard = service.get();
+        p_shard->local().service = std::move(service);
+        return p_shard;
+      });
+    }).then([] (auto p_shard) {
+      return &p_shard->local();
+    });
+  }
+};
+
 } // namespace crimson::net