From 7a130a9934bb822158b7f7a8a5e8472601a969df Mon Sep 17 00:00:00 2001 From: "Matthew N. Heler" Date: Fri, 17 Apr 2026 13:13:52 -0500 Subject: [PATCH] rgw/lc: add coroutine support for cloud-transition and cloud-restore LCWorker's cloud-tiering was blocking the io_context on every HTTP call, so rgw_lc_max_wp_worker coroutines ended up running one at a time instead of in parallel. Same story on the restore side. Pass the worker's optional_yield through RGWLCCloudTierCtx and use it from the stream classes, the multipart and status-obj helpers, and cloud_tier_restore. Transitions and restores now actually yield. Signed-off-by: Matthew N. Heler --- src/rgw/driver/rados/rgw_lc_tier.cc | 82 +++++++++++++++------------ src/rgw/driver/rados/rgw_lc_tier.h | 9 ++- src/rgw/driver/rados/rgw_sal_rados.cc | 4 +- 3 files changed, 54 insertions(+), 41 deletions(-) diff --git a/src/rgw/driver/rados/rgw_lc_tier.cc b/src/rgw/driver/rados/rgw_lc_tier.cc index 8328c1393de..f2fdf8e5c0a 100644 --- a/src/rgw/driver/rados/rgw_lc_tier.cc +++ b/src/rgw/driver/rados/rgw_lc_tier.cc @@ -102,7 +102,8 @@ static inline string obj_to_aws_path(const rgw_obj& obj) } static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver, - const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status) + const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status, + optional_yield y) { int ret = 0; rgw::sal::RadosStore *rados = dynamic_cast(driver); @@ -118,7 +119,7 @@ static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *d bufferlist bl; ret = rgw_get_system_obj(sysobj, pool, oid, bl, nullptr, nullptr, - null_yield, dpp); + y, dpp); if (ret < 0) { return ret; @@ -141,7 +142,8 @@ static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *d } static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver, - const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status) + const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status, + optional_yield y) { int ret = 0; rgw::sal::RadosStore *rados = dynamic_cast(driver); @@ -158,13 +160,13 @@ static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *dr status->encode(bl); ret = rgw_put_system_obj(dpp, sysobj, pool, oid, bl, true, nullptr, - real_time{}, null_yield); + real_time{}, y); return ret; } static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver, - const rgw_raw_obj *status_obj) + const rgw_raw_obj *status_obj, optional_yield y) { int ret = 0; rgw::sal::RadosStore *rados = dynamic_cast(driver); @@ -178,7 +180,7 @@ static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver const auto oid = status_obj->oid; auto sysobj = rados->svc()->sysobj; - ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, null_yield); + ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, y); return ret; } @@ -280,7 +282,7 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx, if (!in_progress) { // first time. Send RESTORE req. rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name)); - ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params); + ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params, tier_ctx.y); ldpp_dout(tier_ctx.dpp, 20) << __func__ << "Restoring object=" << target_obj_name << "returned ret = " << ret << dendl; @@ -364,7 +366,7 @@ int rgw_cloud_tier_get_object(RGWLCCloudTierCtx& tier_ctx, bool head, // accounted_size in complete_request() reads from RGWX_OBJECT_SIZE which is set // only for internal ops/sync. So instead read from headers[CONTENT_LEN]. // Same goes for pattrs. - ret = tier_ctx.conn.complete_request(tier_ctx.dpp, in_req, &etag, pset_mtime, nullptr, nullptr, &headers, null_yield); + ret = tier_ctx.conn.complete_request(tier_ctx.dpp, in_req, &etag, pset_mtime, nullptr, nullptr, &headers, tier_ctx.y); if (ret < 0) { if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) { ldpp_dout(tier_ctx.dpp, 20) << __func__ << "(): failed to fetch object from remote. retries=" << tries << dendl; @@ -496,11 +498,14 @@ class RGWLCStreamRead int retcode{0}; + optional_yield y; + public: RGWLCStreamRead(CephContext *_cct, const DoutPrefixProvider *_dpp, - rgw::sal::Object *_obj, const real_time &_mtime) : + rgw::sal::Object *_obj, const real_time &_mtime, + optional_yield _y) : cct(_cct), dpp(_dpp), obj(_obj), mtime(_mtime), - read_op(obj->get_read_op()) {} + read_op(obj->get_read_op()), y(_y) {} ~RGWLCStreamRead() {}; int set_range(off_t _ofs, off_t _end); @@ -531,12 +536,16 @@ class RGWLCCloudStreamPut int retcode; + optional_yield y; + public: RGWLCCloudStreamPut(const DoutPrefixProvider *_dpp, const rgw_lc_obj_properties& _obj_properties, RGWRESTConn& _conn, - const rgw_obj& _dest_obj) : - dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) { + const rgw_obj& _dest_obj, + optional_yield _y) : + dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj), + y(_y) { } int init(); static bool keep_attr(const std::string& h); @@ -578,7 +587,6 @@ void RGWLCStreamRead::set_multipart(uint64_t part_size, off_t part_off, off_t pa } int RGWLCStreamRead::init() { - optional_yield y = null_yield; real_time read_mtime; read_op->params.lastmod = &read_mtime; @@ -650,7 +658,7 @@ int RGWLCStreamRead::init_rest_obj() { } int RGWLCStreamRead::read(off_t ofs, off_t end, RGWGetDataCB *out_cb) { - int ret = read_op->iterate(dpp, ofs, end, out_cb, null_yield); + int ret = read_op->iterate(dpp, ofs, end, out_cb, y); return ret; } @@ -871,7 +879,7 @@ RGWGetDataCB *RGWLCCloudStreamPut::get_cb() { } int RGWLCCloudStreamPut::complete_request() { - return conn.complete_request(dpp, out_req, etag, &obj_properties.mtime, null_yield); + return conn.complete_request(dpp, out_req, etag, &obj_properties.mtime, y); } /* Read local copy and write to Cloud endpoint */ @@ -950,11 +958,11 @@ static int cloud_tier_plain_transfer(RGWLCCloudTierCtx& tier_ctx) { */ std::shared_ptr readf; readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp, - tier_ctx.obj, tier_ctx.o.meta.mtime)); + tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y)); std::shared_ptr writef; writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn, - dest_obj)); + dest_obj, tier_ctx.y)); /* actual Read & Write */ ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get()); @@ -992,11 +1000,11 @@ static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx& tier_ctx, * be taking lot of time eventually erroring out at times. */ std::shared_ptr readf; readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp, - tier_ctx.obj, tier_ctx.o.meta.mtime)); + tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y)); std::shared_ptr writef; writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn, - dest_obj)); + dest_obj, tier_ctx.y)); /* Prepare Read from source */ end = part_info.ofs + part_info.size - 1; @@ -1021,7 +1029,8 @@ static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx& tier_ctx, int cloud_tier_restore(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn, const rgw_obj& dest_obj, std::optional days, - RGWZoneGroupTierS3Glacier& glacier_params) { + RGWZoneGroupTierS3Glacier& glacier_params, + optional_yield y) { rgw_http_param_pair params[] = {{"restore", nullptr}, {nullptr, nullptr}}; // XXX: include versionId=VersionId in the params above @@ -1068,7 +1077,7 @@ int cloud_tier_restore(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn, bl.append(ss.str()); ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr, - out_bl, &bl, nullptr, null_yield); + out_bl, &bl, nullptr, y); if (ret < 0) { ldpp_dout(dpp, 0) << __func__ << "ERROR: failed to send Restore request to cloud for obj=" << dest_obj << " , ret = " << ret << dendl; @@ -1112,7 +1121,7 @@ int cloud_tier_restore(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn, static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn, const rgw_obj& dest_obj, - const std::string& upload_id) { + const std::string& upload_id, optional_yield y) { int ret; bufferlist out_bl; bufferlist bl; @@ -1120,7 +1129,7 @@ static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp, string resource = obj_to_aws_path(dest_obj); ret = dest_conn.send_resource(dpp, "DELETE", resource, params, nullptr, - out_bl, &bl, nullptr, null_yield); + out_bl, &bl, nullptr, y); if (ret < 0) { @@ -1134,7 +1143,7 @@ static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp, static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn, const rgw_obj& dest_obj, uint64_t obj_size, std::map& attrs, - std::string& upload_id) { + std::string& upload_id, optional_yield y) { bufferlist out_bl; bufferlist bl; @@ -1156,7 +1165,7 @@ static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp, string resource = obj_to_aws_path(dest_obj); ret = dest_conn.send_resource(dpp, "POST", resource, params, &attrs, - out_bl, &bl, nullptr, null_yield); + out_bl, &bl, nullptr, y); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl; @@ -1197,7 +1206,8 @@ static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp, static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn, const rgw_obj& dest_obj, std::string& upload_id, - const std::map& parts) { + const std::map& parts, + optional_yield y) { rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} }; stringstream ss; @@ -1242,7 +1252,7 @@ static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp, bl.append(ss.str()); ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr, - out_bl, &bl, nullptr, null_yield); + out_bl, &bl, nullptr, y); if (ret < 0) { @@ -1284,14 +1294,14 @@ static int cloud_tier_abort_multipart_upload(RGWLCCloudTierCtx& tier_ctx, const std::string& upload_id) { int ret; - ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id); + ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id, tier_ctx.y); if (ret < 0) { ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " ret=" << ret << dendl; /* ignore error, best effort */ } /* remove status obj */ - ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj); + ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, tier_ctx.y); if (ret < 0) { ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " ret=" << ret << dendl; // ignore error, best effort @@ -1344,7 +1354,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) { rgw_pool pool = static_cast(tier_ctx.driver)->svc()->zone->get_zone_params().log_pool; status_obj = rgw_raw_obj(pool, "lc_multipart_" + tier_ctx.obj->get_oid()); - ret = read_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status); + ret = read_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status, tier_ctx.y); if (ret < 0 && ret != -ENOENT) { ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " ret=" << ret << dendl; @@ -1361,7 +1371,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) { } if (ret == -ENOENT) { - RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.obj, tier_ctx.o.meta.mtime); + RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y); readf.init(); @@ -1369,7 +1379,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) { RGWLCCloudStreamPut::init_send_attrs(tier_ctx.dpp, rest_obj, obj_properties, new_attrs); - ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id); + ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id, tier_ctx.y); if (ret < 0) { return ret; } @@ -1378,7 +1388,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) { status.mtime = obj_properties.mtime; status.etag = obj_properties.etag; - ret = put_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status); + ret = put_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status, tier_ctx.y); if (ret < 0) { ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to driver multipart upload state, ret=" << ret << dendl; @@ -1423,7 +1433,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) { } - ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts); + ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts, tier_ctx.y); if (ret < 0) { ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-ret) << ")" << dendl; cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id); @@ -1431,7 +1441,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) { } /* remove status obj */ - ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj); + ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, tier_ctx.y); if (ret < 0) { ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << cur_part << " (" << cpp_strerror(-ret) << ")" << dendl; // ignore error, best effort @@ -1507,7 +1517,7 @@ static int cloud_tier_create_bucket(RGWLCCloudTierCtx& tier_ctx) { } ret = tier_ctx.conn.send_resource(tier_ctx.dpp, "PUT", resource, nullptr, nullptr, - out_bl, &bl, nullptr, null_yield); + out_bl, &bl, nullptr, tier_ctx.y); if (ret < 0 ) { ldpp_dout(tier_ctx.dpp, 0) << "create target bucket : " << tier_ctx.target_bucket_name << " returned ret:" << ret << dendl; diff --git a/src/rgw/driver/rados/rgw_lc_tier.h b/src/rgw/driver/rados/rgw_lc_tier.h index b6759d39bd6..44e2ea3883a 100644 --- a/src/rgw/driver/rados/rgw_lc_tier.h +++ b/src/rgw/driver/rados/rgw_lc_tier.h @@ -40,14 +40,16 @@ struct RGWLCCloudTierCtx { bool is_multipart_upload{false}; bool target_bucket_created{true}; + optional_yield y; + RGWLCCloudTierCtx(CephContext* _cct, const DoutPrefixProvider *_dpp, rgw_bucket_dir_entry& _o, rgw::sal::Driver *_driver, RGWBucketInfo &_binfo, rgw::sal::Object *_obj, RGWRESTConn& _conn, std::string& _bucket, - std::string& _storage_class) : + std::string& _storage_class, optional_yield _y) : cct(_cct), dpp(_dpp), o(_o), driver(_driver), bucket_info(_binfo), obj(_obj), conn(_conn), target_bucket_name(_bucket), - target_storage_class(_storage_class) {} + target_storage_class(_storage_class), y(_y) {} }; /* Transition object to cloud endpoint */ @@ -70,7 +72,8 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx, int cloud_tier_restore(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn, const rgw_obj& dest_obj, std::optional days, - RGWZoneGroupTierS3Glacier& glacier_params); + RGWZoneGroupTierS3Glacier& glacier_params, + optional_yield y); bool is_restore_in_progress(const DoutPrefixProvider *dpp, std::map& headers); diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index f70491e080c..71350710e31 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -3271,7 +3271,7 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket, // save source cloudtier storage class RGWLCCloudTierCtx tier_ctx(cct, dpp, ent, store, bucket->get_info(), this, conn, bucket_name, - rtier->get_rt().t.s3.target_storage_class); + rtier->get_rt().t.s3.target_storage_class, y); tier_ctx.acl_mappings = rtier->get_rt().t.s3.acl_mappings; tier_ctx.multipart_min_part_size = rtier->get_rt().t.s3.multipart_min_part_size; tier_ctx.multipart_sync_threshold = rtier->get_rt().t.s3.multipart_sync_threshold; @@ -3347,7 +3347,7 @@ int RadosObject::transition_to_cloud(Bucket* bucket, RGWLCCloudTierCtx tier_ctx(cct, dpp, o, store, bucket->get_info(), this, conn, bucket_name, - rtier->get_rt().t.s3.target_storage_class); + rtier->get_rt().t.s3.target_storage_class, y); tier_ctx.acl_mappings = rtier->get_rt().t.s3.acl_mappings; tier_ctx.multipart_min_part_size = rtier->get_rt().t.s3.multipart_min_part_size; tier_ctx.multipart_sync_threshold = rtier->get_rt().t.s3.multipart_sync_threshold; -- 2.47.3