see_also:
- rgw_enable_gc_threads
- rgw_enable_lc_threads
+ - rgw_enable_restore_threads
with_legacy: true
- name: rgw_enable_gc_threads
type: bool
see_also:
- rgw_enable_quota_threads
- rgw_enable_lc_threads
+ - rgw_enable_restore_threads
with_legacy: true
- name: rgw_enable_lc_threads
type: bool
see_also:
- rgw_enable_gc_threads
- rgw_enable_quota_threads
+ - rgw_enable_restore_threads
+ with_legacy: true
+- name: rgw_enable_restore_threads
+ type: bool
+ level: advanced
+ desc: Enables the objects' restore maintenance thread.
+ long_desc: The objects' restore maintenance thread is responsible for all maintenance
+ work related to restoring objects from the cloud. The thread itself can be disabled,
+ but for restore from the cloud to work correctly, at least one RGW in each zone needs
+ to have this thread running. Enabling the thread on multiple RGW processes within
+ the same zone can spread some of the maintenance work between them.
+ default: true
+ services:
+ - rgw
+ see_also:
+ - rgw_enable_gc_threads
+ - rgw_enable_quota_threads
+ - rgw_enable_lc_threads
with_legacy: true
- name: rgw_data
type: str
services:
- rgw
with_legacy: true
+- name: rgw_restore_max_objs
+ type: int
+ level: advanced
+ desc: Number of shards for restore processing
+ long_desc: Number of RADOS objects to use for storing restore entries which are in progress. This affects concurrency of restore maintenance, as shards can be processed in parallel.
+ default: 32
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_restore_lock_max_time
+ type: int
+ level: dev
+ default: 90
+ services:
+ - rgw
+ with_legacy: true
+- name: rgw_restore_processor_period
+ type: int
+ level: advanced
+ desc: Restore cycle run time
+ long_desc: The amount of time between the start of consecutive runs of the restore
+ processing threads. If a run takes longer than this period, the thread will
+ not wait before running again.
+ fmt_desc: The cycle time for restore state processing.
+ default: 15_min
+ services:
+ - rgw
+ with_legacy: true
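+# Illustrative usage (not part of the option schema): like any other RGW option,
+# the restore settings above can be changed through the standard config mechanism, e.g.
+#   ceph config set client.rgw rgw_restore_processor_period 1800
+# would make the restore worker wake up every 30 minutes instead of the default 15.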
- name: rgw_mp_lock_max_time
type: int
level: advanced
services:
- rgw
with_legacy: true
+- name: rgw_nfs_run_restore_threads
+ type: bool
+ level: advanced
+ desc: run objects' restore threads in librgw (default off)
+ default: false
+ services:
+ - rgw
+ with_legacy: true
- name: rgw_nfs_run_sync_thread
type: bool
level: advanced
SUBSYS(rgw_dbstore, 1, 5)
SUBSYS(rgw_flight, 1, 5)
SUBSYS(rgw_lifecycle, 1, 5)
+SUBSYS(rgw_restore, 1, 5)
SUBSYS(rgw_notification, 1, 5)
SUBSYS(javaclient, 1, 5)
SUBSYS(asok, 1, 5)
rgw_ldap.cc
rgw_lc.cc
rgw_lc_s3.cc
+ rgw_restore.cc
rgw_metadata.cc
rgw_multi.cc
rgw_multi_del.cc
int DaosObject::restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags)
+ optional_yield y)
{
return DAOS_NOT_IMPLEMENTED_LOG(dpp);
}
return 0;
}
+std::unique_ptr<Restore> DaosStore::get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names) {
+ DAOS_NOT_IMPLEMENTED_LOG(nullptr);
+ return nullptr;
+}
+
bool DaosStore::process_expired_objects(const DoutPrefixProvider *dpp,
optional_yield y) {
DAOS_NOT_IMPLEMENTED_LOG(nullptr);
optional_yield y) override;
virtual int restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags) override;
+ optional_yield y) override;
virtual bool placement_rules_match(rgw_placement_rule& r1,
rgw_placement_rule& r2) override;
virtual int dump_obj_layout(const DoutPrefixProvider* dpp, optional_yield y,
virtual std::string zone_unique_trans_id(const uint64_t unique_num) override;
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+ virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names) override;
virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual std::unique_ptr<Notification> get_notification(
rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
std::string& _req_id,
optional_yield y) override;
virtual RGWLC* get_rgwlc(void) override { return NULL; }
+ virtual RGWRestore* get_rgwrestore(void) override { return NULL; }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override {
return NULL;
}
return 0;
}
+std::unique_ptr<Restore> MotrStore::get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names) {
+ return nullptr;
+}
+
bool MotrStore::process_expired_objects(const DoutPrefixProvider *dpp,
optional_yield y)
{
virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list<std::string>& zone_ids) override;
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+ virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names) override;
virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj,
req_state* s, rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) override;
std::string& _req_id,
optional_yield y) override;
virtual RGWLC* get_rgwlc(void) override { return NULL; }
+ virtual RGWRestore* get_rgwrestore(void) override { return NULL; }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info) override;
int POSIXObject::restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags)
+ optional_yield y)
{
return -ERR_NOT_IMPLEMENTED;
}
optional_yield y) override;
virtual int restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags) override;
+ optional_yield y) override;
virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override;
virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) override;
virtual int swift_versioning_restore(const ACLOwner& owner, const rgw_user& remote_user, bool& restored,
uint64_t& accounted_size, rgw::sal::Attrs& attrs,
std::optional<uint64_t> days,
RGWZoneGroupTierS3Glacier& glacier_params,
+ bool& in_progress,
void* cb) {
RGWRESTConn::get_obj_params req_params;
std::string target_obj_name;
target_obj_name += get_key_instance(tier_ctx.obj->get_key());
}
- if (glacier_params.glacier_restore_tier_type != GlacierRestoreTierType::Expedited) {
- //XXX: Supporting STANDARD tier type is still in WIP
- ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: Only Expedited tier_type is supported " << dendl;
- return -1;
- }
+ if (!in_progress) { // first time. Send RESTORE req.
- rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name));
+ rgw_obj dest_obj(dest_bucket, rgw_obj_key(target_obj_name));
+ ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params);
- ret = cloud_tier_restore(tier_ctx.dpp, tier_ctx.conn, dest_obj, days, glacier_params);
-
- ldpp_dout(tier_ctx.dpp, 20) << __func__ << "Restoring object=" << dest_obj << "returned ret = " << ret << dendl;
-
- if (ret < 0 ) {
- ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: failed to restore object=" << dest_obj << "; ret = " << ret << dendl;
- return ret;
+ ldpp_dout(tier_ctx.dpp, 20) << __func__ << ": restore of object=" << target_obj_name << " returned ret=" << ret << dendl;
+
+ if (ret < 0 ) {
+ ldpp_dout(tier_ctx.dpp, -1) << __func__ << "ERROR: failed to restore object=" << dest_obj << "; ret = " << ret << dendl;
+ return ret;
+ }
+ in_progress = true;
}
// now send HEAD request and verify if restore is complete on glacier/tape endpoint
- bool restore_in_progress = false;
+ static constexpr int MAX_RETRIES = 10;
+ uint32_t retries = 0;
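+ // Poll the endpoint with HEAD a bounded number of times; if the restore has
+ // not completed within MAX_RETRIES attempts, return with in_progress still
+ // set so the caller can re-queue the entry and check again later.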
do {
ret = rgw_cloud_tier_get_object(tier_ctx, true, headers, nullptr, etag,
accounted_size, attrs, nullptr);
return ret;
}
- restore_in_progress = is_restore_in_progress(tier_ctx.dpp, headers);
- } while(restore_in_progress);
+ in_progress = is_restore_in_progress(tier_ctx.dpp, headers);
+
+ } while(retries++ < MAX_RETRIES && in_progress);
+
+ if (in_progress) {
+ ldpp_dout(tier_ctx.dpp, 20) << __func__ << ": restore of object=" << target_obj_name << " still in progress; returning" << dendl;
+ return 0;
+ }
// now do the actual GET
ret = rgw_cloud_tier_get_object(tier_ctx, false, headers, pset_mtime, etag,
std::map<std::string, std::string>& headers,
real_time* pset_mtime, std::string& etag,
uint64_t& accounted_size, rgw::sal::Attrs& attrs,
- std::optional<uint64_t> days,
+ std::optional<uint64_t> days,
RGWZoneGroupTierS3Glacier& glacier_params,
+ bool& in_progress,
void* cb);
int cloud_tier_restore(const DoutPrefixProvider *dpp,
#include "rgw_datalog.h"
#include "rgw_putobj_processor.h"
#include "rgw_lc_tier.h"
+#include "rgw_restore.h"
#include "cls/rgw/cls_rgw_ops.h"
#include "cls/rgw/cls_rgw_client.h"
rgw::notify::shutdown();
v1_topic_migration.stop();
}
+
+ if (use_restore_thread) {
+ restore->stop_processor();
+ }
+ delete restore;
+ restore = NULL;
}
/**
if (ret < 0)
return ret;
+ ret = open_restore_pool_ctx(dpp);
+ if (ret < 0)
+ return ret;
+
ret = open_objexp_pool_ctx(dpp);
if (ret < 0)
return ret;
if (use_lc_thread)
lc->start_processor();
+ restore = new RGWRestore();
+ restore->initialize(cct, this->driver);
+
+ if (use_restore_thread)
+ restore->start_processor();
+
quota_handler = RGWQuotaHandler::generate_handler(dpp, this->driver, quota_threads);
bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().lc_pool, lc_pool_ctx, true, true);
}
+int RGWRados::open_restore_pool_ctx(const DoutPrefixProvider *dpp)
+{
+ return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().restore_pool, restore_pool_ctx, true, true);
+}
+
int RGWRados::open_objexp_pool_ctx(const DoutPrefixProvider *dpp)
{
return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().log_pool, objexp_pool_ctx, true, true);
RGWObjectCtx& obj_ctx,
RGWBucketInfo& dest_bucket_info,
const rgw_obj& dest_obj,
- rgw_placement_rule& dest_placement,
RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider *dpp,
- optional_yield y,
- bool log_op){
+ optional_yield y) {
//XXX: read below from attrs .. check transition_obj()
ACLOwner owner;
dest_obj_bi.key.instance.clear();
}
+ uint64_t olh_epoch = 0; // read it from attrs fetched from cloud below
rgw::putobj::AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info, nullptr,
owner, obj_ctx, dest_obj_bi, olh_epoch, tag, dpp, y, no_trace);
}
boost::optional<RGWPutObj_Compress> compressor;
CompressorRef plugin;
- dest_placement.storage_class = tier_ctx.restore_storage_class;
+ rgw_placement_rule dest_placement(dest_bucket_info.placement_rule, tier_ctx.restore_storage_class);
RGWRadosPutObj cb(dpp, cct, plugin, compressor, &processor, progress_cb, progress_data,
[&](map<string, bufferlist> obj_attrs) {
- // XXX: do we need filter() like in fetch_remote_obj() cb
- dest_placement.inherit_from(dest_bucket_info.placement_rule);
processor.set_tail_placement(dest_placement);
ret = processor.prepare(rctx.y);
return ret;
}
+ // For permanent restore, log_op depends on the flags set in bucket->get_info().flags
+ bool log_op = (dest_bucket_info.flags & rgw::sal::FLAG_LOG_OP);
+
uint64_t accounted_size = 0;
string etag;
real_time set_mtime;
RGWZoneGroupTierS3Glacier& glacier_params = tier_config.tier_placement.s3_glacier;
ret = rgw_cloud_tier_restore_object(tier_ctx, headers,
&set_mtime, etag, accounted_size,
- attrs, days, glacier_params, &cb);
+ attrs, days, glacier_params, in_progress, &cb);
} else {
ldpp_dout(dpp, 20) << "Fetching object:" << dest_obj << "from the cloud" << dendl;
ret = rgw_cloud_tier_get_object(tier_ctx, false, headers,
return ret;
}
+ if (in_progress) {
+ ldpp_dout(tier_ctx.dpp, 20) << "Restoring object:" << dest_obj << " still in progress; returning " << dendl;
+ return ret;
+ }
+
if (!cb_processed) {
ldpp_dout(dpp, 20) << "Callback not processed, object:" << dest_obj << dendl;
return -EIO;
std::optional<uint64_t> olh_ep = ceph::parse<uint64_t>(rgw_bl_str(aiter->second));
if (olh_ep) {
olh_epoch = *olh_ep;
+ // update in tier_ctx too
+ tier_ctx.o.versioned_epoch = *olh_ep;
}
attrs.erase(aiter);
}
attrs[RGW_ATTR_RESTORE_TYPE] = std::move(bl);
ldpp_dout(dpp, 20) << "Permanent restore, object:" << dest_obj << dendl;
}
+ // bucket->get_info().flags does not seem to have rgw::sal::FLAG_LOG_OP set
+ // by default, hence set it explicitly.
+ // XXX: Do we need to check whether sync is disabled for the bucket?
log_op = true;
}
int open_root_pool_ctx(const DoutPrefixProvider *dpp);
int open_gc_pool_ctx(const DoutPrefixProvider *dpp);
int open_lc_pool_ctx(const DoutPrefixProvider *dpp);
+ int open_restore_pool_ctx(const DoutPrefixProvider *dpp);
int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
rgw::sal::RadosStore* driver{nullptr};
RGWGC* gc{nullptr};
RGWLC* lc{nullptr};
+ RGWRestore* restore{nullptr};
RGWObjectExpirer* obj_expirer{nullptr};
bool use_gc_thread{false};
bool use_lc_thread{false};
+ bool use_restore_thread{false};
bool quota_threads{false};
bool run_sync_thread{false};
bool run_reshard_thread{false};
librados::IoCtx gc_pool_ctx; // .rgw.gc
librados::IoCtx lc_pool_ctx; // .rgw.lc
+ librados::IoCtx restore_pool_ctx; // .rgw.restore
librados::IoCtx objexp_pool_ctx;
librados::IoCtx reshard_pool_ctx;
librados::IoCtx notif_pool_ctx; // .rgw.notif
return gc;
}
+ RGWRestore *get_restore() {
+ return restore;
+ }
+
RGWRados& set_run_gc_thread(bool _use_gc_thread) {
use_gc_thread = _use_gc_thread;
return *this;
return *this;
}
+ RGWRados& set_run_restore_thread(bool _use_restore_thread) {
+ use_restore_thread = _use_restore_thread;
+ return *this;
+ }
+
RGWRados& set_run_quota_threads(bool _run_quota_threads) {
quota_threads = _run_quota_threads;
return *this;
return &lc_pool_ctx;
}
+ librados::IoCtx* get_restore_pool_ctx() {
+ return &restore_pool_ctx;
+ }
+
librados::IoCtx& get_notif_pool_ctx() {
return notif_pool_ctx;
}
RGWObjectCtx& obj_ctx,
RGWBucketInfo& dest_bucket_info,
const rgw_obj& dest_obj,
- rgw_placement_rule& dest_placement,
RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider *dpp,
- optional_yield y,
- bool log_op = true);
+ optional_yield y);
int check_bucket_empty(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, optional_yield y);
#include "rgw_tools.h"
#include "rgw_tracer.h"
#include "rgw_zone.h"
+#include "rgw_restore.h"
#include "services/svc_bilog_rados.h"
#include "services/svc_bi_rados.h"
return std::make_unique<RadosLifecycle>(this);
}
+std::unique_ptr<Restore> RadosStore::get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names)
+{
+ return std::make_unique<RadosRestore>(this, n_objs, obj_names);
+}
+
bool RadosStore::process_expired_objects(const DoutPrefixProvider *dpp,
optional_yield y)
{
int RadosObject::restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags)
+ optional_yield y)
{
/* init */
rgw::sal::RadosPlacementTier* rtier = static_cast<rgw::sal::RadosPlacementTier*>(tier);
string bucket_name = rtier->get_rt().t.s3.target_path;
const rgw::sal::ZoneGroup& zonegroup = store->get_zone()->get_zonegroup();
int ret = 0;
- string src_storage_class = o.meta.storage_class; // or take src_placement also as input
+
+ auto& attrs = get_attrs();
+ RGWObjTier tier_config;
+
+ auto attr_iter = attrs.find(RGW_ATTR_MANIFEST);
+ if (attr_iter != attrs.end()) {
+ RGWObjManifest m;
+ try {
+ using ceph::decode;
+ decode(m, attr_iter->second);
+ m.get_tier_config(&tier_config);
+ } catch (const buffer::end_of_buffer&) {
+ //empty manifest; it's not cloud-tiered
+ ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl;
+ return -EIO;
+ } catch (const std::exception& e) {
+ ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl;
+ return -EIO;
+ }
+ }
+
// update tier_config in case tier params are updated
tier_config.tier_placement = rtier->get_rt();
"-cloud-bucket";
boost::algorithm::to_lower(bucket_name);
}
+
+ rgw_bucket_dir_entry ent;
+ ent.key.name = get_key().name;
+ ent.key.instance = get_key().instance;
+ ent.meta.accounted_size = ent.meta.size = get_obj_size();
+ ent.meta.etag = "";
+
+ if (!ent.key.instance.empty()) { // non-current versioned object
+ ent.flags |= rgw_bucket_dir_entry::FLAG_VER;
+ }
+
/* Create RGW REST connection */
S3RESTConn conn(cct, id, { endpoint }, key, zonegroup.get_id(), region, host_style);
// save source cloudtier storage class
- RGWLCCloudTierCtx tier_ctx(cct, dpp, o, store, bucket->get_info(),
+ RGWLCCloudTierCtx tier_ctx(cct, dpp, ent, store, bucket->get_info(),
this, conn, bucket_name,
rtier->get_rt().t.s3.target_storage_class);
tier_ctx.acl_mappings = rtier->get_rt().t.s3.acl_mappings;
tier_ctx.restore_storage_class = rtier->get_rt().restore_storage_class;
tier_ctx.tier_type = rtier->get_rt().tier_type;
- ldpp_dout(dpp, 20) << "Restoring object(" << o.key << ") from the cloud endpoint(" << endpoint << ")" << dendl;
+ ldpp_dout(dpp, 20) << "Restoring object(" << get_key() << ") from the cloud endpoint(" << endpoint << ")" << dendl;
if (days && days == 0) {
- ldpp_dout(dpp, 0) << "Days = 0 not valid; Not restoring object (" << o.key << ") from the cloud endpoint(" << endpoint << ")" << dendl;
+ ldpp_dout(dpp, 0) << "Days = 0 not valid; Not restoring object (" << get_key() << ") from the cloud endpoint(" << endpoint << ")" << dendl;
return 0;
}
- // Note: For non-versioned objects, below should have already been set by the callers-
- // o.current should be false; this(obj)->instance should have version-id.
-
- // set restore_status as RESTORE_ALREADY_IN_PROGRESS
- ret = set_cloud_restore_status(dpp, y, RGWRestoreStatus::RestoreAlreadyInProgress);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << " Setting cloud restore status to RESTORE_ALREADY_IN_PROGRESS for the object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl;
- return ret;
- }
-
/* Restore object from the cloud endpoint.
* All restore related status and attrs are set as part of object download to
* avoid any races */
ret = store->getRados()->restore_obj_from_cloud(tier_ctx, *rados_ctx,
- bucket->get_info(), get_obj(), placement_rule,
- tier_config,
- olh_epoch, days, dpp, y, flags & FLAG_LOG_OP);
+ bucket->get_info(), get_obj(),
+ tier_config, days, in_progress, dpp, y);
if (ret < 0) { //failed to restore
- ldpp_dout(dpp, 0) << "Restoring object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl;
- auto reset_ret = set_cloud_restore_status(dpp, y, RGWRestoreStatus::RestoreFailed);
+ ldpp_dout(dpp, 0) << "Restoring object(" << get_key() << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << ret << dendl;
rgw_placement_rule target_placement;
target_placement.inherit_from(tier_ctx.bucket_info.placement_rule);
target_placement.storage_class = tier->get_storage_class();
/* Reset HEAD object as CloudTiered */
- reset_ret = write_cloud_tier(dpp, y, tier_ctx.o.versioned_epoch,
+ int reset_ret = write_cloud_tier(dpp, y, tier_ctx.o.versioned_epoch,
tier, tier_ctx.is_multipart_upload,
target_placement, tier_ctx.obj);
if (reset_ret < 0) {
- ldpp_dout(dpp, 0) << " Reset to cloud_tier of object(" << o.key << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << reset_ret << dendl;
+ ldpp_dout(dpp, 0) << " Reset to cloud_tier of object(" << get_key() << ") from the cloud endpoint(" << endpoint << ") failed, ret=" << reset_ret << dendl;
}
return ret;
}
return ret;
}
-int RadosObject::set_cloud_restore_status(const DoutPrefixProvider* dpp,
- optional_yield y,
- rgw::sal::RGWRestoreStatus restore_status)
-{
- int ret = 0;
- set_atomic(true);
-
- bufferlist bl;
- using ceph::encode;
- encode(restore_status, bl);
- ret = modify_obj_attrs(RGW_ATTR_RESTORE_STATUS, bl, y, dpp, false);
-
- return ret;
-}
-
/*
* If the object is restored temporarily and is expired, delete the data and
* reset the HEAD object as cloud-transitioned.
return std::make_unique<LCRadosSerializer>(store, oid, lock_name, cookie);
}
+RadosRestoreSerializer::RadosRestoreSerializer(RadosStore* store, const std::string& _oid, const std::string& lock_name, const std::string& cookie) :
+ StoreRestoreSerializer(_oid),
+ ioctx(*store->getRados()->get_restore_pool_ctx()),
+ lock(lock_name)
+{
+ lock.set_cookie(cookie);
+}
+
+int RadosRestoreSerializer::try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y)
+{
+ lock.set_duration(dur);
+ return lock.lock_exclusive((librados::IoCtx*)(&ioctx), oid);
+}
+
+std::unique_ptr<RestoreSerializer> RadosRestore::get_serializer(
+ const std::string& lock_name,
+ const std::string& oid,
+ const std::string& cookie)
+{
+ return std::make_unique<RadosRestoreSerializer>(store, oid, lock_name, cookie);
+}
+
+int RadosRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ int index, const RGWRestoreEntry& entry) {
+ bufferlist bl;
+
+ encode(entry, bl);
+
+ auto ret = push(dpp, y, index, std::move(bl));
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: push() returned " << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
+
+int RadosRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+ int index, const std::list<RGWRestoreEntry>& restore_entries) {
+ std::vector<ceph::buffer::list> ent_list;
+
+ for (auto& entry : restore_entries) {
+ bufferlist bl;
+
+ encode(entry, bl);
+ ent_list.push_back(std::move(bl));
+
+ }
+
+ int ret = push(dpp, y, index, std::move(ent_list));
+
+ if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: push() returned " << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
+int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, std::vector<ceph::buffer::list>&& items) {
+ auto r = fifos[index].push(dpp, items, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": unable to push to FIFO: " << obj_names[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ return r;
+}
+
+int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, ceph::buffer::list&& bl) {
+ auto r = fifos[index].push(dpp, std::move(bl), y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": unable to push to FIFO: " << obj_names[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+ return r;
+}
+
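+/* In-memory form of a queued restore entry as read back from a FIFO shard:
+ * the FIFO marker and mtime returned by list() plus the decoded
+ * RGWRestoreEntry payload. */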
+struct rgw_restore_fifo_entry {
+ std::string id;
+ ceph::real_time mtime;
+ RGWRestoreEntry entry;
+ rgw_restore_fifo_entry() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(mtime, bl);
+ encode(entry, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(mtime, bl);
+ decode(entry, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
+};
+WRITE_CLASS_ENCODER(rgw_restore_fifo_entry)
+
+int RadosRestore::list(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string& marker, std::string* out_marker,
+ uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ bool* truncated)
+{
+ std::vector<rgw::cls::fifo::list_entry> restore_entries;
+ bool more = false;
+
+ auto r = fifos[index].list(dpp, max_entries, marker, &restore_entries, &more, y);
+
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": unable to list FIFO: " << obj_names[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
+ entries.clear();
+
+ for (const auto& entry : restore_entries) {
+ rgw_restore_fifo_entry r_entry;
+ r_entry.id = entry.marker;
+ r_entry.mtime = entry.mtime;
+
+ auto liter = entry.data.cbegin();
+ try {
+ decode(r_entry.entry, liter);
+ } catch (const buffer::error& err) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": failed to decode restore entry: "
+ << err.what() << dendl;
+ return -EIO;
+ }
+ RGWRestoreEntry& e = r_entry.entry;
+ entries.push_back(std::move(e));
+ }
+
+ if (truncated)
+ *truncated = more;
+
+ if (out_marker && !restore_entries.empty()) {
+ *out_marker = restore_entries.back().marker;
+ }
+
+ return 0;
+}
+
+int RadosRestore::trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string_view& marker)
+{
+ assert(index < num_objs);
+
+ int ret = trim(dpp, y, index, marker);
+ return ret;
+}
+
+int RadosRestore::trim(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string_view& marker) {
+ auto r = fifos[index].trim(dpp, marker, false, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": unable to trim FIFO: " << obj_names[index]
+ << ": " << cpp_strerror(-r) << dendl;
+ }
+
+ return r;
+}
+
+std::string_view RadosRestore::max_marker() {
+ static const std::string mm = rgw::cls::fifo::marker::max().to_string();
+ return std::string_view(mm);
+}
+
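+/* Returns 1 if every FIFO shard is empty, 0 if any shard still has entries,
+ * or a negative error code on failure to list a shard. */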
+int RadosRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) {
+ std::vector<rgw::cls::fifo::list_entry> restore_entries;
+ bool more = false;
+
+ for (auto shard = 0u; shard < fifos.size(); ++shard) {
+ auto r = fifos[shard].list(dpp, 1, {}, &restore_entries, &more, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": unable to list FIFO: " << obj_names[shard]
+ << ": " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+ if (!restore_entries.empty()) {
+ return 0;
+ }
+ }
+
+ return 1;
+}
+
int RadosNotification::publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags)
{
return rgw::notify::publish_reserve(dpp, *store->svc()->site, event_types, res, obj_tags);
#include "rgw_putobj_processor.h"
#include "services/svc_tier_rados.h"
#include "cls/lock/cls_lock_client.h"
+#include "rgw_log_backing.h"
namespace rgw { namespace sal {
virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list<std::string>& zone_ids) override;
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+ virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names) override;
virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, req_state* s, rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) override;
virtual std::unique_ptr<Notification> get_notification(
optional_yield y,
const DoutPrefixProvider* dpp) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
+ virtual RGWRestore* get_rgwrestore(void) override { return rados->get_restore(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
optional_yield y) override;
virtual int restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags) override;
+ optional_yield y) override;
virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override;
virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) override;
const std::string& cookie) override;
};
+class RadosRestoreSerializer : public StoreRestoreSerializer {
+ librados::IoCtx& ioctx;
+ ::rados::cls::lock::Lock lock;
+
+public:
+ RadosRestoreSerializer(RadosStore* store, const std::string& oid, const std::string& lock_name, const std::string& cookie);
+
+ virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override;
+ virtual int unlock() override {
+ return lock.unlock(&ioctx, oid);
+ }
+};
+
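+/* Restore queue backed by RADOS FIFOs in the restore pool: one FIFO per shard
+ * name in obj_names, used to persist RGWRestoreEntry items until the restore
+ * worker completes them. */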
+class RadosRestore : public Restore {
+ RadosStore* store;
+ const int num_objs;
+ const std::vector<std::string_view>& obj_names;
+ librados::IoCtx& ioctx;
+ ceph::containers::tiny_vector<LazyFIFO> fifos;
+
+public:
+ RadosRestore(RadosStore* _st, const int n_objs, const std::vector<std::string_view>& o_names) : store(_st),
+ num_objs(n_objs), obj_names(o_names),
+ ioctx(*store->getRados()->get_restore_pool_ctx()),
+ fifos(num_objs,
+ [&](const size_t ix, auto emplacer) {
+ emplacer.emplace(ioctx, std::string(obj_names[ix]));
+ }) {}
+
+ ~RadosRestore() override = default;
+
+ virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ int index, const RGWRestoreEntry& r_entry) override;
+ virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+ int index, const std::list<RGWRestoreEntry>& restore_entries) override;
+ virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
+ int index,
+ const std::string& marker, std::string* out_marker,
+ uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ bool* truncated) override;
+ virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string_view& marker) override;
+ virtual std::unique_ptr<RestoreSerializer> get_serializer(
+ const std::string& lock_name,
+ const std::string& oid,
+ const std::string& cookie) override;
+ /** Below routines deal with actual FIFO */
+ int is_empty(const DoutPrefixProvider *dpp, optional_yield y);
+ std::string_view max_marker();
+ int trim(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string_view& marker);
+ int push(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, ceph::buffer::list&& bl);
+ int push(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, std::vector<ceph::buffer::list>&& items);
+};
+
class RadosNotification : public StoreNotification {
RadosStore* store;
/* XXX it feels incorrect to me that rgw::notify::reservation_t is
JSONFormattable tier_config;
+ rgw_pool restore_pool;
+
RGWZoneParams() : RGWSystemMetaObj() {}
explicit RGWZoneParams(const std::string& name) : RGWSystemMetaObj(name){}
RGWZoneParams(const rgw_zone_id& id, const std::string& name) : RGWSystemMetaObj(id.id, name) {}
const std::string& get_compression_type(const rgw_placement_rule& placement_rule) const;
void encode(bufferlist& bl) const override {
- ENCODE_START(16, 1, bl);
+ ENCODE_START(17, 1, bl);
encode(domain_root, bl);
encode(control_pool, bl);
encode(gc_pool, bl);
encode(account_pool, bl);
encode(group_pool, bl);
encode(dedup_pool, bl);
+ encode(restore_pool, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) override {
- DECODE_START(16, bl);
+ DECODE_START(17, bl);
decode(domain_root, bl);
decode(control_pool, bl);
decode(gc_pool, bl);
} else {
dedup_pool = name + ".rgw.dedup";
}
+ if (struct_v >= 17) {
+ decode(restore_pool, bl);
+ } else {
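+      // zones encoded before struct_v 17 have no restore_pool; fall back to a
+      // ":restore" namespace under the log pool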
+ restore_pool = log_pool.name + ":restore";
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
false,
false,
false,
+ false,
false,
false, // No background tasks!
null_yield,
(g_conf()->rgw_enable_lc_threads &&
((!nfs) || (nfs && g_conf()->rgw_nfs_run_lc_threads)));
+ auto run_restore =
+ (g_conf()->rgw_enable_restore_threads &&
+ ((!nfs) || (nfs && g_conf()->rgw_nfs_run_restore_threads)));
+
auto run_quota =
(g_conf()->rgw_enable_quota_threads &&
((!nfs) || (nfs && g_conf()->rgw_nfs_run_quota_threads)));
site,
run_gc,
run_lc,
+ run_restore,
run_quota,
run_sync,
g_conf().get_val<bool>("rgw_dynamic_resharding"),
CephContext *cct;
rgw::sal::Driver* driver;
std::unique_ptr<rgw::sal::Lifecycle> sal_lc;
+ std::unique_ptr<rgw::sal::Restore> sal_restore;
int max_objs{0};
std::string *obj_names{nullptr};
std::atomic<bool> down_flag = { false };
CephContext *get_cct() const override { return cct; }
rgw::sal::Lifecycle* get_lc() const { return sal_lc.get(); }
+ rgw::sal::Restore* get_restore() const { return sal_restore.get(); }
unsigned get_subsys() const;
std::ostream& gen_prefix(std::ostream& out) const;
exit(1);
}
- driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, true, null_yield);
+ driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, false, true, null_yield);
if (!driver) {
std::cerr << "couldn't init storage provider" << std::endl;
return EIO;
#include "rgw_iam_managed_policy.h"
#include "rgw_bucket_sync.h"
#include "rgw_bucket_logging.h"
+#include "rgw_restore.h"
#include "services/svc_zone.h"
#include "services/svc_quota.h"
*/
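+/*
+ * For a restore op (read_through == false), 0 means the restore was initiated;
+ * other non-negative return values encode the object's current
+ * rgw::sal::RGWRestoreStatus (RestoreAlreadyInProgress or CloudRestored) so the
+ * S3 handler can map them to the right x-amz-restore response. Errors are
+ * returned as negative codes.
+ */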
int handle_cloudtier_obj(req_state* s, const DoutPrefixProvider *dpp, rgw::sal::Driver* driver,
rgw::sal::Attrs& attrs, bool sync_cloudtiered, std::optional<uint64_t> days,
- bool restore_op, optional_yield y)
+ bool read_through, optional_yield y)
{
int op_ret = 0;
ldpp_dout(dpp, 20) << "reached handle cloud tier " << dendl;
auto attr_iter = attrs.find(RGW_ATTR_MANIFEST);
if (attr_iter == attrs.end()) {
- if (restore_op) {
+ if (!read_through) {
op_ret = -ERR_INVALID_OBJECT_STATE;
s->err.message = "only cloud tier object can be restored";
return op_ret;
decode(m, attr_iter->second);
if (!m.is_tier_type_s3()) {
ldpp_dout(dpp, 20) << "not a cloud tier object " << s->object->get_key().name << dendl;
- if (restore_op) {
+ if (!read_through) {
op_ret = -ERR_INVALID_OBJECT_STATE;
s->err.message = "only cloud tier object can be restored";
return op_ret;
auto iter = bl.cbegin();
decode(restore_status, iter);
}
- if (attr_iter == attrs.end() || restore_status == rgw::sal::RGWRestoreStatus::RestoreFailed) {
- // first time restore or previous restore failed
- rgw::sal::Bucket* pbucket = NULL;
- pbucket = s->bucket.get();
-
+ if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
+ if (read_through) {
+ op_ret = -ERR_REQUEST_TIMEOUT;
+ ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
+ s->err.message = "restore is still in progress";
+ return op_ret;
+ } else {
+ // for restore-op, corresponds to RESTORE_ALREADY_IN_PROGRESS
+ return static_cast<int>(rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress);
+ }
+ } else if (restore_status == rgw::sal::RGWRestoreStatus::CloudRestored) {
+ // corresponds to CLOUD_RESTORED
+ return static_cast<int>(rgw::sal::RGWRestoreStatus::CloudRestored);
+ } else { // first time restore or previous restore failed.
+ // Restore the object.
std::unique_ptr<rgw::sal::PlacementTier> tier;
rgw_placement_rule target_placement;
- target_placement.inherit_from(pbucket->get_placement_rule());
- attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS);
+ target_placement.inherit_from(s->bucket->get_placement_rule());
+ auto attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS);
if (attr_iter != attrs.end()) {
target_placement.storage_class = attr_iter->second.to_str();
}
op_ret = driver->get_zone()->get_zonegroup().get_placement_tier(target_placement, &tier);
- ldpp_dout(dpp, 20) << "getting tier placement handle cloud tier" << op_ret <<
- " storage class " << target_placement.storage_class << dendl;
if (op_ret < 0) {
- s->err.message = "failed to restore object";
+ ldpp_dout(dpp, -1) << "failed to fetch tier placement handle, ret = " << op_ret << dendl;
return op_ret;
+ } else {
+ ldpp_dout(dpp, 20) << "getting tier placement handle cloud tier for " <<
+ " storage class " << target_placement.storage_class << dendl;
}
- if (!restore_op) {
+
+ if (!tier->is_tier_type_s3()) {
+ ldpp_dout(dpp, -1) << "ERROR: not s3 tier type - " << tier->get_tier_type() <<
+ " for storage class " << target_placement.storage_class << dendl;
+ s->err.message = "failed to restore object";
+ return -EINVAL;
+ }
+
+ if (read_through) {
if (tier->allow_read_through()) {
days = tier->get_read_through_restore_days();
} else { //read-through is not enabled
return op_ret;
}
}
- // fill in the entry. XXX: Maybe we can avoid it by passing only necessary params
- rgw_bucket_dir_entry ent;
- ent.key.name = s->object->get_key().name;
- ent.key.instance = s->object->get_key().instance;
- ent.meta.accounted_size = ent.meta.size = s->obj_size;
- ent.meta.etag = "" ;
- uint64_t epoch = 0;
- op_ret = get_system_versioning_params(s, &epoch, NULL);
- if (!ent.key.instance.empty()) { // non-current versioned object
- ent.flags |= rgw_bucket_dir_entry::FLAG_VER;
- }
- ldpp_dout(dpp, 20) << "getting versioning params tier placement handle cloud tier" << op_ret << dendl;
- if (op_ret < 0) {
- ldpp_dout(dpp, 20) << "failed to get versioning params, op_ret = " << op_ret << dendl;
- s->err.message = "failed to restore object";
- return op_ret;
- }
- op_ret = s->object->restore_obj_from_cloud(pbucket, tier.get(), target_placement, ent,
- s->cct, tier_config, epoch,
- days, dpp, y, s->bucket->get_info().flags);
+
+ op_ret = driver->get_rgwrestore()->restore_obj_from_cloud(s->bucket.get(),
+ s->object.get(), tier.get(), days, y);
+
if (op_ret < 0) {
- ldpp_dout(dpp, 0) << "object " << ent.key.name << " fetching failed" << op_ret << dendl;
+      ldpp_dout(dpp, 0) << "Restore of object " << s->object->get_key() << " failed, ret=" << op_ret << dendl;
s->err.message = "failed to restore object";
return op_ret;
}
- ldpp_dout(dpp, 20) << "object " << ent.key.name << " fetching succeed" << dendl;
- /* Even if restore is complete the first read through request will return but actually downloaded
- * object asyncronously.
+
+      ldpp_dout(dpp, 20) << "Restore of object " << s->object->get_key() << " succeeded" << dendl;
+      /* Even if the restore is already complete, the first read-through request
+       * still returns an error since the object is downloaded asynchronously.
*/
- if (!restore_op) { //read-through
+ if (read_through) { //read-through
op_ret = -ERR_REQUEST_TIMEOUT;
ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
s->err.message = "restore is still in progress";
}
return op_ret;
- } else if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
- if (!restore_op) {
- op_ret = -ERR_REQUEST_TIMEOUT;
- ldpp_dout(dpp, 5) << "restore is still in progress, please check restore status and retry" << dendl;
- s->err.message = "restore is still in progress";
- return op_ret;
- } else {
- return 1; // for restore-op, corresponds to RESTORE_ALREADY_IN_PROGRESS
- }
- } else {
- return 2; // corresponds to CLOUD_RESTORED
}
} catch (const buffer::end_of_buffer&) {
//empty manifest; it's not cloud-tiered
- if (restore_op) {
+ if (!read_through) {
op_ret = -ERR_INVALID_OBJECT_STATE;
s->err.message = "only cloud tier object can be restored";
}
if (get_type() == RGW_OP_GET_OBJ && get_data) {
std::optional<uint64_t> days;
- op_ret = handle_cloudtier_obj(s, this, driver, attrs, sync_cloudtiered, days, false, y);
+ op_ret = handle_cloudtier_obj(s, this, driver, attrs, sync_cloudtiered, days, true, y);
if (op_ret < 0) {
ldpp_dout(this, 4) << "Cannot get cloud tiered object: " << *s->object
<<". Failing with " << op_ret << dendl;
*env.site,
cct->_conf->rgw_enable_gc_threads,
cct->_conf->rgw_enable_lc_threads,
+ cct->_conf->rgw_enable_restore_threads,
cct->_conf->rgw_enable_quota_threads,
cct->_conf->rgw_run_sync_thread,
cct->_conf.get_val<bool>("rgw_dynamic_resharding"),
}
}
} /* checksum_mode */
- auto attr_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
- if (attr_iter != attrs.end()) {
- rgw::sal::RGWRestoreType rt;
- bufferlist bl = attr_iter->second;
- auto iter = bl.cbegin();
- decode(rt, iter);
+
+  rgw::sal::RGWRestoreStatus restore_status = rgw::sal::RGWRestoreStatus::None;
+ auto r_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
+ if (r_iter != attrs.end()) {
+ bufferlist rbl = r_iter->second;
+ auto iter = rbl.cbegin();
+ decode(restore_status, iter);
- rgw::sal::RGWRestoreStatus restore_status;
- attr_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
- if (attr_iter != attrs.end()) {
- bufferlist bl = attr_iter->second;
- auto iter = bl.cbegin();
- decode(restore_status, iter);
- }
-
//restore status
if (restore_status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
- dump_header(s, "x-amz-restore", "ongoing-request=\"true\"");
- }
- if (rt == rgw::sal::RGWRestoreType::Temporary) {
- auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE);
- ceph::real_time expiration_date;
-
- if (expire_iter != attrs.end()) {
- bufferlist bl = expire_iter->second;
+ dump_header(s, "x-amz-restore", "ongoing-request=\"true\"");
+ } else {
+ auto attr_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
+ if (attr_iter != attrs.end()) {
+ rgw::sal::RGWRestoreType rt;
+ bufferlist bl = attr_iter->second;
auto iter = bl.cbegin();
- decode(expiration_date, iter);
- }
- //restore status
- dump_header_if_nonempty(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\"");
- // temporary restore; set storage-class to cloudtier storage class
- auto c_iter = attrs.find(RGW_ATTR_CLOUDTIER_STORAGE_CLASS);
+ decode(rt, iter);
- if (c_iter != attrs.end()) {
- attrs[RGW_ATTR_STORAGE_CLASS] = c_iter->second;
+ if (rt == rgw::sal::RGWRestoreType::Temporary) {
+ auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE);
+ ceph::real_time expiration_date;
+
+ if (expire_iter != attrs.end()) {
+ bufferlist bl = expire_iter->second;
+ auto iter = bl.cbegin();
+ decode(expiration_date, iter);
+ }
+
+ //restore status
+ dump_header_if_nonempty(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\"");
+
+ // temporary restore; set storage-class to cloudtier storage class
+ auto c_iter = attrs.find(RGW_ATTR_CLOUDTIER_STORAGE_CLASS);
+
+ if (c_iter != attrs.end()) {
+ attrs[RGW_ATTR_STORAGE_CLASS] = c_iter->second;
+ }
+ }
}
}
}
if (restore_ret == 0) {
s->err.http_ret = 202; // OK
- } else if (restore_ret == 1) {
- restore_ret = -ERR_RESTORE_ALREADY_IN_PROGRESS;
- set_req_state_err(s, restore_ret);
- dump_header(s, "x-amz-restore", "ongoing-request=\"true\"");
- } else if (restore_ret == 2) {
- rgw::sal::Attrs attrs;
- ceph::real_time expiration_date;
- rgw::sal::RGWRestoreType rt;
- attrs = s->object->get_attrs();
- auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE);
- auto type_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
-
- if (expire_iter != attrs.end()) {
- bufferlist bl = expire_iter->second;
- auto iter = bl.cbegin();
- decode(expiration_date, iter);
- }
-
- if (type_iter != attrs.end()) {
- bufferlist bl = type_iter->second;
- auto iter = bl.cbegin();
- decode(rt, iter);
- }
- if (rt == rgw::sal::RGWRestoreType::Temporary) {
- s->err.http_ret = 200; // OK
- dump_header(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\"");
- } else {
- s->err.http_ret = 200;
- dump_header(s, "x-amz-restore", "ongoing-request=\"false\"");
+ } else {
+ rgw::sal::RGWRestoreStatus st = static_cast<rgw::sal::RGWRestoreStatus>(restore_ret);
+
+ if (st == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
+ restore_ret = -ERR_RESTORE_ALREADY_IN_PROGRESS;
+ set_req_state_err(s, restore_ret);
+ dump_header(s, "x-amz-restore", "ongoing-request=\"true\"");
+ } else if (st == rgw::sal::RGWRestoreStatus::CloudRestored) {
+ rgw::sal::Attrs attrs;
+ ceph::real_time expiration_date;
+ rgw::sal::RGWRestoreType rt;
+ attrs = s->object->get_attrs();
+ auto expire_iter = attrs.find(RGW_ATTR_RESTORE_EXPIRY_DATE);
+ auto type_iter = attrs.find(RGW_ATTR_RESTORE_TYPE);
+
+ if (expire_iter != attrs.end()) {
+ bufferlist bl = expire_iter->second;
+ auto iter = bl.cbegin();
+ decode(expiration_date, iter);
+ }
+
+ if (type_iter != attrs.end()) {
+ bufferlist bl = type_iter->second;
+ auto iter = bl.cbegin();
+ decode(rt, iter);
+ }
+
+ if (rt == rgw::sal::RGWRestoreType::Temporary) {
+ s->err.http_ret = 200; // OK
+ dump_header(s, "x-amz-restore", "ongoing-request=\"false\", expiry-date=\""+ dump_time_to_str(expiration_date) +"\"");
+ } else {
+ s->err.http_ret = 200;
+ dump_header(s, "x-amz-restore", "ongoing-request=\"false\"");
+ }
}
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <fmt/chrono.h>
+#include <string.h>
+#include <iostream>
+#include <map>
+#include <algorithm>
+#include <tuple>
+#include <functional>
+
+#include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/variant.hpp>
+
+#include "include/scope_guard.h"
+#include "include/function2.hpp"
+#include "common/Formatter.h"
+#include "common/containers.h"
+#include "common/split.h"
+#include "common/errno.h"
+#include "include/random.h"
+#include "cls/lock/cls_lock_client.h"
+#include "rgw_perf_counters.h"
+#include "rgw_common.h"
+#include "rgw_bucket.h"
+#include "rgw_restore.h"
+#include "rgw_zone.h"
+#include "rgw_string.h"
+#include "rgw_multi.h"
+#include "rgw_sal.h"
+#include "rgw_lc_tier.h"
+#include "rgw_notify.h"
+#include "common/dout.h"
+
+#include "fmt/format.h"
+
+#include "services/svc_sys_obj.h"
+#include "services/svc_zone.h"
+#include "services/svc_tier_rados.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw_restore
+
+
+constexpr int32_t hours_in_a_day = 24;
+constexpr int32_t secs_in_a_day = hours_in_a_day * 60 * 60;
+
+using namespace std;
+using namespace rgw::sal;
+
+void RGWRestoreEntry::dump(Formatter *f) const
+{
+ encode_json("Bucket", bucket, f);
+ encode_json("Object", obj_key, f);
+ if (days) {
+ encode_json("Days", days, f);
+ } else {
+ encode_json("Days", 0, f);
+ }
+ encode_json("Zone_id", zone_id, f);
+ encode_json("Status", static_cast<int>(status), f);
+}
+
+void RGWRestoreEntry::decode_json(JSONObj *obj)
+{
+ JSONDecoder::decode_json("Bucket", bucket, obj);
+ JSONDecoder::decode_json("Object", obj_key, obj);
+ JSONDecoder::decode_json("Days", days, obj);
+ JSONDecoder::decode_json("Zone_id", zone_id, obj);
+ int st;
+ JSONDecoder::decode_json("Status", st, obj);
+ status = static_cast<rgw::sal::RGWRestoreStatus>(st);
+}
+
+void RGWRestoreEntry::generate_test_instances(std::list<RGWRestoreEntry*>& l)
+{
+ auto p = new RGWRestoreEntry;
+ rgw_bucket bk("tenant1", "bucket1");
+ rgw_obj_key obj("object1");
+ rgw_obj_key obj2("object2");
+ uint64_t days1 = 3;
+ std::optional<uint64_t> days;
+ std::string zone_id = "zone1";
+ rgw::sal::RGWRestoreStatus status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+
+ p->bucket = bk;
+ p->obj_key = obj;
+ p->zone_id = zone_id;
+ p->days = days;
+ p->status = status;
+ l.push_back(p);
+
+ p = new RGWRestoreEntry;
+ days = days1;
+ status = rgw::sal::RGWRestoreStatus::CloudRestored;
+ p->bucket = bk;
+ p->obj_key = obj2;
+ p->zone_id = zone_id;
+ p->days = days;
+ p->status = status;
+ l.push_back(p);
+
+ l.push_back(new RGWRestoreEntry);
+}
+
+void RGWRestore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) {
+ cct = _cct;
+ driver = _driver;
+
+ /* max_objs indicates the number of shards or objects
+ * used to store Restore Entries */
+ max_objs = cct->_conf->rgw_restore_max_objs;
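+  /* cap the shard count at HASH_PRIME, the same upper bound applied to the
+   * LC shards */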
+ if (max_objs > HASH_PRIME)
+ max_objs = HASH_PRIME;
+
+ for (int i = 0; i < max_objs; i++) {
+ obj_names.push_back(fmt::format("{}.{}", restore_oid_prefix, i));
+ }
+ sal_restore = driver->get_restore(max_objs, obj_names);
+}
+
+void RGWRestore::finalize()
+{
+ obj_names.clear();
+}
+
+static inline std::ostream& operator<<(std::ostream &os, RGWRestoreEntry& ent) {
+ os << "<ent: bucket=";
+ os << ent.bucket;
+ os << "; obj_key=";
+ os << ent.obj_key;
+ os << "; days=";
+ os << ent.days;
+ os << "; zone_id=";
+ os << ent.zone_id;
+ os << "; status=";
+ os << rgw_restore_status_dump(ent.status);
+ os << ">";
+
+ return os;
+}
+
+void RGWRestore::RestoreWorker::stop()
+{
+ std::lock_guard l{lock};
+ cond.notify_all();
+}
+
+bool RGWRestore::going_down()
+{
+ return down_flag;
+}
+
+void RGWRestore::start_processor()
+{
+ worker = std::make_unique<RGWRestore::RestoreWorker>(this, cct, this);
+ worker->create("rgw_restore");
+}
+
+void RGWRestore::stop_processor()
+{
+ down_flag = true;
+ if (worker) {
+ worker->stop();
+ worker->join();
+ }
+ worker.reset(nullptr);
+}
+
+unsigned RGWRestore::get_subsys() const
+{
+ return dout_subsys;
+}
+
+std::ostream& RGWRestore::gen_prefix(std::ostream& out) const
+{
+ return out << "restore: ";
+}
+
+/* Hash based on both <bucket, obj> */
+int RGWRestore::choose_oid(const RGWRestoreEntry& e) {
+ int index;
+ const auto& name = e.bucket.name + e.obj_key.name + e.obj_key.instance;
+ index = ((ceph_str_hash_linux(name.data(), name.size())) % max_objs);
+ return static_cast<int>(index);
+}
+
+void *RGWRestore::RestoreWorker::entry() {
+ do {
+ utime_t start = ceph_clock_now();
+ int r = 0;
+ r = restore->process(this, null_yield);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: restore process() returned error r=" << r << dendl;
+ }
+ if (restore->going_down())
+ break;
+ utime_t end = ceph_clock_now();
+ end -= start;
+ int secs = cct->_conf->rgw_restore_processor_period;
+
+ if (secs <= end.sec())
+ continue; // next round
+
+ secs -= end.sec();
+ std::unique_lock locker{lock};
+ cond.wait_for(locker, std::chrono::seconds(secs));
+ } while (!restore->going_down());
+
+ return NULL;
+
+}
+
+int RGWRestore::process(RestoreWorker* worker, optional_yield y)
+{
+ int max_secs = cct->_conf->rgw_restore_lock_max_time;
+
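+  /* start at a random shard so that concurrent restore workers (one per RGW
+   * running the restore thread) don't all contend for the same shard first */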
+ const int start = ceph::util::generate_random_number(0, max_objs - 1);
+ for (int i = 0; i < max_objs; i++) {
+ int index = (i + start) % max_objs;
+ int ret = process(index, max_secs, y);
+ if (ret < 0)
+ return ret;
+ }
+ return 0;
+}
+
+/*
+ * Given an index, fetch a list of restore entries to process. After each
+ * iteration, trim the list to the last marker read.
+ *
+ * While processing the entries, if any of their restore operation is still in
+ * progress, such entries are added back to the list.
+ */
+int RGWRestore::process(int index, int max_secs, optional_yield y)
+{
+ ldpp_dout(this, 20) << "RGWRestore::process entered with Restore index_shard="
+ << index << ", max_secs=" << max_secs << dendl;
+
+ /* list used to gather still IN_PROGRESS */
+ std::list<RGWRestoreEntry> r_entries;
+
+ std::unique_ptr<rgw::sal::RestoreSerializer> serializer =
+ sal_restore->get_serializer(std::string(restore_index_lock_name),
+ std::string(obj_names[index]),
+ worker->thr_name());
+ utime_t end = ceph_clock_now();
+
+ /* max_secs should be greater than zero. We don't want a zero max_secs
+ * to be translated as no timeout, since we'd then need to break the
+ * lock and that would require a manual intervention. In this case
+ * we can just wait it out. */
+
+ if (max_secs <= 0)
+ return -EAGAIN;
+
+ end += max_secs;
+ utime_t time(max_secs, 0);
+ int ret = serializer->try_lock(this, time, null_yield);
+ if (ret == -EBUSY || ret == -EEXIST) {
+    /* already locked by another restore processor */
+ ldpp_dout(this, 0) << "RGWRestore::process() failed to acquire lock on "
+ << obj_names[index] << dendl;
+ return -EBUSY;
+ }
+ if (ret < 0)
+ return 0;
+
+ std::unique_lock<rgw::sal::RestoreSerializer> lock(*(serializer.get()), std::adopt_lock);
+ std::string marker;
+ std::string next_marker;
+ bool truncated = false;
+
+ do {
+ int max = 100;
+ std::vector<RGWRestoreEntry> entries;
+
+ ret = sal_restore->list(this, y, index, marker, &next_marker, max, entries, &truncated);
+ ldpp_dout(this, 20) <<
+      "RGWRestore::process() sal_restore->list() returned ret=" << ret <<
+ ", entries.size=" << entries.size() << ", truncated=" << truncated <<
+ ", next_marker='" << next_marker << "'" << dendl;
+
+ if (entries.size() == 0) {
+ lock.unlock();
+ return 0;
+ }
+
+ if (ret < 0)
+ goto done;
+
+ marker = next_marker;
+ std::vector<RGWRestoreEntry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ RGWRestoreEntry entry = *iter;
+
+ ret = process_restore_entry(entry, y);
+
+ if (entry.status == rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
+ r_entries.push_back(entry);
+ }
+
+ ///process all entries, trim and re-add
+ utime_t now = ceph_clock_now();
+ if (now >= end) {
+ goto done;
+ }
+
+ if (going_down()) {
+ // leave early, even if tag isn't removed, it's ok since it
+ // will be picked up next time around
+ goto done;
+ }
+ }
+ ret = sal_restore->trim_entries(this, y, index, marker);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWRestore::process() failed to trim entries on "
+ << obj_names[index] << dendl;
+ }
+
+ if (!r_entries.empty()) {
+ ret = sal_restore->add_entries(this, y, index, r_entries);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "RGWRestore::process() failed to add entries on "
+ << obj_names[index] << dendl;
+ }
+ }
+
+ r_entries.clear();
+ } while (truncated);
+
+done:
+ lock.unlock();
+
+ return 0;
+}
+
+int RGWRestore::process_restore_entry(RGWRestoreEntry& entry, optional_yield y)
+{
+ int ret = 0;
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ std::unique_ptr<rgw::sal::Object> obj;
+ std::unique_ptr<rgw::sal::PlacementTier> tier;
+ std::optional<uint64_t> days = entry.days;
+ // Ensure its the same source zone processing temp entries as we do not
+ // replicate temp restored copies
+ if (days) { // temp copy
+ auto& zone_id = entry.zone_id;
+ if (driver->get_zone()->get_id() != zone_id) {
+ // XXX: Do we need to check if the zone is still valid
+ return 0; // skip processing this entry in this zone
+ }
+ }
+
+ // fill in the details from entry
+ // bucket, obj, days, state=in_progress
+ ret = driver->load_bucket(this, entry.bucket, &bucket, null_yield);
+ if (ret < 0) {
+    ldpp_dout(this, 0) << "Restore: load_bucket for " << entry.bucket
+ << " failed" << dendl;
+ return ret;
+ }
+ obj = bucket->get_object(entry.obj_key);
+
+ ret = obj->load_obj_state(this, null_yield, true);
+
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "Restore:get_object for " << entry.obj_key
+ << " failed" << dendl;
+ return ret;
+ }
+
+ rgw_placement_rule target_placement;
+ target_placement.inherit_from(bucket->get_placement_rule());
+
+ auto& attrs = obj->get_attrs();
+ auto attr_iter = attrs.find(RGW_ATTR_RESTORE_STATUS);
+ rgw::sal::RGWRestoreStatus restore_status = rgw::sal::RGWRestoreStatus::None;
+ if (attr_iter != attrs.end()) {
+ bufferlist bl = attr_iter->second;
+ auto iter = bl.cbegin();
+ decode(restore_status, iter);
+ }
+ if (restore_status == rgw::sal::RGWRestoreStatus::CloudRestored) {
+ // XXX: Check if expiry-date needs to be update
+ ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " already done" << dendl;
+ entry.status = rgw::sal::RGWRestoreStatus::CloudRestored;
+ return 0;
+ }
+
+ attr_iter = attrs.find(RGW_ATTR_STORAGE_CLASS);
+ if (attr_iter != attrs.end()) {
+ target_placement.storage_class = attr_iter->second.to_str();
+ }
+ ret = driver->get_zone()->get_zonegroup().get_placement_tier(target_placement, &tier);
+
+ if (ret < 0) {
+ ldpp_dout(this, -1) << "failed to fetch tier placement handle, ret = " << ret << dendl;
+ return ret;
+ } else {
+ ldpp_dout(this, 20) << "getting tier placement handle cloud tier for " <<
+ " storage class " << target_placement.storage_class << dendl;
+ }
+
+ if (!tier->is_tier_type_s3()) {
+ ldpp_dout(this, -1) << "ERROR: not s3 tier type - " << tier->get_tier_type() <<
+ " for storage class " << target_placement.storage_class << dendl;
+ return -EINVAL;
+ }
+
+ // now go ahead with restoring object
+ // XXX: first check if it's already restored?
+ bool in_progress = true;
+ ret = obj->restore_obj_from_cloud(bucket.get(), tier.get(), cct, days, in_progress,
+ this, y);
+ if (ret < 0) {
+ ldpp_dout(this, -1) << "Restore of object(" << obj->get_key() << ") failed" << ret << dendl;
+ auto reset_ret = set_cloud_restore_status(this, obj.get(), y, rgw::sal::RGWRestoreStatus::RestoreFailed);
+ entry.status = rgw::sal::RGWRestoreStatus::RestoreFailed;
+ if (reset_ret < 0) {
+ ldpp_dout(this, -1) << "Setting restore status ad RestoreFailed failed for object(" << obj->get_key() << ") " << reset_ret << dendl;
+ }
+ return ret;
+ }
+
+ if (in_progress) {
+ ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " still in progress" << dendl;
+ entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+ } else {
+ ldpp_dout(this, 20) << "Restore of object " << obj->get_key() << " succeeded" << dendl;
+ entry.status = rgw::sal::RGWRestoreStatus::CloudRestored;
+ }
+ return ret;
+}
+
+time_t RGWRestore::thread_stop_at()
+{
+ uint64_t interval = (cct->_conf->rgw_restore_debug_interval > 0)
+ ? cct->_conf->rgw_restore_debug_interval : secs_in_a_day;
+
+ return time(nullptr) + interval;
+}
+
+int RGWRestore::set_cloud_restore_status(const DoutPrefixProvider* dpp,
+ rgw::sal::Object* pobj, optional_yield y,
+ const rgw::sal::RGWRestoreStatus& restore_status)
+{
+ int ret = -1;
+
+ if (!pobj)
+ return ret;
+
+ pobj->set_atomic();
+
+ bufferlist bl;
+ using ceph::encode;
+ encode(restore_status, bl);
+
+ ret = pobj->modify_obj_attrs(RGW_ATTR_RESTORE_STATUS, bl, y, dpp, false);
+
+ return ret;
+}
+
+int RGWRestore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket,
+ rgw::sal::Object* pobj,
+ rgw::sal::PlacementTier* tier,
+ std::optional<uint64_t> days, optional_yield y)
+{
+ int ret = 0;
+
+ if (!pbucket || !pobj) {
+ ldpp_dout(this, -1) << "ERROR: Invalid bucket/object. Restore failed" << dendl;
+ return -EINVAL;
+ }
+
+ // set restore_status as RESTORE_ALREADY_IN_PROGRESS
+ ret = set_cloud_restore_status(this, pobj, y, rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress);
+ if (ret < 0) {
+ ldpp_dout(this, 0) << " Setting cloud restore status to RESTORE_ALREADY_IN_PROGRESS for the object(" << pobj->get_key() << " failed, ret=" << ret << dendl;
+ return ret;
+ }
+
+ // now go ahead with restoring object
+ bool in_progress = false;
+ ret = pobj->restore_obj_from_cloud(pbucket, tier, cct, days, in_progress, this, y);
+
+ if (ret < 0) {
+ ldpp_dout(this, 0) << "object " << pobj->get_key() << " fetching failed" << ret << dendl;
+ auto reset_ret = set_cloud_restore_status(this, pobj, y, rgw::sal::RGWRestoreStatus::RestoreFailed);
+
+ if (reset_ret < 0) {
+ ldpp_dout(this, -1) << "Setting restore status ad RestoreFailed failed for object(" << pobj->get_key() << ") " << reset_ret << dendl;
+ }
+
+ return ret;
+ }
+
+ ldpp_dout(this, 20) << "Restore of object " << pobj->get_key() << " succeeded" << dendl;
+ if (in_progress) {
+ // add restore entry to the list
+ RGWRestoreEntry entry;
+ entry.bucket = pbucket->get_key();
+ entry.obj_key = pobj->get_key();
+ entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+ entry.days = days;
+ entry.zone_id = driver->get_zone()->get_id();
+
+ int index = choose_oid(entry);
+ ret = sal_restore->add_entry(this, y, index, entry);
+
+ if (ret < 0) {
+ ldpp_dout(this, -1) << "Adding restore entry of object(" << pobj->get_key() << ") failed" << ret << dendl;
+ }
+ } else {
+ ldpp_dout(this, 20) << "Restore of object " << pobj->get_key() << " succeeded" << dendl;
+ }
+
+ return ret;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <map>
+#include <array>
+#include <string>
+#include <iostream>
+
+#include "common/debug.h"
+
+#include "include/types.h"
+#include "include/rados/librados.hpp"
+#include "common/ceph_mutex.h"
+#include "common/Cond.h"
+#include "common/iso_8601.h"
+#include "common/Thread.h"
+#include "rgw_common.h"
+#include "cls/rgw/cls_rgw_types.h"
+#include "rgw_sal.h"
+
+#include <atomic>
+#include <tuple>
+
+#define HASH_PRIME 7877
+#define MAX_ID_LEN 255
+static constexpr std::string_view restore_oid_prefix = "restore";
+static constexpr std::string_view restore_index_lock_name = "restore_process";
+
+/** Single Restore entry state */
+struct RGWRestoreEntry {
+ rgw_bucket bucket;
+ rgw_obj_key obj_key;
+ std::optional<uint64_t> days;
+ std::string zone_id; // or should it be zone name?
+ rgw::sal::RGWRestoreStatus status;
+
+ RGWRestoreEntry() {}
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(bucket, bl);
+ encode(obj_key, bl);
+ encode(days, bl);
+ encode(zone_id, bl);
+ encode(status, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(bucket, bl);
+ decode(obj_key, bl);
+ decode(days, bl);
+ decode(zone_id, bl);
+ decode(status, bl);
+ DECODE_FINISH(bl);
+ }
+ void dump(ceph::Formatter* f) const;
+ void decode_json(JSONObj* obj);
+ static void generate_test_instances(std::list<RGWRestoreEntry*>& l);
+};
+WRITE_CLASS_ENCODER(RGWRestoreEntry)
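+
+// A minimal round-trip sketch (illustrative only): the versioned encoder declared
+// above is what a SAL backend would use when persisting an entry into a restore
+// shard and reading it back; the bucket and object names below are hypothetical.
+//
+//   RGWRestoreEntry e;
+//   e.bucket.name = "mybucket";
+//   e.obj_key = rgw_obj_key("archived/object.dat");
+//   e.days = 7;
+//   e.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
+//
+//   bufferlist bl;
+//   encode(e, bl);            // via WRITE_CLASS_ENCODER above
+//
+//   RGWRestoreEntry out;
+//   auto p = bl.cbegin();
+//   decode(out, p);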
+
+class RGWRestore : public DoutPrefixProvider {
+ CephContext *cct;
+ rgw::sal::Driver* driver;
+ std::unique_ptr<rgw::sal::Restore> sal_restore;
+ int max_objs{0};
+ std::vector<std::string_view> obj_names;
+ std::atomic<bool> down_flag = { false };
+
+ class RestoreWorker : public Thread
+ {
+ const DoutPrefixProvider *dpp;
+ CephContext *cct;
+ RGWRestore *restore;
+ ceph::mutex lock = ceph::make_mutex("RestoreWorker");
+ ceph::condition_variable cond;
+
+ public:
+
+ using lock_guard = std::lock_guard<ceph::mutex>;
+ using unique_lock = std::unique_lock<ceph::mutex>;
+
+ RestoreWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWRestore *_restore) : dpp(_dpp), cct(_cct), restore(_restore) {}
+ RGWRestore* get_restore() { return restore; }
+ std::string thr_name() {
+ return std::string{"restore_thrd: "}; // + std::to_string(ix);
+ }
+ void *entry() override;
+ void stop();
+
+ friend class RGWRados;
+ }; // RestoreWorker
+
+ std::unique_ptr<RGWRestore::RestoreWorker> worker;
+
+public:
+ ~RGWRestore() {
+ stop_processor();
+ finalize();
+ }
+
+ friend class RGWRados;
+
+ RGWRestore() : cct(nullptr), driver(nullptr), max_objs(0) {}
+
+ void initialize(CephContext *_cct, rgw::sal::Driver* _driver);
+ void finalize();
+
+ bool going_down();
+ void start_processor();
+ void stop_processor();
+
+ CephContext *get_cct() const override { return cct; }
+ rgw::sal::Restore* get_restore() const { return sal_restore.get(); }
+ unsigned get_subsys() const;
+
+ std::ostream& gen_prefix(std::ostream& out) const;
+
+ int process(RestoreWorker* worker, optional_yield y);
+ int choose_oid(const RGWRestoreEntry& e);
+ int process(int index, int max_secs, optional_yield y);
+ int process_restore_entry(RGWRestoreEntry& entry, optional_yield y);
+ time_t thread_stop_at();
+
+ /** Set the restore status for the given object */
+ int set_cloud_restore_status(const DoutPrefixProvider* dpp, rgw::sal::Object* pobj,
+ optional_yield y,
+ const rgw::sal::RGWRestoreStatus& restore_status);
+
+ /** Given <bucket, obj>, restore the object from the cloud-tier. In case the
+ * object cannot be restored immediately, save that restore state (entry)
+ * to be processed later by the RestoreWorker thread. */
+ int restore_obj_from_cloud(rgw::sal::Bucket* pbucket, rgw::sal::Object* pobj,
+ rgw::sal::PlacementTier* tier,
+ std::optional<uint64_t> days, optional_yield y);
+};
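+
+// A rough usage sketch (illustrative only; the exact wiring into the RADOS driver is
+// not shown in this header). The restore machinery is expected to be driven from two
+// places: a maintenance thread started via start_processor(), and the request path
+// (e.g. S3 RestoreObject) handing off an asynchronous cloud restore:
+//
+//   RGWRestore restore;
+//   restore.initialize(cct, driver);  // cct/driver obtained from the running process
+//   restore.start_processor();        // spawns the RestoreWorker thread
+//
+//   // request path: either completes synchronously or, when the cloud tier reports
+//   // the retrieval as in progress, queues an RGWRestoreEntry which the worker
+//   // re-drives on its next cycle
+//   int r = restore.restore_obj_from_cloud(bucket, obj, tier, days, y);
+//
+//   restore.stop_processor();         // on shutdown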
const rgw::SiteConfig& site_config,
bool use_gc_thread,
bool use_lc_thread,
+ bool use_restore_thread,
bool quota_threads,
bool run_sync_thread,
bool run_reshard_thread,
.set_use_gc(use_gc)
.set_run_gc_thread(use_gc_thread)
.set_run_lc_thread(use_lc_thread)
+ .set_run_restore_thread(use_restore_thread)
.set_run_quota_threads(quota_threads)
.set_run_sync_thread(run_sync_thread)
.set_run_reshard_thread(run_reshard_thread)
.set_use_datacache(true)
.set_run_gc_thread(use_gc_thread)
.set_run_lc_thread(use_lc_thread)
+ .set_run_restore_thread(use_restore_thread)
.set_run_quota_threads(quota_threads)
.set_run_sync_thread(run_sync_thread)
.set_run_reshard_thread(run_reshard_thread)
struct RGWBucketEnt;
class RGWRESTMgr;
class RGWLC;
+class RGWRestore;
struct rgw_user_bucket;
class RGWUsageBatch;
class RGWCoroutinesManagerRegistry;
struct rgw_pubsub_topic;
struct RGWOIDCProviderInfo;
struct RGWRoleInfo;
+struct RGWRestoreEntry;
using RGWBucketListNameFilter = std::function<bool (const std::string&)>;
// delete object where head object is missing)
static constexpr uint32_t FLAG_FORCE_OP = 0x0004;
-enum RGWRestoreStatus : uint8_t {
+enum class RGWRestoreStatus : uint8_t {
None = 0,
RestoreAlreadyInProgress = 1,
CloudRestored = 2,
virtual int cluster_stat(RGWClusterStat& stats) = 0;
/** Get a @a Lifecycle object. Used to manage/run lifecycle transitions */
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
+ /** Get a @a Restore object. Used to manage/run object restore processing */
+ virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names) = 0;
/** Reset the temporarily restored objects which are expired */
virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) = 0;
const DoutPrefixProvider* dpp) = 0;
/** Get access to the lifecycle management thread */
virtual RGWLC* get_rgwlc(void) = 0;
+ /** Get access to the tier restore management thread */
+ virtual RGWRestore* get_rgwrestore(void) = 0;
/** Get access to the coroutine registry. Used to create new coroutine managers */
virtual RGWCoroutinesManagerRegistry* get_cr_registry() = 0;
optional_yield y) = 0;
virtual int restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags) = 0;
+ optional_yield y) = 0;
/** Check to see if two placement rules match */
virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) = 0;
/** Dump driver-specific object layout info in JSON */
const std::string& cookie) = 0;
};
+/** @brief Abstraction of a serializer for Restore
+ */
+class RestoreSerializer : public Serializer {
+public:
+ RestoreSerializer() {}
+ virtual ~RestoreSerializer() = default;
+};
+
+/**
+ * @brief Abstraction for restore processing
+ *
+ * The Restore class is designed to manage the restoration of objects
+ * from cloud tier storage back into the Ceph cluster. It is used in particular
+ * for objects stored in cold storage solutions such as AWS Glacier or tape-based systems,
+ * where retrieval operations are asynchronous and can take a significant amount of time.
+ */
+class Restore {
+
+public:
+ Restore() = default;
+ virtual ~Restore() = default;
+ /** Add a single restore entry state */
+ virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ int index, const RGWRestoreEntry& r_entry) = 0;
+ /** Add list of restore entries */
+ virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+ int index, const std::list<RGWRestoreEntry>& restore_entries) = 0;
+ /** List all known entries given a marker */
+ virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
+ int index,
+ const std::string& marker, std::string* out_marker,
+ uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ bool* truncated) = 0;
+
+ /** Trim restore entries up to the given marker */
+ virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string_view& marker) = 0;
+ /** Check if the list of restore entries is empty */
+ virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0;
+
+ /** Get a serializer for restore processing */
+ virtual std::unique_ptr<RestoreSerializer> get_serializer(
+ const std::string& lock_name,
+ const std::string& oid,
+ const std::string& cookie) = 0;
+};
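+
+/* A minimal consumption sketch (an assumption about typical use, mirroring the
+ * RGWRestore::process() loop): a maintenance thread takes the shard lock through
+ * get_serializer()/try_lock(), pages through entries with list(), trims what it has
+ * consumed and re-queues entries that are still in progress; lock_duration, cookie
+ * and still_in_progress are hypothetical local variables.
+ *
+ *   auto serializer = restore->get_serializer(std::string(restore_index_lock_name),
+ *                                             std::string(obj_names[index]), cookie);
+ *   if (serializer->try_lock(dpp, lock_duration, y) < 0)
+ *     return 0; // another RGW owns this shard right now
+ *
+ *   std::vector<RGWRestoreEntry> entries;
+ *   bool truncated = false;
+ *   restore->list(dpp, y, index, marker, &marker, max_entries, entries, &truncated);
+ *   // ... process each entry ...
+ *   restore->trim_entries(dpp, y, index, marker);
+ *   restore->add_entries(dpp, y, index, still_in_progress); // std::list<RGWRestoreEntry>
+ *   serializer->unlock();
+ */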
+
/**
* @brief Abstraction for a Notification event
*
const rgw::SiteConfig& site_config,
bool use_gc_thread,
bool use_lc_thread,
+ bool use_restore_thread,
bool quota_threads,
bool run_sync_thread,
bool run_reshard_thread,
site_config,
use_gc_thread,
use_lc_thread,
+ use_restore_thread,
quota_threads,
run_sync_thread,
run_reshard_thread,
const rgw::SiteConfig& site_config,
bool use_gc_thread,
bool use_lc_thread,
+ bool use_restore_thread,
bool quota_threads,
bool run_sync_thread,
bool run_reshard_thread,
return std::make_unique<LCDBSerializer>(store, oid, lock_name, cookie);
}
+ std::unique_ptr<Restore> DBStore::get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names)
+ {
+ return nullptr;
+ }
+
std::unique_ptr<Notification> DBStore::get_notification(
rgw::sal::Object* obj, rgw::sal::Object* src_obj, req_state* s,
rgw::notify::EventType event_type, optional_yield y,
virtual int list_all_zones(const DoutPrefixProvider* dpp, std::list<std::string>& zone_ids) override;
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+ virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names) override;
virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual std::unique_ptr<Notification> get_notification(
const std::string& topic_queue) override;
virtual RGWLC* get_rgwlc(void) override;
+ virtual RGWRestore* get_rgwrestore(void) override { return NULL; }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
return next->process_expired_objects(dpp, y);
}
+std::unique_ptr<Restore> FilterDriver::get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names)
+{
+ std::unique_ptr<Restore> restore = next->get_restore(n_objs, obj_names);
+ return std::make_unique<FilterRestore>(std::move(restore));
+}
+
std::unique_ptr<Notification> FilterDriver::get_notification(rgw::sal::Object* obj,
rgw::sal::Object* src_obj, req_state* s,
rgw::notify::EventType event_type, optional_yield y,
return next->get_rgwlc();
}
+RGWRestore* FilterDriver::get_rgwrestore()
+{
+ return next->get_rgwrestore();
+}
+
RGWCoroutinesManagerRegistry* FilterDriver::get_cr_registry()
{
return next->get_cr_registry();
int FilterObject::restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
- const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags)
+ bool& in_progress,
+ const DoutPrefixProvider* dpp,
+ optional_yield y)
{
return next->restore_obj_from_cloud(nextBucket(bucket), nextPlacementTier(tier),
- placement_rule, o, cct, tier_config, olh_epoch, days, dpp, y, flags);
+ cct, days, in_progress, dpp, y);
}
bool FilterObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2)
return std::make_unique<FilterLCSerializer>(std::move(ns));
}
+int FilterRestoreSerializer::try_lock(const DoutPrefixProvider *dpp, utime_t dur,
+ optional_yield y)
+{
+ return next->try_lock(dpp, dur, y);
+}
+
+std::unique_ptr<RestoreSerializer> FilterRestore::get_serializer(const std::string& lock_name,
+ const std::string& oid,
+ const std::string& cookie) {
+ std::unique_ptr<RestoreSerializer> ns;
+ ns = next->get_serializer(lock_name, oid, cookie);
+ return std::make_unique<FilterRestoreSerializer>(std::move(ns));
+}
+
+int FilterRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ int index, const RGWRestoreEntry& r_entry) {
+ return next->add_entry(dpp, y, index, r_entry);
+}
+
+int FilterRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+ int index,
+ const std::list<RGWRestoreEntry>& restore_entries) {
+ return next->add_entries(dpp, y, index, restore_entries);
+}
+
+/** List all known entries */
+int FilterRestore::list(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string& marker, std::string* out_marker,
+ uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ bool* truncated) {
+ return next->list(dpp, y, index, marker, out_marker, max_entries,
+ entries, truncated);
+}
+
+int FilterRestore::trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string_view& marker) {
+ return next->trim_entries(dpp, y, index, marker);
+}
+
+int FilterRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) {
+ return next->is_empty(dpp, y);
+}
+
int FilterNotification::publish_reserve(const DoutPrefixProvider *dpp,
RGWObjTags* obj_tags)
{
}
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
+ virtual std::unique_ptr<Restore> get_restore(const int n_objs,
+ const std::vector<std::string_view>& obj_names) override;
virtual bool process_expired_objects(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj,
return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
}
virtual RGWLC* get_rgwlc(void) override;
+ virtual RGWRestore* get_rgwrestore(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket,
optional_yield y) override;
virtual int restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags) override;
+ optional_yield y) override;
virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) override;
virtual int dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y,
Formatter* f) override;
const std::string& cookie) override;
};
+class FilterRestoreSerializer : public RestoreSerializer {
+
+protected:
+ std::unique_ptr<RestoreSerializer> next;
+
+public:
+ FilterRestoreSerializer(std::unique_ptr<RestoreSerializer> _next) : next(std::move(_next)) {}
+ virtual ~FilterRestoreSerializer() = default;
+ virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override;
+ virtual int unlock() override { return next->unlock(); }
+ virtual void print(std::ostream& out) const override { return next->print(out); }
+};
+
+class FilterRestore : public Restore {
+
+protected:
+ std::unique_ptr<Restore> next;
+
+public:
+ FilterRestore(std::unique_ptr<Restore> _next) : next(std::move(_next)) {}
+ ~FilterRestore() override = default;
+
+ virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
+ int index, const RGWRestoreEntry& r_entry) override;
+ virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
+ int index,
+ const std::list<RGWRestoreEntry>& restore_entries) override;
+
+ /** List all known entries */
+ virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
+ int index,
+ const std::string& marker, std::string* out_marker,
+ uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ bool* truncated) override;
+ virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
+ int index, const std::string_view& marker) override;
+ virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override;
+
+ /** Get a serializer for restore processing */
+ virtual std::unique_ptr<RestoreSerializer> get_serializer(
+ const std::string& lock_name,
+ const std::string& oid,
+ const std::string& cookie) override;
+};
+
class FilterNotification : public Notification {
protected:
std::unique_ptr<Notification> next;
class Object;
class MultipartUpload;
class Lifecycle;
+ class Restore;
class Notification;
class Writer;
class PlacementTier;
}
virtual int restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
- rgw_placement_rule& placement_rule,
- rgw_bucket_dir_entry& o,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
+ bool& in_progress,
const DoutPrefixProvider* dpp,
- optional_yield y,
- uint32_t flags) override {
+ optional_yield y) override {
return -1;
}
jspan_context& get_trace() override { return trace_ctx; }
virtual void print(std::ostream& out) const override { out << oid; }
};
+class StoreRestoreSerializer : public RestoreSerializer {
+
+protected:
+ std::string oid;
+
+public:
+ StoreRestoreSerializer() {}
+ StoreRestoreSerializer(std::string _oid) : oid(_oid) {}
+
+ virtual ~StoreRestoreSerializer() = default;
+ virtual void print(std::ostream& out) const override { out << oid; }
+};
+
+
class StoreNotification : public Notification {
protected:
Object* obj;
JSONDecoder::decode_json("placement_pools", placement_pools, obj);
JSONDecoder::decode_json("tier_config", tier_config, obj);
JSONDecoder::decode_json("realm_id", realm_id, obj);
+ JSONDecoder::decode_json("restore_pool", restore_pool, obj);
}
void RGWZoneParams::dump(Formatter *f) const
encode_json("placement_pools", placement_pools, f);
encode_json("tier_config", tier_config, f);
encode_json("realm_id", realm_id, f);
+ encode_json("restore_pool", restore_pool, f);
}
int RGWZoneParams::init(const DoutPrefixProvider *dpp,
info.dedup_pool = fix_zone_pool_dup(pools, info.name, ".rgw.dedup", info.dedup_pool);
info.gc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:gc", info.gc_pool);
info.lc_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:lc", info.lc_pool);
+ info.restore_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:restore", info.restore_pool);
info.log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log", info.log_pool);
info.intent_log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:intent", info.intent_log_pool);
info.usage_log_pool = fix_zone_pool_dup(pools, info.name, ".rgw.log:usage", info.usage_log_pool);
false,
false,
false,
+ false,
false,
true, true, null_yield,
false));