key = buf;
}
-static void usage_record_prefix_by_user(const string& user, uint64_t epoch, string& key)
+static void usage_record_prefix_by_user_old(const string& user, uint64_t epoch, string& key)
{
char buf[user.size() + 32];
snprintf(buf, sizeof(buf), "%s_%011llu_", user.c_str(), (long long unsigned)epoch);
key = buf;
}
+static void usage_record_prefix_by_user(const string& user, uint64_t epoch, string& key)
+{
+ usage_record_prefix_by_user_old(user, epoch, key);
+ if (user.starts_with('0')) {
+ // Use ~ prefix for name_by_user records that have keys that could potentially
+ // put them among name_by_time records.
+ key.insert(0, 1, '~');
+ }
+}
+
static void usage_record_name_by_time(uint64_t epoch, const string& user, const string& bucket, string& key)
{
char buf[32 + user.size() + bucket.size()];
key = buf;
}
-static void usage_record_name_by_user(const string& user, uint64_t epoch, const string& bucket, string& key)
+static void usage_record_name_by_user_old(const string& user, uint64_t epoch, const string& bucket, string& key)
{
char buf[32 + user.size() + bucket.size()];
snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str());
key = buf;
}
+static void usage_record_name_by_user(const string& user, uint64_t epoch, const string& bucket, string& key)
+{
+ usage_record_name_by_user_old(user, epoch, bucket, key);
+ if (user.starts_with('0')) {
+ // Use ~ prefix for name_by_user records that have keys that could potentially
+ // put them among name_by_time records.
+ key.insert(0, 1, '~');
+ }
+}
+
static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e)
{
auto kiter = record_bl.cbegin();
return -EINVAL;
}
+ const ConfigProxy& conf = cls_get_config(hctx);
+ const bool key_transition = conf->rgw_usage_log_key_transition;
+
rgw_usage_log_info& info = op.info;
for (auto iter = info.entries.begin(); iter != info.entries.end(); ++iter) {
ret = cls_cxx_map_set_val(hctx, key_by_user, &new_record_bl);
if (ret < 0)
return ret;
+
+ if (key_transition && puser->to_str().starts_with('0')) {
+ string key_by_user_old;
+ usage_record_name_by_user_old(puser->to_str(), entry.epoch, entry.bucket, key_by_user_old);
+ (void)cls_cxx_map_remove_key(hctx, key_by_user_old);
+ }
}
return 0;
}
-static int usage_iterate_range(cls_method_context_t hctx, uint64_t start, uint64_t end, const string& user,
- const string& bucket, string& key_iter, uint32_t max_entries, bool *truncated,
- int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
- void *param)
+static int usage_handle_range(cls_method_context_t hctx, uint64_t start_epoch, uint64_t end_epoch,
+ const string& start_key, const string& end_key, const string& user_key,
+ const string& bucket, string& key_iter, uint32_t max_entries, bool& truncated,
+ int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
+ void *param)
{
- CLS_LOG(10, "entered %s", __func__);
-
- map<string, bufferlist> keys;
string filter_prefix;
- string start_key, end_key;
- bool by_user = !user.empty();
- string user_key;
bool truncated_status = false;
+ map<string, bufferlist> usage_logs;
+ bool by_user = !user_key.empty();
+ int cnt = 0;
- ceph_assert(truncated != nullptr);
-
- if (!by_user) {
- usage_record_prefix_by_time(end, end_key);
- } else {
- user_key = user;
- user_key.append("_");
- }
-
- if (key_iter.empty()) {
- if (by_user) {
- usage_record_prefix_by_user(user, start, start_key);
- } else {
- usage_record_prefix_by_time(start, start_key);
- }
- } else {
- start_key = key_iter;
- }
-
- CLS_LOG(20, "usage_iterate_range start_key=%s", start_key.c_str());
- int ret = cls_cxx_map_get_vals(hctx, start_key, filter_prefix, max_entries, &keys, &truncated_status);
+ CLS_LOG(20, "usage_handle_range start_key=%s", start_key.c_str());
+ int ret = cls_cxx_map_get_vals(hctx, start_key, filter_prefix, max_entries, &usage_logs, &truncated_status);
if (ret < 0)
return ret;
- *truncated = truncated_status;
-
- auto iter = keys.begin();
- if (iter == keys.end())
- return 0;
+ truncated = truncated_status;
- for (; iter != keys.end(); ++iter) {
- const string& key = iter->first;
+ for (auto log : usage_logs) {
+ const string& key = log.first;
rgw_usage_log_entry e;
key_iter = key;
if (!by_user && key.compare(end_key) >= 0) {
- CLS_LOG(20, "usage_iterate_range reached key=%s, done", key.c_str());
- *truncated = false;
- key_iter = key;
- return 0;
+ CLS_LOG(20, "usage_handle_range reached key=%s, done", key.c_str());
+ truncated = false;
+ break;
}
if (by_user && key.compare(0, user_key.size(), user_key) != 0) {
- CLS_LOG(20, "usage_iterate_range reached key=%s, done", key.c_str());
- *truncated = false;
- key_iter = key;
- return 0;
+ CLS_LOG(20, "usage_handle_range reached key=%s, done", key.c_str());
+ truncated = false;
+ break;
}
- ret = usage_record_decode(iter->second, e);
+ ret = usage_record_decode(log.second, e);
if (ret < 0)
return ret;
if (!bucket.empty() && bucket.compare(e.bucket))
continue;
- if (e.epoch < start)
+ if (e.epoch < start_epoch)
continue;
/* keys are sorted by epoch, so once we're past end we're done */
- if (e.epoch >= end) {
- *truncated = false;
- return 0;
+ if (e.epoch >= end_epoch) {
+ truncated = false;
+ break;
}
ret = cb(hctx, key, e, param);
if (ret < 0)
return ret;
+
+ ++cnt;
+ }
+
+ return cnt;
+}
+
+static int usage_iterate_range_by_user(cls_method_context_t hctx, uint64_t start_epoch, uint64_t end_epoch,
+ const string& user, const string& bucket, string& key_iter, uint32_t max_entries,
+ bool& truncated, int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
+ void *param)
+{
+ ceph_assert(!user.empty());
+
+ const ConfigProxy& conf = cls_get_config(hctx);
+ const bool key_transition = conf->rgw_usage_log_key_transition;
+
+ if (key_transition &&
+ user.starts_with('0') &&
+ (key_iter.empty() || key_iter.starts_with('0'))) {
+ // During key transition for records that could have keys that fall within name_by_time records,
+ // we need to go 2 passes to cover both the old keys and the new keys.
+ // Remove this block when key_transition is deprecated.
+ string user_key = user + "_";
+ string start_key;
+ string old_key_iter = key_iter;
+ if (key_iter.empty()) {
+ usage_record_prefix_by_user_old(user, start_epoch, start_key);
+ } else {
+ start_key = key_iter;
+ }
+ int ret = usage_handle_range(hctx, start_epoch, end_epoch, start_key, "", user_key,
+ bucket, old_key_iter, max_entries, truncated, cb, param);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (truncated) {
+ key_iter = old_key_iter;
+ return 0;
+ } else {
+ // All old keys have been handled, start fresh on the new keys
+ key_iter.clear();
+ }
+ max_entries -= ret;
+ }
+
+ if (max_entries > 0) {
+ string start_key;
+ string user_key = user.starts_with('0') ? (std::string("~") + user + "_") : (user + "_");
+ // Handle new keys
+ if (key_iter.empty()) {
+ usage_record_prefix_by_user(user, start_epoch, start_key);
+ } else {
+ start_key = key_iter;
+ }
+
+ return usage_handle_range(hctx, start_epoch, end_epoch, start_key, "", user_key,
+ bucket, key_iter, max_entries, truncated, cb, param);
}
+
return 0;
}
+static int usage_iterate_range_by_time(cls_method_context_t hctx, uint64_t start_epoch, uint64_t end_epoch,
+ const string& bucket, string& key_iter, uint32_t max_entries, bool& truncated,
+ int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
+ void *param)
+{
+ CLS_LOG(10, "entered %s", __func__);
+
+ string start_key, end_key;
+
+ usage_record_prefix_by_time(end_epoch, end_key);
+
+ if (key_iter.empty()) {
+ usage_record_prefix_by_time(start_epoch, start_key);
+ } else {
+ start_key = key_iter;
+ }
+
+ return usage_handle_range(hctx, start_epoch, end_epoch, start_key, end_key, "",
+ bucket, key_iter, max_entries, truncated, cb, param);
+}
+
+static int usage_iterate_range(cls_method_context_t hctx, uint64_t start_epoch, uint64_t end_epoch, const string& user,
+ const string& bucket, string& key_iter, uint32_t max_entries, bool& truncated,
+ int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
+ void *param)
+{
+ CLS_LOG(10, "entered %s", __func__);
+
+ if (user.empty()) {
+ return usage_iterate_range_by_time(hctx, start_epoch, end_epoch, bucket, key_iter, max_entries, truncated, cb, param);
+ } else {
+ return usage_iterate_range_by_user(hctx, start_epoch, end_epoch, user, bucket, key_iter, max_entries, truncated, cb, param);
+ }
+}
+
static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
{
map<rgw_user_bucket, rgw_usage_log_entry> *usage = (map<rgw_user_bucket, rgw_usage_log_entry> *)param;
string iter = op.iter;
#define MAX_ENTRIES 1000
uint32_t max_entries = (op.max_entries ? op.max_entries : MAX_ENTRIES);
- int ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.owner, op.bucket, iter, max_entries, &ret_info.truncated, usage_log_read_cb, (void *)usage);
+
+ int ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.owner, op.bucket, iter, max_entries, ret_info.truncated, usage_log_read_cb, (void *)usage);
if (ret < 0)
return ret;
return 0;
}
+struct usage_trim_param
+{
+ bool found = false;
+ const bool key_transition = false;
+};
+
static int usage_log_trim_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
{
- bool *found = (bool *)param;
- if (found) {
- *found = true;
- }
+ usage_trim_param *trim_param = (usage_trim_param *)param;
+
+ ceph_assert(trim_param != nullptr);
+
+ trim_param->found = true;
+
string key_by_time;
string key_by_user;
- string o = entry.owner.to_str();
+ string o = entry.payer.empty() ? entry.owner.to_str() : entry.payer.to_str();
usage_record_name_by_time(entry.epoch, o, entry.bucket, key_by_time);
usage_record_name_by_user(o, entry.epoch, entry.bucket, key_by_user);
if (ret < 0)
return ret;
+ if (trim_param->key_transition && o.starts_with('0')) {
+ string key_by_user_old;
+ usage_record_name_by_user_old(o, entry.epoch, entry.bucket, key_by_user_old);
+ (void)cls_cxx_map_remove_key(hctx, key_by_user_old);
+ }
+
return cls_cxx_map_remove_key(hctx, key_by_user);
}
string iter;
bool more;
- bool found = false;
+ const ConfigProxy& conf = cls_get_config(hctx);
+ usage_trim_param trim_param{false, conf->rgw_usage_log_key_transition};
+
#define MAX_USAGE_TRIM_ENTRIES 1000
- ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, op.bucket, iter, MAX_USAGE_TRIM_ENTRIES, &more, usage_log_trim_cb, (void *)&found);
+ ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, op.bucket, iter, MAX_USAGE_TRIM_ENTRIES, more, usage_log_trim_cb, (void *)&trim_param);
+
if (ret < 0)
return ret;
- if (!more && !found)
+ if (!more && !trim_param.found)
return -ENODATA;
return 0;
ASSERT_EQ(0, destroy_one_pool_pp(gc_pool_name, rados));
}
-auto populate_usage_log_info(std::string user, std::string payer, int total_usage_entries)
+auto populate_usage_log_info(std::string user, std::string payer, int total_usage_entries, uint64_t epoch = 0)
{
rgw_usage_log_info info;
for (int i=0; i < total_usage_entries; i++){
auto bucket = str_int("bucket", i);
info.entries.emplace_back(rgw_usage_log_entry(user, payer, bucket));
+ info.entries.back().epoch = epoch;
}
return info;
}
-auto gen_usage_log_info(std::string payer, std::string bucket, int total_usage_entries)
+auto gen_usage_log_info(std::string payer, std::string bucket, int total_usage_entries, uint64_t epoch = 0)
{
rgw_usage_log_info info;
for (int i=0; i < total_usage_entries; i++){
auto user = str_int("user", i);
info.entries.emplace_back(rgw_usage_log_entry(user, payer, bucket));
+ info.entries.back().epoch = epoch;
}
return info;
}
+// Copied from cls_rgw.cc in order to populate usage logs with old keys
+static void usage_record_name_by_time(uint64_t epoch, const std::string& user, const std::string& bucket, std::string& key)
+{
+ char buf[32 + user.size() + bucket.size()];
+ snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str());
+ key = buf;
+}
+
+static void usage_record_name_by_user_old(const std::string& user, uint64_t epoch, const std::string& bucket, std::string& key)
+{
+ char buf[32 + user.size() + bucket.size()];
+ snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str());
+ key = buf;
+}
+
+void populate_old_usage_log_info(librados::IoCtx &ioctx,
+ std::string& oid,
+ std::string& user,
+ std::string& payer,
+ int total_usage_entries,
+ uint64_t epoch)
+{
+ for (int i=0; i < total_usage_entries; ++i) {
+ auto bucket = str_int("bucket", i);
+ rgw_usage_log_entry entry(user, payer, bucket);
+ entry.epoch = epoch;
+ bufferlist bl;
+ encode(entry, bl);
+
+ std::string key_by_time;
+ std::string key_by_user;
+ std::string owner = payer.empty() ? user : payer;
+
+ usage_record_name_by_time(epoch, owner, bucket, key_by_time);
+ usage_record_name_by_user_old(owner, epoch, bucket, key_by_user);
+
+ map<std::string, bufferlist> omap_records{
+ {key_by_time, bl},
+ {key_by_user, bl}
+ };
+
+ librados::ObjectWriteOperation op;
+ op.omap_set(omap_records);
+ ASSERT_EQ(0, ioctx.operate(oid, &op));
+ }
+}
+
+TEST_F(cls_rgw, usage_key_transition)
+{
+ string oid="usage.1";
+ string user="012-345-678";
+ uint64_t an_hour{3600}, start_epoch{0}, log_epoch{1755892800}, end_epoch{log_epoch + an_hour * 3};
+ int total_usage_entries = 8, extra_entries = 4;
+ uint64_t max_entries = 12;
+ string payer;
+
+ // Populate logs with old keys
+ populate_old_usage_log_info(ioctx, oid, user, payer, total_usage_entries, log_epoch);
+ populate_old_usage_log_info(ioctx, oid, user, payer, total_usage_entries, log_epoch + an_hour);
+
+ // Populate logs with new keys with the same epoch as the last old ones. They override the old ones.
+ auto info = populate_usage_log_info(user, payer, total_usage_entries / 2, log_epoch + an_hour);
+ ObjectWriteOperation op;
+ cls_rgw_usage_log_add(op, info);
+ ASSERT_EQ(0, ioctx.operate(oid, &op));
+
+ // Populate logs with new keys with the new epoch
+ info = populate_usage_log_info(user, payer, total_usage_entries + extra_entries, log_epoch + an_hour * 2);
+ ObjectWriteOperation another_op;
+ cls_rgw_usage_log_add(another_op, info);
+ ASSERT_EQ(0, ioctx.operate(oid, &another_op));
+
+ string read_iter;
+ map <rgw_user_bucket, rgw_usage_log_entry> usage, usage2, usage3;
+ bool truncated;
+ int ret = cls_rgw_usage_log_read(ioctx, oid, user, payer, start_epoch, end_epoch, max_entries, read_iter, usage, &truncated);
+ ASSERT_EQ(ret, 0);
+ ASSERT_TRUE(truncated);
+ ASSERT_TRUE(!read_iter.empty() && read_iter.at(0) == '0'); // Old key;
+ ASSERT_EQ(usage.size(), total_usage_entries);
+
+ ret = cls_rgw_usage_log_read(ioctx, oid, user, payer, start_epoch, end_epoch, max_entries, read_iter, usage2, &truncated);
+ ASSERT_EQ(ret, 0);
+ ASSERT_TRUE(truncated); // Still have more left
+ ASSERT_TRUE(!read_iter.empty() && read_iter.at(0) == '~'); // New key;
+ ASSERT_EQ(usage2.size(), total_usage_entries);
+
+ ret = cls_rgw_usage_log_read(ioctx, oid, user, payer, start_epoch, end_epoch, max_entries, read_iter, usage3, &truncated);
+ ASSERT_EQ(ret, 0);
+ ASSERT_FALSE(truncated); // Nothing left
+ ASSERT_TRUE(read_iter.empty()); // Done
+ ASSERT_EQ(usage3.size(), extra_entries);
+
+ ret = cls_rgw_usage_log_trim(ioctx, oid, user, payer, start_epoch, end_epoch);
+ ASSERT_EQ(ret, 0);
+
+ usage.clear();
+ ret = cls_rgw_usage_log_read(ioctx, oid, user, payer, start_epoch, end_epoch, max_entries, read_iter, usage, &truncated);
+ ASSERT_EQ(ret, 0);
+ ASSERT_FALSE(truncated); // Nothing left
+ ASSERT_EQ(usage.size(), 0); // Got nothing
+}
+
TEST_F(cls_rgw, usage_basic)
{
string oid="usage.1";
string user="user1";
- uint64_t start_epoch{0}, end_epoch{(uint64_t) -1};
+ uint64_t a_minute{60}, an_hour{3600};
+ uint64_t start_epoch{0}, log_epoch{1755892800}, end_epoch{log_epoch + a_minute};
int total_usage_entries = 512;
uint64_t max_entries = 2000;
string payer;
- auto info = populate_usage_log_info(user, payer, total_usage_entries);
+ auto info = populate_usage_log_info(user, payer, total_usage_entries, log_epoch);
ObjectWriteOperation op;
cls_rgw_usage_log_add(op, info);
ASSERT_EQ(0, ioctx.operate(oid, &op));
+ // Slip in some records of a user whose name could cause those records to be among key_by_time ones.
+ auto extra_info = populate_usage_log_info("01234", payer, total_usage_entries, log_epoch + an_hour);
+ ObjectWriteOperation extra_op;
+ cls_rgw_usage_log_add(extra_op, extra_info);
+ ASSERT_EQ(0, ioctx.operate(oid, &extra_op));
+
string read_iter;
map <rgw_user_bucket, rgw_usage_log_entry> usage, usage2;
bool truncated;
-
int ret = cls_rgw_usage_log_read(ioctx, oid, user, "", start_epoch, end_epoch,
max_entries, read_iter, usage, &truncated);
// read the entries, and see that we have all the added entries
ASSERT_EQ(static_cast<uint64_t>(total_usage_entries), usage.size());
// delete and read to assert that we've deleted all the values
- ASSERT_EQ(0, cls_rgw_usage_log_trim(ioctx, oid, user, "", start_epoch, end_epoch));
-
+ ASSERT_EQ(0, cls_rgw_usage_log_trim(ioctx, oid, "", "", start_epoch, end_epoch));
ret = cls_rgw_usage_log_read(ioctx, oid, user, "", start_epoch, end_epoch,
max_entries, read_iter, usage2, &truncated);