/* The bucket name can't be stored in the cls_log_entry itself, but we
 * need it later, so we keep two parallel lists under each map index:
 * the bucket/generation pairs and the prepared entries. */
- bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>,
+ bc::flat_map<int, std::pair<std::vector<BucketGen>,
RGWDataChangesBE::entries>> m;
std::unique_lock l(lock);
change.gen = gen;
encode(change, bl);
- m[index].first.push_back(bs);
+ m[index].first.push_back({bs, gen});
be->prepare(ut, change.key, std::move(bl), m[index].second);
}
auto expiration = now;
expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
- for (auto& bs : buckets) {
- update_renewed(bs, expiration);
+ for (auto& [bs, gen] : buckets) {
+ update_renewed(bs, gen, expiration);
}
}
return 0;
}
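+/* Look up the cached ChangeStatus for (bs, gen), creating and caching a
+ * fresh entry on first use. Returning the pointer, rather than filling an
+ * out-param, lets callers write `auto status = _get_change(bs, gen);`. */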
-void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
- ChangeStatusPtr& status)
+auto RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
+ uint64_t gen)
+ -> ChangeStatusPtr
{
ceph_assert(ceph_mutex_is_locked(lock));
- if (!changes.find(bs, status)) {
- status = ChangeStatusPtr(new ChangeStatus);
- changes.add(bs, status);
+ ChangeStatusPtr status;
+ if (!changes.find({bs, gen}, status)) {
+ status = std::make_shared<ChangeStatus>();
+ changes.add({bs, gen}, status);
}
+ return status;
}
void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
}
void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
+ uint64_t gen,
real_time expiration)
{
std::scoped_lock l{lock};
- ChangeStatusPtr status;
- _get_change(bs, status);
+ auto status = _get_change(bs, gen);
  ldout(cct, 20) << "RGWDataChangesLog::update_renewed() bucket_name="
<< bs.bucket.name << " shard_id=" << bs.shard_id
std::unique_lock l(lock);
- ChangeStatusPtr status;
- _get_change(bs, status);
+ auto status = _get_change(bs, gen.gen);
l.unlock();
auto now = real_clock::now();
std::unique_lock sl(status->lock);
ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
- << " shard_id=" << shard_id << " now=" << now
- << " cur_expiration=" << status->cur_expiration << dendl;
+ << " shard_id=" << shard_id << " now=" << now
+ << " cur_expiration=" << status->cur_expiration << dendl;
if (now < status->cur_expiration) {
/* no need to send, recently completed */
int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through);
};
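+/* A bucket shard together with the datalog generation it was logged
+ * under. This is the key type for both the `changes` LRU map and the
+ * `cur_cycle` set below; the converting constructors let callers build
+ * keys in place, e.g. `changes.add({bs, gen}, status)`. */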
+struct BucketGen {
+ rgw_bucket_shard shard;
+ uint64_t gen;
+
+ BucketGen(const rgw_bucket_shard& shard, uint64_t gen)
+ : shard(shard), gen(gen) {}
+
+ BucketGen(rgw_bucket_shard&& shard, uint64_t gen)
+ : shard(std::move(shard)), gen(gen) {}
+
+ BucketGen(const BucketGen&) = default;
+ BucketGen(BucketGen&&) = default;
+ BucketGen& operator =(const BucketGen&) = default;
+ BucketGen& operator =(BucketGen&&) = default;
+
+ ~BucketGen() = default;
+};
+
+inline bool operator ==(const BucketGen& l, const BucketGen& r) {
+ return (l.shard == r.shard) && (l.gen == r.gen);
+}
+
+inline bool operator <(const BucketGen& l, const BucketGen& r) {
+ if (l.shard < r.shard) {
+ return true;
+ } else if (l.shard == r.shard) {
+ return l.gen < r.gen;
+ } else {
+ return false;
+ }
+}
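+/* Note: this is lexicographic ordering on (shard, gen). A sketch of the
+ * same comparison via std::tie, assuming rgw_bucket_shard's operator<
+ * and operator== are consistent (the branches above already rely on
+ * both):
+ *
+ *   return std::tie(l.shard, l.gen) < std::tie(r.shard, r.gen);
+ */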
+
class RGWDataChangesLog {
friend DataLogBackends;
CephContext *cct;
using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
- lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
+ lru_map<BucketGen, ChangeStatusPtr> changes;
- bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle;
+ bc::flat_set<BucketGen> cur_cycle;
- void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
+ ChangeStatusPtr _get_change(const rgw_bucket_shard& bs, uint64_t gen);
void register_renew(const rgw_bucket_shard& bs,
const rgw::bucket_log_layout_generation& gen);
- void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
+ void update_renewed(const rgw_bucket_shard& bs,
+ uint64_t gen,
+ ceph::real_time expiration);
ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
ceph::condition_variable renew_cond;