git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: make bind()/try_bind() return errorated future
author Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 30 Nov 2020 03:50:36 +0000 (11:50 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 1 Dec 2020 01:34:42 +0000 (09:34 +0800)
Also fixed callers to handle the error: abort immediately upon bind
failure and report the error. Previously, these callers didn't handle
bind failures correctly, which resulted in misleading undefined behavior.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
12 files changed:
src/crimson/common/errorator.h
src/crimson/net/Fwd.h
src/crimson/net/Messenger.h
src/crimson/net/Socket.cc
src/crimson/net/Socket.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/SocketMessenger.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/osd.cc
src/test/crimson/test_messenger.cc
src/test/crimson/test_socket.cc
src/tools/crimson/perf_crimson_msgr.cc

index 4fdbb53b54869ff0e01812384138f991ab76484f..31237f644a5dc8fae99e16d41f6fd23e0453f23a 100644 (file)
@@ -1017,6 +1017,7 @@ namespace ct_error {
     ct_error_code<std::errc::resource_unavailable_try_again>;
   using file_too_large =
     ct_error_code<std::errc::file_too_large>;
+  using address_in_use = ct_error_code<std::errc::address_in_use>;
 
   struct pass_further_all {
     template <class ErrorT>
index 8dab402b3962189225d30932b1a59173903925b2..77818704a85dcd895f07bee71432bcd4d93962af 100644 (file)
@@ -23,6 +23,8 @@
 #include "msg/MessageRef.h"
 #include "msg/msg_types.h"
 
+#include "crimson/common/errorator.h"
+
 using auth_proto_t = int;
 
 class AuthConnectionMeta;
index 60326135bc6ea2d5572b04a4048a6b030d349da3..9133cdd0356f4c7bfb16d48c452ce011c2ffb808 100644 (file)
@@ -66,16 +66,15 @@ public:
     return seastar::now();
   }
 
+  using bind_ertr = crimson::errorator<
+    crimson::ct_error::address_in_use // The address (range) is already bound
+    >;
   /// bind to the given address
-  /// throws std::system_error with address_in_use if the addr is already bound
-  // TODO: use errorated future
-  virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0;
+  virtual bind_ertr::future<> bind(const entity_addrvec_t& addr) = 0;
 
   /// try to bind to the first unused port of given address
-  /// throws std::system_error with address_in_use if the range is unavailable
-  // TODO: use errorated future
-  virtual seastar::future<> try_bind(const entity_addrvec_t& addr,
-                                     uint32_t min_port, uint32_t max_port) = 0;
+  virtual bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+                                       uint32_t min_port, uint32_t max_port) = 0;
 
   /// start the messenger
   virtual seastar::future<> start(const std::list<Dispatcher*>&) = 0;
index b5b044b994eca1de3d57c6d90bde5594140fba8d..8ad106dbdd7e22525d8485603cbe1af9f13b7e0f 100644 (file)
@@ -198,7 +198,8 @@ void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocke
 }
 #endif
 
-seastar::future<> FixedCPUServerSocket::listen(entity_addr_t addr)
+FixedCPUServerSocket::listen_ertr::future<>
+FixedCPUServerSocket::listen(entity_addr_t addr)
 {
   assert(seastar::this_shard_id() == cpu);
   logger().trace("FixedCPUServerSocket::listen({})...", addr);
@@ -209,15 +210,23 @@ seastar::future<> FixedCPUServerSocket::listen(entity_addr_t addr)
     lo.reuse_address = true;
     lo.set_fixed_cpu(ss.cpu);
     ss.listener = seastar::listen(s_addr, lo);
+  }).then([] {
+    return true;
   }).handle_exception_type([addr] (const std::system_error& e) {
     if (e.code() == std::errc::address_in_use) {
       logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
-      throw;
     } else {
       logger().error("FixedCPUServerSocket::listen({}): "
                      "got unexpeted error {}", addr, e);
       ceph_abort();
     }
+    return false;
+  }).then([] (bool success) -> listen_ertr::future<> {
+    if (success) {
+      return listen_ertr::now();
+    } else {
+      return crimson::ct_error::address_in_use::make();
+    }
   });
 }
 
index 8b05a884896ef1cc770e3ae422e907ff596574f8..d39a2517f959e6c3064d707d1aed649fc11b3613 100644 (file)
@@ -9,10 +9,10 @@
 #include <seastar/net/packet.hh>
 
 #include "include/buffer.h"
-#include "msg/msg_types.h"
 
 #include "crimson/common/log.h"
 #include "Errors.h"
+#include "Fwd.h"
 
 #ifdef UNIT_TESTS_BUILT
 #include "Interceptor.h"
@@ -197,7 +197,10 @@ public:
   FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
   FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
 
-  seastar::future<> listen(entity_addr_t addr);
+  using listen_ertr = crimson::errorator<
+    crimson::ct_error::address_in_use // The address is already bound
+    >;
+  listen_ertr::future<> listen(entity_addr_t addr);
 
   // fn_accept should be a nothrow function of type
   // seastar::future<>(SocketRef, entity_addr_t)
index 07a86bfdfa9a3403e74798f8b7770373e8aac2e2..f1ad9bde2e5eaf9eba7b3896f723a152ee490efd 100644 (file)
@@ -48,7 +48,7 @@ seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
   return Messenger::set_myaddrs(my_addrs);
 }
 
-seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
+SocketMessenger::bind_ertr::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
 {
   assert(seastar::this_shard_id() == master_sid);
   ceph_assert(addrs.front().get_family() == AF_INET);
@@ -60,52 +60,64 @@ seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
     } else {
       return seastar::now();
     }
-  }).then([this] {
+  }).then([this] () -> bind_ertr::future<> {
     const entity_addr_t listen_addr = get_myaddr();
     logger().debug("{} do_bind: try listen {}...", *this, listen_addr);
     if (!listener) {
       logger().warn("{} do_bind: listener doesn't exist", *this);
-      return seastar::now();
+      return bind_ertr::now();
     }
     return listener->listen(listen_addr);
   });
 }
 
-seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
+SocketMessenger::bind_ertr::future<>
+SocketMessenger::bind(const entity_addrvec_t& addrs)
 {
-  return do_bind(addrs).then([this] {
+  return do_bind(addrs).safe_then([this] {
     logger().info("{} bind: done", *this);
   });
 }
 
-seastar::future<>
+SocketMessenger::bind_ertr::future<>
 SocketMessenger::try_bind(const entity_addrvec_t& addrs,
                           uint32_t min_port, uint32_t max_port)
 {
   auto addr = addrs.front();
   if (addr.get_port() != 0) {
-    return do_bind(addrs).then([this] {
+    return do_bind(addrs).safe_then([this] {
       logger().info("{} try_bind: done", *this);
     });
   }
   ceph_assert(min_port <= max_port);
   return seastar::do_with(uint32_t(min_port),
                           [this, max_port, addr] (auto& port) {
-    return seastar::repeat([this, max_port, addr, &port] {
+    return seastar::repeat_until_value([this, max_port, addr, &port] {
       auto to_bind = addr;
       to_bind.set_port(port);
-      return do_bind(entity_addrvec_t{to_bind}).then([this] {
+      return do_bind(entity_addrvec_t{to_bind}
+      ).safe_then([this] () -> seastar::future<std::optional<bool>> {
         logger().info("{} try_bind: done", *this);
-        return stop_t::yes;
-      }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
-        assert(e.code() == std::errc::address_in_use);
+        return seastar::make_ready_future<std::optional<bool>>(
+            std::make_optional<bool>(true));
+      }, bind_ertr::all_same_way([this, max_port, &port]
+                                 (const std::error_code& e) mutable
+                                 -> seastar::future<std::optional<bool>> {
+        assert(e == std::errc::address_in_use);
         logger().trace("{} try_bind: {} already used", *this, port);
         if (port == max_port) {
-          throw;
+          return seastar::make_ready_future<std::optional<bool>>(
+              std::make_optional<bool>(false));
         }
         ++port;
-        return stop_t::no;
-      });
+        return seastar::make_ready_future<std::optional<bool>>();
+      }));
+    }).then([] (bool success) -> bind_ertr::future<> {
+      if (success) {
+        return bind_ertr::now();
+      } else {
+        return crimson::ct_error::address_in_use::make();
+      }
     });
   });
 }
index 33da5f1a4805ba7dde4baf76976be043c0346daa..cca955f3a30486ac5e77ea58dc822f31ac397298 100644 (file)
@@ -49,7 +49,7 @@ class SocketMessenger final : public Messenger {
   uint32_t global_seq = 0;
   bool started = false;
 
-  seastar::future<> do_bind(const entity_addrvec_t& addr);
+  bind_ertr::future<> do_bind(const entity_addrvec_t& addr);
 
  public:
   SocketMessenger(const entity_name_t& myname,
@@ -61,10 +61,10 @@ class SocketMessenger final : public Messenger {
 
   // Messenger interfaces are assumed to be called from its own shard, but its
   // behavior should be symmetric when called from any shard.
-  seastar::future<> bind(const entity_addrvec_t& addr) override;
+  bind_ertr::future<> bind(const entity_addrvec_t& addr) override;
 
-  seastar::future<> try_bind(const entity_addrvec_t& addr,
-                             uint32_t min_port, uint32_t max_port) override;
+  bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+                               uint32_t min_port, uint32_t max_port) override;
 
   seastar::future<> start(const std::list<Dispatcher*>& dispatchers) override;
 
index fcf1f30b4e77ddc9032a8872f4573cf364cd1678..4c64e1573ac6847b5d64cfaa2eceb0985b78bb0c 100644 (file)
@@ -74,9 +74,13 @@ Heartbeat::start_messenger(crimson::net::Messenger& msgr,
   return msgr.try_bind(addrs,
                        local_conf()->ms_bind_port_min,
                        local_conf()->ms_bind_port_max)
-  .then([this, &msgr]() mutable {
+  .safe_then([this, &msgr]() mutable {
     return msgr.start(*this);
-  });
+  }, crimson::net::Messenger::bind_ertr::all_same_way(
+      [] (const std::error_code& e) {
+    logger().error("heartbeat messenger try_bind(): address range is unavailable.");
+    ceph_abort();
+  }));
 }
 
 seastar::future<> Heartbeat::stop()
index eeb8915e668d4f13d4769ca27b49b26f7820b07f..264932cfb02c464b792cb19a9c6722b48f9d814e 100644 (file)
@@ -273,15 +273,23 @@ seastar::future<> OSD::start()
       cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
                              local_conf()->ms_bind_port_min,
                              local_conf()->ms_bind_port_max)
-        .then([this, dispatchers]() mutable {
+        .safe_then([this, dispatchers]() mutable {
          return cluster_msgr->start(dispatchers);
-       }),
+        }, crimson::net::Messenger::bind_ertr::all_same_way(
+            [] (const std::error_code& e) {
+          logger().error("cluster messenger try_bind(): address range is unavailable.");
+          ceph_abort();
+        })),
       public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
                             local_conf()->ms_bind_port_min,
                             local_conf()->ms_bind_port_max)
-        .then([this, dispatchers]() mutable {
+        .safe_then([this, dispatchers]() mutable {
          return public_msgr->start(dispatchers);
-       }));
+        }, crimson::net::Messenger::bind_ertr::all_same_way(
+            [] (const std::error_code& e) {
+          logger().error("public messenger try_bind(): address range is unavailable.");
+          ceph_abort();
+        })));
   }).then_unpack([this] {
     return seastar::when_all_succeed(monc->start(),
                                      mgrc->start());
index 3691a68537c2c08d67bf1851bce85f094745ae7f..d5da8a94d027059b429c44c4951704a95ca272a9 100644 (file)
@@ -67,9 +67,14 @@ static seastar::future<> test_echo(unsigned rounds,
         msgr->set_require_authorizer(false);
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+        return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
           return msgr->start(*this);
-        });
+        }, crimson::net::Messenger::bind_ertr::all_same_way(
+            [addr] (const std::error_code& e) {
+          logger().error("test_echo(): "
+                         "there is another instance running at {}", addr);
+          ceph_abort();
+        }));
       }
       seastar::future<> shutdown() {
         ceph_assert(msgr);
@@ -291,9 +296,14 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
         msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+        return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
           return msgr->start(*this);
-        });
+        }, crimson::net::Messenger::bind_ertr::all_same_way(
+            [addr] (const std::error_code& e) {
+          logger().error("test_concurrent_dispatch(): "
+                         "there is another instance running at {}", addr);
+          ceph_abort();
+        }));
       }
     };
 
@@ -377,9 +387,14 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
         msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
         msgr->set_auth_client(&dummy_auth);
         msgr->set_auth_server(&dummy_auth);
-        return msgr->bind(entity_addrvec_t{addr}).then([this] {
+        return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
           return msgr->start(*this);
-        });
+        }, crimson::net::Messenger::bind_ertr::all_same_way(
+            [addr] (const std::error_code& e) {
+          logger().error("test_preemptive_shutdown(): "
+                         "there is another instance running at {}", addr);
+          ceph_abort();
+        }));
       }
       entity_addr_t get_addr() const {
         return msgr->get_myaddr();
@@ -906,9 +921,13 @@ class FailoverSuite : public Dispatcher {
     test_msgr->set_auth_client(&dummy_auth);
     test_msgr->set_auth_server(&dummy_auth);
     test_msgr->interceptor = &interceptor;
-    return test_msgr->bind(entity_addrvec_t{addr}).then([this] {
+    return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
       return test_msgr->start(*this);
-    });
+    }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+      logger().error("FailoverSuite: "
+                     "there is another instance running at {}", addr);
+      ceph_abort();
+    }));
   }
 
   seastar::future<> send_op(bool expect_reply=true) {
@@ -1411,9 +1430,13 @@ class FailoverSuitePeer : public Dispatcher {
     peer_msgr->set_default_policy(policy);
     peer_msgr->set_auth_client(&dummy_auth);
     peer_msgr->set_auth_server(&dummy_auth);
-    return peer_msgr->bind(entity_addrvec_t{addr}).then([this] {
+    return peer_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
       return peer_msgr->start(*this);
-    });
+    }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+      logger().error("FailoverSuitePeer: "
+                     "there is another instance running at {}", addr);
+      ceph_abort();
+    }));
   }
 
   seastar::future<> send_op() {
@@ -1591,18 +1614,13 @@ class FailoverTestPeer : public Dispatcher {
     cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
     cmd_msgr->set_auth_client(&dummy_auth);
     cmd_msgr->set_auth_server(&dummy_auth);
-    return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
+    return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).safe_then([this] {
       return cmd_msgr->start(*this);
-    }).handle_exception_type([cmd_peer_addr](const std::system_error& e) {
-      if (e.code() == std::errc::address_in_use) {
-        logger().error("FailoverTestPeer::init({}) "
-                       "likely there is another instance of "
-                       "unittest_seastar_messenger running", cmd_peer_addr);
-      } else {
-        logger().error("FailoverTestPeer::init({}): {}", cmd_peer_addr, e.what());
-      }
-      abort();
-    });
+    }, Messenger::bind_ertr::all_same_way([cmd_peer_addr] (const std::error_code& e) {
+      logger().error("FailoverTestPeer: "
+                     "there is another instance running at {}", cmd_peer_addr);
+      ceph_abort();
+    }));
   }
 
  public:
index bccfb36526a796505504cb52dd597b22af260002..5be99c08e6628f00d8ff48661b98a4c4cc56a022 100644 (file)
@@ -43,7 +43,8 @@ future<SocketRef> socket_connect() {
 future<> test_refused() {
   logger.info("test_refused()...");
   return socket_connect().discard_result().then([] {
-    ceph_abort_msg("connection is not refused");
+    logger.error("test_refused(): connection to {} is not refused", server_addr);
+    ceph_abort();
   }).handle_exception_type([] (const std::system_error& e) {
     if (e.code() != std::errc::connection_refused) {
       logger.error("test_refused() got unexpeted error {}", e);
@@ -60,26 +61,34 @@ future<> test_refused() {
 future<> test_bind_same() {
   logger.info("test_bind_same()...");
   return FixedCPUServerSocket::create().then([] (auto pss1) {
-    return pss1->listen(server_addr).then([] {
+    return pss1->listen(server_addr).safe_then([] {
       // try to bind the same address
       return FixedCPUServerSocket::create().then([] (auto pss2) {
-        return pss2->listen(server_addr).then([] {
-          ceph_abort("Should raise address_in_use!");
-        }).handle_exception_type([] (const std::system_error& e) {
-          assert(e.code() == std::errc::address_in_use);
-          // successful!
-        }).finally([pss2] {
-          return pss2->destroy();
-        }).handle_exception_type([] (const std::system_error& e) {
-          if (e.code() != std::errc::address_in_use) {
-            logger.error("test_bind_same() got unexpeted error {}", e);
-            ceph_abort();
-          } else {
+        return pss2->listen(server_addr).safe_then([] {
+          logger.error("test_bind_same() should raise address_in_use");
+          ceph_abort();
+        }, FixedCPUServerSocket::listen_ertr::all_same_way(
+            [] (const std::error_code& e) {
+          if (e == std::errc::address_in_use) {
+            // successful!
             logger.info("test_bind_same() ok\n");
+          } else {
+            logger.error("test_bind_same() got unexpected error {}", e);
+            ceph_abort();
           }
+          // Note: need to return a explicit ready future, or there will be a
+          // runtime error: member access within null pointer of type 'struct promise_base'
+          return seastar::now();
+        })).then([pss2] {
+          return pss2->destroy();
         });
       });
-    }).finally([pss1] {
+    }, FixedCPUServerSocket::listen_ertr::all_same_way(
+        [] (const std::error_code& e) {
+      logger.error("test_bind_same(): there is another instance running at {}",
+                   server_addr);
+      ceph_abort();
+    })).then([pss1] {
       return pss1->destroy();
     }).handle_exception([] (auto eptr) {
       logger.error("test_bind_same() got unexpeted exception {}", eptr);
@@ -91,14 +100,19 @@ future<> test_bind_same() {
 future<> test_accept() {
   logger.info("test_accept()");
   return FixedCPUServerSocket::create().then([] (auto pss) {
-    return pss->listen(server_addr).then([pss] {
+    return pss->listen(server_addr).safe_then([pss] {
       return pss->accept([] (auto socket, auto paddr) {
         // simple accept
         return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable {
           return socket->close().finally([cleanup = std::move(socket)] {});
         });
       });
-    }).then([] {
+    }, FixedCPUServerSocket::listen_ertr::all_same_way(
+        [] (const std::error_code& e) {
+      logger.error("test_accept(): there is another instance running at {}",
+                   server_addr);
+      ceph_abort();
+    })).then([] {
       return seastar::when_all(
         socket_connect().then([] (auto socket) {
           return socket->close().finally([cleanup = std::move(socket)] {}); }),
@@ -137,7 +151,13 @@ class SocketFactory {
     return seastar::smp::submit_to(1u, [psf] {
       return FixedCPUServerSocket::create().then([psf] (auto pss) {
         psf->pss = pss;
-        return pss->listen(server_addr);
+        return pss->listen(server_addr
+        ).safe_then([]{}, FixedCPUServerSocket::listen_ertr::all_same_way(
+            [] (const std::error_code& e) {
+          logger.error("dispatch_sockets(): there is another instance running at {}",
+                       server_addr);
+          ceph_abort();
+        }));
       });
     }).then([psf] {
       return seastar::when_all_succeed(
index 48f82f776f0b8e8f833808a49a0d6c6a3851d82d..710dc39fbd2bb17c387a07774952d0cc65214def 100644 (file)
@@ -181,9 +181,14 @@ static seastar::future<> run(
             msgr->set_crc_header();
             msgr->set_crc_data();
           }
-          return msgr->bind(entity_addrvec_t{addr}).then([this] {
+          return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
             return msgr->start(*this);
-          });
+          }, crimson::net::Messenger::bind_ertr::all_same_way(
+              [addr] (const std::error_code& e) {
+            logger().error("Server: "
+                           "there is another instance running at {}", addr);
+            ceph_abort();
+          }));
         });
       }
       seastar::future<> shutdown() {