]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/lc: add coroutine support for cloud-transition and cloud-restore 68453/head
authorMatthew N. Heler <matthew.heler@hotmail.com>
Fri, 17 Apr 2026 18:13:52 +0000 (13:13 -0500)
committerMatthew N. Heler <matthew.heler@hotmail.com>
Fri, 24 Apr 2026 11:10:20 +0000 (06:10 -0500)
LCWorker's cloud-tiering was blocking the io_context on every HTTP
call, so rgw_lc_max_wp_worker coroutines ended up running one at a
time instead of in parallel. Same story on the restore side.

Pass the worker's optional_yield through RGWLCCloudTierCtx and use it
from the stream classes, the multipart and status-obj helpers, and
cloud_tier_restore. Transitions and restores now actually yield.

Signed-off-by: Matthew N. Heler <matthew.heler@hotmail.com>
src/rgw/driver/rados/rgw_lc_tier.cc
src/rgw/driver/rados/rgw_lc_tier.h
src/rgw/driver/rados/rgw_sal_rados.cc

index 8328c1393debe1899a59b26028af04c5f0b82fdc..f2fdf8e5c0a181d327e94ab567f5f3c9784d9eb3 100644 (file)
@@ -102,7 +102,8 @@ static inline string obj_to_aws_path(const rgw_obj& obj)
 }
 
 static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver,
-    const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
+    const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status,
+    optional_yield y)
 {
   int ret = 0;
   rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(driver);
@@ -118,7 +119,7 @@ static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *d
   bufferlist bl;
 
   ret = rgw_get_system_obj(sysobj, pool, oid, bl, nullptr, nullptr,
-      null_yield, dpp);
+      y, dpp);
 
   if (ret < 0) {
     return ret;
@@ -141,7 +142,8 @@ static int read_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *d
 }
 
 static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver,
-    const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status)
+    const rgw_raw_obj *status_obj, rgw_lc_multipart_upload_info *status,
+    optional_yield y)
 {
   int ret = 0;
   rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(driver);
@@ -158,13 +160,13 @@ static int put_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *dr
   status->encode(bl);
 
   ret = rgw_put_system_obj(dpp, sysobj, pool, oid, bl, true, nullptr,
-      real_time{}, null_yield);
+      real_time{}, y);
 
   return ret;
 }
 
 static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver *driver,
-    const rgw_raw_obj *status_obj)
+    const rgw_raw_obj *status_obj, optional_yield y)
 {
   int ret = 0;
   rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(driver);
@@ -178,7 +180,7 @@ static int delete_upload_status(const DoutPrefixProvider *dpp, rgw::sal::Driver
   const auto oid = status_obj->oid;
   auto sysobj = rados->svc()->sysobj;
 
-  ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, null_yield);
+  ret = rgw_delete_system_obj(dpp, sysobj, pool, oid, nullptr, y);
 
   return ret;
 }
@@ -280,7 +282,7 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx,
   if (!in_progress) { // first time. Send RESTORE req.
 
     rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name));
-    ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params);
+    ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params, tier_ctx.y);
 
     ldpp_dout(tier_ctx.dpp, 20) << __func__ << "Restoring object=" << target_obj_name << "returned ret = " << ret << dendl;
 
@@ -364,7 +366,7 @@ int rgw_cloud_tier_get_object(RGWLCCloudTierCtx& tier_ctx, bool head,
     // accounted_size in complete_request() reads from RGWX_OBJECT_SIZE which is set
     // only for internal ops/sync. So instead read from headers[CONTENT_LEN].
     // Same goes for pattrs.
-    ret = tier_ctx.conn.complete_request(tier_ctx.dpp, in_req, &etag, pset_mtime, nullptr, nullptr, &headers, null_yield);
+    ret = tier_ctx.conn.complete_request(tier_ctx.dpp, in_req, &etag, pset_mtime, nullptr, nullptr, &headers, tier_ctx.y);
     if (ret < 0) {
       if (ret == -EIO && tries < NUM_ENPOINT_IOERROR_RETRIES - 1) {
         ldpp_dout(tier_ctx.dpp, 20) << __func__  << "(): failed to fetch object from remote. retries=" << tries << dendl;
@@ -496,11 +498,14 @@ class RGWLCStreamRead
 
   int retcode{0};
 
+  optional_yield y;
+
   public:
   RGWLCStreamRead(CephContext *_cct, const DoutPrefixProvider *_dpp,
-      rgw::sal::Object *_obj, const real_time &_mtime) :
+      rgw::sal::Object *_obj, const real_time &_mtime,
+      optional_yield _y) :
     cct(_cct), dpp(_dpp), obj(_obj), mtime(_mtime),
-    read_op(obj->get_read_op()) {}
+    read_op(obj->get_read_op()), y(_y) {}
 
   ~RGWLCStreamRead() {};
   int set_range(off_t _ofs, off_t _end);
@@ -531,12 +536,16 @@ class RGWLCCloudStreamPut
 
   int retcode;
 
+  optional_yield y;
+
   public:
   RGWLCCloudStreamPut(const DoutPrefixProvider *_dpp,
       const rgw_lc_obj_properties&  _obj_properties,
       RGWRESTConn& _conn,
-      const rgw_obj& _dest_obj) :
-    dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj) {
+      const rgw_obj& _dest_obj,
+      optional_yield _y) :
+    dpp(_dpp), obj_properties(_obj_properties), conn(_conn), dest_obj(_dest_obj),
+    y(_y) {
     }
   int init();
   static bool keep_attr(const std::string& h);
@@ -578,7 +587,6 @@ void RGWLCStreamRead::set_multipart(uint64_t part_size, off_t part_off, off_t pa
 }
 
 int RGWLCStreamRead::init() {
-  optional_yield y = null_yield;
   real_time read_mtime;
 
   read_op->params.lastmod = &read_mtime;
@@ -650,7 +658,7 @@ int RGWLCStreamRead::init_rest_obj() {
 }
 
 int RGWLCStreamRead::read(off_t ofs, off_t end, RGWGetDataCB *out_cb) {
-  int ret = read_op->iterate(dpp, ofs, end, out_cb, null_yield);
+  int ret = read_op->iterate(dpp, ofs, end, out_cb, y);
   return ret;
 }
 
@@ -871,7 +879,7 @@ RGWGetDataCB *RGWLCCloudStreamPut::get_cb() {
 }
 
 int RGWLCCloudStreamPut::complete_request() {
-  return conn.complete_request(dpp, out_req, etag, &obj_properties.mtime, null_yield);
+  return conn.complete_request(dpp, out_req, etag, &obj_properties.mtime, y);
 }
 
 /* Read local copy and write to Cloud endpoint */
@@ -950,11 +958,11 @@ static int cloud_tier_plain_transfer(RGWLCCloudTierCtx& tier_ctx) {
    */
   std::shared_ptr<RGWLCStreamRead> readf;
   readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
-        tier_ctx.obj, tier_ctx.o.meta.mtime));
+        tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y));
 
   std::shared_ptr<RGWLCCloudStreamPut> writef;
   writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
-               dest_obj));
+               dest_obj, tier_ctx.y));
 
   /* actual Read & Write */
   ret = cloud_tier_transfer_object(tier_ctx.dpp, readf.get(), writef.get());
@@ -992,11 +1000,11 @@ static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx& tier_ctx,
    * be taking lot of time eventually erroring out at times. */
   std::shared_ptr<RGWLCStreamRead> readf;
   readf.reset(new RGWLCStreamRead(tier_ctx.cct, tier_ctx.dpp,
-        tier_ctx.obj, tier_ctx.o.meta.mtime));
+        tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y));
 
   std::shared_ptr<RGWLCCloudStreamPut> writef;
   writef.reset(new RGWLCCloudStreamPut(tier_ctx.dpp, obj_properties, tier_ctx.conn,
-               dest_obj));
+               dest_obj, tier_ctx.y));
 
   /* Prepare Read from source */
   end = part_info.ofs + part_info.size - 1;
@@ -1021,7 +1029,8 @@ static int cloud_tier_send_multipart_part(RGWLCCloudTierCtx& tier_ctx,
 
 int cloud_tier_restore(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn,
                        const rgw_obj& dest_obj, std::optional<uint64_t> days,
-                       RGWZoneGroupTierS3Glacier& glacier_params) {
+                       RGWZoneGroupTierS3Glacier& glacier_params,
+                       optional_yield y) {
   rgw_http_param_pair params[] = {{"restore", nullptr}, {nullptr, nullptr}};
   // XXX: include versionId=VersionId in the params above
 
@@ -1068,7 +1077,7 @@ int cloud_tier_restore(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn,
   bl.append(ss.str());
 
   ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr,
-                                out_bl, &bl, nullptr, null_yield);
+                                out_bl, &bl, nullptr, y);
 
   if (ret < 0) {
     ldpp_dout(dpp, 0) << __func__ << "ERROR: failed to send Restore request to cloud for obj=" << dest_obj << " , ret = " << ret << dendl;
@@ -1112,7 +1121,7 @@ int cloud_tier_restore(const DoutPrefixProvider *dpp, RGWRESTConn& dest_conn,
 
 static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp,
       RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
-      const std::string& upload_id) {
+      const std::string& upload_id, optional_yield y) {
   int ret;
   bufferlist out_bl;
   bufferlist bl;
@@ -1120,7 +1129,7 @@ static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp,
 
   string resource = obj_to_aws_path(dest_obj);
   ret = dest_conn.send_resource(dpp, "DELETE", resource, params, nullptr,
-      out_bl, &bl, nullptr, null_yield);
+      out_bl, &bl, nullptr, y);
 
 
   if (ret < 0) {
@@ -1134,7 +1143,7 @@ static int cloud_tier_abort_multipart(const DoutPrefixProvider *dpp,
 static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp,
       RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
       uint64_t obj_size, std::map<std::string, std::string>& attrs,
-      std::string& upload_id) {
+      std::string& upload_id, optional_yield y) {
   bufferlist out_bl;
   bufferlist bl;
 
@@ -1156,7 +1165,7 @@ static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp,
   string resource = obj_to_aws_path(dest_obj);
 
   ret = dest_conn.send_resource(dpp, "POST", resource, params, &attrs,
-      out_bl, &bl, nullptr, null_yield);
+      out_bl, &bl, nullptr, y);
 
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
@@ -1197,7 +1206,8 @@ static int cloud_tier_init_multipart(const DoutPrefixProvider *dpp,
 static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp,
       RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
       std::string& upload_id,
-      const std::map<int, rgw_lc_multipart_part_info>& parts) {
+      const std::map<int, rgw_lc_multipart_part_info>& parts,
+      optional_yield y) {
   rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
 
   stringstream ss;
@@ -1242,7 +1252,7 @@ static int cloud_tier_complete_multipart(const DoutPrefixProvider *dpp,
   bl.append(ss.str());
 
   ret = dest_conn.send_resource(dpp, "POST", resource, params, nullptr,
-      out_bl, &bl, nullptr, null_yield);
+      out_bl, &bl, nullptr, y);
 
 
   if (ret < 0) {
@@ -1284,14 +1294,14 @@ static int cloud_tier_abort_multipart_upload(RGWLCCloudTierCtx& tier_ctx,
       const std::string& upload_id) {
   int ret;
 
-  ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id);
+  ret = cloud_tier_abort_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, upload_id, tier_ctx.y);
 
   if (ret < 0) {
     ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " ret=" << ret << dendl;
     /* ignore error, best effort */
   }
   /* remove status obj */
-  ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj);
+  ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, tier_ctx.y);
   if (ret < 0) {
     ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " ret=" << ret << dendl;
     // ignore error, best effort 
@@ -1344,7 +1354,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
   rgw_pool pool = static_cast<rgw::sal::RadosStore*>(tier_ctx.driver)->svc()->zone->get_zone_params().log_pool;
   status_obj = rgw_raw_obj(pool, "lc_multipart_" + tier_ctx.obj->get_oid());
 
-  ret = read_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status);
+  ret = read_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status, tier_ctx.y);
 
   if (ret < 0 && ret != -ENOENT) {
     ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " ret=" << ret << dendl;
@@ -1361,7 +1371,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
   }
 
   if (ret == -ENOENT) {
-    RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.obj, tier_ctx.o.meta.mtime);
+    RGWLCStreamRead readf(tier_ctx.cct, tier_ctx.dpp, tier_ctx.obj, tier_ctx.o.meta.mtime, tier_ctx.y);
 
     readf.init();
 
@@ -1369,7 +1379,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
 
     RGWLCCloudStreamPut::init_send_attrs(tier_ctx.dpp, rest_obj, obj_properties, new_attrs);
 
-    ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id);
+    ret = cloud_tier_init_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, obj_size, new_attrs, status.upload_id, tier_ctx.y);
     if (ret < 0) {
       return ret;
     }
@@ -1378,7 +1388,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
     status.mtime = obj_properties.mtime;
     status.etag = obj_properties.etag;
 
-    ret = put_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status);
+    ret = put_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, &status, tier_ctx.y);
 
     if (ret < 0) {
       ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to driver multipart upload state, ret=" << ret << dendl;
@@ -1423,7 +1433,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
 
   }
 
-  ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts);
+  ret = cloud_tier_complete_multipart(tier_ctx.dpp, tier_ctx.conn, dest_obj, status.upload_id, parts, tier_ctx.y);
   if (ret < 0) {
     ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << tier_ctx.obj << " (error: " << cpp_strerror(-ret) << ")" << dendl;
     cloud_tier_abort_multipart_upload(tier_ctx, dest_obj, status_obj, status.upload_id);
@@ -1431,7 +1441,7 @@ static int cloud_tier_multipart_transfer(RGWLCCloudTierCtx& tier_ctx) {
   }
 
   /* remove status obj */
-  ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj);
+  ret = delete_upload_status(tier_ctx.dpp, tier_ctx.driver, &status_obj, tier_ctx.y);
   if (ret < 0) {
     ldpp_dout(tier_ctx.dpp, 0) << "ERROR: failed to abort multipart upload obj=" << tier_ctx.obj << " upload_id=" << status.upload_id << " part number " << cur_part << " (" << cpp_strerror(-ret) << ")" << dendl;
     // ignore error, best effort 
@@ -1507,7 +1517,7 @@ static int cloud_tier_create_bucket(RGWLCCloudTierCtx& tier_ctx) {
   }
 
   ret = tier_ctx.conn.send_resource(tier_ctx.dpp, "PUT", resource, nullptr, nullptr,
-                                    out_bl, &bl, nullptr, null_yield);
+                                    out_bl, &bl, nullptr, tier_ctx.y);
 
   if (ret < 0 ) {
     ldpp_dout(tier_ctx.dpp, 0) << "create target bucket : " << tier_ctx.target_bucket_name << " returned ret:" << ret << dendl;
index b6759d39bd6e36a7112da357675dfe9d902d92e4..44e2ea3883ab23f44d683a928b0105083b01bf7b 100644 (file)
@@ -40,14 +40,16 @@ struct RGWLCCloudTierCtx {
   bool is_multipart_upload{false};
   bool target_bucket_created{true};
 
+  optional_yield y;
+
   RGWLCCloudTierCtx(CephContext* _cct, const DoutPrefixProvider *_dpp,
       rgw_bucket_dir_entry& _o, rgw::sal::Driver *_driver,
       RGWBucketInfo &_binfo, rgw::sal::Object *_obj,
       RGWRESTConn& _conn, std::string& _bucket,
-      std::string& _storage_class) :
+      std::string& _storage_class, optional_yield _y) :
     cct(_cct), dpp(_dpp), o(_o), driver(_driver), bucket_info(_binfo),
     obj(_obj), conn(_conn), target_bucket_name(_bucket),
-    target_storage_class(_storage_class) {}
+    target_storage_class(_storage_class), y(_y) {}
 };
 
 /* Transition object to cloud endpoint */
@@ -70,7 +72,8 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx,
 int cloud_tier_restore(const DoutPrefixProvider *dpp,
                        RGWRESTConn& dest_conn, const rgw_obj& dest_obj,
                        std::optional<uint64_t> days,
-                       RGWZoneGroupTierS3Glacier& glacier_params);
+                       RGWZoneGroupTierS3Glacier& glacier_params,
+                       optional_yield y);
 
 bool is_restore_in_progress(const DoutPrefixProvider *dpp,
                             std::map<std::string, std::string>& headers);
index f70491e080c18c00e74069ae6b21b3f77461c37e..71350710e31cca94a811d52d2c87daf8a39c7bfd 100644 (file)
@@ -3271,7 +3271,7 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket,
   // save source cloudtier storage class
   RGWLCCloudTierCtx tier_ctx(cct, dpp, ent, store, bucket->get_info(),
            this, conn, bucket_name,
-           rtier->get_rt().t.s3.target_storage_class);
+           rtier->get_rt().t.s3.target_storage_class, y);
   tier_ctx.acl_mappings = rtier->get_rt().t.s3.acl_mappings;
   tier_ctx.multipart_min_part_size = rtier->get_rt().t.s3.multipart_min_part_size;
   tier_ctx.multipart_sync_threshold = rtier->get_rt().t.s3.multipart_sync_threshold;
@@ -3347,7 +3347,7 @@ int RadosObject::transition_to_cloud(Bucket* bucket,
 
   RGWLCCloudTierCtx tier_ctx(cct, dpp, o, store, bucket->get_info(),
                             this, conn, bucket_name,
-                            rtier->get_rt().t.s3.target_storage_class);
+                            rtier->get_rt().t.s3.target_storage_class, y);
   tier_ctx.acl_mappings = rtier->get_rt().t.s3.acl_mappings;
   tier_ctx.multipart_min_part_size = rtier->get_rt().t.s3.multipart_min_part_size;
   tier_ctx.multipart_sync_threshold = rtier->get_rt().t.s3.multipart_sync_threshold;