sync_marker.timestamp = timestamp;
}
- ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+ ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " realm_epoch=" << sync_marker.realm_epoch << dendl;
RGWRados *store = sync_env->store;
return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados,
store,
const rgw_pool& pool;
const std::string& period; //< currently syncing period id
+ const epoch_t realm_epoch; //< realm_epoch of period
RGWMetadataLog* mdlog; //< log of syncing period
uint32_t shard_id;
rgw_meta_sync_marker& sync_marker;
public:
RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
- const std::string& period, RGWMetadataLog* mdlog,
- uint32_t _shard_id, rgw_meta_sync_marker& _marker,
+ const std::string& period, epoch_t realm_epoch,
+ RGWMetadataLog* mdlog, uint32_t _shard_id,
+ rgw_meta_sync_marker& _marker,
const std::string& period_marker, bool *_reset_backoff)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
- period(period), mdlog(mdlog), shard_id(_shard_id), sync_marker(_marker),
+ period(period), realm_epoch(realm_epoch), mdlog(mdlog),
+ shard_id(_shard_id), sync_marker(_marker),
period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
reset_backoff(_reset_backoff) {
*reset_backoff = false;
temp_marker->state = rgw_meta_sync_marker::IncrementalSync;
temp_marker->marker = std::move(temp_marker->next_step_marker);
temp_marker->next_step_marker.clear();
- ldout(sync_env->cct, 0) << *this << ": saving marker pos=" << temp_marker->marker << dendl;
+ temp_marker->realm_epoch = realm_epoch;
+ ldout(sync_env->cct, 0) << *this << ": saving marker pos=" << temp_marker->marker << " realm_epoch=" << realm_epoch << dendl;
using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
yield call(new WriteMarkerCR(sync_env->async_rados, sync_env->store,
yield;
}
}
+ // if the period has advanced, we can't use the existing marker
+ if (sync_marker.realm_epoch < realm_epoch) {
+ ldout(sync_env->cct, 0) << "clearing marker=" << sync_marker.marker
+ << " from old realm_epoch=" << sync_marker.realm_epoch
+ << " (now " << realm_epoch << ')' << dendl;
+ sync_marker.realm_epoch = realm_epoch;
+ sync_marker.marker.clear();
+ }
mdlog_marker = sync_marker.marker;
set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
sync_env->shard_obj_name(shard_id),
}
#define INCREMENTAL_MAX_ENTRIES 100
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
- if (!period_marker.empty() && period_marker <= marker) {
+ if (!period_marker.empty() && period_marker <= mdlog_marker) {
+ ldout(cct, 10) << "mdlog_marker past period_marker=" << period_marker << dendl;
done_with_period = true;
break;
}
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
if (done_with_period) {
// return control to RGWMetaSyncCR and advance to the next period
+ ldout(sync_env->cct, 10) << *this << ": done with period" << dendl;
break;
}
if (mdlog_marker == max_marker && can_adjust_marker) {
if (!can_adjust_marker) {
return -EAGAIN;
}
+
+ return set_cr_done();
}
/* TODO */
return 0;
const rgw_pool& pool;
const std::string& period;
+ epoch_t realm_epoch;
RGWMetadataLog* mdlog;
uint32_t shard_id;
rgw_meta_sync_marker sync_marker;
static constexpr bool exit_on_error = false; // retry on all errors
public:
RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_pool& _pool,
- const std::string& period, RGWMetadataLog* mdlog,
- uint32_t _shard_id, const rgw_meta_sync_marker& _marker,
+ const std::string& period, epoch_t realm_epoch,
+ RGWMetadataLog* mdlog, uint32_t _shard_id,
+ const rgw_meta_sync_marker& _marker,
std::string&& period_marker)
: RGWBackoffControlCR(_sync_env->cct, exit_on_error), sync_env(_sync_env),
- pool(_pool), period(period), mdlog(mdlog), shard_id(_shard_id),
- sync_marker(_marker), period_marker(std::move(period_marker)) {}
+ pool(_pool), period(period), realm_epoch(realm_epoch), mdlog(mdlog),
+ shard_id(_shard_id), sync_marker(_marker),
+ period_marker(std::move(period_marker)) {}
RGWCoroutine *alloc_cr() override {
- return new RGWMetaSyncShardCR(sync_env, pool, period, mdlog, shard_id,
- sync_marker, period_marker, backoff_ptr());
+ return new RGWMetaSyncShardCR(sync_env, pool, period, realm_epoch, mdlog,
+ shard_id, sync_marker, period_marker, backoff_ptr());
}
RGWCoroutine *alloc_finisher_cr() override {
yield {
// get the mdlog for the current period (may be empty)
auto& period_id = sync_status.sync_info.period;
+ auto realm_epoch = sync_status.sync_info.realm_epoch;
auto mdlog = sync_env->store->meta_mgr->get_log(period_id);
// prevent wakeup() from accessing shard_crs while we're spawning them
}
}
- auto cr = new RGWMetaSyncShardControlCR(sync_env, pool, period_id,
- mdlog, shard_id, marker,
- std::move(period_marker));
+ using ShardCR = RGWMetaSyncShardControlCR;
+ auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
+ mdlog, shard_id, marker,
+ std::move(period_marker));
auto stack = spawn(cr, false);
shard_crs[shard_id] = RefPair{cr, stack};
}