#include "common/async/librados_completion.h"
#include "cls/fifo/cls_fifo_types.h"
+#include "cls/log/cls_log_client.h"
#include "cls_fifo_legacy.h"
#include "rgw_datalog.h"
static constexpr auto dout_subsys = ceph_subsys_rgw;
namespace bs = boost::system;
+namespace lr = librados;
void rgw_data_change::dump(ceph::Formatter *f) const
{
class RGWDataChangesOmap final : public RGWDataChangesBE {
using centries = std::list<cls_log_entry>;
- RGWSI_Cls& cls;
std::vector<std::string> oids;
public:
- RGWDataChangesOmap(CephContext* cct, RGWSI_Cls& cls)
- : RGWDataChangesBE(cct), cls(cls) {
- auto num_shards = cct->_conf->rgw_data_log_num_shards;
+ RGWDataChangesOmap(lr::IoCtx& ioctx, int num_shards)
+ : RGWDataChangesBE(ioctx) {
oids.reserve(num_shards);
for (auto i = 0; i < num_shards; ++i) {
oids.push_back(get_oid(i));
}
cls_log_entry e;
- cls.timelog.prepare_entry(e, ut, {}, key, entry);
+ cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry);
std::get<centries>(out).push_back(std::move(e));
}
int push(int index, entries&& items) override {
- auto r = cls.timelog.add(oids[index], std::get<centries>(items),
- nullptr, true, null_yield);
+ lr::ObjectWriteOperation op;
+ cls_log_add(op, std::get<centries>(items), true);
+ auto r = rgw_rados_operate(ioctx, oids[index], &op, null_yield);
if (r < 0) {
lderr(cct) << __PRETTY_FUNCTION__
<< ": failed to push to " << oids[index] << cpp_strerror(-r)
int push(int index, ceph::real_time now,
const std::string& key,
ceph::buffer::list&& bl) override {
- auto r = cls.timelog.add(oids[index], now, {}, key, bl, null_yield);
+ lr::ObjectWriteOperation op;
+ cls_log_add(op, utime_t(now), {}, key, bl);
+ auto r = rgw_rados_operate(ioctx, oids[index], &op, null_yield);
if (r < 0) {
lderr(cct) << __PRETTY_FUNCTION__
<< ": failed to push to " << oids[index]
std::optional<std::string_view> marker,
std::string* out_marker, bool* truncated) override {
std::list<cls_log_entry> log_entries;
- auto r = cls.timelog.list(oids[index], {}, {},
- max_entries, log_entries,
- std::string(marker.value_or("")),
- out_marker, truncated, null_yield);
+ lr::ObjectReadOperation op;
+ cls_log_list(op, {}, {}, std::string(marker.value_or("")),
+ max_entries, log_entries, out_marker, truncated);
+ auto r = rgw_rados_operate(ioctx, oids[index], &op, nullptr, null_yield);
if (r == -ENOENT) {
*truncated = false;
return 0;
}
int get_info(int index, RGWDataChangesLogInfo *info) override {
cls_log_header header;
- auto r = cls.timelog.info(oids[index], &header, null_yield);
+ lr::ObjectReadOperation op;
+ cls_log_info(op, &header);
+ auto r = rgw_rados_operate(ioctx, oids[index], &op, nullptr, null_yield);
if (r == -ENOENT) r = 0;
if (r < 0) {
lderr(cct) << __PRETTY_FUNCTION__
return r;
}
int trim(int index, std::string_view marker) override {
- auto r = cls.timelog.trim(oids[index], {}, {},
- {}, std::string(marker), nullptr,
- null_yield);
-
+ lr::ObjectWriteOperation op;
+ cls_log_trim(op, {}, {}, {}, std::string(marker));
+ auto r = rgw_rados_operate(ioctx, oids[index], &op, null_yield);
if (r == -ENOENT) r = 0;
if (r < 0) {
lderr(cct) << __PRETTY_FUNCTION__
return r;
}
int trim(int index, std::string_view marker,
- librados::AioCompletion* c) override {
- auto r = cls.timelog.trim(oids[index], {}, {},
- {}, std::string(marker), c, null_yield);
-
+ lr::AioCompletion* c) override {
+ lr::ObjectWriteOperation op;
+ cls_log_trim(op, {}, {}, {}, std::string(marker));
+ auto r = ioctx.aio_operate(oids[index], c, &op, 0);
if (r == -ENOENT) r = 0;
if (r < 0) {
lderr(cct) << __PRETTY_FUNCTION__
using centries = std::vector<ceph::buffer::list>;
std::vector<std::unique_ptr<rgw::cls::fifo::FIFO>> fifos;
public:
- RGWDataChangesFIFO(CephContext* cct, librados::Rados* rados,
- const rgw_pool& log_pool)
- : RGWDataChangesBE(cct) {
- librados::IoCtx ioctx;
- auto shards = cct->_conf->rgw_data_log_num_shards;
- auto r = rgw_init_ioctx(rados, log_pool.name, ioctx,
- true, false);
- if (r < 0) {
- throw bs::system_error(ceph::to_error_code(r));
- }
+ RGWDataChangesFIFO(lr::IoCtx& ioctx, int shards)
+ : RGWDataChangesBE(ioctx) {
fifos.resize(shards);
for (auto i = 0; i < shards; ++i) {
- r = rgw::cls::fifo::FIFO::create(ioctx, get_oid(i),
- &fifos[i], null_yield);
+ auto r = rgw::cls::fifo::FIFO::create(ioctx, get_oid(i),
+ &fifos[i], null_yield);
if (r < 0) {
throw bs::system_error(ceph::to_error_code(r));
}
int RGWDataChangesLog::start(const RGWZone* _zone,
const RGWZoneParams& zoneparams,
- RGWSI_Cls *cls, librados::Rados* lr)
+ librados::Rados* lr)
{
zone = _zone;
ceph_assert(zone);
try {
switch (*found) {
case log_type::omap:
- be = std::make_unique<RGWDataChangesOmap>(cct, *cls);
+ be = std::make_unique<RGWDataChangesOmap>(ioctx, num_shards);
break;
case log_type::fifo:
- be = std::make_unique<RGWDataChangesFIFO>(cct, lr, log_pool);
+ be = std::make_unique<RGWDataChangesFIFO>(ioctx, num_shards);
break;
}
} catch (bs::system_error& e) {