}
for (int i = 0; i < shards_num; ++i, ++shard_id) {
- r = static_cast<rgw::sal::RadosStore*>(store)->svc()->datalog_rados->add_entry(dpp, bucket->get_info(), shard_id);
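+ // layout.logs.back() is the bucket's current (most recent) log generation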
+ r = static_cast<rgw::sal::RadosStore*>(store)
+ ->svc()->datalog_rados->add_entry(dpp, bucket->get_info(),
+ bucket->get_info().layout.logs.back(),
+ shard_id);
if (r < 0) {
set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
return r;
#include "cls/log/cls_log_client.h"
#include "cls_fifo_legacy.h"
+#include "rgw_bucket_layout.h"
#include "rgw_datalog.h"
#include "rgw_log_backing.h"
#include "rgw_tools.h"
auto ut = real_clock::now();
auto be = bes->head();
- for (const auto& bs : entries) {
+ for (const auto& [bs, gen_id] : entries) {
auto index = choose_oid(bs);
rgw_data_change change;
change.entity_type = ENTITY_TYPE_BUCKET;
change.key = bs.get_key();
change.timestamp = ut;
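+ // propagate the generation captured when the change was queued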
+ change.gen_id = gen_id;
encode(change, bl);
m[index].first.push_back(bs);
}
}
-void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
+void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
+ const rgw::bucket_log_layout_generation& gen)
{
std::scoped_lock l{lock};
- cur_cycle.insert(bs);
+ cur_cycle.insert({bs, gen.gen}); // remember the generation so the renewal writes to the same log
}
void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
fmt::format("{}.{}", prefix, i));
}
-int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
+int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
+ const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& gen,
+ int shard_id)
+{
auto& bucket = bucket_info.bucket;
if (!filter_bucket(dpp, bucket, null_yield)) {
if (now < status->cur_expiration) {
/* no need to send, recently completed */
sl.unlock();
- register_renew(bs);
+ register_renew(bs, gen);
return 0;
}
int ret = cond->wait();
cond->put();
if (!ret) {
- register_renew(bs);
+ register_renew(bs, gen);
}
return ret;
}
change.entity_type = ENTITY_TYPE_BUCKET;
change.key = bs.get_key();
change.timestamp = now;
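+ // record the log generation supplied by the caller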
+ change.gen_id = gen.gen;
encode(change, bl);
ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
- bc::flat_set<rgw_bucket_shard> cur_cycle;
+ bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle; // shards (with their log generation) queued for the next renew cycle
void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
- void register_renew(const rgw_bucket_shard& bs);
+ void register_renew(const rgw_bucket_shard& bs,
+ const rgw::bucket_log_layout_generation& gen);
void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams,
librados::Rados* lr);
- int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id);
+ int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& gen, int shard_id);
int get_log_shard_id(rgw_bucket& bucket, int shard_id);
int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
/* ignoring error, can't do anything about it */
continue;
}
- r = store->svc.datalog_rados->add_entry(this, bucket_info, bs.shard_id);
+ r = store->svc.datalog_rados->add_entry(this, bucket_info,
+ bucket_info.layout.logs.back(),
+ bs.shard_id);
if (r < 0) {
ldpp_dout(this, -1) << "ERROR: failed writing data log" << dendl;
}
return r;
}
- r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+ r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+ target->bucket_info.layout.logs.back(),
+ bs->shard_id);
if (r < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
return r;
ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
- int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+ int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+ target->bucket_info.layout.logs.back(),
+ bs->shard_id);
if (r < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
}
ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
- int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+ int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+ target->bucket_info.layout.logs.back(),
+ bs->shard_id);
if (r < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
}
* for following the specific bucket shard log. Otherwise they end up staying behind, and users
* have no way to tell that they're all caught up
*/
- int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+ int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+ target->bucket_info.layout.logs.back(),
+ bs->shard_id);
if (r < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
}
return r;
}
- r = svc.datalog_rados->add_entry(dpp, bucket_info, bs.shard_id);
+ r = svc.datalog_rados->add_entry(dpp, bucket_info,
+ bucket_info.layout.logs.back(),
+ bs.shard_id);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed writing data log" << dendl;
}
}
for (int i = 0; i < shards_num; ++i, ++shard_id) {
- ret = svc.datalog_rados->add_entry(dpp, info, shard_id);
+ ret = svc.datalog_rados->add_entry(dpp, info, info.layout.logs.back(),
+ shard_id);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl;
return ret;