From: Igor Golikov Date: Thu, 10 Jul 2025 10:17:36 +0000 (+0000) Subject: client,mds: add support for subvolume level metrics X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fc9bf6968b39153d182ee9acf56e46ca4f82671c;p=ceph-ci.git client,mds: add support for subvolume level metrics Add support for client-side metrics collection using the SimpleIOMetric struct and aggregation using the AggregatedIOMetrics struct. The Client holds a vector of SimpleIOMetric entries for each subvolume it recognizes (via caps/metadata messages), aggregates them into the AggregatedIOMetrics struct, and sends them periodically to the MDS, along with regular client metrics. The MDS holds a map of subvolume_path -> vector of aggregated metrics and sends it periodically to rank0, for further aggregation and exposure. Resolves: ISCE-2037 Fixes: https://tracker.ceph.com/issues/68929, https://tracker.ceph.com/issues/68930 Signed-off-by: Igor Golikov (cherry picked from commit c376635d5d5572d64fa76a82a7461ce917186047) (cherry picked from commit 9004492bc93676c5dbb1b864d7f07b14d52e8387) Conflicts: src/client/Client.cc src/client/Client.h src/include/cephfs/metrics/Types.h src/mds/MetricsHandler.cc src/mds/MetricsHandler.h src/mds/cephfs_features.h Resolve conflicts with fscrypt-related changes on the client side and the CopyIoSizesPayload metrics type on the MDS side. 
--- diff --git a/src/client/Client.cc b/src/client/Client.cc index 7d6e6e4781b..e3f2b5bc178 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -247,6 +247,8 @@ int Client::CommandHook::call( m_client->_kick_stale_sessions(); else if (command == "status") m_client->dump_status(f); + else if (command == "dump_subvolume_metrics") + m_client->dump_subvolume_metrics(f); else ceph_abort_msg("bad command registered"); } @@ -400,7 +402,8 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_) async_ino_releasor(m->cct), objecter_finisher(m->cct), m_command_hook(this), - fscid(0) + fscid(0), + subvolume_tracker{std::make_unique(cct, whoami)} { /* We only use the locale for normalization/case folding. That is unaffected * by the locale but required by the API. @@ -626,6 +629,10 @@ void Client::dump_status(Formatter *f) } } +void Client::dump_subvolume_metrics(Formatter* f) { + subvolume_tracker->dump(f); +} + void Client::_pre_init() { timer.init(); @@ -716,6 +723,13 @@ void Client::_finish_init() lderr(cct) << "error registering admin socket command: " << cpp_strerror(-ret) << dendl; } + ret = admin_socket->register_command("dump_subvolume_metrics_aggr", + &m_command_hook, + "dump aggregated subvolume metrics"); + if (ret < 0) { + lderr(cct) << "error registering admin socket command: " + << cpp_strerror(-ret) << dendl; + } } void Client::shutdown() @@ -855,7 +869,6 @@ void Client::update_io_stat_write(utime_t latency) { logger->set(l_c_wr_sqsum, n_sqsum); logger->set(l_c_wr_ops, nr_write_request); } - // =================== // metadata cache stuff @@ -1808,6 +1821,10 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session) in = add_update_inode(&ist, request->sent_stamp, session, request->perms); + if (ist.subvolume_id) { + ldout(cct, 20) << __func__ << " subv_metric adding " << in->ino << "-" << ist.subvolume_id << dendl; + subvolume_tracker->add_inode(in->ino, ist.subvolume_id); + } } Inode *diri = NULL; @@ -2320,7 +2337,7 @@ 
int Client::make_request(MetaRequest *request, *pdirbl = reply->get_extra_bl(); // -- log times -- - utime_t lat = ceph_clock_now(); + utime_t lat = mono_clock_now(); lat -= request->sent_stamp; ldout(cct, 20) << "lat " << lat << dendl; @@ -5604,6 +5621,12 @@ void Client::handle_caps(const MConstRef& m) got_mds_push(session.get()); + // check whether the current inode is under subvolume for metrics collection + if (m->subvolume_id > 0) { + ldout(cct, 10) << __func__ << " adding " << m->get_ino() << " to subvolume tracker " << m->subvolume_id << dendl; + subvolume_tracker->add_inode(m->get_ino(), m->subvolume_id); + } + bool do_cap_release = false; Inode *in; vinodeno_t vino(m->get_ino(), CEPH_NOSNAP); @@ -7279,8 +7302,13 @@ void Client::flush_cap_releases() for (auto &p : mds_sessions) { auto session = p.second; if (session->release && mdsmap->is_clientreplay_or_active_or_stopping( - p.first)) { + p.first)) { nr_caps += session->release->caps.size(); + for (const auto &cap: session->release->caps) { + ldout(cct, 10) << __func__ << " removing " << static_cast(cap.ino) << + " from subvolume tracker" << dendl; + subvolume_tracker->remove_inode(static_cast(cap.ino)); + } if (cct->_conf->client_inject_release_failure) { ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl; } else { @@ -7516,6 +7544,18 @@ void Client::collect_and_send_global_metrics() { message.push_back(metric); } + // subvolume metrics + if (_collect_and_send_global_metrics || + session->mds_metric_flags.test(CLIENT_METRIC_TYPE_SUBVOLUME_METRICS)) { + auto metrics = subvolume_tracker->aggregate(true); + if (!metrics.empty()) { + for (auto m: metrics) + ldout(cct, 20) << " sending subv_metric " << m << dendl; + metric = ClientMetricMessage(SubvolumeMetricsPayload(metrics)); + message.push_back(metric); + } + } + session->con->send_message2(make_message(std::move(message))); } @@ -11199,8 +11239,9 @@ void Client::C_Read_Finisher::finish_io(int r) f->pos = offset + r; } 
- lat = ceph_clock_now(); + lat = mono_clock_now(); lat -= start; + clnt->subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, lat, static_cast(r)}); ++clnt->nr_read_request; clnt->update_io_stat_read(lat); } @@ -11322,6 +11363,9 @@ success: r = read; } } + + r = read; + clnt->subvolume_tracker->add_metric(in->ino, SimpleIOMetric(false, mono_clock_now() - start_time, r)); error: onfinish->complete(r); @@ -11343,7 +11387,7 @@ int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl, const auto& conf = cct->_conf; Inode *in = f->inode.get(); utime_t lat; - utime_t start = ceph_clock_now(); + utime_t start = mono_clock_now(); CRF_iofinish *crf_iofinish = nullptr; ldout(cct, 10) << __func__ << " " << *in << " " << offset << "~" << size << dendl; @@ -11416,7 +11460,7 @@ retry: conf->client_oc && (have & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)), - have, movepos, start, f, in, f->pos, offset, size)); + have, movepos, f, in, f->pos, offset, size)); crf_iofinish->CRF = crf.get(); } @@ -11531,9 +11575,9 @@ success: f->pos = start_pos + rc; } - lat = ceph_clock_now(); + lat = mono_clock_now(); lat -= start; - + subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, lat, bl->length()}); ++nr_read_request; update_io_stat_read(lat); @@ -11549,7 +11593,7 @@ done: } Client::C_Readahead::C_Readahead(Client *c, Fh *f) : - client(c), f(f) { + client(c), f(f), start_time(mono_clock_now()) { f->get(); f->readahead.inc_pending(); } @@ -11564,6 +11608,7 @@ void Client::C_Readahead::finish(int r) { client->put_cap_ref(f->inode.get(), CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE); if (r > 0) { client->update_read_io_size(r); + client->subvolume_tracker->add_metric(f->inode->ino, SimpleIOMetric(false, mono_clock_now()-start_time, r)); } } @@ -11681,9 +11726,11 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, io_finish.reset(io_finish_cond); } + auto start_time = mono_clock_now(); std::vector holes; r = 
objectcacher->file_read_ex(&in->oset, &in->layout, in->snapid, read_start, read_len, bl, 0, &holes, io_finish.get()); + if (onfinish != nullptr) { // put the cap ref since we're releasing C_Read_Async_Finisher put_cap_ref(in, CEPH_CAP_FILE_CACHE); @@ -11706,6 +11753,7 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, client_lock.lock(); put_cap_ref(in, CEPH_CAP_FILE_CACHE); + subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, mono_clock_now() - start_time, bl->length()}); } else { put_cap_ref(in, CEPH_CAP_FILE_CACHE); } @@ -11949,9 +11997,10 @@ int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos, update_write_io_size(size); // time - lat = ceph_clock_now(); + lat = mono_clock_now(); lat -= start; + subvolume_tracker->add_metric(in->ino, SimpleIOMetric{true, lat, static_cast(size)}); ++nr_write_request; update_io_stat_write(lat); @@ -12391,7 +12440,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, (uint64_t)(offset+size) > in->size ) { //exceeds filesize return -EFBIG; } - //ldout(cct, 7) << "write fh " << fh << " size " << size << " offset " << offset << dendl; + ldout(cct, 7) << "_write fh " << f << " size " << size << " offset " << offset << dendl; if (objecter->osdmap_pool_full(in->layout.pool_id)) { return -ENOSPC; @@ -12434,7 +12483,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, ldout(cct, 10) << "cur file size is " << in->size << dendl; // time it. 
- utime_t start = ceph_clock_now(); + utime_t start = mono_clock_now(); if (in->inline_version == 0) { int r = _getattr(in, CEPH_STAT_CAP_INLINE_DATA, f->actor_perms, true); @@ -12546,7 +12595,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, cct->_conf->client_oc && (have & (CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO)), - start, f, in, fpos, + f, in, fpos, request_offset, request_size, offset, size, do_fsync, syncdataonly, enc_mgr->encrypted())); @@ -12559,6 +12608,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, // do buffered write + ldout(cct, 10) << " _write_oc " << dendl; r = enc_mgr->read_modify_write(iofinish.get()); if (r < 0) { ldout(cct, 0) << __func__ << "(): enc_mgr read failed (r=" << r << ")" << dendl; @@ -12594,6 +12644,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, } // allow caller to wait on onfinish... + ldout(cct, 10) << " _write_oc_1" << dendl; return 0; } @@ -12637,6 +12688,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, cwf.release(); // allow caller to wait on onfinish... 
+ ldout(cct, 10) << " _write_filer_2" << dendl; return 0; } @@ -12652,6 +12704,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf, success: // do not get here if non-blocking caller (onfinish != nullptr) + ldout(cct, 10) << " _write_filer_succeess" << dendl; r = _write_success(f, start, fpos, request_offset, request_size, enc_mgr->get_ofs(), enc_mgr->get_size(), in, enc_mgr->encrypted()); if (r >= 0 && do_fsync) { @@ -12922,7 +12975,7 @@ void Client::C_nonblocking_fsync_state::advance() utime_t lat; - lat = ceph_clock_now(); + lat = mono_clock_now(); lat -= start; clnt->logger->tinc(l_c_fsync, lat); @@ -12976,7 +13029,7 @@ int Client::_fsync(Inode *in, bool syncdataonly) ceph_tid_t flush_tid = 0; InodeRef tmp_ref; utime_t lat; - utime_t start = ceph_clock_now(); + utime_t start = mono_clock_now(); ldout(cct, 8) << "_fsync on " << *in << " " << (syncdataonly ? "(dataonly)":"(data+metadata)") << dendl; @@ -13029,7 +13082,7 @@ int Client::_fsync(Inode *in, bool syncdataonly) << cpp_strerror(-r) << dendl; } - lat = ceph_clock_now(); + lat = mono_clock_now(); lat -= start; logger->tinc(l_c_fsync, lat); @@ -18711,6 +18764,113 @@ int Client::fcopyfile(const char *spath, const char *dpath, UserPerm& perms, mod return 0; } +// --- subvolume metrics tracking --- // +SubvolumeMetricTracker::SubvolumeMetricTracker(CephContext *ct, client_t id) : cct(ct), whoami(id) {} + +void SubvolumeMetricTracker::dump(Formatter *f) { + auto current_metrics = aggregate(false); + f->open_array_section("current_metrics"); + for (auto &met : current_metrics) { + f->dump_object("", met); + } + f->close_section(); + + std::vector temp; + { + std::shared_lock l(metrics_lock); + temp = last_subvolume_metrics; + } + f->open_array_section("last_metrics"); + for (auto &val : temp) { + f->dump_object("", val); + } + f->close_section(); +} + +void SubvolumeMetricTracker::add_inode(inodeno_t inode, inodeno_t subvol) { + ldout(cct, 20) << __func__ << " subv_metric " << inode 
<< "-" << subvol << dendl; + std::unique_lock l(metrics_lock); + if (likely(inode_subvolume.contains(inode))) { + ldout(cct, 10) << __func__ << " " << inode << "-" << subvol << " subv_metric inode exists" << dendl; + return; + } + + auto [it, inserted] = subvolume_metrics.try_emplace(subvol); + if (inserted) { + ldout(cct, 10) << __func__ << " inserted " << inode << "-" << subvol << dendl; + } + inode_subvolume[inode] = subvol; + ldout(cct, 10) << __func__ << " add " << inode << "-" << subvol << dendl; +} + +void SubvolumeMetricTracker::remove_inode(inodeno_t inode) { + ldout(cct, 20) << __func__ << " subv_metric " << inode << "-" << dendl; + std::unique_lock l(metrics_lock); + auto it = inode_subvolume.find(inode); + if (it == inode_subvolume.end()) { + return; + } + + inodeno_t subvol = it->second; + inode_subvolume.erase(it); + + auto se = subvolume_metrics.find(subvol); + if (se == subvolume_metrics.end()) { + ldout(cct, 20) << __func__ << " subv_metric not found " << inode << "-" << subvol << dendl; + return; + } + ldout(cct, 20) << __func__ << " subv_metric end " << inode << dendl; +} + +void SubvolumeMetricTracker::add_metric(inodeno_t inode, SimpleIOMetric&& metric) { + ldout(cct, 10) << __func__ << " " << inode << dendl; + std::unique_lock l(metrics_lock); + + auto it = inode_subvolume.find(inode); + if (it == inode_subvolume.end()) + return; + + auto& entry = subvolume_metrics[it->second]; + entry.metrics.add(std::move(metric)); +} + +std::vector +SubvolumeMetricTracker::aggregate(bool clean) { + ldout(cct, 20) << __func__ << dendl; + std::vector res; + + if (clean) { + std::unordered_map tmp; + // move subvolume map to the local one, to release wlock asap + { + std::unique_lock l(metrics_lock); + subvolume_metrics.swap(tmp); + } + res.reserve(tmp.size()); + for (auto &[subv_id, entry]: tmp) { + res.emplace_back(std::move(entry.metrics)); + res.back().subvolume_id = subv_id; + } + } else { + // on rlock is needed, no need to copy the map to the local 
instance on the metrics map + std::shared_lock l(metrics_lock); + res.reserve(subvolume_metrics.size()); + for (const auto &[subv_id, entry]: subvolume_metrics) { + res.emplace_back(entry.metrics); + res.back().subvolume_id = subv_id; + } + } + + { + std::unique_lock l(metrics_lock); + last_subvolume_metrics = res; // since res holds only 1 aggregated metrics per subvolume, copying is not so bad + } + + ldout(cct, 20) << __func__ << " res size " << res.size() << dendl; + return res; // return value optimization +} +// --- subvolume metrics tracking --- // + StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc, boost::asio::io_context& ictx) : Client(m, mc, new Objecter(m->cct, m, mc, ictx)) diff --git a/src/client/Client.h b/src/client/Client.h index b8ded1f9faa..4c67116082f 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -54,6 +54,7 @@ #include #include #include +#include using std::set; using std::map; @@ -259,6 +260,40 @@ struct dir_result_t { int fd; // fd attached using fdopendir (-1 if none) }; +/** + * @brief The subvolume metric tracker. + * + * Maps subvolume_id(which is in fact inode id) to the vector of SimpleIOMetric instances. + * Each simple metric records the single IO operation on the client. + * On clients metric message to the MDS, client will aggregate all simple metrics for each subvolume + * into the AggregatedIOMetric struct and clear the current metrics list. + * TODO: limit the cap for each subvolume? in the case client sends metrics to the MDS not so often? 
+ */ +class SubvolumeMetricTracker { +public: + struct SubvolumeEntry { + AggregatedIOMetrics metrics; + + void dump(Formatter *f) const { + f->dump_object("", metrics); + } + }; + + SubvolumeMetricTracker(CephContext *ct, client_t id); + void dump(Formatter *f); + void add_inode(inodeno_t inode, inodeno_t subvol); + void remove_inode(inodeno_t inode); + void add_metric(inodeno_t inode, SimpleIOMetric&& metric); + std::vector aggregate(bool clean); +protected: + std::vector last_subvolume_metrics; + std::unordered_map subvolume_metrics; + std::unordered_map inode_subvolume; + CephContext *cct = nullptr; + client_t whoami; + std::shared_mutex metrics_lock; +}; + class Client : public Dispatcher, public md_config_obs_t { public: friend class C_Block_Sync; // Calls block map and protected helpers @@ -1203,6 +1238,7 @@ protected: void force_session_readonly(MetaSession *s); void dump_status(Formatter *f); // debug + void dump_subvolume_metrics(Formatter* f); Dispatcher::dispatch_result_t ms_dispatch2(const MessageRef& m) override; @@ -1381,11 +1417,11 @@ private: C_Read_Finisher(Client *clnt, Context *onfinish, Context *iofinish, bool is_read_async, int have_caps, bool movepos, - utime_t start, Fh *f, Inode *in, uint64_t fpos, + Fh *f, Inode *in, uint64_t fpos, int64_t offset, uint64_t size) : clnt(clnt), onfinish(onfinish), iofinish(iofinish), is_read_async(is_read_async), have_caps(have_caps), f(f), in(in), - start(start), fpos(fpos), offset(offset), size(size), movepos(movepos) { + start(mono_clock_now()), fpos(fpos), offset(offset), size(size), movepos(movepos) { iofinished = false; } @@ -1435,7 +1471,7 @@ private: uint64_t fpos, uint64_t off, uint64_t len, bufferlist *bl, Filer *filer, int have_caps) : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), bl(bl), - filer(filer), have_caps(have_caps) + filer(filer), have_caps(have_caps), start_time(mono_clock_now()) { left = len; wanted = len; @@ -1462,6 +1498,7 @@ private: bufferlist tbl; Filer *filer; 
int have_caps; + utime_t start_time; int read; uint64_t pos; bool fini; @@ -1485,9 +1522,8 @@ private: FSCryptFDataDencRef denc, uint64_t read_start, uint64_t read_len) - : clnt(clnt), onfinish(onfinish), f(f), in(in), bl(bl), off(off), len(len), + : clnt(clnt), onfinish(onfinish), f(f), in(in), bl(bl), off(off), len(len), start_time(mono_clock_now()), denc(denc), read_start(read_start), read_len(read_len) {} - private: Client *clnt; Context *onfinish; @@ -1496,6 +1532,7 @@ private: bufferlist *bl; uint64_t off; uint64_t len; + utime_t start_time; FSCryptFDataDencRef denc; uint64_t read_start; @@ -1781,13 +1818,13 @@ private: void finish_fsync(int r); C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline, - bool is_file_write, utime_t start, Fh *f, Inode *in, + bool is_file_write, Fh *f, Inode *in, uint64_t fpos, int64_t req_ofs, uint64_t req_size, int64_t offset, uint64_t size, bool do_fsync, bool syncdataonly, bool encrypted) : clnt(clnt), onfinish(onfinish), - is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos), + is_file_write(is_file_write), start(mono_clock_now()), f(f), in(in), fpos(fpos), req_ofs(req_ofs), req_size(req_size), offset(offset), size(size), syncdataonly(syncdataonly), encrypted(encrypted) { @@ -1877,7 +1914,7 @@ private: C_nonblocking_fsync_state(Client *clnt, Inode *in, bool syncdataonly, Context *onfinish) : clnt(clnt), in(in), syncdataonly(syncdataonly), onfinish(onfinish) { flush_tid = 0; - start = ceph_clock_now(); + start = mono_clock_now(); progress = 0; flush_wait = false; flush_completed = false; @@ -1922,6 +1959,7 @@ private: Client *client; Fh *f; + utime_t start_time = 0; }; /* @@ -2239,6 +2277,9 @@ private: // Cluster fsid fs_cluster_id_t fscid; + // subvolume metrics tracker + std::unique_ptr subvolume_tracker = nullptr; + // file handles, etc. 
interval_set free_fd_set; // unused fds std::unordered_map fd_map; diff --git a/src/common/Clock.h b/src/common/Clock.h index b47954ad1ce..44d54525938 100644 --- a/src/common/Clock.h +++ b/src/common/Clock.h @@ -33,4 +33,18 @@ static inline utime_t ceph_clock_now() return n; } +static inline utime_t mono_clock_now() +{ +#if defined(__linux__) + struct timespec tp; + clock_gettime(CLOCK_MONOTONIC, &tp); + utime_t n(tp); +#else + struct timeval tv; + gettimeofday(&tv, nullptr); + utime_t n(&tv); +#endif + return n; +} + #endif diff --git a/src/include/cephfs/metrics/Types.h b/src/include/cephfs/metrics/Types.h index ee9bc484948..7cb374103ed 100644 --- a/src/include/cephfs/metrics/Types.h +++ b/src/include/cephfs/metrics/Types.h @@ -13,6 +13,7 @@ #include "include/int_types.h" #include "include/stringify.h" #include "include/utime.h" +#include "include/fs_types.h" namespace ceph { class Formatter; } @@ -34,6 +35,7 @@ enum ClientMetricType { CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY, CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, CLIENT_METRIC_TYPE_COPY_IO_SIZES, + CLIENT_METRIC_TYPE_SUBVOLUME_METRICS, }; inline std::ostream &operator<<(std::ostream &os, const ClientMetricType &type) { switch(type) { @@ -88,6 +90,9 @@ inline std::ostream &operator<<(std::ostream &os, const ClientMetricType &type) case ClientMetricType::CLIENT_METRIC_TYPE_COPY_IO_SIZES: os << "COPY_IO_SIZES"; break; + case ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS: + os << "SUBVOLUME_METRICS"; + break; default: os << "(UNKNOWN:" << static_cast::type>(type) << ")"; break; @@ -594,17 +599,248 @@ struct UnknownPayload : public ClientMetricPayloadBase { }; typedef boost::variant ClientMetricPayload; + ReadLatencyPayload, + WriteLatencyPayload, + MetadataLatencyPayload, + DentryLeasePayload, + OpenedFilesPayload, + PinnedIcapsPayload, + OpenedInodesPayload, + ReadIoSizesPayload, + WriteIoSizesPayload, + CopyIoSizesPayload, + SubvolumeMetricsPayload, + UnknownPayload> ClientMetricPayload; + +/** + * 
@brief Struct to hold single IO metric + * To save the memory for clients with high number of subvolumes, the layout is as following: + * 64 bits of data: + * MSB - read or write (0/1) + * 32 bits for latency, in microsecs + * 31 bits for payload size, bytes + */ +struct SimpleIOMetric { + uint64_t packed_data; + + // is_write flag (1 bit) - MSB + static constexpr uint64_t OP_TYPE_BITS = 1; + static constexpr uint64_t OP_TYPE_SHIFT = 64 - OP_TYPE_BITS; // 63 + static constexpr uint64_t OP_TYPE_MASK = 1ULL << OP_TYPE_SHIFT; // 0x8000000000000000ULL + + // latency in microseconds (32 bits) + // directly after is_write bit + static constexpr uint64_t LATENCY_BITS = 32; + static constexpr uint64_t LATENCY_SHIFT = OP_TYPE_SHIFT - LATENCY_BITS; // 63 - 32 = 31 + static constexpr uint64_t LATENCY_MASK = ((1ULL << LATENCY_BITS) - 1) << LATENCY_SHIFT; + + // payload size (31 bits) + // Placed directly after latency, occupying the lowest 31 bits + static constexpr uint64_t PAYLOAD_SIZE_BITS = 31; + static constexpr uint64_t PAYLOAD_SIZE_SHIFT = 0; // Starts from the LSB (bit 0) + static constexpr uint64_t PAYLOAD_SIZE_MASK = (1ULL << PAYLOAD_SIZE_BITS) - 1; + + static constexpr uint32_t MAX_LATENCY_US = (1ULL << LATENCY_BITS) - 1; // 2^32 - 1 microseconds + static constexpr uint32_t MAX_PAYLOAD_SIZE = (1ULL << PAYLOAD_SIZE_BITS) - 1; // 2^31 - 1 bytes + + SimpleIOMetric(bool is_write, utime_t latency, uint32_t payload_size) : packed_data(0) { + if (is_write) { + packed_data |= OP_TYPE_MASK; + } + + uint64_t current_latency_usec = latency.usec(); + + // if less than 1 microsecond - set to 1 microsecond + if (current_latency_usec == 0) { + current_latency_usec = 1; + } + + // handle overflow + if (current_latency_usec > MAX_LATENCY_US) { + current_latency_usec = MAX_LATENCY_US; // Truncate if too large + } + packed_data |= (current_latency_usec << LATENCY_SHIFT) & LATENCY_MASK; + + + // payload_size (31 bits) + if (payload_size > MAX_PAYLOAD_SIZE) { + payload_size = 
MAX_PAYLOAD_SIZE; // Truncate if too large + } + packed_data |= (static_cast(payload_size) << PAYLOAD_SIZE_SHIFT) & PAYLOAD_SIZE_MASK; + } + + SimpleIOMetric() : packed_data(0) {} + + bool is_write() const { + return (packed_data & OP_TYPE_MASK) != 0; + } + + uint32_t get_latency_us() const { + return static_cast((packed_data & LATENCY_MASK) >> LATENCY_SHIFT); + } + + uint32_t get_payload_size() const { + return static_cast((packed_data & PAYLOAD_SIZE_MASK) >> PAYLOAD_SIZE_SHIFT); + } + + // --- Dump method --- + void dump(Formatter *f) const { + f->dump_string("op", is_write() ? "w" : "r"); + f->dump_unsigned("lat_us", get_latency_us()); + f->dump_unsigned("size", get_payload_size()); + } +}; + +/** + * brief holds result of the SimpleIOMetrics aggregation, for each subvolume, on the client + * the aggregation occurs just before sending metrics to the MDS + */ +struct AggregatedIOMetrics { + inodeno_t subvolume_id = 0; + uint32_t read_count = 0; + uint32_t write_count = 0; + uint64_t read_bytes = 0; + uint64_t write_bytes = 0; + uint64_t read_latency_us = 0; + uint64_t write_latency_us = 0; + uint64_t time_stamp = 0; // set on MDS + + void add(const SimpleIOMetric& m) { + auto lat = m.get_latency_us(); + if (m.is_write()) { + ++write_count; + write_bytes += m.get_payload_size(); + write_latency_us += lat; + } else { + ++read_count; + read_bytes += m.get_payload_size(); + read_latency_us += lat; + } + } + + uint64_t avg_read_throughput_Bps() const { + return read_latency_us > 0 ? (read_bytes * 1'000'000ULL) / read_latency_us : 0; + } + + uint64_t avg_write_throughput_Bps() const { + return write_latency_us > 0 ? (write_bytes * 1'000'000ULL) / write_latency_us : 0; + } + + uint64_t avg_read_latency_us() const { + return read_count ? read_latency_us / read_count : 0; + } + + uint64_t avg_write_latency_us() const { + return write_count ? write_latency_us / write_count : 0; + } + + double read_iops() const { + return read_latency_us > 0 ? 
static_cast(read_count) * 1e6 / read_latency_us : 0.0; + } + + double write_iops() const { + return write_latency_us > 0 ? static_cast(write_count) * 1e6 / write_latency_us : 0.0; + } + + void dump(Formatter *f) const { + f->dump_unsigned("subvolume_id", subvolume_id); + f->dump_unsigned("read_count", read_count); + f->dump_unsigned("read_bytes", read_bytes); + f->dump_unsigned("avg_read_latency_us", avg_read_latency_us()); + f->dump_unsigned("read_iops", read_iops()); + f->dump_float("avg_read_tp_Bps", avg_read_throughput_Bps()); + f->dump_unsigned("write_count", write_count); + f->dump_unsigned("write_bytes", write_bytes); + f->dump_unsigned("avg_write_latency_us", avg_write_latency_us()); + f->dump_float("write_iops", write_iops()); + f->dump_float("avg_write_tp_Bps", avg_write_throughput_Bps()); + f->dump_unsigned("time_stamp", time_stamp); + } + + friend std::ostream& operator<<(std::ostream& os, const AggregatedIOMetrics& m) { + os << "{subvolume_id=\"" << m.subvolume_id + << "\", read_count=" << m.read_count + << ", write_count=" << m.write_count + << ", read_bytes=" << m.read_bytes + << ", write_bytes=" << m.write_bytes + << ", read_latency_ns=" << m.read_latency_us + << ", write_latency_ns=" << m.write_latency_us + << ", time_stamp_ms=" << m.time_stamp + << "}"; + return os; + } + + void encode(bufferlist& bl) const { + using ceph::encode; + ENCODE_START(1, 1, bl); + encode(subvolume_id, bl); + encode(read_count, bl); + encode(write_count, bl); + encode(read_bytes, bl); + encode(write_bytes, bl); + encode(read_latency_us, bl); + encode(write_latency_us, bl); + encode(time_stamp, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& p) { + using ceph::decode; + DECODE_START(1, p); + decode(subvolume_id, p); + decode(read_count, p); + decode(write_count, p); + decode(read_bytes, p); + decode(write_bytes, p); + decode(read_latency_us, p); + decode(write_latency_us, p); + decode(time_stamp, p); + DECODE_FINISH(p); + } +}; + +struct 
SubvolumeMetricsPayload : public ClientMetricPayloadBase { + std::vector subvolume_metrics; + + SubvolumeMetricsPayload() : ClientMetricPayloadBase(ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS) { } + SubvolumeMetricsPayload(std::vector metrics) + : ClientMetricPayloadBase(ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS), subvolume_metrics(std::move(metrics)) {} + + void encode(bufferlist &bl) const { + using ceph::encode; + ENCODE_START(1, 1, bl); + encode(subvolume_metrics.size(), bl); + for(auto const& m : subvolume_metrics) { + m.encode(bl); + } + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator &iter) { + using ceph::decode; + DECODE_START(1, iter) ; + size_t size = 0; + decode(size, iter); + subvolume_metrics.clear(); + subvolume_metrics.reserve(size); + for (size_t i = 0; i < size; ++i) { + subvolume_metrics.emplace_back(); + subvolume_metrics.back().decode(iter); + } + DECODE_FINISH(iter); + } + + void dump(Formatter *f) const { + f->open_array_section("metrics"); + for(auto const& m : subvolume_metrics) + m.dump(f); + f->close_section(); + } + + void print(std::ostream *out) const { + *out << "metrics_count: " << subvolume_metrics.size(); + } +}; // metric update message sent by clients struct ClientMetricMessage { @@ -721,6 +957,9 @@ public: case ClientMetricType::CLIENT_METRIC_TYPE_COPY_IO_SIZES: payload = CopyIoSizesPayload(); break; + case ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS: + payload = SubvolumeMetricsPayload(); + break; default: payload = UnknownPayload(static_cast(metric_type)); break; diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 40b023f090d..58abc9f4713 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -4182,7 +4182,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session, * note: encoding matches MClientReply::InodeStat */ if (session->info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) { - ENCODE_START(8, 1, bl); + ENCODE_START(9, 1, bl); encode(std::tuple{ oi->ino, 
snapid, @@ -4235,6 +4235,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session, encode(file_i->fscrypt_auth, bl); encode(file_i->fscrypt_file, bl); encode_nohead(optmdbl, bl); + encode(get_subvolume_id(), bl); // encode inodestat ENCODE_FINISH(bl); } @@ -5443,6 +5444,11 @@ int64_t CInode::get_backtrace_pool() const } } +inodeno_t CInode::get_subvolume_id() const { + auto snapr = find_snaprealm(); + return snapr ? snapr->get_subvolume_ino() : inodeno_t(0); +} + void CInode::queue_export_pin(mds_rank_t export_pin) { if (state_test(CInode::STATE_QUEUEDEXPORTPIN)) diff --git a/src/mds/CInode.h b/src/mds/CInode.h index 730d596bc2b..1a10abe891d 100644 --- a/src/mds/CInode.h +++ b/src/mds/CInode.h @@ -1131,7 +1131,7 @@ class CInode : public MDSCacheObject, public InodeStoreBase, public Counter=0 */ int64_t get_backtrace_pool() const; - + inodeno_t get_subvolume_id() const; protected: ceph_lock_state_t *get_fcntl_lock_state() { if (!fcntl_locks) diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index e2d9936ab16..fadb11190b9 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -2491,6 +2491,13 @@ Capability* Locker::issue_new_caps(CInode *in, return cap; } +void Locker::maybe_set_subvolume_id(const CInode* inode, ref_t& ack) { + if (ack && inode) { + ack->subvolume_id = inode->get_subvolume_id(); + dout(10) << "setting subvolume id " << ack->subvolume_id << " for inode " << inode->ino().val << dendl; + } +} + void Locker::issue_caps_set(set& inset) { for (set::iterator p = inset.begin(); p != inset.end(); ++p) @@ -3398,6 +3405,7 @@ void Locker::handle_client_caps(const cref_t &m) if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps_dirty); } + CInode *head_in = mdcache->get_inode(m->get_ino()); if (m->get_client_tid() > 0 && session && session->have_completed_flush(m->get_client_tid())) { dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid() @@ -3406,9 +3414,11 @@ void Locker::handle_client_caps(const cref_t &m) if (op == 
CEPH_CAP_OP_FLUSHSNAP) { if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack); ack = make_message(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, 0, mds->get_osd_epoch_barrier()); + maybe_set_subvolume_id(head_in, ack); } else { if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack); ack = make_message(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, 0, mds->get_osd_epoch_barrier()); + maybe_set_subvolume_id(head_in, ack); } ack->set_snap_follows(follows); ack->set_client_tid(m->get_client_tid()); @@ -3445,7 +3455,6 @@ void Locker::handle_client_caps(const cref_t &m) } } - CInode *head_in = mdcache->get_inode(m->get_ino()); if (!head_in) { if (mds->is_clientreplay()) { dout(7) << "handle_client_caps on unknown ino " << m->get_ino() @@ -3534,6 +3543,7 @@ void Locker::handle_client_caps(const cref_t &m) ack->set_snap_follows(follows); ack->set_client_tid(m->get_client_tid()); ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); + maybe_set_subvolume_id(head_in, ack); } if (in == head_in || @@ -3622,6 +3632,7 @@ void Locker::handle_client_caps(const cref_t &m) m->get_caps(), 0, dirty, 0, cap->get_last_issue(), mds->get_osd_epoch_barrier()); ack->set_client_tid(m->get_client_tid()); ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); + maybe_set_subvolume_id(head_in, ack); } // filter wanted based on what we could ever give out (given auth/replica status) diff --git a/src/mds/Locker.h b/src/mds/Locker.h index bce080d12db..098e3a609ad 100644 --- a/src/mds/Locker.h +++ b/src/mds/Locker.h @@ -261,6 +261,7 @@ protected: void file_update_finish(CInode *in, MutationRef& mut, unsigned flags, client_t client, const ref_t &ack); + void maybe_set_subvolume_id(const CInode* head_in, ref_t& ack); xlist updated_scatterlocks; // Maintain a global list to quickly find if any caps are late revoking diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 59f695a8c04..01f65e79b92 100644 --- 
a/src/mds/MDSRank.cc
+++ b/src/mds/MDSRank.cc
@@ -4056,6 +4056,15 @@ epoch_t MDSRank::get_osd_epoch() const
   return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
 }
 
+std::string MDSRank::get_path(inodeno_t ino) {
+  std::lock_guard locker(mds_lock);
+  CInode* inode = mdcache->get_inode(ino);
+  if (!inode) return {};
+  std::string res;
+  inode->make_path_string(res);
+  return res;
+}
+
 std::vector MDSRankDispatcher::get_tracked_keys() const noexcept
 {
diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h
index bc7f5c96282..775a26c2bde 100644
--- a/src/mds/MDSRank.h
+++ b/src/mds/MDSRank.h
@@ -390,6 +390,8 @@ class MDSRank {
     return inject_journal_corrupt_dentry_first;
   }
 
+  std::string get_path(inodeno_t ino);
+
   // Reference to global MDS::mds_lock, so that users of MDSRank don't
   // carry around references to the outer MDS, and we can substitute
   // a separate lock here in future potentially.
diff --git a/src/mds/MetricsHandler.cc b/src/mds/MetricsHandler.cc
index dfe2557fb84..2d7d69819e1 100644
--- a/src/mds/MetricsHandler.cc
+++ b/src/mds/MetricsHandler.cc
@@ -3,9 +3,10 @@
 
 #include "MetricsHandler.h"
 
+#include 
+
 #include "common/debug.h"
 #include "common/errno.h"
-#include "include/cephfs/metrics/Types.h"
 #include "messages/MClientMetrics.h"
 #include "messages/MMDSMetrics.h"
 
@@ -340,6 +341,59 @@ void MetricsHandler::handle_payload(Session *session, const CopyIoSizesPayload &
   metrics.copy_io_sizes_metric.updated = true;
 }
 
+void MetricsHandler::handle_payload(Session* session, const SubvolumeMetricsPayload& payload, std::unique_lock& lk) {
+  dout(20) << ": type=" << payload.get_type() << ", session=" << session
+           << " , subv_metrics count=" << payload.subvolume_metrics.size() << dendl;
+
+  ceph_assert(lk.owns_lock()); // caller must hold the lock
+
+  std::vector resolved_paths;
+  resolved_paths.reserve(payload.subvolume_metrics.size());
+
+  {
+    // RAII guard: unlock on construction, re-lock on destruction (even on exceptions).
+    // Scoped to the path-resolution loop only: the map update below must run locked.
+    struct UnlockGuard {
+      std::unique_lock &lk;
+      explicit UnlockGuard(std::unique_lock& l) : lk(l) { lk.unlock(); }
+      ~UnlockGuard() noexcept {
+        if (!lk.owns_lock()) {
+          try { lk.lock(); }
+          catch (...) {
+            dout(0) << "failed to re-lock in UnlockGuard dtor" << dendl;
+          }
+        }
+      }
+    } unlock_guard{lk};
+
+    // unlocked: resolve paths, no contention with mds lock
+    for (const auto& metric : payload.subvolume_metrics) {
+      std::string path = mds->get_path(metric.subvolume_id);
+      if (path.empty()) {
+        dout(10) << " path not found for " << metric.subvolume_id << dendl;
+      }
+      resolved_paths.emplace_back(std::move(path));
+    }
+  } // unlock_guard dtor re-acquired the metrics lock here
+
+  // locked again: update metrics map
+  const auto now_ms = static_cast(
+      std::chrono::duration_cast(
+          std::chrono::steady_clock::now().time_since_epoch()).count());
+
+  // Keep index pairing but avoid double map lookup
+  for (size_t i = 0; i < resolved_paths.size(); ++i) {
+    const auto& path = resolved_paths[i];
+    if (path.empty()) continue;
+
+    auto& vec = subvolume_metrics_map[path];
+
+    dout(20) << " accumulating subv_metric " << payload.subvolume_metrics[i] << dendl;
+    // payload is const: this is a copy (std::move on a const element copies anyway)
+    vec.emplace_back(payload.subvolume_metrics[i]);
+    vec.back().time_stamp = now_ms;
+  }
+}
 void MetricsHandler::handle_payload(Session *session, const UnknownPayload &payload) {
   dout(5) << ": type=Unknown, session=" << session << ", ignoring unknown payload" << dendl;
 }
@@ -351,7 +401,7 @@ void MetricsHandler::handle_client_metrics(const cref_t &m) {
     return;
   }
 
-  std::scoped_lock locker(lock);
+  std::unique_lock locker(lock);
 
   Session *session = mds->get_session(m);
   dout(20) << ": session=" << session << dendl;
@@ -362,7 +412,14 @@ void MetricsHandler::handle_client_metrics(const cref_t &m) {
 
   for (auto &metric : m->updates) {
-    boost::apply_visitor(HandlePayloadVisitor(this, session), metric.payload);
+    // Special handling for SubvolumeMetricsPayload to avoid lock contention
+    if (auto* subv_payload = std::get_if(&metric.payload)) {
+      // this handles the subvolume metrics payload without
acquiring the mds lock
+      // should not call the visitor pattern here
+      handle_payload(session, *subv_payload, locker);
+    } else {
+      std::visit(HandlePayloadVisitor(this, session), metric.payload);
+    }
   }
 }
 
@@ -429,14 +486,65 @@ void MetricsHandler::update_rank0() {
     }
   }
 
+  // exactly one aggregated entry per subvolume is appended below — reserve that
+  metrics_message.subvolume_metrics.reserve(subvolume_metrics_map.size());
+  for (auto &[path, aggregated_metrics] : subvolume_metrics_map) {
+    metrics_message.subvolume_metrics.emplace_back();
+    aggregate_subvolume_metrics(path, aggregated_metrics, metrics_message.subvolume_metrics.back());
+  }
+  // if we need to show local MDS metrics, we need to save a last copy...
+  subvolume_metrics_map.clear();
+
   // only start incrementing when its kicked via set_next_seq()
   if (next_seq != 0) {
     ++last_updated_seq;
   }
 
-  dout(20) << ": sending metric updates for " << update_client_metrics_map.size()
-           << " clients to rank 0 (address: " << *addr_rank0 << ") with sequence number "
-           << next_seq << ", last updated sequence number " << last_updated_seq << dendl;
+  dout(20) << ": sending " << metrics_message.subvolume_metrics.size() << " subv_metrics to aggregator"
+           << dendl;
 
   mds->send_message_mds(make_message(std::move(metrics_message)), *addr_rank0);
 }
+
+void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_path,
+    const std::vector& metrics_list, SubvolumeMetric &res) {
+  dout(20) << ": aggregating " << metrics_list.size() << " subv_metrics" << dendl;
+  res.subvolume_path = subvolume_path;
+
+  uint64_t weighted_read_latency_sum = 0;
+  uint64_t weighted_write_latency_sum = 0;
+
+  res.read_ops = 0;
+  res.write_ops = 0;
+  res.read_size = 0;
+  res.write_size = 0;
+  res.avg_read_latency = 0;
+  res.avg_write_latency = 0;
+  res.time_stamp = 0;
+
+  for (const auto& m : metrics_list) {
+    res.read_ops += m.read_count;
+    res.write_ops += m.write_count;
+    res.read_size += m.read_bytes;
+    res.write_size += m.write_bytes;
+    //
we want to have more metrics in the sliding window (on the aggregator), + // so we set the latest timestamp of all received metrics + res.time_stamp = std::max(res.time_stamp, m.time_stamp); + + if (m.read_count > 0) { + weighted_read_latency_sum += m.read_latency_us * m.read_count; + } + + if (m.write_count > 0) { + weighted_write_latency_sum += m.write_latency_us * m.write_count; + } + } + + // normalize latencies + res.avg_read_latency = (res.read_ops > 0) + ? (weighted_read_latency_sum / res.read_ops) + : 0; + res.avg_write_latency = (res.write_ops > 0) + ? (weighted_write_latency_sum / res.write_ops) + : 0; +} diff --git a/src/mds/MetricsHandler.h b/src/mds/MetricsHandler.h index b84d48e583e..b9f551b1b7f 100644 --- a/src/mds/MetricsHandler.h +++ b/src/mds/MetricsHandler.h @@ -5,6 +5,8 @@ #define CEPH_MDS_METRICS_HANDLER_H #include +#include +#include #include #include @@ -12,6 +14,7 @@ #include "common/ceph_mutex.h" #include "MDSPerfMetricTypes.h" +#include "include/cephfs/metrics/Types.h" #include #include @@ -27,7 +30,9 @@ struct OpenedInodesPayload; struct ReadIoSizesPayload; struct WriteIoSizesPayload; struct CopyIoSizesPayload; +struct SubvolumeMetricsPayload; struct UnknownPayload; +struct AggregatedIOMetrics; class MClientMetrics; class MDSMap; class MDSRank; @@ -72,8 +77,17 @@ private: inline void operator()(const ClientMetricPayload &payload) const { metrics_handler->handle_payload(session, payload); } + + // Specialization for SubvolumeMetricsPayload - should not be called + // as it's handled specially in handle_client_metrics + // just for the compiler to be happy with visitor pattern + inline void operator()(const SubvolumeMetricsPayload &) const { + ceph_abort_msg("SubvolumeMetricsPayload should be handled specially"); + } }; + std::unique_ptr create_subv_perf_counter(const std::string& subv_name); + MDSRank *mds; // drop this lock when calling ->send_message_mds() else mds might // deadlock @@ -90,8 +104,10 @@ private: std::thread updater; 
std::map> client_metrics_map;
-
-  // address of rank 0 mds, so that the message can be sent without
+  // maps subvolume path -> aggregated metrics from all clients reporting to this MDS instance
+  std::unordered_map> subvolume_metrics_map;
+  uint64_t subv_metrics_tracker_window_time_sec = 300;
+  // address of rank 0 mds, so that the message can be sent without
   // acquiring mds_lock. misdirected messages to rank 0 are taken
   // care of by rank 0.
   boost::optional addr_rank0;
@@ -109,6 +125,7 @@ private:
   void handle_payload(Session *session, const ReadIoSizesPayload &payload);
   void handle_payload(Session *session, const WriteIoSizesPayload &payload);
   void handle_payload(Session *session, const CopyIoSizesPayload &payload);
+  void handle_payload(Session *session, const SubvolumeMetricsPayload &payload, std::unique_lock &lock_guard);
   void handle_payload(Session *session, const UnknownPayload &payload);
 
   void set_next_seq(version_t seq);
@@ -118,6 +135,9 @@ private:
   void handle_mds_ping(const cref_t &m);
 
   void update_rank0();
+
+  void aggregate_subvolume_metrics(const std::string& subvolume_path,
+      const std::vector& metrics_list, SubvolumeMetric &res);
 };
 
 #endif // CEPH_MDS_METRICS_HANDLER_H
diff --git a/src/mds/cephfs_features.h b/src/mds/cephfs_features.h
index 126a3be58af..861cc3dcb44 100644
--- a/src/mds/cephfs_features.h
+++ b/src/mds/cephfs_features.h
@@ -96,6 +96,7 @@ namespace ceph {
   CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY, \
   CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \
   CLIENT_METRIC_TYPE_COPY_IO_SIZES, \
+  CLIENT_METRIC_TYPE_SUBVOLUME_METRICS, \
}
 
 #define CEPHFS_FEATURES_MDS_SUPPORTED CEPHFS_FEATURES_ALL
diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc
index 5be3506bc3d..4261416c641 100644
--- a/src/mds/mdstypes.cc
+++ b/src/mds/mdstypes.cc
@@ -1193,3 +1193,26 @@ void BlockDiff::print(ostream& out) const {
   out << "{rval: " << rval << ", scan_idx=" << scan_idx
      << ", blocks=" << blocks << "}";
 }
+
+void
SubvolumeMetric::dump(Formatter *f) const {
+  f->dump_string("subvolume_path", subvolume_path);
+  f->dump_unsigned("read_ops", read_ops);
+  f->dump_unsigned("write_ops", write_ops);
+  f->dump_unsigned("read_size", read_size);
+  f->dump_unsigned("write_size", write_size);
+  f->dump_unsigned("avg_read_latency", avg_read_latency);
+  f->dump_unsigned("avg_write_latency", avg_write_latency);
+  f->dump_unsigned("time_stamp", time_stamp);
+}
+
+std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m) {
+  os << "{subv_path=" << m.subvolume_path
+     << ", read_ops=" << m.read_ops
+     << ", write_ops=" << m.write_ops
+     << ", read_size=" << m.read_size
+     << ", write_size=" << m.write_size
+     << ", avg_read_lat=" << m.avg_read_latency
+     << ", avg_write_lat=" << m.avg_write_latency
+     << ", time_stamp=" << m.time_stamp << "}";
+  return os;
+}
diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h
index 195fe56c24f..60c3b360f94 100644
--- a/src/mds/mdstypes.h
+++ b/src/mds/mdstypes.h
@@ -10,6 +10,7 @@
 #include 
 #include 
 #include 
+#include 
 
 #include "common/DecayCounter.h"
 #include "common/entity_name.h"
@@ -984,4 +985,37 @@ struct BlockDiff {
 };
 WRITE_CLASS_ENCODER(BlockDiff);
 
+/**
+ *
+ * @brief Represents aggregated metric per subvolume
+ * This aggregation is a result of aggregating multiple
+ * AggregatedIOMetric instances from various clients
+ * (AggregatedIOMetric is created on the client by aggregating multiple SimpleIOMetric instances)
+ */
+struct SubvolumeMetric {
+  std::string subvolume_path;
+  uint64_t read_ops = 0;
+  uint64_t write_ops = 0;
+  uint64_t read_size = 0;
+  uint64_t write_size = 0;
+  uint64_t avg_read_latency = 0;
+  uint64_t avg_write_latency = 0;
+  uint64_t time_stamp = 0;
+
+  DENC(SubvolumeMetric, v, p) {
+    DENC_START(1, 1, p);
+    denc(v.subvolume_path, p);
+    denc(v.read_ops, p);
+    denc(v.write_ops, p);
+    denc(v.read_size, p);
+    denc(v.write_size, p);
+    denc(v.avg_read_latency, p);
+    denc(v.avg_write_latency, p);
+    denc(v.time_stamp, p);
+    DENC_FINISH(p);
+  }
+
+  void dump(Formatter *f) const;
+  friend std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m);
+};
 
 #endif
diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h
index d4cedb601be..498f668afd5 100644
--- a/src/messages/MClientCaps.h
+++ b/src/messages/MClientCaps.h
@@ -22,7 +22,7 @@
 
 class MClientCaps final : public SafeMessage {
 private:
-  static constexpr int HEAD_VERSION = 12;
+  static constexpr int HEAD_VERSION = 13;
   static constexpr int COMPAT_VERSION = 1;
 
 public:
@@ -61,6 +61,7 @@ private:
 
   std::vector fscrypt_auth;
   std::vector fscrypt_file;
+  uint64_t subvolume_id = 0;
 
   bool is_fscrypt_enabled() const {
     return !!fscrypt_auth.size();
@@ -296,6 +297,9 @@ public:
       decode(fscrypt_auth, p);
       decode(fscrypt_file, p);
     }
+    if (header.version >= 13) {
+      decode(subvolume_id, p);
+    }
   }
   void encode_payload(uint64_t features) override {
     using ceph::encode;
@@ -366,6 +370,7 @@ public:
     encode(nsubdirs, payload);
     encode(fscrypt_auth, payload);
     encode(fscrypt_file, payload);
+    encode(subvolume_id, payload);
   }
 private:
   template 
diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h
index 0b8a7731c00..d86e8d6c016 100644
--- a/src/messages/MClientReply.h
+++ b/src/messages/MClientReply.h
@@ -152,6 +152,7 @@ struct InodeStat {
   std::vector fscrypt_file;
 
   optmetadata_multiton optmetadata;
+  inodeno_t subvolume_id;
 
 public:
   InodeStat() {}
@@ -166,7 +167,7 @@ struct InodeStat {
   void decode(ceph::buffer::list::const_iterator &p, const uint64_t features) {
     using ceph::decode;
     if (features == (uint64_t)-1) {
-      DECODE_START(8, p);
+      DECODE_START(9, p);
       decode(vino.ino, p);
       decode(vino.snapid, p);
       decode(rdev, p);
@@ -232,6 +233,9 @@ struct InodeStat {
     if (struct_v >= 8) {
       decode(optmetadata, p);
     }
+    if (struct_v >= 9) {
+      decode(subvolume_id, p);
+    }
 
     DECODE_FINISH(p);
   } else {