From: Soumya Koduri Date: Fri, 6 Jun 2025 20:12:17 +0000 (+0530) Subject: rgw/restore: Update to neorados FIFO routines X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=faf06bca959d8e8f2d40f610ae2ed409a69271f6;p=ceph.git rgw/restore: Update to neorados FIFO routines Use new neorados/FIFO routines to store restore state. Note: Old librados ioctx is also still retained as it is needed by RadosRestoreSerializer. Signed-off-by: Soumya Koduri --- diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 347a8b7350bdc..35853b368d708 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -22,6 +22,7 @@ #include "common/Throttle.h" #include "common/BackTrace.h" #include "common/ceph_time.h" +#include "common/async/blocked_completion.h" #include "rgw_asio_thread.h" #include "rgw_cksum.h" @@ -1150,7 +1151,6 @@ void RGWRados::finalize() if (use_restore_thread) { restore->stop_processor(); } - delete restore; restore = NULL; } @@ -1255,6 +1255,10 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw if (ret < 0) return ret; + ret = open_restore_pool_neo_ctx(dpp); + if (ret < 0) + return ret; + ret = open_objexp_pool_ctx(dpp); if (ret < 0) return ret; @@ -1378,7 +1382,7 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y, rgw if (use_lc_thread) lc->start_processor(); - restore = new RGWRestore(); + restore = make_unique(); ret = restore->initialize(cct, this->driver); if (ret < 0) { @@ -1515,6 +1519,24 @@ int RGWRados::open_restore_pool_ctx(const DoutPrefixProvider *dpp) return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().restore_pool, restore_pool_ctx, true, true); } +int RGWRados::open_restore_pool_neo_ctx(const DoutPrefixProvider *dpp) +{ + maybe_warn_about_blocking(dpp); + try { + restore_pool_neo_ctx = rgw::init_iocontext(dpp, driver->get_neorados(), + svc.zone->get_zone_params().restore_pool, + rgw::create, 
ceph::async::use_blocked); + + } catch (const boost::system::system_error& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ": Failed to initialized ioctx: " + << e.what() + << ", for restore pool" << dendl; + return ceph::from_error_code(e.code()); + } + return 0; +} + + int RGWRados::open_objexp_pool_ctx(const DoutPrefixProvider *dpp) { return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().log_pool, objexp_pool_ctx, true, true); diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index a3103f2960cfa..6a1731d4411f6 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -41,6 +41,7 @@ #include "rgw_sal_fwd.h" #include "rgw_pubsub.h" #include "rgw_tools.h" +#include "rgw_restore.h" struct D3nDataCache; struct RGWLCCloudTierCtx; @@ -359,6 +360,7 @@ class RGWRados int open_gc_pool_ctx(const DoutPrefixProvider *dpp); int open_lc_pool_ctx(const DoutPrefixProvider *dpp); int open_restore_pool_ctx(const DoutPrefixProvider *dpp); + int open_restore_pool_neo_ctx(const DoutPrefixProvider *dpp); int open_objexp_pool_ctx(const DoutPrefixProvider *dpp); int open_reshard_pool_ctx(const DoutPrefixProvider *dpp); int open_notif_pool_ctx(const DoutPrefixProvider *dpp); @@ -373,7 +375,7 @@ class RGWRados rgw::sal::RadosStore* driver{nullptr}; RGWGC* gc{nullptr}; RGWLC* lc{nullptr}; - RGWRestore* restore{nullptr}; + std::unique_ptr restore{nullptr}; RGWObjectExpirer* obj_expirer{nullptr}; bool use_gc_thread{false}; bool use_lc_thread{false}; @@ -453,6 +455,7 @@ protected: librados::IoCtx gc_pool_ctx; // .rgw.gc librados::IoCtx lc_pool_ctx; // .rgw.lc librados::IoCtx restore_pool_ctx; // .rgw.restore + neorados::IOContext restore_pool_neo_ctx; // .rgw.restore librados::IoCtx objexp_pool_ctx; librados::IoCtx reshard_pool_ctx; librados::IoCtx notif_pool_ctx; // .rgw.notif @@ -504,8 +507,8 @@ public: return gc; } - RGWRestore *get_restore() { - return restore; + rgw::restore::Restore *get_restore() { + 
return restore.get(); } RGWRados& set_run_gc_thread(bool _use_gc_thread) { @@ -551,6 +554,10 @@ public: return &restore_pool_ctx; } + neorados::IOContext* get_restore_pool_neo_ctx() { + return &restore_pool_neo_ctx; + } + librados::IoCtx& get_notif_pool_ctx() { return notif_pool_ctx; } diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index c1061965fb5d2..b7749bce71b18 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -25,12 +25,15 @@ #include #include "common/async/blocked_completion.h" +#include "neorados/cls/fifo.h" #include "common/ceph_time.h" #include "common/Clock.h" #include "common/errno.h" +#include "common/async/blocked_completion.h" #include "librados/AioCompletionImpl.h" +#include "rgw_asio_thread.h" #include "cls/rgw/cls_rgw_client.h" @@ -3038,8 +3041,6 @@ int RadosObject::transition(Bucket* bucket, int RadosObject::restore_obj_from_cloud(Bucket* bucket, rgw::sal::PlacementTier* tier, CephContext* cct, - RGWObjTier& tier_config, - uint64_t olh_epoch, std::optional days, bool& in_progress, const DoutPrefixProvider* dpp, @@ -3066,10 +3067,6 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket, using ceph::decode; decode(m, attr_iter->second); m.get_tier_config(&tier_config); - } catch (const buffer::end_of_buffer&) { - //empty manifest; it's not cloud-tiered - ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl; - return -EIO; } catch (const std::exception& e) { ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl; return -EIO; @@ -3088,7 +3085,7 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket, rgw_bucket_dir_entry ent; ent.key.name = get_key().name; ent.key.instance = get_key().instance; - ent.meta.accounted_size = ent.meta.size = get_obj_size(); + ent.meta.accounted_size = ent.meta.size = get_size(); ent.meta.etag = "" ; if (!ent.key.instance.empty()) { // non-current versioned object @@ -3141,7 
+3138,7 @@ int RadosObject::restore_obj_from_cloud(Bucket* bucket, return ret; } - ldpp_dout(dpp, 20) << "Sucessfully restored object(" << o.key << ") from the cloud endpoint(" << endpoint << ")" << dendl; + ldpp_dout(dpp, 20) << "Sucessfully restored object(" << get_key() << ") from the cloud endpoint(" << endpoint << ")" << dendl; return ret; } @@ -4664,6 +4661,19 @@ int RadosRestoreSerializer::try_lock(const DoutPrefixProvider *dpp, utime_t dur, return lock.lock_exclusive((librados::IoCtx*)(&ioctx), oid); } +int RadosRestoreSerializer::unlock(const DoutPrefixProvider *dpp, optional_yield y) +{ + librados::ObjectWriteOperation op; + op.assert_exists(); + lock.unlock(&op); + return rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); +} + +RadosRestore::RadosRestore(RadosStore* _st) : store(_st), + ioctx(*store->getRados()->get_restore_pool_ctx()), + r(store->get_neorados()), + neo_ioctx(*store->getRados()->get_restore_pool_neo_ctx()) {} + std::unique_ptr RadosRestore::get_serializer( const std::string& lock_name, const std::string& oid, @@ -4675,26 +4685,30 @@ std::unique_ptr RadosRestore::get_serializer( int RadosRestore::initialize(const DoutPrefixProvider* dpp, optional_yield y, int n_objs, std::vector& o_names) { - int ret = 0; num_objs = n_objs; obj_names = o_names; + maybe_warn_about_blocking(dpp); for (auto i=0; i < num_objs; i++) { - std::unique_ptr fifo_tmp; - ret = rgw::cls::fifo::FIFO::create(dpp, ioctx, obj_names[i], &fifo_tmp, y); - - ldpp_dout(dpp, 20) << "creating fifo object for index=" << i - << ", objname=" << obj_names[i] << - " returned ret=" << ret << dendl; + std::unique_ptr fifo_tmp; + try { + fifo_tmp = fifo::FIFO::create(dpp, r, obj_names[i], neo_ioctx, ceph::async::use_blocked); + } catch (const sys::system_error& e) { + ldpp_dout(dpp, -1) << "creating fifo object for index=" << i + << ", objname=" << obj_names[i] << " failed : " << e.what() << dendl; + return ceph::from_error_code(e.code()); + } - if (ret) { - return ret; + if 
(!fifo_tmp) { + return -ENOMEM; } + ldpp_dout(dpp, 20) << "created fifo object for index=" << i + << ", objname=" << obj_names[i] << dendl; fifos.push_back(std::move(fifo_tmp)); } - return ret; + return 0; } void RadosRestore::finalize() { @@ -4702,28 +4716,10 @@ void RadosRestore::finalize() { fifos.clear(); } -int RadosRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y, - int index, const RGWRestoreEntry& entry) { - bufferlist bl; - - encode(entry, bl); - - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ - << "Adding entry:(" << entry.bucket << "," << entry.obj_key << ") to FIFO:" << obj_names[index] << dendl; - - auto ret = push(dpp, y, index, std::move(bl)); - if (ret < 0) { - ldpp_dout(dpp, -1) << "ERROR: push() returned " << ret << dendl; - return ret; - } - - return 0; -} - int RadosRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y, - int index, const std::list& restore_entries) { - std::vector ent_list; + int index, const std::vector& restore_entries) { + std::deque ent_list; for (auto& entry : restore_entries) { bufferlist bl; @@ -4734,7 +4730,8 @@ int RadosRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y, } ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ - << "Adding " << restore_entries.size() << " entries to FIFO:" << obj_names[index] << dendl; + << "Adding " << restore_entries.size() << " entries to FIFO:" + << obj_names[index] << dendl; int ret = push(dpp, y, index, std::move(ent_list)); @@ -4747,17 +4744,19 @@ int RadosRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y, } int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y, - int index, std::vector&& items) { + int index, std::deque&& items) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << "Pushing entries to FIFO:" << obj_names[index] << dendl; - - auto r = fifos[index]->push(dpp, items, y); - if (r < 0) { + maybe_warn_about_blocking(dpp); + try { + fifos[index]->push(dpp, items, ceph::async::use_blocked); + } catch (const 
sys::system_error& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ": unable to push to FIFO: " << obj_names[index] - << ": " << cpp_strerror(-r) << dendl; - } - return r; + << ": " << e.what() << dendl; + return ceph::from_error_code(e.code()); + } + return 0; } int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y, @@ -4765,19 +4764,22 @@ int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y, ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << "Pushing entry to FIFO:" << obj_names[index] << dendl; - auto r = fifos[index]->push(dpp, std::move(bl), y); - if (r < 0) { + maybe_warn_about_blocking(dpp); + try { + fifos[index]->push(dpp, std::move(bl), ceph::async::use_blocked); + } catch (const sys::system_error& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ": unable to push to FIFO: " << obj_names[index] - << ": " << cpp_strerror(-r) << dendl; + << ": " << e.what() << dendl; + return ceph::from_error_code(e.code()); } - return r; + return 0; } struct rgw_restore_fifo_entry { std::string id; ceph::real_time mtime; - RGWRestoreEntry entry; + rgw::restore::RestoreEntry entry; rgw_restore_fifo_entry() {} void encode(ceph::buffer::list& bl) const { @@ -4803,53 +4805,58 @@ WRITE_CLASS_ENCODER(rgw_restore_fifo_entry) int RadosRestore::list(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string& marker, std::string* out_marker, - uint32_t max_entries, std::vector& entries, + uint32_t max_entries, std::vector& entries, bool* truncated) { - std::vector restore_entries; + std::vector restore_entries{max_entries}; + std::string omark = {}; bool more = false; ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << "Listing entries from FIFO:" << obj_names[index] << dendl; - auto r = fifos[index]->list(dpp, max_entries, marker, &restore_entries, &more, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to list FIFO: " << obj_names[index] - << ": " << cpp_strerror(-r) << dendl; - return r; - } - - 
entries.clear(); + maybe_warn_about_blocking(dpp); + try { + auto [lentries, lmark] = fifos[index]->list(dpp, marker, + restore_entries, ceph::async::use_blocked); + entries.clear(); - for (const auto& entry : restore_entries) { + for (const auto& entry : lentries) { rgw_restore_fifo_entry r_entry; r_entry.id = entry.marker; r_entry.mtime = entry.mtime; auto liter = entry.data.cbegin(); - try { - decode(r_entry.entry, liter); - } catch (const buffer::error& err) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": failed to decode restore entry: " - << err.what() << dendl; - return -EIO; - } - RGWRestoreEntry& e = r_entry.entry; + decode(r_entry.entry, liter); + rgw::restore::RestoreEntry& e = r_entry.entry; entries.push_back(std::move(e)); + omark = entry.marker; + } + if (!lmark.empty()) { + more = true; + } + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::no_such_file_or_directory) { + } else { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": unable to list FIFO: " << obj_names[index] + << ": " << e.what() << dendl; + return ceph::from_error_code(e.code()); + } } - if (truncated) + if (truncated) { *truncated = more; + } - if (out_marker && !restore_entries.empty()) { - *out_marker = restore_entries.back().marker; + if (out_marker) { + *out_marker = omark; } ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << "Listing from FIFO:" << obj_names[index] << ", returned:" - << restore_entries.size() << " entries, truncated:" << more + << restore_entries.size() << " entries, truncated:" + << (truncated ? *truncated : false) << ", out_marker:" << (out_marker ? 
*out_marker : "") << dendl; return 0; @@ -4869,41 +4876,16 @@ int RadosRestore::trim(const DoutPrefixProvider *dpp, optional_yield y, ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << "Trimming FIFO:" << obj_names[index] << " upto marker:" << marker << dendl; - auto r = fifos[index]->trim(dpp, marker, false, y); - if (r < 0) { + maybe_warn_about_blocking(dpp); + try { + fifos[index]->trim(dpp, std::string(marker), false, ceph::async::use_blocked); + } catch (const sys::system_error& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ": unable to trim FIFO: " << obj_names[index] - << ": " << cpp_strerror(-r) << dendl; + << ": " << e.what() << dendl; + return ceph::from_error_code(e.code()); } - - return r; -} - -std::string_view RadosRestore::max_marker() { - static const std::string mm = rgw::cls::fifo::marker::max().to_string(); - return std::string_view(mm); -} - -int RadosRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) { - std::vector restore_entries; - bool more = false; - - for (auto shard = 0u; shard < fifos.size(); ++shard) { - auto r = fifos[shard]->list(dpp, 1, {}, &restore_entries, &more, y); - if (r < 0) { - ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": unable to list FIFO: " << obj_names[shard] - << ": " << cpp_strerror(-r) << dendl; - return r; - } - if (!restore_entries.empty()) { - ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ - << "Entries found in FIFO:" << obj_names[shard] << dendl; - return 0; - } - } - - return 1; + return 0; } int RadosNotification::publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags) diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index e7520139e2cf1..d51d968aa6158 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -344,7 +344,7 @@ class RadosStore : public StoreDriver { optional_yield y, const DoutPrefixProvider* dpp) override; virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); } - virtual 
RGWRestore* get_rgwrestore(void) override { return rados->get_restore(); } + virtual rgw::restore::Restore* get_rgwrestore(void) override { return rados->get_restore(); } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); } virtual int log_usage(const DoutPrefixProvider *dpp, std::map& usage_info, optional_yield y) override; @@ -954,22 +954,20 @@ public: RadosRestoreSerializer(RadosStore* store, const std::string& oid, const std::string& lock_name, const std::string& cookie); virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override; - virtual int unlock() override { - return lock.unlock(&ioctx, oid); - } + virtual int unlock(const DoutPrefixProvider* dpp, optional_yield y) override; }; class RadosRestore : public StoreRestore { RadosStore* store; librados::IoCtx& ioctx; + neorados::RADOS& r; + neorados::IOContext neo_ioctx; + std::vector> fifos; int num_objs; std::vector obj_names; - std::vector> fifos; public: - RadosRestore(RadosStore* _st) : store(_st), - ioctx(*store->getRados()->get_restore_pool_ctx()) {} - + RadosRestore(RadosStore* _st) ; ~RadosRestore() override { finalize(); } @@ -978,14 +976,12 @@ public: int n_objs, std::vector& obj_names) override; void finalize(); - virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y, - int index, const RGWRestoreEntry& r_entry) override; virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y, - int index, const std::list& restore_entries) override; + int index, const std::vector& restore_entries) override; virtual int list(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string& marker, std::string* out_marker, - uint32_t max_entries, std::vector& entries, + uint32_t max_entries, std::vector& entries, bool* truncated) override; virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string_view& marker) override; @@ -994,14 +990,12 @@ 
public: const std::string& oid, const std::string& cookie) override; /** Below routines deal with actual FIFO */ - int is_empty(const DoutPrefixProvider *dpp, optional_yield y); - std::string_view max_marker(); int trim(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string_view& marker); int push(const DoutPrefixProvider *dpp, optional_yield y, int index, ceph::buffer::list&& bl); int push(const DoutPrefixProvider *dpp, optional_yield y, - int index, std::vector&& items); + int index, std::deque&& items); }; class RadosNotification : public StoreNotification { diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 07098b1bf8313..3b003bb6d63f6 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -5445,7 +5445,7 @@ void RGWRestoreObj::execute(optional_yield y) } rgw::sal::Attrs attrs; attrs = s->object->get_attrs(); - op_ret = handle_cloudtier_obj(s, this, driver, attrs, false, expiry_days, true, y); + op_ret = handle_cloudtier_obj(s, this, driver, attrs, false, expiry_days, false, y); restore_ret = op_ret; ldpp_dout(this, 20) << "Restore completed of object: " << *s->object << "with op ret: " << restore_ret <(status), f); } -void RGWRestoreEntry::decode_json(JSONObj *obj) +void RestoreEntry::decode_json(JSONObj *obj) { JSONDecoder::decode_json("Bucket", bucket, obj); JSONDecoder::decode_json("Object", obj_key, obj); @@ -74,9 +77,9 @@ void RGWRestoreEntry::decode_json(JSONObj *obj) status = static_cast(st); } -void RGWRestoreEntry::generate_test_instances(std::list& l) +void RestoreEntry::generate_test_instances(std::list& l) { - auto p = new RGWRestoreEntry; + auto p = new RestoreEntry; rgw_bucket bk("tenant1", "bucket1"); rgw_obj_key obj("object1"); rgw_obj_key obj2("object2"); @@ -92,7 +95,7 @@ void RGWRestoreEntry::generate_test_instances(std::list& l) p->status = status; l.push_back(p); - p = new RGWRestoreEntry; + p = new RestoreEntry; days = days1; status = rgw::sal::RGWRestoreStatus::CloudRestored; p->bucket = bk; @@ -102,15 
+105,15 @@ void RGWRestoreEntry::generate_test_instances(std::list& l) p->status = status; l.push_back(p); - l.push_back(new RGWRestoreEntry); + l.push_back(new RestoreEntry); } -int RGWRestore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) { +int Restore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) { int ret = 0; cct = _cct; driver = _driver; - ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing RGWRestore handle" << dendl; + ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing Restore handle" << dendl; /* max_objs indicates the number of shards or objects * used to store Restore Entries */ max_objs = cct->_conf->rgw_restore_max_objs; @@ -137,19 +140,19 @@ int RGWRestore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) { ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": failed to initialize sal_restore" << dendl; } - ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing RGWRestore handle completed" << dendl; + ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing Restore handle completed" << dendl; return ret; } -void RGWRestore::finalize() +void Restore::finalize() { sal_restore.reset(nullptr); obj_names.clear(); - ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": finalize RGWRestore handle" << dendl; + ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": finalize Restore handle" << dendl; } -static inline std::ostream& operator<<(std::ostream &os, RGWRestoreEntry& ent) { +static inline std::ostream& operator<<(std::ostream &os, RestoreEntry& ent) { os << " */ -int RGWRestore::choose_oid(const RGWRestoreEntry& e) { +int Restore::choose_oid(const RestoreEntry& e) { int index; const auto& name = e.bucket.name + e.obj_key.name + e.obj_key.instance; index = ((ceph_str_hash_linux(name.data(), name.size())) % max_objs); return static_cast(index); } -void *RGWRestore::RestoreWorker::entry() { +void *Restore::RestoreWorker::entry() { do { - utime_t start = ceph_clock_now(); + ceph_timespec start = 
ceph::real_clock::to_ceph_timespec(real_clock::now()); int r = 0; r = restore->process(this, null_yield); if (r < 0) { @@ -220,14 +223,14 @@ void *RGWRestore::RestoreWorker::entry() { } if (restore->going_down()) break; - utime_t end = ceph_clock_now(); - end -= start; + ceph_timespec end = ceph::real_clock::to_ceph_timespec(real_clock::now()); + auto d = end - start; + end = d; int secs = cct->_conf->rgw_restore_processor_period; - if (secs <= end.sec()) + if (secs < d) continue; // next round - secs -= end.sec(); std::unique_lock locker{lock}; cond.wait_for(locker, std::chrono::seconds(secs)); } while (!restore->going_down()); @@ -236,7 +239,7 @@ void *RGWRestore::RestoreWorker::entry() { } -int RGWRestore::process(RestoreWorker* worker, optional_yield y) +int Restore::process(RestoreWorker* worker, optional_yield y) { int max_secs = cct->_conf->rgw_restore_lock_max_time; @@ -250,6 +253,19 @@ int RGWRestore::process(RestoreWorker* worker, optional_yield y) return 0; } +// unique_lock expects an unlock() taking no arguments, but +// RadosRestoreSerializer::unlock() requires two. create an adapter that binds these +// additional args +struct RestoreLockAdapter { + rgw::sal::RestoreSerializer& serializer; + const DoutPrefixProvider* dpp = nullptr; + optional_yield y; + + void unlock() { + serializer.unlock(dpp, y); + } +}; + /* * Given an index, fetch a list of restore entries to process. After each * iteration, trim the list to the last marker read. @@ -257,13 +273,13 @@ int RGWRestore::process(RestoreWorker* worker, optional_yield y) * While processing the entries, if any of their restore operation is still in * progress, such entries are added back to the list. 
*/ -int RGWRestore::process(int index, int max_secs, optional_yield y) +int Restore::process(int index, int max_secs, optional_yield y) { ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": process entered index=" << index << ", max_secs=" << max_secs << dendl; /* list used to gather still IN_PROGRESS */ - std::list r_entries; + std::vector r_entries; std::unique_ptr serializer = sal_restore->get_serializer(std::string(restore_index_lock_name), @@ -281,7 +297,7 @@ int RGWRestore::process(int index, int max_secs, optional_yield y) end += max_secs; utime_t time(max_secs, 0); - int ret = serializer->try_lock(this, time, null_yield); + int ret = serializer->try_lock(this, time, y); if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */ ldpp_dout(this, 0) << __PRETTY_FUNCTION__ << ": failed to acquire lock on " @@ -291,14 +307,15 @@ int RGWRestore::process(int index, int max_secs, optional_yield y) if (ret < 0) return 0; - std::unique_lock lock(*(serializer.get()), std::adopt_lock); + auto lock_adapter = RestoreLockAdapter{*serializer, this, y}; + std::unique_lock lock(lock_adapter, std::adopt_lock); std::string marker; std::string next_marker; bool truncated = false; do { int max = 100; - std::vector entries; + std::vector entries; ret = sal_restore->list(this, y, index, marker, &next_marker, max, entries, &truncated); ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << @@ -307,18 +324,18 @@ int RGWRestore::process(int index, int max_secs, optional_yield y) ", marker='" << marker << "'" << ", next_marker='" << next_marker << "'" << dendl; + if (ret < 0) + goto done; + if (entries.size() == 0) { lock.unlock(); return 0; } - if (ret < 0) - goto done; - marker = next_marker; - std::vector::iterator iter; + std::vector::iterator iter; for (iter = entries.begin(); iter != entries.end(); ++iter) { - RGWRestoreEntry entry = *iter; + RestoreEntry entry = *iter; ret = process_restore_entry(entry, y); @@ -329,6 +346,9 @@ int RGWRestore::process(int index, int 
max_secs, optional_yield y) << obj_names[index] << dendl; } + if (ret < 0) + goto done; + ///process all entries, trim and re-add utime_t now = ceph_clock_now(); if (now >= end) { @@ -341,32 +361,33 @@ int RGWRestore::process(int index, int max_secs, optional_yield y) goto done; } } - ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": trimming till marker: '" << marker + } while (truncated); + + ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": trimming till marker: '" << marker << "' on shard:" << obj_names[index] << dendl; - ret = sal_restore->trim_entries(this, y, index, marker); - if (ret < 0) { - ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: failed to trim entries on " << obj_names[index] << dendl; - } + ret = sal_restore->trim_entries(this, y, index, marker); + if (ret < 0) { + ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: failed to trim entries on " << obj_names[index] << dendl; + } - if (!r_entries.empty()) { - ret = sal_restore->add_entries(this, y, index, r_entries); - if (ret < 0) { - ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: failed to add entries on " + if (!r_entries.empty()) { + ret = sal_restore->add_entries(this, y, index, r_entries); + if (ret < 0) { + ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: failed to add entries on " << obj_names[index] << dendl; - } } + } - r_entries.clear(); - } while (truncated); + r_entries.clear(); done: lock.unlock(); - return 0; + return ret; } -int RGWRestore::process_restore_entry(RGWRestoreEntry& entry, optional_yield y) +int Restore::process_restore_entry(RestoreEntry& entry, optional_yield y) { int ret = 0; bool in_progress = true; @@ -375,7 +396,6 @@ int RGWRestore::process_restore_entry(RGWRestoreEntry& entry, optional_yield y) std::unique_ptr tier; std::optional days = entry.days; rgw::sal::RGWRestoreStatus restore_status = rgw::sal::RGWRestoreStatus::None; - RGWObjState* obj_state{nullptr}; rgw_placement_rule target_placement; // Ensure its the same source zone 
processing temp entries as we do not @@ -415,6 +435,7 @@ int RGWRestore::process_restore_entry(RGWRestoreEntry& entry, optional_yield y) if (attr_iter != attrs.end()) { bufferlist bl = attr_iter->second; auto iter = bl.cbegin(); + using ceph::decode; decode(restore_status, iter); } if (restore_status != rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) { @@ -450,7 +471,6 @@ int RGWRestore::process_restore_entry(RGWRestoreEntry& entry, optional_yield y) // now go ahead with restoring object // XXX: first check if its already restored? - bool in_progress = true; ret = obj->restore_obj_from_cloud(bucket.get(), tier.get(), cct, days, in_progress, this, y); if (ret < 0) { @@ -478,7 +498,7 @@ done: return ret; } -time_t RGWRestore::thread_stop_at() +time_t Restore::thread_stop_at() { uint64_t interval = (cct->_conf->rgw_restore_debug_interval > 0) ? cct->_conf->rgw_restore_debug_interval : secs_in_a_day; @@ -486,7 +506,7 @@ time_t RGWRestore::thread_stop_at() return time(nullptr) + interval; } -int RGWRestore::set_cloud_restore_status(const DoutPrefixProvider* dpp, +int Restore::set_cloud_restore_status(const DoutPrefixProvider* dpp, rgw::sal::Object* pobj, optional_yield y, const rgw::sal::RGWRestoreStatus& restore_status) { @@ -495,7 +515,7 @@ int RGWRestore::set_cloud_restore_status(const DoutPrefixProvider* dpp, if (!pobj) return ret; - pobj->set_atomic(); + pobj->set_atomic(true); bufferlist bl; using ceph::encode; @@ -506,7 +526,7 @@ int RGWRestore::set_cloud_restore_status(const DoutPrefixProvider* dpp, return ret; } -int RGWRestore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket, +int Restore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket, rgw::sal::Object* pobj, rgw::sal::PlacementTier* tier, std::optional days, @@ -544,19 +564,21 @@ int RGWRestore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket, if (in_progress) { // add restore entry to the list - RGWRestoreEntry entry; + RestoreEntry entry; entry.bucket = pbucket->get_key(); entry.obj_key = 
pobj->get_key(); entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress; entry.days = days; entry.zone_id = driver->get_zone()->get_id(); - ldpp_dout(this, 10) << "RGWRestore:: Adding restore entry of object(" << pobj->get_key() << ") entry: " << entry << dendl; + ldpp_dout(this, 10) << "Restore:: Adding restore entry of object(" << pobj->get_key() << ") entry: " << entry << dendl; int index = choose_oid(entry); ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Adding restore entry of object(" << pobj->get_key() << ") entry: " << entry << ", to shard:" << obj_names[index] << dendl; - ret = sal_restore->add_entry(this, y, index, entry); + std::vector r_entries; + r_entries.push_back(entry); + ret = sal_restore->add_entries(this, y, index, r_entries); if (ret < 0) { ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: Adding restore entry of object(" << pobj->get_key() << ") failed" << ret << dendl; @@ -573,3 +595,5 @@ int RGWRestore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket, ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Restore of object " << pobj->get_key() << (in_progress ? " is in progress" : " succeeded") << dendl; return ret; } + +} // namespace rgw::restore diff --git a/src/rgw/rgw_restore.h b/src/rgw/rgw_restore.h index b37c80c25c0a5..a5e72df588263 100644 --- a/src/rgw/rgw_restore.h +++ b/src/rgw/rgw_restore.h @@ -28,15 +28,17 @@ static constexpr std::string_view restore_oid_prefix = "restore"; static constexpr std::string_view restore_index_lock_name = "restore_process"; +namespace rgw::restore { + /** Single Restore entry state */ -struct RGWRestoreEntry { +struct RestoreEntry { rgw_bucket bucket; rgw_obj_key obj_key; std::optional days; std::string zone_id; // or should it be zone name? 
rgw::sal::RGWRestoreStatus status; - RGWRestoreEntry() {} + RestoreEntry() {} void encode(ceph::buffer::list& bl) const { ENCODE_START(1, 1, bl); @@ -59,11 +61,11 @@ struct RGWRestoreEntry { } void dump(ceph::Formatter* f) const; void decode_json(JSONObj* obj); - static void generate_test_instances(std::list& l); + static void generate_test_instances(std::list& l); }; -WRITE_CLASS_ENCODER(RGWRestoreEntry) +WRITE_CLASS_ENCODER(RestoreEntry) -class RGWRestore : public DoutPrefixProvider { +class Restore : public DoutPrefixProvider { CephContext *cct; rgw::sal::Driver* driver; std::unique_ptr sal_restore; @@ -75,7 +77,7 @@ class RGWRestore : public DoutPrefixProvider { { const DoutPrefixProvider *dpp; CephContext *cct; - RGWRestore *restore; + rgw::restore::Restore *restore; ceph::mutex lock = ceph::make_mutex("RestoreWorker"); ceph::condition_variable cond; @@ -84,8 +86,8 @@ class RGWRestore : public DoutPrefixProvider { using lock_guard = std::lock_guard; using unique_lock = std::unique_lock; - RestoreWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWRestore *_restore) : dpp(_dpp), cct(_cct), restore(_restore) {} - RGWRestore* get_restore() { return restore; } + RestoreWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, rgw::restore::Restore *_restore) : dpp(_dpp), cct(_cct), restore(_restore) {} + rgw::restore::Restore* get_restore() { return restore; } std::string thr_name() { return std::string{"restore_thrd: "}; // + std::to_string(ix); } @@ -95,17 +97,17 @@ class RGWRestore : public DoutPrefixProvider { friend class RGWRados; }; // RestoreWorker - std::unique_ptr worker; + std::unique_ptr worker; public: - ~RGWRestore() { + ~Restore() { stop_processor(); finalize(); } friend class RGWRados; - RGWRestore() : cct(nullptr), driver(nullptr), max_objs(0) {} + Restore() : cct(nullptr), driver(nullptr), max_objs(0) {} int initialize(CephContext *_cct, rgw::sal::Driver* _driver); void finalize(); @@ -121,9 +123,9 @@ public: std::ostream& 
gen_prefix(std::ostream& out) const; int process(RestoreWorker* worker, optional_yield y); - int choose_oid(const RGWRestoreEntry& e); + int choose_oid(const rgw::restore::RestoreEntry& e); int process(int index, int max_secs, optional_yield y); - int process_restore_entry(RGWRestoreEntry& entry, optional_yield y); + int process_restore_entry(rgw::restore::RestoreEntry& entry, optional_yield y); time_t thread_stop_at(); /** Set the restore status for the given object */ @@ -140,3 +142,5 @@ public: const DoutPrefixProvider* dpp, optional_yield y); }; + +} // namespace rgw::restore diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 36858228fe9a3..3f9fc829a3e9b 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -36,7 +36,6 @@ struct RGWBucketEnt; class RGWRESTMgr; class RGWLC; -class RGWRestore; struct rgw_user_bucket; class RGWUsageBatch; class RGWCoroutinesManagerRegistry; @@ -52,7 +51,6 @@ class RGWZonePlacementInfo; struct rgw_pubsub_topic; struct RGWOIDCProviderInfo; struct RGWRoleInfo; -struct RGWRestoreEntry; using RGWBucketListNameFilter = std::function<bool (const std::string&)>; @@ -62,6 +60,11 @@ namespace rgw { namespace IAM { struct Policy; } } +namespace rgw::restore { + class Restore; + struct RestoreEntry; +} + class RGWGetDataCB { public: virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0; @@ -568,7 +571,7 @@ class Driver { /** Get access to the lifecycle management thread */ virtual RGWLC* get_rgwlc(void) = 0; /** Get access to the tier restore management thread */ - virtual RGWRestore* get_rgwrestore(void) = 0; + virtual rgw::restore::Restore* get_rgwrestore(void) = 0; /** Get access to the coroutine registry.
Used to create new coroutine managers */ virtual RGWCoroutinesManagerRegistry* get_cr_registry() = 0; @@ -1692,24 +1695,19 @@ public: virtual ~Restore() = default; virtual int initialize(const DoutPrefixProvider* dpp, optional_yield y, int n_objs, std::vector<std::string>& obj_names) = 0; - /** Add a single restore entry state */ - virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y, - int index, const RGWRestoreEntry& r_entry) = 0; /** Add list of restore entries */ virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y, - int index, const std::list<RGWRestoreEntry>& restore_entries) = 0; + int index, const std::vector<rgw::restore::RestoreEntry>& restore_entries) = 0; /** List all known entries given a marker */ virtual int list(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string& marker, std::string* out_marker, - uint32_t max_entries, std::vector<RGWRestoreEntry>& entries, + uint32_t max_entries, std::vector<rgw::restore::RestoreEntry>& entries, bool* truncated) = 0; /** Trim restore entries upto the marker */ virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string_view& marker) = 0; - /* Check if the list is empty */ - virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0; /** Get a serializer for restore processing */ virtual std::unique_ptr<RestoreSerializer> get_serializer( diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h index 3c7a3cc324f56..68a94742ad3a5 100644 --- a/src/rgw/rgw_sal_dbstore.h +++ b/src/rgw/rgw_sal_dbstore.h @@ -926,7 +926,7 @@ public: const std::string& topic_queue) override; virtual RGWLC* get_rgwlc(void) override; - virtual RGWRestore* get_rgwrestore(void) override { return NULL; } + virtual rgw::restore::Restore* get_rgwrestore(void) override { return NULL; } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; } virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override; virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid,
bufferlist& bl) override; diff --git a/src/rgw/rgw_sal_filter.cc b/src/rgw/rgw_sal_filter.cc index 09c38e45d9a72..62421bb983272 100644 --- a/src/rgw/rgw_sal_filter.cc +++ b/src/rgw/rgw_sal_filter.cc @@ -493,7 +493,7 @@ RGWLC* FilterDriver::get_rgwlc() return next->get_rgwlc(); } -RGWRestore* FilterDriver::get_rgwrestore() +rgw::restore::Restore* FilterDriver::get_rgwrestore() { return next->get_rgwrestore(); } @@ -1475,21 +1475,16 @@ int FilterRestore::initialize(const DoutPrefixProvider* dpp, optional_yield y, return next->initialize(dpp, y, n_objs, obj_names); } -int FilterRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y, - int index, const RGWRestoreEntry& r_entry) { - return next->add_entry(dpp, y, index, r_entry); -} - int FilterRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y, int index, - const std::list& restore_entries) { + const std::vector& restore_entries) { return next->add_entries(dpp, y, index, restore_entries); } /** List all known entries */ int FilterRestore::list(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string& marker, std::string* out_marker, - uint32_t max_entries, std::vector& entries, + uint32_t max_entries, std::vector& entries, bool* truncated) { return next->list(dpp, y, index, marker, out_marker, max_entries, entries, truncated); @@ -1500,9 +1495,6 @@ int FilterRestore::trim_entries(const DoutPrefixProvider *dpp, optional_yield y, return next->trim_entries(dpp, y, index, marker); } -int FilterRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) { - return next->is_empty(dpp, y); -} int FilterNotification::publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags) diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index 55568fd6df84b..526f7ce2badee 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -384,7 +384,7 @@ public: return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp); } virtual RGWLC* 
get_rgwlc(void) override; - virtual RGWRestore* get_rgwrestore(void) override; + virtual rgw::restore::Restore* get_rgwrestore(void) override; virtual RGWCoroutinesManagerRegistry* get_cr_registry() override; virtual int log_usage(const DoutPrefixProvider *dpp, std::map _next) : next(std::move(_next)) {} virtual ~FilterRestoreSerializer() = default; virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override; - virtual int unlock() override { return next->unlock(); } + virtual int unlock(const DoutPrefixProvider* dpp, optional_yield y) override + { return next->unlock(dpp, y); } virtual void print(std::ostream& out) const override { return next->print(out); } }; @@ -1081,21 +1082,18 @@ public: virtual int initialize(const DoutPrefixProvider* dpp, optional_yield y, int n_objs, std::vector& obj_names) override; - virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y, - int index, const RGWRestoreEntry& r_entry) override; virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y, int index, - const std::list& restore_entries) override; + const std::vector& restore_entries) override; /** List all known entries */ virtual int list(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string& marker, std::string* out_marker, - uint32_t max_entries, std::vector& entries, + uint32_t max_entries, std::vector& entries, bool* truncated) override; virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y, int index, const std::string_view& marker) override; - virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y); /** Get a serializer for lifecycle */ virtual std::unique_ptr get_serializer(