list<string> sections;
list<string>::iterator sections_iter;
- list<string> result;
+
+ struct meta_list_result {
+ list<string> keys;
+ string marker;
+ uint64_t count{0};
+ bool truncated{false};
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("keys", keys, obj);
+ JSONDecoder::decode_json("marker", marker, obj);
+ JSONDecoder::decode_json("count", count, obj);
+ JSONDecoder::decode_json("truncated", truncated, obj);
+ }
+ } result;
list<string>::iterator iter;
std::unique_ptr<RGWShardedOmapCRManager> entries_index;
bool lost_lock;
bool failed;
+ string marker;
+
map<uint32_t, rgw_meta_sync_marker>& markers;
public:
rearrange_sections();
sections_iter = sections.begin();
for (; sections_iter != sections.end(); ++sections_iter) {
- yield {
- string entrypoint = string("/admin/metadata/") + *sections_iter;
- /* FIXME: need a better scaling solution here, requires streaming output */
- call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
- entrypoint, NULL, &result));
- }
- if (get_ret_status() < 0) {
- ldout(cct, 0) << "ERROR: failed to fetch metadata section: " << *sections_iter << dendl;
- yield entries_index->finish();
- yield lease_cr->go_down();
- drain_all();
- return set_cr_error(get_ret_status());
- }
- iter = result.begin();
- for (; iter != result.end(); ++iter) {
- if (!lease_cr->is_locked()) {
- lost_lock = true;
- break;
+ do {
+ yield {
+#define META_FULL_SYNC_CHUNK_SIZE "1000"
+ string entrypoint = string("/admin/metadata/") + *sections_iter;
+ rgw_http_param_pair pairs[] = { { "max-entries", META_FULL_SYNC_CHUNK_SIZE },
+ { "marker", result.marker.c_str() },
+ { NULL, NULL } };
+ result.keys.clear();
+ call(new RGWReadRESTResourceCR<meta_list_result >(cct, conn, sync_env->http_manager,
+ entrypoint, pairs, &result));
}
- yield; // allow entries_index consumer to make progress
-
- ldout(cct, 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
- string s = *sections_iter + ":" + *iter;
- int shard_id;
- RGWRados *store = sync_env->store;
- int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: could not determine shard id for " << *sections_iter << ":" << *iter << dendl;
- ret_status = ret;
- break;
+ if (get_ret_status() < 0) {
+ ldout(cct, 0) << "ERROR: failed to fetch metadata section: " << *sections_iter << dendl;
+ yield entries_index->finish();
+ yield lease_cr->go_down();
+ drain_all();
+ return set_cr_error(get_ret_status());
}
- if (!entries_index->append(s, shard_id)) {
- break;
+ iter = result.keys.begin();
+ for (; iter != result.keys.end(); ++iter) {
+ if (!lease_cr->is_locked()) {
+ lost_lock = true;
+ break;
+ }
+ yield; // allow entries_index consumer to make progress
+
+ ldout(cct, 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
+ string s = *sections_iter + ":" + *iter;
+ int shard_id;
+ RGWRados *store = sync_env->store;
+ int ret = store->meta_mgr->get_log_shard_id(*sections_iter, *iter, &shard_id);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: could not determine shard id for " << *sections_iter << ":" << *iter << dendl;
+ ret_status = ret;
+ break;
+ }
+ if (!entries_index->append(s, shard_id)) {
+ break;
+ }
}
- }
+ } while (result.truncated);
}
yield {
if (!entries_index->finish()) {