Use new neorados/FIFO routines to store restore state.
Note: The old librados ioctx is also retained, as it is still needed
by RadosRestoreSerializer.
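
All blocking call sites follow the same neorados blocked-completion
pattern; a minimal sketch (error logging elided — see RadosRestore::push
for the full version):

    maybe_warn_about_blocking(dpp);
    try {
      fifos[index]->push(dpp, std::move(bl), ceph::async::use_blocked);
    } catch (const sys::system_error& e) {
      return ceph::from_error_code(e.code());
    }
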
Signed-off-by: Soumya Koduri <skoduri@redhat.com>
#include "common/Throttle.h"
#include "common/BackTrace.h"
#include "common/ceph_time.h"
+#include "common/async/blocked_completion.h"
#include "rgw_asio_thread.h"
#include "rgw_cksum.h"
if (use_restore_thread) {
restore->stop_processor();
}
- delete restore;
restore = NULL;
}
if (ret < 0)
return ret;
+ ret = open_restore_pool_neo_ctx(dpp);
+ if (ret < 0)
+ return ret;
+
ret = open_objexp_pool_ctx(dpp);
if (ret < 0)
return ret;
if (use_lc_thread)
lc->start_processor();
- restore = new RGWRestore();
+ restore = std::make_unique<rgw::restore::Restore>();
ret = restore->initialize(cct, this->driver);
if (ret < 0) {
return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().restore_pool, restore_pool_ctx, true, true);
}
+int RGWRados::open_restore_pool_neo_ctx(const DoutPrefixProvider *dpp)
+{
+ maybe_warn_about_blocking(dpp);
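+ // rgw::init_iocontext looks up the restore pool (the rgw::create tag
+ // requests creation if it is missing); use_blocked suspends this thread
+ // until the call completes, hence the blocking warning above.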
+ try {
+ restore_pool_neo_ctx = rgw::init_iocontext(dpp, driver->get_neorados(),
+ svc.zone->get_zone_params().restore_pool,
+ rgw::create, ceph::async::use_blocked);
+
+ } catch (const boost::system::system_error& e) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": Failed to initialize ioctx for restore pool: "
+ << e.what() << dendl;
+ return ceph::from_error_code(e.code());
+ }
+ return 0;
+}
+
int RGWRados::open_objexp_pool_ctx(const DoutPrefixProvider *dpp)
{
return rgw_init_ioctx(dpp, get_rados_handle(), svc.zone->get_zone_params().log_pool, objexp_pool_ctx, true, true);
#include "rgw_sal_fwd.h"
#include "rgw_pubsub.h"
#include "rgw_tools.h"
+#include "rgw_restore.h"
struct D3nDataCache;
struct RGWLCCloudTierCtx;
int open_gc_pool_ctx(const DoutPrefixProvider *dpp);
int open_lc_pool_ctx(const DoutPrefixProvider *dpp);
int open_restore_pool_ctx(const DoutPrefixProvider *dpp);
+ int open_restore_pool_neo_ctx(const DoutPrefixProvider *dpp);
int open_objexp_pool_ctx(const DoutPrefixProvider *dpp);
int open_reshard_pool_ctx(const DoutPrefixProvider *dpp);
int open_notif_pool_ctx(const DoutPrefixProvider *dpp);
rgw::sal::RadosStore* driver{nullptr};
RGWGC* gc{nullptr};
RGWLC* lc{nullptr};
- RGWRestore* restore{nullptr};
+ std::unique_ptr<rgw::restore::Restore> restore{nullptr};
RGWObjectExpirer* obj_expirer{nullptr};
bool use_gc_thread{false};
bool use_lc_thread{false};
librados::IoCtx gc_pool_ctx; // .rgw.gc
librados::IoCtx lc_pool_ctx; // .rgw.lc
librados::IoCtx restore_pool_ctx; // .rgw.restore
+ neorados::IOContext restore_pool_neo_ctx; // .rgw.restore
librados::IoCtx objexp_pool_ctx;
librados::IoCtx reshard_pool_ctx;
librados::IoCtx notif_pool_ctx; // .rgw.notif
return gc;
}
- RGWRestore *get_restore() {
- return restore;
+ rgw::restore::Restore *get_restore() {
+ return restore.get();
}
RGWRados& set_run_gc_thread(bool _use_gc_thread) {
return &restore_pool_ctx;
}
+ neorados::IOContext* get_restore_pool_neo_ctx() {
+ return &restore_pool_neo_ctx;
+ }
+
librados::IoCtx& get_notif_pool_ctx() {
return notif_pool_ctx;
}
#include <fmt/core.h>
#include "common/async/blocked_completion.h"
+#include "neorados/cls/fifo.h"
#include "common/ceph_time.h"
#include "common/Clock.h"
#include "common/errno.h"
+#include "common/async/blocked_completion.h"
#include "librados/AioCompletionImpl.h"
+#include "rgw_asio_thread.h"
#include "cls/rgw/cls_rgw_client.h"
int RadosObject::restore_obj_from_cloud(Bucket* bucket,
rgw::sal::PlacementTier* tier,
CephContext* cct,
- RGWObjTier& tier_config,
- uint64_t olh_epoch,
std::optional<uint64_t> days,
bool& in_progress,
const DoutPrefixProvider* dpp,
using ceph::decode;
decode(m, attr_iter->second);
m.get_tier_config(&tier_config);
- } catch (const buffer::end_of_buffer&) {
- //empty manifest; it's not cloud-tiered
- ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl;
- return -EIO;
} catch (const std::exception& e) {
ldpp_dout(dpp, -1) << "Error reading manifest of object:" << get_key() << dendl;
return -EIO;
rgw_bucket_dir_entry ent;
ent.key.name = get_key().name;
ent.key.instance = get_key().instance;
- ent.meta.accounted_size = ent.meta.size = get_obj_size();
+ ent.meta.accounted_size = ent.meta.size = get_size();
ent.meta.etag = "" ;
if (!ent.key.instance.empty()) { // non-current versioned object
return ret;
}
- ldpp_dout(dpp, 20) << "Sucessfully restored object(" << o.key << ") from the cloud endpoint(" << endpoint << ")" << dendl;
+ ldpp_dout(dpp, 20) << "Sucessfully restored object(" << get_key() << ") from the cloud endpoint(" << endpoint << ")" << dendl;
return ret;
}
return lock.lock_exclusive((librados::IoCtx*)(&ioctx), oid);
}
+int RadosRestoreSerializer::unlock(const DoutPrefixProvider *dpp, optional_yield y)
+{
+ librados::ObjectWriteOperation op;
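+ // assert_exists: fail with -ENOENT rather than recreating the lock
+ // object if it has already been removed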
+ op.assert_exists();
+ lock.unlock(&op);
+ return rgw_rados_operate(dpp, ioctx, oid, std::move(op), y);
+}
+
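+// The librados IoCtx is kept for RadosRestoreSerializer; the neorados
+// RADOS handle and IOContext back the FIFO operations below.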
+RadosRestore::RadosRestore(RadosStore* _st) : store(_st),
+ ioctx(*store->getRados()->get_restore_pool_ctx()),
+ r(store->get_neorados()),
+ neo_ioctx(*store->getRados()->get_restore_pool_neo_ctx()) {}
+
std::unique_ptr<RestoreSerializer> RadosRestore::get_serializer(
const std::string& lock_name,
const std::string& oid,
int RadosRestore::initialize(const DoutPrefixProvider* dpp, optional_yield y,
int n_objs, std::vector<std::string>& o_names)
{
- int ret = 0;
num_objs = n_objs;
obj_names = o_names;
+ maybe_warn_about_blocking(dpp);
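+ // create one FIFO handle per restore shard object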
for (auto i=0; i < num_objs; i++) {
- std::unique_ptr<rgw::cls::fifo::FIFO> fifo_tmp;
- ret = rgw::cls::fifo::FIFO::create(dpp, ioctx, obj_names[i], &fifo_tmp, y);
-
- ldpp_dout(dpp, 20) << "creating fifo object for index=" << i
- << ", objname=" << obj_names[i] <<
- " returned ret=" << ret << dendl;
+ std::unique_ptr<fifo::FIFO> fifo_tmp;
+ try {
+ fifo_tmp = fifo::FIFO::create(dpp, r, obj_names[i], neo_ioctx, ceph::async::use_blocked);
+ } catch (const sys::system_error& e) {
+ ldpp_dout(dpp, -1) << "creating fifo object for index=" << i
+ << ", objname=" << obj_names[i] << " failed : " << e.what() << dendl;
+ return ceph::from_error_code(e.code());
+ }
- if (ret) {
- return ret;
+ if (!fifo_tmp) {
+ return -ENOMEM;
}
+ ldpp_dout(dpp, 20) << "created fifo object for index=" << i
+ << ", objname=" << obj_names[i] << dendl;
fifos.push_back(std::move(fifo_tmp));
}
- return ret;
+ return 0;
}
void RadosRestore::finalize() {
fifos.clear();
}
-int RadosRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y,
- int index, const RGWRestoreEntry& entry) {
- bufferlist bl;
-
- encode(entry, bl);
-
- ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__
- << "Adding entry:(" << entry.bucket << "," << entry.obj_key << ") to FIFO:" << obj_names[index] << dendl;
-
- auto ret = push(dpp, y, index, std::move(bl));
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "ERROR: push() returned " << ret << dendl;
- return ret;
- }
-
- return 0;
-}
-
int RadosRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y,
- int index, const std::list<RGWRestoreEntry>& restore_entries) {
- std::vector<ceph::buffer::list> ent_list;
+ int index, const std::vector<rgw::restore::RestoreEntry>& restore_entries) {
+ std::deque<ceph::buffer::list> ent_list;
for (auto& entry : restore_entries) {
bufferlist bl;
}
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__
- << "Adding " << restore_entries.size() << " entries to FIFO:" << obj_names[index] << dendl;
+ << "Adding " << restore_entries.size() << " entries to FIFO:"
+ << obj_names[index] << dendl;
int ret = push(dpp, y, index, std::move(ent_list));
}
int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y,
- int index, std::vector<ceph::buffer::list>&& items) {
+ int index, std::deque<ceph::buffer::list>&& items) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__
<< "Pushing entries to FIFO:" << obj_names[index] << dendl;
-
- auto r = fifos[index]->push(dpp, items, y);
- if (r < 0) {
+ maybe_warn_about_blocking(dpp);
+ try {
+ fifos[index]->push(dpp, items, ceph::async::use_blocked);
+ } catch (const sys::system_error& e) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to push to FIFO: " << obj_names[index]
- << ": " << cpp_strerror(-r) << dendl;
- }
- return r;
+ << ": " << e.what() << dendl;
+ return ceph::from_error_code(e.code());
+ }
+ return 0;
}
int RadosRestore::push(const DoutPrefixProvider *dpp, optional_yield y,
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__
<< "Pushing entry to FIFO:" << obj_names[index] << dendl;
- auto r = fifos[index]->push(dpp, std::move(bl), y);
- if (r < 0) {
+ maybe_warn_about_blocking(dpp);
+ try {
+ fifos[index]->push(dpp, std::move(bl), ceph::async::use_blocked);
+ } catch (const sys::system_error& e) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to push to FIFO: " << obj_names[index]
- << ": " << cpp_strerror(-r) << dendl;
+ << ": " << e.what() << dendl;
+ return ceph::from_error_code(e.code());
}
- return r;
+ return 0;
}
struct rgw_restore_fifo_entry {
std::string id;
ceph::real_time mtime;
- RGWRestoreEntry entry;
+ rgw::restore::RestoreEntry entry;
rgw_restore_fifo_entry() {}
void encode(ceph::buffer::list& bl) const {
int RadosRestore::list(const DoutPrefixProvider *dpp, optional_yield y,
int index, const std::string& marker, std::string* out_marker,
- uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ uint32_t max_entries, std::vector<rgw::restore::RestoreEntry>& entries,
bool* truncated)
{
- std::vector<rgw::cls::fifo::list_entry> restore_entries;
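+ // preallocate the entry buffer that fifo::FIFO::list() fills in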
+ std::vector<fifo::entry> restore_entries{max_entries};
+ std::string omark = {};
bool more = false;
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__
<< "Listing entries from FIFO:" << obj_names[index] << dendl;
- auto r = fifos[index]->list(dpp, max_entries, marker, &restore_entries, &more, y);
- if (r < 0) {
- ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
- << ": unable to list FIFO: " << obj_names[index]
- << ": " << cpp_strerror(-r) << dendl;
- return r;
- }
-
- entries.clear();
+ maybe_warn_about_blocking(dpp);
+ try {
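+ // list() returns the filled subspan plus a resume marker; an empty
+ // marker means the listing is complete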
+ auto [lentries, lmark] = fifos[index]->list(dpp, marker,
+ restore_entries, ceph::async::use_blocked);
+ entries.clear();
- for (const auto& entry : restore_entries) {
+ for (const auto& entry : lentries) {
rgw_restore_fifo_entry r_entry;
r_entry.id = entry.marker;
r_entry.mtime = entry.mtime;
auto liter = entry.data.cbegin();
- try {
- decode(r_entry.entry, liter);
- } catch (const buffer::error& err) {
- ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
- << ": failed to decode restore entry: "
- << err.what() << dendl;
- return -EIO;
- }
- RGWRestoreEntry& e = r_entry.entry;
+ decode(r_entry.entry, liter);
+ rgw::restore::RestoreEntry& e = r_entry.entry;
entries.push_back(std::move(e));
+ omark = entry.marker;
+ }
+ if (!lmark.empty()) {
+ more = true;
+ }
+ } catch (const sys::system_error& e) {
+ // ENOENT just means the FIFO has no entries yet; return an empty listing
+ if (e.code() != sys::errc::no_such_file_or_directory) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": unable to list FIFO: " << obj_names[index]
+ << ": " << e.what() << dendl;
+ return ceph::from_error_code(e.code());
+ }
+ } catch (const buffer::error& err) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": failed to decode restore entry: "
+ << err.what() << dendl;
+ return -EIO;
}
- if (truncated)
+ if (truncated) {
*truncated = more;
+ }
- if (out_marker && !restore_entries.empty()) {
- *out_marker = restore_entries.back().marker;
+ if (out_marker) {
+ *out_marker = omark;
}
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__
<< "Listing from FIFO:" << obj_names[index] << ", returned:"
- << restore_entries.size() << " entries, truncated:" << more
+ << entries.size() << " entries, truncated:"
+ << (truncated ? *truncated : false)
<< ", out_marker:" << (out_marker ? *out_marker : "") << dendl;
return 0;
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__
<< "Trimming FIFO:" << obj_names[index] << " upto marker:" << marker << dendl;
- auto r = fifos[index]->trim(dpp, marker, false, y);
- if (r < 0) {
+ maybe_warn_about_blocking(dpp);
+ try {
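+ // the 'false' argument is the trim's 'exclusive' flag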
+ fifos[index]->trim(dpp, std::string(marker), false, ceph::async::use_blocked);
+ } catch (const sys::system_error& e) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to trim FIFO: " << obj_names[index]
- << ": " << cpp_strerror(-r) << dendl;
+ << ": " << e.what() << dendl;
+ return ceph::from_error_code(e.code());
}
-
- return r;
-}
-
-std::string_view RadosRestore::max_marker() {
- static const std::string mm = rgw::cls::fifo::marker::max().to_string();
- return std::string_view(mm);
-}
-
-int RadosRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) {
- std::vector<rgw::cls::fifo::list_entry> restore_entries;
- bool more = false;
-
- for (auto shard = 0u; shard < fifos.size(); ++shard) {
- auto r = fifos[shard]->list(dpp, 1, {}, &restore_entries, &more, y);
- if (r < 0) {
- ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
- << ": unable to list FIFO: " << obj_names[shard]
- << ": " << cpp_strerror(-r) << dendl;
- return r;
- }
- if (!restore_entries.empty()) {
- ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__
- << "Entries found in FIFO:" << obj_names[shard] << dendl;
- return 0;
- }
- }
-
- return 1;
+ return 0;
}
int RadosNotification::publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags)
optional_yield y,
const DoutPrefixProvider* dpp) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
- virtual RGWRestore* get_rgwrestore(void) override { return rados->get_restore(); }
+ virtual rgw::restore::Restore* get_rgwrestore(void) override { return rados->get_restore(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
RadosRestoreSerializer(RadosStore* store, const std::string& oid, const std::string& lock_name, const std::string& cookie);
virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override;
- virtual int unlock() override {
- return lock.unlock(&ioctx, oid);
- }
+ virtual int unlock(const DoutPrefixProvider* dpp, optional_yield y) override;
};
class RadosRestore : public StoreRestore {
RadosStore* store;
librados::IoCtx& ioctx;
+ neorados::RADOS& r;
+ neorados::IOContext neo_ioctx;
+ std::vector<std::unique_ptr<fifo::FIFO>> fifos;
int num_objs;
std::vector<std::string> obj_names;
- std::vector<std::unique_ptr<rgw::cls::fifo::FIFO>> fifos;
public:
- RadosRestore(RadosStore* _st) : store(_st),
- ioctx(*store->getRados()->get_restore_pool_ctx()) {}
-
+ RadosRestore(RadosStore* _st);
~RadosRestore() override {
finalize();
}
int n_objs, std::vector<std::string>& obj_names) override;
void finalize();
- virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
- int index, const RGWRestoreEntry& r_entry) override;
virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
- int index, const std::list<RGWRestoreEntry>& restore_entries) override;
+ int index, const std::vector<rgw::restore::RestoreEntry>& restore_entries) override;
virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
int index,
const std::string& marker, std::string* out_marker,
- uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ uint32_t max_entries, std::vector<rgw::restore::RestoreEntry>& entries,
bool* truncated) override;
virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
int index, const std::string_view& marker) override;
const std::string& oid,
const std::string& cookie) override;
/** Below routines deal with actual FIFO */
- int is_empty(const DoutPrefixProvider *dpp, optional_yield y);
- std::string_view max_marker();
int trim(const DoutPrefixProvider *dpp, optional_yield y,
int index, const std::string_view& marker);
int push(const DoutPrefixProvider *dpp, optional_yield y,
int index, ceph::buffer::list&& bl);
int push(const DoutPrefixProvider *dpp, optional_yield y,
- int index, std::vector<ceph::buffer::list>&& items);
+ int index, std::deque<ceph::buffer::list>&& items);
};
class RadosNotification : public StoreNotification {
}
rgw::sal::Attrs attrs;
attrs = s->object->get_attrs();
- op_ret = handle_cloudtier_obj(s, this, driver, attrs, false, expiry_days, true, y);
+ op_ret = handle_cloudtier_obj(s, this, driver, attrs, false, expiry_days, false, y);
restore_ret = op_ret;
ldpp_dout(this, 20) << "Restore completed of object: " << *s->object << "with op ret: " << restore_ret <<dendl;
#include "services/svc_sys_obj.h"
#include "services/svc_zone.h"
#include "services/svc_tier_rados.h"
+#include "common/ceph_time.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw_restore
using namespace std;
using namespace rgw::sal;
-void RGWRestoreEntry::dump(Formatter *f) const
+namespace rgw::restore {
+
+void RestoreEntry::dump(Formatter *f) const
{
encode_json("Bucket", bucket, f);
encode_json("Object", obj_key, f);
encode_json("Status", static_cast<int>(status), f);
}
-void RGWRestoreEntry::decode_json(JSONObj *obj)
+void RestoreEntry::decode_json(JSONObj *obj)
{
JSONDecoder::decode_json("Bucket", bucket, obj);
JSONDecoder::decode_json("Object", obj_key, obj);
status = static_cast<rgw::sal::RGWRestoreStatus>(st);
}
-void RGWRestoreEntry::generate_test_instances(std::list<RGWRestoreEntry*>& l)
+void RestoreEntry::generate_test_instances(std::list<RestoreEntry*>& l)
{
- auto p = new RGWRestoreEntry;
+ auto p = new RestoreEntry;
rgw_bucket bk("tenant1", "bucket1");
rgw_obj_key obj("object1");
rgw_obj_key obj2("object2");
p->status = status;
l.push_back(p);
- p = new RGWRestoreEntry;
+ p = new RestoreEntry;
days = days1;
status = rgw::sal::RGWRestoreStatus::CloudRestored;
p->bucket = bk;
p->status = status;
l.push_back(p);
- l.push_back(new RGWRestoreEntry);
+ l.push_back(new RestoreEntry);
}
-int RGWRestore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) {
+int Restore::initialize(CephContext *_cct, rgw::sal::Driver* _driver) {
int ret = 0;
cct = _cct;
driver = _driver;
- ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing RGWRestore handle" << dendl;
+ ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing Restore handle" << dendl;
/* max_objs indicates the number of shards or objects
* used to store Restore Entries */
max_objs = cct->_conf->rgw_restore_max_objs;
ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": failed to initialize sal_restore" << dendl;
}
- ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing RGWRestore handle completed" << dendl;
+ ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": initializing Restore handle completed" << dendl;
return ret;
}
-void RGWRestore::finalize()
+void Restore::finalize()
{
sal_restore.reset(nullptr);
obj_names.clear();
- ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": finalize RGWRestore handle" << dendl;
+ ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": finalize Restore handle" << dendl;
}
-static inline std::ostream& operator<<(std::ostream &os, RGWRestoreEntry& ent) {
+static inline std::ostream& operator<<(std::ostream &os, RestoreEntry& ent) {
os << "<ent: bucket=";
os << ent.bucket;
os << "; obj_key=";
return os;
}
-void RGWRestore::RestoreWorker::stop()
+void Restore::RestoreWorker::stop()
{
std::lock_guard l{lock};
cond.notify_all();
}
-bool RGWRestore::going_down()
+bool Restore::going_down()
{
return down_flag;
}
-void RGWRestore::start_processor()
+void Restore::start_processor()
{
- worker = std::make_unique<RGWRestore::RestoreWorker>(this, cct, this);
+ worker = std::make_unique<Restore::RestoreWorker>(this, cct, this);
worker->create("rgw_restore");
}
-void RGWRestore::stop_processor()
+void Restore::stop_processor()
{
down_flag = true;
if (worker) {
worker.reset(nullptr);
}
-unsigned RGWRestore::get_subsys() const
+unsigned Restore::get_subsys() const
{
return dout_subsys;
}
-std::ostream& RGWRestore::gen_prefix(std::ostream& out) const
+std::ostream& Restore::gen_prefix(std::ostream& out) const
{
return out << "restore: ";
}
/* Hash based on both <bucket, obj> */
-int RGWRestore::choose_oid(const RGWRestoreEntry& e) {
+int Restore::choose_oid(const RestoreEntry& e) {
int index;
const auto& name = e.bucket.name + e.obj_key.name + e.obj_key.instance;
index = ((ceph_str_hash_linux(name.data(), name.size())) % max_objs);
return static_cast<int>(index);
}
-void *RGWRestore::RestoreWorker::entry() {
+void *Restore::RestoreWorker::entry() {
do {
- utime_t start = ceph_clock_now();
+ auto start = ceph::real_clock::now();
int r = 0;
r = restore->process(this, null_yield);
if (r < 0) {
}
if (restore->going_down())
break;
- utime_t end = ceph_clock_now();
- end -= start;
+ auto end = ceph::real_clock::now();
+ int elapsed = std::chrono::duration_cast<std::chrono::seconds>(end - start).count();
int secs = cct->_conf->rgw_restore_processor_period;
- if (secs <= end.sec())
+ if (secs <= elapsed)
continue; // next round
- secs -= end.sec();
+ secs -= elapsed;
std::unique_lock locker{lock};
cond.wait_for(locker, std::chrono::seconds(secs));
} while (!restore->going_down());
}
-int RGWRestore::process(RestoreWorker* worker, optional_yield y)
+int Restore::process(RestoreWorker* worker, optional_yield y)
{
int max_secs = cct->_conf->rgw_restore_lock_max_time;
return 0;
}
+// std::unique_lock expects an unlock() taking no arguments, but
+// RestoreSerializer::unlock() requires two. Create an adapter that binds
+// the additional arguments.
+struct RestoreLockAdapter {
+ rgw::sal::RestoreSerializer& serializer;
+ const DoutPrefixProvider* dpp = nullptr;
+ optional_yield y;
+
+ void unlock() {
+ serializer.unlock(dpp, y);
+ }
+};
+
/*
* Given an index, fetch a list of restore entries to process. After each
* iteration, trim the list to the last marker read.
* While processing the entries, if any of their restore operation is still in
* progress, such entries are added back to the list.
*/
-int RGWRestore::process(int index, int max_secs, optional_yield y)
+int Restore::process(int index, int max_secs, optional_yield y)
{
ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": process entered index="
<< index << ", max_secs=" << max_secs << dendl;
/* list used to gather still IN_PROGRESS */
- std::list<RGWRestoreEntry> r_entries;
+ std::vector<RestoreEntry> r_entries;
std::unique_ptr<rgw::sal::RestoreSerializer> serializer =
sal_restore->get_serializer(std::string(restore_index_lock_name),
end += max_secs;
utime_t time(max_secs, 0);
- int ret = serializer->try_lock(this, time, null_yield);
+ int ret = serializer->try_lock(this, time, y);
if (ret == -EBUSY || ret == -EEXIST) {
/* already locked by another lc processor */
ldpp_dout(this, 0) << __PRETTY_FUNCTION__ << ": failed to acquire lock on "
if (ret < 0)
return 0;
- std::unique_lock<rgw::sal::RestoreSerializer> lock(*(serializer.get()), std::adopt_lock);
+ auto lock_adapter = RestoreLockAdapter{*serializer, this, y};
+ std::unique_lock<RestoreLockAdapter> lock(lock_adapter, std::adopt_lock);
std::string marker;
std::string next_marker;
bool truncated = false;
do {
int max = 100;
- std::vector<RGWRestoreEntry> entries;
+ std::vector<RestoreEntry> entries;
ret = sal_restore->list(this, y, index, marker, &next_marker, max, entries, &truncated);
ldpp_dout(this, 20) << __PRETTY_FUNCTION__ <<
", marker='" << marker << "'" <<
", next_marker='" << next_marker << "'" << dendl;
+ if (ret < 0)
+ goto done;
+
if (entries.size() == 0) {
lock.unlock();
return 0;
}
- if (ret < 0)
- goto done;
-
marker = next_marker;
- std::vector<RGWRestoreEntry>::iterator iter;
+ std::vector<RestoreEntry>::iterator iter;
for (iter = entries.begin(); iter != entries.end(); ++iter) {
- RGWRestoreEntry entry = *iter;
+ RestoreEntry entry = *iter;
ret = process_restore_entry(entry, y);
<< obj_names[index] << dendl;
}
+ if (ret < 0)
+ goto done;
+
///process all entries, trim and re-add
utime_t now = ceph_clock_now();
if (now >= end) {
goto done;
}
}
- ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": trimming till marker: '" << marker
+ } while (truncated);
+
+ ldpp_dout(this, 20) << __PRETTY_FUNCTION__ << ": trimming up to marker: '" << marker
<< "' on shard:"
<< obj_names[index] << dendl;
- ret = sal_restore->trim_entries(this, y, index, marker);
- if (ret < 0) {
- ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: failed to trim entries on " << obj_names[index] << dendl;
- }
+ ret = sal_restore->trim_entries(this, y, index, marker);
+ if (ret < 0) {
+ ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: failed to trim entries on " << obj_names[index] << dendl;
+ }
- if (!r_entries.empty()) {
- ret = sal_restore->add_entries(this, y, index, r_entries);
- if (ret < 0) {
- ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: failed to add entries on "
+ if (!r_entries.empty()) {
+ ret = sal_restore->add_entries(this, y, index, r_entries);
+ if (ret < 0) {
+ ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: failed to add entries on "
<< obj_names[index] << dendl;
- }
}
+ }
- r_entries.clear();
- } while (truncated);
+ r_entries.clear();
done:
lock.unlock();
- return 0;
+ return ret;
}
-int RGWRestore::process_restore_entry(RGWRestoreEntry& entry, optional_yield y)
+int Restore::process_restore_entry(RestoreEntry& entry, optional_yield y)
{
int ret = 0;
bool in_progress = true;
std::unique_ptr<rgw::sal::PlacementTier> tier;
std::optional<uint64_t> days = entry.days;
rgw::sal::RGWRestoreStatus restore_status = rgw::sal::RGWRestoreStatus::None;
- RGWObjState* obj_state{nullptr};
rgw_placement_rule target_placement;
// Ensure its the same source zone processing temp entries as we do not
if (attr_iter != attrs.end()) {
bufferlist bl = attr_iter->second;
auto iter = bl.cbegin();
+ using ceph::decode;
decode(restore_status, iter);
}
if (restore_status != rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress) {
// now go ahead with restoring object
// XXX: first check if its already restored?
- bool in_progress = true;
ret = obj->restore_obj_from_cloud(bucket.get(), tier.get(), cct, days, in_progress,
this, y);
if (ret < 0) {
return ret;
}
-time_t RGWRestore::thread_stop_at()
+time_t Restore::thread_stop_at()
{
uint64_t interval = (cct->_conf->rgw_restore_debug_interval > 0)
? cct->_conf->rgw_restore_debug_interval : secs_in_a_day;
return time(nullptr) + interval;
}
-int RGWRestore::set_cloud_restore_status(const DoutPrefixProvider* dpp,
+int Restore::set_cloud_restore_status(const DoutPrefixProvider* dpp,
rgw::sal::Object* pobj, optional_yield y,
const rgw::sal::RGWRestoreStatus& restore_status)
{
if (!pobj)
return ret;
- pobj->set_atomic();
+ pobj->set_atomic(true);
bufferlist bl;
using ceph::encode;
return ret;
}
-int RGWRestore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket,
+int Restore::restore_obj_from_cloud(rgw::sal::Bucket* pbucket,
rgw::sal::Object* pobj,
rgw::sal::PlacementTier* tier,
std::optional<uint64_t> days,
if (in_progress) {
// add restore entry to the list
- RGWRestoreEntry entry;
+ RestoreEntry entry;
entry.bucket = pbucket->get_key();
entry.obj_key = pobj->get_key();
entry.status = rgw::sal::RGWRestoreStatus::RestoreAlreadyInProgress;
entry.days = days;
entry.zone_id = driver->get_zone()->get_id();
- ldpp_dout(this, 10) << "RGWRestore:: Adding restore entry of object(" << pobj->get_key() << ") entry: " << entry << dendl;
+ ldpp_dout(this, 10) << "Restore:: Adding restore entry of object(" << pobj->get_key() << ") entry: " << entry << dendl;
int index = choose_oid(entry);
ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Adding restore entry of object(" << pobj->get_key() << ") entry: " << entry << ", to shard:" << obj_names[index] << dendl;
- ret = sal_restore->add_entry(this, y, index, entry);
+ std::vector<rgw::restore::RestoreEntry> r_entries;
+ r_entries.push_back(entry);
+ ret = sal_restore->add_entries(this, y, index, r_entries);
if (ret < 0) {
ldpp_dout(this, -1) << __PRETTY_FUNCTION__ << ": ERROR: Adding restore entry of object(" << pobj->get_key() << ") failed" << ret << dendl;
ldpp_dout(this, 10) << __PRETTY_FUNCTION__ << ": Restore of object " << pobj->get_key() << (in_progress ? " is in progress" : " succeeded") << dendl;
return ret;
}
+
+} // namespace rgw::restore
static constexpr std::string_view restore_oid_prefix = "restore";
static constexpr std::string_view restore_index_lock_name = "restore_process";
+namespace rgw::restore {
+
/** Single Restore entry state */
-struct RGWRestoreEntry {
+struct RestoreEntry {
rgw_bucket bucket;
rgw_obj_key obj_key;
std::optional<uint64_t> days;
std::string zone_id; // or should it be zone name?
rgw::sal::RGWRestoreStatus status;
- RGWRestoreEntry() {}
+ RestoreEntry() {}
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
}
void dump(ceph::Formatter* f) const;
void decode_json(JSONObj* obj);
- static void generate_test_instances(std::list<RGWRestoreEntry*>& l);
+ static void generate_test_instances(std::list<RestoreEntry*>& l);
};
-WRITE_CLASS_ENCODER(RGWRestoreEntry)
+WRITE_CLASS_ENCODER(RestoreEntry)
-class RGWRestore : public DoutPrefixProvider {
+class Restore : public DoutPrefixProvider {
CephContext *cct;
rgw::sal::Driver* driver;
std::unique_ptr<rgw::sal::Restore> sal_restore;
{
const DoutPrefixProvider *dpp;
CephContext *cct;
- RGWRestore *restore;
+ rgw::restore::Restore *restore;
ceph::mutex lock = ceph::make_mutex("RestoreWorker");
ceph::condition_variable cond;
using lock_guard = std::lock_guard<std::mutex>;
using unique_lock = std::unique_lock<std::mutex>;
- RestoreWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWRestore *_restore) : dpp(_dpp), cct(_cct), restore(_restore) {}
- RGWRestore* get_restore() { return restore; }
+ RestoreWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, rgw::restore::Restore *_restore) : dpp(_dpp), cct(_cct), restore(_restore) {}
+ rgw::restore::Restore* get_restore() { return restore; }
std::string thr_name() {
return std::string{"restore_thrd: "}; // + std::to_string(ix);
}
friend class RGWRados;
}; // RestoreWorker
- std::unique_ptr<RGWRestore::RestoreWorker> worker;
+ std::unique_ptr<Restore::RestoreWorker> worker;
public:
- ~RGWRestore() {
+ ~Restore() {
stop_processor();
finalize();
}
friend class RGWRados;
- RGWRestore() : cct(nullptr), driver(nullptr), max_objs(0) {}
+ Restore() : cct(nullptr), driver(nullptr), max_objs(0) {}
int initialize(CephContext *_cct, rgw::sal::Driver* _driver);
void finalize();
std::ostream& gen_prefix(std::ostream& out) const;
int process(RestoreWorker* worker, optional_yield y);
- int choose_oid(const RGWRestoreEntry& e);
+ int choose_oid(const rgw::restore::RestoreEntry& e);
int process(int index, int max_secs, optional_yield y);
- int process_restore_entry(RGWRestoreEntry& entry, optional_yield y);
+ int process_restore_entry(rgw::restore::RestoreEntry& entry, optional_yield y);
time_t thread_stop_at();
/** Set the restore status for the given object */
const DoutPrefixProvider* dpp,
optional_yield y);
};
+
+} // namespace rgw::restore
struct RGWBucketEnt;
class RGWRESTMgr;
class RGWLC;
-class RGWRestore;
struct rgw_user_bucket;
class RGWUsageBatch;
class RGWCoroutinesManagerRegistry;
struct rgw_pubsub_topic;
struct RGWOIDCProviderInfo;
struct RGWRoleInfo;
-struct RGWRestoreEntry;
using RGWBucketListNameFilter = std::function<bool (const std::string&)>;
namespace IAM { struct Policy; }
}
+namespace rgw::restore {
+ class Restore;
+ struct RestoreEntry;
+}
+
class RGWGetDataCB {
public:
virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0;
/** Get access to the lifecycle management thread */
virtual RGWLC* get_rgwlc(void) = 0;
/** Get access to the tier restore management thread */
- virtual RGWRestore* get_rgwrestore(void) = 0;
+ virtual rgw::restore::Restore* get_rgwrestore(void) = 0;
/** Get access to the coroutine registry. Used to create new coroutine managers */
virtual RGWCoroutinesManagerRegistry* get_cr_registry() = 0;
virtual ~Restore() = default;
virtual int initialize(const DoutPrefixProvider* dpp, optional_yield y,
int n_objs, std::vector<std::string>& obj_names) = 0;
- /** Add a single restore entry state */
- virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
- int index, const RGWRestoreEntry& r_entry) = 0;
/** Add list of restore entries */
virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
- int index, const std::list<RGWRestoreEntry>& restore_entries) = 0;
+ int index, const std::vector<rgw::restore::RestoreEntry>& restore_entries) = 0;
/** List all known entries given a marker */
virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
int index,
const std::string& marker, std::string* out_marker,
- uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ uint32_t max_entries, std::vector<rgw::restore::RestoreEntry>& entries,
bool* truncated) = 0;
/** Trim restore entries upto the marker */
virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
int index, const std::string_view& marker) = 0;
- /* Check if the list is empty */
- virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0;
/** Get a serializer for restore processing */
virtual std::unique_ptr<RestoreSerializer> get_serializer(
const std::string& topic_queue) override;
virtual RGWLC* get_rgwlc(void) override;
- virtual RGWRestore* get_rgwrestore(void) override { return NULL; }
+ virtual rgw::restore::Restore* get_rgwrestore(void) override { return NULL; }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
return next->get_rgwlc();
}
-RGWRestore* FilterDriver::get_rgwrestore()
+rgw::restore::Restore* FilterDriver::get_rgwrestore()
{
return next->get_rgwrestore();
}
return next->initialize(dpp, y, n_objs, obj_names);
}
-int FilterRestore::add_entry(const DoutPrefixProvider* dpp, optional_yield y,
- int index, const RGWRestoreEntry& r_entry) {
- return next->add_entry(dpp, y, index, r_entry);
-}
-
int FilterRestore::add_entries(const DoutPrefixProvider* dpp, optional_yield y,
int index,
- const std::list<RGWRestoreEntry>& restore_entries) {
+ const std::vector<rgw::restore::RestoreEntry>& restore_entries) {
return next->add_entries(dpp, y, index, restore_entries);
}
/** List all known entries */
int FilterRestore::list(const DoutPrefixProvider *dpp, optional_yield y,
int index, const std::string& marker, std::string* out_marker,
- uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ uint32_t max_entries, std::vector<rgw::restore::RestoreEntry>& entries,
bool* truncated) {
return next->list(dpp, y, index, marker, out_marker, max_entries,
entries, truncated);
return next->trim_entries(dpp, y, index, marker);
}
-int FilterRestore::is_empty(const DoutPrefixProvider *dpp, optional_yield y) {
- return next->is_empty(dpp, y);
-}
int FilterNotification::publish_reserve(const DoutPrefixProvider *dpp,
RGWObjTags* obj_tags)
return next->get_bucket_topic_mapping(topic, bucket_keys, y, dpp);
}
virtual RGWLC* get_rgwlc(void) override;
- virtual RGWRestore* get_rgwrestore(void) override;
+ virtual rgw::restore::Restore* get_rgwrestore(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override;
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket,
FilterRestoreSerializer(std::unique_ptr<RestoreSerializer> _next) : next(std::move(_next)) {}
virtual ~FilterRestoreSerializer() = default;
virtual int try_lock(const DoutPrefixProvider *dpp, utime_t dur, optional_yield y) override;
- virtual int unlock() override { return next->unlock(); }
+ virtual int unlock(const DoutPrefixProvider* dpp, optional_yield y) override
+ { return next->unlock(dpp, y); }
virtual void print(std::ostream& out) const override { return next->print(out); }
};
virtual int initialize(const DoutPrefixProvider* dpp, optional_yield y,
int n_objs, std::vector<std::string>& obj_names) override;
- virtual int add_entry(const DoutPrefixProvider* dpp, optional_yield y,
- int index, const RGWRestoreEntry& r_entry) override;
virtual int add_entries(const DoutPrefixProvider* dpp, optional_yield y,
int index,
- const std::list<RGWRestoreEntry>& restore_entries) override;
+ const std::vector<rgw::restore::RestoreEntry>& restore_entries) override;
/** List all known entries */
virtual int list(const DoutPrefixProvider *dpp, optional_yield y,
int index,
const std::string& marker, std::string* out_marker,
- uint32_t max_entries, std::vector<RGWRestoreEntry>& entries,
+ uint32_t max_entries, std::vector<rgw::restore::RestoreEntry>& entries,
bool* truncated) override;
virtual int trim_entries(const DoutPrefixProvider *dpp, optional_yield y,
int index, const std::string_view& marker) override;
- virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y);
/** Get a serializer for lifecycle */
virtual std::unique_ptr<RestoreSerializer> get_serializer(