From 4c52d1d09f7a93d6fcb520155c9985e561b3b667 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 25 Oct 2017 17:56:57 -0700 Subject: [PATCH] rgw: aws sync: check that source object doesn't change Make suret that while syncing the object it doesn't change, which can be a problem when uploading the object piecemeal. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_cr_rados.cc | 2 +- src/rgw/rgw_cr_rados.h | 8 ++- src/rgw/rgw_cr_rest.h | 3 + src/rgw/rgw_rados.cc | 8 +++ src/rgw/rgw_rest_conn.cc | 15 +++-- src/rgw/rgw_rest_conn.h | 3 + src/rgw/rgw_sync_module.cc | 4 +- src/rgw/rgw_sync_module.h | 4 ++ src/rgw/rgw_sync_module_aws.cc | 104 ++++++++++++++++++++++++++------- src/rgw/rgw_sync_module_aws.h | 32 +++++++++- 10 files changed, 151 insertions(+), 32 deletions(-) diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index eca59a7f6f7a2..6d21be1dee695 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -618,7 +618,7 @@ int RGWAsyncStatRemoteObj::_send_request() pattrs, nullptr, nullptr, /* string *ptag, */ - nullptr); /* string *petag, */ + petag); /* string *petag, */ if (r < 0) { ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl; diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 20db87508d6b0..80b96a3056a6c 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -830,6 +830,7 @@ class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest { ceph::real_time *pmtime; uint64_t *psize; + string *petag; map *pattrs; protected: @@ -841,12 +842,14 @@ public: const rgw_obj_key& _key, ceph::real_time *_pmtime, uint64_t *_psize, + string *_petag, map *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), key(_key), pmtime(_pmtime), psize(_psize), + petag(_petag), pattrs(_pattrs) {} }; @@ -862,6 +865,7 @@ class RGWStatRemoteObjCR : public RGWSimpleCoroutine { ceph::real_time *pmtime; uint64_t *psize; + string *petag; map *pattrs; RGWAsyncStatRemoteObj *req; @@ -873,6 +877,7 @@ public: const rgw_obj_key& _key, ceph::real_time *_pmtime, uint64_t *_psize, + string *_petag, map *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), source_zone(_source_zone), @@ -880,6 +885,7 @@ public: key(_key), pmtime(_pmtime), psize(_psize), + petag(_petag), pattrs(_pattrs), req(NULL) {} @@ -897,7 +903,7 @@ public: int send_request() override { req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone, - bucket_info, key, pmtime, psize, pattrs); + bucket_info, key, pmtime, psize, petag, pattrs); async_rados->queue(req); return 0; } diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index 72767ebbf5c8e..005ed10cbac77 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -377,6 +377,9 @@ protected: uint64_t size; } range; + ceph::real_time mtime; + string etag; + public: RGWStreamReadHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8fdb88fc530ff..43908c85f2ba4 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -7571,6 +7571,10 @@ struct obj_time_weight { if (l < r) { return true; } + if (!zone_short_id || !rhs.zone_short_id) { + /* don't compare zone ids, if one wasn't provided */ + return false; + } if (zone_short_id != rhs.zone_short_id) { return (zone_short_id < rhs.zone_short_id); } @@ -7588,6 +7592,10 @@ struct obj_time_weight { if (mtime < rhs.mtime) { return true; } + if (!zone_short_id || !rhs.zone_short_id) { + /* don't compare zone ids, if one wasn't provided */ + return false; + } if (zone_short_id != rhs.zone_short_id) { return (zone_short_id < rhs.zone_short_id); } diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index f35d119d76349..0bfce1af30355 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -169,14 +169,18 @@ int RGWRESTConn::complete_request(RGWRESTStreamS3PutObj *req, string& etag, real return ret; } -static void set_date_header(const real_time *t, map& headers, const string& header_name) +static void set_date_header(const real_time *t, map& headers, bool high_precision_time, const string& header_name) { if (!t) { return; } stringstream s; utime_t tm = utime_t(*t); - tm.gmtime_nsec(s); + if (high_precision_time) { + tm.gmtime_nsec(s); + } else { + tm.gmtime(s); + } headers[header_name] = s.str(); } @@ -256,8 +260,11 @@ int RGWRESTConn::get_obj(const rgw_obj& obj, const get_obj_params& in_params, bo } } - set_date_header(in_params.mod_ptr, extra_headers, "HTTP_IF_MODIFIED_SINCE"); - set_date_header(in_params.unmod_ptr, extra_headers, "HTTP_IF_UNMODIFIED_SINCE"); + set_date_header(in_params.mod_ptr, extra_headers, in_params.high_precision_time, "HTTP_IF_MODIFIED_SINCE"); + set_date_header(in_params.unmod_ptr, extra_headers, in_params.high_precision_time, "HTTP_IF_UNMODIFIED_SINCE"); + if (!in_params.etag.empty()) { + set_header(in_params.etag, extra_headers, "HTTP_IF_MATCH"); + } if (in_params.mod_zone_id != 0) { set_header(in_params.mod_zone_id, extra_headers, "HTTP_DEST_ZONE_SHORT_ID"); } diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index bdbdd4822db1a..aa91c65faeff3 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -105,6 +105,9 @@ public: req_info *info{nullptr}; const ceph::real_time *mod_ptr{nullptr}; const ceph::real_time *unmod_ptr{nullptr}; + bool high_precision_time{true}; + + string etag; uint32_t mod_zone_id{0}; uint64_t mod_pg_ver{0}; diff --git a/src/rgw/rgw_sync_module.cc b/src/rgw/rgw_sync_module.cc index 635cc3c519c4b..309772b22d317 100644 --- a/src/rgw/rgw_sync_module.cc +++ b/src/rgw/rgw_sync_module.cc @@ -29,7 +29,7 @@ int RGWCallStatRemoteObjCR::operate() { yield { call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, - bucket_info, key, &mtime, &size, &attrs)); + bucket_info, key, &mtime, &size, &etag, &attrs)); } if (retcode < 0) { ldout(sync_env->cct, 0) << "RGWStatRemoteObjCR() returned " << retcode << dendl; @@ -41,7 +41,7 @@ int RGWCallStatRemoteObjCR::operate() { yield { RGWStatRemoteObjCBCR *cb = allocate_callback(); if (cb) { - cb->set_result(mtime, size, std::move(attrs)); + cb->set_result(mtime, size, etag, std::move(attrs)); call(cb); } } diff --git a/src/rgw/rgw_sync_module.h b/src/rgw/rgw_sync_module.h index be3fdc3d420ec..9925d677eafd2 100644 --- a/src/rgw/rgw_sync_module.h +++ b/src/rgw/rgw_sync_module.h @@ -122,6 +122,7 @@ protected: ceph::real_time mtime; uint64_t size = 0; + string etag; map attrs; public: RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env, @@ -130,9 +131,11 @@ public: void set_result(ceph::real_time& _mtime, uint64_t _size, + const string& _etag, map&& _attrs) { mtime = _mtime; size = _size; + etag = _etag; attrs = std::move(_attrs); } }; @@ -140,6 +143,7 @@ public: class RGWCallStatRemoteObjCR : public RGWCoroutine { ceph::real_time mtime; uint64_t size{0}; + string etag; map attrs; protected: diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index f89364f777dfc..8ef9908760cc2 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -43,6 +43,7 @@ struct AWSConfig { std::unique_ptr conn; }; + class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF { RGWDataSyncEnv *sync_env; @@ -50,15 +51,17 @@ class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF rgw_obj src_obj; RGWRESTConn::get_obj_params req_params; - string etag; + rgw_sync_aws_src_obj_properties src_properties; public: RGWRESTStreamGetCRF(CephContext *_cct, RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWDataSyncEnv *_sync_env, RGWRESTConn *_conn, - rgw_obj& _src_obj) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager), - sync_env(_sync_env), conn(_conn), src_obj(_src_obj) { + rgw_obj& _src_obj, + const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager), + sync_env(_sync_env), conn(_conn), src_obj(_src_obj), + src_properties(_src_properties) { } int init() override { @@ -68,6 +71,11 @@ public: req_params.get_op = true; req_params.prepend_metadata = true; + req_params.unmod_ptr = &src_properties.mtime; + req_params.etag = src_properties.etag; + req_params.mod_zone_id = src_properties.zone_short_id; + req_params.mod_pg_ver = src_properties.pg_ver; + if (range.is_set) { req_params.range_is_set = true; req_params.range_start = range.ofs; @@ -91,8 +99,6 @@ public: const string& val = header.second; if (header.first == "RGWX_OBJECT_SIZE") { rest_obj.content_len = atoi(val.c_str()); - } else if (header.first == "ETAG") { - etag = val; } else { rest_obj.attrs[header.first] = val; } @@ -189,6 +195,8 @@ class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine { rgw_obj src_obj; rgw_obj dest_obj; + rgw_sync_aws_src_obj_properties src_properties; + std::shared_ptr in_crf; std::shared_ptr out_crf; @@ -196,18 +204,22 @@ public: RGWAWSStreamObjToCloudPlainCR(RGWDataSyncEnv *_sync_env, RGWRESTConn *_source_conn, const rgw_obj& _src_obj, + const rgw_sync_aws_src_obj_properties& _src_properties, RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), source_conn(_source_conn), dest_conn(_dest_conn), src_obj(_src_obj), - dest_obj(_dest_obj) {} + dest_obj(_dest_obj), + src_properties(_src_properties) {} int operate() { reenter(this) { /* init input */ - in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj)); + in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, + source_conn, src_obj, + src_properties)); /* init output */ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn, @@ -232,6 +244,8 @@ class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { rgw_obj src_obj; rgw_obj dest_obj; + rgw_sync_aws_src_obj_properties src_properties; + string upload_id; rgw_sync_aws_multipart_part_info part_info; @@ -247,6 +261,7 @@ public: const rgw_obj& _src_obj, RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, + const rgw_sync_aws_src_obj_properties& _src_properties, const string& _upload_id, const rgw_sync_aws_multipart_part_info& _part_info, string *_petag) : RGWCoroutine(_sync_env->cct), @@ -255,6 +270,7 @@ public: dest_conn(_dest_conn), src_obj(_src_obj), dest_obj(_dest_obj), + src_properties(_src_properties), upload_id(_upload_id), part_info(_part_info), petag(_petag) {} @@ -262,7 +278,9 @@ public: int operate() { reenter(this) { /* init input */ - in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj)); + in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, + source_conn, src_obj, + src_properties)); in_crf->set_range(part_info.ofs, part_info.size); @@ -334,7 +352,6 @@ class RGWAWSInitMultipartCR : public RGWCoroutine { rgw_obj dest_obj; uint64_t obj_size; - ceph::real_time mtime; bufferlist out_bl; @@ -357,13 +374,11 @@ public: RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, uint64_t _obj_size, - const ceph::real_time& _mtime, string *_upload_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), dest_conn(_dest_conn), dest_obj(_dest_obj), obj_size(_obj_size), - mtime(_mtime), upload_id(_upload_id) {} int operate() { @@ -575,7 +590,8 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine { rgw_obj dest_obj; uint64_t obj_size; - ceph::real_time mtime; + string src_etag; + rgw_sync_aws_src_obj_properties src_properties; rgw_sync_aws_multipart_upload_info status; @@ -592,14 +608,14 @@ public: RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, uint64_t _obj_size, - const ceph::real_time& _mtime) : RGWCoroutine(_sync_env->cct), + const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), source_conn(_source_conn), dest_conn(_dest_conn), src_obj(_src_obj), dest_obj(_dest_obj), obj_size(_obj_size), - mtime(_mtime), + src_properties(_src_properties), status_obj(sync_env->store->get_zone_params().log_pool, RGWBucketSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) { } @@ -618,20 +634,21 @@ public: if (retcode >= 0) { /* check here that mtime and size did not change */ - if (status.mtime != mtime || status.obj_size != obj_size) { + if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size || + status.src_properties.etag != src_properties.etag) { yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id)); retcode = -ENOENT; } } if (retcode == -ENOENT) { - yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, status.mtime, &status.upload_id)); + yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, &status.upload_id)); if (retcode < 0) { return set_cr_error(retcode); } status.obj_size = obj_size; - status.mtime = mtime; + status.src_properties = src_properties; #warning flexible part size needed status.part_size = 5 * 1024 * 1024; status.num_parts = (obj_size + status.part_size - 1) / status.part_size; @@ -652,6 +669,7 @@ public: call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env, source_conn, src_obj, dest_conn, dest_obj, + status.src_properties, status.upload_id, cur_part_info, &cur_part_info.etag)); @@ -692,6 +710,27 @@ public: return 0; } }; +template +int decode_attr(map& attrs, const char *attr_name, T *result, T def_val) +{ + map::iterator iter = attrs.find(attr_name); + if (iter == attrs.end()) { + *result = def_val; + return 0; + } + bufferlist& bl = iter->second; + if (bl.length() == 0) { + *result = def_val; + return 0; + } + bufferlist::iterator bliter = bl.begin(); + try { + decode(*result, bliter); + } catch (buffer::error& err) { + return -EIO; + } + return 0; +} // maybe use Fetch Remote Obj instead? class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { @@ -704,6 +743,9 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { string obj_path; int ret{0}; + uint32_t src_zone_short_id{0}; + uint64_t src_pg_ver{0}; + static constexpr uint32_t multipart_threshold = 8 * 1024 * 1024; public: @@ -719,10 +761,21 @@ public: int operate() override { reenter(this) { - + ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0); + if (ret < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl; + } else { + ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0); + if (ret < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl; + src_pg_ver = 0; /* all or nothing */ + } + } ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " size=" << size - << " mtime=" << mtime << " attrs=" << attrs + << " mtime=" << mtime << " etag=" << etag + << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver + << " attrs=" << attrs << dendl; source_conn = sync_env->store->get_zone_conn_by_id(sync_env->source_zone); @@ -758,11 +811,20 @@ public: uri resolution */ rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key)); + + rgw_sync_aws_src_obj_properties src_properties; + src_properties.mtime = mtime; + src_properties.etag = etag; + src_properties.zone_short_id = src_zone_short_id; + src_properties.pg_ver = src_pg_ver; + if (size < multipart_threshold) { - call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, conf.conn.get(), dest_obj)); + call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, + src_properties, + conf.conn.get(), dest_obj)); } else { call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, source_conn, src_obj, conf.conn.get(), - dest_obj, size, mtime)); + dest_obj, size, src_properties)); } } if (retcode < 0) { diff --git a/src/rgw/rgw_sync_module_aws.h b/src/rgw/rgw_sync_module_aws.h index 5180da5f7cdf6..14749d514b1e0 100644 --- a/src/rgw/rgw_sync_module_aws.h +++ b/src/rgw/rgw_sync_module_aws.h @@ -29,10 +29,36 @@ struct rgw_sync_aws_multipart_part_info { }; WRITE_CLASS_ENCODER(rgw_sync_aws_multipart_part_info) +struct rgw_sync_aws_src_obj_properties { + ceph::real_time mtime; + string etag; + uint32_t zone_short_id{0}; + uint64_t pg_ver{0}; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(mtime, bl); + encode(etag, bl); + encode(zone_short_id, bl); + encode(pg_ver, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + decode(mtime, bl); + decode(etag, bl); + decode(zone_short_id, bl); + decode(pg_ver, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_sync_aws_src_obj_properties) + struct rgw_sync_aws_multipart_upload_info { string upload_id; uint64_t obj_size; - ceph::real_time mtime; + rgw_sync_aws_src_obj_properties src_properties; uint32_t part_size{0}; uint32_t num_parts{0}; @@ -45,7 +71,7 @@ struct rgw_sync_aws_multipart_upload_info { ENCODE_START(1, 1, bl); encode(upload_id, bl); encode(obj_size, bl); - encode(mtime, bl); + encode(src_properties, bl); encode(part_size, bl); encode(num_parts, bl); encode(cur_part, bl); @@ -58,7 +84,7 @@ struct rgw_sync_aws_multipart_upload_info { DECODE_START(1, bl); decode(upload_id, bl); decode(obj_size, bl); - decode(mtime, bl); + decode(src_properties, bl); decode(part_size, bl); decode(num_parts, bl); decode(cur_part, bl); -- 2.39.5