class RGWDataChangesOmap final : public RGWDataChangesBE {
using centries = std::list<cls_log_entry>;
std::vector<std::string> oids;
- std::string get_oid(int i) const {
- return datalog.get_oid(i);
- }
+
public:
RGWDataChangesOmap(lr::IoCtx& ioctx,
RGWDataChangesLog& datalog,
+ uint64_t gen_id,
int num_shards)
- : RGWDataChangesBE(ioctx, datalog) {
+ : RGWDataChangesBE(ioctx, datalog, gen_id) {
oids.reserve(num_shards);
for (auto i = 0; i < num_shards; ++i) {
oids.push_back(get_oid(i));
class RGWDataChangesFIFO final : public RGWDataChangesBE {
using centries = std::vector<ceph::buffer::list>;
std::vector<std::unique_ptr<rgw::cls::fifo::FIFO>> fifos;
- std::string get_oid(int i) const {
- return datalog.get_oid(i);
- }
+
public:
RGWDataChangesFIFO(lr::IoCtx& ioctx,
RGWDataChangesLog& datalog,
- int shards)
- : RGWDataChangesBE(ioctx, datalog) {
+ uint64_t gen_id, int shards)
+ : RGWDataChangesBE(ioctx, datalog, gen_id) {
fifos.resize(shards);
for (auto i = 0; i < shards; ++i) {
auto r = rgw::cls::fifo::FIFO::create(ioctx, get_oid(i),
}
auto found = log_backing_type(ioctx, *defbacking, num_shards,
- [this](int i) { return get_oid(i); },
+ [this](int i) { return get_oid(0, i); },
null_yield);
if (!found) {
try {
switch (*found) {
case log_type::omap:
- be = std::make_unique<RGWDataChangesOmap>(ioctx, *this, num_shards);
+ be = std::make_unique<RGWDataChangesOmap>(ioctx, *this, 0, num_shards);
break;
case log_type::fifo:
- be = std::make_unique<RGWDataChangesFIFO>(ioctx, *this, num_shards);
+ be = std::make_unique<RGWDataChangesFIFO>(ioctx, *this, 0, num_shards);
break;
}
} catch (bs::system_error& e) {
return bucket_filter(bucket, y);
}
-std::string RGWDataChangesLog::get_oid(int i) const {
- return fmt::format("{}.{}", prefix, i);
+std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
+ return (gen_id > 0 ?
+ fmt::format("{}@G{}.{}", prefix, gen_id, i) :
+ fmt::format("{}.{}", prefix, i));
}
int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
class RGWDataChangesLog;
-class RGWDataChangesBE {
-protected:
- librados::IoCtx& ioctx;
- CephContext* const cct;
- RGWDataChangesLog& datalog;
-private:
-public:
- using entries = std::variant<std::list<cls_log_entry>,
- std::vector<ceph::buffer::list>>;
-
- RGWDataChangesBE(librados::IoCtx& ioctx,
- RGWDataChangesLog& datalog)
- : ioctx(ioctx), cct(static_cast<CephContext*>(ioctx.cct())),
- datalog(datalog) {}
- virtual ~RGWDataChangesBE() = default;
-
- virtual void prepare(ceph::real_time now,
- const std::string& key,
- ceph::buffer::list&& entry,
- entries& out) = 0;
- virtual int push(int index, entries&& items) = 0;
- virtual int push(int index, ceph::real_time now,
- const std::string& key,
- ceph::buffer::list&& bl) = 0;
- virtual int list(int shard, int max_entries,
- std::vector<rgw_data_change_log_entry>& entries,
- std::optional<std::string_view> marker,
- std::string* out_marker, bool* truncated) = 0;
- virtual int get_info(int index, RGWDataChangesLogInfo *info) = 0;
- virtual int trim(int index, std::string_view marker) = 0;
- virtual int trim(int index, std::string_view marker,
- librados::AioCompletion* c) = 0;
- virtual std::string_view max_marker() const = 0;
-};
+class RGWDataChangesBE;
class RGWDataChangesLog {
CephContext *cct;
}
// a marker that compares greater than any other
std::string_view max_marker() const;
- std::string get_oid(int shard_id) const;
+ std::string get_oid(uint64_t gen_id, int shard_id) const;
+};
+
+class RGWDataChangesBE {
+protected:
+ librados::IoCtx& ioctx;
+ CephContext* const cct;
+ RGWDataChangesLog& datalog;
+ uint64_t gen_id;
+
+ std::string get_oid(int shard_id) {
+ return datalog.get_oid(gen_id, shard_id);
+ }
+public:
+ using entries = std::variant<std::list<cls_log_entry>,
+ std::vector<ceph::buffer::list>>;
+
+ RGWDataChangesBE(librados::IoCtx& ioctx,
+ RGWDataChangesLog& datalog,
+ uint64_t gen_id)
+ : ioctx(ioctx), cct(static_cast<CephContext*>(ioctx.cct())),
+ datalog(datalog), gen_id(gen_id) {}
+ virtual ~RGWDataChangesBE() = default;
+
+ virtual void prepare(ceph::real_time now,
+ const std::string& key,
+ ceph::buffer::list&& entry,
+ entries& out) = 0;
+ virtual int push(int index, entries&& items) = 0;
+ virtual int push(int index, ceph::real_time now,
+ const std::string& key,
+ ceph::buffer::list&& bl) = 0;
+ virtual int list(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated) = 0;
+ virtual int get_info(int index, RGWDataChangesLogInfo *info) = 0;
+ virtual int trim(int index, std::string_view marker) = 0;
+ virtual int trim(int index, std::string_view marker,
+ librados::AioCompletion* c) = 0;
+ virtual std::string_view max_marker() const = 0;
};
+
#endif