+#include "common/errno.h"
+
#include "rgw_common.h"
#include "rgw_coroutine.h"
#include "rgw_sync_module.h"
RGWRESTConn *conn;
rgw_obj src_obj;
RGWRESTConn::get_obj_params req_params;
+
+ string etag;
public:
RGWRESTStreamGetCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
const string& val = header.second;
if (header.first == "RGWX_OBJECT_SIZE") {
rest_obj.content_len = atoi(val.c_str());
+ } else if (header.first == "ETAG") {
+ etag = val;
} else {
rest_obj.attrs[header.first] = val;
}
RGWDataSyncEnv *sync_env;
RGWRESTConn *conn;
rgw_obj dest_obj;
+ string etag;
public:
RGWAWSStreamPutCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
r->send_ready(conn->get_key(), new_attrs, false);
}
+
+ void handle_headers(const map<string, string>& headers) {
+ for (auto h : headers) {
+ if (h.first == "ETAG") {
+ etag = h.second;
+ }
+ }
+ }
+
+ bool get_etag(string *petag) {
+ if (etag.empty()) {
+ return false;
+ }
+ *petag = etag;
+ return true;
+ }
};
}
};
+struct multipart_part_info {
+ int part_num;
+ uint64_t ofs;
+ uint64_t size;
+ string etag;
+};
+
class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWRESTConn *source_conn;
rgw_obj dest_obj;
string upload_id;
- uint64_t ofs;
- uint64_t size;
- int part_num;
+
+ multipart_part_info part_info;
std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+ string *petag;
+
public:
RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncEnv *_sync_env,
RGWRESTConn *_source_conn,
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj,
const string& _upload_id,
- uint64_t _ofs,
- uint64_t _size,
- int _part_num) : RGWCoroutine(_sync_env->cct),
+ const multipart_part_info& _part_info,
+ string *_petag) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
source_conn(_source_conn),
dest_conn(_dest_conn),
src_obj(_src_obj),
dest_obj(_dest_obj),
upload_id(_upload_id),
- ofs(_ofs), size(_size),
- part_num(_part_num) {}
+ part_info(_part_info),
+ petag(_petag) {}
int operate() {
reenter(this) {
/* init input */
in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
- in_crf->set_range(ofs, size);
+ in_crf->set_range(part_info.ofs, part_info.size);
/* init output */
out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
dest_obj));
- out_crf->set_multipart(upload_id, part_num, size);
+ out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
if (retcode < 0) {
return set_cr_error(retcode);
}
+ if (!((RGWAWSStreamPutCRF *)out_crf.get())->get_etag(petag)) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl;
+ return set_cr_error(-EIO);
+ }
+
return set_cr_done();
}
}
};
+class RGWAWSCompleteMultipartCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *dest_conn;
+ rgw_obj dest_obj;
+
+ bufferlist out_bl;
+
+ string upload_id;
+
+ struct CompleteMultipartReq {
+ map<int, multipart_part_info> parts;
+
+ CompleteMultipartReq(const map<int, multipart_part_info>& _parts) : parts(_parts) {}
+
+ void dump_xml(Formatter *f) const {
+ for (auto p : parts) {
+ f->open_object_section("Part");
+ encode_xml("PartNumber", p.first, f);
+ encode_xml("ETag", p.second.etag, f);
+ f->close_section();
+ };
+ }
+ } req_enc;
+
+ struct CompleteMultipartResult {
+ string location;
+ string bucket;
+ string key;
+ string etag;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Location", bucket, obj);
+ RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+ RGWXMLDecoder::decode_xml("Key", key, obj);
+ RGWXMLDecoder::decode_xml("ETag", etag, obj);
+ }
+ } result;
+
+public:
+ RGWAWSCompleteMultipartCR(RGWDataSyncEnv *_sync_env,
+ RGWRESTConn *_dest_conn,
+ const rgw_obj& _dest_obj,
+ string _upload_id,
+ const map<int, multipart_part_info>& _parts) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ dest_conn(_dest_conn),
+ dest_obj(_dest_obj),
+ upload_id(_upload_id),
+ req_enc(_parts) {}
+
+ int operate() {
+ reenter(this) {
+
+ yield {
+ rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+ stringstream ss;
+ XMLFormatter formatter;
+
+ encode_xml("CompleteMultipartUpload", req_enc, &formatter);
+
+ formatter.flush(ss);
+
+ bufferlist bl;
+ bl.append(ss.str());
+
+ call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
+ obj_to_aws_path(dest_obj), params, bl, &out_bl));
+ }
+
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
+ return set_cr_error(retcode);
+ }
+ {
+ /*
+ * If one of the following fails we cannot abort upload, as we cannot
+ * extract the upload id. If one of these fail it's very likely that that's
+ * the least of our problem.
+ */
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sync_env->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ try {
+ RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sync_env->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return set_cr_error(-EIO);
+ }
+ }
+
+ ldout(sync_env->cct, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWRESTConn *source_conn;
uint32_t part_size;
int num_parts;
- struct part_info {
- int part_num;
- uint64_t ofs;
- uint64_t size;
- string etag;
- };
-
int cur_part{0};
uint64_t cur_ofs{0};
+ map<int, multipart_part_info> parts;
+
+ multipart_part_info *pcur_part_info{nullptr};
+
+ int ret_err{0};
+
public:
RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
RGWRESTConn *_source_conn,
for (cur_part = 1; cur_part <= num_parts; ++cur_part) {
yield {
- part_info cur_part_info;
- cur_part_info.part_num = cur_part + 1;
+ multipart_part_info& cur_part_info = parts[cur_part];
+ cur_part_info.part_num = cur_part;
cur_part_info.ofs = cur_ofs;
cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs);
+ pcur_part_info = &cur_part_info;
+
cur_ofs += part_size;
call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env,
source_conn, src_obj,
dest_conn, dest_obj,
upload_id,
- cur_part_info.ofs, cur_part_info.size,
- cur_part_info.part_num));
+ cur_part_info,
+ &cur_part_info.etag));
+ }
+
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+ ret_err = retcode;
+ yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
+ }
+ return set_cr_error(ret_err);
+ }
+
+ ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << cur_part << " etag=" << pcur_part_info->etag << dendl;
+
+ }
+
+ yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, upload_id, parts));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+ ret_err = retcode;
+ yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
}
+ return set_cr_error(ret_err);
}
return set_cr_done();