static RGWObjCategory shadow_category = RGW_OBJ_CATEGORY_SHADOW;
static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN;
+#define RGW_USAGE_OBJ_PREFIX "usage."
+
#define dout_subsys ceph_subsys_rgw
return 1;
}
+static void usage_log_hash(CephContext *cct, const string& name, string& hash, uint32_t index)
+{
+ uint32_t val = index;
+
+ if (!name.empty()) {
+ val %= cct->_conf->rgw_usage_max_user_shards;
+ val += ceph_str_hash_linux(name.c_str(), name.size());
+ }
+ char buf[16];
+ snprintf(buf, sizeof(buf), RGW_USAGE_OBJ_PREFIX "%u", (unsigned)(val % cct->_conf->rgw_usage_max_shards));
+ hash = buf;
+}
+
+int RGWRados::log_usage(map<rgw_user_bucket, RGWUsageInfo>& usage_info)
+{
+ uint32_t index = 0;
+
+ map<string, rgw_usage_log_info> log_objs;
+
+ string hash;
+ string last_user;
+
+ /* restructure usage map, cluster by object hash */
+ map<rgw_user_bucket, RGWUsageInfo>::iterator iter;
+ for (iter = usage_info.begin(); iter != usage_info.end(); ++iter) {
+ const rgw_user_bucket& ub = iter->first;
+ RGWUsageInfo& info = iter->second;
+
+ if (ub.user.empty()) {
+ ldout(cct, 0) << "WARNING: RGWRados::log_usage(): user name empty (bucket=" << ub.bucket << "), skipping" << dendl;
+ continue;
+ }
+
+ if (ub.user != last_user) {
+ /* index *should* be random, but why waste extra cycles
+ in most cases max user shards is not going to exceed 1,
+ so just incrementing it */
+ usage_log_hash(cct, ub.user, hash, index++);
+ }
+ last_user = ub.user;
+ vector<rgw_usage_log_entry>& v = log_objs[hash].entries;
+
+ map<utime_t, rgw_usage_log_entry>::iterator miter;
+ for (miter = info.m.begin(); miter != info.m.end(); ++miter) {
+ v.push_back(miter->second);
+ }
+ }
+
+ map<string, rgw_usage_log_info>::iterator liter;
+
+ for (liter = log_objs.begin(); liter != log_objs.end(); ++liter) {
+ int r = cls_obj_usage_log_add(liter->first, liter->second);
+ if (r < 0)
+ return r;
+ }
+ return 0;
+}
+
+int RGWRados::read_usage(string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+ bool *is_truncated, RGWUsageIter& usage_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage)
+{
+ uint32_t num = max_entries;
+ string hash, first_hash;
+ usage_log_hash(cct, user, first_hash, 0);
+
+ if (usage_iter.index) {
+ usage_log_hash(cct, user, hash, usage_iter.index);
+ } else {
+ hash = first_hash;
+ }
+
+ do {
+ rgw_cls_usage_log_read_ret read_ret;
+ map<rgw_user_bucket, rgw_usage_log_entry>::iterator iter;
+
+ int ret = cls_obj_usage_log_read(hash, user, start_epoch, end_epoch, num,
+ usage_iter.read_iter, read_ret, is_truncated);
+ if (ret == -ENOENT)
+ goto next;
+
+ if (ret < 0)
+ return ret;
+
+ num -= usage.size();
+
+ for (iter = read_ret.usage.begin(); iter != read_ret.usage.end(); ++iter) {
+ usage[iter->first].aggregate(iter->second);
+ }
+
+next:
+ if (!*is_truncated) {
+ usage_iter.read_iter.clear();
+ usage_log_hash(cct, user, hash, ++usage_iter.index);
+ }
+ } while (num && !*is_truncated && hash != first_hash);
+ return 0;
+}
+
+int RGWRados::trim_usage(string& user, uint64_t start_epoch, uint64_t end_epoch)
+{
+ uint32_t index = 0;
+ string hash, first_hash;
+ usage_log_hash(cct, user, first_hash, 0);
+
+ hash = first_hash;
+
+ do {
+ int ret = cls_obj_usage_log_trim(hash, user, start_epoch, end_epoch);
+ if (ret == -ENOENT)
+ goto next;
+
+ if (ret < 0)
+ return ret;
+
+next:
+ usage_log_hash(cct, user, hash, ++index);
+ } while (hash != first_hash);
+
+ return 0;
+}
+
int RGWRados::decode_policy(bufferlist& bl, ACLOwner *owner)
{
bufferlist::iterator i = bl.begin();
return m.size();
}
+int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
+{
+ librados::IoCtx io_ctx;
+
+ int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx);
+ if (r == -ENOENT) {
+ string id;
+ map<std::string, bufferlist> attrs;
+ rgw_bucket pool(RGW_USAGE_LOG_POOL_NAME);
+ r = rgwstore->create_bucket(id, pool, attrs, true);
+ if (r < 0)
+ return r;
+
+ // retry
+ r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx);
+ }
+ if (r < 0)
+ return r;
+
+ bufferlist in, out;
+ rgw_cls_usage_log_add_op call;
+ call.info = info;
+ ::encode(call, in);
+ r = io_ctx.exec(oid, "rgw", "user_usage_log_add", in, out);
+ return r;
+}
+
+int RGWRados::cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+ string& read_iter, rgw_cls_usage_log_read_ret& result, bool *is_truncated)
+{
+ librados::IoCtx io_ctx;
+
+ *is_truncated = false;
+
+ int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx);
+ if (r < 0)
+ return r;
+
+ bufferlist in, out;
+ rgw_cls_usage_log_read_op call;
+ call.start_epoch = start_epoch;
+ call.end_epoch = end_epoch;
+ call.owner = user;
+ call.max_entries = max_entries;
+ call.iter = read_iter;
+ ::encode(call, in);
+ r = io_ctx.exec(oid, "rgw", "user_usage_log_read", in, out);
+ if (r < 0)
+ return r;
+
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(result, iter);
+ read_iter = result.next_iter;
+ if (is_truncated)
+ *is_truncated = result.truncated;
+ } catch (buffer::error& e) {
+ ldout(cct, 0) << "ERROR: cls_obj_usage_log_read_op: failed to decode result" << dendl;
+ return -EINVAL;
+ }
+ return r;
+}
+
+int RGWRados::cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch)
+{
+ librados::IoCtx io_ctx;
+
+ int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx);
+ if (r < 0)
+ return r;
+
+ bufferlist in, out;
+ rgw_cls_usage_log_read_op call;
+ call.start_epoch = start_epoch;
+ call.end_epoch = end_epoch;
+ ::encode(call, in);
+ r = io_ctx.exec(oid, "rgw", "user_usage_log_trim", in, out);
+ return r;
+}
+
int RGWRados::check_disk_state(librados::IoCtx io_ctx,
rgw_bucket& bucket,
rgw_bucket_dir_entry& list_state,
prepend_bucket_marker(bucket, obj.key, key);
}
+struct RGWUsageInfo {
+ map<utime_t, rgw_usage_log_entry> m;
+
+ void insert(utime_t& t, rgw_usage_log_entry& entry, bool *account) {
+ bool exists = m.find(t) != m.end();
+ *account = !exists;
+ m[t].aggregate(entry);
+ }
+};
+
+struct RGWUsageIter {
+ string read_iter;
+ uint32_t index;
+
+ RGWUsageIter() : index(0) {}
+};
+
class RGWAccessListFilter {
public:
virtual ~RGWAccessListFilter() {}
int log_show_init(const string& name, RGWAccessHandle *handle);
int log_show_next(RGWAccessHandle handle, rgw_log_entry *entry);
+ // log bandwidth info
+ int log_usage(map<rgw_user_bucket, RGWUsageInfo>& usage_info);
+ int read_usage(string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+ bool *is_truncated, RGWUsageIter& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage);
+ int trim_usage(string& user, uint64_t start_epoch, uint64_t end_epoch);
/**
* get listing of the objects in a bucket.
return cls_obj_complete_cancel(bucket, tag, oid);
}
+ int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
+ int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
+ string& read_iter, rgw_cls_usage_log_read_ret& result, bool *is_truncated);
+ int cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch);
+
/// clean up/process any temporary objects older than given date[/time]
int remove_temp_objects(string date, string time);