return;
}
-void RGWObjectExpirer::proceed_single_shard(const string& shard,
+void RGWObjectExpirer::process_single_shard(const string& shard,
const utime_t& last_run,
const utime_t& round_start)
{
return;
}
-void RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
- const utime_t& round_start)
+void RGWObjectExpirer::inspect_all_shards(const utime_t& last_run, const utime_t& round_start)
{
bool is_next_available;
utime_t shard_marker;
- do {
+ CephContext *cct = store->ctx();
+ int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
+
+ for (int i = 0; i < num_shards; i++) {
string shard;
- store->objexp_get_shard(last_run, round_start, shard_marker, shard,
- is_next_available);
+ store->objexp_get_shard(i, shard);
ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
- proceed_single_shard(shard, last_run, round_start);
+ process_single_shard(shard, last_run, round_start);
} while (is_next_available);
return;
return cls_log_trim(io_ctx, oid, start_time, end_time, from_marker, to_marker);
}
-string RGWRados::objexp_hint_get_shardname(const utime_t &ts)
+string RGWRados::objexp_hint_get_shardname(int shard_num)
{
- const time_t roundedts = ts.sec() / cct->_conf->rgw_objexp_time_step;
- const unsigned int shnum = roundedts % cct->_conf->rgw_objexp_hints_num_shards;
-
char buf[32];
- snprintf(buf, sizeof(buf), "%010u", shnum);
+ snprintf(buf, sizeof(buf), "%010u", shard_num);
string objname("obj_delete_at_hint.");
return objname + buf;
}
+#define MAX_PBJEXP_SHARDS_PRIME 7877
+
+int RGWRados::objexp_key_shard(const rgw_obj_key& key)
+{
+ string obj_key = key.name + key.instance;
+ int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
+ uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
+ uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+ sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
+ return sid % num_shards;
+}
+
static string objexp_hint_get_keyext(const string& bucket_name,
const string& bucket_id,
const rgw_obj_key& obj_key)
ObjectWriteOperation op;
cls_timeindex_add(op, delete_at, keyext, hebl);
- string shard_name = objexp_hint_get_shardname(delete_at);
+ string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key));
return objexp_pool_ctx.operate(shard_name, &op);
}
-void RGWRados::objexp_get_shard(const utime_t& start_time,
- const utime_t& end_time,
- utime_t &marker, /* in/out */
- string& shard, /* out */
- bool& truncated) /* out */
+void RGWRados::objexp_get_shard(int shard_num,
+ string& shard) /* out */
{
- if (marker.is_zero()) {
- marker = start_time;
- }
-
- const uint32_t num_shards = cct->_conf->rgw_objexp_hints_num_shards;
- const time_t time_step = cct->_conf->rgw_objexp_time_step;
-
- const time_t sts = start_time.sec() / time_step;
- const time_t ets = end_time.sec() / time_step;
- const time_t mts = marker.sec() / time_step;
-
- const uint32_t periods = ets - sts;
- const uint32_t iters = min(periods, num_shards - 1);
-
- shard = objexp_hint_get_shardname(marker);
-
- if (mts - sts < iters) {
- truncated = true;
- marker += utime_t(time_step, 0);
- } else {
- truncated = false;
- }
-
- return;
+ shard = objexp_hint_get_shardname(shard_num);
}
int RGWRados::objexp_hint_list(const string& oid,
int time_log_trim(const string& oid, const utime_t& start_time, const utime_t& end_time,
const string& from_marker, const string& to_marker);
- string objexp_hint_get_shardname(const utime_t &ts);
- void objexp_get_shard(const utime_t& start_time,
- const utime_t& end_time,
- utime_t &marker, /* out */
- string& shard, /* out */
- bool& truncated); /* out */
+ string objexp_hint_get_shardname(int shard_num);
+ int objexp_key_shard(const rgw_obj_key& key);
+ void objexp_get_shard(int shard_num,
+ string& shard); /* out */
int objexp_hint_add(const utime_t& delete_at,
const string& bucket_name,
const string& bucket_id,