}
return shard_id < b.shard_id;
}
+
+ bool operator==(const rgw_bucket_shard& b) const {
+ return (bucket == b.bucket &&
+ shard_id == b.shard_id);
+ }
};
inline ostream& operator<<(ostream& out, const rgw_bucket_shard& bs) {
}
int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
- const rgw_bucket& bucket, int shard_id,
+ const rgw_bucket& source_bucket, int shard_id,
+ const rgw_bucket& dest_bucket,
RGWSyncErrorLogger *_error_logger,
RGWSyncTraceManager *_sync_tracer,
RGWSyncModuleInstanceRef& _sync_module)
{
conn = _conn;
source_zone = _source_zone;
- bs.bucket = bucket;
- bs.shard_id = shard_id;
+ sync_pair.source_bs.bucket = source_bucket;
+ sync_pair.source_bs.shard_id = shard_id;
+ sync_pair.dest_bs.bucket = dest_bucket;
+ if (dest_bucket == source_bucket) {
+ sync_pair.dest_bs.shard_id = shard_id;
+ }
sync_env.init(dpp, store->ctx(), store, store->svc(), async_rados, http_manager,
_error_logger, _sync_tracer, _sync_module, nullptr);
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
- const rgw_bucket_sync_pipe& sync_pipe;
+ const rgw_bucket_sync_pair_info& sync_pair;
const string sync_status_oid;
rgw_bucket_shard_sync_info& status;
rgw_bucket_index_marker_info info;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
- const rgw_bucket_sync_pipe& _sync_pipe,
+ const rgw_bucket_sync_pair_info& _sync_pair,
rgw_bucket_shard_sync_info& _status)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- sync_pipe(_sync_pipe),
- sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pipe.info)),
+ sync_pair(_sync_pair),
+ sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pair)),
status(_status)
{}
int operate() override {
reenter(this) {
/* fetch current position in logs */
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pipe.info.source_bs, &info));
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.source_bs, &info));
if (retcode < 0 && retcode != -ENOENT) {
ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
return set_cr_error(retcode);
RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
-#warning FIXME
- rgw_bucket_sync_pipe sync_pipe;
- sync_pipe.source_bs = bs;
- sync_pipe.dest_bs = bs;
- return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pipe, init_status);
+ return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pair, init_status);
}
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
- rgw_bucket_sync_pair_info sync_pair;
- sync_pair.source_bs = bs;
-#warning FIXME
- sync_pair.dest_bs = bs;
return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pair, sync_status);
}
http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
source_zone(_source_zone),
conn(NULL), error_logger(NULL),
- bucket(bucket),
+ dest_bucket(_dest_bucket),
num_shards(0)
{
}
sync_pair.source_bs.bucket = piter->bucket;
sync_pair.dest_bs.bucket = bucket_info.bucket;
- yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn));
+ yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
return set_cr_error(retcode);
}
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, &pbucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info));
}
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
return set_cr_done();
}
+
+ return 0;
}
int RGWRunBucketSyncCoroutine::operate()
yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, tn));
if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.info.source_bs.bucket}));
+ tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, tn));
if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket}));
+ tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
do {
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
- yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pipe, sync_status));
+ yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status));
if (retcode == -ENOENT) {
tn->log(0, "bucket sync disabled");
lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
{
- return new RGWRunBucketSyncCoroutine(&sc, bs, sync_env.sync_tracer->root_node);
+ return new RGWRunBucketSyncCoroutine(&sc, sync_pair, sync_env.sync_tracer->root_node);
}
int RGWBucketPipeSyncStatusManager::init()
}
#warning read specific bucket sources
+ rgw_bucket source_bucket = dest_bucket;
- const string key = bucket.get_key();
+ const string key = source_bucket.get_key();
rgw_http_param_pair pairs[] = { { "key", key.c_str() },
{ NULL, NULL } };
int effective_num_shards = (num_shards ? num_shards : 1);
- auto async_rados = store->svc.rados->get_async_processor();
+ auto async_rados = store->svc()->rados->get_async_processor();
for (int i = 0; i < effective_num_shards; i++) {
RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager);
- ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->getRados()->get_sync_tracer(), sync_module);
+ ret = l->init(source_zone, conn, source_bucket, (num_shards ? i : -1), dest_bucket, error_logger, store->getRados()->get_sync_tracer(), sync_module);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
return ret;
int ret = cr_mgr.run(stacks);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
- << bucket_str{bucket} << dendl;
+ << bucket_str{dest_bucket} << dendl;
return ret;
}
int ret = cr_mgr.run(stacks);
if (ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
- << bucket_str{bucket} << dendl;
+ << bucket_str{dest_bucket} << dendl;
return ret;
}
{
auto zone = std::string_view{source_zone};
return out << "bucket sync zone:" << zone.substr(0, 8)
- << " bucket:" << bucket.name << ' ';
+ << " bucket:" << dest_bucket << ' ';
}
string RGWBucketPipeSyncStatusManager::status_oid(const string& source_zone,
const int num_shards;
rgw_bucket_shard bs;
#warning change this
- rgw_bucket_sync_pipe sync_pipe;
+ rgw_bucket_sync_pair_info sync_pair;
using Vector = std::vector<rgw_bucket_shard_sync_info>;
Vector::iterator i, end;
if (i == end) {
return false;
}
- sync_pipe.source_bs = bs;
- spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pipe, &*i), false);
+ sync_pair.source_bs = bs;
+ spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false);
++i;
++bs.shard_id;
return true;
string dest_prefix;
};
-struct rgw_bucket_sync_pipe {
- rgw_bucket_sync_pair_info info;
- RGWBucketInfo source_bucket_info;
- RGWBucketInfo dest_bucket_info;
-};
-
inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
if (p.source_bs.bucket == p.dest_bs.bucket &&
p.source_prefix == p.dest_prefix) {
return out;
}
+struct rgw_bucket_sync_pipe {
+ rgw_bucket_sync_pair_info info;
+ RGWBucketInfo source_bucket_info;
+ RGWBucketInfo dest_bucket_info;
+};
+
+inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
+ return out << p.info;
+}
+
struct rgw_datalog_info {
uint32_t num_shards;
rgw::sal::RGWRadosStore *store;
RGWRESTConn *conn{nullptr};
string source_zone;
- rgw_bucket_shard bs;
+
+ rgw_bucket_sync_pair_info sync_pair;
RGWAsyncRadosProcessor *async_rados;
RGWHTTPManager *http_manager;
RGWHTTPManager *_http_manager);
int init(const string& _source_zone, RGWRESTConn *_conn,
- const rgw_bucket& bucket, int shard_id,
+ const rgw_bucket& source_bucket, int shard_id,
+ const rgw_bucket& dest_bucket,
RGWSyncErrorLogger *_error_logger,
RGWSyncTraceManager *_sync_tracer,
RGWSyncModuleInstanceRef& _sync_module);
RGWSyncErrorLogger *error_logger;
RGWSyncModuleInstanceRef sync_module;
- rgw_bucket bucket;
+ rgw_bucket dest_bucket;
map<int, RGWRemoteBucketLog *> source_logs;
public:
RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
const string& _source_zone,
- const rgw_bucket& bucket);
+ const rgw_bucket& dest_bucket);
~RGWBucketPipeSyncStatusManager();
int init();
rgw_bucket_sync_pipe& _sync_pipe,
rgw_obj_key& _key,
AWSSyncInstanceEnv& _instance,
- uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key),
+ uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe),
instance(_instance), versioned_epoch(_versioned_epoch)
{}
return set_cr_error(-EINVAL);
}
- instance.get_profile(sync_pipe.source_bs.bucket, &target);
+ instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name);
if (bucket_created.find(target_bucket_name) == bucket_created.end()){
public:
RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
- AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key),
+ AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe),
instance(_instance), versioned_epoch(_versioned_epoch) {
}
int operate() override {
reenter(this) {
ldout(sc->cct, 0) << ": remove remote obj: z=" << sc->source_zone
- << " b=" <<sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
+ << " b=" <<sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
- instance.get_profile(sync_pipe.source_bs.bucket, &target);
+ instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
string path = instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
ldout(sc->cct, 0) << "AWS: removing aws object at" << path << dendl;
RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
std::optional<uint64_t> versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0));
}
RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sc->cct, 0) <<"rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
}
RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sc->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
public:
RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
- ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key),
+ ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe), conf(_conf),
versioned_epoch(_versioned_epoch) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sc->source_zone
- << " b=" << sync_pipe.source_bs.bucket << " k=" << key
+ << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key
<< " size=" << size << " mtime=" << mtime << dendl;
yield {
public:
RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
- ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key),
+ ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe),
conf(_conf), versioned_epoch(_versioned_epoch) {
}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sc->source_zone
- << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
+ << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
}
RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
}
RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
- ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
}
RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
return NULL;
explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {}
RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
- return new RGWLogStatRemoteObjCR(sc, sync_pipe.source_bs.bucket, key);
+ ldout(sc->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ return new RGWLogStatRemoteObjCR(sc, sync_pipe.info.source_bs.bucket, key);
}
RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sc->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sc->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
- TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.source_bs.bucket, _key),
+ TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sc(_sc),
sync_pipe(_sync_pipe),
env(_env),
int operate() override {
reenter(this) {
ldout(sc->cct, 20) << ": stat of remote obj: z=" << sc->source_zone
- << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+ << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
{
std::vector<std::pair<std::string, std::string> > attrs;
// this is why both are created here, once we have information about the
// subscription, we will store/push only the relevant ones
make_event_ref(sc->cct,
- sync_pipe.source_bs.bucket, key,
+ sync_pipe.info.source_bs.bucket, key,
mtime, &attrs,
rgw::notify::ObjectCreated, &event);
make_s3_record_ref(sc->cct,
- sync_pipe.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
+ sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
mtime, &attrs,
rgw::notify::ObjectCreated, &record);
}
RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
- TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.source_bs.bucket, _key),
+ TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
sync_pipe(_sync_pipe),
env(_env), versioned_epoch(_versioned_epoch),
topics(_topics) {
int operate() override {
reenter(this) {
yield call(new RGWPSFindBucketTopicsCR(sc, env, sync_pipe.dest_bucket_info.owner,
- sync_pipe.source_bs.bucket, key,
+ sync_pipe.info.source_bs.bucket, key,
rgw::notify::ObjectCreated,
&topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
if (topics->empty()) {
- ldout(sc->cct, 20) << "no topics found for " << sync_pipe.source_bs.bucket << "/" << key << dendl;
+ ldout(sc->cct, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl;
return set_cr_done();
}
yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics));