]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: access methods for new usage ops
authorYehuda Sadeh <yehuda@inktank.com>
Mon, 11 Jun 2012 17:23:49 +0000 (10:23 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Mon, 11 Jun 2012 20:25:51 +0000 (13:25 -0700)
Wrappers for new rados class methods, and new RGWRados
methods to handle usage functionality.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 87e553d36dbb00858c0a38c0a925d7ea43a35e13..92420e35c56605fbd86eaf91de8b85d320e5fd45 100644 (file)
@@ -50,6 +50,8 @@ static rgw_bucket pi_buckets_rados = RGW_ROOT_BUCKET;
 static RGWObjCategory shadow_category = RGW_OBJ_CATEGORY_SHADOW;
 static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN;
 
+#define RGW_USAGE_OBJ_PREFIX "usage."
+
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -333,6 +335,127 @@ int RGWRados::log_show_next(RGWAccessHandle handle, rgw_log_entry *entry)
   return 1;
 }
 
+static void usage_log_hash(CephContext *cct, const string& name, string& hash, uint32_t index)
+{
+  uint32_t val = index;
+
+  if (!name.empty()) {
+    val %= cct->_conf->rgw_usage_max_user_shards;
+    val += ceph_str_hash_linux(name.c_str(), name.size());
+  }
+  char buf[16];
+  snprintf(buf, sizeof(buf), RGW_USAGE_OBJ_PREFIX "%u", (unsigned)(val % cct->_conf->rgw_usage_max_shards));
+  hash = buf;
+}
+
+int RGWRados::log_usage(map<rgw_user_bucket, RGWUsageInfo>& usage_info)
+{
+  uint32_t index = 0;
+
+  map<string, rgw_usage_log_info> log_objs;
+
+  string hash;
+  string last_user;
+
+  /* restructure usage map, cluster by object hash */
+  map<rgw_user_bucket, RGWUsageInfo>::iterator iter;
+  for (iter = usage_info.begin(); iter != usage_info.end(); ++iter) {
+    const rgw_user_bucket& ub = iter->first;
+    RGWUsageInfo& info = iter->second;
+
+    if (ub.user.empty()) {
+      ldout(cct, 0) << "WARNING: RGWRados::log_usage(): user name empty (bucket=" << ub.bucket << "), skipping" << dendl;
+      continue;
+    }
+
+    if (ub.user != last_user) {
+      /* index *should* be random, but why waste extra cycles
+         in most cases max user shards is not going to exceed 1,
+         so just incrementing it */
+      usage_log_hash(cct, ub.user, hash, index++);
+    }
+    last_user = ub.user;
+    vector<rgw_usage_log_entry>& v = log_objs[hash].entries;
+
+    map<utime_t, rgw_usage_log_entry>::iterator miter;
+    for (miter = info.m.begin(); miter != info.m.end(); ++miter) {
+      v.push_back(miter->second);
+    }
+  }
+
+  map<string, rgw_usage_log_info>::iterator liter;
+
+  for (liter = log_objs.begin(); liter != log_objs.end(); ++liter) {
+    int r = cls_obj_usage_log_add(liter->first, liter->second);
+    if (r < 0)
+      return r;
+  }
+  return 0;
+}
+
+int RGWRados::read_usage(string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+                         bool *is_truncated, RGWUsageIter& usage_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage)
+{
+  uint32_t num = max_entries;
+  string hash, first_hash;
+  usage_log_hash(cct, user, first_hash, 0);
+
+  if (usage_iter.index) {
+    usage_log_hash(cct, user, hash, usage_iter.index);
+  } else {
+    hash = first_hash;
+  }
+
+  do {
+    rgw_cls_usage_log_read_ret read_ret;
+    map<rgw_user_bucket, rgw_usage_log_entry>::iterator iter;
+
+    int ret =  cls_obj_usage_log_read(hash, user, start_epoch, end_epoch, num,
+                                    usage_iter.read_iter, read_ret, is_truncated);
+    if (ret == -ENOENT)
+      goto next;
+
+    if (ret < 0)
+      return ret;
+
+    num -= usage.size();
+
+    for (iter = read_ret.usage.begin(); iter != read_ret.usage.end(); ++iter) {
+      usage[iter->first].aggregate(iter->second);
+    }
+
+next:
+    if (!*is_truncated) {
+      usage_iter.read_iter.clear();
+      usage_log_hash(cct, user, hash, ++usage_iter.index);
+    }
+  } while (num && !*is_truncated && hash != first_hash);
+  return 0;
+}
+
+int RGWRados::trim_usage(string& user, uint64_t start_epoch, uint64_t end_epoch)
+{
+  uint32_t index = 0;
+  string hash, first_hash;
+  usage_log_hash(cct, user, first_hash, 0);
+
+  hash = first_hash;
+
+  do {
+    int ret =  cls_obj_usage_log_trim(hash, user, start_epoch, end_epoch);
+    if (ret == -ENOENT)
+      goto next;
+
+    if (ret < 0)
+      return ret;
+
+next:
+    usage_log_hash(cct, user, hash, ++index);
+  } while (hash != first_hash);
+
+  return 0;
+}
+
 int RGWRados::decode_policy(bufferlist& bl, ACLOwner *owner)
 {
   bufferlist::iterator i = bl.begin();
@@ -2539,6 +2662,86 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
   return m.size();
 }
 
+int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
+{
+  librados::IoCtx io_ctx;
+
+  int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx);
+  if (r == -ENOENT) {
+    string id;
+    map<std::string, bufferlist> attrs;
+    rgw_bucket pool(RGW_USAGE_LOG_POOL_NAME);
+    r = rgwstore->create_bucket(id, pool, attrs, true);
+    if (r < 0)
+      return r;
+    // retry
+    r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx);
+  }
+  if (r < 0)
+    return r;
+
+  bufferlist in, out;
+  rgw_cls_usage_log_add_op call;
+  call.info = info;
+  ::encode(call, in);
+  r = io_ctx.exec(oid, "rgw", "user_usage_log_add", in, out);
+  return r;
+}
+
+int RGWRados::cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+                                     string& read_iter, rgw_cls_usage_log_read_ret& result, bool *is_truncated)
+{
+  librados::IoCtx io_ctx;
+
+  *is_truncated = false;
+
+  int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx);
+  if (r < 0)
+    return r;
+
+  bufferlist in, out;
+  rgw_cls_usage_log_read_op call;
+  call.start_epoch = start_epoch;
+  call.end_epoch = end_epoch;
+  call.owner = user;
+  call.max_entries = max_entries;
+  call.iter = read_iter;
+  ::encode(call, in);
+  r = io_ctx.exec(oid, "rgw", "user_usage_log_read", in, out);
+  if (r < 0)
+    return r;
+
+  try {
+    bufferlist::iterator iter = out.begin();
+    ::decode(result, iter);
+    read_iter = result.next_iter;
+    if (is_truncated)
+      *is_truncated = result.truncated;
+  } catch (buffer::error& e) {
+    ldout(cct, 0) << "ERROR: cls_obj_usage_log_read_op: failed to decode result" << dendl;
+    return -EINVAL;
+  }
+  return r;
+}
+
+int RGWRados::cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch)
+{
+  librados::IoCtx io_ctx;
+
+  int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx);
+  if (r < 0)
+    return r;
+
+  bufferlist in, out;
+  rgw_cls_usage_log_read_op call;
+  call.start_epoch = start_epoch;
+  call.end_epoch = end_epoch;
+  ::encode(call, in);
+  r = io_ctx.exec(oid, "rgw", "user_usage_log_trim", in, out);
+  return r;
+}
+
 int RGWRados::check_disk_state(librados::IoCtx io_ctx,
                                rgw_bucket& bucket,
                                rgw_bucket_dir_entry& list_state,
index 0c74384517e7f46a4ec2e7a82c15ead0fe096952..9f835a8499b73b399654b3b7f6e8675b845c61cc 100644 (file)
@@ -29,6 +29,23 @@ static inline void get_obj_bucket_and_oid_key(rgw_obj& obj, rgw_bucket& bucket,
   prepend_bucket_marker(bucket, obj.key, key);
 }
 
+struct RGWUsageInfo {
+  map<utime_t, rgw_usage_log_entry> m;
+
+  void insert(utime_t& t, rgw_usage_log_entry& entry, bool *account) {
+    bool exists = m.find(t) != m.end();
+    *account = !exists;
+    m[t].aggregate(entry);
+  }
+};
+
+struct RGWUsageIter {
+  string read_iter;
+  uint32_t index;
+
+  RGWUsageIter() : index(0) {}
+};
+
 class RGWAccessListFilter {
 public:
   virtual ~RGWAccessListFilter() {}
@@ -302,6 +319,11 @@ public:
   int log_show_init(const string& name, RGWAccessHandle *handle);
   int log_show_next(RGWAccessHandle handle, rgw_log_entry *entry);
 
+  // log bandwidth info
+  int log_usage(map<rgw_user_bucket, RGWUsageInfo>& usage_info);
+  int read_usage(string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+                 bool *is_truncated, RGWUsageIter& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage);
+  int trim_usage(string& user, uint64_t start_epoch, uint64_t end_epoch);
 
   /**
    * get listing of the objects in a bucket.
@@ -556,6 +578,11 @@ public:
     return cls_obj_complete_cancel(bucket, tag, oid);
   }
 
+  int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
+  int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+                             string& read_iter, rgw_cls_usage_log_read_ret& result, bool *is_truncated);
+  int cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch);
+
   /// clean up/process any temporary objects older than given date[/time]
   int remove_temp_objects(string date, string time);