]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: cleanup interfaces to start and stop a messenger
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 27 Nov 2020 06:03:43 +0000 (14:03 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Tue, 1 Dec 2020 01:33:29 +0000 (09:33 +0800)
There is no on-going iterations in the new
ChainedDispatchers::ms_dispatch() implementation, so we no longer need
to worry about removing dispatchers when stopping the messenger. So the
""boost::intrusive::slist" is not needed, and we can use cleaner
interfaces to start and stop the messenger.

Also fixed an regression issue in perf_crimson_msgr caused by
ChainedDispatchers.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
21 files changed:
src/crimson/net/Dispatcher.h
src/crimson/net/Fwd.h
src/crimson/net/Messenger.h
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV1.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/crimson/net/chained_dispatchers.cc
src/crimson/net/chained_dispatchers.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/test/crimson/test_messenger.cc
src/tools/crimson/perf_crimson_msgr.cc

index e106f7a4b4e5fa5936eb6ffa83396059301afbde..00cd7d474a2a3f4efdb65073fc2c5fade8f22c15 100644 (file)
 
 #pragma once
 
-#include <seastar/core/future.hh>
-#include <seastar/core/sharded.hh>
-#include <boost/intrusive/slist.hpp>
-
-#include "crimson/common/gated.h"
 #include "Fwd.h"
 
 class AuthAuthorizer;
 
 namespace crimson::net {
 
-class Dispatcher : public boost::intrusive::slist_base_hook<
-                           boost::intrusive::link_mode<
-                             boost::intrusive::safe_link>> {
+class Dispatcher {
  public:
   virtual ~Dispatcher() {}
 
index 2221533967243c4932f855523109b0cfceb8629d..8dab402b3962189225d30932b1a59173903925b2 100644 (file)
@@ -14,6 +14,8 @@
 
 #pragma once
 
+#include <seastar/core/future.hh>
+#include <seastar/core/future-util.hh>
 #include <seastar/core/shared_ptr.hh>
 #include <seastar/core/sharded.hh>
 
@@ -34,7 +36,7 @@ using stop_t = seastar::stop_iteration;
 class Connection;
 using ConnectionRef = seastar::shared_ptr<Connection>;
 
-class Dispatcher;
+class ChainedDispatchers;
 
 class Messenger;
 using MessengerRef = seastar::shared_ptr<Messenger>;
index 1ba943665926d04cfd5bb09e8243f049fe608173..60326135bc6ea2d5572b04a4048a6b030d349da3 100644 (file)
 
 #pragma once
 
-#include <seastar/core/future.hh>
+#include <list>
 
 #include "Fwd.h"
 #include "crimson/common/throttle.h"
-#include "crimson/net/chained_dispatchers.h"
 #include "msg/Message.h"
 #include "msg/Policy.h"
 
@@ -35,6 +34,8 @@ namespace crimson::net {
 class Interceptor;
 #endif
 
+class Dispatcher;
+
 using Throttle = crimson::common::Throttle;
 using SocketPolicy = ceph::net::Policy<Throttle>;
 
@@ -77,7 +78,13 @@ public:
                                      uint32_t min_port, uint32_t max_port) = 0;
 
   /// start the messenger
-  virtual seastar::future<> start(ChainedDispatchersRef) = 0;
+  virtual seastar::future<> start(const std::list<Dispatcher*>&) = 0;
+
+  seastar::future<> start(Dispatcher& dispatcher) {
+    std::list<Dispatcher*> dispatchers;
+    dispatchers.push_back(&dispatcher);
+    return start(dispatchers);
+  }
 
   /// either return an existing connection to the peer,
   /// or a new pending connection
@@ -94,11 +101,13 @@ public:
   // wait for messenger shutdown
   virtual seastar::future<> wait() = 0;
 
-  virtual void remove_dispatcher(Dispatcher&) = 0;
+  // stop dispatching events and messages
+  virtual void stop() = 0;
+
+  virtual bool is_started() const = 0;
 
-  virtual bool dispatcher_chain_empty() const = 0;
-  /// stop listenening and wait for all connections to close. safe to destruct
-  /// after this future becomes available
+  // free internal resources before destruction, must be called after stopped,
+  // and must be called if is bound.
   virtual seastar::future<> shutdown() = 0;
 
   uint32_t get_crc_flags() const {
index 6bbb8e7bff77ccaab6b0cbfc799146e187936f15..541f227e45d20dc4178d5d2519eb51b51a535f5b 100644 (file)
@@ -7,7 +7,7 @@
 
 #include "crimson/common/log.h"
 #include "crimson/net/Errors.h"
-#include "crimson/net/Dispatcher.h"
+#include "crimson/net/chained_dispatchers.h"
 #include "crimson/net/Socket.h"
 #include "crimson/net/SocketConnection.h"
 #include "msg/Message.h"
@@ -21,10 +21,10 @@ namespace {
 namespace crimson::net {
 
 Protocol::Protocol(proto_t type,
-                   ChainedDispatchersRef& dispatcher,
+                   ChainedDispatchers& dispatchers,
                    SocketConnection& conn)
   : proto_type(type),
-    dispatcher(dispatcher),
+    dispatchers(dispatchers),
     conn(conn),
     auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()}
 {}
@@ -73,7 +73,7 @@ void Protocol::close(bool dispatch_reset,
   auto gate_closed = gate.close();
 
   if (dispatch_reset) {
-    dispatcher->ms_handle_reset(
+    dispatchers.ms_handle_reset(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
         is_replace);
   }
index 3deb706acb4eb67abac2770d461f4621cb5a428a..dc4e4f2af8f33c188740fd10b6bae8a267191f8d 100644 (file)
@@ -50,7 +50,7 @@ class Protocol {
   virtual void print(std::ostream&) const = 0;
  protected:
   Protocol(proto_t type,
-           ChainedDispatchersRef& dispatcher,
+           ChainedDispatchers& dispatchers,
            SocketConnection& conn);
 
   virtual void trigger_close() = 0;
@@ -71,7 +71,7 @@ class Protocol {
   SocketRef socket;
 
  protected:
-  ChainedDispatchersRef dispatcher;
+  ChainedDispatchers& dispatchers;
   SocketConnection &conn;
 
   AuthConnectionMetaRef auth_meta;
index 9b6c59f8923202e04e1abfe5bfac7356c1f2a2bd..95afb61c1cc9e0e753756e8e7ff89a7bf48f4e6d 100644 (file)
@@ -15,7 +15,7 @@
 #include "crimson/auth/AuthClient.h"
 #include "crimson/auth/AuthServer.h"
 #include "crimson/common/log.h"
-#include "Dispatcher.h"
+#include "chained_dispatchers.h"
 #include "Errors.h"
 #include "Socket.h"
 #include "SocketConnection.h"
@@ -125,10 +125,10 @@ void discard_up_to(std::deque<MessageRef>* queue,
 
 namespace crimson::net {
 
-ProtocolV1::ProtocolV1(ChainedDispatchersRef& dispatcher,
+ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers,
                        SocketConnection& conn,
                        SocketMessenger& messenger)
-  : Protocol(proto_t::v1, dispatcher, conn), messenger{messenger} {}
+  : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {}
 
 ProtocolV1::~ProtocolV1() {}
 
@@ -917,10 +917,10 @@ void ProtocolV1::execute_open(open_t type)
   set_write_state(write_state_t::open);
 
   if (type == open_t::connected) {
-    dispatcher->ms_handle_connect(
+    dispatchers.ms_handle_connect(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   } else { // type == open_t::accepted
-    dispatcher->ms_handle_accept(
+    dispatchers.ms_handle_accept(
         seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   }
 
index d7d642c572757daab78cabe25dc4567af9450a60..c71af598bcfc52c30b6942e6f6e77932466482dc 100644 (file)
@@ -12,7 +12,7 @@ namespace crimson::net {
 
 class ProtocolV1 final : public Protocol {
  public:
-  ProtocolV1(ChainedDispatchersRef& dispatcher,
+  ProtocolV1(ChainedDispatchers& dispatchers,
              SocketConnection& conn,
              SocketMessenger& messenger);
   ~ProtocolV1() override;
index de1d6f55f810c4ddbe65d1903760680e70a791d5..194b6af486bc2dffe6b076f20153935e167beac5 100644 (file)
@@ -12,7 +12,7 @@
 #include "crimson/auth/AuthServer.h"
 #include "crimson/common/formatter.h"
 
-#include "Dispatcher.h"
+#include "chained_dispatchers.h"
 #include "Errors.h"
 #include "Socket.h"
 #include "SocketConnection.h"
@@ -143,10 +143,10 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds)
   });
 }
 
-ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher,
+ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
                        SocketConnection& conn,
                        SocketMessenger& messenger)
-  : Protocol(proto_t::v2, dispatcher, conn),
+  : Protocol(proto_t::v2, dispatchers, conn),
     messenger{messenger},
     protocol_timer{conn}
 {}
@@ -385,7 +385,7 @@ void ProtocolV2::reset_session(bool full)
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
     reset_write();
-    dispatcher->ms_handle_remote_reset(
+    dispatchers.ms_handle_remote_reset(
        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   }
 }
@@ -1601,7 +1601,7 @@ void ProtocolV2::execute_establishing(
     accept_me();
   }
 
-  dispatcher->ms_handle_accept(
+  dispatchers.ms_handle_accept(
       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
 
   gated_execute("execute_establishing", [this] {
@@ -1699,7 +1699,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
   if (socket) {
     socket->shutdown();
   }
-  dispatcher->ms_handle_accept(
+  dispatchers.ms_handle_accept(
       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   gate.dispatch_in_background("trigger_replacing", *this,
                  [this,
@@ -1942,7 +1942,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect)
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   trigger_state(state_t::READY, write_state_t::open, false);
   if (dispatch_connect) {
-    dispatcher->ms_handle_connect(
+    dispatchers.ms_handle_connect(
        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   }
 #ifdef UNIT_TESTS_BUILT
index d4672c4ce49e3bea4064f75b917fb0eb11131d46..be9a2281668757ff6149399242ba628b393328a1 100644 (file)
@@ -13,7 +13,7 @@ namespace crimson::net {
 
 class ProtocolV2 final : public Protocol {
  public:
-  ProtocolV2(ChainedDispatchersRef& dispatcher,
+  ProtocolV2(ChainedDispatchers& dispatchers,
              SocketConnection& conn,
              SocketMessenger& messenger);
   ~ProtocolV2() override;
index b0c7197eedb2f360818a599beec7062161ed6b2f..623dca32f0b1c03aae7d85bfb89ff1d2dacd49e4 100644 (file)
@@ -26,14 +26,14 @@ using namespace crimson::net;
 using crimson::common::local_conf;
 
 SocketConnection::SocketConnection(SocketMessenger& messenger,
-                                   ChainedDispatchersRef& dispatcher,
+                                   ChainedDispatchers& dispatchers,
                                    bool is_msgr2)
   : messenger(messenger)
 {
   if (is_msgr2) {
-    protocol = std::make_unique<ProtocolV2>(dispatcher, *this, messenger);
+    protocol = std::make_unique<ProtocolV2>(dispatchers, *this, messenger);
   } else {
-    protocol = std::make_unique<ProtocolV1>(dispatcher, *this, messenger);
+    protocol = std::make_unique<ProtocolV1>(dispatchers, *this, messenger);
   }
 #ifdef UNIT_TESTS_BUILT
   if (messenger.interceptor) {
index 0af08e0e4f28cc72448f37a7a7847adbd7d6649a..9c977c7cf66c30e0301fdc2c395d73a3287230bc 100644 (file)
 
 #include "msg/Policy.h"
 #include "crimson/common/throttle.h"
-#include "crimson/net/chained_dispatchers.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Socket.h"
 
 namespace crimson::net {
 
-class Dispatcher;
 class Protocol;
 class SocketMessenger;
 class SocketConnection;
@@ -55,7 +53,7 @@ class SocketConnection : public Connection {
 
  public:
   SocketConnection(SocketMessenger& messenger,
-                   ChainedDispatchersRef& dispatcher,
+                   ChainedDispatchers& dispatchers,
                    bool is_msgr2);
   ~SocketConnection() override;
 
index 11914d71bd3d1951cab9814b554ff5d71a761f26..07a86bfdfa9a3403e74798f8b7770373e8aac2e2 100644 (file)
@@ -19,7 +19,6 @@
 
 #include "auth/Auth.h"
 #include "Errors.h"
-#include "Dispatcher.h"
 #include "Socket.h"
 
 namespace {
@@ -111,10 +110,10 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs,
   });
 }
 
-seastar::future<> SocketMessenger::start(ChainedDispatchersRef chained_dispatchers) {
+seastar::future<> SocketMessenger::start(const std::list<Dispatcher*>& _dispatchers) {
   assert(seastar::this_shard_id() == master_sid);
 
-  dispatchers = chained_dispatchers;
+  dispatchers.assign(_dispatchers);
   if (listener) {
     // make sure we have already bound to a valid address
     ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
@@ -154,9 +153,7 @@ seastar::future<> SocketMessenger::shutdown()
 {
   assert(seastar::this_shard_id() == master_sid);
   return seastar::futurize_invoke([this] {
-    if (dispatchers) {
-      assert(dispatchers->empty());
-    }
+    assert(dispatchers.empty());
     if (listener) {
       auto d_listener = listener;
       listener = nullptr;
index 90c38accf108681a79d48f7eb12cc88c099043fb..33da5f1a4805ba7dde4baf76976be043c0346daa 100644 (file)
 
 #pragma once
 
+#include <list>
 #include <map>
-#include <optional>
 #include <set>
+#include <vector>
 #include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sharded.hh>
@@ -35,15 +36,7 @@ class SocketMessenger final : public Messenger {
   seastar::promise<> shutdown_promise;
 
   FixedCPUServerSocket* listener = nullptr;
-  // as we want to unregister a dispatcher from the messengers when stopping
-  // that dispatcher, we have to use intrusive slist which, when used with
-  // "boost::intrusive::linear<true>", can tolerate ongoing iteration of the
-  // list when removing an element. However, the downside of this is that an
-  // element can only be attached to one slist. So, as we need to make multiple
-  // messenger reference the same set of dispatchers, we have to make them share
-  // the same ChainedDispatchers, which means registering/unregistering an element
-  // to one messenger will affect other messengers that share the same ChainedDispatchers.
-  ChainedDispatchersRef dispatchers;
+  ChainedDispatchers dispatchers;
   std::map<entity_addr_t, SocketConnectionRef> connections;
   std::set<SocketConnectionRef> accepting_conns;
   std::vector<SocketConnectionRef> closing_conns;
@@ -73,7 +66,7 @@ class SocketMessenger final : public Messenger {
   seastar::future<> try_bind(const entity_addrvec_t& addr,
                              uint32_t min_port, uint32_t max_port) override;
 
-  seastar::future<> start(ChainedDispatchersRef dispatchers) override;
+  seastar::future<> start(const std::list<Dispatcher*>& dispatchers) override;
 
   ConnectionRef connect(const entity_addr_t& peer_addr,
                         const entity_name_t& peer_name) override;
@@ -83,12 +76,14 @@ class SocketMessenger final : public Messenger {
     return shutdown_promise.get_future();
   }
 
-  void remove_dispatcher(Dispatcher& disp) override {
-    dispatchers->erase(disp);
+  void stop() override {
+    dispatchers.clear();
   }
-  bool dispatcher_chain_empty() const override {
-    return !dispatchers || dispatchers->empty();
+
+  bool is_started() const override {
+    return !dispatchers.empty();
   }
+
   seastar::future<> shutdown() override;
 
   void print(ostream& out) const override {
index 717a2db748eceee5ddbb8e38b36dad1b6403e95d..6aa9b93293135f9c68200556544d6127739a2af7 100644 (file)
@@ -1,6 +1,7 @@
 #include "crimson/common/log.h"
 #include "crimson/net/chained_dispatchers.h"
 #include "crimson/net/Connection.h"
+#include "crimson/net/Dispatcher.h"
 #include "msg/Message.h"
 
 namespace {
@@ -9,12 +10,14 @@ namespace {
   }
 }
 
+namespace crimson::net {
+
 seastar::future<>
 ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn,
                                 MessageRef m) {
   try {
     for (auto& dispatcher : dispatchers) {
-      auto [dispatched, throttle_future] = dispatcher.ms_dispatch(conn, m);
+      auto [dispatched, throttle_future] = dispatcher->ms_dispatch(conn, m);
       if (dispatched) {
         return std::move(throttle_future
         ).handle_exception([conn] (std::exception_ptr eptr) {
@@ -40,7 +43,7 @@ void
 ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) {
   try {
     for (auto& dispatcher : dispatchers) {
-      dispatcher.ms_handle_accept(conn);
+      dispatcher->ms_handle_accept(conn);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_accept() {}",
@@ -53,7 +56,7 @@ void
 ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) {
   try {
     for(auto& dispatcher : dispatchers) {
-      dispatcher.ms_handle_connect(conn);
+      dispatcher->ms_handle_connect(conn);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_connect() {}",
@@ -66,7 +69,7 @@ void
 ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
   try {
     for (auto& dispatcher : dispatchers) {
-      dispatcher.ms_handle_reset(conn, is_replace);
+      dispatcher->ms_handle_reset(conn, is_replace);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_reset() {}",
@@ -79,7 +82,7 @@ void
 ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
   try {
     for (auto& dispatcher : dispatchers) {
-      dispatcher.ms_handle_remote_reset(conn);
+      dispatcher->ms_handle_remote_reset(conn);
     }
   } catch (...) {
     logger().error("{} got unexpected exception in ms_handle_remote_reset() {}",
@@ -87,3 +90,5 @@ ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) {
     ceph_abort();
   }
 }
+
+}
index 139a825e9445ebec4c392839788dd4e6b5b56326..9df7b36f1f77ae60c7110464d663798999a867f4 100644 (file)
@@ -3,27 +3,24 @@
 
 #pragma once
 
-#include <boost/intrusive/slist.hpp>
+#include <list>
 
-#include "crimson/net/Dispatcher.h"
+#include "Fwd.h"
 #include "crimson/common/log.h"
 
-using crimson::net::Dispatcher;
+namespace crimson::net {
+
+class Dispatcher;
 
 class ChainedDispatchers {
-  boost::intrusive::slist<
-    Dispatcher,
-    boost::intrusive::linear<true>,
-    boost::intrusive::cache_last<true>> dispatchers;
 public:
-  void push_front(Dispatcher& dispatcher) {
-    dispatchers.push_front(dispatcher);
-  }
-  void push_back(Dispatcher& dispatcher) {
-    dispatchers.push_back(dispatcher);
+  void assign(const std::list<Dispatcher*> _dispatchers) {
+    assert(empty());
+    assert(!_dispatchers.empty());
+    dispatchers = _dispatchers;
   }
-  void erase(Dispatcher& dispatcher) {
-    dispatchers.erase(dispatchers.iterator_to(dispatcher));
+  void clear() {
+    dispatchers.clear();
   }
   bool empty() const {
     return dispatchers.empty();
@@ -33,6 +30,9 @@ public:
   void ms_handle_connect(crimson::net::ConnectionRef conn);
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
   void ms_handle_remote_reset(crimson::net::ConnectionRef conn);
+
+ private:
+  std::list<Dispatcher*> dispatchers;
 };
 
-using ChainedDispatchersRef = seastar::lw_shared_ptr<ChainedDispatchers>;
+}
index 0f8b20768e46d7d0084de7b8acfe54eff943360d..fcf1f30b4e77ddc9032a8872f4573cf364cd1678 100644 (file)
@@ -57,14 +57,10 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
                          SocketPolicy::lossy_client(0));
   back_msgr->set_policy(entity_name_t::TYPE_OSD,
                         SocketPolicy::lossy_client(0));
-  auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-  chained_dispatchers->push_back(*this);
   return seastar::when_all_succeed(start_messenger(*front_msgr,
-                                                  front_addrs,
-                                                  chained_dispatchers),
+                                                  front_addrs),
                                    start_messenger(*back_msgr,
-                                                  back_addrs,
-                                                  chained_dispatchers))
+                                                  back_addrs))
     .then_unpack([this] {
       timer.arm_periodic(
         std::chrono::seconds(local_conf()->osd_heartbeat_interval));
@@ -73,14 +69,13 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
 
 seastar::future<>
 Heartbeat::start_messenger(crimson::net::Messenger& msgr,
-                           const entity_addrvec_t& addrs,
-                          ChainedDispatchersRef chained_dispatchers)
+                           const entity_addrvec_t& addrs)
 {
   return msgr.try_bind(addrs,
                        local_conf()->ms_bind_port_min,
                        local_conf()->ms_bind_port_max)
-  .then([&msgr, chained_dispatchers]() mutable {
-    return msgr.start(chained_dispatchers);
+  .then([this, &msgr]() mutable {
+    return msgr.start(*this);
   });
 }
 
@@ -88,10 +83,8 @@ seastar::future<> Heartbeat::stop()
 {
   logger().info("{}", __func__);
   timer.cancel();
-  if (!front_msgr->dispatcher_chain_empty())
-    front_msgr->remove_dispatcher(*this);
-  if (!back_msgr->dispatcher_chain_empty())
-    back_msgr->remove_dispatcher(*this);
+  front_msgr->stop();
+  back_msgr->stop();
   return gate.close().then([this] {
     return seastar::when_all_succeed(front_msgr->shutdown(),
                                     back_msgr->shutdown());
index 9d85b526ca25380b9cd9508b5f95b7a1c7ef1cb8..3e875565696a985ec42b5a0e67bd4ee83299670d 100644 (file)
@@ -6,7 +6,6 @@
 #include <cstdint>
 #include <seastar/core/future.hh>
 #include "common/ceph_time.h"
-#include "crimson/net/chained_dispatchers.h"
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Fwd.h"
 
@@ -71,8 +70,7 @@ private:
   void add_reporter_peers(int whoami);
 
   seastar::future<> start_messenger(crimson::net::Messenger& msgr,
-                                   const entity_addrvec_t& addrs,
-                                   ChainedDispatchersRef);
+                                   const entity_addrvec_t& addrs);
 private:
   const osd_id_t whoami;
   const crimson::osd::ShardServices& service;
index c169ece16000b18e1838fc1cb481478fd864a6fa..eeb8915e668d4f13d4769ca27b49b26f7820b07f 100644 (file)
@@ -265,22 +265,22 @@ seastar::future<> OSD::start()
     cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT,
                              SocketPolicy::stateless_server(0));
 
-    auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-    chained_dispatchers->push_front(*mgrc);
-    chained_dispatchers->push_front(*monc);
-    chained_dispatchers->push_front(*this);
+    std::list<Dispatcher*> dispatchers;
+    dispatchers.push_front(mgrc.get());
+    dispatchers.push_front(monc.get());
+    dispatchers.push_front(this);
     return seastar::when_all_succeed(
       cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
                              local_conf()->ms_bind_port_min,
                              local_conf()->ms_bind_port_max)
-        .then([this, chained_dispatchers]() mutable {
-         return cluster_msgr->start(chained_dispatchers);
+        .then([this, dispatchers]() mutable {
+         return cluster_msgr->start(dispatchers);
        }),
       public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
                             local_conf()->ms_bind_port_min,
                             local_conf()->ms_bind_port_max)
-        .then([this, chained_dispatchers]() mutable {
-         return public_msgr->start(chained_dispatchers);
+        .then([this, dispatchers]() mutable {
+         return public_msgr->start(dispatchers);
        }));
   }).then_unpack([this] {
     return seastar::when_all_succeed(monc->start(),
@@ -452,16 +452,8 @@ seastar::future<> OSD::stop()
   return prepare_to_stop().then([this] {
     state.set_stopping();
     logger().debug("prepared to stop");
-    if (!public_msgr->dispatcher_chain_empty()) {
-      public_msgr->remove_dispatcher(*this);
-      public_msgr->remove_dispatcher(*mgrc);
-      public_msgr->remove_dispatcher(*monc);
-    }
-    if (!cluster_msgr->dispatcher_chain_empty()) {
-      cluster_msgr->remove_dispatcher(*this);
-      cluster_msgr->remove_dispatcher(*mgrc);
-      cluster_msgr->remove_dispatcher(*monc);
-    }
+    public_msgr->stop();
+    cluster_msgr->stop();
     auto gate_close_fut = gate.close();
     return asok->stop().then([this] {
       return heartbeat->stop();
index eac8be594279e18e2106f86046b2e6a11df13f1a..2f25fbe79a7d1a65e2eab7b7e6a5230b9ee8e215 100644 (file)
@@ -13,7 +13,6 @@
 #include "crimson/common/type_helpers.h"
 #include "crimson/common/auth_handler.h"
 #include "crimson/common/gated.h"
-#include "crimson/net/chained_dispatchers.h"
 #include "crimson/admin/admin_socket.h"
 #include "crimson/common/simple_lru.h"
 #include "crimson/common/shared_lru.h"
index d50e082e16600fc4f2a65024ced2eee9186e0f3e..3691a68537c2c08d67bf1851bce85f094745ae7f 100644 (file)
@@ -67,15 +67,13 @@ static seastar::future<> test_echo(unsigned rounds,
         msgr->set_require_authorizer(false);
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-       auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-       chained_dispatchers->push_back(*this);
-        return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
-          return msgr->start(chained_dispatchers);
+        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+          return msgr->start(*this);
         });
       }
       seastar::future<> shutdown() {
         ceph_assert(msgr);
-       msgr->remove_dispatcher(*this);
+       msgr->stop();
         return msgr->shutdown();
       }
     };
@@ -140,14 +138,12 @@ static seastar::future<> test_echo(unsigned rounds,
         msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-       auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-       chained_dispatchers->push_back(*this);
-        return msgr->start(chained_dispatchers);
+        return msgr->start(*this);
       }
 
       seastar::future<> shutdown() {
         ceph_assert(msgr);
-       msgr->remove_dispatcher(*this);
+       msgr->stop();
         return msgr->shutdown();
       }
 
@@ -295,10 +291,8 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
         msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-       auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-       chained_dispatchers->push_back(*this);
-        return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
-          return msgr->start(chained_dispatchers);
+        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+          return msgr->start(*this);
         });
       }
     };
@@ -320,9 +314,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
         msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-       auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-       chained_dispatchers->push_back(*this);
-        return msgr->start(chained_dispatchers);
+        return msgr->start(*this);
       }
     };
   };
@@ -352,11 +344,11 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
     return server->wait();
   }).finally([client] {
     logger().info("client shutdown...");
-    client->msgr->remove_dispatcher(*client);
+    client->msgr->stop();
     return client->msgr->shutdown();
   }).finally([server] {
     logger().info("server shutdown...");
-    server->msgr->remove_dispatcher(*server);
+    server->msgr->stop();
     return server->msgr->shutdown();
   }).finally([server, client] {
     logger().info("test_concurrent_dispatch() done!\n");
@@ -385,17 +377,15 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
         msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-       auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-       chained_dispatchers->push_back(*this);
-        return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
-          return msgr->start(chained_dispatchers);
+        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+          return msgr->start(*this);
         });
       }
       entity_addr_t get_addr() const {
         return msgr->get_myaddr();
       }
       seastar::future<> shutdown() {
-       msgr->remove_dispatcher(*this);
+       msgr->stop();
         return msgr->shutdown();
       }
     };
@@ -421,9 +411,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
         msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-       auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-       chained_dispatchers->push_back(*this);
-        return msgr->start(chained_dispatchers);
+        return msgr->start(*this);
       }
       void send_pings(const entity_addr_t& addr) {
         auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD);
@@ -440,7 +428,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
         });
       }
       seastar::future<> shutdown() {
-       msgr->remove_dispatcher(*this);
+       msgr->stop();
         return msgr->shutdown().then([this] {
           stop_send = true;
           return stopped_send_promise.get_future();
@@ -918,10 +906,8 @@ class FailoverSuite : public Dispatcher {
     test_msgr->set_auth_client(&dummy_auth);
     test_msgr->set_auth_server(&dummy_auth);
     test_msgr->interceptor = &interceptor;
-    auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-    chained_dispatchers->push_back(*this);
-    return test_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
-      return test_msgr->start(chained_dispatchers);
+    return test_msgr->bind(entity_addrvec_t{addr}).then([this] {
+      return test_msgr->start(*this);
     });
   }
 
@@ -1042,7 +1028,7 @@ class FailoverSuite : public Dispatcher {
   }
 
   seastar::future<> shutdown() {
-    test_msgr->remove_dispatcher(*this);
+    test_msgr->stop();
     return test_msgr->shutdown();
   }
 
@@ -1266,9 +1252,7 @@ class FailoverTest : public Dispatcher {
     cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0));
     cmd_msgr->set_auth_client(&dummy_auth);
     cmd_msgr->set_auth_server(&dummy_auth);
-    auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-    chained_dispatchers->push_back(*this);
-    return cmd_msgr->start(chained_dispatchers).then([this, cmd_peer_addr] {
+    return cmd_msgr->start(*this).then([this, cmd_peer_addr] {
       logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr);
       cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD);
       return pingpong();
@@ -1291,7 +1275,7 @@ class FailoverTest : public Dispatcher {
     return cmd_conn->send(m).then([] {
       return seastar::sleep(200ms);
     }).finally([this] {
-      cmd_msgr->remove_dispatcher(*this);
+      cmd_msgr->stop();
       return cmd_msgr->shutdown();
     });
   }
@@ -1427,10 +1411,8 @@ class FailoverSuitePeer : public Dispatcher {
     peer_msgr->set_default_policy(policy);
     peer_msgr->set_auth_client(&dummy_auth);
     peer_msgr->set_auth_server(&dummy_auth);
-    auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-    return peer_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable {
-      chained_dispatchers->push_back(*this);
-      return peer_msgr->start(chained_dispatchers);
+    return peer_msgr->bind(entity_addrvec_t{addr}).then([this] {
+      return peer_msgr->start(*this);
     });
   }
 
@@ -1462,7 +1444,7 @@ class FailoverSuitePeer : public Dispatcher {
     : peer_msgr(peer_msgr), op_callback(op_callback) { }
 
   seastar::future<> shutdown() {
-    peer_msgr->remove_dispatcher(*this);
+    peer_msgr->stop();
     return peer_msgr->shutdown();
   }
 
@@ -1609,11 +1591,8 @@ class FailoverTestPeer : public Dispatcher {
     cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
     cmd_msgr->set_auth_client(&dummy_auth);
     cmd_msgr->set_auth_server(&dummy_auth);
-    auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-    chained_dispatchers->push_back(*this);
-    return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then(
-      [this, chained_dispatchers]() mutable {
-      return cmd_msgr->start(chained_dispatchers);
+    return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
+      return cmd_msgr->start(*this);
     }).handle_exception_type([cmd_peer_addr](const std::system_error& e) {
       if (e.code() == std::errc::address_in_use) {
         logger().error("FailoverTestPeer::init({}) "
index 9e939a125fd7875dc82a8b58491ab231e65ccc1a..48f82f776f0b8e8f833808a49a0d6c6a3851d82d 100644 (file)
@@ -182,9 +182,7 @@ static seastar::future<> run(
             msgr->set_crc_data();
           }
           return msgr->bind(entity_addrvec_t{addr}).then([this] {
-           auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-           chained_dispatchers->push_back(*this);
-            return msgr->start(chained_dispatchers);
+            return msgr->start(*this);
           });
         });
       }
@@ -192,6 +190,7 @@ static seastar::future<> run(
         logger().info("{} shutdown...", lname);
         return seastar::smp::submit_to(msgr_sid, [this] {
           ceph_assert(msgr);
+          msgr->stop();
           return msgr->shutdown();
         });
       }
@@ -333,9 +332,7 @@ static seastar::future<> run(
       }
 
       seastar::future<> init(bool v1_crc_enabled) {
-       auto chained_dispatchers = seastar::make_lw_shared<ChainedDispatchers>();
-       chained_dispatchers->push_back(*this);
-        return container().invoke_on_all([v1_crc_enabled, chained_dispatchers] (auto& client) mutable {
+        return container().invoke_on_all([v1_crc_enabled] (auto& client) {
           if (client.is_active()) {
             client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid);
             client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
@@ -346,7 +343,7 @@ static seastar::future<> run(
               client.msgr->set_crc_header();
               client.msgr->set_crc_data();
             }
-            return client.msgr->start(chained_dispatchers);
+            return client.msgr->start(client);
           }
           return seastar::now();
         });
@@ -357,6 +354,7 @@ static seastar::future<> run(
           if (client.is_active()) {
             logger().info("{} shutdown...", client.lname);
             ceph_assert(client.msgr);
+            client.msgr->stop();
             return client.msgr->shutdown().then([&client] {
               return client.stop_dispatch_messages();
             });