out << indented{width, "zone"} << zone.id << " (" << zone.name << ")\n";
out << indented{width, "bucket"} << info.bucket << "\n\n";
- if (!info.bucket_datasync_enabled()) {
+ if (!info.bucket_datasync_enabled(store->svc.zone)) {
out << "Sync is disabled for bucket " << info.bucket.name << '\n';
return 0;
}
}
for (int i = 0; i < shards_num; ++i, ++shard_id) {
- r = store->svc()->datalog_rados->add_entry(bucket_info.bucket, shard_id);
+ r = store->svc()->datalog_rados->add_entry(bucket_info, shard_id);
if (r < 0) {
set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
return r;
snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
map<string, bufferlist> attrs;
- rgw_obj src_obj(bucket_info.bucket, key);
+ rgw_obj src_obj(src_bucket, key);
- rgw_obj dest_obj(bucket_info.bucket, dest_key.value_or(key));
+ rgw_obj dest_obj(dest_bucket_info.bucket, dest_key.value_or(key));
std::optional<uint64_t> bytes_transferred;
int r = store->getRados()->fetch_remote_obj(obj_ctx,
source_zone,
dest_obj,
src_obj,
- bucket_info, /* dest */
- bucket_info, /* source */
+ dest_bucket_info, /* dest */
+ nullptr, /* source */
dest_placement_rule,
NULL, /* real_time* src_mtime, */
NULL, /* real_time* mtime, */
char buf[16];
snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
- rgw_obj src_obj(bucket_info.bucket, key);
-
- rgw_obj dest_obj(src_obj);
+ rgw_obj src_obj(src_bucket, key);
int r = store->getRados()->stat_remote_obj(obj_ctx,
rgw_user(user_id),
nullptr, /* req_info */
source_zone,
src_obj,
- bucket_info, /* source */
+ nullptr, /* source */
pmtime, /* real_time* src_mtime, */
psize, /* uint64_t * */
nullptr, /* const real_time* mod_ptr, */
rgw::sal::RGWRadosStore *store;
string source_zone;
- RGWBucketInfo bucket_info;
+ rgw_bucket src_bucket;
std::optional<rgw_placement_rule> dest_placement_rule;
+ RGWBucketInfo dest_bucket_info;
rgw_obj_key key;
std::optional<rgw_obj_key> dest_key;
public:
RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
const string& _source_zone,
- RGWBucketInfo& _bucket_info,
+ const rgw_bucket& _src_bucket,
std::optional<rgw_placement_rule> _dest_placement_rule,
+ const RGWBucketInfo& _dest_bucket_info,
const rgw_obj_key& _key,
const std::optional<rgw_obj_key>& _dest_key,
std::optional<uint64_t> _versioned_epoch,
PerfCounters* counters, const DoutPrefixProvider *dpp)
: RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
- bucket_info(_bucket_info),
+ src_bucket(_src_bucket),
dest_placement_rule(_dest_placement_rule),
+ dest_bucket_info(_dest_bucket_info),
key(_key),
dest_key(_dest_key),
versioned_epoch(_versioned_epoch),
rgw::sal::RGWRadosStore *store;
string source_zone;
- RGWBucketInfo bucket_info;
+ rgw_bucket src_bucket;
std::optional<rgw_placement_rule> dest_placement_rule;
+ RGWBucketInfo dest_bucket_info;
rgw_obj_key key;
std::optional<rgw_obj_key> dest_key;
public:
RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
const string& _source_zone,
- RGWBucketInfo& _bucket_info,
+ const rgw_bucket& _src_bucket,
std::optional<rgw_placement_rule> _dest_placement_rule,
+ const RGWBucketInfo& _dest_bucket_info,
const rgw_obj_key& _key,
const std::optional<rgw_obj_key>& _dest_key,
std::optional<uint64_t> _versioned_epoch,
: RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
source_zone(_source_zone),
- bucket_info(_bucket_info),
+ src_bucket(_src_bucket),
dest_placement_rule(_dest_placement_rule),
+ dest_bucket_info(_dest_bucket_info),
key(_key),
dest_key(_dest_key),
versioned_epoch(_versioned_epoch),
int send_request() override {
req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
- source_zone, bucket_info, dest_placement_rule,
+ source_zone, src_bucket, dest_placement_rule, dest_bucket_info,
key, dest_key, versioned_epoch, copy_if_newer,
zones_trace, counters, dpp);
async_rados->queue(req);
rgw::sal::RGWRadosStore *store;
string source_zone;
- RGWBucketInfo bucket_info;
-
+ rgw_bucket src_bucket;
rgw_obj_key key;
ceph::real_time *pmtime;
public:
RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
const string& _source_zone,
- RGWBucketInfo& _bucket_info,
+ rgw_bucket& _src_bucket,
const rgw_obj_key& _key,
ceph::real_time *_pmtime,
uint64_t *_psize,
map<string, bufferlist> *_pattrs,
map<string, string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
- bucket_info(_bucket_info),
+ src_bucket(_src_bucket),
key(_key),
pmtime(_pmtime),
psize(_psize),
rgw::sal::RGWRadosStore *store;
string source_zone;
- RGWBucketInfo bucket_info;
-
+ rgw_bucket src_bucket;
rgw_obj_key key;
ceph::real_time *pmtime;
public:
RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
const string& _source_zone,
- RGWBucketInfo& _bucket_info,
+ rgw_bucket& _src_bucket,
const rgw_obj_key& _key,
ceph::real_time *_pmtime,
uint64_t *_psize,
map<string, string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
source_zone(_source_zone),
- bucket_info(_bucket_info),
+ src_bucket(_src_bucket),
key(_key),
pmtime(_pmtime),
psize(_psize),
int send_request() override {
req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
- bucket_info, key, pmtime, psize, petag, pattrs, pheaders);
+ src_bucket, key, pmtime, psize, petag, pattrs, pheaders);
async_rados->queue(req);
return 0;
}
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- rgw_bucket_shard bs;
- RGWBucketInfo bucket_info;
+ rgw_bucket_sync_pipe sync_pipe;
rgw_bucket_shard_sync_info sync_status;
RGWMetaSyncEnv meta_sync_env;
public:
RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
- status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, sync_pipe)),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
SSTR(bucket_shard_str{bs}))) {
+ sync_pipe.source_bs = bs;
}
~RGWRunBucketSyncCoroutine() override {
if (lease_cr) {
public:
RGWDefaultDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};
return 0;
}
-RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
- return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
- std::nullopt,
+ return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, sync_pipe.source_bs.bucket,
+ std::nullopt, sync_pipe.dest_bucket_info,
key, std::nullopt, versioned_epoch,
true, zones_trace, sync_env->counters, sync_env->dpp);
}
-RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
- bucket_info, key, versioned, versioned_epoch,
+ sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
NULL, NULL, false, &mtime, zones_trace);
}
-RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
- bucket_info, key, versioned, versioned_epoch,
+ sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
&owner.id, &owner.display_name, true, &mtime, zones_trace);
}
public:
RGWArchiveDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};
return 0;
}
-RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
+RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
- ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
- if (!bucket_info.versioned() ||
- (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
+ ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ if (!sync_pipe.dest_bucket_info.versioned() ||
+ (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
- bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
- int op_ret = sync_env->store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
+ sync_pipe.dest_bucket_info.flags = (sync_pipe.dest_bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
+ int op_ret = sync_env->store->getRados()->put_bucket_instance_info(sync_pipe.dest_bucket_info, false, real_time(), NULL);
if (op_ret < 0) {
ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
return NULL;
}
return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
- bucket_info, std::nullopt,
+ sync_pipe.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
key, dest_key, versioned_epoch,
true, zones_trace, nullptr, sync_env->dpp);
}
-RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
- ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
+ ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
-RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
- ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
- bucket_info, key, versioned, versioned_epoch,
+ sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
&owner.id, &owner.display_name, true, &mtime, zones_trace);
}
class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- rgw_bucket_shard bs;
+ const rgw_bucket_sync_pipe& sync_pipe;
const string sync_status_oid;
rgw_bucket_shard_sync_info& status;
rgw_bucket_index_marker_info info;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
- const rgw_bucket_shard& bs,
+ const rgw_bucket_sync_pipe& _sync_pipe,
rgw_bucket_shard_sync_info& _status)
- : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
- sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ sync_pipe(_sync_pipe),
+ sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _sync_pipe)),
status(_status)
{}
int operate() override {
reenter(this) {
/* fetch current position in logs */
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, sync_pipe.source_bs, &info));
if (retcode < 0 && retcode != -ENOENT) {
ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
return set_cr_error(retcode);
RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
- return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
+#warning FIXME
+ rgw_bucket_sync_pipe sync_pipe;
+ sync_pipe.source_bs = bs;
+ return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, sync_pipe, init_status);
}
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
map<string, bufferlist> attrs;
public:
RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
- const rgw_bucket_shard& bs,
+ const rgw_bucket_sync_pipe& sync_pipe,
rgw_bucket_shard_sync_info *_status)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+ oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, sync_pipe)),
status(_status) {}
int operate() override;
};
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
- return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
+#warning FIXME
+ rgw_bucket_sync_pipe sync_pipe;
+ sync_pipe.source_bs = bs;
+ return new RGWReadBucketSyncStatusCoroutine(&sync_env, sync_pipe, sync_status);
}
RGWBucketSyncStatusManager::RGWBucketSyncStatusManager(rgw::sal::RGWRadosStore *_store, const string& _source_zone,
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- RGWBucketInfo *bucket_info;
- const rgw_bucket_shard& bs;
+ rgw_bucket_sync_pipe& sync_pipe;
+ rgw_bucket_shard& bs;
rgw_obj_key key;
bool versioned;
RGWSyncTraceNodeRef tn;
public:
RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo *_bucket_info,
- const rgw_bucket_shard& bs,
+ rgw_bucket_sync_pipe& _sync_pipe,
const rgw_obj_key& _key, bool _versioned,
std::optional<uint64_t> _versioned_epoch,
real_time& _timestamp,
const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket_info(_bucket_info), bs(bs),
+ sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
owner(_owner),
timestamp(_timestamp), op(_op),
} else if (op == CLS_RGW_OP_ADD ||
op == CLS_RGW_OP_LINK_OLH) {
set_status("syncing obj");
- tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
- call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
+ tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
+ call(data_sync_module->sync_object(sync_env, sync_pipe, key, versioned_epoch, &zones_trace));
} else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
set_status("removing obj");
if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
versioned = true;
}
- tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
- call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
+ tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
+ call(data_sync_module->remove_object(sync_env, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
// our copy of the object is more recent, continue as if it succeeded
if (retcode == -ERR_PRECONDITION_FAILED) {
retcode = 0;
}
} else if (op == CLS_RGW_OP_LINK_OLH_DM) {
set_status("creating delete marker");
- tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
- call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
+ tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
+ call(data_sync_module->create_delete_marker(sync_env, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
}
- tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
+ tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key));
}
} while (marker_tracker->need_retry(key));
{
class RGWBucketShardFullSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- const rgw_bucket_shard& bs;
- RGWBucketInfo *bucket_info;
+ rgw_bucket_sync_pipe& sync_pipe;
+ rgw_bucket_shard& bs;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
bucket_list_result list_result;
list<bucket_list_entry>::iterator entries_iter;
RGWSyncTraceNodeRef tn;
public:
- RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
- RGWBucketInfo *_bucket_info,
+ RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env,
+ rgw_bucket_sync_pipe& _sync_pipe,
const std::string& status_oid,
RGWContinuousLeaseCR *lease_cr,
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef tn_parent)
- : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
- bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+ lease_cr(lease_cr), sync_info(sync_info),
marker_tracker(sync_env, status_oid, sync_info.full_marker),
status_oid(status_oid),
tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
} else {
using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
- yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
+ yield spawn(new SyncCR(sync_env, sync_pipe, entry->key,
false, /* versioned, only matters for object removal */
entry->versioned_epoch, entry->mtime,
entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- const rgw_bucket_shard& bs;
- RGWBucketInfo *bucket_info;
+ rgw_bucket_sync_pipe& sync_pipe;
+ rgw_bucket_shard& bs;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
list<rgw_bi_log_entry> list_result;
list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
RGWSyncTraceNodeRef tn;
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
- const rgw_bucket_shard& bs,
- RGWBucketInfo *_bucket_info,
+ rgw_bucket_sync_pipe& _sync_pipe,
const std::string& status_oid,
RGWContinuousLeaseCR *lease_cr,
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
- bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
+ lease_cr(lease_cr), sync_info(sync_info),
marker_tracker(sync_env, status_oid, sync_info.inc_marker),
status_oid(status_oid), zone_id(_sync_env->store->svc()->zone->get_zone().id),
tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
}
tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
- spawn(new SyncCR(sync_env, bucket_info, bs, key,
+ spawn(new SyncCR(sync_env, sync_pipe, key,
entry->is_versioned(), versioned_epoch,
entry->timestamp, owner, entry->op, entry->state,
cur_id, &marker_tracker, entry->zones_trace, tn),
}
tn->log(10, "took lease");
- yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
+ yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, sync_pipe, &sync_status));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
if (retcode == -ENOENT) {
/* bucket instance info has not been synced in yet, fetch it now */
yield {
tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
- string raw_key = string("bucket.instance:") + bs.bucket.get_key();
+ string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key();
meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc()->zone->get_master_conn(), sync_env->async_rados,
sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
tn));
}
if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
+ tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{sync_pipe.source_bs.bucket}));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, sync_pipe.source_bs.bucket, &sync_pipe.dest_bucket_info));
}
if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
+ tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pipe.source_bs.bucket}));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
do {
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
- yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
+ yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, sync_pipe, sync_status));
if (retcode == -ENOENT) {
tn->log(0, "bucket sync disabled");
lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
}
if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
- yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
+ yield call(new RGWBucketShardFullSyncCR(sync_env, sync_pipe,
status_oid, lease_cr.get(),
sync_status, tn));
if (retcode < 0) {
}
if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
- yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
+ yield call(new RGWBucketShardIncrementalSyncCR(sync_env, sync_pipe,
status_oid, lease_cr.get(),
sync_status, tn));
if (retcode < 0) {
}
string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
- const rgw_bucket_shard& bs)
+ const rgw_bucket_sync_pipe& sync_pipe)
{
- return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
+ return bucket_status_oid_prefix + "." + source_zone + ":" + sync_pipe.source_bs.get_key();
}
string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
const rgw_obj& obj)
{
+#warning FIXME
return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
obj.key.name + ":" + obj.key.instance;
}
RGWDataSyncEnv *const env;
const int num_shards;
rgw_bucket_shard bs;
+#warning change this
+ rgw_bucket_sync_pipe sync_pipe;
using Vector = std::vector<rgw_bucket_shard_sync_info>;
Vector::iterator i, end;
if (i == end) {
return false;
}
- spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
+ sync_pipe.source_bs = bs;
+ spawn(new RGWReadBucketSyncStatusCoroutine(env, sync_pipe, &*i), false);
++i;
++bs.shard_id;
return true;
#include "rgw_sync_module.h"
#include "rgw_sync_trace.h"
+struct rgw_bucket_sync_pipe {
+ rgw_bucket_shard source_bs;
+ RGWBucketInfo dest_bucket_info;
+ string source_prefix;
+ string dest_prefix;
+};
+
+inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
+ if (p.source_bs.bucket == p.dest_bucket_info.bucket &&
+ p.source_prefix == p.dest_prefix) {
+ return out << p.source_bs;
+ }
+
+ out << p.source_bs;
+
+ if (!p.source_prefix.empty()) {
+ out << "/" << p.source_prefix;
+ }
+
+ out << " -> " << p.dest_bucket_info.bucket;
+
+ if (!p.dest_prefix.empty()) {
+ out << "/" << p.dest_prefix;
+ }
+
+ return out;
+}
+
struct rgw_datalog_info {
uint32_t num_shards;
map<int, rgw_bucket_shard_sync_info>& get_sync_status() { return sync_status; }
int init_sync_status();
- static string status_oid(const string& source_zone, const rgw_bucket_shard& bs);
+ static string status_oid(const string& source_zone, const rgw_bucket_sync_pipe& bs);
static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */
// implements DoutPrefixProvider
req_info *info,
const string& source_zone,
rgw_obj& src_obj,
- RGWBucketInfo& src_bucket_info,
+ const RGWBucketInfo *src_bucket_info,
real_time *src_mtime,
uint64_t *psize,
const real_time *mod_ptr,
RGWRESTConn *conn;
if (source_zone.empty()) {
- if (src_bucket_info.zonegroup.empty()) {
+ if (!src_bucket_info || src_bucket_info->zonegroup.empty()) {
/* source is in the master zonegroup */
conn = svc.zone->get_master_conn();
} else {
auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map();
- map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info.zonegroup);
+ map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info->zonegroup);
if (iter == zonegroup_conn_map.end()) {
ldout(cct, 0) << "could not find zonegroup connection to zonegroup: " << source_zone << dendl;
return -ENOENT;
const string& source_zone,
const rgw_obj& dest_obj,
const rgw_obj& src_obj,
- RGWBucketInfo& dest_bucket_info,
- RGWBucketInfo& src_bucket_info,
+ const RGWBucketInfo& dest_bucket_info,
+ const RGWBucketInfo *src_bucket_info,
std::optional<rgw_placement_rule> dest_placement_rule,
real_time *src_mtime,
real_time *mtime,
auto& zone_conn_map = svc.zone->get_zone_conn_map();
auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map();
if (source_zone.empty()) {
- if (dest_bucket_info.zonegroup.empty()) {
+ if (!src_bucket_info || src_bucket_info->zonegroup.empty()) {
/* source is in the master zonegroup */
conn = svc.zone->get_master_conn();
} else {
- map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info.zonegroup);
+ map<string, RGWRESTConn *>::iterator iter = zonegroup_conn_map.find(src_bucket_info->zonegroup);
if (iter == zonegroup_conn_map.end()) {
ldout(cct, 0) << "could not find zonegroup connection to zonegroup: " << source_zone << dendl;
return -ENOENT;
if (remote_src || !source_zone.empty()) {
return fetch_remote_obj(obj_ctx, user_id, info, source_zone,
- dest_obj, src_obj, dest_bucket_info, src_bucket_info,
+ dest_obj, src_obj, dest_bucket_info, &src_bucket_info,
dest_placement, src_mtime, mtime, mod_ptr,
unmod_ptr, high_precision_time,
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
return 0;
}
-int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
+int RGWRados::set_olh(RGWObjectCtx& obj_ctx, const RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch, real_time unmod_since, bool high_precision_time,
optional_yield y, rgw_zone_set *zones_trace, bool log_data_change)
{
req_info *info,
const string& source_zone,
rgw_obj& src_obj,
- RGWBucketInfo& src_bucket_info,
+ const RGWBucketInfo *src_bucket_info,
real_time *src_mtime,
uint64_t *psize,
const real_time *mod_ptr,
const string& source_zone,
const rgw_obj& dest_obj,
const rgw_obj& src_obj,
- RGWBucketInfo& dest_bucket_info,
- RGWBucketInfo& src_bucket_info,
+ const RGWBucketInfo& dest_bucket_info,
+ const RGWBucketInfo *src_bucket_info,
std::optional<rgw_placement_rule> dest_placement,
ceph::real_time *src_mtime,
ceph::real_time *mtime,
bufferlist& obj_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr);
int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr);
- int set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
+ int set_olh(RGWObjectCtx& obj_ctx, const RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time,
optional_yield y, rgw_zone_set *zones_trace = nullptr, bool log_data_change = false);
int repair_olh(RGWObjState* state, const RGWBucketInfo& bucket_info,
}
RGWStatRemoteObjCBCR::RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
+ rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket_info(_bucket_info), key(_key) {
+ src_bucket(_src_bucket), key(_key) {
}
RGWCallStatRemoteObjCR::RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
+ rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket_info(_bucket_info), key(_key) {
+ src_bucket(_src_bucket), key(_key) {
}
int RGWCallStatRemoteObjCR::operate() {
yield {
call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store,
sync_env->source_zone,
- bucket_info, key, &mtime, &size, &etag, &attrs, &headers));
+ src_bucket, key, &mtime, &size, &etag, &attrs, &headers));
}
if (retcode < 0) {
ldout(sync_env->cct, 10) << "RGWStatRemoteObjCR() returned " << retcode << dendl;
return set_cr_error(retcode);
}
ldout(sync_env->cct, 20) << "stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key
+ << " b=" << src_bucket << " k=" << key
<< " size=" << size << " mtime=" << mtime << dendl;
yield {
RGWStatRemoteObjCBCR *cb = allocate_callback();
struct RGWDataSyncEnv;
struct rgw_bucket_entry_owner;
struct rgw_obj_key;
+struct rgw_bucket_sync_pipe;
class RGWDataSyncModule {
virtual RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) {
return nullptr;
}
- virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) = 0;
- virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ virtual RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) = 0;
+ virtual RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime,
bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
- virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ virtual RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& bucket_info, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) = 0;
};
protected:
RGWDataSyncEnv *sync_env;
- RGWBucketInfo bucket_info;
+ rgw_bucket src_bucket;
rgw_obj_key key;
ceph::real_time mtime;
map<string, string> headers;
public:
RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key);
+ rgw_bucket& _src_bucket, rgw_obj_key& _key);
~RGWStatRemoteObjCBCR() override {}
void set_result(ceph::real_time& _mtime,
protected:
RGWDataSyncEnv *sync_env;
- RGWBucketInfo bucket_info;
+ rgw_bucket src_bucket;
rgw_obj_key key;
public:
RGWCallStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key);
+ rgw_bucket& _src_bucket, rgw_obj_key& _key);
~RGWCallStatRemoteObjCR() override {}
// maybe use Fetch Remote Obj instead?
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
+ rgw_bucket_sync_pipe sync_pipe;
AWSSyncInstanceEnv& instance;
uint64_t versioned_epoch{0};
public:
RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info,
+ rgw_bucket_sync_pipe& _sync_pipe,
rgw_obj_key& _key,
AWSSyncInstanceEnv& _instance,
- uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+ uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe),
instance(_instance), versioned_epoch(_versioned_epoch)
{}
}
}
ldout(sync_env->cct, 4) << "AWS: download begin: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " size=" << size
+ << " b=" << src_bucket << " k=" << key << " size=" << size
<< " mtime=" << mtime << " etag=" << etag
<< " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
<< dendl;
return set_cr_error(-EINVAL);
}
- instance.get_profile(bucket_info.bucket, &target);
- instance.conf.get_target(target, bucket_info, key, &target_bucket_name, &target_obj_name);
+ instance.get_profile(sync_pipe.source_bs.bucket, &target);
+ instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name);
if (bucket_created.find(target_bucket_name) == bucket_created.end()){
yield {
}
yield {
- rgw_obj src_obj(bucket_info.bucket, key);
+ rgw_obj src_obj(src_bucket, key);
/* init output */
rgw_bucket target_bucket;
};
class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+ rgw_bucket_sync_pipe sync_pipe;
AWSSyncInstanceEnv& instance;
uint64_t versioned_epoch;
public:
RGWAWSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+ AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe),
instance(_instance), versioned_epoch(_versioned_epoch) {
}
~RGWAWSHandleRemoteObjCR() {}
RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWAWSHandleRemoteObjCBCR(sync_env, bucket_info, key, instance, versioned_epoch);
+ return new RGWAWSHandleRemoteObjCBCR(sync_env, sync_pipe, key, instance, versioned_epoch);
}
};
class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env{nullptr};
std::shared_ptr<AWSSyncConfig_Profile> target;
- RGWBucketInfo bucket_info;
+ rgw_bucket_sync_pipe sync_pipe;
rgw_obj_key key;
ceph::real_time mtime;
AWSSyncInstanceEnv& instance;
int ret{0};
public:
RGWAWSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- bucket_info(_bucket_info), key(_key),
+ sync_pipe(_sync_pipe), key(_key),
mtime(_mtime), instance(_instance) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+ << " b=" <<sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
- instance.get_profile(bucket_info.bucket, &target);
- string path = instance.conf.get_path(target, bucket_info, key);
+ instance.get_profile(sync_pipe.source_bs.bucket, &target);
+ string path = instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl;
call(new RGWDeleteRESTResourceCR(sync_env->cct, target->conn.get(),
~RGWAWSDataSyncModule() {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
std::optional<uint64_t> versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) << instance.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
- return new RGWAWSHandleRemoteObjCR(sync_env, bucket_info, key, instance, versioned_epoch.value_or(0));
+ ldout(sync_env->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ return new RGWAWSHandleRemoteObjCR(sync_env, sync_pipe, key, instance, versioned_epoch.value_or(0));
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) <<"rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWAWSRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, instance);
+ ldout(sync_env->cct, 0) <<"rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ return new RGWAWSRemoveRemoteObjCBCR(sync_env, sync_pipe, key, mtime, instance);
}
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sync_env->cct, 0) <<"AWS Not implemented: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
};
class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+ rgw_bucket_sync_pipe sync_pipe;
ElasticConfigRef conf;
uint64_t versioned_epoch;
public:
RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+ ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe), conf(_conf),
versioned_epoch(_versioned_epoch) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key
+ << " b=" << sync_pipe.source_bs.bucket << " k=" << key
<< " size=" << size << " mtime=" << mtime << dendl;
yield {
- string path = conf->get_obj_path(bucket_info, key);
- es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
+ string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
+ es_obj_metadata doc(sync_env->cct, conf, sync_pipe.dest_bucket_info, key, mtime, size, attrs, versioned_epoch);
call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
};
class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+ rgw_bucket_sync_pipe sync_pipe;
ElasticConfigRef conf;
uint64_t versioned_epoch;
public:
RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+ ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe),
conf(_conf), versioned_epoch(_versioned_epoch) {
}
~RGWElasticHandleRemoteObjCR() override {}
RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
+ return new RGWElasticHandleRemoteObjCBCR(sync_env, sync_pipe, key, conf, versioned_epoch);
}
};
class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- RGWBucketInfo bucket_info;
+ rgw_bucket_sync_pipe sync_pipe;
rgw_obj_key key;
ceph::real_time mtime;
ElasticConfigRef conf;
public:
RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- bucket_info(_bucket_info), key(_key),
+ sync_pipe(_sync_pipe), key(_key),
mtime(_mtime), conf(_conf) {}
int operate() override {
reenter(this) {
ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+ << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
- string path = conf->get_obj_path(bucket_info, key);
+ string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
return new RGWElasticGetESInfoCBCR(sync_env, conf);
}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
- if (!conf->should_handle_operation(bucket_info)) {
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
}
- return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0));
+ return new RGWElasticHandleRemoteObjCR(sync_env, sync_pipe, key, conf, versioned_epoch.value_or(0));
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
- ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- if (!conf->should_handle_operation(bucket_info)) {
+ ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
}
- return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
+ return new RGWElasticRemoveRemoteObjCBCR(sync_env, sync_pipe, key, mtime, conf);
}
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
return NULL;
class RGWLogStatRemoteObjCBCR : public RGWStatRemoteObjCBCR {
public:
RGWLogStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key) {}
+ rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWStatRemoteObjCBCR(_sync_env, _src_bucket, _key) {}
int operate() override {
ldout(sync_env->cct, 0) << "SYNC_LOG: stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+ << " b=" << src_bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
return set_cr_done();
}
class RGWLogStatRemoteObjCR : public RGWCallStatRemoteObjCR {
public:
RGWLogStatRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key) {
+ rgw_bucket& _src_bucket, rgw_obj_key& _key) : RGWCallStatRemoteObjCR(_sync_env, _src_bucket, _key) {
}
~RGWLogStatRemoteObjCR() override {}
RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWLogStatRemoteObjCBCR(sync_env, bucket_info, key);
+ return new RGWLogStatRemoteObjCBCR(sync_env, src_bucket, key);
}
};
public:
explicit RGWLogDataSyncModule(const string& _prefix) : prefix(_prefix) {}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
- return new RGWLogStatRemoteObjCR(sync_env, bucket_info, key);
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: sync_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ return new RGWLogStatRemoteObjCR(sync_env, sync_pipe.source_bs.bucket, key);
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: rm_object: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sync_env->cct, 0) << prefix << ": SYNC_LOG: create_delete_marker: b=" << sync_pipe.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
return NULL;
}
// coroutine invoked on remote object creation
class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
RGWDataSyncEnv *sync_env;
+ rgw_bucket_sync_pipe sync_pipe;
PSEnvRef env;
std::optional<uint64_t> versioned_epoch;
EventRef<rgw_pubsub_event> event;
TopicsRef topics;
public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
- TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+ TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
sync_env(_sync_env),
+ sync_pipe(_sync_pipe),
env(_env),
versioned_epoch(_versioned_epoch),
topics(_topics) {
int operate() override {
reenter(this) {
ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+ << " b=" << sync_pipe.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
<< " attrs=" << attrs << dendl;
{
std::vector<std::pair<std::string, std::string> > attrs;
// this is why both are created here, once we have information about the
// subscription, we will store/push only the relevant ones
make_event_ref(sync_env->cct,
- bucket_info.bucket, key,
+ sync_pipe.source_bs.bucket, key,
mtime, &attrs,
rgw::notify::ObjectCreated, &event);
make_s3_record_ref(sync_env->cct,
- bucket_info.bucket, bucket_info.owner, key,
+ sync_pipe.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
mtime, &attrs,
rgw::notify::ObjectCreated, &record);
}
- yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics));
+#warning should it be source owner?
+ yield call(new RGWPSHandleObjEventCR(sync_env, env, sync_pipe.dest_bucket_info.owner, event, record, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
};
class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+ rgw_bucket_sync_pipe sync_pipe;
PSEnvRef env;
std::optional<uint64_t> versioned_epoch;
TopicsRef topics;
public:
RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
- TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _sync_pipe.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe),
env(_env), versioned_epoch(_versioned_epoch),
topics(_topics) {
}
~RGWPSHandleRemoteObjCR() override {}
RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics);
+ return new RGWPSHandleRemoteObjCBCR(sync_env, sync_pipe, key, env, versioned_epoch, topics);
}
};
class RGWPSHandleObjCreateCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- RGWBucketInfo bucket_info;
+ rgw_bucket_sync_pipe sync_pipe;
rgw_obj_key key;
PSEnvRef env;
std::optional<uint64_t> versioned_epoch;
TopicsRef topics;
public:
RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket_info(_bucket_info),
+ sync_pipe(_sync_pipe),
key(_key),
env(_env),
versioned_epoch(_versioned_epoch) {
int operate() override {
reenter(this) {
- yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
- bucket_info.bucket, key,
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, sync_pipe.dest_bucket_info.owner,
+ sync_pipe.source_bs.bucket, key,
rgw::notify::ObjectCreated,
&topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
if (topics->empty()) {
- ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+ ldout(sync_env->cct, 20) << "no topics found for " << sync_pipe.source_bs.bucket << "/" << key << dendl;
return set_cr_done();
}
- yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics));
+ yield call(new RGWPSHandleRemoteObjCR(sync_env, sync_pipe, key, env, versioned_epoch, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
public:
RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
PSEnvRef _env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
env(_env),
- owner(_bucket_info.owner),
- bucket(_bucket_info.bucket),
+ owner(_sync_pipe.dest_bucket_info.owner),
+ bucket(_sync_pipe.dest_bucket_info.bucket),
key(_key),
mtime(_mtime), event_type(_event_type) {}
int operate() override {
return new RGWPSInitEnvCBCR(sync_env, env);
}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe,
rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket <<
+ ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe <<
" k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
- return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch);
+ return new RGWPSHandleObjCreateCR(sync_env, sync_pipe, key, env, versioned_epoch);
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe,
rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket <<
+ ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe <<
" k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDelete);
+ return new RGWPSGenericObjEventCBCR(sync_env, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete);
}
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, rgw_bucket_sync_pipe& sync_pipe,
rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket <<
+ ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe <<
" k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
+ return new RGWPSGenericObjEventCBCR(sync_env, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
}
PSConfigRef& get_conf() { return conf; }
return log->get_info(shard_id, info);
}
-int RGWSI_DataLog_RADOS::add_entry(const rgw_bucket& bucket, int shard_id)
+int RGWSI_DataLog_RADOS::add_entry(const RGWBucketInfo& bucket_info, int shard_id)
{
- return log->add_entry(bucket, shard_id);
+ return log->add_entry(bucket_info, shard_id);
}
int RGWSI_DataLog_RADOS::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
int get_info(int shard_id, RGWDataChangesLogInfo *info);
- int add_entry(const rgw_bucket& bucket, int shard_id);
+ int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
list<rgw_data_change_log_entry>& entries,
const string& marker,