From: Yehuda Sadeh Date: Mon, 11 Jun 2012 17:23:49 +0000 (-0700) Subject: rgw: access methods for new usage ops X-Git-Tag: v0.48argonaut~80^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=baa3aff43766ca85032d9a80ef60f016ee2cb873;p=ceph.git rgw: access methods for new usage ops Wrappers for new rados class methods, and new RGWRados methods to handle usage functionality. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 87e553d36dbb..92420e35c566 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -50,6 +50,8 @@ static rgw_bucket pi_buckets_rados = RGW_ROOT_BUCKET; static RGWObjCategory shadow_category = RGW_OBJ_CATEGORY_SHADOW; static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN; +#define RGW_USAGE_OBJ_PREFIX "usage." + #define dout_subsys ceph_subsys_rgw @@ -333,6 +335,127 @@ int RGWRados::log_show_next(RGWAccessHandle handle, rgw_log_entry *entry) return 1; } +static void usage_log_hash(CephContext *cct, const string& name, string& hash, uint32_t index) +{ + uint32_t val = index; + + if (!name.empty()) { + val %= cct->_conf->rgw_usage_max_user_shards; + val += ceph_str_hash_linux(name.c_str(), name.size()); + } + char buf[16]; + snprintf(buf, sizeof(buf), RGW_USAGE_OBJ_PREFIX "%u", (unsigned)(val % cct->_conf->rgw_usage_max_shards)); + hash = buf; +} + +int RGWRados::log_usage(map& usage_info) +{ + uint32_t index = 0; + + map log_objs; + + string hash; + string last_user; + + /* restructure usage map, cluster by object hash */ + map::iterator iter; + for (iter = usage_info.begin(); iter != usage_info.end(); ++iter) { + const rgw_user_bucket& ub = iter->first; + RGWUsageInfo& info = iter->second; + + if (ub.user.empty()) { + ldout(cct, 0) << "WARNING: RGWRados::log_usage(): user name empty (bucket=" << ub.bucket << "), skipping" << dendl; + continue; + } + + if (ub.user != last_user) { + /* index *should* be random, but why waste extra cycles + in most cases max user shards is not going to exceed 1, + so just incrementing it */ + usage_log_hash(cct, ub.user, hash, index++); + } + last_user = ub.user; + vector& v = log_objs[hash].entries; + + map::iterator miter; + for (miter = info.m.begin(); miter != info.m.end(); ++miter) { + v.push_back(miter->second); + } + } + + map::iterator liter; + + for (liter = log_objs.begin(); liter != log_objs.end(); ++liter) { + int r = cls_obj_usage_log_add(liter->first, liter->second); + if (r < 0) + return r; + } + return 0; +} + +int RGWRados::read_usage(string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, + bool *is_truncated, RGWUsageIter& usage_iter, map& usage) +{ + uint32_t num = max_entries; + string hash, first_hash; + usage_log_hash(cct, user, first_hash, 0); + + if (usage_iter.index) { + usage_log_hash(cct, user, hash, usage_iter.index); + } else { + hash = first_hash; + } + + do { + rgw_cls_usage_log_read_ret read_ret; + map::iterator iter; + + int ret = cls_obj_usage_log_read(hash, user, start_epoch, end_epoch, num, + usage_iter.read_iter, read_ret, is_truncated); + if (ret == -ENOENT) + goto next; + + if (ret < 0) + return ret; + + num -= usage.size(); + + for (iter = read_ret.usage.begin(); iter != read_ret.usage.end(); ++iter) { + usage[iter->first].aggregate(iter->second); + } + +next: + if (!*is_truncated) { + usage_iter.read_iter.clear(); + usage_log_hash(cct, user, hash, ++usage_iter.index); + } + } while (num && !*is_truncated && hash != first_hash); + return 0; +} + +int RGWRados::trim_usage(string& user, uint64_t start_epoch, uint64_t end_epoch) +{ + uint32_t index = 0; + string hash, first_hash; + usage_log_hash(cct, user, first_hash, 0); + + hash = first_hash; + + do { + int ret = cls_obj_usage_log_trim(hash, user, start_epoch, end_epoch); + if (ret == -ENOENT) + goto next; + + if (ret < 0) + return ret; + +next: + usage_log_hash(cct, user, hash, ++index); + } while (hash != first_hash); + + return 0; +} + int RGWRados::decode_policy(bufferlist& bl, ACLOwner *owner) { bufferlist::iterator i = bl.begin(); @@ -2539,6 +2662,86 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix, return m.size(); } +int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info) +{ + librados::IoCtx io_ctx; + + int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx); + if (r == -ENOENT) { + string id; + map attrs; + rgw_bucket pool(RGW_USAGE_LOG_POOL_NAME); + r = rgwstore->create_bucket(id, pool, attrs, true); + if (r < 0) + return r; + + // retry + r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx); + } + if (r < 0) + return r; + + bufferlist in, out; + rgw_cls_usage_log_add_op call; + call.info = info; + ::encode(call, in); + r = io_ctx.exec(oid, "rgw", "user_usage_log_add", in, out); + return r; +} + +int RGWRados::cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, + string& read_iter, rgw_cls_usage_log_read_ret& result, bool *is_truncated) +{ + librados::IoCtx io_ctx; + + *is_truncated = false; + + int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx); + if (r < 0) + return r; + + bufferlist in, out; + rgw_cls_usage_log_read_op call; + call.start_epoch = start_epoch; + call.end_epoch = end_epoch; + call.owner = user; + call.max_entries = max_entries; + call.iter = read_iter; + ::encode(call, in); + r = io_ctx.exec(oid, "rgw", "user_usage_log_read", in, out); + if (r < 0) + return r; + + try { + bufferlist::iterator iter = out.begin(); + ::decode(result, iter); + read_iter = result.next_iter; + if (is_truncated) + *is_truncated = result.truncated; + } catch (buffer::error& e) { + ldout(cct, 0) << "ERROR: cls_obj_usage_log_read_op: failed to decode result" << dendl; + return -EINVAL; + } + return r; +} + +int RGWRados::cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch) +{ + librados::IoCtx io_ctx; + + int r = rados->ioctx_create(RGW_USAGE_LOG_POOL_NAME, io_ctx); + if (r < 0) + return r; + + bufferlist in, out; + rgw_cls_usage_log_read_op call; + call.start_epoch = start_epoch; + call.end_epoch = end_epoch; + ::encode(call, in); + r = io_ctx.exec(oid, "rgw", "user_usage_log_trim", in, out); + return r; +} + int RGWRados::check_disk_state(librados::IoCtx io_ctx, rgw_bucket& bucket, rgw_bucket_dir_entry& list_state, diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 0c74384517e7..9f835a8499b7 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -29,6 +29,23 @@ static inline void get_obj_bucket_and_oid_key(rgw_obj& obj, rgw_bucket& bucket, prepend_bucket_marker(bucket, obj.key, key); } +struct RGWUsageInfo { + map m; + + void insert(utime_t& t, rgw_usage_log_entry& entry, bool *account) { + bool exists = m.find(t) != m.end(); + *account = !exists; + m[t].aggregate(entry); + } +}; + +struct RGWUsageIter { + string read_iter; + uint32_t index; + + RGWUsageIter() : index(0) {} +}; + class RGWAccessListFilter { public: virtual ~RGWAccessListFilter() {} @@ -302,6 +319,11 @@ public: int log_show_init(const string& name, RGWAccessHandle *handle); int log_show_next(RGWAccessHandle handle, rgw_log_entry *entry); + // log bandwidth info + int log_usage(map& usage_info); + int read_usage(string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, + bool *is_truncated, RGWUsageIter& read_iter, map& usage); + int trim_usage(string& user, uint64_t start_epoch, uint64_t end_epoch); /** * get listing of the objects in a bucket. @@ -556,6 +578,11 @@ public: return cls_obj_complete_cancel(bucket, tag, oid); } + int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info); + int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, + string& read_iter, rgw_cls_usage_log_read_ret& result, bool *is_truncated); + int cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch); + /// clean up/process any temporary objects older than given date[/time] int remove_temp_objects(string date, string time);