RGWDataChangesLog::LogMarker log_marker;
do {
- list<rgw_data_change_log_entry> entries;
+ std::vector<rgw_data_change_log_entry> entries;
if (specified_shard_id) {
- ret = datalog_svc->list_entries(shard_id, {}, {}, max_entries - count,
- entries, marker, &marker, &truncated);
+ ret = datalog_svc->list_entries(shard_id, max_entries - count,
+ entries,
+ marker.empty() ?
+ std::nullopt :
+ std::make_optional(marker),
+ &marker, &truncated);
} else {
- ret = datalog_svc->list_entries({}, {}, max_entries - count, entries,
+ ret = datalog_svc->list_entries(max_entries - count, entries,
log_marker, &truncated);
}
if (ret < 0) {
count += entries.size();
- for (list<rgw_data_change_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
- rgw_data_change_log_entry& entry = *iter;
+ for (const auto& entry : entries) {
if (!extra_info) {
encode_json("entry", entry.entry, formatter.get());
} else {
// loop until -ENODATA
do {
auto datalog = store->svc()->datalog_rados;
- ret = datalog->trim_entries(shard_id, {}, {}, {}, marker);
+ ret = datalog->trim_entries(shard_id, marker);
} while (ret == 0);
if (ret < 0 && ret != -ENODATA) {
#include <boost/format.hpp>
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include "fmt/format.h"
+
#include "common/errno.h"
#include "common/ceph_json.h"
#include "include/scope_guard.h"
}
for (int i = 0; i < num_shards; i++) {
- char buf[16];
- snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i);
- oids[i] = buf;
+ oids[i] = get_oid(i);
}
renew_thread = new ChangesRenewThread(cct, this);
return bucket_filter(bucket, y);
}
+std::string RGWDataChangesLog::get_oid(int i) const {
+ std::string_view prefix = cct->_conf->rgw_data_log_obj_prefix;
+ if (prefix.empty()) {
+ prefix = "data_log"sv;
+ }
+ return fmt::format("{}.{}", prefix, i);
+}
+
+
int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
auto& bucket = bucket_info.bucket;
status->lock.lock();
- ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
+ ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
+ << " shard_id=" << shard_id << " now=" << now
+ << " cur_expiration=" << status->cur_expiration << dendl;
if (now < status->cur_expiration) {
/* no need to send, recently completed */
expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
status->lock.unlock();
-
+
bufferlist bl;
rgw_data_change change;
change.entity_type = ENTITY_TYPE_BUCKET;
return ret;
}
-int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
- list<rgw_data_change_log_entry>& entries,
- const string& marker,
- string *out_marker,
- bool *truncated) {
- if (shard >= num_shards)
- return -EINVAL;
-
- list<cls_log_entry> log_entries;
+int RGWDataChangesLog::list_entries(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated)
+{
+ assert(shard < num_shards);
+ std::list<cls_log_entry> log_entries;
- int ret = svc.cls->timelog.list(oids[shard], start_time, end_time,
- max_entries, log_entries, marker,
- out_marker, truncated, null_yield);
+ int ret = svc.cls->timelog.list(oids[shard], {}, {},
+ max_entries, log_entries,
+ std::string(marker.value_or("")),
+ out_marker, truncated, null_yield);
if (ret < 0)
return ret;
- list<cls_log_entry>::iterator iter;
- for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
+ for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
rgw_data_change_log_entry log_entry;
log_entry.log_id = iter->id;
real_time rt = iter->timestamp.to_real_time();
return 0;
}
-int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
- list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
+int RGWDataChangesLog::list_entries(int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ LogMarker& marker, bool *ptruncated)
+{
bool truncated;
entries.clear();
for (; marker.shard < num_shards && (int)entries.size() < max_entries;
- marker.shard++, marker.marker.clear()) {
- int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
- marker.marker, NULL, &truncated);
+ marker.shard++, marker.marker.reset()) {
+ int ret = list_entries(marker.shard, max_entries - entries.size(),
+ entries, marker.marker, NULL, &truncated);
if (ret == -ENOENT) {
continue;
}
int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
{
- if (shard_id >= num_shards)
- return -EINVAL;
-
+ assert(shard_id < num_shards);
string oid = oids[shard_id];
cls_log_header header;
return 0;
}
-int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
- const string& start_marker, const string& end_marker)
+int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
{
- if (shard_id > num_shards)
- return -EINVAL;
+ assert(shard_id < num_shards);
+ return svc.cls->timelog.trim(oids[shard_id], {}, {},
+ {}, std::string(marker), nullptr, null_yield);
+}
- return svc.cls->timelog.trim(oids[shard_id], start_time, end_time,
- start_marker, end_marker, nullptr, null_yield);
+int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
+ librados::AioCompletion* c)
+{
+ assert(shard_id < num_shards);
+ return svc.cls->timelog.trim(oids[shard_id], {}, {},
+ {}, std::string(marker), c, null_yield);
}
bool RGWDataChangesLog::going_down()
RGWBucketInstanceMetadataHandler() {}
void init(RGWSI_Zone *zone_svc,
- RGWSI_Bucket *bucket_svc,
- RGWSI_BucketIndex *bi_svc) {
+ RGWSI_Bucket *bucket_svc,
+ RGWSI_BucketIndex *bi_svc) override {
base_init(bucket_svc->ctx(),
bucket_svc->get_bi_be_handler().get());
svc.zone = zone_svc;
struct RGWDataChangesLogMarker {
int shard = 0;
- std::string marker;
+ std::optional<std::string> marker;
RGWDataChangesLogMarker() = default;
};
~RGWDataChangesLog();
int choose_oid(const rgw_bucket_shard& bs);
- const std::string& get_oid(int shard_id) const { return oids[shard_id]; }
+ std::string get_oid(int shard_id) const;
int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
int get_log_shard_id(rgw_bucket& bucket, int shard_id);
int renew_entries();
- int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
- list<rgw_data_change_log_entry>& entries,
- const string& marker,
- string *out_marker,
- bool *truncated);
- int trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
- const string& start_marker, const string& end_marker);
+ int list_entries(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated);
+ int trim_entries(int shard_id, std::string_view marker);
+ int trim_entries(int shard_id, std::string_view marker,
+ librados::AioCompletion* c); // :(
int get_info(int shard_id, RGWDataChangesLogInfo *info);
using LogMarker = RGWDataChangesLogMarker;
- int list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
- list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated);
+ int list_entries(int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ LogMarker& marker, bool* ptruncated);
void mark_modified(int shard_id, const rgw_bucket_shard& bs);
void read_clear_modified(map<int, set<string> > &modified);
// Note that last_marker is updated to be the marker of the last
// entry listed
- http_ret = store->svc()->datalog_rados->list_entries(shard_id, {}, {},
+ http_ret = store->svc()->datalog_rados->list_entries(shard_id,
max_entries, entries,
marker, &last_marker,
&truncated);
s->formatter->dump_bool("truncated", truncated);
{
s->formatter->open_array_section("entries");
- for (list<rgw_data_change_log_entry>::iterator iter = entries.begin();
- iter != entries.end(); ++iter) {
- rgw_data_change_log_entry& entry = *iter;
+ for (const auto& entry : entries) {
if (!extra_info) {
encode_json("entry", entry.entry, s->formatter);
} else {
return;
}
- http_ret = store->svc()->datalog_rados->trim_entries(shard_id, {}, {}, {},
- marker);
+ http_ret = store->svc()->datalog_rados->trim_entries(shard_id, marker);
}
// not in header to avoid pulling in rgw_sync.h
};
class RGWOp_DATALog_List : public RGWRESTOp {
- list<rgw_data_change_log_entry> entries;
- string last_marker;
+ std::vector<rgw_data_change_log_entry> entries;
+ std::string last_marker;
bool truncated;
bool extra_info;
public:
return log->get_log_shard_id(bucket, shard_id);
}
-const std::string& RGWSI_DataLog_RADOS::get_oid(int shard_id) const
+std::string RGWSI_DataLog_RADOS::get_oid(int shard_id) const
{
return log->get_oid(shard_id);
}
return log->add_entry(bucket_info, shard_id);
}
-int RGWSI_DataLog_RADOS::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
- list<rgw_data_change_log_entry>& entries,
- const string& marker,
- string *out_marker,
- bool *truncated)
+int RGWSI_DataLog_RADOS::list_entries(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker,
+ bool* truncated)
{
- return log->list_entries(shard, start_time, end_time, max_entries,
+ return log->list_entries(shard, max_entries,
entries, marker, out_marker, truncated);
}
-int RGWSI_DataLog_RADOS::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
- list<rgw_data_change_log_entry>& entries, RGWDataChangesLogMarker& marker, bool *ptruncated)
+int RGWSI_DataLog_RADOS::list_entries(int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries, RGWDataChangesLogMarker& marker, bool *ptruncated)
{
- return log->list_entries(start_time, end_time, max_entries,
- entries, marker, ptruncated);
+ return log->list_entries(max_entries, entries, marker, ptruncated);
}
-int RGWSI_DataLog_RADOS::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
- const string& start_marker, const string& end_marker)
+int RGWSI_DataLog_RADOS::trim_entries(int shard_id, std::string_view marker)
{
- return log->trim_entries(shard_id, start_time, end_time, start_marker, end_marker);
+ return log->trim_entries(shard_id, marker);
}
void set_observer(rgw::BucketChangeObserver *observer);
int get_log_shard_id(rgw_bucket& bucket, int shard_id);
- const std::string& get_oid(int shard_id) const;
+ std::string get_oid(int shard_id) const;
int get_info(int shard_id, RGWDataChangesLogInfo *info);
int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
- int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
- list<rgw_data_change_log_entry>& entries,
- const string& marker,
- string *out_marker,
- bool *truncated);
- int list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
- list<rgw_data_change_log_entry>& entries, RGWDataChangesLogMarker& marker, bool *ptruncated);
- int trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
- const string& start_marker, const string& end_marker);
+ int list_entries(int shard, int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ std::optional<std::string_view> marker,
+ std::string* out_marker, bool* truncated);
+ int list_entries(int max_entries,
+ std::vector<rgw_data_change_log_entry>& entries,
+ RGWDataChangesLogMarker& marker, bool *ptruncated);
+ int trim_entries(int shard_id, std::string_view marker);
};
-