std::optional<rgw_data_sync_obligation> complete; // obligation to complete
uint32_t obligation_counter = 0;
RGWDataSyncShardMarkerTrack *marker_tracker;
- boost::intrusive_ptr<RGWOmapAppend> error_repo;
+ const rgw_raw_obj& error_repo;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
RGWSyncTraceNodeRef tn;
RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
rgw_data_sync_obligation obligation,
RGWDataSyncShardMarkerTrack *_marker_tracker,
- RGWOmapAppend *_error_repo,
+ const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
state(std::move(state)), obligation(std::move(obligation)),
- marker_tracker(_marker_tracker), error_repo(_error_repo),
+ marker_tracker(_marker_tracker), error_repo(error_repo),
lease_cr(std::move(lease_cr)) {
set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
}
}
- if (error_repo && complete->timestamp != ceph::real_time{}) {
+ if (complete->timestamp != ceph::real_time{}) {
tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
- yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
+ yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo,
complete->key, complete->timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
}
}
- } else if (error_repo && complete->retry) {
- yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
+ } else if (complete->retry) {
+ yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
complete->key, complete->timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
- << error_repo->get_obj() << " retcode=" << retcode));
+ << error_repo << " retcode=" << retcode << ")"));
}
}
/* FIXME: what to do in case of error */
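The two branches above define the retry protocol against the error repo: a failed entry is written together with the timestamp of the failing attempt, and a successfully retried entry is removed with the timestamp it was retried at. Below is a minimal, self-contained sketch of the timestamp-guarded semantics these calls appear to rely on; the container and method names are illustrative stand-ins, not the rgw implementation.

#include <chrono>
#include <map>
#include <string>

using real_time = std::chrono::system_clock::time_point;

// stand-in for the omap-backed error repo object
struct ErrorRepoSketch {
  std::map<std::string, real_time> entries;

  // record a failed entry for later retry, keeping the newest failure time
  void write(const std::string& key, real_time t) {
    auto& stored = entries[key];
    if (t > stored) {
      stored = t;
    }
  }

  // drop an entry after a successful retry, but only if no newer failure
  // was recorded while the retry was in flight
  void remove(const std::string& key, real_time t) {
    auto it = entries.find(key);
    if (it != entries.end() && it->second <= t) {
      entries.erase(it);
    }
  }
};

Passing the timestamp to the remove path is what makes the protocol safe against races: an entry that failed again after its retry was scheduled keeps its newer timestamp and survives the remove.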
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
string status_oid;
-
- string error_oid;
- RGWOmapAppend *error_repo = nullptr;
+ rgw_raw_obj error_repo;
std::map<std::string, bufferlist> error_entries;
string error_marker;
ceph::real_time entry_timestamp;
uint32_t _shard_id, rgw_data_sync_marker& _marker,
RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- pool(_pool), shard_id(_shard_id), sync_marker(_marker), tn(_tn),
+ pool(_pool), shard_id(_shard_id), sync_marker(_marker),
+ status_oid(RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
+ error_repo(pool, status_oid + ".retry"), tn(_tn),
bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
{
set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
- status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id);
- error_oid = status_oid + ".retry";
}
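A note on the initializer list above: status_oid and error_repo moved from assignments in the constructor body into the member-initializer list, and error_repo's initializer reads status_oid. This is only safe because members initialize in declaration order, and status_oid is declared before error_repo in the member block earlier in the diff. A tiny self-contained illustration of the same ordering rule, with hypothetical names:

#include <string>
#include <utility>

struct InitOrderSketch {
  std::string status_oid;      // declared first, so it initializes first
  std::string error_repo_oid;  // its initializer below reads status_oid

  explicit InitOrderSketch(std::string oid)
    : status_oid(std::move(oid)),
      error_repo_oid(status_oid + ".retry") {}
};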
~RGWDataSyncShardCR() override {
if (lease_cr) {
lease_cr->abort();
}
- if (error_repo) {
- error_repo->put();
- }
}
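The destructor shrinks accordingly: error_repo used to be a spawned, manually ref-counted RGWOmapAppend coroutine (get() on setup, put() on teardown), whereas a plain rgw_raw_obj member has value semantics and needs no cleanup. A schematic, self-contained sketch of that ownership change, with hypothetical types:

#include <atomic>
#include <string>

// stand-in for an intrusively ref-counted coroutine
struct RefCountedSketch {
  std::atomic<int> refs{1};
  void get() { ++refs; }
  void put() { if (--refs == 0) delete this; }
};

struct OldStyleOwner {
  RefCountedSketch* repo = nullptr;            // pinned with repo->get() after spawn
  ~OldStyleOwner() { if (repo) repo->put(); }  // matching release on teardown
};

struct RawObjSketch { std::string pool, oid; };  // stand-in for rgw_raw_obj

struct NewStyleOwner {
  RawObjSketch error_repo;  // plain value: no ref-counting, no destructor logic
};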
void append_modified_shards(set<string>& keys) {
}
auto store = sync_env->store;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
+ rgw_raw_obj(pool, status_oid),
lock_name, lock_duration, this));
lease_stack.reset(spawn(lease_cr.get(), false));
}
entry_timestamp = sync_marker.timestamp; // time when full sync started
do {
if (!lease_cr->is_locked()) {
- stop_spawned_services();
+ lease_cr->go_down();
drain_all();
return set_cr_error(-ECANCELED);
}
sync_marker.marker = sync_marker.next_step_marker;
sync_marker.next_step_marker.clear();
call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, sync_env->svc->sysobj,
- rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
+ rgw_raw_obj(pool, status_oid),
sync_marker));
}
if (retcode < 0) {
set_status("lease acquired");
tn->log(10, "took lease");
}
- error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
- rgw_raw_obj(pool, error_oid),
- 1 /* no buffer */);
- error_repo->get();
- spawn(error_repo, false);
marker_tracker.emplace(sc, status_oid, sync_marker, tn);
do {
if (!lease_cr->is_locked()) {
- stop_spawned_services();
+ lease_cr->go_down();
drain_all();
return set_cr_error(-ECANCELED);
}
if (error_retry_time <= ceph::coarse_real_clock::now()) {
/* process bucket shards that previously failed */
omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
- yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+ yield call(new RGWRadosGetOmapValsCR(sync_env->store, error_repo,
error_marker, max_error_entries, omapvals));
error_entries = std::move(omapvals->entries);
tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
retcode = parse_bucket_key(error_marker, source_bs);
if (retcode < 0) {
tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
- spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, rgw_raw_obj{pool, error_oid},
+ spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
error_marker, entry_timestamp), false);
continue;
}
&next_marker, &log_entries, &truncated));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
- stop_spawned_services();
+ lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
// convert timespan -> time_point -> utime_t
return utime_t(ceph::coarse_real_clock::zero() + interval);
}
-
- void stop_spawned_services() {
- lease_cr->go_down();
- if (error_repo) {
- error_repo->finish();
- error_repo->put();
- error_repo = NULL;
- }
- }
};
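With the error repo no longer running as a spawned service, stop_spawned_services() had a single remaining duty, lease_cr->go_down(), so the patch inlines that call at each early-exit site in both sync loops above. A self-contained sketch of the resulting cancellation pattern, with all names hypothetical:

#include <cerrno>
#include <memory>

struct LeaseSketch {
  bool locked = true;
  bool is_locked() const { return locked; }
  void go_down() { locked = false; }  // stop renewing the lease
};

struct ShardSketch {
  std::unique_ptr<LeaseSketch> lease_cr = std::make_unique<LeaseSketch>();

  void drain_all() { /* wait for spawned stacks; no-op in this sketch */ }

  // the sequence now repeated at each early-exit point in the sync loops
  int bail_if_lease_lost() {
    if (!lease_cr->is_locked()) {
      lease_cr->go_down();
      drain_all();
      return -ECANCELED;
    }
    return 0;
  }
};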
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {