From 56b5cc1bdbc5489b5c4be63361bf50fbab5603e1 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 18 Feb 2019 18:29:19 -0800 Subject: [PATCH] rgw: sync: separate source and dest bucket This is still incomplete. The idea is that source bucket and destination bucket do not need to reflect the same entity. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 2 +- src/rgw/rgw_bucket.cc | 2 +- src/rgw/rgw_cr_rados.cc | 14 +-- src/rgw/rgw_cr_rados.h | 36 +++--- src/rgw/rgw_data_sync.cc | 163 ++++++++++++++------------ src/rgw/rgw_data_sync.h | 30 ++++- src/rgw/rgw_rados.cc | 18 +-- src/rgw/rgw_rados.h | 8 +- src/rgw/rgw_sync_module.cc | 12 +- src/rgw/rgw_sync_module.h | 15 +-- src/rgw/rgw_sync_module_aws.cc | 50 ++++---- src/rgw/rgw_sync_module_es.cc | 50 ++++---- src/rgw/rgw_sync_module_log.cc | 22 ++-- src/rgw/rgw_sync_module_pubsub.cc | 61 +++++----- src/rgw/services/svc_datalog_rados.cc | 4 +- src/rgw/services/svc_datalog_rados.h | 2 +- 16 files changed, 272 insertions(+), 217 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 834c9edaa57..788441b13d5 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2285,7 +2285,7 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf out << indented{width, "zone"} << zone.id << " (" << zone.name << ")\n"; out << indented{width, "bucket"} << info.bucket << "\n\n"; - if (!info.bucket_datasync_enabled()) { + if (!info.bucket_datasync_enabled(store->svc.zone)) { out << "Sync is disabled for bucket " << info.bucket.name << '\n'; return 0; } diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index b654edcf483..4f39be8a96e 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1099,7 +1099,7 @@ int RGWBucket::sync(RGWBucketAdminOpState& op_state, map *at } for (int i = 0; i < shards_num; ++i, ++shard_id) { - r = store->svc()->datalog_rados->add_entry(bucket_info.bucket, shard_id); + r = store->svc()->datalog_rados->add_entry(bucket_info, shard_id); if (r < 0) { set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r)); return r; diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index b9b29f4e11d..9adc54a2455 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -583,9 +583,9 @@ int RGWAsyncFetchRemoteObj::_send_request() snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id()); map attrs; - rgw_obj src_obj(bucket_info.bucket, key); + rgw_obj src_obj(src_bucket, key); - rgw_obj dest_obj(bucket_info.bucket, dest_key.value_or(key)); + rgw_obj dest_obj(dest_bucket_info.bucket, dest_key.value_or(key)); std::optional bytes_transferred; int r = store->getRados()->fetch_remote_obj(obj_ctx, @@ -594,8 +594,8 @@ int RGWAsyncFetchRemoteObj::_send_request() source_zone, dest_obj, src_obj, - bucket_info, /* dest */ - bucket_info, /* source */ + dest_bucket_info, /* dest */ + nullptr, /* source */ dest_placement_rule, NULL, /* real_time* src_mtime, */ NULL, /* real_time* mtime, */ @@ -641,16 +641,14 @@ int RGWAsyncStatRemoteObj::_send_request() char buf[16]; snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id()); - rgw_obj src_obj(bucket_info.bucket, key); - - rgw_obj dest_obj(src_obj); + rgw_obj src_obj(src_bucket, key); int r = store->getRados()->stat_remote_obj(obj_ctx, rgw_user(user_id), nullptr, /* req_info */ source_zone, src_obj, - bucket_info, /* source */ + nullptr, /* source */ pmtime, /* real_time* src_mtime, */ psize, /* uint64_t * */ nullptr, /* const real_time* mod_ptr, */ diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 119cdf28aeb..e040624b98e 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -853,8 +853,9 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest { rgw::sal::RGWRadosStore *store; string source_zone; - RGWBucketInfo bucket_info; + rgw_bucket src_bucket; std::optional dest_placement_rule; + RGWBucketInfo dest_bucket_info; rgw_obj_key key; std::optional dest_key; @@ -872,8 +873,9 @@ protected: public: RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store, const string& _source_zone, - RGWBucketInfo& _bucket_info, + const rgw_bucket& _src_bucket, std::optional _dest_placement_rule, + const RGWBucketInfo& _dest_bucket_info, const rgw_obj_key& _key, const std::optional& _dest_key, std::optional _versioned_epoch, @@ -881,8 +883,9 @@ public: PerfCounters* counters, const DoutPrefixProvider *dpp) : RGWAsyncRadosRequest(caller, cn), store(_store), source_zone(_source_zone), - bucket_info(_bucket_info), + src_bucket(_src_bucket), dest_placement_rule(_dest_placement_rule), + dest_bucket_info(_dest_bucket_info), key(_key), dest_key(_dest_key), versioned_epoch(_versioned_epoch), @@ -901,8 +904,9 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { rgw::sal::RGWRadosStore *store; string source_zone; - RGWBucketInfo bucket_info; + rgw_bucket src_bucket; std::optional dest_placement_rule; + RGWBucketInfo dest_bucket_info; rgw_obj_key key; std::optional dest_key; @@ -920,8 +924,9 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { public: RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store, const string& _source_zone, - RGWBucketInfo& _bucket_info, + const rgw_bucket& _src_bucket, std::optional _dest_placement_rule, + const RGWBucketInfo& _dest_bucket_info, const rgw_obj_key& _key, const std::optional& _dest_key, std::optional _versioned_epoch, @@ -930,8 +935,9 @@ public: : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), source_zone(_source_zone), - bucket_info(_bucket_info), + src_bucket(_src_bucket), dest_placement_rule(_dest_placement_rule), + dest_bucket_info(_dest_bucket_info), key(_key), dest_key(_dest_key), versioned_epoch(_versioned_epoch), @@ -952,7 +958,7 @@ public: int send_request() override { req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, - source_zone, bucket_info, dest_placement_rule, + source_zone, src_bucket, dest_placement_rule, dest_bucket_info, key, dest_key, versioned_epoch, copy_if_newer, zones_trace, counters, dpp); async_rados->queue(req); @@ -968,8 +974,7 @@ class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest { rgw::sal::RGWRadosStore *store; string source_zone; - RGWBucketInfo bucket_info; - + rgw_bucket src_bucket; rgw_obj_key key; ceph::real_time *pmtime; @@ -983,7 +988,7 @@ protected: public: RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store, const string& _source_zone, - RGWBucketInfo& _bucket_info, + rgw_bucket& _src_bucket, const rgw_obj_key& _key, ceph::real_time *_pmtime, uint64_t *_psize, @@ -991,7 +996,7 @@ public: map *_pattrs, map *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store), source_zone(_source_zone), - bucket_info(_bucket_info), + src_bucket(_src_bucket), key(_key), pmtime(_pmtime), psize(_psize), @@ -1006,8 +1011,7 @@ class RGWStatRemoteObjCR : public RGWSimpleCoroutine { rgw::sal::RGWRadosStore *store; string source_zone; - RGWBucketInfo bucket_info; - + rgw_bucket src_bucket; rgw_obj_key key; ceph::real_time *pmtime; @@ -1021,7 +1025,7 @@ class RGWStatRemoteObjCR : public RGWSimpleCoroutine { public: RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store, const string& _source_zone, - RGWBucketInfo& _bucket_info, + rgw_bucket& _src_bucket, const rgw_obj_key& _key, ceph::real_time *_pmtime, uint64_t *_psize, @@ -1030,7 +1034,7 @@ public: map *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), source_zone(_source_zone), - bucket_info(_bucket_info), + src_bucket(_src_bucket), key(_key), pmtime(_pmtime), psize(_psize), @@ -1053,7 +1057,7 @@ public: int send_request() override { req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone, - bucket_info, key, pmtime, psize, petag, pattrs, pheaders); + src_bucket, key, pmtime, psize, petag, pattrs, pheaders); async_rados->queue(req); return 0; } diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 2c2a0272ba3..1e87aaee32e 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -989,8 +989,7 @@ std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) { class RGWRunBucketSyncCoroutine : public RGWCoroutine { RGWDataSyncEnv *sync_env; - rgw_bucket_shard bs; - RGWBucketInfo bucket_info; + rgw_bucket_sync_pipe sync_pipe; rgw_bucket_shard_sync_info sync_status; RGWMetaSyncEnv meta_sync_env; @@ -1003,10 +1002,11 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { public: RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)), + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, sync_pipe)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket", SSTR(bucket_shard_str{bs}))) { + sync_pipe.source_bs = bs; } ~RGWRunBucketSyncCoroutine() override { if (lease_cr) { @@ -1689,9 +1689,9 @@ class RGWDefaultDataSyncModule : public RGWDataSyncModule { public: RGWDefaultDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; }; @@ -1713,27 +1713,27 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl return 0; } -RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) +RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { - return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info, - std::nullopt, + return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, sync_pipe.source_bs.bucket, + std::nullopt, sync_pipe.dest_bucket_info, key, std::nullopt, versioned_epoch, true, zones_trace, sync_env->counters, sync_env->dpp); } -RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, +RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, - bucket_info, key, versioned, versioned_epoch, + sync_pipe.dest_bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, &mtime, zones_trace); } -RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, +RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, - bucket_info, key, versioned, versioned_epoch, + sync_pipe.dest_bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, &mtime, zones_trace); } @@ -1741,9 +1741,9 @@ class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule { public: RGWArchiveDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; + RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override; }; @@ -1768,14 +1768,14 @@ int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattabl return 0; } -RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) +RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { - ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - if (!bucket_info.versioned() || - (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) { + ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + if (!sync_pipe.dest_bucket_info.versioned() || + (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) { ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl; - bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED; - int op_ret = sync_env->store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), NULL); + sync_pipe.dest_bucket_info.flags = (sync_pipe.dest_bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED; + int op_ret = sync_env->store->getRados()->put_bucket_instance_info(sync_pipe.dest_bucket_info, false, real_time(), NULL); if (op_ret < 0) { ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl; return NULL; @@ -1793,25 +1793,25 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RG } return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, - bucket_info, std::nullopt, + sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, dest_key, versioned_epoch, true, zones_trace, nullptr, sync_env->dpp); } -RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, +RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { - ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; + ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } -RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, +RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) { - ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime + ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, - bucket_info, key, versioned, versioned_epoch, + sync_pipe.dest_bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, &mtime, zones_trace); } @@ -2039,7 +2039,7 @@ public: class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { RGWDataSyncEnv *sync_env; - rgw_bucket_shard bs; + const rgw_bucket_sync_pipe& sync_pipe; const string sync_status_oid; rgw_bucket_shard_sync_info& status; @@ -2047,17 +2047,18 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { rgw_bucket_index_marker_info info; public: RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, - const rgw_bucket_shard& bs, + const rgw_bucket_sync_pipe& _sync_pipe, rgw_bucket_shard_sync_info& _status) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)), + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + sync_pipe(_sync_pipe), + sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _sync_pipe)), status(_status) {} int operate() override { reenter(this) { /* fetch current position in logs */ - yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info)); + yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, sync_pipe.source_bs, &info)); if (retcode < 0 && retcode != -ENOENT) { ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; return set_cr_error(retcode); @@ -2096,7 +2097,10 @@ public: RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr() { - return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status); +#warning FIXME + rgw_bucket_sync_pipe sync_pipe; + sync_pipe.source_bs = bs; + return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, sync_pipe, init_status); } #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync." @@ -2166,10 +2170,10 @@ class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine { map attrs; public: RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, - const rgw_bucket_shard& bs, + const rgw_bucket_sync_pipe& sync_pipe, rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)), + oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, sync_pipe)), status(_status) {} int operate() override; }; @@ -2366,7 +2370,10 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set& pending_bucke RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status) { - return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status); +#warning FIXME + rgw_bucket_sync_pipe sync_pipe; + sync_pipe.source_bs = bs; + return new RGWReadBucketSyncStatusCoroutine(&sync_env, sync_pipe, sync_status); } RGWBucketSyncStatusManager::RGWBucketSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone, @@ -2693,8 +2700,8 @@ template class RGWBucketSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - RGWBucketInfo *bucket_info; - const rgw_bucket_shard& bs; + rgw_bucket_sync_pipe& sync_pipe; + rgw_bucket_shard& bs; rgw_obj_key key; bool versioned; @@ -2720,8 +2727,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo *_bucket_info, - const rgw_bucket_shard& bs, + rgw_bucket_sync_pipe& _sync_pipe, const rgw_obj_key& _key, bool _versioned, std::optional _versioned_epoch, real_time& _timestamp, @@ -2730,7 +2736,7 @@ public: const T& _entry_marker, RGWSyncShardMarkerTrack *_marker_tracker, rgw_zone_set& _zones_trace, RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_info(_bucket_info), bs(bs), + sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch), owner(_owner), timestamp(_timestamp), op(_op), @@ -2777,25 +2783,25 @@ public: } else if (op == CLS_RGW_OP_ADD || op == CLS_RGW_OP_LINK_OLH) { set_status("syncing obj"); - tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace)); + tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); + call(data_sync_module->sync_object(sync_env, sync_pipe, key, versioned_epoch, &zones_trace)); } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) { set_status("removing obj"); if (op == CLS_RGW_OP_UNLINK_INSTANCE) { versioned = true; } - tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace)); + tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); + call(data_sync_module->remove_object(sync_env, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace)); // our copy of the object is more recent, continue as if it succeeded if (retcode == -ERR_PRECONDITION_FAILED) { retcode = 0; } } else if (op == CLS_RGW_OP_LINK_OLH_DM) { set_status("creating delete marker"); - tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); - call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace)); + tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]")); + call(data_sync_module->create_delete_marker(sync_env, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace)); } - tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key)); + tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key)); } } while (marker_tracker->need_retry(key)); { @@ -2837,8 +2843,8 @@ done: class RGWBucketShardFullSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - const rgw_bucket_shard& bs; - RGWBucketInfo *bucket_info; + rgw_bucket_sync_pipe& sync_pipe; + rgw_bucket_shard& bs; boost::intrusive_ptr lease_cr; bucket_list_result list_result; list::iterator entries_iter; @@ -2857,14 +2863,15 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, - RGWBucketInfo *_bucket_info, + RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, + rgw_bucket_sync_pipe& _sync_pipe, const std::string& status_oid, RGWContinuousLeaseCR *lease_cr, rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef tn_parent) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info), + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), + lease_cr(lease_cr), sync_info(sync_info), marker_tracker(sync_env, status_oid, sync_info.full_marker), status_oid(status_oid), tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync", @@ -2915,7 +2922,7 @@ int RGWBucketShardFullSyncCR::operate() tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?")); } else { using SyncCR = RGWBucketSyncSingleEntryCR; - yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key, + yield spawn(new SyncCR(sync_env, sync_pipe, entry->key, false, /* versioned, only matters for object removal */ entry->versioned_epoch, entry->mtime, entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE, @@ -2987,8 +2994,8 @@ static bool has_olh_epoch(RGWModifyOp op) { class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - const rgw_bucket_shard& bs; - RGWBucketInfo *bucket_info; + rgw_bucket_sync_pipe& sync_pipe; + rgw_bucket_shard& bs; boost::intrusive_ptr lease_cr; list list_result; list::iterator entries_iter, entries_end; @@ -3009,14 +3016,14 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env, - const rgw_bucket_shard& bs, - RGWBucketInfo *_bucket_info, + rgw_bucket_sync_pipe& _sync_pipe, const std::string& status_oid, RGWContinuousLeaseCR *lease_cr, rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), - bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info), + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs), + lease_cr(lease_cr), sync_info(sync_info), marker_tracker(sync_env, status_oid, sync_info.inc_marker), status_oid(status_oid), zone_id(_sync_env->store->svc()->zone->get_zone().id), tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync", @@ -3196,7 +3203,7 @@ int RGWBucketShardIncrementalSyncCR::operate() } tn->log(20, SSTR("entry->timestamp=" << entry->timestamp)); using SyncCR = RGWBucketSyncSingleEntryCR; - spawn(new SyncCR(sync_env, bucket_info, bs, key, + spawn(new SyncCR(sync_env, sync_pipe, key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op, entry->state, cur_id, &marker_tracker, entry->zones_trace, tn), @@ -3282,7 +3289,7 @@ int RGWRunBucketSyncCoroutine::operate() } tn->log(10, "took lease"); - yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status)); + yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, sync_pipe, &sync_status)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3292,12 +3299,12 @@ int RGWRunBucketSyncCoroutine::operate() tn->log(20, SSTR("sync status for bucket: " << sync_status.state)); - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info)); + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info)); if (retcode == -ENOENT) { /* bucket instance info has not been synced in yet, fetch it now */ yield { tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata")); - string raw_key = string("bucket.instance:") + bs.bucket.get_key(); + string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key(); meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc()->zone->get_master_conn(), sync_env->async_rados, sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer); @@ -3309,16 +3316,16 @@ int RGWRunBucketSyncCoroutine::operate() tn)); } if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket})); + tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{sync_pipe.source_bs.bucket})); lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info)); + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info)); } if (retcode < 0) { - tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket})); + tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket})); lease_cr->go_down(); drain_all(); return set_cr_error(retcode); @@ -3326,7 +3333,7 @@ int RGWRunBucketSyncCoroutine::operate() do { if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) { - yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status)); + yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, sync_pipe, sync_status)); if (retcode == -ENOENT) { tn->log(0, "bucket sync disabled"); lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock @@ -3344,7 +3351,7 @@ int RGWRunBucketSyncCoroutine::operate() } if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { - yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info, + yield call(new RGWBucketShardFullSyncCR(sync_env, sync_pipe, status_oid, lease_cr.get(), sync_status, tn)); if (retcode < 0) { @@ -3356,7 +3363,7 @@ int RGWRunBucketSyncCoroutine::operate() } if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { - yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info, + yield call(new RGWBucketShardIncrementalSyncCR(sync_env, sync_pipe, status_oid, lease_cr.get(), sync_status, tn)); if (retcode < 0) { @@ -3507,14 +3514,15 @@ std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const } string RGWBucketSyncStatusManager::status_oid(const string& source_zone, - const rgw_bucket_shard& bs) + const rgw_bucket_sync_pipe& sync_pipe) { - return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key(); + return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pipe.source_bs.get_key(); } string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone, const rgw_obj& obj) { +#warning FIXME return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" + obj.key.name + ":" + obj.key.instance; } @@ -3525,6 +3533,8 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { RGWDataSyncEnv *const env; const int num_shards; rgw_bucket_shard bs; +#warning change this + rgw_bucket_sync_pipe sync_pipe; using Vector = std::vector; Vector::iterator i, end; @@ -3543,7 +3553,8 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { if (i == end) { return false; } - spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false); + sync_pipe.source_bs = bs; + spawn(new RGWReadBucketSyncStatusCoroutine(env, sync_pipe, &*i), false); ++i; ++bs.shard_id; return true; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 0840da93b09..6979399097d 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -16,6 +16,34 @@ #include "rgw_sync_module.h" #include "rgw_sync_trace.h" +struct rgw_bucket_sync_pipe { + rgw_bucket_shard source_bs; + RGWBucketInfo dest_bucket_info; + string source_prefix; + string dest_prefix; +}; + +inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) { + if (p.source_bs.bucket == p.dest_bucket_info.bucket && + p.source_prefix == p.dest_prefix) { + return out << p.source_bs; + } + + out << p.source_bs; + + if (!p.source_prefix.empty()) { + out << "/" << p.source_prefix; + } + + out << " -> " << p.dest_bucket_info.bucket; + + if (!p.dest_prefix.empty()) { + out << "/" << p.dest_prefix; + } + + return out; +} + struct rgw_datalog_info { uint32_t num_shards; @@ -564,7 +592,7 @@ public: map& get_sync_status() { return sync_status; } int init_sync_status(); - static string status_oid(const string& source_zone, const rgw_bucket_shard& bs); + static string status_oid(const string& source_zone, const rgw_bucket_sync_pipe& bs); static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */ // implements DoutPrefixProvider diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 67093096efd..3012b79f904 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3507,7 +3507,7 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx, req_info *info, const string& source_zone, rgw_obj& src_obj, - RGWBucketInfo& src_bucket_info, + const RGWBucketInfo *src_bucket_info, real_time *src_mtime, uint64_t *psize, const real_time *mod_ptr, @@ -3532,12 +3532,12 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx, RGWRESTConn *conn; if (source_zone.empty()) { - if (src_bucket_info.zonegroup.empty()) { + if (!src_bucket_info || src_bucket_info->zonegroup.empty()) { /* source is in the master zonegroup */ conn = svc.zone->get_master_conn(); } else { auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map(); - map::iterator iter = zonegroup_conn_map.find(src_bucket_info.zonegroup); + map::iterator iter = zonegroup_conn_map.find(src_bucket_info->zonegroup); if (iter == zonegroup_conn_map.end()) { ldout(cct, 0) << "could not find zonegroup connection to zonegroup: " << source_zone << dendl; return -ENOENT; @@ -3622,8 +3622,8 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, const string& source_zone, const rgw_obj& dest_obj, const rgw_obj& src_obj, - RGWBucketInfo& dest_bucket_info, - RGWBucketInfo& src_bucket_info, + const RGWBucketInfo& dest_bucket_info, + const RGWBucketInfo *src_bucket_info, std::optional dest_placement_rule, real_time *src_mtime, real_time *mtime, @@ -3665,11 +3665,11 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, auto& zone_conn_map = svc.zone->get_zone_conn_map(); auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map(); if (source_zone.empty()) { - if (dest_bucket_info.zonegroup.empty()) { + if (!src_bucket_info || src_bucket_info->zonegroup.empty()) { /* source is in the master zonegroup */ conn = svc.zone->get_master_conn(); } else { - map::iterator iter = zonegroup_conn_map.find(src_bucket_info.zonegroup); + map::iterator iter = zonegroup_conn_map.find(src_bucket_info->zonegroup); if (iter == zonegroup_conn_map.end()) { ldout(cct, 0) << "could not find zonegroup connection to zonegroup: " << source_zone << dendl; return -ENOENT; @@ -3988,7 +3988,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx, if (remote_src || !source_zone.empty()) { return fetch_remote_obj(obj_ctx, user_id, info, source_zone, - dest_obj, src_obj, dest_bucket_info, src_bucket_info, + dest_obj, src_obj, dest_bucket_info, &src_bucket_info, dest_placement, src_mtime, mtime, mod_ptr, unmod_ptr, high_precision_time, if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category, @@ -7014,7 +7014,7 @@ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBuc return 0; } -int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta, +int RGWRados::set_olh(RGWObjectCtx& obj_ctx, const RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta, uint64_t olh_epoch, real_time unmod_since, bool high_precision_time, optional_yield y, rgw_zone_set *zones_trace, bool log_data_change) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 8213f20cacc..9eab78c83b5 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1060,7 +1060,7 @@ public: req_info *info, const string& source_zone, rgw_obj& src_obj, - RGWBucketInfo& src_bucket_info, + const RGWBucketInfo *src_bucket_info, real_time *src_mtime, uint64_t *psize, const real_time *mod_ptr, @@ -1080,8 +1080,8 @@ public: const string& source_zone, const rgw_obj& dest_obj, const rgw_obj& src_obj, - RGWBucketInfo& dest_bucket_info, - RGWBucketInfo& src_bucket_info, + const RGWBucketInfo& dest_bucket_info, + const RGWBucketInfo *src_bucket_info, std::optional dest_placement, ceph::real_time *src_mtime, ceph::real_time *mtime, @@ -1277,7 +1277,7 @@ public: bufferlist& obj_tag, map >& log, uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr); int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr); - int set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta, + int set_olh(RGWObjectCtx& obj_ctx, const RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta, uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, optional_yield y, rgw_zone_set *zones_trace = nullptr, bool log_data_change = false); int repair_olh(RGWObjState* state, const RGWBucketInfo& bucket_info, diff --git a/src/rgw/rgw_sync_module.cc b/src/rgw/rgw_sync_module.cc index cfb5e3fc103..9ee2b4341f6 100644 --- a/src/rgw/rgw_sync_module.cc +++ b/src/rgw/rgw_sync_module.cc @@ -28,15 +28,15 @@ RGWMetadataHandler *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler() } RGWStatRemoteObjCBCR::RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct), + rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_info(_bucket_info), key(_key) { + src_bucket(_src_bucket), key(_key) { } RGWCallStatRemoteObjCR::RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct), + rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_info(_bucket_info), key(_key) { + src_bucket(_src_bucket), key(_key) { } int RGWCallStatRemoteObjCR::operate() { @@ -44,14 +44,14 @@ int RGWCallStatRemoteObjCR::operate() { yield { call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, - bucket_info, key, &mtime, &size, &etag, &attrs, &headers)); + src_bucket, key, &mtime, &size, &etag, &attrs, &headers)); } if (retcode < 0) { ldout(sync_env->cct, 10) << "RGWStatRemoteObjCR() returned " << retcode << dendl; return set_cr_error(retcode); } ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key + << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime << dendl; yield { RGWStatRemoteObjCBCR *cb = allocate_callback(); diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index ea731d62134..6f989d981e0 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -12,6 +12,7 @@ class RGWRemoteDataLog; struct RGWDataSyncEnv; struct rgw_bucket_entry_owner; struct rgw_obj_key; +struct rgw_bucket_sync_pipe; class RGWDataSyncModule { @@ -28,10 +29,10 @@ public: virtual RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) { return nullptr; } - virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) = 0; - virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, + virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) = 0; + virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; - virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, + virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0; }; @@ -140,7 +141,7 @@ class RGWStatRemoteObjCBCR : public RGWCoroutine { protected: RGWDataSyncEnv *sync_env; - RGWBucketInfo bucket_info; + rgw_bucket src_bucket; rgw_obj_key key; ceph::real_time mtime; @@ -150,7 +151,7 @@ protected: map headers; public: RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key); + rgw_bucket& _src_bucket, rgw_obj_key& _key); ~RGWStatRemoteObjCBCR() override {} void set_result(ceph::real_time& _mtime, @@ -176,12 +177,12 @@ class RGWCallStatRemoteObjCR : public RGWCoroutine { protected: RGWDataSyncEnv *sync_env; - RGWBucketInfo bucket_info; + rgw_bucket src_bucket; rgw_obj_key key; public: RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key); + rgw_bucket& _src_bucket, rgw_obj_key& _key); ~RGWCallStatRemoteObjCR() override {} diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index 33f41b3dce1..547cfeac4ae 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -1548,6 +1548,7 @@ int decode_attr(map& attrs, const char *attr_name, T *result // maybe use Fetch Remote Obj instead? class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { + rgw_bucket_sync_pipe sync_pipe; AWSSyncInstanceEnv& instance; uint64_t versioned_epoch{0}; @@ -1576,10 +1577,11 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { public: RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, AWSSyncInstanceEnv& _instance, - uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), + uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + sync_pipe(_sync_pipe), instance(_instance), versioned_epoch(_versioned_epoch) {} @@ -1599,7 +1601,7 @@ public: } } ldout(sync_env->cct, 4) << "AWS: download begin: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " size=" << size + << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " etag=" << etag << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver << dendl; @@ -1610,8 +1612,8 @@ public: return set_cr_error(-EINVAL); } - instance.get_profile(bucket_info.bucket, &target); - instance.conf.get_target(target, bucket_info, key, &target_bucket_name, &target_obj_name); + instance.get_profile(sync_pipe.source_bs.bucket, &target); + instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name); if (bucket_created.find(target_bucket_name) == bucket_created.end()){ yield { @@ -1651,7 +1653,7 @@ public: } yield { - rgw_obj src_obj(bucket_info.bucket, key); + rgw_obj src_obj(src_bucket, key); /* init output */ rgw_bucket target_bucket; @@ -1695,43 +1697,45 @@ public: }; class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { + rgw_bucket_sync_pipe sync_pipe; AWSSyncInstanceEnv& instance; uint64_t versioned_epoch; public: RGWAWSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, + AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + sync_pipe(_sync_pipe), instance(_instance), versioned_epoch(_versioned_epoch) { } ~RGWAWSHandleRemoteObjCR() {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWAWSHandleRemoteObjCBCR(sync_env, bucket_info, key, instance, versioned_epoch); + return new RGWAWSHandleRemoteObjCBCR(sync_env, sync_pipe, key, instance, versioned_epoch); } }; class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine { RGWDataSyncEnv *sync_env{nullptr}; std::shared_ptr target; - RGWBucketInfo bucket_info; + rgw_bucket_sync_pipe sync_pipe; rgw_obj_key key; ceph::real_time mtime; AWSSyncInstanceEnv& instance; int ret{0}; public: RGWAWSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_info(_bucket_info), key(_key), + sync_pipe(_sync_pipe), key(_key), mtime(_mtime), instance(_instance) {} int operate() override { reenter(this) { ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; + << " b=" <cct, target->conn.get(), @@ -1764,21 +1768,21 @@ public: ~RGWAWSDataSyncModule() {} - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, + RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << instance.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - return new RGWAWSHandleRemoteObjCR(sync_env, bucket_info, key, instance, versioned_epoch.value_or(0)); + ldout(sync_env->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + return new RGWAWSHandleRemoteObjCR(sync_env, sync_pipe, key, instance, versioned_epoch.value_or(0)); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, + RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) <<"rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - return new RGWAWSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, instance); + ldout(sync_env->cct, 0) <<"rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + return new RGWAWSRemoveRemoteObjCBCR(sync_env, sync_pipe, key, mtime, instance); } - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime + ldout(sync_env->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } diff --git a/src/rgw/rgw_sync_module_es.cc b/src/rgw/rgw_sync_module_es.cc index 83fc0e8cef3..46433c3903a 100644 --- a/src/rgw/rgw_sync_module_es.cc +++ b/src/rgw/rgw_sync_module_es.cc @@ -770,22 +770,24 @@ public: }; class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { + rgw_bucket_sync_pipe sync_pipe; ElasticConfigRef conf; uint64_t versioned_epoch; public: RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf), + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, + ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + sync_pipe(_sync_pipe), conf(_conf), versioned_epoch(_versioned_epoch) {} int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key + << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << dendl; yield { - string path = conf->get_obj_path(bucket_info, key); - es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch); + string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key); + es_obj_metadata doc(sync_env->cct, conf, sync_pipe.dest_bucket_info, key, mtime, size, attrs, versioned_epoch); call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn.get(), sync_env->http_manager, @@ -804,40 +806,42 @@ public: }; class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR { + rgw_bucket_sync_pipe sync_pipe; ElasticConfigRef conf; uint64_t versioned_epoch; public: RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, + ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + sync_pipe(_sync_pipe), conf(_conf), versioned_epoch(_versioned_epoch) { } ~RGWElasticHandleRemoteObjCR() override {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch); + return new RGWElasticHandleRemoteObjCBCR(sync_env, sync_pipe, key, conf, versioned_epoch); } }; class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - RGWBucketInfo bucket_info; + rgw_bucket_sync_pipe sync_pipe; rgw_obj_key key; ceph::real_time mtime; ElasticConfigRef conf; public: RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_info(_bucket_info), key(_key), + sync_pipe(_sync_pipe), key(_key), mtime(_mtime), conf(_conf) {} int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; + << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl; yield { - string path = conf->get_obj_path(bucket_info, key); + string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key); call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(), sync_env->http_manager, @@ -876,26 +880,26 @@ public: return new RGWElasticGetESInfoCBCR(sync_env, conf); } - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - if (!conf->should_handle_operation(bucket_info)) { + RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; return nullptr; } - return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0)); + return new RGWElasticHandleRemoteObjCR(sync_env, sync_pipe, key, conf, versioned_epoch.value_or(0)); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { + RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { /* versioned and versioned epoch params are useless in the elasticsearch backend case */ - ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - if (!conf->should_handle_operation(bucket_info)) { + ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) { ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; return nullptr; } - return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf); + return new RGWElasticRemoveRemoteObjCBCR(sync_env, sync_pipe, key, mtime, conf); } - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime + ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; return NULL; diff --git a/src/rgw/rgw_sync_module_log.cc b/src/rgw/rgw_sync_module_log.cc index af9eb2da148..981e820910f 100644 --- a/src/rgw/rgw_sync_module_log.cc +++ b/src/rgw/rgw_sync_module_log.cc @@ -13,10 +13,10 @@ class RGWLogStatRemoteObjCBCR : public RGWStatRemoteObjCBCR { public: RGWLogStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key) {} + rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sync_env, _src_bucket, _key) {} int operate() override { ldout(sync_env->cct, 0) << "SYNC_LOG: stat of remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime + << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; return set_cr_done(); } @@ -26,13 +26,13 @@ public: class RGWLogStatRemoteObjCR : public RGWCallStatRemoteObjCR { public: RGWLogStatRemoteObjCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key) { + rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _src_bucket, _key) { } ~RGWLogStatRemoteObjCR() override {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWLogStatRemoteObjCBCR(sync_env, bucket_info, key); + return new RGWLogStatRemoteObjCBCR(sync_env, src_bucket, key); } }; @@ -41,17 +41,17 @@ class RGWLogDataSyncModule : public RGWDataSyncModule { public: explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {} - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - return new RGWLogStatRemoteObjCR(sync_env, bucket_info, key); + RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; + return new RGWLogStatRemoteObjCR(sync_env, sync_pipe.source_bs.bucket, key); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; + RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { + ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, + RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime + ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return NULL; } diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index abdf8b1f545..af797aa4a10 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -1309,6 +1309,7 @@ public: // coroutine invoked on remote object creation class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { RGWDataSyncEnv *sync_env; + rgw_bucket_sync_pipe sync_pipe; PSEnvRef env; std::optional versioned_epoch; EventRef event; @@ -1316,10 +1317,11 @@ class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { TopicsRef topics; public: RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, PSEnvRef _env, std::optional _versioned_epoch, - TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), + TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key), sync_env(_sync_env), + sync_pipe(_sync_pipe), env(_env), versioned_epoch(_versioned_epoch), topics(_topics) { @@ -1327,7 +1329,7 @@ public: int operate() override { reenter(this) { ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime + << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; { std::vector > attrs; @@ -1342,16 +1344,17 @@ public: // this is why both are created here, once we have information about the // subscription, we will store/push only the relevant ones make_event_ref(sync_env->cct, - bucket_info.bucket, key, + sync_pipe.source_bs.bucket, key, mtime, &attrs, rgw::notify::ObjectCreated, &event); make_s3_record_ref(sync_env->cct, - bucket_info.bucket, bucket_info.owner, key, + sync_pipe.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key, mtime, &attrs, rgw::notify::ObjectCreated, &record); } - yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics)); +#warning should it be source owner? + yield call(new RGWPSHandleObjEventCR(sync_env, env, sync_pipe.dest_bucket_info.owner, event, record, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1362,14 +1365,16 @@ public: }; class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { + rgw_bucket_sync_pipe sync_pipe; PSEnvRef env; std::optional versioned_epoch; TopicsRef topics; public: RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, PSEnvRef _env, std::optional _versioned_epoch, - TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key), + sync_pipe(_sync_pipe), env(_env), versioned_epoch(_versioned_epoch), topics(_topics) { } @@ -1377,24 +1382,24 @@ public: ~RGWPSHandleRemoteObjCR() override {} RGWStatRemoteObjCBCR *allocate_callback() override { - return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics); + return new RGWPSHandleRemoteObjCBCR(sync_env, sync_pipe, key, env, versioned_epoch, topics); } }; class RGWPSHandleObjCreateCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - RGWBucketInfo bucket_info; + rgw_bucket_sync_pipe sync_pipe; rgw_obj_key key; PSEnvRef env; std::optional versioned_epoch; TopicsRef topics; public: RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, PSEnvRef _env, std::optional _versioned_epoch) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_info(_bucket_info), + sync_pipe(_sync_pipe), key(_key), env(_env), versioned_epoch(_versioned_epoch) { @@ -1404,8 +1409,8 @@ public: int operate() override { reenter(this) { - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner, - bucket_info.bucket, key, + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, sync_pipe.dest_bucket_info.owner, + sync_pipe.source_bs.bucket, key, rgw::notify::ObjectCreated, &topics)); if (retcode < 0) { @@ -1413,10 +1418,10 @@ public: return set_cr_error(retcode); } if (topics->empty()) { - ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl; + ldout(sync_env->cct, 20) << "no topics found for " << sync_pipe.source_bs.bucket << "/" << key << dendl; return set_cr_done(); } - yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics)); + yield call(new RGWPSHandleRemoteObjCR(sync_env, sync_pipe, key, env, versioned_epoch, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1441,12 +1446,12 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine { public: RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env, PSEnvRef _env, - RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, + rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), - owner(_bucket_info.owner), - bucket(_bucket_info.bucket), + owner(_sync_pipe.dest_bucket_info.owner), + bucket(_sync_pipe.dest_bucket_info.bucket), key(_key), mtime(_mtime), event_type(_event_type) {} int operate() override { @@ -1505,25 +1510,25 @@ public: return new RGWPSInitEnvCBCR(sync_env, env); } - RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, + RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << + ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; - return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch); + return new RGWPSHandleObjCreateCR(sync_env, sync_pipe, key, env, versioned_epoch); } - RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, + RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << + ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDelete); + return new RGWPSGenericObjEventCBCR(sync_env, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete); } - RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, + RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { - ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << + ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; - return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); + return new RGWPSGenericObjEventCBCR(sync_env, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); } PSConfigRef& get_conf() { return conf; } diff --git a/src/rgw/services/svc_datalog_rados.cc b/src/rgw/services/svc_datalog_rados.cc index 54c20fff7c8..3371c6e1b93 100644 --- a/src/rgw/services/svc_datalog_rados.cc +++ b/src/rgw/services/svc_datalog_rados.cc @@ -56,9 +56,9 @@ int RGWSI_DataLog_RADOS::get_info(int shard_id, RGWDataChangesLogInfo *info) return log->get_info(shard_id, info); } -int RGWSI_DataLog_RADOS::add_entry(const rgw_bucket& bucket, int shard_id) +int RGWSI_DataLog_RADOS::add_entry(const RGWBucketInfo& bucket_info, int shard_id) { - return log->add_entry(bucket, shard_id); + return log->add_entry(bucket_info, shard_id); } int RGWSI_DataLog_RADOS::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries, diff --git a/src/rgw/services/svc_datalog_rados.h b/src/rgw/services/svc_datalog_rados.h index 19a6c5edb56..5317035c09f 100644 --- a/src/rgw/services/svc_datalog_rados.h +++ b/src/rgw/services/svc_datalog_rados.h @@ -58,7 +58,7 @@ public: int get_info(int shard_id, RGWDataChangesLogInfo *info); - int add_entry(const rgw_bucket& bucket, int shard_id); + int add_entry(const RGWBucketInfo& bucket_info, int shard_id); int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries, list& entries, const string& marker, -- 2.39.5