From 7fb98eaf0a1864021919ee02b87080887c5ffae0 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 31 Mar 2020 09:23:23 -0400 Subject: [PATCH] rgw: use objv_tracker to read/write bucket sync status use cls_version on bucket sync status to detect racing writes - whether from other gateways, or from radosgw-admin commands like 'bucket sync' or 'bucket sync init' classes that require a non-null version tracker take it by reference Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 104 +++++++++++++++++++++------------------ src/rgw/rgw_data_sync.h | 2 +- 2 files changed, 57 insertions(+), 49 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 8f1ec4f9d5f74..918395f65c860 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -994,6 +994,7 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { rgw_bucket_sync_pipe sync_pipe; rgw_bucket_shard_sync_info sync_status; RGWMetaSyncEnv meta_sync_env; + RGWObjVersionTracker objv_tracker; ceph::real_time* progress; const std::string status_oid; @@ -2767,16 +2768,17 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { const string sync_status_oid; rgw_bucket_shard_sync_info& status; - + RGWObjVersionTracker& objv_tracker; rgw_bucket_index_marker_info info; public: RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, - rgw_bucket_shard_sync_info& _status) + rgw_bucket_shard_sync_info& _status, + RGWObjVersionTracker& objv_tracker) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair), sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pair)), - status(_status) + status(_status), objv_tracker(objv_tracker) {} int operate() override { @@ -2817,9 +2819,9 @@ public: if (write_status) { map attrs; status.encode_all_attrs(attrs); - call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, obj, attrs)); + call(new 
RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, obj, attrs, &objv_tracker)); } else { - call(new RGWRadosRemoveCR(store, obj)); + call(new RGWRadosRemoveCR(store, obj, &objv_tracker)); } } if (info.syncstopped) { @@ -2869,12 +2871,12 @@ RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp, sc.init(sync_env, conn, source_zone); } -RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num) +RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker) { if ((size_t)num >= sync_pairs.size()) { return nullptr; } - return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status); + return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status, objv_tracker); } #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync." @@ -2941,15 +2943,17 @@ class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine { RGWDataSyncEnv *sync_env; string oid; rgw_bucket_shard_sync_info *status; - + RGWObjVersionTracker* objv_tracker; map attrs; public: RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& sync_pair, - rgw_bucket_shard_sync_info *_status) + rgw_bucket_shard_sync_info *_status, + RGWObjVersionTracker* objv_tracker) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)), - status(_status) {} + status(_status), objv_tracker(objv_tracker) + {} int operate() override; }; @@ -2958,7 +2962,7 @@ int RGWReadBucketPipeSyncStatusCoroutine::operate() reenter(this) { yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, oid), - &attrs, true)); + &attrs, true, objv_tracker)); if (retcode == -ENOENT) { *status = rgw_bucket_shard_sync_info(); return set_cr_done(); @@ -3153,7 +3157,7 @@ RGWCoroutine 
*RGWRemoteBucketManager::read_sync_status_cr(int num, rgw_bucket_sh return nullptr; } - return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status); + return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr); } RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store, @@ -3351,20 +3355,19 @@ class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrackenv), - marker_oid(_marker_oid), - sync_marker(_marker) {} - - void set_tn(RGWSyncTraceNodeRef& _tn) { - tn = _tn; - } + const string& _marker_oid, + const rgw_bucket_shard_full_sync_marker& _marker, + RGWSyncTraceNodeRef tn, + RGWObjVersionTracker& objv_tracker) + : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW), + sc(_sc), sync_env(_sc->env), marker_oid(_marker_oid), + sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker) + {} RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override { sync_marker.position = new_marker; @@ -3376,7 +3379,7 @@ public: tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid), - attrs); + attrs, &objv_tracker); } RGWOrderCallCR *allocate_order_control_cr() override { @@ -3390,14 +3393,14 @@ class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine { rgw_raw_obj obj; rgw_bucket_shard_inc_sync_marker sync_marker; ceph::real_time* stable_timestamp; - RGWObjVersionTracker* objv_tracker; + RGWObjVersionTracker& objv_tracker; std::map attrs; public: RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env, const rgw_raw_obj& obj, const rgw_bucket_shard_inc_sync_marker& sync_marker, ceph::real_time* stable_timestamp, - RGWObjVersionTracker* objv_tracker) + RGWObjVersionTracker& objv_tracker) : 
RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj), sync_marker(sync_marker), stable_timestamp(stable_timestamp), objv_tracker(objv_tracker) @@ -3407,7 +3410,7 @@ class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine { sync_marker.encode_attr(attrs); yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, - obj, attrs, objv_tracker)); + obj, attrs, &objv_tracker)); if (retcode < 0) { return set_cr_error(retcode); } @@ -3437,6 +3440,7 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack pending_olh; // object names with pending olh operations RGWSyncTraceNodeRef tn; + RGWObjVersionTracker& objv_tracker; ceph::real_time* stable_timestamp; void handle_finish(const string& marker) override { @@ -3457,15 +3461,15 @@ public: RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc, const string& _marker_oid, const rgw_bucket_shard_inc_sync_marker& _marker, + RGWSyncTraceNodeRef tn, + RGWObjVersionTracker& objv_tracker, ceph::real_time* stable_timestamp) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW), sc(_sc), sync_env(_sc->env), obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid), - sync_marker(_marker), stable_timestamp(stable_timestamp) + sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker), + stable_timestamp(stable_timestamp) {} - void set_tn(RGWSyncTraceNodeRef& _tn) { - tn = _tn; - } RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override { sync_marker.position = new_marker; @@ -3473,7 +3477,7 @@ public: tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp)); return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker, - stable_timestamp, nullptr); + stable_timestamp, objv_tracker); } /* @@ -3683,7 +3687,6 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { bucket_list_result list_result; list::iterator entries_iter; 
rgw_bucket_shard_sync_info& sync_info; - RGWBucketFullSyncShardMarkerTrack marker_tracker; rgw_obj_key list_marker; bucket_list_entry *entry{nullptr}; @@ -3696,6 +3699,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { rgw_zone_set zones_trace; RGWSyncTraceNodeRef tn; + RGWBucketFullSyncShardMarkerTrack marker_tracker; struct _prefix_handler { RGWBucketSyncFlowManager::pipe_rules_ref rules; @@ -3747,16 +3751,17 @@ public: const std::string& status_oid, RGWContinuousLeaseCR *lease_cr, rgw_bucket_shard_sync_info& sync_info, - RGWSyncTraceNodeRef tn_parent) + RGWSyncTraceNodeRef tn_parent, + RGWObjVersionTracker& objv_tracker) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), lease_cr(lease_cr), sync_info(sync_info), - marker_tracker(sc, status_oid, sync_info.full_marker), status_oid(status_oid), tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync", - SSTR(bucket_shard_str{bs}))) { + SSTR(bucket_shard_str{bs}))), + marker_tracker(sc, status_oid, sync_info.full_marker, tn, objv_tracker) + { zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bs.bucket.get_key()); - marker_tracker.set_tn(tn); prefix_handler.set_rules(sync_pipe.get_rules()); } @@ -3896,9 +3901,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { rgw_bucket_shard_sync_info& sync_info; rgw_obj_key key; rgw_bi_log_entry *entry{nullptr}; - RGWBucketIncSyncShardMarkerTrack marker_tracker; bool updated_status{false}; - const string& status_oid; rgw_zone_id zone_id; string target_location_key; @@ -3908,6 +3911,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { bool syncstopped{false}; RGWSyncTraceNodeRef tn; + RGWBucketIncSyncShardMarkerTrack marker_tracker; public: RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc, @@ -3916,19 +3920,20 @@ public: RGWContinuousLeaseCR *lease_cr, rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef& _tn_parent, + RGWObjVersionTracker& objv_tracker, 
ceph::real_time* stable_timestamp) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), lease_cr(lease_cr), sync_info(sync_info), - marker_tracker(sc, status_oid, sync_info.inc_marker, stable_timestamp), - status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id), + zone_id(sync_env->svc->zone->get_zone().id), tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync", - SSTR(bucket_shard_str{bs}))) + SSTR(bucket_shard_str{bs}))), + marker_tracker(sc, status_oid, sync_info.inc_marker, tn, + objv_tracker, stable_timestamp) { set_description() << "bucket shard incremental sync bucket=" << bucket_shard_str{bs}; set_status("init"); - marker_tracker.set_tn(tn); rules = sync_pipe.get_rules(); target_location_key = sync_pipe.info.dest_bs.bucket.get_key(); } @@ -4745,7 +4750,7 @@ int RGWRunBucketSyncCoroutine::operate() } tn->log(10, "took lease"); - yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status)); + yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -4778,7 +4783,7 @@ int RGWRunBucketSyncCoroutine::operate() do { if (sync_status.state == rgw_bucket_shard_sync_info::StateInit || sync_status.state == rgw_bucket_shard_sync_info::StateStopped) { - yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status)); + yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker)); if (retcode == -ENOENT) { tn->log(0, "bucket sync disabled"); lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock @@ -4801,7 +4806,7 @@ int RGWRunBucketSyncCoroutine::operate() if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe, status_oid, lease_cr.get(), - sync_status, tn)); + sync_status, 
tn, objv_tracker)); if (retcode < 0) { tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode)); lease_cr->go_down(); @@ -4814,7 +4819,7 @@ int RGWRunBucketSyncCoroutine::operate() yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe, status_oid, lease_cr.get(), sync_status, tn, - progress)); + objv_tracker, progress)); if (retcode < 0) { tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode)); lease_cr->go_down(); @@ -4899,12 +4904,15 @@ int RGWBucketPipeSyncStatusManager::init() int RGWBucketPipeSyncStatusManager::init_sync_status() { list<RGWCoroutinesStack *> stacks; + // pass an empty objv tracker to each so that the version gets incremented + std::list<RGWObjVersionTracker> objvs; for (auto& mgr : source_mgrs) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); for (int i = 0; i < mgr->num_pipes(); ++i) { - stack->call(mgr->init_sync_status_cr(i)); + objvs.emplace_back(); + stack->call(mgr->init_sync_status_cr(i, objvs.back())); } stacks.push_back(stack); @@ -5039,7 +5047,7 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { } sync_pair.source_bs = source_bs; sync_pair.dest_bs = dest_bs; - spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false); + spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr), false); ++i; ++source_bs.shard_id; if (shard_to_shard_sync) { diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 7c00db3efa646..18d52b03197f4 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -603,7 +603,7 @@ public: const rgw_bucket& dest_bucket); RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status); - RGWCoroutine *init_sync_status_cr(int num); + RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker); RGWCoroutine *run_sync_cr(int num); int num_pipes() { -- 2.39.5