]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
client,mds: add support for subvolume level metrics
authorIgor Golikov <igolikov@ibm.com>
Thu, 10 Jul 2025 10:17:36 +0000 (10:17 +0000)
committerVenky Shankar <vshankar@redhat.com>
Wed, 10 Sep 2025 16:42:45 +0000 (16:42 +0000)
Add support for client side metrics collection using SimpleIOMetric
struct and aggregation using AggregatedIOMetrics struct,
Client holds SimpleIOMetrics vector per each subvolume it recognized
(via caps/metadata messages), aggregates them into the
AggregatedIOMetric struct, and sends  periodically to the MDS, along
with regulat client metrics.
MDS holds map of subvolume_path -> vector<AggregatedIOMetrics> and sends
it periodically to rank0, for further aggregation and exposure.

Fixes: https://tracker.ceph.com/issues/68929, https://tracker.ceph.com/issues/68930
Signed-off-by: Igor Golikov <igolikov@ibm.com>
17 files changed:
src/client/Client.cc
src/client/Client.h
src/common/Clock.h
src/include/cephfs/metrics/Types.h
src/mds/CInode.cc
src/mds/CInode.h
src/mds/Locker.cc
src/mds/Locker.h
src/mds/MDSRank.cc
src/mds/MDSRank.h
src/mds/MetricsHandler.cc
src/mds/MetricsHandler.h
src/mds/cephfs_features.h
src/mds/mdstypes.cc
src/mds/mdstypes.h
src/messages/MClientCaps.h
src/messages/MClientReply.h

index 107419b7d8bd99da79579c5ac6e811c088796544..4d802fa5d60290dc75382bac8abd411d87ffb2c9 100644 (file)
@@ -247,6 +247,8 @@ int Client::CommandHook::call(
       m_client->_kick_stale_sessions();
     else if (command == "status")
       m_client->dump_status(f);
+    else if (command == "dump_subvolume_metrics")
+      m_client->dump_subvolume_metrics(f);
     else
       ceph_abort_msg("bad command registered");
   }
@@ -400,7 +402,8 @@ Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
     async_ino_releasor(m->cct),
     objecter_finisher(m->cct),
     m_command_hook(this),
-    fscid(0)
+    fscid(0),
+    subvolume_tracker{std::make_unique<SubvolumeMetricTracker>(cct, whoami)}
 {
   /* We only use the locale for normalization/case folding. That is unaffected
    * by the locale but required by the API.
@@ -629,6 +632,10 @@ void Client::dump_status(Formatter *f)
   }
 }
 
+void Client::dump_subvolume_metrics(Formatter* f) {
+  subvolume_tracker->dump(f);
+}
+
 void Client::_pre_init()
 {
   timer.init();
@@ -718,6 +725,13 @@ void Client::_finish_init()
     lderr(cct) << "error registering admin socket command: "
               << cpp_strerror(-ret) << dendl;
   }
+  ret = admin_socket->register_command("dump_subvolume_metrics_aggr",
+                                      &m_command_hook,
+                                      "dump aggregated subvolume metrics");
+    if (ret < 0) {
+      lderr(cct) << "error registering admin socket command: "
+              << cpp_strerror(-ret) << dendl;
+    }
 }
 
 void Client::shutdown() 
@@ -857,7 +871,6 @@ void Client::update_io_stat_write(utime_t latency) {
   logger->set(l_c_wr_sqsum, n_sqsum);
   logger->set(l_c_wr_ops, nr_write_request);
 }
-
 // ===================
 // metadata cache stuff
 
@@ -1743,6 +1756,10 @@ Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
 
     in = add_update_inode(&ist, request->sent_stamp, session,
                          request->perms);
+    if (ist.subvolume_id) {
+      ldout(cct, 20) << __func__ << " subv_metric adding " << in->ino << "-" << ist.subvolume_id << dendl;
+      subvolume_tracker->add_inode(in->ino, ist.subvolume_id);
+    }
   }
 
   Inode *diri = NULL;
@@ -2255,7 +2272,7 @@ int Client::make_request(MetaRequest *request,
     *pdirbl = reply->get_extra_bl();
 
   // -- log times --
-  utime_t lat = ceph_clock_now();
+  utime_t lat = mono_clock_now();
   lat -= request->sent_stamp;
   ldout(cct, 20) << "lat " << lat << dendl;
 
@@ -5532,6 +5549,12 @@ void Client::handle_caps(const MConstRef<MClientCaps>& m)
 
   got_mds_push(session.get());
 
+  // check whether the current inode is under subvolume for metrics collection
+  if (m->subvolume_id > 0) {
+    ldout(cct, 10) << __func__ << " adding " << m->get_ino() << " to subvolume tracker " << m->subvolume_id << dendl;
+    subvolume_tracker->add_inode(m->get_ino(), m->subvolume_id);
+  }
+
   bool do_cap_release = false;
   Inode *in;
   vinodeno_t vino(m->get_ino(), CEPH_NOSNAP);
@@ -7205,8 +7228,13 @@ void Client::flush_cap_releases()
   for (auto &p : mds_sessions) {
     auto session = p.second;
     if (session->release && mdsmap->is_clientreplay_or_active_or_stopping(
-          p.first)) {
+            p.first)) {
       nr_caps += session->release->caps.size();
+      for (const auto &cap: session->release->caps) {
+        ldout(cct, 10) << __func__ << " removing " << static_cast<inodeno_t>(cap.ino) <<
+        " from subvolume tracker" << dendl;
+        subvolume_tracker->remove_inode(static_cast<inodeno_t>(cap.ino));
+      }
       if (cct->_conf->client_inject_release_failure) {
         ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl;
       } else {
@@ -7442,6 +7470,18 @@ void Client::collect_and_send_global_metrics() {
     message.push_back(metric);
   }
 
+  // subvolume metrics
+  if (_collect_and_send_global_metrics ||
+        session->mds_metric_flags.test(CLIENT_METRIC_TYPE_SUBVOLUME_METRICS)) {
+    auto metrics = subvolume_tracker->aggregate(true);
+    if (!metrics.empty()) {
+      for (auto m: metrics)
+        ldout(cct, 20) << " sending subv_metric " << m << dendl;
+      metric = ClientMetricMessage(SubvolumeMetricsPayload(metrics));
+      message.push_back(metric);
+    }
+  }
+
   session->con->send_message2(make_message<MClientMetrics>(std::move(message)));
 }
 
@@ -10971,8 +11011,9 @@ void Client::C_Read_Finisher::finish_io(int r)
       f->pos = offset + r;
     }
     
-    lat = ceph_clock_now();
+    lat = mono_clock_now();
     lat -= start;
+    clnt->subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, lat, static_cast<uint32_t>(r)});
     ++clnt->nr_read_request;
     clnt->update_io_stat_read(lat);
   }
@@ -11059,7 +11100,7 @@ void Client::C_Read_Sync_NonBlocking::finish(int r)
 success:
 
   r = read;
-
+  clnt->subvolume_tracker->add_metric(in->ino, SimpleIOMetric(false, mono_clock_now() - start_time, r));
 error:
 
   onfinish->complete(r);
@@ -11081,7 +11122,7 @@ int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl,
   const auto& conf = cct->_conf;
   Inode *in = f->inode.get();
   utime_t lat;
-  utime_t start = ceph_clock_now(); 
+  utime_t start = mono_clock_now();
   CRF_iofinish *crf_iofinish = nullptr;
 
   if ((f->mode & CEPH_FILE_MODE_RD) == 0)
@@ -11152,7 +11193,7 @@ retry:
                          conf->client_oc &&
                          (have & (CEPH_CAP_FILE_CACHE |
                                   CEPH_CAP_FILE_LAZYIO)),
-                       have, movepos, start, f, in, f->pos, offset, size));
+                       have, movepos, f, in, f->pos, offset, size));
 
     crf_iofinish->CRF = crf.get();
   }
@@ -11266,9 +11307,9 @@ success:
     f->pos = start_pos + rc;
   }
   
-  lat = ceph_clock_now();
+  lat = mono_clock_now();
   lat -= start;
-
+  subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, lat, bl->length()});
   ++nr_read_request;
   update_io_stat_read(lat);
 
@@ -11284,7 +11325,7 @@ done:
 }
 
 Client::C_Readahead::C_Readahead(Client *c, Fh *f) :
-    client(c), f(f) {
+    client(c), f(f), start_time(mono_clock_now()) {
   f->get();
   f->readahead.inc_pending();
 }
@@ -11299,6 +11340,7 @@ void Client::C_Readahead::finish(int r) {
   client->put_cap_ref(f->inode.get(), CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE);
   if (r > 0) {
     client->update_read_io_size(r);
+    client->subvolume_tracker->add_metric(f->inode->ino, SimpleIOMetric(false, mono_clock_now()-start_time, r));
   }
 }
 
@@ -11391,6 +11433,7 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
     io_finish.reset(io_finish_cond);
   }
 
+  auto start_time = mono_clock_now();
   r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
                              off, len, bl, 0, io_finish.get());
 
@@ -11416,6 +11459,7 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
     client_lock.lock();
     put_cap_ref(in, CEPH_CAP_FILE_CACHE);
     update_read_io_size(bl->length());
+    subvolume_tracker->add_metric(in->ino, SimpleIOMetric{false, mono_clock_now() - start_time, bl->length()});
   } else {
     put_cap_ref(in, CEPH_CAP_FILE_CACHE);
   }
@@ -11625,9 +11669,10 @@ int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos,
 
   update_write_io_size(size);
   // time
-  lat = ceph_clock_now();
+  lat = mono_clock_now();
   lat -= start;
 
+  subvolume_tracker->add_metric(in->ino, SimpleIOMetric{true, lat, static_cast<uint32_t>(size)});
   ++nr_write_request;
   update_io_stat_write(lat);
 
@@ -11781,7 +11826,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl,
        (uint64_t)(offset+size) > in->size ) { //exceeds filesize 
       return -EFBIG;              
        }
-  //ldout(cct, 7) << "write fh " << fh << " size " << size << " offset " << offset << dendl;
+  ldout(cct, 7) << "_write fh " << f << " size " << size << " offset " << offset << dendl;
 
   if (objecter->osdmap_pool_full(in->layout.pool_id)) {
     return -ENOSPC;
@@ -11824,7 +11869,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl,
   ldout(cct, 10) << "cur file size is " << in->size << dendl;
 
   // time it.
-  utime_t start = ceph_clock_now();
+  utime_t start = mono_clock_now();
 
   if (in->inline_version == 0) {
     int r = _getattr(in, CEPH_STAT_CAP_INLINE_DATA, f->actor_perms, true);
@@ -11897,7 +11942,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl,
                         cct->_conf->client_oc &&
                           (have & (CEPH_CAP_FILE_BUFFER |
                                  CEPH_CAP_FILE_LAZYIO)),
-                        start, f, in, fpos, offset, size,
+                        f, in, fpos, offset, size,
                         do_fsync, syncdataonly));
 
     cwf_iofinish->CWF = cwf.get();
@@ -11912,6 +11957,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl,
     get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
 
     // async, caching, non-blocking.
+    ldout(cct, 10) << " _write_oc " << dendl;
     r = objectcacher->file_write(&in->oset, &in->layout,
                                 in->snaprealm->get_snap_context(),
                                 offset, size, bl, ceph::real_clock::now(),
@@ -11948,6 +11994,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl,
       }
 
       // allow caller to wait on onfinish...
+      ldout(cct, 10) << " _write_oc_1" << dendl;
       return 0;
     }
 
@@ -11989,6 +12036,8 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl,
       sleep(delay);
       client_lock.lock();
     }
+
+    ldout(cct, 10) << " _write_filer" << dendl;
     filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
                       offset, size, bl, ceph::real_clock::now(), 0,
                       in->truncate_size, in->truncate_seq,
@@ -12003,6 +12052,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl,
       cwf.release();
 
       // allow caller to wait on onfinish...
+      ldout(cct, 10) << " _write_filer_2" << dendl;
       return 0;
     }
 
@@ -12018,6 +12068,7 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, bufferlist bl,
 success:
 
   // do not get here if non-blocking caller (onfinish != nullptr)
+  ldout(cct, 10) << " _write_filer_succeess" << dendl;
   r = _write_success(f, start, fpos, offset, size, in);
 
   if (r >= 0 && do_fsync) {
@@ -12290,7 +12341,7 @@ void Client::C_nonblocking_fsync_state::advance()
 
   utime_t lat;
 
-  lat = ceph_clock_now();
+  lat = mono_clock_now();
   lat -= start;
   clnt->logger->tinc(l_c_fsync, lat);
 
@@ -12348,7 +12399,7 @@ int Client::_fsync(Inode *in, bool syncdataonly)
   ceph_tid_t flush_tid = 0;
   InodeRef tmp_ref;
   utime_t lat;
-  utime_t start = ceph_clock_now(); 
+  utime_t start = mono_clock_now();
 
   ldout(cct, 8) << "_fsync on " << *in << " " << (syncdataonly ? "(dataonly)":"(data+metadata)") << dendl;
   
@@ -12401,7 +12452,7 @@ int Client::_fsync(Inode *in, bool syncdataonly)
                  << cpp_strerror(-r) << dendl;
   }
    
-  lat = ceph_clock_now();
+  lat = mono_clock_now();
   lat -= start;
   logger->tinc(l_c_fsync, lat);
 
@@ -17693,6 +17744,112 @@ mds_rank_t Client::_get_random_up_mds() const
   return *p;
 }
 
+// --- subvolume metrics tracking --- //
+SubvolumeMetricTracker::SubvolumeMetricTracker(CephContext *ct, client_t id) : cct(ct), whoami(id) {}
+
+void SubvolumeMetricTracker::dump(Formatter *f) {
+  auto current_metrics = aggregate(false);
+  f->open_array_section("current_metrics");
+  for (auto &met : current_metrics) {
+    f->dump_object("", met);
+  }
+  f->close_section();
+
+  std::vector<AggregatedIOMetrics> temp;
+  {
+    std::shared_lock l(metrics_lock);
+    temp = last_subvolume_metrics;
+  }
+  f->open_array_section("last_metrics");
+  for (auto &val : temp) {
+    f->dump_object("", val);
+  }
+  f->close_section();
+}
+
+void SubvolumeMetricTracker::add_inode(inodeno_t inode, inodeno_t subvol) {
+  ldout(cct, 20) << __func__ << " subv_metric " << inode << "-" << subvol << dendl;
+  std::unique_lock l(metrics_lock);
+  if (likely(inode_subvolume.contains(inode))) {
+    ldout(cct, 10) << __func__ << " " << inode << "-" << subvol << " subv_metric inode exists" << dendl;
+    return;
+  }
+
+  auto [it, inserted] = subvolume_metrics.try_emplace(subvol);
+  if (inserted) {
+    ldout(cct, 10) << __func__ << " inserted " << inode << "-" << subvol << dendl;
+  }
+  inode_subvolume[inode] = subvol;
+  ldout(cct, 10) << __func__ << " add " << inode << "-" << subvol << dendl;
+}
+
+void SubvolumeMetricTracker::remove_inode(inodeno_t inode) {
+  ldout(cct, 20) << __func__ << " subv_metric " << inode << "-" << dendl;
+  std::unique_lock l(metrics_lock);
+  auto it = inode_subvolume.find(inode);
+  if (it == inode_subvolume.end()) {
+    return;
+  }
+
+  inodeno_t subvol = it->second;
+  inode_subvolume.erase(it);
+
+  auto se = subvolume_metrics.find(subvol);
+  if (se == subvolume_metrics.end()) {
+    ldout(cct, 20) << __func__ << " subv_metric not found " << inode << "-" << subvol << dendl;
+    return;
+  }
+  ldout(cct, 20) << __func__ << " subv_metric end " << inode << dendl;
+}
+
+void SubvolumeMetricTracker::add_metric(inodeno_t inode, SimpleIOMetric&& metric) {
+  ldout(cct, 10) << __func__ << " " << inode << dendl;
+  std::unique_lock l(metrics_lock);
+
+  auto it = inode_subvolume.find(inode);
+  if (it == inode_subvolume.end())
+    return;
+
+  auto& entry = subvolume_metrics[it->second];
+  entry.metrics.add(std::move(metric));
+}
+
+std::vector<AggregatedIOMetrics>
+SubvolumeMetricTracker::aggregate(bool clean) {
+  ldout(cct, 20) << __func__ << dendl;
+  std::vector<AggregatedIOMetrics> res;
+
+  if (clean) {
+    std::unordered_map<inodeno_t, SubvolumeEntry> tmp;
+    // move subvolume map to the local one, to release wlock asap
+    {
+      std::unique_lock l(metrics_lock);
+      subvolume_metrics.swap(tmp);
+    }
+    res.reserve(tmp.size());
+    for (auto &[subv_id, entry]: tmp) {
+      res.emplace_back(std::move(entry.metrics));
+      res.back().subvolume_id = subv_id;
+    }
+  } else {
+    // on rlock is needed, no need to copy the map to the local instance on the metrics map
+    std::shared_lock l(metrics_lock);
+    res.reserve(subvolume_metrics.size());
+    for (const auto &[subv_id, entry]: subvolume_metrics) {
+      res.emplace_back(entry.metrics);
+      res.back().subvolume_id = subv_id;
+    }
+  }
+
+  {
+    std::unique_lock l(metrics_lock);
+    last_subvolume_metrics = res; // since res holds only 1 aggregated metrics per subvolume, copying is not so bad
+  }
+
+  ldout(cct, 20) << __func__ << " res size " << res.size() << dendl;
+  return res; // return value optimization
+}
+// --- subvolume metrics tracking --- //
 
 StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc,
                                   boost::asio::io_context& ictx)
index 6b7b4c7dbfabae8b164fe8dd7a7f0b3a0e7df3c4..0847ea5c69e95ee6c966f0a37af8bcc686361326 100644 (file)
@@ -55,6 +55,7 @@
 #include <thread>
 #include <unordered_map>
 #include <unordered_set>
+#include <shared_mutex>
 
 using std::set;
 using std::map;
@@ -260,6 +261,40 @@ struct dir_result_t {
   int fd;                // fd attached using fdopendir (-1 if none)
 };
 
+/**
+ * @brief The subvolume metric tracker.
+ *
+ * Maps subvolume_id(which is in fact inode id) to the vector of SimpleIOMetric instances.
+ * Each simple metric records the single IO operation on the client.
+ * On clients metric message to the MDS, client will aggregate all simple metrics for each subvolume
+ * into the AggregatedIOMetric struct and clear the current metrics list.
+ * TODO: limit the cap for each subvolume? in the case client sends metrics to the MDS not so often?
+ */
+class SubvolumeMetricTracker {
+public:
+    struct SubvolumeEntry {
+        AggregatedIOMetrics metrics;
+
+        void dump(Formatter *f) const {
+          f->dump_object("", metrics);
+        }
+    };
+
+    SubvolumeMetricTracker(CephContext *ct, client_t id);
+    void dump(Formatter *f);
+    void add_inode(inodeno_t inode, inodeno_t subvol);
+    void remove_inode(inodeno_t inode);
+    void add_metric(inodeno_t inode, SimpleIOMetric&& metric);
+    std::vector<AggregatedIOMetrics> aggregate(bool clean);
+protected:
+    std::vector<AggregatedIOMetrics> last_subvolume_metrics;
+    std::unordered_map<inodeno_t, SubvolumeEntry> subvolume_metrics;
+    std::unordered_map<inodeno_t, inodeno_t> inode_subvolume;
+    CephContext *cct = nullptr;
+    client_t whoami;
+    std::shared_mutex metrics_lock;
+};
+
 class Client : public Dispatcher, public md_config_obs_t {
 public:
   friend class C_Block_Sync; // Calls block map and protected helpers
@@ -1180,6 +1215,7 @@ protected:
   void force_session_readonly(MetaSession *s);
 
   void dump_status(Formatter *f);  // debug
+  void dump_subvolume_metrics(Formatter* f);
 
   Dispatcher::dispatch_result_t ms_dispatch2(const MessageRef& m) override;
 
@@ -1362,11 +1398,11 @@ private:
 
     C_Read_Finisher(Client *clnt, Context *onfinish, Context *iofinish,
                     bool is_read_async, int have_caps, bool movepos,
-                    utime_t start, Fh *f, Inode *in, uint64_t fpos,
+                    Fh *f, Inode *in, uint64_t fpos,
                     int64_t offset, uint64_t size)
       : clnt(clnt), onfinish(onfinish), iofinish(iofinish),
         is_read_async(is_read_async), have_caps(have_caps), f(f), in(in),
-        start(start), fpos(fpos), offset(offset), size(size), movepos(movepos) {
+        start(mono_clock_now()), fpos(fpos), offset(offset), size(size), movepos(movepos) {
       iofinished = false;
     }
 
@@ -1415,7 +1451,7 @@ private:
                             uint64_t fpos, uint64_t off, uint64_t len,
                             bufferlist *bl, Filer *filer, int have_caps)
       : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), bl(bl),
-        filer(filer), have_caps(have_caps)
+        filer(filer), have_caps(have_caps), start_time(mono_clock_now())
     {
       left = len;
       wanted = len;
@@ -1439,6 +1475,7 @@ private:
     bufferlist tbl;
     Filer *filer;
     int have_caps;
+    utime_t start_time;
     int read;
     uint64_t pos;
     bool fini;
@@ -1457,7 +1494,7 @@ private:
   public:
     C_Read_Async_Finisher(Client *clnt, Context *onfinish, Fh *f, Inode *in,
                           uint64_t fpos, uint64_t off, uint64_t len)
-      : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len) {}
+      : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), start_time(mono_clock_now()) {}
 
   private:
     Client *clnt;
@@ -1466,6 +1503,7 @@ private:
     Inode *in;
     uint64_t off;
     uint64_t len;
+    utime_t start_time;
 
     void finish(int r) override;
   };
@@ -1492,11 +1530,11 @@ private:
     void finish_fsync(int r);
 
     C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline,
-                     bool is_file_write, utime_t start, Fh *f, Inode *in,
+                     bool is_file_write, Fh *f, Inode *in,
                      uint64_t fpos, int64_t offset, uint64_t size,
                      bool do_fsync, bool syncdataonly)
       : clnt(clnt), onfinish(onfinish),
-        is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos),
+        is_file_write(is_file_write), start(mono_clock_now()), f(f), in(in), fpos(fpos),
         offset(offset), size(size), syncdataonly(syncdataonly) {
       iofinished_r = 0;
       onuninlinefinished_r = 0;
@@ -1576,7 +1614,7 @@ private:
     C_nonblocking_fsync_state(Client *clnt, Inode *in, bool syncdataonly, Context *onfinish)
       : clnt(clnt), in(in), syncdataonly(syncdataonly), onfinish(onfinish) {
       flush_tid = 0;
-      start = ceph_clock_now();
+      start = mono_clock_now();
       progress = 0;
       flush_wait = false;
       flush_completed = false;
@@ -1621,6 +1659,7 @@ private:
 
     Client *client;
     Fh *f;
+    utime_t start_time = 0;
   };
 
   /*
@@ -1932,6 +1971,9 @@ private:
   // Cluster fsid
   fs_cluster_id_t fscid;
 
+  // subvolume metrics tracker
+  std::unique_ptr<SubvolumeMetricTracker> subvolume_tracker = nullptr;
+
   // file handles, etc.
   interval_set<int> free_fd_set;  // unused fds
   std::unordered_map<int, Fh*> fd_map;
index b47954ad1ce7d65d09c4c127a3ad4834c5912bd8..44d5452593814bf54969807ab0a4b79862577db7 100644 (file)
@@ -33,4 +33,18 @@ static inline utime_t ceph_clock_now()
   return n;
 }
 
+static inline utime_t mono_clock_now()
+{
+#if defined(__linux__)
+  struct timespec tp;
+  clock_gettime(CLOCK_MONOTONIC, &tp);
+  utime_t n(tp);
+#else
+  struct timeval tv;
+  gettimeofday(&tv, nullptr);
+  utime_t n(&tv);
+#endif
+  return n;
+}
+
 #endif
index 55f67d8924b8061492ca6d1bdfe065c4e31506b7..a12b4176b3f12a2d9373c95b6dfa4bb5fc311df1 100644 (file)
@@ -13,6 +13,7 @@
 #include "include/int_types.h"
 #include "include/stringify.h"
 #include "include/utime.h"
+#include "include/fs_types.h"
 
 namespace ceph { class Formatter; }
 
@@ -33,6 +34,7 @@ enum ClientMetricType {
   CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY,
   CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY,
   CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY,
+  CLIENT_METRIC_TYPE_SUBVOLUME_METRICS,
 };
 inline std::ostream &operator<<(std::ostream &os, const ClientMetricType &type) {
   switch(type) {
@@ -84,6 +86,9 @@ inline std::ostream &operator<<(std::ostream &os, const ClientMetricType &type)
   case ClientMetricType::CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY:
     os << "STDEV_METADATA_LATENCY";
     break;
+  case ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS:
+    os << "SUBVOLUME_METRICS";
+    break;
   default:
     os << "(UNKNOWN:" << static_cast<std::underlying_type<ClientMetricType>::type>(type) << ")";
     break;
@@ -552,17 +557,248 @@ struct UnknownPayload : public ClientMetricPayloadBase {
   }
 };
 
+/**
+ * @brief Struct to hold single IO metric
+ * To save the memory for clients with high number of subvolumes, the layout is as following:
+ * 64 bits of data:
+ *  MSB - read or write (0/1)
+ *  32 bits for latency, in microsecs
+ *  31 bits for payload size, bytes
+ */
+struct SimpleIOMetric {
+    uint64_t packed_data;
+
+    // is_write flag (1 bit) - MSB
+    static constexpr uint64_t OP_TYPE_BITS = 1;
+    static constexpr uint64_t OP_TYPE_SHIFT = 64 - OP_TYPE_BITS; // 63
+    static constexpr uint64_t OP_TYPE_MASK = 1ULL << OP_TYPE_SHIFT; // 0x8000000000000000ULL
+
+    // latency in microseconds (32 bits)
+    // directly after is_write bit
+    static constexpr uint64_t LATENCY_BITS = 32;
+    static constexpr uint64_t LATENCY_SHIFT = OP_TYPE_SHIFT - LATENCY_BITS; // 63 - 32 = 31
+    static constexpr uint64_t LATENCY_MASK = ((1ULL << LATENCY_BITS) - 1) << LATENCY_SHIFT;
+
+    // payload size (31 bits)
+    // Placed directly after latency, occupying the lowest 31 bits
+    static constexpr uint64_t PAYLOAD_SIZE_BITS = 31;
+    static constexpr uint64_t PAYLOAD_SIZE_SHIFT = 0; // Starts from the LSB (bit 0)
+    static constexpr uint64_t PAYLOAD_SIZE_MASK = (1ULL << PAYLOAD_SIZE_BITS) - 1;
+
+    static constexpr uint32_t MAX_LATENCY_US = (1ULL << LATENCY_BITS) - 1; // 2^32 - 1 microseconds
+    static constexpr uint32_t MAX_PAYLOAD_SIZE = (1ULL << PAYLOAD_SIZE_BITS) - 1; // 2^31 - 1 bytes
+
+    SimpleIOMetric(bool is_write, utime_t latency, uint32_t payload_size) : packed_data(0) {
+      if (is_write) {
+        packed_data |= OP_TYPE_MASK;
+      }
+
+      uint64_t current_latency_usec = latency.usec();
+
+      // if less than 1 microsecond - set to 1 microsecond
+      if (current_latency_usec == 0) {
+        current_latency_usec = 1;
+      }
+
+      // handle overflow
+      if (current_latency_usec > MAX_LATENCY_US) {
+        current_latency_usec = MAX_LATENCY_US; // Truncate if too large
+      }
+      packed_data |= (current_latency_usec << LATENCY_SHIFT) & LATENCY_MASK;
+
+
+      // payload_size (31 bits)
+      if (payload_size > MAX_PAYLOAD_SIZE) {
+        payload_size = MAX_PAYLOAD_SIZE; // Truncate if too large
+      }
+      packed_data |= (static_cast<uint64_t>(payload_size) << PAYLOAD_SIZE_SHIFT) & PAYLOAD_SIZE_MASK;
+    }
+
+    SimpleIOMetric() : packed_data(0) {}
+
+    bool is_write() const {
+      return (packed_data & OP_TYPE_MASK) != 0;
+    }
+
+    uint32_t get_latency_us() const {
+      return static_cast<uint32_t>((packed_data & LATENCY_MASK) >> LATENCY_SHIFT);
+    }
+
+    uint32_t get_payload_size() const {
+      return static_cast<uint32_t>((packed_data & PAYLOAD_SIZE_MASK) >> PAYLOAD_SIZE_SHIFT);
+    }
+
+    // --- Dump method ---
+    void dump(Formatter *f) const {
+      f->dump_string("op", is_write() ? "w" : "r");
+      f->dump_unsigned("lat_us", get_latency_us());
+      f->dump_unsigned("size", get_payload_size());
+    }
+};
+
+/**
+ * brief holds result of the SimpleIOMetrics aggregation, for each subvolume, on the client
+ * the aggregation occurs just before sending metrics to the MDS
+ */
+struct AggregatedIOMetrics {
+    inodeno_t subvolume_id = 0;
+    uint32_t read_count = 0;
+    uint32_t write_count = 0;
+    uint64_t read_bytes = 0;
+    uint64_t write_bytes = 0;
+    uint64_t read_latency_us = 0;
+    uint64_t write_latency_us = 0;
+    uint64_t time_stamp = 0; // set on MDS
+
+    void add(const SimpleIOMetric& m) {
+      auto lat = m.get_latency_us();
+      if (m.is_write()) {
+        ++write_count;
+        write_bytes += m.get_payload_size();
+        write_latency_us += lat;
+      } else {
+        ++read_count;
+        read_bytes += m.get_payload_size();
+        read_latency_us += lat;
+      }
+    }
+
+    uint64_t avg_read_throughput_Bps() const {
+      return read_latency_us > 0 ? (read_bytes * 1'000'000ULL) / read_latency_us : 0;
+    }
+
+    uint64_t avg_write_throughput_Bps() const {
+      return write_latency_us > 0 ? (write_bytes * 1'000'000ULL) / write_latency_us : 0;
+    }
+
+    uint64_t avg_read_latency_us() const {
+      return read_count ? read_latency_us / read_count : 0;
+    }
+
+    uint64_t avg_write_latency_us() const {
+      return write_count ? write_latency_us / write_count : 0;
+    }
+
+    double read_iops() const {
+      return read_latency_us > 0 ? static_cast<double>(read_count) * 1e6 / read_latency_us : 0.0;
+    }
+
+    double write_iops() const {
+      return write_latency_us > 0 ? static_cast<double>(write_count) * 1e6 / write_latency_us : 0.0;
+    }
+
+    void dump(Formatter *f) const {
+      f->dump_unsigned("subvolume_id", subvolume_id);
+      f->dump_unsigned("read_count", read_count);
+      f->dump_unsigned("read_bytes", read_bytes);
+      f->dump_unsigned("avg_read_latency_us", avg_read_latency_us());
+      f->dump_unsigned("read_iops", read_iops());
+      f->dump_float("avg_read_tp_Bps", avg_read_throughput_Bps());
+      f->dump_unsigned("write_count", write_count);
+      f->dump_unsigned("write_bytes", write_bytes);
+      f->dump_unsigned("avg_write_latency_us", avg_write_latency_us());
+      f->dump_float("write_iops", write_iops());
+      f->dump_float("avg_write_tp_Bps", avg_write_throughput_Bps());
+      f->dump_unsigned("time_stamp", time_stamp);
+    }
+
+    friend std::ostream& operator<<(std::ostream& os, const AggregatedIOMetrics& m) {
+      os << "{subvolume_id=\""    << m.subvolume_id
+         << "\", read_count="      << m.read_count
+         << ", write_count="       << m.write_count
+         << ", read_bytes="        << m.read_bytes
+         << ", write_bytes="       << m.write_bytes
+         << ", read_latency_ns="   << m.read_latency_us
+         << ", write_latency_ns="  << m.write_latency_us
+         << ", time_stamp_ms="  << m.time_stamp
+         << "}";
+      return os;
+    }
+
+    void encode(bufferlist& bl) const {
+      using ceph::encode;
+      ENCODE_START(1, 1, bl);
+      encode(subvolume_id, bl);
+      encode(read_count, bl);
+      encode(write_count, bl);
+      encode(read_bytes, bl);
+      encode(write_bytes, bl);
+      encode(read_latency_us, bl);
+      encode(write_latency_us, bl);
+      encode(time_stamp, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::const_iterator& p) {
+      using ceph::decode;
+      DECODE_START(1, p);
+      decode(subvolume_id, p);
+      decode(read_count, p);
+      decode(write_count, p);
+      decode(read_bytes, p);
+      decode(write_bytes, p);
+      decode(read_latency_us, p);
+      decode(write_latency_us, p);
+      decode(time_stamp, p);
+      DECODE_FINISH(p);
+    }
+};
+
+struct SubvolumeMetricsPayload : public ClientMetricPayloadBase {
+  std::vector<AggregatedIOMetrics> subvolume_metrics;
+
+  SubvolumeMetricsPayload() : ClientMetricPayloadBase(ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS) { }
+  SubvolumeMetricsPayload(std::vector<AggregatedIOMetrics> metrics)
+  : ClientMetricPayloadBase(ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS), subvolume_metrics(std::move(metrics)) {}
+
+  void encode(bufferlist &bl) const {
+    using ceph::encode;
+    ENCODE_START(1, 1, bl);
+    encode(subvolume_metrics.size(), bl);
+    for(auto const& m : subvolume_metrics) {
+      m.encode(bl);
+    }
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator &iter) {
+    using ceph::decode;
+    DECODE_START(1, iter) ;
+    size_t size = 0;
+    decode(size, iter);
+    subvolume_metrics.clear();
+    subvolume_metrics.reserve(size);
+    for (size_t i = 0; i < size; ++i) {
+      subvolume_metrics.emplace_back();
+      subvolume_metrics.back().decode(iter);
+    }
+    DECODE_FINISH(iter);
+  }
+
+  void dump(Formatter *f) const {
+    f->open_array_section("metrics");
+    for(auto const& m : subvolume_metrics)
+      m.dump(f);
+    f->close_section();
+  }
+
+  void print(std::ostream *out) const {
+    *out << "metrics_count: " << subvolume_metrics.size();
+  }
+};
+
 typedef std::variant<CapInfoPayload,
-                    ReadLatencyPayload,
-                    WriteLatencyPayload,
-                    MetadataLatencyPayload,
-                    DentryLeasePayload,
-                    OpenedFilesPayload,
-                    PinnedIcapsPayload,
-                    OpenedInodesPayload,
-                    ReadIoSizesPayload,
-                    WriteIoSizesPayload,
-                    UnknownPayload> ClientMetricPayload;
+        ReadLatencyPayload,
+        WriteLatencyPayload,
+        MetadataLatencyPayload,
+        DentryLeasePayload,
+        OpenedFilesPayload,
+        PinnedIcapsPayload,
+        OpenedInodesPayload,
+        ReadIoSizesPayload,
+        WriteIoSizesPayload,
+        SubvolumeMetricsPayload,
+        UnknownPayload> ClientMetricPayload;
 
 // metric update message sent by clients
 struct ClientMetricMessage {
@@ -676,6 +912,9 @@ public:
     case ClientMetricType::CLIENT_METRIC_TYPE_WRITE_IO_SIZES:
       payload = WriteIoSizesPayload();
       break;
+    case ClientMetricType::CLIENT_METRIC_TYPE_SUBVOLUME_METRICS:
+      payload = SubvolumeMetricsPayload();
+      break;
     default:
       payload = UnknownPayload(static_cast<ClientMetricType>(metric_type));
       break;
index 244f348b25346209b5166dd0b07a285c393f1e50..597de4c0b55e3290e837f74e201f99020efa72ca 100644 (file)
@@ -4228,7 +4228,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
    * note: encoding matches MClientReply::InodeStat
    */
   if (session->info.has_feature(CEPHFS_FEATURE_REPLY_ENCODING)) {
-    ENCODE_START(8, 1, bl);
+    ENCODE_START(9, 1, bl);
     encode(std::tuple{
       oi->ino,
       snapid,
@@ -4281,6 +4281,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session,
     encode(file_i->fscrypt_auth, bl);
     encode(file_i->fscrypt_file, bl);
     encode_nohead(optmdbl, bl);
+    encode(get_subvolume_id(), bl);
     // encode inodestat
     ENCODE_FINISH(bl);
   }
@@ -5508,6 +5509,11 @@ int64_t CInode::get_backtrace_pool() const
   }
 }
 
+inodeno_t CInode::get_subvolume_id() const {
+  auto snapr = find_snaprealm();
+  return snapr ? snapr->get_subvolume_ino() : inodeno_t(0);
+}
+
 void CInode::queue_export_pin(mds_rank_t export_pin)
 {
   if (state_test(CInode::STATE_QUEUEDEXPORTPIN))
index f87fd408104586884ebb7414fd2d815e879c2626..c624f34ad5880b2f27a27ebee22f08506e711589 100644 (file)
@@ -1151,7 +1151,7 @@ class CInode : public MDSCacheObject, public InodeStoreBase, public Counter<CIno
    * @returns a pool ID >=0
    */
   int64_t get_backtrace_pool() const;
-
+  inodeno_t get_subvolume_id() const;
 protected:
   ceph_lock_state_t *get_fcntl_lock_state() {
     if (!fcntl_locks)
index 221da02f75122eb0b8ad82d067d594a1e2372d0d..dd2d806eb23a5ac841333aea4a4e7546b22090fe 100644 (file)
@@ -2491,6 +2491,13 @@ Capability* Locker::issue_new_caps(CInode *in,
   return cap;
 }
 
+void Locker::maybe_set_subvolume_id(const CInode* inode, ref_t<MClientCaps>& ack) {
+ if (ack && inode) {
+   ack->subvolume_id = inode->get_subvolume_id();
+   dout(10) << "setting subvolume id " << ack->subvolume_id << " for inode " << inode->ino().val << dendl;
+ }
+}
+
 void Locker::issue_caps_set(set<CInode*>& inset)
 {
   for (set<CInode*>::iterator p = inset.begin(); p != inset.end(); ++p)
@@ -3398,6 +3405,7 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
       if (mds->logger) mds->logger->inc(l_mdss_handle_client_caps_dirty);
   }
 
+  CInode *head_in = mdcache->get_inode(m->get_ino());
   if (m->get_client_tid() > 0 && session &&
       session->have_completed_flush(m->get_client_tid())) {
     dout(7) << "handle_client_caps already flushed tid " << m->get_client_tid()
@@ -3406,9 +3414,11 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
     if (op == CEPH_CAP_OP_FLUSHSNAP) {
       if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flushsnap_ack);
       ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSHSNAP_ACK, m->get_ino(), 0, 0, 0, 0, 0, dirty, 0, 0, mds->get_osd_epoch_barrier());
+      maybe_set_subvolume_id(head_in, ack);
     } else {
       if (mds->logger) mds->logger->inc(l_mdss_ceph_cap_op_flush_ack);
       ack = make_message<MClientCaps>(CEPH_CAP_OP_FLUSH_ACK, m->get_ino(), 0, m->get_cap_id(), m->get_seq(), m->get_caps(), 0, dirty, 0, 0, mds->get_osd_epoch_barrier());
+      maybe_set_subvolume_id(head_in, ack);
     }
     ack->set_snap_follows(follows);
     ack->set_client_tid(m->get_client_tid());
@@ -3445,7 +3455,6 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
     }
   }
 
-  CInode *head_in = mdcache->get_inode(m->get_ino());
   if (!head_in) {
     if (mds->is_clientreplay()) {
       dout(7) << "handle_client_caps on unknown ino " << m->get_ino()
@@ -3534,6 +3543,7 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
       ack->set_snap_follows(follows);
       ack->set_client_tid(m->get_client_tid());
       ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
+      maybe_set_subvolume_id(head_in, ack);
     }
 
     if (in == head_in ||
@@ -3622,6 +3632,7 @@ void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
           m->get_caps(), 0, dirty, 0, cap->get_last_issue(), mds->get_osd_epoch_barrier());
       ack->set_client_tid(m->get_client_tid());
       ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
+      maybe_set_subvolume_id(head_in, ack);
     }
 
     // filter wanted based on what we could ever give out (given auth/replica status)
index d79bdbb8d4b7ce525d94dcda34c99d2841867b0e..e3544dd8421b485c4afaf2b9b717fc300819b93c 100644 (file)
@@ -262,6 +262,7 @@ protected:
   void file_update_finish(CInode *in, MutationRef& mut, unsigned flags,
                          client_t client, const ref_t<MClientCaps> &ack);
 
+  void maybe_set_subvolume_id(const CInode* head_in, ref_t<MClientCaps>& ack);
   xlist<ScatterLock*> updated_scatterlocks;
 
   // Maintain a global list to quickly find if any caps are late revoking
index 32e9a8cdfc40d7aa1432d19c9cc97a1201c5d34c..e397c2ec6e931d74be9a9b90b604f410181204d4 100644 (file)
@@ -4057,6 +4057,15 @@ epoch_t MDSRank::get_osd_epoch() const
   return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
 }
 
+std::string MDSRank::get_path(inodeno_t ino) {
+  std::lock_guard locker(mds_lock);
+  CInode* inode = mdcache->get_inode(ino);
+  if (!inode) return {};
+  std::string res;
+  inode->make_path_string(res);
+  return res;
+}
+
 std::vector<std::string> MDSRankDispatcher::get_tracked_keys()
     const noexcept
 {
index 5a46f765dc92b65fc60120322020dc2087d4e519..402373e80e1cd42f1da6161bf63977aa9d3459d6 100644 (file)
@@ -394,6 +394,8 @@ class MDSRank {
       return inject_journal_corrupt_dentry_first;
     }
 
+    std::string get_path(inodeno_t ino);
+
     // Reference to global MDS::mds_lock, so that users of MDSRank don't
     // carry around references to the outer MDS, and we can substitute
     // a separate lock here in future potentially.
index ae16e8962b83eb460b3fd0b03d9f218510610758..235a5f6cc3b2d4db7d14a753ab5b36395a01993b 100644 (file)
@@ -3,9 +3,10 @@
 
 #include "MetricsHandler.h"
 
+#include <variant>
+
 #include "common/debug.h"
 #include "common/errno.h"
-#include "include/cephfs/metrics/Types.h"
 
 #include "messages/MClientMetrics.h"
 #include "messages/MMDSMetrics.h"
@@ -322,6 +323,56 @@ void MetricsHandler::handle_payload(Session *session, const WriteIoSizesPayload
   metrics.write_io_sizes_metric.updated = true;
 }
 
+void MetricsHandler::handle_payload(Session* session,  const SubvolumeMetricsPayload& payload, std::unique_lock<ceph::mutex>& lk) {
+  dout(20) << ": type=" << payload.get_type() << ", session=" << session
+      << " , subv_metrics count=" << payload.subvolume_metrics.size() << dendl;
+
+  ceph_assert(lk.owns_lock()); // caller must hold the lock
+
+  std::vector<std::string> resolved_paths;
+  resolved_paths.reserve(payload.subvolume_metrics.size());
+
+  // RAII guard: unlock on construction, re-lock on destruction (even on exceptions)
+  struct UnlockGuard {
+    std::unique_lock<ceph::mutex> &lk;
+    explicit UnlockGuard(std::unique_lock<ceph::mutex>& l) : lk(l) { lk.unlock(); }
+    ~UnlockGuard() noexcept {
+      if (!lk.owns_lock()) {
+        try { lk.lock(); }
+        catch (...) {
+          dout(0) << "failed to re-lock in UnlockGuard dtor" << dendl;
+        }
+      }
+    }
+  } unlock_guard{lk};
+
+  // unlocked: resolve paths, no contention with mds lock
+  for (const auto& metric : payload.subvolume_metrics) {
+    std::string path = mds->get_path(metric.subvolume_id);
+    if (path.empty()) {
+      dout(10) << " path not found for " << metric.subvolume_id << dendl;
+    }
+    resolved_paths.emplace_back(std::move(path));
+  }
+
+  // locked again (via UnlockGuard dtor): update metrics map
+  const auto now_ms = static_cast<int64_t>(
+  std::chrono::duration_cast<std::chrono::milliseconds>(
+std::chrono::steady_clock::now().time_since_epoch()).count());
+
+  // Keep index pairing but avoid double map lookup
+  for (size_t i = 0; i < resolved_paths.size(); ++i) {
+    const auto& path = resolved_paths[i];
+    if (path.empty()) continue;
+
+    auto& vec = subvolume_metrics_map[path];
+
+    dout(20) << " accumulating subv_metric " << payload.subvolume_metrics[i] << dendl;
+    vec.emplace_back(std::move(payload.subvolume_metrics[i]));
+    vec.back().time_stamp = now_ms;
+  }
+}
+
 void MetricsHandler::handle_payload(Session *session, const UnknownPayload &payload) {
   dout(5) << ": type=Unknown, session=" << session << ", ignoring unknown payload" << dendl;
 }
@@ -332,7 +383,7 @@ void MetricsHandler::handle_client_metrics(const cref_t<MClientMetrics> &m) {
     return;
   }
 
-  std::scoped_lock locker(lock);
+  std::unique_lock locker(lock);
 
   Session *session = mds->get_session(m);
   dout(20) << ": session=" << session << dendl;
@@ -343,7 +394,14 @@ void MetricsHandler::handle_client_metrics(const cref_t<MClientMetrics> &m) {
   }
 
   for (auto &metric : m->updates) {
-    std::visit(HandlePayloadVisitor(this, session), metric.payload);
+    // Special handling for SubvolumeMetricsPayload to avoid lock contention
+    if (auto* subv_payload = std::get_if<SubvolumeMetricsPayload>(&metric.payload)) {
+      // this handles the subvolume metrics payload without acquiring the mds lock
+      // should not call the visitor pattern here
+      handle_payload(session, *subv_payload, locker);
+    } else {
+      std::visit(HandlePayloadVisitor(this, session), metric.payload);
+    }
   }
 }
 
@@ -410,14 +468,65 @@ void MetricsHandler::update_rank0() {
     }
   }
 
+  // subvolume metrics, reserve 100 entries per subvolume ? good enough?
+  metrics_message.subvolume_metrics.reserve(subvolume_metrics_map.size()* 100);
+  for (auto &[path, aggregated_metrics] : subvolume_metrics_map) {
+    metrics_message.subvolume_metrics.emplace_back();
+    aggregate_subvolume_metrics(path, aggregated_metrics, metrics_message.subvolume_metrics.back());
+  }
+  // if we need to show local MDS metrics, we need to save a last copy...
+  subvolume_metrics_map.clear();
+
   // only start incrementing when its kicked via set_next_seq()
   if (next_seq != 0) {
     ++last_updated_seq;
   }
 
-  dout(20) << ": sending metric updates for " << update_client_metrics_map.size()
-           << " clients to rank 0 (address: " << *addr_rank0 << ") with sequence number "
-           << next_seq << ", last updated sequence number " << last_updated_seq << dendl;
+  dout(20) << ": sending " << metrics_message.subvolume_metrics.size() << " subv_metrics to aggregator"
+          << dendl;
 
   mds->send_message_mds(make_message<MMDSMetrics>(std::move(metrics_message)), *addr_rank0);
 }
+
+void MetricsHandler::aggregate_subvolume_metrics(const std::string& subvolume_path,
+                                 const std::vector<AggregatedIOMetrics>& metrics_list, SubvolumeMetric &res) {
+  dout(20) << ": aggregating " << metrics_list.size() << " subv_metrics" << dendl;
+  res.subvolume_path = subvolume_path;
+
+  uint64_t weighted_read_latency_sum = 0;
+  uint64_t weighted_write_latency_sum = 0;
+
+  res.read_ops = 0;
+  res.write_ops = 0;
+  res.read_size = 0;
+  res.write_size = 0;
+  res.avg_read_latency = 0;
+  res.avg_write_latency = 0;
+  res.time_stamp = 0;
+
+  for (const auto& m : metrics_list) {
+    res.read_ops += m.read_count;
+    res.write_ops += m.write_count;
+    res.read_size += m.read_bytes;
+    res.write_size += m.write_bytes;
+    // we want to have more metrics in the sliding window (on the aggregator),
+    // so we set the latest timestamp of all received metrics
+    res.time_stamp = std::max(res.time_stamp, m.time_stamp);
+
+    if (m.read_count > 0) {
+      weighted_read_latency_sum += m.read_latency_us * m.read_count;
+    }
+
+    if (m.write_count > 0) {
+      weighted_write_latency_sum += m.write_latency_us * m.write_count;
+    }
+  }
+
+  // normalize latencies
+  res.avg_read_latency = (res.read_ops > 0)
+                         ? (weighted_read_latency_sum / res.read_ops)
+                         : 0;
+  res.avg_write_latency = (res.write_ops > 0)
+                          ? (weighted_write_latency_sum / res.write_ops)
+                          : 0;
+}
\ No newline at end of file
index 0d03fe8b209874dca55728dfa233650fbc0a2bd8..3dc532bf4e2853e3d02250909f6e888d37300d4e 100644 (file)
@@ -5,6 +5,8 @@
 #define CEPH_MDS_METRICS_HANDLER_H
 
 #include <map>
+#include <mutex>
+#include <unordered_map>
 #include <thread>
 #include <utility>
 
@@ -12,6 +14,7 @@
 #include "common/ceph_mutex.h"
 
 #include "MDSPerfMetricTypes.h"
+#include "include/cephfs/metrics/Types.h"
 
 #include <boost/optional.hpp>
 #include <boost/variant/static_visitor.hpp>
@@ -26,7 +29,9 @@ struct PinnedIcapsPayload;
 struct OpenedInodesPayload;
 struct ReadIoSizesPayload;
 struct WriteIoSizesPayload;
+struct SubvolumeMetricsPayload;
 struct UnknownPayload;
+struct AggregatedIOMetrics;
 class MClientMetrics;
 class MDSMap;
 class MDSRank;
@@ -71,8 +76,17 @@ private:
     inline void operator()(const ClientMetricPayload &payload) const {
       metrics_handler->handle_payload(session, payload);
     }
+    
+    // Specialization for SubvolumeMetricsPayload - should not be called
+    // as it's handled specially in handle_client_metrics
+    // just for the compiler to be happy with visitor pattern
+    inline void operator()(const SubvolumeMetricsPayload &) const {
+      ceph_abort_msg("SubvolumeMetricsPayload should be handled specially");
+    }
   };
 
+  std::unique_ptr<PerfCounters> create_subv_perf_counter(const std::string& subv_name);
+
   MDSRank *mds;
   // drop this lock when calling ->send_message_mds() else mds might
   // deadlock
@@ -89,8 +103,10 @@ private:
 
   std::thread updater;
   std::map<entity_inst_t, std::pair<version_t, Metrics>> client_metrics_map;
-
-  // address of rank 0 mds, so that the message can be sent without
+  // maps subvolume path -> aggregated metrics from all clients reporting to this MDS instance
+  std::unordered_map<std::string, std::vector<AggregatedIOMetrics>> subvolume_metrics_map;
+  uint64_t subv_metrics_tracker_window_time_sec = 300;
+  // address of rank 0 mds, so that the message can be sent withoutå
   // acquiring mds_lock. misdirected messages to rank 0 are taken
   // care of by rank 0.
   boost::optional<entity_addrvec_t> addr_rank0;
@@ -107,6 +123,7 @@ private:
   void handle_payload(Session *session, const OpenedInodesPayload &payload);
   void handle_payload(Session *session, const ReadIoSizesPayload &payload);
   void handle_payload(Session *session, const WriteIoSizesPayload &payload);
+  void handle_payload(Session *session, const SubvolumeMetricsPayload &payload, std::unique_lock<ceph::mutex> &lock_guard);
   void handle_payload(Session *session, const UnknownPayload &payload);
 
   void set_next_seq(version_t seq);
@@ -116,6 +133,9 @@ private:
   void handle_mds_ping(const cref_t<MMDSPing> &m);
 
   void update_rank0();
+
+  void aggregate_subvolume_metrics(const std::string& subvolume_path,
+                                   const std::vector<AggregatedIOMetrics>& metrics_list, SubvolumeMetric &res);
 };
 
 #endif // CEPH_MDS_METRICS_HANDLER_H
index 391054d8701c12ff301e98a4dcc500c3cbf92bfa..4a74a56daab1f53207a0669d6a252916daee7901 100644 (file)
@@ -94,7 +94,8 @@ namespace ceph {
     CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY,      \
     CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY,    \
     CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY,   \
-    CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \
+    CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \
+    CLIENT_METRIC_TYPE_SUBVOLUME_METRICS, \
 }
 
 #define CEPHFS_FEATURES_MDS_SUPPORTED CEPHFS_FEATURES_ALL
index 21095afcc3d5436844ed97750f9aafe779c8724c..4104fd2788894a7025b078b3ac19ab4b6d840d97 100644 (file)
@@ -1234,3 +1234,26 @@ void BlockDiff::print(ostream& out) const
 {
   out << "{rval: " << rval << ", scan_idx=" << scan_idx << ", blocks=" << blocks << "}";
 }
+
+void SubvolumeMetric::dump(Formatter *f) const {
+  f->dump_string("subvolume_path", subvolume_path);
+  f->dump_unsigned("read_ops", read_ops);
+  f->dump_unsigned("write_ops", write_ops);
+  f->dump_unsigned("read_size", read_size);
+  f->dump_unsigned("write_size", write_size);
+  f->dump_unsigned("avg_read_latency", avg_read_latency);
+  f->dump_unsigned("avg_write_latency", avg_write_latency);
+  f->dump_unsigned("time_window_sec", time_stamp);
+}
+
+std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m) {
+  os << "{subv_path=" << m.subvolume_path
+     << ", read_ops=" << m.read_ops
+     << ", write_ops=" << m.write_ops
+     << ", read_size=" << m.read_size
+     << ", write_size=" << m.write_size
+     << ", avg_read_lat=" << m.avg_read_latency
+     << ", avg_write_lat=" << m.avg_write_latency
+     << ", time_window_sec=" << m.time_stamp << "}";
+  return os;
+}
index 056db3d7fea859c28b08db502888ed776c680b7f..b57636123cbefde1ace06d24499486af50b22f02 100644 (file)
@@ -10,6 +10,7 @@
 #include <map>
 #include <string>
 #include <string_view>
+#include <shared_mutex>
 
 #include "common/DecayCounter.h"
 #include "common/entity_name.h"
@@ -988,4 +989,37 @@ struct BlockDiff {
 };
 WRITE_CLASS_ENCODER(BlockDiff);
 
+/**
+ *
+ * @brief Represents aggregated metric per subvolume
+ * This aggregation is a result of aggregating multiple
+ * AggregatedIOMetric instances from various clients
+ * (AggregatedIOMetric is created on the client by aggregating multiple SimpleIOMetric instances)
+ */
+struct SubvolumeMetric {
+    std::string subvolume_path;
+    uint64_t read_ops = 0;
+    uint64_t write_ops = 0;
+    uint64_t read_size = 0;
+    uint64_t write_size = 0;
+    uint64_t avg_read_latency = 0;
+    uint64_t avg_write_latency = 0;
+    uint64_t time_stamp = 0;
+
+    DENC(SubvolumeMetric, v, p) {
+      DENC_START(1, 1, p);
+      denc(v.subvolume_path, p);
+      denc(v.read_ops, p);
+      denc(v.write_ops, p);
+      denc(v.read_size, p);
+      denc(v.write_size, p);
+      denc(v.avg_read_latency, p);
+      denc(v.avg_write_latency, p);
+      denc(v.time_stamp, p);
+      DENC_FINISH(p);
+    }
+
+    void dump(Formatter *f) const;
+    friend std::ostream& operator<<(std::ostream& os, const SubvolumeMetric &m);
+};
 #endif
index b001032225e2434f56ca1ae46ff6502c17fdf174..bdb2040e8609e4199594b446074c35bdf26db4e5 100644 (file)
@@ -22,7 +22,7 @@
 class MClientCaps final : public SafeMessage {
 private:
 
-  static constexpr int HEAD_VERSION = 12;
+  static constexpr int HEAD_VERSION = 13;
   static constexpr int COMPAT_VERSION = 1;
 
  public:
@@ -61,6 +61,7 @@ private:
 
   std::vector<uint8_t> fscrypt_auth;
   std::vector<uint8_t> fscrypt_file;
+  uint64_t subvolume_id = 0;
 
   int      get_caps() const { return head.caps; }
   int      get_wanted() const { return head.wanted; }
@@ -282,6 +283,9 @@ public:
       decode(fscrypt_auth, p);
       decode(fscrypt_file, p);
     }
+    if (header.version >= 13) {
+          decode(subvolume_id, p);
+    }
   }
   void encode_payload(uint64_t features) override {
     using ceph::encode;
@@ -352,6 +356,7 @@ public:
     encode(nsubdirs, payload);
     encode(fscrypt_auth, payload);
     encode(fscrypt_file, payload);
+    encode(subvolume_id, payload);
   }
 private:
   template<class T, typename... Args>
index 19af4377bbdcc64ff66da9fae4e267ada931b201..7a8f07232a73a481d971c5d2a203f5b6e693a8be 100644 (file)
@@ -152,6 +152,7 @@ struct InodeStat {
   std::vector<uint8_t> fscrypt_file;
 
   optmetadata_multiton<optmetadata_singleton_client_t,std::allocator> optmetadata;
+  inodeno_t subvolume_id;
 
  public:
   InodeStat() {}
@@ -166,7 +167,7 @@ struct InodeStat {
   void decode(ceph::buffer::list::const_iterator &p, const uint64_t features) {
     using ceph::decode;
     if (features == (uint64_t)-1) {
-      DECODE_START(8, p);
+      DECODE_START(9, p);
       decode(vino.ino, p);
       decode(vino.snapid, p);
       decode(rdev, p);
@@ -232,6 +233,9 @@ struct InodeStat {
       if (struct_v >= 8) {
         decode(optmetadata, p);
       }
+       if (struct_v >= 9) {
+         decode(subvolume_id, p);
+       }
       DECODE_FINISH(p);
     }
     else {