return None
e = {'user': user, 'categories': [],
'total': {'bytes_received': 0,
- 'bytes_sent': 0, 'ops': 0, 'successful_ops': 0 }}
+ 'bytes_sent': 0, 'ops': 0, 'successful_ops': 0,
+ 'bytes_processed': 0, 'bytes_returned': 0}}
summaries.append(e)
return e
def usage_acc_update2(x, out, b_in, err):
return
if len(q) > 0:
r.append("incomplete counts in " + what + ": " + ", ".join(q))
+def usage_acc_validate_s3select_fields(r, x, x2, what):
+ q=[]
+ for field in ['bytes_processed', 'bytes_returned']:
+ try:
+ if x2[field] < x[field]:
+ q.append("field %s: %d < %d" % (field, x2[field], x[field]))
+ except Exception as ex:
+ r.append( "missing/bad field " + field + " in " + what + " " + str(ex))
+ return
+ if len(q) > 0:
+ r.append("incomplete counts in " + what + ": " + ", ".join(q))
class usage_acc:
def __init__(self):
self.results = {'entries': [], 'summary': []}
return b
if not add:
return None
- b = {'bucket': bucket, 'categories': []}
+ b = {'bucket': bucket, 'categories': [], 's3select': {
+ 'bytes_processed': 0, 'bytes_returned': 0,
+ }}
e['buckets'].append(b)
return b
def c2x(self, c, cat, add=True):
try:
b2 = self.e2b(e2, b['bucket'], False)
if b2 != None:
- c2 = b2['categories']
+ c2 = b2['categories']
except Exception as ex:
r.append("malformed entry looking for bucket "
- + b['bucket'] + " in user " + e['user'] + " " + str(ex))
+ + b['bucket'] + " in user " + e['user'] + " " + str(ex))
break
if b2 == None:
r.append("can't find bucket " + b['bucket']
- + " in user " + e['user'])
+ + " in user " + e['user'])
continue
for x in c:
try:
x2 = self.c2x(c2, x['category'], False)
except Exception as ex:
r.append("malformed entry looking for "
- + x['category'] + " in bucket " + b['bucket']
- + " user " + e['user'] + " " + str(ex))
+ + x['category'] + " in bucket " + b['bucket']
+ + " user " + e['user'] + " " + str(ex))
break
usage_acc_validate_fields(r, x, x2, "entry: category "
- + x['category'] + " bucket " + b['bucket']
- + " in user " + e['user'])
+ + x['category'] + " bucket " + b['bucket']
+ + " in user " + e['user'])
+
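+ # the matching server entry must also report the s3select counters for this bucket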
+ if 's3select' not in b2:
+ r.append("missing s3select in bucket "
+ + b['bucket'] + " in user " + e['user'])
+ continue
+ usage_acc_validate_s3select_fields(r,
+ b['s3select'], b2['s3select'],
+ "entry: s3select in bucket " + b['bucket'] + " in user " + e['user'])
for s in self.results['summary']:
c = s['categories']
try:
s2 = usage_acc_findsum2(results['summary'], s['user'], False)
except Exception as ex:
- r.append("malformed summary looking for user " + e['user']
- + " " + str(ex))
+ r.append("malformed summary looking for user " + s['user']
+ + " " + str(ex))
break
- if s2 == None:
- r.append("missing summary for user " + e['user'] + " " + str(ex))
- continue
+ if s2 == None:
+ r.append("missing summary for user " + s['user'])
+ continue
try:
c2 = s2['categories']
except Exception as ex:
r.append("malformed summary missing categories for user "
- + e['user'] + " " + str(ex))
+ + s['user'] + " " + str(ex))
break
for x in c:
try:
x2 = self.c2x(c2, x['category'], False)
except Exception as ex:
r.append("malformed summary looking for "
- + x['category'] + " user " + e['user'] + " " + str(ex))
+ + x['category'] + " user " + s['user'] + " " + str(ex))
break
usage_acc_validate_fields(r, x, x2, "summary: category "
- + x['category'] + " in user " + e['user'])
+ + x['category'] + " in user " + s['user'])
x = s['total']
try:
x2 = s2['total']
except Exception as ex:
r.append("malformed summary looking for totals for user "
- + e['user'] + " " + str(ex))
+ + s['user'] + " " + str(ex))
break
- usage_acc_validate_fields(r, x, x2, "summary: totals for user" + e['user'])
+ usage_acc_validate_fields(r, x, x2, "summary: totals for user" + s['user'])
+ usage_acc_validate_s3select_fields(r, x, x2, "summary: s3select totals for user" + s['user'])
return r
def ignore_this_entry(cat, bucket, user, out, b_in, err):
f->close_section();
}
+void rgw_s3select_usage_data::generate_test_instances(list<rgw_s3select_usage_data*>& o)
+{
+ rgw_s3select_usage_data *s = new rgw_s3select_usage_data;
+ s->bytes_processed = 1024;
+ s->bytes_returned = 512;
+ o.push_back(s);
+ o.push_back(new rgw_s3select_usage_data);
+}
+
+void rgw_s3select_usage_data::dump(Formatter *f) const
+{
+ f->dump_unsigned("bytes_processed", bytes_processed);
+ f->dump_unsigned("bytes_returned", bytes_returned);
+}
+
void rgw_usage_data::generate_test_instances(list<rgw_usage_data*>& o)
{
rgw_usage_data *s = new rgw_usage_data;
}
}
f->close_section();
+
+ f->open_object_section("s3select");
+ f->dump_unsigned("bytes_processed", s3select_usage.bytes_processed);
+ f->dump_unsigned("bytes_returned", s3select_usage.bytes_returned);
+ f->close_section();
}
void rgw_usage_log_entry::generate_test_instances(list<rgw_usage_log_entry *> &o)
{
rgw_usage_log_entry *entry = new rgw_usage_log_entry;
rgw_usage_data usage_data{1024, 2048};
+ rgw_s3select_usage_data s3select_usage_data{8192, 4096};
entry->owner = rgw_user("owner");
entry->payer = rgw_user("payer");
entry->bucket = "bucket";
entry->total_usage.ops = usage_data.ops;
entry->total_usage.successful_ops = usage_data.successful_ops;
entry->usage_map["get_obj"] = usage_data;
+ entry->s3select_usage = s3select_usage_data;
o.push_back(entry);
o.push_back(new rgw_usage_log_entry);
}
};
WRITE_CLASS_ENCODER(rgw_bucket_dir)
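+
+// Usage counters for s3select requests: bytes processed while executing the
+// query (bytes_processed) and bytes returned to the client (bytes_returned).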
+struct rgw_s3select_usage_data {
+ uint64_t bytes_processed;
+ uint64_t bytes_returned;
+
+ rgw_s3select_usage_data() : bytes_processed(0), bytes_returned(0) {}
+ rgw_s3select_usage_data(uint64_t processed, uint64_t returned)
+ : bytes_processed(processed), bytes_returned(returned) {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(bytes_processed, bl);
+ encode(bytes_returned, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(bytes_processed, bl);
+ decode(bytes_returned, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void aggregate(const rgw_s3select_usage_data& usage) {
+ bytes_processed += usage.bytes_processed;
+ bytes_returned += usage.bytes_returned;
+ }
+
+ void dump(ceph::Formatter *f) const;
+ static void generate_test_instances(std::list<rgw_s3select_usage_data*>& o);
+};
+WRITE_CLASS_ENCODER(rgw_s3select_usage_data)
+
struct rgw_usage_data {
uint64_t bytes_sent;
uint64_t bytes_received;
uint64_t epoch;
rgw_usage_data total_usage; /* this one is kept for backwards compatibility */
std::map<std::string, rgw_usage_data> usage_map;
+ rgw_s3select_usage_data s3select_usage;
rgw_usage_log_entry() : epoch(0) {}
rgw_usage_log_entry(std::string& o, std::string& b) : owner(o), bucket(b), epoch(0) {}
rgw_usage_log_entry(std::string& o, std::string& p, std::string& b) : owner(o), payer(p), bucket(b), epoch(0) {}
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(3, 1, bl);
+ ENCODE_START(4, 1, bl);
encode(owner.to_str(), bl);
encode(bucket, bl);
encode(epoch, bl);
encode(total_usage.successful_ops, bl);
encode(usage_map, bl);
encode(payer.to_str(), bl);
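+ // appended in struct_v 4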
+ encode(s3select_usage, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(3, bl);
+ DECODE_START(4, bl);
std::string s;
decode(s, bl);
owner.from_str(s);
decode(p, bl);
payer.from_str(p);
}
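+ // entries encoded before v4 carry no s3select data; the member stays zero-initialized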
+ if (struct_v >= 4) {
+ decode(s3select_usage, bl);
+ }
DECODE_FINISH(bl);
}
for (auto iter = e.usage_map.begin(); iter != e.usage_map.end(); ++iter) {
if (!categories || !categories->size() || categories->count(iter->first)) {
- add(iter->first, iter->second);
+ add_usage(iter->first, iter->second);
}
}
+
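+ // s3select bytes are kept outside usage_map, so aggregate them separately,
+ // gated on the pseudo-category "s3select" to honour category filtering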
+ if (!categories || !categories->size() || categories->count("s3select")) {
+ s3select_usage.aggregate(e.s3select_usage);
+ }
}
void sum(rgw_usage_data& usage,
}
}
- void add(const std::string& category, const rgw_usage_data& data) {
+ void add_usage(const std::string& category, const rgw_usage_data& data) {
usage_map[category].aggregate(data);
total_usage.aggregate(data);
}
uint64_t obj_size{0};
bool enable_ops_log;
bool enable_usage_log;
+ rgw_s3select_usage_data s3select_usage;
uint8_t defer_to_bucket_acls;
uint32_t perm_mask{0};
ldpp_dout(s, 30) << "log_usage: bucket_name=" << bucket_name
<< " tenant=" << s->bucket_tenant
<< ", bytes_sent=" << bytes_sent << ", bytes_received="
- << bytes_received << ", success=" << data.successful_ops << dendl;
+ << bytes_received << ", success=" << data.successful_ops
+ << ", bytes_processed=" << s->s3select_usage.bytes_processed
+ << ", bytes_returned=" << s->s3select_usage.bytes_returned << dendl;
- entry.add(op_name, data);
+ entry.add_usage(op_name, data);
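+ // carry the per-request s3select counters gathered in req_state into the usage log entry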
+ entry.s3select_usage = s->s3select_usage;
utime_t ts = ceph_clock_now();
utime_t ut(entry.epoch, 0);
ut.gmtime(formatter->dump_stream("Time"));
formatter->dump_int("Epoch", entry.epoch);
+
dump_usage_categories_info(formatter, entry, &categories);
+
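+ // report the s3select counters in their own subsection of each entry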
+ formatter->open_object_section("s3select");
+ if (categories.empty() || categories.count("s3select")) {
+ encode_json("BytesProcessed", entry.s3select_usage.bytes_processed, formatter);
+ encode_json("BytesReturned", entry.s3select_usage.bytes_returned, formatter);
+ }
+ formatter->close_section(); // s3select
+
formatter->close_section(); // bucket
}
encode_json("BytesReceived", total_usage.bytes_received, formatter);
encode_json("Ops", total_usage.ops, formatter);
encode_json("SuccessfulOps", total_usage.successful_ops, formatter);
+ encode_json("BytesProcessed", entry.s3select_usage.bytes_processed, formatter);
+ encode_json("BytesReturned", entry.s3select_usage.bytes_returned, formatter);
formatter->close_section(); // total
formatter->close_section(); // user
}
void aws_response_handler::update_processed_size(uint64_t value)
{
processed_size += value;
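+ // mirror the running total into req_state so log_usage() can record it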
+ s->s3select_usage.bytes_processed = processed_size;
}
uint64_t aws_response_handler::get_total_bytes_returned()
void aws_response_handler::update_total_bytes_returned(uint64_t value)
{
total_bytes_returned = value;
+ s->s3select_usage.bytes_returned = total_bytes_returned;
}
void aws_response_handler::push_header(const char* header_name, const char* header_value)
if (!payer.empty() && payer != owner) {
formatter->dump_string("payer", payer);
}
+
dump_usage_categories_info(formatter, entry, categories);
+
+ formatter->open_object_section("s3select");
+ if (!categories || categories->empty() || categories->count("s3select")) {
+ formatter->dump_unsigned("bytes_processed", entry.s3select_usage.bytes_processed);
+ formatter->dump_unsigned("bytes_returned", entry.s3select_usage.bytes_returned);
+ }
+ formatter->close_section(); // s3select
+
formatter->close_section(); // bucket
flusher.flush();
}
encode_json("bytes_received", total_usage.bytes_received, formatter);
encode_json("ops", total_usage.ops, formatter);
encode_json("successful_ops", total_usage.successful_ops, formatter);
+ encode_json("bytes_processed", entry.s3select_usage.bytes_processed, formatter);
+ encode_json("bytes_returned", entry.s3select_usage.bytes_returned, formatter);
formatter->close_section(); // total
formatter->close_section(); // user
TYPE(rgw_cls_bi_entry)
TYPE(rgw_bucket_olh_entry)
TYPE(rgw_usage_data)
+TYPE(rgw_s3select_usage_data)
TYPE(rgw_usage_log_info)
TYPE(rgw_user_bucket)
TYPE(cls_rgw_lc_entry)