From 1df6277d8443ef1abf0c17268dce19d79a61171c Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh
Date: Fri, 20 Oct 2017 17:43:32 -0700
Subject: [PATCH] rgw: aws sync: multipart upload complete

Signed-off-by: Yehuda Sadeh
---
 src/rgw/rgw_cr_rest.cc         |  12 +-
 src/rgw/rgw_cr_rest.h          |   2 +
 src/rgw/rgw_rest_client.cc     |   5 +
 src/rgw/rgw_sync_module_aws.cc | 211 +++++++++++++++++++++++++++++----
 4 files changed, 206 insertions(+), 24 deletions(-)

diff --git a/src/rgw/rgw_cr_rest.cc b/src/rgw/rgw_cr_rest.cc
index cf6f178b6e0..8ecfd2dec4b 100644
--- a/src/rgw/rgw_cr_rest.cc
+++ b/src/rgw/rgw_cr_rest.cc
@@ -42,7 +42,10 @@ public:
       }
     }
 
-    env->manager->io_complete(cr, io_id);
+#define GET_DATA_WINDOW_SIZE (1 * 1024 * 1024)
+    if (bl.length() >= GET_DATA_WINDOW_SIZE) { /* only wake the coroutine once a full window is buffered */
+      env->manager->io_complete(cr, io_id);
+    }
     return 0;
   }
 
@@ -202,6 +205,10 @@ int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry)
       yield caller->io_block(0, req->get_io_id());
       *need_retry = !req->is_done();
     }
+
+#warning need to lock in_req->headers
+    handle_headers(req->get_out_headers()); /* let subclasses inspect the headers before returning */
+
     return req->get_req_retcode();
   }
   return 0;
@@ -267,6 +274,7 @@ int RGWStreamSpliceCR::operate() {
       total_read += bl.length();
 
       yield {
+        ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl;
         ret = out_crf->write(bl);
         if (ret < 0) {
           return set_cr_error(ret);
@@ -277,8 +285,6 @@ int RGWStreamSpliceCR::operate() {
         ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
         return set_cr_error(ret);
       }
-
-      ldout(cct, 20) << "wrote " << bl.length() << " bytes" << dendl;
     } while (true);
 
     do {
diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h
index ee2b944bb97..72767ebbf5c 100644
--- a/src/rgw/rgw_cr_rest.h
+++ b/src/rgw/rgw_cr_rest.h
@@ -442,6 +442,8 @@ public:
   int write(bufferlist& data) override; /* reentrant */
   int drain_writes(bool *need_retry) override; /* reentrant */
 
+  virtual void handle_headers(const std::map<string, string>& headers) {} /* hook called from drain_writes(); default ignores the headers */
+
   void set_req(RGWHTTPStreamRWRequest *r) {
     req = r;
   }
diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc
index 0723baf853f..aa9639534b5 100644
--- a/src/rgw/rgw_rest_client.cc
+++ b/src/rgw/rgw_rest_client.cc
@@ -427,6 +427,11 @@ void RGWRESTStreamS3PutObj::send_init(rgw_obj& obj)
   map<string, string>& args = new_info.args.get_params();
   get_params_str(args, params_str);
 
+  /* merge params with extra args so that we can sign correctly */
+  for (param_vec_t::iterator iter = params.begin(); iter != params.end(); ++iter) {
+    new_info.args.append(iter->first, iter->second);
+  }
+
   new_url.append(resource + params_str);
 
   new_env.set("HTTP_DATE", date_str.c_str());
diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc
index 94ce689f99d..9413566f102 100644
--- a/src/rgw/rgw_sync_module_aws.cc
+++ b/src/rgw/rgw_sync_module_aws.cc
@@ -1,3 +1,5 @@
+#include "common/errno.h"
+
 #include "rgw_common.h"
 #include "rgw_coroutine.h"
 #include "rgw_sync_module.h"
@@ -46,6 +48,8 @@ class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
   RGWRESTConn *conn;
   rgw_obj src_obj;
   RGWRESTConn::get_obj_params req_params;
+
+  string etag; /* value of the "ETAG" header seen on the GET side */
 public:
   RGWRESTStreamGetCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
@@ -86,6 +90,8 @@ public:
       const string& val = header.second;
       if (header.first == "RGWX_OBJECT_SIZE") {
         rest_obj.content_len = atoi(val.c_str());
+      } else if (header.first == "ETAG") {
+        etag = val;
       } else {
         rest_obj.attrs[header.first] = val;
       }
@@ -107,6 +113,7 @@ class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *conn;
   rgw_obj dest_obj;
+  string etag; /* value of the "ETAG" header captured by handle_headers() */
 public:
   RGWAWSStreamPutCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
@@ -155,6 +162,22 @@ public:
 
     r->send_ready(conn->get_key(), new_attrs, false);
   }
+
+  void handle_headers(const map<string, string>& headers) override { /* remember the "ETAG" header, if any */
+    for (const auto& h : headers) {
+      if (h.first == "ETAG") {
+        etag = h.second;
+      }
+    }
+  }
+
+  bool get_etag(string *petag) { /* false (and *petag untouched) when no etag was captured */
+    if (etag.empty()) {
+      return false;
+    }
+    *petag = etag;
+    return true;
+  }
 };
 
 
@@ -201,6 +224,13 @@ public:
   }
 };
 
+struct multipart_part_info { /* bookkeeping for one part of a multipart upload */
+  int part_num{0};   /* part number; the upload loop counts from 1 */
+  uint64_t ofs{0};   /* start of the part's range in the source object */
+  uint64_t size{0};  /* length of the part's range */
+  string etag;       /* etag reported by the part's PUT request */
+};
+
 class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *source_conn;
@@ -209,13 +239,14 @@ class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
   rgw_obj dest_obj;
 
   string upload_id;
-  uint64_t ofs;
-  uint64_t size;
-  int part_num;
+
+  multipart_part_info part_info;
 
   std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
   std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
 
+  string *petag; /* out: receives the etag of the uploaded part */
+
 public:
   RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncEnv *_sync_env,
                                 RGWRESTConn *_source_conn,
@@ -223,36 +254,40 @@ public:
                                 RGWRESTConn *_dest_conn,
                                 const rgw_obj& _dest_obj,
                                 const string& _upload_id,
-                                uint64_t _ofs,
-                                uint64_t _size,
-                                int _part_num) : RGWCoroutine(_sync_env->cct),
+                                const multipart_part_info& _part_info,
+                                string *_petag) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    source_conn(_source_conn),
                                                    dest_conn(_dest_conn),
                                                    src_obj(_src_obj),
                                                    dest_obj(_dest_obj),
                                                    upload_id(_upload_id),
-                                                   ofs(_ofs), size(_size),
-                                                   part_num(_part_num) {}
+                                                   part_info(_part_info),
+                                                   petag(_petag) {}
 
   int operate() {
     reenter(this) {
       /* init input */
       in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env,
                                            source_conn, src_obj));
 
-      in_crf->set_range(ofs, size);
+      in_crf->set_range(part_info.ofs, part_info.size);
 
       /* init output */
       out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env,
                                            dest_conn, dest_obj));
-      out_crf->set_multipart(upload_id, part_num, size);
+      out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
 
       yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
 
+      if (!(static_cast<RGWAWSStreamPutCRF *>(out_crf.get()))->get_etag(petag)) { /* out_crf was created above as RGWAWSStreamPutCRF */
+        ldout(sync_env->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl;
+        return set_cr_error(-EIO);
+      }
+
       return set_cr_done();
     }
 
@@ -389,6 +424,115 @@ public:
   }
 };
 
+class RGWAWSCompleteMultipartCR : public RGWCoroutine { /* POSTs CompleteMultipartUpload for upload_id and parses the reply */
+  RGWDataSyncEnv *sync_env;
+  RGWRESTConn *dest_conn;
+  rgw_obj dest_obj;
+
+  bufferlist out_bl;
+
+  string upload_id;
+
+  struct CompleteMultipartReq {
+    map<int, multipart_part_info> parts;
+
+    CompleteMultipartReq(const map<int, multipart_part_info>& _parts) : parts(_parts) {}
+
+    void dump_xml(Formatter *f) const { /* one <Part> section per entry, keyed by part number */
+      for (const auto& p : parts) {
+        f->open_object_section("Part");
+        encode_xml("PartNumber", p.first, f);
+        encode_xml("ETag", p.second.etag, f);
+        f->close_section();
+      }
+    }
+  } req_enc;
+
+  struct CompleteMultipartResult {
+    string location;
+    string bucket;
+    string key;
+    string etag;
+
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("Location", location, obj);
+      RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+      RGWXMLDecoder::decode_xml("Key", key, obj);
+      RGWXMLDecoder::decode_xml("ETag", etag, obj);
+    }
+  } result;
+
+public:
+  RGWAWSCompleteMultipartCR(RGWDataSyncEnv *_sync_env,
+                        RGWRESTConn *_dest_conn,
+                        const rgw_obj& _dest_obj,
+                        const string& _upload_id,
+                        const map<int, multipart_part_info>& _parts) : RGWCoroutine(_sync_env->cct),
+                                                   sync_env(_sync_env),
+                                                   dest_conn(_dest_conn),
+                                                   dest_obj(_dest_obj),
+                                                   upload_id(_upload_id),
+                                                   req_enc(_parts) {}
+
+  int operate() {
+    reenter(this) {
+
+      yield {
+        rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+        stringstream ss;
+        XMLFormatter formatter;
+
+        encode_xml("CompleteMultipartUpload", req_enc, &formatter);
+
+        formatter.flush(ss);
+
+        bufferlist bl;
+        bl.append(ss.str());
+
+        call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
+                                                 obj_to_aws_path(dest_obj), params, bl, &out_bl));
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload for dest object=" << dest_obj << dendl;
+        return set_cr_error(retcode);
+      }
+      {
+        /*
+         * The upload id is already known here, so the caller is still able to
+         * abort the upload when the response below cannot be parsed. Every
+         * parsing failure is reported to the caller as -EIO.
+         */
+        RGWXMLDecoder::XMLParser parser;
+        if (!parser.init()) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart complete response from server" << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(sync_env->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+
+        try {
+          RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
+        } catch (RGWXMLDecoder::err& err) {
+          string str(out_bl.c_str(), out_bl.length());
+          ldout(sync_env->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+          return set_cr_error(-EIO);
+        }
+      }
+
+      ldout(sync_env->cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
+
+      return set_cr_done();
+    }
+
+    return 0;
+  }
+};
+
 class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   RGWRESTConn *source_conn;
@@ -404,16 +548,15 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   uint32_t part_size;
   int num_parts;
 
-  struct part_info {
-    int part_num;
-    uint64_t ofs;
-    uint64_t size;
-    string etag;
-  };
-
   int cur_part{0};
   uint64_t cur_ofs{0};
 
+  map<int, multipart_part_info> parts; /* part number -> info, handed to RGWAWSCompleteMultipartCR */
+
+  multipart_part_info *pcur_part_info{nullptr}; /* entry of the part being sent; read again after the yield */
+
+  int ret_err{0}; /* keeps the original error while the abort coroutine runs */
+
 public:
   RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
                                 RGWRESTConn *_source_conn,
@@ -445,20 +588,46 @@ public:
 
       for (cur_part = 1; cur_part <= num_parts; ++cur_part) {
         yield {
-          part_info cur_part_info;
-          cur_part_info.part_num = cur_part + 1;
+          multipart_part_info& cur_part_info = parts[cur_part];
+          cur_part_info.part_num = cur_part;
           cur_part_info.ofs = cur_ofs;
           cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs);
 
+          pcur_part_info = &cur_part_info;
+
           cur_ofs += part_size;
 
           call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env,
                                                              source_conn, src_obj,
                                                              dest_conn, dest_obj,
                                                              upload_id,
-                                                             cur_part_info.ofs, cur_part_info.size,
-                                                             cur_part_info.part_num));
+                                                             cur_part_info,
+                                                             &cur_part_info.etag));
+        }
+
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+          ret_err = retcode;
+          yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
+          if (retcode < 0) {
+            ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
+          }
+          return set_cr_error(ret_err);
+        }
+
+        ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << cur_part << " etag=" << pcur_part_info->etag << dendl;
+
+      }
+
+      yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, upload_id, parts));
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+        ret_err = retcode;
+        yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
+        if (retcode < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
         }
+        return set_cr_error(ret_err);
       }
 
       return set_cr_done();
-- 
2.39.5