#include "rgw_sync_module.h"
#include "rgw_data_sync.h"
#include "rgw_sync_module_aws.h"
+#include "rgw_cr_rados.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rest.h"
#include "rgw_acl.h"
}
};
-struct multipart_part_info {
- int part_num;
- uint64_t ofs;
- uint64_t size;
- string etag;
-};
-
class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWRESTConn *source_conn;
string upload_id;
- multipart_part_info part_info;
+ rgw_sync_aws_multipart_part_info part_info;
std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj,
const string& _upload_id,
- const multipart_part_info& _part_info,
+ const rgw_sync_aws_multipart_part_info& _part_info,
string *_petag) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
source_conn(_source_conn),
string upload_id;
struct CompleteMultipartReq {
- map<int, multipart_part_info> parts;
+ map<int, rgw_sync_aws_multipart_part_info> parts;
- CompleteMultipartReq(const map<int, multipart_part_info>& _parts) : parts(_parts) {}
+ CompleteMultipartReq(const map<int, rgw_sync_aws_multipart_part_info>& _parts) : parts(_parts) {}
void dump_xml(Formatter *f) const {
for (auto p : parts) {
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj,
string _upload_id,
- const map<int, multipart_part_info>& _parts) : RGWCoroutine(_sync_env->cct),
+ const map<int, rgw_sync_aws_multipart_part_info>& _parts) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
dest_conn(_dest_conn),
dest_obj(_dest_obj),
}
};
+
+class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *dest_conn;
+ const rgw_obj dest_obj;
+ const rgw_raw_obj status_obj;
+
+ string upload_id;
+
+public:
+
+ RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncEnv *_sync_env,
+ RGWRESTConn *_dest_conn,
+ const rgw_obj& _dest_obj,
+ const rgw_raw_obj& _status_obj,
+ const string& _upload_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ dest_conn(_dest_conn),
+ dest_obj(_dest_obj),
+ status_obj(_status_obj),
+ upload_id(_upload_id) {}
+
+ int operate() {
+ reenter(this) {
+ yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
+ /* ignore error, best effort */
+ }
+ yield call(new RGWRadosRemoveCR(sync_env->store, status_obj));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
+ /* ignore error, best effort */
+ }
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWRESTConn *source_conn;
uint64_t obj_size;
ceph::real_time mtime;
- string upload_id;
-
- uint32_t part_size;
- int num_parts;
-
- int cur_part{0};
- uint64_t cur_ofs{0};
+ rgw_sync_aws_multipart_upload_info status;
- map<int, multipart_part_info> parts;
-
- multipart_part_info *pcur_part_info{nullptr};
+ rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};
int ret_err{0};
+ rgw_raw_obj status_obj;
+
public:
RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
RGWRESTConn *_source_conn,
src_obj(_src_obj),
dest_obj(_dest_obj),
obj_size(_obj_size),
- mtime(_mtime) {
-#warning flexible part size needed
- part_size = 5 * 1024 * 1024;
-
- num_parts = (obj_size + part_size - 1) / part_size;
+ mtime(_mtime),
+ status_obj(sync_env->store->get_zone_params().log_pool,
+ RGWBucketSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) {
}
int operate() {
reenter(this) {
- yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, obj_size, mtime, &upload_id));
- if (retcode < 0) {
- return set_cr_error(retcode);
+ yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(sync_env->async_rados, sync_env->store,
+ status_obj, &status, false));
+
+ if (retcode < 0 && retcode != -ENOENT) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
+ return retcode;
+ }
+
+ if (retcode >= 0) {
+ /* check here that mtime and size did not change */
+
+ if (status.mtime != mtime || status.obj_size != obj_size) {
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
+ retcode = -ENOENT;
+ }
}
- for (cur_part = 1; cur_part <= num_parts; ++cur_part) {
+ if (retcode == -ENOENT) {
+ yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, status.mtime, &status.upload_id));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ status.obj_size = obj_size;
+ status.mtime = mtime;
+#warning flexible part size needed
+ status.part_size = 5 * 1024 * 1024;
+ status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
+ status.cur_part = 1;
+ }
+
+ for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
yield {
- multipart_part_info& cur_part_info = parts[cur_part];
- cur_part_info.part_num = cur_part;
- cur_part_info.ofs = cur_ofs;
- cur_part_info.size = std::min((uint64_t)part_size, obj_size - cur_ofs);
+ rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part];
+ cur_part_info.part_num = status.cur_part;
+ cur_part_info.ofs = status.cur_ofs;
+ cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
pcur_part_info = &cur_part_info;
- cur_ofs += part_size;
+ status.cur_ofs += status.part_size;
call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env,
source_conn, src_obj,
dest_conn, dest_obj,
- upload_id,
+ status.upload_id,
cur_part_info,
&cur_part_info.etag));
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << upload_id << " part number " << cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
ret_err = retcode;
- yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
- }
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
return set_cr_error(ret_err);
}
- ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << cur_part << " etag=" << pcur_part_info->etag << dendl;
-
+ yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(sync_env->async_rados, sync_env->store, status_obj, status));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
+ /* continue with upload anyway */
+ }
+ ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
}
- yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, upload_id, parts));
+ yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, status.upload_id, status.parts));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
ret_err = retcode;
- yield call(new RGWAWSAbortMultipartCR(sync_env, dest_conn, dest_obj, upload_id));
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << upload_id << " part number " << cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
- }
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
return set_cr_error(ret_err);
}
+ /* remove status obj */
+ yield call(new RGWRadosRemoveCR(sync_env->store, status_obj));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload obj=" << src_obj << " upload_id=" << status.upload_id << " part number " << status.cur_part << " (" << cpp_strerror(-retcode) << ")" << dendl;
+ /* ignore error, best effort */
+ }
return set_cr_done();
}