From c2085e9bb77e4eec7feef31b189919d715788057 Mon Sep 17 00:00:00 2001 From: Igor Golikov Date: Thu, 10 Jul 2025 10:17:36 +0000 Subject: [PATCH] client,mds: add support for subvolume level metrics Add support for client side metrics collection using SimpleIOMetric struct and aggregation using AggregatedIOMetrics struct, Client holds SimpleIOMetrics vector per each subvolume it recognized (via caps/metadata messages), aggregates them into the AggregatedIOMetric struct, and sends periodically to the MDS, along with regulat client metrics. MDS holds map of subvolume_path -> vector and sends it periodically to rank0, for further aggregation and exposure. Fixes: https://tracker.ceph.com/issues/68929, https://tracker.ceph.com/issues/68930 Signed-off-by: Igor Golikov --- src/client/Client.cc | 189 +++++++++++++++++++-- src/client/Client.h | 57 ++++++- src/include/cephfs/metrics/Types.h | 259 +++++++++++++++++++++++++++-- src/mds/CInode.cc | 8 +- src/mds/CInode.h | 2 +- src/mds/Locker.cc | 13 +- src/mds/Locker.h | 1 + src/mds/MDSRank.cc | 8 + src/mds/MDSRank.h | 2 + src/mds/MetricsHandler.cc | 84 +++++++++- src/mds/MetricsHandler.h | 16 +- src/mds/cephfs_features.h | 3 +- src/mds/mdstypes.cc | 23 +++ src/mds/mdstypes.h | 34 ++++ src/messages/MClientCaps.h | 7 +- src/messages/MClientReply.h | 6 +- 16 files changed, 668 insertions(+), 44 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 8f081420735..6d2511c1a6d 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -247,6 +247,8 @@ int Client::CommandHook::call( m_client->_kick_stale_sessions(); else if (command == "status") m_client->dump_status(f); + else if (command == "dump_subvolume_metrics") + m_client->dump_subvolume_metrics(f); else ceph_abort_msg("bad command registered"); } @@ -400,7 +402,8 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_) async_ino_releasor(m->cct), objecter_finisher(m->cct), m_command_hook(this), - fscid(0) + fscid(0), + subvolume_tracker{std::make_unique(cct, whoami)} { /* We only use the locale for normalization/case folding. That is unaffected * by the locale but required by the API. @@ -626,6 +629,10 @@ void Client::dump_status(Formatter *f) } } +void Client::dump_subvolume_metrics(Formatter* f) { + subvolume_tracker->dump(f); +} + void Client::_pre_init() { timer.init(); @@ -715,6 +722,13 @@ void Client::_finish_init() lderr(cct) << "error registering admin socket command: " << cpp_strerror(-ret) << dendl; } + ret = admin_socket->register_command("dump_subvolume_metrics_aggr", + &m_command_hook, + "dump aggregated subvolume metrics"); + if (ret < 0) { + lderr(cct) << "error registering admin socket command: " + << cpp_strerror(-ret) << dendl; + } } void Client::shutdown() @@ -855,6 +869,9 @@ void Client::update_io_stat_write(utime_t latency) { logger->set(l_c_wr_ops, nr_write_request); } +void Client::update_subvolume_metric(bool write, utime_t start, utime_t end, uint64_t size, Inode *in) { +} + // =================== // metadata cache stuff @@ -1740,6 +1757,10 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session) in = add_update_inode(&ist, request->sent_stamp, session, request->perms); + if (ist.subvolume_id) { + ldout(cct, 20) << __func__ << " subv_metric adding " << in->ino << "-" << ist.subvolume_id << dendl; + subvolume_tracker->add_inode(in->ino, ist.subvolume_id); + } } Inode *diri = NULL; @@ -3718,14 +3739,14 @@ private: Client *client; InodeRef inode; public: - C_Client_FlushComplete(Client *c, Inode *in) : client(c), inode(in) { } + C_Client_FlushComplete(Client *c, Inode *in) : client(c), inode(in) {} void finish(int r) override { ceph_assert(ceph_mutex_is_locked_by_me(client->client_lock)); if (r != 0) { client_t const whoami = client->whoami; // For the benefit of ldout prefix ldout(client->cct, 1) << "I/O error from flush on inode " << inode - << " 0x" << std::hex << inode->ino << std::dec - << ": " << r << "(" << cpp_strerror(r) << ")" << dendl; + << " 0x" << std::hex << inode->ino << std::dec + << ": " << r << "(" << cpp_strerror(r) << ")" << dendl; inode->set_async_err(r); } } @@ -5528,6 +5549,12 @@ void Client::handle_caps(const MConstRef& m) got_mds_push(session.get()); + // check whether the current inode is under subvolume for metrics collection + if (m->subvolume_id > 0) { + ldout(cct, 10) << __func__ << " adding " << m->get_ino() << " to subvolume tracker " << m->subvolume_id << dendl; + subvolume_tracker->add_inode(m->get_ino(), m->subvolume_id); + } + bool do_cap_release = false; Inode *in; vinodeno_t vino(m->get_ino(), CEPH_NOSNAP); @@ -7200,8 +7227,13 @@ void Client::flush_cap_releases() for (auto &p : mds_sessions) { auto session = p.second; if (session->release && mdsmap->is_clientreplay_or_active_or_stopping( - p.first)) { + p.first)) { nr_caps += session->release->caps.size(); + for (const auto &cap: session->release->caps) { + ldout(cct, 10) << __func__ << " removing " << static_cast(cap.ino) << + " from subvolume tracker" << dendl; + subvolume_tracker->remove_inode(static_cast(cap.ino)); + } if (cct->_conf->client_inject_release_failure) { ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl; } else { @@ -7437,6 +7469,18 @@ void Client::collect_and_send_global_metrics() { message.push_back(metric); } + // subvolume metrics + if (_collect_and_send_global_metrics || + session->mds_metric_flags.test(CLIENT_METRIC_TYPE_SUBVOLUME_METRICS)) { + auto metrics = subvolume_tracker->aggregate(true); + if (!metrics.empty()) { + for (auto m: metrics) + ldout(cct, 20) << " sending subv_metric " << m << dendl; + metric = ClientMetricMessage(SubvolumeMetricsPayload(metrics)); + message.push_back(metric); + } + } + session->con->send_message2(make_message(std::move(message))); } @@ -10930,6 +10974,7 @@ void Client::C_Read_Finisher::finish_io(int r) lat = ceph_clock_now(); lat -= start; + clnt->subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, lat, static_cast(r)}); ++clnt->nr_read_request; clnt->update_io_stat_read(lat); } @@ -11016,7 +11061,7 @@ void Client::C_Read_Sync_NonBlocking::finish(int r) success: r = read; - + clnt->subvolume_tracker->add_metric(in->ino, SimpleIOMetric(false, ceph_clock_now() - start_time, r)); error: onfinish->complete(r); @@ -11175,7 +11220,7 @@ retry: C_Read_Sync_NonBlocking *crsa = new C_Read_Sync_NonBlocking(this, iofinish.release(), f, in, f->pos, - offset, size, bl, filer.get(), have); + offset, size, bl, filer.get(), have, ceph_clock_now()); crf.release(); // Now make first attempt at performing _read_sync @@ -11225,7 +11270,7 @@ success: lat = ceph_clock_now(); lat -= start; - + subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, lat, bl->length()}); ++nr_read_request; update_io_stat_read(lat); @@ -11240,8 +11285,8 @@ done: return rc; } -Client::C_Readahead::C_Readahead(Client *c, Fh *f) : - client(c), f(f) { +Client::C_Readahead::C_Readahead(Client *c, Fh *f, utime_t start) : + client(c), f(f), start_time(start){ f->get(); f->readahead.inc_pending(); } @@ -11256,17 +11301,18 @@ void Client::C_Readahead::finish(int r) { client->put_cap_ref(f->inode.get(), CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE); if (r > 0) { client->update_read_io_size(r); + client->subvolume_tracker->add_metric(f->inode->ino, SimpleIOMetric(false, ceph_clock_now()-start_time, r)); } } -void Client::do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len) +void Client::do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len, utime_t start_time) { if(f->readahead.get_min_readahead_size() > 0) { pair readahead_extent = f->readahead.update(off, len, in->size); if (readahead_extent.second > 0) { ldout(cct, 20) << "readahead " << readahead_extent.first << "~" << readahead_extent.second << " (caller wants " << off << "~" << len << ")" << dendl; - Context *onfinish2 = new C_Readahead(this, f); + Context *onfinish2 = new C_Readahead(this, f, start_time); int r2 = objectcacher->file_read(&in->oset, &in->layout, in->snapid, readahead_extent.first, readahead_extent.second, NULL, 0, onfinish2); @@ -11285,7 +11331,7 @@ void Client::C_Read_Async_Finisher::finish(int r) { // Do read ahead as long as we aren't completing with 0 bytes if (r != 0) - clnt->do_readahead(f, in, off, len); + clnt->do_readahead(f, in, off, len, start_time); onfinish->complete(r); } @@ -11307,7 +11353,7 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, if (onfinish != nullptr) { io_finish.reset(new C_Read_Async_Finisher(this, onfinish, f, in, - f->pos, off, len)); + f->pos, off, len, ceph_clock_now())); } // trim read based on file size? @@ -11348,6 +11394,7 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, io_finish.reset(io_finish_cond); } + auto start_time = ceph_clock_now(); r = objectcacher->file_read(&in->oset, &in->layout, in->snapid, off, len, bl, 0, io_finish.get()); @@ -11373,11 +11420,12 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, client_lock.lock(); put_cap_ref(in, CEPH_CAP_FILE_CACHE); update_read_io_size(bl->length()); + subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, ceph_clock_now() - start_time, bl->length()}); } else { put_cap_ref(in, CEPH_CAP_FILE_CACHE); } - do_readahead(f, in, off, len); + do_readahead(f, in, off, len, ceph_clock_now()); return r; } @@ -11585,6 +11633,7 @@ int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos, lat = ceph_clock_now(); lat -= start; + subvolume_tracker->add_metric(in->ino, SimpleIOMetric{true, lat, static_cast(size)}); ++nr_write_request; update_io_stat_write(lat); @@ -11738,7 +11787,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl, (uint64_t)(offset+size) > in->size ) { //exceeds filesize return -EFBIG; } - //ldout(cct, 7) << "write fh " << fh << " size " << size << " offset " << offset << dendl; + ldout(cct, 7) << "_write fh " << f << " size " << size << " offset " << offset << dendl; if (objecter->osdmap_pool_full(in->layout.pool_id)) { return -ENOSPC; @@ -11869,6 +11918,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl, get_cap_ref(in, CEPH_CAP_FILE_BUFFER); // async, caching, non-blocking. + ldout(cct, 10) << " _write_oc " << dendl; r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(), offset, size, bl, ceph::real_clock::now(), @@ -11905,6 +11955,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl, } // allow caller to wait on onfinish... + ldout(cct, 10) << " _write_oc_1" << dendl; return 0; } @@ -11939,6 +11990,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl, get_cap_ref(in, CEPH_CAP_FILE_BUFFER); + ldout(cct, 10) << " _write_filer" << dendl; filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(), offset, size, bl, ceph::real_clock::now(), 0, in->truncate_size, in->truncate_seq, @@ -11953,6 +12005,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl, cwf.release(); // allow caller to wait on onfinish... + ldout(cct, 10) << " _write_filer_2" << dendl; return 0; } @@ -11968,6 +12021,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl, success: // do not get here if non-blocking caller (onfinish != nullptr) + ldout(cct, 10) << " _write_filer_succeess" << dendl; r = _write_success(f, start, fpos, offset, size, in); if (r >= 0 && do_fsync) { @@ -17616,6 +17670,109 @@ mds_rank_t Client::_get_random_up_mds() const return *p; } +// --- subvolume metrics tracking --- // +SubvolumeTracker::SubvolumeTracker(CephContext *ct, client_t id) : cct(ct), whoami(id) {} + +void SubvolumeTracker::dump(Formatter *f) { + auto current_metrics = aggregate(false); + f->open_array_section("current_metrics"); + for (auto &met : current_metrics) { + f->dump_object("", met); + } + f->close_section(); + + std::vector temp; + { + std::shared_lock l(metrics_lock); + temp = last_subvolume_metrics; + } + f->open_array_section("last_metrics"); + for (auto &val : temp) { + f->dump_object("", val); + } + f->close_section(); +} + +void SubvolumeTracker::add_inode(inodeno_t inode, inodeno_t subvol) { + ldout(cct, 20) << __func__ << " subv_metric " << inode << "-" << subvol << dendl; + std::unique_lock l(metrics_lock); + if (inode_subvolume.contains(inode)) { + ldout(cct, 10) << __func__ << " " << inode << "-" << subvol << " subv_metric inode exists" << dendl; + return; + } + + auto [it, inserted] = subvolume_metrics.try_emplace(subvol); + if (inserted) { + it->second.metrics.reserve(initial_reserve); + } + + inode_subvolume[inode] = subvol; + ldout(cct, 10) << __func__ << " add " << inode << "-" << subvol << dendl; +} + +void SubvolumeTracker::remove_inode(inodeno_t inode) { + ldout(cct, 20) << __func__ << " subv_metric " << inode << "-" << dendl; + std::unique_lock l(metrics_lock); + auto it = inode_subvolume.find(inode); + if (it == inode_subvolume.end()) { + return; + } + + inodeno_t subvol = it->second; + inode_subvolume.erase(it); + + auto se = subvolume_metrics.find(subvol); + if (se == subvolume_metrics.end()) { + ldout(cct, 20) << __func__ << " subv_metric not found " << inode << "-" << subvol << dendl; + return; + } + ldout(cct, 20) << __func__ << " subv_metric end " << inode << dendl; +} + +void SubvolumeTracker::add_metric(inodeno_t inode, SimpleIOMetric&& metric) { + ldout(cct, 10) << __func__ << " " << inode << dendl; + std::shared_lock l(metrics_lock); + auto it = inode_subvolume.find(inode); + if (it == inode_subvolume.end()) { + return; + } + ldout(cct, 10) << __func__ << " adding subv_metric for " << inode << dendl; + auto& entry = subvolume_metrics[it->second]; // creates entry if not present + l.unlock(); // release before modifying + std::unique_lock l2(metrics_lock); + entry.metrics.emplace_back(metric); +} + +std::vector SubvolumeTracker::aggregate(bool clean) { + ldout(cct, 20) << __func__ << dendl; + std::vector res; + res.reserve(subvolume_metrics.size()); + std::unordered_map local_map; + { + std::unique_lock l(metrics_lock); + if (clean) + subvolume_metrics.swap(local_map); + else + local_map = subvolume_metrics; + } + + for (const auto& [subvol_id, entry] : local_map) { + if (entry.metrics.empty()) + continue; + auto& agg = res.emplace_back(); + agg.subvolume_id = subvol_id; + for (const auto& metric : entry.metrics) { + agg.add(metric); + } + } + + std::unique_lock l(metrics_lock); + last_subvolume_metrics = res; + + ldout(cct, 20) << __func__ << " res size " << res.size() << dendl; + return res; +} +// --- subvolume metrics tracking --- // StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc, boost::asio::io_context& ictx) diff --git a/src/client/Client.h b/src/client/Client.h index 52b23288ea8..4fd75690f82 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -53,6 +53,7 @@ #include #include #include +#include using std::set; using std::map; @@ -258,6 +259,42 @@ struct dir_result_t { int fd; // fd attached using fdopendir (-1 if none) }; +/** + * @brief The subvolume metric tracker. + * + * Maps subvolume_id(which is in fact inode id) to the vector of SimpleIOMetric instances. + * Each simple metric records the single IO operation on the client. + * On clients metric message to the MDS, client will aggregate all simple metrics for each subvolume + * into the AggregatedIOMetric struct and clean the current metrics list. + * TODO: limit the cap for each subvolume? in the case client sends metrics to the MDS not so often? + */ +class SubvolumeTracker { +public: + struct SubvolumeEntry { + std::vector metrics; + + void dump(Formatter *f) const { + for(auto const& m : metrics) + f->dump_object("", m); + } + }; + + SubvolumeTracker(CephContext *ct, client_t id); + void dump(Formatter *f); + void add_inode(inodeno_t inode, inodeno_t subvol); + void remove_inode(inodeno_t inode); + void add_metric(inodeno_t inode, SimpleIOMetric&& metric); + std::vector aggregate(bool clean); +protected: + std::vector last_subvolume_metrics; + std::unordered_map subvolume_metrics; + std::unordered_map inode_subvolume; + size_t initial_reserve = 1000; + CephContext *cct = nullptr; + client_t whoami; + std::shared_mutex metrics_lock; +}; + class Client : public Dispatcher, public md_config_obs_t { public: friend class C_Block_Sync; // Calls block map and protected helpers @@ -1175,6 +1212,7 @@ protected: void force_session_readonly(MetaSession *s); void dump_status(Formatter *f); // debug + void dump_subvolume_metrics(Formatter* f); Dispatcher::dispatch_result_t ms_dispatch2(const MessageRef& m) override; @@ -1404,9 +1442,9 @@ private: public: C_Read_Sync_NonBlocking(Client *clnt, Context *onfinish, Fh *f, Inode *in, uint64_t fpos, uint64_t off, uint64_t len, - bufferlist *bl, Filer *filer, int have_caps) + bufferlist *bl, Filer *filer, int have_caps, utime_t start) : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), bl(bl), - filer(filer), have_caps(have_caps) + filer(filer), have_caps(have_caps), start_time(start) { left = len; wanted = len; @@ -1430,6 +1468,7 @@ private: bufferlist tbl; Filer *filer; int have_caps; + utime_t start_time; int read; uint64_t pos; bool fini; @@ -1447,8 +1486,8 @@ private: class C_Read_Async_Finisher : public Context { public: C_Read_Async_Finisher(Client *clnt, Context *onfinish, Fh *f, Inode *in, - uint64_t fpos, uint64_t off, uint64_t len) - : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len) {} + uint64_t fpos, uint64_t off, uint64_t len, utime_t start) + : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), start_time(start) {} private: Client *clnt; @@ -1457,6 +1496,7 @@ private: Inode *in; uint64_t off; uint64_t len; + utime_t start_time; void finish(int r) override; }; @@ -1606,12 +1646,13 @@ private: }; struct C_Readahead : public Context { - C_Readahead(Client *c, Fh *f); + C_Readahead(Client *c, Fh *f, utime_t start); ~C_Readahead() override; void finish(int r) override; Client *client; Fh *f; + utime_t start_time = 0; }; /* @@ -1755,7 +1796,7 @@ private: loff_t _lseek(Fh *fh, loff_t offset, int whence); int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl, Context *onfinish = nullptr); - void do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len); + void do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len, utime_t start_time); int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos, int64_t offset, uint64_t size, Inode *in); int64_t _write(Fh *fh, int64_t offset, uint64_t size, bufferlist bl, @@ -1869,6 +1910,7 @@ private: void update_io_stat_metadata(utime_t latency); void update_io_stat_read(utime_t latency); void update_io_stat_write(utime_t latency); + void update_subvolume_metric(bool write, utime_t start, utime_t end, uint64_t size, Inode *in); bool should_check_perms() const { return (is_fuse && !fuse_default_permissions) || (!is_fuse && client_permissions); @@ -1921,6 +1963,9 @@ private: // Cluster fsid fs_cluster_id_t fscid; + // subvolume metrics tracker + std::unique_ptr subvolume_tracker = nullptr; + // file handles, etc. interval_set free_fd_set; // unused fds std::unordered_map fd_map; diff --git a/src/include/cephfs/metrics/Types.h b/src/include/cephfs/metrics/Types.h index 5bc725c02cc..e758821a3e7 100644 --- a/src/include/cephfs/metrics/Types.h +++ b/src/include/cephfs/metrics/Types.h @@ -13,6 +13,7 @@ #include "include/int_types.h" #include "include/stringify.h" #include "include/utime.h" +#include "include/fs_types.h" namespace ceph { class Formatter; } @@ -33,6 +34,7 @@ enum ClientMetricType { CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY, CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY, CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, + CLIENT_METRIC_TYPE_SUBVOLUME_METRICS, }; inline std::ostream &operator<<(std::ostream &os, const ClientMetricType &type) { switch(type) { @@ -84,6 +86,9 @@ inline std::ostream &operator<<(std::ostream &os, const ClientMetricType &type) case ClientMetricType::CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY: os << "STDEV_METADATA_LATENCY"; break; + case ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS: + os << "SUBVOLUME_METRICS"; + break; default: os << "(UNKNOWN:" << static_cast::type>(type) << ")"; break; @@ -552,17 +557,248 @@ struct UnknownPayload : public ClientMetricPayloadBase { } }; +/** + * @brief Struct to hold single IO metric + * To save the memory for clients with high number of subvolumes, the layout is as following: + * 64 bits of data: + * MSB - read or write (0/1) + * 32 bits for latency, in microsecs + * 31 bits for payload size, bytes + */ +struct SimpleIOMetric { + uint64_t packed_data; + + // is_write flag (1 bit) - MSB + static constexpr uint64_t IS_WRITE_BITS = 1; + static constexpr uint64_t IS_WRITE_SHIFT = 64 - IS_WRITE_BITS; // 63 + static constexpr uint64_t IS_WRITE_MASK = 1ULL << IS_WRITE_SHIFT; // 0x8000000000000000ULL + + // latency in microseconds (32 bits) + // directly after is_write bit + static constexpr uint64_t LATENCY_BITS = 32; + static constexpr uint64_t LATENCY_SHIFT = IS_WRITE_SHIFT - LATENCY_BITS; // 63 - 32 = 31 + static constexpr uint64_t LATENCY_MASK = ((1ULL << LATENCY_BITS) - 1) << LATENCY_SHIFT; + + // payload size (31 bits) + // Placed directly after latency, occupying the lowest 31 bits + static constexpr uint64_t PAYLOAD_SIZE_BITS = 31; + static constexpr uint64_t PAYLOAD_SIZE_SHIFT = 0; // Starts from the LSB (bit 0) + static constexpr uint64_t PAYLOAD_SIZE_MASK = (1ULL << PAYLOAD_SIZE_BITS) - 1; + + static constexpr uint32_t MAX_LATENCY_US = (1ULL << LATENCY_BITS) - 1; // 2^32 - 1 microseconds + static constexpr uint32_t MAX_PAYLOAD_SIZE = (1ULL << PAYLOAD_SIZE_BITS) - 1; // 2^31 - 1 bytes + + SimpleIOMetric(bool is_write, utime_t latency, uint32_t payload_size) : packed_data(0) { + if (is_write) { + packed_data |= IS_WRITE_MASK; + } + + uint64_t current_latency_usec = latency.usec(); + + // if less than 1 microsecond - set to 1 microsecond + if (current_latency_usec == 0) { + current_latency_usec = 1; + } + + // handle overflow + if (current_latency_usec > MAX_LATENCY_US) { + current_latency_usec = MAX_LATENCY_US; // Truncate if too large + } + packed_data |= (current_latency_usec << LATENCY_SHIFT) & LATENCY_MASK; + + + // payload_size (31 bits) + if (payload_size > MAX_PAYLOAD_SIZE) { + payload_size = MAX_PAYLOAD_SIZE; // Truncate if too large + } + packed_data |= (static_cast(payload_size) << PAYLOAD_SIZE_SHIFT) & PAYLOAD_SIZE_MASK; + } + + SimpleIOMetric() : packed_data(0) {} + + bool get_is_write() const { + return (packed_data & IS_WRITE_MASK) != 0; + } + + uint32_t get_latency_us() const { + return static_cast((packed_data & LATENCY_MASK) >> LATENCY_SHIFT); + } + + uint32_t get_payload_size() const { + return static_cast((packed_data & PAYLOAD_SIZE_MASK) >> PAYLOAD_SIZE_SHIFT); + } + + // --- Dump method --- + void dump(Formatter *f) const { + f->dump_string("op", get_is_write() ? "w" : "r"); + f->dump_unsigned("lat_us", get_latency_us()); + f->dump_unsigned("size", get_payload_size()); + } +}; + +/** + * brief holds resuolt of the SimpleIOMetrics aggregation, for each subvolume, on the client + * the aggregation occurs just before sending metrics to the MDS + */ +struct AggregatedIOMetrics { + inodeno_t subvolume_id = 0; + uint32_t read_count = 0; + uint32_t write_count = 0; + uint64_t read_bytes = 0; + uint64_t write_bytes = 0; + uint64_t read_latency_us = 0; + uint64_t write_latency_us = 0; + uint64_t time_stamp; // set on MDS + + void add(const SimpleIOMetric& m) { + auto lat = m.get_latency_us(); + if (m.get_is_write()) { + ++write_count; + write_bytes += m.get_payload_size(); + write_latency_us += lat; + } else { + ++read_count; + read_bytes += m.get_payload_size(); + read_latency_us += lat; + } + } + + uint64_t avg_read_throughput_Bps() const { + return read_latency_us > 0 ? (read_bytes * 1'000'000ULL) / read_latency_us : 0; + } + + uint64_t avg_write_throughput_Bps() const { + return write_latency_us > 0 ? (write_bytes * 1'000'000ULL) / write_latency_us : 0; + } + + uint64_t avg_read_latency_us() const { + return read_count ? read_latency_us / read_count : 0; + } + + uint64_t avg_write_latency_us() const { + return write_count ? write_latency_us / write_count : 0; + } + + double read_iops() const { + return read_latency_us > 0 ? static_cast(read_count) * 1e9 / read_latency_us : 0.0; + } + + double write_iops() const { + return write_latency_us > 0 ? static_cast(write_count) * 1e9 / write_latency_us : 0.0; + } + + void dump(Formatter *f) const { + f->dump_unsigned("subvolume_id", subvolume_id); + f->dump_unsigned("read_count", read_count); + f->dump_unsigned("read_bytes", read_bytes); + f->dump_unsigned("avg_read_latency_ns", avg_read_latency_us()); + f->dump_unsigned("read_iops", read_iops()); + f->dump_float("avg_read_tp_Bpb", avg_read_throughput_Bps()); + f->dump_unsigned("write_count", write_count); + f->dump_unsigned("write_bytes", write_bytes); + f->dump_unsigned("avg_write_latency_ns", avg_write_latency_us()); + f->dump_float("write_iops", write_iops()); + f->dump_float("avg_write_tp_Bps", avg_write_throughput_Bps()); + f->dump_unsigned("time_stamp", time_stamp); + } + + friend std::ostream& operator<<(std::ostream& os, const AggregatedIOMetrics& m) { + os << "{subvolume_id=\"" << m.subvolume_id + << "\", read_count=" << m.read_count + << ", write_count=" << m.write_count + << ", read_bytes=" << m.read_bytes + << ", write_bytes=" << m.write_bytes + << ", read_latency_ns=" << m.read_latency_us + << ", write_latency_ns=" << m.write_latency_us + << ", time_stamp_ms=" << m.time_stamp + << "}"; + return os; + } + + void encode(bufferlist& bl) const { + using ceph::encode; + ENCODE_START(1, 1, bl); + encode(subvolume_id, bl); + encode(read_count, bl); + encode(write_count, bl); + encode(read_bytes, bl); + encode(write_bytes, bl); + encode(read_latency_us, bl); + encode(write_latency_us, bl); + encode(time_stamp, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& p) { + using ceph::decode; + DECODE_START(1, p); + decode(subvolume_id, p); + decode(read_count, p); + decode(write_count, p); + decode(read_bytes, p); + decode(write_bytes, p); + decode(read_latency_us, p); + decode(write_latency_us, p); + decode(time_stamp, p); + DECODE_FINISH(p); + } +}; + +struct SubvolumeMetricsPayload : public ClientMetricPayloadBase { + std::vector subvolume_metrics; + + SubvolumeMetricsPayload() : ClientMetricPayloadBase(ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS) { } + SubvolumeMetricsPayload(std::vector metrics) + : ClientMetricPayloadBase(ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS), subvolume_metrics(std::move(metrics)) {} + + void encode(bufferlist &bl) const { + using ceph::encode; + ENCODE_START(1, 1, bl); + encode(subvolume_metrics.size(), bl); + for(auto const& m : subvolume_metrics) { + m.encode(bl); + } + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator &iter) { + using ceph::decode; + DECODE_START(1, iter) ; + size_t size = 0; + decode(size, iter); + subvolume_metrics.clear(); + subvolume_metrics.reserve(size); + for (size_t i = 0; i < size; ++i) { + subvolume_metrics.emplace_back(); + subvolume_metrics.back().decode(iter); + } + DECODE_FINISH(iter); + } + + void dump(Formatter *f) const { + f->open_array_section("metrics"); + for(auto const& m : subvolume_metrics) + m.dump(f); + f->close_section(); + } + + void print(std::ostream *out) const { + *out << "metrics_count: " << subvolume_metrics.size(); + } +}; + typedef std::variant ClientMetricPayload; + ReadLatencyPayload, + WriteLatencyPayload, + MetadataLatencyPayload, + DentryLeasePayload, + OpenedFilesPayload, + PinnedIcapsPayload, + OpenedInodesPayload, + ReadIoSizesPayload, + WriteIoSizesPayload, + SubvolumeMetricsPayload, + UnknownPayload> ClientMetricPayload; // metric update message sent by clients struct ClientMetricMessage { @@ -676,6 +912,9 @@ public: case ClientMetricType::CLIENT_METRIC_TYPE_WRITE_IO_SIZES: payload = WriteIoSizesPayload(); break; + case ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS: + payload = SubvolumeMetricsPayload(); + break; default: payload = UnknownPayload(static_cast(metric_type)); break; diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index b2a0510e671..b227de7f1a3 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -4228,7 +4228,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session, * note: encoding matches MClientReply::InodeStat */ if (session->info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) { - ENCODE_START(8, 1, bl); + ENCODE_START(9, 1, bl); encode(std::tuple{ oi->ino, snapid, @@ -4281,6 +4281,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session, encode(file_i->fscrypt_auth, bl); encode(file_i->fscrypt_file, bl); encode_nohead(optmdbl, bl); + encode(get_subvolume_id(), bl); // encode inodestat ENCODE_FINISH(bl); } @@ -5504,6 +5505,11 @@ int64_t CInode::get_backtrace_pool() const } } +inodeno_t CInode::get_subvolume_id() const { + auto snapr = find_snaprealm(); + return snapr ? snapr->get_subvolume_ino() : inodeno_t(0); +} + void CInode::queue_export_pin(mds_rank_t export_pin) { if (state_test(CInode::STATE_QUEUEDEXPORTPIN)) diff --git a/src/mds/CInode.h b/src/mds/CInode.h index 09332832e24..b5a7b9a4c6a 100644 --- a/src/mds/CInode.h +++ b/src/mds/CInode.h @@ -1148,7 +1148,7 @@ class CInode : public MDSCacheObject, public InodeStoreBase, public Counter=0 */ int64_t get_backtrace_pool() const; - + inodeno_t get_subvolume_id() const; protected: ceph_lock_state_t *get_fcntl_lock_state() { if (!fcntl_locks) diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index 221da02f751..dd2d806eb23 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -2491,6 +2491,13 @@ Capability* Locker::issue_new_caps(CInode *in, return cap; } +void Locker::maybe_set_subvolume_id(const CInode* inode, ref_t& ack) { + if (ack && inode) { + ack->subvolume_id = inode->get_subvolume_id(); + dout(10) << "setting subvolume id " << ack->subvolume_id << " for inode " << inode->ino().val << dendl; + } +} + void Locker::issue_caps_set(set& inset) { for (set::iterator p = inset.begin(); p != inset.end(); ++p) @@ -3398,6 +3405,7 @@ void Locker::handle_client_caps(const cref_t &m) if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps_dirty); } + CInode *head_in = mdcache->get_inode(m->get_ino()); if (m->get_client_tid() > 0 && session && session->have_completed_flush(m->get_client_tid())) { dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid() @@ -3406,9 +3414,11 @@ void Locker::handle_client_caps(const cref_t &m) if (op == CEPH_CAP_OP_FLUSHSNAP) { if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack); ack = make_message(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, 0, mds->get_osd_epoch_barrier()); + maybe_set_subvolume_id(head_in, ack); } else { if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack); ack = make_message(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, 0, mds->get_osd_epoch_barrier()); + maybe_set_subvolume_id(head_in, ack); } ack->set_snap_follows(follows); ack->set_client_tid(m->get_client_tid()); @@ -3445,7 +3455,6 @@ void Locker::handle_client_caps(const cref_t &m) } } - CInode *head_in = mdcache->get_inode(m->get_ino()); if (!head_in) { if (mds->is_clientreplay()) { dout(7) << "handle_client_caps on unknown ino " << m->get_ino() @@ -3534,6 +3543,7 @@ void Locker::handle_client_caps(const cref_t &m) ack->set_snap_follows(follows); ack->set_client_tid(m->get_client_tid()); ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); + maybe_set_subvolume_id(head_in, ack); } if (in == head_in || @@ -3622,6 +3632,7 @@ void Locker::handle_client_caps(const cref_t &m) m->get_caps(), 0, dirty, 0, cap->get_last_issue(), mds->get_osd_epoch_barrier()); ack->set_client_tid(m->get_client_tid()); ack->set_oldest_flush_tid(m->get_oldest_flush_tid()); + maybe_set_subvolume_id(head_in, ack); } // filter wanted based on what we could ever give out (given auth/replica status) diff --git a/src/mds/Locker.h b/src/mds/Locker.h index bce080d12db..098e3a609ad 100644 --- a/src/mds/Locker.h +++ b/src/mds/Locker.h @@ -261,6 +261,7 @@ protected: void file_update_finish(CInode *in, MutationRef& mut, unsigned flags, client_t client, const ref_t &ack); + void maybe_set_subvolume_id(const CInode* head_in, ref_t& ack); xlist updated_scatterlocks; // Maintain a global list to quickly find if any caps are late revoking diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index a84e5e9a253..3afa85bacc2 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -4056,6 +4056,14 @@ epoch_t MDSRank::get_osd_epoch() const return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)); } +std::string MDSRank::get_path(inodeno_t ino) { + CInode* inode = mdcache->get_inode(ino); + if (!inode) return {}; + std::string res; + inode->make_path_string(res); + return res; +} + std::vector MDSRankDispatcher::get_tracked_keys() const noexcept { diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index bc7f5c96282..775a26c2bde 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -390,6 +390,8 @@ class MDSRank { return inject_journal_corrupt_dentry_first; } + std::string get_path(inodeno_t ino); + // Reference to global MDS::mds_lock, so that users of MDSRank don't // carry around references to the outer MDS, and we can substitute // a separate lock here in future potentially. diff --git a/src/mds/MetricsHandler.cc b/src/mds/MetricsHandler.cc index ae16e8962b8..2efde2b1299 100644 --- a/src/mds/MetricsHandler.cc +++ b/src/mds/MetricsHandler.cc @@ -5,7 +5,6 @@ #include "common/debug.h" #include "common/errno.h" -#include "include/cephfs/metrics/Types.h" #include "messages/MClientMetrics.h" #include "messages/MMDSMetrics.h" @@ -56,6 +55,8 @@ void MetricsHandler::init() { update_rank0(); } }); + + } void MetricsHandler::shutdown() { @@ -322,6 +323,31 @@ void MetricsHandler::handle_payload(Session *session, const WriteIoSizesPayload metrics.write_io_sizes_metric.updated = true; } +void MetricsHandler::handle_payload(Session *session, const SubvolumeMetricsPayload &payload) { + dout(20) << ": type=" << payload.get_type() << ", session=" << session + << " , subv_metrics count=" << payload.subvolume_metrics.size() << dendl; + + auto it = client_metrics_map.find(session->info.inst); + if (it == client_metrics_map.end()) { + return; + } + + // on each mds we accumulate aggregated metrics from each client per each subvolume + // to be later send to the metrics handler for aggregation + for (auto const& metric : payload.subvolume_metrics) { + std::string path = mds->get_path(metric.subvolume_id); + if (path.empty()) { + dout(10) << " path not found for " << metric.subvolume_id << dendl; + continue; + } + auto& metrics_vec = subvolume_metrics_map[path]; + dout(20) << " accumulating subv_metric " << metric << dendl; + metrics_vec.push_back(std::move(metric)); + metrics_vec.back().time_stamp = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); + } +} + void MetricsHandler::handle_payload(Session *session, const UnknownPayload &payload) { dout(5) << ": type=Unknown, session=" << session << ", ignoring unknown payload" << dendl; } @@ -410,14 +436,64 @@ void MetricsHandler::update_rank0() { } } + // subvolume metrics, reserve 100 entries per subvolume ? good enough? + metrics_message.subvolume_metrics.reserve(subvolume_metrics_map.size()* 100); + for (auto &[path, aggregated_metrics] : subvolume_metrics_map) { + metrics_message.subvolume_metrics.emplace_back(); + aggregate_subvolume_metrics(path, aggregated_metrics, metrics_message.subvolume_metrics.back()); + } + // if we need to show local MDS metrics, we need to save a last copy... + subvolume_metrics_map.clear(); + // only start incrementing when its kicked via set_next_seq() if (next_seq != 0) { ++last_updated_seq; } - dout(20) << ": sending metric updates for " << update_client_metrics_map.size() - << " clients to rank 0 (address: " << *addr_rank0 << ") with sequence number " - << next_seq << ", last updated sequence number " << last_updated_seq << dendl; + dout(20) << "sending " << metrics_message.subvolume_metrics.size() << " subv_metrics to aggregator" << dendl; mds->send_message_mds(make_message(std::move(metrics_message)), *addr_rank0); } + +void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_path, + const std::vector& metrics_list, SubvolumeMetric &res) { + dout(20) << ": aggregating " << metrics_list.size() << " subv_metrics" << dendl; + res.subvolume_path = subvolume_path; + + uint64_t weighted_read_latency_sum = 0; + uint64_t weighted_write_latency_sum = 0; + + res.read_ops = 0; + res.write_ops = 0; + res.read_size = 0; + res.write_size = 0; + res.avg_read_latency = 0; + res.avg_write_latency = 0; + res.time_stamp = 0; + + for (const auto& m : metrics_list) { + res.read_ops += m.read_count; + res.write_ops += m.write_count; + res.read_size += m.read_bytes; + res.write_size += m.write_bytes; + // we want to have more metrics in the sliding window (on the aggregator), + // so we set the latest timestamp of all received metrics + res.time_stamp = std::max(res.time_stamp, m.time_stamp); + + if (m.read_count > 0) { + weighted_read_latency_sum += m.read_latency_us * m.read_count; + } + + if (m.write_count > 0) { + weighted_write_latency_sum += m.write_latency_us * m.write_count; + } + } + + // normalize latencies + res.avg_read_latency = (res.read_ops > 0) + ? (weighted_read_latency_sum / res.read_ops) + : 0; + res.avg_write_latency = (res.write_ops > 0) + ? (weighted_write_latency_sum / res.write_ops) + : 0; +} \ No newline at end of file diff --git a/src/mds/MetricsHandler.h b/src/mds/MetricsHandler.h index 0d03fe8b209..53255073598 100644 --- a/src/mds/MetricsHandler.h +++ b/src/mds/MetricsHandler.h @@ -5,6 +5,7 @@ #define CEPH_MDS_METRICS_HANDLER_H #include +#include #include #include @@ -12,6 +13,7 @@ #include "common/ceph_mutex.h" #include "MDSPerfMetricTypes.h" +#include "include/cephfs/metrics/Types.h" #include #include @@ -26,7 +28,9 @@ struct PinnedIcapsPayload; struct OpenedInodesPayload; struct ReadIoSizesPayload; struct WriteIoSizesPayload; +struct SubvolumeMetricsPayload; struct UnknownPayload; +struct AggregatedIOMetrics; class MClientMetrics; class MDSMap; class MDSRank; @@ -73,6 +77,8 @@ private: } }; + std::unique_ptr create_subv_perf_counter(const std::string& subv_name); + MDSRank *mds; // drop this lock when calling ->send_message_mds() else mds might // deadlock @@ -89,8 +95,10 @@ private: std::thread updater; std::map> client_metrics_map; - - // address of rank 0 mds, so that the message can be sent without + // maps subvolume path -> aggregated metrics from all clients reporting to this MDS instance + std::unordered_map> subvolume_metrics_map; + uint64_t subv_metrics_tracker_window_time_sec = 300; + // address of rank 0 mds, so that the message can be sent withoutå // acquiring mds_lock. misdirected messages to rank 0 are taken // care of by rank 0. boost::optional addr_rank0; @@ -107,6 +115,7 @@ private: void handle_payload(Session *session, const OpenedInodesPayload &payload); void handle_payload(Session *session, const ReadIoSizesPayload &payload); void handle_payload(Session *session, const WriteIoSizesPayload &payload); + void handle_payload(Session *session, const SubvolumeMetricsPayload &payload); void handle_payload(Session *session, const UnknownPayload &payload); void set_next_seq(version_t seq); @@ -116,6 +125,9 @@ private: void handle_mds_ping(const cref_t &m); void update_rank0(); + + void aggregate_subvolume_metrics(const std::string& subvolume_path, + const std::vector& metrics_list, SubvolumeMetric &res); }; #endif // CEPH_MDS_METRICS_HANDLER_H diff --git a/src/mds/cephfs_features.h b/src/mds/cephfs_features.h index 391054d8701..4a74a56daab 100644 --- a/src/mds/cephfs_features.h +++ b/src/mds/cephfs_features.h @@ -94,7 +94,8 @@ namespace ceph { CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY, \ CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY, \ CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY, \ - CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \ + CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \ + CLIENT_METRIC_TYPE_SUBVOLUME_METRICS, \ } #define CEPHFS_FEATURES_MDS_SUPPORTED CEPHFS_FEATURES_ALL diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc index 5be3506bc3d..4261416c641 100644 --- a/src/mds/mdstypes.cc +++ b/src/mds/mdstypes.cc @@ -1193,3 +1193,26 @@ void BlockDiff::print(ostream& out) const { out << "{rval: " << rval << ", scan_idx=" << scan_idx << ", blocks=" << blocks << "}"; } + +void SubvolumeMetric::dump(Formatter *f) const { + f->dump_string("subvolume_path", subvolume_path); + f->dump_unsigned("read_ops", read_ops); + f->dump_unsigned("write_ops", write_ops); + f->dump_unsigned("read_size", read_size); + f->dump_unsigned("write_size", write_size); + f->dump_unsigned("avg_read_latency", avg_read_latency); + f->dump_unsigned("avg_write_latency", avg_write_latency); + f->dump_unsigned("time_window_sec", time_stamp); +} + +std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m) { + os << "{subv_path=" << m.subvolume_path + << ", read_ops=" << m.read_ops + << ", write_ops=" << m.write_ops + << ", read_size=" << m.read_size + << ", write_size=" << m.write_size + << ", avg_read_lat=" << m.avg_read_latency + << ", avg_write_lat=" << m.avg_write_latency + << ", time_window_sec=" << m.time_stamp << "}"; + return os; +} diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h index 5cd4e37bba9..bf3ab5b6344 100644 --- a/src/mds/mdstypes.h +++ b/src/mds/mdstypes.h @@ -10,6 +10,7 @@ #include #include #include +#include #include "common/DecayCounter.h" #include "common/entity_name.h" @@ -985,4 +986,37 @@ struct BlockDiff { }; WRITE_CLASS_ENCODER(BlockDiff); +/** + * + * @brief Represents aggregated metric per subvolume + * This aggregation is a result of aggregating multiple + * AggregatedIOMetric instances from various clients + * (AggregatedIOMetric is created on the client by aggregating multiple SimpleIOMetric instances) + */ +struct SubvolumeMetric { + std::string subvolume_path; + uint64_t read_ops = 0; + uint64_t write_ops = 0; + uint64_t read_size = 0; + uint64_t write_size = 0; + uint64_t avg_read_latency = 0; + uint64_t avg_write_latency = 0; + uint64_t time_stamp = 0; + + DENC(SubvolumeMetric, v, p) { + DENC_START(1, 1, p); + denc(v.subvolume_path, p); + denc(v.read_ops, p); + denc(v.write_ops, p); + denc(v.read_size, p); + denc(v.write_size, p); + denc(v.avg_read_latency, p); + denc(v.avg_write_latency, p); + denc(v.time_stamp, p); + DENC_FINISH(p); + } + + void dump(Formatter *f) const; + friend std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m); +}; #endif diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h index b001032225e..bdb2040e860 100644 --- a/src/messages/MClientCaps.h +++ b/src/messages/MClientCaps.h @@ -22,7 +22,7 @@ class MClientCaps final : public SafeMessage { private: - static constexpr int HEAD_VERSION = 12; + static constexpr int HEAD_VERSION = 13; static constexpr int COMPAT_VERSION = 1; public: @@ -61,6 +61,7 @@ private: std::vector fscrypt_auth; std::vector fscrypt_file; + uint64_t subvolume_id = 0; int get_caps() const { return head.caps; } int get_wanted() const { return head.wanted; } @@ -282,6 +283,9 @@ public: decode(fscrypt_auth, p); decode(fscrypt_file, p); } + if (header.version >= 13) { + decode(subvolume_id, p); + } } void encode_payload(uint64_t features) override { using ceph::encode; @@ -352,6 +356,7 @@ public: encode(nsubdirs, payload); encode(fscrypt_auth, payload); encode(fscrypt_file, payload); + encode(subvolume_id, payload); } private: template diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h index 0b8a7731c00..d86e8d6c016 100644 --- a/src/messages/MClientReply.h +++ b/src/messages/MClientReply.h @@ -152,6 +152,7 @@ struct InodeStat { std::vector fscrypt_file; optmetadata_multiton optmetadata; + inodeno_t subvolume_id; public: InodeStat() {} @@ -166,7 +167,7 @@ struct InodeStat { void decode(ceph::buffer::list::const_iterator &p, const uint64_t features) { using ceph::decode; if (features == (uint64_t)-1) { - DECODE_START(8, p); + DECODE_START(9, p); decode(vino.ino, p); decode(vino.snapid, p); decode(rdev, p); @@ -232,6 +233,9 @@ struct InodeStat { if (struct_v >= 8) { decode(optmetadata, p); } + if (struct_v >= 9) { + decode(subvolume_id, p); + } DECODE_FINISH(p); } else { -- 2.39.5