rgw: add gen parameter to RGWDataChangesLog::add_entry
author     Adam C. Emerson <aemerson@redhat.com>
           Mon, 14 Dec 2020 02:13:44 +0000 (21:13 -0500)
committer  Adam C. Emerson <aemerson@redhat.com>
           Tue, 1 Feb 2022 15:48:58 +0000 (10:48 -0500)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
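
In brief, this commit threads the bucket's log-layout generation through
RGWDataChangesLog::add_entry() so each data-log entry records the
generation (gen_id) it was written under. A minimal sketch of the new
call pattern, using names from the diff below; the `datalog` handle and
`bucket` object are assumed to be in scope (not part of the patch):

    // Sketch, not part of the patch: every updated call site passes the
    // bucket's newest log-layout generation alongside the shard id.
    const RGWBucketInfo& info = bucket->get_info();
    const rgw::bucket_log_layout_generation& gen =
        info.layout.logs.back();                     // current generation
    int r = datalog->add_entry(dpp, info, gen, shard_id);
    if (r < 0) {
      ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
    }
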
src/rgw/rgw_bucket.cc
src/rgw/rgw_datalog.cc
src/rgw/rgw_datalog.h
src/rgw/rgw_rados.cc
src/rgw/services/svc_bi_rados.cc

diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index 806c34a08ce0bdc38ccfd9ec9e66702335352d96..2da513128c17fe12ad986a72d05f01651c62e638 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -699,7 +699,10 @@ int RGWBucket::sync(RGWBucketAdminOpState& op_state, const DoutPrefixProvider *d
   }
 
   for (int i = 0; i < shards_num; ++i, ++shard_id) {
-    r = static_cast<rgw::sal::RadosStore*>(store)->svc()->datalog_rados->add_entry(dpp, bucket->get_info(), shard_id);
+    r = static_cast<rgw::sal::RadosStore*>(store)
+      ->svc()->datalog_rados->add_entry(dpp, bucket->get_info(),
+                                       bucket->get_info().layout.logs.back(),
+                                       shard_id);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
       return r;
diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc
index e542739016a72e76c904a698c13a147aabcd4f43..9312a234111ea46d7db8db5f13143a170a9fa49e 100644
--- a/src/rgw/rgw_datalog.cc
+++ b/src/rgw/rgw_datalog.cc
@@ -15,6 +15,7 @@
 #include "cls/log/cls_log_client.h"
 
 #include "cls_fifo_legacy.h"
+#include "rgw_bucket_layout.h"
 #include "rgw_datalog.h"
 #include "rgw_log_backing.h"
 #include "rgw_tools.h"
@@ -512,7 +513,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
 
   auto ut = real_clock::now();
   auto be = bes->head();
-  for (const auto& bs : entries) {
+  for (const auto& [bs, gen_id] : entries) {
     auto index = choose_oid(bs);
 
     rgw_data_change change;
@@ -520,6 +521,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bs.get_key();
     change.timestamp = ut;
+    change.gen_id = gen_id;
     encode(change, bl);
 
     m[index].first.push_back(bs);
@@ -559,10 +561,11 @@ void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
   }
 }
 
-void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
+void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
+                                      const rgw::bucket_log_layout_generation& gen)
 {
   std::scoped_lock l{lock};
-  cur_cycle.insert(bs);
+  cur_cycle.insert({bs, gen.gen});
 }
 
 void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
@@ -600,7 +603,11 @@ std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
          fmt::format("{}.{}", prefix, i));
 }
 
-int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
+int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
+                                const RGWBucketInfo& bucket_info,
+                                const rgw::bucket_log_layout_generation& gen,
+                                int shard_id)
+{
   auto& bucket = bucket_info.bucket;
 
   if (!filter_bucket(dpp, bucket, null_yield)) {
@@ -633,7 +640,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
     sl.unlock();
-    register_renew(bs);
+    register_renew(bs, gen);
     return 0;
   }
 
@@ -650,7 +657,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
     int ret = cond->wait();
     cond->put();
     if (!ret) {
-      register_renew(bs);
+      register_renew(bs, gen);
     }
     return ret;
   }
@@ -675,6 +682,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bs.get_key();
     change.timestamp = now;
+    change.gen_id = gen.gen;
     encode(change, bl);
 
     ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
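
Design note on the hunks above: register_renew() now records the
generation alongside the shard, and renew_entries() copies it into each
change record, so the same shard can be pending under more than one log
generation during a reshard. A sketch of the bookkeeping, assuming
rgw_bucket_shard provides operator< (std::pair compares
lexicographically, so the flat_set stays ordered):

    // Pending renewals are keyed by (shard, generation), as in the diff.
    bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle;

    // Recording a pending renewal (as register_renew() now does):
    cur_cycle.insert({bs, gen.gen});
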
diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h
index 7df3be937cd1d067943fb71693fd80b49f4cfa56..2e1e0e990ab7f4eb1bbe6cd4305b7740db1a8050 100644
--- a/src/rgw/rgw_datalog.h
+++ b/src/rgw/rgw_datalog.h
@@ -208,10 +208,11 @@ class RGWDataChangesLog {
 
   lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
 
-  bc::flat_set<rgw_bucket_shard> cur_cycle;
+  bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle;
 
   void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
-  void register_renew(const rgw_bucket_shard& bs);
+  void register_renew(const rgw_bucket_shard& bs,
+                     const rgw::bucket_log_layout_generation& gen);
   void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
 
   ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
@@ -234,7 +235,8 @@ public:
   int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams,
            librados::Rados* lr);
 
-  int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id);
+  int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+               const rgw::bucket_log_layout_generation& gen, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
                   std::vector<rgw_data_change_log_entry>& entries,
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 22264699d0f74eeb2493ee9639be83d28e8d7654..6d64790e77ea1f30168c2e9e0fe0cd3fc647a0f0 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -875,7 +875,9 @@ int RGWIndexCompletionThread::process(const DoutPrefixProvider *dpp)
       /* ignoring error, can't do anything about it */
       continue;
     }
-    r = store->svc.datalog_rados->add_entry(this, bucket_info, bs.shard_id);
+    r = store->svc.datalog_rados->add_entry(this, bucket_info,
+                                           bucket_info.layout.logs.back(),
+                                           bs.shard_id);
     if (r < 0) {
       ldpp_dout(this, -1) << "ERROR: failed writing data log" << dendl;
     }
@@ -5212,7 +5214,9 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
       return r;
     }
 
-    r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+    r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                           target->bucket_info.layout.logs.back(),
+                                           bs->shard_id);
     if (r < 0) {
       ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
       return r;
@@ -6286,7 +6290,9 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64
 
   ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
@@ -6313,7 +6319,9 @@ int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
 
   ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
@@ -6340,7 +6348,9 @@ int RGWRados::Bucket::UpdateIndex::cancel(const DoutPrefixProvider *dpp,
    * for following the specific bucket shard log. Otherwise they end up staying behind, and users
    * have no way to tell that they're all caught up
    */
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
@@ -6956,7 +6966,9 @@ int RGWRados::bucket_index_link_olh(const DoutPrefixProvider *dpp, RGWBucketInfo
     return r;
   }
 
-  r = svc.datalog_rados->add_entry(dpp, bucket_info, bs.shard_id);
+  r = svc.datalog_rados->add_entry(dpp, bucket_info,
+                                  bucket_info.layout.logs.back(),
+                                  bs.shard_id);
   if (r < 0) {
     ldpp_dout(dpp, 0) << "ERROR: failed writing data log" << dendl;
   }
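
Every rgw_rados.cc call site above passes bucket_info.layout.logs.back(),
i.e. the newest generation. A hypothetical helper (not in this commit)
would capture that repeated pattern in one place:

    // Hypothetical wrapper, not in this commit: forwards the newest log
    // generation so call sites only supply bucket info and shard id.
    static int add_entry_latest(RGWDataChangesLog* datalog,
                                const DoutPrefixProvider* dpp,
                                const RGWBucketInfo& info, int shard_id)
    {
      return datalog->add_entry(dpp, info, info.layout.logs.back(),
                                shard_id);
    }
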
diff --git a/src/rgw/services/svc_bi_rados.cc b/src/rgw/services/svc_bi_rados.cc
index a2f34382f5fd55870c0144115439c67a34c7ca39..984eaf487b8ca532588c956effd901dea62ebd32 100644
--- a/src/rgw/services/svc_bi_rados.cc
+++ b/src/rgw/services/svc_bi_rados.cc
@@ -489,7 +489,8 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
     }
 
     for (int i = 0; i < shards_num; ++i, ++shard_id) {
-      ret = svc.datalog_rados->add_entry(dpp, info, shard_id);
+      ret = svc.datalog_rados->add_entry(dpp, info, info.layout.logs.back(),
+                                        shard_id);
       if (ret < 0) {
         ldpp_dout(dpp, -1) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl;
         return ret;