#include <seastar/core/app-template.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/future-util.hh>
+#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/with_timeout.hh>
return saddr;
}
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+ // we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [=] {
+ auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+ return sharded_obj->start(args...
+ ).then([sharded_obj] {
+ seastar::engine().at_exit([sharded_obj] {
+ return sharded_obj->stop().then([sharded_obj] {});
+ });
+ return sharded_obj.get();
+ });
+ }).then([](seastar::sharded<T> *ptr_shard) {
+ return &ptr_shard->local();
+ });
+}
+
+class ShardedGates
+ : public seastar::peering_sharded_service<ShardedGates> {
+public:
+ ShardedGates() = default;
+ ~ShardedGates() {
+ assert(gate.is_closed());
+ }
+
+ template <typename Func>
+ void dispatch_in_background(const char *what, Func &&f) {
+ std::ignore = seastar::with_gate(
+ container().local().gate, std::forward<Func>(f)
+ ).handle_exception([what](std::exception_ptr eptr) {
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ logger().error("ShardedGates::dispatch_in_background: "
+ "{} got exxception {}", what, e.what());
+ }
+ });
+ }
+
+ seastar::future<> close() {
+ return container().invoke_on_all([](auto &local) {
+ return local.gate.close();
+ });
+ }
+
+ static seastar::future<ShardedGates*> create() {
+ return create_sharded<ShardedGates>();
+ }
+
+ // seastar::future<> stop() is intentially not implemented
+
+private:
+ seastar::gate gate;
+};
+
static seastar::future<> test_echo(unsigned rounds,
double keepalive_ratio)
{
struct test_state {
struct Server final
: public crimson::net::Dispatcher {
+ ShardedGates &gates;
crimson::net::MessengerRef msgr;
crimson::auth::DummyAuthClientServer dummy_auth;
+ Server(ShardedGates &gates) : gates{gates} {}
+
+ void ms_handle_accept(
+ crimson::net::ConnectionRef conn,
+ seastar::shard_id new_shard,
+ bool is_replace) override {
+ logger().info("server accepted {}", *conn);
+ ceph_assert(new_shard == seastar::this_shard_id());
+ ceph_assert(!is_replace);
+ }
+
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
if (verbose) {
logger().info("server got {}", *m);
}
// reply with a pong
- std::ignore = c->send(crimson::make_message<MPing>());
+ gates.dispatch_in_background("echo_send_pong", [c] {
+ return c->send(crimson::make_message<MPing>());
+ });
return {seastar::now()};
}
const uint64_t nonce,
const entity_addr_t& addr) {
msgr = crimson::net::Messenger::create(
- name, lname, nonce, true);
+ name, lname, nonce, false);
msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0));
msgr->set_auth_client(&dummy_auth);
msgr->set_auth_server(&dummy_auth);
logger().info("test_echo(rounds={}, keepalive_ratio={}):",
rounds, keepalive_ratio);
- auto server1 = seastar::make_shared<test_state::Server>();
- auto server2 = seastar::make_shared<test_state::Server>();
- auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
- auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
- // start servers and clients
- auto addr1 = get_server_addr();
- auto addr2 = get_server_addr();
- addr1.set_type(entity_addr_t::TYPE_MSGR2);
- addr2.set_type(entity_addr_t::TYPE_MSGR2);
- return seastar::when_all_succeed(
- server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
- server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
- client1->init(entity_name_t::OSD(2), "client1", 3),
- client2->init(entity_name_t::OSD(3), "client2", 4)
- // dispatch pingpoing
- ).then_unpack([client1, client2, server1, server2] {
+ return ShardedGates::create(
+ ).then([rounds, keepalive_ratio](auto *gates) {
+ auto server1 = seastar::make_shared<test_state::Server>(*gates);
+ auto server2 = seastar::make_shared<test_state::Server>(*gates);
+ auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
+ auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
+ // start servers and clients
+ auto addr1 = get_server_addr();
+ auto addr2 = get_server_addr();
+ addr1.set_type(entity_addr_t::TYPE_MSGR2);
+ addr2.set_type(entity_addr_t::TYPE_MSGR2);
return seastar::when_all_succeed(
- // test connecting in parallel, accepting in parallel
- client1->dispatch_pingpong(server2->msgr->get_myaddr()),
- client2->dispatch_pingpong(server1->msgr->get_myaddr()));
- // shutdown
- }).then_unpack([] {
- return seastar::now();
- }).then([client1] {
- logger().info("client1 shutdown...");
- return client1->shutdown();
- }).then([client2] {
- logger().info("client2 shutdown...");
- return client2->shutdown();
- }).then([server1] {
- logger().info("server1 shutdown...");
- return server1->shutdown();
- }).then([server2] {
- logger().info("server2 shutdown...");
- return server2->shutdown();
- }).then([] {
- logger().info("test_echo() done!\n");
- }).handle_exception([server1, server2, client1, client2] (auto eptr) {
- logger().error("test_echo() failed: got exception {}", eptr);
- throw;
+ server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+ server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+ client1->init(entity_name_t::OSD(2), "client1", 3),
+ client2->init(entity_name_t::OSD(3), "client2", 4)
+ // dispatch pingpoing
+ ).then_unpack([client1, client2, server1, server2] {
+ return seastar::when_all_succeed(
+ // test connecting in parallel, accepting in parallel
+ client1->dispatch_pingpong(server2->msgr->get_myaddr()),
+ client2->dispatch_pingpong(server1->msgr->get_myaddr()));
+ // shutdown
+ }).then_unpack([] {
+ return seastar::now();
+ }).then([client1] {
+ logger().info("client1 shutdown...");
+ return client1->shutdown();
+ }).then([client2] {
+ logger().info("client2 shutdown...");
+ return client2->shutdown();
+ }).then([server1] {
+ logger().info("server1 shutdown...");
+ return server1->shutdown();
+ }).then([server2] {
+ logger().info("server2 shutdown...");
+ return server2->shutdown();
+ }).then([] {
+ logger().info("test_echo() done!\n");
+ }).handle_exception([server1, server2, client1, client2] (auto eptr) {
+ logger().error("test_echo() failed: got exception {}", eptr);
+ throw;
+ }).finally([gates] {
+ return gates->close();
+ });
});
}