]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: implement shard-local messenger internally
authorYingxin Cheng <yingxin.cheng@intel.com>
Sun, 19 Jan 2020 07:23:25 +0000 (15:23 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 12 Feb 2020 02:47:38 +0000 (10:47 +0800)
Adopt FixedCPUServerSocket and don't move sockets across cores after
connected/accepted.

Implement the messenger to be managed in one CPU only, since we have
encapsulated the seastar listen-on-all requirement inside
FixedCPUServerSocket, and the requirements of a cross-core messenger has
not been defined yet.

The messenger interfaces can also be simplified, but will be in another
patch.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Messenger.cc
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h

index 6992c45e95b0f3d0df2de86d07833a642d7a50cb..2b2a6ff2a14519087f20b2020c2b3d429684b7bc 100644 (file)
@@ -12,10 +12,15 @@ Messenger::create(const entity_name_t& name,
                   const uint64_t nonce,
                   const int master_sid)
 {
-  return create_sharded<SocketMessenger>(name, lname, nonce, master_sid)
-    .then([](Messenger *msgr) {
-      return msgr;
-    });
+  // enforce the messenger to a specific core (master_sid)
+  // TODO: drop the cross-core feature and cleanup the related interfaces in
+  // the future.
+  ceph_assert(master_sid >= 0);
+  return create_sharded<SocketMessenger>(
+      name, lname, nonce, static_cast<seastar::shard_id>(master_sid)
+  ).then([](Messenger *msgr) {
+    return msgr;
+  });
 }
 
 } // namespace crimson::net
index e2cbaeece0268437fe37df0870c4ab071227bfa8..b5c58f43999fad06989e3939b4efba04b5757bd0 100644 (file)
@@ -33,42 +33,50 @@ namespace crimson::net {
 SocketMessenger::SocketMessenger(const entity_name_t& myname,
                                  const std::string& logic_name,
                                  uint32_t nonce,
-                                 int master_sid)
+                                 seastar::shard_id master_sid)
   : Messenger{myname},
     master_sid{master_sid},
-    sid{seastar::engine().cpu_id()},
     logic_name{logic_name},
     nonce{nonce}
 {}
 
 seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
 {
+  assert(seastar::engine().cpu_id() == master_sid);
   auto my_addrs = addrs;
   for (auto& addr : my_addrs.v) {
     addr.nonce = nonce;
   }
-  return container().invoke_on_all([my_addrs](auto& msgr) {
-      return msgr.Messenger::set_myaddrs(my_addrs);
-    });
+  return Messenger::set_myaddrs(my_addrs);
 }
 
-seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
+seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
 {
+  assert(seastar::engine().cpu_id() == master_sid);
   ceph_assert(addrs.front().get_family() == AF_INET);
-  auto my_addrs = addrs;
-  for (auto& addr : my_addrs.v) {
-    addr.nonce = nonce;
-  }
-  logger().info("{} listening on {}", *this, my_addrs.front().in4_addr());
-  return container().invoke_on_all([my_addrs](auto& msgr) {
-    msgr.do_bind(my_addrs);
-  }).handle_exception_type([this] (const std::system_error& e) {
-    if (e.code() == error::address_in_use) {
-      throw e;
+  return set_myaddrs(addrs).then([this] {
+    if (!listener) {
+      return FixedCPUServerSocket::create().then([this] (auto _listener) {
+        listener = _listener;
+      });
     } else {
-      logger().error("{} bind: unexpected error {}", *this, e);
-      ceph_abort();
+      return seastar::now();
     }
+  }).then([this] {
+    auto listen_addr = get_myaddr();
+    logger().debug("{} do_bind: try listen {}...", *this, listen_addr.in4_addr());
+    if (!listener) {
+      logger().warn("{} do_bind: listener doesn't exist", *this);
+      return seastar::now();
+    }
+    return listener->listen(listen_addr);
+  });
+}
+
+seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
+{
+  return do_bind(addrs).then([this] {
+    logger().info("{} bind: done", *this);
   });
 }
 
@@ -78,221 +86,163 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs,
 {
   auto addr = addrs.front();
   if (addr.get_port() != 0) {
-    return bind(addrs);
+    return do_bind(addrs).then([this] {
+      logger().info("{} try_bind: done", *this);
+    });
   }
   ceph_assert(min_port <= max_port);
   return seastar::do_with(uint32_t(min_port),
-    [this, max_port, addr] (auto& port) {
-      return seastar::repeat([this, max_port, addr, &port] {
-          auto to_bind = addr;
-          to_bind.set_port(port);
-          return bind(entity_addrvec_t{to_bind})
-            .then([this] {
-              logger().info("{}: try_bind: done", *this);
-              return stop_t::yes;
-            }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
-              ceph_assert(e.code() == error::address_in_use);
-              logger().trace("{}: try_bind: {} already used", *this, port);
-              if (port == max_port) {
-                throw e;
-              }
-              ++port;
-              return stop_t::no;
-            });
-        });
+                          [this, max_port, addr] (auto& port) {
+    return seastar::repeat([this, max_port, addr, &port] {
+      auto to_bind = addr;
+      to_bind.set_port(port);
+      return do_bind(entity_addrvec_t{to_bind}).then([this] {
+        logger().info("{} try_bind: done", *this);
+        return stop_t::yes;
+      }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
+        assert(e.code() == error::address_in_use);
+        logger().trace("{} try_bind: {} already used", *this, port);
+        if (port == max_port) {
+          throw;
+        }
+        ++port;
+        return stop_t::no;
+      });
     });
+  });
 }
 
 seastar::future<> SocketMessenger::start(Dispatcher *disp) {
-  return container().invoke_on_all([disp](auto& msgr) {
-      return msgr.do_start(disp->get_local_shard());
-    });
-}
-
-seastar::future<crimson::net::ConnectionXRef>
-SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
-{
-  // make sure we connect to a valid peer_addr
-  ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
-  ceph_assert(peer_addr.get_port() > 0);
-
-  auto shard = locate_shard(peer_addr);
-  return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) {
-      return msgr.do_connect(peer_addr, peer_type);
-    }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) {
-      return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn));
-    });
-}
-
-seastar::future<> SocketMessenger::stop()
-{
-  return do_shutdown();
-}
-
-seastar::future<> SocketMessenger::shutdown()
-{
-  return container().invoke_on_all([](auto& msgr) {
-      return msgr.do_shutdown();
-    }).finally([this] {
-      return container().invoke_on_all([](auto& msgr) {
-          msgr.shutdown_promise.set_value();
-        });
-    });
-}
-
-void SocketMessenger::do_bind(const entity_addrvec_t& addrs)
-{
-  // safe to discard an immediate ready future
-  (void) Messenger::set_myaddrs(addrs);
-
-  // TODO: v2: listen on multiple addresses
-  seastar::socket_address address(addrs.front().in4_addr());
-  seastar::listen_options lo;
-  lo.reuse_address = true;
-  listener = seastar::listen(address, lo);
-}
+  assert(seastar::engine().cpu_id() == master_sid);
 
-seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
-{
-  dispatcher = disp;
-  started = true;
-
-  // start listening if bind() was called
+  dispatcher = disp->get_local_shard();
   if (listener) {
     // make sure we have already bound to a valid address
     ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
     ceph_assert(get_myaddr().get_port() > 0);
 
-    // forwarded to accepting_complete
-    (void) seastar::keep_doing([this] {
-        return Socket::accept(*listener)
-          .then([this] (SocketFRef socket,
-                        entity_addr_t peer_addr) {
-            auto shard = locate_shard(peer_addr);
-#warning fixme
-            // we currently do dangerous i/o from a Connection core, different from the Socket core.
-            return container().invoke_on(shard,
-              [sock = std::move(socket), peer_addr, this](auto& msgr) mutable {
-                SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
-                    msgr, *msgr.dispatcher, get_myaddr().is_msgr2());
-                conn->start_accept(std::move(sock), peer_addr);
-              });
-          });
-      }).handle_exception_type([this] (const std::system_error& e) {
-        // stop gracefully on connection_aborted and invalid_argument
-        if (e.code() != error::connection_aborted &&
-            e.code() != error::invalid_argument) {
-          logger().error("{} unexpected error during accept: {}", *this, e);
-          ceph_abort();
-        }
-      }).handle_exception([this] (auto eptr) {
-        logger().error("{} unexpected exception during accept: {}", *this, eptr);
-        ceph_abort();
-      }).then([this] () { return accepting_complete.set_value(); });
-  } else {
-    accepting_complete.set_value();
+    return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
+      assert(seastar::engine().cpu_id() == master_sid);
+      SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
+          *this, *dispatcher, get_myaddr().is_msgr2());
+      // TODO: use SocketRef
+      conn->start_accept(seastar::make_foreign(std::move(socket)), peer_addr);
+      return seastar::now();
+    });
   }
   return seastar::now();
 }
 
-seastar::foreign_ptr<crimson::net::ConnectionRef>
-SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+seastar::future<crimson::net::ConnectionXRef>
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
 {
+  assert(seastar::engine().cpu_id() == master_sid);
+
+  // make sure we connect to a valid peer_addr
+  ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
+  ceph_assert(peer_addr.get_port() > 0);
+
+  // TODO: use ConnectionRef
   if (auto found = lookup_conn(peer_addr); found) {
-    return seastar::make_foreign(found->shared_from_this());
+    return seastar::make_ready_future<ConnectionXRef>(
+      seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(
+        seastar::make_foreign(found->shared_from_this())));
   }
   SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
       *this, *dispatcher, peer_addr.is_msgr2());
   conn->start_connect(peer_addr, peer_type);
-  return seastar::make_foreign(conn->shared_from_this());
+  return seastar::make_ready_future<ConnectionXRef>(
+    seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(
+      seastar::make_foreign(conn->shared_from_this())));
 }
 
-seastar::future<> SocketMessenger::do_shutdown()
+seastar::future<> SocketMessenger::shutdown()
 {
-  if (!started) {
-    return seastar::now();
-  }
-
-  if (listener) {
-    listener->abort_accept();
-  }
+  assert(seastar::engine().cpu_id() == master_sid);
+  return seastar::futurize_apply([this] {
+    if (listener) {
+      auto d_listener = listener;
+      listener = nullptr;
+      return d_listener->destroy();
+    } else {
+      return seastar::now();
+    }
   // close all connections
-  return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
+  }).then([this] {
+    return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
       return conn->close();
-    }).then([this] {
-      ceph_assert(accepting_conns.empty());
-      return seastar::parallel_for_each(connections, [] (auto conn) {
-          return conn.second->close();
-        });
-    }).then([this] {
-      return accepting_complete.get_shared_future();
-    }).finally([this] {
-      ceph_assert(connections.empty());
     });
+  }).then([this] {
+    ceph_assert(accepting_conns.empty());
+    return seastar::parallel_for_each(connections, [] (auto conn) {
+      return conn.second->close();
+    });
+  }).then([this] {
+    ceph_assert(connections.empty());
+    shutdown_promise.set_value();
+  });
 }
 
 seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
 {
-  // make sure we there's no racing to learn address from peer
-  return container().invoke_on(0, [peer_addr_for_me, &conn] (auto& msgr) {
-    if (!msgr.need_addr) {
-      if ((!msgr.get_myaddr().is_any() &&
-           msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
-          msgr.get_myaddr().get_family() != peer_addr_for_me.get_family() ||
-          !msgr.get_myaddr().is_same_host(peer_addr_for_me)) {
-        logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
-                      conn, peer_addr_for_me, msgr.get_myaddr());
-        throw std::system_error(
-            make_error_code(crimson::net::error::bad_peer_address));
-      }
-      return seastar::now();
+  assert(seastar::engine().cpu_id() == master_sid);
+  if (!need_addr) {
+    if ((!get_myaddr().is_any() &&
+         get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
+        get_myaddr().get_family() != peer_addr_for_me.get_family() ||
+        !get_myaddr().is_same_host(peer_addr_for_me)) {
+      logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
+                    conn, peer_addr_for_me, get_myaddr());
+      throw std::system_error(
+          make_error_code(crimson::net::error::bad_peer_address));
     }
-    msgr.need_addr = false;
-
-    if (msgr.get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
-      // Not bound
+    return seastar::now();
+  }
+  need_addr = false;
+
+  if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
+    // Not bound
+    entity_addr_t addr = peer_addr_for_me;
+    addr.set_type(entity_addr_t::TYPE_ANY);
+    addr.set_port(0);
+    return set_myaddrs(entity_addrvec_t{addr}
+    ).then([this, &conn, peer_addr_for_me] {
+      logger().info("{} learned myaddr={} (unbound) from {}",
+                    conn, get_myaddr(), peer_addr_for_me);
+    });
+  } else {
+    // Already bound
+    if (!get_myaddr().is_any() &&
+        get_myaddr().get_type() != peer_addr_for_me.get_type()) {
+      logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
+                    conn, peer_addr_for_me, get_myaddr());
+      throw std::system_error(
+          make_error_code(crimson::net::error::bad_peer_address));
+    }
+    if (get_myaddr().get_family() != peer_addr_for_me.get_family()) {
+      logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
+                    conn, peer_addr_for_me, get_myaddr());
+      throw std::system_error(
+          make_error_code(crimson::net::error::bad_peer_address));
+    }
+    if (get_myaddr().is_blank_ip()) {
       entity_addr_t addr = peer_addr_for_me;
-      addr.set_type(entity_addr_t::TYPE_ANY);
-      addr.set_port(0);
-      return msgr.set_myaddrs(entity_addrvec_t{addr}
-      ).then([&msgr, &conn, peer_addr_for_me] {
-        logger().info("{} learned myaddr={} (unbound) from {}",
-                      conn, msgr.get_myaddr(), peer_addr_for_me);
+      addr.set_type(get_myaddr().get_type());
+      addr.set_port(get_myaddr().get_port());
+      return set_myaddrs(entity_addrvec_t{addr}
+      ).then([this, &conn, peer_addr_for_me] {
+        logger().info("{} learned myaddr={} (blank IP) from {}",
+                      conn, get_myaddr(), peer_addr_for_me);
       });
+    } else if (!get_myaddr().is_same_host(peer_addr_for_me)) {
+      logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
+                    conn, peer_addr_for_me, get_myaddr());
+      throw std::system_error(
+          make_error_code(crimson::net::error::bad_peer_address));
     } else {
-      // Already bound
-      if (!msgr.get_myaddr().is_any() &&
-          msgr.get_myaddr().get_type() != peer_addr_for_me.get_type()) {
-        logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
-                      conn, peer_addr_for_me, msgr.get_myaddr());
-        throw std::system_error(
-            make_error_code(crimson::net::error::bad_peer_address));
-      }
-      if (msgr.get_myaddr().get_family() != peer_addr_for_me.get_family()) {
-        logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
-                      conn, peer_addr_for_me, msgr.get_myaddr());
-        throw std::system_error(
-            make_error_code(crimson::net::error::bad_peer_address));
-      }
-      if (msgr.get_myaddr().is_blank_ip()) {
-        entity_addr_t addr = peer_addr_for_me;
-        addr.set_type(msgr.get_myaddr().get_type());
-        addr.set_port(msgr.get_myaddr().get_port());
-        return msgr.set_myaddrs(entity_addrvec_t{addr}
-        ).then([&msgr, &conn, peer_addr_for_me] {
-          logger().info("{} learned myaddr={} (blank IP) from {}",
-                        conn, msgr.get_myaddr(), peer_addr_for_me);
-        });
-      } else if (!msgr.get_myaddr().is_same_host(peer_addr_for_me)) {
-        logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
-                      conn, peer_addr_for_me, msgr.get_myaddr());
-        throw std::system_error(
-            make_error_code(crimson::net::error::bad_peer_address));
-      } else {
-        return seastar::now();
-      }
+      return seastar::now();
     }
-  });
+  }
 }
 
 SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
@@ -323,19 +273,6 @@ void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
   policy_set.set_throttlers(peer_type, throttle, nullptr);
 }
 
-seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr)
-{
-  ceph_assert(addr.get_family() == AF_INET);
-  if (master_sid >= 0) {
-    return master_sid;
-  }
-  std::size_t seed = 0;
-  boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr);
-  //boost::hash_combine(seed, addr.u.sin.sin_port);
-  //boost::hash_combine(seed, addr.nonce);
-  return seed % seastar::smp::count;
-}
-
 crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
 {
   if (auto found = connections.find(addr);
@@ -358,9 +295,6 @@ void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
 
 void SocketMessenger::register_conn(SocketConnectionRef conn)
 {
-  if (master_sid >= 0) {
-    ceph_assert(static_cast<int>(sid) == master_sid);
-  }
   auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
   std::ignore = i;
   ceph_assert(added);
@@ -378,12 +312,10 @@ void SocketMessenger::unregister_conn(SocketConnectionRef conn)
 seastar::future<uint32_t>
 SocketMessenger::get_global_seq(uint32_t old)
 {
-  return container().invoke_on(0, [old] (auto& msgr) {
-    if (old > msgr.global_seq) {
-      msgr.global_seq = old;
-    }
-    return ++msgr.global_seq;
-  });
+  if (old > global_seq) {
+    global_seq = old;
+  }
+  return seastar::make_ready_future<uint32_t>(++global_seq);
 }
 
 } // namespace crimson::net
index 2eecc306859015c425d0dd662c742bb2a41a5328..71ef441e9e9a8e9e17f4d2bd7ad0bca85f83d590 100644 (file)
 
 namespace crimson::net {
 
+class FixedCPUServerSocket;
+
 class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
-  const int master_sid;
-  const seastar::shard_id sid;
+  const seastar::shard_id master_sid;
   seastar::promise<> shutdown_promise;
 
-  std::optional<seastar::server_socket> listener;
+  FixedCPUServerSocket* listener = nullptr;
   Dispatcher *dispatcher = nullptr;
   std::map<entity_addr_t, SocketConnectionRef> connections;
   std::set<SocketConnectionRef> accepting_conns;
@@ -43,30 +44,16 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
   // specifying we haven't learned our addr; set false when we find it.
   bool need_addr = true;
   uint32_t global_seq = 0;
-
-  seastar::future<> accept(seastar::connected_socket socket,
-                           seastar::socket_address paddr);
-
-  void do_bind(const entity_addrvec_t& addr);
-
   bool started = false;
-  seastar::shared_promise<> accepting_complete;
-  seastar::future<> do_start(Dispatcher *disp);
-  seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr,
-                                                 const entity_type_t& peer_type);
-  seastar::future<> do_shutdown();
-  // conn sharding options:
-  // 0. Compatible (master_sid >= 0): place all connections to one master shard
-  // 1. Simplest (master_sid < 0): sharded by ip only
-  // 2. Balanced (not implemented): sharded by ip + port + nonce,
-  //        but, need to move SocketConnection between cores.
-  seastar::shard_id locate_shard(const entity_addr_t& addr);
+
+  seastar::future<> do_bind(const entity_addrvec_t& addr);
 
  public:
   SocketMessenger(const entity_name_t& myname,
                   const std::string& logic_name,
                   uint32_t nonce,
-                  int master_sid);
+                  seastar::shard_id master_sid);
+  ~SocketMessenger() override { ceph_assert(!listener); }
 
   seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
 
@@ -83,6 +70,7 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
                                           const entity_type_t& peer_type) override;
   // can only wait once
   seastar::future<> wait() override {
+    assert(seastar::engine().cpu_id() == master_sid);
     return shutdown_promise.get_future();
   }
 
@@ -118,12 +106,9 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
   void unaccept_conn(SocketConnectionRef);
   void register_conn(SocketConnectionRef);
   void unregister_conn(SocketConnectionRef);
-
-  // required by sharded<>
-  seastar::future<> stop();
-
   seastar::shard_id shard_id() const {
-    return sid;
+    assert(seastar::engine().cpu_id() == master_sid);
+    return master_sid;
   }
 };