uint32_t shard_id;
rgw_data_sync_marker& sync_marker;
- RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
- std::set<std::string> entries;
- std::set<std::string>::iterator iter;
+ RGWRadosGetOmapValsCR::ResultPtr omapvals;
+ std::map<std::string, bufferlist> entries;
+ std::map<std::string, bufferlist>::iterator iter;
string oid;
string error_oid;
RGWOmapAppend *error_repo;
- std::set<std::string> error_entries;
+ std::map<std::string, bufferlist> error_entries;
string error_marker;
int max_error_entries;
drain_all();
return set_cr_error(-ECANCELED);
}
- omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
- yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
- sync_marker.marker, max_entries, omapkeys));
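+ /* read the next chunk of omap key/value pairs from this shard's full-sync index,
+  * starting after the stored sync_marker.marker */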
+ omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+ yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, oid),
+ sync_marker.marker, max_entries, omapvals));
if (retcode < 0) {
- tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
- entries = std::move(omapkeys->entries);
+ entries = std::move(omapvals->entries);
if (entries.size() > 0) {
tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
}
tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
iter = entries.begin();
for (; iter != entries.end(); ++iter) {
- tn->log(20, SSTR("full sync: " << *iter));
+ tn->log(20, SSTR("full sync: " << iter->first));
total_entries++;
- if (!marker_tracker->start(*iter, total_entries, real_time())) {
- tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
+ if (!marker_tracker->start(iter->first, total_entries, real_time())) {
+ tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sc, *iter, *iter, marker_tracker, error_repo, false, tn), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, marker_tracker, error_repo, false, tn), false);
}
- sync_marker.marker = *iter;
+ sync_marker.marker = iter->first;
while ((int)num_spawned() > spawn_window) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child(); /* yield until a spawned stack completes; without this the coroutine would spin without making progress */
}
}
}
- } while (omapkeys->more);
- omapkeys.reset();
+ } while (omapvals->more);
+ omapvals.reset();
drain_all_but_stack(lease_stack.get());
if (error_retry_time <= ceph::coarse_real_clock::now()) {
/* process bucket shards that previously failed */
- omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
- yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
- error_marker, max_error_entries, omapkeys));
- error_entries = std::move(omapkeys->entries);
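+ /* read the next chunk of the error repo, starting after error_marker */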
+ omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+ yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+ error_marker, max_error_entries, omapvals));
+ error_entries = std::move(omapvals->entries);
tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
- error_marker = *iter;
+ error_marker = iter->first;
tn->log(20, SSTR("handle error entry: " << error_marker));
spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
}
- if (!omapkeys->more) {
+ if (!omapvals->more) {
if (error_marker.empty() && error_entries.empty()) {
/* the retry repo is empty, we back off a bit before calling it again */
retry_backoff_secs *= 2;
}
error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
/* restart the error-repo scan from the beginning on the next pass */
error_marker.clear();
}
- omapkeys.reset();
+ omapvals.reset();
tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker,