#include "rgw_metadata.h"
#include "rgw_mdlog_types.h"
+#include "services/svc_rados.h"
+
#define META_LOG_OBJ_PREFIX "meta.log."
struct RGWMetadataLogInfo {
using info_callback_t = std::function<void(int, const cls_log_header&)>;
private:
cls_log_header header;
- librados::IoCtx io_ctx;
+ RGWSI_RADOS::Obj io_obj;
librados::AioCompletion *completion;
std::mutex mutex; //< protects callback between cancel/complete
boost::optional<info_callback_t> callback; //< cleared on cancel
explicit RGWMetadataLogInfoCompletion(info_callback_t callback);
~RGWMetadataLogInfoCompletion() override;
- librados::IoCtx& get_io_ctx() { return io_ctx; }
+ RGWSI_RADOS::Obj& get_io_obj() { return io_obj; }
cls_log_header& get_header() { return header; }
librados::AioCompletion* get_completion() { return completion; }
#include "services/svc_meta.h"
#include "services/svc_meta_be.h"
#include "services/svc_meta_be_sobj.h"
+#include "services/svc_cls.h"
#include "include/ceph_assert.h"
store->shard_name(prefix, cct->_conf->rgw_md_log_max_shards, hash_key, oid, &shard_id);
mark_modified(shard_id);
real_time now = real_clock::now();
- return store->time_log_add(oid, now, section, key, bl);
+ return store->svc.cls->timelog.add(oid, now, section, key, bl, null_yield);
}
int RGWMetadataLog::store_entries_in_shard(list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion)
mark_modified(shard_id);
store->shard_name(prefix, shard_id, oid);
- return store->time_log_add(oid, entries, completion, false);
+ return store->svc.cls->timelog.add(oid, entries, completion, false, null_yield);
}
void RGWMetadataLog::init_list_entries(int shard_id, const real_time& from_time, const real_time& end_time,
}
std::string next_marker;
- int ret = store->time_log_list(ctx->cur_oid, ctx->from_time, ctx->end_time,
- max_entries, entries, ctx->marker,
- &next_marker, truncated);
+ int ret = store->svc.cls->timelog.list(ctx->cur_oid, ctx->from_time, ctx->end_time,
+ max_entries, entries, ctx->marker,
+ &next_marker, truncated, null_yield);
if ((ret < 0) && (ret != -ENOENT))
return ret;
cls_log_header header;
- int ret = store->time_log_info(oid, &header);
+ int ret = store->svc.cls->timelog.info(oid, &header, null_yield);
if ((ret < 0) && (ret != -ENOENT))
return ret;
completion->get(); // hold a ref until the completion fires
- return store->time_log_info_async(completion->get_io_ctx(), oid,
- &completion->get_header(),
- completion->get_completion());
+ return store->svc.cls->timelog.info_async(completion->get_io_obj(), oid,
+ &completion->get_header(),
+ completion->get_completion());
}
int RGWMetadataLog::trim(int shard_id, const real_time& from_time, const real_time& end_time,
int ret;
- ret = store->time_log_trim(oid, from_time, end_time, start_marker, end_marker);
+ ret = store->svc.cls->timelog.trim(oid, from_time, end_time, start_marker, end_marker, nullptr, null_yield);
if (ret == -ENOENT || ret == -ENODATA)
ret = 0;
}
-void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const real_time& ut, const string& section, const string& key, bufferlist& bl)
-{
- cls_log_add_prepare_entry(entry, utime_t(ut), section, key, bl);
-}
-
-int RGWRados::time_log_add_init(librados::IoCtx& io_ctx)
-{
- return rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().log_pool, io_ctx, true);
-
-}
-
-int RGWRados::time_log_add(const string& oid, const real_time& ut, const string& section, const string& key, bufferlist& bl)
-{
- librados::IoCtx io_ctx;
-
- int r = time_log_add_init(io_ctx);
- if (r < 0) {
- return r;
- }
-
- ObjectWriteOperation op;
- utime_t t(ut);
- cls_log_add(op, t, section, key, bl);
-
- return io_ctx.operate(oid, &op);
-}
-
-int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries,
- librados::AioCompletion *completion, bool monotonic_inc)
-{
- librados::IoCtx io_ctx;
-
- int r = time_log_add_init(io_ctx);
- if (r < 0) {
- return r;
- }
-
- ObjectWriteOperation op;
- cls_log_add(op, entries, monotonic_inc);
-
- if (!completion) {
- r = io_ctx.operate(oid, &op);
- } else {
- r = io_ctx.aio_operate(oid, completion, &op);
- }
- return r;
-}
-
-int RGWRados::time_log_list(const string& oid, const real_time& start_time, const real_time& end_time,
- int max_entries, list<cls_log_entry>& entries,
- const string& marker,
- string *out_marker,
- bool *truncated)
-{
- librados::IoCtx io_ctx;
-
- int r = rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().log_pool, io_ctx);
- if (r < 0)
- return r;
- librados::ObjectReadOperation op;
-
- utime_t st(start_time);
- utime_t et(end_time);
-
- cls_log_list(op, st, et, marker, max_entries, entries,
- out_marker, truncated);
-
- bufferlist obl;
-
- int ret = io_ctx.operate(oid, &op, &obl);
- if (ret < 0)
- return ret;
-
- return 0;
-}
-
-int RGWRados::time_log_info(const string& oid, cls_log_header *header)
-{
- librados::IoCtx io_ctx;
-
- int r = rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().log_pool, io_ctx);
- if (r < 0)
- return r;
- librados::ObjectReadOperation op;
-
- cls_log_info(op, header);
-
- bufferlist obl;
-
- int ret = io_ctx.operate(oid, &op, &obl);
- if (ret < 0)
- return ret;
-
- return 0;
-}
-
-int RGWRados::time_log_info_async(librados::IoCtx& io_ctx, const string& oid, cls_log_header *header, librados::AioCompletion *completion)
-{
- int r = rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().log_pool, io_ctx);
- if (r < 0)
- return r;
-
- librados::ObjectReadOperation op;
-
- cls_log_info(op, header);
-
- int ret = io_ctx.aio_operate(oid, completion, &op, NULL);
- if (ret < 0)
- return ret;
-
- return 0;
-}
-
-int RGWRados::time_log_trim(const string& oid, const real_time& start_time, const real_time& end_time,
- const string& from_marker, const string& to_marker,
- librados::AioCompletion *completion)
-{
- librados::IoCtx io_ctx;
-
- int r = rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().log_pool, io_ctx);
- if (r < 0)
- return r;
-
- utime_t st(start_time);
- utime_t et(end_time);
-
- ObjectWriteOperation op;
- cls_log_trim(op, st, et, from_marker, to_marker);
-
- if (!completion) {
- r = io_ctx.operate(oid, &op);
- } else {
- r = io_ctx.aio_operate(oid, completion, &op);
- }
- return r;
-}
-
int RGWRados::lock_exclusive(const rgw_pool& pool, const string& oid, timespan& duration,
string& zone_id, string& owner_id) {
librados::IoCtx io_ctx;
void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
void shard_name(const string& prefix, unsigned shard_id, string& name);
int get_target_shard_id(const RGWBucketInfo& bucket_info, const string& obj_key, int *shard_id);
- void time_log_prepare_entry(cls_log_entry& entry, const ceph::real_time& ut, const string& section, const string& key, bufferlist& bl);
- int time_log_add_init(librados::IoCtx& io_ctx);
- int time_log_add(const string& oid, list<cls_log_entry>& entries,
- librados::AioCompletion *completion, bool monotonic_inc = true);
- int time_log_add(const string& oid, const ceph::real_time& ut, const string& section, const string& key, bufferlist& bl);
- int time_log_list(const string& oid, const ceph::real_time& start_time, const ceph::real_time& end_time,
- int max_entries, list<cls_log_entry>& entries,
- const string& marker, string *out_marker, bool *truncated);
- int time_log_info(const string& oid, cls_log_header *header);
- int time_log_info_async(librados::IoCtx& io_ctx, const string& oid, cls_log_header *header, librados::AioCompletion *completion);
- int time_log_trim(const string& oid, const ceph::real_time& start_time, const ceph::real_time& end_time,
- const string& from_marker, const string& to_marker,
- librados::AioCompletion *completion = nullptr);
int lock_exclusive(const rgw_pool& pool, const string& oid, ceph::timespan& duration, string& zone_id, string& owner_id);
int unlock(const rgw_pool& pool, const string& oid, string& zone_id, string& owner_id);
bi_rados.get(), meta.get(), meta_be_sobj.get(),
sync_modules.get());
cls->init(zone.get(), rados.get());
- mdlog->init(zone.get(), sysobj.get());
+ mdlog->init(rados.get(), zone.get(), sysobj.get());
meta->init(sysobj.get(), mdlog.get(), meta_bes);
meta_be_sobj->init(sysobj.get(), mdlog.get());
meta_be_otp->init(sysobj.get(), mdlog.get(), cls.get());
optional_yield y)
{
RGWBucketInfo bucket_info;
- int ret = read_bucket_info(ctx, bucket, &bucket_info, nullptr, nullptr, y);
+ int ret = read_bucket_info(ctx, bucket, &bucket_info, nullptr, nullptr, boost::none, y);
if (ret < 0) {
return ret;
}
#include "rgw/rgw_zone.h"
#include "cls/otp/cls_otp_client.h"
+#include "cls/log/cls_log_client.h"
#define dout_subsys ceph_subsys_rgw
return 0;
}
+void RGWSI_Cls::TimeLog::prepare_entry(cls_log_entry& entry,
+ const real_time& ut,
+ const string& section,
+ const string& key,
+ bufferlist& bl)
+{
+ cls_log_add_prepare_entry(entry, utime_t(ut), section, key, bl);
+}
+
+int RGWSI_Cls::TimeLog::init_obj(const string& oid, RGWSI_RADOS::Obj& obj)
+{
+ rgw_raw_obj o(zone_svc->get_zone_params().log_pool, oid);
+ obj = rados_svc->obj(o);
+ return obj.open();
+
+}
+int RGWSI_Cls::TimeLog::add(const string& oid,
+ const real_time& ut,
+ const string& section,
+ const string& key,
+ bufferlist& bl,
+ optional_yield y)
+{
+ RGWSI_RADOS::Obj obj;
+
+ int r = init_obj(oid, obj);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ utime_t t(ut);
+ cls_log_add(op, t, section, key, bl);
+
+ return obj.operate(&op, y);
+}
+
+int RGWSI_Cls::TimeLog::add(const string& oid,
+ std::list<cls_log_entry>& entries,
+ librados::AioCompletion *completion,
+ bool monotonic_inc,
+ optional_yield y)
+{
+ RGWSI_RADOS::Obj obj;
+
+ int r = init_obj(oid, obj);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ cls_log_add(op, entries, monotonic_inc);
+
+ if (!completion) {
+ r = obj.operate(&op, y);
+ } else {
+ r = obj.aio_operate(completion, &op);
+ }
+ return r;
+}
+
+int RGWSI_Cls::TimeLog::list(const string& oid,
+ const real_time& start_time,
+ const real_time& end_time,
+ int max_entries, std::list<cls_log_entry>& entries,
+ const string& marker,
+ string *out_marker,
+ bool *truncated,
+ optional_yield y)
+{
+ RGWSI_RADOS::Obj obj;
+
+ int r = init_obj(oid, obj);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectReadOperation op;
+
+ utime_t st(start_time);
+ utime_t et(end_time);
+
+ cls_log_list(op, st, et, marker, max_entries, entries,
+ out_marker, truncated);
+
+ bufferlist obl;
+
+ int ret = obj.operate(&op, &obl, y);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
+int RGWSI_Cls::TimeLog::info(const string& oid,
+ cls_log_header *header,
+ optional_yield y)
+{
+ RGWSI_RADOS::Obj obj;
+
+ int r = init_obj(oid, obj);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectReadOperation op;
+
+ cls_log_info(op, header);
+
+ bufferlist obl;
+
+ int ret = obj.operate(&op, &obl, y);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
+int RGWSI_Cls::TimeLog::info_async(RGWSI_RADOS::Obj& obj,
+ const string& oid,
+ cls_log_header *header,
+ librados::AioCompletion *completion)
+{
+ int r = init_obj(oid, obj);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectReadOperation op;
+
+ cls_log_info(op, header);
+
+ int ret = obj.aio_operate(completion, &op, nullptr);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
+int RGWSI_Cls::TimeLog::trim(const string& oid,
+ const real_time& start_time,
+ const real_time& end_time,
+ const string& from_marker,
+ const string& to_marker,
+ librados::AioCompletion *completion,
+ optional_yield y)
+{
+ RGWSI_RADOS::Obj obj;
+
+ int r = init_obj(oid, obj);
+ if (r < 0) {
+ return r;
+ }
+
+ utime_t st(start_time);
+ utime_t et(end_time);
+
+ librados::ObjectWriteOperation op;
+ cls_log_trim(op, st, et, from_marker, to_marker);
+
+ if (!completion) {
+ r = obj.operate(&op, y);
+ } else {
+ r = obj.aio_operate(completion, &op);
+ }
+ return r;
+}
+
#pragma once
#include "cls/otp/cls_otp_types.h"
+#include "cls/log/cls_log_types.h"
#include "rgw/rgw_service.h"
RGWObjVersionTracker *objv_tracker, ceph::real_time *pmtime, optional_yield y);
} mfa;
- RGWSI_Cls(CephContext *cct): RGWServiceInstance(cct), mfa(cct) {}
+ class TimeLog : public ClsSubService {
+ int init_obj(const string& oid, RGWSI_RADOS::Obj& obj);
+ public:
+ TimeLog(CephContext *cct): ClsSubService(cct) {}
+
+ void prepare_entry(cls_log_entry& entry,
+ const real_time& ut,
+ const string& section,
+ const string& key,
+ bufferlist& bl);
+ int add(const string& oid,
+ const real_time& ut,
+ const string& section,
+ const string& key,
+ bufferlist& bl,
+ optional_yield y);
+ int add(const string& oid,
+ std::list<cls_log_entry>& entries,
+ librados::AioCompletion *completion,
+ bool monotonic_inc,
+ optional_yield y);
+ int list(const string& oid,
+ const real_time& start_time,
+ const real_time& end_time,
+ int max_entries, list<cls_log_entry>& entries,
+ const string& marker,
+ string *out_marker,
+ bool *truncated,
+ optional_yield y);
+ int info(const string& oid,
+ cls_log_header *header,
+ optional_yield y);
+ int info_async(RGWSI_RADOS::Obj& obj,
+ const string& oid,
+ cls_log_header *header,
+ librados::AioCompletion *completion);
+ int trim(const string& oid,
+ const real_time& start_time,
+ const real_time& end_time,
+ const string& from_marker,
+ const string& to_marker,
+ librados::AioCompletion *completion,
+ optional_yield y);
+ } timelog;
+
+ RGWSI_Cls(CephContext *cct): RGWServiceInstance(cct), mfa(cct), timelog(cct) {}
void init(RGWSI_Zone *_zone_svc, RGWSI_RADOS *_rados_svc) {
rados_svc = _rados_svc;
zone_svc = _zone_svc;
mfa.init(this, zone_svc, rados_svc);
+ timelog.init(this, zone_svc, rados_svc);
}
int do_start() override;
#include "svc_mdlog.h"
+#include "svc_rados.h"
#include "svc_zone.h"
#include "svc_sys_obj.h"
RGWSI_MDLog::~RGWSI_MDLog() {
}
-int RGWSI_MDLog::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc)
+int RGWSI_MDLog::init(RGWSI_RADOS *_rados_svc, RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc)
{
svc.zone = zone_svc;
svc.sysobj = sysobj_svc;
svc.mdlog = this;
+ svc.rados = rados_svc;
return 0;
}
Cursor *cursor;
RGWObjVersionTracker *objv_tracker;
RGWMetadataLogHistory state;
+ RGWAsyncRadosProcessor *async_processor;
+
public:
ReadHistoryCR(const Svc& svc,
Cursor *cursor,
RGWObjVersionTracker *objv_tracker)
: RGWCoroutine(svc.zone->ctx()), svc(svc),
- cursor(cursor),
- objv_tracker(objv_tracker)
+ cursor(cursor),
+ objv_tracker(objv_tracker),
+ async_processor(svc.rados->get_async_processor())
{}
int operate() {
constexpr bool empty_on_enoent = false;
using ReadCR = RGWSimpleRadosReadCR<RGWMetadataLogHistory>;
- call(new ReadCR(store->get_async_rados(), svc.sysobj, obj,
+ call(new ReadCR(async_processor, svc.sysobj, obj,
&state, empty_on_enoent, objv_tracker));
}
if (retcode < 0) {
Cursor cursor;
RGWObjVersionTracker *objv;
RGWMetadataLogHistory state;
+ RGWAsyncRadosProcessor *async_processor;
+
public:
WriteHistoryCR(Svc& svc,
const Cursor& cursor,
RGWObjVersionTracker *objv)
: RGWCoroutine(svc.zone->ctx()), svc(svc),
- cursor(cursor), objv(objv)
+ cursor(cursor), objv(objv),
+ async_processor(svc.rados->get_async_processor())
{}
int operate() {
RGWMetadataLogHistory::oid};
using WriteCR = RGWSimpleRadosWriteCR<RGWMetadataLogHistory>;
- call(new WriteCR(store->get_async_rados(), svc.sysobj, obj, state, objv));
+ call(new WriteCR(async_processor, svc.sysobj, obj, state, objv));
}
if (retcode < 0) {
ldout(cct, 1) << "failed to write mdlog history: "
class RGWSI_Zone;
class RGWSI_SysObj;
+class RGWSI_RADOS;
namespace mdlog {
class ReadHistoryCR;
friend class mdlog::ReadHistoryCR;
friend class mdlog::WriteHistoryCR;
+ RGWSI_RADOS *rados_svc{nullptr};
RGWSI_Zone *zone_svc{nullptr};
RGWSI_SysObj *sysobj_svc{nullptr};
virtual ~RGWSI_MDLog();
struct Svc {
+ RGWSI_RADOS *rados{nullptr};
RGWSI_Zone *zone{nullptr};
RGWSI_SysObj *sysobj{nullptr};
RGWSI_MDLog *mdlog{nullptr};
} svc;
- int init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc);
+ int init(RGWSI_RADOS *_rados_svc,
+ RGWSI_Zone *_zone_svc,
+ RGWSI_SysObj *_sysobj_svc);
int do_start() override;