From 44585adc78bded751b8b50d6304068a8c5186fa1 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 30 Nov 2020 11:50:36 +0800 Subject: [PATCH] crimson/net: make bind()/try_bind() return errorated future Also fixed callers to handle the error: abort immediately upon bind failure and report error. Previously, these callers didn't handle bind failures correctly and would result in misleading undefined behaviors. Signed-off-by: Yingxin Cheng --- src/crimson/common/errorator.h | 1 + src/crimson/net/Fwd.h | 2 + src/crimson/net/Messenger.h | 13 +++--- src/crimson/net/Socket.cc | 13 +++++- src/crimson/net/Socket.h | 7 ++- src/crimson/net/SocketMessenger.cc | 42 +++++++++++------- src/crimson/net/SocketMessenger.h | 8 ++-- src/crimson/osd/heartbeat.cc | 8 +++- src/crimson/osd/osd.cc | 16 +++++-- src/test/crimson/test_messenger.cc | 60 +++++++++++++++++--------- src/test/crimson/test_socket.cc | 56 ++++++++++++++++-------- src/tools/crimson/perf_crimson_msgr.cc | 9 +++- 12 files changed, 158 insertions(+), 77 deletions(-) diff --git a/src/crimson/common/errorator.h b/src/crimson/common/errorator.h index 4fdbb53b54869..31237f644a5dc 100644 --- a/src/crimson/common/errorator.h +++ b/src/crimson/common/errorator.h @@ -1017,6 +1017,7 @@ namespace ct_error { ct_error_code; using file_too_large = ct_error_code; + using address_in_use = ct_error_code; struct pass_further_all { template diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 8dab402b39621..77818704a85dc 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -23,6 +23,8 @@ #include "msg/MessageRef.h" #include "msg/msg_types.h" +#include "crimson/common/errorator.h" + using auth_proto_t = int; class AuthConnectionMeta; diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 60326135bc6ea..9133cdd0356f4 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -66,16 +66,15 @@ public: return seastar::now(); } + using bind_ertr = crimson::errorator< + crimson::ct_error::address_in_use // The address (range) is already bound + >; /// bind to the given address - /// throws std::system_error with address_in_use if the addr is already bound - // TODO: use errorated future - virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0; + virtual bind_ertr::future<> bind(const entity_addrvec_t& addr) = 0; /// try to bind to the first unused port of given address - /// throws std::system_error with address_in_use if the range is unavailable - // TODO: use errorated future - virtual seastar::future<> try_bind(const entity_addrvec_t& addr, - uint32_t min_port, uint32_t max_port) = 0; + virtual bind_ertr::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) = 0; /// start the messenger virtual seastar::future<> start(const std::list&) = 0; diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index b5b044b994eca..8ad106dbdd7e2 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -198,7 +198,8 @@ void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocke } #endif -seastar::future<> FixedCPUServerSocket::listen(entity_addr_t addr) +FixedCPUServerSocket::listen_ertr::future<> +FixedCPUServerSocket::listen(entity_addr_t addr) { assert(seastar::this_shard_id() == cpu); logger().trace("FixedCPUServerSocket::listen({})...", addr); @@ -209,15 +210,23 @@ seastar::future<> FixedCPUServerSocket::listen(entity_addr_t addr) lo.reuse_address = true; lo.set_fixed_cpu(ss.cpu); ss.listener = seastar::listen(s_addr, lo); + }).then([] { + return true; }).handle_exception_type([addr] (const std::system_error& e) { if (e.code() == std::errc::address_in_use) { logger().trace("FixedCPUServerSocket::listen({}): address in use", addr); - throw; } else { logger().error("FixedCPUServerSocket::listen({}): " "got unexpeted error {}", addr, e); ceph_abort(); } + return false; + }).then([] (bool success) -> listen_ertr::future<> { + if (success) { + return listen_ertr::now(); + } else { + return crimson::ct_error::address_in_use::make(); + } }); } diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index 8b05a884896ef..d39a2517f959e 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -9,10 +9,10 @@ #include #include "include/buffer.h" -#include "msg/msg_types.h" #include "crimson/common/log.h" #include "Errors.h" +#include "Fwd.h" #ifdef UNIT_TESTS_BUILT #include "Interceptor.h" @@ -197,7 +197,10 @@ public: FixedCPUServerSocket(const FixedCPUServerSocket&) = delete; FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete; - seastar::future<> listen(entity_addr_t addr); + using listen_ertr = crimson::errorator< + crimson::ct_error::address_in_use // The address is already bound + >; + listen_ertr::future<> listen(entity_addr_t addr); // fn_accept should be a nothrow function of type // seastar::future<>(SocketRef, entity_addr_t) diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 07a86bfdfa9a3..f1ad9bde2e5ea 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -48,7 +48,7 @@ seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) return Messenger::set_myaddrs(my_addrs); } -seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs) +SocketMessenger::bind_ertr::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs) { assert(seastar::this_shard_id() == master_sid); ceph_assert(addrs.front().get_family() == AF_INET); @@ -60,52 +60,64 @@ seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs) } else { return seastar::now(); } - }).then([this] { + }).then([this] () -> bind_ertr::future<> { const entity_addr_t listen_addr = get_myaddr(); logger().debug("{} do_bind: try listen {}...", *this, listen_addr); if (!listener) { logger().warn("{} do_bind: listener doesn't exist", *this); - return seastar::now(); + return bind_ertr::now(); } return listener->listen(listen_addr); }); } -seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) +SocketMessenger::bind_ertr::future<> +SocketMessenger::bind(const entity_addrvec_t& addrs) { - return do_bind(addrs).then([this] { + return do_bind(addrs).safe_then([this] { logger().info("{} bind: done", *this); }); } -seastar::future<> +SocketMessenger::bind_ertr::future<> SocketMessenger::try_bind(const entity_addrvec_t& addrs, uint32_t min_port, uint32_t max_port) { auto addr = addrs.front(); if (addr.get_port() != 0) { - return do_bind(addrs).then([this] { + return do_bind(addrs).safe_then([this] { logger().info("{} try_bind: done", *this); }); } ceph_assert(min_port <= max_port); return seastar::do_with(uint32_t(min_port), [this, max_port, addr] (auto& port) { - return seastar::repeat([this, max_port, addr, &port] { + return seastar::repeat_until_value([this, max_port, addr, &port] { auto to_bind = addr; to_bind.set_port(port); - return do_bind(entity_addrvec_t{to_bind}).then([this] { + return do_bind(entity_addrvec_t{to_bind} + ).safe_then([this] () -> seastar::future> { logger().info("{} try_bind: done", *this); - return stop_t::yes; - }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { - assert(e.code() == std::errc::address_in_use); + return seastar::make_ready_future>( + std::make_optional(true)); + }, bind_ertr::all_same_way([this, max_port, &port] + (const std::error_code& e) mutable + -> seastar::future> { + assert(e == std::errc::address_in_use); logger().trace("{} try_bind: {} already used", *this, port); if (port == max_port) { - throw; + return seastar::make_ready_future>( + std::make_optional(false)); } ++port; - return stop_t::no; - }); + return seastar::make_ready_future>(); + })); + }).then([] (bool success) -> bind_ertr::future<> { + if (success) { + return bind_ertr::now(); + } else { + return crimson::ct_error::address_in_use::make(); + } }); }); } diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 33da5f1a4805b..cca955f3a3048 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -49,7 +49,7 @@ class SocketMessenger final : public Messenger { uint32_t global_seq = 0; bool started = false; - seastar::future<> do_bind(const entity_addrvec_t& addr); + bind_ertr::future<> do_bind(const entity_addrvec_t& addr); public: SocketMessenger(const entity_name_t& myname, @@ -61,10 +61,10 @@ class SocketMessenger final : public Messenger { // Messenger interfaces are assumed to be called from its own shard, but its // behavior should be symmetric when called from any shard. - seastar::future<> bind(const entity_addrvec_t& addr) override; + bind_ertr::future<> bind(const entity_addrvec_t& addr) override; - seastar::future<> try_bind(const entity_addrvec_t& addr, - uint32_t min_port, uint32_t max_port) override; + bind_ertr::future<> try_bind(const entity_addrvec_t& addr, + uint32_t min_port, uint32_t max_port) override; seastar::future<> start(const std::list& dispatchers) override; diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index fcf1f30b4e77d..4c64e1573ac68 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -74,9 +74,13 @@ Heartbeat::start_messenger(crimson::net::Messenger& msgr, return msgr.try_bind(addrs, local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - .then([this, &msgr]() mutable { + .safe_then([this, &msgr]() mutable { return msgr.start(*this); - }); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [] (const std::error_code& e) { + logger().error("heartbeat messenger try_bind(): address range is unavailable."); + ceph_abort(); + })); } seastar::future<> Heartbeat::stop() diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index eeb8915e668d4..264932cfb02c4 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -273,15 +273,23 @@ seastar::future<> OSD::start() cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - .then([this, dispatchers]() mutable { + .safe_then([this, dispatchers]() mutable { return cluster_msgr->start(dispatchers); - }), + }, crimson::net::Messenger::bind_ertr::all_same_way( + [] (const std::error_code& e) { + logger().error("cluster messenger try_bind(): address range is unavailable."); + ceph_abort(); + })), public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - .then([this, dispatchers]() mutable { + .safe_then([this, dispatchers]() mutable { return public_msgr->start(dispatchers); - })); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [] (const std::error_code& e) { + logger().error("public messenger try_bind(): address range is unavailable."); + ceph_abort(); + }))); }).then_unpack([this] { return seastar::when_all_succeed(monc->start(), mgrc->start()); diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 3691a68537c2c..d5da8a94d0270 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -67,9 +67,14 @@ static seastar::future<> test_echo(unsigned rounds, msgr->set_require_authorizer(false); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { return msgr->start(*this); - }); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [addr] (const std::error_code& e) { + logger().error("test_echo(): " + "there is another instance running at {}", addr); + ceph_abort(); + })); } seastar::future<> shutdown() { ceph_assert(msgr); @@ -291,9 +296,14 @@ static seastar::future<> test_concurrent_dispatch(bool v2) msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { return msgr->start(*this); - }); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [addr] (const std::error_code& e) { + logger().error("test_concurrent_dispatch(): " + "there is another instance running at {}", addr); + ceph_abort(); + })); } }; @@ -377,9 +387,14 @@ seastar::future<> test_preemptive_shutdown(bool v2) { msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { return msgr->start(*this); - }); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [addr] (const std::error_code& e) { + logger().error("test_preemptive_shutdown(): " + "there is another instance running at {}", addr); + ceph_abort(); + })); } entity_addr_t get_addr() const { return msgr->get_myaddr(); @@ -906,9 +921,13 @@ class FailoverSuite : public Dispatcher { test_msgr->set_auth_client(&dummy_auth); test_msgr->set_auth_server(&dummy_auth); test_msgr->interceptor = &interceptor; - return test_msgr->bind(entity_addrvec_t{addr}).then([this] { + return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] { return test_msgr->start(*this); - }); + }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) { + logger().error("FailoverSuite: " + "there is another instance running at {}", addr); + ceph_abort(); + })); } seastar::future<> send_op(bool expect_reply=true) { @@ -1411,9 +1430,13 @@ class FailoverSuitePeer : public Dispatcher { peer_msgr->set_default_policy(policy); peer_msgr->set_auth_client(&dummy_auth); peer_msgr->set_auth_server(&dummy_auth); - return peer_msgr->bind(entity_addrvec_t{addr}).then([this] { + return peer_msgr->bind(entity_addrvec_t{addr}).safe_then([this] { return peer_msgr->start(*this); - }); + }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) { + logger().error("FailoverSuitePeer: " + "there is another instance running at {}", addr); + ceph_abort(); + })); } seastar::future<> send_op() { @@ -1591,18 +1614,13 @@ class FailoverTestPeer : public Dispatcher { cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0)); cmd_msgr->set_auth_client(&dummy_auth); cmd_msgr->set_auth_server(&dummy_auth); - return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] { + return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).safe_then([this] { return cmd_msgr->start(*this); - }).handle_exception_type([cmd_peer_addr](const std::system_error& e) { - if (e.code() == std::errc::address_in_use) { - logger().error("FailoverTestPeer::init({}) " - "likely there is another instance of " - "unittest_seastar_messenger running", cmd_peer_addr); - } else { - logger().error("FailoverTestPeer::init({}): {}", cmd_peer_addr, e.what()); - } - abort(); - }); + }, Messenger::bind_ertr::all_same_way([cmd_peer_addr] (const std::error_code& e) { + logger().error("FailoverTestPeer: " + "there is another instance running at {}", cmd_peer_addr); + ceph_abort(); + })); } public: diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index bccfb36526a79..5be99c08e6628 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -43,7 +43,8 @@ future socket_connect() { future<> test_refused() { logger.info("test_refused()..."); return socket_connect().discard_result().then([] { - ceph_abort_msg("connection is not refused"); + logger.error("test_refused(): connection to {} is not refused", server_addr); + ceph_abort(); }).handle_exception_type([] (const std::system_error& e) { if (e.code() != std::errc::connection_refused) { logger.error("test_refused() got unexpeted error {}", e); @@ -60,26 +61,34 @@ future<> test_refused() { future<> test_bind_same() { logger.info("test_bind_same()..."); return FixedCPUServerSocket::create().then([] (auto pss1) { - return pss1->listen(server_addr).then([] { + return pss1->listen(server_addr).safe_then([] { // try to bind the same address return FixedCPUServerSocket::create().then([] (auto pss2) { - return pss2->listen(server_addr).then([] { - ceph_abort("Should raise address_in_use!"); - }).handle_exception_type([] (const std::system_error& e) { - assert(e.code() == std::errc::address_in_use); - // successful! - }).finally([pss2] { - return pss2->destroy(); - }).handle_exception_type([] (const std::system_error& e) { - if (e.code() != std::errc::address_in_use) { - logger.error("test_bind_same() got unexpeted error {}", e); - ceph_abort(); - } else { + return pss2->listen(server_addr).safe_then([] { + logger.error("test_bind_same() should raise address_in_use"); + ceph_abort(); + }, FixedCPUServerSocket::listen_ertr::all_same_way( + [] (const std::error_code& e) { + if (e == std::errc::address_in_use) { + // successful! logger.info("test_bind_same() ok\n"); + } else { + logger.error("test_bind_same() got unexpected error {}", e); + ceph_abort(); } + // Note: need to return a explicit ready future, or there will be a + // runtime error: member access within null pointer of type 'struct promise_base' + return seastar::now(); + })).then([pss2] { + return pss2->destroy(); }); }); - }).finally([pss1] { + }, FixedCPUServerSocket::listen_ertr::all_same_way( + [] (const std::error_code& e) { + logger.error("test_bind_same(): there is another instance running at {}", + server_addr); + ceph_abort(); + })).then([pss1] { return pss1->destroy(); }).handle_exception([] (auto eptr) { logger.error("test_bind_same() got unexpeted exception {}", eptr); @@ -91,14 +100,19 @@ future<> test_bind_same() { future<> test_accept() { logger.info("test_accept()"); return FixedCPUServerSocket::create().then([] (auto pss) { - return pss->listen(server_addr).then([pss] { + return pss->listen(server_addr).safe_then([pss] { return pss->accept([] (auto socket, auto paddr) { // simple accept return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable { return socket->close().finally([cleanup = std::move(socket)] {}); }); }); - }).then([] { + }, FixedCPUServerSocket::listen_ertr::all_same_way( + [] (const std::error_code& e) { + logger.error("test_accept(): there is another instance running at {}", + server_addr); + ceph_abort(); + })).then([] { return seastar::when_all( socket_connect().then([] (auto socket) { return socket->close().finally([cleanup = std::move(socket)] {}); }), @@ -137,7 +151,13 @@ class SocketFactory { return seastar::smp::submit_to(1u, [psf] { return FixedCPUServerSocket::create().then([psf] (auto pss) { psf->pss = pss; - return pss->listen(server_addr); + return pss->listen(server_addr + ).safe_then([]{}, FixedCPUServerSocket::listen_ertr::all_same_way( + [] (const std::error_code& e) { + logger.error("dispatch_sockets(): there is another instance running at {}", + server_addr); + ceph_abort(); + })); }); }).then([psf] { return seastar::when_all_succeed( diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc index 48f82f776f0b8..710dc39fbd2bb 100644 --- a/src/tools/crimson/perf_crimson_msgr.cc +++ b/src/tools/crimson/perf_crimson_msgr.cc @@ -181,9 +181,14 @@ static seastar::future<> run( msgr->set_crc_header(); msgr->set_crc_data(); } - return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->bind(entity_addrvec_t{addr}).safe_then([this] { return msgr->start(*this); - }); + }, crimson::net::Messenger::bind_ertr::all_same_way( + [addr] (const std::error_code& e) { + logger().error("Server: " + "there is another instance running at {}", addr); + ceph_abort(); + })); }); } seastar::future<> shutdown() { -- 2.39.5