From deac2ef85f95893ba5fbba1ff824a103fd5ce2e8 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 24 Oct 2017 17:06:01 -0700 Subject: [PATCH] rgw: aws sync, store temp per-object sync info When doing a multipart object sync, we need to store the object's info so that we can either continue if the upload is interrupted, or abort the older upload_id in case the object changed. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 8 ++ src/rgw/rgw_data_sync.h | 1 + src/rgw/rgw_sync_module_aws.cc | 154 ++++++++++++++++++++++----------- src/rgw/rgw_sync_module_aws.h | 66 ++++++++++++++ 4 files changed, 180 insertions(+), 49 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 5a350da2a5f..807f1d81a00 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -35,6 +35,7 @@ static string datalog_sync_status_oid_prefix = "datalog.sync-status"; static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard"; static string datalog_sync_full_sync_index_prefix = "data.full-sync.index"; static string bucket_status_oid_prefix = "bucket.sync-status"; +static string object_status_oid_prefix = "bucket.sync-status"; class RGWSyncDebugLogger { CephContext *cct; @@ -3184,6 +3185,13 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone, return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key(); } +string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone, + const rgw_obj& obj) +{ + return object_status_oid_prefix + "."
+ source_zone + ":" + obj.bucket.get_key() + ":" + + obj.key.name + ":" + obj.key.instance; +} + class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { static constexpr int max_concurrent_shards = 16; RGWRados *const store; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 1c4a6a531b8..088486e13c4 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -531,6 +531,7 @@ public: int init_sync_status(); static string status_oid(const string& source_zone, const rgw_bucket_shard& bs); + static string obj_status_oid(const string& source_zone, const rgw_obj& obj); /* can be used by sync modules */ int read_sync_status(); int run(); diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index 9413566f102..f89364f777d 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -5,6 +5,7 @@ #include "rgw_sync_module.h" #include "rgw_data_sync.h" #include "rgw_sync_module_aws.h" +#include "rgw_cr_rados.h" #include "rgw_rest_conn.h" #include "rgw_cr_rest.h" #include "rgw_acl.h" @@ -224,13 +225,6 @@ public: } }; -struct multipart_part_info { - int part_num; - uint64_t ofs; - uint64_t size; - string etag; -}; - class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; RGWRESTConn *source_conn; @@ -240,7 +234,7 @@ class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { string upload_id; - multipart_part_info part_info; + rgw_sync_aws_multipart_part_info part_info; std::shared_ptr in_crf; std::shared_ptr out_crf; @@ -254,7 +248,7 @@ public: RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, const string& _upload_id, - const multipart_part_info& _part_info, + const rgw_sync_aws_multipart_part_info& _part_info, string *_petag) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), source_conn(_source_conn), @@ -434,9 +428,9 @@ class RGWAWSCompleteMultipartCR : public RGWCoroutine { string upload_id; struct CompleteMultipartReq { - map parts; + map 
parts; - CompleteMultipartReq(const map& _parts) : parts(_parts) {} + CompleteMultipartReq(const map& _parts) : parts(_parts) {} void dump_xml(Formatter *f) const { for (auto p : parts) { @@ -467,7 +461,7 @@ public: RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, string _upload_id, - const map& _parts) : RGWCoroutine(_sync_env->cct), + const map& _parts) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), dest_conn(_dest_conn), dest_obj(_dest_obj), @@ -533,6 +527,46 @@ public: } }; + +class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + RGWRESTConn *dest_conn; + const rgw_obj dest_obj; + const rgw_raw_obj status_obj; + + string upload_id; + +public: + + RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncEnv *_sync_env, + RGWRESTConn *_dest_conn, + const rgw_obj& _dest_obj, + const rgw_raw_obj& _status_obj, + const string& _upload_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + dest_conn(_dest_conn), + dest_obj(_dest_obj), + status_obj(_status_obj), + upload_id(_upload_id) {} + + int operate() { + reenter(this) { + yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl; + /* ignore error, best effort */ + } + yield call(new RGWRadosRemoveCR(sync_env->store, status_obj)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl; + /* ignore error, best effort */ + } + return set_cr_done(); + } + + return 0; + } +}; + class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; RGWRESTConn *source_conn; @@ -543,20 +577,14 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine { uint64_t obj_size; ceph::real_time mtime; - string upload_id; - - uint32_t part_size; - int num_parts; - - int 
cur_part{0}; - uint64_t cur_ofs{0}; + rgw_sync_aws_multipart_upload_info status; - map parts; - - multipart_part_info *pcur_part_info{nullptr}; + rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr}; int ret_err{0}; + rgw_raw_obj status_obj; + public: RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env, RGWRESTConn *_source_conn, @@ -571,65 +599,93 @@ public: src_obj(_src_obj), dest_obj(_dest_obj), obj_size(_obj_size), - mtime(_mtime) { -#warning flexible part size needed - part_size = 5 * 1024 * 1024; - - num_parts = (obj_size + part_size - 1) / part_size; + mtime(_mtime), + status_obj(sync_env->store->get_zone_params().log_pool, + RGWBucketSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) { } int operate() { reenter(this) { - yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, obj_size, mtime, &upload_id)); - if (retcode < 0) { - return set_cr_error(retcode); + yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, sync_env->store, + status_obj, &status, false)); + + if (retcode < 0 && retcode != -ENOENT) { + ldout(sync_env->cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl; + return retcode; + } + + if (retcode >= 0) { + /* check here that mtime and size did not change */ + + if (status.mtime != mtime || status.obj_size != obj_size) { + yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id)); + retcode = -ENOENT; + } } - for (cur_part = 1; cur_part <= num_parts; ++cur_part) { + if (retcode == -ENOENT) { + yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, status.mtime, &status.upload_id)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + status.obj_size = obj_size; + status.mtime = mtime; +#warning flexible part size needed + status.part_size = 5 * 1024 * 1024; + status.num_parts = (obj_size + status.part_size - 1) / status.part_size; + status.cur_part = 1; + 
} + + for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) { yield { - multipart_part_info& cur_part_info = parts[cur_part]; - cur_part_info.part_num = cur_part; - cur_part_info.ofs = cur_ofs; - cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs); + rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part]; + cur_part_info.part_num = status.cur_part; + cur_part_info.ofs = status.cur_ofs; + cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs); pcur_part_info = &cur_part_info; - cur_ofs += part_size; + status.cur_ofs += status.part_size; call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env, source_conn, src_obj, dest_conn, dest_obj, - upload_id, + status.upload_id, cur_part_info, &cur_part_info.etag)); } if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; - yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; - } + yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } - ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << cur_part << " etag=" << pcur_part_info->etag << dendl; - + yield call(new RGWSimpleRadosWriteCR(sync_env->async_rados, 
sync_env->store, status_obj, status)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl; + /* continue with upload anyway */ + } + ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl; } - yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, upload_id, parts)); + yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, status.upload_id, status.parts)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; - yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id)); - if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; - } + yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } + /* remove status obj */ + yield call(new RGWRadosRemoveCR(sync_env->store, status_obj)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl; + /* ignore error, best effort */ + } return set_cr_done(); } diff --git a/src/rgw/rgw_sync_module_aws.h b/src/rgw/rgw_sync_module_aws.h index 1f80eeaf196..5180da5f7cd 100644 --- a/src/rgw/rgw_sync_module_aws.h +++ b/src/rgw/rgw_sync_module_aws.h @@ -3,6 +3,72 @@ #include "rgw_sync_module.h" +struct rgw_sync_aws_multipart_part_info { + int part_num{0}; + uint64_t ofs{0}; + uint64_t size{0}; + string etag; + + void encode(bufferlist& bl) 
const { + ENCODE_START(1, 1, bl); + encode(part_num, bl); + encode(ofs, bl); + encode(size, bl); + encode(etag, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + decode(part_num, bl); + decode(ofs, bl); + decode(size, bl); + decode(etag, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_sync_aws_multipart_part_info) + +struct rgw_sync_aws_multipart_upload_info { + string upload_id; + uint64_t obj_size; + ceph::real_time mtime; + uint32_t part_size{0}; + uint32_t num_parts{0}; + + int cur_part{0}; + uint64_t cur_ofs{0}; + + std::map parts; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(upload_id, bl); + encode(obj_size, bl); + encode(mtime, bl); + encode(part_size, bl); + encode(num_parts, bl); + encode(cur_part, bl); + encode(cur_ofs, bl); + encode(parts, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + decode(upload_id, bl); + decode(obj_size, bl); + decode(mtime, bl); + decode(part_size, bl); + decode(num_parts, bl); + decode(cur_part, bl); + decode(cur_ofs, bl); + decode(parts, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(rgw_sync_aws_multipart_upload_info) + class RGWAWSSyncModule : public RGWSyncModule { public: RGWAWSSyncModule() {} -- 2.39.5