]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add s3select usage to log usage 57229/head
authorSeena Fallah <seenafallah@gmail.com>
Sat, 18 Nov 2023 19:14:56 +0000 (20:14 +0100)
committerGal Salomon <gal.salomon@gmail.com>
Wed, 21 Aug 2024 08:31:48 +0000 (11:31 +0300)
Expose bytes processed and bytes returned to log usage.

Fixes: https://tracker.ceph.com/issues/63563
Signed-off-by: Seena Fallah <seenafallah@gmail.com>
(cherry picked from commit e3ececd293c64a76565293616b93b8b87550f8d2)

qa/tasks/radosgw_admin.py
src/cls/rgw/cls_rgw_types.cc
src/cls/rgw/cls_rgw_types.h
src/rgw/rgw_common.h
src/rgw/rgw_log.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_usage.cc
src/tools/ceph-dencoder/rgw_types.h

index 28d58715facacec0d10668ebbe1cad1ce075337d..3b98702accaae65de8bc3c2edd1e16abd69b8d73 100644 (file)
@@ -59,7 +59,8 @@ def usage_acc_findsum2(summaries, user, add=True):
         return None
     e = {'user': user, 'categories': [],
         'total': {'bytes_received': 0,
-            'bytes_sent': 0, 'ops': 0, 'successful_ops': 0 }}
+            'bytes_sent': 0, 'ops': 0, 'successful_ops': 0,
+            'bytes_processed': 0, 'bytes_returned': 0}}
     summaries.append(e)
     return e
 def usage_acc_update2(x, out, b_in, err):
@@ -79,6 +80,17 @@ def usage_acc_validate_fields(r, x, x2, what):
             return
     if len(q) > 0:
         r.append("incomplete counts in " + what + ": " + ", ".join(q))
+def usage_acc_validate_s3select_fields(r, x, x2, what):
+    q=[]
+    for field in ['bytes_processed', 'bytes_returned']:
+        try:
+            if x2[field] < x[field]:
+                q.append("field %s: %d < %d" % (field, x2[field], x[field]))
+        except Exception as ex:
+            r.append( "missing/bad field " + field + " in " + what + " " + str(ex))
+            return
+    if len(q) > 0:
+        r.append("incomplete counts in " + what + ": " + ", ".join(q))
 class usage_acc:
     def __init__(self):
         self.results = {'entries': [], 'summary': []}
@@ -92,7 +104,9 @@ class usage_acc:
                 return b
         if not add:
                 return None
-        b = {'bucket': bucket, 'categories': []}
+        b = {'bucket': bucket, 'categories': [], 's3select': {
+            'bytes_processed': 0, 'bytes_returned': 0,
+        }}
         e['buckets'].append(b)
         return b
     def c2x(self, c, cat, add=True):
@@ -146,60 +160,69 @@ class usage_acc:
                 try:
                     b2 = self.e2b(e2, b['bucket'], False)
                     if b2 != None:
-                            c2 = b2['categories']
+                        c2 = b2['categories']
                 except Exception as ex:
                     r.append("malformed entry looking for bucket "
-                       + b['bucket'] + " in user " + e['user'] + " " + str(ex))
+                        + b['bucket'] + " in user " + e['user'] + " " + str(ex))
                     break
                 if b2 == None:
                     r.append("can't find bucket " + b['bucket']
-                       + " in user " + e['user'])
+                        + " in user " + e['user'])
                     continue
                 for x in c:
                     try:
                         x2 = self.c2x(c2, x['category'], False)
                     except Exception as ex:
                         r.append("malformed entry looking for "
-                           + x['category'] + " in bucket " + b['bucket']
-                           + " user " + e['user'] + " " + str(ex))
+                            + x['category'] + " in bucket " + b['bucket']
+                            + " user " + e['user'] + " " + str(ex))
                         break
                     usage_acc_validate_fields(r, x, x2, "entry: category "
-                       + x['category'] + " bucket " + b['bucket']
-                       + " in user " + e['user'])
+                        + x['category'] + " bucket " + b['bucket']
+                        + " in user " + e['user'])
+
+                if 's3select' not in b2:
+                    r.append("missing s3select in bucket "
+                        + b['bucket'] + " in user " + e['user'])
+                    continue
+                usage_acc_validate_s3select_fields(r,
+                    b['s3select'], b2['s3select'],
+                    "entry: s3select in bucket " + b['bucket'] + " in user " + e['user'])
         for s in self.results['summary']:
             c = s['categories']
             try:
                 s2 = usage_acc_findsum2(results['summary'], s['user'], False)
             except Exception as ex:
-                r.append("malformed summary looking for user " + e['user']
-                   + " " + str(ex))
+                r.append("malformed summary looking for user " + s['user']
+                    + " " + str(ex))
                 break
-                if s2 == None:
-                    r.append("missing summary for user " + e['user'] + " " + str(ex))
-                    continue
+            if s2 == None:
+                r.append("missing summary for user " + s['user'])
+                continue
             try:
                 c2 = s2['categories']
             except Exception as ex:
                 r.append("malformed summary missing categories for user "
-                   + e['user'] + " " + str(ex))
+                    + s['user'] + " " + str(ex))
                 break
             for x in c:
                 try:
                     x2 = self.c2x(c2, x['category'], False)
                 except Exception as ex:
                     r.append("malformed summary looking for "
-                       + x['category'] + " user " + e['user'] + " " + str(ex))
+                        + x['category'] + " user " + s['user'] + " " + str(ex))
                     break
                 usage_acc_validate_fields(r, x, x2, "summary: category "
-                   + x['category'] + " in user " + e['user'])
+                    + x['category'] + " in user " + s['user'])
             x = s['total']
             try:
                 x2 = s2['total']
             except Exception as ex:
                 r.append("malformed summary looking for totals for user "
-                         + e['user'] + " " + str(ex))
+                    + s['user'] + " " + str(ex))
                 break
-            usage_acc_validate_fields(r, x, x2, "summary: totals for user" + e['user'])
+            usage_acc_validate_fields(r, x, x2, "summary: totals for user" + s['user'])
+            usage_acc_validate_s3select_fields(r, x, x2, "summary: s3select totals for user" + s['user'])
         return r
 
 def ignore_this_entry(cat, bucket, user, out, b_in, err):
index 3a71860c316b439c8cf0d85c9754a5b7617342f0..a441fcc36aaca15246c190350578768a0a78b405 100644 (file)
@@ -679,6 +679,21 @@ void rgw_bucket_dir::dump(Formatter *f) const
   f->close_section();
 }
 
+void rgw_s3select_usage_data::generate_test_instances(list<rgw_s3select_usage_data*>& o)
+{
+  rgw_s3select_usage_data *s = new rgw_s3select_usage_data;
+  s->bytes_processed = 1024;
+  s->bytes_returned = 512;
+  o.push_back(s);
+  o.push_back(new rgw_s3select_usage_data);
+}
+
+void rgw_s3select_usage_data::dump(Formatter *f) const
+{
+  f->dump_unsigned("bytes_processed", bytes_processed);
+  f->dump_unsigned("bytes_returned", bytes_returned);
+}
+
 void rgw_usage_data::generate_test_instances(list<rgw_usage_data*>& o)
 {
   rgw_usage_data *s = new rgw_usage_data;
@@ -759,12 +774,18 @@ void rgw_usage_log_entry::dump(Formatter *f) const
     }
   }
   f->close_section();
+
+  f->open_object_section("s3select");
+  f->dump_unsigned("bytes_processed", s3select_usage.bytes_processed);
+  f->dump_unsigned("bytes_returned", s3select_usage.bytes_returned);
+  f->close_section();
 }
 
 void rgw_usage_log_entry::generate_test_instances(list<rgw_usage_log_entry *> &o)
 {
   rgw_usage_log_entry *entry = new rgw_usage_log_entry;
   rgw_usage_data usage_data{1024, 2048};
+  rgw_s3select_usage_data s3select_usage_data{8192, 4096};
   entry->owner = rgw_user("owner");
   entry->payer = rgw_user("payer");
   entry->bucket = "bucket";
@@ -774,6 +795,7 @@ void rgw_usage_log_entry::generate_test_instances(list<rgw_usage_log_entry *> &o
   entry->total_usage.ops = usage_data.ops;
   entry->total_usage.successful_ops = usage_data.successful_ops;
   entry->usage_map["get_obj"] = usage_data;
+  entry->s3select_usage = s3select_usage_data;
   o.push_back(entry);
   o.push_back(new rgw_usage_log_entry);
 }
index f94bf114fa1e6a93989e749c80a598dd0209f9f6..7d1cbf01c93d8bf5f89894ea423d5cea508805c0 100644 (file)
@@ -868,6 +868,38 @@ struct rgw_bucket_dir {
 };
 WRITE_CLASS_ENCODER(rgw_bucket_dir)
 
+struct rgw_s3select_usage_data {
+  uint64_t bytes_processed;
+  uint64_t bytes_returned;
+
+  rgw_s3select_usage_data() : bytes_processed(0), bytes_returned(0) {}
+  rgw_s3select_usage_data(uint64_t processed, uint64_t returned)
+    : bytes_processed(processed), bytes_returned(returned) {}
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(bytes_processed, bl);
+    encode(bytes_returned, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(bytes_processed, bl);
+    decode(bytes_returned, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void aggregate(const rgw_s3select_usage_data& usage) {
+    bytes_processed += usage.bytes_processed;
+    bytes_returned += usage.bytes_returned;
+  }
+
+  void dump(ceph::Formatter *f) const;
+  static void generate_test_instances(std::list<rgw_s3select_usage_data*>& o);
+};
+WRITE_CLASS_ENCODER(rgw_s3select_usage_data)
+
 struct rgw_usage_data {
   uint64_t bytes_sent;
   uint64_t bytes_received;
@@ -914,13 +946,14 @@ struct rgw_usage_log_entry {
   uint64_t epoch;
   rgw_usage_data total_usage; /* this one is kept for backwards compatibility */
   std::map<std::string, rgw_usage_data> usage_map;
+  rgw_s3select_usage_data s3select_usage;
 
   rgw_usage_log_entry() : epoch(0) {}
   rgw_usage_log_entry(std::string& o, std::string& b) : owner(o), bucket(b), epoch(0) {}
   rgw_usage_log_entry(std::string& o, std::string& p, std::string& b) : owner(o), payer(p), bucket(b), epoch(0) {}
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(3, 1, bl);
+    ENCODE_START(4, 1, bl);
     encode(owner.to_str(), bl);
     encode(bucket, bl);
     encode(epoch, bl);
@@ -930,12 +963,13 @@ struct rgw_usage_log_entry {
     encode(total_usage.successful_ops, bl);
     encode(usage_map, bl);
     encode(payer.to_str(), bl);
+    encode(s3select_usage, bl);
     ENCODE_FINISH(bl);
   }
 
 
    void decode(ceph::buffer::list::const_iterator& bl) {
-    DECODE_START(3, bl);
+    DECODE_START(4, bl);
     std::string s;
     decode(s, bl);
     owner.from_str(s);
@@ -955,6 +989,9 @@ struct rgw_usage_log_entry {
       decode(p, bl);
       payer.from_str(p);
     }
+    if (struct_v >= 4) {
+      decode(s3select_usage, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -969,9 +1006,13 @@ struct rgw_usage_log_entry {
 
     for (auto iter = e.usage_map.begin(); iter != e.usage_map.end(); ++iter) {
       if (!categories || !categories->size() || categories->count(iter->first)) {
-        add(iter->first, iter->second);
+        add_usage(iter->first, iter->second);
       }
     }
+
+    if (!categories || !categories->size() || categories->count("s3select")) {
+      s3select_usage.aggregate(e.s3select_usage);
+    }
   }
 
   void sum(rgw_usage_data& usage,
@@ -984,7 +1025,7 @@ struct rgw_usage_log_entry {
     }
   }
 
-  void add(const std::string& category, const rgw_usage_data& data) {
+  void add_usage(const std::string& category, const rgw_usage_data& data) {
     usage_map[category].aggregate(data);
     total_usage.aggregate(data);
   }
index 648b2e087b491ea0e1b1379c2ca7557af2d09311..34491b56ef2a1f9da0e1a5f6b5390eb3f7fc511c 100644 (file)
@@ -1111,6 +1111,7 @@ struct req_state : DoutPrefixProvider {
   uint64_t obj_size{0};
   bool enable_ops_log;
   bool enable_usage_log;
+  rgw_s3select_usage_data s3select_usage;
   uint8_t defer_to_bucket_acls;
   uint32_t perm_mask{0};
 
index de67fcd4b1471795c07d237b349e155029117c5b..5a92b9f784c7d12a333642b131e6279c6a95683b 100644 (file)
@@ -238,9 +238,12 @@ static void log_usage(req_state *s, const string& op_name)
   ldpp_dout(s, 30) << "log_usage: bucket_name=" << bucket_name
        << " tenant=" << s->bucket_tenant
        << ", bytes_sent=" << bytes_sent << ", bytes_received="
-       << bytes_received << ", success=" << data.successful_ops << dendl;
+       << bytes_received << ", success=" << data.successful_ops
+       << ", bytes_processed=" << s->s3select_usage.bytes_processed
+       << ", bytes_returned=" << s->s3select_usage.bytes_returned << dendl;
 
-  entry.add(op_name, data);
+  entry.add_usage(op_name, data);
+  entry.s3select_usage = s->s3select_usage;
 
   utime_t ts = ceph_clock_now();
 
index b28a563bc80aca75cd2cf52c6c0022578f5ea39a..f444a9a83b5f7a02511334632ee191664b327b1f 100644 (file)
@@ -1568,7 +1568,16 @@ void RGWGetUsage_ObjStore_S3::send_response()
       utime_t ut(entry.epoch, 0);
       ut.gmtime(formatter->dump_stream("Time"));
       formatter->dump_int("Epoch", entry.epoch);
+
       dump_usage_categories_info(formatter, entry, &categories);
+
+      formatter->open_object_section("s3select");
+      if (categories.empty() || categories.count("s3select")) {
+        encode_json("BytesProcessed", entry.s3select_usage.bytes_processed, formatter);
+        encode_json("BytesReturned", entry.s3select_usage.bytes_returned, formatter);
+      }
+      formatter->close_section(); // s3select
+
       formatter->close_section(); // bucket
     }
 
@@ -1598,6 +1607,8 @@ void RGWGetUsage_ObjStore_S3::send_response()
        encode_json("BytesReceived", total_usage.bytes_received, formatter);
        encode_json("Ops", total_usage.ops, formatter);
        encode_json("SuccessfulOps", total_usage.successful_ops, formatter);
+       encode_json("BytesProcessed", entry.s3select_usage.bytes_processed, formatter);
+       encode_json("BytesReturned", entry.s3select_usage.bytes_returned, formatter);
        formatter->close_section(); // total
        formatter->close_section(); // user
      }
index ca7ca20eb12cfc0fd1886833639bc835248129b3..7f10a0034a8fdae399d9a37193b9b60163b45751 100644 (file)
@@ -105,7 +105,16 @@ int RGWUsage::show(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
         if (!payer.empty() && payer != owner) {
           formatter->dump_string("payer", payer);
         }
+
         dump_usage_categories_info(formatter, entry, categories);
+
+        formatter->open_object_section("s3select");
+        if (!categories || categories->empty() || categories->count("s3select")) {
+          formatter->dump_unsigned("bytes_processed", entry.s3select_usage.bytes_processed);
+          formatter->dump_unsigned("bytes_returned", entry.s3select_usage.bytes_returned);
+        }
+        formatter->close_section(); // s3select
+
         formatter->close_section(); // bucket
         flusher.flush();
       }
@@ -136,6 +145,8 @@ int RGWUsage::show(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
       encode_json("bytes_received", total_usage.bytes_received, formatter);
       encode_json("ops", total_usage.ops, formatter);
       encode_json("successful_ops", total_usage.successful_ops, formatter);
+      encode_json("bytes_processed", entry.s3select_usage.bytes_processed, formatter);
+      encode_json("bytes_returned", entry.s3select_usage.bytes_returned, formatter);
       formatter->close_section(); // total
 
       formatter->close_section(); // user
index dd5c3a8cb7f18180bfbe1f2f0034055fd96a7d2a..c1f2cd9b50b9f0e9d04a6f7cc5dac4da86aa7ec8 100644 (file)
@@ -45,6 +45,7 @@ TYPE(rgw_usage_log_entry)
 TYPE(rgw_cls_bi_entry)
 TYPE(rgw_bucket_olh_entry)
 TYPE(rgw_usage_data)
+TYPE(rgw_s3select_usage_data)
 TYPE(rgw_usage_log_info)
 TYPE(rgw_user_bucket)
 TYPE(cls_rgw_lc_entry)