return;
}
-void RGWObjectExpirer::process_single_shard(const string& shard,
+bool RGWObjectExpirer::process_single_shard(const string& shard,
const utime_t& last_run,
const utime_t& round_start)
{
string marker;
string out_marker;
bool truncated = false;
+ bool done = true;
CephContext *cct = store->ctx();
int num_entries = cct->_conf->rgw_objexp_chunk_size;
int ret = l.lock_exclusive(&store->objexp_pool_ctx, shard);
if (ret == -EBUSY) { /* already locked by another processor */
dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
- return;
+ return false;
}
do {
real_time rt_last = last_run.to_real_time();
utime_t now = ceph_clock_now(g_ceph_context);
if (now >= end) {
+ done = false;
break;
}
} while (truncated);
l.unlock(&store->objexp_pool_ctx, shard);
- return;
+ return done;
}
-void RGWObjectExpirer::inspect_all_shards(const utime_t& last_run, const utime_t& round_start)
+/* Returns true if all shards have been processed successfully. */
+bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run, const utime_t& round_start)
{
utime_t shard_marker;
CephContext *cct = store->ctx();
int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
+ bool all_done = true;
for (int i = 0; i < num_shards; i++) {
string shard;
ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
- process_single_shard(shard, last_run, round_start);
+ if (! process_single_shard(shard, last_run, round_start)) {
+ all_done = false;
+ }
}
- return;
+ return all_done;
}
bool RGWObjectExpirer::going_down()
do {
utime_t start = ceph_clock_now(cct);
ldout(cct, 2) << "object expiration: start" << dendl;
- oe->inspect_all_shards(last_run, start);
+ if (oe->inspect_all_shards(last_run, start)) {
+ /* All shards have been processed properly. Next time we can start
+ * from this moment. */
+ last_run = start;
+ }
ldout(cct, 2) << "object expiration: stop" << dendl;
- last_run = start;
if (oe->going_down())
break;
const string& from_marker,
const string& to_marker);
- void process_single_shard(const string& shard,
+ bool process_single_shard(const std::string& shard,
const utime_t& last_run,
const utime_t& round_start);
- void inspect_all_shards(const utime_t& last_run,
+ bool inspect_all_shards(const utime_t& last_run,
const utime_t& round_start);
bool going_down();