]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add s3select usage to log usage
authorSeena Fallah <seenafallah@gmail.com>
Sat, 18 Nov 2023 19:14:56 +0000 (20:14 +0100)
committerKonstantin Shalygin <k0ste@k0ste.ru>
Sat, 10 Aug 2024 09:15:23 +0000 (16:15 +0700)
Expose bytes processed and bytes returned to log usage.

Fixes: https://tracker.ceph.com/issues/63563
Signed-off-by: Seena Fallah <seenafallah@gmail.com>
(cherry picked from commit e3ececd293c64a76565293616b93b8b87550f8d2)

qa/tasks/radosgw_admin.py
src/cls/rgw/cls_rgw_types.cc
src/cls/rgw/cls_rgw_types.h
src/rgw/rgw_common.h
src/rgw/rgw_log.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_s3select.cc
src/rgw/rgw_usage.cc
src/tools/ceph-dencoder/rgw_types.h

index 28d58715facacec0d10668ebbe1cad1ce075337d..3b98702accaae65de8bc3c2edd1e16abd69b8d73 100644 (file)
@@ -59,7 +59,8 @@ def usage_acc_findsum2(summaries, user, add=True):
         return None
     e = {'user': user, 'categories': [],
         'total': {'bytes_received': 0,
-            'bytes_sent': 0, 'ops': 0, 'successful_ops': 0 }}
+            'bytes_sent': 0, 'ops': 0, 'successful_ops': 0,
+            'bytes_processed': 0, 'bytes_returned': 0}}
     summaries.append(e)
     return e
 def usage_acc_update2(x, out, b_in, err):
@@ -79,6 +80,17 @@ def usage_acc_validate_fields(r, x, x2, what):
             return
     if len(q) > 0:
         r.append("incomplete counts in " + what + ": " + ", ".join(q))
+def usage_acc_validate_s3select_fields(r, x, x2, what):
+    q=[]
+    for field in ['bytes_processed', 'bytes_returned']:
+        try:
+            if x2[field] < x[field]:
+                q.append("field %s: %d < %d" % (field, x2[field], x[field]))
+        except Exception as ex:
+            r.append( "missing/bad field " + field + " in " + what + " " + str(ex))
+            return
+    if len(q) > 0:
+        r.append("incomplete counts in " + what + ": " + ", ".join(q))
 class usage_acc:
     def __init__(self):
         self.results = {'entries': [], 'summary': []}
@@ -92,7 +104,9 @@ class usage_acc:
                 return b
         if not add:
                 return None
-        b = {'bucket': bucket, 'categories': []}
+        b = {'bucket': bucket, 'categories': [], 's3select': {
+            'bytes_processed': 0, 'bytes_returned': 0,
+        }}
         e['buckets'].append(b)
         return b
     def c2x(self, c, cat, add=True):
@@ -146,60 +160,69 @@ class usage_acc:
                 try:
                     b2 = self.e2b(e2, b['bucket'], False)
                     if b2 != None:
-                            c2 = b2['categories']
+                        c2 = b2['categories']
                 except Exception as ex:
                     r.append("malformed entry looking for bucket "
-                       + b['bucket'] + " in user " + e['user'] + " " + str(ex))
+                        + b['bucket'] + " in user " + e['user'] + " " + str(ex))
                     break
                 if b2 == None:
                     r.append("can't find bucket " + b['bucket']
-                       + " in user " + e['user'])
+                        + " in user " + e['user'])
                     continue
                 for x in c:
                     try:
                         x2 = self.c2x(c2, x['category'], False)
                     except Exception as ex:
                         r.append("malformed entry looking for "
-                           + x['category'] + " in bucket " + b['bucket']
-                           + " user " + e['user'] + " " + str(ex))
+                            + x['category'] + " in bucket " + b['bucket']
+                            + " user " + e['user'] + " " + str(ex))
                         break
                     usage_acc_validate_fields(r, x, x2, "entry: category "
-                       + x['category'] + " bucket " + b['bucket']
-                       + " in user " + e['user'])
+                        + x['category'] + " bucket " + b['bucket']
+                        + " in user " + e['user'])
+
+                if 's3select' not in b2:
+                    r.append("missing s3select in bucket "
+                        + b['bucket'] + " in user " + e['user'])
+                    continue
+                usage_acc_validate_s3select_fields(r,
+                    b['s3select'], b2['s3select'],
+                    "entry: s3select in bucket " + b['bucket'] + " in user " + e['user'])
         for s in self.results['summary']:
             c = s['categories']
             try:
                 s2 = usage_acc_findsum2(results['summary'], s['user'], False)
             except Exception as ex:
-                r.append("malformed summary looking for user " + e['user']
-                   + " " + str(ex))
+                r.append("malformed summary looking for user " + s['user']
+                    + " " + str(ex))
                 break
-                if s2 == None:
-                    r.append("missing summary for user " + e['user'] + " " + str(ex))
-                    continue
+            if s2 == None:
+                r.append("missing summary for user " + s['user'])
+                continue
             try:
                 c2 = s2['categories']
             except Exception as ex:
                 r.append("malformed summary missing categories for user "
-                   + e['user'] + " " + str(ex))
+                    + s['user'] + " " + str(ex))
                 break
             for x in c:
                 try:
                     x2 = self.c2x(c2, x['category'], False)
                 except Exception as ex:
                     r.append("malformed summary looking for "
-                       + x['category'] + " user " + e['user'] + " " + str(ex))
+                        + x['category'] + " user " + s['user'] + " " + str(ex))
                     break
                 usage_acc_validate_fields(r, x, x2, "summary: category "
-                   + x['category'] + " in user " + e['user'])
+                    + x['category'] + " in user " + s['user'])
             x = s['total']
             try:
                 x2 = s2['total']
             except Exception as ex:
                 r.append("malformed summary looking for totals for user "
-                         + e['user'] + " " + str(ex))
+                    + s['user'] + " " + str(ex))
                 break
-            usage_acc_validate_fields(r, x, x2, "summary: totals for user" + e['user'])
+            usage_acc_validate_fields(r, x, x2, "summary: totals for user" + s['user'])
+            usage_acc_validate_s3select_fields(r, x, x2, "summary: s3select totals for user" + s['user'])
         return r
 
 def ignore_this_entry(cat, bucket, user, out, b_in, err):
index 1c232a576bbaf9d4a081ff283cbc5f1482f3800c..05d48d734fc042602c18a4311c622d5504ffa9c4 100644 (file)
@@ -693,6 +693,21 @@ void rgw_bucket_dir::dump(Formatter *f) const
   f->close_section();
 }
 
+void rgw_s3select_usage_data::generate_test_instances(list<rgw_s3select_usage_data*>& o)
+{
+  rgw_s3select_usage_data *s = new rgw_s3select_usage_data;
+  s->bytes_processed = 1024;
+  s->bytes_returned = 512;
+  o.push_back(s);
+  o.push_back(new rgw_s3select_usage_data);
+}
+
+void rgw_s3select_usage_data::dump(Formatter *f) const
+{
+  f->dump_unsigned("bytes_processed", bytes_processed);
+  f->dump_unsigned("bytes_returned", bytes_returned);
+}
+
 void rgw_usage_data::generate_test_instances(list<rgw_usage_data*>& o)
 {
   rgw_usage_data *s = new rgw_usage_data;
@@ -773,12 +788,18 @@ void rgw_usage_log_entry::dump(Formatter *f) const
     }
   }
   f->close_section();
+
+  f->open_object_section("s3select");
+  f->dump_unsigned("bytes_processed", s3select_usage.bytes_processed);
+  f->dump_unsigned("bytes_returned", s3select_usage.bytes_returned);
+  f->close_section();
 }
 
 void rgw_usage_log_entry::generate_test_instances(list<rgw_usage_log_entry *> &o)
 {
   rgw_usage_log_entry *entry = new rgw_usage_log_entry;
   rgw_usage_data usage_data{1024, 2048};
+  rgw_s3select_usage_data s3select_usage_data{8192, 4096};
   entry->owner = rgw_user("owner");
   entry->payer = rgw_user("payer");
   entry->bucket = "bucket";
@@ -788,6 +809,7 @@ void rgw_usage_log_entry::generate_test_instances(list<rgw_usage_log_entry *> &o
   entry->total_usage.ops = usage_data.ops;
   entry->total_usage.successful_ops = usage_data.successful_ops;
   entry->usage_map["get_obj"] = usage_data;
+  entry->s3select_usage = s3select_usage_data;
   o.push_back(entry);
   o.push_back(new rgw_usage_log_entry);
 }
index 7b6b46101ccc0cd1759d968da2510c41b211947f..e38e06c7ced9a9b2589b7a4d74b229ff7921df02 100644 (file)
@@ -869,6 +869,38 @@ struct rgw_bucket_dir {
 };
 WRITE_CLASS_ENCODER(rgw_bucket_dir)
 
+struct rgw_s3select_usage_data {
+  uint64_t bytes_processed;
+  uint64_t bytes_returned;
+
+  rgw_s3select_usage_data() : bytes_processed(0), bytes_returned(0) {}
+  rgw_s3select_usage_data(uint64_t processed, uint64_t returned)
+    : bytes_processed(processed), bytes_returned(returned) {}
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(bytes_processed, bl);
+    encode(bytes_returned, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(bytes_processed, bl);
+    decode(bytes_returned, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void aggregate(const rgw_s3select_usage_data& usage) {
+    bytes_processed += usage.bytes_processed;
+    bytes_returned += usage.bytes_returned;
+  }
+
+  void dump(ceph::Formatter *f) const;
+  static void generate_test_instances(std::list<rgw_s3select_usage_data*>& o);
+};
+WRITE_CLASS_ENCODER(rgw_s3select_usage_data)
+
 struct rgw_usage_data {
   uint64_t bytes_sent;
   uint64_t bytes_received;
@@ -915,13 +947,14 @@ struct rgw_usage_log_entry {
   uint64_t epoch;
   rgw_usage_data total_usage; /* this one is kept for backwards compatibility */
   std::map<std::string, rgw_usage_data> usage_map;
+  rgw_s3select_usage_data s3select_usage;
 
   rgw_usage_log_entry() : epoch(0) {}
   rgw_usage_log_entry(std::string& o, std::string& b) : owner(o), bucket(b), epoch(0) {}
   rgw_usage_log_entry(std::string& o, std::string& p, std::string& b) : owner(o), payer(p), bucket(b), epoch(0) {}
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(3, 1, bl);
+    ENCODE_START(4, 1, bl);
     encode(owner.to_str(), bl);
     encode(bucket, bl);
     encode(epoch, bl);
@@ -931,12 +964,13 @@ struct rgw_usage_log_entry {
     encode(total_usage.successful_ops, bl);
     encode(usage_map, bl);
     encode(payer.to_str(), bl);
+    encode(s3select_usage, bl);
     ENCODE_FINISH(bl);
   }
 
 
    void decode(ceph::buffer::list::const_iterator& bl) {
-    DECODE_START(3, bl);
+    DECODE_START(4, bl);
     std::string s;
     decode(s, bl);
     owner.from_str(s);
@@ -956,6 +990,9 @@ struct rgw_usage_log_entry {
       decode(p, bl);
       payer.from_str(p);
     }
+    if (struct_v >= 4) {
+      decode(s3select_usage, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -970,9 +1007,13 @@ struct rgw_usage_log_entry {
 
     for (auto iter = e.usage_map.begin(); iter != e.usage_map.end(); ++iter) {
       if (!categories || !categories->size() || categories->count(iter->first)) {
-        add(iter->first, iter->second);
+        add_usage(iter->first, iter->second);
       }
     }
+
+    if (!categories || !categories->size() || categories->count("s3select")) {
+      s3select_usage.aggregate(e.s3select_usage);
+    }
   }
 
   void sum(rgw_usage_data& usage,
@@ -985,7 +1026,7 @@ struct rgw_usage_log_entry {
     }
   }
 
-  void add(const std::string& category, const rgw_usage_data& data) {
+  void add_usage(const std::string& category, const rgw_usage_data& data) {
     usage_map[category].aggregate(data);
     total_usage.aggregate(data);
   }
index c95acae4510eefa5b1dc9cf2683ad4b8bc2c515e..2002ae51ec9b9ab12bde7f3a5c2452730e528ed5 100644 (file)
@@ -1260,6 +1260,7 @@ struct req_state : DoutPrefixProvider {
   uint64_t obj_size{0};
   bool enable_ops_log;
   bool enable_usage_log;
+  rgw_s3select_usage_data s3select_usage;
   uint8_t defer_to_bucket_acls;
   uint32_t perm_mask{0};
 
index ba60e576b7732761fccdf0daf82f2e382cd450a3..5d7e3678f8fd3dc549169574cc963a37ad0d4442 100644 (file)
@@ -236,9 +236,12 @@ static void log_usage(req_state *s, const string& op_name)
   ldpp_dout(s, 30) << "log_usage: bucket_name=" << bucket_name
        << " tenant=" << s->bucket_tenant
        << ", bytes_sent=" << bytes_sent << ", bytes_received="
-       << bytes_received << ", success=" << data.successful_ops << dendl;
+       << bytes_received << ", success=" << data.successful_ops
+       << ", bytes_processed=" << s->s3select_usage.bytes_processed
+       << ", bytes_returned=" << s->s3select_usage.bytes_returned << dendl;
 
-  entry.add(op_name, data);
+  entry.add_usage(op_name, data);
+  entry.s3select_usage = s->s3select_usage;
 
   utime_t ts = ceph_clock_now();
 
index 29e12e7720cd9f2ab895e0b27a1e8efce80a19dc..a9a6c3699d0d2e54c74e1d33a44b99d9f21ed986 100644 (file)
@@ -1592,7 +1592,16 @@ void RGWGetUsage_ObjStore_S3::send_response()
       utime_t ut(entry.epoch, 0);
       ut.gmtime(formatter->dump_stream("Time"));
       formatter->dump_int("Epoch", entry.epoch);
+
       dump_usage_categories_info(formatter, entry, &categories);
+
+      formatter->open_object_section("s3select");
+      if (categories.empty() || categories.count("s3select")) {
+        encode_json("BytesProcessed", entry.s3select_usage.bytes_processed, formatter);
+        encode_json("BytesReturned", entry.s3select_usage.bytes_returned, formatter);
+      }
+      formatter->close_section(); // s3select
+
       formatter->close_section(); // bucket
     }
 
@@ -1622,6 +1631,8 @@ void RGWGetUsage_ObjStore_S3::send_response()
        encode_json("BytesReceived", total_usage.bytes_received, formatter);
        encode_json("Ops", total_usage.ops, formatter);
        encode_json("SuccessfulOps", total_usage.successful_ops, formatter);
+       encode_json("BytesProcessed", entry.s3select_usage.bytes_processed, formatter);
+       encode_json("BytesReturned", entry.s3select_usage.bytes_returned, formatter);
        formatter->close_section(); // total
        formatter->close_section(); // user
      }
index 8c6dc542512b868c96472d577f327680c1aad124..1b7dced2782176987a5dc16c2bd5d29544f04566 100644 (file)
@@ -27,6 +27,7 @@ uint64_t aws_response_handler::get_processed_size()
 void aws_response_handler::update_processed_size(uint64_t value)
 {
   processed_size += value;
+  s->s3select_usage.bytes_processed = processed_size;
 }
 
 uint64_t aws_response_handler::get_total_bytes_returned()
@@ -37,6 +38,7 @@ uint64_t aws_response_handler::get_total_bytes_returned()
 void aws_response_handler::update_total_bytes_returned(uint64_t value)
 {
   total_bytes_returned = value;
+  s->s3select_usage.bytes_returned = total_bytes_returned;
 }
 
 void aws_response_handler::push_header(const char* header_name, const char* header_value)
index 43e56577c24ca993686ec61d1373c00fcd384e1e..1f9cb6f8d5fc78f3ea5408e90c117d0297d8c279 100644 (file)
@@ -105,7 +105,16 @@ int RGWUsage::show(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
         if (!payer.empty() && payer != owner) {
           formatter->dump_string("payer", payer);
         }
+
         dump_usage_categories_info(formatter, entry, categories);
+
+        formatter->open_object_section("s3select");
+        if (!categories || categories->empty() || categories->count("s3select")) {
+          formatter->dump_unsigned("bytes_processed", entry.s3select_usage.bytes_processed);
+          formatter->dump_unsigned("bytes_returned", entry.s3select_usage.bytes_returned);
+        }
+        formatter->close_section(); // s3select
+
         formatter->close_section(); // bucket
         flusher.flush();
       }
@@ -136,6 +145,8 @@ int RGWUsage::show(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
       encode_json("bytes_received", total_usage.bytes_received, formatter);
       encode_json("ops", total_usage.ops, formatter);
       encode_json("successful_ops", total_usage.successful_ops, formatter);
+      encode_json("bytes_processed", entry.s3select_usage.bytes_processed, formatter);
+      encode_json("bytes_returned", entry.s3select_usage.bytes_returned, formatter);
       formatter->close_section(); // total
 
       formatter->close_section(); // user
index da1fde8f4853b0c4d1c493ae5339cbd679df5123..8cd0acfc62459d4b810683e95359d28fa9cfcca5 100644 (file)
@@ -53,6 +53,7 @@ TYPE(rgw_usage_log_entry)
 TYPE(rgw_cls_bi_entry)
 TYPE(rgw_bucket_olh_entry)
 TYPE(rgw_usage_data)
+TYPE(rgw_s3select_usage_data)
 TYPE(rgw_usage_log_info)
 TYPE(rgw_user_bucket)
 TYPE(cls_rgw_lc_entry)