protected:
rgw_rest_obj rest_obj;
+ struct range_info {
+ bool is_set{false};
+ uint64_t ofs;
+ uint64_t size;
+ } range;
+
public:
RGWStreamReadHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
rgw_rest_obj& get_rest_obj() {
return rest_obj;
}
+
+ void set_range(uint64_t ofs, uint64_t size) {
+ range.is_set = true;
+ range.ofs = ofs;
+ range.size = size;
+ }
};
class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
RGWHTTPStreamRWRequest *req{nullptr};
+ struct multipart_info {
+ bool is_multipart{false};
+ string upload_id;
+ int part_num{0};
+ uint64_t part_size;
+ } multipart;
+
public:
RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
void set_req(RGWHTTPStreamRWRequest *r) {
req = r;
}
+
+ void set_multipart(const string& upload_id, int part_num, uint64_t part_size) {
+ multipart.is_multipart = true;
+ multipart.upload_id = upload_id;
+ multipart.part_num = part_num;
+ multipart.part_size = part_size;
+ }
};
class RGWStreamSpliceCR : public RGWCoroutine {
*resource = urlsafe_bucket + "/" + urlsafe_object;
}
-int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr)
+int RGWRESTStreamRWRequest::send_request(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr)
{
string resource;
send_prepare_convert(obj, &resource);
return send_request(&key, extra_headers, resource, mgr);
}
-int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj)
+int RGWRESTStreamRWRequest::send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj)
{
string resource;
send_prepare_convert(obj, &resource);
virtual ~RGWRESTStreamRWRequest() override {}
int send_prepare(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, bufferlist *send_data = nullptr /* optional input data */);
- int send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj);
+ int send_prepare(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj);
int send(RGWHTTPManager *mgr);
- int send_request(RGWAccessKey& key, map<string, string>& extra_headers, rgw_obj& obj, RGWHTTPManager *mgr);
+ int send_request(RGWAccessKey& key, map<string, string>& extra_headers, const rgw_obj& obj, RGWHTTPManager *mgr);
int send_request(RGWAccessKey *key, map<string, string>& extra_headers, const string& resource, RGWHTTPManager *mgr, bufferlist *send_data = nullptr /* optional input data */);
int complete_request(string& etag, real_time *mtime, uint64_t *psize, map<string, string>& attrs);
+
+ void add_params(param_vec_t *params);
};
class RGWRESTStreamReadRequest : public RGWRESTStreamRWRequest {
explicit StreamObjData(rgw_obj& _obj) : obj(_obj) {}
};
-int RGWRESTConn::put_obj_send_init(rgw_obj& obj, RGWRESTStreamS3PutObj **req)
+int RGWRESTConn::put_obj_send_init(rgw_obj& obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req)
{
string url;
int ret = get_url(url);
rgw_user uid;
param_vec_t params;
populate_params(params, &uid, self_zone_group);
+
+ if (extra_params) {
+ append_param_list(params, extra_params);
+ }
+
RGWRESTStreamS3PutObj *wr = new RGWRESTStreamS3PutObj(cct, "PUT", url, NULL, ¶ms);
wr->send_init(obj);
*req = wr;
}
-int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj,
+int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, const rgw_obj& obj,
const real_time *mod_ptr, const real_time *unmod_ptr,
uint32_t mod_zone_id, uint64_t mod_pg_ver,
bool prepend_metadata, bool get_op, bool rgwx_stat,
bool sync_manifest, bool skip_decrypt,
bool send, RGWGetDataCB *cb, RGWRESTStreamRWRequest **req)
+{
+ get_obj_params params;
+ params.uid = uid;
+ params.info = info;
+ params.mod_ptr = mod_ptr;
+ params.mod_pg_ver = mod_pg_ver;
+ params.prepend_metadata = prepend_metadata;
+ params.get_op = get_op;
+ params.rgwx_stat = rgwx_stat;
+ params.sync_manifest = sync_manifest;
+ params.skip_decrypt = skip_decrypt;
+ params.cb = cb;
+ return get_obj(obj, params, send, req);
+}
+
+int RGWRESTConn::get_obj(const rgw_obj& obj, const get_obj_params& in_params, bool send, RGWRESTStreamRWRequest **req)
{
string url;
int ret = get_url(url);
return ret;
param_vec_t params;
- populate_params(params, &uid, self_zone_group);
- if (prepend_metadata) {
+ populate_params(params, &in_params.uid, self_zone_group);
+ if (in_params.prepend_metadata) {
params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "prepend-metadata", "true"));
}
- if (rgwx_stat) {
+ if (in_params.rgwx_stat) {
params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "stat", "true"));
}
- if (sync_manifest) {
+ if (in_params.sync_manifest) {
params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "sync-manifest", ""));
}
- if (skip_decrypt) {
+ if (in_params.skip_decrypt) {
params.push_back(param_pair_t(RGW_SYS_PARAM_PREFIX "skip-decrypt", ""));
}
if (!obj.key.instance.empty()) {
const string& instance = obj.key.instance;
params.push_back(param_pair_t("versionId", instance));
}
- if (get_op) {
- *req = new RGWRESTStreamReadRequest(cct, url, cb, NULL, ¶ms);
+ if (in_params.get_op) {
+ *req = new RGWRESTStreamReadRequest(cct, url, in_params.cb, NULL, ¶ms);
} else {
- *req = new RGWRESTStreamHeadRequest(cct, url, cb, NULL, ¶ms);
+ *req = new RGWRESTStreamHeadRequest(cct, url, in_params.cb, NULL, ¶ms);
}
map<string, string> extra_headers;
- if (info) {
- const auto& orig_map = info->env->get_map();
+ if (in_params.info) {
+ const auto& orig_map = in_params.info->env->get_map();
/* add original headers that start with HTTP_X_AMZ_ */
static constexpr char SEARCH_AMZ_PREFIX[] = "HTTP_X_AMZ_";
}
}
- set_date_header(mod_ptr, extra_headers, "HTTP_IF_MODIFIED_SINCE");
- set_date_header(unmod_ptr, extra_headers, "HTTP_IF_UNMODIFIED_SINCE");
- if (mod_zone_id != 0) {
- set_header(mod_zone_id, extra_headers, "HTTP_DEST_ZONE_SHORT_ID");
+ set_date_header(in_params.mod_ptr, extra_headers, "HTTP_IF_MODIFIED_SINCE");
+ set_date_header(in_params.unmod_ptr, extra_headers, "HTTP_IF_UNMODIFIED_SINCE");
+ if (in_params.mod_zone_id != 0) {
+ set_header(in_params.mod_zone_id, extra_headers, "HTTP_DEST_ZONE_SHORT_ID");
+ }
+ if (in_params.mod_pg_ver != 0) {
+ set_header(in_params.mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
}
- if (mod_pg_ver != 0) {
- set_header(mod_pg_ver, extra_headers, "HTTP_DEST_PG_VER");
+ if (in_params.range_is_set) {
+ char buf[64];
+ snprintf(buf, sizeof(buf), "bytes=%lld-%lld", (long long)in_params.range_start, (long long)in_params.range_end);
+ set_header(buf, extra_headers, "RANGE");
}
int r = (*req)->send_prepare(key, extra_headers, obj);
const char *val;
};
-// copy a null-terminated rgw_http_param_pair list into a list of string pairs
-inline param_vec_t make_param_list(const rgw_http_param_pair* pp)
+// append a null-terminated rgw_http_param_pair list into a list of string pairs
+inline void append_param_list(param_vec_t& params, const rgw_http_param_pair* pp)
{
- param_vec_t params;
while (pp && pp->key) {
string k = pp->key;
string v = (pp->val ? pp->val : "");
params.emplace_back(make_pair(std::move(k), std::move(v)));
++pp;
}
+}
+
+// copy a null-terminated rgw_http_param_pair list into a list of string pairs
+inline param_vec_t make_param_list(const rgw_http_param_pair* pp)
+{
+ param_vec_t params;
+ append_param_list(params, pp);
return params;
}
/* async requests */
- int put_obj_send_init(rgw_obj& obj, RGWRESTStreamS3PutObj **req);
+ int put_obj_send_init(rgw_obj& obj, const rgw_http_param_pair *extra_params, RGWRESTStreamS3PutObj **req);
int put_obj_async(const rgw_user& uid, rgw_obj& obj, uint64_t obj_size,
map<string, bufferlist>& attrs, bool send, RGWRESTStreamS3PutObj **req);
int complete_request(RGWRESTStreamS3PutObj *req, string& etag, ceph::real_time *mtime);
- int get_obj(const rgw_user& uid, req_info *info /* optional */, rgw_obj& obj,
+ struct get_obj_params {
+ rgw_user uid;
+ req_info *info{nullptr};
+ const ceph::real_time *mod_ptr{nullptr};
+ const ceph::real_time *unmod_ptr{nullptr};
+
+ uint32_t mod_zone_id{0};
+ uint64_t mod_pg_ver{0};
+
+ bool prepend_metadata{false};
+ bool get_op{false};
+ bool rgwx_stat{false};
+ bool sync_manifest{false};
+
+ bool skip_decrypt{true};
+ RGWGetDataCB *cb{nullptr};
+
+ bool range_is_set{false};
+ uint64_t range_start{0};
+ uint64_t range_end{0};
+ };
+
+ int get_obj(const rgw_obj& obj, const get_obj_params& params, bool send, RGWRESTStreamRWRequest **req);
+
+ int get_obj(const rgw_user& uid, req_info *info /* optional */, const rgw_obj& obj,
const ceph::real_time *mod_ptr, const ceph::real_time *unmod_ptr,
uint32_t mod_zone_id, uint64_t mod_pg_ver,
bool prepend_metadata, bool get_op, bool rgwx_stat, bool sync_manifest,
return object_name;
}
+static string obj_to_aws_path(const rgw_obj& obj)
+{
+ return obj.bucket.name + "/" + obj.key.name;
+}
+
struct AWSConfig {
string id;
std::unique_ptr<RGWRESTConn> conn;
RGWDataSyncEnv *sync_env;
RGWRESTConn *conn;
rgw_obj src_obj;
+ RGWRESTConn::get_obj_params req_params;
public:
RGWRESTStreamGetCRF(CephContext *_cct,
RGWCoroutinesEnv *_env,
int init() override {
/* init input connection */
+
+
+ req_params.get_op = true;
+ req_params.prepend_metadata = true;
+
+ if (range.is_set) {
+ req_params.range_is_set = true;
+ req_params.range_start = range.ofs;
+ req_params.range_end = range.ofs + range.size - 1;
+ }
+
RGWRESTStreamRWRequest *in_req;
- int ret = conn->get_obj(rgw_user(), nullptr, src_obj,
- nullptr /* mod_ptr */, nullptr /* unmod_ptr */, 0 /* mod_zone_id */, 0 /* mod_pg_ver */,
- true /* prepend_metadata */, true /* get_op */, false /*rgwx_stat */,
- false /* sync_manifest */, true /* skip_descrypt */, false /* send */,
- nullptr /* cb */, &in_req);
+ int ret = conn->get_obj(src_obj, req_params, false /* send */, &in_req);
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
return ret;
/* init output connection */
RGWRESTStreamS3PutObj *out_req{nullptr};
- conn->put_obj_send_init(dest_obj, &out_req);
+ if (multipart.is_multipart) {
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%d", multipart.part_num);
+ rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
+ { "partNumber", buf },
+ { nullptr, nullptr } };
+ conn->put_obj_send_init(dest_obj, params, &out_req);
+ } else {
+ conn->put_obj_send_init(dest_obj, nullptr, &out_req);
+ }
set_req(out_req);
}
};
-#if 0
class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWRESTConn *source_conn;
rgw_obj src_obj;
rgw_obj dest_obj;
+ string upload_id;
uint64_t ofs;
uint64_t size;
int part_num;
const rgw_obj& _src_obj,
RGWRESTConn *_dest_conn,
const rgw_obj& _dest_obj,
+ const string& _upload_id,
uint64_t _ofs,
uint64_t _size,
int _part_num) : RGWCoroutine(_sync_env->cct),
dest_conn(_dest_conn),
src_obj(_src_obj),
dest_obj(_dest_obj),
+ upload_id(_upload_id),
ofs(_ofs), size(_size),
part_num(_part_num) {}
out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
dest_obj));
- out_crf->set_multipart(part_num, size);
+ out_crf->set_multipart(upload_id, part_num, size);
yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
if (retcode < 0) {
return 0;
}
};
-#endif
+
+class RGWAWSAbortMultipartCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRESTConn *dest_conn;
+ rgw_obj dest_obj;
+
+ string upload_id;
+
+public:
+ RGWAWSAbortMultipartCR(RGWDataSyncEnv *_sync_env,
+ RGWRESTConn *_dest_conn,
+ const rgw_obj& _dest_obj,
+ const string& _upload_id) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ dest_conn(_dest_conn),
+ dest_obj(_dest_obj),
+ upload_id(_upload_id) {}
+
+ int operate() {
+ reenter(this) {
+
+ yield {
+ rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+ bufferlist bl;
+ call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn, sync_env->http_manager,
+ obj_to_aws_path(dest_obj), params));
+ }
+
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
class RGWAWSInitMultipartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
reenter(this) {
yield {
- string path = dest_obj.bucket.name + "/" + dest_obj.key.name;
rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
bufferlist bl;
call(new RGWPostRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn, sync_env->http_manager,
- path, params, bl, &out_bl));
+ obj_to_aws_path(dest_obj), params, bl, &out_bl));
}
if (retcode < 0) {
return set_cr_error(retcode);
}
{
-#warning need to cancel upload in case of error here
+ /*
+ * If one of the following fails we cannot abort upload, as we cannot
+ * extract the upload id. If one of these fail it's very likely that that's
+ * the least of our problem.
+ */
RGWXMLDecoder::XMLParser parser;
if (!parser.init()) {
ldout(sync_env->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;