int init_search = false;
int num_shards = 0;
int max_concurrent_ios = 32;
+ uint64_t orphan_stale_secs = (24 * 3600);
std::string val;
std::ostringstream errs;
num_shards = atoi(val.c_str());
} else if (ceph_argparse_witharg(args, i, &val, "--max-concurrent-ios", (char*)NULL)) {
max_concurrent_ios = atoi(val.c_str());
+ } else if (ceph_argparse_witharg(args, i, &val, "--orphan-stale-secs", (char*)NULL)) {
+ orphan_stale_secs = (uint64_t)atoi(val.c_str());
} else if (ceph_argparse_witharg(args, i, &val, "--shard-id", (char*)NULL)) {
shard_id = atoi(val.c_str());
specified_shard_id = true;
}
if (opt_cmd == OPT_ORPHANS_FIND) {
- RGWOrphanSearch search(store, max_concurrent_ios);
+ RGWOrphanSearch search(store, max_concurrent_ios, orphan_stale_secs);
if (job_id.empty()) {
cerr << "ERROR: --job-id not specified" << std::endl;
}
if (opt_cmd == OPT_ORPHANS_FINISH) {
- RGWOrphanSearch search(store, max_concurrent_ios);
+ RGWOrphanSearch search(store, max_concurrent_ios, orphan_stale_secs);
if (job_id.empty()) {
cerr << "ERROR: --job-id not specified" << std::endl;
search_info = *info;
search_info.job_name = job_name;
search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
+ search_info.start_time = ceph_clock_now(store->ctx());
search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
RGWOrphanSearchState state;
librados::IoCtx& ioctx = orphan_store.get_ioctx();
+ librados::IoCtx data_ioctx;
+
+ librados::Rados *rados = store->get_rados();
+
+ int ret = rados->ioctx_create(search_info.pool.c_str(), data_ioctx);
+ if (ret < 0) {
+ lderr(store->ctx()) << __func__ << ": ioctx_create() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ uint64_t time_threshold = search_info.start_time.sec() - stale_secs;
+
map<int, string>::iterator liter = linked_objs_index.begin();
map<int, string>::iterator aiter = all_objs_index.begin();
continue;
}
+ time_t mtime;
+ r = data_ioctx.stat(key, NULL, &mtime);
+ if (r < 0) {
+ if (r != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl;
+ }
+ continue;
+ }
+ if (stale_secs && (uint64_t)mtime >= time_threshold) {
+ ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl;
+ continue;
+ }
ldout(store->ctx(), 20) << "leaked: " << key << dendl;
cout << "leaked: " << key << std::endl;
} while (!done);
string job_name;
string pool;
uint16_t num_shards;
+ utime_t start_time;
+
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
::encode(job_name, bl);
::encode(pool, bl);
::encode(num_shards, bl);
+ ::encode(start_time, bl);
ENCODE_FINISH(bl);
}
::decode(job_name, bl);
::decode(pool, bl);
::decode(num_shards, bl);
+ ::decode(start_time, bl);
DECODE_FINISH(bl);
}
string index_objs_prefix;
uint16_t max_concurrent_ios;
+ uint64_t stale_secs;
struct log_iter_info {
string oid;
int remove_index(map<int, string>& index);
public:
- RGWOrphanSearch(RGWRados *_store, int _max_ios) : store(_store), orphan_store(store), max_concurrent_ios(_max_ios) {}
+ RGWOrphanSearch(RGWRados *_store, int _max_ios, uint64_t _stale_secs) : store(_store), orphan_store(store), max_concurrent_ios(_max_ios), stale_secs(_stale_secs) {}
int save_state() {
RGWOrphanSearchState state;