#define dout_subsys ceph_subsys_rgw
-#define DEFAULT_NUM_SHARDS 10
+#define DEFAULT_NUM_SHARDS 1000
-int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState& state)
+int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state)
{
set<string> keys;
map<string, bufferlist> vals;
search_info = *info;
search_info.job_name = job_name;
search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
- search_state = ORPHAN_SEARCH_INIT;
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
r = save_state();
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
}
search_info = state.info;
- search_state = state.state;
+ search_stage = state.stage;
}
index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string(".");
return ret;
}
-int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id)
+int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids)
{
ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
RGWBucketInfo bucket_info;
bool truncated;
deque<RGWRados::Object::Stat> stat_ops;
- map<int, list<string> > oids;
int count = 0;
}
}
- ret = log_oids(linked_objs_index, oids);
- if (ret < 0) {
- cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
- return ret;
- }
-
return 0;
}
int RGWOrphanSearch::build_linked_oids_index()
{
- string marker;
-
- map<int, string>::iterator iter;
- for (iter = buckets_instance_index.begin(); iter != buckets_instance_index.end(); ++iter) {
+ map<int, list<string> > oids;
+ map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard);
+ for (; iter != buckets_instance_index.end(); ++iter) {
bool truncated;
string oid = iter->second;
do {
map<string, bufferlist> entries;
- int ret = orphan_store.read_entries(oid, marker, &entries, &truncated);
+ int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated);
if (ret == -ENOENT) {
truncated = false;
ret = 0;
break;
}
- marker = entries.rbegin()->first; /* last entry */
-
for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
- ret = build_linked_oids_for_bucket(eiter->first);
+ ret = build_linked_oids_for_bucket(eiter->first, oids);
}
+
+ search_stage.shard = iter->first;
+ search_stage.marker = entries.rbegin()->first; /* last entry */
} while (truncated);
+
+ search_stage.marker.clear();
+ }
+
+ int ret = log_oids(linked_objs_index, oids);
+ if (ret < 0) {
+ cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
+ return ret;
}
+ save_state();
+
return 0;
}
{
int r;
- switch (search_state) {
+cout << "search_stage.marker=" << search_stage.marker << " search_stage.shard=" << search_stage.shard << std::endl;
+
+ switch (search_stage.stage) {
- case ORPHAN_SEARCH_INIT:
+ case ORPHAN_SEARCH_STAGE_INIT:
ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
- search_state = ORPHAN_SEARCH_LSPOOL;
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL);
r = save_state();
if (r < 0) {
lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
return r;
}
// fall through
- case ORPHAN_SEARCH_LSPOOL:
+ case ORPHAN_SEARCH_STAGE_LSPOOL:
ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl;
r = build_all_oids_index();
if (r < 0) {
return r;
}
- search_state = ORPHAN_SEARCH_LSBUCKETS;
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS);
r = save_state();
if (r < 0) {
lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
}
// fall through
- case ORPHAN_SEARCH_LSBUCKETS:
+ case ORPHAN_SEARCH_STAGE_LSBUCKETS:
ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl;
r = build_buckets_instance_index();
if (r < 0) {
return r;
}
- search_state = ORPHAN_SEARCH_ITERATE_BI;
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI);
r = save_state();
if (r < 0) {
lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
// fall through
- case ORPHAN_SEARCH_ITERATE_BI:
+ case ORPHAN_SEARCH_STAGE_ITERATE_BI:
ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
r = build_linked_oids_index();
if (r < 0) {
return r;
}
- case ORPHAN_SEARCH_DONE:
+ search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE);
+ r = save_state();
+ if (r < 0) {
+ lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
+ return r;
+ }
+ // fall through
+
+ case ORPHAN_SEARCH_STAGE_COMPARE:
break;
default:
#define RGW_ORPHAN_INDEX_PREFIX "orphan.scan"
-enum OrphanSearchState {
- ORPHAN_SEARCH_UNKNOWN = 0,
- ORPHAN_SEARCH_INIT = 1,
- ORPHAN_SEARCH_LSPOOL = 2,
- ORPHAN_SEARCH_LSBUCKETS = 3,
- ORPHAN_SEARCH_ITERATE_BI = 4,
- ORPHAN_SEARCH_DONE = 5,
+enum RGWOrphanSearchStageId {
+ ORPHAN_SEARCH_STAGE_UNKNOWN = 0,
+ ORPHAN_SEARCH_STAGE_INIT = 1,
+ ORPHAN_SEARCH_STAGE_LSPOOL = 2,
+ ORPHAN_SEARCH_STAGE_LSBUCKETS = 3,
+ ORPHAN_SEARCH_STAGE_ITERATE_BI = 4,
+ ORPHAN_SEARCH_STAGE_COMPARE = 5,
};
+struct RGWOrphanSearchStage {
+ RGWOrphanSearchStageId stage;
+ int shard;
+ string marker;
+
+ RGWOrphanSearchStage() : stage(ORPHAN_SEARCH_STAGE_UNKNOWN), shard(0) {}
+ RGWOrphanSearchStage(RGWOrphanSearchStageId _stage) : stage(_stage), shard(0) {}
+ RGWOrphanSearchStage(RGWOrphanSearchStageId _stage, int _shard, const string& _marker) : stage(_stage), shard(_shard), marker(_marker) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode((int)stage, bl);
+ ::encode(shard, bl);
+ ::encode(marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ int s;
+ ::decode(s, bl);
+ stage = (RGWOrphanSearchStageId)s;
+ ::decode(shard, bl);
+ ::decode(marker, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(RGWOrphanSearchStage)
+
struct RGWOrphanSearchInfo {
string job_name;
string pool;
struct RGWOrphanSearchState {
RGWOrphanSearchInfo info;
- OrphanSearchState state;
- bufferlist state_info;
+ RGWOrphanSearchStage stage;
- RGWOrphanSearchState() : state(ORPHAN_SEARCH_UNKNOWN) {}
+ RGWOrphanSearchState() : stage(ORPHAN_SEARCH_STAGE_UNKNOWN) {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
::encode(info, bl);
- ::encode((int)state, bl);
- ::encode(state_info, bl);
+ ::encode(stage, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
DECODE_START(1, bl);
::decode(info, bl);
- int s;
- ::decode(s, bl);
- state = (OrphanSearchState)s;
- ::decode(state_info, bl);
+ ::decode(stage, bl);
DECODE_FINISH(bl);
}
RGWOrphanStore orphan_store;
RGWOrphanSearchInfo search_info;
- OrphanSearchState search_state;
+ RGWOrphanSearchStage search_stage;
map<int, string> all_objs_index;
map<int, string> buckets_instance_index;
int save_state() {
RGWOrphanSearchState state;
state.info = search_info;
- state.state = search_state;
+ state.stage = search_stage;
return orphan_store.write_job(search_info.job_name, state);
}
int build_all_oids_index();
int build_buckets_instance_index();
- int build_linked_oids_for_bucket(const string& bucket_instance_id);
+ int build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids);
int build_linked_oids_index();
int run();