git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add gen parameter to RGWDataChangesLog::add_entry
authorAdam C. Emerson <aemerson@redhat.com>
Mon, 14 Dec 2020 02:13:44 +0000 (21:13 -0500)
committerCasey Bodley <cbodley@redhat.com>
Tue, 9 Feb 2021 22:11:06 +0000 (17:11 -0500)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_datalog.cc
src/rgw/rgw_datalog.h
src/rgw/rgw_rados.cc
src/rgw/services/svc_bi_rados.cc

index e76d0cd0f6bd75d4dcb38162fe1fd3f7714b2e2f..4c450e77e004227a9624fa1c508ee47f398501dc 100644 (file)
@@ -1061,7 +1061,9 @@ int RGWBucket::sync(RGWBucketAdminOpState& op_state, map<string, bufferlist> *at
   }
 
   for (int i = 0; i < shards_num; ++i, ++shard_id) {
-    r = store->svc()->datalog_rados->add_entry(dpp, bucket_info, shard_id);
+    r = store->svc()->datalog_rados->add_entry(dpp, bucket_info,
+                                              bucket_info.layout.logs.back(),
+                                              shard_id);
     if (r < 0) {
       set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
       return r;
index 122e5336f2d90b180d4a0a9e9ba652c299f29f8b..42f0fe7b10833736b5ded04ac7e5c45e5178810f 100644 (file)
@@ -13,6 +13,7 @@
 #include "cls/fifo/cls_fifo_types.h"
 
 #include "cls_fifo_legacy.h"
+#include "rgw_bucket_layout.h"
 #include "rgw_datalog.h"
 #include "rgw_tools.h"
 
@@ -612,7 +613,7 @@ int RGWDataChangesLog::renew_entries()
   l.unlock();
 
   auto ut = real_clock::now();
-  for (const auto& bs : entries) {
+  for (const auto& [bs, gen_id] : entries) {
     auto index = choose_oid(bs);
 
     rgw_data_change change;
@@ -620,6 +621,7 @@ int RGWDataChangesLog::renew_entries()
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bs.get_key();
     change.timestamp = ut;
+    change.gen_id = gen_id;
     encode(change, bl);
 
     m[index].first.push_back(bs);
@@ -659,10 +661,11 @@ void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
   }
 }
 
-void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs)
+void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
+                                      const rgw::bucket_log_layout_generation& gen)
 {
   std::scoped_lock l{lock};
-  cur_cycle.insert(bs);
+  cur_cycle.insert({bs, gen.gen});
 }
 
 void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
@@ -698,7 +701,11 @@ std::string RGWDataChangesLog::get_oid(int i) const {
   return be->get_oid(i);
 }
 
-int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id) {
+int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
+                                const RGWBucketInfo& bucket_info,
+                                const rgw::bucket_log_layout_generation& gen,
+                                int shard_id)
+{
   auto& bucket = bucket_info.bucket;
 
   if (!filter_bucket(dpp, bucket, null_yield)) {
@@ -731,7 +738,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
     sl.unlock();
-    register_renew(bs);
+    register_renew(bs, gen);
     return 0;
   }
 
@@ -748,7 +755,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
     int ret = cond->wait();
     cond->put();
     if (!ret) {
-      register_renew(bs);
+      register_renew(bs, gen);
     }
     return ret;
   }
@@ -773,6 +780,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bs.get_key();
     change.timestamp = now;
+    change.gen_id = gen.gen;
     encode(change, bl);
 
     ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
index 02868ce62e7ab5559ba7724ecebc5d7f3071345e..eee73d4286b8f60380708a66375a2c7cb28f090c 100644 (file)
@@ -199,10 +199,11 @@ class RGWDataChangesLog {
 
   lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
 
-  bc::flat_set<rgw_bucket_shard> cur_cycle;
+  bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle;
 
   void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
-  void register_renew(const rgw_bucket_shard& bs);
+  void register_renew(const rgw_bucket_shard& bs,
+                     const rgw::bucket_log_layout_generation& gen);
   void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
 
   ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
@@ -225,7 +226,8 @@ public:
   int start(const RGWZone* _zone, const RGWZoneParams& zoneparams,
            RGWSI_Cls *cls_svc, librados::Rados* lr);
 
-  int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id);
+  int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+               const rgw::bucket_log_layout_generation& gen, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   int list_entries(int shard, int max_entries,
                   std::vector<rgw_data_change_log_entry>& entries,
index d3381d22bc1a526b53e51be267aa655b4f3da4ed..8dd18a92c745b7b36773ead4661f74fea48b2c18 100644 (file)
@@ -859,7 +859,9 @@ int RGWIndexCompletionThread::process()
       /* ignoring error, can't do anything about it */
       continue;
     }
-    r = store->svc.datalog_rados->add_entry(this, bucket_info, bs.shard_id);
+    r = store->svc.datalog_rados->add_entry(this, bucket_info,
+                                           bucket_info.layout.logs.back(),
+                                           bs.shard_id);
     if (r < 0) {
       lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
     }
@@ -5038,7 +5040,9 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
       return r;
     }
 
-    r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+    r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                           target->bucket_info.layout.logs.back(),
+                                           bs->shard_id);
     if (r < 0) {
       lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
       return r;
@@ -6108,7 +6112,9 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64
 
   ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
   }
@@ -6135,7 +6141,9 @@ int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
 
   ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
 
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
   }
@@ -6161,7 +6169,9 @@ int RGWRados::Bucket::UpdateIndex::cancel(const DoutPrefixProvider *dpp)
    * for following the specific bucket shard log. Otherwise they end up staying behind, and users
    * have no way to tell that they're all caught up
    */
-  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info, bs->shard_id);
+  int r = store->svc.datalog_rados->add_entry(dpp, target->bucket_info,
+                                             target->bucket_info.layout.logs.back(),
+                                             bs->shard_id);
   if (r < 0) {
     lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
   }
@@ -6795,7 +6805,9 @@ int RGWRados::bucket_index_link_olh(const DoutPrefixProvider *dpp, RGWBucketInfo
     return r;
   }
 
-  r = svc.datalog_rados->add_entry(dpp, bucket_info, bs.shard_id);
+  r = svc.datalog_rados->add_entry(dpp, bucket_info,
+                                  bucket_info.layout.logs.back(),
+                                  bs.shard_id);
   if (r < 0) {
     ldout(cct, 0) << "ERROR: failed writing data log" << dendl;
   }
index eaba9c1cea2b8a8fdea1677c26405d9db00dbe9a..7eef30bed6e25034382ffaf6805162983fb1aab4 100644 (file)
@@ -475,7 +475,8 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
     }
 
     for (int i = 0; i < shards_num; ++i, ++shard_id) {
-      ret = svc.datalog_rados->add_entry(dpp, info, shard_id);
+      ret = svc.datalog_rados->add_entry(dpp, info, info.layout.logs.back(),
+                                        shard_id);
       if (ret < 0) {
         lderr(cct) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << shard_id << ")" << dendl;
         return ret;
@@ -485,4 +486,3 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
 
   return 0;
 }
-