From: Soumya Koduri Date: Wed, 30 Apr 2025 20:36:21 +0000 (+0530) Subject: rgw/cloud-restore: Support restoration of objects transitioned to Glacier/Tape endpoint X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=ef96bb0d6137bacf45b9ee2f99ad5bcd8b3b6add;p=ceph.git rgw/cloud-restore: Support restoration of objects transitioned to Glacier/Tape endpoint Restoration of objects from certain cloud services (such as Glacier/Tape) can take a significant amount of time (even days). Hence, store the state of such restore requests and process them periodically. Brief summary of changes: * Refactored the existing restore code to consolidate and move all restore processing into the rgw_restore* file/class. * A new RGWRestore class is defined to manage the restoration of objects. * Lastly, for SAL_RADOS, FIFO is used to store and read restore entries. Currently, this PR handles storing the state of restore requests sent to the cloud-glacier tier type, which need async processing. The changes were tested with AWS Glacier Flexible Retrieval using tier_type Expedited and Standard. Reviewed-by: Matt Benjamin Reviewed-by: Adam Emerson Reviewed-by: Jiffin Tony Thottan Reviewed-by: Daniel Gryniewicz Signed-off-by: Soumya Koduri --- diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index 3303ce4a6c5e7..9868bbc63f190 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -230,6 +230,7 @@ options: see_also: - rgw_enable_gc_threads - rgw_enable_lc_threads + - rgw_enable_restore_threads with_legacy: true - name: rgw_enable_gc_threads type: bool @@ -246,6 +247,7 @@ options: see_also: - rgw_enable_quota_threads - rgw_enable_lc_threads + - rgw_enable_restore_threads with_legacy: true - name: rgw_enable_lc_threads type: bool @@ -263,6 +265,24 @@ options: see_also: - rgw_enable_gc_threads - rgw_enable_quota_threads + - rgw_enable_restore_threads + with_legacy: true +- name: rgw_enable_restore_threads + type: bool + level: advanced + desc: Enables the objects' restore maintenance thread. + long_desc: The objects' restore maintenance thread is responsible for all the object + restoration-related maintenance work. The thread itself can be disabled, but in order + for restore from the cloud to work correctly, at least one RGW in each zone needs + to have this thread running. Having the thread enabled on multiple RGW processes within + the same zone can spread some of the maintenance work between them. + default: true + services: + - rgw + see_also: + - rgw_enable_gc_threads + - rgw_enable_quota_threads + - rgw_enable_lc_threads with_legacy: true - name: rgw_data type: str @@ -475,6 +495,35 @@ options: services: - rgw with_legacy: true +- name: rgw_restore_max_objs + type: int + level: advanced + desc: Number of shards for restore processing + long_desc: Number of RADOS objects to use for storing restore entries that are in progress. This affects the concurrency of restore maintenance, as shards can be processed in parallel. + default: 32 + services: + - rgw + with_legacy: true +- name: rgw_restore_lock_max_time + type: int + level: dev + default: 90 + services: + - rgw + see_also: + with_legacy: true +- name: rgw_restore_processor_period + type: int + level: advanced + desc: Restore cycle run time + long_desc: The amount of time between the start of consecutive runs of the restore + processing threads. If a run takes more than this period, the thread will + not wait before running again. + fmt_desc: The cycle time for restore state processing. 
+ default: 15_min + services: + - rgw + with_legacy: true - name: rgw_mp_lock_max_time type: int level: advanced @@ -1270,6 +1319,14 @@ options: services: - rgw with_legacy: true +- name: rgw_nfs_run_restore_threads + type: bool + level: advanced + desc: run objects' restore threads in librgw (default off) + default: false + services: + - rgw + with_legacy: true - name: rgw_nfs_run_sync_thread type: bool level: advanced diff --git a/src/common/subsys.h b/src/common/subsys.h index 419115c35ff7b..d756124f1a13e 100644 --- a/src/common/subsys.h +++ b/src/common/subsys.h @@ -66,6 +66,7 @@ SUBSYS(rgw_access, 1, 5) SUBSYS(rgw_dbstore, 1, 5) SUBSYS(rgw_flight, 1, 5) SUBSYS(rgw_lifecycle, 1, 5) +SUBSYS(rgw_restore, 1, 5) SUBSYS(rgw_notification, 1, 5) SUBSYS(javaclient, 1, 5) SUBSYS(asok, 1, 5) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 8f912ca8a869b..18f1213f89416 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -66,6 +66,7 @@ set(librgw_common_srcs rgw_ldap.cc rgw_lc.cc rgw_lc_s3.cc + rgw_restore.cc rgw_metadata.cc rgw_multi.cc rgw_multi_del.cc diff --git a/src/rgw/driver/daos/rgw_sal_daos.cc b/src/rgw/driver/daos/rgw_sal_daos.cc index f60c5a11208ed..c90a8770514ba 100644 --- a/src/rgw/driver/daos/rgw_sal_daos.cc +++ b/src/rgw/driver/daos/rgw_sal_daos.cc @@ -1028,15 +1028,13 @@ int DaosObject::transition_to_cloud( int DaosObject::restore_obj_from_cloud(Bucket* bucket, rgw::sal::PlacementTier* tier, - rgw_placement_rule& placement_rule, - rgw_bucket_dir_entry& o, CephContext* cct, RGWObjTier& tier_config, uint64_t olh_epoch, std::optional days, + bool& in_progress, const DoutPrefixProvider* dpp, - optional_yield y, - uint32_t flags) + optional_yield y) { return DAOS_NOT_IMPLEMENTED_LOG(dpp); } @@ -2321,6 +2319,12 @@ std::unique_ptr DaosStore::get_lifecycle(void) { return 0; } +std::unique_ptr DaosStore::get_restore(const int n_objs, + const std::vector& obj_names) { + DAOS_NOT_IMPLEMENTED_LOG(nullptr); + return 0; +} + bool DaosStore::process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) { DAOS_NOT_IMPLEMENTED_LOG(nullptr); diff --git a/src/rgw/driver/daos/rgw_sal_daos.h b/src/rgw/driver/daos/rgw_sal_daos.h index 19cff1d579802..65ecbdcbb28c8 100644 --- a/src/rgw/driver/daos/rgw_sal_daos.h +++ b/src/rgw/driver/daos/rgw_sal_daos.h @@ -654,15 +654,13 @@ class DaosObject : public StoreObject { optional_yield y) override; virtual int restore_obj_from_cloud(Bucket* bucket, rgw::sal::PlacementTier* tier, - rgw_placement_rule& placement_rule, - rgw_bucket_dir_entry& o, CephContext* cct, RGWObjTier& tier_config, uint64_t olh_epoch, std::optional days, + bool& in_progress, const DoutPrefixProvider* dpp, - optional_yield y, - uint32_t flags) override; + optional_yield y) override; virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override; virtual int dump_obj_layout(const DoutPrefixProvider* dpp, optional_yield y, @@ -937,6 +935,8 @@ class DaosStore : public StoreDriver { virtual std::string zone_unique_trans_id(const uint64_t unique_num) override; virtual int cluster_stat(RGWClusterStat& stats) override; virtual std::unique_ptr get_lifecycle(void) override; + virtual std::unique_ptr get_restore(const int n_objs, + const std::vector& obj_names) override; virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override; virtual std::unique_ptr get_notification( rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, @@ -953,6 +953,7 @@ class DaosStore : public StoreDriver 
{ std::string& _req_id, optional_yield y) override; virtual RGWLC* get_rgwlc(void) override { return NULL; } + virtual RGWRestore* get_rgwrestore(void) override { return NULL; } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; } diff --git a/src/rgw/driver/motr/rgw_sal_motr.cc b/src/rgw/driver/motr/rgw_sal_motr.cc index dee118243f7f5..3cb654fe41bf4 100644 --- a/src/rgw/driver/motr/rgw_sal_motr.cc +++ b/src/rgw/driver/motr/rgw_sal_motr.cc @@ -3335,6 +3335,11 @@ std::unique_ptr MotrStore::get_lifecycle(void) return 0; } +std::unique_ptr MotrStore::get_restore(const int n_objs, + const std::vector& obj_names) { + return 0; +} + bool MotrStore::process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) { diff --git a/src/rgw/driver/motr/rgw_sal_motr.h b/src/rgw/driver/motr/rgw_sal_motr.h index e95ab3252d6df..709b77c34a565 100644 --- a/src/rgw/driver/motr/rgw_sal_motr.h +++ b/src/rgw/driver/motr/rgw_sal_motr.h @@ -1006,6 +1006,8 @@ class MotrStore : public StoreDriver { virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list& zone_ids) override; virtual int cluster_stat(RGWClusterStat& stats) override; virtual std::unique_ptr get_lifecycle(void) override; + virtual std::unique_ptr get_restore(const int n_objs, + const std::vector& obj_names) override; virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override; virtual std::unique_ptr get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, req_state* s, rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) override; @@ -1020,6 +1022,7 @@ class MotrStore : public StoreDriver { std::string& _req_id, optional_yield y) override; virtual RGWLC* get_rgwlc(void) override { return NULL; } + virtual RGWRestore* get_rgwrestore(void) override { return NULL; } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; } virtual int log_usage(const DoutPrefixProvider *dpp, std::map& usage_info) override; diff --git a/src/rgw/driver/posix/rgw_sal_posix.cc b/src/rgw/driver/posix/rgw_sal_posix.cc index 9bd71a4ee3545..d72b38ee45bb8 100644 --- a/src/rgw/driver/posix/rgw_sal_posix.cc +++ b/src/rgw/driver/posix/rgw_sal_posix.cc @@ -3071,15 +3071,11 @@ int POSIXObject::transition_to_cloud(Bucket* bucket, int POSIXObject::restore_obj_from_cloud(Bucket* bucket, rgw::sal::PlacementTier* tier, - rgw_placement_rule& placement_rule, - rgw_bucket_dir_entry& o, CephContext* cct, - RGWObjTier& tier_config, - uint64_t olh_epoch, std::optional days, + bool& in_progress, const DoutPrefixProvider* dpp, - optional_yield y, - uint32_t flags) + optional_yield y) { return -ERR_NOT_IMPLEMENTED; } diff --git a/src/rgw/driver/posix/rgw_sal_posix.h b/src/rgw/driver/posix/rgw_sal_posix.h index 0246b764aa214..fdf2b9d510fe8 100644 --- a/src/rgw/driver/posix/rgw_sal_posix.h +++ b/src/rgw/driver/posix/rgw_sal_posix.h @@ -697,15 +697,11 @@ public: optional_yield y) override; virtual int restore_obj_from_cloud(Bucket* bucket, rgw::sal::PlacementTier* tier, - rgw_placement_rule& placement_rule, - rgw_bucket_dir_entry& o, CephContext* cct, - RGWObjTier& tier_config, - uint64_t olh_epoch, std::optional days, + bool& in_progress, const DoutPrefixProvider* dpp, - optional_yield y, - uint32_t flags) override; + optional_yield y) override; virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override; virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) override; virtual 
int swift_versioning_restore(const ACLOwner& owner, const rgw_user& remote_user, bool& restored, diff --git a/src/rgw/driver/rados/rgw_lc_tier.cc b/src/rgw/driver/rados/rgw_lc_tier.cc index cf057f6fae8f5..c536d6fc10cad 100644 --- a/src/rgw/driver/rados/rgw_lc_tier.cc +++ b/src/rgw/driver/rados/rgw_lc_tier.cc @@ -260,6 +260,7 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx, uint64_t& accounted_size, rgw::sal::Attrs& attrs, std::optional days, RGWZoneGroupTierS3Glacier& glacier_params, + bool& in_progress, void* cb) { RGWRESTConn::get_obj_params req_params; std::string target_obj_name; @@ -276,25 +277,23 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx, target_obj_name += get_key_instance(tier_ctx.obj->get_key()); } - if (glacier_params.glacier_restore_tier_type != GlacierRestoreTierType::Expedited) { - //XXX: Supporting STANDARD tier type is still in WIP - ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: Only Expedited tier_type is supported " << dendl; - return -1; - } + if (!in_progress) { // first time. Send RESTORE req. - rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name)); + rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name)); + ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params); - ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params); - - ldpp_dout(tier_ctx.dpp, 20) << __func__ << "Restoring object=" << dest_obj << "returned ret = " << ret << dendl; - - if (ret < 0 ) { - ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: failed to restore object=" << dest_obj << "; ret = " << ret << dendl; - return ret; + ldpp_dout(tier_ctx.dpp, 20) << __func__ << "Restoring object=" << target_obj_name << "returned ret = " << ret << dendl; + + if (ret < 0 ) { + ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: failed to restore object=" << dest_obj << "; ret = " << ret << dendl; + return ret; + } + in_progress = true; } // now send HEAD request and verify if restore is complete on glacier/tape endpoint - bool restore_in_progress = false; + static constexpr int MAX_RETRIES = 10; + uint32_t retries = 0; do { ret = rgw_cloud_tier_get_object(tier_ctx, true, headers, nullptr, etag, accounted_size, attrs, nullptr); @@ -304,8 +303,14 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx, return ret; } - restore_in_progress = is_restore_in_progress(tier_ctx.dpp, headers); - } while(restore_in_progress); + in_progress = is_restore_in_progress(tier_ctx.dpp, headers); + + } while(retries++ < MAX_RETRIES && in_progress); + + if (in_progress) { + ldpp_dout(tier_ctx.dpp, 20) << __func__ << "Restoring object=" << target_obj_name << " still in progress; returning " << dendl; + return 0; + } // now do the actual GET ret = rgw_cloud_tier_get_object(tier_ctx, false, headers, pset_mtime, etag, diff --git a/src/rgw/driver/rados/rgw_lc_tier.h b/src/rgw/driver/rados/rgw_lc_tier.h index 60006da89149d..20d6e0675d091 100644 --- a/src/rgw/driver/rados/rgw_lc_tier.h +++ b/src/rgw/driver/rados/rgw_lc_tier.h @@ -61,8 +61,9 @@ int rgw_cloud_tier_restore_object(RGWLCCloudTierCtx& tier_ctx, std::map& headers, real_time* pset_mtime, std::string& etag, uint64_t& accounted_size, rgw::sal::Attrs& attrs, - std::optional days, + std::optional days, RGWZoneGroupTierS3Glacier& glacier_params, + bool& in_progress, void* cb); int cloud_tier_restore(const DoutPrefixProvider *dpp, diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 0546040027540..452957c3130ba 100644 --- 
a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -38,6 +38,7 @@ #include "rgw_datalog.h" #include "rgw_putobj_processor.h" #include "rgw_lc_tier.h" +#include "rgw_restore.h" #include "cls/rgw/cls_rgw_ops.h" #include "cls/rgw/cls_rgw_client.h" @@ -1145,6 +1146,12 @@ void RGWRados::finalize() rgw::notify::shutdown(); v1_topic_migration.stop(); } + + if (use_restore_thread) { + restore->stop_processor(); + } + delete restore; + restore = NULL; } /** @@ -1244,6 +1251,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw if (ret < 0) return ret; + ret = open_restore_pool_ctx(dpp); + if (ret < 0) + return ret; + ret = open_objexp_pool_ctx(dpp); if (ret < 0) return ret; @@ -1367,6 +1378,12 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw if (use_lc_thread) lc->start_processor(); + restore = new RGWRestore(); + restore->initialize(cct, this->driver); + + if (use_restore_thread) + restore->start_processor(); + quota_handler = RGWQuotaHandler::generate_handler(dpp, this->driver, quota_threads); bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards : @@ -1488,6 +1505,11 @@ int RGWRados::open_lc_pool_ctx(const DoutPrefixProvider *dpp) return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().lc_pool, lc_pool_ctx, true, true); } +int RGWRados::open_restore_pool_ctx(const DoutPrefixProvider *dpp) +{ + return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().restore_pool, restore_pool_ctx, true, true); +} + int RGWRados::open_objexp_pool_ctx(const DoutPrefixProvider *dpp) { return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().log_pool, objexp_pool_ctx, true, true); @@ -5357,13 +5379,11 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, RGWObjectCtx& obj_ctx, RGWBucketInfo& dest_bucket_info, const rgw_obj& dest_obj, - rgw_placement_rule& dest_placement, RGWObjTier& tier_config, - uint64_t olh_epoch, std::optional days, + bool& in_progress, const DoutPrefixProvider *dpp, - optional_yield y, - bool log_op){ + optional_yield y) { //XXX: read below from attrs .. 
check transition_obj() ACLOwner owner; @@ -5385,6 +5405,7 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, dest_obj_bi.key.instance.clear(); } + uint64_t olh_epoch = 0; // read it from attrs fetched from the cloud below rgw::putobj::AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info, nullptr, owner, obj_ctx, dest_obj_bi, olh_epoch, tag, dpp, y, no_trace); @@ -5398,11 +5419,9 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, } boost::optional<RGWPutObj_Compress> compressor; CompressorRef plugin; - dest_placement.storage_class = tier_ctx.restore_storage_class; + rgw_placement_rule dest_placement(dest_bucket_info.placement_rule, tier_ctx.restore_storage_class); RGWRadosPutObj cb(dpp, cct, plugin, compressor, &processor, progress_cb, progress_data, [&](map<string, bufferlist> obj_attrs) { - // XXX: do we need filter() like in fetch_remote_obj() cb - dest_placement.inherit_from(dest_bucket_info.placement_rule); processor.set_tail_placement(dest_placement); ret = processor.prepare(rctx.y); @@ -5426,6 +5445,9 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, return ret; } + // For a permanent restore, `log_op` depends on the flags set in bucket->get_info().flags + bool log_op = (dest_bucket_info.flags & rgw::sal::FLAG_LOG_OP); + uint64_t accounted_size = 0; string etag; real_time set_mtime; @@ -5436,7 +5458,7 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, RGWZoneGroupTierS3Glacier& glacier_params = tier_config.tier_placement.s3_glacier; ret = rgw_cloud_tier_restore_object(tier_ctx, headers, &set_mtime, etag, accounted_size, - attrs, days, glacier_params, &cb); + attrs, days, glacier_params, in_progress, &cb); } else { ldpp_dout(dpp, 20) << "Fetching object:" << dest_obj << "from the cloud" << dendl; ret = rgw_cloud_tier_get_object(tier_ctx, false, headers, @@ -5449,6 +5471,11 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, return ret; } + if (in_progress) { + ldpp_dout(tier_ctx.dpp, 20) << "Restoring object:" << dest_obj << " still in progress; returning " << dendl; + return ret; + } + if (!cb_processed) { ldpp_dout(dpp, 20) << "Callback not processed, object:" << dest_obj << dendl; return -EIO; @@ -5474,6 +5501,8 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, std::optional<uint64_t> olh_ep = ceph::parse<uint64_t>(rgw_bl_str(aiter->second)); if (olh_ep) { olh_epoch = *olh_ep; + // update it in tier_ctx too + tier_ctx.o.versioned_epoch = *olh_ep; } attrs.erase(aiter); } @@ -5553,6 +5582,8 @@ int RGWRados::restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, attrs[RGW_ATTR_RESTORE_TYPE] = std::move(bl); ldpp_dout(dpp, 20) << "Permanent restore, object:" << dest_obj << dendl; } + // bucket->get_info().flags doesn't seem to have rgw::sal::FLAG_LOG_OP // set by default. Hence set it explicitly. // XXX: Do we need to check if sync is disabled for the bucket? 
log_op = true; } diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index b2deaa553e343..a3103f2960cfa 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -358,6 +358,7 @@ class RGWRados int open_root_pool_ctx(const DoutPrefixProvider *dpp); int open_gc_pool_ctx(const DoutPrefixProvider *dpp); int open_lc_pool_ctx(const DoutPrefixProvider *dpp); + int open_restore_pool_ctx(const DoutPrefixProvider *dpp); int open_objexp_pool_ctx(const DoutPrefixProvider *dpp); int open_reshard_pool_ctx(const DoutPrefixProvider *dpp); int open_notif_pool_ctx(const DoutPrefixProvider *dpp); @@ -372,9 +373,11 @@ class RGWRados rgw::sal::RadosStore* driver{nullptr}; RGWGC* gc{nullptr}; RGWLC* lc{nullptr}; + RGWRestore* restore{nullptr}; RGWObjectExpirer* obj_expirer{nullptr}; bool use_gc_thread{false}; bool use_lc_thread{false}; + bool use_restore_thread{false}; bool quota_threads{false}; bool run_sync_thread{false}; bool run_reshard_thread{false}; @@ -449,6 +452,7 @@ protected: librados::IoCtx gc_pool_ctx; // .rgw.gc librados::IoCtx lc_pool_ctx; // .rgw.lc + librados::IoCtx restore_pool_ctx; // .rgw.restore librados::IoCtx objexp_pool_ctx; librados::IoCtx reshard_pool_ctx; librados::IoCtx notif_pool_ctx; // .rgw.notif @@ -500,6 +504,10 @@ public: return gc; } + RGWRestore *get_restore() { + return restore; + } + RGWRados& set_run_gc_thread(bool _use_gc_thread) { use_gc_thread = _use_gc_thread; return *this; @@ -510,6 +518,11 @@ public: return *this; } + RGWRados& set_run_restore_thread(bool _use_restore_thread) { + use_restore_thread = _use_restore_thread; + return *this; + } + RGWRados& set_run_quota_threads(bool _run_quota_threads) { quota_threads = _run_quota_threads; return *this; @@ -534,6 +547,10 @@ public: return &lc_pool_ctx; } + librados::IoCtx* get_restore_pool_ctx() { + return &restore_pool_ctx; + } + librados::IoCtx& get_notif_pool_ctx() { return notif_pool_ctx; } @@ -1258,13 +1275,11 @@ int restore_obj_from_cloud(RGWLCCloudTierCtx& tier_ctx, RGWObjectCtx& obj_ctx, RGWBucketInfo& dest_bucket_info, const rgw_obj& dest_obj, - rgw_placement_rule& dest_placement, RGWObjTier& tier_config, - uint64_t olh_epoch, std::optional days, + bool& in_progress, const DoutPrefixProvider *dpp, - optional_yield y, - bool log_op = true); + optional_yield y); int check_bucket_empty(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y); diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 2bf0bdc972662..7568b84d30c79 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -60,6 +60,7 @@ #include "rgw_tools.h" #include "rgw_tracer.h" #include "rgw_zone.h" +#include "rgw_restore.h" #include "services/svc_bilog_rados.h" #include "services/svc_bi_rados.h" @@ -2025,6 +2026,12 @@ std::unique_ptr RadosStore::get_lifecycle(void) return std::make_unique(this); } +std::unique_ptr RadosStore::get_restore(const int n_objs, + const std::vector& obj_names) +{ + return std::make_unique(this, n_objs, obj_names); +} + bool RadosStore::process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) { @@ -3031,15 +3038,13 @@ int RadosObject::transition(Bucket* bucket, int RadosObject::restore_obj_from_cloud(Bucket* bucket, rgw::sal::PlacementTier* tier, - rgw_placement_rule& placement_rule, - rgw_bucket_dir_entry& o, CephContext* cct, RGWObjTier& tier_config, uint64_t olh_epoch, std::optional days, + bool& in_progress, const DoutPrefixProvider* dpp, - 
optional_yield y, - uint32_t flags) + optional_yield y) { /* init */ rgw::sal::RadosPlacementTier* rtier = static_cast(tier); @@ -3051,7 +3056,27 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket, string bucket_name = rtier->get_rt().t.s3.target_path; const rgw::sal::ZoneGroup& zonegroup = store->get_zone()->get_zonegroup(); int ret = 0; - string src_storage_class = o.meta.storage_class; // or take src_placement also as input + + auto& attrs = get_attrs(); + RGWObjTier tier_config; + + auto attr_iter = attrs.find(RGW_ATTR_MANIFEST); + if (attr_iter != attrs.end()) { + RGWObjManifest m; + try { + using ceph::decode; + decode(m, attr_iter->second); + m.get_tier_config(&tier_config); + } catch (const buffer::end_of_buffer&) { + //empty manifest; it's not cloud-tiered + ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl; + return -EIO; + } catch (const std::exception& e) { + ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl; + return -EIO; + } + } + // update tier_config in case tier params are updated tier_config.tier_placement = rtier->get_rt(); @@ -3060,11 +3085,22 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket, "-cloud-bucket"; boost::algorithm::to_lower(bucket_name); } + + rgw_bucket_dir_entry ent; + ent.key.name = get_key().name; + ent.key.instance = get_key().instance; + ent.meta.accounted_size = ent.meta.size = get_obj_size(); + ent.meta.etag = "" ; + + if (!ent.key.instance.empty()) { // non-current versioned object + ent.flags |= rgw_bucket_dir_entry::FLAG_VER; + } + /* Create RGW REST connection */ S3RESTConn conn(cct, id, { endpoint }, key, zonegroup.get_id(), region, host_style); // save source cloudtier storage class - RGWLCCloudTierCtx tier_ctx(cct, dpp, o, store, bucket->get_info(), + RGWLCCloudTierCtx tier_ctx(cct, dpp, ent, store, bucket->get_info(), this, conn, bucket_name, rtier->get_rt().t.s3.target_storage_class); tier_ctx.acl_mappings = rtier->get_rt().t.s3.acl_mappings; @@ -3074,46 +3110,34 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket, tier_ctx.restore_storage_class = rtier->get_rt().restore_storage_class; tier_ctx.tier_type = rtier->get_rt().tier_type; - ldpp_dout(dpp, 20) << "Restoring object(" << o.key << ") from the cloud endpoint(" << endpoint << ")" << dendl; + ldpp_dout(dpp, 20) << "Restoring object(" << get_key() << ") from the cloud endpoint(" << endpoint << ")" << dendl; if (days && days == 0) { - ldpp_dout(dpp, 0) << "Days = 0 not valid; Not restoring object (" << o.key << ") from the cloud endpoint(" << endpoint << ")" << dendl; + ldpp_dout(dpp, 0) << "Days = 0 not valid; Not restoring object (" << get_key() << ") from the cloud endpoint(" << endpoint << ")" << dendl; return 0; } - // Note: For non-versioned objects, below should have already been set by the callers- - // o.current should be false; this(obj)->instance should have version-id. - - // set restore_status as RESTORE_ALREADY_IN_PROGRESS - ret = set_cloud_restore_status(dpp, y, RGWRestoreStatus::RestoreAlreadyInProgress); - if (ret < 0) { - ldpp_dout(dpp, 0) << " Setting cloud restore status to RESTORE_ALREADY_IN_PROGRESS for the object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl; - return ret; - } - /* Restore object from the cloud endpoint. 
* All restore related status and attrs are set as part of object download to * avoid any races */ ret = store->getRados()->restore_obj_from_cloud(tier_ctx, *rados_ctx, - bucket->get_info(), get_obj(), placement_rule, - tier_config, - olh_epoch, days, dpp, y, flags & FLAG_LOG_OP); + bucket->get_info(), get_obj(), + tier_config, days, in_progress, dpp, y); if (ret < 0) { //failed to restore - ldpp_dout(dpp, 0) << "Restoring object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl; - auto reset_ret = set_cloud_restore_status(dpp, y, RGWRestoreStatus::RestoreFailed); + ldpp_dout(dpp, 0) << "Restoring object(" << get_key() << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl; rgw_placement_rule target_placement; target_placement.inherit_from(tier_ctx.bucket_info.placement_rule); target_placement.storage_class = tier->get_storage_class(); /* Reset HEAD object as CloudTiered */ - reset_ret = write_cloud_tier(dpp, y, tier_ctx.o.versioned_epoch, + int reset_ret = write_cloud_tier(dpp, y, tier_ctx.o.versioned_epoch, tier, tier_ctx.is_multipart_upload, target_placement, tier_ctx.obj); if (reset_ret < 0) { - ldpp_dout(dpp, 0) << " Reset to cloud_tier of object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << reset_ret << dendl; + ldpp_dout(dpp, 0) << " Reset to cloud_tier of object(" << get_key() << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << reset_ret << dendl; } return ret; } @@ -3200,21 +3224,6 @@ int RadosObject::transition_to_cloud(Bucket* bucket, return ret; } -int RadosObject::set_cloud_restore_status(const DoutPrefixProvider* dpp, - optional_yield y, - rgw::sal::RGWRestoreStatus restore_status) -{ - int ret = 0; - set_atomic(true); - - bufferlist bl; - using ceph::encode; - encode(restore_status, bl); - ret = modify_obj_attrs(RGW_ATTR_RESTORE_STATUS, bl, y, dpp, false); - - return ret; -} - /* * If the object is restored temporarily and is expired, delete the data and * reset the HEAD object as cloud-transitioned. 
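// ----------------------------------------------------------------------------
// [Editorial sketch -- not part of this patch.] A minimal illustration of how a
// Glacier/Tape restore request is expected to enter the FIFO-backed queue that
// the RadosRestore code below implements. RGWRestoreEntry, choose_oid() and
// add_entry() are taken from this diff; queue_restore() itself, and the exact
// call sites, are assumptions.
int queue_restore(const DoutPrefixProvider* dpp, optional_yield y,
                  RGWRestore* restore, rgw::sal::Restore* sal_restore,
                  const rgw_bucket& bucket, const rgw_obj_key& key,
                  std::optional<uint64_t> days, const std::string& zone_id)
{
  RGWRestoreEntry entry;
  entry.bucket = bucket;
  entry.obj_key = key;
  entry.days = days;        // nullopt implies a permanent restore
  entry.zone_id = zone_id;  // temp copies are processed only by the source zone
  entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;

  // The shard is chosen by hashing bucket name + object name + instance, so
  // repeated attempts for the same object land on the same FIFO shard.
  const int index = restore->choose_oid(entry);
  return sal_restore->add_entry(dpp, y, index, entry);
}
// ----------------------------------------------------------------------------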
@@ -4642,6 +4651,208 @@ std::unique_ptr RadosLifecycle::get_serializer(const std::string& return std::make_unique(store, oid, lock_name, cookie); } +RadosRestoreSerializer::RadosRestoreSerializer(RadosStore* store, const std::string& _oid, const std::string& lock_name, const std::string& cookie) : + StoreRestoreSerializer(_oid), + ioctx(*store->getRados()->get_restore_pool_ctx()), + lock(lock_name) +{ + lock.set_cookie(cookie); +} + +int RadosRestoreSerializer::try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) +{ + lock.set_duration(dur); + return lock.lock_exclusive((librados::IoCtx*)(&ioctx), oid); +} + +std::unique_ptr RadosRestore::get_serializer( + const std::string& lock_name, + const std::string& oid, + const std::string& cookie) +{ + return std::make_unique(store, oid, lock_name, cookie); +} + +int RadosRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y, + int index, const RGWRestoreEntry& entry) { + bufferlist bl; + + encode(entry, bl); + + auto ret = push(dpp, y, index, std::move(bl)); + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR: push() returned " << ret << dendl; + return ret; + } + + return 0; +} + + +int RadosRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y, + int index, const std::list& restore_entries) { + std::vector ent_list; + + for (auto& entry : restore_entries) { + bufferlist bl; + + encode(entry, bl); + ent_list.push_back(std::move(bl)); + + } + + int ret = push(dpp, y, index, std::move(ent_list)); + + if (ret < 0) { + ldpp_dout(dpp, -1) << "ERROR: push() returned " << ret << dendl; + return ret; + } + + return 0; +} + +int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y, + int index, std::vector&& items) { + auto r = fifos[index].push(dpp, items, y); + if (r < 0) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": unable to push to FIFO: " << obj_names[index] + << ": " << cpp_strerror(-r) << dendl; + } + return r; +} + +int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y, + int index, ceph::buffer::list&& bl) { + auto r = fifos[index].push(dpp, std::move(bl), y); + if (r < 0) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": unable to push to FIFO: " << obj_names[index] + << ": " << cpp_strerror(-r) << dendl; + } + return r; +} + +struct rgw_restore_fifo_entry { + std::string id; + ceph::real_time mtime; + RGWRestoreEntry entry; + rgw_restore_fifo_entry() {} + + void encode(ceph::buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(mtime, bl); + encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(ceph::buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(mtime, bl); + decode(entry, bl); + DECODE_FINISH(bl); + } + + void dump(ceph::Formatter* f) const; + void decode_json(JSONObj* obj); +}; +WRITE_CLASS_ENCODER(rgw_restore_fifo_entry) + +int RadosRestore::list(const DoutPrefixProvider *dpp, optional_yield y, + int index, const std::string& marker, std::string* out_marker, + uint32_t max_entries, std::vector& entries, + bool* truncated) +{ + std::vector restore_entries; + bool more = false; + + auto r = fifos[index].list(dpp, max_entries, marker, &restore_entries, &more, y); + + if (r < 0) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": unable to list FIFO: " << obj_names[index] + << ": " << cpp_strerror(-r) << dendl; + return r; + } + + entries.clear(); + + for (const auto& entry : restore_entries) { + rgw_restore_fifo_entry r_entry; + r_entry.id = entry.marker; + r_entry.mtime = entry.mtime; + 
+ auto liter = entry.data.cbegin(); + try { + decode(r_entry.entry, liter); + } catch (const buffer::error& err) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": failed to decode restore entry: " + << err.what() << dendl; + return -EIO; + } + RGWRestoreEntry& e = r_entry.entry; + entries.push_back(std::move(e)); + } + + if (truncated) + *truncated = more; + + if (out_marker && !restore_entries.empty()) { + *out_marker = restore_entries.back().marker; + } + + return 0; +} + +int RadosRestore::trim_entries(const DoutPrefixProvider *dpp, optional_yield y, + int index, const std::string_view& marker) +{ + assert(index < num_objs); + + int ret = trim(dpp, y, index, marker); + return ret; +} + +int RadosRestore::trim(const DoutPrefixProvider *dpp, optional_yield y, + int index, const std::string_view& marker) { + auto r = fifos[index].trim(dpp, marker, false, y); + if (r < 0) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": unable to trim FIFO: " << obj_names[index] + << ": " << cpp_strerror(-r) << dendl; + } + + return r; +} + +std::string_view RadosRestore::max_marker() { + static const std::string mm = rgw::cls::fifo::marker::max().to_string(); + return std::string_view(mm); +} + +int RadosRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) { + std::vector restore_entries; + bool more = false; + + for (auto shard = 0u; shard < fifos.size(); ++shard) { + auto r = fifos[shard].list(dpp, 1, {}, &restore_entries, &more, y); + if (r < 0) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": unable to list FIFO: " << obj_names[shard] + << ": " << cpp_strerror(-r) << dendl; + return r; + } + if (!restore_entries.empty()) { + return 0; + } + } + + return 1; +} + int RadosNotification::publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags) { return rgw::notify::publish_reserve(dpp, *store->svc()->site, event_types, res, obj_tags); diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index e91f9272ab30d..ff22de78acf62 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -27,6 +27,7 @@ #include "rgw_putobj_processor.h" #include "services/svc_tier_rados.h" #include "cls/lock/cls_lock_client.h" +#include "rgw_log_backing.h" namespace rgw { namespace sal { @@ -282,6 +283,8 @@ class RadosStore : public StoreDriver { virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list& zone_ids) override; virtual int cluster_stat(RGWClusterStat& stats) override; virtual std::unique_ptr get_lifecycle(void) override; + virtual std::unique_ptr get_restore(const int n_objs, + const std::vector& obj_names) override; virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override; virtual std::unique_ptr get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, req_state* s, rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) override; virtual std::unique_ptr get_notification( @@ -343,6 +346,7 @@ class RadosStore : public StoreDriver { optional_yield y, const DoutPrefixProvider* dpp) override; virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); } + virtual RGWRestore* get_rgwrestore(void) override { return rados->get_restore(); } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); } virtual int log_usage(const DoutPrefixProvider *dpp, std::map& usage_info, optional_yield y) override; @@ -640,15 +644,11 @@ class RadosObject : public StoreObject { 
optional_yield y) override; virtual int restore_obj_from_cloud(Bucket* bucket, rgw::sal::PlacementTier* tier, - rgw_placement_rule& placement_rule, - rgw_bucket_dir_entry& o, CephContext* cct, - RGWObjTier& tier_config, - uint64_t olh_epoch, std::optional days, + bool& in_progress, const DoutPrefixProvider* dpp, - optional_yield y, - uint32_t flags) override; + optional_yield y) override; virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override; virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) override; @@ -948,6 +948,63 @@ public: const std::string& cookie) override; }; +class RadosRestoreSerializer : public StoreRestoreSerializer { + librados::IoCtx& ioctx; + ::rados::cls::lock::Lock lock; + +public: + RadosRestoreSerializer(RadosStore* store, const std::string& oid, const std::string& lock_name, const std::string& cookie); + + virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override; + virtual int unlock() override { + return lock.unlock(&ioctx, oid); + } +}; + +class RadosRestore : public Restore { + RadosStore* store; + const int num_objs; + const std::vector& obj_names; + librados::IoCtx& ioctx; + ceph::containers::tiny_vector fifos; + +public: + RadosRestore(RadosStore* _st, const int n_objs, const std::vector& o_names) : store(_st), + num_objs(n_objs), obj_names(o_names), + ioctx(*store->getRados()->get_restore_pool_ctx()), + fifos(num_objs, + [&](const size_t ix, auto emplacer) { + emplacer.emplace(ioctx, std::string(obj_names[ix])); + }) {} + + ~RadosRestore() override = default; + + virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y, + int index, const RGWRestoreEntry& r_entry) override; + virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y, + int index, const std::list& restore_entries) override; + virtual int list(const DoutPrefixProvider *dpp, optional_yield y, + int index, + const std::string& marker, std::string* out_marker, + uint32_t max_entries, std::vector& entries, + bool* truncated) override; + virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y, + int index, const std::string_view& marker) override; + virtual std::unique_ptr get_serializer( + const std::string& lock_name, + const std::string& oid, + const std::string& cookie) override; + /** Below routines deal with actual FIFO */ + int is_empty(const DoutPrefixProvider *dpp, optional_yield y); + std::string_view max_marker(); + int trim(const DoutPrefixProvider *dpp, optional_yield y, + int index, const std::string_view& marker); + int push(const DoutPrefixProvider *dpp, optional_yield y, + int index, ceph::buffer::list&& bl); + int push(const DoutPrefixProvider *dpp, optional_yield y, + int index, std::vector&& items); +}; + class RadosNotification : public StoreNotification { RadosStore* store; /* XXX it feels incorrect to me that rgw::notify::reservation_t is diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index f68c96c5db467..2a79d9d421ef3 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -4635,6 +4635,7 @@ int main(int argc, const char **argv) false, false, false, + false, false, false, // No background tasks! 
null_yield, diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index 82e48c62ae3fc..d3f616d252422 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -234,6 +234,10 @@ int rgw::AppMain::init_storage() (g_conf()->rgw_enable_lc_threads && ((!nfs) || (nfs && g_conf()->rgw_nfs_run_lc_threads))); + auto run_restore = + (g_conf()->rgw_enable_restore_threads && + ((!nfs) || (nfs && g_conf()->rgw_nfs_run_restore_threads))); + auto run_quota = (g_conf()->rgw_enable_quota_threads && ((!nfs) || (nfs && g_conf()->rgw_nfs_run_quota_threads))); @@ -250,6 +254,7 @@ int rgw::AppMain::init_storage() site, run_gc, run_lc, + run_restore, run_quota, run_sync, g_conf().get_val("rgw_dynamic_resharding"), diff --git a/src/rgw/rgw_lc.h b/src/rgw/rgw_lc.h index a813a7c28ac25..73305c6f19c2b 100644 --- a/src/rgw/rgw_lc.h +++ b/src/rgw/rgw_lc.h @@ -566,6 +566,7 @@ class RGWLC : public DoutPrefixProvider { CephContext *cct; rgw::sal::Driver* driver; std::unique_ptr sal_lc; + std::unique_ptr sal_restore; int max_objs{0}; std::string *obj_names{nullptr}; std::atomic down_flag = { false }; @@ -664,6 +665,7 @@ public: CephContext *get_cct() const override { return cct; } rgw::sal::Lifecycle* get_lc() const { return sal_lc.get(); } + rgw::sal::Restore* get_restore() const { return sal_restore.get(); } unsigned get_subsys() const; std::ostream& gen_prefix(std::ostream& out) const; diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index d5831b93fbfc1..d74de5ab076ba 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -104,7 +104,7 @@ int main(const int argc, const char **argv) exit(1); } - driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, true, null_yield, cfgstore.get()); + driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, true, null_yield, cfgstore.get()); if (!driver) { std::cerr << "couldn't init storage provider" << std::endl; return EIO; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 03daa69350637..bce0653402b72 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -68,6 +68,7 @@ #include "rgw_iam_managed_policy.h" #include "rgw_bucket_sync.h" #include "rgw_bucket_logging.h" +#include "rgw_restore.h" #include "services/svc_zone.h" #include "services/svc_quota.h" @@ -986,13 +987,13 @@ void handle_replication_status_header( */ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, rgw::sal::Attrs& attrs, bool sync_cloudtiered, std::optional days, - bool restore_op, optional_yield y) + bool read_through, optional_yield y) { int op_ret = 0; ldpp_dout(dpp, 20) << "reached handle cloud tier " << dendl; auto attr_iter = attrs.find(RGW_ATTR_MANIFEST); if (attr_iter == attrs.end()) { - if (restore_op) { + if (!read_through) { op_ret = -ERR_INVALID_OBJECT_STATE; s->err.message = "only cloud tier object can be restored"; return op_ret; @@ -1005,7 +1006,7 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal:: decode(m, attr_iter->second); if (!m.is_tier_type_s3()) { ldpp_dout(dpp, 20) << "not a cloud tier object " << s->object->get_key().name << dendl; - if (restore_op) { + if (!read_through) { op_ret = -ERR_INVALID_OBJECT_STATE; s->err.message = "only cloud tier object can be restored"; return op_ret; @@ -1030,26 +1031,45 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal:: auto iter = 
bl.cbegin(); decode(restore_status, iter); } - if (attr_iter == attrs.end() || restore_status == rgw::sal::RGWRestoreStatus::RestoreFailed) { - // first time restore or previous restore failed - rgw::sal::Bucket* pbucket = NULL; - pbucket = s->bucket.get(); - + if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) { + if (read_through) { + op_ret = -ERR_REQUEST_TIMEOUT; + ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl; + s->err.message = "restore is still in progress"; + return op_ret; + } else { + // for restore-op, corresponds to RESTORE_ALREADY_IN_PROGRESS + return static_cast(rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress); + } + } else if (restore_status == rgw::sal::RGWRestoreStatus::CloudRestored) { + // corresponds to CLOUD_RESTORED + return static_cast(rgw::sal::RGWRestoreStatus::CloudRestored); + } else { // first time restore or previous restore failed. + // Restore the object. std::unique_ptr tier; rgw_placement_rule target_placement; - target_placement.inherit_from(pbucket->get_placement_rule()); - attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS); + target_placement.inherit_from(s->bucket->get_placement_rule()); + auto attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS); if (attr_iter != attrs.end()) { target_placement.storage_class = attr_iter->second.to_str(); } op_ret = driver->get_zone()->get_zonegroup().get_placement_tier(target_placement, &tier); - ldpp_dout(dpp, 20) << "getting tier placement handle cloud tier" << op_ret << - " storage class " << target_placement.storage_class << dendl; if (op_ret < 0) { - s->err.message = "failed to restore object"; + ldpp_dout(dpp, -1) << "failed to fetch tier placement handle, ret = " << op_ret << dendl; return op_ret; + } else { + ldpp_dout(dpp, 20) << "getting tier placement handle cloud tier for " << + " storage class " << target_placement.storage_class << dendl; } - if (!restore_op) { + + if (!tier->is_tier_type_s3()) { + ldpp_dout(dpp, -1) << "ERROR: not s3 tier type - " << tier->get_tier_type() << + " for storage class " << target_placement.storage_class << dendl; + s->err.message = "failed to restore object"; + return -EINVAL; + } + + if (read_through) { if (tier->allow_read_through()) { days = tier->get_read_through_restore_days(); } else { //read-through is not enabled @@ -1058,56 +1078,30 @@ int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal:: return op_ret; } } - // fill in the entry. 
XXX: Maybe we can avoid it by passing only necessary params - rgw_bucket_dir_entry ent; - ent.key.name = s->object->get_key().name; - ent.key.instance = s->object->get_key().instance; - ent.meta.accounted_size = ent.meta.size = s->obj_size; - ent.meta.etag = "" ; - uint64_t epoch = 0; - op_ret = get_system_versioning_params(s, &epoch, NULL); - if (!ent.key.instance.empty()) { // non-current versioned object - ent.flags |= rgw_bucket_dir_entry::FLAG_VER; - } - ldpp_dout(dpp, 20) << "getting versioning params tier placement handle cloud tier" << op_ret << dendl; - if (op_ret < 0) { - ldpp_dout(dpp, 20) << "failed to get versioning params, op_ret = " << op_ret << dendl; - s->err.message = "failed to restore object"; - return op_ret; - } - op_ret = s->object->restore_obj_from_cloud(pbucket, tier.get(), target_placement, ent, - s->cct, tier_config, epoch, - days, dpp, y, s->bucket->get_info().flags); + + op_ret = driver->get_rgwrestore()->restore_obj_from_cloud(s->bucket.get(), + s->object.get(), tier.get(), days, y); + if (op_ret < 0) { - ldpp_dout(dpp, 0) << "object " << ent.key.name << " fetching failed" << op_ret << dendl; + ldpp_dout(dpp, 0) << "Restore of object " << s->object->get_key() << " failed, ret=" << op_ret << dendl; s->err.message = "failed to restore object"; return op_ret; } - ldpp_dout(dpp, 20) << "object " << ent.key.name << " fetching succeed" << dendl; - /* Even if restore is complete the first read through request will return but actually downloaded - * object asyncronously. + + ldpp_dout(dpp, 20) << "Restore of object " << s->object->get_key() << " succeeded" << dendl; + /* Even if the restore is complete, the first read-through request returns + * while the object is actually downloaded asynchronously. */ - if (!restore_op) { //read-through + if (read_through) { //read-through op_ret = -ERR_REQUEST_TIMEOUT; ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl; s->err.message = "restore is still in progress"; } return op_ret; - } else if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) { - if (!restore_op) { - op_ret = -ERR_REQUEST_TIMEOUT; - ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl; - s->err.message = "restore is still in progress"; - return op_ret; - } else { - return 1; // for restore-op, corresponds to RESTORE_ALREADY_IN_PROGRESS - } - } else { - return 2; // corresponds to CLOUD_RESTORED } } catch (const buffer::end_of_buffer&) { //empty manifest; it's not cloud-tiered - if (restore_op) { + if (!read_through) { op_ret = -ERR_INVALID_OBJECT_STATE; s->err.message = "only cloud tier object can be restored"; } @@ -2590,7 +2584,7 @@ void RGWGetObj::execute(optional_yield y) if (get_type() == RGW_OP_GET_OBJ && get_data) { std::optional<uint64_t> days; - op_ret = handle_cloudtier_obj(s, this, driver, attrs, sync_cloudtiered, days, false, y); + op_ret = handle_cloudtier_obj(s, this, driver, attrs, sync_cloudtiered, days, true, y); if (op_ret < 0) { ldpp_dout(this, 4) << "Cannot get cloud tiered object: " << *s->object <<". 
Failing with " << op_ret << dendl; diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index c3c8ea78d906a..18bc355e449a4 100644 --- a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -124,6 +124,7 @@ void RGWRealmReloader::reload() *env.site, cct->_conf->rgw_enable_gc_threads, cct->_conf->rgw_enable_lc_threads, + cct->_conf->rgw_enable_restore_threads, cct->_conf->rgw_enable_quota_threads, cct->_conf->rgw_run_sync_thread, cct->_conf.get_val("rgw_dynamic_resharding"), diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 893c4d66a75ea..51abea848ec89 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -565,41 +565,45 @@ int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs, } } } /* checksum_mode */ - auto attr_iter = attrs.find(RGW_ATTR_RESTORE_TYPE); - if (attr_iter != attrs.end()) { - rgw::sal::RGWRestoreType rt; - bufferlist bl = attr_iter->second; - auto iter = bl.cbegin(); - decode(rt, iter); + + rgw::sal::RGWRestoreStatus restore_status; + auto r_iter = attrs.find(RGW_ATTR_RESTORE_STATUS); + if (r_iter != attrs.end()) { + bufferlist rbl = r_iter->second; + auto iter = rbl.cbegin(); + decode(restore_status, iter); - rgw::sal::RGWRestoreStatus restore_status; - attr_iter = attrs.find(RGW_ATTR_RESTORE_STATUS); - if (attr_iter != attrs.end()) { - bufferlist bl = attr_iter->second; - auto iter = bl.cbegin(); - decode(restore_status, iter); - } - //restore status if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) { - dump_header(s, "x-amz-restore", "ongoing-request=\"true\""); - } - if (rt == rgw::sal::RGWRestoreType::Temporary) { - auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE); - ceph::real_time expiration_date; - - if (expire_iter != attrs.end()) { - bufferlist bl = expire_iter->second; + dump_header(s, "x-amz-restore", "ongoing-request=\"true\""); + } else { + auto attr_iter = attrs.find(RGW_ATTR_RESTORE_TYPE); + if (attr_iter != attrs.end()) { + rgw::sal::RGWRestoreType rt; + bufferlist bl = attr_iter->second; auto iter = bl.cbegin(); - decode(expiration_date, iter); - } - //restore status - dump_header_if_nonempty(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\""); - // temporary restore; set storage-class to cloudtier storage class - auto c_iter = attrs.find(RGW_ATTR_CLOUDTIER_STORAGE_CLASS); + decode(rt, iter); - if (c_iter != attrs.end()) { - attrs[RGW_ATTR_STORAGE_CLASS] = c_iter->second; + if (rt == rgw::sal::RGWRestoreType::Temporary) { + auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE); + ceph::real_time expiration_date; + + if (expire_iter != attrs.end()) { + bufferlist bl = expire_iter->second; + auto iter = bl.cbegin(); + decode(expiration_date, iter); + } + + //restore status + dump_header_if_nonempty(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\""); + + // temporary restore; set storage-class to cloudtier storage class + auto c_iter = attrs.find(RGW_ATTR_CLOUDTIER_STORAGE_CLASS); + + if (c_iter != attrs.end()) { + attrs[RGW_ATTR_STORAGE_CLASS] = c_iter->second; + } + } } } } @@ -3670,35 +3674,40 @@ void RGWRestoreObj_ObjStore_S3::send_response() if (restore_ret == 0) { s->err.http_ret = 202; // OK - } else if (restore_ret == 1) { - restore_ret = -ERR_RESTORE_ALREADY_IN_PROGRESS; - set_req_state_err(s, restore_ret); - dump_header(s, "x-amz-restore", "ongoing-request=\"true\""); - } else if (restore_ret == 2) { - 
rgw::sal::Attrs attrs; - ceph::real_time expiration_date; - rgw::sal::RGWRestoreType rt; - attrs = s->object->get_attrs(); - auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE); - auto type_iter = attrs.find(RGW_ATTR_RESTORE_TYPE); - - if (expire_iter != attrs.end()) { - bufferlist bl = expire_iter->second; - auto iter = bl.cbegin(); - decode(expiration_date, iter); - } - - if (type_iter != attrs.end()) { - bufferlist bl = type_iter->second; - auto iter = bl.cbegin(); - decode(rt, iter); - } - if (rt == rgw::sal::RGWRestoreType::Temporary) { - s->err.http_ret = 200; // OK - dump_header(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\""); - } else { - s->err.http_ret = 200; - dump_header(s, "x-amz-restore", "ongoing-request=\"false\""); + } else { + rgw::sal::RGWRestoreStatus st = static_cast(restore_ret); + + if (st == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) { + restore_ret = -ERR_RESTORE_ALREADY_IN_PROGRESS; + set_req_state_err(s, restore_ret); + dump_header(s, "x-amz-restore", "ongoing-request=\"true\""); + } else if (st == rgw::sal::RGWRestoreStatus::CloudRestored) { + rgw::sal::Attrs attrs; + ceph::real_time expiration_date; + rgw::sal::RGWRestoreType rt; + attrs = s->object->get_attrs(); + auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE); + auto type_iter = attrs.find(RGW_ATTR_RESTORE_TYPE); + + if (expire_iter != attrs.end()) { + bufferlist bl = expire_iter->second; + auto iter = bl.cbegin(); + decode(expiration_date, iter); + } + + if (type_iter != attrs.end()) { + bufferlist bl = type_iter->second; + auto iter = bl.cbegin(); + decode(rt, iter); + } + + if (rt == rgw::sal::RGWRestoreType::Temporary) { + s->err.http_ret = 200; // OK + dump_header(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\""); + } else { + s->err.http_ret = 200; + dump_header(s, "x-amz-restore", "ongoing-request=\"false\""); + } } } diff --git a/src/rgw/rgw_restore.cc b/src/rgw/rgw_restore.cc new file mode 100644 index 0000000000000..fb734b33010fb --- /dev/null +++ b/src/rgw/rgw_restore.cc @@ -0,0 +1,520 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "include/scope_guard.h" +#include "include/function2.hpp" +#include "common/Formatter.h" +#include "common/containers.h" +#include "common/split.h" +#include +#include "include/random.h" +#include "cls/lock/cls_lock_client.h" +#include "rgw_perf_counters.h" +#include "rgw_common.h" +#include "rgw_bucket.h" +#include "rgw_restore.h" +#include "rgw_zone.h" +#include "rgw_string.h" +#include "rgw_multi.h" +#include "rgw_sal.h" +#include "rgw_lc_tier.h" +#include "rgw_notify.h" +#include "common/dout.h" + +#include "fmt/format.h" + +#include "services/svc_sys_obj.h" +#include "services/svc_zone.h" +#include "services/svc_tier_rados.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rgw_restore + + +constexpr int32_t hours_in_a_day = 24; +constexpr int32_t secs_in_a_day = hours_in_a_day * 60 * 60; + +using namespace std; +using namespace rgw::sal; + +void RGWRestoreEntry::dump(Formatter *f) const +{ + encode_json("Bucket", bucket, f); + encode_json("Object", obj_key, f); + if (days) { + encode_json("Days", days, f); + } else { + encode_json("Days", 0, f); + } + encode_json("Zone_id", zone_id, f); + encode_json("Status", 
static_cast<int>(status), f); } + +void RGWRestoreEntry::decode_json(JSONObj *obj) +{ + JSONDecoder::decode_json("Bucket", bucket, obj); + JSONDecoder::decode_json("Object", obj_key, obj); + JSONDecoder::decode_json("Days", days, obj); + JSONDecoder::decode_json("Zone_id", zone_id, obj); + int st; + JSONDecoder::decode_json("Status", st, obj); + status = static_cast<rgw::sal::RGWRestoreStatus>(st); +} + +void RGWRestoreEntry::generate_test_instances(std::list<RGWRestoreEntry*>& l) +{ + auto p = new RGWRestoreEntry; + rgw_bucket bk("tenant1", "bucket1"); + rgw_obj_key obj("object1"); + rgw_obj_key obj2("object2"); + uint64_t days1 = 3; + std::optional<uint64_t> days; + std::string zone_id = "zone1"; + rgw::sal::RGWRestoreStatus status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress; + + p->bucket = bk; + p->obj_key = obj; + p->zone_id = zone_id; + p->days = days; + p->status = status; + l.push_back(p); + + p = new RGWRestoreEntry; + days = days1; + status = rgw::sal::RGWRestoreStatus::CloudRestored; + p->bucket = bk; + p->obj_key = obj2; + p->zone_id = zone_id; + p->days = days; + p->status = status; + l.push_back(p); + + l.push_back(new RGWRestoreEntry); +} + +void RGWRestore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) { + cct = _cct; + driver = _driver; + + /* max_objs indicates the number of shards or objects + * used to store Restore Entries */ + max_objs = cct->_conf->rgw_restore_max_objs; + if (max_objs > HASH_PRIME) + max_objs = HASH_PRIME; + + for (int i = 0; i < max_objs; i++) { + obj_names.push_back(fmt::format("{}.{}", restore_oid_prefix, i)); + } + sal_restore = driver->get_restore(max_objs, obj_names); +} + +void RGWRestore::finalize() +{ + obj_names.clear(); +} + +static inline std::ostream& operator<<(std::ostream &os, RGWRestoreEntry& ent) { + os << ""; + + return os; +} + +void RGWRestore::RestoreWorker::stop() +{ + std::lock_guard l{lock}; + cond.notify_all(); +} + +bool RGWRestore::going_down() +{ + return down_flag; +} + +void RGWRestore::start_processor() +{ + worker = std::make_unique<RestoreWorker>(this, cct, this); + worker->create("rgw_restore"); +} + +void RGWRestore::stop_processor() +{ + down_flag = true; + if (worker) { + worker->stop(); + worker->join(); + } + worker.reset(nullptr); +} + +unsigned RGWRestore::get_subsys() const +{ + return dout_subsys; +} + +std::ostream& RGWRestore::gen_prefix(std::ostream& out) const +{ + return out << "restore: "; +} + +/* Hash based on both bucket and object key */ +int RGWRestore::choose_oid(const RGWRestoreEntry& e) { + int index; + const auto& name = e.bucket.name + e.obj_key.name + e.obj_key.instance; + index = ((ceph_str_hash_linux(name.data(), name.size())) % max_objs); + return static_cast<int>(index); +} + +void *RGWRestore::RestoreWorker::entry() { + do { + utime_t start = ceph_clock_now(); + int r = 0; + r = restore->process(this, null_yield); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: restore process() returned error r=" << r << dendl; + } + if (restore->going_down()) + break; + utime_t end = ceph_clock_now(); + end -= start; + int secs = cct->_conf->rgw_restore_processor_period; + + if (secs <= end.sec()) + continue; // next round + + secs -= end.sec(); + std::unique_lock locker{lock}; + cond.wait_for(locker, std::chrono::seconds(secs)); + } while (!restore->going_down()); + + return NULL; + +} + +int RGWRestore::process(RestoreWorker* worker, optional_yield y) +{ + int max_secs = cct->_conf->rgw_restore_lock_max_time; + + const int start = ceph::util::generate_random_number(0, max_objs - 1); + for (int i = 0; i < max_objs; i++) { + int index = (i + start) % max_objs; + int ret = 
+
+/*
+ * Given an index, fetch a list of restore entries to process. After each
+ * iteration, trim the list to the last marker read.
+ *
+ * While processing the entries, if the restore of any of them is still in
+ * progress, such entries are added back to the list.
+ */
+int RGWRestore::process(int index, int max_secs, optional_yield y)
+{
+  ldpp_dout(this, 20) << "RGWRestore::process entered with Restore index_shard="
+                      << index << ", max_secs=" << max_secs << dendl;
+
+  /* list used to gather entries still IN_PROGRESS */
+  std::list<RGWRestoreEntry> r_entries;
+
+  std::unique_ptr<rgw::sal::RestoreSerializer> serializer =
+      sal_restore->get_serializer(std::string(restore_index_lock_name),
+                                  std::string(obj_names[index]),
+                                  worker->thr_name());
+  utime_t end = ceph_clock_now();
+
+  /* max_secs should be greater than zero. We don't want a zero max_secs
+   * to be translated as no timeout, since we'd then need to break the
+   * lock and that would require a manual intervention. In this case
+   * we can just wait it out. */
+  if (max_secs <= 0)
+    return -EAGAIN;
+
+  end += max_secs;
+  utime_t time(max_secs, 0);
+  int ret = serializer->try_lock(this, time, null_yield);
+  if (ret == -EBUSY || ret == -EEXIST) {
+    /* already locked by another restore processor */
+    ldpp_dout(this, 0) << "RGWRestore::process() failed to acquire lock on "
+                       << obj_names[index] << dendl;
+    return -EBUSY;
+  }
+  if (ret < 0)
+    return 0;
+
+  std::unique_lock lock(*(serializer.get()), std::adopt_lock);
+  std::string marker;
+  std::string next_marker;
+  bool truncated = false;
+
+  do {
+    int max = 100;
+    std::vector<RGWRestoreEntry> entries;
+
+    ret = sal_restore->list(this, y, index, marker, &next_marker, max, entries, &truncated);
+    ldpp_dout(this, 20) <<
+      "RGWRestore::process sal_restore->list returned ret=" << ret <<
+      ", entries.size=" << entries.size() << ", truncated=" << truncated <<
+      ", next_marker='" << next_marker << "'" << dendl;
+
+    if (ret < 0)
+      goto done;
+
+    if (entries.size() == 0) {
+      lock.unlock();
+      return 0;
+    }
+
+    marker = next_marker;
+    std::vector<RGWRestoreEntry>::iterator iter;
+    for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      RGWRestoreEntry entry = *iter;
+
+      ret = process_restore_entry(entry, y);
+
+      if (entry.status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
+        r_entries.push_back(entry);
+      }
+
+      // process all entries, then trim them and re-add the in-progress ones
+      utime_t now = ceph_clock_now();
+      if (now >= end) {
+        goto done;
+      }
+
+      if (going_down()) {
+        // leave early, even if tag isn't removed, it's ok since it
+        // will be picked up next time around
+        goto done;
+      }
+    }
+    ret = sal_restore->trim_entries(this, y, index, marker);
+    if (ret < 0) {
+      ldpp_dout(this, 0) << "RGWRestore::process() failed to trim entries on "
+                         << obj_names[index] << dendl;
+    }
+
+    if (!r_entries.empty()) {
+      ret = sal_restore->add_entries(this, y, index, r_entries);
+      if (ret < 0) {
+        ldpp_dout(this, 0) << "RGWRestore::process() failed to add entries on "
+                           << obj_names[index] << dendl;
+      }
+    }
+
+    r_entries.clear();
+  } while (truncated);
+
+done:
+  lock.unlock();
+  return 0;
+}
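+
+// One pass over a shard, illustratively: if shard 3 holds
+// [e1(in progress), e2(restored), e3(failed)], process_restore_entry()
+// leaves e2/e3 in a terminal state, e1 is collected into r_entries, the
+// shard is trimmed up to the last marker read, and e1 alone is re-added
+// for the next cycle.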
+
+int RGWRestore::process_restore_entry(RGWRestoreEntry& entry, optional_yield y)
+{
+  int ret = 0;
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  std::unique_ptr<rgw::sal::Object> obj;
+  std::unique_ptr<rgw::sal::PlacementTier> tier;
+  std::optional<uint64_t> days = entry.days;
+
+  // Ensure it is the same source zone processing temp entries, as we do not
+  // replicate temp restored copies
+  if (days) { // temp copy
+    auto& zone_id = entry.zone_id;
+    if (driver->get_zone()->get_id() != zone_id) {
+      // XXX: Do we need to check if the zone is still valid
+      return 0; // skip processing this entry in this zone
+    }
+  }
+
+  // fill in the details from entry
+  // bucket, obj, days, state=in_progress
+  ret = driver->load_bucket(this, entry.bucket, &bucket, null_yield);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "Restore:get_bucket for " << entry.bucket.name
+                       << " failed" << dendl;
+    return ret;
+  }
+  obj = bucket->get_object(entry.obj_key);
+
+  ret = obj->load_obj_state(this, null_yield, true);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "Restore:get_object for " << entry.obj_key
+                       << " failed" << dendl;
+    return ret;
+  }
+
+  rgw_placement_rule target_placement;
+  target_placement.inherit_from(bucket->get_placement_rule());
+
+  auto& attrs = obj->get_attrs();
+  auto attr_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
+  rgw::sal::RGWRestoreStatus restore_status = rgw::sal::RGWRestoreStatus::None;
+  if (attr_iter != attrs.end()) {
+    bufferlist bl = attr_iter->second;
+    auto iter = bl.cbegin();
+    decode(restore_status, iter);
+  }
+  if (restore_status == rgw::sal::RGWRestoreStatus::CloudRestored) {
+    // XXX: Check if expiry-date needs to be updated
+    ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " already done" << dendl;
+    entry.status = rgw::sal::RGWRestoreStatus::CloudRestored;
+    return 0;
+  }
+
+  attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS);
+  if (attr_iter != attrs.end()) {
+    target_placement.storage_class = attr_iter->second.to_str();
+  }
+  ret = driver->get_zone()->get_zonegroup().get_placement_tier(target_placement, &tier);
+  if (ret < 0) {
+    ldpp_dout(this, -1) << "failed to fetch tier placement handle, ret = " << ret << dendl;
+    return ret;
+  } else {
+    ldpp_dout(this, 20) << "got cloud tier placement handle for storage class "
+                        << target_placement.storage_class << dendl;
+  }
+
+  if (!tier->is_tier_type_s3()) {
+    ldpp_dout(this, -1) << "ERROR: not s3 tier type - " << tier->get_tier_type() <<
+      " for storage class " << target_placement.storage_class << dendl;
+    return -EINVAL;
+  }
+
+  // now go ahead with restoring the object
+  // XXX: first check if it is already restored?
+  bool in_progress = true;
+  ret = obj->restore_obj_from_cloud(bucket.get(), tier.get(), cct, days, in_progress,
+                                    this, y);
+  if (ret < 0) {
+    ldpp_dout(this, -1) << "Restore of object(" << obj->get_key() << ") failed, ret=" << ret << dendl;
+    auto reset_ret = set_cloud_restore_status(this, obj.get(), y, rgw::sal::RGWRestoreStatus::RestoreFailed);
+    entry.status = rgw::sal::RGWRestoreStatus::RestoreFailed;
+    if (reset_ret < 0) {
+      ldpp_dout(this, -1) << "Setting restore status as RestoreFailed failed for object(" << obj->get_key() << "), ret=" << reset_ret << dendl;
+    }
+    return ret;
+  }
+
+  if (in_progress) {
+    ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " still in progress" << dendl;
+    entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+  } else {
+    ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " succeeded" << dendl;
+    entry.status = rgw::sal::RGWRestoreStatus::CloudRestored;
+  }
+  return ret;
+}
+
+time_t RGWRestore::thread_stop_at()
+{
+  uint64_t interval = (cct->_conf->rgw_restore_debug_interval > 0)
+    ? cct->_conf->rgw_restore_debug_interval : secs_in_a_day;
+
+  return time(nullptr) + interval;
+}
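+
+// Status lifecycle for reference (driven by process_restore_entry() above
+// and the helpers below):
+//   None -> RestoreAlreadyInProgress  (request accepted, RGW_ATTR_RESTORE_STATUS set)
+//   RestoreAlreadyInProgress -> CloudRestored  (object fetched back from the tier)
+//   RestoreAlreadyInProgress -> RestoreFailed  (fetch or attribute update failed)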
+
+int RGWRestore::set_cloud_restore_status(const DoutPrefixProvider* dpp,
+                                         rgw::sal::Object* pobj, optional_yield y,
+                                         const rgw::sal::RGWRestoreStatus& restore_status)
+{
+  int ret = -1;
+
+  if (!pobj)
+    return ret;
+
+  pobj->set_atomic();
+
+  bufferlist bl;
+  using ceph::encode;
+  encode(restore_status, bl);
+
+  ret = pobj->modify_obj_attrs(RGW_ATTR_RESTORE_STATUS, bl, y, dpp, false);
+
+  return ret;
+}
+
+int RGWRestore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket,
+                                       rgw::sal::Object* pobj,
+                                       rgw::sal::PlacementTier* tier,
+                                       std::optional<uint64_t> days, optional_yield y)
+{
+  int ret = 0;
+
+  if (!pbucket || !pobj) {
+    ldpp_dout(this, -1) << "ERROR: Invalid bucket/object. Restore failed" << dendl;
+    return -EINVAL;
+  }
+
+  // set restore_status as RESTORE_ALREADY_IN_PROGRESS
+  ret = set_cloud_restore_status(this, pobj, y, rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "Setting cloud restore status to RESTORE_ALREADY_IN_PROGRESS for the object(" << pobj->get_key() << ") failed, ret=" << ret << dendl;
+    return ret;
+  }
+
+  // now go ahead with restoring the object
+  bool in_progress = false;
+  ret = pobj->restore_obj_from_cloud(pbucket, tier, cct, days, in_progress, this, y);
+  if (ret < 0) {
+    ldpp_dout(this, 0) << "object " << pobj->get_key() << " fetching failed, ret=" << ret << dendl;
+    auto reset_ret = set_cloud_restore_status(this, pobj, y, rgw::sal::RGWRestoreStatus::RestoreFailed);
+
+    if (reset_ret < 0) {
+      ldpp_dout(this, -1) << "Setting restore status as RestoreFailed failed for object(" << pobj->get_key() << "), ret=" << reset_ret << dendl;
+    }
+
+    return ret;
+  }
+
+  if (in_progress) {
+    ldpp_dout(this, 20) << "Restore of object " << pobj->get_key() << " is still in progress" << dendl;
+
+    // add restore entry to the list
+    RGWRestoreEntry entry;
+    entry.bucket = pbucket->get_key();
+    entry.obj_key = pobj->get_key();
+    entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+    entry.days = days;
+    entry.zone_id = driver->get_zone()->get_id();
+
+    int index = choose_oid(entry);
+    ret = sal_restore->add_entry(this, y, index, entry);
+    if (ret < 0) {
+      ldpp_dout(this, -1) << "Adding restore entry of object(" << pobj->get_key() << ") failed, ret=" << ret << dendl;
+    }
+  } else {
+    ldpp_dout(this, 20) << "Restore of object " << pobj->get_key() << " succeeded" << dendl;
+  }
+
+  return ret;
+}
diff --git a/src/rgw/rgw_restore.h b/src/rgw/rgw_restore.h
new file mode 100644
index 0000000000000..662745d240bd1
--- /dev/null
+++ b/src/rgw/rgw_restore.h
@@ -0,0 +1,140 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "common/debug.h"
+
+#include "include/types.h"
+#include "include/rados/librados.hpp"
+#include "common/ceph_mutex.h"
+#include "common/Cond.h"
+#include "common/iso_8601.h"
+#include "common/Thread.h"
+#include "rgw_common.h"
+#include "cls/rgw/cls_rgw_types.h"
+#include "rgw_sal.h"
+
+#include
+#include
+
+#define HASH_PRIME 7877
+#define MAX_ID_LEN 255
+static constexpr std::string_view restore_oid_prefix = "restore";
+static constexpr std::string_view restore_index_lock_name = "restore_process";
+
+/** Single Restore entry state */
+struct RGWRestoreEntry {
+  rgw_bucket bucket;
+  rgw_obj_key obj_key;
+  std::optional<uint64_t> days;
+  std::string zone_id; // or should it be zone name?
+  rgw::sal::RGWRestoreStatus status = rgw::sal::RGWRestoreStatus::None;
+
+  RGWRestoreEntry() {}
+
+  void encode(ceph::buffer::list& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(bucket, bl);
+    encode(obj_key, bl);
+    encode(days, bl);
+    encode(zone_id, bl);
+    encode(status, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(ceph::buffer::list::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(bucket, bl);
+    decode(obj_key, bl);
+    decode(days, bl);
+    decode(zone_id, bl);
+    decode(status, bl);
+    DECODE_FINISH(bl);
+  }
+  void dump(ceph::Formatter* f) const;
+  void decode_json(JSONObj* obj);
+  static void generate_test_instances(std::list<RGWRestoreEntry*>& l);
+};
+WRITE_CLASS_ENCODER(RGWRestoreEntry)
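+
+// Note: RGWRestoreEntry is versioned via ENCODE_START(1, 1, ...); any field
+// added later must bump the version and be decoded conditionally on
+// struct_v, as RGWZoneParams::decode() does for restore_pool further below
+// in this change.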
+
+class RGWRestore : public DoutPrefixProvider {
+  CephContext *cct;
+  rgw::sal::Driver* driver;
+  std::unique_ptr<rgw::sal::Restore> sal_restore;
+  int max_objs{0};
+  std::vector<std::string> obj_names;
+  std::atomic<bool> down_flag = { false };
+
+  class RestoreWorker : public Thread
+  {
+    const DoutPrefixProvider *dpp;
+    CephContext *cct;
+    RGWRestore *restore;
+    ceph::mutex lock = ceph::make_mutex("RestoreWorker");
+    ceph::condition_variable cond;
+
+  public:
+
+    using lock_guard = std::lock_guard<std::mutex>;
+    using unique_lock = std::unique_lock<std::mutex>;
+
+    RestoreWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWRestore *_restore) : dpp(_dpp), cct(_cct), restore(_restore) {}
+    RGWRestore* get_restore() { return restore; }
+    std::string thr_name() {
+      return std::string{"restore_thrd: "}; // + std::to_string(ix);
+    }
+    void *entry() override;
+    void stop();
+
+    friend class RGWRados;
+  }; // RestoreWorker
+
+  std::unique_ptr<RestoreWorker> worker;
+
+public:
+  ~RGWRestore() {
+    stop_processor();
+    finalize();
+  }
+
+  friend class RGWRados;
+
+  RGWRestore() : cct(nullptr), driver(nullptr), max_objs(0) {}
+
+  void initialize(CephContext *_cct, rgw::sal::Driver* _driver);
+  void finalize();
+
+  bool going_down();
+  void start_processor();
+  void stop_processor();
+
+  CephContext *get_cct() const override { return cct; }
+  rgw::sal::Restore* get_restore() const { return sal_restore.get(); }
+  unsigned get_subsys() const;
+
+  std::ostream& gen_prefix(std::ostream& out) const;
+
+  int process(RestoreWorker* worker, optional_yield y);
+  int choose_oid(const RGWRestoreEntry& e);
+  int process(int index, int max_secs, optional_yield y);
+  int process_restore_entry(RGWRestoreEntry& entry, optional_yield y);
+  time_t thread_stop_at();
+
+  /** Set the restore status for the given object */
+  int set_cloud_restore_status(const DoutPrefixProvider* dpp, rgw::sal::Object* pobj,
+                               optional_yield y,
+                               const rgw::sal::RGWRestoreStatus& restore_status);
+
+  /** Given the (bucket, object, tier) tuple, restore the object from the
+   * cloud tier. In case the object cannot be restored immediately, save
+   * that restore state (entry) to be processed later by the RestoreWorker
+   * thread. */
+  int restore_obj_from_cloud(rgw::sal::Bucket* pbucket, rgw::sal::Object* pobj,
+                             rgw::sal::PlacementTier* tier,
+                             std::optional<uint64_t> days, optional_yield y);
+};
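+
+// Usage sketch (illustrative; the actual wiring lives in the radosgw startup
+// path and is gated by rgw_enable_restore_threads):
+//   RGWRestore restore;
+//   restore.initialize(cct, driver);  // sets up rgw_restore_max_objs shards
+//   restore.start_processor();        // spawns the "rgw_restore" thread
+//   ...
+//   restore.stop_processor();         // stops and joins the worker on shutdown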
diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc
index c11b5a80207a6..786da6987b972 100644
--- a/src/rgw/rgw_sal.cc
+++ b/src/rgw/rgw_sal.cc
@@ -76,6 +76,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
		const rgw::SiteConfig& site_config,
		bool use_gc_thread,
		bool use_lc_thread,
+		bool use_restore_thread,
		bool quota_threads,
		bool run_sync_thread,
		bool run_reshard_thread,
@@ -97,6 +98,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
	.set_use_gc(use_gc)
	.set_run_gc_thread(use_gc_thread)
	.set_run_lc_thread(use_lc_thread)
+	.set_run_restore_thread(use_restore_thread)
	.set_run_quota_threads(quota_threads)
	.set_run_sync_thread(run_sync_thread)
	.set_run_reshard_thread(run_reshard_thread)
@@ -126,6 +128,7 @@ rgw::sal::Driver* DriverManager::init_storage_provider(const DoutPrefixProvider*
	.set_use_datacache(true)
	.set_run_gc_thread(use_gc_thread)
	.set_run_lc_thread(use_lc_thread)
+	.set_run_restore_thread(use_restore_thread)
	.set_run_quota_threads(quota_threads)
	.set_run_sync_thread(run_sync_thread)
	.set_run_reshard_thread(run_reshard_thread)
diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h
index 075f54d38fdbc..9373b4fb8d4b3 100644
--- a/src/rgw/rgw_sal.h
+++ b/src/rgw/rgw_sal.h
@@ -36,6 +36,7 @@ struct RGWBucketEnt;
 class RGWRESTMgr;
 class RGWLC;
+class RGWRestore;
 struct rgw_user_bucket;
 class RGWUsageBatch;
 class RGWCoroutinesManagerRegistry;
@@ -51,6 +52,7 @@ class RGWZonePlacementInfo;
 struct rgw_pubsub_topic;
 struct RGWOIDCProviderInfo;
 struct RGWRoleInfo;
+struct RGWRestoreEntry;
 
 using RGWBucketListNameFilter = std::function<bool (const std::string&)>;
@@ -159,7 +161,7 @@ static constexpr uint32_t FLAG_PREVENT_VERSIONING = 0x0002;
 // delete object where head object is missing)
 static constexpr uint32_t FLAG_FORCE_OP = 0x0004;
 
-enum RGWRestoreStatus : uint8_t {
+enum class RGWRestoreStatus : uint8_t {
   None = 0,
   RestoreAlreadyInProgress = 1,
   CloudRestored = 2,
@@ -472,6 +474,9 @@ class Driver {
   virtual int cluster_stat(RGWClusterStat& stats) = 0;
   /** Get a @a Lifecycle object. Used to manage/run lifecycle transitions */
   virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
+  /** Get a @a Restore object. Used to manage/run restore of objects */
+  virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+                                               const std::vector<std::string>& obj_names) = 0;
   /** Reset the temporarily restored objects which are expired */
   virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) = 0;
@@ -563,6 +568,8 @@ class Driver {
			const DoutPrefixProvider* dpp) = 0;
   /** Get access to the lifecycle management thread */
   virtual RGWLC* get_rgwlc(void) = 0;
+  /** Get access to the tier restore management thread */
+  virtual RGWRestore* get_rgwrestore(void) = 0;
   /** Get access to the coroutine registry. Used to create new coroutine managers */
   virtual RGWCoroutinesManagerRegistry* get_cr_registry() = 0;
@@ -1265,15 +1272,11 @@ class Object {
			       optional_yield y) = 0;
   virtual int restore_obj_from_cloud(Bucket* bucket,
			  rgw::sal::PlacementTier* tier,
-			  rgw_placement_rule& placement_rule,
-			  rgw_bucket_dir_entry& o,
			  CephContext* cct,
-			  RGWObjTier& tier_config,
-			  uint64_t olh_epoch,
			  std::optional<uint64_t> days,
+			  bool& in_progress,
			  const DoutPrefixProvider* dpp,
-			  optional_yield y,
-			  uint32_t flags) = 0;
+			  optional_yield y) = 0;
   /** Check to see if two placement rules match */
   virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) = 0;
   /** Dump driver-specific object layout info in JSON */
@@ -1667,6 +1670,53 @@ public:
			       const std::string& cookie) = 0;
 };
 
+/** @brief Abstraction of a serializer for Restore
+ */
+class RestoreSerializer : public Serializer {
+public:
+  RestoreSerializer() {}
+  virtual ~RestoreSerializer() = default;
+};
+
+/**
+ * @brief Abstraction for restore processing
+ *
+ * The Restore class is designed to manage the restoration of objects
+ * from cloud tier storage back into the Ceph cluster. It is particularly
+ * useful for objects stored in cold storage solutions like AWS Glacier or
+ * tape-based systems, where retrieval operations are asynchronous and can
+ * take a significant amount of time.
+ */
+class Restore {
+
+public:
+  Restore() = default;
+  virtual ~Restore() = default;
+  /** Add a single restore entry state */
+  virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+			int index, const RGWRestoreEntry& r_entry) = 0;
+  /** Add a list of restore entries */
+  virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+			  int index, const std::list<RGWRestoreEntry>& restore_entries) = 0;
+  /** List all known entries given a marker */
+  virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
+		   int index,
+		   const std::string& marker, std::string* out_marker,
+		   uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+		   bool* truncated) = 0;
+
+  /** Trim restore entries up to the marker */
+  virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+			   int index, const std::string_view& marker) = 0;
+  /** Check if the list is empty */
+  virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0;
+
+  /** Get a serializer for restore processing */
+  virtual std::unique_ptr<RestoreSerializer> get_serializer(
+				const std::string& lock_name,
+				const std::string& oid,
+				const std::string& cookie) = 0;
+};
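+
+// Backend note: for SAL_RADOS this interface is implemented on top of FIFO
+// queues, one per obj_name/shard; add_entry()/add_entries() append encoded
+// RGWRestoreEntry items, list() pages through them by marker, and
+// trim_entries() drops everything up to the given marker once processed.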
+
 /**
  * @brief Abstraction for a Notification event
  *
@@ -1885,6 +1935,7 @@ public:
	       const rgw::SiteConfig& site_config,
	       bool use_gc_thread,
	       bool use_lc_thread,
+	       bool use_restore_thread,
	       bool quota_threads,
	       bool run_sync_thread,
	       bool run_reshard_thread,
@@ -1898,6 +1949,7 @@ public:
				    site_config,
				    use_gc_thread,
				    use_lc_thread,
+				    use_restore_thread,
				    quota_threads,
				    run_sync_thread,
				    run_reshard_thread,
@@ -1926,6 +1978,7 @@ public:
	       const rgw::SiteConfig& site_config,
	       bool use_gc_thread,
	       bool use_lc_thread,
+	       bool use_restore_thread,
	       bool quota_threads,
	       bool run_sync_thread,
	       bool run_reshard_thread,
diff --git a/src/rgw/rgw_sal_dbstore.cc b/src/rgw/rgw_sal_dbstore.cc
index fae28f8438f09..077f9583831cd 100644
--- a/src/rgw/rgw_sal_dbstore.cc
+++ b/src/rgw/rgw_sal_dbstore.cc
@@ -1920,6 +1920,12 @@ namespace rgw::sal {
     return std::make_unique(store, oid, lock_name, cookie);
   }
 
+  std::unique_ptr<Restore> DBStore::get_restore(const int n_objs,
+						const std::vector<std::string>& obj_names)
+  {
+    return nullptr;
+  }
+
   std::unique_ptr<Notification> DBStore::get_notification(
     rgw::sal::Object* obj, rgw::sal::Object* src_obj, req_state* s,
     rgw::notify::EventType event_type, optional_yield y,
diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h
index 8dd469c9c450d..8a9451ae79478 100644
--- a/src/rgw/rgw_sal_dbstore.h
+++ b/src/rgw/rgw_sal_dbstore.h
@@ -893,6 +893,8 @@ public:
   virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list& zone_ids) override;
   virtual int cluster_stat(RGWClusterStat& stats) override;
   virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+  virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+					       const std::vector<std::string>& obj_names) override;
   virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
   virtual std::unique_ptr<Notification> get_notification(
@@ -925,6 +927,7 @@ public:
			     const std::string& topic_queue) override;
 
   virtual RGWLC* get_rgwlc(void) override;
+  virtual RGWRestore* get_rgwrestore(void) override { return NULL; }
   virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
   virtual int log_usage(const DoutPrefixProvider *dpp, std::map& usage_info, optional_yield y) override;
   virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
diff --git a/src/rgw/rgw_sal_filter.cc b/src/rgw/rgw_sal_filter.cc
index 627e36af66be1..62132df10b819 100644
--- a/src/rgw/rgw_sal_filter.cc
+++ b/src/rgw/rgw_sal_filter.cc
@@ -440,6 +440,13 @@ bool FilterDriver::process_expired_objects(const DoutPrefixProvider *dpp,
   return next->process_expired_objects(dpp, y);
 }
 
+std::unique_ptr<Restore> FilterDriver::get_restore(const int n_objs,
+						   const std::vector<std::string>& obj_names)
+{
+  std::unique_ptr<Restore> restore = next->get_restore(n_objs, obj_names);
+  return std::make_unique<FilterRestore>(std::move(restore));
+}
+
 std::unique_ptr<Notification> FilterDriver::get_notification(rgw::sal::Object* obj,
  rgw::sal::Object* src_obj, req_state* s, rgw::notify::EventType event_type, optional_yield y,
@@ -487,6 +494,11 @@ RGWLC* FilterDriver::get_rgwlc()
 {
   return next->get_rgwlc();
 }
 
+RGWRestore* FilterDriver::get_rgwrestore()
+{
+  return next->get_rgwrestore();
+}
+
 RGWCoroutinesManagerRegistry* FilterDriver::get_cr_registry()
 {
   return next->get_cr_registry();
@@ -1135,18 +1147,14 @@ int FilterObject::transition_to_cloud(Bucket* bucket,
 
 int FilterObject::restore_obj_from_cloud(Bucket* bucket,
			  rgw::sal::PlacementTier* tier,
-			  rgw_placement_rule& placement_rule,
-			  rgw_bucket_dir_entry& o,
			  CephContext* cct,
-			  RGWObjTier& tier_config,
-			  uint64_t olh_epoch,
			  std::optional<uint64_t> days,
-			  const DoutPrefixProvider* dpp,
-			  optional_yield y,
-			  uint32_t flags)
+			  bool& in_progress,
+			  const DoutPrefixProvider* dpp,
+			  optional_yield y)
 {
   return next->restore_obj_from_cloud(nextBucket(bucket), nextPlacementTier(tier),
-				      placement_rule, o, cct, tier_config, olh_epoch, days, dpp, y, flags);
+				      cct, days, in_progress, dpp, y);
 }
 
 bool FilterObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2)
@@ -1449,6 +1457,49 @@ std::unique_ptr FilterLifecycle::get_serializer(
   return std::make_unique(std::move(ns));
 }
 
+int FilterRestoreSerializer::try_lock(const DoutPrefixProvider *dpp, utime_t dur,
+				      optional_yield y)
+{
+  return next->try_lock(dpp, dur, y);
+}
+
+std::unique_ptr<RestoreSerializer> FilterRestore::get_serializer(const std::string& lock_name,
+								 const std::string& oid,
+								 const std::string& cookie) {
+  std::unique_ptr<RestoreSerializer> ns;
+  ns = next->get_serializer(lock_name, oid, cookie);
+  return std::make_unique<FilterRestoreSerializer>(std::move(ns));
+}
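+
+// The FilterRestore methods below are pure pass-throughs: each call is
+// delegated to the wrapped SAL Restore ('next'), so stacked filters add
+// behavior without re-implementing the store-specific entry queues.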
+
+int FilterRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+			     int index, const RGWRestoreEntry& r_entry) {
+  return next->add_entry(dpp, y, index, r_entry);
+}
+
+int FilterRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+			       int index,
+			       const std::list<RGWRestoreEntry>& restore_entries) {
+  return next->add_entries(dpp, y, index, restore_entries);
+}
+
+/** List all known entries */
+int FilterRestore::list(const DoutPrefixProvider *dpp, optional_yield y,
+			int index, const std::string& marker, std::string* out_marker,
+			uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+			bool* truncated) {
+  return next->list(dpp, y, index, marker, out_marker, max_entries,
+		    entries, truncated);
+}
+
+int FilterRestore::trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+				int index, const std::string_view& marker) {
+  return next->trim_entries(dpp, y, index, marker);
+}
+
+int FilterRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) {
+  return next->is_empty(dpp, y);
+}
+
 int FilterNotification::publish_reserve(const DoutPrefixProvider *dpp,
					  RGWObjTags* obj_tags)
 {
diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h
index 7627921633c5b..79b08465ca7af 100644
--- a/src/rgw/rgw_sal_filter.h
+++ b/src/rgw/rgw_sal_filter.h
@@ -292,6 +292,8 @@ public:
   }
   virtual int cluster_stat(RGWClusterStat& stats) override;
   virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+  virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+					       const std::vector<std::string>& obj_names) override;
   virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
   virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj,
@@ -383,6 +385,7 @@ public:
     return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
   }
   virtual RGWLC* get_rgwlc(void) override;
+  virtual RGWRestore* get_rgwrestore(void) override;
   virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
   virtual int log_usage(const DoutPrefixProvider *dpp, std::map days,
+			  bool& in_progress,
			  const DoutPrefixProvider* dpp,
-			  optional_yield y,
-			  uint32_t flags) override;
   virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override;
   virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) override;
@@ -1059,6 +1058,51 @@ public:
			       const std::string& cookie) override;
 };
 
+class FilterRestoreSerializer : public RestoreSerializer {
+
+protected:
+  std::unique_ptr<RestoreSerializer> next;
+
+public:
+  FilterRestoreSerializer(std::unique_ptr<RestoreSerializer> _next) : next(std::move(_next)) {}
+  virtual ~FilterRestoreSerializer() = default;
+  virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override;
+  virtual int unlock() override { return next->unlock(); }
+  virtual void print(std::ostream& out) const override { return next->print(out); }
+};
+
+class FilterRestore : public Restore {
+
+protected:
+  std::unique_ptr<Restore> next;
+
+public:
+  FilterRestore(std::unique_ptr<Restore> _next) : next(std::move(_next)) {}
+  ~FilterRestore() override = default;
+
+  virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+			int index, const RGWRestoreEntry& r_entry) override;
+  virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+			  int index,
+			  const std::list<RGWRestoreEntry>& restore_entries) override;
+
+  /** List all known entries */
+  virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
+		   int index,
+		   const std::string& marker, std::string* out_marker,
+		   uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+		   bool* truncated) override;
+  virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+			   int index, const std::string_view& marker) override;
+  virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override;
+
+  /** Get a serializer for restore processing */
+  virtual std::unique_ptr<RestoreSerializer> get_serializer(
+				const std::string& lock_name,
+				const std::string& oid,
+				const std::string& cookie) override;
+};
+
 class FilterNotification : public Notification {
 protected:
   std::unique_ptr<Notification> next;
 
diff --git a/src/rgw/rgw_sal_fwd.h b/src/rgw/rgw_sal_fwd.h
index 566a933f8ca05..dbdcbf98b218d 100644
--- a/src/rgw/rgw_sal_fwd.h
+++ b/src/rgw/rgw_sal_fwd.h
@@ -39,6 +39,7 @@ namespace sal {
   class Object;
   class MultipartUpload;
   class Lifecycle;
+  class Restore;
   class Notification;
   class Writer;
   class PlacementTier;
diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h
index da98eb41f997b..0364c84d1ac03 100644
--- a/src/rgw/rgw_sal_store.h
+++ b/src/rgw/rgw_sal_store.h
@@ -370,15 +370,11 @@ class StoreObject : public Object {
   }
   virtual int restore_obj_from_cloud(Bucket* bucket,
			  rgw::sal::PlacementTier* tier,
-			  rgw_placement_rule& placement_rule,
-			  rgw_bucket_dir_entry& o,
			  CephContext* cct,
-			  RGWObjTier& tier_config,
-			  uint64_t olh_epoch,
			  std::optional<uint64_t> days,
+			  bool& in_progress,
			  const DoutPrefixProvider* dpp,
-			  optional_yield y,
-			  uint32_t flags) override {
+			  optional_yield y) override {
     return -1;
   }
   jspan_context& get_trace() override { return trace_ctx; }
@@ -459,6 +455,20 @@ public:
   virtual void print(std::ostream& out) const override { out << oid; }
 };
 
+class StoreRestoreSerializer : public RestoreSerializer {
+
+protected:
+  std::string oid;
+
+public:
+  StoreRestoreSerializer() {}
+  StoreRestoreSerializer(std::string _oid) : oid(_oid) {}
+
+  virtual ~StoreRestoreSerializer() = default;
+  virtual void print(std::ostream& out) const override { out << oid; }
+};
+
+
 class StoreNotification : public Notification {
 protected:
   Object* obj;
diff --git a/src/rgw/rgw_zone.cc b/src/rgw/rgw_zone.cc
index 07be31e3aba65..e2bac04ca66ca 100644
--- a/src/rgw/rgw_zone.cc
+++ b/src/rgw/rgw_zone.cc
@@ -171,6 +171,7 @@ void RGWZoneParams::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("placement_pools", placement_pools, obj);
   JSONDecoder::decode_json("tier_config", tier_config, obj);
   JSONDecoder::decode_json("realm_id", realm_id, obj);
+  JSONDecoder::decode_json("restore_pool", restore_pool, obj);
 }
 
 void RGWZoneParams::dump(Formatter *f) const
@@ -200,6 +201,7 @@ void RGWZoneParams::dump(Formatter *f) const
   encode_json("placement_pools", placement_pools, f);
   encode_json("tier_config", tier_config, f);
   encode_json("realm_id", realm_id, f);
+  encode_json("restore_pool", restore_pool, f);
 }
 
 rgw_pool RGWZoneParams::get_pool(CephContext *cct) const
@@ -814,6 +816,7 @@ int init_zone_pool_names(const DoutPrefixProvider *dpp, optional_yield y,
   info.dedup_pool = fix_zone_pool_dup(pools, info.name, ".rgw.dedup", info.dedup_pool);
   info.gc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:gc", info.gc_pool);
   info.lc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:lc", info.lc_pool);
+  info.restore_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:restore", info.restore_pool);
   info.log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log", info.log_pool);
   info.intent_log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:intent", info.intent_log_pool);
   info.usage_log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:usage", info.usage_log_pool);
diff --git a/src/rgw/rgw_zone.h b/src/rgw/rgw_zone.h
index 3f177e02f3c4f..cec4e574ebc63 100644
--- a/src/rgw/rgw_zone.h
+++ b/src/rgw/rgw_zone.h
@@ -42,6 +42,8 @@ struct RGWZoneParams {
   JSONFormattable tier_config;
 
+  rgw_pool restore_pool;
+
   RGWZoneParams() {}
   explicit RGWZoneParams(const std::string& _name) : name(_name){}
   RGWZoneParams(const rgw_zone_id& _id, const std::string& _name) : id(_id.id), name(_name) {}
@@ -60,7 +62,7 @@ struct RGWZoneParams {
   const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(16, 1, bl);
+    ENCODE_START(17, 1, bl);
     encode(domain_root, bl);
     encode(control_pool, bl);
     encode(gc_pool, bl);
@@ -97,11 +99,12 @@ struct RGWZoneParams {
     encode(account_pool, bl);
     encode(group_pool, bl);
     encode(dedup_pool, bl);
+    encode(restore_pool, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(16, bl);
+    DECODE_START(17, bl);
     decode(domain_root, bl);
     decode(control_pool, bl);
     decode(gc_pool, bl);
@@ -191,6 +194,11 @@ struct RGWZoneParams {
     } else {
       dedup_pool = name + ".rgw.dedup";
     }
+    if (struct_v >= 17) {
+      decode(restore_pool, bl);
+    } else {
+      restore_pool = log_pool.name + ":restore";
+    }
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
diff --git a/src/test/rgw/rgw_cr_test.cc b/src/test/rgw/rgw_cr_test.cc
index f75f79132ebee..dc6d64e18ea20 100644
--- a/src/test/rgw/rgw_cr_test.cc
+++ b/src/test/rgw/rgw_cr_test.cc
@@ -350,7 +350,8 @@ int main(int argc, const char **argv)
     false,
     false,
     false,
-    true, true, null_yield, cfgstore.get(),
+    false,
+    true, true, null_yield, cfgstore.get(),
     false));
   if (!store) {
     std::cerr << "couldn't init storage provider" << std::endl;