std::unique_ptr<RGWRESTConn> conn;
};
+
class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
{
RGWDataSyncEnv *sync_env;
rgw_obj src_obj;
RGWRESTConn::get_obj_params req_params;
- string etag;
+ rgw_sync_aws_src_obj_properties src_properties;
public:
RGWRESTStreamGetCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
RGWDataSyncEnv *_sync_env,
RGWRESTConn *_conn,
- rgw_obj& _src_obj) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
- sync_env(_sync_env), conn(_conn), src_obj(_src_obj) {
+ rgw_obj& _src_obj,
+ const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
+ sync_env(_sync_env), conn(_conn), src_obj(_src_obj),
+ src_properties(_src_properties) {
}
int init() override {
req_params.get_op = true;
req_params.prepend_metadata = true;
+ req_params.unmod_ptr = &src_properties.mtime;
+ req_params.etag = src_properties.etag;
+ req_params.mod_zone_id = src_properties.zone_short_id;
+ req_params.mod_pg_ver = src_properties.pg_ver;
+
if (range.is_set) {
req_params.range_is_set = true;
req_params.range_start = range.ofs;
const string& val = header.second;
if (header.first == "RGWX_OBJECT_SIZE") {
rest_obj.content_len = atoi(val.c_str());
- } else if (header.first == "ETAG") {
- etag = val;
} else {
rest_obj.attrs[header.first] = val;
}
rgw_obj src_obj;
rgw_obj dest_obj;
+ rgw_sync_aws_src_obj_properties src_properties;
+
std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
RGWAWSStreamObjToCloudPlainCR(RGWDataSyncEnv *_sync_env,
RGWRESTConn *_source_conn,
const rgw_obj& _src_obj,
+ const rgw_sync_aws_src_obj_properties& _src_properties,
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
source_conn(_source_conn),
dest_conn(_dest_conn),
src_obj(_src_obj),
- dest_obj(_dest_obj) {}
+ dest_obj(_dest_obj),
+ src_properties(_src_properties) {}
int operate() {
reenter(this) {
/* init input */
- in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env,
+ source_conn, src_obj,
+ src_properties));
/* init output */
out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
rgw_obj src_obj;
rgw_obj dest_obj;
+ rgw_sync_aws_src_obj_properties src_properties;
+
string upload_id;
rgw_sync_aws_multipart_part_info part_info;
const rgw_obj& _src_obj,
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj,
+ const rgw_sync_aws_src_obj_properties& _src_properties,
const string& _upload_id,
const rgw_sync_aws_multipart_part_info& _part_info,
string *_petag) : RGWCoroutine(_sync_env->cct),
dest_conn(_dest_conn),
src_obj(_src_obj),
dest_obj(_dest_obj),
+ src_properties(_src_properties),
upload_id(_upload_id),
part_info(_part_info),
petag(_petag) {}
int operate() {
reenter(this) {
/* init input */
- in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env,
+ source_conn, src_obj,
+ src_properties));
in_crf->set_range(part_info.ofs, part_info.size);
rgw_obj dest_obj;
uint64_t obj_size;
- ceph::real_time mtime;
bufferlist out_bl;
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj,
uint64_t _obj_size,
- const ceph::real_time& _mtime,
string *_upload_id) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
dest_conn(_dest_conn),
dest_obj(_dest_obj),
obj_size(_obj_size),
- mtime(_mtime),
upload_id(_upload_id) {}
int operate() {
rgw_obj dest_obj;
uint64_t obj_size;
- ceph::real_time mtime;
+ string src_etag;
+ rgw_sync_aws_src_obj_properties src_properties;
rgw_sync_aws_multipart_upload_info status;
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj,
uint64_t _obj_size,
- const ceph::real_time& _mtime) : RGWCoroutine(_sync_env->cct),
+ const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
source_conn(_source_conn),
dest_conn(_dest_conn),
src_obj(_src_obj),
dest_obj(_dest_obj),
obj_size(_obj_size),
- mtime(_mtime),
+ src_properties(_src_properties),
status_obj(sync_env->store->get_zone_params().log_pool,
RGWBucketSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) {
}
if (retcode >= 0) {
/* check here that mtime and size did not change */
- if (status.mtime != mtime || status.obj_size != obj_size) {
+ if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
+ status.src_properties.etag != src_properties.etag) {
yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
retcode = -ENOENT;
}
}
if (retcode == -ENOENT) {
- yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, status.mtime, &status.upload_id));
+ yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, &status.upload_id));
if (retcode < 0) {
return set_cr_error(retcode);
}
status.obj_size = obj_size;
- status.mtime = mtime;
+ status.src_properties = src_properties;
#warning flexible part size needed
status.part_size = 5 * 1024 * 1024;
status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env,
source_conn, src_obj,
dest_conn, dest_obj,
+ status.src_properties,
status.upload_id,
cur_part_info,
&cur_part_info.etag));
return 0;
}
};
+template <class T>
+int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result, T def_val)
+{
+ map<string, bufferlist>::iterator iter = attrs.find(attr_name);
+ if (iter == attrs.end()) {
+ *result = def_val;
+ return 0;
+ }
+ bufferlist& bl = iter->second;
+ if (bl.length() == 0) {
+ *result = def_val;
+ return 0;
+ }
+ bufferlist::iterator bliter = bl.begin();
+ try {
+ decode(*result, bliter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ return 0;
+}
// maybe use Fetch Remote Obj instead?
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
string obj_path;
int ret{0};
+ uint32_t src_zone_short_id{0};
+ uint64_t src_pg_ver{0};
+
static constexpr uint32_t multipart_threshold = 8 * 1024 * 1024;
public:
int operate() override {
reenter(this) {
-
+ ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0);
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
+ } else {
+ ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0);
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl;
+ src_pg_ver = 0; /* all or nothing */
+ }
+ }
ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " size=" << size
- << " mtime=" << mtime << " attrs=" << attrs
+ << " mtime=" << mtime << " etag=" << etag
+ << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
+ << " attrs=" << attrs
<< dendl;
source_conn = sync_env->store->get_zone_conn_by_id(sync_env->source_zone);
uri resolution */
rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key));
+
+ rgw_sync_aws_src_obj_properties src_properties;
+ src_properties.mtime = mtime;
+ src_properties.etag = etag;
+ src_properties.zone_short_id = src_zone_short_id;
+ src_properties.pg_ver = src_pg_ver;
+
if (size < multipart_threshold) {
- call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, conf.conn.get(), dest_obj));
+ call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj,
+ src_properties,
+ conf.conn.get(), dest_obj));
} else {
call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, source_conn, src_obj, conf.conn.get(),
- dest_obj, size, mtime));
+ dest_obj, size, src_properties));
}
}
if (retcode < 0) {