#include "cls/fifo/cls_fifo_types.h"
#include "cls/fifo/cls_fifo_ops.h"
-#include "librados/AioCompletionImpl.h"
-
-#include "rgw_tools.h"
-
#include "cls_fifo_legacy.h"
namespace rgw::cls::fifo {
}
};
-template<typename T>
-struct Completion {
-private:
- lr::AioCompletion* _cur = nullptr;
- lr::AioCompletion* _super;
-public:
-
- using Ptr = std::unique_ptr<T>;
-
- lr::AioCompletion* cur() const {
- return _cur;
- }
- lr::AioCompletion* super() const {
- return _super;
- }
-
- Completion(lr::AioCompletion* super) : _super(super) {
- super->pc->get();
- }
-
- ~Completion() {
- if (_super) {
- _super->pc->put();
- }
- if (_cur)
- _cur->release();
- _super = nullptr;
- _cur = nullptr;
- }
-
- // The only times that aio_operate can return an error are:
- // 1. The completion contains a null pointer. This should just
- // crash, and in our case it does.
- // 2. An attempt is made to write to a snapshot. RGW doesn't use
- // snapshots, so we don't care.
- //
- // So we will just assert that initiating an Aio operation succeeds
- // and not worry about recovering.
- static lr::AioCompletion* call(Ptr&& p) {
- p->_cur = lr::Rados::aio_create_completion(static_cast<void*>(p.get()),
- &cb);
- auto c = p->_cur;
- p.release();
- return c;
- }
- static void complete(Ptr&& p, int r) {
- auto c = p->_super;
- p->_super = nullptr;
- c->pc->put();
- rgw_complete_aio_completion(c, r);
- }
-
- static void cb(lr::completion_t, void* arg) {
- auto t = static_cast<T*>(arg);
- auto r = t->_cur->get_return_value();
- t->_cur->release();
- t->_cur = nullptr;
- t->handle(Ptr(t), r);
- }
-};
-
lr::ObjectReadOperation get_part_info(CephContext* cct,
fifo::part_header* header,
std::uint64_t tid, int* r = 0)
#include "cls/fifo/cls_fifo_types.h"
#include "cls/fifo/cls_fifo_ops.h"
+#include "librados/AioCompletionImpl.h"
+
+#include "rgw_tools.h"
+
namespace rgw::cls::fifo {
namespace cb = ceph::buffer;
namespace fifo = rados::cls::fifo;
lr::AioCompletion* c //< AIO Completion
);
};
+
+template<typename T>
+struct Completion {
+private:
+ lr::AioCompletion* _cur = nullptr;
+ lr::AioCompletion* _super;
+public:
+
+ using Ptr = std::unique_ptr<T>;
+
+ lr::AioCompletion* cur() const {
+ return _cur;
+ }
+ lr::AioCompletion* super() const {
+ return _super;
+ }
+
+ Completion(lr::AioCompletion* super) : _super(super) {
+ super->pc->get();
+ }
+
+ ~Completion() {
+ if (_super) {
+ _super->pc->put();
+ }
+ if (_cur)
+ _cur->release();
+ _super = nullptr;
+ _cur = nullptr;
+ }
+
+ // The only times that aio_operate can return an error are:
+ // 1. The completion contains a null pointer. This should just
+ // crash, and in our case it does.
+ // 2. An attempt is made to write to a snapshot. RGW doesn't use
+ // snapshots, so we don't care.
+ //
+ // So we will just assert that initiating an Aio operation succeeds
+ // and not worry about recovering.
+ static lr::AioCompletion* call(Ptr&& p) {
+ p->_cur = lr::Rados::aio_create_completion(static_cast<void*>(p.get()),
+ &cb);
+ auto c = p->_cur;
+ p.release();
+ return c;
+ }
+ static void complete(Ptr&& p, int r) {
+ auto c = p->_super;
+ p->_super = nullptr;
+ rgw_complete_aio_completion(c, r);
+ }
+
+ static void cb(lr::completion_t, void* arg) {
+ auto t = static_cast<T*>(arg);
+ auto r = t->_cur->get_return_value();
+ t->_cur->release();
+ t->_cur = nullptr;
+ t->handle(Ptr(t), r);
+ }
+};
+
}
#endif // CEPH_RGW_CLS_FIFO_LEGACY_H
try {
switch (*found) {
case log_type::omap:
- be = std::make_unique<RGWDataChangesOmap>(ioctx, *this, 0, num_shards);
+ bes.set_zero(new RGWDataChangesOmap(ioctx, *this, 0, num_shards));
break;
case log_type::fifo:
- be = std::make_unique<RGWDataChangesFIFO>(ioctx, *this, 0, num_shards);
+ bes.set_zero(new RGWDataChangesFIFO(ioctx, *this, 0, num_shards));
break;
}
} catch (bs::system_error& e) {
return ceph::from_error_code(e.code());
}
- ceph_assert(be);
renew_thread = make_named_thread("rgw_dt_lg_renew",
&RGWDataChangesLog::renew_run, this);
return 0;
l.unlock();
auto ut = real_clock::now();
+ auto be = bes.head();
for (const auto& bs : entries) {
auto index = choose_oid(bs);
ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
+ auto be = bes.head();
ret = be->push(index, now, change.key, std::move(bl));
now = real_clock::now();
return ret;
}
+int DataLogBackends::list(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated)
+{
+ auto [gen_id, cursor] = cursorgeno(marker);
+ std::string out_cursor;
+ while (max_entries > 0) {
+ std::vector<rgw_data_change_log_entry> gentries;
+ std::unique_lock l(m);
+ auto i = lower_bound(gen_id);
+ if (i == end()) return 0;
+ auto be = i->second;
+ auto r = be->list(shard, max_entries, gentries, cursor,
+ &out_cursor, truncated);
+ if (r < 0)
+ return r;
+
+ *out_marker = gencursor(gen_id, out_cursor);
+ for (auto& g : gentries) {
+ g.log_id = gencursor(gen_id, g.log_id);
+ }
+ max_entries -= gentries.size();
+ std::move(gentries.begin(), gentries.end(),
+ std::back_inserter(entries));
+ cursor = {};
+ ++gen_id;
+ }
+ return 0;
+}
+
int RGWDataChangesLog::list_entries(int shard, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
std::optional<std::string_view> marker,
std::string* out_marker, bool* truncated)
{
assert(shard < num_shards);
- return be->list(shard, max_entries, entries, std::string(marker.value_or("")),
- out_marker, truncated);
+ return bes.list(shard, max_entries, entries, marker, out_marker, truncated);
}
int RGWDataChangesLog::list_entries(int max_entries,
int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
{
assert(shard_id < num_shards);
+ auto be = bes.head();
return be->get_info(shard_id, info);
}
+int DataLogBackends::trim_entries(int shard_id, std::string_view marker)
+{
+ auto [target_gen, cursor] = cursorgen(marker);
+ std::unique_lock l(m);
+ const auto head_gen = (end() - 1)->second->gen_id;
+ const auto tail_gen = begin()->first;
+ if (target_gen < tail_gen) return 0;
+ auto r = 0;
+ for (auto i = lower_bound(0);
+ i != end() && i->first <= target_gen && i->first <= head_gen && r >= 0;
+ i = upper_bound(i->first)) {
+ auto be = i->second;
+ l.unlock();
+ auto c = be->gen_id == target_gen ? cursor : be->max_marker();
+ r = be->trim(shard_id, c);
+ l.lock();
+ };
+ return r;
+}
+
int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
{
assert(shard_id < num_shards);
- return be->trim(shard_id, marker);
+ return bes.trim_entries(shard_id, marker);
+}
+
+class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
+public:
+ DataLogBackends* const bes;
+ const int shard_id;
+ const uint64_t target_gen;
+ const std::string cursor;
+ const uint64_t head_gen;
+ const uint64_t tail_gen;
+ boost::intrusive_ptr<RGWDataChangesBE> be;
+
+ GenTrim(DataLogBackends* bes, int shard_id, uint64_t target_gen, std::string cursor,
+ uint64_t head_gen, uint64_t tail_gen,
+ boost::intrusive_ptr<RGWDataChangesBE>&& be,
+ lr::AioCompletion* super)
+ : Completion(super), bes(bes), shard_id(shard_id), target_gen(target_gen),
+ cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen),
+ be(std::move(be)) {}
+
+ void handle(Ptr&& p, int r) {
+ auto gen_id = be->gen_id;
+ be.reset();
+ if (r < 0) {
+ complete(std::move(p), r);
+ return;
+ }
+
+ {
+ std::unique_lock l(bes->m);
+ auto i = bes->upper_bound(gen_id);
+ if (i == bes->end() || i->first > target_gen || i->first > head_gen) {
+ l.unlock();
+ complete(std::move(p), r);
+ return;
+ }
+ be = i->second;
+ }
+ auto c = be->gen_id == target_gen ? cursor : be->max_marker();
+ r = be->trim(shard_id, c, call(std::move(p)));
+ }
+};
+
+void DataLogBackends::trim_entries(int shard_id, std::string_view marker,
+ librados::AioCompletion* c)
+{
+ auto [target_gen, cursor] = cursorgen(marker);
+ std::unique_lock l(m);
+ const auto head_gen = (end() - 1)->second->gen_id;
+ const auto tail_gen = begin()->first;
+ if (target_gen < tail_gen) {
+ l.unlock();
+ rgw_complete_aio_completion(c, 0);
+ return;
+ }
+ auto be = lower_bound(0)->second;
+ l.unlock();
+ auto p = be.get();
+ auto gt = std::make_unique<GenTrim>(this, shard_id, target_gen,
+ std::string(cursor), head_gen, tail_gen,
+ std::move(be), c);
+
+ p->trim(shard_id, cursor, GenTrim::call(std::move(gt)));
}
int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
librados::AioCompletion* c)
{
assert(shard_id < num_shards);
- return be->trim(shard_id, marker, c);
+ bes.trim_entries(shard_id, marker, c);
+ return 0;
}
bool RGWDataChangesLog::going_down() const
modified_shards[shard_id].insert(key);
}
-std::string_view RGWDataChangesLog::max_marker() const {
- return be->max_marker();
+std::string RGWDataChangesLog::max_marker() const {
+ return gencursor(std::numeric_limits<uint64_t>::max(),
+ "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
#include <vector>
#include <boost/container/flat_map.hpp>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#undef FMT_HEADER_ONLY
#define FMT_HEADER_ONLY 1
class RGWDataChangesBE;
+class DataLogBackends
+ : private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> {
+ friend class GenTrim;
+
+ std::mutex m;
+public:
+
+ boost::intrusive_ptr<RGWDataChangesBE> head() {
+ std::unique_lock l(m);
+ auto i = end();
+ --i;
+ return i->second;
+ }
+ int list(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated);
+ int trim_entries(int shard_id, std::string_view marker);
+ void trim_entries(int shard_id, std::string_view marker,
+ librados::AioCompletion* c);
+ void set_zero(RGWDataChangesBE* be) {
+ emplace(0, be);
+ }
+};
+
class RGWDataChangesLog {
CephContext *cct;
librados::IoCtx ioctx;
rgw::BucketChangeObserver *observer = nullptr;
const RGWZone* zone;
- std::unique_ptr<RGWDataChangesBE> be;
+ DataLogBackends bes;
const int num_shards;
std::string get_prefix() {
bucket_filter = std::move(f);
}
// a marker that compares greater than any other
- std::string_view max_marker() const;
+ std::string max_marker() const;
std::string get_oid(uint64_t gen_id, int shard_id) const;
};
-class RGWDataChangesBE {
+class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
protected:
librados::IoCtx& ioctx;
CephContext* const cct;
RGWDataChangesLog& datalog;
- uint64_t gen_id;
std::string get_oid(int shard_id) {
return datalog.get_oid(gen_id, shard_id);
using entries = std::variant<std::list<cls_log_entry>,
std::vector<ceph::buffer::list>>;
+ const uint64_t gen_id;
+
RGWDataChangesBE(librados::IoCtx& ioctx,
RGWDataChangesLog& datalog,
uint64_t gen_id)