class RGWDataChangesOmap final : public RGWDataChangesBE {
using centries = std::list<cls_log_entry>;
std::vector<std::string> oids;
+ std::string get_oid(int i) const {
+ return datalog.get_oid(i);
+ }
public:
- RGWDataChangesOmap(lr::IoCtx& ioctx, int num_shards)
- : RGWDataChangesBE(ioctx) {
+ RGWDataChangesOmap(lr::IoCtx& ioctx,
+ RGWDataChangesLog& datalog,
+ int num_shards)
+ : RGWDataChangesBE(ioctx, datalog) {
oids.reserve(num_shards);
for (auto i = 0; i < num_shards; ++i) {
oids.push_back(get_oid(i));
class RGWDataChangesFIFO final : public RGWDataChangesBE {
using centries = std::vector<ceph::buffer::list>;
std::vector<std::unique_ptr<rgw::cls::fifo::FIFO>> fifos;
+ std::string get_oid(int i) const {
+ return datalog.get_oid(i);
+ }
public:
- RGWDataChangesFIFO(lr::IoCtx& ioctx, int shards)
- : RGWDataChangesBE(ioctx) {
+ RGWDataChangesFIFO(lr::IoCtx& ioctx,
+ RGWDataChangesLog& datalog,
+ int shards)
+ : RGWDataChangesBE(ioctx, datalog) {
fifos.resize(shards);
for (auto i = 0; i < shards; ++i) {
auto r = rgw::cls::fifo::FIFO::create(ioctx, get_oid(i),
RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
: cct(cct),
num_shards(cct->_conf->rgw_data_log_num_shards),
+ prefix(get_prefix()),
changes(cct->_conf->rgw_data_log_changes_size) {}
int RGWDataChangesLog::start(const RGWZone* _zone,
<< ", pool=" << log_pool << dendl;
return -r;
}
+
auto found = log_backing_type(ioctx, *defbacking, num_shards,
- [this](int i) {
- return RGWDataChangesBE::get_oid(cct, i);
- },
- null_yield);
+ [this](int i) { return get_oid(i); },
+ null_yield);
if (!found) {
lderr(cct) << __PRETTY_FUNCTION__
try {
switch (*found) {
case log_type::omap:
- be = std::make_unique<RGWDataChangesOmap>(ioctx, num_shards);
+ be = std::make_unique<RGWDataChangesOmap>(ioctx, *this, num_shards);
break;
case log_type::fifo:
- be = std::make_unique<RGWDataChangesFIFO>(ioctx, num_shards);
+ be = std::make_unique<RGWDataChangesFIFO>(ioctx, *this, num_shards);
break;
}
} catch (bs::system_error& e) {
}
std::string RGWDataChangesLog::get_oid(int i) const {
- return be->get_oid(i);
+ return fmt::format("{}.{}", prefix, i);
}
int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
#include "include/buffer.h"
#include "include/encoding.h"
+#include "include/function2.hpp"
#include "include/rados/librados.hpp"
RGWDataChangesLogMarker() = default;
};
+class RGWDataChangesLog;
+
class RGWDataChangesBE {
protected:
librados::IoCtx& ioctx;
CephContext* const cct;
+ RGWDataChangesLog& datalog;
private:
- std::string prefix;
- static std::string_view get_prefix(CephContext* cct) {
- std::string_view prefix = cct->_conf->rgw_data_log_obj_prefix;
- if (prefix.empty()) {
- prefix = "data_log"sv;
- }
- return prefix;
- }
public:
using entries = std::variant<std::list<cls_log_entry>,
std::vector<ceph::buffer::list>>;
- RGWDataChangesBE(librados::IoCtx& ioctx)
+ RGWDataChangesBE(librados::IoCtx& ioctx,
+ RGWDataChangesLog& datalog)
: ioctx(ioctx), cct(static_cast<CephContext*>(ioctx.cct())),
- prefix(get_prefix(cct)) {}
+ datalog(datalog) {}
virtual ~RGWDataChangesBE() = default;
- static std::string get_oid(CephContext* cct, int i) {
- return fmt::format("{}.{}", get_prefix(cct), i);
- }
- std::string get_oid(int i) {
- return fmt::format("{}.{}", prefix, i);
- }
virtual void prepare(ceph::real_time now,
const std::string& key,
ceph::buffer::list&& entry,
std::unique_ptr<RGWDataChangesBE> be;
const int num_shards;
+ std::string get_prefix() {
+ auto prefix = cct->_conf->rgw_data_log_obj_prefix;
+ return prefix.empty() ? prefix : "data_log"s;
+ }
+ std::string prefix;
ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
ceph::shared_mutex modified_lock =