From: Adam C. Emerson
Date: Mon, 14 Dec 2020 02:13:44 +0000 (-0500)
Subject: rgw: add gen parameter to RGWDataChangesLog::add_entry
X-Git-Tag: v18.0.0~787^2~122
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6f83f07d7f1d5301b9bb99e557565087b4ccf1a3;p=ceph.git

rgw: add gen parameter to RGWDataChangesLog::add_entry

Signed-off-by: Adam C. Emerson
---
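Every call site below passes the bucket's newest log generation, bucket_info.layout.logs.back(), alongside the shard id. A minimal sketch of the resulting call shape, assuming a RGWDataChangesLog* datalog, a populated RGWBucketInfo bucket_info, a DoutPrefixProvider* dpp, and a valid shard_id already in scope (none of which this patch defines):

  // Sketch only; mirrors the pattern used at each call site in this patch.
  // Assumes datalog, bucket_info, dpp, and shard_id exist in the caller.
  const rgw::bucket_log_layout_generation& gen =
    bucket_info.layout.logs.back();   // newest (current) log generation
  int r = datalog->add_entry(dpp, bucket_info, gen, shard_id);
  if (r < 0) {
    ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
  }
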
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index 2fd64ca5916b6..329e5e3582458 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -668,7 +668,10 @@ int RGWBucket::sync(RGWBucketAdminOpState& op_state, const DoutPrefixProvider *d
   }
 
   for (int i = 0; i < shards_num; ++i, ++shard_id) {
-    r = static_cast<rgw::sal::RadosStore*>(store)->svc()->datalog_rados->add_entry(dpp, bucket->get_info(), shard_id);
+    r = static_cast<rgw::sal::RadosStore*>(store)
+      ->svc()->datalog_rados->add_entry(dpp, bucket->get_info(),
+                                        bucket->get_info().layout.logs.back(),
+                                        shard_id);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
       return r;
diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc
index 5ef5d502c768a..2b55133da6ca5 100644
--- a/src/rgw/rgw_datalog.cc
+++ b/src/rgw/rgw_datalog.cc
@@ -15,6 +15,7 @@
 #include "cls/log/cls_log_client.h"
 
 #include "cls_fifo_legacy.h"
+#include "rgw_bucket_layout.h"
 #include "rgw_datalog.h"
 #include "rgw_log_backing.h"
 #include "rgw_tools.h"
@@ -512,7 +513,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
   auto ut = real_clock::now();
 
   auto be = bes->head();
-  for (const auto& bs : entries) {
+  for (const auto& [bs, gen_id] : entries) {
     auto index = choose_oid(bs);
 
     rgw_data_change change;
@@ -520,6 +521,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bs.get_key();
     change.timestamp = ut;
+    change.gen_id = gen_id;
     encode(change, bl);
 
     m[index].first.push_back(bs);
@@ -559,10 +561,11 @@ void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
   }
 }
 
-void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
+void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
+                                       const rgw::bucket_log_layout_generation& gen)
 {
   std::scoped_lock l{lock};
-  cur_cycle.insert(bs);
+  cur_cycle.insert({bs, gen.gen});
 }
 
 void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
@@ -604,11 +607,11 @@ std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
                      fmt::format("{}.{}", prefix, i));
 }
 
-int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
-  if (!zone->log_data) {
-    return 0;
-  }
-
+int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
+                                 const RGWBucketInfo& bucket_info,
+                                 const rgw::bucket_log_layout_generation& gen,
+                                 int shard_id)
+{
   auto& bucket = bucket_info.bucket;
 
   if (!filter_bucket(dpp, bucket, null_yield)) {
@@ -641,7 +644,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
     sl.unlock();
-    register_renew(bs);
+    register_renew(bs, gen);
     return 0;
   }
 
@@ -658,7 +661,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
     int ret = cond->wait();
     cond->put();
     if (!ret) {
-      register_renew(bs);
+      register_renew(bs, gen);
     }
     return ret;
   }
@@ -683,6 +686,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
   change.entity_type = ENTITY_TYPE_BUCKET;
   change.key = bs.get_key();
   change.timestamp = now;
+  change.gen_id = gen.gen;
   encode(change, bl);
 
   ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h
index 7df3be937cd1d..2e1e0e990ab7f 100644
--- a/src/rgw/rgw_datalog.h
+++ b/src/rgw/rgw_datalog.h
@@ -208,10 +208,11 @@ class RGWDataChangesLog {
 
   lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
 
-  bc::flat_set<rgw_bucket_shard> cur_cycle;
+  bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle;
 
   void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
-  void register_renew(const rgw_bucket_shard& bs);
+  void register_renew(const rgw_bucket_shard& bs,
+                      const rgw::bucket_log_layout_generation& gen);
   void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
 
   ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
@@ -234,7 +235,8 @@ public:
   int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams,
             librados::Rados* lr);
 
-  int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id);
+  int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+                const rgw::bucket_log_layout_generation& gen, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
                    std::vector<rgw_data_change_log_entry>& entries,
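With cur_cycle now a set of (shard, generation) pairs, a shard touched under two different log generations is renewed once per generation rather than once overall. A self-contained toy showing that set semantics, using only the standard library (std::set standing in for bc::flat_set, a string standing in for rgw_bucket_shard; all names illustrative):

  #include <cstdint>
  #include <iostream>
  #include <set>
  #include <string>
  #include <utility>

  int main() {
    // Stand-in for bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>>:
    // the same shard under a different generation is a distinct entry.
    std::set<std::pair<std::string, std::uint64_t>> cur_cycle;

    cur_cycle.insert({"bucket1:shard0", 0});
    cur_cycle.insert({"bucket1:shard0", 0}); // duplicate pair, ignored
    cur_cycle.insert({"bucket1:shard0", 1}); // new generation, kept

    std::cout << cur_cycle.size() << "\n";   // prints 2
    return 0;
  }
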
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index e80bd6d5bc491..c65965e0e05dc 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -920,7 +920,9 @@ void RGWIndexCompletionManager::process()
         continue;
       }
 
-      r = store->svc.datalog_rados->add_entry(&dpp, bucket_info, bs.shard_id);
+      r = store->svc.datalog_rados->add_entry(&dpp, bucket_info,
+                                              bucket_info.layout.logs.back(),
+                                              bs.shard_id);
       if (r < 0) {
         ldpp_dout(&dpp, -1) << "ERROR: failed writing data log" << dendl;
       }
@@ -5163,7 +5165,9 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
     return r;
   }
 
-  r = store->svc.datalog_rados->add_entry(dpp, target->get_bucket_info(), bs->shard_id);
+  r = store->svc.datalog_rados->add_entry(dpp, target->get_bucket_info(),
+                                          target->get_bucket_info().layout.logs.back(),
+                                          bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
     return r;
@@ -6266,7 +6270,9 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64
 
   ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                              target->bucket_info.layout.logs.back(),
+                                              bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
@@ -6293,7 +6299,9 @@ int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
 
   ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                              target->bucket_info.layout.logs.back(),
+                                              bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
@@ -6320,7 +6328,9 @@ int RGWRados::Bucket::UpdateIndex::cancel(const DoutPrefixProvider *dpp,
    * for following the specific bucket shard log. Otherwise they end up staying behind, and users
    * have no way to tell that they're all caught up
    */
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                              target->bucket_info.layout.logs.back(),
+                                              bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
diff --git a/src/rgw/services/svc_bi_rados.cc b/src/rgw/services/svc_bi_rados.cc
index a2f34382f5fd5..984eaf487b8ca 100644
--- a/src/rgw/services/svc_bi_rados.cc
+++ b/src/rgw/services/svc_bi_rados.cc
@@ -489,7 +489,8 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
   }
 
   for (int i = 0; i < shards_num; ++i, ++shard_id) {
-    ret = svc.datalog_rados->add_entry(dpp, info, shard_id);
+    ret = svc.datalog_rados->add_entry(dpp, info, info.layout.logs.back(),
+                                       shard_id);
     if (ret < 0) {
       ldpp_dout(dpp, -1) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl;
       return ret;
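All six call sites updated above spell the generation the same way, as layout.logs.back(). If that choice ever needed to vary (say, logging against an explicit generation during a reshard), a small helper could keep it in one place; the function below is hypothetical and not part of this patch:

  // Hypothetical helper, not in this patch: centralizes the "newest
  // generation" choice made at every add_entry() call site above.
  // Assumes layout.logs is non-empty, as the call sites here already do.
  inline const rgw::bucket_log_layout_generation&
  current_log_gen(const RGWBucketInfo& info) {
    return info.layout.logs.back();
  }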