From 482b44f9216e66048bdd983a85cc546eff555afc Mon Sep 17 00:00:00 2001
From: Casey Bodley <cbodley@redhat.com>
Date: Tue, 31 Mar 2020 09:23:29 -0400
Subject: [PATCH] rgw: dont use RGWOmapAppend for error_repo

the error_repo writes need to be synchronous

Signed-off-by: Casey Bodley <cbodley@redhat.com>
---
 src/rgw/rgw_data_sync.cc | 57 ++++++++++++++----------------------------
 1 file changed, 19 insertions(+), 38 deletions(-)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index a8ecbcecf28a0..4090512b2d069 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1252,7 +1252,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   std::optional<rgw_data_sync_obligation> complete; // obligation to complete
   uint32_t obligation_counter = 0;
   RGWDataSyncShardMarkerTrack *marker_tracker;
-  boost::intrusive_ptr<RGWOmapAppend> error_repo;
+  const rgw_raw_obj& error_repo;
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   RGWSyncTraceNodeRef tn;
 
@@ -1262,12 +1262,12 @@ public:
   RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
                           rgw_data_sync_obligation obligation,
                           RGWDataSyncShardMarkerTrack *_marker_tracker,
-                          RGWOmapAppend *_error_repo,
+                          const rgw_raw_obj& error_repo,
                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                           const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       state(std::move(state)), obligation(std::move(obligation)),
-      marker_tracker(_marker_tracker), error_repo(_error_repo),
+      marker_tracker(_marker_tracker), error_repo(error_repo),
       lease_cr(std::move(lease_cr)) {
     set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
     tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
@@ -1337,20 +1337,20 @@ public:
           tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
         }
       }
-      if (error_repo && complete->timestamp != ceph::real_time{}) {
+      if (complete->timestamp != ceph::real_time{}) {
         tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
-        yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
+        yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo,
                                            complete->key, complete->timestamp));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
         }
       }
-    } else if (error_repo && complete->retry) {
-      yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
+    } else if (complete->retry) {
+      yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
                                           complete->key, complete->timestamp));
       if (retcode < 0) {
         tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
-                        << error_repo->get_obj() << " retcode=" << retcode));
+                        << error_repo << " retcode=" << retcode));
       }
     }
     /* FIXME: what do do in case of error */
@@ -1415,9 +1415,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
 
   string status_oid;
-
-  string error_oid;
-  RGWOmapAppend *error_repo = nullptr;
+  rgw_raw_obj error_repo;
   std::map<string, bufferlist> error_entries;
   string error_marker;
   ceph::real_time entry_timestamp;
@@ -1457,21 +1455,18 @@ public:
                      uint32_t _shard_id, rgw_data_sync_marker& _marker,
                      RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      pool(_pool), shard_id(_shard_id), sync_marker(_marker), tn(_tn),
+      pool(_pool), shard_id(_shard_id), sync_marker(_marker),
+      status_oid(RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
+      error_repo(pool, status_oid + ".retry"), tn(_tn),
       bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
   {
     set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
-    status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id);
-    error_oid = status_oid + ".retry";
   }
 
   ~RGWDataSyncShardCR() override {
     if (lease_cr) {
       lease_cr->abort();
     }
-    if (error_repo) {
-      error_repo->put();
-    }
   }
 
   void append_modified_shards(set<string>& keys) {
@@ -1517,7 +1512,7 @@ public:
     }
     auto store = sync_env->store;
     lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                            rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
+                                            rgw_raw_obj(pool, status_oid),
                                             lock_name, lock_duration, this));
     lease_stack.reset(spawn(lease_cr.get(), false));
   }
@@ -1545,7 +1540,7 @@ public:
       entry_timestamp = sync_marker.timestamp; // time when full sync started
       do {
         if (!lease_cr->is_locked()) {
-          stop_spawned_services();
+          lease_cr->go_down();
           drain_all();
           return set_cr_error(-ECANCELED);
         }
@@ -1605,7 +1600,7 @@ public:
         sync_marker.marker = sync_marker.next_step_marker;
         sync_marker.next_step_marker.clear();
         call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, sync_env->svc->sysobj,
-                                                             rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
+                                                             rgw_raw_obj(pool, status_oid),
                                                              sync_marker));
       }
       if (retcode < 0) {
@@ -1639,15 +1634,10 @@ public:
         set_status("lease acquired");
         tn->log(10, "took lease");
       }
-      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
-                                     rgw_raw_obj(pool, error_oid),
-                                     1 /* no buffer */);
-      error_repo->get();
-      spawn(error_repo, false);
       marker_tracker.emplace(sc, status_oid, sync_marker, tn);
       do {
         if (!lease_cr->is_locked()) {
-          stop_spawned_services();
+          lease_cr->go_down();
           drain_all();
           return set_cr_error(-ECANCELED);
         }
@@ -1674,7 +1664,7 @@ public:
         if (error_retry_time <= ceph::coarse_real_clock::now()) {
           /* process bucket shards that previously failed */
           omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
-          yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+          yield call(new RGWRadosGetOmapValsCR(sync_env->store, error_repo,
                                                error_marker, max_error_entries, omapvals));
           error_entries = std::move(omapvals->entries);
           tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
@@ -1685,7 +1675,7 @@ public:
             retcode = parse_bucket_key(error_marker, source_bs);
             if (retcode < 0) {
               tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
-              spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, rgw_raw_obj{pool, error_oid},
+              spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
                                              error_marker, entry_timestamp), false);
               continue;
             }
@@ -1714,7 +1704,7 @@ public:
                                    &next_marker, &log_entries, &truncated));
         if (retcode < 0 && retcode != -ENOENT) {
          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
-          stop_spawned_services();
+          lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
@@ -1783,15 +1773,6 @@ public:
     // convert timespan -> time_point -> utime_t
     return utime_t(ceph::coarse_real_clock::zero() + interval);
   }
-
-  void stop_spawned_services() {
-    lease_cr->go_down();
-    if (error_repo) {
-      error_repo->finish();
-      error_repo->put();
-      error_repo = NULL;
-    }
-  }
 };
 
 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
-- 
2.39.5
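
A note on the rationale ("the error_repo writes need to be synchronous"): as the removed
lines show, RGWOmapAppend ran as its own spawned coroutine stack (spawn(error_repo, false)),
so an append returned before the omap write completed and RGWDataSyncSingleEntryCR could
finish an obligation without ever learning that recording its retry entry had failed. After
this patch the coroutine yields on rgw_error_repo_write_cr / rgw_error_repo_remove_cr and
checks retcode inline. The standalone C++ sketch below illustrates only that difference with
a toy repository; ErrorRepo, append_buffered, flush, and write_sync are hypothetical names
for illustration, not Ceph APIs.

// Toy model of the two write disciplines; not Ceph code.
#include <cerrno>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

class ErrorRepo {
  std::vector<std::string> pending_; // keys queued but not yet persisted
public:
  // Buffered append: queue the key and return at once. The caller gets no
  // result; if the deferred flush fails, it has already moved on.
  void append_buffered(std::string key) {
    pending_.push_back(std::move(key));
  }

  // Deferred flush (in rgw this ran inside the spawned RGWOmapAppend
  // coroutine): errors surface here, far from the code that decided the
  // bucket shard had to be retried.
  int flush() {
    pending_.clear();
    return -EIO; // simulate a failed omap write
  }

  // Synchronous write: persist one key and report the result inline so the
  // caller can log or escalate before completing its obligation.
  int write_sync(const std::string&) {
    return -EIO; // same simulated failure, now visible to the caller
  }
};

int main() {
  ErrorRepo repo;

  // Buffered path: the failure is only observable at flush time.
  repo.append_buffered("bucket:shard-3");
  std::cout << "append returned; obligation already completed\n";
  if (int r = repo.flush(); r < 0) {
    std::cout << "flush failed later, retry entry was lost: retcode=" << r << "\n";
  }

  // Synchronous path: mirrors the retcode check that follows
  // rgw_error_repo_write_cr in the patched coroutine.
  if (int r = repo.write_sync("bucket:shard-3"); r < 0) {
    std::cout << "ERROR: failed to log sync failure in error repo: retcode=" << r << "\n";
  }
  return 0;
}

The synchronous calls also carry a per-key timestamp (complete->timestamp in the hunks
above), which the generic omap appender had no interface to record.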