From 9bf5cf752d4edf3862ccaa8aca83154c1a99709c Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Sun, 13 Dec 2020 21:13:44 -0500 Subject: [PATCH] rgw: add gen parameter to RGWDataChangesLog::add_entry Signed-off-by: Adam C. Emerson --- src/rgw/rgw_bucket.cc | 4 +++- src/rgw/rgw_datalog.cc | 20 ++++++++++++++------ src/rgw/rgw_datalog.h | 8 +++++--- src/rgw/rgw_rados.cc | 24 ++++++++++++++++++------ src/rgw/services/svc_bi_rados.cc | 4 ++-- 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index e76d0cd0f6bd7..4c450e77e0042 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1061,7 +1061,9 @@ int RGWBucket::sync(RGWBucketAdminOpState& op_state, map *at } for (int i = 0; i < shards_num; ++i, ++shard_id) { - r = store->svc()->datalog_rados->add_entry(dpp, bucket_info, shard_id); + r = store->svc()->datalog_rados->add_entry(dpp, bucket_info, + bucket_info.layout.logs.back(), + shard_id); if (r < 0) { set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r)); return r; diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc index 122e5336f2d90..42f0fe7b10833 100644 --- a/src/rgw/rgw_datalog.cc +++ b/src/rgw/rgw_datalog.cc @@ -13,6 +13,7 @@ #include "cls/fifo/cls_fifo_types.h" #include "cls_fifo_legacy.h" +#include "rgw_bucket_layout.h" #include "rgw_datalog.h" #include "rgw_tools.h" @@ -612,7 +613,7 @@ int RGWDataChangesLog::renew_entries() l.unlock(); auto ut = real_clock::now(); - for (const auto& bs : entries) { + for (const auto& [bs, gen_id] : entries) { auto index = choose_oid(bs); rgw_data_change change; @@ -620,6 +621,7 @@ int RGWDataChangesLog::renew_entries() change.entity_type = ENTITY_TYPE_BUCKET; change.key = bs.get_key(); change.timestamp = ut; + change.gen_id = gen_id; encode(change, bl); m[index].first.push_back(bs); @@ -659,10 +661,11 @@ void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, } } -void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs) +void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs, + const rgw::bucket_log_layout_generation& gen) { std::scoped_lock l{lock}; - cur_cycle.insert(bs); + cur_cycle.insert({bs, gen.gen}); } void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs, @@ -698,7 +701,11 @@ std::string RGWDataChangesLog::get_oid(int i) const { return be->get_oid(i); } -int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) { +int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, + const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& gen, + int shard_id) +{ auto& bucket = bucket_info.bucket; if (!filter_bucket(dpp, bucket, null_yield)) { @@ -731,7 +738,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI if (now < status->cur_expiration) { /* no need to send, recently completed */ sl.unlock(); - register_renew(bs); + register_renew(bs, gen); return 0; } @@ -748,7 +755,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI int ret = cond->wait(); cond->put(); if (!ret) { - register_renew(bs); + register_renew(bs, gen); } return ret; } @@ -773,6 +780,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI change.entity_type = ENTITY_TYPE_BUCKET; change.key = bs.get_key(); change.timestamp = now; + change.gen_id = gen.gen; encode(change, bl); ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl; diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h index 02868ce62e7ab..eee73d4286b8f 100644 --- a/src/rgw/rgw_datalog.h +++ b/src/rgw/rgw_datalog.h @@ -199,10 +199,11 @@ class RGWDataChangesLog { lru_map changes; - bc::flat_set cur_cycle; + bc::flat_set> cur_cycle; void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status); - void register_renew(const rgw_bucket_shard& bs); + void register_renew(const rgw_bucket_shard& bs, + const rgw::bucket_log_layout_generation& gen); void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration); ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock"); @@ -225,7 +226,8 @@ public: int start(const RGWZone* _zone, const RGWZoneParams& zoneparams, RGWSI_Cls *cls_svc, librados::Rados* lr); - int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id); + int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, + const rgw::bucket_log_layout_generation& gen, int shard_id); int get_log_shard_id(rgw_bucket& bucket, int shard_id); int list_entries(int shard, int max_entries, std::vector& entries, diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index d3381d22bc1a5..8dd18a92c745b 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -859,7 +859,9 @@ int RGWIndexCompletionThread::process() /* ignoring error, can't do anything about it */ continue; } - r = store->svc.datalog_rados->add_entry(this, bucket_info, bs.shard_id); + r = store->svc.datalog_rados->add_entry(this, bucket_info, + bucket_info.layout.logs.back(), + bs.shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } @@ -5038,7 +5040,9 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi return r; } - r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id); + r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, + target->bucket_info.layout.logs.back(), + bs->shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; return r; @@ -6108,7 +6112,9 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64 ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace); - int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id); + int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, + target->bucket_info.layout.logs.back(), + bs->shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } @@ -6135,7 +6141,9 @@ int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp, ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace); - int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id); + int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, + target->bucket_info.layout.logs.back(), + bs->shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } @@ -6161,7 +6169,9 @@ int RGWRados::Bucket::UpdateIndex::cancel(const DoutPrefixProvider *dpp) * for following the specific bucket shard log. Otherwise they end up staying behind, and users * have no way to tell that they're all caught up */ - int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id); + int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, + target->bucket_info.layout.logs.back(), + bs->shard_id); if (r < 0) { lderr(store->ctx()) << "ERROR: failed writing data log" << dendl; } @@ -6795,7 +6805,9 @@ int RGWRados::bucket_index_link_olh(const DoutPrefixProvider *dpp, RGWBucketInfo return r; } - r = svc.datalog_rados->add_entry(dpp, bucket_info, bs.shard_id); + r = svc.datalog_rados->add_entry(dpp, bucket_info, + bucket_info.layout.logs.back(), + bs.shard_id); if (r < 0) { ldout(cct, 0) << "ERROR: failed writing data log" << dendl; } diff --git a/src/rgw/services/svc_bi_rados.cc b/src/rgw/services/svc_bi_rados.cc index eaba9c1cea2b8..7eef30bed6e25 100644 --- a/src/rgw/services/svc_bi_rados.cc +++ b/src/rgw/services/svc_bi_rados.cc @@ -475,7 +475,8 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp, } for (int i = 0; i < shards_num; ++i, ++shard_id) { - ret = svc.datalog_rados->add_entry(dpp, info, shard_id); + ret = svc.datalog_rados->add_entry(dpp, info, info.layout.logs.back(), + shard_id); if (ret < 0) { lderr(cct) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl; return ret; @@ -485,4 +486,3 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp, return 0; } - -- 2.39.5