rgw: add gen parameter to RGWDataChangesLog::add_entry
author      Adam C. Emerson <aemerson@redhat.com>
            Mon, 14 Dec 2020 02:13:44 +0000 (21:13 -0500)
committer   Adam C. Emerson <aemerson@redhat.com>
            Mon, 13 Sep 2021 16:27:50 +0000 (12:27 -0400)
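
RGWDataChangesLog::add_entry() now takes the bucket's current log layout
generation, and each rgw_data_change it writes records that generation in
its gen_id field. register_renew() and the cur_cycle set carry the
generation as well, so entries re-sent by the renew thread keep the gen_id
they were first logged with. Callers pass bucket_info.layout.logs.back().

A minimal sketch of the updated caller contract, condensed from the hunks
below (datalog_rados, dpp, and shard_id as at the existing call sites):

    const RGWBucketInfo& info = bucket->get_info();
    // Log against the bucket's newest log generation.
    const auto& gen = info.layout.logs.back();
    int r = datalog_rados->add_entry(dpp, info, gen, shard_id);
    if (r < 0) {
      // "failed writing data log": callers report and return or continue
    }
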
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_datalog.cc
src/rgw/rgw_datalog.h
src/rgw/rgw_rados.cc
src/rgw/services/svc_bi_rados.cc

diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index 0a0dd417d2b313e38b8dba534b08b418add8dc47..f6c0a280c0e25eece2013288d1cc30cf3d1bb594 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -689,7 +689,10 @@ int RGWBucket::sync(RGWBucketAdminOpState& op_state, const DoutPrefixProvider *d
   }
 
   for (int i = 0; i < shards_num; ++i, ++shard_id) {
-    r = static_cast<rgw::sal::RadosStore*>(store)->svc()->datalog_rados->add_entry(dpp, bucket->get_info(), shard_id);
+    r = static_cast<rgw::sal::RadosStore*>(store)
+      ->svc()->datalog_rados->add_entry(dpp, bucket->get_info(),
+                                       bucket->get_info().layout.logs.back(),
+                                       shard_id);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
       return r;
diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc
index 728350763840e486b7996caa47ab9f80c7dcfdb1..4903f447ff6b5ce319b4e1fcce9390c6e5530fe4 100644
--- a/src/rgw/rgw_datalog.cc
+++ b/src/rgw/rgw_datalog.cc
@@ -15,6 +15,7 @@
 #include "cls/log/cls_log_client.h"
 
 #include "cls_fifo_legacy.h"
+#include "rgw_bucket_layout.h"
 #include "rgw_datalog.h"
 #include "rgw_log_backing.h"
 #include "rgw_tools.h"
@@ -512,7 +513,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
 
   auto ut = real_clock::now();
   auto be = bes->head();
-  for (const auto& bs : entries) {
+  for (const auto& [bs, gen_id] : entries) {
     auto index = choose_oid(bs);
 
     rgw_data_change change;
@@ -520,6 +521,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bs.get_key();
     change.timestamp = ut;
+    change.gen_id = gen_id;
     encode(change, bl);
 
     m[index].first.push_back(bs);
@@ -559,10 +561,11 @@ void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
   }
 }
 
-void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
+void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
+                                      const rgw::bucket_log_layout_generation& gen)
 {
   std::scoped_lock l{lock};
-  cur_cycle.insert(bs);
+  cur_cycle.insert({bs, gen.gen});
 }
 
 void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
@@ -600,7 +603,11 @@ std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
          fmt::format("{}.{}", prefix, i));
 }
 
-int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
+int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
+                                const RGWBucketInfo& bucket_info,
+                                const rgw::bucket_log_layout_generation& gen,
+                                int shard_id)
+{
   auto& bucket = bucket_info.bucket;
 
   if (!filter_bucket(dpp, bucket, null_yield)) {
@@ -633,7 +640,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
     sl.unlock();
-    register_renew(bs);
+    register_renew(bs, gen);
     return 0;
   }
 
@@ -650,7 +657,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
     int ret = cond->wait();
     cond->put();
     if (!ret) {
-      register_renew(bs);
+      register_renew(bs, gen);
     }
     return ret;
   }
@@ -675,6 +682,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bs.get_key();
     change.timestamp = now;
+    change.gen_id = gen.gen;
     encode(change, bl);
 
     ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
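
Renew path, tying the rgw_datalog.cc hunks together: register_renew() now
records a (shard, generation) pair in cur_cycle, and renew_entries() copies
the stored generation into each change it re-sends. Condensed sketch of
that flow (names as in the hunks above):

    // register_renew(): remember the generation alongside the shard.
    cur_cycle.insert({bs, gen.gen});

    // renew_entries(): the stored generation becomes the entry's gen_id.
    for (const auto& [bs, gen_id] : entries) {
      rgw_data_change change;
      change.entity_type = ENTITY_TYPE_BUCKET;
      change.key = bs.get_key();
      change.timestamp = ut;
      change.gen_id = gen_id;
      // ... encode(change, bl) and queue on the per-shard buffer m[index]
    }
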
diff --git a/src/rgw/rgw_datalog.h b/src/rgw/rgw_datalog.h
index 7df3be937cd1d067943fb71693fd80b49f4cfa56..2e1e0e990ab7f4eb1bbe6cd4305b7740db1a8050 100644
--- a/src/rgw/rgw_datalog.h
+++ b/src/rgw/rgw_datalog.h
@@ -208,10 +208,11 @@ class RGWDataChangesLog {
 
   lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
 
-  bc::flat_set<rgw_bucket_shard> cur_cycle;
+  bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle;
 
   void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
-  void register_renew(const rgw_bucket_shard& bs);
+  void register_renew(const rgw_bucket_shard& bs,
+                     const rgw::bucket_log_layout_generation& gen);
   void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
 
   ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
@@ -234,7 +235,8 @@ public:
   int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams,
            librados::Rados* lr);
 
-  int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id);
+  int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+               const rgw::bucket_log_layout_generation& gen, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
                   std::vector<rgw_data_change_log_entry>& entries,
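
The header change widens the cur_cycle key from rgw_bucket_shard to a
(rgw_bucket_shard, uint64_t) pair (bc::flat_set is boost::container's
flat_set). A plausible reading, not stated in the patch: while a bucket's
log layout is transitioning, the same shard can be pending under more than
one generation, and keying on the pair keeps those entries distinct:

    bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle;
    cur_cycle.insert({bs, gen.gen});  // see register_renew() in rgw_datalog.cc
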
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 5f75b7731f176aeffc31f6044e2c430b66ff1934..96518eab01c83b990d4416984bc61e0c11eff71c 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -859,7 +859,9 @@ int RGWIndexCompletionThread::process(const DoutPrefixProvider *dpp)
       /* ignoring error, can't do anything about it */
       continue;
     }
-    r = store->svc.datalog_rados->add_entry(this, bucket_info, bs.shard_id);
+    r = store->svc.datalog_rados->add_entry(this, bucket_info,
+                                           bucket_info.layout.logs.back(),
+                                           bs.shard_id);
     if (r < 0) {
       ldpp_dout(this, -1) << "ERROR: failed writing data log" << dendl;
     }
@@ -5097,7 +5099,9 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
       return r;
     }
 
-    r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+    r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                           target->bucket_info.layout.logs.back(),
+                                           bs->shard_id);
     if (r < 0) {
       ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
       return r;
@@ -6169,7 +6173,9 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64
 
   ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
@@ -6196,7 +6202,9 @@ int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
 
   ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
@@ -6222,7 +6230,9 @@ int RGWRados::Bucket::UpdateIndex::cancel(const DoutPrefixProvider *dpp)
    * for following the specific bucket shard log. Otherwise they end up staying behind, and users
    * have no way to tell that they're all caught up
    */
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   }
@@ -6838,7 +6848,9 @@ int RGWRados::bucket_index_link_olh(const DoutPrefixProvider *dpp, RGWBucketInfo
     return r;
   }
 
-  r = svc.datalog_rados->add_entry(dpp, bucket_info, bs.shard_id);
+  r = svc.datalog_rados->add_entry(dpp, bucket_info,
+                                  bucket_info.layout.logs.back(),
+                                  bs.shard_id);
   if (r < 0) {
     ldpp_dout(dpp, 0) << "ERROR: failed writing data log" << dendl;
   }
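
Every rgw_rados.cc call site above repeats the same idiom. A hypothetical
wrapper, not part of this commit, shown only to illustrate the shared
pattern:

    // Illustrative only (NOT in this patch): log a data change against
    // the bucket's newest log generation, as each call site above does.
    static int add_datalog_entry(const DoutPrefixProvider* dpp,
                                 RGWDataChangesLog* datalog,
                                 const RGWBucketInfo& info, int shard_id)
    {
      return datalog->add_entry(dpp, info, info.layout.logs.back(),
                                shard_id);
    }
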
diff --git a/src/rgw/services/svc_bi_rados.cc b/src/rgw/services/svc_bi_rados.cc
index 349ab5dc85318dc47b3fdced8caf375a8321fd54..97c07c87202cb2eb4c61a3c1cc3e2ec41ee3c95a 100644
--- a/src/rgw/services/svc_bi_rados.cc
+++ b/src/rgw/services/svc_bi_rados.cc
@@ -485,7 +485,8 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
     }
 
     for (int i = 0; i < shards_num; ++i, ++shard_id) {
-      ret = svc.datalog_rados->add_entry(dpp, info, shard_id);
+      ret = svc.datalog_rados->add_entry(dpp, info, info.layout.logs.back(),
+                                        shard_id);
       if (ret < 0) {
         ldpp_dout(dpp, -1) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl;
         return ret;