]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Merge remote-tracking branch 'gh/wip-2516-2'
authorSage Weil <sage@inktank.com>
Tue, 12 Jun 2012 01:36:59 +0000 (18:36 -0700)
committerSage Weil <sage@inktank.com>
Tue, 12 Jun 2012 01:36:59 +0000 (18:36 -0700)
Reviewed-by: Sage Weil <sage@inktank.com>
1  2 
src/cls_rgw.cc
src/common/config_opts.h
src/rgw/rgw_main.cc

diff --cc src/cls_rgw.cc
index baf8604de594663a480f3a657e0d41b5add82e9a,edc4f865d7171867357d1a1e652ace7d8b99458d..371b08ec9a3c202303001b7c9a163157cf2f3c93
@@@ -411,9 -411,264 +414,264 @@@ int rgw_dir_suggest_changes(cls_method_
    return 0;
  }
  
+ static void usage_record_prefix_by_time(uint64_t epoch, string& key)
+ {
+   char buf[32];
+   snprintf(buf, sizeof(buf), "%011llu", (long long unsigned)epoch);
+   key = buf;
+ }
+ static void usage_record_name_by_time(uint64_t epoch, string& user, string& bucket, string& key)
+ {
+   char buf[32 + user.size() + bucket.size()];
+   snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str());
+   key = buf;
+ }
+ static void usage_record_name_by_user(string& user, uint64_t epoch, string& bucket, string& key)
+ {
+   char buf[32 + user.size() + bucket.size()];
+   snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str());
+   key = buf;
+ }
+ static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e)
+ {
+   bufferlist::iterator kiter = record_bl.begin();
+   try {
+     ::decode(e, kiter);
+   } catch (buffer::error& err) {
+     CLS_LOG("ERROR: usage_record_decode(): failed to decode record_bl\n");
+     return -EINVAL;
+   }
+   return 0;
+ }
+ int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+ {
+   CLS_LOG("rgw_user_usage_log_add()");
+   bufferlist::iterator in_iter = in->begin();
+   rgw_cls_usage_log_add_op op;
+   try {
+     ::decode(op, in_iter);
+   } catch (buffer::error& err) {
+     CLS_LOG("ERROR: rgw_user_usage_log_add(): failed to decode request\n");
+     return -EINVAL;
+   }
+   rgw_usage_log_info& info = op.info;
+   vector<rgw_usage_log_entry>::iterator iter;
+   for (iter = info.entries.begin(); iter != info.entries.end(); ++iter) {
+     rgw_usage_log_entry& entry = *iter;
+     string key_by_time;
+     usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
+     CLS_LOG("rgw_user_usage_log_add user=%s bucket=%s\n", entry.owner.c_str(), entry.bucket.c_str());
+     bufferlist record_bl;
+     int ret = cls_cxx_map_read_key(hctx, key_by_time, &record_bl);
+     if (ret < 0 && ret != -ENOENT) {
+       CLS_LOG("ERROR: rgw_user_usage_log_add(): cls_cxx_map_read_key returned %d\n", ret);
+       return -EINVAL;
+     }
+     if (ret >= 0) {
+       rgw_usage_log_entry e;
+       ret = usage_record_decode(record_bl, e);
+       if (ret < 0)
+         return ret;
+       CLS_LOG("rgw_user_usage_log_add aggregating existing bucket\n");
+       entry.aggregate(e);
+     }
+     bufferlist new_record_bl;
+     ::encode(entry, new_record_bl);
+     ret = cls_cxx_map_write_key(hctx, key_by_time, &new_record_bl);
+     if (ret < 0)
+       return ret;
+     string key_by_user;
+     usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
+     ret = cls_cxx_map_write_key(hctx, key_by_user, &new_record_bl);
+     if (ret < 0)
+       return ret;
+   }
+   return 0;
+ }
+ static int usage_iterate_range(cls_method_context_t hctx, uint64_t start, uint64_t end,
+                             string& user, string& key_iter, uint32_t max_entries, bool *truncated,
+                             int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
+                             void *param)
+ {
+   CLS_LOG("usage_iterate_range");
+   map<string, bufferlist> keys;
+ #define NUM_KEYS 32
+   string filter_prefix;
+   string start_key, end_key;
+   bool by_user = !user.empty();
+   uint32_t i = 0;
+   string user_key;
+   if (truncated)
+     *truncated = false;
+   if (!by_user) {
+     usage_record_prefix_by_time(end, end_key);
+   } else {
+     user_key = user;
+     user_key.append("_");
+   }
+   if (key_iter.empty()) {
+     if (by_user) {
+       start_key = user;
+     } else {
+       usage_record_prefix_by_time(start, start_key);
+     }
+   } else {
+     start_key = key_iter;
+   }
+   do {
+     int ret = cls_cxx_map_read_keys(hctx, start_key, filter_prefix, NUM_KEYS, &keys);
+     if (ret < 0)
+       return ret;
+     map<string, bufferlist>::iterator iter = keys.begin();
+     if (iter == keys.end())
+       break;
+     for (; iter != keys.end(); ++iter) {
+       const string& key = iter->first;
+       rgw_usage_log_entry e;
+       if (!by_user && key.compare(end_key) >= 0)
+         return 0;
+       if (by_user && key.compare(0, user_key.size(), user_key) != 0)
+         return 0;
+       ret = usage_record_decode(iter->second, e);
+       if (ret < 0)
+         return ret;
+       if (e.epoch < start)
+       continue;
+       /* keys are sorted by epoch, so once we're past end we're done */
+       if (e.epoch >= end)
+         return 0;
+       ret = cb(hctx, key, e, param);
+       if (ret < 0)
+         return ret;
+       i++;
+       if (max_entries && (i > max_entries)) {
+         *truncated = true;
+         key_iter = key;
+         return 0;
+       }
+     }
+     iter--;
+     start_key = iter->first;
+   } while (true);
+   return 0;
+ }
+ static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
+ {
+   map<rgw_user_bucket, rgw_usage_log_entry> *usage = (map<rgw_user_bucket, rgw_usage_log_entry> *)param;
+   rgw_user_bucket ub(entry.owner, entry.bucket);
+   rgw_usage_log_entry& le = (*usage)[ub];
+   le.aggregate(entry);
+  
+   return 0;
+ }
+ int rgw_user_usage_log_read(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+ {
+   CLS_LOG("rgw_user_usage_log_read()");
+   bufferlist::iterator in_iter = in->begin();
+   rgw_cls_usage_log_read_op op;
+   try {
+     ::decode(op, in_iter);
+   } catch (buffer::error& err) {
+     CLS_LOG("ERROR: rgw_user_usage_log_read(): failed to decode request\n");
+     return -EINVAL;
+   }
+   rgw_cls_usage_log_read_ret ret_info;
+   map<rgw_user_bucket, rgw_usage_log_entry> *usage = &ret_info.usage;
+   string iter = op.iter;
+ #define MAX_ENTRIES 1000
+   uint32_t max_entries = (op.max_entries ? op.max_entries : MAX_ENTRIES);
+   int ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.owner, iter, max_entries, &ret_info.truncated, usage_log_read_cb, (void *)usage);
+   if (ret < 0)
+     return ret;
+   if (ret_info.truncated)
+     ret_info.next_iter = iter;
+   ::encode(ret_info, *out);
+   return 0;
+ }
+ static int usage_log_trim_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
+ {
+   string key_by_time;
+   string key_by_user;
+   usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
+   usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
+   int ret = cls_cxx_map_remove_key(hctx, key_by_time);
+   if (ret < 0)
+     return ret;
+   return cls_cxx_map_remove_key(hctx, key_by_user);
+ }
+ int rgw_user_usage_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+ {
+   CLS_LOG("rgw_user_usage_log_trim()");
+   /* only continue if object exists! */
+   int ret = cls_cxx_stat(hctx, NULL, NULL);
+   if (ret < 0)
+     return ret;
+   bufferlist::iterator in_iter = in->begin();
+   rgw_cls_usage_log_trim_op op;
+   try {
+     ::decode(op, in_iter);
+   } catch (buffer::error& err) {
+     CLS_LOG("ERROR: rgw_user_log_usage_log_trim(): failed to decode request\n");
+     return -EINVAL;
+   }
+   string iter;
+   ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, iter, 0, NULL, usage_log_trim_cb, NULL);
+   if (ret < 0)
+     return ret;
+   return 0;
+ }
  void __cls_init()
  {
 -  CLS_LOG("Loaded rgw class!");
 +  CLS_LOG(1, "Loaded rgw class!");
  
    cls_register("rgw", &h_class);
    cls_register_cxx_method(h_class, "bucket_init_index", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_init_index, &h_rgw_bucket_init_index);
Simple merge
Simple merge