#include "client.h"
#include <seastar/core/sleep.hh>
+#include <seastar/util/defer.hh>
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
co_await gates.close_all();
}
+seastar::future<> Client::send(MessageURef msg)
+{
+ LOG_PREFIX(Client::send);
+ DEBUGDPP("{}", *this, *msg);
+ if (!conn_lock.try_lock_shared()) {
+ WARNDPP("ongoing reconnect, report skipped", *this, *msg);
+ co_return;
+ }
+ auto unlocker = seastar::defer([this] {
+ conn_lock.unlock_shared();
+ });
+ if (!conn) {
+ WARNDPP("no conn available, report skipped", *this, *msg);
+ co_return;
+ }
+ DEBUGDPP("sending {}", *this, *msg);
+ co_await conn->send(std::move(msg));
+}
+
std::optional<seastar::future<>>
Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
m->daemon_name = local_conf()->name.get_id();
local_conf().get_config_bl(0, &m->config_bl, &last_config_bl_version);
local_conf().get_defaults_bl(&m->config_defaults_bl);
- return conn->send(std::move(m));
+ return send(std::move(m));
} else {
DEBUGDPP("connection changed", *this);
return seastar::now();
{
LOG_PREFIX(Client::reconnect);
DEBUGDPP("", *this);
+ co_await conn_lock.lock();
+ auto unlocker = seastar::defer([this] {
+ conn_lock.unlock();
+ });
if (conn) {
DEBUGDPP("marking down", *this);
conn->mark_down();
_send_report();
gates.dispatch_in_background(__func__, *this, [this, FNAME] {
DEBUGDPP("dispatching in background", *this);
- if (!conn) {
- WARNDPP("no conn available; report skipped", *this);
- return seastar::now();
- }
return with_stats.get_stats(
- ).then([this, FNAME](auto &&pg_stats) {
- if (!conn) {
- WARNDPP("no conn available; before sending stats, report skipped", *this);
- return seastar::now();
- }
- return conn->send(std::move(pg_stats));
+ ).then([this](auto &&pg_stats) {
+ return send(std::move(pg_stats));
});
});
}
DEBUGDPP("", *this);
gates.dispatch_in_background(__func__, *this, [this, FNAME] {
DEBUGDPP("dispatching in background", *this);
- if (!conn) {
- WARNDPP("cannot send report; no conn available", *this);
- return seastar::now();
- }
auto report = make_message<MMgrReport>();
// Adding empty information since we don't support perfcounters yet
report->undeclare_types.emplace_back();
return get_perf_report_cb(
).then([report=std::move(report), this](auto payload) mutable {
report->metric_report_message = MetricReportMessage(std::move(payload));
- return conn->send(std::move(report));
+ return send(std::move(report));
});
}
- return conn->send(std::move(report));
+ return send(std::move(report));
});
}
#pragma once
#include <seastar/core/timer.hh>
+#include <seastar/core/shared_mutex.hh>
#include "crimson/common/gated.h"
#include "crimson/net/Dispatcher.h"
get_perf_report_cb_t cb_get);
seastar::future<> start();
seastar::future<> stop();
+ seastar::future<> send(MessageURef msg);
void report();
void update_daemon_health(std::vector<DaemonHealthMetric>&& metrics);
crimson::net::Messenger& msgr;
WithStats& with_stats;
crimson::net::ConnectionRef conn;
+ seastar::shared_mutex conn_lock;
seastar::timer<seastar::lowres_clock> report_timer;
crimson::common::gate_per_shard gates;
uint64_t last_config_bl_version = 0;