namespace {
+using seastar::engine;
using seastar::future;
using crimson::net::error;
+using crimson::net::FixedCPUServerSocket;
using crimson::net::Socket;
using crimson::net::SocketFRef;
+using crimson::net::SocketRef;
using crimson::net::stop_t;
static seastar::logger logger{"crimsontest"};
-
-template <typename ConcreteService>
-class SocketFactoryBase
- : public seastar::peering_sharded_service<ConcreteService> {
- static constexpr const char* server_addr = "127.0.0.1:9020";
-
- seastar::gate shutdown_gate;
- std::optional<seastar::server_socket> listener;
-
- public:
- virtual ~SocketFactoryBase() = default;
-
- virtual future<> bind_accept() {
- return this->container().invoke_on_all([] (auto& factory) {
- entity_addr_t addr;
- addr.parse(server_addr, nullptr);
- seastar::socket_address s_addr(addr.in4_addr());
- seastar::listen_options lo;
- lo.reuse_address = true;
- factory.listener = seastar::listen(s_addr, lo);
- }).then([this] {
- return this->container().invoke_on_all([] (auto& factory) {
- // gate accepting
- // SocketFactoryBase::shutdown() will drain the continuations in the gate
- // so ignore the returned future
- std::ignore = seastar::with_gate(factory.shutdown_gate, [&factory] {
- return seastar::keep_doing([&factory] {
- return Socket::accept(*factory.listener).then(
- [&factory] (SocketFRef socket,
- entity_addr_t peer_addr) {
- // gate socket dispatching
- std::ignore = seastar::with_gate(factory.shutdown_gate,
- [&factory, socket = std::move(socket)] () mutable {
- return factory.handle_server_socket(std::move(socket))
- .handle_exception([] (auto eptr) {
- logger.error("handle_server_socket():"
- "got unexpected exception {}", eptr);
- ceph_abort();
- });
- });
- });
- }).handle_exception_type([] (const std::system_error& e) {
- if (e.code() != error::connection_aborted &&
- e.code() != error::invalid_argument) {
- logger.error("accepting: got unexpected error {}", e);
- ceph_abort();
- }
- // successful
- }).handle_exception([] (auto eptr) {
- logger.error("accepting: got unexpected exception {}", eptr);
- ceph_abort();
- });
- });
- });
- });
- }
-
- future<> shutdown() {
- return this->container().invoke_on_all([] (auto& factory) {
- if (factory.listener) {
- factory.listener.value().abort_accept();
- }
- return factory.shutdown_gate.close();
- });
- }
-
- future<> stop() { return seastar::now(); }
-
- static future<SocketFRef> connect() {
- entity_addr_t addr;
- addr.parse(server_addr, nullptr);
- return Socket::connect(addr);
- }
-
- protected:
- virtual future<> handle_server_socket(SocketFRef&& socket) = 0;
-};
-
-class AcceptTest final
- : public SocketFactoryBase<AcceptTest> {
- public:
- future<> handle_server_socket(SocketFRef&& socket) override {
- return seastar::sleep(100ms
- ).then([socket = std::move(socket)] () mutable {
- return socket->close()
- .finally([socket = std::move(socket)] {});
- });
- }
-};
+static entity_addr_t server_addr = [] {
+ entity_addr_t saddr;
+ saddr.parse("127.0.0.1:9020", nullptr);
+ return saddr;
+} ();
+
+future<SocketRef> socket_connect() {
+ logger.debug("socket_connect()...");
+ return Socket::connect(server_addr).then([] (auto socket) {
+ logger.debug("socket_connect() connected");
+ return socket.release();
+ });
+}
future<> test_refused() {
logger.info("test_refused()...");
- return AcceptTest::connect().discard_result(
- ).then([] {
+ return socket_connect().discard_result().then([] {
ceph_abort_msg("connection is not refused");
}).handle_exception_type([] (const std::system_error& e) {
if (e.code() != error::connection_refused) {
logger.error("test_refused() got unexpeted error {}", e);
ceph_abort();
+ } else {
+ logger.info("test_refused() ok\n");
}
- // successful
}).handle_exception([] (auto eptr) {
logger.error("test_refused() got unexpeted exception {}", eptr);
ceph_abort();
future<> test_bind_same() {
logger.info("test_bind_same()...");
- return crimson::net::create_sharded<AcceptTest>().then(
- [] (AcceptTest* factory) {
- return factory->bind_accept().then([] {
+ return FixedCPUServerSocket::create().then([] (auto pss1) {
+ return pss1->listen(server_addr).then([] {
// try to bind the same address
- return crimson::net::create_sharded<AcceptTest>().then(
- [] (AcceptTest* factory2) {
- return factory2->bind_accept().then([] {
- ceph_abort_msg("bind should raise addr-in-use");
- return seastar::now();
- }).finally([factory2] {
- return factory2->shutdown();
+ return FixedCPUServerSocket::create().then([] (auto pss2) {
+ return pss2->listen(server_addr).then([] {
+ ceph_abort("Should raise address_in_use!");
+ }).handle_exception_type([] (const std::system_error& e) {
+ assert(e.code() == std::errc::address_in_use);
+ // successful!
+ }).finally([pss2] {
+ return pss2->destroy();
}).handle_exception_type([] (const std::system_error& e) {
if (e.code() != error::address_in_use) {
logger.error("test_bind_same() got unexpeted error {}", e);
ceph_abort();
+ } else {
+ logger.info("test_bind_same() ok\n");
}
- // successful
});
});
- }).finally([factory] {
- return factory->shutdown();
+ }).finally([pss1] {
+ return pss1->destroy();
}).handle_exception([] (auto eptr) {
logger.error("test_bind_same() got unexpeted exception {}", eptr);
ceph_abort();
future<> test_accept() {
logger.info("test_accept()");
- return crimson::net::create_sharded<AcceptTest>(
- ).then([] (AcceptTest* factory) {
- return factory->bind_accept().then([factory] {
+ return FixedCPUServerSocket::create().then([] (auto pss) {
+ return pss->listen(server_addr).then([pss] {
+ return pss->accept([] (auto socket, auto paddr) {
+ // simple accept
+ return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable {
+ return socket->close().finally([cleanup = std::move(socket)] {});
+ });
+ });
+ }).then([] {
return seastar::when_all(
- factory->connect().then([] (auto socket) {
+ socket_connect().then([] (auto socket) {
return socket->close().finally([cleanup = std::move(socket)] {}); }),
- factory->connect().then([] (auto socket) {
+ socket_connect().then([] (auto socket) {
return socket->close().finally([cleanup = std::move(socket)] {}); }),
- factory->connect().then([] (auto socket) {
+ socket_connect().then([] (auto socket) {
return socket->close().finally([cleanup = std::move(socket)] {}); })
).discard_result();
}).then([] {
// should be enough to be connected locally
return seastar::sleep(50ms);
- }).finally([factory] {
- return factory->shutdown();
+ }).then([] {
+ logger.info("test_accept() ok\n");
+ }).finally([pss] {
+ return pss->destroy();
}).handle_exception([] (auto eptr) {
logger.error("test_accept() got unexpeted exception {}", eptr);
ceph_abort();
});
}
-class SocketFactory final
- : public SocketFactoryBase<SocketFactory> {
- const seastar::shard_id target_shard;
- seastar::promise<SocketFRef> socket_promise;
-
- future<> bind_accept() override {
- return SocketFactoryBase<SocketFactory>::bind_accept();
- }
-
- future<SocketFRef> get_accepted() {
- return socket_promise.get_future();
- }
+class SocketFactory {
+ SocketRef client_socket;
+ SocketFRef server_socket;
+ FixedCPUServerSocket *pss = nullptr;
+ seastar::promise<> server_connected;
public:
- SocketFactory(seastar::shard_id shard) : target_shard{shard} {}
-
- future<> handle_server_socket(SocketFRef&& socket) override {
- return container().invoke_on(target_shard,
- [socket = std::move(socket)] (auto& factory) mutable {
- factory.socket_promise.set_value(std::move(socket));
- });
- }
-
- static future<tuple<SocketFRef, SocketFRef>> get_sockets() {
- return crimson::net::create_sharded<SocketFactory>(seastar::engine().cpu_id()
- ).then([] (SocketFactory* factory) {
- return factory->bind_accept().then([factory] {
- return connect();
- }).then([factory] (auto fp_client_socket) {
- return factory->get_accepted().then(
- [fp_client_socket = std::move(fp_client_socket)](auto fp_server_socket) mutable {
- return seastar::make_ready_future<tuple<SocketFRef, SocketFRef>>(
- std::make_tuple(std::move(fp_client_socket), std::move(fp_server_socket)));
- });
- }).finally([factory] {
- return factory->shutdown();
+ // cb_client() on CPU#0, cb_server() on CPU#1
+ template <typename FuncC, typename FuncS>
+ static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
+ assert(engine().cpu_id() == 0u);
+ auto owner = std::make_unique<SocketFactory>();
+ auto psf = owner.get();
+ return seastar::smp::submit_to(1u, [psf] {
+ return FixedCPUServerSocket::create().then([psf] (auto pss) {
+ psf->pss = pss;
+ return pss->listen(server_addr);
});
- });
+ }).then([psf] {
+ return seastar::when_all_succeed(
+ seastar::smp::submit_to(0u, [psf] {
+ return socket_connect().then([psf] (auto socket) {
+ psf->client_socket = std::move(socket);
+ });
+ }),
+ seastar::smp::submit_to(1u, [psf] {
+ return psf->pss->accept([psf] (auto socket, auto paddr) {
+ psf->server_socket = seastar::make_foreign(std::move(socket));
+ return seastar::smp::submit_to(0u, [psf] {
+ psf->server_connected.set_value();
+ });
+ });
+ })
+ );
+ }).then([psf] {
+ return psf->server_connected.get_future();
+ }).finally([psf] {
+ if (psf->pss) {
+ return seastar::smp::submit_to(1u, [psf] {
+ return psf->pss->destroy();
+ });
+ }
+ return seastar::now();
+ }).then([psf,
+ cb_client = std::move(cb_client),
+ cb_server = std::move(cb_server)] () mutable {
+ logger.debug("dispatch_sockets(): client/server socket are ready");
+ return seastar::when_all_succeed(
+ seastar::smp::submit_to(0u, [socket = psf->client_socket.get(),
+ cb_client = std::move(cb_client)] {
+ return cb_client(socket).finally([socket] {
+ logger.debug("closing client socket...");
+ return socket->close();
+ }).handle_exception([] (auto eptr) {
+ logger.error("dispatch_sockets():"
+ " cb_client() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+ }),
+ seastar::smp::submit_to(1u, [socket = psf->server_socket.get(),
+ cb_server = std::move(cb_server)] {
+ return cb_server(socket).finally([socket] {
+ logger.debug("closing server socket...");
+ return socket->close();
+ }).handle_exception([] (auto eptr) {
+ logger.error("dispatch_sockets():"
+ " cb_server() got unexpeted exception {}", eptr);
+ ceph_abort();
+ });
+ })
+ );
+ }).finally([cleanup = std::move(owner)] {});
}
};
ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL);
}
- SocketFRef socket;
+ Socket* socket = nullptr;
uint64_t write_count = 0;
uint64_t read_count = 0;
- Connection(SocketFRef&& socket) : socket{std::move(socket)} {
+ Connection(Socket* socket) : socket{socket} {
+ assert(socket);
data[DATA_SIZE - 1] = DATA_TAIL;
}
future<> dispatch_write(unsigned round = 0, bool force_shut = false) {
+ logger.debug("dispatch_write(round={}, force_shut={})...", round, force_shut);
return seastar::repeat([this, round, force_shut] {
if (round != 0 && round <= write_count) {
return seastar::futurize_apply([this, force_shut] {
if (force_shut) {
+ logger.debug("dispatch_write() done, force shutdown output");
socket->force_shutdown_out();
+ } else {
+ logger.debug("dispatch_write() done");
}
}).then([] {
return seastar::make_ready_future<stop_t>(stop_t::yes);
return dispatch_write(
).then([] {
ceph_abort();
- }).handle_exception_type([] (const std::system_error& e) {
+ }).handle_exception_type([this] (const std::system_error& e) {
if (e.code() != error::broken_pipe &&
e.code() != error::connection_reset) {
logger.error("dispatch_write_unbounded(): "
// successful
logger.debug("dispatch_write_unbounded(): "
"expected error {}", e);
+ shutdown();
});
}
future<> dispatch_read(unsigned round = 0, bool force_shut = false) {
+ logger.debug("dispatch_read(round={}, force_shut={})...", round, force_shut);
return seastar::repeat([this, round, force_shut] {
if (round != 0 && round <= read_count) {
return seastar::futurize_apply([this, force_shut] {
if (force_shut) {
+ logger.debug("dispatch_read() done, force shutdown input");
socket->force_shutdown_in();
+ } else {
+ logger.debug("dispatch_read() done");
}
}).then([] {
return seastar::make_ready_future<stop_t>(stop_t::yes);
return dispatch_read(
).then([] {
ceph_abort();
- }).handle_exception_type([] (const std::system_error& e) {
+ }).handle_exception_type([this] (const std::system_error& e) {
if (e.code() != error::read_eof
&& e.code() != error::connection_reset) {
logger.error("dispatch_read_unbounded(): "
// successful
logger.debug("dispatch_read_unbounded(): "
"expected error {}", e);
+ shutdown();
});
}
socket->shutdown();
}
- future<> close() {
- return socket->close();
- }
-
public:
- static future<> dispatch_rw_bounded(SocketFRef&& socket, bool is_client,
- unsigned round, bool force_shut = false) {
- return seastar::smp::submit_to(is_client ? 0 : 1,
- [socket = std::move(socket), round, force_shut] () mutable {
- return seastar::do_with(Connection{std::move(socket)},
- [round, force_shut] (auto& conn) {
- ceph_assert(round != 0);
- return seastar::when_all_succeed(
- conn.dispatch_write(round, force_shut),
- conn.dispatch_read(round, force_shut)
- ).finally([&conn] {
- return conn.close();
- });
- });
- }).handle_exception([is_client] (auto eptr) {
- logger.error("dispatch_rw_bounded(): {} got unexpected exception {}",
- is_client ? "client" : "server", eptr);
- ceph_abort();
+ static future<> dispatch_rw_bounded(Socket* socket, unsigned round,
+ bool force_shut = false) {
+ logger.debug("dispatch_rw_bounded(round={}, force_shut={})...",
+ round, force_shut);
+ return seastar::do_with(Connection{socket},
+ [round, force_shut] (auto& conn) {
+ ceph_assert(round != 0);
+ return seastar::when_all_succeed(
+ conn.dispatch_write(round, force_shut),
+ conn.dispatch_read(round, force_shut)
+ );
});
}
- static future<> dispatch_rw_unbounded(SocketFRef&& socket, bool is_client,
- bool preemptive_shut = false) {
- return seastar::smp::submit_to(is_client ? 0 : 1,
- [socket = std::move(socket), preemptive_shut, is_client] () mutable {
- return seastar::do_with(Connection{std::move(socket)},
- [preemptive_shut, is_client] (auto& conn) {
- return seastar::when_all_succeed(
- conn.dispatch_write_unbounded(),
- conn.dispatch_read_unbounded(),
- seastar::futurize_apply([&conn, preemptive_shut] {
- if (preemptive_shut) {
- return seastar::sleep(100ms).then([&conn] { conn.shutdown(); });
- } else {
- return seastar::now();
- }
- })
- ).finally([&conn] {
- return conn.close();
- });
- });
- }).handle_exception([is_client] (auto eptr) {
- logger.error("dispatch_rw_unbounded(): {} got unexpected exception {}",
- is_client ? "client" : "server", eptr);
- ceph_abort();
+ static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) {
+ logger.debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut);
+ return seastar::do_with(Connection{socket}, [preemptive_shut] (auto& conn) {
+ return seastar::when_all_succeed(
+ conn.dispatch_write_unbounded(),
+ conn.dispatch_read_unbounded(),
+ seastar::futurize_apply([&conn, preemptive_shut] {
+ if (preemptive_shut) {
+ return seastar::sleep(100ms).then([&conn] {
+ logger.debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)");
+ conn.shutdown();
+ });
+ } else {
+ return seastar::now();
+ }
+ })
+ );
});
}
};
future<> test_read_write() {
logger.info("test_read_write()...");
- return SocketFactory::get_sockets().then([] (auto sockets) {
- auto [client_socket, server_socket] = std::move(sockets);
- return seastar::when_all_succeed(
- Connection::dispatch_rw_bounded(std::move(client_socket), true, 128),
- Connection::dispatch_rw_bounded(std::move(server_socket), false, 128)
- );
+ return SocketFactory::dispatch_sockets(
+ [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
+ [] (auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
+ ).then([] {
+ logger.info("test_read_write() ok\n");
}).handle_exception([] (auto eptr) {
logger.error("test_read_write() got unexpeted exception {}", eptr);
ceph_abort();
future<> test_unexpected_down() {
logger.info("test_unexpected_down()...");
- return SocketFactory::get_sockets().then([] (auto sockets) {
- auto [client_socket, server_socket] = std::move(sockets);
- return seastar::when_all_succeed(
- Connection::dispatch_rw_bounded(std::move(client_socket), true, 128, true),
- Connection::dispatch_rw_unbounded(std::move(server_socket), false)
- );
+ return SocketFactory::dispatch_sockets(
+ [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128, true); },
+ [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+ ).then([] {
+ logger.info("test_unexpected_down() ok\n");
}).handle_exception([] (auto eptr) {
logger.error("test_unexpected_down() got unexpeted exception {}", eptr);
ceph_abort();
future<> test_shutdown_propagated() {
logger.info("test_shutdown_propagated()...");
- return SocketFactory::get_sockets().then([] (auto sockets) {
- auto [client_socket, server_socket] = std::move(sockets);
- client_socket->shutdown();
- return Connection::dispatch_rw_unbounded(std::move(server_socket), false
- ).finally([client_socket = std::move(client_socket)] () mutable {
- return client_socket->close(
- ).finally([cleanup = std::move(client_socket)] {});
- });
+ return SocketFactory::dispatch_sockets(
+ [] (auto cs) {
+ logger.debug("test_shutdown_propagated() shutdown client socket");
+ cs->shutdown();
+ return seastar::now();
+ },
+ [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+ ).then([] {
+ logger.info("test_shutdown_propagated() ok\n");
}).handle_exception([] (auto eptr) {
logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr);
ceph_abort();
future<> test_preemptive_down() {
logger.info("test_preemptive_down()...");
- return SocketFactory::get_sockets().then([] (auto sockets) {
- auto [client_socket, server_socket] = std::move(sockets);
- return seastar::when_all_succeed(
- Connection::dispatch_rw_unbounded(std::move(client_socket), true, true),
- Connection::dispatch_rw_unbounded(std::move(server_socket), false)
- );
+ return SocketFactory::dispatch_sockets(
+ [] (auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
+ [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+ ).then([] {
+ logger.info("test_preemptive_down() ok\n");
}).handle_exception([] (auto eptr) {
logger.error("test_preemptive_down() got unexpeted exception {}", eptr);
ceph_abort();
{
seastar::app_template app;
return app.run(argc, argv, [] {
- return test_refused().then([] {
+ return seastar::futurize_apply([] {
+ return test_refused();
+ }).then([] {
return test_bind_same();
}).then([] {
return test_accept();