git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/cloud-restore: Support restoration of objects transitioned to Glacier/Tape endpoint
author Soumya Koduri <skoduri@redhat.com>
Wed, 30 Apr 2025 20:36:21 +0000 (02:06 +0530)
committer Soumya Koduri <skoduri@redhat.com>
Thu, 10 Jul 2025 16:39:48 +0000 (22:09 +0530)
Restoration of objects from certain cloud services (such as Glacier/Tape) can
take a significant amount of time (even days). Hence, store the state of such
restore requests and process them periodically.

Brief summary of changes:

* Refactored the existing restore code to consolidate all restore processing
  into the rgw_restore* file/class.

* An RGWRestore class is defined to manage the restoration of objects.

* Lastly, for SAL_RADOS, a FIFO is used to store and read restore entries.

Currently, this PR handles storing the state of restore requests sent to the
cloud-glacier tier type, which need asynchronous processing. The changes were
tested against AWS Glacier Flexible Retrieval with tier_type Expedited and
Standard.
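For context, here is a minimal, self-contained model of the flow this commit
introduces. Every type below is an illustrative stand-in (for RGWRestoreEntry,
the per-shard FIFO, and the RGWRestore worker), not the real RGW classes:

    // Minimal model of the async restore flow added by this commit.
    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <queue>
    #include <string>

    enum class RestoreStatus { None, RestoreAlreadyInProgress, CloudRestored, RestoreFailed };

    struct RestoreEntry {            // plays the role of RGWRestoreEntry
      std::string obj_key;
      std::optional<uint64_t> days;  // temporary vs. permanent restore
    };

    int main() {
      std::queue<RestoreEntry> fifo; // stands in for one FIFO shard
      // 1. A restore-object (or read-through) request finds the object
      //    cloud-tiered, sends the Glacier RESTORE call, and queues an entry.
      fifo.push({"bucket/obj1", 7});
      RestoreStatus status = RestoreStatus::RestoreAlreadyInProgress;

      // 2. The restore worker wakes every rgw_restore_processor_period seconds
      //    and probes the cloud endpoint for each queued entry.
      int cycles = 0;
      while (!fifo.empty()) {
        RestoreEntry e = fifo.front(); fifo.pop();
        bool done = (++cycles >= 3);           // pretend Glacier finishes on cycle 3
        if (!done) { fifo.push(e); continue; } // still in progress; keep the entry
        status = RestoreStatus::CloudRestored; // download the object, trim the entry
      }
      std::cout << "status=" << int(status) << " after " << cycles << " cycles\n";
    }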

Reviewed-by: Matt Benjamin <mbenjamin@redhat.com>
Reviewed-by: Adam Emerson <aemerson@redhat.com>
Reviewed-by: Jiffin Tony Thottan <thottanjiffin@gmail.com>
Reviewed-by: Daniel Gryniewicz <dang@redhat.com>
Signed-off-by: Soumya Koduri <skoduri@redhat.com>
(cherry picked from commit ef96bb0d6137bacf45b9ee2f99ad5bcd8b3b6add)

35 files changed:
src/common/options/rgw.yaml.in
src/common/subsys.h
src/rgw/CMakeLists.txt
src/rgw/driver/daos/rgw_sal_daos.cc
src/rgw/driver/daos/rgw_sal_daos.h
src/rgw/driver/motr/rgw_sal_motr.cc
src/rgw/driver/motr/rgw_sal_motr.h
src/rgw/driver/posix/rgw_sal_posix.cc
src/rgw/driver/posix/rgw_sal_posix.h
src/rgw/driver/rados/rgw_lc_tier.cc
src/rgw/driver/rados/rgw_lc_tier.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/driver/rados/rgw_zone.h
src/rgw/radosgw-admin/radosgw-admin.cc
src/rgw/rgw_appmain.cc
src/rgw/rgw_lc.h
src/rgw/rgw_object_expirer.cc
src/rgw/rgw_op.cc
src/rgw/rgw_realm_reloader.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_restore.cc [new file with mode: 0644]
src/rgw/rgw_restore.h [new file with mode: 0644]
src/rgw/rgw_sal.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_filter.cc
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_fwd.h
src/rgw/rgw_sal_store.h
src/rgw/rgw_zone.cc
src/test/rgw/rgw_cr_test.cc

index ade58eb67de69e5d30b1e10af8eb7d0270452272..701d09067d3b379322acf716797fa99cc5ad561a 100644 (file)
@@ -230,6 +230,7 @@ options:
   see_also:
   - rgw_enable_gc_threads
   - rgw_enable_lc_threads
+  - rgw_enable_restore_threads
   with_legacy: true
 - name: rgw_enable_gc_threads
   type: bool
@@ -246,6 +247,7 @@ options:
   see_also:
   - rgw_enable_quota_threads
   - rgw_enable_lc_threads
+  - rgw_enable_restore_threads
   with_legacy: true
 - name: rgw_enable_lc_threads
   type: bool
@@ -263,6 +265,24 @@ options:
   see_also:
   - rgw_enable_gc_threads
   - rgw_enable_quota_threads
+  - rgw_enable_restore_threads
+  with_legacy: true
+- name: rgw_enable_restore_threads
+  type: bool
+  level: advanced
+  desc: Enables the object restore maintenance thread.
+  long_desc: The object restore maintenance thread is responsible for all
+    object-restoration-related maintenance work. The thread itself can be disabled, but
+    for restore from the cloud to work correctly, at least one RGW in each zone needs
+    to have this thread running. Having the thread enabled on multiple RGW processes
+    within the same zone can spread some of the maintenance work between them.
+  default: true
+  services:
+  - rgw
+  see_also:
+  - rgw_enable_gc_threads
+  - rgw_enable_quota_threads
+  - rgw_enable_lc_threads
   with_legacy: true
 - name: rgw_data
   type: str
@@ -475,6 +495,35 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_restore_max_objs
+  type: int
+  level: advanced
+  desc: Number of shards for restore processing
+  long_desc: Number of RADOS objects to use for storing restore entries which are in progress. This affects concurrency of restore maintenance, as shards can be processed in parallel.
+  default: 32
+  services:
+  - rgw
+  with_legacy: true
+- name: rgw_restore_lock_max_time
+  type: int
+  level: dev
+  default: 90
+  services:
+  - rgw
+  see_also:
+  with_legacy: true
+- name: rgw_restore_processor_period
+  type: int
+  level: advanced
+  desc: Restore cycle run time
+  long_desc: The amount of time between the start of consecutive runs of the restore
+    processing threads. If a run takes longer than this period, the thread will
+    not wait before running again.
+  fmt_desc: The cycle time for restore state processing.
+  default: 15_min
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_mp_lock_max_time
   type: int
   level: advanced
@@ -1260,6 +1309,14 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_nfs_run_restore_threads
+  type: bool
+  level: advanced
+  desc: run objects' restore threads in librgw (default off)
+  default: false
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_nfs_run_sync_thread
   type: bool
   level: advanced
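Taken together, the new options might be set as follows on the RGW instance
meant to run the restore worker; a sketch using the defaults above, with a
hypothetical section name:

    [client.rgw.zone-a-gw1]
        rgw_enable_restore_threads = true
        rgw_restore_max_objs = 32           # restore FIFO shards, processed in parallel
        rgw_restore_processor_period = 900  # seconds between restore cycles (15 min)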
index 419115c35ff7b228b57f90db6729afa8df639f8f..d756124f1a13e5e916a8fe5379f62be241d8f7c9 100644 (file)
@@ -66,6 +66,7 @@ SUBSYS(rgw_access, 1, 5)
 SUBSYS(rgw_dbstore, 1, 5)
 SUBSYS(rgw_flight, 1, 5)
 SUBSYS(rgw_lifecycle, 1, 5)
+SUBSYS(rgw_restore, 1, 5)
 SUBSYS(rgw_notification, 1, 5)
 SUBSYS(javaclient, 1, 5)
 SUBSYS(asok, 1, 5)
index de0489e33ed7d4c909da1f9a6696d0a51d5a1ca5..f6ae85fd4e56c8caea0d8b85d45e7b761ca8a46e 100644 (file)
@@ -88,6 +88,7 @@ set(librgw_common_srcs
   rgw_ldap.cc
   rgw_lc.cc
   rgw_lc_s3.cc
+  rgw_restore.cc
   rgw_metadata.cc
   rgw_multi.cc
   rgw_multi_del.cc
index f60c5a11208ed9653c66758300002184ffdff7fe..c90a8770514ba5de879ed47fd7fbf25209afc4c8 100644 (file)
@@ -1028,15 +1028,13 @@ int DaosObject::transition_to_cloud(
 
 int DaosObject::restore_obj_from_cloud(Bucket* bucket,
           rgw::sal::PlacementTier* tier,
-          rgw_placement_rule& placement_rule,
-          rgw_bucket_dir_entry& o,
          CephContext* cct,
           RGWObjTier& tier_config,
           uint64_t olh_epoch,
           std::optional<uint64_t> days,
+         bool& in_progress,
           const DoutPrefixProvider* dpp, 
-          optional_yield y,
-          uint32_t flags)
+          optional_yield y)
 {
   return DAOS_NOT_IMPLEMENTED_LOG(dpp);
 }
@@ -2321,6 +2319,12 @@ std::unique_ptr<Lifecycle> DaosStore::get_lifecycle(void) {
   return 0;
 }
 
+std::unique_ptr<Restore> DaosStore::get_restore(const int n_objs,
+                               const std::vector<std::string_view>& obj_names) {
+  DAOS_NOT_IMPLEMENTED_LOG(nullptr);
+  return 0;
+}
+
 bool DaosStore::process_expired_objects(const DoutPrefixProvider *dpp,
                                        optional_yield y) {
   DAOS_NOT_IMPLEMENTED_LOG(nullptr);
index 19cff1d579802e0c5b8ed39807f5037fc3c54726..65ecbdcbb28c8fc670ec86be97c65583fbc8d951 100644 (file)
@@ -654,15 +654,13 @@ class DaosObject : public StoreObject {
                                   optional_yield y) override;
   virtual int restore_obj_from_cloud(Bucket* bucket,
                           rgw::sal::PlacementTier* tier,
-                          rgw_placement_rule& placement_rule,
-                          rgw_bucket_dir_entry& o,
                           CephContext* cct,
                           RGWObjTier& tier_config,
                           uint64_t olh_epoch,
                           std::optional<uint64_t> days,
+                          bool& in_progress,
                           const DoutPrefixProvider* dpp,
-                          optional_yield y,
-                          uint32_t flags) override;
+                          optional_yield y) override;
   virtual bool placement_rules_match(rgw_placement_rule& r1,
                                      rgw_placement_rule& r2) override;
   virtual int dump_obj_layout(const DoutPrefixProvider* dpp, optional_yield y,
@@ -937,6 +935,8 @@ class DaosStore : public StoreDriver {
   virtual std::string zone_unique_trans_id(const uint64_t unique_num) override;
   virtual int cluster_stat(RGWClusterStat& stats) override;
   virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+  virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+                const std::vector<std::string_view>& obj_names) override;
   virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
   virtual std::unique_ptr<Notification> get_notification(
       rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
@@ -953,6 +953,7 @@ class DaosStore : public StoreDriver {
       std::string& _req_id,
       optional_yield y) override;
   virtual RGWLC* get_rgwlc(void) override { return NULL; }
+  virtual RGWRestore* get_rgwrestore(void) override { return NULL; }
   virtual RGWCoroutinesManagerRegistry* get_cr_registry() override {
     return NULL;
   }
index dee118243f7f572973dee5f52e3568c8fe418bcf..3cb654fe41bf4df878f74d5e67b8559e0a7a843a 100644 (file)
@@ -3335,6 +3335,11 @@ std::unique_ptr<Lifecycle> MotrStore::get_lifecycle(void)
   return 0;
 }
 
+std::unique_ptr<Restore> MotrStore::get_restore(const int n_objs,
+                       const std::vector<std::string_view>& obj_names) {
+  return 0;
+}
+
 bool MotrStore::process_expired_objects(const DoutPrefixProvider *dpp,
                                        optional_yield y)
 {
index e95ab3252d6df3c5420a36562e735447eb5a1d38..709b77c34a565642432791bd3bbb04b61d9ac0b9 100644 (file)
@@ -1006,6 +1006,8 @@ class MotrStore : public StoreDriver {
     virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list<std::string>& zone_ids) override;
     virtual int cluster_stat(RGWClusterStat& stats) override;
     virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+    virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+                  const std::vector<std::string_view>& obj_names) override;
     virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
     virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj,
         req_state* s, rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) override;
@@ -1020,6 +1022,7 @@ class MotrStore : public StoreDriver {
         std::string& _req_id,
         optional_yield y) override;
     virtual RGWLC* get_rgwlc(void) override { return NULL; }
+    virtual RGWRestore* get_rgwrestore(void) override { return NULL; }
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
 
     virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info) override;
index df625c8c07841148b9a604aaa46bd21e9776804e..798a18bfb9e7e2a67dcc7054882ff46c93a397bf 100644 (file)
@@ -3063,15 +3063,11 @@ int POSIXObject::transition_to_cloud(Bucket* bucket,
 
 int POSIXObject::restore_obj_from_cloud(Bucket* bucket,
           rgw::sal::PlacementTier* tier,
-          rgw_placement_rule& placement_rule,
-          rgw_bucket_dir_entry& o,
          CephContext* cct,
-          RGWObjTier& tier_config,
-          uint64_t olh_epoch,
           std::optional<uint64_t> days,
+          bool& in_progress,
           const DoutPrefixProvider* dpp, 
-          optional_yield y,
-          uint32_t flags)
+          optional_yield y)
 {
   return -ERR_NOT_IMPLEMENTED;
 }
index 3af36d02a4c12b69c0fe7840fb25ad9ef6bcd4d9..b31acb3b860bd32d67137fbbe95d20198abf1544 100644 (file)
@@ -697,15 +697,11 @@ public:
                         optional_yield y) override;
   virtual int restore_obj_from_cloud(Bucket* bucket,
                           rgw::sal::PlacementTier* tier,
-                          rgw_placement_rule& placement_rule,
-                          rgw_bucket_dir_entry& o,
                           CephContext* cct,
-                          RGWObjTier& tier_config,
-                          uint64_t olh_epoch,
                           std::optional<uint64_t> days,
+                          bool& in_progress,
                           const DoutPrefixProvider* dpp,
-                          optional_yield y,
-                          uint32_t flags) override;
+                          optional_yield y) override;
   virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override;
   virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) override;
   virtual int swift_versioning_restore(const ACLOwner& owner, const rgw_user& remote_user, bool& restored,
index cf057f6fae8f59df86795eb7f8c848e194f63812..c536d6fc10cad868ea0d1764b0deb73dc87ad60e 100644 (file)
@@ -260,6 +260,7 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx,
                          uint64_t& accounted_size, rgw::sal::Attrs& attrs,
                          std::optional<uint64_t> days,
                          RGWZoneGroupTierS3Glacier& glacier_params,
+                        bool& in_progress,
                          void* cb) {
   RGWRESTConn::get_obj_params req_params;
   std::string target_obj_name;
@@ -276,25 +277,23 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx,
     target_obj_name += get_key_instance(tier_ctx.obj->get_key());
   }
 
-  if (glacier_params.glacier_restore_tier_type != GlacierRestoreTierType::Expedited) {
-    //XXX: Supporting STANDARD tier type is still in WIP
-    ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: Only Expedited tier_type is supported " << dendl;
-    return -1;
-  }
+  if (!in_progress) { // first time. Send RESTORE req.
 
-  rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name));
+    rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name));
+    ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params);
 
-  ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params);
-  
-  ldpp_dout(tier_ctx.dpp, 20) << __func__ << "Restoring object=" << dest_obj << "returned ret = " << ret << dendl;
-  if (ret < 0 ) { 
-    ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: failed to restore object=" << dest_obj << "; ret = " << ret << dendl;
-    return ret;
+    ldpp_dout(tier_ctx.dpp, 20) << __func__ << ": Restoring object=" << target_obj_name << " returned ret=" << ret << dendl;
+
+    if (ret < 0 ) {
+      ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: failed to restore object=" << dest_obj << "; ret = " << ret << dendl;
+      return ret;
+    }
+    in_progress = true;
   }
 
   // now send HEAD request and verify if restore is complete on glacier/tape endpoint
-  bool restore_in_progress = false;
+  static constexpr int MAX_RETRIES = 10;
+  uint32_t retries = 0;
   do {
     ret = rgw_cloud_tier_get_object(tier_ctx, true, headers, nullptr, etag,
                                     accounted_size, attrs, nullptr);
@@ -304,8 +303,14 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx,
       return ret;
     }
 
-    restore_in_progress = is_restore_in_progress(tier_ctx.dpp, headers);
-  } while(restore_in_progress);
+    in_progress = is_restore_in_progress(tier_ctx.dpp, headers);
+
+  } while(retries++ < MAX_RETRIES && in_progress);
+
+  if (in_progress) {
+    ldpp_dout(tier_ctx.dpp, 20) << __func__ << ": Restoring object=" << target_obj_name << " still in progress; returning" << dendl;
+    return 0;
+  } 
 
   // now do the actual GET
   ret = rgw_cloud_tier_get_object(tier_ctx, false, headers, pset_mtime, etag,
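The hunk above bounds the previously unbounded HEAD-polling loop: at most
MAX_RETRIES probes per invocation, after which in_progress remains true, the
function returns success, and the persisted restore entry is retried on a
later restore cycle. A minimal sketch of that pattern, where the
still_in_progress callback stands in for rgw_cloud_tier_get_object() plus
is_restore_in_progress():

    #include <functional>
    #include <iostream>

    // Returns 0 with in_progress=true when the remote restore is still
    // running, so the caller can persist the state and retry next cycle.
    int poll_restore(bool& in_progress, const std::function<bool()>& still_in_progress) {
      static constexpr int MAX_RETRIES = 10;
      int retries = 0;
      do {
        in_progress = still_in_progress();
      } while (retries++ < MAX_RETRIES && in_progress);
      return 0; // errors from the HEAD request itself would be returned as negative
    }

    int main() {
      int calls = 0;
      bool in_progress = false;
      poll_restore(in_progress, [&] { return ++calls < 25; }); // finishes after 25 probes
      std::cout << "probes this cycle=" << calls << " in_progress=" << in_progress << "\n";
    }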
index 60006da89149da98399523bfb85e2f9223b538a5..20d6e0675d091503e1f04f0418079c6d7bd6a0a7 100644 (file)
@@ -61,8 +61,9 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx,
                          std::map<std::string, std::string>& headers,
                          real_time* pset_mtime, std::string& etag,
                          uint64_t& accounted_size, rgw::sal::Attrs& attrs,
-                                        std::optional<uint64_t> days,
+                        std::optional<uint64_t> days,
                          RGWZoneGroupTierS3Glacier& glacier_params,
+                        bool& in_progress,
                          void* cb);
 
 int cloud_tier_restore(const DoutPrefixProvider *dpp,
index 501d5d8fad1f76ed44db5207ff0b7089b6681949..c057aca97a99a001e1959fe012793bc145a49307 100644 (file)
@@ -38,6 +38,7 @@
 #include "rgw_datalog.h"
 #include "rgw_putobj_processor.h"
 #include "rgw_lc_tier.h"
+#include "rgw_restore.h"
 
 #include "cls/rgw/cls_rgw_ops.h"
 #include "cls/rgw/cls_rgw_client.h"
@@ -1141,6 +1142,12 @@ void RGWRados::finalize()
     rgw::notify::shutdown();
     v1_topic_migration.stop();
   }
+
+  if (use_restore_thread) {
+    restore->stop_processor();
+  }
+  delete restore;
+  restore = NULL;
 }
 
 /** 
@@ -1240,6 +1247,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
   if (ret < 0)
     return ret;
 
+  ret = open_restore_pool_ctx(dpp);
+  if (ret < 0)
+    return ret;
+
   ret = open_objexp_pool_ctx(dpp);
   if (ret < 0)
     return ret;
@@ -1363,6 +1374,12 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
   if (use_lc_thread)
     lc->start_processor();
 
+  restore = new RGWRestore();
+  restore->initialize(cct, this->driver);
+
+  if (use_restore_thread)
+    restore->start_processor();
+
   quota_handler = RGWQuotaHandler::generate_handler(dpp, this->driver, quota_threads);
 
   bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
@@ -1484,6 +1501,11 @@ int RGWRados::open_lc_pool_ctx(const DoutPrefixProvider *dpp)
   return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().lc_pool, lc_pool_ctx, true, true);
 }
 
+int RGWRados::open_restore_pool_ctx(const DoutPrefixProvider *dpp)
+{
+  return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().restore_pool, restore_pool_ctx, true, true);
+}
+
 int RGWRados::open_objexp_pool_ctx(const DoutPrefixProvider *dpp)
 {
   return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().log_pool, objexp_pool_ctx, true, true);
@@ -5353,13 +5375,11 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
                              RGWObjectCtx& obj_ctx,
                              RGWBucketInfo& dest_bucket_info,
                              const rgw_obj& dest_obj,
-                             rgw_placement_rule& dest_placement,
                              RGWObjTier& tier_config,
-                             uint64_t olh_epoch,
                              std::optional<uint64_t> days,
+                            bool& in_progress,
                              const DoutPrefixProvider *dpp,
-                             optional_yield y,
-                             bool log_op){
+                             optional_yield y) {
 
   //XXX: read below from attrs .. check transition_obj()
   ACLOwner owner;
@@ -5381,6 +5401,7 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
     dest_obj_bi.key.instance.clear();
   }
 
+  uint64_t olh_epoch = 0; // read it from attrs fetched from cloud below
   rgw::putobj::AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info, nullptr,
                                   owner, obj_ctx, dest_obj_bi, olh_epoch, tag, dpp, y, no_trace);
  
@@ -5394,11 +5415,9 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
   }
   boost::optional<RGWPutObj_Compress> compressor;
   CompressorRef plugin;
-  dest_placement.storage_class = tier_ctx.restore_storage_class;
+  rgw_placement_rule dest_placement(dest_bucket_info.placement_rule, tier_ctx.restore_storage_class);
   RGWRadosPutObj cb(dpp, cct, plugin, compressor, &processor, progress_cb, progress_data,
                     [&](map<string, bufferlist> obj_attrs) {
-                      // XXX: do we need filter() like in fetch_remote_obj() cb
-                      dest_placement.inherit_from(dest_bucket_info.placement_rule);
                       processor.set_tail_placement(dest_placement);
 
                       ret = processor.prepare(rctx.y);
@@ -5422,6 +5441,9 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
     return ret;
   }
 
+  // For a permanent restore, `log_op` depends on the flags set in bucket->get_info().flags
+  bool log_op = (dest_bucket_info.flags & rgw::sal::FLAG_LOG_OP);
+
   uint64_t accounted_size = 0;
   string etag;
   real_time set_mtime;
@@ -5432,7 +5454,7 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
     RGWZoneGroupTierS3Glacier& glacier_params = tier_config.tier_placement.s3_glacier;
     ret = rgw_cloud_tier_restore_object(tier_ctx, headers,
                                 &set_mtime, etag, accounted_size,
-                                attrs, days, glacier_params, &cb);
+                                attrs, days, glacier_params, in_progress, &cb);
   } else {
     ldpp_dout(dpp, 20) << "Fetching  object:" << dest_obj << "from the cloud" <<  dendl;
     ret = rgw_cloud_tier_get_object(tier_ctx, false,  headers,
@@ -5445,6 +5467,11 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
     return ret; 
   }
 
+  if (in_progress) {
+    ldpp_dout(tier_ctx.dpp, 20) << "Restoring object:" << dest_obj << " still in progress; returning " << dendl;
+    return ret;
+  }
+
   if (!cb_processed) { 
     ldpp_dout(dpp, 20) << "Callback not processed, object:" << dest_obj << dendl;
     return -EIO; 
@@ -5470,6 +5497,8 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
         std::optional<uint64_t> olh_ep = ceph::parse<uint64_t>(rgw_bl_str(aiter->second));
         if (olh_ep) {
           olh_epoch = *olh_ep;
+         // update in tier_ctx too
+         tier_ctx.o.versioned_epoch = *olh_ep;
         }
         attrs.erase(aiter);
       }
@@ -5549,6 +5578,8 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
       attrs[RGW_ATTR_RESTORE_TYPE] = std::move(bl);
       ldpp_dout(dpp, 20) << "Permanent restore, object:" << dest_obj << dendl;
     }
+    // bucket->get_info().flags doesn't seem to have rgw::sal::FLAG_LOG_OP set by default. Hence set it explicitly.
+    // XXX: Do we need to check if sync is disabled for the bucket?
     log_op = true;
   }
 
index 7e8b3da8afb653e985d21ec05bdcae921126d3b6..01979125d93c2f7ca3dc517460eb355dfc39224a 100644 (file)
@@ -358,6 +358,7 @@ class RGWRados
   int open_root_pool_ctx(const DoutPrefixProvider *dpp);
   int open_gc_pool_ctx(const DoutPrefixProvider *dpp);
   int open_lc_pool_ctx(const DoutPrefixProvider *dpp);
+  int open_restore_pool_ctx(const DoutPrefixProvider *dpp);
   int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
   int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
   int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
@@ -372,9 +373,11 @@ class RGWRados
   rgw::sal::RadosStore* driver{nullptr};
   RGWGC* gc{nullptr};
   RGWLC* lc{nullptr};
+  RGWRestore* restore{nullptr};
   RGWObjectExpirer* obj_expirer{nullptr};
   bool use_gc_thread{false};
   bool use_lc_thread{false};
+  bool use_restore_thread{false};
   bool quota_threads{false};
   bool run_sync_thread{false};
   bool run_reshard_thread{false};
@@ -449,6 +452,7 @@ protected:
 
   librados::IoCtx gc_pool_ctx;        // .rgw.gc
   librados::IoCtx lc_pool_ctx;        // .rgw.lc
+  librados::IoCtx restore_pool_ctx;        // .rgw.restore
   librados::IoCtx objexp_pool_ctx;
   librados::IoCtx reshard_pool_ctx;
   librados::IoCtx notif_pool_ctx;     // .rgw.notif
@@ -500,6 +504,10 @@ public:
     return gc;
   }
 
+  RGWRestore *get_restore() {
+    return restore;
+  }
+
   RGWRados& set_run_gc_thread(bool _use_gc_thread) {
     use_gc_thread = _use_gc_thread;
     return *this;
@@ -510,6 +518,11 @@ public:
     return *this;
   }
 
+  RGWRados& set_run_restore_thread(bool _use_restore_thread) {
+    use_restore_thread = _use_restore_thread;
+    return *this;
+  }
+
   RGWRados& set_run_quota_threads(bool _run_quota_threads) {
     quota_threads = _run_quota_threads;
     return *this;
@@ -534,6 +547,10 @@ public:
     return &lc_pool_ctx;
   }
 
+  librados::IoCtx* get_restore_pool_ctx() {
+    return &restore_pool_ctx;
+  }
+
   librados::IoCtx& get_notif_pool_ctx() {
     return notif_pool_ctx;
   }
@@ -1258,13 +1275,11 @@ int restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx,
                              RGWObjectCtx& obj_ctx,
                              RGWBucketInfo& dest_bucket_info,
                              const rgw_obj& dest_obj,
-                             rgw_placement_rule& dest_placement,
                              RGWObjTier& tier_config,
-                             uint64_t olh_epoch,
                              std::optional<uint64_t> days,
+                            bool& in_progress,
                              const DoutPrefixProvider *dpp,
-                             optional_yield y,
-                             bool log_op = true);
+                             optional_yield y);
 
   int check_bucket_empty(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y);
 
index 99bb6b0713690f25f33d6109164dd3de5a673182..0c324f3acc7430180a922c8392c145cf6de9c670 100644 (file)
@@ -60,6 +60,7 @@
 #include "rgw_tools.h"
 #include "rgw_tracer.h"
 #include "rgw_zone.h"
+#include "rgw_restore.h"
 
 #include "services/svc_bilog_rados.h"
 #include "services/svc_bi_rados.h"
@@ -2025,6 +2026,12 @@ std::unique_ptr<Lifecycle> RadosStore::get_lifecycle(void)
   return std::make_unique<RadosLifecycle>(this);
 }
 
+std::unique_ptr<Restore> RadosStore::get_restore(const int n_objs,
+                               const std::vector<std::string_view>& obj_names)
+{
+  return std::make_unique<RadosRestore>(this, n_objs, obj_names);
+}
+
 bool RadosStore::process_expired_objects(const DoutPrefixProvider *dpp,
                                         optional_yield y)
 {
@@ -3031,15 +3038,13 @@ int RadosObject::transition(Bucket* bucket,
 
 int RadosObject::restore_obj_from_cloud(Bucket* bucket,
                                   rgw::sal::PlacementTier* tier,
-                                  rgw_placement_rule& placement_rule,
-                                 rgw_bucket_dir_entry& o,
                                  CephContext* cct,
-                                  RGWObjTier& tier_config,
-                                  uint64_t olh_epoch,
                                   std::optional<uint64_t> days,
+                                 bool& in_progress,
                                   const DoutPrefixProvider* dpp, 
-                                  optional_yield y,
-                                  uint32_t flags)
+                                  optional_yield y)
 {
   /* init */
   rgw::sal::RadosPlacementTier* rtier = static_cast<rgw::sal::RadosPlacementTier*>(tier);
@@ -3051,7 +3056,27 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket,
   string bucket_name = rtier->get_rt().t.s3.target_path;
   const rgw::sal::ZoneGroup& zonegroup = store->get_zone()->get_zonegroup();
   int ret = 0;
-  string src_storage_class = o.meta.storage_class; // or take src_placement also as input
+
+  auto& attrs = get_attrs();
+  RGWObjTier tier_config;
+
+  auto attr_iter = attrs.find(RGW_ATTR_MANIFEST);
+  if (attr_iter != attrs.end()) {
+    RGWObjManifest m;
+    try {
+      using ceph::decode;
+      decode(m, attr_iter->second);
+      m.get_tier_config(&tier_config);
+    } catch (const buffer::end_of_buffer&) {
+      //empty manifest; it's not cloud-tiered
+      ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl;
+      return -EIO;
+    } catch (const std::exception& e) {
+      ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl;
+      return -EIO;
+    }
+  }
+
   // update tier_config in case tier params are updated
   tier_config.tier_placement = rtier->get_rt();
 
@@ -3060,11 +3085,22 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket,
                     "-cloud-bucket";
     boost::algorithm::to_lower(bucket_name);
   }
+
+  rgw_bucket_dir_entry ent;
+  ent.key.name = get_key().name;
+  ent.key.instance = get_key().instance;
+  ent.meta.accounted_size = ent.meta.size = get_obj_size();
+  ent.meta.etag = "" ;
+
+  if (!ent.key.instance.empty()) { // non-current versioned object
+    ent.flags |= rgw_bucket_dir_entry::FLAG_VER;
+  }
+
   /* Create RGW REST connection */
   S3RESTConn conn(cct, id, { endpoint }, key, zonegroup.get_id(), region, host_style);
 
   // save source cloudtier storage class
-  RGWLCCloudTierCtx tier_ctx(cct, dpp, o, store, bucket->get_info(),
+  RGWLCCloudTierCtx tier_ctx(cct, dpp, ent, store, bucket->get_info(),
            this, conn, bucket_name,
            rtier->get_rt().t.s3.target_storage_class);
   tier_ctx.acl_mappings = rtier->get_rt().t.s3.acl_mappings;
@@ -3074,46 +3110,34 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket,
   tier_ctx.restore_storage_class = rtier->get_rt().restore_storage_class;
   tier_ctx.tier_type = rtier->get_rt().tier_type;
 
-  ldpp_dout(dpp, 20) << "Restoring object(" << o.key << ") from the cloud endpoint(" << endpoint << ")" << dendl;
+  ldpp_dout(dpp, 20) << "Restoring object(" << get_key() << ") from the cloud endpoint(" << endpoint << ")" << dendl;
 
   if (days && days == 0) {
-    ldpp_dout(dpp, 0) << "Days = 0 not valid; Not restoring object (" << o.key << ") from the cloud endpoint(" << endpoint << ")" << dendl;
+    ldpp_dout(dpp, 0) << "Days = 0 not valid; Not restoring object (" << get_key() << ") from the cloud endpoint(" << endpoint << ")" << dendl;
     return 0;
   }
 
-  // Note: For non-versioned objects, below should have already been set by the callers-
-  // o.current should be false; this(obj)->instance should have version-id.
-
-  // set restore_status as RESTORE_ALREADY_IN_PROGRESS
-  ret = set_cloud_restore_status(dpp, y, RGWRestoreStatus::RestoreAlreadyInProgress);
-  if (ret < 0) {
-    ldpp_dout(dpp, 0) << " Setting cloud restore status to RESTORE_ALREADY_IN_PROGRESS for the object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl;
-    return ret;
-  }
-
   /* Restore object from the cloud endpoint.
    * All restore related status and attrs are set as part of object download to
    * avoid any races */
   ret = store->getRados()->restore_obj_from_cloud(tier_ctx, *rados_ctx,
-                                bucket->get_info(), get_obj(), placement_rule,
-                                tier_config,
-                                olh_epoch, days, dpp, y, flags & FLAG_LOG_OP);
+                                bucket->get_info(), get_obj(),
+                                tier_config, days, in_progress, dpp, y);
 
   if (ret < 0) { //failed to restore
-    ldpp_dout(dpp, 0) << "Restoring object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl;
-    auto reset_ret = set_cloud_restore_status(dpp, y, RGWRestoreStatus::RestoreFailed);
+    ldpp_dout(dpp, 0) << "Restoring object(" << get_key() << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl;
 
     rgw_placement_rule target_placement;
     target_placement.inherit_from(tier_ctx.bucket_info.placement_rule);
     target_placement.storage_class = tier->get_storage_class();
 
     /* Reset HEAD object as CloudTiered */
-    reset_ret = write_cloud_tier(dpp, y, tier_ctx.o.versioned_epoch,
+    int reset_ret = write_cloud_tier(dpp, y, tier_ctx.o.versioned_epoch,
                           tier, tier_ctx.is_multipart_upload,
                           target_placement, tier_ctx.obj);
 
     if (reset_ret < 0) {
-      ldpp_dout(dpp, 0) << " Reset to cloud_tier of object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << reset_ret << dendl;
+      ldpp_dout(dpp, 0) << " Reset to cloud_tier of object(" << get_key() << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << reset_ret << dendl;
     }
     return ret;
   }
@@ -3200,21 +3224,6 @@ int RadosObject::transition_to_cloud(Bucket* bucket,
   return ret;
 }
 
-int RadosObject::set_cloud_restore_status(const DoutPrefixProvider* dpp,
-                                 optional_yield y,
-                                 rgw::sal::RGWRestoreStatus restore_status)
-{
-  int ret = 0;
-  set_atomic(true);
-  bufferlist bl;
-  using ceph::encode;
-  encode(restore_status, bl);
-  ret = modify_obj_attrs(RGW_ATTR_RESTORE_STATUS, bl, y, dpp, false);
-
-  return ret;
-}
-
 /*
  * If the object is restored temporarily and is expired, delete the data and
  * reset the HEAD object as cloud-transitioned.
@@ -4624,6 +4633,208 @@ std::unique_ptr<LCSerializer> RadosLifecycle::get_serializer(const std::string&
   return std::make_unique<LCRadosSerializer>(store, oid, lock_name, cookie);
 }
 
+RadosRestoreSerializer::RadosRestoreSerializer(RadosStore* store, const std::string& _oid, const std::string& lock_name, const std::string& cookie) :
+  StoreRestoreSerializer(_oid),
+  ioctx(*store->getRados()->get_restore_pool_ctx()),
+  lock(lock_name)
+{
+  lock.set_cookie(cookie);
+}
+
+int RadosRestoreSerializer::try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y)
+{
+  lock.set_duration(dur);
+  return lock.lock_exclusive((librados::IoCtx*)(&ioctx), oid);
+}
+
+std::unique_ptr<RestoreSerializer> RadosRestore::get_serializer(
+                                                       const std::string& lock_name,
+                                                       const std::string& oid,
+                                                       const std::string& cookie)
+{
+  return std::make_unique<RadosRestoreSerializer>(store, oid, lock_name, cookie);
+}
+
+int RadosRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+               int index, const RGWRestoreEntry& entry) {
+  bufferlist bl;
+
+  encode(entry, bl);
+
+  auto ret = push(dpp, y, index, std::move(bl));
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR: push() returned " << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+
+int RadosRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+               int index, const std::list<RGWRestoreEntry>& restore_entries) {
+  std::vector<ceph::buffer::list> ent_list;
+
+  for (auto& entry : restore_entries) {
+    bufferlist bl;
+
+    encode(entry, bl);
+    ent_list.push_back(std::move(bl));
+
+  }
+
+  int ret = push(dpp, y, index, std::move(ent_list));
+
+  if (ret < 0) {
+    ldpp_dout(dpp, -1) << "ERROR: push() returned " << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y,
+               int index, std::vector<ceph::buffer::list>&& items) {
+  auto r = fifos[index].push(dpp, items, y);
+  if (r < 0) {
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                << ": unable to push to FIFO: " << obj_names[index]
+                << ": " << cpp_strerror(-r) << dendl;
+    }
+    return r;
+}
+
+int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y,
+               int index, ceph::buffer::list&& bl) {
+  auto r = fifos[index].push(dpp, std::move(bl), y);
+  if (r < 0) {
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                << ": unable to push to FIFO: " << obj_names[index]
+                << ": " << cpp_strerror(-r) << dendl;
+  }
+  return r;
+}
+
+struct rgw_restore_fifo_entry {
+  std::string id;
+  ceph::real_time mtime;
+  RGWRestoreEntry entry;
+  rgw_restore_fifo_entry() {}
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    encode(mtime, bl);
+    encode(entry, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+     DECODE_START(1, bl);
+     decode(id, bl);
+     decode(mtime, bl);
+     decode(entry, bl);
+     DECODE_FINISH(bl);
+  }
+
+  void dump(ceph::Formatter* f) const;
+  void decode_json(JSONObj* obj);
+};
+WRITE_CLASS_ENCODER(rgw_restore_fifo_entry)
+
+int RadosRestore::list(const DoutPrefixProvider *dpp, optional_yield y,
+                  int index, const std::string& marker, std::string* out_marker,
+                  uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+                  bool* truncated)
+{
+  std::vector<rgw::cls::fifo::list_entry> restore_entries;
+  bool more = false;
+
+  auto r = fifos[index].list(dpp, max_entries, marker, &restore_entries, &more, y);
+
+  if (r < 0) {
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                << ": unable to list FIFO: " << obj_names[index]
+                << ": " << cpp_strerror(-r) << dendl;
+    return r;
+  }
+
+  entries.clear();
+
+  for (const auto& entry : restore_entries) {
+      rgw_restore_fifo_entry r_entry;
+      r_entry.id = entry.marker;
+      r_entry.mtime = entry.mtime;
+
+      auto liter = entry.data.cbegin();
+      try {
+       decode(r_entry.entry, liter);
+      } catch (const buffer::error& err) {
+       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                  << ": failed to decode restore entry: "
+                  << err.what() << dendl;
+       return -EIO;
+      }
+      RGWRestoreEntry& e = r_entry.entry;
+      entries.push_back(std::move(e));
+  }
+
+  if (truncated)
+    *truncated = more;
+
+  if (out_marker && !restore_entries.empty()) {
+    *out_marker = restore_entries.back().marker;
+  }
+
+  return 0;
+}
+
+int RadosRestore::trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+                       int index, const std::string_view& marker)
+{
+  assert(index < num_objs);
+
+  int ret = trim(dpp, y, index, marker);
+  return ret;
+}
+
+int RadosRestore::trim(const DoutPrefixProvider *dpp, optional_yield y,
+               int index, const std::string_view& marker) {
+  auto r = fifos[index].trim(dpp, marker, false, y);
+  if (r < 0) {
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                << ": unable to trim FIFO: " << obj_names[index]
+                << ": " << cpp_strerror(-r) << dendl;
+  }
+
+  return r;
+}
+
+std::string_view RadosRestore::max_marker() {
+  static const std::string mm = rgw::cls::fifo::marker::max().to_string();
+  return std::string_view(mm);
+}
+
+int RadosRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) {
+    std::vector<rgw::cls::fifo::list_entry> restore_entries;
+    bool more = false;
+
+    for (auto shard = 0u; shard < fifos.size(); ++shard) {
+      auto r = fifos[shard].list(dpp, 1, {}, &restore_entries, &more, y);
+      if (r < 0) {
+       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+                  << ": unable to list FIFO: " << obj_names[shard]
+                  << ": " << cpp_strerror(-r) << dendl;
+       return r;
+      }
+      if (!restore_entries.empty()) {
+       return 0;
+      }
+    }
+
+    return 1;
+}
+
 int RadosNotification::publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags)
 {
   return rgw::notify::publish_reserve(dpp, *store->svc()->site, event_types, res, obj_tags);
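The RadosRestore methods above reduce to an encode/push, list/decode, trim
cycle against a sharded FIFO. A toy single-shard model of that cycle, where
strings stand in for ceph::buffer::list payloads and FIFO markers:

    #include <algorithm>
    #include <deque>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    // Toy model of the add_entry / list / trim_entries cycle on one shard.
    struct Shard {
      std::deque<std::pair<std::string, std::string>> q; // (marker, payload)
      uint64_t next = 0;
      void push(std::string payload) { q.push_back({std::to_string(next++), std::move(payload)}); }
      std::vector<std::pair<std::string, std::string>> list(size_t max) {
        return {q.begin(), q.begin() + std::min(max, q.size())};
      }
      void trim(const std::string& marker) { // drop everything up to and including marker
        while (!q.empty() && q.front().first <= marker) q.pop_front();
      }
    };

    int main() {
      Shard s;
      s.push("obj1"); s.push("obj2");   // add_entries()
      auto batch = s.list(10);          // restore worker lists a batch
      std::string last = batch.back().first;
      s.trim(last);                     // trim_entries() once the batch is processed
      std::cout << "remaining=" << s.q.size() << "\n"; // 0
    }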
index 66c73a1199f09f715d06690a451c557148e9ea5c..d0628b7c0a986ee349e76676290dda1da1f7a71a 100644 (file)
@@ -27,6 +27,7 @@
 #include "rgw_putobj_processor.h"
 #include "services/svc_tier_rados.h"
 #include "cls/lock/cls_lock_client.h"
+#include "rgw_log_backing.h"
 
 namespace rgw { namespace sal {
 
@@ -282,6 +283,8 @@ class RadosStore : public StoreDriver {
     virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list<std::string>& zone_ids) override;
     virtual int cluster_stat(RGWClusterStat& stats) override;
     virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+    virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+                   const std::vector<std::string_view>& obj_names) override;
     virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
     virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, req_state* s, rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) override;
     virtual std::unique_ptr<Notification> get_notification(
@@ -343,6 +346,7 @@ class RadosStore : public StoreDriver {
                                  optional_yield y,
                                  const DoutPrefixProvider* dpp) override;
     virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
+    virtual RGWRestore* get_rgwrestore(void) override { return rados->get_restore(); }
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
 
     virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
@@ -640,15 +644,11 @@ class RadosObject : public StoreObject {
                           optional_yield y) override;
     virtual int restore_obj_from_cloud(Bucket* bucket,
                           rgw::sal::PlacementTier* tier,
-                          rgw_placement_rule& placement_rule,
-                          rgw_bucket_dir_entry& o,
                           CephContext* cct,
-                          RGWObjTier& tier_config,
-                          uint64_t olh_epoch,
                           std::optional<uint64_t> days,
+                          bool& in_progress,
                           const DoutPrefixProvider* dpp,
-                          optional_yield y,
-                          uint32_t flags) override;
+                          optional_yield y) override;
     virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override;
     virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) override;
 
@@ -952,6 +952,63 @@ public:
                                                       const std::string& cookie) override;
 };
 
+class RadosRestoreSerializer : public StoreRestoreSerializer {
+  librados::IoCtx& ioctx;
+  ::rados::cls::lock::Lock lock;
+
+public:
+  RadosRestoreSerializer(RadosStore* store, const std::string& oid, const std::string& lock_name, const std::string& cookie);
+
+  virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override;
+  virtual int unlock() override {
+    return lock.unlock(&ioctx, oid);
+  }
+};
+
+class RadosRestore : public Restore {
+  RadosStore* store;
+  const int num_objs;
+  const std::vector<std::string_view>& obj_names;
+  librados::IoCtx& ioctx;
+  ceph::containers::tiny_vector<LazyFIFO> fifos;
+
+public:
+  RadosRestore(RadosStore* _st, const int n_objs, const std::vector<std::string_view>& o_names) : store(_st),
+       num_objs(n_objs), obj_names(o_names),
+       ioctx(*store->getRados()->get_restore_pool_ctx()),
+       fifos(num_objs,
+       [&](const size_t ix, auto emplacer) {
+         emplacer.emplace(ioctx, std::string(obj_names[ix]));
+       }) {}
+
+  ~RadosRestore() override = default;
+
+  virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+                 int index, const RGWRestoreEntry& r_entry) override;
+  virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+              int index, const std::list<RGWRestoreEntry>& restore_entries) override;
+  virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
+                  int index,
+                  const std::string& marker, std::string* out_marker,
+                  uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+                  bool* truncated) override;
+  virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+                         int index, const std::string_view& marker) override;
+  virtual std::unique_ptr<RestoreSerializer> get_serializer(
+                                              const std::string& lock_name,
+                                              const std::string& oid,
+                                              const std::string& cookie) override;
+  /** Below routines deal with actual FIFO */
+  int is_empty(const DoutPrefixProvider *dpp, optional_yield y);
+  std::string_view max_marker();
+  int trim(const DoutPrefixProvider *dpp, optional_yield y,
+               int index, const std::string_view& marker);
+  int push(const DoutPrefixProvider *dpp, optional_yield y,
+               int index, ceph::buffer::list&& bl);
+  int push(const DoutPrefixProvider *dpp, optional_yield y,
+               int index, std::vector<ceph::buffer::list>&& items);
+};
+
 class RadosNotification : public StoreNotification {
   RadosStore* store;
   /* XXX it feels incorrect to me that rgw::notify::reservation_t is
index 7860bad50f8a633c3c18d3023db876c3ffbb3395..eacb305848706027c3e7fcb0f4dca97bb71bb4b3 100644 (file)
@@ -127,6 +127,8 @@ struct RGWZoneParams : RGWSystemMetaObj {
 
   JSONFormattable tier_config;
 
+  rgw_pool restore_pool;
+
   RGWZoneParams() : RGWSystemMetaObj() {}
   explicit RGWZoneParams(const std::string& name) : RGWSystemMetaObj(name){}
   RGWZoneParams(const rgw_zone_id& id, const std::string& name) : RGWSystemMetaObj(id.id, name) {}
@@ -154,7 +156,7 @@ struct RGWZoneParams : RGWSystemMetaObj {
   const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
   
   void encode(bufferlist& bl) const override {
-    ENCODE_START(16, 1, bl);
+    ENCODE_START(17, 1, bl);
     encode(domain_root, bl);
     encode(control_pool, bl);
     encode(gc_pool, bl);
@@ -184,11 +186,12 @@ struct RGWZoneParams : RGWSystemMetaObj {
     encode(account_pool, bl);
     encode(group_pool, bl);
     encode(dedup_pool, bl);
+    encode(restore_pool, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) override {
-    DECODE_START(16, bl);
+    DECODE_START(17, bl);
     decode(domain_root, bl);
     decode(control_pool, bl);
     decode(gc_pool, bl);
@@ -271,6 +274,11 @@ struct RGWZoneParams : RGWSystemMetaObj {
     } else {
       dedup_pool = name + ".rgw.dedup";
     }
+    if (struct_v >= 17) {
+      decode(restore_pool, bl);
+    } else {
+      restore_pool = log_pool.name + ":restore";
+    }
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
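The encode/decode change above follows the usual RGWZoneParams compat pattern:
the new restore_pool field is appended at the end of the encoding, the struct
version is bumped from 16 to 17, and decoders of older encodings fall back to
a computed default of "<log_pool>:restore". A toy illustration of the pattern,
with plain streams standing in for bufferlist encoding:

    #include <iostream>
    #include <sstream>
    #include <string>

    // New fields go at the end, guarded by the struct version.
    struct ZoneParams {
      std::string log_pool = "zone.rgw.log";
      std::string restore_pool;

      void encode(std::ostream& os) const {        // struct_v 17
        os << 17 << '\n' << log_pool << '\n' << restore_pool << '\n';
      }
      void decode(std::istream& is) {
        int struct_v; is >> struct_v >> log_pool;
        if (struct_v >= 17) is >> restore_pool;    // field present
        else restore_pool = log_pool + ":restore"; // default for old encodings
      }
    };

    int main() {
      std::stringstream old;                       // pretend a v16 encoding
      old << 16 << '\n' << "zone.rgw.log" << '\n';
      ZoneParams p; p.decode(old);
      std::cout << p.restore_pool << "\n";         // zone.rgw.log:restore
    }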
index a28f3bd1e53101a9c4b67c4ae63057ffefcd69a8..d5f80ce2e0f16d8cf54d51081e4ca798ffdf702a 100644 (file)
@@ -4635,6 +4635,7 @@ int main(int argc, const char **argv)
                                        false,
                                        false,
                                        false,
+                                       false,
                                         false,
                                        false, // No background tasks!
                                         null_yield,
index 939768f05240aac3827ab5ccc3f4a89a739ec251..4a2142a79519edc1109b7e5319f703e861442013 100644 (file)
@@ -234,6 +234,10 @@ int rgw::AppMain::init_storage()
     (g_conf()->rgw_enable_lc_threads &&
       ((!nfs) || (nfs && g_conf()->rgw_nfs_run_lc_threads)));
 
+  auto run_restore =
+    (g_conf()->rgw_enable_restore_threads &&
+      ((!nfs) || (nfs && g_conf()->rgw_nfs_run_restore_threads)));
+
   auto run_quota =
     (g_conf()->rgw_enable_quota_threads &&
       ((!nfs) || (nfs && g_conf()->rgw_nfs_run_quota_threads)));
@@ -250,6 +254,7 @@ int rgw::AppMain::init_storage()
          site,
           run_gc,
           run_lc,
+         run_restore,
           run_quota,
           run_sync,
           g_conf().get_val<bool>("rgw_dynamic_resharding"),
index a813a7c28ac255f3edb1ab591c54623c1bec2c2e..73305c6f19c2bbdcfcbeb7a34faaf6fdaefd8eda 100644 (file)
@@ -566,6 +566,7 @@ class RGWLC : public DoutPrefixProvider {
   CephContext *cct;
   rgw::sal::Driver* driver;
   std::unique_ptr<rgw::sal::Lifecycle> sal_lc;
+  std::unique_ptr<rgw::sal::Restore> sal_restore;
   int max_objs{0};
   std::string *obj_names{nullptr};
   std::atomic<bool> down_flag = { false };
@@ -664,6 +665,7 @@ public:
 
   CephContext *get_cct() const override { return cct; }
   rgw::sal::Lifecycle* get_lc() const { return sal_lc.get(); }
+  rgw::sal::Restore* get_restore() const { return sal_restore.get(); }
   unsigned get_subsys() const;
   std::ostream& gen_prefix(std::ostream& out) const;
 
index aa71315aa766e2516a574aad135b472972d80224..f12737feffdd3ac4f5f00ff025420d35cfe4af84 100644 (file)
@@ -105,7 +105,7 @@ int main(const int argc, const char **argv)
     exit(1);
   }
 
-  driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, true, null_yield);
+  driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, true, null_yield);
   if (!driver) {
     std::cerr << "couldn't init storage provider" << std::endl;
     return EIO;
index e2113110867f93af3f9d7ad5a3abcdef3513c5f6..1bd8824fa45831832e0e42e66a05aa4fe275307f 100644 (file)
@@ -68,6 +68,7 @@
 #include "rgw_iam_managed_policy.h"
 #include "rgw_bucket_sync.h"
 #include "rgw_bucket_logging.h"
+#include "rgw_restore.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_quota.h"
@@ -986,13 +987,13 @@ void handle_replication_status_header(
  */
 int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
                          rgw::sal::Attrs& attrs, bool sync_cloudtiered, std::optional<uint64_t> days,
-                         bool restore_op, optional_yield y)
+                         bool read_through, optional_yield y)
 {
   int op_ret = 0;
   ldpp_dout(dpp, 20) << "reached handle cloud tier " << dendl;
   auto attr_iter = attrs.find(RGW_ATTR_MANIFEST);
   if (attr_iter == attrs.end()) {
-    if (restore_op) {
+    if (!read_through) {
       op_ret = -ERR_INVALID_OBJECT_STATE;
       s->err.message = "only cloud tier object can be restored";
       return op_ret;
@@ -1005,7 +1006,7 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::
     decode(m, attr_iter->second);
     if (!m.is_tier_type_s3()) {
       ldpp_dout(dpp, 20) << "not a cloud tier object " <<  s->object->get_key().name << dendl;
-      if (restore_op) {
+      if (!read_through) {
         op_ret = -ERR_INVALID_OBJECT_STATE;
         s->err.message = "only cloud tier object can be restored";
         return op_ret;
@@ -1030,26 +1031,45 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::
       auto iter = bl.cbegin();
       decode(restore_status, iter);
     }
-    if (attr_iter == attrs.end() || restore_status == rgw::sal::RGWRestoreStatus::RestoreFailed) {
-      // first time restore or previous restore failed
-      rgw::sal::Bucket* pbucket = NULL;
-      pbucket = s->bucket.get();
-
+    if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
+      if (read_through) {
+        op_ret = -ERR_REQUEST_TIMEOUT;
+        ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
+        s->err.message = "restore is still in progress";
+        return op_ret;
+      } else {
+        // for restore-op, corresponds to RESTORE_ALREADY_IN_PROGRESS
+        return static_cast<int>(rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress);
+      }
+    } else if (restore_status == rgw::sal::RGWRestoreStatus::CloudRestored) {
+      // corresponds to CLOUD_RESTORED
+      return static_cast<int>(rgw::sal::RGWRestoreStatus::CloudRestored);
+    } else { // first time restore or previous restore failed.
+      // Restore the object.
       std::unique_ptr<rgw::sal::PlacementTier> tier;
       rgw_placement_rule target_placement;
-      target_placement.inherit_from(pbucket->get_placement_rule());
-      attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS);
+      target_placement.inherit_from(s->bucket->get_placement_rule());
+      auto attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS);
       if (attr_iter != attrs.end()) {
         target_placement.storage_class = attr_iter->second.to_str();
       }
       op_ret = driver->get_zone()->get_zonegroup().get_placement_tier(target_placement, &tier);
-      ldpp_dout(dpp, 20) << "getting tier placement handle cloud tier" << op_ret <<
-                       " storage class " << target_placement.storage_class << dendl;
       if (op_ret < 0) {
-        s->err.message = "failed to restore object";
+       ldpp_dout(dpp, -1) << "failed to fetch tier placement handle, ret = " << op_ret << dendl;
         return op_ret;
+      } else {
+        ldpp_dout(dpp, 20) << "getting tier placement handle cloud tier for " <<
+                         " storage class " << target_placement.storage_class << dendl;
       }
-      if (!restore_op) {
+
+      if (!tier->is_tier_type_s3()) {
+        ldpp_dout(dpp, -1) << "ERROR: not s3 tier type - " << tier->get_tier_type() <<
+                       " for storage class " << target_placement.storage_class << dendl;
+        s->err.message = "failed to restore object";
+        return -EINVAL;
+      }
+
+      if (read_through) {
         if (tier->allow_read_through()) {
           days = tier->get_read_through_restore_days();
         } else { //read-through is not enabled
@@ -1058,56 +1078,30 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::
           return op_ret;
         }
       }
-      // fill in the entry. XXX: Maybe we can avoid it by passing only necessary params
-      rgw_bucket_dir_entry ent;
-      ent.key.name = s->object->get_key().name;
-      ent.key.instance = s->object->get_key().instance;
-      ent.meta.accounted_size = ent.meta.size = s->obj_size;
-      ent.meta.etag = "" ;
-      uint64_t epoch = 0;
-      op_ret = get_system_versioning_params(s, &epoch, NULL);
-      if (!ent.key.instance.empty()) { // non-current versioned object
-        ent.flags |= rgw_bucket_dir_entry::FLAG_VER;
-      }
-      ldpp_dout(dpp, 20) << "getting versioning params tier placement handle cloud tier" << op_ret << dendl;
-      if (op_ret < 0) {
-        ldpp_dout(dpp, 20) << "failed to get versioning params, op_ret = " << op_ret << dendl;
-        s->err.message = "failed to restore object";
-        return op_ret;
-      }
-      op_ret = s->object->restore_obj_from_cloud(pbucket, tier.get(), target_placement, ent,
-                                                 s->cct, tier_config, epoch,
-                                                 days, dpp, y, s->bucket->get_info().flags);
+
+      op_ret = driver->get_rgwrestore()->restore_obj_from_cloud(s->bucket.get(),
+                     s->object.get(), tier.get(), days, y);
+
       if (op_ret < 0) {
-        ldpp_dout(dpp, 0) << "object " << ent.key.name << " fetching failed" << op_ret << dendl;
+       ldpp_dout(dpp, 0) << "Restore of object " << s->object->get_key() << " failed, ret=" << op_ret << dendl;
         s->err.message = "failed to restore object";
         return op_ret;
       }
-      ldpp_dout(dpp, 20) << "object " << ent.key.name << " fetching succeed" << dendl;
-      /*  Even if restore is complete the first read through request will return but actually downloaded
-       * object asyncronously.
+
+      ldpp_dout(dpp, 20) << "Restore of object " << s->object->get_key() << " succeeded" << dendl;
+      /*  Even if the restore is complete, the first read-through request returns
+       *  while the object is actually downloaded asynchronously.
        */
-      if (!restore_op) { //read-through
+      if (read_through) { //read-through
         op_ret = -ERR_REQUEST_TIMEOUT;
         ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
         s->err.message = "restore is still in progress";
       }
       return op_ret;
-    } else if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
-        if (!restore_op) {
-          op_ret = -ERR_REQUEST_TIMEOUT;
-          ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
-          s->err.message = "restore is still in progress";
-          return op_ret;
-        } else { 
-          return 1; // for restore-op, corresponds to RESTORE_ALREADY_IN_PROGRESS
-        } 
-    } else {
-      return 2; // corresponds to CLOUD_RESTORED
     }
   } catch (const buffer::end_of_buffer&) {
     //empty manifest; it's not cloud-tiered
-    if (restore_op) {
+    if (!read_through) {
       op_ret = -ERR_INVALID_OBJECT_STATE;
       s->err.message = "only cloud tier object can be restored";
     }
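
With the read_through flag threaded through as above, a read-through GET against a cloud-tiered object now kicks off an asynchronous restore and fails fast until the copy lands. A rough sketch of the client-visible exchange (headers abbreviated; ERR_REQUEST_TIMEOUT surfaces as HTTP 408 in RGW's error mapping):

    GET /bucket1/object1 HTTP/1.1      (object currently in the cloud tier)

    HTTP/1.1 408 Request Timeout       restore enqueued; client retries later

    GET /bucket1/object1 HTTP/1.1      (retried after the restore completed)

    HTTP/1.1 200 OK
    x-amz-restore: ongoing-request="false", expiry-date="..."
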
@@ -2590,7 +2584,7 @@ void RGWGetObj::execute(optional_yield y)
 
   if (get_type() == RGW_OP_GET_OBJ && get_data) {
     std::optional<uint64_t> days;
-    op_ret = handle_cloudtier_obj(s, this, driver, attrs, sync_cloudtiered, days, false, y);
+    op_ret = handle_cloudtier_obj(s, this, driver, attrs, sync_cloudtiered, days, true, y);
     if (op_ret < 0) {
       ldpp_dout(this, 4) << "Cannot get cloud tiered object: " << *s->object
                        <<". Failing with " << op_ret << dendl;
index 32c35b2bb435eda4bbcc24048630dccfde93475e..d63c8fe3c5dda7686211afdd3c11292eba065cba 100644 (file)
@@ -125,6 +125,7 @@ void RGWRealmReloader::reload()
          *env.site,
           cct->_conf->rgw_enable_gc_threads,
           cct->_conf->rgw_enable_lc_threads,
+         cct->_conf->rgw_enable_restore_threads,
           cct->_conf->rgw_enable_quota_threads,
           cct->_conf->rgw_run_sync_thread,
           cct->_conf.get_val<bool>("rgw_dynamic_resharding"),
index 7b15308073dc1f3620e0040abe23a1189656f5ff..7e413ed5b58a0330f315bee7d05452887c629a2a 100644 (file)
@@ -565,41 +565,45 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
        }
       }
     } /* checksum_mode */
-    auto attr_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
-    if (attr_iter != attrs.end()) {
-      rgw::sal::RGWRestoreType rt;
-      bufferlist bl = attr_iter->second;
-      auto iter = bl.cbegin();
-      decode(rt, iter);
+
+    rgw::sal::RGWRestoreStatus restore_status = rgw::sal::RGWRestoreStatus::None;
+    auto r_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
+    if (r_iter != attrs.end()) {
+      bufferlist rbl = r_iter->second;
+      auto iter = rbl.cbegin();
+      decode(restore_status, iter);
 
-      rgw::sal::RGWRestoreStatus restore_status;
-      attr_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
-      if (attr_iter != attrs.end()) {
-        bufferlist bl = attr_iter->second;
-        auto iter = bl.cbegin();
-        decode(restore_status, iter);
-      }
-      
       //restore status
       if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
-          dump_header(s, "x-amz-restore", "ongoing-request=\"true\"");
-      }
-      if (rt == rgw::sal::RGWRestoreType::Temporary) {
-        auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE);
-        ceph::real_time expiration_date;
-        
-        if (expire_iter != attrs.end()) {
-          bufferlist bl = expire_iter->second;
+        dump_header(s, "x-amz-restore", "ongoing-request=\"true\"");
+      } else {
+        auto attr_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
+        if (attr_iter != attrs.end()) {
+          rgw::sal::RGWRestoreType rt;
+          bufferlist bl = attr_iter->second;
           auto iter = bl.cbegin();
-          decode(expiration_date, iter);
-        }
-        //restore status
-        dump_header_if_nonempty(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\"");
-        // temporary restore; set storage-class to cloudtier storage class
-        auto c_iter = attrs.find(RGW_ATTR_CLOUDTIER_STORAGE_CLASS);
+          decode(rt, iter);
 
-        if (c_iter != attrs.end()) {
-          attrs[RGW_ATTR_STORAGE_CLASS] = c_iter->second;
+          if (rt == rgw::sal::RGWRestoreType::Temporary) {
+            auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE);
+            ceph::real_time expiration_date;
+
+           if (expire_iter != attrs.end()) {
+              bufferlist bl = expire_iter->second;
+              auto iter = bl.cbegin();
+              decode(expiration_date, iter);
+            }
+
+            //restore status
+            dump_header_if_nonempty(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\"");
+
+            // temporary restore; set storage-class to cloudtier storage class
+            auto c_iter = attrs.find(RGW_ATTR_CLOUDTIER_STORAGE_CLASS);
+
+            if (c_iter != attrs.end()) {
+              attrs[RGW_ATTR_STORAGE_CLASS] = c_iter->second;
+            }
+          }
         }
       }
     }
@@ -3663,35 +3667,40 @@ void RGWRestoreObj_ObjStore_S3::send_response()
 
   if (restore_ret == 0) {
     s->err.http_ret = 202; // OK
-  } else if (restore_ret == 1) {
-    restore_ret = -ERR_RESTORE_ALREADY_IN_PROGRESS;
-    set_req_state_err(s, restore_ret);
-    dump_header(s, "x-amz-restore", "ongoing-request=\"true\"");
-  } else if (restore_ret == 2) {
-    rgw::sal::Attrs attrs;
-    ceph::real_time expiration_date;
-    rgw::sal::RGWRestoreType rt;
-    attrs = s->object->get_attrs();
-    auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE);
-    auto type_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
-
-    if (expire_iter != attrs.end()) {
-      bufferlist bl = expire_iter->second;
-      auto iter = bl.cbegin();
-      decode(expiration_date, iter);
-    }
-    
-    if (type_iter != attrs.end()) {
-      bufferlist bl = type_iter->second;
-      auto iter = bl.cbegin();
-      decode(rt, iter);
-    }
-    if (rt == rgw::sal::RGWRestoreType::Temporary) {
-      s->err.http_ret = 200; // OK
-      dump_header(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\"");
-    } else {
-      s->err.http_ret = 200;
-      dump_header(s, "x-amz-restore", "ongoing-request=\"false\"");
+  } else {
+    rgw::sal::RGWRestoreStatus st = static_cast<rgw::sal::RGWRestoreStatus>(restore_ret);
+
+    if (st == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
+      restore_ret = -ERR_RESTORE_ALREADY_IN_PROGRESS;
+      set_req_state_err(s, restore_ret);
+      dump_header(s, "x-amz-restore", "ongoing-request=\"true\"");
+    } else if (st == rgw::sal::RGWRestoreStatus::CloudRestored) {
+      rgw::sal::Attrs attrs;
+      ceph::real_time expiration_date;
+      rgw::sal::RGWRestoreType rt;
+      attrs = s->object->get_attrs();
+      auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE);
+      auto type_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
+
+      if (expire_iter != attrs.end()) {
+        bufferlist bl = expire_iter->second;
+        auto iter = bl.cbegin();
+        decode(expiration_date, iter);
+      }
+
+      if (type_iter != attrs.end()) {
+        bufferlist bl = type_iter->second;
+        auto iter = bl.cbegin();
+        decode(rt, iter);
+      }
+
+      if (rt == rgw::sal::RGWRestoreType::Temporary) {
+        s->err.http_ret = 200; // OK
+        dump_header(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\"");
+      } else {
+        s->err.http_ret = 200;
+        dump_header(s, "x-amz-restore", "ongoing-request=\"false\"");
+      }
     }
   } 
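
Taken together, RGWRestoreObj_ObjStore_S3::send_response() now derives its headers purely from the returned restore status. A sketch of the three outcomes an S3 client can observe on POST ?restore, assuming the usual RGW error mapping (ERR_RESTORE_ALREADY_IN_PROGRESS surfaces as 409 Conflict):

    HTTP/1.1 202 Accepted              restore accepted; processed asynchronously

    HTTP/1.1 409 Conflict              RestoreAlreadyInProgress
    x-amz-restore: ongoing-request="true"

    HTTP/1.1 200 OK                    already restored
    x-amz-restore: ongoing-request="false"[, expiry-date="..."]
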
 
diff --git a/src/rgw/rgw_restore.cc b/src/rgw/rgw_restore.cc
new file mode 100644 (file)
index 0000000..fb734b3
--- /dev/null
@@ -0,0 +1,520 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <fmt/chrono.h>
+#include <string.h>
+#include <iostream>
+#include <map>
+#include <algorithm>
+#include <tuple>
+#include <functional>
+
+#include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/variant.hpp>
+
+#include "include/scope_guard.h"
+#include "include/function2.hpp"
+#include "common/Formatter.h"
+#include "common/containers.h"
+#include "common/split.h"
+#include "common/errno.h"
+#include "include/random.h"
+#include "cls/lock/cls_lock_client.h"
+#include "rgw_perf_counters.h"
+#include "rgw_common.h"
+#include "rgw_bucket.h"
+#include "rgw_restore.h"
+#include "rgw_zone.h"
+#include "rgw_string.h"
+#include "rgw_multi.h"
+#include "rgw_sal.h"
+#include "rgw_lc_tier.h"
+#include "rgw_notify.h"
+#include "common/dout.h"
+
+#include "fmt/format.h"
+
+#include "services/svc_sys_obj.h"
+#include "services/svc_zone.h"
+#include "services/svc_tier_rados.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw_restore
+
+
+constexpr int32_t hours_in_a_day = 24;
+constexpr int32_t secs_in_a_day = hours_in_a_day * 60 * 60;
+
+using namespace std;
+using namespace rgw::sal;
+
+void RGWRestoreEntry::dump(Formatter *f) const
+{
+  encode_json("Bucket", bucket, f);
+  encode_json("Object", obj_key, f);
+  encode_json("Days", days.value_or(0), f);
+  encode_json("Zone_id", zone_id, f);
+  encode_json("Status", static_cast<int>(status), f);
+}
+
+void RGWRestoreEntry::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("Bucket", bucket, obj);
+  JSONDecoder::decode_json("Object", obj_key, obj);
+  JSONDecoder::decode_json("Days", days, obj);
+  JSONDecoder::decode_json("Zone_id", zone_id, obj);
+  int st;
+  JSONDecoder::decode_json("Status", st, obj);
+  status = static_cast<rgw::sal::RGWRestoreStatus>(st);
+}
+
+void RGWRestoreEntry::generate_test_instances(std::list<RGWRestoreEntry*>& l)
+{
+  auto p = new RGWRestoreEntry;
+  rgw_bucket bk("tenant1", "bucket1");
+  rgw_obj_key obj("object1");
+  rgw_obj_key obj2("object2");
+  uint64_t days1 = 3;
+  std::optional<uint64_t> days;
+  std::string zone_id = "zone1";
+  rgw::sal::RGWRestoreStatus status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+  
+  p->bucket = bk;
+  p->obj_key = obj;
+  p->zone_id = zone_id;
+  p->days = days;
+  p->status = status;
+  l.push_back(p);
+
+  p = new RGWRestoreEntry;
+  days = days1;
+  status = rgw::sal::RGWRestoreStatus::CloudRestored;
+  p->bucket = bk;
+  p->obj_key = obj2;
+  p->zone_id = zone_id;
+  p->days = days;
+  p->status = status;
+  l.push_back(p);
+
+  l.push_back(new RGWRestoreEntry);
+}
+
+void RGWRestore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) {
+  cct = _cct;
+  driver = _driver;
+
+  /* max_objs indicates the number of shards or objects
+   * used to store Restore Entries */
+  max_objs = cct->_conf->rgw_restore_max_objs;
+  if (max_objs > HASH_PRIME)
+    max_objs = HASH_PRIME;
+
+  for (int i = 0; i < max_objs; i++) {
+    obj_names.push_back(fmt::format("{}.{}", restore_oid_prefix, i));
+  }
+  // NB: obj_names must own the strings; the SAL layer receives views into them
+  sal_restore = driver->get_restore(max_objs,
+      std::vector<std::string_view>(obj_names.begin(), obj_names.end()));
+}
+
+void RGWRestore::finalize()
+{
+  obj_names.clear();
+}
+
+static inline std::ostream& operator<<(std::ostream& os, const RGWRestoreEntry& ent) {
+  os << "<ent: bucket=";
+  os << ent.bucket;
+  os << "; obj_key=";
+  os << ent.obj_key;
+  os << "; days=";
+  os << ent.days;
+  os << "; zone_id=";
+  os << ent.zone_id;
+  os << "; status=";
+  os << rgw_restore_status_dump(ent.status);
+  os << ">";
+
+  return os;
+}
+
+void RGWRestore::RestoreWorker::stop()
+{
+  std::lock_guard l{lock};
+  cond.notify_all();
+}
+
+bool RGWRestore::going_down()
+{
+  return down_flag;
+}
+
+void RGWRestore::start_processor()
+{
+  worker = std::make_unique<RGWRestore::RestoreWorker>(this, cct, this);
+  worker->create("rgw_restore");
+}
+
+void RGWRestore::stop_processor()
+{
+  down_flag = true;
+  if (worker) {
+    worker->stop();
+    worker->join();
+  }
+  worker.reset(nullptr);
+}
+
+unsigned RGWRestore::get_subsys() const
+{
+  return dout_subsys;
+}
+
+std::ostream& RGWRestore::gen_prefix(std::ostream& out) const
+{
+  return out << "restore: ";
+}
+
+/* Hash based on both <bucket, obj> */
+int RGWRestore::choose_oid(const RGWRestoreEntry& e) {
+  const std::string name = e.bucket.name + e.obj_key.name + e.obj_key.instance;
+  return static_cast<int>(ceph_str_hash_linux(name.data(), name.size()) % max_objs);
+}
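
To make the sharding concrete, here is a standalone model of the mapping above, with std::hash standing in for ceph_str_hash_linux (so the exact indices differ from RGW's). The point is that the enqueueing path and the worker compute the same index, so an entry is always written to and read from a single shard:

    #include <functional>
    #include <iostream>
    #include <string>

    static int choose_shard(const std::string& bucket, const std::string& name,
                            const std::string& instance, int max_objs) {
      // Same recipe as choose_oid(): hash the concatenated key, mod shard count.
      const std::string key = bucket + name + instance;
      return static_cast<int>(std::hash<std::string>{}(key) % max_objs);
    }

    int main() {
      std::cout << "restore." << choose_shard("bucket1", "object1", "", 23) << "\n";
    }
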
+
+void *RGWRestore::RestoreWorker::entry() {
+  do {
+    utime_t start = ceph_clock_now();
+    int r = restore->process(this, null_yield);
+    if (r < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: restore process() returned error r=" << r << dendl;
+    }
+    if (restore->going_down())
+      break;
+    utime_t end = ceph_clock_now();
+    end -= start;
+    int secs = cct->_conf->rgw_restore_processor_period;
+
+    if (secs <= end.sec())
+      continue; // next round
+
+    secs -= end.sec();
+    std::unique_lock locker{lock};
+    cond.wait_for(locker, std::chrono::seconds(secs));
+  } while (!restore->going_down());
+
+  return NULL;
+}
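
The worker re-runs process() every rgw_restore_processor_period seconds, sleeping only for whatever remains of the period after a pass. The knobs this patch touches can be set per RGW instance; the values below are illustrative, not defaults:

    [client.rgw]
        rgw_enable_restore_threads = true
        rgw_restore_max_objs = 23            # number of FIFO shards, capped at HASH_PRIME
        rgw_restore_processor_period = 900   # seconds between scans
        rgw_restore_lock_max_time = 90       # per-shard lock duration, seconds
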
+
+int RGWRestore::process(RestoreWorker* worker, optional_yield y)
+{
+  int max_secs = cct->_conf->rgw_restore_lock_max_time;
+
+  const int start = ceph::util::generate_random_number(0, max_objs - 1);
+  for (int i = 0; i < max_objs; i++) {
+    int index = (i + start) % max_objs;
+    int ret = process(index, max_secs, y);
+    if (ret < 0)
+      return ret;
+  }
+  return 0;
+}
+
+/* 
+ * Given an index, fetch a list of restore entries to process. After each
+ * iteration, trim the list to the last marker read.
+ *
+ * While processing the entries, if any of their restore operation is still in
+ * progress, such entries are added back to the list.
+ */ 
+int RGWRestore::process(int index, int max_secs, optional_yield y)
+{
+  ldpp_dout(this, 20) << "RGWRestore::process entered with Restore index_shard="
+                     << index << ", max_secs=" << max_secs << dendl;
+
+  /* list used to gather still IN_PROGRESS */
+  std::list<RGWRestoreEntry> r_entries;
+
+  std::unique_ptr<rgw::sal::RestoreSerializer> serializer =
+                       sal_restore->get_serializer(std::string(restore_index_lock_name),
+                                       std::string(obj_names[index]),
+                                       worker->thr_name());
+  utime_t end = ceph_clock_now();
+
+  /* max_secs should be greater than zero. We don't want a zero max_secs
+   * to be translated as no timeout, since we'd then need to break the
+   * lock and that would require a manual intervention. In this case
+   * we can just wait it out. */
+
+  if (max_secs <= 0)
+    return -EAGAIN;
+
+  end += max_secs;
+  utime_t time(max_secs, 0);
+  int ret = serializer->try_lock(this, time, null_yield);
+  if (ret == -EBUSY || ret == -EEXIST) {
+    /* already locked by another restore processor */
+    ldpp_dout(this, 0) << "RGWRestore::process() failed to acquire lock on "
+                      << obj_names[index] << dendl;
+    return -EBUSY;
+  }
+  if (ret < 0)
+    return 0;
+
+  std::unique_lock<rgw::sal::RestoreSerializer> lock(*(serializer.get()), std::adopt_lock);
+  std::string marker;
+  std::string next_marker;
+  bool truncated = false;
+
+  do {
+    int max = 100;
+    std::vector<RGWRestoreEntry> entries;
+
+    ret = sal_restore->list(this, y, index, marker, &next_marker, max, entries, &truncated);
+    ldpp_dout(this, 20) <<
+      "RGWRestore::process sal_restore->list returned ret=" << ret <<
+      ", entries.size=" << entries.size() << ", truncated=" << truncated <<
+      ", next_marker='" << next_marker << "'" << dendl;
+
+    if (ret < 0)
+      goto done;
+
+    if (entries.empty()) {
+      lock.unlock();
+      return 0;
+    }
+
+    marker = next_marker;
+    std::vector<RGWRestoreEntry>::iterator iter;
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      RGWRestoreEntry entry = *iter;
+
+      ret = process_restore_entry(entry, y);
+
+      if (entry.status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
+        r_entries.push_back(entry);
+      }
+
+      // process all entries, trim and re-add
+      utime_t now = ceph_clock_now();
+      if (now >= end) {
+        goto done;
+      }
+
+      if (going_down()) {
+       // leave early, even if tag isn't removed, it's ok since it
+       // will be picked up next time around
+       goto done;
+      }
+    }
+    ret = sal_restore->trim_entries(this, y, index, marker);
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "RGWRestore::process() failed to trim entries on "
+                        << obj_names[index] << dendl;
+    }
+
+    if (!r_entries.empty()) {
+      ret = sal_restore->add_entries(this, y, index, r_entries);
+      if (ret < 0) {
+        ldpp_dout(this, 0) << "RGWRestore::process() failed to add entries on "
+                          << obj_names[index] << dendl;
+      }
+    }
+
+    r_entries.clear();
+  } while (truncated);
+
+done:
+  lock.unlock();
+
+  return 0;
+}
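
The loop above gives each shard queue-like semantics: list a batch, process it, trim up to the last marker read, and re-append anything still in progress. A toy model of those semantics, with std::deque standing in for the SAL FIFO:

    #include <deque>
    #include <string>
    #include <vector>

    enum class Status { InProgress, Restored, Failed };
    struct Entry { std::string key; Status status; };

    // One pass over a shard: list -> process -> trim -> add_entries.
    void process_shard(std::deque<Entry>& shard) {
      std::vector<Entry> requeue;
      while (!shard.empty()) {
        Entry e = shard.front();
        shard.pop_front();               // trim: consumed up to this marker
        // process_restore_entry() would run here and update e.status
        if (e.status == Status::InProgress)
          requeue.push_back(e);          // examined again next period
      }
      for (auto& e : requeue)
        shard.push_back(e);              // re-add the still-pending entries
    }
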
+
+int RGWRestore::process_restore_entry(RGWRestoreEntry& entry, optional_yield y)
+{
+  int ret = 0;
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  std::unique_ptr<rgw::sal::Object> obj;
+  std::unique_ptr<rgw::sal::PlacementTier> tier;
+  std::optional<uint64_t> days = entry.days;
+  // Ensure it's the same source zone processing temporary entries, since we
+  // do not replicate temporarily restored copies
+  if (days) { // temp copy
+    auto& zone_id = entry.zone_id;
+    if (driver->get_zone()->get_id() != zone_id) {
+      // XXX: Do we need to check if the zone is still valid
+      return 0; // skip processing this entry in this zone
+    }
+  }
+
+  // fill in the details from entry
+  // bucket, obj, days, state=in_progress
+  ret = driver->load_bucket(this, entry.bucket, &bucket, null_yield);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "Restore:get_bucket for " << bucket->get_name()
+                      << " failed" << dendl;
+    return ret;
+  }
+  obj = bucket->get_object(entry.obj_key);
+
+  ret = obj->load_obj_state(this, null_yield, true);
+
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "Restore:get_object for " << entry.obj_key
+                      << " failed" << dendl;
+    return ret;
+  }
+
+  rgw_placement_rule target_placement;
+  target_placement.inherit_from(bucket->get_placement_rule());
+
+  auto& attrs = obj->get_attrs();
+  auto attr_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
+  rgw::sal::RGWRestoreStatus restore_status = rgw::sal::RGWRestoreStatus::None;
+  if (attr_iter != attrs.end()) {
+    bufferlist bl = attr_iter->second;
+    auto iter = bl.cbegin();
+    decode(restore_status, iter);
+  }
+  if (restore_status == rgw::sal::RGWRestoreStatus::CloudRestored) {
+    // XXX: Check if expiry-date needs to be updated
+    ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " already done" << dendl;
+    entry.status = rgw::sal::RGWRestoreStatus::CloudRestored;
+    return 0;
+  }
+
+  attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS);
+  if (attr_iter != attrs.end()) {
+    target_placement.storage_class = attr_iter->second.to_str();
+  }
+  ret = driver->get_zone()->get_zonegroup().get_placement_tier(target_placement, &tier);
+
+  if (ret < 0) {
+    ldpp_dout(this, -1) << "failed to fetch tier placement handle, ret = " << ret << dendl;
+    return ret;
+  } else {
+    ldpp_dout(this, 20) << "getting tier placement handle cloud tier for " <<
+                         " storage class " << target_placement.storage_class << dendl;
+  }
+
+  if (!tier->is_tier_type_s3()) {
+    ldpp_dout(this, -1) << "ERROR: not s3 tier type - " << tier->get_tier_type() << 
+                      " for storage class " << target_placement.storage_class << dendl;
+    return -EINVAL;
+  }
+
+  // now go ahead with restoring object
+  // XXX: first check if its already restored?
+  bool in_progress = true;
+  ret = obj->restore_obj_from_cloud(bucket.get(), tier.get(), cct, days, in_progress,
+                                     this, y);
+  if (ret < 0) {
+    ldpp_dout(this, -1) << "Restore of object(" << obj->get_key() << ") failed" << ret << dendl;
+    auto reset_ret = set_cloud_restore_status(this, obj.get(), y, rgw::sal::RGWRestoreStatus::RestoreFailed);
+    entry.status = rgw::sal::RGWRestoreStatus::RestoreFailed;
+    if (reset_ret < 0) {
+      ldpp_dout(this, -1) << "Setting restore status ad RestoreFailed failed for object(" << obj->get_key() << ") " << reset_ret << dendl;
+    }
+    return ret;
+  }
+
+  if (in_progress) {
+    ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " still in progress" << dendl;
+    entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+  } else {
+    ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " succeeded" << dendl;
+    entry.status = rgw::sal::RGWRestoreStatus::CloudRestored;
+  }
+  return ret;
+}
+
+time_t RGWRestore::thread_stop_at()
+{
+  uint64_t interval = (cct->_conf->rgw_restore_debug_interval > 0)
+    ? cct->_conf->rgw_restore_debug_interval : secs_in_a_day;
+
+  return time(nullptr) + interval;
+}
+
+int RGWRestore::set_cloud_restore_status(const DoutPrefixProvider* dpp,
+                          rgw::sal::Object* pobj, optional_yield y,
+                          const rgw::sal::RGWRestoreStatus& restore_status)
+{
+  int ret = -1;
+
+  if (!pobj)
+    return ret;
+
+  pobj->set_atomic();
+
+  bufferlist bl;
+  using ceph::encode;
+  encode(restore_status, bl);
+
+  ret = pobj->modify_obj_attrs(RGW_ATTR_RESTORE_STATUS, bl, y, dpp, false);
+
+  return ret;
+}
+
+int RGWRestore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket,
+                                      rgw::sal::Object* pobj,
+                                      rgw::sal::PlacementTier* tier,
+                                      std::optional<uint64_t> days, optional_yield y)
+{
+  int ret = 0;
+
+  if (!pbucket || !pobj) {
+    ldpp_dout(this, -1) << "ERROR: Invalid bucket/object. Restore failed" << dendl;
+    return -EINVAL;
+  }
+
+  // set restore_status as RESTORE_ALREADY_IN_PROGRESS
+  ret = set_cloud_restore_status(this, pobj, y, rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << " Setting cloud restore status to RESTORE_ALREADY_IN_PROGRESS for the object(" << pobj->get_key() << " failed, ret=" << ret << dendl;
+    return ret;
+  }
+
+  // now go ahead with restoring object
+  bool in_progress = false;
+  ret = pobj->restore_obj_from_cloud(pbucket, tier, cct, days, in_progress, this, y);
+
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "object " << pobj->get_key() << " fetching failed" << ret << dendl;
+    auto reset_ret = set_cloud_restore_status(this, pobj, y, rgw::sal::RGWRestoreStatus::RestoreFailed);
+
+    if (reset_ret < 0) {
+      ldpp_dout(this, -1) << "Setting restore status ad RestoreFailed failed for object(" << pobj->get_key() << ") " << reset_ret << dendl;
+    }
+
+    return ret;
+  }
+
+  if (in_progress) {
+    ldpp_dout(this, 20) << "Restore of object " << pobj->get_key() << " is in progress" << dendl;
+    // add a restore entry to the list, to be processed by the restore thread
+    RGWRestoreEntry entry;
+    entry.bucket = pbucket->get_key();
+    entry.obj_key = pobj->get_key();
+    entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+    entry.days = days;
+    entry.zone_id = driver->get_zone()->get_id();
+
+    int index = choose_oid(entry);
+    ret = sal_restore->add_entry(this, y, index, entry);
+
+    if (ret < 0) {
+      ldpp_dout(this, -1) << "Adding restore entry of object(" << pobj->get_key() << ") failed, ret=" << ret << dendl;
+    }
+  } else {
+    ldpp_dout(this, 20) << "Restore of object " << pobj->get_key() << " succeeded" << dendl;
+  }
+  return ret;
+}
diff --git a/src/rgw/rgw_restore.h b/src/rgw/rgw_restore.h
new file mode 100644 (file)
index 0000000..662745d
--- /dev/null
@@ -0,0 +1,140 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <map>
+#include <array>
+#include <string>
+#include <iostream>
+
+#include "common/debug.h"
+
+#include "include/types.h"
+#include "include/rados/librados.hpp"
+#include "common/ceph_mutex.h"
+#include "common/Cond.h"
+#include "common/iso_8601.h"
+#include "common/Thread.h"
+#include "rgw_common.h"
+#include "cls/rgw/cls_rgw_types.h"
+#include "rgw_sal.h"
+
+#include <atomic>
+#include <tuple>
+
+#define HASH_PRIME 7877
+#define MAX_ID_LEN 255
+static constexpr std::string_view restore_oid_prefix = "restore";
+static constexpr std::string_view restore_index_lock_name = "restore_process";
+
+/** Single Restore entry state */
+struct RGWRestoreEntry {
+  rgw_bucket bucket;
+  rgw_obj_key obj_key;
+  std::optional<uint64_t> days;
+  std::string zone_id; // or should it be zone name?
+  rgw::sal::RGWRestoreStatus status;
+
+  RGWRestoreEntry() {}
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(bucket, bl);
+    encode(obj_key, bl);
+    encode(days, bl);
+    encode(zone_id, bl);
+    encode(status, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(bucket, bl);
+    decode(obj_key, bl);
+    decode(days, bl);
+    decode(zone_id, bl);
+    decode(status, bl);
+    DECODE_FINISH(bl);
+  }
+  void dump(ceph::Formatter* f) const;
+  void decode_json(JSONObj* obj);
+  static void generate_test_instances(std::list<RGWRestoreEntry*>& l);
+};
+WRITE_CLASS_ENCODER(RGWRestoreEntry)
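
A round-trip sketch of the versioned encoder above, as entries travel through the FIFO (assumes a Ceph build environment for encode()/decode() and bufferlist):

    static RGWRestoreEntry roundtrip(const RGWRestoreEntry& in) {
      ceph::buffer::list bl;
      encode(in, bl);          // ENCODE_START(1, 1, ...) framing from above
      RGWRestoreEntry out;
      auto p = bl.cbegin();
      decode(out, p);          // DECODE_START(1, ...) accepts compat >= 1
      return out;
    }
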
+
+class RGWRestore : public DoutPrefixProvider {
+  CephContext *cct;
+  rgw::sal::Driver* driver;
+  std::unique_ptr<rgw::sal::Restore> sal_restore;
+  int max_objs{0};
+  std::vector<std::string> obj_names; // owns the shard oid strings
+  std::atomic<bool> down_flag = { false };
+
+  class RestoreWorker : public Thread
+  {
+    const DoutPrefixProvider *dpp;
+    CephContext *cct;
+    RGWRestore *restore;
+    ceph::mutex lock = ceph::make_mutex("RestoreWorker");
+    ceph::condition_variable cond;
+
+  public:
+
+    using lock_guard = std::lock_guard<ceph::mutex>;
+    using unique_lock = std::unique_lock<ceph::mutex>;
+
+    RestoreWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWRestore *_restore) : dpp(_dpp), cct(_cct), restore(_restore) {}
+    RGWRestore* get_restore() { return restore; }
+    std::string thr_name() {
+      return std::string{"restore_thrd: "}; // + std::to_string(ix);
+    }
+    void *entry() override;
+    void stop();
+
+    friend class RGWRados;
+  }; // RestoreWorker
+
+  std::unique_ptr<RGWRestore::RestoreWorker> worker;
+
+public:
+  ~RGWRestore() {
+    stop_processor();
+    finalize();
+  }
+
+  friend class RGWRados;
+
+  RGWRestore() : cct(nullptr), driver(nullptr), max_objs(0) {}
+
+  void initialize(CephContext *_cct, rgw::sal::Driver* _driver);
+  void finalize();
+
+  bool going_down();
+  void start_processor();
+  void stop_processor();
+
+  CephContext *get_cct() const override { return cct; }
+  rgw::sal::Restore* get_restore() const { return sal_restore.get(); }
+  unsigned get_subsys() const;
+
+  std::ostream& gen_prefix(std::ostream& out) const;
+
+  int process(RestoreWorker* worker, optional_yield y);
+  int choose_oid(const RGWRestoreEntry& e);
+  int process(int index, int max_secs, optional_yield y);
+  int process_restore_entry(RGWRestoreEntry& entry, optional_yield y);
+  time_t thread_stop_at();
+
+  /** Set the restore status for the given object */
+  int set_cloud_restore_status(const DoutPrefixProvider* dpp, rgw::sal::Object* pobj,
+                          optional_yield y,
+                          const rgw::sal::RGWRestoreStatus& restore_status);
+
+  /** Given <bucket, obj>, restore the object from the cloud-tier. In case the
+   * object cannot be restored immediately, save that restore state (entry)
+   * to be processed later by the RestoreWorker thread. */
+  int restore_obj_from_cloud(rgw::sal::Bucket* pbucket, rgw::sal::Object* pobj,
+                            rgw::sal::PlacementTier* tier,
+                            std::optional<uint64_t> days, optional_yield y);
+};
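
For orientation, roughly how a radosgw process drives this class; a sketch, with the real call sites living in rgw_appmain.cc behind rgw_enable_restore_threads:

    RGWRestore restore;
    restore.initialize(cct, driver);  // names shards "restore.0" .. "restore.N-1"
    restore.start_processor();        // spawns the "rgw_restore" worker thread
    // ... serve requests; read-throughs and restore ops enqueue entries ...
    restore.stop_processor();         // raises down_flag, joins the worker
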
index 03aa37e43436aa388828674530eea3f3eb207728..acb4f14f352ed011b198e5a227dfbe7dc12b3745 100644 (file)
@@ -74,6 +74,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                                                     const rgw::SiteConfig& site_config,
                                                     bool use_gc_thread,
                                                     bool use_lc_thread,
+                                                    bool use_restore_thread,
                                                     bool quota_threads,
                                                     bool run_sync_thread,
                                                     bool run_reshard_thread,
@@ -94,6 +95,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                 .set_use_gc(use_gc)
                 .set_run_gc_thread(use_gc_thread)
                 .set_run_lc_thread(use_lc_thread)
+               .set_run_restore_thread(use_restore_thread)
                 .set_run_quota_threads(quota_threads)
                 .set_run_sync_thread(run_sync_thread)
                 .set_run_reshard_thread(run_reshard_thread)
@@ -121,6 +123,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
                 .set_use_datacache(true)
                 .set_run_gc_thread(use_gc_thread)
                 .set_run_lc_thread(use_lc_thread)
+               .set_run_restore_thread(use_restore_thread)
                 .set_run_quota_threads(quota_threads)
                 .set_run_sync_thread(run_sync_thread)
                 .set_run_reshard_thread(run_reshard_thread)
index ef798b585fe78c8a6d9c7740fef9f4809f965ecd..a30b945f421d97e9edd899099883854204626fcb 100644 (file)
@@ -36,6 +36,7 @@
 struct RGWBucketEnt;
 class RGWRESTMgr;
 class RGWLC;
+class RGWRestore;
 struct rgw_user_bucket;
 class RGWUsageBatch;
 class RGWCoroutinesManagerRegistry;
@@ -51,6 +52,7 @@ class RGWZonePlacementInfo;
 struct rgw_pubsub_topic;
 struct RGWOIDCProviderInfo;
 struct RGWRoleInfo;
+struct RGWRestoreEntry;
 
 using RGWBucketListNameFilter = std::function<bool (const std::string&)>;
 
@@ -159,7 +161,7 @@ static constexpr uint32_t FLAG_PREVENT_VERSIONING = 0x0002;
 // delete object where head object is missing)
 static constexpr uint32_t FLAG_FORCE_OP = 0x0004;
 
-enum RGWRestoreStatus : uint8_t {
+enum class RGWRestoreStatus : uint8_t {
   None  = 0,
   RestoreAlreadyInProgress = 1,
   CloudRestored = 2,
@@ -472,6 +474,9 @@ class Driver {
     virtual int cluster_stat(RGWClusterStat& stats) = 0;
     /** Get a @a Lifecycle object. Used to manage/run lifecycle transitions */
     virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
+    /** Get a @a Restore object. Used to manage/run object restore processing */
+    virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+                               const std::vector<std::string_view>& obj_names) = 0;
     /** Reset the temporarily restored objects which are expired */
     virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) = 0;
 
@@ -563,6 +568,8 @@ class Driver {
                                          const DoutPrefixProvider* dpp) = 0;
     /** Get access to the lifecycle management thread */
     virtual RGWLC* get_rgwlc(void) = 0;
+    /** Get access to the tier restore management thread */
+    virtual RGWRestore* get_rgwrestore(void) = 0;
     /** Get access to the coroutine registry.  Used to create new coroutine managers */
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() = 0;
 
@@ -1265,15 +1272,11 @@ class Object {
                           optional_yield y) = 0;
     virtual int restore_obj_from_cloud(Bucket* bucket,
                           rgw::sal::PlacementTier* tier,
-                          rgw_placement_rule& placement_rule,
-                          rgw_bucket_dir_entry& o,
                           CephContext* cct,
-                          RGWObjTier& tier_config,
-                          uint64_t olh_epoch,
                           std::optional<uint64_t> days,
+                          bool& in_progress,
                           const DoutPrefixProvider* dpp,
-                          optional_yield y,
-                          uint32_t flags) = 0;
+                          optional_yield y) = 0;
     /** Check to see if two placement rules match */
     virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) = 0;
     /** Dump driver-specific object layout info in JSON */
@@ -1667,6 +1670,53 @@ public:
                                                       const std::string& cookie) = 0;
 };
 
+/** @brief Abstraction of a serializer for Restore
+ */
+class RestoreSerializer : public Serializer {
+public:
+  RestoreSerializer() {}
+  virtual ~RestoreSerializer() = default;
+};
+
+/**
+ * @brief Abstraction for restore processing
+ *
+ * The Restore class is designed to manage the restoration of objects
+ * from cloud tier storage back into the Ceph cluster. This is particularly used
+ * for objects stored in cold storage solutions like AWS Glacier or Tape-based systems,
+ * where retrieval operations are asynchronous and can take a significant amount of time.
+ */
+class Restore {
+
+public:
+  Restore() = default;
+  virtual ~Restore() = default;
+  /** Add a single restore entry state */
+  virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+                 int index, const RGWRestoreEntry& r_entry) = 0;
+  /** Add list of restore entries */
+  virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+              int index, const std::list<RGWRestoreEntry>& restore_entries) = 0;
+  /** List all known entries given a marker */
+  virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
+                  int index,
+                  const std::string& marker, std::string* out_marker,
+                  uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+                  bool* truncated) = 0;
+
+  /** Trim restore entries up to the given marker */
+  virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+                         int index, const std::string_view& marker) = 0;
+  /* Check if the list is empty */
+  virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0;
+
+  /** Get a serializer for restore processing */
+  virtual std::unique_ptr<RestoreSerializer> get_serializer(
+                                               const std::string& lock_name,
+                                               const std::string& oid,
+                                               const std::string& cookie) = 0;
+};
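
A consumer pages through one shard with the marker-based API; a sketch, assuming dpp, y, a shard index and a Restore* named restore are in scope:

    std::string marker;                // "" starts at the head of the shard
    bool truncated = false;
    do {
      std::vector<RGWRestoreEntry> entries;
      std::string next_marker;
      int r = restore->list(dpp, y, shard, marker, &next_marker,
                            100, entries, &truncated);
      if (r < 0)
        break;
      for (const auto& e : entries) {
        // inspect e.status, e.days, e.zone_id ...
      }
      marker = next_marker;            // resume after the last entry read
    } while (truncated);
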
+
 /**
  * @brief Abstraction for a Notification event
  *
@@ -1885,6 +1935,7 @@ public:
                                      const rgw::SiteConfig& site_config,
                                      bool use_gc_thread,
                                      bool use_lc_thread,
+                                     bool use_restore_thread,
                                      bool quota_threads,
                                      bool run_sync_thread,
                                      bool run_reshard_thread,
@@ -1897,6 +1948,7 @@ public:
                                                   site_config,
                                                   use_gc_thread,
                                                   use_lc_thread,
+                                                  use_restore_thread,
                                                   quota_threads,
                                                   run_sync_thread,
                                                   run_reshard_thread,
@@ -1923,6 +1975,7 @@ public:
                                                const rgw::SiteConfig& site_config,
                                                bool use_gc_thread,
                                                bool use_lc_thread,
+                                               bool use_restore_thread,
                                                bool quota_threads,
                                                bool run_sync_thread,
                                                bool run_reshard_thread,
index fae28f8438f09bb92ebb4b699228aef30e9ec60a..077f9583831cd01d90978dd9736b345c512ad2d7 100644 (file)
@@ -1920,6 +1920,12 @@ namespace rgw::sal {
     return std::make_unique<LCDBSerializer>(store, oid, lock_name, cookie);
   }
 
+  std::unique_ptr<Restore> DBStore::get_restore(const int n_objs,
+                       const std::vector<std::string_view>& obj_names)
+  {
+    return nullptr;
+  }
+  
   std::unique_ptr<Notification> DBStore::get_notification(
     rgw::sal::Object* obj, rgw::sal::Object* src_obj, req_state* s,
     rgw::notify::EventType event_type, optional_yield y,
index 0173baf297cc387002d78c0790fb9caf5a55fa85..51f0f2f7c648eb2cc2a83e634df52591d1aea17a 100644 (file)
@@ -893,6 +893,8 @@ public:
       virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list<std::string>& zone_ids) override;
       virtual int cluster_stat(RGWClusterStat& stats) override;
       virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+      virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+                    const std::vector<std::string_view>& obj_names) override;
       virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
 
   virtual std::unique_ptr<Notification> get_notification(
@@ -925,6 +927,7 @@ public:
                                   const std::string& topic_queue) override;
 
       virtual RGWLC* get_rgwlc(void) override;
+      virtual RGWRestore* get_rgwrestore(void) override { return NULL; }
       virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
       virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
       virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
index 43fdb006c96731f4482e54779ccb4a87fe08388f..a16fe882051c6aa45cfddbee9dec7452996a5ece 100644 (file)
@@ -440,6 +440,13 @@ bool FilterDriver::process_expired_objects(const DoutPrefixProvider *dpp,
   return next->process_expired_objects(dpp, y);
 }
 
+std::unique_ptr<Restore> FilterDriver::get_restore(const int n_objs,
+                       const std::vector<std::string_view>& obj_names)
+{
+  std::unique_ptr<Restore> restore = next->get_restore(n_objs, obj_names);
+  return std::make_unique<FilterRestore>(std::move(restore));
+}
+
 std::unique_ptr<Notification> FilterDriver::get_notification(rgw::sal::Object* obj,
                                rgw::sal::Object* src_obj, req_state* s,
                                rgw::notify::EventType event_type, optional_yield y,
@@ -487,6 +494,11 @@ RGWLC* FilterDriver::get_rgwlc()
   return next->get_rgwlc();
 }
 
+RGWRestore* FilterDriver::get_rgwrestore()
+{
+  return next->get_rgwrestore();
+}
+
 RGWCoroutinesManagerRegistry* FilterDriver::get_cr_registry()
 {
   return next->get_cr_registry();
@@ -1135,18 +1147,14 @@ int FilterObject::transition_to_cloud(Bucket* bucket,
 
 int FilterObject::restore_obj_from_cloud(Bucket* bucket,
                          rgw::sal::PlacementTier* tier,
-                         rgw_placement_rule& placement_rule,
-                         rgw_bucket_dir_entry& o,
                          CephContext* cct,
-                         RGWObjTier& tier_config,
-                         uint64_t olh_epoch,
                          std::optional<uint64_t> days,
-                         const DoutPrefixProvider* dpp, 
-                         optional_yield y,
-                         uint32_t flags)
+                         bool& in_progress,
+                         const DoutPrefixProvider* dpp,
+                         optional_yield y)
 {
   return next->restore_obj_from_cloud(nextBucket(bucket), nextPlacementTier(tier),
-           placement_rule, o, cct, tier_config, olh_epoch, days, dpp, y, flags);
+           cct, days, in_progress, dpp, y);
 }
 
 bool FilterObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2)
@@ -1441,6 +1449,49 @@ std::unique_ptr<LCSerializer> FilterLifecycle::get_serializer(
   return std::make_unique<FilterLCSerializer>(std::move(ns));
 }
 
+int FilterRestoreSerializer::try_lock(const DoutPrefixProvider *dpp, utime_t dur,
+                                optional_yield y)
+{
+  return next->try_lock(dpp, dur, y);
+}
+
+std::unique_ptr<RestoreSerializer> FilterRestore::get_serializer(const std::string& lock_name,
+                                                      const std::string& oid,
+                                                      const std::string& cookie) {
+  std::unique_ptr<RestoreSerializer> ns;
+  ns = next->get_serializer(lock_name, oid, cookie);
+  return std::make_unique<FilterRestoreSerializer>(std::move(ns));
+}
+
+int FilterRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+                            int index, const RGWRestoreEntry& r_entry) {
+  return next->add_entry(dpp, y, index, r_entry);
+}
+
+int FilterRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+                              int index,
+                              const std::list<RGWRestoreEntry>& restore_entries) {
+  return next->add_entries(dpp, y, index, restore_entries);
+}
+
+/** List all known entries */
+int FilterRestore::list(const DoutPrefixProvider *dpp, optional_yield y,
+                  int index, const std::string& marker, std::string* out_marker,
+                  uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+                  bool* truncated) {
+  return next->list(dpp, y, index, marker, out_marker, max_entries,
+                   entries, truncated);
+}
+
+int FilterRestore::trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+                               int index, const std::string_view& marker) {
+  return next->trim_entries(dpp, y, index, marker);
+}
+
+int FilterRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) {
+  return next->is_empty(dpp, y);
+}
+
 int FilterNotification::publish_reserve(const DoutPrefixProvider *dpp,
                                        RGWObjTags* obj_tags)
 {
index 4b0b97fe22087bbf185fbced530e59940432b9bd..7de61ef50cee5170329077e9e4358b5ba26926c9 100644 (file)
@@ -292,6 +292,8 @@ public:
   }
   virtual int cluster_stat(RGWClusterStat& stats) override;
   virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+  virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+                       const std::vector<std::string_view>& obj_names) override;
   virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
 
   virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj,
@@ -383,6 +385,7 @@ public:
     return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
   }
   virtual RGWLC* get_rgwlc(void) override;
+  virtual RGWRestore* get_rgwrestore(void) override;
   virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
 
   virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket,
@@ -833,15 +836,11 @@ public:
                                  optional_yield y) override;
     virtual int restore_obj_from_cloud(Bucket* bucket,
                           rgw::sal::PlacementTier* tier,
-                          rgw_placement_rule& placement_rule,
-                          rgw_bucket_dir_entry& o,
                           CephContext* cct,
-                          RGWObjTier& tier_config,
-                          uint64_t olh_epoch,
                           std::optional<uint64_t> days,
+                          bool& in_progress,
                           const DoutPrefixProvider* dpp,
-                          optional_yield y,
-                          uint32_t flags) override;
+                          optional_yield y) override;
   virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override;
   virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y,
                              Formatter* f) override;
@@ -1059,6 +1058,51 @@ public:
                                                       const std::string& cookie) override;
 };
 
+class FilterRestoreSerializer : public RestoreSerializer {
+
+protected:
+  std::unique_ptr<RestoreSerializer> next;
+
+public:
+  FilterRestoreSerializer(std::unique_ptr<RestoreSerializer> _next) : next(std::move(_next)) {}
+  virtual ~FilterRestoreSerializer() = default;
+  virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override;
+  virtual int unlock() override { return next->unlock(); }
+  virtual void print(std::ostream& out) const override { return next->print(out); }
+};
+
+class FilterRestore : public Restore {
+
+protected:
+  std::unique_ptr<Restore> next;
+
+public:
+  FilterRestore(std::unique_ptr<Restore> _next) : next(std::move(_next)) {}
+  ~FilterRestore() override = default;
+
+  virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+                 int index, const RGWRestoreEntry& r_entry) override;
+  virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+              int index,
+              const std::list<RGWRestoreEntry>& restore_entries) override;
+
+  /** List all known entries */
+  virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
+                  int index,
+                  const std::string& marker, std::string* out_marker,
+                  uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+                  bool* truncated) override;
+  virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+                         int index, const std::string_view& marker) override;
+  virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override;
+
+  /** Get a serializer for restore processing */
+  virtual std::unique_ptr<RestoreSerializer> get_serializer(
+                               const std::string& lock_name,
+                               const std::string& oid,
+                               const std::string& cookie) override;
+};
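
Because FilterRestore is a pure pass-through, a filter driver can interpose on restore bookkeeping by overriding individual hooks. A hypothetical TraceRestore that logs every enqueue before delegating (the class name and log level are illustrative):

    class TraceRestore : public rgw::sal::FilterRestore {
    public:
      using FilterRestore::FilterRestore;   // wrap the next driver's Restore

      int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
                    int index, const RGWRestoreEntry& e) override {
        ldpp_dout(dpp, 10) << "restore enqueue: shard=" << index
                           << " obj=" << e.obj_key << dendl;
        return FilterRestore::add_entry(dpp, y, index, e);
      }
    };
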
+
 class FilterNotification : public Notification {
 protected:
   std::unique_ptr<Notification> next;
index 566a933f8ca05fa592b8e501e73f8eb78034210d..dbdcbf98b218d92d3ec6b22b2e467ec5ffd207c6 100644 (file)
@@ -39,6 +39,7 @@ namespace sal {
   class Object;
   class MultipartUpload;
   class Lifecycle;
+  class Restore;
   class Notification;
   class Writer;
   class PlacementTier;
index da98eb41f997becd97a4979f3c534750e5085bab..0364c84d1ac03b279609d15dc1112bad698e2864 100644 (file)
@@ -370,15 +370,11 @@ class StoreObject : public Object {
     }
     virtual int restore_obj_from_cloud(Bucket* bucket,
                           rgw::sal::PlacementTier* tier,
-                          rgw_placement_rule& placement_rule,
-                          rgw_bucket_dir_entry& o,
                           CephContext* cct,
-                          RGWObjTier& tier_config,
-                          uint64_t olh_epoch,
                           std::optional<uint64_t> days,
+                          bool& in_progress,
                           const DoutPrefixProvider* dpp,
-                          optional_yield y,
-                          uint32_t flags) override {
+                          optional_yield y) override {
       return -1;
     }
     jspan_context& get_trace() override { return trace_ctx; }
@@ -459,6 +455,20 @@ public:
   virtual void print(std::ostream& out) const override { out << oid; }
 };
 
+class StoreRestoreSerializer : public RestoreSerializer {
+
+protected:
+  std::string oid;
+
+public:
+  StoreRestoreSerializer() {}
+  StoreRestoreSerializer(std::string _oid) : oid(_oid) {}
+
+  virtual ~StoreRestoreSerializer() = default;
+  virtual void print(std::ostream& out) const override { out << oid; }
+};
+
+
 class StoreNotification : public Notification {
 protected:
   Object* obj;
index ed693b15f9d5d39c8845748289c4ba1cc62cee71..90ff7cef106c4ccf2776468c03efb8a4145fb363 100644 (file)
@@ -305,6 +305,7 @@ void RGWZoneParams::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("placement_pools", placement_pools, obj);
   JSONDecoder::decode_json("tier_config", tier_config, obj);
   JSONDecoder::decode_json("realm_id", realm_id, obj);
+  JSONDecoder::decode_json("restore_pool", restore_pool, obj);
 }
 
 void RGWZoneParams::dump(Formatter *f) const
@@ -333,6 +334,7 @@ void RGWZoneParams::dump(Formatter *f) const
   encode_json("placement_pools", placement_pools, f);
   encode_json("tier_config", tier_config, f);
   encode_json("realm_id", realm_id, f);
+  encode_json("restore_pool", restore_pool, f);
 }
 
 int RGWZoneParams::init(const DoutPrefixProvider *dpp, 
@@ -1280,6 +1282,7 @@ int init_zone_pool_names(const DoutPrefixProvider *dpp, optional_yield y,
   info.dedup_pool = fix_zone_pool_dup(pools, info.name, ".rgw.dedup", info.dedup_pool);
   info.gc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:gc", info.gc_pool);
   info.lc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:lc", info.lc_pool);
+  info.restore_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:restore", info.restore_pool);
   info.log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log", info.log_pool);
   info.intent_log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:intent", info.intent_log_pool);
   info.usage_log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:usage", info.usage_log_pool);
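
With init_zone_pool_names() extended, a freshly initialized zone picks up the new log pool alongside the existing ones; abbreviated radosgw-admin zone get output would then carry, e.g.:

    {
        "name": "zone1",
        "lc_pool": "zone1.rgw.log:lc",
        "restore_pool": "zone1.rgw.log:restore",
        ...
    }
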
index 3adfda594eb2bc8154369f8b53b87cf809b6a58e..f7be1abd2d43f07a7b0e4dda539e0a67775549f3 100644 (file)
@@ -349,6 +349,7 @@ int main(int argc, const char **argv)
                              false,
                              false,
                              false,
+                             false,
                              false,
                               true, true, null_yield, 
                              false));