}
static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver,
- const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
+ const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status,
+ optional_yield y)
{
int ret = 0;
rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(driver);
bufferlist bl;
ret = rgw_get_system_obj(sysobj, pool, oid, bl, nullptr, nullptr,
- null_yield, dpp);
+ y, dpp);
if (ret < 0) {
return ret;
}
static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver,
- const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
+ const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status,
+ optional_yield y)
{
int ret = 0;
rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(driver);
status->encode(bl);
ret = rgw_put_system_obj(dpp, sysobj, pool, oid, bl, true, nullptr,
- real_time{}, null_yield);
+ real_time{}, y);
return ret;
}
static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver,
- const rgw_raw_obj *status_obj)
+ const rgw_raw_obj *status_obj, optional_yield y)
{
int ret = 0;
rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(driver);
const auto oid = status_obj->oid;
auto sysobj = rados->svc()->sysobj;
- ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, null_yield);
+ ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, y);
return ret;
}
if (!in_progress) { // first time. Send RESTORE req.
rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name));
- ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params);
+ ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params, tier_ctx.y);
ldpp_dout(tier_ctx.dpp, 20) << __func__ << "Restoring object=" << target_obj_name << "returned ret = " << ret << dendl;
// accounted_size in complete_request() reads from RGWX_OBJECT_SIZE which is set
// only for internal ops/sync. So instead read from headers[CONTENT_LEN].
// Same goes for pattrs.
- ret = tier_ctx.conn.complete_request(tier_ctx.dpp, in_req, &etag, pset_mtime, nullptr, nullptr, &headers, null_yield);
+ ret = tier_ctx.conn.complete_request(tier_ctx.dpp, in_req, &etag, pset_mtime, nullptr, nullptr, &headers, tier_ctx.y);
if (ret < 0) {
if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
ldpp_dout(tier_ctx.dpp, 20) << __func__ << "(): failed to fetch object from remote. retries=" << tries << dendl;
int retcode{0};
+ optional_yield y;
+
public:
RGWLCStreamRead(CephContext *_cct, const DoutPrefixProvider *_dpp,
- rgw::sal::Object *_obj, const real_time &_mtime) :
+ rgw::sal::Object *_obj, const real_time &_mtime,
+ optional_yield _y) :
cct(_cct), dpp(_dpp), obj(_obj), mtime(_mtime),
- read_op(obj->get_read_op()) {}
+ read_op(obj->get_read_op()), y(_y) {}
~RGWLCStreamRead() {};
int set_range(off_t _ofs, off_t _end);
int retcode;
+ optional_yield y;
+
public:
RGWLCCloudStreamPut(const DoutPrefixProvider *_dpp,
const rgw_lc_obj_properties& _obj_properties,
RGWRESTConn& _conn,
- const rgw_obj& _dest_obj) :
- dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
+ const rgw_obj& _dest_obj,
+ optional_yield _y) :
+ dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj),
+ y(_y) {
}
int init();
static bool keep_attr(const std::string& h);
}
int RGWLCStreamRead::init() {
- optional_yield y = null_yield;
real_time read_mtime;
read_op->params.lastmod = &read_mtime;
}
int RGWLCStreamRead::read(off_t ofs, off_t end, RGWGetDataCB *out_cb) {
- int ret = read_op->iterate(dpp, ofs, end, out_cb, null_yield);
+ int ret = read_op->iterate(dpp, ofs, end, out_cb, y);
return ret;
}
}
int RGWLCCloudStreamPut::complete_request() {
- return conn.complete_request(dpp, out_req, etag, &obj_properties.mtime, null_yield);
+ return conn.complete_request(dpp, out_req, etag, &obj_properties.mtime, y);
}
/* Read local copy and write to Cloud endpoint */
*/
std::shared_ptr<RGWLCStreamRead> readf;
readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
- tier_ctx.obj, tier_ctx.o.meta.mtime));
+ tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y));
std::shared_ptr<RGWLCCloudStreamPut> writef;
writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
- dest_obj));
+ dest_obj, tier_ctx.y));
/* actual Read & Write */
ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get());
* be taking lot of time eventually erroring out at times. */
std::shared_ptr<RGWLCStreamRead> readf;
readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
- tier_ctx.obj, tier_ctx.o.meta.mtime));
+ tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y));
std::shared_ptr<RGWLCCloudStreamPut> writef;
writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
- dest_obj));
+ dest_obj, tier_ctx.y));
/* Prepare Read from source */
end = part_info.ofs + part_info.size - 1;
int cloud_tier_restore(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn,
const rgw_obj& dest_obj, std::optional<uint64_t> days,
- RGWZoneGroupTierS3Glacier& glacier_params) {
+ RGWZoneGroupTierS3Glacier& glacier_params,
+ optional_yield y) {
rgw_http_param_pair params[] = {{"restore", nullptr}, {nullptr, nullptr}};
// XXX: include versionId=VersionId in the params above
bl.append(ss.str());
ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr,
- out_bl, &bl, nullptr, null_yield);
+ out_bl, &bl, nullptr, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << __func__ << "ERROR: failed to send Restore request to cloud for obj=" << dest_obj << " , ret = " << ret << dendl;
static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp,
RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
- const std::string& upload_id) {
+ const std::string& upload_id, optional_yield y) {
int ret;
bufferlist out_bl;
bufferlist bl;
string resource = obj_to_aws_path(dest_obj);
ret = dest_conn.send_resource(dpp, "DELETE", resource, params, nullptr,
- out_bl, &bl, nullptr, null_yield);
+ out_bl, &bl, nullptr, y);
if (ret < 0) {
static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp,
RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
uint64_t obj_size, std::map<std::string, std::string>& attrs,
- std::string& upload_id) {
+ std::string& upload_id, optional_yield y) {
bufferlist out_bl;
bufferlist bl;
string resource = obj_to_aws_path(dest_obj);
ret = dest_conn.send_resource(dpp, "POST", resource, params, &attrs,
- out_bl, &bl, nullptr, null_yield);
+ out_bl, &bl, nullptr, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp,
RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
std::string& upload_id,
- const std::map<int, rgw_lc_multipart_part_info>& parts) {
+ const std::map<int, rgw_lc_multipart_part_info>& parts,
+ optional_yield y) {
rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
stringstream ss;
bl.append(ss.str());
ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr,
- out_bl, &bl, nullptr, null_yield);
+ out_bl, &bl, nullptr, y);
if (ret < 0) {
const std::string& upload_id) {
int ret;
- ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id);
+ ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id, tier_ctx.y);
if (ret < 0) {
ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " ret=" << ret << dendl;
/* ignore error, best effort */
}
/* remove status obj */
- ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj);
+ ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, tier_ctx.y);
if (ret < 0) {
ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " ret=" << ret << dendl;
// ignore error, best effort
rgw_pool pool = static_cast<rgw::sal::RadosStore*>(tier_ctx.driver)->svc()->zone->get_zone_params().log_pool;
status_obj = rgw_raw_obj(pool, "lc_multipart_" + tier_ctx.obj->get_oid());
- ret = read_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status);
+ ret = read_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status, tier_ctx.y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " ret=" << ret << dendl;
}
if (ret == -ENOENT) {
- RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.obj, tier_ctx.o.meta.mtime);
+ RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y);
readf.init();
RGWLCCloudStreamPut::init_send_attrs(tier_ctx.dpp, rest_obj, obj_properties, new_attrs);
- ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id);
+ ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id, tier_ctx.y);
if (ret < 0) {
return ret;
}
status.mtime = obj_properties.mtime;
status.etag = obj_properties.etag;
- ret = put_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status);
+ ret = put_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status, tier_ctx.y);
if (ret < 0) {
ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to driver multipart upload state, ret=" << ret << dendl;
}
- ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts);
+ ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts, tier_ctx.y);
if (ret < 0) {
ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-ret) << ")" << dendl;
cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
}
/* remove status obj */
- ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj);
+ ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, tier_ctx.y);
if (ret < 0) {
ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << cur_part << " (" << cpp_strerror(-ret) << ")" << dendl;
// ignore error, best effort
}
ret = tier_ctx.conn.send_resource(tier_ctx.dpp, "PUT", resource, nullptr, nullptr,
- out_bl, &bl, nullptr, null_yield);
+ out_bl, &bl, nullptr, tier_ctx.y);
if (ret < 0 ) {
ldpp_dout(tier_ctx.dpp, 0) << "create target bucket : " << tier_ctx.target_bucket_name << " returned ret:" << ret << dendl;