From: Yehuda Sadeh Date: Wed, 18 Oct 2017 21:44:03 +0000 (-0700) Subject: rgw: aws sync, more work on large object sync via multipart upload X-Git-Tag: v13.1.0~270^2~76 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=77ac03608adb71d4fbced0df75f2055533b97877;p=ceph.git rgw: aws sync, more work on large object sync via multipart upload Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index a0fc93c1acfc..ee2b944bb97e 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -371,6 +371,12 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { protected: rgw_rest_obj rest_obj; + struct range_info { + bool is_set{false}; + uint64_t ofs; + uint64_t size; + } range; + public: RGWStreamReadHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, @@ -396,6 +402,12 @@ public: rgw_rest_obj& get_rest_obj() { return rest_obj; } + + void set_range(uint64_t ofs, uint64_t size) { + range.is_set = true; + range.ofs = ofs; + range.size = size; + } }; class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF { @@ -406,6 +418,13 @@ protected: RGWHTTPStreamRWRequest *req{nullptr}; + struct multipart_info { + bool is_multipart{false}; + string upload_id; + int part_num{0}; + uint64_t part_size; + } multipart; + public: RGWStreamWriteHTTPResourceCRF(CephContext *_cct, RGWCoroutinesEnv *_env, @@ -426,6 +445,13 @@ public: void set_req(RGWHTTPStreamRWRequest *r) { req = r; } + + void set_multipart(const string& upload_id, int part_num, uint64_t part_size) { + multipart.is_multipart = true; + multipart.upload_id = upload_id; + multipart.part_num = part_num; + multipart.part_size = part_size; + } }; class RGWStreamSpliceCR : public RGWCoroutine { diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 749fe4371957..0723baf853fe 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -549,7 +549,7 @@ static void send_prepare_convert(const 
rgw_obj& obj, string *resource) *resource = urlsafe_bucket + "/" + urlsafe_object; } -int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr) +int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr) { string resource; send_prepare_convert(obj, &resource); @@ -557,7 +557,7 @@ int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map& return send_request(&key, extra_headers, resource, mgr); } -int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map& extra_headers, rgw_obj& obj) +int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map& extra_headers, const rgw_obj& obj) { string resource; send_prepare_convert(obj, &resource); diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index e990715c84e4..5ab40a3cc91c 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -118,13 +118,15 @@ public: virtual ~RGWRESTStreamRWRequest() override {} int send_prepare(RGWAccessKey *key, map& extra_headers, const string& resource, bufferlist *send_data = nullptr /* optional input data */); - int send_prepare(RGWAccessKey& key, map& extra_headers, rgw_obj& obj); + int send_prepare(RGWAccessKey& key, map& extra_headers, const rgw_obj& obj); int send(RGWHTTPManager *mgr); - int send_request(RGWAccessKey& key, map& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr); + int send_request(RGWAccessKey& key, map& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr); int send_request(RGWAccessKey *key, map& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */); int complete_request(string& etag, real_time *mtime, uint64_t *psize, map& attrs); + + void add_params(param_vec_t *params); }; class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest { diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 70ee7f0bccf0..f35d119d7634 100644 
--- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -118,7 +118,7 @@ public: explicit StreamObjData(rgw_obj& _obj) : obj(_obj) {} }; -int RGWRESTConn::put_obj_send_init(rgw_obj& obj, RGWRESTStreamS3PutObj **req) +int RGWRESTConn::put_obj_send_init(rgw_obj& obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req) { string url; int ret = get_url(url); @@ -128,6 +128,11 @@ int RGWRESTConn::put_obj_send_init(rgw_obj& obj, RGWRESTStreamS3PutObj **req) rgw_user uid; param_vec_t params; populate_params(params, &uid, self_zone_group); + + if (extra_params) { + append_param_list(params, extra_params); + } + RGWRESTStreamS3PutObj *wr = new RGWRESTStreamS3PutObj(cct, "PUT", url, NULL, &params); wr->send_init(obj); *req = wr; @@ -184,12 +189,28 @@ static void set_header(T val, map& headers, const string& header } -int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj, +int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, const rgw_obj& obj, const real_time *mod_ptr, const real_time *unmod_ptr, uint32_t mod_zone_id, uint64_t mod_pg_ver, bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest, bool skip_decrypt, bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req) +{ + get_obj_params params; + params.uid = uid; + params.info = info; + params.mod_ptr = mod_ptr; + params.mod_pg_ver = mod_pg_ver; + params.prepend_metadata = prepend_metadata; + params.get_op = get_op; + params.rgwx_stat = rgwx_stat; + params.sync_manifest = sync_manifest; + params.skip_decrypt = skip_decrypt; + params.cb = cb; + return get_obj(obj, params, send, req); +} + +int RGWRESTConn::get_obj(const rgw_obj& obj, const get_obj_params& in_params, bool send, RGWRESTStreamRWRequest **req) { string url; int ret = get_url(url); @@ -197,31 +218,31 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw return ret; param_vec_t params; - populate_params(params, &uid, self_zone_group); 
- if (prepend_metadata) { + populate_params(params, &in_params.uid, self_zone_group); + if (in_params.prepend_metadata) { params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", "true")); } - if (rgwx_stat) { + if (in_params.rgwx_stat) { params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "stat", "true")); } - if (sync_manifest) { + if (in_params.sync_manifest) { params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "sync-manifest", "")); } - if (skip_decrypt) { + if (in_params.skip_decrypt) { params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "skip-decrypt", "")); } if (!obj.key.instance.empty()) { const string& instance = obj.key.instance; params.push_back(param_pair_t("versionId", instance)); } - if (get_op) { - *req = new RGWRESTStreamReadRequest(cct, url, cb, NULL, &params); + if (in_params.get_op) { + *req = new RGWRESTStreamReadRequest(cct, url, in_params.cb, NULL, &params); } else { - *req = new RGWRESTStreamHeadRequest(cct, url, cb, NULL, &params); + *req = new RGWRESTStreamHeadRequest(cct, url, in_params.cb, NULL, &params); } map extra_headers; - if (info) { - const auto& orig_map = info->env->get_map(); + if (in_params.info) { + const auto& orig_map = in_params.info->env->get_map(); /* add original headers that start with HTTP_X_AMZ_ */ static constexpr char SEARCH_AMZ_PREFIX[] = "HTTP_X_AMZ_"; @@ -235,13 +256,18 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw } } - set_date_header(mod_ptr, extra_headers, "HTTP_IF_MODIFIED_SINCE"); - set_date_header(unmod_ptr, extra_headers, "HTTP_IF_UNMODIFIED_SINCE"); - if (mod_zone_id != 0) { - set_header(mod_zone_id, extra_headers, "HTTP_DEST_ZONE_SHORT_ID"); + set_date_header(in_params.mod_ptr, extra_headers, "HTTP_IF_MODIFIED_SINCE"); + set_date_header(in_params.unmod_ptr, extra_headers, "HTTP_IF_UNMODIFIED_SINCE"); + if (in_params.mod_zone_id != 0) { + set_header(in_params.mod_zone_id, extra_headers, "HTTP_DEST_ZONE_SHORT_ID"); + } + if (in_params.mod_pg_ver != 0) { + 
set_header(in_params.mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER"); } - if (mod_pg_ver != 0) { - set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER"); + if (in_params.range_is_set) { + char buf[64]; + snprintf(buf, sizeof(buf), "bytes=%lld-%lld", (long long)in_params.range_start, (long long)in_params.range_end); + set_header(buf, extra_headers, "RANGE"); } int r = (*req)->send_prepare(key, extra_headers, obj); diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 215f007424de..bdbdd4822db1 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -36,16 +36,22 @@ struct rgw_http_param_pair { const char *val; }; -// copy a null-terminated rgw_http_param_pair list into a list of string pairs -inline param_vec_t make_param_list(const rgw_http_param_pair* pp) +// append a null-terminated rgw_http_param_pair list into a list of string pairs +inline void append_param_list(param_vec_t& params, const rgw_http_param_pair* pp) { - param_vec_t params; while (pp && pp->key) { string k = pp->key; string v = (pp->val ? 
pp->val : ""); params.emplace_back(make_pair(std::move(k), std::move(v))); ++pp; } +} + +// copy a null-terminated rgw_http_param_pair list into a list of string pairs +inline param_vec_t make_param_list(const rgw_http_param_pair* pp) +{ + param_vec_t params; + append_param_list(params, pp); return params; } @@ -89,12 +95,36 @@ public: /* async requests */ - int put_obj_send_init(rgw_obj& obj, RGWRESTStreamS3PutObj **req); + int put_obj_send_init(rgw_obj& obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req); int put_obj_async(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size, map& attrs, bool send, RGWRESTStreamS3PutObj **req); int complete_request(RGWRESTStreamS3PutObj *req, string& etag, ceph::real_time *mtime); - int get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj, + struct get_obj_params { + rgw_user uid; + req_info *info{nullptr}; + const ceph::real_time *mod_ptr{nullptr}; + const ceph::real_time *unmod_ptr{nullptr}; + + uint32_t mod_zone_id{0}; + uint64_t mod_pg_ver{0}; + + bool prepend_metadata{false}; + bool get_op{false}; + bool rgwx_stat{false}; + bool sync_manifest{false}; + + bool skip_decrypt{true}; + RGWGetDataCB *cb{nullptr}; + + bool range_is_set{false}; + uint64_t range_start{0}; + uint64_t range_end{0}; + }; + + int get_obj(const rgw_obj& obj, const get_obj_params& params, bool send, RGWRESTStreamRWRequest **req); + + int get_obj(const rgw_user& uid, req_info *info /* optional */, const rgw_obj& obj, const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr, uint32_t mod_zone_id, uint64_t mod_pg_ver, bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest, diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index 113eb72e7aa4..571947d99f55 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -30,6 +30,11 @@ static string aws_object_name(const RGWBucketInfo& bucket_info, const rgw_obj_ke return object_name; } +static 
string obj_to_aws_path(const rgw_obj& obj) +{ + return obj.bucket.name + "/" + obj.key.name; +} + struct AWSConfig { string id; std::unique_ptr conn; @@ -40,6 +45,7 @@ class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF RGWDataSyncEnv *sync_env; RGWRESTConn *conn; rgw_obj src_obj; + RGWRESTConn::get_obj_params req_params; public: RGWRESTStreamGetCRF(CephContext *_cct, RGWCoroutinesEnv *_env, @@ -52,12 +58,19 @@ public: int init() override { /* init input connection */ + + + req_params.get_op = true; + req_params.prepend_metadata = true; + + if (range.is_set) { + req_params.range_is_set = true; + req_params.range_start = range.ofs; + req_params.range_end = range.ofs + range.size - 1; + } + RGWRESTStreamRWRequest *in_req; - int ret = conn->get_obj(rgw_user(), nullptr, src_obj, - nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */, - true /* prepend_metadata */, true /* get_op */, false /*rgwx_stat */, - false /* sync_manifest */, true /* skip_descrypt */, false /* send */, - nullptr /* cb */, &in_req); + int ret = conn->get_obj(src_obj, req_params, false /* send */, &in_req); if (ret < 0) { ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl; return ret; @@ -108,7 +121,16 @@ public: /* init output connection */ RGWRESTStreamS3PutObj *out_req{nullptr}; - conn->put_obj_send_init(dest_obj, &out_req); + if (multipart.is_multipart) { + char buf[32]; + snprintf(buf, sizeof(buf), "%d", multipart.part_num); + rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() }, + { "partNumber", buf }, + { nullptr, nullptr } }; + conn->put_obj_send_init(dest_obj, params, &out_req); + } else { + conn->put_obj_send_init(dest_obj, nullptr, &out_req); + } set_req(out_req); @@ -179,7 +201,6 @@ public: } }; -#if 0 class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; RGWRESTConn *source_conn; @@ -187,6 +208,7 @@ class 
RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { rgw_obj src_obj; rgw_obj dest_obj; + string upload_id; uint64_t ofs; uint64_t size; int part_num; @@ -200,6 +222,7 @@ public: const rgw_obj& _src_obj, RGWRESTConn *_dest_conn, const rgw_obj& _dest_obj, + const string& _upload_id, uint64_t _ofs, uint64_t _size, int _part_num) : RGWCoroutine(_sync_env->cct), @@ -208,6 +231,7 @@ public: dest_conn(_dest_conn), src_obj(_src_obj), dest_obj(_dest_obj), + upload_id(_upload_id), ofs(_ofs), size(_size), part_num(_part_num) {} @@ -222,7 +246,7 @@ public: out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn, dest_obj)); - out_crf->set_multipart(part_num, size); + out_crf->set_multipart(upload_id, part_num, size); yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf)); if (retcode < 0) { @@ -235,7 +259,45 @@ public: return 0; } }; -#endif + +class RGWAWSAbortMultipartCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + RGWRESTConn *dest_conn; + rgw_obj dest_obj; + + string upload_id; + +public: + RGWAWSAbortMultipartCR(RGWDataSyncEnv *_sync_env, + RGWRESTConn *_dest_conn, + const rgw_obj& _dest_obj, + const string& _upload_id) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + dest_conn(_dest_conn), + dest_obj(_dest_obj), + upload_id(_upload_id) {} + + int operate() { + reenter(this) { + + yield { + rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; + bufferlist bl; + call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn, sync_env->http_manager, + obj_to_aws_path(dest_obj), params)); + } + + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl; + return set_cr_error(retcode); + } + + return set_cr_done(); + } + + return 0; + } +}; class RGWAWSInitMultipartCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; @@ -279,11 +341,10 @@ public: reenter(this) { yield 
{ - string path = dest_obj.bucket.name + "/" + dest_obj.key.name; rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} }; bufferlist bl; call(new RGWPostRawRESTResourceCR (sync_env->cct, dest_conn, sync_env->http_manager, - path, params, bl, &out_bl)); + obj_to_aws_path(dest_obj), params, bl, &out_bl)); } if (retcode < 0) { @@ -291,7 +352,11 @@ public: return set_cr_error(retcode); } { -#warning need to cancel upload in case of error here + /* + * If one of the following fails we cannot abort upload, as we cannot + * extract the upload id. If one of these fail it's very likely that that's + * the least of our problem. + */ RGWXMLDecoder::XMLParser parser; if (!parser.init()) { ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;