]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aggregate usage by payer
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 29 Feb 2016 23:19:19 +0000 (15:19 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 29 Feb 2016 23:19:19 +0000 (15:19 -0800)
If bucket has requester-payer property set, aggregate the usage data
on it by the user that did the operation.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Signed-off-by: Javier M. Mellid <jmunhoz@igalia.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_ops.h
src/cls/rgw/cls_rgw_types.h
src/rgw/rgw_log.cc
src/rgw/rgw_usage.cc

index 0fd88905eb1616402c08d7570f471eba73720fc8..421f8b1b35ffe07b67be1272f3291cd861e22dbc 100644 (file)
@@ -2581,14 +2581,14 @@ static void usage_record_prefix_by_user(string& user, uint64_t epoch, string& ke
   key = buf;
 }
 
-static void usage_record_name_by_time(uint64_t epoch, string& user, string& bucket, string& key)
+static void usage_record_name_by_time(uint64_t epoch, const string& user, string& bucket, string& key)
 {
   char buf[32 + user.size() + bucket.size()];
   snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str());
   key = buf;
 }
 
-static void usage_record_name_by_user(string& user, uint64_t epoch, string& bucket, string& key)
+static void usage_record_name_by_user(const string& user, uint64_t epoch, string& bucket, string& key)
 {
   char buf[32 + user.size() + bucket.size()];
   snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str());
@@ -2628,9 +2628,12 @@ int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist
   for (iter = info.entries.begin(); iter != info.entries.end(); ++iter) {
     rgw_usage_log_entry& entry = *iter;
     string key_by_time;
-    usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
 
-    CLS_LOG(10, "rgw_user_usage_log_add user=%s bucket=%s\n", entry.owner.c_str(), entry.bucket.c_str());
+    rgw_user *puser = (entry.payer.empty() ? &entry.owner : &entry.payer);
+
+    usage_record_name_by_time(entry.epoch, puser->to_str(), entry.bucket, key_by_time);
+
+    CLS_LOG(10, "rgw_user_usage_log_add user=%s bucket=%s\n", puser->to_str().c_str(), entry.bucket.c_str());
 
     bufferlist record_bl;
     int ret = cls_cxx_map_get_val(hctx, key_by_time, &record_bl);
@@ -2654,7 +2657,7 @@ int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist
       return ret;
 
     string key_by_user;
-    usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
+    usage_record_name_by_user(puser->to_str(), entry.epoch, entry.bucket, key_by_user);
     ret = cls_cxx_map_set_val(hctx, key_by_user, &new_record_bl);
     if (ret < 0)
       return ret;
@@ -2756,7 +2759,13 @@ static int usage_iterate_range(cls_method_context_t hctx, uint64_t start, uint64
 static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
 {
   map<rgw_user_bucket, rgw_usage_log_entry> *usage = (map<rgw_user_bucket, rgw_usage_log_entry> *)param;
-  rgw_user_bucket ub(entry.owner, entry.bucket);
+  rgw_user *puser;
+  if (!entry.payer.empty()) {
+    puser = &entry.payer;
+  } else {
+    puser = &entry.owner;
+  }
+  rgw_user_bucket ub(puser->to_str(), entry.bucket);
   rgw_usage_log_entry& le = (*usage)[ub];
   le.aggregate(entry);
  
@@ -2798,8 +2807,9 @@ static int usage_log_trim_cb(cls_method_context_t hctx, const string& key, rgw_u
   string key_by_time;
   string key_by_user;
 
-  usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
-  usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
+  string o = entry.owner.to_str();
+  usage_record_name_by_time(entry.epoch, o, entry.bucket, key_by_time);
+  usage_record_name_by_user(o, entry.epoch, entry.bucket, key_by_user);
 
   int ret = cls_cxx_map_remove_key(hctx, key_by_time);
   if (ret < 0)
index 053d716e292ac8c8354def87770fee404f50a51a..0ab665857cb94c07332074e59228fd6ea0506637 100644 (file)
@@ -507,16 +507,23 @@ WRITE_CLASS_ENCODER(rgw_cls_obj_check_mtime)
 
 struct rgw_cls_usage_log_add_op {
   rgw_usage_log_info info;
+  rgw_user user;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     ::encode(info, bl);
+    ::encode(user.to_str(), bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     ::decode(info, bl);
+    if (struct_v >= 2) {
+      string s;
+      ::decode(s, bl);
+      user.from_str(s);
+    }
     DECODE_FINISH(bl);
   }
 };
index 3da746851b7ca1b8b8aee94a878288f1be25922c..9efc2288d57cf3041cfbf744116350e176eeefa5 100644 (file)
@@ -7,6 +7,8 @@
 #include "include/utime.h"
 #include "common/Formatter.h"
 
+#include "rgw/rgw_basic_types.h"
+
 #define CEPH_RGW_REMOVE 'r'
 #define CEPH_RGW_UPDATE 'u'
 #define CEPH_RGW_TAG_TIMEOUT 60*60*24
@@ -662,7 +664,8 @@ WRITE_CLASS_ENCODER(rgw_usage_data)
 
 
 struct rgw_usage_log_entry {
-  string owner;
+  rgw_user owner;
+  rgw_user payer; /* if empty, same as owner */
   string bucket;
   uint64_t epoch;
   rgw_usage_data total_usage; /* this one is kept for backwards compatibility */
@@ -670,10 +673,11 @@ struct rgw_usage_log_entry {
 
   rgw_usage_log_entry() : epoch(0) {}
   rgw_usage_log_entry(string& o, string& b) : owner(o), bucket(b), epoch(0) {}
+  rgw_usage_log_entry(string& o, string& p, string& b) : owner(o), payer(p), bucket(b), epoch(0) {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
-    ::encode(owner, bl);
+    ENCODE_START(3, 1, bl);
+    ::encode(owner.to_str(), bl);
     ::encode(bucket, bl);
     ::encode(epoch, bl);
     ::encode(total_usage.bytes_sent, bl);
@@ -681,13 +685,16 @@ struct rgw_usage_log_entry {
     ::encode(total_usage.ops, bl);
     ::encode(total_usage.successful_ops, bl);
     ::encode(usage_map, bl);
+    ::encode(payer.to_str(), bl);
     ENCODE_FINISH(bl);
   }
 
 
    void decode(bufferlist::iterator& bl) {
-    DECODE_START(2, bl);
-    ::decode(owner, bl);
+    DECODE_START(3, bl);
+    string s;
+    ::decode(s, bl);
+    owner.from_str(s);
     ::decode(bucket, bl);
     ::decode(epoch, bl);
     ::decode(total_usage.bytes_sent, bl);
@@ -699,6 +706,11 @@ struct rgw_usage_log_entry {
     } else {
       ::decode(usage_map, bl);
     }
+    if (struct_v >= 3) {
+      string p;
+      ::decode(p, bl);
+      payer.from_str(p);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -707,6 +719,10 @@ struct rgw_usage_log_entry {
       owner = e.owner;
       bucket = e.bucket;
       epoch = e.epoch;
+      payer = e.payer;
+    }
+    if (payer != e.payer) {
+      return; /* can't aggregate differet payers */
     }
     map<string, rgw_usage_data>::const_iterator iter;
     for (iter = e.usage_map.begin(); iter != e.usage_map.end(); ++iter) {
@@ -756,7 +772,7 @@ struct rgw_user_bucket {
   string bucket;
 
   rgw_user_bucket() {}
-  rgw_user_bucket(string &u, string& b) : user(u), bucket(b) {}
+  rgw_user_bucket(const string& u, const string& b) : user(u), bucket(b) {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
index 5faffecc34ea5500685c9b13253c6eab56710507..eecb990e8b3353435c0a42d516724d19811a3446 100644 (file)
@@ -127,13 +127,14 @@ public:
     round_timestamp = ts.round_to_hour();
   }
 
-  void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
+  void insert_user(utime_t& timestamp, const rgw_user& user, rgw_usage_log_entry& entry) {
     lock.Lock();
     if (timestamp.sec() > round_timestamp + 3600)
       recalc_round_timestamp(timestamp);
     entry.epoch = round_timestamp.sec();
     bool account;
-    rgw_user_bucket ub(entry.owner, entry.bucket);
+    string u = user.to_str();
+    rgw_user_bucket ub(u, entry.bucket);
     usage_map[ub].insert(round_timestamp, entry, &account);
     if (account)
       num_entries++;
@@ -145,6 +146,14 @@ public:
     }
   }
 
+  void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
+    if (entry.payer.empty()) {
+      insert_user(timestamp, entry.owner, entry);
+    } else {
+      insert_user(timestamp, entry.payer, entry);
+    }
+  }
+
   void flush() {
     map<rgw_user_bucket, RGWUsageBatch> old_map;
     lock.Lock();
@@ -178,14 +187,20 @@ static void log_usage(struct req_state *s, const string& op_name)
     return;
 
   rgw_user user;
+  rgw_user payer;
 
-  if (!s->bucket_name.empty())
+  if (!s->bucket_name.empty()) {
     user = s->bucket_owner.get_id();
-  else
+    if (s->bucket_info.requester_pays) {
+      payer = s->user->user_id;
+    }
+  } else {
     user = s->user->user_id;
+  }
 
-  string id = user.to_str();
-  rgw_usage_log_entry entry(id, s->bucket.name);
+  string u = user.to_str();
+  string p = payer.to_str();
+  rgw_usage_log_entry entry(u, p, s->bucket.name);
 
   uint64_t bytes_sent = s->cio->get_bytes_sent();
   uint64_t bytes_received = s->cio->get_bytes_received();
index f8495c529e8acfd753af14f44be34f6e7863ecee..82ab5ae584bfbd88935b7cd67c585767d7890bf1 100644 (file)
@@ -78,7 +78,7 @@ int RGWUsage::show(RGWRados *store, rgw_user& uid, uint64_t start_epoch,
             formatter->close_section();
           }
           formatter->open_object_section("user");
-          formatter->dump_string("owner", ub.user);
+          formatter->dump_string("user", ub.user);
           formatter->open_array_section("buckets");
           user_section_open = true;
           last_owner = ub.user;
@@ -88,6 +88,11 @@ int RGWUsage::show(RGWRados *store, rgw_user& uid, uint64_t start_epoch,
         utime_t ut(entry.epoch, 0);
         ut.gmtime(formatter->dump_stream("time"));
         formatter->dump_int("epoch", entry.epoch);
+        formatter->dump_string("owner", entry.owner.to_str());
+        string payer = entry.payer.to_str();
+        if (!payer.empty()) {
+          formatter->dump_string("payer", payer);
+        }
         dump_usage_categories_info(formatter, entry, categories);
         formatter->close_section(); // bucket
         flusher.flush();