services/svc_bucket.cc
services/svc_bucket_sobj.cc
services/svc_cls.cc
+ services/svc_datalog_rados.cc
services/svc_mdlog.cc
services/svc_meta.cc
services/svc_meta_be.cc
}
for (int i = 0; i < shards_num; ++i, ++shard_id) {
- r = store->data_log->add_entry(bucket_info.bucket, shard_id);
+ r = store->svc.datalog_rados->add_entry(bucket_info.bucket, shard_id);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
return r;
if (ret < 0)
return -ret;
- RGWDataChangesLog *log = store->data_log;
+ auto datalog_svc = store->svc.datalog_rados;
RGWDataChangesLog::LogMarker log_marker;
do {
list<rgw_data_change_log_entry> entries;
if (specified_shard_id) {
- ret = log->list_entries(shard_id, start_time.to_real_time(), end_time.to_real_time(), max_entries - count, entries, marker, NULL, &truncated);
+ ret = datalog_svc->list_entries(shard_id, start_time.to_real_time(), end_time.to_real_time(), max_entries - count, entries, marker, NULL, &truncated);
} else {
- ret = log->list_entries(start_time.to_real_time(), end_time.to_real_time(), max_entries - count, entries, log_marker, &truncated);
+ ret = datalog_svc->list_entries(start_time.to_real_time(), end_time.to_real_time(), max_entries - count, entries, log_marker, &truncated);
}
if (ret < 0) {
cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
}
if (opt_cmd == OPT_DATALOG_STATUS) {
- RGWDataChangesLog *log = store->data_log;
int i = (specified_shard_id ? shard_id : 0);
formatter->open_array_section("entries");
list<cls_log_entry> entries;
RGWDataChangesLogInfo info;
- log->get_info(i, &info);
+ store->svc.datalog_rados->get_info(i, &info);
::encode_json("info", info, formatter);
if (ret < 0)
return -ret;
- RGWDataChangesLog *log = store->data_log;
- ret = log->trim_entries(start_time.to_real_time(), end_time.to_real_time(), start_marker, end_marker);
+ ret = store->svc.datalog_rados->trim_entries(start_time.to_real_time(), end_time.to_real_time(), start_marker, end_marker);
if (ret < 0) {
cerr << "ERROR: trim_entries(): " << cpp_strerror(-ret) << std::endl;
return -ret;
}
-RGWDataChangesLog::RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store),
+RGWDataChangesLog::RGWDataChangesLog(RGWSI_Zone *zone_svc,
+ RGWSI_Cls *cls_svc) : cct(zone_svc->ctx()),
lock("RGWDataChangesLog::lock"), modified_lock("RGWDataChangesLog::modified_lock"),
changes(cct->_conf->rgw_data_log_changes_size)
{
- svc.zone = store->svc.zone;
- svc.cls = store->svc.cls;
+ svc.zone = zone_svc;
+ svc.cls = cls_svc;
num_shards = cct->_conf->rgw_data_log_num_shards;
return choose_oid(bs);
}
-int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
+int RGWDataChangesLog::add_entry(const rgw_bucket& bucket, int shard_id) {
if (!svc.zone->need_to_log_data())
return 0;
return 0;
}
+#warning clean me up
+#if 0
+
int RGWDataChangesLog::lock_exclusive(int shard_id, timespan duration, string& zone_id, string& owner_id) {
return store->lock_exclusive(store->svc.zone->get_zone_params().log_pool, oids[shard_id], duration, zone_id, owner_id);
}
int RGWDataChangesLog::unlock(int shard_id, string& zone_id, string& owner_id) {
return store->unlock(store->svc.zone->get_zone_params().log_pool, oids[shard_id], zone_id, owner_id);
}
+#endif
bool RGWDataChangesLog::going_down()
{
class RGWDataChangesLog {
CephContext *cct;
- RGWRados *store;
rgw::BucketChangeObserver *observer = nullptr;
struct Svc {
public:
- RGWDataChangesLog(CephContext *_cct, RGWRados *_store);
+ RGWDataChangesLog(RGWSI_Zone *zone_svc, RGWSI_Cls *cls_svc);
~RGWDataChangesLog();
int choose_oid(const rgw_bucket_shard& bs);
const std::string& get_oid(int shard_id) const { return oids[shard_id]; }
- int add_entry(rgw_bucket& bucket, int shard_id);
+ int add_entry(const rgw_bucket& bucket, int shard_id);
int get_log_shard_id(rgw_bucket& bucket, int shard_id);
int renew_entries();
int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
int trim_entries(const real_time& start_time, const real_time& end_time,
const string& start_marker, const string& end_marker);
int get_info(int shard_id, RGWDataChangesLogInfo *info);
+#if 0
int lock_exclusive(int shard_id, timespan duration, string& zone_id, string& owner_id);
int unlock(int shard_id, string& zone_id, string& owner_id);
+#endif
struct LogMarker {
int shard;
string marker;
op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
cn = stack->create_completion_notifier();
- return bs.index_ctx.aio_operate(bs.bucket_obj, cn->completion(), &op);
+ return bs.bucket_obj.aio_operate(cn->completion(), &op);
}
int RGWRadosBILogTrimCR::request_complete()
#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"
+#include "services/svc_datalog_rados.h"
#include "include/random.h"
char buf[16];
snprintf(buf, sizeof(buf), ":%d", i);
s = key + buf;
- yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
+ yield entries_index->append(s, store->svc.data_log_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
}
} else {
- yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
+ yield entries_index->append(key, store->svc.data_log_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
}
}
truncated = result.truncated;
/* object exists, but policy is broken */
ldout(cct, 0) << "WARNING: couldn't find acl header for object, generating default" << dendl;
RGWUserInfo uinfo;
- ret = rgw_get_user_info_by_uid(store, bucket_info.owner, uinfo);
+ ret = store->ctl.user->get_info_by_uid(bucket_info.owner, &uinfo);
if (ret < 0)
return ret;
map<string, bufferlist>& bucket_attrs,
RGWAccessControlPolicy *policy)
{
- return get_bucket_instance_policy_from_attr(cct, store, bucket_info, bucket_attrs, policy);
+ return get_bucket_instance_policy_from_attr(cct, user_ctl, bucket_info, bucket_attrs, policy);
}
static boost::optional<Policy> get_iam_policy_from_attr(CephContext* cct,
s->bucket = s->bucket_info.bucket;
if (s->bucket_exists) {
- ret = read_bucket_policy(store, s, s->bucket_info, s->bucket_attrs,
+ ret = read_bucket_policy(store->ctl.user, s, s->bucket_info, s->bucket_attrs,
s->bucket_acl.get(), s->bucket);
acct_acl_user = {
s->bucket_info.owner,
{
/* Params have been extracted earlier. See init_processing(). */
RGWUserInfo new_uinfo;
- op_ret = store->ctl.user->get_info_by_uid(s->user->user_id, new_uinfo,
+ op_ret = store->ctl.user->get_info_by_uid(s->user->user_id, &new_uinfo,
RGWUserCtl::GetParams()
.set_objv_tracker(&acct_op_tracker));
if (op_ret < 0) {
/* We are passing here the current (old) user info to allow the function
* optimize-out some operations. */
- op_ret = store->ctl.user->store_info(new_uinfo, RGWCtl::PutParams()
- .set_old_info(s->user),
+ op_ret = store->ctl.user->store_info(new_uinfo, RGWUserCtl::PutParams()
+ .set_old_info(s->user)
.set_objv_tracker(&acct_op_tracker)
.set_attrs(&attrs));
}
int RGWUserStatsCache::fetch_stats_from_storage(const rgw_user& user, const rgw_bucket& bucket, RGWStorageStats& stats)
{
- int r = store->ctl.user->get_user_stats(user, stats);
+ int r = store->ctl.user->read_stats(user, &stats);
if (r < 0) {
ldout(store->ctx(), 0) << "could not get user stats for user=" << user << dendl;
return r;
#include "services/svc_sys_obj_cache.h"
#include "services/svc_bucket.h"
#include "services/svc_mdlog.h"
+#include "services/svc_datalog_rados.h"
#include "compressor/Compressor.h"
int RGWDataNotifier::process()
{
- if (!store->data_log) {
+ auto data_log = store->svc.datalog_rados->get_log();
+ if (!data_log) {
return 0;
}
map<int, set<string> > shards;
- store->data_log->read_clear_modified(shards);
+ data_log->read_clear_modified(shards);
if (shards.empty()) {
return 0;
/* ignoring error, can't do anything about it */
continue;
}
- r = store->data_log->add_entry(bs.bucket, bs.shard_id);
+ r = store->svc.datalog_rados->add_entry(bs.bucket, bs.shard_id);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
}
data_notifier->stop();
delete data_notifier;
}
- delete data_log;
delete sync_tracer;
delete lc;
return ret;
}
- data_log = new RGWDataChangesLog(cct, this);
cr_registry = crs.release();
return ret;
}
ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
return ret;
}
- data_log->set_observer(&*bucket_trim);
+ svc.datalog_rados->set_observer(&*bucket_trim);
Mutex::Locker dl(data_sync_thread_lock);
for (auto source_zone : svc.zone->get_data_sync_source_zones()) {
}
if (target->bucket_info.datasync_flag_enabled()) {
- r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+ r = store->svc.datalog_rados->add_entry(bs->bucket, bs->shard_id);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
return r;
ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
if (target->bucket_info.datasync_flag_enabled()) {
- int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+ int r = store->svc.datalog_rados->add_entry(bs->bucket, bs->shard_id);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
}
ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
if (target->bucket_info.datasync_flag_enabled()) {
- int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+ int r = store->svc.datalog_rados->add_entry(bs->bucket, bs->shard_id);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
}
* have no way to tell that they're all caught up
*/
if (target->bucket_info.datasync_flag_enabled()) {
- int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+ int r = store->svc.datalog_rados->add_entry(bs->bucket, bs->shard_id);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
}
}
if (log_data_change && bucket_info.datasync_flag_enabled()) {
- data_log->add_entry(bs.bucket, bs.shard_id);
+ svc.datalog_rados->add_entry(bs.bucket, bs.shard_id);
}
return 0;
quota_handler(NULL),
cr_registry(NULL),
pctl(&ctl),
- data_log(NULL), reshard(NULL) {}
+ reshard(NULL) {}
RGWRados& set_use_cache(bool status) {
use_cache = status;
*/
string host_id;
- RGWDataChangesLog *data_log;
-
RGWReshard *reshard;
std::shared_ptr<RGWReshardWait> reshard_wait;
if (ret < 0) {
return ret;
}
- ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
+ ret = bs.bucket_obj.aio_operate(c, &op);
if (ret < 0) {
derr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << dendl;
return ret;
#include "rgw_mdlog.h"
#include "services/svc_zone.h"
+#include "services/svc_mdlog.h"
+#include "services/svc_bilog_rados.h"
+#include "services/svc_datalog_rados.h"
#include "common/errno.h"
#include "include/ceph_assert.h"
void RGWOp_MDLog_Info::execute() {
num_objects = s->cct->_conf->rgw_md_log_max_shards;
- period = store->ctl.meta.mgr->read_oldest_log_period();
+ period = store->svc.mdlog->read_oldest_log_period();
http_ret = period.get_error();
}
return;
}
}
- RGWMetadataLog meta_log{s->cct, store->svc.zone, store->svc.ctl, period};
+ RGWMetadataLog meta_log{s->cct, store->svc.zone, store->svc.cls, period};
http_ret = meta_log.get_info(shard_id, &info);
}
return;
}
}
- RGWMetadataLog meta_log{s->cct, store->svc.zone, store->svc.ctl, period};
+ RGWMetadataLog meta_log{s->cct, store->svc.zone, store->svc.cls, period};
http_ret = meta_log.trim(shard_id, ut_st, ut_et, start_marker, end_marker);
}
return;
}
- RGWMetadataLog meta_log{s->cct, store->svc.zone, store->svc.ctl, period};
+ RGWMetadataLog meta_log{s->cct, store->svc.zone, store->svc.cls, period};
unsigned dur;
dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
if (!err.empty() || dur <= 0) {
return;
}
- RGWMetadataLog meta_log{s->cct, store->svc.zone, store->svc.ctl, period};
+ RGWMetadataLog meta_log{s->cct, store->svc.zone, store->svc.cls, period};
http_ret = meta_log.unlock(shard_id, zone_id, locker_id);
}
send_response();
do {
list<rgw_bi_log_entry> entries;
- int ret = store->svc.bilog->log_list(bucket_info, shard_id,
- marker, max_entries - count,
- entries, &truncated);
+ int ret = store->svc.bilog_rados->log_list(bucket_info, shard_id,
+ marker, max_entries - count,
+ entries, &truncated);
if (ret < 0) {
ldpp_dout(s, 5) << "ERROR: list_bi_log_entries()" << dendl;
return;
return;
}
}
- http_ret = store->svc.bilog->log_trim(bucket_info, shard_id, start_marker, end_marker);
+ http_ret = store->svc.bilog_rados->log_trim(bucket_info, shard_id, start_marker, end_marker);
if (http_ret < 0) {
ldpp_dout(s, 5) << "ERROR: trim_bi_log_entries() " << dendl;
}
// Note that last_marker is updated to be the marker of the last
// entry listed
- http_ret = store->data_log->list_entries(shard_id, ut_st, ut_et,
- max_entries, entries, marker,
- &last_marker, &truncated);
+ http_ret = store->svc.datalog_rados->list_entries(shard_id, ut_st, ut_et,
+ max_entries, entries, marker,
+ &last_marker, &truncated);
}
void RGWOp_DATALog_List::send_response() {
return;
}
- http_ret = store->data_log->get_info(shard_id, &info);
+ http_ret = store->svc.datalog_rados->get_info(shard_id, &info);
}
void RGWOp_DATALog_ShardInfo::send_response() {
flusher.flush();
}
-void RGWOp_DATALog_Lock::execute() {
- string shard_id_str, duration_str, locker_id, zone_id;
- unsigned shard_id;
-
- http_ret = 0;
-
- shard_id_str = s->info.args.get("id");
- duration_str = s->info.args.get("length");
- locker_id = s->info.args.get("locker-id");
- zone_id = s->info.args.get("zone-id");
-
- if (shard_id_str.empty() ||
- (duration_str.empty()) ||
- locker_id.empty() ||
- zone_id.empty()) {
- dout(5) << "Error invalid parameter list" << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- string err;
- shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
- if (!err.empty()) {
- dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- unsigned dur;
- dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err);
- if (!err.empty() || dur <= 0) {
- dout(5) << "invalid length param " << duration_str << dendl;
- http_ret = -EINVAL;
- return;
- }
- http_ret = store->data_log->lock_exclusive(shard_id, make_timespan(dur), zone_id, locker_id);
- if (http_ret == -EBUSY)
- http_ret = -ERR_LOCKED;
-}
-
-void RGWOp_DATALog_Unlock::execute() {
- string shard_id_str, locker_id, zone_id;
- unsigned shard_id;
-
- http_ret = 0;
-
- shard_id_str = s->info.args.get("id");
- locker_id = s->info.args.get("locker-id");
- zone_id = s->info.args.get("zone-id");
-
- if (shard_id_str.empty() ||
- locker_id.empty() ||
- zone_id.empty()) {
- dout(5) << "Error invalid parameter list" << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- string err;
- shard_id = (unsigned)strict_strtol(shard_id_str.c_str(), 10, &err);
- if (!err.empty()) {
- dout(5) << "Error parsing shard_id param " << shard_id_str << dendl;
- http_ret = -EINVAL;
- return;
- }
-
- http_ret = store->data_log->unlock(shard_id, zone_id, locker_id);
-}
-
void RGWOp_DATALog_Notify::execute() {
string source_zone = s->info.args.get("source-zone");
#define LARGE_ENOUGH_BUF (128 * 1024)
return;
}
- http_ret = store->data_log->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker);
+ http_ret = store->svc.datalog_rados->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker);
}
// not in header to avoid pulling in rgw_sync.h
else if (s->info.args.exists("notify"))
return new RGWOp_MDLog_Notify;
} else if (type.compare("data") == 0) {
- if (s->info.args.exists("lock"))
- return new RGWOp_DATALog_Lock;
- else if (s->info.args.exists("unlock"))
- return new RGWOp_DATALog_Unlock;
- else if (s->info.args.exists("notify"))
+ if (s->info.args.exists("notify"))
return new RGWOp_DATALog_Notify;
}
return NULL;
}
};
-class RGWOp_DATALog_Lock : public RGWRESTOp {
-public:
- RGWOp_DATALog_Lock() {}
- ~RGWOp_DATALog_Lock() override {}
-
- int check_caps(RGWUserCaps& caps) override {
- return caps.check_cap("datalog", RGW_CAP_WRITE);
- }
- void execute() override;
- const char* name() const override {
- return "lock_datalog_object";
- }
-};
-
-class RGWOp_DATALog_Unlock : public RGWRESTOp {
-public:
- RGWOp_DATALog_Unlock() {}
- ~RGWOp_DATALog_Unlock() override {}
-
- int check_caps(RGWUserCaps& caps) override {
- return caps.check_cap("datalog", RGW_CAP_WRITE);
- }
- void execute() override;
- const char* name() const override {
- return "unlock_datalog_object";
- }
-};
-
class RGWOp_DATALog_Notify : public RGWRESTOp {
public:
RGWOp_DATALog_Notify() {}
RGWObjVersionTracker objv_tracker;
op_ret = store->ctl.user->store_info(info, RGWUserCtl::PutParams()
- .set_old_info(&info),
+ .set_old_info(&info)
.set_objv_tracker(&objv_tracker)
.set_attrs(&uattrs));
if (op_ret < 0) {
#include "services/svc_bilog_rados.h"
#include "services/svc_bucket_sobj.h"
#include "services/svc_cls.h"
+#include "services/svc_datalog_rados.h"
#include "services/svc_mdlog.h"
#include "services/svc_meta.h"
#include "services/svc_meta_be.h"
bi_rados = std::make_unique<RGWSI_BucketIndex_RADOS>(cct);
bilog_rados = std::make_unique<RGWSI_BILog_RADOS>(cct);
cls = std::make_unique<RGWSI_Cls>(cct);
+ datalog_rados = std::make_unique<RGWSI_DataLog_RADOS>(cct);
mdlog = std::make_unique<RGWSI_MDLog>(cct);
meta = std::make_unique<RGWSI_Meta>(cct);
meta_be_sobj = std::make_unique<RGWSI_MetaBackend_SObj>(cct);
vector<RGWSI_MetaBackend *> meta_bes{meta_be_sobj.get(), meta_be_otp.get()};
finisher->init();
- bi_rados->init(zone.get(), rados.get(), bilog_rados.get());
+ bi_rados->init(zone.get(), rados.get(), bilog_rados.get(), datalog_rados.get());
bilog_rados->init(bi_rados.get());
bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
bi_rados.get(), meta.get(), meta_be_sobj.get(),
return r;
}
+ r = datalog_rados->start();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
r = mdlog->start();
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to start mdlog service (" << cpp_strerror(-r) << dendl;
return;
}
+ datalog_rados->shutdown();
+
sysobj->shutdown();
sysobj_core->shutdown();
notify->shutdown();
bucket_sobj = _svc.bucket_sobj.get();
bucket = bucket_sobj;
cls = _svc.cls.get();
+ datalog_rados = _svc.datalog_rados.get();
mdlog = _svc.mdlog.get();
meta = _svc.meta.get();
meta_be_sobj = _svc.meta_be_sobj.get();
class RGWSI_BucketIndex_RADOS;
class RGWSI_BILog_RADOS;
class RGWSI_Cls;
+class RGWSI_DataLog_RADOS;
class RGWSI_MDLog;
class RGWSI_Meta;
class RGWSI_MetaBackend;
std::unique_ptr<RGWSI_BucketIndex_RADOS> bi_rados;
std::unique_ptr<RGWSI_BILog_RADOS> bilog_rados;
std::unique_ptr<RGWSI_Cls> cls;
+ std::unique_ptr<RGWSI_DataLog_RADOS> datalog_rados;
std::unique_ptr<RGWSI_MDLog> mdlog;
std::unique_ptr<RGWSI_Meta> meta;
std::unique_ptr<RGWSI_MetaBackend_SObj> meta_be_sobj;
RGWSI_BucketIndex_RADOS *bi_rados{nullptr};
RGWSI_BILog_RADOS *bilog_rados{nullptr};
RGWSI_Cls *cls{nullptr};
+ RGWSI_DataLog_RADOS *datalog_rados{nullptr};
RGWSI_MDLog *mdlog{nullptr};
RGWSI_Meta *meta{nullptr};
RGWSI_MetaBackend *meta_be_sobj{nullptr};
ldout(cct, 10) << "trimming log shard " << i
<< " at marker=" << m
<< " last_trim=" << last_trim[i] << dendl;
- spawn(new TrimCR(store, store->data_log->get_oid(i),
+ spawn(new TrimCR(store, store->svc.datalog_rados->get_oid(i),
m, &last_trim[i]),
true);
}
int num_shards, utime_t interval)
: RGWCoroutine(store->ctx()), store(store), http(http),
num_shards(num_shards), interval(interval),
- lock_oid(store->data_log->get_oid(0)),
+ lock_oid(store->svc.datalog_rados->get_oid(0)),
lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
last_trim(num_shards)
{}
RGWStorageStats stats;
RGWStorageStats *arg_stats = NULL;
if (op_state.fetch_stats) {
- int ret = store->ctl.user->get_stats(info.user_id, stats);
+ int ret = store->ctl.user->read_stats(info.user_id, &stats);
if (ret < 0 && ret != -ENOENT) {
return ret;
}
int RGWUserCtl::read_stats(const rgw_user& user, RGWStorageStats *stats)
{
return be_handler->call([&](RGWSI_MetaBackend_Handler::Op *op) {
- return svc.user->reset_bucket_stats(op->ctx(), user, stats);
+ return svc.user->read_stats(op->ctx(), user, stats);
});
}
int flush_bucket_stats(const rgw_user& user,
const RGWBucketEnt& ent);
+ int complete_flush_stats(const rgw_user& user);
int reset_stats(const rgw_user& user);
int read_stats(const rgw_user& user, RGWStorageStats *stats);
};
#include "svc_bi_rados.h"
#include "svc_bilog_rados.h"
+#include "svc_datalog_rados.h"
#include "svc_zone.h"
#include "rgw/rgw_zone.h"
void RGWSI_BucketIndex_RADOS::init(RGWSI_Zone *zone_svc,
RGWSI_RADOS *rados_svc,
- RGWSI_BILog_RADOS *bilog_svc)
+ RGWSI_BILog_RADOS *bilog_svc,
+ RGWSI_DataLog_RADOS *datalog_rados_svc)
{
svc.zone = zone_svc;
svc.rados = rados_svc;
svc.bilog = bilog_svc;
+ svc.datalog_rados = datalog_rados_svc;
}
int RGWSI_BucketIndex_RADOS::open_pool(const rgw_pool& pool,
int shard_id,
RGWSI_RADOS::Obj *bucket_obj)
{
+ RGWSI_RADOS::Pool index_pool;
string bucket_oid_base;
- int ret = open_bucket_index_base(bucket_info, index_pool, &bucket_oid_base);
+ int ret = open_bucket_index_base(bucket_info, &index_pool, &bucket_oid_base);
if (ret < 0) {
ldout(cct, 20) << __func__ << ": open_bucket_index_pool() returned "
<< r << dendl;
get_bucket_index_object(bucket_oid_base, bucket_info.num_shards,
shard_id, &oid);
- *bucket_obj = svc.rados->obj(pool, oid);
+ *bucket_obj = svc.rados->obj(index_pool, oid);
return 0;
}
}
for (int i = 0; i < shards_num; ++i, ++shard_id) {
- ret = store->data_log->add_entry(info.bucket, shard_id);
+ ret = svc.datalog_rados->add_entry(info.bucket, shard_id);
if (ret < 0) {
lderr(cct) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl;
return ret;
struct rgw_bucket_dir_header;
class RGWSI_BILog_RADOS;
+class RGWSI_DataLog_RADOS;
#define RGW_NO_SHARD -1
RGWSI_Zone *zone{nullptr};
RGWSI_RADOS *rados{nullptr};
RGWSI_BILog_RADOS *bilog{nullptr};
+ RGWSI_DataLog_RADOS *datalog_rados{nullptr};
} svc;
RGWSI_BucketIndex_RADOS(CephContext *cct);
void init(RGWSI_Zone *zone_svc,
RGWSI_RADOS *rados_svc,
- RGWSI_BILog_RADOS *bilog_svc);
+ RGWSI_BILog_RADOS *bilog_svc,
+ RGWSI_DataLog_RADOS *datalog_rados_svc);
static int shards_max() {
return RGW_SHARDS_PRIME_1;
--- /dev/null
+#include "svc_datalog_rados.h"
+#include "svc_zone.h"
+#include "svc_cls.h"
+
+#include "rgw/rgw_bucket.h"
+
+
+#define dout_subsys ceph_subsys_rgw
+
+RGWSI_DataLog_RADOS::RGWSI_DataLog_RADOS(CephContext *cct) : RGWServiceInstance(cct) {
+}
+
+RGWSI_DataLog_RADOS::~RGWSI_DataLog_RADOS() {
+}
+
+int RGWSI_DataLog_RADOS::init(RGWSI_Zone *_zone_svc, RGWSI_Cls *_cls_svc)
+{
+ svc.zone = _zone_svc;
+ svc.cls = _cls_svc;
+
+ return 0;
+}
+
+int RGWSI_DataLog_RADOS::do_start()
+{
+ log.reset(new RGWDataChangesLog(svc.zone, svc.cls));
+
+ return 0;
+}
+
+void RGWSI_DataLog_RADOS::shutdown()
+{
+ log.reset();
+}
+
+void RGWSI_DataLog_RADOS::set_observer(rgw::BucketChangeObserver *observer)
+{
+ log->set_observer(observer);
+}
+
+int RGWSI_DataLog_RADOS::get_log_shard_id(rgw_bucket& bucket, int shard_id)
+{
+ return log->get_log_shard_id(bucket, shard_id);
+}
+
+const std::string& RGWSI_DataLog_RADOS::get_oid(int shard_id) const
+{
+ return log->get_oid(shard_id);
+}
+
+int RGWSI_DataLog_RADOS::get_info(int shard_id, RGWDataChangesLogInfo *info)
+{
+ return log->get_info(shard_id, info);
+}
+
+int RGWSI_DataLog_RADOS::add_entry(const rgw_bucket& bucket, int shard_id)
+{
+ return log->add_entry(bucket, shard_id);
+}
+
+int RGWSI_DataLog_RADOS::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
+ list<rgw_data_change_log_entry>& entries,
+ const string& marker,
+ string *out_marker,
+ bool *truncated)
+{
+ return log->list_entries(shard, start_time, end_time, max_entries,
+ entries, marker, out_marker, truncated);
+}
+
+int RGWSI_DataLog_RADOS::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
+ const string& start_marker, const string& end_marker)
+{
+ return log->trim_entries(shard_id, start_time, end_time, start_marker, end_marker);
+}
+
+int RGWSI_DataLog_RADOS::trim_entries(const real_time& start_time, const real_time& end_time,
+ const string& start_marker, const string& end_marker)
+{
+ return log->trim_entries(start_time, end_time, start_marker, end_marker);
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#pragma once
+
+#include "rgw/rgw_service.h"
+
+
+class RGWDataChangesLog;
+class RGWDataChangesLogInfo;
+struct rgw_data_change_log_entry;
+
+namespace rgw {
+ class BucketChangeObserver;
+}
+
+class RGWSI_DataLog_RADOS : public RGWServiceInstance
+{
+ std::unique_ptr<RGWDataChangesLog> log;
+
+public:
+ RGWSI_DataLog_RADOS(CephContext *cct);
+ virtual ~RGWSI_DataLog_RADOS();
+
+ struct Svc {
+ RGWSI_Zone *zone{nullptr};
+ RGWSI_Cls *cls{nullptr};
+ } svc;
+
+ int init(RGWSI_Zone *_zone_svc,
+ RGWSI_Cls *_cls_svc);
+
+ int do_start() override;
+ void shutdown() override;
+
+ RGWDataChangesLog *get_log() {
+ return log.get();
+ }
+
+ void set_observer(rgw::BucketChangeObserver *observer);
+
+ int get_log_shard_id(rgw_bucket& bucket, int shard_id);
+ const std::string& get_oid(int shard_id) const;
+
+ int get_info(int shard_id, RGWDataChangesLogInfo *info);
+
+ int add_entry(const rgw_bucket& bucket, int shard_id);
+ int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
+ list<rgw_data_change_log_entry>& entries,
+ const string& marker,
+ string *out_marker,
+ bool *truncated);
+ int trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
+ const string& start_marker, const string& end_marker);
+ int trim_entries(const real_time& start_time, const real_time& end_time,
+ const string& start_marker, const string& end_marker);
+};
+
period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
current_period.get_id()));
- if (svc.zone->need_sync()) {
+ if (svc.zone->need_to_sync()) {
// initialize the log period history
svc.mdlog->init_oldest_log_period();
}
RGWSI_RADOS::Obj::Obj(Pool& pool, const string& oid) : rados_svc(pool.rados_svc), rados_handle(pool.rados_handle)
{
ref.pool = pool;
- ref.obj = rgw_obj(pool.get_pool(), oid);
+ ref.obj = rgw_raw_obj(pool.get_pool(), oid);
}
void RGWSI_RADOS::Obj::init(const rgw_raw_obj& obj)
const rgw_user& user) = 0;
virtual int reset_bucket_stats(RGWSI_MetaBackend::Context *ctx,
const rgw_user& user) = 0;
- virtual int read_stats(const rgw_user& user, RGWStorageStats *stats) = 0;
+ virtual int read_stats(RGWSI_MetaBackend::Context *ctx,
+ const rgw_user& user, RGWStorageStats *stats) = 0;
};
}
int RGWSI_User_RADOS::reset_bucket_stats(RGWSI_MetaBackend::Context *ctx,
- const rgw_user& user) override
+ const rgw_user& user)
{
return cls_user_reset_stats(user);
}
}
librados::ObjectWriteOperation op;
::cls_user_reset_stats(op);
- return rados_obj->operate(&op, null_yield);
+ return rados_obj.operate(&op, null_yield);
}
int RGWSI_User_RADOS::complete_flush_stats(RGWSI_MetaBackend::Context *ctx,
}
librados::ObjectWriteOperation op;
::cls_user_complete_stats_sync(op);
- return rados_obj->operate(&op, null_yield);
+ return rados_obj.operate(&op, null_yield);
}
int RGWSI_User_RADOS::cls_user_get_header(const rgw_user& user, cls_user_header *header)
bufferlist ibl;
librados::ObjectReadOperation op;
::cls_user_get_header(op, header, &rc);
- return rados_obj->operate(&op, &ibl, null_yield);
+ return rados_obj.operate(&op, &ibl, null_yield);
}
-int RGWSI_User_RADOS::read_stats(const rgw_user& user, RGWStorageStats *stats)
+int RGWSI_User_RADOS::read_stats(RGWSI_MetaBackend::Context *ctx,
+ const rgw_user& user, RGWStorageStats *stats)
{
string user_str = user.to_str();
int reset_bucket_stats(RGWSI_MetaBackend::Context *ctx,
const rgw_user& user) override;
- int read_stats(const rgw_user& user, RGWStorageStats *stats) override;
+ int read_stats(RGWSI_MetaBackend::Context *ctx,
+ const rgw_user& user, RGWStorageStats *stats) override;
};
bool RGWSI_Zone::need_to_sync() const
{
- return !(zonegroup.master_zone.empty() ||
+ return !(zonegroup->master_zone.empty() ||
!rest_master_conn ||
- current_period.get_id().empty());
+ current_period->get_id().empty());
}
bool RGWSI_Zone::need_to_log_data() const