From 8f96ccfa2e35b24c6fa14d3ec18c618b36365779 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Tue, 8 Feb 2022 13:11:44 -0500 Subject: [PATCH] rgw: Add generation to ChangeStatus Signed-off-by: Adam C. Emerson --- src/rgw/rgw_datalog.cc | 32 +++++++++++++++++--------------- src/rgw/rgw_datalog.h | 42 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc index 246332e195958..1212a9398c51c 100644 --- a/src/rgw/rgw_datalog.cc +++ b/src/rgw/rgw_datalog.cc @@ -514,7 +514,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp) /* we can't keep the bucket name as part of the cls_log_entry, and we need * it later, so we keep two lists under the map */ - bc::flat_map, + bc::flat_map, RGWDataChangesBE::entries>> m; std::unique_lock l(lock); @@ -535,7 +535,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp) change.gen = gen; encode(change, bl); - m[index].first.push_back(bs); + m[index].first.push_back({bs, gen}); be->prepare(ut, change.key, std::move(bl), m[index].second); } @@ -554,22 +554,25 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp) auto expiration = now; expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); - for (auto& bs : buckets) { - update_renewed(bs, expiration); + for (auto& [bs, gen] : buckets) { + update_renewed(bs, gen, expiration); } } return 0; } -void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, - ChangeStatusPtr& status) +auto RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, + uint64_t gen) + -> ChangeStatusPtr { ceph_assert(ceph_mutex_is_locked(lock)); - if (!changes.find(bs, status)) { - status = ChangeStatusPtr(new ChangeStatus); - changes.add(bs, status); + ChangeStatusPtr status; + if (!changes.find({bs, gen}, status)) { + status = std::make_shared(); + changes.add({bs, gen}, status); } + return status; } void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs, @@ -580,11 +583,11 @@ void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs, } void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs, + uint64_t gen, real_time expiration) { std::scoped_lock l{lock}; - ChangeStatusPtr status; - _get_change(bs, status); + auto status = _get_change(bs, gen); ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id @@ -637,8 +640,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, std::unique_lock l(lock); - ChangeStatusPtr status; - _get_change(bs, status); + auto status = _get_change(bs, gen.gen); l.unlock(); auto now = real_clock::now(); @@ -646,8 +648,8 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, std::unique_lock sl(status->lock); ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name - << " shard_id=" << shard_id << " now=" << now - << " cur_expiration=" << status->cur_expiration << dendl; + << " shard_id=" << shard_id << " now=" << now + << " cur_expiration=" << status->cur_expiration << dendl; if (now < status->cur_expiration) { /* no need to send, recently completed */ diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h index 423316bee0d34..d54f76a3249a2 100644 --- a/src/rgw/rgw_datalog.h +++ b/src/rgw/rgw_datalog.h @@ -198,6 +198,38 @@ public: int trim_generations(const DoutPrefixProvider *dpp, std::optional& through); }; +struct BucketGen { + rgw_bucket_shard shard; + uint64_t gen; + + BucketGen(const rgw_bucket_shard& shard, uint64_t gen) + : shard(shard), gen(gen) {} + + BucketGen(rgw_bucket_shard&& shard, uint64_t gen) + : shard(std::move(shard)), gen(gen) {} + + BucketGen(const BucketGen&) = default; + BucketGen(BucketGen&&) = default; + BucketGen& operator =(const BucketGen&) = default; + BucketGen& operator =(BucketGen&&) = default; + + ~BucketGen() = default; +}; + +inline bool operator ==(const BucketGen& l, const BucketGen& r) { + return (l.shard == r.shard) && (l.gen == r.gen); +} + +inline bool operator <(const BucketGen& l, const BucketGen& r) { + if (l.shard < r.shard) { + return true; + } else if (l.shard == r.shard) { + return l.gen < r.gen; + } else { + return false; + } +} + class RGWDataChangesLog { friend DataLogBackends; CephContext *cct; @@ -234,14 +266,16 @@ class RGWDataChangesLog { using ChangeStatusPtr = std::shared_ptr; - lru_map changes; + lru_map changes; - bc::flat_set> cur_cycle; + bc::flat_set cur_cycle; - void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status); + ChangeStatusPtr _get_change(const rgw_bucket_shard& bs, uint64_t gen); void register_renew(const rgw_bucket_shard& bs, const rgw::bucket_log_layout_generation& gen); - void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration); + void update_renewed(const rgw_bucket_shard& bs, + uint64_t gen, + ceph::real_time expiration); ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock"); ceph::condition_variable renew_cond; -- 2.39.5