class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
{
RGWDataSyncEnv *sync_env;
+ rgw_sync_aws_src_obj_properties src_properties;
std::shared_ptr<AWSSyncConfig_Profile> target;
rgw_obj dest_obj;
string etag;
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
RGWDataSyncEnv *_sync_env,
+ const rgw_sync_aws_src_obj_properties& _src_properties,
std::shared_ptr<AWSSyncConfig_Profile>& _target,
rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
- sync_env(_sync_env), target(_target), dest_obj(_dest_obj) {
+ sync_env(_sync_env), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) {
}
int init() {
new_attrs[header_str] = s;
}
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%llu", (long long)src_properties.versioned_epoch);
+ new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
+
r->set_send_length(rest_obj.content_len);
RGWAccessControlPolicy policy;
src_properties));
/* init output */
- out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, target,
- dest_obj));
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env,
+ src_properties, target, dest_obj));
yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
if (retcode < 0) {
in_crf->set_range(part_info.ofs, part_info.size);
/* init output */
- out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, target,
- dest_obj));
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env,
+ src_properties, target, dest_obj));
out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
AWSSyncInstanceEnv& instance;
+ uint64_t versioned_epoch{0};
+
RGWRESTConn *source_conn{nullptr};
std::shared_ptr<AWSSyncConfig_Profile> target;
bufferlist res;
RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info,
rgw_obj_key& _key,
- AWSSyncInstanceEnv& _instance) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
- instance(_instance)
+ AWSSyncInstanceEnv& _instance,
+ uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+ instance(_instance), versioned_epoch(_versioned_epoch)
{}
~RGWAWSHandleRemoteObjCBCR(){
src_properties.etag = etag;
src_properties.zone_short_id = src_zone_short_id;
src_properties.pg_ver = src_pg_ver;
+ src_properties.versioned_epoch = versioned_epoch;
if (size < instance.conf.s3.multipart_sync_threshold) {
call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj,
class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
AWSSyncInstanceEnv& instance;
+ uint64_t versioned_epoch;
public:
RGWAWSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- AWSSyncInstanceEnv& _instance) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
- instance(_instance) {
+ AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ instance(_instance), versioned_epoch(_versioned_epoch) {
}
~RGWAWSHandleRemoteObjCR() {}
RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWAWSHandleRemoteObjCBCR(sync_env, bucket_info, key, instance);
+ return new RGWAWSHandleRemoteObjCBCR(sync_env, bucket_info, key, instance, versioned_epoch);
}
};
RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 0) << instance.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
- return new RGWAWSHandleRemoteObjCR(sync_env, bucket_info, key, instance);
+ return new RGWAWSHandleRemoteObjCR(sync_env, bucket_info, key, instance, versioned_epoch);
}
RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
rgw_zone_set *zones_trace) override {