+#include "common/ceph_time.h"
#include "messages/MPing.h"
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
: public ceph::net::Dispatcher,
public seastar::peering_sharded_service<Server> {
ceph::net::Messenger *msgr = nullptr;
+ MessageRef msg_pong{new MPing(), false};
Dispatcher* get_local_shard() override {
return &(container().local());
logger().info("server got {}", *m);
}
// reply with a pong
- return c->send(MessageRef{new MPing(), false});
+ return c->send(msg_pong);
}
seastar::future<> init(const entity_name_t& name,
struct PingSession : public seastar::enable_shared_from_this<PingSession> {
unsigned count = 0u;
+ mono_time connected_time;
+ mono_time finish_time;
};
using PingSessionRef = seastar::shared_ptr<PingSession>;
ceph::net::Messenger *msgr = nullptr;
std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+ MessageRef msg_ping{new MPing(), false};
Client(unsigned rounds, double keepalive_ratio)
: rounds(rounds),
keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
+
+ PingSessionRef find_session(ceph::net::ConnectionRef c) {
+ auto found = sessions.find(c);
+ if (found == sessions.end()) {
+ ceph_assert(false);
+ }
+ return found->second;
+ }
+
Dispatcher* get_local_shard() override {
return &(container().local());
}
auto [i, added] = sessions.emplace(conn, session);
std::ignore = i;
ceph_assert(added);
- return container().invoke_on_all([conn = conn.get()](auto& client) {
- auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
- std::ignore = i;
- ceph_assert(added);
- });
+ session->connected_time = mono_clock::now();
+ return seastar::now();
}
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
- auto found = sessions.find(c);
- if (found == sessions.end()) {
- ceph_assert(false);
- }
- auto session = found->second;
+ auto session = find_session(c);
++(session->count);
if (verbose) {
logger().info("client ms_dispatch {}", session->count);
if (session->count == rounds) {
logger().info("{}: finished receiving {} pongs", *c.get(), session->count);
+ session->finish_time = mono_clock::now();
return container().invoke_on_all([conn = c.get()](auto &client) {
auto found = client.pending_conns.find(conn);
ceph_assert(found != client.pending_conns.end());
}
seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
+ mono_time start_time = mono_clock::now();
return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
- .then([this, foreign_dispatch](auto conn) {
- if (foreign_dispatch) {
- return do_dispatch_pingpong(&**conn)
- .finally([this, conn] {});
- } else {
- // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
- return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
- return client.do_dispatch_pingpong(conn);
- }).finally([this, conn] {});
- }
+ .then([this, foreign_dispatch, start_time](auto conn) {
+ return seastar::futurize_apply([this, conn, foreign_dispatch] {
+ if (foreign_dispatch) {
+ return do_dispatch_pingpong(&**conn);
+ } else {
+ // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
+ return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
+ return client.do_dispatch_pingpong(conn);
+ });
+ }
+ }).finally([this, conn, start_time] {
+ return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
+ auto session = client.find_session((*conn)->shared_from_this());
+ std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+ std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+ logger().info("{}: handshake {}, pingpong {}",
+ **conn, dur_handshake.count(), dur_pingpong.count());
+ });
+ });
});
}
private:
seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) {
- return seastar::do_with(0u, 0u,
- [this, conn](auto &count_ping, auto &count_keepalive) {
- return seastar::do_until(
- [this, conn, &count_ping, &count_keepalive] {
- bool stop = (count_ping == rounds);
- if (stop) {
- logger().info("{}: finished sending {} pings with {} keepalives",
- *conn, count_ping, count_keepalive);
- }
- return stop;
- },
- [this, conn, &count_ping, &count_keepalive] {
- return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
- if (keepalive_dist(rng)) {
- count_keepalive += 1;
- return conn->keepalive()
- .then([&count_keepalive] {
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- });
- } else {
- count_ping += 1;
- return conn->send(MessageRef{new MPing(), false})
- .then([] {
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- });
+ return container().invoke_on_all([conn](auto& client) {
+ auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
+ std::ignore = i;
+ ceph_assert(added);
+ }).then([this, conn] {
+ return seastar::do_with(0u, 0u,
+ [this, conn](auto &count_ping, auto &count_keepalive) {
+ return seastar::do_until(
+ [this, conn, &count_ping, &count_keepalive] {
+ bool stop = (count_ping == rounds);
+ if (stop) {
+ logger().info("{}: finished sending {} pings with {} keepalives",
+ *conn, count_ping, count_keepalive);
}
+ return stop;
+ },
+ [this, conn, &count_ping, &count_keepalive] {
+ return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+ if (keepalive_dist(rng)) {
+ return conn->keepalive()
+ .then([&count_keepalive] {
+ count_keepalive += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ return conn->send(msg_ping)
+ .then([&count_ping] {
+ count_ping += 1;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
+ }).then([this, conn] {
+ auto found = pending_conns.find(conn);
+ return found->second.get_future();
});
- }).then([this, conn] {
- auto found = pending_conns.find(conn);
- if (found == pending_conns.end())
- throw std::runtime_error{"Not connected."};
- return found->second.get_future();
});
});
}