}
};
+
+class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *source_conn;
+ RGWRESTConn *dest_conn;
+ rgw_obj src_obj;
+ rgw_obj dest_obj;
+
+ std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+ std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+
+public:
+ RGWAWSStreamObjToCloudPlainCR(RGWDataSyncEnv *_sync_env,
+ RGWRESTConn *_source_conn,
+ const rgw_obj& _src_obj,
+ RGWRESTConn *_dest_conn,
+ const rgw_obj& _dest_obj) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ source_conn(_source_conn),
+ dest_conn(_dest_conn),
+ src_obj(_src_obj),
+ dest_obj(_dest_obj) {}
+
+ int operate() {
+ reenter(this) {
+ /* init input */
+ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+
+ /* init output */
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
+ dest_obj));
+
+ yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+#if 0
+class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *source_conn;
+ RGWRESTConn *dest_conn;
+ rgw_obj src_obj;
+ rgw_obj dest_obj;
+
+ uint64_t ofs;
+ uint64_t size;
+ int part_num;
+
+ std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+ std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+
+public:
+ RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncEnv *_sync_env,
+ RGWRESTConn *_source_conn,
+ const rgw_obj& _src_obj,
+ RGWRESTConn *_dest_conn,
+ const rgw_obj& _dest_obj,
+ uint64_t _ofs,
+ uint64_t _size,
+ int _part_num) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ source_conn(_source_conn),
+ dest_conn(_dest_conn),
+ src_obj(_src_obj),
+ dest_obj(_dest_obj),
+ ofs(_ofs), size(_size),
+ part_num(_part_num) {}
+
+ int operate() {
+ reenter(this) {
+ /* init input */
+ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+
+ in_crf->set_range(ofs, size);
+
+ /* init output */
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
+ dest_obj));
+
+ out_crf->set_multipart(part_num, size);
+
+ yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+#endif
+
+class RGWAWSInitMultipartCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *dest_conn;
+ rgw_obj dest_obj;
+
+ uint64_t obj_size;
+ ceph::real_time mtime;
+
+ bufferlist out_bl;
+
+ string *upload_id;
+
+ struct InitMultipartResult {
+ string bucket;
+ string key;
+ string upload_id;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+ RGWXMLDecoder::decode_xml("Key", key, obj);
+ RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
+ }
+ } result;
+
+public:
+ RGWAWSInitMultipartCR(RGWDataSyncEnv *_sync_env,
+ RGWRESTConn *_dest_conn,
+ const rgw_obj& _dest_obj,
+ uint64_t _obj_size,
+ const ceph::real_time& _mtime,
+ string *_upload_id) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ dest_conn(_dest_conn),
+ dest_obj(_dest_obj),
+ obj_size(_obj_size),
+ mtime(_mtime),
+ upload_id(_upload_id) {}
+
+ int operate() {
+ reenter(this) {
+
+ yield {
+ string path = dest_obj.bucket.name + "/" + dest_obj.key.name;
+ rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
+ bufferlist bl;
+ call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
+ path, params, bl, &out_bl));
+ }
+
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
+ return set_cr_error(retcode);
+ }
+ {
+#warning need to cancel upload in case of error here
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sync_env->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ try {
+ RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sync_env->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return set_cr_error(-EIO);
+ }
+ }
+
+ ldout(sync_env->cct, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
+
+ *upload_id = result.upload_id;
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *source_conn;
+ RGWRESTConn *dest_conn;
+ rgw_obj src_obj;
+ rgw_obj dest_obj;
+
+ uint64_t obj_size;
+ ceph::real_time mtime;
+
+ string upload_id;
+
+public:
+ RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
+ RGWRESTConn *_source_conn,
+ const rgw_obj& _src_obj,
+ RGWRESTConn *_dest_conn,
+ const rgw_obj& _dest_obj,
+ uint64_t _obj_size,
+ const ceph::real_time& _mtime) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ source_conn(_source_conn),
+ dest_conn(_dest_conn),
+ src_obj(_src_obj),
+ dest_obj(_dest_obj),
+ obj_size(_obj_size),
+ mtime(_mtime) {}
+
+ int operate() {
+ reenter(this) {
+#if 0
+ /* init input */
+ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+
+ /* init output */
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
+ dest_obj));
+
+ yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+#endif
+
+ yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, obj_size, mtime, &upload_id));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
// maybe use Fetch Remote Obj instead?
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
const AWSConfig& conf;
bufferlist res;
unordered_map <string, bool> bucket_created;
string target_bucket_name;
- std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
- std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
rgw_rest_obj rest_obj;
string obj_path;
int ret{0};
+ static constexpr uint32_t multipart_threshold = 8 * 1024 * 1024;
+
public:
RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info,
bucket_created[target_bucket_name] = true;
}
- {
+ yield {
rgw_obj src_obj(bucket_info.bucket, key);
- /* init input */
- in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
-
- ret = in_crf->init();
- if (ret < 0) {
- return set_cr_error(ret);
- }
/* init output */
rgw_bucket target_bucket;
target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
- uri resolution */
+ uri resolution */
rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key));
- out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, conf.conn.get(),
- dest_obj));
- ret = out_crf->init();
- if (ret < 0) {
- return set_cr_error(ret);
+ if (size < multipart_threshold) {
+ call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, conf.conn.get(), dest_obj));
+ } else {
+ call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, source_conn, src_obj, conf.conn.get(),
+ dest_obj, size, mtime));
}
}
-
- yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
if (retcode < 0) {
return set_cr_error(retcode);
}