m_client->_kick_stale_sessions();
else if (command == "status")
m_client->dump_status(f);
+ else if (command == "dump_subvolume_metrics")
+ m_client->dump_subvolume_metrics(f);
else
ceph_abort_msg("bad command registered");
}
async_ino_releasor(m->cct),
objecter_finisher(m->cct),
m_command_hook(this),
- fscid(0)
+ fscid(0),
+ subvolume_tracker{std::make_unique<SubvolumeMetricTracker>(cct, whoami)}
{
/* We only use the locale for normalization/case folding. That is unaffected
* by the locale but required by the API.
}
}
// Admin-socket handler for "dump_subvolume_metrics": forwards to the
// tracker, which emits both the current (unsent) and last-sent
// per-subvolume IO aggregates through the Formatter.
void Client::dump_subvolume_metrics(Formatter* f) {
  subvolume_tracker->dump(f);
}
+
void Client::_pre_init()
{
timer.init();
lderr(cct) << "error registering admin socket command: "
<< cpp_strerror(-ret) << dendl;
}
+ ret = admin_socket->register_command("dump_subvolume_metrics_aggr",
+ &m_command_hook,
+ "dump aggregated subvolume metrics");
+ if (ret < 0) {
+ lderr(cct) << "error registering admin socket command: "
+ << cpp_strerror(-ret) << dendl;
+ }
}
void Client::shutdown()
logger->set(l_c_wr_sqsum, n_sqsum);
logger->set(l_c_wr_ops, nr_write_request);
}
-
// ===================
// metadata cache stuff
in = add_update_inode(&ist, request->sent_stamp, session,
request->perms);
+ if (ist.subvolume_id) {
+ ldout(cct, 20) << __func__ << " subv_metric adding " << in->ino << "-" << ist.subvolume_id << dendl;
+ subvolume_tracker->add_inode(in->ino, ist.subvolume_id);
+ }
}
Inode *diri = NULL;
*pdirbl = reply->get_extra_bl();
// -- log times --
- utime_t lat = ceph_clock_now();
+ utime_t lat = mono_clock_now();
lat -= request->sent_stamp;
ldout(cct, 20) << "lat " << lat << dendl;
got_mds_push(session.get());
+ // check whether the current inode is under subvolume for metrics collection
+ if (m->subvolume_id > 0) {
+ ldout(cct, 10) << __func__ << " adding " << m->get_ino() << " to subvolume tracker " << m->subvolume_id << dendl;
+ subvolume_tracker->add_inode(m->get_ino(), m->subvolume_id);
+ }
+
bool do_cap_release = false;
Inode *in;
vinodeno_t vino(m->get_ino(), CEPH_NOSNAP);
for (auto &p : mds_sessions) {
auto session = p.second;
if (session->release && mdsmap->is_clientreplay_or_active_or_stopping(
- p.first)) {
+ p.first)) {
nr_caps += session->release->caps.size();
+ for (const auto &cap: session->release->caps) {
+ ldout(cct, 10) << __func__ << " removing " << static_cast<inodeno_t>(cap.ino) <<
+ " from subvolume tracker" << dendl;
+ subvolume_tracker->remove_inode(static_cast<inodeno_t>(cap.ino));
+ }
if (cct->_conf->client_inject_release_failure) {
ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl;
} else {
message.push_back(metric);
}
+ // subvolume metrics
+ if (_collect_and_send_global_metrics ||
+ session->mds_metric_flags.test(CLIENT_METRIC_TYPE_SUBVOLUME_METRICS)) {
+ auto metrics = subvolume_tracker->aggregate(true);
+ if (!metrics.empty()) {
+ for (auto m: metrics)
+ ldout(cct, 20) << " sending subv_metric " << m << dendl;
+ metric = ClientMetricMessage(SubvolumeMetricsPayload(metrics));
+ message.push_back(metric);
+ }
+ }
+
session->con->send_message2(make_message<MClientMetrics>(std::move(message)));
}
f->pos = offset + r;
}
- lat = ceph_clock_now();
+ lat = mono_clock_now();
lat -= start;
+ clnt->subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, lat, static_cast<uint32_t>(r)});
++clnt->nr_read_request;
clnt->update_io_stat_read(lat);
}
success:
r = read;
-
+ clnt->subvolume_tracker->add_metric(in->ino, SimpleIOMetric(false, mono_clock_now() - start_time, r));
error:
onfinish->complete(r);
const auto& conf = cct->_conf;
Inode *in = f->inode.get();
utime_t lat;
- utime_t start = ceph_clock_now();
+ utime_t start = mono_clock_now();
CRF_iofinish *crf_iofinish = nullptr;
if ((f->mode & CEPH_FILE_MODE_RD) == 0)
conf->client_oc &&
(have & (CEPH_CAP_FILE_CACHE |
CEPH_CAP_FILE_LAZYIO)),
- have, movepos, start, f, in, f->pos, offset, size));
+ have, movepos, f, in, f->pos, offset, size));
crf_iofinish->CRF = crf.get();
}
f->pos = start_pos + rc;
}
- lat = ceph_clock_now();
+ lat = mono_clock_now();
lat -= start;
-
+ subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, lat, bl->length()});
++nr_read_request;
update_io_stat_read(lat);
}
Client::C_Readahead::C_Readahead(Client *c, Fh *f) :
- client(c), f(f) {
+ client(c), f(f), start_time(mono_clock_now()) {
f->get();
f->readahead.inc_pending();
}
client->put_cap_ref(f->inode.get(), CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
if (r > 0) {
client->update_read_io_size(r);
+ client->subvolume_tracker->add_metric(f->inode->ino, SimpleIOMetric(false, mono_clock_now()-start_time, r));
}
}
io_finish.reset(io_finish_cond);
}
+ auto start_time = mono_clock_now();
r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
off, len, bl, 0, io_finish.get());
client_lock.lock();
put_cap_ref(in, CEPH_CAP_FILE_CACHE);
update_read_io_size(bl->length());
+ subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, mono_clock_now() - start_time, bl->length()});
} else {
put_cap_ref(in, CEPH_CAP_FILE_CACHE);
}
update_write_io_size(size);
// time
- lat = ceph_clock_now();
+ lat = mono_clock_now();
lat -= start;
+ subvolume_tracker->add_metric(in->ino, SimpleIOMetric{true, lat, static_cast<uint32_t>(size)});
++nr_write_request;
update_io_stat_write(lat);
(uint64_t)(offset+size) > in->size ) { //exceeds filesize
return -EFBIG;
}
- //ldout(cct, 7) << "write fh " << fh << " size " << size << " offset " << offset << dendl;
+ ldout(cct, 7) << "_write fh " << f << " size " << size << " offset " << offset << dendl;
if (objecter->osdmap_pool_full(in->layout.pool_id)) {
return -ENOSPC;
ldout(cct, 10) << "cur file size is " << in->size << dendl;
// time it.
- utime_t start = ceph_clock_now();
+ utime_t start = mono_clock_now();
if (in->inline_version == 0) {
int r = _getattr(in, CEPH_STAT_CAP_INLINE_DATA, f->actor_perms, true);
cct->_conf->client_oc &&
(have & (CEPH_CAP_FILE_BUFFER |
CEPH_CAP_FILE_LAZYIO)),
- start, f, in, fpos, offset, size,
+ f, in, fpos, offset, size,
do_fsync, syncdataonly));
cwf_iofinish->CWF = cwf.get();
get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
// async, caching, non-blocking.
+ ldout(cct, 10) << " _write_oc " << dendl;
r = objectcacher->file_write(&in->oset, &in->layout,
in->snaprealm->get_snap_context(),
offset, size, bl, ceph::real_clock::now(),
}
// allow caller to wait on onfinish...
+ ldout(cct, 10) << " _write_oc_1" << dendl;
return 0;
}
sleep(delay);
client_lock.lock();
}
+
+ ldout(cct, 10) << " _write_filer" << dendl;
filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
offset, size, bl, ceph::real_clock::now(), 0,
in->truncate_size, in->truncate_seq,
cwf.release();
// allow caller to wait on onfinish...
+ ldout(cct, 10) << " _write_filer_2" << dendl;
return 0;
}
success:
// do not get here if non-blocking caller (onfinish != nullptr)
+ ldout(cct, 10) << " _write_filer_succeess" << dendl;
r = _write_success(f, start, fpos, offset, size, in);
if (r >= 0 && do_fsync) {
utime_t lat;
- lat = ceph_clock_now();
+ lat = mono_clock_now();
lat -= start;
clnt->logger->tinc(l_c_fsync, lat);
ceph_tid_t flush_tid = 0;
InodeRef tmp_ref;
utime_t lat;
- utime_t start = ceph_clock_now();
+ utime_t start = mono_clock_now();
ldout(cct, 8) << "_fsync on " << *in << " " << (syncdataonly ? "(dataonly)":"(data+metadata)") << dendl;
<< cpp_strerror(-r) << dendl;
}
- lat = ceph_clock_now();
+ lat = mono_clock_now();
lat -= start;
logger->tinc(l_c_fsync, lat);
return *p;
}
+// --- subvolume metrics tracking --- //
+SubvolumeMetricTracker::SubvolumeMetricTracker(CephContext *ct, client_t id) : cct(ct), whoami(id) {}
+
+void SubvolumeMetricTracker::dump(Formatter *f) {
+ auto current_metrics = aggregate(false);
+ f->open_array_section("current_metrics");
+ for (auto &met : current_metrics) {
+ f->dump_object("", met);
+ }
+ f->close_section();
+
+ std::vector<AggregatedIOMetrics> temp;
+ {
+ std::shared_lock l(metrics_lock);
+ temp = last_subvolume_metrics;
+ }
+ f->open_array_section("last_metrics");
+ for (auto &val : temp) {
+ f->dump_object("", val);
+ }
+ f->close_section();
+}
+
+void SubvolumeMetricTracker::add_inode(inodeno_t inode, inodeno_t subvol) {
+ ldout(cct, 20) << __func__ << " subv_metric " << inode << "-" << subvol << dendl;
+ std::unique_lock l(metrics_lock);
+ if (likely(inode_subvolume.contains(inode))) {
+ ldout(cct, 10) << __func__ << " " << inode << "-" << subvol << " subv_metric inode exists" << dendl;
+ return;
+ }
+
+ auto [it, inserted] = subvolume_metrics.try_emplace(subvol);
+ if (inserted) {
+ ldout(cct, 10) << __func__ << " inserted " << inode << "-" << subvol << dendl;
+ }
+ inode_subvolume[inode] = subvol;
+ ldout(cct, 10) << __func__ << " add " << inode << "-" << subvol << dendl;
+}
+
// Forget the inode -> subvolume mapping (e.g. when its caps are released).
// NOTE(review): the subvolume's accumulated entry in subvolume_metrics is
// left in place (so pending aggregates are not lost); the lookup below only
// logs whether the entry still exists — confirm this is intended rather
// than a missing erase/refcount decrement.
void SubvolumeMetricTracker::remove_inode(inodeno_t inode) {
  ldout(cct, 20) << __func__ << " subv_metric " << inode << "-" << dendl;
  std::unique_lock l(metrics_lock);
  auto it = inode_subvolume.find(inode);
  if (it == inode_subvolume.end()) {
    // inode was never tracked (or already removed)
    return;
  }

  inodeno_t subvol = it->second;
  inode_subvolume.erase(it);

  // Diagnostic only: report whether the subvolume's metrics slot still exists.
  auto se = subvolume_metrics.find(subvol);
  if (se == subvolume_metrics.end()) {
    ldout(cct, 20) << __func__ << " subv_metric not found " << inode << "-" << subvol << dendl;
    return;
  }
  ldout(cct, 20) << __func__ << " subv_metric end " << inode << dendl;
}
+
+void SubvolumeMetricTracker::add_metric(inodeno_t inode, SimpleIOMetric&& metric) {
+ ldout(cct, 10) << __func__ << " " << inode << dendl;
+ std::unique_lock l(metrics_lock);
+
+ auto it = inode_subvolume.find(inode);
+ if (it == inode_subvolume.end())
+ return;
+
+ auto& entry = subvolume_metrics[it->second];
+ entry.metrics.add(std::move(metric));
+}
+
+std::vector<AggregatedIOMetrics>
+SubvolumeMetricTracker::aggregate(bool clean) {
+ ldout(cct, 20) << __func__ << dendl;
+ std::vector<AggregatedIOMetrics> res;
+
+ if (clean) {
+ std::unordered_map<inodeno_t, SubvolumeEntry> tmp;
+ // move subvolume map to the local one, to release wlock asap
+ {
+ std::unique_lock l(metrics_lock);
+ subvolume_metrics.swap(tmp);
+ }
+ res.reserve(tmp.size());
+ for (auto &[subv_id, entry]: tmp) {
+ res.emplace_back(std::move(entry.metrics));
+ res.back().subvolume_id = subv_id;
+ }
+ } else {
+ // on rlock is needed, no need to copy the map to the local instance on the metrics map
+ std::shared_lock l(metrics_lock);
+ res.reserve(subvolume_metrics.size());
+ for (const auto &[subv_id, entry]: subvolume_metrics) {
+ res.emplace_back(entry.metrics);
+ res.back().subvolume_id = subv_id;
+ }
+ }
+
+ {
+ std::unique_lock l(metrics_lock);
+ last_subvolume_metrics = res; // since res holds only 1 aggregated metrics per subvolume, copying is not so bad
+ }
+
+ ldout(cct, 20) << __func__ << " res size " << res.size() << dendl;
+ return res; // return value optimization
+}
+// --- subvolume metrics tracking --- //
StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc,
boost::asio::io_context& ictx)
#include <thread>
#include <unordered_map>
#include <unordered_set>
+#include <shared_mutex>
using std::set;
using std::map;
int fd; // fd attached using fdopendir (-1 if none)
};
/**
 * @brief The subvolume metric tracker.
 *
 * Maps subvolume_id (which is in fact an inode id) to a running aggregate
 * of SimpleIOMetric samples; each sample records a single IO operation on
 * the client. When the client sends its metrics message to the MDS it
 * aggregates all samples per subvolume into an AggregatedIOMetrics struct
 * and clears the current metrics (see aggregate(true)).
 * TODO: limit the cap for each subvolume? in the case client sends metrics to the MDS not so often?
 */
class SubvolumeMetricTracker {
public:
  // Per-subvolume accumulator; currently just the running aggregate.
  struct SubvolumeEntry {
    AggregatedIOMetrics metrics;

    void dump(Formatter *f) const {
      f->dump_object("", metrics);
    }
  };

  SubvolumeMetricTracker(CephContext *ct, client_t id);
  // Dump current and last-sent aggregates (admin socket support).
  void dump(Formatter *f);
  // Associate an inode with its subvolume; idempotent for known inodes.
  void add_inode(inodeno_t inode, inodeno_t subvol);
  // Drop the inode association (subvolume aggregates are kept).
  void remove_inode(inodeno_t inode);
  // Fold one IO sample into its subvolume's aggregate.
  void add_metric(inodeno_t inode, SimpleIOMetric&& metric);
  // One aggregate per subvolume; clean==true drains the live aggregates.
  std::vector<AggregatedIOMetrics> aggregate(bool clean);
protected:
  std::vector<AggregatedIOMetrics> last_subvolume_metrics; // snapshot of last aggregate() result
  std::unordered_map<inodeno_t, SubvolumeEntry> subvolume_metrics; // subvolume -> live aggregate
  std::unordered_map<inodeno_t, inodeno_t> inode_subvolume; // inode -> subvolume
  CephContext *cct = nullptr;
  client_t whoami;
  std::shared_mutex metrics_lock; // guards the three containers above
};
+
class Client : public Dispatcher, public md_config_obs_t {
public:
friend class C_Block_Sync; // Calls block map and protected helpers
void force_session_readonly(MetaSession *s);
void dump_status(Formatter *f); // debug
+ void dump_subvolume_metrics(Formatter* f);
Dispatcher::dispatch_result_t ms_dispatch2(const MessageRef& m) override;
C_Read_Finisher(Client *clnt, Context *onfinish, Context *iofinish,
bool is_read_async, int have_caps, bool movepos,
- utime_t start, Fh *f, Inode *in, uint64_t fpos,
+ Fh *f, Inode *in, uint64_t fpos,
int64_t offset, uint64_t size)
: clnt(clnt), onfinish(onfinish), iofinish(iofinish),
is_read_async(is_read_async), have_caps(have_caps), f(f), in(in),
- start(start), fpos(fpos), offset(offset), size(size), movepos(movepos) {
+ start(mono_clock_now()), fpos(fpos), offset(offset), size(size), movepos(movepos) {
iofinished = false;
}
uint64_t fpos, uint64_t off, uint64_t len,
bufferlist *bl, Filer *filer, int have_caps)
: clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), bl(bl),
- filer(filer), have_caps(have_caps)
+ filer(filer), have_caps(have_caps), start_time(mono_clock_now())
{
left = len;
wanted = len;
bufferlist tbl;
Filer *filer;
int have_caps;
+ utime_t start_time;
int read;
uint64_t pos;
bool fini;
public:
C_Read_Async_Finisher(Client *clnt, Context *onfinish, Fh *f, Inode *in,
uint64_t fpos, uint64_t off, uint64_t len)
- : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len) {}
+ : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), start_time(mono_clock_now()) {}
private:
Client *clnt;
Inode *in;
uint64_t off;
uint64_t len;
+ utime_t start_time;
void finish(int r) override;
};
void finish_fsync(int r);
C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline,
- bool is_file_write, utime_t start, Fh *f, Inode *in,
+ bool is_file_write, Fh *f, Inode *in,
uint64_t fpos, int64_t offset, uint64_t size,
bool do_fsync, bool syncdataonly)
: clnt(clnt), onfinish(onfinish),
- is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos),
+ is_file_write(is_file_write), start(mono_clock_now()), f(f), in(in), fpos(fpos),
offset(offset), size(size), syncdataonly(syncdataonly) {
iofinished_r = 0;
onuninlinefinished_r = 0;
C_nonblocking_fsync_state(Client *clnt, Inode *in, bool syncdataonly, Context *onfinish)
: clnt(clnt), in(in), syncdataonly(syncdataonly), onfinish(onfinish) {
flush_tid = 0;
- start = ceph_clock_now();
+ start = mono_clock_now();
progress = 0;
flush_wait = false;
flush_completed = false;
Client *client;
Fh *f;
+ utime_t start_time = 0;
};
/*
// Cluster fsid
fs_cluster_id_t fscid;
+ // subvolume metrics tracker
+ std::unique_ptr<SubvolumeMetricTracker> subvolume_tracker = nullptr;
+
// file handles, etc.
interval_set<int> free_fd_set; // unused fds
std::unordered_map<int, Fh*> fd_map;
return n;
}
// Monotonic "now" as a utime_t, for latency measurements that must not be
// perturbed by wall-clock adjustments.
// NOTE(review): the non-Linux fallback uses gettimeofday(), which IS the
// wall clock and therefore not monotonic — confirm this is acceptable on
// those platforms or consider std::chrono::steady_clock there.
static inline utime_t mono_clock_now()
{
#if defined(__linux__)
  struct timespec tp;
  clock_gettime(CLOCK_MONOTONIC, &tp);
  utime_t n(tp);
#else
  struct timeval tv;
  gettimeofday(&tv, nullptr);
  utime_t n(&tv);
#endif
  return n;
}
+
#endif
#include "include/int_types.h"
#include "include/stringify.h"
#include "include/utime.h"
+#include "include/fs_types.h"
namespace ceph { class Formatter; }
CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY,
CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY,
CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY,
+ CLIENT_METRIC_TYPE_SUBVOLUME_METRICS,
};
inline std::ostream &operator<<(std::ostream &os, const ClientMetricType &type) {
switch(type) {
case ClientMetricType::CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY:
os << "STDEV_METADATA_LATENCY";
break;
+ case ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS:
+ os << "SUBVOLUME_METRICS";
+ break;
default:
os << "(UNKNOWN:" << static_cast<std::underlying_type<ClientMetricType>::type>(type) << ")";
break;
}
};
+/**
+ * @brief Struct to hold single IO metric
+ * To save the memory for clients with high number of subvolumes, the layout is as following:
+ * 64 bits of data:
+ * MSB - read or write (0/1)
+ * 32 bits for latency, in microsecs
+ * 31 bits for payload size, bytes
+ */
+struct SimpleIOMetric {
+ uint64_t packed_data;
+
+ // is_write flag (1 bit) - MSB
+ static constexpr uint64_t OP_TYPE_BITS = 1;
+ static constexpr uint64_t OP_TYPE_SHIFT = 64 - OP_TYPE_BITS; // 63
+ static constexpr uint64_t OP_TYPE_MASK = 1ULL << OP_TYPE_SHIFT; // 0x8000000000000000ULL
+
+ // latency in microseconds (32 bits)
+ // directly after is_write bit
+ static constexpr uint64_t LATENCY_BITS = 32;
+ static constexpr uint64_t LATENCY_SHIFT = OP_TYPE_SHIFT - LATENCY_BITS; // 63 - 32 = 31
+ static constexpr uint64_t LATENCY_MASK = ((1ULL << LATENCY_BITS) - 1) << LATENCY_SHIFT;
+
+ // payload size (31 bits)
+ // Placed directly after latency, occupying the lowest 31 bits
+ static constexpr uint64_t PAYLOAD_SIZE_BITS = 31;
+ static constexpr uint64_t PAYLOAD_SIZE_SHIFT = 0; // Starts from the LSB (bit 0)
+ static constexpr uint64_t PAYLOAD_SIZE_MASK = (1ULL << PAYLOAD_SIZE_BITS) - 1;
+
+ static constexpr uint32_t MAX_LATENCY_US = (1ULL << LATENCY_BITS) - 1; // 2^32 - 1 microseconds
+ static constexpr uint32_t MAX_PAYLOAD_SIZE = (1ULL << PAYLOAD_SIZE_BITS) - 1; // 2^31 - 1 bytes
+
+ SimpleIOMetric(bool is_write, utime_t latency, uint32_t payload_size) : packed_data(0) {
+ if (is_write) {
+ packed_data |= OP_TYPE_MASK;
+ }
+
+ uint64_t current_latency_usec = latency.usec();
+
+ // if less than 1 microsecond - set to 1 microsecond
+ if (current_latency_usec == 0) {
+ current_latency_usec = 1;
+ }
+
+ // handle overflow
+ if (current_latency_usec > MAX_LATENCY_US) {
+ current_latency_usec = MAX_LATENCY_US; // Truncate if too large
+ }
+ packed_data |= (current_latency_usec << LATENCY_SHIFT) & LATENCY_MASK;
+
+
+ // payload_size (31 bits)
+ if (payload_size > MAX_PAYLOAD_SIZE) {
+ payload_size = MAX_PAYLOAD_SIZE; // Truncate if too large
+ }
+ packed_data |= (static_cast<uint64_t>(payload_size) << PAYLOAD_SIZE_SHIFT) & PAYLOAD_SIZE_MASK;
+ }
+
+ SimpleIOMetric() : packed_data(0) {}
+
+ bool is_write() const {
+ return (packed_data & OP_TYPE_MASK) != 0;
+ }
+
+ uint32_t get_latency_us() const {
+ return static_cast<uint32_t>((packed_data & LATENCY_MASK) >> LATENCY_SHIFT);
+ }
+
+ uint32_t get_payload_size() const {
+ return static_cast<uint32_t>((packed_data & PAYLOAD_SIZE_MASK) >> PAYLOAD_SIZE_SHIFT);
+ }
+
+ // --- Dump method ---
+ void dump(Formatter *f) const {
+ f->dump_string("op", is_write() ? "w" : "r");
+ f->dump_unsigned("lat_us", get_latency_us());
+ f->dump_unsigned("size", get_payload_size());
+ }
+};
+
+/**
+ * brief holds result of the SimpleIOMetrics aggregation, for each subvolume, on the client
+ * the aggregation occurs just before sending metrics to the MDS
+ */
+struct AggregatedIOMetrics {
+ inodeno_t subvolume_id = 0;
+ uint32_t read_count = 0;
+ uint32_t write_count = 0;
+ uint64_t read_bytes = 0;
+ uint64_t write_bytes = 0;
+ uint64_t read_latency_us = 0;
+ uint64_t write_latency_us = 0;
+ uint64_t time_stamp = 0; // set on MDS
+
+ void add(const SimpleIOMetric& m) {
+ auto lat = m.get_latency_us();
+ if (m.is_write()) {
+ ++write_count;
+ write_bytes += m.get_payload_size();
+ write_latency_us += lat;
+ } else {
+ ++read_count;
+ read_bytes += m.get_payload_size();
+ read_latency_us += lat;
+ }
+ }
+
+ uint64_t avg_read_throughput_Bps() const {
+ return read_latency_us > 0 ? (read_bytes * 1'000'000ULL) / read_latency_us : 0;
+ }
+
+ uint64_t avg_write_throughput_Bps() const {
+ return write_latency_us > 0 ? (write_bytes * 1'000'000ULL) / write_latency_us : 0;
+ }
+
+ uint64_t avg_read_latency_us() const {
+ return read_count ? read_latency_us / read_count : 0;
+ }
+
+ uint64_t avg_write_latency_us() const {
+ return write_count ? write_latency_us / write_count : 0;
+ }
+
+ double read_iops() const {
+ return read_latency_us > 0 ? static_cast<double>(read_count) * 1e6 / read_latency_us : 0.0;
+ }
+
+ double write_iops() const {
+ return write_latency_us > 0 ? static_cast<double>(write_count) * 1e6 / write_latency_us : 0.0;
+ }
+
+ void dump(Formatter *f) const {
+ f->dump_unsigned("subvolume_id", subvolume_id);
+ f->dump_unsigned("read_count", read_count);
+ f->dump_unsigned("read_bytes", read_bytes);
+ f->dump_unsigned("avg_read_latency_us", avg_read_latency_us());
+ f->dump_unsigned("read_iops", read_iops());
+ f->dump_float("avg_read_tp_Bps", avg_read_throughput_Bps());
+ f->dump_unsigned("write_count", write_count);
+ f->dump_unsigned("write_bytes", write_bytes);
+ f->dump_unsigned("avg_write_latency_us", avg_write_latency_us());
+ f->dump_float("write_iops", write_iops());
+ f->dump_float("avg_write_tp_Bps", avg_write_throughput_Bps());
+ f->dump_unsigned("time_stamp", time_stamp);
+ }
+
+ friend std::ostream& operator<<(std::ostream& os, const AggregatedIOMetrics& m) {
+ os << "{subvolume_id=\"" << m.subvolume_id
+ << "\", read_count=" << m.read_count
+ << ", write_count=" << m.write_count
+ << ", read_bytes=" << m.read_bytes
+ << ", write_bytes=" << m.write_bytes
+ << ", read_latency_ns=" << m.read_latency_us
+ << ", write_latency_ns=" << m.write_latency_us
+ << ", time_stamp_ms=" << m.time_stamp
+ << "}";
+ return os;
+ }
+
+ void encode(bufferlist& bl) const {
+ using ceph::encode;
+ ENCODE_START(1, 1, bl);
+ encode(subvolume_id, bl);
+ encode(read_count, bl);
+ encode(write_count, bl);
+ encode(read_bytes, bl);
+ encode(write_bytes, bl);
+ encode(read_latency_us, bl);
+ encode(write_latency_us, bl);
+ encode(time_stamp, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator& p) {
+ using ceph::decode;
+ DECODE_START(1, p);
+ decode(subvolume_id, p);
+ decode(read_count, p);
+ decode(write_count, p);
+ decode(read_bytes, p);
+ decode(write_bytes, p);
+ decode(read_latency_us, p);
+ decode(write_latency_us, p);
+ decode(time_stamp, p);
+ DECODE_FINISH(p);
+ }
+};
+
+struct SubvolumeMetricsPayload : public ClientMetricPayloadBase {
+ std::vector<AggregatedIOMetrics> subvolume_metrics;
+
+ SubvolumeMetricsPayload() : ClientMetricPayloadBase(ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS) { }
+ SubvolumeMetricsPayload(std::vector<AggregatedIOMetrics> metrics)
+ : ClientMetricPayloadBase(ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS), subvolume_metrics(std::move(metrics)) {}
+
+ void encode(bufferlist &bl) const {
+ using ceph::encode;
+ ENCODE_START(1, 1, bl);
+ encode(subvolume_metrics.size(), bl);
+ for(auto const& m : subvolume_metrics) {
+ m.encode(bl);
+ }
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::const_iterator &iter) {
+ using ceph::decode;
+ DECODE_START(1, iter) ;
+ size_t size = 0;
+ decode(size, iter);
+ subvolume_metrics.clear();
+ subvolume_metrics.reserve(size);
+ for (size_t i = 0; i < size; ++i) {
+ subvolume_metrics.emplace_back();
+ subvolume_metrics.back().decode(iter);
+ }
+ DECODE_FINISH(iter);
+ }
+
+ void dump(Formatter *f) const {
+ f->open_array_section("metrics");
+ for(auto const& m : subvolume_metrics)
+ m.dump(f);
+ f->close_section();
+ }
+
+ void print(std::ostream *out) const {
+ *out << "metrics_count: " << subvolume_metrics.size();
+ }
+};
+
typedef std::variant<CapInfoPayload,
- ReadLatencyPayload,
- WriteLatencyPayload,
- MetadataLatencyPayload,
- DentryLeasePayload,
- OpenedFilesPayload,
- PinnedIcapsPayload,
- OpenedInodesPayload,
- ReadIoSizesPayload,
- WriteIoSizesPayload,
- UnknownPayload> ClientMetricPayload;
+ ReadLatencyPayload,
+ WriteLatencyPayload,
+ MetadataLatencyPayload,
+ DentryLeasePayload,
+ OpenedFilesPayload,
+ PinnedIcapsPayload,
+ OpenedInodesPayload,
+ ReadIoSizesPayload,
+ WriteIoSizesPayload,
+ SubvolumeMetricsPayload,
+ UnknownPayload> ClientMetricPayload;
// metric update message sent by clients
struct ClientMetricMessage {
case ClientMetricType::CLIENT_METRIC_TYPE_WRITE_IO_SIZES:
payload = WriteIoSizesPayload();
break;
+ case ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS:
+ payload = SubvolumeMetricsPayload();
+ break;
default:
payload = UnknownPayload(static_cast<ClientMetricType>(metric_type));
break;
* note: encoding matches MClientReply::InodeStat
*/
if (session->info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
- ENCODE_START(8, 1, bl);
+ ENCODE_START(9, 1, bl);
encode(std::tuple{
oi->ino,
snapid,
encode(file_i->fscrypt_auth, bl);
encode(file_i->fscrypt_file, bl);
encode_nohead(optmdbl, bl);
+ encode(get_subvolume_id(), bl);
// encode inodestat
ENCODE_FINISH(bl);
}
}
}
+inodeno_t CInode::get_subvolume_id() const {
+ auto snapr = find_snaprealm();
+ return snapr ? snapr->get_subvolume_ino() : inodeno_t(0);
+}
+
void CInode::queue_export_pin(mds_rank_t export_pin)
{
if (state_test(CInode::STATE_QUEUEDEXPORTPIN))
* @returns a pool ID >=0
*/
int64_t get_backtrace_pool() const;
-
+ inodeno_t get_subvolume_id() const;
protected:
ceph_lock_state_t *get_fcntl_lock_state() {
if (!fcntl_locks)
return cap;
}
+void Locker::maybe_set_subvolume_id(const CInode* inode, ref_t<MClientCaps>& ack) {
+ if (ack && inode) {
+ ack->subvolume_id = inode->get_subvolume_id();
+ dout(10) << "setting subvolume id " << ack->subvolume_id << " for inode " << inode->ino().val << dendl;
+ }
+}
+
void Locker::issue_caps_set(set<CInode*>& inset)
{
for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps_dirty);
}
+ CInode *head_in = mdcache->get_inode(m->get_ino());
if (m->get_client_tid() > 0 && session &&
session->have_completed_flush(m->get_client_tid())) {
dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
if (op == CEPH_CAP_OP_FLUSHSNAP) {
if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, 0, mds->get_osd_epoch_barrier());
+ maybe_set_subvolume_id(head_in, ack);
} else {
if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack);
ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, 0, mds->get_osd_epoch_barrier());
+ maybe_set_subvolume_id(head_in, ack);
}
ack->set_snap_follows(follows);
ack->set_client_tid(m->get_client_tid());
}
}
- CInode *head_in = mdcache->get_inode(m->get_ino());
if (!head_in) {
if (mds->is_clientreplay()) {
dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
ack->set_snap_follows(follows);
ack->set_client_tid(m->get_client_tid());
ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
+ maybe_set_subvolume_id(head_in, ack);
}
if (in == head_in ||
m->get_caps(), 0, dirty, 0, cap->get_last_issue(), mds->get_osd_epoch_barrier());
ack->set_client_tid(m->get_client_tid());
ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
+ maybe_set_subvolume_id(head_in, ack);
}
// filter wanted based on what we could ever give out (given auth/replica status)
void file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
client_t client, const ref_t<MClientCaps> &ack);
+ void maybe_set_subvolume_id(const CInode* head_in, ref_t<MClientCaps>& ack);
xlist<ScatterLock*> updated_scatterlocks;
// Maintain a global list to quickly find if any caps are late revoking
return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
}
+std::string MDSRank::get_path(inodeno_t ino) {
+ std::lock_guard locker(mds_lock);
+ CInode* inode = mdcache->get_inode(ino);
+ if (!inode) return {};
+ std::string res;
+ inode->make_path_string(res);
+ return res;
+}
+
std::vector<std::string> MDSRankDispatcher::get_tracked_keys()
const noexcept
{
return inject_journal_corrupt_dentry_first;
}
+ std::string get_path(inodeno_t ino);
+
// Reference to global MDS::mds_lock, so that users of MDSRank don't
// carry around references to the outer MDS, and we can substitute
// a separate lock here in future potentially.
#include "MetricsHandler.h"
+#include <variant>
+
#include "common/debug.h"
#include "common/errno.h"
-#include "include/cephfs/metrics/Types.h"
#include "messages/MClientMetrics.h"
#include "messages/MMDSMetrics.h"
metrics.write_io_sizes_metric.updated = true;
}
+void MetricsHandler::handle_payload(Session* session, const SubvolumeMetricsPayload& payload, std::unique_lock<ceph::mutex>& lk) {
+ dout(20) << ": type=" << payload.get_type() << ", session=" << session
+ << " , subv_metrics count=" << payload.subvolume_metrics.size() << dendl;
+
+ ceph_assert(lk.owns_lock()); // caller must hold the lock
+
+ std::vector<std::string> resolved_paths;
+ resolved_paths.reserve(payload.subvolume_metrics.size());
+
+ // RAII guard: unlock on construction, re-lock on destruction (even on exceptions)
+ struct UnlockGuard {
+ std::unique_lock<ceph::mutex> &lk;
+ explicit UnlockGuard(std::unique_lock<ceph::mutex>& l) : lk(l) { lk.unlock(); }
+ ~UnlockGuard() noexcept {
+ if (!lk.owns_lock()) {
+ try { lk.lock(); }
+ catch (...) {
+ dout(0) << "failed to re-lock in UnlockGuard dtor" << dendl;
+ }
+ }
+ }
+ } unlock_guard{lk};
+
+ // unlocked: resolve paths, no contention with mds lock
+ for (const auto& metric : payload.subvolume_metrics) {
+ std::string path = mds->get_path(metric.subvolume_id);
+ if (path.empty()) {
+ dout(10) << " path not found for " << metric.subvolume_id << dendl;
+ }
+ resolved_paths.emplace_back(std::move(path));
+ }
+
+ // locked again (via UnlockGuard dtor): update metrics map
+ const auto now_ms = static_cast<int64_t>(
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+std::chrono::steady_clock::now().time_since_epoch()).count());
+
+ // Keep index pairing but avoid double map lookup
+ for (size_t i = 0; i < resolved_paths.size(); ++i) {
+ const auto& path = resolved_paths[i];
+ if (path.empty()) continue;
+
+ auto& vec = subvolume_metrics_map[path];
+
+ dout(20) << " accumulating subv_metric " << payload.subvolume_metrics[i] << dendl;
+ vec.emplace_back(std::move(payload.subvolume_metrics[i]));
+ vec.back().time_stamp = now_ms;
+ }
+}
+
// Fallback for metric types this MDS does not understand (e.g. sent by a
// newer client): log and drop.
void MetricsHandler::handle_payload(Session *session, const UnknownPayload &payload) {
  dout(5) << ": type=Unknown, session=" << session << ", ignoring unknown payload" << dendl;
}
return;
}
- std::scoped_lock locker(lock);
+ std::unique_lock locker(lock);
Session *session = mds->get_session(m);
dout(20) << ": session=" << session << dendl;
}
for (auto &metric : m->updates) {
- std::visit(HandlePayloadVisitor(this, session), metric.payload);
+ // Special handling for SubvolumeMetricsPayload to avoid lock contention
+ if (auto* subv_payload = std::get_if<SubvolumeMetricsPayload>(&metric.payload)) {
+ // this handles the subvolume metrics payload without acquiring the mds lock
+ // should not call the visitor pattern here
+ handle_payload(session, *subv_payload, locker);
+ } else {
+ std::visit(HandlePayloadVisitor(this, session), metric.payload);
+ }
}
}
}
}
+ // subvolume metrics, reserve 100 entries per subvolume ? good enough?
+ metrics_message.subvolume_metrics.reserve(subvolume_metrics_map.size()* 100);
+ for (auto &[path, aggregated_metrics] : subvolume_metrics_map) {
+ metrics_message.subvolume_metrics.emplace_back();
+ aggregate_subvolume_metrics(path, aggregated_metrics, metrics_message.subvolume_metrics.back());
+ }
+ // if we need to show local MDS metrics, we need to save a last copy...
+ subvolume_metrics_map.clear();
+
// only start incrementing when its kicked via set_next_seq()
if (next_seq != 0) {
++last_updated_seq;
}
- dout(20) << ": sending metric updates for " << update_client_metrics_map.size()
- << " clients to rank 0 (address: " << *addr_rank0 << ") with sequence number "
- << next_seq << ", last updated sequence number " << last_updated_seq << dendl;
+ dout(20) << ": sending " << metrics_message.subvolume_metrics.size() << " subv_metrics to aggregator"
+ << dendl;
mds->send_message_mds(make_message<MMDSMetrics>(std::move(metrics_message)), *addr_rank0);
}
+
+// Fold the per-client AggregatedIOMetrics samples collected for one subvolume
+// into a single SubvolumeMetric. Op and byte counters are summed, latencies
+// are combined as an op-count-weighted average, and the newest sample
+// timestamp wins so the rank-0 aggregator keeps this metric inside its
+// sliding window for as long as possible.
+void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_path,
+    const std::vector<AggregatedIOMetrics>& metrics_list, SubvolumeMetric &res) {
+  dout(20) << ": aggregating " << metrics_list.size() << " subv_metrics" << dendl;
+
+  // start from a cleanly zeroed result, then tag it with the subvolume path
+  res = SubvolumeMetric{};
+  res.subvolume_path = subvolume_path;
+
+  uint64_t rd_lat_weighted = 0;
+  uint64_t wr_lat_weighted = 0;
+
+  for (const auto& sample : metrics_list) {
+    res.read_ops   += sample.read_count;
+    res.write_ops  += sample.write_count;
+    res.read_size  += sample.read_bytes;
+    res.write_size += sample.write_bytes;
+    // keep the latest timestamp seen across all contributing samples
+    res.time_stamp  = std::max(res.time_stamp, sample.time_stamp);
+
+    // weight each sample's latency by its op count (a zero count contributes
+    // nothing, so no guard is needed)
+    rd_lat_weighted += sample.read_latency_us * sample.read_count;
+    wr_lat_weighted += sample.write_latency_us * sample.write_count;
+  }
+
+  // normalize the weighted sums into per-op averages
+  if (res.read_ops > 0)
+    res.avg_read_latency = rd_lat_weighted / res.read_ops;
+  if (res.write_ops > 0)
+    res.avg_write_latency = wr_lat_weighted / res.write_ops;
+}
\ No newline at end of file
#define CEPH_MDS_METRICS_HANDLER_H
#include <map>
+#include <mutex>
+#include <unordered_map>
#include <thread>
#include <utility>
#include "common/ceph_mutex.h"
#include "MDSPerfMetricTypes.h"
+#include "include/cephfs/metrics/Types.h"
#include <boost/optional.hpp>
#include <boost/variant/static_visitor.hpp>
struct OpenedInodesPayload;
struct ReadIoSizesPayload;
struct WriteIoSizesPayload;
+struct SubvolumeMetricsPayload;
struct UnknownPayload;
+struct AggregatedIOMetrics;
class MClientMetrics;
class MDSMap;
class MDSRank;
// Generic visitor arm: forward any recognized client metric payload to the
// matching MetricsHandler::handle_payload overload.
inline void operator()(const ClientMetricPayload &payload) const {
metrics_handler->handle_payload(session, payload);
}
+
+  // SubvolumeMetricsPayload is dispatched directly in handle_client_metrics
+  // (its handler takes the caller's unique_lock), so std::visit must never
+  // reach this arm. The overload exists only to keep the visitor exhaustive
+  // for the variant; reaching it indicates a dispatch bug, hence the abort.
+  inline void operator()(const SubvolumeMetricsPayload &) const {
+    ceph_abort_msg("SubvolumeMetricsPayload should be handled specially");
+  }
};
+ std::unique_ptr<PerfCounters> create_subv_perf_counter(const std::string& subv_name);
+
MDSRank *mds;
// drop this lock when calling ->send_message_mds() else mds might
// deadlock
std::thread updater;
std::map<entity_inst_t, std::pair<version_t, Metrics>> client_metrics_map;
-
- // address of rank 0 mds, so that the message can be sent without
+ // maps subvolume path -> aggregated metrics from all clients reporting to this MDS instance
+ std::unordered_map<std::string, std::vector<AggregatedIOMetrics>> subvolume_metrics_map;
+ uint64_t subv_metrics_tracker_window_time_sec = 300;
+  // address of rank 0 mds, so that the message can be sent without
// acquiring mds_lock. misdirected messages to rank 0 are taken
// care of by rank 0.
boost::optional<entity_addrvec_t> addr_rank0;
void handle_payload(Session *session, const OpenedInodesPayload &payload);
void handle_payload(Session *session, const ReadIoSizesPayload &payload);
void handle_payload(Session *session, const WriteIoSizesPayload &payload);
+ void handle_payload(Session *session, const SubvolumeMetricsPayload &payload, std::unique_lock<ceph::mutex> &lock_guard);
void handle_payload(Session *session, const UnknownPayload &payload);
void set_next_seq(version_t seq);
void handle_mds_ping(const cref_t<MMDSPing> &m);
void update_rank0();
+
+ void aggregate_subvolume_metrics(const std::string& subvolume_path,
+ const std::vector<AggregatedIOMetrics>& metrics_list, SubvolumeMetric &res);
};
#endif // CEPH_MDS_METRICS_HANDLER_H
CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY, \
CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY, \
CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY, \
- CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \
+ CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \
+ CLIENT_METRIC_TYPE_SUBVOLUME_METRICS, \
}
#define CEPHFS_FEATURES_MDS_SUPPORTED CEPHFS_FEATURES_ALL
{
out << "{rval: " << rval << ", scan_idx=" << scan_idx << ", blocks=" << blocks << "}";
}
+
+// Emit this metric through a ceph::Formatter (admin socket / structured
+// output). Field keys mirror the struct's member names.
+void SubvolumeMetric::dump(Formatter *f) const {
+  f->dump_string("subvolume_path", subvolume_path);
+  f->dump_unsigned("read_ops", read_ops);
+  f->dump_unsigned("write_ops", write_ops);
+  f->dump_unsigned("read_size", read_size);
+  f->dump_unsigned("write_size", write_size);
+  f->dump_unsigned("avg_read_latency", avg_read_latency);
+  f->dump_unsigned("avg_write_latency", avg_write_latency);
+  // BUGFIX: this member is the newest sample timestamp (milliseconds on a
+  // steady clock), not a window length in seconds — the old key
+  // "time_window_sec" mislabeled both the meaning and the unit.
+  f->dump_unsigned("time_stamp", time_stamp);
+}
+
+// Human-readable one-line rendering used by debug logging.
+std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m) {
+  os << "{subv_path=" << m.subvolume_path
+     << ", read_ops=" << m.read_ops
+     << ", write_ops=" << m.write_ops
+     << ", read_size=" << m.read_size
+     << ", write_size=" << m.write_size
+     << ", avg_read_lat=" << m.avg_read_latency
+     << ", avg_write_lat=" << m.avg_write_latency
+     // BUGFIX: label matches the member — it is a timestamp in milliseconds
+     // (steady clock), not a seconds-sized window
+     << ", time_stamp=" << m.time_stamp << "}";
+  return os;
+}
#include <map>
#include <string>
#include <string_view>
+#include <shared_mutex>
#include "common/DecayCounter.h"
#include "common/entity_name.h"
};
WRITE_CLASS_ENCODER(BlockDiff);
+/**
+ * @brief Aggregated I/O metrics for a single subvolume.
+ *
+ * Produced on an MDS by folding together the AggregatedIOMetrics samples
+ * that clients report for a subvolume (each client in turn aggregates its
+ * own SimpleIOMetric instances before sending), then forwarded to the
+ * rank-0 aggregator.
+ */
+struct SubvolumeMetric {
+  std::string subvolume_path;   // canonical path identifying the subvolume
+  uint64_t read_ops = 0;        // total read operations in this sample
+  uint64_t write_ops = 0;       // total write operations in this sample
+  uint64_t read_size = 0;       // total bytes read
+  uint64_t write_size = 0;      // total bytes written
+  uint64_t avg_read_latency = 0;   // op-count-weighted mean read latency (microseconds)
+  uint64_t avg_write_latency = 0;  // op-count-weighted mean write latency (microseconds)
+  uint64_t time_stamp = 0;      // newest contributing sample, ms on a steady clock
+
+  // On-wire encoding; bump the version in DENC_START if fields are added.
+  DENC(SubvolumeMetric, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.subvolume_path, p);
+    denc(v.read_ops, p);
+    denc(v.write_ops, p);
+    denc(v.read_size, p);
+    denc(v.write_size, p);
+    denc(v.avg_read_latency, p);
+    denc(v.avg_write_latency, p);
+    denc(v.time_stamp, p);
+    DENC_FINISH(p);
+  }
+
+  void dump(Formatter *f) const;
+  friend std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m);
+};
#endif
class MClientCaps final : public SafeMessage {
private:
- static constexpr int HEAD_VERSION = 12;
+ static constexpr int HEAD_VERSION = 13;
static constexpr int COMPAT_VERSION = 1;
public:
std::vector<uint8_t> fscrypt_auth;
std::vector<uint8_t> fscrypt_file;
+ uint64_t subvolume_id = 0;
int get_caps() const { return head.caps; }
int get_wanted() const { return head.wanted; }
decode(fscrypt_auth, p);
decode(fscrypt_file, p);
}
+ if (header.version >= 13) {
+ decode(subvolume_id, p);
+ }
}
void encode_payload(uint64_t features) override {
using ceph::encode;
encode(nsubdirs, payload);
encode(fscrypt_auth, payload);
encode(fscrypt_file, payload);
+ encode(subvolume_id, payload);
}
private:
template<class T, typename... Args>
std::vector<uint8_t> fscrypt_file;
optmetadata_multiton<optmetadata_singleton_client_t,std::allocator> optmetadata;
+  // BUGFIX: must be zero-initialized — decode() only assigns it when
+  // struct_v >= 9 (older peers never send it), and callers test
+  // `if (ist.subvolume_id)` on every reply, which would otherwise read an
+  // indeterminate value.
+  inodeno_t subvolume_id = 0;
public:
InodeStat() {}
void decode(ceph::buffer::list::const_iterator &p, const uint64_t features) {
using ceph::decode;
if (features == (uint64_t)-1) {
- DECODE_START(8, p);
+ DECODE_START(9, p);
decode(vino.ino, p);
decode(vino.snapid, p);
decode(rdev, p);
if (struct_v >= 8) {
decode(optmetadata, p);
}
+ if (struct_v >= 9) {
+ decode(subvolume_id, p);
+ }
DECODE_FINISH(p);
}
else {