]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: sync: separate source and dest bucket
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 19 Feb 2019 02:29:19 +0000 (18:29 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:36 +0000 (10:20 -0800)
This is still incomplete. The idea is that source bucket and destination
bucket do not need to reflect the same entity.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
16 files changed:
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_sync_module.cc
src/rgw/rgw_sync_module.h
src/rgw/rgw_sync_module_aws.cc
src/rgw/rgw_sync_module_es.cc
src/rgw/rgw_sync_module_log.cc
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/services/svc_datalog_rados.cc
src/rgw/services/svc_datalog_rados.h

index 834c9edaa577d41fc19f94c23f7cf08763e227b1..788441b13d5bf70c344ac5a65c07b44cd253573a 100644 (file)
@@ -2285,7 +2285,7 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf
   out << indented{width, "zone"} << zone.id << " (" << zone.name << ")\n";
   out << indented{width, "bucket"} << info.bucket << "\n\n";
 
-  if (!info.bucket_datasync_enabled()) {
+  if (!info.bucket_datasync_enabled(store->svc.zone)) {
     out << "Sync is disabled for bucket " << info.bucket.name << '\n';
     return 0;
   }
index b654edcf4832505db693e3e0bfd671bcdc1384a0..4f39be8a96eb4abb86e8c48822f5707c4f9747d4 100644 (file)
@@ -1099,7 +1099,7 @@ int RGWBucket::sync(RGWBucketAdminOpState& op_state, map<string, bufferlist> *at
   }
 
   for (int i = 0; i < shards_num; ++i, ++shard_id) {
-    r = store->svc()->datalog_rados->add_entry(bucket_info.bucket, shard_id);
+    r = store->svc()->datalog_rados->add_entry(bucket_info, shard_id);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
       return r;
index b9b29f4e11dfabe6fb4de843b5f541bde7d2d88b..9adc54a2455504cd971252e4c2bb7ea04c6e77d1 100644 (file)
@@ -583,9 +583,9 @@ int RGWAsyncFetchRemoteObj::_send_request()
   snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
   map<string, bufferlist> attrs;
 
-  rgw_obj src_obj(bucket_info.bucket, key);
+  rgw_obj src_obj(src_bucket, key);
 
-  rgw_obj dest_obj(bucket_info.bucket, dest_key.value_or(key));
+  rgw_obj dest_obj(dest_bucket_info.bucket, dest_key.value_or(key));
 
   std::optional<uint64_t> bytes_transferred;
   int r = store->getRados()->fetch_remote_obj(obj_ctx,
@@ -594,8 +594,8 @@ int RGWAsyncFetchRemoteObj::_send_request()
                        source_zone,
                        dest_obj,
                        src_obj,
-                       bucket_info, /* dest */
-                       bucket_info, /* source */
+                       dest_bucket_info, /* dest */
+                       nullptr, /* source */
                       dest_placement_rule,
                        NULL, /* real_time* src_mtime, */
                        NULL, /* real_time* mtime, */
@@ -641,16 +641,14 @@ int RGWAsyncStatRemoteObj::_send_request()
   char buf[16];
   snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
 
-  rgw_obj src_obj(bucket_info.bucket, key);
-
-  rgw_obj dest_obj(src_obj);
+  rgw_obj src_obj(src_bucket, key);
 
   int r = store->getRados()->stat_remote_obj(obj_ctx,
                        rgw_user(user_id),
                        nullptr, /* req_info */
                        source_zone,
                        src_obj,
-                       bucket_info, /* source */
+                       nullptr, /* source */
                        pmtime, /* real_time* src_mtime, */
                        psize, /* uint64_t * */
                        nullptr, /* const real_time* mod_ptr, */
index 119cdf28aeb897cce5b472335cb2a90a2f758f46..e040624b98ec2ffe963b4cb83d9c1b999a642daf 100644 (file)
@@ -853,8 +853,9 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
   rgw::sal::RGWRadosStore *store;
   string source_zone;
 
-  RGWBucketInfo bucket_info;
+  rgw_bucket src_bucket;
   std::optional<rgw_placement_rule> dest_placement_rule;
+  RGWBucketInfo dest_bucket_info;
 
   rgw_obj_key key;
   std::optional<rgw_obj_key> dest_key;
@@ -872,8 +873,9 @@ protected:
 public:
   RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                          const string& _source_zone,
-                         RGWBucketInfo& _bucket_info,
+                         const rgw_bucket& _src_bucket,
                         std::optional<rgw_placement_rule> _dest_placement_rule,
+                         const RGWBucketInfo& _dest_bucket_info,
                          const rgw_obj_key& _key,
                          const std::optional<rgw_obj_key>& _dest_key,
                          std::optional<uint64_t> _versioned_epoch,
@@ -881,8 +883,9 @@ public:
                          PerfCounters* counters, const DoutPrefixProvider *dpp)
     : RGWAsyncRadosRequest(caller, cn), store(_store),
       source_zone(_source_zone),
-      bucket_info(_bucket_info),
+      src_bucket(_src_bucket),
       dest_placement_rule(_dest_placement_rule),
+      dest_bucket_info(_dest_bucket_info),
       key(_key),
       dest_key(_dest_key),
       versioned_epoch(_versioned_epoch),
@@ -901,8 +904,9 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
   rgw::sal::RGWRadosStore *store;
   string source_zone;
 
-  RGWBucketInfo bucket_info;
+  rgw_bucket src_bucket;
   std::optional<rgw_placement_rule> dest_placement_rule;
+  RGWBucketInfo dest_bucket_info;
 
   rgw_obj_key key;
   std::optional<rgw_obj_key> dest_key;
@@ -920,8 +924,9 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
 public:
   RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
                       const string& _source_zone,
-                      RGWBucketInfo& _bucket_info,
+                      const rgw_bucket& _src_bucket,
                      std::optional<rgw_placement_rule> _dest_placement_rule,
+                      const RGWBucketInfo& _dest_bucket_info,
                       const rgw_obj_key& _key,
                       const std::optional<rgw_obj_key>& _dest_key,
                       std::optional<uint64_t> _versioned_epoch,
@@ -930,8 +935,9 @@ public:
     : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
       async_rados(_async_rados), store(_store),
       source_zone(_source_zone),
-      bucket_info(_bucket_info),
+      src_bucket(_src_bucket),
       dest_placement_rule(_dest_placement_rule),
+      dest_bucket_info(_dest_bucket_info),
       key(_key),
       dest_key(_dest_key),
       versioned_epoch(_versioned_epoch),
@@ -952,7 +958,7 @@ public:
 
   int send_request() override {
     req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
-                                    source_zone, bucket_info, dest_placement_rule,
+                                    source_zone, src_bucket, dest_placement_rule, dest_bucket_info,
                                      key, dest_key, versioned_epoch, copy_if_newer,
                                      zones_trace, counters, dpp);
     async_rados->queue(req);
@@ -968,8 +974,7 @@ class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
   rgw::sal::RGWRadosStore *store;
   string source_zone;
 
-  RGWBucketInfo bucket_info;
-
+  rgw_bucket src_bucket;
   rgw_obj_key key;
 
   ceph::real_time *pmtime;
@@ -983,7 +988,7 @@ protected:
 public:
   RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                          const string& _source_zone,
-                         RGWBucketInfo& _bucket_info,
+                         rgw_bucket& _src_bucket,
                          const rgw_obj_key& _key,
                          ceph::real_time *_pmtime,
                          uint64_t *_psize,
@@ -991,7 +996,7 @@ public:
                          map<string, bufferlist> *_pattrs,
                          map<string, string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
                                                       source_zone(_source_zone),
-                                                      bucket_info(_bucket_info),
+                                                      src_bucket(_src_bucket),
                                                       key(_key),
                                                       pmtime(_pmtime),
                                                       psize(_psize),
@@ -1006,8 +1011,7 @@ class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
   rgw::sal::RGWRadosStore *store;
   string source_zone;
 
-  RGWBucketInfo bucket_info;
-
+  rgw_bucket src_bucket;
   rgw_obj_key key;
 
   ceph::real_time *pmtime;
@@ -1021,7 +1025,7 @@ class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
 public:
   RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
                       const string& _source_zone,
-                      RGWBucketInfo& _bucket_info,
+                      rgw_bucket& _src_bucket,
                       const rgw_obj_key& _key,
                       ceph::real_time *_pmtime,
                       uint64_t *_psize,
@@ -1030,7 +1034,7 @@ public:
                       map<string, string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
                                        async_rados(_async_rados), store(_store),
                                        source_zone(_source_zone),
-                                       bucket_info(_bucket_info),
+                                       src_bucket(_src_bucket),
                                        key(_key),
                                        pmtime(_pmtime),
                                        psize(_psize),
@@ -1053,7 +1057,7 @@ public:
 
   int send_request() override {
     req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
-                                    bucket_info, key, pmtime, psize, petag, pattrs, pheaders);
+                                    src_bucket, key, pmtime, psize, petag, pattrs, pheaders);
     async_rados->queue(req);
     return 0;
   }
index 2c2a0272ba3c02d2711e7abfcb08a1edf420f9bc..1e87aaee32e590168874d4da732860fb47e7428c 100644 (file)
@@ -989,8 +989,7 @@ std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
 
 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
-  rgw_bucket_shard bs;
-  RGWBucketInfo bucket_info;
+  rgw_bucket_sync_pipe sync_pipe;
   rgw_bucket_shard_sync_info sync_status;
   RGWMetaSyncEnv meta_sync_env;
 
@@ -1003,10 +1002,11 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
 
 public:
   RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
-      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, sync_pipe)),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
                                          SSTR(bucket_shard_str{bs}))) {
+    sync_pipe.source_bs = bs;
   }
   ~RGWRunBucketSyncCoroutine() override {
     if (lease_cr) {
@@ -1689,9 +1689,9 @@ class RGWDefaultDataSyncModule : public RGWDataSyncModule {
 public:
   RGWDefaultDataSyncModule() {}
 
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
-  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
-  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
 };
 
@@ -1713,27 +1713,27 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl
   return 0;
 }
 
-RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
-  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
-                                std::nullopt,
+  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, sync_pipe.source_bs.bucket,
+                                std::nullopt, sync_pipe.dest_bucket_info,
                                  key, std::nullopt, versioned_epoch,
                                  true, zones_trace, sync_env->counters, sync_env->dpp);
 }
 
-RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                       real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
 {
   return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
-                            bucket_info, key, versioned, versioned_epoch,
+                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                             NULL, NULL, false, &mtime, zones_trace);
 }
 
-RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                                              rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
 {
   return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
-                            bucket_info, key, versioned, versioned_epoch,
+                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                             &owner.id, &owner.display_name, true, &mtime, zones_trace);
 }
 
@@ -1741,9 +1741,9 @@ class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
 public:
   RGWArchiveDataSyncModule() {}
 
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
-  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
-  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
 };
 
@@ -1768,14 +1768,14 @@ int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattabl
   return 0;
 }
 
-RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
-  ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
-  if (!bucket_info.versioned() ||
-     (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
+  ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+  if (!sync_pipe.dest_bucket_info.versioned() ||
+     (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
       ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
-      bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
-      int op_ret = sync_env->store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
+      sync_pipe.dest_bucket_info.flags = (sync_pipe.dest_bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
+      int op_ret = sync_env->store->getRados()->put_bucket_instance_info(sync_pipe.dest_bucket_info, false, real_time(), NULL);
       if (op_ret < 0) {
          ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
          return NULL;
@@ -1793,25 +1793,25 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RG
   }
 
   return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
-                                 bucket_info, std::nullopt,
+                                 sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
                                  key, dest_key, versioned_epoch,
                                  true, zones_trace, nullptr, sync_env->dpp);
 }
 
-RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
 {
-  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
   return NULL;
 }
 
-RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
 {
-  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
                                    << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
   return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
-                            bucket_info, key, versioned, versioned_epoch,
+                            sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
                             &owner.id, &owner.display_name, true, &mtime, zones_trace);
 }
 
@@ -2039,7 +2039,7 @@ public:
 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
-  rgw_bucket_shard bs;
+  const rgw_bucket_sync_pipe& sync_pipe;
   const string sync_status_oid;
 
   rgw_bucket_shard_sync_info& status;
@@ -2047,17 +2047,18 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
   rgw_bucket_index_marker_info info;
 public:
   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
-                                        const rgw_bucket_shard& bs,
+                                        const rgw_bucket_sync_pipe& _sync_pipe,
                                         rgw_bucket_shard_sync_info& _status)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
-      sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      sync_pipe(_sync_pipe),
+      sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _sync_pipe)),
       status(_status)
   {}
 
   int operate() override {
     reenter(this) {
       /* fetch current position in logs */
-      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
+      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, sync_pipe.source_bs, &info));
       if (retcode < 0 && retcode != -ENOENT) {
         ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
         return set_cr_error(retcode);
@@ -2096,7 +2097,10 @@ public:
 
 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
 {
-  return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
+#warning FIXME
+  rgw_bucket_sync_pipe sync_pipe;
+  sync_pipe.source_bs = bs;
+  return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, sync_pipe, init_status);
 }
 
 #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
@@ -2166,10 +2170,10 @@ class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
   map<string, bufferlist> attrs;
 public:
   RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
-                                   const rgw_bucket_shard& bs,
+                                   const rgw_bucket_sync_pipe& sync_pipe,
                                    rgw_bucket_shard_sync_info *_status)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-      oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+      oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, sync_pipe)),
       status(_status) {}
   int operate() override;
 };
@@ -2366,7 +2370,10 @@ int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_bucke
 
 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
 {
-  return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
+#warning FIXME
+  rgw_bucket_sync_pipe sync_pipe;
+  sync_pipe.source_bs = bs;
+  return new RGWReadBucketSyncStatusCoroutine(&sync_env, sync_pipe, sync_status);
 }
 
 RGWBucketSyncStatusManager::RGWBucketSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone,
@@ -2693,8 +2700,8 @@ template <class T, class K>
 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
-  RGWBucketInfo *bucket_info;
-  const rgw_bucket_shard& bs;
+  rgw_bucket_sync_pipe& sync_pipe;
+  rgw_bucket_shard& bs;
 
   rgw_obj_key key;
   bool versioned;
@@ -2720,8 +2727,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
 public:
   RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
-                             RGWBucketInfo *_bucket_info,
-                             const rgw_bucket_shard& bs,
+                             rgw_bucket_sync_pipe& _sync_pipe,
                              const rgw_obj_key& _key, bool _versioned,
                              std::optional<uint64_t> _versioned_epoch,
                              real_time& _timestamp,
@@ -2730,7 +2736,7 @@ public:
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
                              RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
                                                      sync_env(_sync_env),
-                                                      bucket_info(_bucket_info), bs(bs),
+                                                      sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
                                                       key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
                                                       owner(_owner),
                                                       timestamp(_timestamp), op(_op),
@@ -2777,25 +2783,25 @@ public:
           } else if (op == CLS_RGW_OP_ADD ||
                      op == CLS_RGW_OP_LINK_OLH) {
             set_status("syncing obj");
-            tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
-            call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
+            tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
+            call(data_sync_module->sync_object(sync_env, sync_pipe, key, versioned_epoch, &zones_trace));
           } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
             set_status("removing obj");
             if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
               versioned = true;
             }
-            tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
-            call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
+            tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
+            call(data_sync_module->remove_object(sync_env, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
             // our copy of the object is more recent, continue as if it succeeded
             if (retcode == -ERR_PRECONDITION_FAILED) {
               retcode = 0;
             }
           } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
             set_status("creating delete marker");
-            tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
-            call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
+            tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
+            call(data_sync_module->create_delete_marker(sync_env, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
           }
-          tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
+          tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key));
         }
       } while (marker_tracker->need_retry(key));
       {
@@ -2837,8 +2843,8 @@ done:
 
 class RGWBucketShardFullSyncCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
-  const rgw_bucket_shard& bs;
-  RGWBucketInfo *bucket_info;
+  rgw_bucket_sync_pipe& sync_pipe;
+  rgw_bucket_shard& bs;
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   bucket_list_result list_result;
   list<bucket_list_entry>::iterator entries_iter;
@@ -2857,14 +2863,15 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
 
   RGWSyncTraceNodeRef tn;
 public:
-  RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
-                           RGWBucketInfo *_bucket_info,
+  RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env,
+                           rgw_bucket_sync_pipe& _sync_pipe,
                            const std::string& status_oid,
                            RGWContinuousLeaseCR *lease_cr,
                            rgw_bucket_shard_sync_info& sync_info,
                            RGWSyncTraceNodeRef tn_parent)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
-      bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+      lease_cr(lease_cr), sync_info(sync_info),
       marker_tracker(sync_env, status_oid, sync_info.full_marker),
       status_oid(status_oid),
       tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
@@ -2915,7 +2922,7 @@ int RGWBucketShardFullSyncCR::operate()
           tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
         } else {
           using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
-          yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
+          yield spawn(new SyncCR(sync_env, sync_pipe, entry->key,
                                  false, /* versioned, only matters for object removal */
                                  entry->versioned_epoch, entry->mtime,
                                  entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
@@ -2987,8 +2994,8 @@ static bool has_olh_epoch(RGWModifyOp op) {
 
 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
-  const rgw_bucket_shard& bs;
-  RGWBucketInfo *bucket_info;
+  rgw_bucket_sync_pipe& sync_pipe;
+  rgw_bucket_shard& bs;
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   list<rgw_bi_log_entry> list_result;
   list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
@@ -3009,14 +3016,14 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
 public:
   RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
-                                  const rgw_bucket_shard& bs,
-                                  RGWBucketInfo *_bucket_info,
+                                  rgw_bucket_sync_pipe& _sync_pipe,
                                   const std::string& status_oid,
                                   RGWContinuousLeaseCR *lease_cr,
                                   rgw_bucket_shard_sync_info& sync_info,
                                   RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
-      bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+      lease_cr(lease_cr), sync_info(sync_info),
       marker_tracker(sync_env, status_oid, sync_info.inc_marker),
       status_oid(status_oid), zone_id(_sync_env->store->svc()->zone->get_zone().id),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
@@ -3196,7 +3203,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
             }
             tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
             using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
-            spawn(new SyncCR(sync_env, bucket_info, bs, key,
+            spawn(new SyncCR(sync_env, sync_pipe, key,
                              entry->is_versioned(), versioned_epoch,
                              entry->timestamp, owner, entry->op, entry->state,
                              cur_id, &marker_tracker, entry->zones_trace, tn),
@@ -3282,7 +3289,7 @@ int RGWRunBucketSyncCoroutine::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
+    yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, sync_pipe, &sync_status));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
@@ -3292,12 +3299,12 @@ int RGWRunBucketSyncCoroutine::operate()
 
     tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
 
-    yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
+    yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
     if (retcode == -ENOENT) {
       /* bucket instance info has not been synced in yet, fetch it now */
       yield {
         tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
-        string raw_key = string("bucket.instance:") + bs.bucket.get_key();
+        string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key();
 
         meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc()->zone->get_master_conn(), sync_env->async_rados,
                            sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
@@ -3309,16 +3316,16 @@ int RGWRunBucketSyncCoroutine::operate()
                                           tn));
       }
       if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
+        tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{sync_pipe.source_bs.bucket}));
         lease_cr->go_down();
         drain_all();
         return set_cr_error(retcode);
       }
 
-      yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
+      yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
     }
     if (retcode < 0) {
-      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
+      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket}));
       lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
@@ -3326,7 +3333,7 @@ int RGWRunBucketSyncCoroutine::operate()
 
     do {
       if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
-        yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
+        yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, sync_pipe, sync_status));
         if (retcode == -ENOENT) {
           tn->log(0, "bucket sync disabled");
           lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
@@ -3344,7 +3351,7 @@ int RGWRunBucketSyncCoroutine::operate()
       }
 
       if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
-        yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
+        yield call(new RGWBucketShardFullSyncCR(sync_env, sync_pipe,
                                                 status_oid, lease_cr.get(),
                                                 sync_status, tn));
         if (retcode < 0) {
@@ -3356,7 +3363,7 @@ int RGWRunBucketSyncCoroutine::operate()
       }
 
       if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
-        yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
+        yield call(new RGWBucketShardIncrementalSyncCR(sync_env, sync_pipe,
                                                        status_oid, lease_cr.get(),
                                                        sync_status, tn));
         if (retcode < 0) {
@@ -3507,14 +3514,15 @@ std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const
 }
 
 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
-                                              const rgw_bucket_shard& bs)
+                                              const rgw_bucket_sync_pipe& sync_pipe)
 {
-  return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
+  return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pipe.source_bs.get_key();
 }
 
 string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
                                                   const rgw_obj& obj)
 {
+#warning FIXME
   return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
          obj.key.name + ":" + obj.key.instance;
 }
@@ -3525,6 +3533,8 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
   RGWDataSyncEnv *const env;
   const int num_shards;
   rgw_bucket_shard bs;
+#warning change this
+  rgw_bucket_sync_pipe sync_pipe;
 
   using Vector = std::vector<rgw_bucket_shard_sync_info>;
   Vector::iterator i, end;
@@ -3543,7 +3553,8 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
     if (i == end) {
       return false;
     }
-    spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
+    sync_pipe.source_bs = bs;
+    spawn(new RGWReadBucketSyncStatusCoroutine(env, sync_pipe, &*i), false);
     ++i;
     ++bs.shard_id;
     return true;
index 0840da93b09d166df6d1c881c112efd7a1854ce3..6979399097df6cd40b6764effaa3e5150b8147ca 100644 (file)
 #include "rgw_sync_module.h"
 #include "rgw_sync_trace.h"
 
+struct rgw_bucket_sync_pipe {
+  rgw_bucket_shard source_bs;
+  RGWBucketInfo dest_bucket_info;
+  string source_prefix;
+  string dest_prefix;
+};
+
+inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
+  if (p.source_bs.bucket == p.dest_bucket_info.bucket &&
+      p.source_prefix == p.dest_prefix) {
+    return out << p.source_bs;
+  }
+
+  out << p.source_bs;
+
+  if (!p.source_prefix.empty()) {
+    out << "/" << p.source_prefix;
+  }
+
+  out << " -> " << p.dest_bucket_info.bucket;
+
+  if (!p.dest_prefix.empty()) {
+    out << "/" << p.dest_prefix;
+  }
+
+  return out;
+}
+
 struct rgw_datalog_info {
   uint32_t num_shards;
 
@@ -564,7 +592,7 @@ public:
   map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
   int init_sync_status();
 
-  static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
+  static string status_oid(const string& source_zone, const rgw_bucket_sync_pipe& bs);
   static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */
 
   // implements DoutPrefixProvider
index 67093096efde82fcc0587d9067a053d251e21750..3012b79f904d2e6201b5cc9da9c1d60da09450e4 100644 (file)
@@ -3507,7 +3507,7 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
                req_info *info,
                const string& source_zone,
                rgw_obj& src_obj,
-               RGWBucketInfo& src_bucket_info,
+               const RGWBucketInfo *src_bucket_info,
                real_time *src_mtime,
                uint64_t *psize,
                const real_time *mod_ptr,
@@ -3532,12 +3532,12 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
 
   RGWRESTConn *conn;
   if (source_zone.empty()) {
-    if (src_bucket_info.zonegroup.empty()) {
+    if (!src_bucket_info || src_bucket_info->zonegroup.empty()) {
       /* source is in the master zonegroup */
       conn = svc.zone->get_master_conn();
     } else {
       auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map();
-      map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info.zonegroup);
+      map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info->zonegroup);
       if (iter == zonegroup_conn_map.end()) {
         ldout(cct, 0) << "could not find zonegroup connection to zonegroup: " << source_zone << dendl;
         return -ENOENT;
@@ -3622,8 +3622,8 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
                const string& source_zone,
                const rgw_obj& dest_obj,
                const rgw_obj& src_obj,
-               RGWBucketInfo& dest_bucket_info,
-               RGWBucketInfo& src_bucket_info,
+               const RGWBucketInfo& dest_bucket_info,
+               const RGWBucketInfo *src_bucket_info,
                std::optional<rgw_placement_rule> dest_placement_rule,
                real_time *src_mtime,
                real_time *mtime,
@@ -3665,11 +3665,11 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   auto& zone_conn_map = svc.zone->get_zone_conn_map();
   auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map();
   if (source_zone.empty()) {
-    if (dest_bucket_info.zonegroup.empty()) {
+    if (!src_bucket_info || src_bucket_info->zonegroup.empty()) {
       /* source is in the master zonegroup */
       conn = svc.zone->get_master_conn();
     } else {
-      map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info.zonegroup);
+      map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info->zonegroup);
       if (iter == zonegroup_conn_map.end()) {
         ldout(cct, 0) << "could not find zonegroup connection to zonegroup: " << source_zone << dendl;
         return -ENOENT;
@@ -3988,7 +3988,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
 
   if (remote_src || !source_zone.empty()) {
     return fetch_remote_obj(obj_ctx, user_id, info, source_zone,
-               dest_obj, src_obj, dest_bucket_info, src_bucket_info,
+               dest_obj, src_obj, dest_bucket_info, &src_bucket_info,
                dest_placement, src_mtime, mtime, mod_ptr,
                unmod_ptr, high_precision_time,
                if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
@@ -7014,7 +7014,7 @@ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBuc
   return 0;
 }
 
-int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
+int RGWRados::set_olh(RGWObjectCtx& obj_ctx, const RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
                       uint64_t olh_epoch, real_time unmod_since, bool high_precision_time,
                       optional_yield y, rgw_zone_set *zones_trace, bool log_data_change)
 {
index 8213f20cacc8a8f1a3ec4784e20fbc715e056f4b..9eab78c83b5942e3b5eb2124f78bb4eb31b4b161 100644 (file)
@@ -1060,7 +1060,7 @@ public:
                req_info *info,
                const string& source_zone,
                rgw_obj& src_obj,
-               RGWBucketInfo& src_bucket_info,
+               const RGWBucketInfo *src_bucket_info,
                real_time *src_mtime,
                uint64_t *psize,
                const real_time *mod_ptr,
@@ -1080,8 +1080,8 @@ public:
                        const string& source_zone,
                        const rgw_obj& dest_obj,
                        const rgw_obj& src_obj,
-                       RGWBucketInfo& dest_bucket_info,
-                       RGWBucketInfo& src_bucket_info,
+                       const RGWBucketInfo& dest_bucket_info,
+                       const RGWBucketInfo *src_bucket_info,
                       std::optional<rgw_placement_rule> dest_placement,
                        ceph::real_time *src_mtime,
                        ceph::real_time *mtime,
@@ -1277,7 +1277,7 @@ public:
                     bufferlist& obj_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
                     uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr);
   int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr);
-  int set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
+  int set_olh(RGWObjectCtx& obj_ctx, const RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
               uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time,
               optional_yield y, rgw_zone_set *zones_trace = nullptr, bool log_data_change = false);
   int repair_olh(RGWObjState* state, const RGWBucketInfo& bucket_info,
index cfb5e3fc10372dc71ae99e9dffbdb9a9c11afec5..9ee2b4341f6a452867bab0814298192f7cf74928 100644 (file)
@@ -28,15 +28,15 @@ RGWMetadataHandler *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler()
 }
 
 RGWStatRemoteObjCBCR::RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                       RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
+                       rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
                                                           sync_env(_sync_env),
-                                                          bucket_info(_bucket_info), key(_key) {
+                                                          src_bucket(_src_bucket), key(_key) {
 }
 
 RGWCallStatRemoteObjCR::RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
-                                               RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
+                                               rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
                                                                                                  sync_env(_sync_env),
-                                                                                                 bucket_info(_bucket_info), key(_key) {
+                                                                                                 src_bucket(_src_bucket), key(_key) {
 }
 
 int RGWCallStatRemoteObjCR::operate() {
@@ -44,14 +44,14 @@ int RGWCallStatRemoteObjCR::operate() {
     yield {
       call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store,
                                   sync_env->source_zone,
-                                  bucket_info, key, &mtime, &size, &etag, &attrs, &headers));
+                                  src_bucket, key, &mtime, &size, &etag, &attrs, &headers));
     }
     if (retcode < 0) {
       ldout(sync_env->cct, 10) << "RGWStatRemoteObjCR() returned " << retcode << dendl;
       return set_cr_error(retcode);
     }
     ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sync_env->source_zone
-                             << " b=" << bucket_info.bucket << " k=" << key
+                             << " b=" << src_bucket << " k=" << key
                              << " size=" << size << " mtime=" << mtime << dendl;
     yield {
       RGWStatRemoteObjCBCR *cb = allocate_callback();
index ea731d6213486339d8402c825e36d3afb4c215d8..6f989d981e0272ed41faa1f2b6207e24b75f39ee 100644 (file)
@@ -12,6 +12,7 @@ class RGWRemoteDataLog;
 struct RGWDataSyncEnv;
 struct rgw_bucket_entry_owner;
 struct rgw_obj_key;
+struct rgw_bucket_sync_pipe;
 
 
 class RGWDataSyncModule {
@@ -28,10 +29,10 @@ public:
   virtual RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) {
     return nullptr;
   }
-  virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) = 0;
-  virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+  virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) = 0;
+  virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime,
                                       bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
-  virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+  virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime,
                                              rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
 };
 
@@ -140,7 +141,7 @@ class RGWStatRemoteObjCBCR : public RGWCoroutine {
 protected:
   RGWDataSyncEnv *sync_env;
 
-  RGWBucketInfo bucket_info;
+  rgw_bucket src_bucket;
   rgw_obj_key key;
 
   ceph::real_time mtime;
@@ -150,7 +151,7 @@ protected:
   map<string, string> headers;
 public:
   RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                       RGWBucketInfo& _bucket_info, rgw_obj_key& _key);
+                       rgw_bucket& _src_bucket, rgw_obj_key& _key);
   ~RGWStatRemoteObjCBCR() override {}
 
   void set_result(ceph::real_time& _mtime,
@@ -176,12 +177,12 @@ class RGWCallStatRemoteObjCR : public RGWCoroutine {
 protected:
   RGWDataSyncEnv *sync_env;
 
-  RGWBucketInfo bucket_info;
+  rgw_bucket src_bucket;
   rgw_obj_key key;
 
 public:
   RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
-                     RGWBucketInfo& _bucket_info, rgw_obj_key& _key);
+                     rgw_bucket& _src_bucket, rgw_obj_key& _key);
 
   ~RGWCallStatRemoteObjCR() override {}
 
index 33f41b3dce1b8bc1d063a4a06e332dd3e671ee5e..547cfeac4ae2976af898f231984bd71ce7aedee0 100644 (file)
@@ -1548,6 +1548,7 @@ int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result
 
 // maybe use Fetch Remote Obj instead?
 class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
+  rgw_bucket_sync_pipe sync_pipe;
   AWSSyncInstanceEnv& instance;
 
   uint64_t versioned_epoch{0};
@@ -1576,10 +1577,11 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
 
 public:
   RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                            RGWBucketInfo& _bucket_info,
+                            rgw_bucket_sync_pipe& _sync_pipe,
                             rgw_obj_key& _key,
                             AWSSyncInstanceEnv& _instance,
-                            uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+                            uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+                                                         sync_pipe(_sync_pipe),
                                                          instance(_instance), versioned_epoch(_versioned_epoch)
   {}
 
@@ -1599,7 +1601,7 @@ public:
         }
       }
       ldout(sync_env->cct, 4) << "AWS: download begin: z=" << sync_env->source_zone
-                              << " b=" << bucket_info.bucket << " k=" << key << " size=" << size
+                              << " b=" << src_bucket << " k=" << key << " size=" << size
                               << " mtime=" << mtime << " etag=" << etag
                               << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
                               << dendl;
@@ -1610,8 +1612,8 @@ public:
         return set_cr_error(-EINVAL);
       }
 
-      instance.get_profile(bucket_info.bucket, &target);
-      instance.conf.get_target(target, bucket_info, key, &target_bucket_name, &target_obj_name);
+      instance.get_profile(sync_pipe.source_bs.bucket, &target);
+      instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name);
 
       if (bucket_created.find(target_bucket_name) == bucket_created.end()){
         yield {
@@ -1651,7 +1653,7 @@ public:
       }
 
       yield {
-        rgw_obj src_obj(bucket_info.bucket, key);
+        rgw_obj src_obj(src_bucket, key);
 
         /* init output */
         rgw_bucket target_bucket;
@@ -1695,43 +1697,45 @@ public:
 };
 
 class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+  rgw_bucket_sync_pipe sync_pipe;
   AWSSyncInstanceEnv& instance;
   uint64_t versioned_epoch;
 public:
   RGWAWSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
-                              RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                              AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+                              rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+                              AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+                                                          sync_pipe(_sync_pipe),
                                                           instance(_instance), versioned_epoch(_versioned_epoch) {
   }
 
   ~RGWAWSHandleRemoteObjCR() {}
 
   RGWStatRemoteObjCBCR *allocate_callback() override {
-    return new RGWAWSHandleRemoteObjCBCR(sync_env, bucket_info, key, instance, versioned_epoch);
+    return new RGWAWSHandleRemoteObjCBCR(sync_env, sync_pipe, key, instance, versioned_epoch);
   }
 };
 
 class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env{nullptr};
   std::shared_ptr<AWSSyncConfig_Profile> target;
-  RGWBucketInfo bucket_info;
+  rgw_bucket_sync_pipe sync_pipe;
   rgw_obj_key key;
   ceph::real_time mtime;
   AWSSyncInstanceEnv& instance;
   int ret{0};
 public:
   RGWAWSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
                           AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                        bucket_info(_bucket_info), key(_key),
+                                                        sync_pipe(_sync_pipe), key(_key),
                                                         mtime(_mtime), instance(_instance) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone
-                              << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+                              << " b=" <<sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
       yield {
-        instance.get_profile(bucket_info.bucket, &target);
-        string path =  instance.conf.get_path(target, bucket_info, key);
+        instance.get_profile(sync_pipe.source_bs.bucket, &target);
+        string path =  instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
         ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl;
 
         call(new RGWDeleteRESTResourceCR(sync_env->cct, target->conn.get(),
@@ -1764,21 +1768,21 @@ public:
 
   ~RGWAWSDataSyncModule() {}
 
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
                             std::optional<uint64_t> versioned_epoch,
                             rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 0) << instance.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
-    return new RGWAWSHandleRemoteObjCR(sync_env, bucket_info, key, instance, versioned_epoch.value_or(0));
+    ldout(sync_env->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+    return new RGWAWSHandleRemoteObjCR(sync_env, sync_pipe, key, instance, versioned_epoch.value_or(0));
   }
-  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
                               rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 0) <<"rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-    return new RGWAWSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, instance);
+    ldout(sync_env->cct, 0) <<"rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+    return new RGWAWSRemoveRemoteObjCBCR(sync_env, sync_pipe, key, mtime, instance);
   }
-  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
                                      rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+    ldout(sync_env->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
                             << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
     return NULL;
   }
index 83fc0e8cef36c020317d14cd2bb56329ce98f906..46433c3903a6a310435294d78542c00eafb0d5b9 100644 (file)
@@ -770,22 +770,24 @@ public:
 };
 
 class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+  rgw_bucket_sync_pipe sync_pipe;
   ElasticConfigRef conf;
   uint64_t versioned_epoch;
 public:
   RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                          ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+                          ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+                                                                               sync_pipe(_sync_pipe), conf(_conf),
                                                                                versioned_epoch(_versioned_epoch) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
-                               << " b=" << bucket_info.bucket << " k=" << key
+                               << " b=" << sync_pipe.source_bs.bucket << " k=" << key
                                << " size=" << size << " mtime=" << mtime << dendl;
 
       yield {
-        string path = conf->get_obj_path(bucket_info, key);
-        es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
+        string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
+        es_obj_metadata doc(sync_env->cct, conf, sync_pipe.dest_bucket_info, key, mtime, size, attrs, versioned_epoch);
 
         call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
                                                             sync_env->http_manager,
@@ -804,40 +806,42 @@ public:
 };
 
 class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+  rgw_bucket_sync_pipe sync_pipe;
   ElasticConfigRef conf;
   uint64_t versioned_epoch;
 public:
   RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
-                        RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                        ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+                        rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+                        ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+                                                           sync_pipe(_sync_pipe),
                                                            conf(_conf), versioned_epoch(_versioned_epoch) {
   }
 
   ~RGWElasticHandleRemoteObjCR() override {}
 
   RGWStatRemoteObjCBCR *allocate_callback() override {
-    return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
+    return new RGWElasticHandleRemoteObjCBCR(sync_env, sync_pipe, key, conf, versioned_epoch);
   }
 };
 
 class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
-  RGWBucketInfo bucket_info;
+  rgw_bucket_sync_pipe sync_pipe;
   rgw_obj_key key;
   ceph::real_time mtime;
   ElasticConfigRef conf;
 public:
   RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
                           ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                        bucket_info(_bucket_info), key(_key),
+                                                        sync_pipe(_sync_pipe), key(_key),
                                                         mtime(_mtime), conf(_conf) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
-                               << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+                               << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
       yield {
-        string path = conf->get_obj_path(bucket_info, key);
+        string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
 
         call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
                                          sync_env->http_manager,
@@ -876,26 +880,26 @@ public:
     return new RGWElasticGetESInfoCBCR(sync_env, conf);
   }
 
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
-    if (!conf->should_handle_operation(bucket_info)) {
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+    ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+    if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
       ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
       return nullptr;
     }
-    return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0));
+    return new RGWElasticHandleRemoteObjCR(sync_env, sync_pipe, key, conf, versioned_epoch.value_or(0));
   }
-  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     /* versioned and versioned epoch params are useless in the elasticsearch backend case */
-    ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-    if (!conf->should_handle_operation(bucket_info)) {
+    ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+    if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
       ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
       return nullptr;
     }
-    return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
+    return new RGWElasticRemoveRemoteObjCBCR(sync_env, sync_pipe, key, mtime, conf);
   }
-  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+    ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
                             << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
     ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
     return NULL;
index af9eb2da14803eb7f19eb46474c293ee610eab01..981e820910ff2ff941d0fc27b2f910d3aa981fbc 100644 (file)
 class RGWLogStatRemoteObjCBCR : public RGWStatRemoteObjCBCR {
 public:
   RGWLogStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key) {}
+                          rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sync_env, _src_bucket, _key) {}
   int operate() override {
     ldout(sync_env->cct, 0) << "SYNC_LOG: stat of remote obj: z=" << sync_env->source_zone
-                            << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+                            << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime
                             << " attrs=" << attrs << dendl;
     return set_cr_done();
   }
@@ -26,13 +26,13 @@ public:
 class RGWLogStatRemoteObjCR : public RGWCallStatRemoteObjCR {
 public:
   RGWLogStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
-                        RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key) {
+                        rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _src_bucket, _key) {
   }
 
   ~RGWLogStatRemoteObjCR() override {}
 
   RGWStatRemoteObjCBCR *allocate_callback() override {
-    return new RGWLogStatRemoteObjCBCR(sync_env, bucket_info, key);
+    return new RGWLogStatRemoteObjCBCR(sync_env, src_bucket, key);
   }
 };
 
@@ -41,17 +41,17 @@ class RGWLogDataSyncModule : public RGWDataSyncModule {
 public:
   explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {}
 
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
-    return new RGWLogStatRemoteObjCR(sync_env, bucket_info, key);
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+    ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+    return new RGWLogStatRemoteObjCR(sync_env, sync_pipe.source_bs.bucket, key);
   }
-  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+    ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
     return NULL;
   }
-  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+    ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
                             << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
     return NULL;
   }
index abdf8b1f545a7a6262d7d97eb92b6bf39996e939..af797aa4a107fd620c5f4b5946bd7f14ffc19d8d 100644 (file)
@@ -1309,6 +1309,7 @@ public:
 // coroutine invoked on remote object creation
 class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
   RGWDataSyncEnv *sync_env;
+  rgw_bucket_sync_pipe sync_pipe;
   PSEnvRef env;
   std::optional<uint64_t> versioned_epoch;
   EventRef<rgw_pubsub_event> event;
@@ -1316,10 +1317,11 @@ class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
   TopicsRef topics;
 public:
   RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                           PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
-                          TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+                          TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
                                                                       sync_env(_sync_env),
+                                                                      sync_pipe(_sync_pipe),
                                                                       env(_env),
                                                                       versioned_epoch(_versioned_epoch),
                                                                       topics(_topics) {
@@ -1327,7 +1329,7 @@ public:
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone
-                               << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+                               << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
                                << " attrs=" << attrs << dendl;
       {
         std::vector<std::pair<std::string, std::string> > attrs;
@@ -1342,16 +1344,17 @@ public:
         // this is why both are created here, once we have information about the 
         // subscription, we will store/push only the relevant ones
         make_event_ref(sync_env->cct,
-                       bucket_info.bucket, key,
+                       sync_pipe.source_bs.bucket, key,
                        mtime, &attrs,
                        rgw::notify::ObjectCreated, &event);
         make_s3_record_ref(sync_env->cct,
-                       bucket_info.bucket, bucket_info.owner, key,
+                       sync_pipe.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
                        mtime, &attrs,
                        rgw::notify::ObjectCreated, &record);
       }
 
-      yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics));
+#warning should it be source owner?
+      yield call(new RGWPSHandleObjEventCR(sync_env, env, sync_pipe.dest_bucket_info.owner, event, record, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -1362,14 +1365,16 @@ public:
 };
 
 class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+  rgw_bucket_sync_pipe sync_pipe;
   PSEnvRef env;
   std::optional<uint64_t> versioned_epoch;
   TopicsRef topics;
 public:
   RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
-                        RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                        rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                         PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
-                        TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+                        TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+                                                           sync_pipe(_sync_pipe),
                                                            env(_env), versioned_epoch(_versioned_epoch),
                                                            topics(_topics) {
   }
@@ -1377,24 +1382,24 @@ public:
   ~RGWPSHandleRemoteObjCR() override {}
 
   RGWStatRemoteObjCBCR *allocate_callback() override {
-    return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics);
+    return new RGWPSHandleRemoteObjCBCR(sync_env, sync_pipe, key, env, versioned_epoch, topics);
   }
 };
 
 class RGWPSHandleObjCreateCR : public RGWCoroutine {
   
   RGWDataSyncEnv *sync_env;
-  RGWBucketInfo bucket_info;
+  rgw_bucket_sync_pipe sync_pipe;
   rgw_obj_key key;
   PSEnvRef env;
   std::optional<uint64_t> versioned_epoch;
   TopicsRef topics;
 public:
   RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env,
-                       RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                       rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                        PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sync_env->cct),
                                                                    sync_env(_sync_env),
-                                                                   bucket_info(_bucket_info),
+                                                                   sync_pipe(_sync_pipe),
                                                                    key(_key),
                                                                    env(_env),
                                                                    versioned_epoch(_versioned_epoch) {
@@ -1404,8 +1409,8 @@ public:
 
   int operate() override {
     reenter(this) {
-      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
-                                             bucket_info.bucket, key,
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, sync_pipe.dest_bucket_info.owner,
+                                             sync_pipe.source_bs.bucket, key,
                                              rgw::notify::ObjectCreated,
                                              &topics));
       if (retcode < 0) {
@@ -1413,10 +1418,10 @@ public:
         return set_cr_error(retcode);
       }
       if (topics->empty()) {
-        ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+        ldout(sync_env->cct, 20) << "no topics found for " << sync_pipe.source_bs.bucket << "/" << key << dendl;
         return set_cr_done();
       }
-      yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics));
+      yield call(new RGWPSHandleRemoteObjCR(sync_env, sync_pipe, key, env, versioned_epoch, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -1441,12 +1446,12 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine {
 public:
   RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
                            PSEnvRef _env,
-                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+                           rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
                            rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct),
                                                              sync_env(_sync_env),
                                                              env(_env),
-                                                             owner(_bucket_info.owner),
-                                                             bucket(_bucket_info.bucket),
+                                                             owner(_sync_pipe.dest_bucket_info.owner),
+                                                             bucket(_sync_pipe.dest_bucket_info.bucket),
                                                              key(_key),
                                                              mtime(_mtime), event_type(_event_type) {}
   int operate() override {
@@ -1505,25 +1510,25 @@ public:
     return new RGWPSInitEnvCBCR(sync_env, env);
   }
 
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info
+  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe
       rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << 
+    ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe << 
           " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
-    return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch);
+    return new RGWPSHandleObjCreateCR(sync_env, sync_pipe, key, env, versioned_epoch);
   }
 
-  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info
+  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe
       rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << 
+    ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe << 
           " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-    return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDelete);
+    return new RGWPSGenericObjEventCBCR(sync_env, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
   }
 
-  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info
+  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe
       rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << 
+    ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << 
           " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-    return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
+    return new RGWPSGenericObjEventCBCR(sync_env, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
   }
 
   PSConfigRef& get_conf() { return conf; }
index 54c20fff7c8fa405366a6c5094ab5d24d44c8b35..3371c6e1b9343426fca9a0f87739c2d4222499fc 100644 (file)
@@ -56,9 +56,9 @@ int RGWSI_DataLog_RADOS::get_info(int shard_id, RGWDataChangesLogInfo *info)
   return log->get_info(shard_id, info);
 }
 
-int RGWSI_DataLog_RADOS::add_entry(const rgw_bucket& bucket, int shard_id)
+int RGWSI_DataLog_RADOS::add_entry(const RGWBucketInfo& bucket_info, int shard_id)
 {
-  return log->add_entry(bucket, shard_id);
+  return log->add_entry(bucket_info, shard_id);
 }
 
 int RGWSI_DataLog_RADOS::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
index 19a6c5edb56ad58b19b24f91216ca60848ba84ba..5317035c09fef35c0a710d28cee1c8f3b1d0e7f3 100644 (file)
@@ -58,7 +58,7 @@ public:
 
   int get_info(int shard_id, RGWDataChangesLogInfo *info);
 
-  int add_entry(const rgw_bucket& bucket, int shard_id);
+  int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
   int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
                   list<rgw_data_change_log_entry>& entries,
                   const string& marker,