lr::ObjectWriteOperation op;
cls_log_trim(op, {}, {}, {}, std::string(marker));
auto r = rgw_rados_operate(ioctx, oids[index], &op, null_yield);
- if (r == -ENOENT) r = 0;
- if (r < 0) {
+ if (r == -ENOENT) r = -ENODATA;
+ if (r < 0 && r != -ENODATA) {
lderr(cct) << __PRETTY_FUNCTION__
<< ": failed to get info from " << oids[index]
<< cpp_strerror(-r) << dendl;
lr::ObjectWriteOperation op;
cls_log_trim(op, {}, {}, {}, std::string(marker));
auto r = ioctx.aio_operate(oids[index], c, &op, 0);
- if (r == -ENOENT) r = 0;
+ if (r == -ENOENT) r = -ENODATA;
if (r < 0) {
lderr(cct) << __PRETTY_FUNCTION__
<< ": failed to get info from " << oids[index]
librados::AioCompletion* c) override {
int r = 0;
if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
- rgw_complete_aio_completion(c, 0);
+ rgw_complete_aio_completion(c, -ENODATA);
} else {
fifos[index]->trim(marker, false, c);
}
prefix(get_prefix()),
changes(cct->_conf->rgw_data_log_changes_size) {}
+bs::error_code DataLogBackends::handle_init(entries_t e) noexcept {
+ std::unique_lock l(m);
+
+ for (const auto& [gen_id, gen] : e) {
+ if (gen.empty) {
+ lderr(datalog.cct)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << ": ERROR: given empty generation: gen_id=" << gen_id << dendl;
+ }
+ if (count(gen_id) != 0) {
+ lderr(datalog.cct)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << ": ERROR: generation already exists: gen_id=" << gen_id << dendl;
+ }
+ try {
+ switch (gen.type) {
+ case log_type::omap:
+ emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards));
+ break;
+ case log_type::fifo:
+ emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards));
+ break;
+ default:
+ lderr(datalog.cct)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
+ << ", type" << gen.type << dendl;
+ return bs::error_code(EFAULT, bs::system_category());
+ }
+ } catch (const bs::system_error& err) {
+ lderr(datalog.cct)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << ": error setting up backend: gen_id=" << gen_id
+ << ", err=" << err.what() << dendl;
+ return err.code();
+ }
+ }
+ return {};
+}
+bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept {
+ return handle_init(std::move(e));
+}
+bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept {
+ std::unique_lock l(m);
+ auto i = cbegin();
+ if (i->first < new_tail) {
+ return {};
+ }
+ if (new_tail >= (cend() - 1)->first) {
+ lderr(datalog.cct)
+ << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl;
+ return bs::error_code(EFAULT, bs::system_category());
+ }
+ erase(i, upper_bound(new_tail));
+ return {};
+}
+
+
int RGWDataChangesLog::start(const RGWZone* _zone,
const RGWZoneParams& zoneparams,
librados::Rados* lr)
return -r;
}
- auto found = log_backing_type(ioctx, *defbacking, num_shards,
- [this](int i) { return get_oid(0, i); },
- null_yield);
+ auto besr = logback_generations::init<DataLogBackends>(
+ ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) {
+ return get_oid(gen_id, shard);
+ },
+ num_shards, *defbacking, null_yield, *this);
- if (!found) {
- lderr(cct) << __PRETTY_FUNCTION__
- << ": Error when checking log type: "
- << found.error().message() << dendl;
- }
- try {
- switch (*found) {
- case log_type::omap:
- bes.set_zero(new RGWDataChangesOmap(ioctx, *this, 0, num_shards));
- break;
- case log_type::fifo:
- bes.set_zero(new RGWDataChangesFIFO(ioctx, *this, 0, num_shards));
- break;
- }
- } catch (bs::system_error& e) {
+
+ if (!besr) {
lderr(cct) << __PRETTY_FUNCTION__
- << ": Error when starting backend: "
- << e.what() << dendl;
- return ceph::from_error_code(e.code());
+ << ": Error initializing backends: "
+ << besr.error().message() << dendl;
+ return ceph::from_error_code(besr.error());
}
+ bes = std::move(*besr);
renew_thread = make_named_thread("rgw_dt_lg_renew",
&RGWDataChangesLog::renew_run, this);
return 0;
l.unlock();
auto ut = real_clock::now();
- auto be = bes.head();
+ auto be = bes->head();
for (const auto& bs : entries) {
auto index = choose_oid(bs);
ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
- auto be = bes.head();
+ auto be = bes->head();
ret = be->push(index, now, change.key, std::move(bl));
now = real_clock::now();
if (r < 0)
return r;
- *out_marker = gencursor(gen_id, out_cursor);
+ if (out_marker && !out_cursor.empty()) {
+ *out_marker = gencursor(gen_id, out_cursor);
+ }
for (auto& g : gentries) {
g.log_id = gencursor(gen_id, g.log_id);
}
std::string* out_marker, bool* truncated)
{
assert(shard < num_shards);
- return bes.list(shard, max_entries, entries, marker, out_marker, truncated);
+ return bes->list(shard, max_entries, entries, marker, out_marker, truncated);
}
int RGWDataChangesLog::list_entries(int max_entries,
int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
{
assert(shard_id < num_shards);
- auto be = bes.head();
- return be->get_info(shard_id, info);
+ auto be = bes->head();
+ auto r = be->get_info(shard_id, info);
+ if (!info->marker.empty()) {
+ info->marker = gencursor(be->gen_id, info->marker);
+ }
+ return r;
}
int DataLogBackends::trim_entries(int shard_id, std::string_view marker)
const auto tail_gen = begin()->first;
if (target_gen < tail_gen) return 0;
auto r = 0;
- for (auto i = lower_bound(0);
- i != end() && i->first <= target_gen && i->first <= head_gen && r >= 0;
- i = upper_bound(i->first)) {
- auto be = i->second;
+ for (auto be = lower_bound(0)->second;
+ be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0;
+ be = upper_bound(be->gen_id)->second) {
l.unlock();
auto c = be->gen_id == target_gen ? cursor : be->max_marker();
r = be->trim(shard_id, c);
+ if (r == -ENODATA && be->gen_id < target_gen) r = 0;
l.lock();
};
return r;
int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
{
assert(shard_id < num_shards);
- return bes.trim_entries(shard_id, marker);
+ return bes->trim_entries(shard_id, marker);
}
class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
void handle(Ptr&& p, int r) {
auto gen_id = be->gen_id;
be.reset();
+ if (r == -ENOENT) r = -ENODATA;
+ if (r == -ENODATA && gen_id < target_gen) r = 0;
if (r < 0) {
complete(std::move(p), r);
return;
librados::AioCompletion* c)
{
assert(shard_id < num_shards);
- bes.trim_entries(shard_id, marker, c);
+ bes->trim_entries(shard_id, marker, c);
return 0;
}
#include "cls/log/cls_log_types.h"
#include "rgw_basic_types.h"
+#include "rgw_log_backing.h"
#include "rgw_sync_policy.h"
#include "rgw_zone.h"
#include "rgw_trim_bilog.h"
class RGWDataChangesBE;
-class DataLogBackends
- : private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> {
+class DataLogBackends final
+ : public logback_generations,
+ private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> {
+ friend class logback_generations;
friend class GenTrim;
std::mutex m;
+ RGWDataChangesLog& datalog;
+
+ DataLogBackends(librados::IoCtx& ioctx,
+ std::string oid,
+ fu2::unique_function<std::string(
+ uint64_t, int) const>&& get_oid,
+ int shards, RGWDataChangesLog& datalog) noexcept
+ : logback_generations(ioctx, oid, std::move(get_oid),
+ shards), datalog(datalog) {}
public:
boost::intrusive_ptr<RGWDataChangesBE> head() {
void set_zero(RGWDataChangesBE* be) {
emplace(0, be);
}
+
+ bs::error_code handle_init(entries_t e) noexcept override;
+ bs::error_code handle_new_gens(entries_t e) noexcept override;
+ bs::error_code handle_empty_to(uint64_t new_tail) noexcept override;
};
class RGWDataChangesLog {
+ friend DataLogBackends;
CephContext *cct;
librados::IoCtx ioctx;
rgw::BucketChangeObserver *observer = nullptr;
const RGWZone* zone;
- DataLogBackends bes;
+ std::unique_ptr<DataLogBackends> bes;
const int num_shards;
std::string get_prefix() {
auto prefix = cct->_conf->rgw_data_log_obj_prefix;
return prefix.empty() ? prefix : "data_log"s;
}
+ std::string metadata_log_oid() {
+ return get_prefix() + "generations_metadata"s;
+ }
std::string prefix;
ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");