class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- RGWRESTReadResource *http_op;
+ RGWRESTReadResource *http_op = nullptr;
int shard_id;
- string *pmarker;
+ const std::string& marker;
+ string *pnext_marker;
list<rgw_data_change_log_entry> *entries;
bool *truncated;
read_remote_data_log_response response;
public:
- RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
- int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
- http_op(NULL),
- shard_id(_shard_id),
- pmarker(_pmarker),
- entries(_entries),
- truncated(_truncated) {
+ RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
+ const std::string& marker, string *pnext_marker,
+ list<rgw_data_change_log_entry> *_entries,
+ bool *_truncated)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
+ entries(_entries), truncated(_truncated) {
}
~RGWReadRemoteDataLogShardCR() override {
if (http_op) {
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "data" },
{ "id", buf },
- { "marker", pmarker->c_str() },
+ { "marker", marker.c_str() },
{ "extra-info", "true" },
{ NULL, NULL } };
}
entries->clear();
entries->swap(response.entries);
- *pmarker = response.marker;
+ *pnext_marker = response.marker;
*truncated = response.truncated;
return set_cr_done();
}
RGWDataSyncShardMarkerTrack *marker_tracker;
+ std::string next_marker;
list<rgw_data_change_log_entry> log_entries;
list<rgw_data_change_log_entry>::iterator log_iter;
bool truncated;
public:
RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
rgw_pool& _pool,
- uint32_t _shard_id, rgw_data_sync_marker& _marker,
+ uint32_t _shard_id, const rgw_data_sync_marker& _marker,
RGWSyncTraceNodeRef& _tn,
bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
#define INCREMENTAL_MAX_ENTRIES 100
tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
spawned_keys.clear();
- yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
+ &next_marker, &log_entries, &truncated));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
stop_spawned_services();
}
}
- tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated));
+ tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
+ << " next_marker=" << next_marker << " truncated=" << truncated));
+ if (!next_marker.empty()) {
+ sync_marker.marker = next_marker;
+ } else if (!log_entries.empty()) {
+ sync_marker.marker = log_entries.back().log_id;
+ }
if (!truncated) {
// we reached the end, wait a while before checking for more
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
rgw_data_sync_marker* sync_marker;
int count;
+ std::string next_marker;
list<rgw_data_change_log_entry> log_entries;
bool truncated;
marker = sync_marker->marker;
count = 0;
do{
- yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
+ &next_marker, &log_entries, &truncated));
if (retcode == -ENOENT) {
break;