ct_error_code<std::errc::resource_unavailable_try_again>;
using file_too_large =
ct_error_code<std::errc::file_too_large>;
+ using address_in_use = ct_error_code<std::errc::address_in_use>;
struct pass_further_all {
template <class ErrorT>
#include "msg/MessageRef.h"
#include "msg/msg_types.h"
+#include "crimson/common/errorator.h"
+
using auth_proto_t = int;
class AuthConnectionMeta;
return seastar::now();
}
+ using bind_ertr = crimson::errorator<
+ crimson::ct_error::address_in_use // The address (range) is already bound
+ >;
/// bind to the given address
- /// throws std::system_error with address_in_use if the addr is already bound
- // TODO: use errorated future
- virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0;
+ virtual bind_ertr::future<> bind(const entity_addrvec_t& addr) = 0;
/// try to bind to the first unused port of given address
- /// throws std::system_error with address_in_use if the range is unavailable
- // TODO: use errorated future
- virtual seastar::future<> try_bind(const entity_addrvec_t& addr,
- uint32_t min_port, uint32_t max_port) = 0;
+ virtual bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) = 0;
/// start the messenger
virtual seastar::future<> start(const std::list<Dispatcher*>&) = 0;
}
#endif
-seastar::future<> FixedCPUServerSocket::listen(entity_addr_t addr)
+FixedCPUServerSocket::listen_ertr::future<>
+FixedCPUServerSocket::listen(entity_addr_t addr)
{
assert(seastar::this_shard_id() == cpu);
logger().trace("FixedCPUServerSocket::listen({})...", addr);
lo.reuse_address = true;
lo.set_fixed_cpu(ss.cpu);
ss.listener = seastar::listen(s_addr, lo);
+ }).then([] {
+ return true;
}).handle_exception_type([addr] (const std::system_error& e) {
if (e.code() == std::errc::address_in_use) {
logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
- throw;
} else {
logger().error("FixedCPUServerSocket::listen({}): "
"got unexpeted error {}", addr, e);
ceph_abort();
}
+ return false;
+ }).then([] (bool success) -> listen_ertr::future<> {
+ if (success) {
+ return listen_ertr::now();
+ } else {
+ return crimson::ct_error::address_in_use::make();
+ }
});
}
#include <seastar/net/packet.hh>
#include "include/buffer.h"
-#include "msg/msg_types.h"
#include "crimson/common/log.h"
#include "Errors.h"
+#include "Fwd.h"
#ifdef UNIT_TESTS_BUILT
#include "Interceptor.h"
FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
- seastar::future<> listen(entity_addr_t addr);
+ using listen_ertr = crimson::errorator<
+ crimson::ct_error::address_in_use // The address is already bound
+ >;
+ listen_ertr::future<> listen(entity_addr_t addr);
// fn_accept should be a nothrow function of type
// seastar::future<>(SocketRef, entity_addr_t)
return Messenger::set_myaddrs(my_addrs);
}
-seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
+SocketMessenger::bind_ertr::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
{
assert(seastar::this_shard_id() == master_sid);
ceph_assert(addrs.front().get_family() == AF_INET);
} else {
return seastar::now();
}
- }).then([this] {
+ }).then([this] () -> bind_ertr::future<> {
const entity_addr_t listen_addr = get_myaddr();
logger().debug("{} do_bind: try listen {}...", *this, listen_addr);
if (!listener) {
logger().warn("{} do_bind: listener doesn't exist", *this);
- return seastar::now();
+ return bind_ertr::now();
}
return listener->listen(listen_addr);
});
}
-seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
+SocketMessenger::bind_ertr::future<>
+SocketMessenger::bind(const entity_addrvec_t& addrs)
{
- return do_bind(addrs).then([this] {
+ return do_bind(addrs).safe_then([this] {
logger().info("{} bind: done", *this);
});
}
-seastar::future<>
+SocketMessenger::bind_ertr::future<>
SocketMessenger::try_bind(const entity_addrvec_t& addrs,
uint32_t min_port, uint32_t max_port)
{
auto addr = addrs.front();
if (addr.get_port() != 0) {
- return do_bind(addrs).then([this] {
+ return do_bind(addrs).safe_then([this] {
logger().info("{} try_bind: done", *this);
});
}
ceph_assert(min_port <= max_port);
return seastar::do_with(uint32_t(min_port),
[this, max_port, addr] (auto& port) {
- return seastar::repeat([this, max_port, addr, &port] {
+ return seastar::repeat_until_value([this, max_port, addr, &port] {
auto to_bind = addr;
to_bind.set_port(port);
- return do_bind(entity_addrvec_t{to_bind}).then([this] {
+ return do_bind(entity_addrvec_t{to_bind}
+ ).safe_then([this] () -> seastar::future<std::optional<bool>> {
logger().info("{} try_bind: done", *this);
- return stop_t::yes;
- }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
- assert(e.code() == std::errc::address_in_use);
+ return seastar::make_ready_future<std::optional<bool>>(
+ std::make_optional<bool>(true));
+ }, bind_ertr::all_same_way([this, max_port, &port]
+ (const std::error_code& e) mutable
+ -> seastar::future<std::optional<bool>> {
+ assert(e == std::errc::address_in_use);
logger().trace("{} try_bind: {} already used", *this, port);
if (port == max_port) {
- throw;
+ return seastar::make_ready_future<std::optional<bool>>(
+ std::make_optional<bool>(false));
}
++port;
- return stop_t::no;
- });
+ return seastar::make_ready_future<std::optional<bool>>();
+ }));
+ }).then([] (bool success) -> bind_ertr::future<> {
+ if (success) {
+ return bind_ertr::now();
+ } else {
+ return crimson::ct_error::address_in_use::make();
+ }
});
});
}
uint32_t global_seq = 0;
bool started = false;
- seastar::future<> do_bind(const entity_addrvec_t& addr);
+ bind_ertr::future<> do_bind(const entity_addrvec_t& addr);
public:
SocketMessenger(const entity_name_t& myname,
// Messenger interfaces are assumed to be called from its own shard, but its
// behavior should be symmetric when called from any shard.
- seastar::future<> bind(const entity_addrvec_t& addr) override;
+ bind_ertr::future<> bind(const entity_addrvec_t& addr) override;
- seastar::future<> try_bind(const entity_addrvec_t& addr,
- uint32_t min_port, uint32_t max_port) override;
+ bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) override;
seastar::future<> start(const std::list<Dispatcher*>& dispatchers) override;
return msgr.try_bind(addrs,
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this, &msgr]() mutable {
+ .safe_then([this, &msgr]() mutable {
return msgr.start(*this);
- });
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger().error("heartbeat messenger try_bind(): address range is unavailable.");
+ ceph_abort();
+ }));
}
seastar::future<> Heartbeat::stop()
cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this, dispatchers]() mutable {
+ .safe_then([this, dispatchers]() mutable {
return cluster_msgr->start(dispatchers);
- }),
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger().error("cluster messenger try_bind(): address range is unavailable.");
+ ceph_abort();
+ })),
public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
- .then([this, dispatchers]() mutable {
+ .safe_then([this, dispatchers]() mutable {
return public_msgr->start(dispatchers);
- }));
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger().error("public messenger try_bind(): address range is unavailable.");
+ ceph_abort();
+ })));
}).then_unpack([this] {
return seastar::when_all_succeed(monc->start(),
mgrc->start());
msgr->set_require_authorizer(false);
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
return msgr->start(*this);
- });
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [addr] (const std::error_code& e) {
+ logger().error("test_echo(): "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
}
seastar::future<> shutdown() {
ceph_assert(msgr);
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
return msgr->start(*this);
- });
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [addr] (const std::error_code& e) {
+ logger().error("test_concurrent_dispatch(): "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
}
};
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
- return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
return msgr->start(*this);
- });
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [addr] (const std::error_code& e) {
+ logger().error("test_preemptive_shutdown(): "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
}
entity_addr_t get_addr() const {
return msgr->get_myaddr();
test_msgr->set_auth_client(&dummy_auth);
test_msgr->set_auth_server(&dummy_auth);
test_msgr->interceptor = &interceptor;
- return test_msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
return test_msgr->start(*this);
- });
+ }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+ logger().error("FailoverSuite: "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
}
seastar::future<> send_op(bool expect_reply=true) {
peer_msgr->set_default_policy(policy);
peer_msgr->set_auth_client(&dummy_auth);
peer_msgr->set_auth_server(&dummy_auth);
- return peer_msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return peer_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
return peer_msgr->start(*this);
- });
+ }, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {
+ logger().error("FailoverSuitePeer: "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
}
seastar::future<> send_op() {
cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0));
cmd_msgr->set_auth_client(&dummy_auth);
cmd_msgr->set_auth_server(&dummy_auth);
- return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] {
+ return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).safe_then([this] {
return cmd_msgr->start(*this);
- }).handle_exception_type([cmd_peer_addr](const std::system_error& e) {
- if (e.code() == std::errc::address_in_use) {
- logger().error("FailoverTestPeer::init({}) "
- "likely there is another instance of "
- "unittest_seastar_messenger running", cmd_peer_addr);
- } else {
- logger().error("FailoverTestPeer::init({}): {}", cmd_peer_addr, e.what());
- }
- abort();
- });
+ }, Messenger::bind_ertr::all_same_way([cmd_peer_addr] (const std::error_code& e) {
+ logger().error("FailoverTestPeer: "
+ "there is another instance running at {}", cmd_peer_addr);
+ ceph_abort();
+ }));
}
public:
future<> test_refused() {
logger.info("test_refused()...");
return socket_connect().discard_result().then([] {
- ceph_abort_msg("connection is not refused");
+ logger.error("test_refused(): connection to {} is not refused", server_addr);
+ ceph_abort();
}).handle_exception_type([] (const std::system_error& e) {
if (e.code() != std::errc::connection_refused) {
logger.error("test_refused() got unexpeted error {}", e);
future<> test_bind_same() {
logger.info("test_bind_same()...");
return FixedCPUServerSocket::create().then([] (auto pss1) {
- return pss1->listen(server_addr).then([] {
+ return pss1->listen(server_addr).safe_then([] {
// try to bind the same address
return FixedCPUServerSocket::create().then([] (auto pss2) {
- return pss2->listen(server_addr).then([] {
- ceph_abort("Should raise address_in_use!");
- }).handle_exception_type([] (const std::system_error& e) {
- assert(e.code() == std::errc::address_in_use);
- // successful!
- }).finally([pss2] {
- return pss2->destroy();
- }).handle_exception_type([] (const std::system_error& e) {
- if (e.code() != std::errc::address_in_use) {
- logger.error("test_bind_same() got unexpeted error {}", e);
- ceph_abort();
- } else {
+ return pss2->listen(server_addr).safe_then([] {
+ logger.error("test_bind_same() should raise address_in_use");
+ ceph_abort();
+ }, FixedCPUServerSocket::listen_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ if (e == std::errc::address_in_use) {
+ // successful!
logger.info("test_bind_same() ok\n");
+ } else {
+ logger.error("test_bind_same() got unexpected error {}", e);
+ ceph_abort();
}
+ // Note: need to return a explicit ready future, or there will be a
+ // runtime error: member access within null pointer of type 'struct promise_base'
+ return seastar::now();
+ })).then([pss2] {
+ return pss2->destroy();
});
});
- }).finally([pss1] {
+ }, FixedCPUServerSocket::listen_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger.error("test_bind_same(): there is another instance running at {}",
+ server_addr);
+ ceph_abort();
+ })).then([pss1] {
return pss1->destroy();
}).handle_exception([] (auto eptr) {
logger.error("test_bind_same() got unexpeted exception {}", eptr);
future<> test_accept() {
logger.info("test_accept()");
return FixedCPUServerSocket::create().then([] (auto pss) {
- return pss->listen(server_addr).then([pss] {
+ return pss->listen(server_addr).safe_then([pss] {
return pss->accept([] (auto socket, auto paddr) {
// simple accept
return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable {
return socket->close().finally([cleanup = std::move(socket)] {});
});
});
- }).then([] {
+ }, FixedCPUServerSocket::listen_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger.error("test_accept(): there is another instance running at {}",
+ server_addr);
+ ceph_abort();
+ })).then([] {
return seastar::when_all(
socket_connect().then([] (auto socket) {
return socket->close().finally([cleanup = std::move(socket)] {}); }),
return seastar::smp::submit_to(1u, [psf] {
return FixedCPUServerSocket::create().then([psf] (auto pss) {
psf->pss = pss;
- return pss->listen(server_addr);
+ return pss->listen(server_addr
+ ).safe_then([]{}, FixedCPUServerSocket::listen_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ logger.error("dispatch_sockets(): there is another instance running at {}",
+ server_addr);
+ ceph_abort();
+ }));
});
}).then([psf] {
return seastar::when_all_succeed(
msgr->set_crc_header();
msgr->set_crc_data();
}
- return msgr->bind(entity_addrvec_t{addr}).then([this] {
+ return msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
return msgr->start(*this);
- });
+ }, crimson::net::Messenger::bind_ertr::all_same_way(
+ [addr] (const std::error_code& e) {
+ logger().error("Server: "
+ "there is another instance running at {}", addr);
+ ceph_abort();
+ }));
});
}
seastar::future<> shutdown() {