From c84b7902edcdbecf655fa7a52645e53d2adb3f27 Mon Sep 17 00:00:00 2001 From: Seena Fallah Date: Sat, 18 Nov 2023 20:14:56 +0100 Subject: [PATCH] rgw: add s3select usage to log usage Expose bytes processed and bytes returned to log usage. Fixes: https://tracker.ceph.com/issues/63563 Signed-off-by: Seena Fallah (cherry picked from commit e3ececd293c64a76565293616b93b8b87550f8d2) --- qa/tasks/radosgw_admin.py | 61 ++++++++++++++++++++--------- src/cls/rgw/cls_rgw_types.cc | 22 +++++++++++ src/cls/rgw/cls_rgw_types.h | 49 +++++++++++++++++++++-- src/rgw/rgw_common.h | 1 + src/rgw/rgw_log.cc | 7 +++- src/rgw/rgw_rest_s3.cc | 11 ++++++ src/rgw/rgw_s3select.cc | 2 + src/rgw/rgw_usage.cc | 11 ++++++ src/tools/ceph-dencoder/rgw_types.h | 1 + 9 files changed, 140 insertions(+), 25 deletions(-) diff --git a/qa/tasks/radosgw_admin.py b/qa/tasks/radosgw_admin.py index 28d58715fac..3b98702acca 100644 --- a/qa/tasks/radosgw_admin.py +++ b/qa/tasks/radosgw_admin.py @@ -59,7 +59,8 @@ def usage_acc_findsum2(summaries, user, add=True): return None e = {'user': user, 'categories': [], 'total': {'bytes_received': 0, - 'bytes_sent': 0, 'ops': 0, 'successful_ops': 0 }} + 'bytes_sent': 0, 'ops': 0, 'successful_ops': 0, + 'bytes_processed': 0, 'bytes_returned': 0}} summaries.append(e) return e def usage_acc_update2(x, out, b_in, err): @@ -79,6 +80,17 @@ def usage_acc_validate_fields(r, x, x2, what): return if len(q) > 0: r.append("incomplete counts in " + what + ": " + ", ".join(q)) +def usage_acc_validate_s3select_fields(r, x, x2, what): + q=[] + for field in ['bytes_processed', 'bytes_returned']: + try: + if x2[field] < x[field]: + q.append("field %s: %d < %d" % (field, x2[field], x[field])) + except Exception as ex: + r.append( "missing/bad field " + field + " in " + what + " " + str(ex)) + return + if len(q) > 0: + r.append("incomplete counts in " + what + ": " + ", ".join(q)) class usage_acc: def __init__(self): self.results = {'entries': [], 'summary': []} @@ -92,7 +104,9 @@ class usage_acc: return b if not add: return None - b = {'bucket': bucket, 'categories': []} + b = {'bucket': bucket, 'categories': [], 's3select': { + 'bytes_processed': 0, 'bytes_returned': 0, + }} e['buckets'].append(b) return b def c2x(self, c, cat, add=True): @@ -146,60 +160,69 @@ class usage_acc: try: b2 = self.e2b(e2, b['bucket'], False) if b2 != None: - c2 = b2['categories'] + c2 = b2['categories'] except Exception as ex: r.append("malformed entry looking for bucket " - + b['bucket'] + " in user " + e['user'] + " " + str(ex)) + + b['bucket'] + " in user " + e['user'] + " " + str(ex)) break if b2 == None: r.append("can't find bucket " + b['bucket'] - + " in user " + e['user']) + + " in user " + e['user']) continue for x in c: try: x2 = self.c2x(c2, x['category'], False) except Exception as ex: r.append("malformed entry looking for " - + x['category'] + " in bucket " + b['bucket'] - + " user " + e['user'] + " " + str(ex)) + + x['category'] + " in bucket " + b['bucket'] + + " user " + e['user'] + " " + str(ex)) break usage_acc_validate_fields(r, x, x2, "entry: category " - + x['category'] + " bucket " + b['bucket'] - + " in user " + e['user']) + + x['category'] + " bucket " + b['bucket'] + + " in user " + e['user']) + + if 's3select' not in b2: + r.append("missing s3select in bucket " + + b['bucket'] + " in user " + e['user']) + continue + usage_acc_validate_s3select_fields(r, + b['s3select'], b2['s3select'], + "entry: s3select in bucket " + b['bucket'] + " in user " + e['user']) for s in self.results['summary']: c = s['categories'] try: s2 = usage_acc_findsum2(results['summary'], s['user'], False) except Exception as ex: - r.append("malformed summary looking for user " + e['user'] - + " " + str(ex)) + r.append("malformed summary looking for user " + s['user'] + + " " + str(ex)) break - if s2 == None: - r.append("missing summary for user " + e['user'] + " " + str(ex)) - continue + if s2 == None: + r.append("missing summary for user " + s['user']) + continue try: c2 = s2['categories'] except Exception as ex: r.append("malformed summary missing categories for user " - + e['user'] + " " + str(ex)) + + s['user'] + " " + str(ex)) break for x in c: try: x2 = self.c2x(c2, x['category'], False) except Exception as ex: r.append("malformed summary looking for " - + x['category'] + " user " + e['user'] + " " + str(ex)) + + x['category'] + " user " + s['user'] + " " + str(ex)) break usage_acc_validate_fields(r, x, x2, "summary: category " - + x['category'] + " in user " + e['user']) + + x['category'] + " in user " + s['user']) x = s['total'] try: x2 = s2['total'] except Exception as ex: r.append("malformed summary looking for totals for user " - + e['user'] + " " + str(ex)) + + s['user'] + " " + str(ex)) break - usage_acc_validate_fields(r, x, x2, "summary: totals for user" + e['user']) + usage_acc_validate_fields(r, x, x2, "summary: totals for user" + s['user']) + usage_acc_validate_s3select_fields(r, x, x2, "summary: s3select totals for user" + s['user']) return r def ignore_this_entry(cat, bucket, user, out, b_in, err): diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc index 1c232a576bb..05d48d734fc 100644 --- a/src/cls/rgw/cls_rgw_types.cc +++ b/src/cls/rgw/cls_rgw_types.cc @@ -693,6 +693,21 @@ void rgw_bucket_dir::dump(Formatter *f) const f->close_section(); } +void rgw_s3select_usage_data::generate_test_instances(list& o) +{ + rgw_s3select_usage_data *s = new rgw_s3select_usage_data; + s->bytes_processed = 1024; + s->bytes_returned = 512; + o.push_back(s); + o.push_back(new rgw_s3select_usage_data); +} + +void rgw_s3select_usage_data::dump(Formatter *f) const +{ + f->dump_unsigned("bytes_processed", bytes_processed); + f->dump_unsigned("bytes_returned", bytes_returned); +} + void rgw_usage_data::generate_test_instances(list& o) { rgw_usage_data *s = new rgw_usage_data; @@ -773,12 +788,18 @@ void rgw_usage_log_entry::dump(Formatter *f) const } } f->close_section(); + + f->open_object_section("s3select"); + f->dump_unsigned("bytes_processed", s3select_usage.bytes_processed); + f->dump_unsigned("bytes_returned", s3select_usage.bytes_returned); + f->close_section(); } void rgw_usage_log_entry::generate_test_instances(list &o) { rgw_usage_log_entry *entry = new rgw_usage_log_entry; rgw_usage_data usage_data{1024, 2048}; + rgw_s3select_usage_data s3select_usage_data{8192, 4096}; entry->owner = rgw_user("owner"); entry->payer = rgw_user("payer"); entry->bucket = "bucket"; @@ -788,6 +809,7 @@ void rgw_usage_log_entry::generate_test_instances(list &o entry->total_usage.ops = usage_data.ops; entry->total_usage.successful_ops = usage_data.successful_ops; entry->usage_map["get_obj"] = usage_data; + entry->s3select_usage = s3select_usage_data; o.push_back(entry); o.push_back(new rgw_usage_log_entry); } diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index 7b6b46101cc..e38e06c7ced 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -869,6 +869,38 @@ struct rgw_bucket_dir { }; WRITE_CLASS_ENCODER(rgw_bucket_dir) +struct rgw_s3select_usage_data { + uint64_t bytes_processed; + uint64_t bytes_returned; + + rgw_s3select_usage_data() : bytes_processed(0), bytes_returned(0) {} + rgw_s3select_usage_data(uint64_t processed, uint64_t returned) + : bytes_processed(processed), bytes_returned(returned) {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(bytes_processed, bl); + encode(bytes_returned, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(bytes_processed, bl); + decode(bytes_returned, bl); + DECODE_FINISH(bl); + } + + void aggregate(const rgw_s3select_usage_data& usage) { + bytes_processed += usage.bytes_processed; + bytes_returned += usage.bytes_returned; + } + + void dump(ceph::Formatter *f) const; + static void generate_test_instances(std::list& o); +}; +WRITE_CLASS_ENCODER(rgw_s3select_usage_data) + struct rgw_usage_data { uint64_t bytes_sent; uint64_t bytes_received; @@ -915,13 +947,14 @@ struct rgw_usage_log_entry { uint64_t epoch; rgw_usage_data total_usage; /* this one is kept for backwards compatibility */ std::map usage_map; + rgw_s3select_usage_data s3select_usage; rgw_usage_log_entry() : epoch(0) {} rgw_usage_log_entry(std::string& o, std::string& b) : owner(o), bucket(b), epoch(0) {} rgw_usage_log_entry(std::string& o, std::string& p, std::string& b) : owner(o), payer(p), bucket(b), epoch(0) {} void encode(ceph::buffer::list& bl) const { - ENCODE_START(3, 1, bl); + ENCODE_START(4, 1, bl); encode(owner.to_str(), bl); encode(bucket, bl); encode(epoch, bl); @@ -931,12 +964,13 @@ struct rgw_usage_log_entry { encode(total_usage.successful_ops, bl); encode(usage_map, bl); encode(payer.to_str(), bl); + encode(s3select_usage, bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { - DECODE_START(3, bl); + DECODE_START(4, bl); std::string s; decode(s, bl); owner.from_str(s); @@ -956,6 +990,9 @@ struct rgw_usage_log_entry { decode(p, bl); payer.from_str(p); } + if (struct_v >= 4) { + decode(s3select_usage, bl); + } DECODE_FINISH(bl); } @@ -970,9 +1007,13 @@ struct rgw_usage_log_entry { for (auto iter = e.usage_map.begin(); iter != e.usage_map.end(); ++iter) { if (!categories || !categories->size() || categories->count(iter->first)) { - add(iter->first, iter->second); + add_usage(iter->first, iter->second); } } + + if (!categories || !categories->size() || categories->count("s3select")) { + s3select_usage.aggregate(e.s3select_usage); + } } void sum(rgw_usage_data& usage, @@ -985,7 +1026,7 @@ struct rgw_usage_log_entry { } } - void add(const std::string& category, const rgw_usage_data& data) { + void add_usage(const std::string& category, const rgw_usage_data& data) { usage_map[category].aggregate(data); total_usage.aggregate(data); } diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index c95acae4510..2002ae51ec9 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -1260,6 +1260,7 @@ struct req_state : DoutPrefixProvider { uint64_t obj_size{0}; bool enable_ops_log; bool enable_usage_log; + rgw_s3select_usage_data s3select_usage; uint8_t defer_to_bucket_acls; uint32_t perm_mask{0}; diff --git a/src/rgw/rgw_log.cc b/src/rgw/rgw_log.cc index ba60e576b77..5d7e3678f8f 100644 --- a/src/rgw/rgw_log.cc +++ b/src/rgw/rgw_log.cc @@ -236,9 +236,12 @@ static void log_usage(req_state *s, const string& op_name) ldpp_dout(s, 30) << "log_usage: bucket_name=" << bucket_name << " tenant=" << s->bucket_tenant << ", bytes_sent=" << bytes_sent << ", bytes_received=" - << bytes_received << ", success=" << data.successful_ops << dendl; + << bytes_received << ", success=" << data.successful_ops + << ", bytes_processed=" << s->s3select_usage.bytes_processed + << ", bytes_returned=" << s->s3select_usage.bytes_returned << dendl; - entry.add(op_name, data); + entry.add_usage(op_name, data); + entry.s3select_usage = s->s3select_usage; utime_t ts = ceph_clock_now(); diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 29e12e7720c..a9a6c3699d0 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -1592,7 +1592,16 @@ void RGWGetUsage_ObjStore_S3::send_response() utime_t ut(entry.epoch, 0); ut.gmtime(formatter->dump_stream("Time")); formatter->dump_int("Epoch", entry.epoch); + dump_usage_categories_info(formatter, entry, &categories); + + formatter->open_object_section("s3select"); + if (categories.empty() || categories.count("s3select")) { + encode_json("BytesProcessed", entry.s3select_usage.bytes_processed, formatter); + encode_json("BytesReturned", entry.s3select_usage.bytes_returned, formatter); + } + formatter->close_section(); // s3select + formatter->close_section(); // bucket } @@ -1622,6 +1631,8 @@ void RGWGetUsage_ObjStore_S3::send_response() encode_json("BytesReceived", total_usage.bytes_received, formatter); encode_json("Ops", total_usage.ops, formatter); encode_json("SuccessfulOps", total_usage.successful_ops, formatter); + encode_json("BytesProcessed", entry.s3select_usage.bytes_processed, formatter); + encode_json("BytesReturned", entry.s3select_usage.bytes_returned, formatter); formatter->close_section(); // total formatter->close_section(); // user } diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc index 8c6dc542512..1b7dced2782 100644 --- a/src/rgw/rgw_s3select.cc +++ b/src/rgw/rgw_s3select.cc @@ -27,6 +27,7 @@ uint64_t aws_response_handler::get_processed_size() void aws_response_handler::update_processed_size(uint64_t value) { processed_size += value; + s->s3select_usage.bytes_processed = processed_size; } uint64_t aws_response_handler::get_total_bytes_returned() @@ -37,6 +38,7 @@ uint64_t aws_response_handler::get_total_bytes_returned() void aws_response_handler::update_total_bytes_returned(uint64_t value) { total_bytes_returned = value; + s->s3select_usage.bytes_returned = total_bytes_returned; } void aws_response_handler::push_header(const char* header_name, const char* header_value) diff --git a/src/rgw/rgw_usage.cc b/src/rgw/rgw_usage.cc index 43e56577c24..1f9cb6f8d5f 100644 --- a/src/rgw/rgw_usage.cc +++ b/src/rgw/rgw_usage.cc @@ -105,7 +105,16 @@ int RGWUsage::show(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, if (!payer.empty() && payer != owner) { formatter->dump_string("payer", payer); } + dump_usage_categories_info(formatter, entry, categories); + + formatter->open_object_section("s3select"); + if (!categories || categories->empty() || categories->count("s3select")) { + formatter->dump_unsigned("bytes_processed", entry.s3select_usage.bytes_processed); + formatter->dump_unsigned("bytes_returned", entry.s3select_usage.bytes_returned); + } + formatter->close_section(); // s3select + formatter->close_section(); // bucket flusher.flush(); } @@ -136,6 +145,8 @@ int RGWUsage::show(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, encode_json("bytes_received", total_usage.bytes_received, formatter); encode_json("ops", total_usage.ops, formatter); encode_json("successful_ops", total_usage.successful_ops, formatter); + encode_json("bytes_processed", entry.s3select_usage.bytes_processed, formatter); + encode_json("bytes_returned", entry.s3select_usage.bytes_returned, formatter); formatter->close_section(); // total formatter->close_section(); // user diff --git a/src/tools/ceph-dencoder/rgw_types.h b/src/tools/ceph-dencoder/rgw_types.h index da1fde8f485..8cd0acfc624 100644 --- a/src/tools/ceph-dencoder/rgw_types.h +++ b/src/tools/ceph-dencoder/rgw_types.h @@ -53,6 +53,7 @@ TYPE(rgw_usage_log_entry) TYPE(rgw_cls_bi_entry) TYPE(rgw_bucket_olh_entry) TYPE(rgw_usage_data) +TYPE(rgw_s3select_usage_data) TYPE(rgw_usage_log_info) TYPE(rgw_user_bucket) TYPE(cls_rgw_lc_entry) -- 2.39.5