From 799bd1bcac45aaa3b63813eed05a6fc8cee05bad Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 6 Oct 2020 17:59:41 -0400 Subject: [PATCH] rgw: move full sync from SyncBucketShard to SyncBucket renamed ListBucketShardCR to ListRemoteBucketCR and removed the shard-id parameter renamed BucketFullSyncShardMarkerTrack to BucketFullSyncMarkerTrack, which now updates the bucket-level rgw_bucket_sync_status renamed BucketShardFullSyncCR to BucketFullSyncCR BucketSyncCR now takes a bucket-wide lease during full sync Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 284 ++++++++++++++++++++------------------- 1 file changed, 143 insertions(+), 141 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index cb469a8793a09..78dda886eb476 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3453,34 +3453,30 @@ struct bucket_list_result { } }; -class RGWListBucketShardCR: public RGWCoroutine { +class RGWListRemoteBucketCR: public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; const rgw_bucket_shard& bs; - const string instance_key; rgw_obj_key marker_position; bucket_list_result *result; public: - RGWListBucketShardCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, - rgw_obj_key& _marker_position, bucket_list_result *_result) + RGWListRemoteBucketCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, + rgw_obj_key& _marker_position, bucket_list_result *_result) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bs(bs), - instance_key(bs.get_key()), marker_position(_marker_position), - result(_result) {} + marker_position(_marker_position), result(_result) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { yield { - rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() }, - { "versions" , NULL }, + rgw_http_param_pair pairs[] = { { "versions" , NULL }, { "format" , "json" }, { "objs-container" , "true" }, { "key-marker" , marker_position.name.c_str() }, { "version-id-marker" , marker_position.instance.c_str() }, { NULL, NULL } }; - // don't include tenant in the url, it's already part of instance_key - string p = string("/") + bs.bucket.name; + string p = string("/") + bs.bucket.get_key(':', 0); call(new RGWReadRESTResourceCR(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, result)); } if (retcode < 0) { @@ -3536,37 +3532,35 @@ public: #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10 -class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { +class RGWBucketFullSyncMarkerTrack : public RGWSyncShardMarkerTrack { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; - string marker_oid; - rgw_bucket_shard_full_sync_marker sync_marker; + const rgw_raw_obj& status_obj; + rgw_bucket_sync_status& sync_status; RGWSyncTraceNodeRef tn; RGWObjVersionTracker& objv_tracker; public: - RGWBucketFullSyncShardMarkerTrack(RGWDataSyncCtx *_sc, - const string& _marker_oid, - const rgw_bucket_shard_full_sync_marker& _marker, - RGWSyncTraceNodeRef tn, - RGWObjVersionTracker& objv_tracker) + RGWBucketFullSyncMarkerTrack(RGWDataSyncCtx *_sc, + const rgw_raw_obj& status_obj, + rgw_bucket_sync_status& sync_status, + RGWSyncTraceNodeRef tn, + RGWObjVersionTracker& objv_tracker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW), - sc(_sc), sync_env(_sc->env), marker_oid(_marker_oid), - sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker) + sc(_sc), sync_env(_sc->env), status_obj(status_obj), + sync_status(sync_status), tn(std::move(tn)), objv_tracker(objv_tracker) {} - RGWCoroutine* store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override { - sync_marker.position = new_marker; - sync_marker.count = index_pos; - map attrs; - sync_marker.encode_attr(attrs); + RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override { + sync_status.full.position = new_marker; + sync_status.full.count = index_pos; - tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); - return new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj, - rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid), - attrs, &objv_tracker); + tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker)); + return new RGWSimpleRadosWriteCR( + sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj, + status_obj, sync_status, &objv_tracker); } RGWOrderCallCR *allocate_order_control_cr() override { @@ -3865,28 +3859,29 @@ done: #define BUCKET_SYNC_SPAWN_WINDOW 20 -class RGWBucketShardFullSyncCR : public RGWCoroutine { +class RGWBucketFullSyncCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe& sync_pipe; + rgw_bucket_sync_status& sync_status; rgw_bucket_shard& bs; boost::intrusive_ptr lease_cr; bucket_list_result list_result; list::iterator entries_iter; - rgw_bucket_shard_sync_info& sync_info; rgw_obj_key list_marker; bucket_list_entry *entry{nullptr}; int total_entries{0}; - int sync_status{0}; + int sync_result{0}; - const string& status_oid; + const rgw_raw_obj& status_obj; + RGWObjVersionTracker& objv; rgw_zone_set zones_trace; RGWSyncTraceNodeRef tn; - RGWBucketFullSyncShardMarkerTrack marker_tracker; + RGWBucketFullSyncMarkerTrack marker_tracker; struct _prefix_handler { RGWBucketSyncFlowManager::pipe_rules_ref rules; @@ -3933,20 +3928,20 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { } prefix_handler; public: - RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc, - rgw_bucket_sync_pipe& _sync_pipe, - const std::string& status_oid, - boost::intrusive_ptr lease_cr, - rgw_bucket_shard_sync_info& sync_info, - RGWSyncTraceNodeRef tn_parent, - RGWObjVersionTracker& objv_tracker) + RGWBucketFullSyncCR(RGWDataSyncCtx *_sc, + rgw_bucket_sync_pipe& _sync_pipe, + const rgw_raw_obj& status_obj, + boost::intrusive_ptr lease_cr, + rgw_bucket_sync_status& sync_status, + RGWSyncTraceNodeRef tn_parent, + RGWObjVersionTracker& objv_tracker) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), - lease_cr(std::move(lease_cr)), sync_info(sync_info), - status_oid(status_oid), + sync_pipe(_sync_pipe), sync_status(sync_status), + bs(_sync_pipe.info.source_bs), + lease_cr(std::move(lease_cr)), status_obj(status_obj), objv(objv_tracker), tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync", SSTR(bucket_shard_str{bs}))), - marker_tracker(sc, status_oid, sync_info.full_marker, tn, objv_tracker) + marker_tracker(sc, status_obj, sync_status, tn, objv_tracker) { zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bs.bucket.get_key()); prefix_handler.set_rules(sync_pipe.get_rules()); @@ -3955,12 +3950,12 @@ public: int operate(const DoutPrefixProvider *dpp) override; }; -int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp) +int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp) { reenter(this) { - list_marker = sync_info.full_marker.position; + list_marker = sync_status.full.position; - total_entries = sync_info.full_marker.count; + total_entries = sync_status.full.count; do { if (lease_cr && !lease_cr->is_locked()) { drain_all(); @@ -3975,8 +3970,7 @@ int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp) break; } - yield call(new RGWListBucketShardCR(sc, bs, list_marker, - &list_result)); + yield call(new RGWListRemoteBucketCR(sc, bs, list_marker, &list_result)); if (retcode < 0 && retcode != -ENOENT) { set_status("failed bucket listing, going down"); drain_all(); @@ -4016,19 +4010,19 @@ int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp) [&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); - sync_status = ret; + sync_result = ret; } return 0; }); } - } while (list_result.is_truncated && sync_status == 0); + } while (list_result.is_truncated && sync_result == 0); set_status("done iterating over all objects"); - /* wait for all operations to complete */ + /* wait for all operations to complete */ drain_all_cb([&](uint64_t stack_id, int ret) { if (ret < 0) { tn->log(10, "a sync operation returned error"); - sync_status = ret; + sync_result = ret; } return 0; }); @@ -4036,26 +4030,29 @@ int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp) if (lease_cr && !lease_cr->is_locked()) { return set_cr_error(-ECANCELED); } + yield call(marker_tracker.flush()); + if (retcode < 0) { + tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode)); + return set_cr_error(retcode); + } /* update sync state to incremental */ - if (sync_status == 0) { - yield { - sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync; - map attrs; - sync_info.encode_state_attr(attrs); - call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, - rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid), - attrs)); - } + if (sync_result == 0) { + sync_status.state = BucketSyncState::Incremental; + tn->log(5, SSTR("set bucket state=" << sync_status.state)); + yield call(new RGWSimpleRadosWriteCR( + dpp, sync_env->async_rados, sync_env->svc->sysobj, + status_obj, sync_status, &objv)); + tn->log(5, SSTR("bucket status objv=" << objv)); } else { - tn->log(10, SSTR("backing out with sync_status=" << sync_status)); + tn->log(10, SSTR("backing out with sync_status=" << sync_result)); } - if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */ + if (retcode < 0 && sync_result == 0) { /* actually tried to set incremental state and failed */ tn->log(0, SSTR("ERROR: failed to set sync state on bucket " << bucket_shard_str{bs} << " retcode=" << retcode)); return set_cr_error(retcode); } - if (sync_status < 0) { - return set_cr_error(sync_status); + if (sync_result < 0) { + return set_cr_error(sync_result); } return set_cr_done(); } @@ -4886,6 +4883,7 @@ class RGWSyncBucketShardCR : public RGWCoroutine { boost::intrusive_ptr lease_cr; rgw_bucket_sync_pair_info sync_pair; rgw_bucket_sync_pipe& sync_pipe; + BucketSyncState& bucket_state; ceph::real_time* progress; const std::string status_oid; @@ -4899,10 +4897,12 @@ public: boost::intrusive_ptr lease_cr, const rgw_bucket_sync_pair_info& _sync_pair, rgw_bucket_sync_pipe& sync_pipe, + BucketSyncState& bucket_state, const RGWSyncTraceNodeRef& tn, ceph::real_time* progress) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), sync_pipe(sync_pipe), progress(progress), + lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), + sync_pipe(sync_pipe), bucket_state(bucket_state), progress(progress), status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)), tn(tn) { } @@ -4916,57 +4916,24 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); - drain_all(); return set_cr_error(retcode); } - tn->log(20, SSTR("sync status for source bucket: " << sync_status.state)); - - do { - if (sync_status.state == rgw_bucket_shard_sync_info::StateInit || - sync_status.state == rgw_bucket_shard_sync_info::StateStopped) { - yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker, false)); - if (retcode == -ENOENT) { - tn->log(0, "bucket sync disabled"); - drain_all(); - return set_cr_done(); - } - if (retcode < 0) { - tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode)); - drain_all(); - return set_cr_error(retcode); - } - } - if (progress) { - *progress = sync_status.inc_marker.timestamp; - } - - if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { - yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe, - status_oid, lease_cr, - sync_status, tn, objv_tracker)); - if (retcode < 0) { - tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode)); - drain_all(); - return set_cr_error(retcode); - } - } - - if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { - yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe, - status_oid, lease_cr, - sync_status, tn, - objv_tracker, progress)); - if (retcode < 0) { - tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode)); - drain_all(); - return set_cr_error(retcode); - } - } - // loop back to previous states unless incremental sync returns normally - } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync); + tn->log(20, SSTR("sync status for source bucket shard: " << sync_status.state)); + sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; + if (progress) { + *progress = sync_status.inc_marker.timestamp; + } - drain_all(); + yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe, + status_oid, lease_cr, + sync_status, tn, + objv_tracker, progress)); + if (retcode < 0) { + tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode)); + return set_cr_error(retcode); + } + // TODO: handle transition to StateStopped return set_cr_done(); } @@ -4976,7 +4943,8 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp) class RGWSyncBucketCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *env; - boost::intrusive_ptr lease_cr; + boost::intrusive_ptr data_lease_cr; + boost::intrusive_ptr bucket_lease_cr; rgw_bucket_sync_pair_info sync_pair; rgw_bucket_sync_pipe sync_pipe; ceph::real_time* progress; @@ -4984,6 +4952,7 @@ class RGWSyncBucketCR : public RGWCoroutine { const rgw_raw_obj status_obj; rgw_bucket_sync_status bucket_status; RGWObjVersionTracker objv; + bool init_check_compat = false; RGWSyncTraceNodeRef tn; @@ -4994,7 +4963,7 @@ public: const RGWSyncTraceNodeRef& _tn_parent, ceph::real_time* progress) : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env), - lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress), + data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress), status_obj(env->svc->zone->get_zone_params().log_pool, RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone, sync_pair.source_bs.bucket, @@ -5043,34 +5012,67 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp) return set_cr_error(retcode); } - if (bucket_status.state == BucketSyncState::Init) { - // init sync status - yield { - // use exclusive create if it didn't exist. if we lose the race to - // create it, we'll fail with EEXIST and RGWSyncBucketCR() will come - // back later and read the new status - const bool exclusive = objv.version_for_check() == nullptr; - int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards; - call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj, - bucket_status, objv, num_shards, - exclusive)); - } - if (retcode < 0) { - return set_cr_error(retcode); + do { + tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state)); + + if (bucket_status.state == BucketSyncState::Init || + bucket_status.state == BucketSyncState::Stopped) { + // init sync status + yield { + init_check_compat = objv.read_version.ver <= 1; // newly-created + int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards; + call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj, + bucket_status, objv, num_shards, + init_check_compat)); + } + if (retcode < 0) { + return set_cr_error(retcode); + } } - } - if (bucket_status.state == BucketSyncState::Full) { - // TODO: full sync - } + if (bucket_status.state == BucketSyncState::Full) { + // take a bucket-wide lease for full sync to prevent the bucket shards + // from duplicating the work + yield { + const std::string lock_name = "full sync"; + const uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; + bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj, + lock_name, lock_duration, this)); + spawn(bucket_lease_cr.get(), false); + } + while (!bucket_lease_cr->is_locked()) { + if (bucket_lease_cr->is_done()) { + tn->log(5, "failed to take bucket lease"); + set_status("lease lock failed, early abort"); + drain_all(); + return set_cr_error(bucket_lease_cr->get_ret_status()); + } + tn->log(5, "waiting on bucket lease"); + yield set_sleeping(true); + } - if (bucket_status.state == BucketSyncState::Incremental) { - yield call(new RGWSyncBucketShardCR(sc, lease_cr, sync_pair, - sync_pipe, tn, progress)); - if (retcode < 0) { - return set_cr_error(retcode); + yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj, + bucket_lease_cr, bucket_status, + tn, objv)); + bucket_lease_cr->go_down(); + drain_all(); + bucket_lease_cr.reset(); + if (retcode < 0) { + return set_cr_error(retcode); + } } - } + + if (bucket_status.state == BucketSyncState::Incremental) { + yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair, + sync_pipe, bucket_status.state, + tn, progress)); + if (retcode < 0) { + return set_cr_error(retcode); + } + } + // loop back to previous states unless incremental sync returns normally + } while (bucket_status.state != BucketSyncState::Incremental); + return set_cr_done(); } -- 2.39.5