From a0e0624cd9ab09d3c1d019377f9bf586c61118d2 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 20 Jun 2019 19:26:08 +0800 Subject: [PATCH] crimson/net: add shutdown interface to Socket * Support preemptive shutdown. * Implement socket unittests to verify all shutdown possibilities. * Added missing error::broken_pipe. Signed-off-by: Yingxin Cheng --- src/crimson/net/Errors.cc | 10 ++ src/crimson/net/Errors.h | 1 + src/crimson/net/Socket.cc | 39 +++++ src/crimson/net/Socket.h | 29 +++- src/test/crimson/test_socket.cc | 279 ++++++++++++++++++++++++++++++++ 5 files changed, 353 insertions(+), 5 deletions(-) diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc index a5748019c84..abd34a809f9 100644 --- a/src/crimson/net/Errors.cc +++ b/src/crimson/net/Errors.cc @@ -47,6 +47,8 @@ const std::error_category& net_category() return "invalid argument"; case error::address_in_use: return "address in use"; + case error::broken_pipe: + return "broken pipe"; default: return "unknown"; } @@ -67,6 +69,8 @@ const std::error_category& net_category() return std::errc::invalid_argument; case error::address_in_use: return std::errc::address_in_use; + case error::broken_pipe: + return std::errc::broken_pipe; default: return std::error_condition(ev, *this); } @@ -92,6 +96,9 @@ const std::error_category& net_category() case error::address_in_use: return cond == std::errc::address_in_use || cond == std::error_condition(EADDRINUSE, std::system_category()); + case error::broken_pipe: + return cond == std::errc::broken_pipe + || cond == std::error_condition(EPIPE, std::system_category()); default: return false; } @@ -117,6 +124,9 @@ const std::error_category& net_category() case error::address_in_use: return code == std::errc::address_in_use || code == std::error_code(EADDRINUSE, std::system_category()); + case error::broken_pipe: + return code == std::errc::broken_pipe + || code == std::error_code(EPIPE, std::system_category()); default: return false; } diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h index aa81adcfe6e..d8c712bf2d0 100644 --- a/src/crimson/net/Errors.h +++ b/src/crimson/net/Errors.h @@ -31,6 +31,7 @@ enum class error { corrupted_message, invalid_argument, address_in_use, + broken_pipe, }; /// net error category diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index 3ea65d41a54..ca4e80db5dd 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -3,12 +3,17 @@ #include "Socket.h" +#include "crimson/common/log.h" #include "Errors.h" namespace ceph::net { namespace { +seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); +} + // an input_stream consumer that reads buffer segments into a bufferlist up to // the given number of remaining bytes struct bufferlist_consumer { @@ -81,4 +86,38 @@ Socket::read_exactly(size_t bytes) { }); } +void Socket::shutdown() { +#ifndef NDEBUG + ceph_assert(!down); + down = true; +#endif + socket.shutdown_input(); + socket.shutdown_output(); +} + +static inline seastar::future<> close_and_handle_errors(auto& out) { + return out.close().handle_exception_type([] (const std::system_error& e) { + if (e.code() != error::broken_pipe && + e.code() != error::connection_reset) { + logger().error("Socket::close(): unexpected error {}", e); + ceph_abort(); + } + // can happen when out is already shutdown, ignore + }); +} + +seastar::future<> Socket::close() { +#ifndef NDEBUG + ceph_assert(!closed); + closed = true; +#endif + return seastar::when_all_succeed( + in.close(), + close_and_handle_errors(out) + ).handle_exception([] (auto eptr) { + logger().error("Socket::close(): unexpected exception {}", eptr); + ceph_abort(); + }); +} + } // namespace ceph::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index a43a7f5a0bb..17ffb4c7b26 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -22,6 +22,11 @@ class Socket seastar::input_stream in; seastar::output_stream out; +#ifndef NDEBUG + bool down = false; + bool closed = false; +#endif + /// buffer state for read() struct { bufferlist buffer; @@ -39,6 +44,12 @@ class Socket // performance. see seastar::net::connected_socket::output() out(socket.output(65536)) {} + ~Socket() { +#ifndef NDEBUG + assert(closed); +#endif + } + Socket(Socket&& o) = delete; static seastar::future @@ -79,12 +90,20 @@ class Socket return out.write(std::move(buf)).then([this] { return out.flush(); }); } + // preemptively disable further reads or writes, can only be shutdown once. + void shutdown(); + /// Socket can only be closed once. - seastar::future<> close() { - return seastar::smp::submit_to(sid, [this] { - return seastar::when_all( - in.close(), out.close()).discard_result(); - }); + seastar::future<> close(); + + // shutdown input_stream only, for tests + void force_shutdown_in() { + socket.shutdown_input(); + } + + // shutdown output_stream only, for tests + void force_shutdown_out() { + socket.shutdown_output(); } }; diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index b73974561b1..cbdb4ec20d1 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -17,6 +17,7 @@ using seastar::future; using ceph::net::error; using ceph::net::Socket; using ceph::net::SocketFRef; +using ceph::net::stop_t; static seastar::logger logger{"test"}; @@ -180,6 +181,276 @@ future<> test_accept() { }); } +class SocketFactory final + : public SocketFactoryBase { + const seastar::shard_id target_shard; + seastar::promise socket_promise; + + future<> bind_accept() override { + return SocketFactoryBase::bind_accept(); + } + + future get_accepted() { + return socket_promise.get_future(); + } + + public: + SocketFactory(seastar::shard_id shard) : target_shard{shard} {} + + future<> handle_server_socket(SocketFRef&& socket) override { + return container().invoke_on(target_shard, + [socket = std::move(socket)] (auto& factory) mutable { + factory.socket_promise.set_value(std::move(socket)); + }); + } + + static future get_sockets() { + return ceph::net::create_sharded(seastar::engine().cpu_id() + ).then([] (SocketFactory* factory) { + return factory->bind_accept().then([factory] { + return connect(); + }).then([factory] (auto fp_client_socket) { + return factory->get_accepted( + ).then([fp_client_socket = std::move(fp_client_socket)] + (auto fp_server_socket) mutable { + return seastar::make_ready_future( + std::move(fp_client_socket), std::move(fp_server_socket)); + }); + }).finally([factory] { + return factory->shutdown(); + }); + }); + } +}; + +class Connection { + static const uint64_t DATA_TAIL = 5327; + static const unsigned DATA_SIZE = 4096; + std::array data = {0}; + + void verify_data_read(const uint64_t read_data[]) { + ceph_assert(read_data[0] == read_count); + ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL); + } + + SocketFRef socket; + uint64_t write_count = 0; + uint64_t read_count = 0; + + Connection(SocketFRef&& socket) : socket{std::move(socket)} { + data[DATA_SIZE - 1] = DATA_TAIL; + } + + future<> dispatch_write(unsigned round = 0, bool force_shut = false) { + return seastar::repeat([this, round, force_shut] { + if (round != 0 && round <= write_count) { + return seastar::futurize_apply([this, force_shut] { + if (force_shut) { + socket->force_shutdown_out(); + } + }).then([] { + return seastar::make_ready_future(stop_t::yes); + }); + } else { + data[0] = write_count; + return socket->write(seastar::net::packet( + reinterpret_cast(&data), sizeof(data)) + ).then([this] { + return socket->flush(); + }).then([this] { + write_count += 1; + return seastar::make_ready_future(stop_t::no); + }); + } + }); + } + + future<> dispatch_write_unbounded() { + return dispatch_write( + ).then([] { + ceph_abort(); + }).handle_exception_type([] (const std::system_error& e) { + if (e.code() != error::broken_pipe && + e.code() != error::connection_reset) { + logger.error("dispatch_write_unbounded(): " + "unexpected error {}", e); + throw; + } + // successful + logger.debug("dispatch_write_unbounded(): " + "expected error {}", e); + }); + } + + future<> dispatch_read(unsigned round = 0, bool force_shut = false) { + return seastar::repeat([this, round, force_shut] { + if (round != 0 && round <= read_count) { + return seastar::futurize_apply([this, force_shut] { + if (force_shut) { + socket->force_shutdown_in(); + } + }).then([] { + return seastar::make_ready_future(stop_t::yes); + }); + } else { + return seastar::futurize_apply([this] { + // we want to test both Socket::read() and Socket::read_exactly() + if (read_count % 2) { + return socket->read(DATA_SIZE * sizeof(uint64_t) + ).then([this] (ceph::bufferlist bl) { + uint64_t read_data[DATA_SIZE]; + auto p = bl.cbegin(); + ::ceph::decode_raw(read_data, p); + verify_data_read(read_data); + }); + } else { + return socket->read_exactly(DATA_SIZE * sizeof(uint64_t) + ).then([this] (auto buf) { + auto read_data = reinterpret_cast(buf.get()); + verify_data_read(read_data); + }); + } + }).then([this] { + ++read_count; + return seastar::make_ready_future(stop_t::no); + }); + } + }); + } + + future<> dispatch_read_unbounded() { + return dispatch_read( + ).then([] { + ceph_abort(); + }).handle_exception_type([] (const std::system_error& e) { + if (e.code() != error::read_eof + && e.code() != error::connection_reset) { + logger.error("dispatch_read_unbounded(): " + "unexpected error {}", e); + throw; + } + // successful + logger.debug("dispatch_read_unbounded(): " + "expected error {}", e); + }); + } + + void shutdown() { + socket->shutdown(); + } + + future<> close() { + return socket->close(); + } + + public: + static future<> dispatch_rw_bounded(SocketFRef&& socket, bool is_client, + unsigned round, bool force_shut = false) { + return seastar::smp::submit_to(is_client ? 0 : 1, + [socket = std::move(socket), round, force_shut] () mutable { + return seastar::do_with(Connection{std::move(socket)}, + [round, force_shut] (auto& conn) { + ceph_assert(round != 0); + return seastar::when_all_succeed( + conn.dispatch_write(round, force_shut), + conn.dispatch_read(round, force_shut) + ).finally([&conn] { + return conn.close(); + }); + }); + }).handle_exception([is_client] (auto eptr) { + logger.error("dispatch_rw_bounded(): {} got unexpected exception {}", + is_client ? "client" : "server", eptr); + ceph_abort(); + }); + } + + static future<> dispatch_rw_unbounded(SocketFRef&& socket, bool is_client, + bool preemptive_shut = false) { + return seastar::smp::submit_to(is_client ? 0 : 1, + [socket = std::move(socket), preemptive_shut, is_client] () mutable { + return seastar::do_with(Connection{std::move(socket)}, + [preemptive_shut, is_client] (auto& conn) { + return seastar::when_all_succeed( + conn.dispatch_write_unbounded(), + conn.dispatch_read_unbounded(), + seastar::futurize_apply([&conn, preemptive_shut] { + if (preemptive_shut) { + return seastar::sleep(100ms).then([&conn] { conn.shutdown(); }); + } else { + return seastar::now(); + } + }) + ).finally([&conn] { + return conn.close(); + }); + }); + }).handle_exception([is_client] (auto eptr) { + logger.error("dispatch_rw_unbounded(): {} got unexpected exception {}", + is_client ? "client" : "server", eptr); + ceph_abort(); + }); + } +}; + +future<> test_read_write() { + logger.info("test_read_write()..."); + return SocketFactory::get_sockets( + ).then([] (auto client_socket, auto server_socket) { + return seastar::when_all_succeed( + Connection::dispatch_rw_bounded(std::move(client_socket), true, 128), + Connection::dispatch_rw_bounded(std::move(server_socket), false, 128) + ); + }).handle_exception([] (auto eptr) { + logger.error("test_read_write() got unexpeted exception {}", eptr); + ceph_abort(); + }); +} + +future<> test_unexpected_down() { + logger.info("test_unexpected_down()..."); + return SocketFactory::get_sockets( + ).then([] (auto client_socket, auto server_socket) { + return seastar::when_all_succeed( + Connection::dispatch_rw_bounded(std::move(client_socket), true, 128, true), + Connection::dispatch_rw_unbounded(std::move(server_socket), false) + ); + }).handle_exception([] (auto eptr) { + logger.error("test_unexpected_down() got unexpeted exception {}", eptr); + ceph_abort(); + }); +} + +future<> test_shutdown_propagated() { + logger.info("test_shutdown_propagated()..."); + return SocketFactory::get_sockets( + ).then([] (auto client_socket, auto server_socket) { + client_socket->shutdown(); + return Connection::dispatch_rw_unbounded(std::move(server_socket), false + ).finally([client_socket = std::move(client_socket)] () mutable { + return client_socket->close( + ).finally([cleanup = std::move(client_socket)] {}); + }); + }).handle_exception([] (auto eptr) { + logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr); + ceph_abort(); + }); +} + +future<> test_preemptive_down() { + logger.info("test_preemptive_down()..."); + return SocketFactory::get_sockets( + ).then([] (auto client_socket, auto server_socket) { + return seastar::when_all_succeed( + Connection::dispatch_rw_unbounded(std::move(client_socket), true, true), + Connection::dispatch_rw_unbounded(std::move(server_socket), false) + ); + }).handle_exception([] (auto eptr) { + logger.error("test_preemptive_down() got unexpeted exception {}", eptr); + ceph_abort(); + }); +} + } int main(int argc, char** argv) @@ -190,6 +461,14 @@ int main(int argc, char** argv) return test_bind_same(); }).then([] { return test_accept(); + }).then([] { + return test_read_write(); + }).then([] { + return test_unexpected_down(); + }).then([] { + return test_shutdown_propagated(); + }).then([] { + return test_preemptive_down(); }).then([] { logger.info("All tests succeeded"); }).handle_exception([] (auto eptr) { -- 2.39.5