From cf812eaddec989797aa3273fd5ce3f85f6fe3b7f Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 29 Feb 2016 15:19:19 -0800 Subject: [PATCH] rgw: aggregate usage by payer If bucket has requester-payer property set, aggregate the usage data on it by the user that did the operation. Signed-off-by: Yehuda Sadeh Signed-off-by: Javier M. Mellid --- src/cls/rgw/cls_rgw.cc | 26 ++++++++++++++++++-------- src/cls/rgw/cls_rgw_ops.h | 11 +++++++++-- src/cls/rgw/cls_rgw_types.h | 28 ++++++++++++++++++++++------ src/rgw/rgw_log.cc | 27 +++++++++++++++++++++------ src/rgw/rgw_usage.cc | 7 ++++++- 5 files changed, 76 insertions(+), 23 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 0fd88905eb161..421f8b1b35ffe 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -2581,14 +2581,14 @@ static void usage_record_prefix_by_user(string& user, uint64_t epoch, string& ke key = buf; } -static void usage_record_name_by_time(uint64_t epoch, string& user, string& bucket, string& key) +static void usage_record_name_by_time(uint64_t epoch, const string& user, string& bucket, string& key) { char buf[32 + user.size() + bucket.size()]; snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str()); key = buf; } -static void usage_record_name_by_user(string& user, uint64_t epoch, string& bucket, string& key) +static void usage_record_name_by_user(const string& user, uint64_t epoch, string& bucket, string& key) { char buf[32 + user.size() + bucket.size()]; snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str()); @@ -2628,9 +2628,12 @@ int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist for (iter = info.entries.begin(); iter != info.entries.end(); ++iter) { rgw_usage_log_entry& entry = *iter; string key_by_time; - usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time); - CLS_LOG(10, "rgw_user_usage_log_add user=%s bucket=%s\n", entry.owner.c_str(), entry.bucket.c_str()); + rgw_user *puser = (entry.payer.empty() ? &entry.owner : &entry.payer); + + usage_record_name_by_time(entry.epoch, puser->to_str(), entry.bucket, key_by_time); + + CLS_LOG(10, "rgw_user_usage_log_add user=%s bucket=%s\n", puser->to_str().c_str(), entry.bucket.c_str()); bufferlist record_bl; int ret = cls_cxx_map_get_val(hctx, key_by_time, &record_bl); @@ -2654,7 +2657,7 @@ int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist return ret; string key_by_user; - usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user); + usage_record_name_by_user(puser->to_str(), entry.epoch, entry.bucket, key_by_user); ret = cls_cxx_map_set_val(hctx, key_by_user, &new_record_bl); if (ret < 0) return ret; @@ -2756,7 +2759,13 @@ static int usage_iterate_range(cls_method_context_t hctx, uint64_t start, uint64 static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param) { map *usage = (map *)param; - rgw_user_bucket ub(entry.owner, entry.bucket); + rgw_user *puser; + if (!entry.payer.empty()) { + puser = &entry.payer; + } else { + puser = &entry.owner; + } + rgw_user_bucket ub(puser->to_str(), entry.bucket); rgw_usage_log_entry& le = (*usage)[ub]; le.aggregate(entry); @@ -2798,8 +2807,9 @@ static int usage_log_trim_cb(cls_method_context_t hctx, const string& key, rgw_u string key_by_time; string key_by_user; - usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time); - usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user); + string o = entry.owner.to_str(); + usage_record_name_by_time(entry.epoch, o, entry.bucket, key_by_time); + usage_record_name_by_user(o, entry.epoch, entry.bucket, key_by_user); int ret = cls_cxx_map_remove_key(hctx, key_by_time); if (ret < 0) diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index 053d716e292ac..0ab665857cb94 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -507,16 +507,23 @@ WRITE_CLASS_ENCODER(rgw_cls_obj_check_mtime) struct rgw_cls_usage_log_add_op { rgw_usage_log_info info; + rgw_user user; void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); ::encode(info, bl); + ::encode(user.to_str(), bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); ::decode(info, bl); + if (struct_v >= 2) { + string s; + ::decode(s, bl); + user.from_str(s); + } DECODE_FINISH(bl); } }; diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index 3da746851b7ca..9efc2288d57cf 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -7,6 +7,8 @@ #include "include/utime.h" #include "common/Formatter.h" +#include "rgw/rgw_basic_types.h" + #define CEPH_RGW_REMOVE 'r' #define CEPH_RGW_UPDATE 'u' #define CEPH_RGW_TAG_TIMEOUT 60*60*24 @@ -662,7 +664,8 @@ WRITE_CLASS_ENCODER(rgw_usage_data) struct rgw_usage_log_entry { - string owner; + rgw_user owner; + rgw_user payer; /* if empty, same as owner */ string bucket; uint64_t epoch; rgw_usage_data total_usage; /* this one is kept for backwards compatibility */ @@ -670,10 +673,11 @@ struct rgw_usage_log_entry { rgw_usage_log_entry() : epoch(0) {} rgw_usage_log_entry(string& o, string& b) : owner(o), bucket(b), epoch(0) {} + rgw_usage_log_entry(string& o, string& p, string& b) : owner(o), payer(p), bucket(b), epoch(0) {} void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); - ::encode(owner, bl); + ENCODE_START(3, 1, bl); + ::encode(owner.to_str(), bl); ::encode(bucket, bl); ::encode(epoch, bl); ::encode(total_usage.bytes_sent, bl); @@ -681,13 +685,16 @@ struct rgw_usage_log_entry { ::encode(total_usage.ops, bl); ::encode(total_usage.successful_ops, bl); ::encode(usage_map, bl); + ::encode(payer.to_str(), bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { - DECODE_START(2, bl); - ::decode(owner, bl); + DECODE_START(3, bl); + string s; + ::decode(s, bl); + owner.from_str(s); ::decode(bucket, bl); ::decode(epoch, bl); ::decode(total_usage.bytes_sent, bl); @@ -699,6 +706,11 @@ struct rgw_usage_log_entry { } else { ::decode(usage_map, bl); } + if (struct_v >= 3) { + string p; + ::decode(p, bl); + payer.from_str(p); + } DECODE_FINISH(bl); } @@ -707,6 +719,10 @@ struct rgw_usage_log_entry { owner = e.owner; bucket = e.bucket; epoch = e.epoch; + payer = e.payer; + } + if (payer != e.payer) { + return; /* can't aggregate differet payers */ } map::const_iterator iter; for (iter = e.usage_map.begin(); iter != e.usage_map.end(); ++iter) { @@ -756,7 +772,7 @@ struct rgw_user_bucket { string bucket; rgw_user_bucket() {} - rgw_user_bucket(string &u, string& b) : user(u), bucket(b) {} + rgw_user_bucket(const string& u, const string& b) : user(u), bucket(b) {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); diff --git a/src/rgw/rgw_log.cc b/src/rgw/rgw_log.cc index 5faffecc34ea5..eecb990e8b335 100644 --- a/src/rgw/rgw_log.cc +++ b/src/rgw/rgw_log.cc @@ -127,13 +127,14 @@ public: round_timestamp = ts.round_to_hour(); } - void insert(utime_t& timestamp, rgw_usage_log_entry& entry) { + void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) { lock.Lock(); if (timestamp.sec() > round_timestamp + 3600) recalc_round_timestamp(timestamp); entry.epoch = round_timestamp.sec(); bool account; - rgw_user_bucket ub(entry.owner, entry.bucket); + string u = user.to_str(); + rgw_user_bucket ub(u, entry.bucket); usage_map[ub].insert(round_timestamp, entry, &account); if (account) num_entries++; @@ -145,6 +146,14 @@ public: } } + void insert(utime_t& timestamp, rgw_usage_log_entry& entry) { + if (entry.payer.empty()) { + insert_user(timestamp, entry.owner, entry); + } else { + insert_user(timestamp, entry.payer, entry); + } + } + void flush() { map old_map; lock.Lock(); @@ -178,14 +187,20 @@ static void log_usage(struct req_state *s, const string& op_name) return; rgw_user user; + rgw_user payer; - if (!s->bucket_name.empty()) + if (!s->bucket_name.empty()) { user = s->bucket_owner.get_id(); - else + if (s->bucket_info.requester_pays) { + payer = s->user->user_id; + } + } else { user = s->user->user_id; + } - string id = user.to_str(); - rgw_usage_log_entry entry(id, s->bucket.name); + string u = user.to_str(); + string p = payer.to_str(); + rgw_usage_log_entry entry(u, p, s->bucket.name); uint64_t bytes_sent = s->cio->get_bytes_sent(); uint64_t bytes_received = s->cio->get_bytes_received(); diff --git a/src/rgw/rgw_usage.cc b/src/rgw/rgw_usage.cc index f8495c529e8ac..82ab5ae584bfb 100644 --- a/src/rgw/rgw_usage.cc +++ b/src/rgw/rgw_usage.cc @@ -78,7 +78,7 @@ int RGWUsage::show(RGWRados *store, rgw_user& uid, uint64_t start_epoch, formatter->close_section(); } formatter->open_object_section("user"); - formatter->dump_string("owner", ub.user); + formatter->dump_string("user", ub.user); formatter->open_array_section("buckets"); user_section_open = true; last_owner = ub.user; @@ -88,6 +88,11 @@ int RGWUsage::show(RGWRados *store, rgw_user& uid, uint64_t start_epoch, utime_t ut(entry.epoch, 0); ut.gmtime(formatter->dump_stream("time")); formatter->dump_int("epoch", entry.epoch); + formatter->dump_string("owner", entry.owner.to_str()); + string payer = entry.payer.to_str(); + if (!payer.empty()) { + formatter->dump_string("payer", payer); + } dump_usage_categories_info(formatter, entry, categories); formatter->close_section(); // bucket flusher.flush(); -- 2.39.5