]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add bucket notification cache 49807/head
authorliuhong <liuhong_yewu@cmss.chinamobile.com>
Fri, 20 Jan 2023 13:02:24 +0000 (21:02 +0800)
committerliuhong <liuhong_yewu@cmss.chinamobile.com>
Fri, 10 Mar 2023 07:34:05 +0000 (08:34 +0100)
Bucket notification supports reading the rule configuration from the cache first, rather than directly from the bottom layer

Signed-off-by: liuhong <liuhong_yewu@cmss.chinamobile.com>
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc

index 3f2332db482ef22416d1f76407c8254883a2b726..db546a3ca68f95bf7729928ba2a17184018a3cff 100644 (file)
@@ -1051,6 +1051,7 @@ void RGWRados::finalize()
 
   delete binfo_cache;
   delete obj_tombstone_cache;
+  delete topic_cache;
   if (d3n_data_cache)
     delete d3n_data_cache;
 
@@ -1281,6 +1282,9 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp)
   binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
   binfo_cache->init(svc.cache);
 
+  topic_cache = new RGWChainedCacheImpl<pubsub_bucket_topics_entry>;
+  topic_cache->init(svc.cache);
+
   lc = new RGWLC();
   lc->initialize(cct, this->driver);
 
index cc118f37dc7247523767b268440ddfd7efea431d..043e9569e81633460e8b78616b7d61737e1378c3 100644 (file)
@@ -39,6 +39,7 @@
 #include "common/ceph_mutex.h"
 #include "rgw_cache.h"
 #include "rgw_sal_fwd.h"
+#include "rgw_pubsub.h"
 
 struct D3nDataCache;
 
@@ -319,6 +320,12 @@ struct bucket_info_entry {
   std::map<std::string, bufferlist> attrs;
 };
 
+struct pubsub_bucket_topics_entry {
+  rgw_pubsub_bucket_topics info;
+  RGWObjVersionTracker objv_tracker;
+  real_time mtime;
+};
+
 struct tombstone_entry;
 
 template <class K, class V>
@@ -426,6 +433,9 @@ protected:
 
   tombstone_cache_t* obj_tombstone_cache{nullptr};
 
+  using RGWChainedCacheImpl_bucket_topics_entry = RGWChainedCacheImpl<pubsub_bucket_topics_entry>;
+  RGWChainedCacheImpl_bucket_topics_entry* topic_cache{nullptr};
+
   librados::IoCtx gc_pool_ctx;        // .rgw.gc
   librados::IoCtx lc_pool_ctx;        // .rgw.lc
   librados::IoCtx objexp_pool_ctx;
@@ -1365,6 +1375,8 @@ public:
                      ceph::real_time *pmtime, optional_yield y,
                       const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *pattrs = NULL);
 
+  RGWChainedCacheImpl_bucket_topics_entry *get_topic_cache() { return topic_cache; }
+
   // Returns 0 on successful refresh. Returns error code if there was
   // an error or the version stored on the OSD is the same as that
   // presented in the BucketInfo structure.
index fb34ae25aa8ff9683fde19055a107aa546a1a8c8..bed3fbfae01daf723ad17953f4f06d7c43450859 100644 (file)
@@ -59,6 +59,7 @@
 #include "services/svc_zone_utils.h"
 #include "services/svc_role_rados.h"
 #include "services/svc_user.h"
+#include "services/svc_sys_obj_cache.h"
 #include "cls/rgw/cls_rgw_client.h"
 
 #include "rgw_pubsub.h"
@@ -1033,13 +1034,22 @@ std::string RadosBucket::topics_oid() const {
 int RadosBucket::read_topics(rgw_pubsub_bucket_topics& notifications, 
     RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) 
 {
+  // read from cache
+  auto cache = store->getRados()->get_topic_cache();
+  const std::string key = store->svc()->zone->get_zone_params().log_pool.to_str() + topics_oid();
+  if (auto e = cache->find(key)) {
+    notifications = e->info;
+    return 0;
+  }
+
   bufferlist bl;
+  rgw_cache_entry_info cache_info;
   const int ret = rgw_get_system_obj(store->svc()->sysobj,
                                store->svc()->zone->get_zone_params().log_pool,
                                topics_oid(),
                                bl,
-                               objv_tracker,
-                               nullptr, y, dpp, nullptr);
+                               objv_tracker, nullptr,
+                               y, dpp, nullptr, &cache_info);
   if (ret < 0) {
     return ret;
   }
@@ -1053,6 +1063,11 @@ int RadosBucket::read_topics(rgw_pubsub_bucket_topics& notifications,
     return -EIO;
   }
 
+  pubsub_bucket_topics_entry e;
+  e.info = notifications;
+  if (!cache->put(dpp, store->getRados()->svc.cache, key, &e, { &cache_info })) {
+    ldpp_dout(dpp, 10) << "couldn't put bucket topics cache entry" << dendl;
+  }
   return 0;
 }