}
};
- struct Client final
- : public crimson::net::Dispatcher {
+ class Client final
+ : public crimson::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+ public:
+ Client(seastar::shard_id primary_sid,
+ unsigned rounds,
+ double keepalive_ratio,
+ ShardedGates *gates)
+ : primary_sid{primary_sid},
+ keepalive_dist(std::bernoulli_distribution{keepalive_ratio}),
+ rounds(rounds),
+ gates{*gates} {}
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ assert(seastar::this_shard_id() == primary_sid);
+ msgr = crimson::net::Messenger::create(
+ name, lname, nonce, false);
+ msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
+ msgr->set_auth_client(&dummy_auth);
+ msgr->set_auth_server(&dummy_auth);
+ return msgr->start({this});
+ }
+
+ seastar::future<> shutdown() {
+ assert(seastar::this_shard_id() == primary_sid);
+ ceph_assert(msgr);
+ msgr->stop();
+ return msgr->shutdown();
+ }
+
+ seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
+ assert(seastar::this_shard_id() == primary_sid);
+ mono_time start_time = mono_clock::now();
+ auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
+ return seastar::futurize_invoke([this, conn] {
+ return do_dispatch_pingpong(conn);
+ }).then([] {
+ // 500ms should be enough to establish the connection
+ return seastar::sleep(500ms);
+ }).then([this, conn, start_time] {
+ return container().invoke_on(
+ conn->get_shard_id(),
+ [pconn=&*conn, start_time](auto &local) {
+ assert(pconn->is_connected());
+ auto session = local.find_session(pconn);
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, pingpong {}",
+ *pconn, dur_handshake.count(), dur_pingpong.count());
+ }).then([conn] {});
+ });
+ }
+
+ static seastar::future<Client*> create(
+ unsigned rounds,
+ double keepalive_ratio,
+ ShardedGates *gates) {
+ return create_sharded<Client>(
+ seastar::this_shard_id(),
+ rounds,
+ keepalive_ratio,
+ gates);
+ }
+
+ private:
struct PingSession : public seastar::enable_shared_from_this<PingSession> {
unsigned count = 0u;
mono_time connected_time;
};
using PingSessionRef = seastar::shared_ptr<PingSession>;
- unsigned rounds;
- std::bernoulli_distribution keepalive_dist;
- crimson::net::MessengerRef msgr;
- std::map<crimson::net::ConnectionRef, seastar::promise<>> pending_conns;
- std::map<crimson::net::ConnectionRef, PingSessionRef> sessions;
- crimson::auth::DummyAuthClientServer dummy_auth;
-
- Client(unsigned rounds, double keepalive_ratio)
- : rounds(rounds),
- keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
-
- PingSessionRef find_session(crimson::net::ConnectionRef c) {
- auto found = sessions.find(c);
- if (found == sessions.end()) {
- ceph_assert(false);
- }
- return found->second;
- }
-
void ms_handle_connect(
crimson::net::ConnectionRef conn,
seastar::shard_id prv_shard) override {
+ auto &local = container().local();
assert(prv_shard == seastar::this_shard_id());
auto session = seastar::make_shared<PingSession>();
- auto [i, added] = sessions.emplace(conn, session);
+ auto [i, added] = local.sessions.emplace(&*conn, session);
std::ignore = i;
ceph_assert(added);
session->connected_time = mono_clock::now();
}
+
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
- auto session = find_session(c);
+ auto &local = container().local();
+ auto session = local.find_session(&*c);
++(session->count);
if (verbose) {
logger().info("client ms_dispatch {}", session->count);
if (session->count == rounds) {
logger().info("{}: finished receiving {} pongs", *c, session->count);
session->finish_time = mono_clock::now();
- auto found = pending_conns.find(c);
- ceph_assert(found != pending_conns.end());
- found->second.set_value();
+ gates.dispatch_in_background("echo_notify_done", [c, this] {
+ return container().invoke_on(primary_sid, [pconn=&*c](auto &local) {
+ auto found = local.pending_conns.find(pconn);
+ ceph_assert(found != local.pending_conns.end());
+ found->second.set_value();
+ }).then([c] {});
+ });
}
return {seastar::now()};
}
- seastar::future<> init(const entity_name_t& name,
- const std::string& lname,
- const uint64_t nonce) {
- msgr = crimson::net::Messenger::create(
- name, lname, nonce, true);
- msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0));
- msgr->set_auth_client(&dummy_auth);
- msgr->set_auth_server(&dummy_auth);
- return msgr->start({this});
- }
-
- seastar::future<> shutdown() {
- ceph_assert(msgr);
- msgr->stop();
- return msgr->shutdown();
- }
-
- seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr) {
- mono_time start_time = mono_clock::now();
- auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
- return seastar::futurize_invoke([this, conn] {
- return do_dispatch_pingpong(conn);
- }).then([this, conn, start_time] {
- auto session = find_session(conn);
- std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
- std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
- logger().info("{}: handshake {}, pingpong {}",
- *conn, dur_handshake.count(), dur_pingpong.count());
- });
+ PingSessionRef find_session(crimson::net::Connection *c) {
+ auto found = sessions.find(c);
+ if (found == sessions.end()) {
+ ceph_assert(false);
+ }
+ return found->second;
}
- private:
seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) {
- auto [i, added] = pending_conns.emplace(conn, seastar::promise<>());
+ auto [i, added] = pending_conns.emplace(&*conn, seastar::promise<>());
std::ignore = i;
ceph_assert(added);
return seastar::do_with(0u, 0u,
},
[this, conn, &count_ping, &count_keepalive] {
return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
- if (keepalive_dist(rng)) {
- return conn->send_keepalive()
- .then([&count_keepalive] {
- count_keepalive += 1;
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- });
- } else {
- return conn->send(crimson::make_message<MPing>())
- .then([&count_ping] {
- count_ping += 1;
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- });
- }
- });
+ if (keepalive_dist(rng)) {
+ return conn->send_keepalive(
+ ).then([&count_keepalive] {
+ count_keepalive += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ return conn->send(crimson::make_message<MPing>()
+ ).then([&count_ping] {
+ count_ping += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
}).then([this, conn] {
- auto found = pending_conns.find(conn);
+ auto found = pending_conns.find(&*conn);
+ assert(found != pending_conns.end());
return found->second.get_future();
}
);
});
}
+
+ private:
+ // primary shard only
+ const seastar::shard_id primary_sid;
+ std::bernoulli_distribution keepalive_dist;
+ crimson::net::MessengerRef msgr;
+ std::map<crimson::net::Connection*, seastar::promise<>> pending_conns;
+ crimson::auth::DummyAuthClientServer dummy_auth;
+
+ // per shard
+ const unsigned rounds;
+ std::map<crimson::net::Connection*, PingSessionRef> sessions;
+ ShardedGates &gates;
};
};
rounds, keepalive_ratio);
return ShardedGates::create(
).then([rounds, keepalive_ratio](auto *gates) {
+ return seastar::when_all_succeed(
+ test_state::Client::create(rounds, keepalive_ratio, gates),
+ test_state::Client::create(rounds, keepalive_ratio, gates),
+ seastar::make_ready_future<ShardedGates*>(gates));
+ }).then_unpack([](auto *client1, auto *client2, auto *gates) {
auto server1 = seastar::make_shared<test_state::Server>(*gates);
auto server2 = seastar::make_shared<test_state::Server>(*gates);
- auto client1 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
- auto client2 = seastar::make_shared<test_state::Client>(rounds, keepalive_ratio);
// start servers and clients
auto addr1 = get_server_addr();
auto addr2 = get_server_addr();
// dispatch pingpoing
).then_unpack([client1, client2, server1, server2] {
return seastar::when_all_succeed(
- // test connecting in parallel, accepting in parallel
- client1->dispatch_pingpong(server2->msgr->get_myaddr()),
- client2->dispatch_pingpong(server1->msgr->get_myaddr()));
+ // test connecting in parallel, accepting in parallel
+ client1->dispatch_pingpong(server1->msgr->get_myaddr()),
+ client1->dispatch_pingpong(server2->msgr->get_myaddr()),
+ client2->dispatch_pingpong(server1->msgr->get_myaddr()),
+ client2->dispatch_pingpong(server2->msgr->get_myaddr()));
// shutdown
- }).then_unpack([] {
- return seastar::now();
- }).then([client1] {
+ }).then_unpack([client1] {
logger().info("client1 shutdown...");
return client1->shutdown();
}).then([client2] {
return server2->shutdown();
}).then([] {
logger().info("test_echo() done!\n");
- }).handle_exception([server1, server2, client1, client2] (auto eptr) {
+ }).handle_exception([](auto eptr) {
logger().error("test_echo() failed: got exception {}", eptr);
throw;
- }).finally([gates] {
+ }).finally([gates, server1, server2] {
return gates->close();
});
});