key = buf;
}
-static void usage_record_name_by_time(uint64_t epoch, string& user, string& bucket, string& key)
+static void usage_record_name_by_time(uint64_t epoch, const string& user, string& bucket, string& key)
{
char buf[32 + user.size() + bucket.size()];
snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str());
key = buf;
}
-static void usage_record_name_by_user(string& user, uint64_t epoch, string& bucket, string& key)
+static void usage_record_name_by_user(const string& user, uint64_t epoch, string& bucket, string& key)
{
char buf[32 + user.size() + bucket.size()];
snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str());
for (iter = info.entries.begin(); iter != info.entries.end(); ++iter) {
rgw_usage_log_entry& entry = *iter;
string key_by_time;
- usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
- CLS_LOG(10, "rgw_user_usage_log_add user=%s bucket=%s\n", entry.owner.c_str(), entry.bucket.c_str());
+ rgw_user *puser = (entry.payer.empty() ? &entry.owner : &entry.payer);
+
+ usage_record_name_by_time(entry.epoch, puser->to_str(), entry.bucket, key_by_time);
+
+ CLS_LOG(10, "rgw_user_usage_log_add user=%s bucket=%s\n", puser->to_str().c_str(), entry.bucket.c_str());
bufferlist record_bl;
int ret = cls_cxx_map_get_val(hctx, key_by_time, &record_bl);
return ret;
string key_by_user;
- usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
+ usage_record_name_by_user(puser->to_str(), entry.epoch, entry.bucket, key_by_user);
ret = cls_cxx_map_set_val(hctx, key_by_user, &new_record_bl);
if (ret < 0)
return ret;
static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
{
map<rgw_user_bucket, rgw_usage_log_entry> *usage = (map<rgw_user_bucket, rgw_usage_log_entry> *)param;
- rgw_user_bucket ub(entry.owner, entry.bucket);
+ rgw_user *puser;
+ if (!entry.payer.empty()) {
+ puser = &entry.payer;
+ } else {
+ puser = &entry.owner;
+ }
+ rgw_user_bucket ub(puser->to_str(), entry.bucket);
rgw_usage_log_entry& le = (*usage)[ub];
le.aggregate(entry);
string key_by_time;
string key_by_user;
- usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
- usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
+ string o = entry.owner.to_str();
+ usage_record_name_by_time(entry.epoch, o, entry.bucket, key_by_time);
+ usage_record_name_by_user(o, entry.epoch, entry.bucket, key_by_user);
int ret = cls_cxx_map_remove_key(hctx, key_by_time);
if (ret < 0)
struct rgw_cls_usage_log_add_op {
rgw_usage_log_info info;
+ rgw_user user;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
::encode(info, bl);
+ ::encode(user.to_str(), bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
::decode(info, bl);
+ if (struct_v >= 2) {
+ string s;
+ ::decode(s, bl);
+ user.from_str(s);
+ }
DECODE_FINISH(bl);
}
};
#include "include/utime.h"
#include "common/Formatter.h"
+#include "rgw/rgw_basic_types.h"
+
#define CEPH_RGW_REMOVE 'r'
#define CEPH_RGW_UPDATE 'u'
#define CEPH_RGW_TAG_TIMEOUT 60*60*24
struct rgw_usage_log_entry {
- string owner;
+ rgw_user owner;
+ rgw_user payer; /* if empty, same as owner */
string bucket;
uint64_t epoch;
rgw_usage_data total_usage; /* this one is kept for backwards compatibility */
rgw_usage_log_entry() : epoch(0) {}
rgw_usage_log_entry(string& o, string& b) : owner(o), bucket(b), epoch(0) {}
+ rgw_usage_log_entry(string& o, string& p, string& b) : owner(o), payer(p), bucket(b), epoch(0) {}
void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
- ::encode(owner, bl);
+ ENCODE_START(3, 1, bl);
+ ::encode(owner.to_str(), bl);
::encode(bucket, bl);
::encode(epoch, bl);
::encode(total_usage.bytes_sent, bl);
::encode(total_usage.ops, bl);
::encode(total_usage.successful_ops, bl);
::encode(usage_map, bl);
+ ::encode(payer.to_str(), bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START(2, bl);
- ::decode(owner, bl);
+ DECODE_START(3, bl);
+ string s;
+ ::decode(s, bl);
+ owner.from_str(s);
::decode(bucket, bl);
::decode(epoch, bl);
::decode(total_usage.bytes_sent, bl);
} else {
::decode(usage_map, bl);
}
+ if (struct_v >= 3) {
+ string p;
+ ::decode(p, bl);
+ payer.from_str(p);
+ }
DECODE_FINISH(bl);
}
owner = e.owner;
bucket = e.bucket;
epoch = e.epoch;
+ payer = e.payer;
+ }
+ if (payer != e.payer) {
+ return; /* can't aggregate differet payers */
}
map<string, rgw_usage_data>::const_iterator iter;
for (iter = e.usage_map.begin(); iter != e.usage_map.end(); ++iter) {
string bucket;
rgw_user_bucket() {}
- rgw_user_bucket(string &u, string& b) : user(u), bucket(b) {}
+ rgw_user_bucket(const string& u, const string& b) : user(u), bucket(b) {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
round_timestamp = ts.round_to_hour();
}
- void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
+ void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
lock.Lock();
if (timestamp.sec() > round_timestamp + 3600)
recalc_round_timestamp(timestamp);
entry.epoch = round_timestamp.sec();
bool account;
- rgw_user_bucket ub(entry.owner, entry.bucket);
+ string u = user.to_str();
+ rgw_user_bucket ub(u, entry.bucket);
usage_map[ub].insert(round_timestamp, entry, &account);
if (account)
num_entries++;
}
}
+ void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
+ if (entry.payer.empty()) {
+ insert_user(timestamp, entry.owner, entry);
+ } else {
+ insert_user(timestamp, entry.payer, entry);
+ }
+ }
+
void flush() {
map<rgw_user_bucket, RGWUsageBatch> old_map;
lock.Lock();
return;
rgw_user user;
+ rgw_user payer;
- if (!s->bucket_name.empty())
+ if (!s->bucket_name.empty()) {
user = s->bucket_owner.get_id();
- else
+ if (s->bucket_info.requester_pays) {
+ payer = s->user->user_id;
+ }
+ } else {
user = s->user->user_id;
+ }
- string id = user.to_str();
- rgw_usage_log_entry entry(id, s->bucket.name);
+ string u = user.to_str();
+ string p = payer.to_str();
+ rgw_usage_log_entry entry(u, p, s->bucket.name);
uint64_t bytes_sent = s->cio->get_bytes_sent();
uint64_t bytes_received = s->cio->get_bytes_received();
formatter->close_section();
}
formatter->open_object_section("user");
- formatter->dump_string("owner", ub.user);
+ formatter->dump_string("user", ub.user);
formatter->open_array_section("buckets");
user_section_open = true;
last_owner = ub.user;
utime_t ut(entry.epoch, 0);
ut.gmtime(formatter->dump_stream("time"));
formatter->dump_int("epoch", entry.epoch);
+ formatter->dump_string("owner", entry.owner.to_str());
+ string payer = entry.payer.to_str();
+ if (!payer.empty()) {
+ formatter->dump_string("payer", payer);
+ }
dump_usage_categories_info(formatter, entry, categories);
formatter->close_section(); // bucket
flusher.flush();