#include "Message.h"
#include "Dispatcher.h"
#include "Policy.h"
+#include "common/Formatter.h"
#include "common/Throttle.h"
#include "include/Context.h"
#include "include/types.h"
* @} // Startup/Shutdown
*/
+ virtual void dump(Formatter* f) = 0;
+
/**
* @defgroup Messaging
* @{
*
*/
+#include <fmt/core.h>
#include <unistd.h>
+#include "common/ceph_strings.h"
+#include "common/ceph_time.h"
+#include "common/iso_8601.h"
+#include "common/tcp_info.h"
#include "include/Context.h"
+#include "include/msgr.h"
#include "include/random.h"
#include "common/errno.h"
#include "AsyncMessenger.h"
}
}
}
+
+void AsyncConnection::dump(Formatter *f) {
+ std::lock_guard<std::mutex> l(lock);
+
+ f->open_object_section("async_connection");
+ f->dump_string("state", get_state_name(state));
+ f->dump_unsigned("messenger_nonce", async_msgr->get_nonce());
+
+ f->open_object_section("status");
+ f->dump_bool("connected", is_connected());
+ f->dump_bool("loopback", is_loopback);
+ f->close_section(); // status
+
+ if (cs) {
+ f->dump_int("socket_fd", cs.fd());
+ if (!dump_tcp_info(cs.fd(), f)) {
+ f->dump_null("tcp_info");
+ }
+ } else {
+ f->dump_null("socket_fd");
+ f->dump_null("tcp_info");
+ }
+ f->dump_int("conn_id", conn_id);
+
+ f->open_object_section("peer");
+ f->dump_object("entity_name", get_peer_entity_name());
+ f->dump_string("type", ceph_entity_type_name(get_peer_type()));
+ f->dump_int("id", get_peer_id());
+ f->dump_int("global_id", get_peer_global_id());
+ f->open_object_section("addr");
+ peer_addrs->dump(f);
+ f->close_section(); // addr
+ f->close_section(); // peer
+
+ f->dump_string("last_connect_started_ago",
+ ceph::timespan_str(ceph::coarse_mono_clock::now() -
+ last_connect_started));
+ f->dump_string(
+ "last_active_ago",
+ fmt::format("{}", ceph::timespan_str(ceph::coarse_mono_clock::now() -
+ last_active)));
+ f->dump_string("recv_start_time_ago",
+ fmt::format("{}", ceph::timespan_str(ceph::mono_clock::now() -
+ recv_start_time)));
+ f->dump_unsigned("last_tick_id", last_tick_id);
+ f->dump_object("socket_addr", socket_addr);
+ f->dump_object("target_addr", target_addr);
+ f->dump_int("port", port);
+ f->open_object_section("protocol");
+ if (protocol) {
+ protocol->dump(f);
+ } else {
+ f->dump_null("null");
+ }
+ f->close_section(); // protocol
+ f->dump_int("worker_id", worker ? worker->id : -1);
+ f->close_section(); // async_connection
+}
bool is_msgr2() const override;
+ void dump(Formatter* f);
+
friend class Protocol;
friend class ProtocolV1;
friend class ProtocolV2;
#include <fstream>
#include "AsyncMessenger.h"
+#include <utime.h>
+#include "common/Clock.h"
+#include "common/Formatter.h"
#include "common/config.h"
#include "common/Timer.h"
#include "common/errno.h"
return 0;
}
+void AsyncMessenger::dump(Formatter* f) {
+ std::lock_guard l{lock};
+ f->dump_unsigned("nonce", nonce);
+ f->open_object_section("my_name");
+ my_name.dump(f);
+ f->close_section(); // my_name
+ f->open_object_section("my_addrs");
+ my_addrs->dump(f);
+ f->close_section(); // my_addrs
+
+ f->open_array_section("listen_sockets");
+ for (const auto& proc : processors) {
+ for (const auto& sock : proc->listen_sockets) {
+ f->open_object_section("socket");
+ f->dump_int("socket_fd", sock.fd());
+
+ f->dump_int("worker_id", proc->worker ? proc->worker->id : -1);
+ f->close_section(); // socket
+ }
+ }
+ f->close_section(); // listen_sockets
+
+ f->open_object_section("dispatch_queue");
+ f->dump_int("length", get_dispatch_queue_len());
+ utime_t dispatch_queue_max_age;
+ dispatch_queue_max_age.set_from_double(
+ get_dispatch_queue_max_age(ceph_clock_now()));
+ f->dump_string("max_age_ago", utimespan_str(dispatch_queue_max_age));
+ f->close_section(); // dispatch_queue
+
+ f->dump_int("connections_count", conns.size());
+
+ f->open_array_section("connections");
+ for (const auto& [e, c] : conns) {
+ f->open_object_section("connection");
+ e.dump(f);
+ c->dump(f);
+ f->close_section(); // connection
+ }
+ f->close_section(); // connections
+
+ f->open_array_section("anon_conns");
+ for (const auto& c : anon_conns) {
+ c->dump(f);
+ }
+ f->close_section(); // anon_conns
+
+ f->open_array_section("accepting_conns");
+ for (const auto& c : accepting_conns) {
+ c->dump(f);
+ }
+ f->close_section();
+
+ f->open_array_section("deleted_conns");
+ for (const auto& c : deleted_conns) {
+ c->dump(f);
+ }
+ f->close_section(); // deleted_conns
+
+ if (local_connection) {
+ f->open_array_section("local_connection");
+ local_connection->dump(f);
+ f->close_section(); // local_connection
+ }
+}
+
int AsyncMessenger::bind(const entity_addr_t &bind_addr,
std::optional<entity_addrvec_t> public_addrs)
{
entity_addrvec_t* bound_addrs);
void start();
void accept();
+ friend class AsyncMessenger;
};
/*
void wait() override;
int shutdown() override;
+ void dump(Formatter* f) override;
+
/** @} // Startup/Shutdown */
/**