delete binfo_cache;
delete obj_tombstone_cache;
+ delete topic_cache;
if (d3n_data_cache)
delete d3n_data_cache;
binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
binfo_cache->init(svc.cache);
+ topic_cache = new RGWChainedCacheImpl<pubsub_bucket_topics_entry>;
+ topic_cache->init(svc.cache);
+
lc = new RGWLC();
lc->initialize(cct, this->driver);
#include "common/ceph_mutex.h"
#include "rgw_cache.h"
#include "rgw_sal_fwd.h"
+#include "rgw_pubsub.h"
struct D3nDataCache;
std::map<std::string, bufferlist> attrs;
};
+struct pubsub_bucket_topics_entry {
+ rgw_pubsub_bucket_topics info;
+ RGWObjVersionTracker objv_tracker;
+ real_time mtime;
+};
+
struct tombstone_entry;
template <class K, class V>
tombstone_cache_t* obj_tombstone_cache{nullptr};
+ using RGWChainedCacheImpl_bucket_topics_entry = RGWChainedCacheImpl<pubsub_bucket_topics_entry>;
+ RGWChainedCacheImpl_bucket_topics_entry* topic_cache{nullptr};
+
librados::IoCtx gc_pool_ctx; // .rgw.gc
librados::IoCtx lc_pool_ctx; // .rgw.lc
librados::IoCtx objexp_pool_ctx;
ceph::real_time *pmtime, optional_yield y,
const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *pattrs = NULL);
+ RGWChainedCacheImpl_bucket_topics_entry *get_topic_cache() { return topic_cache; }
+
// Returns 0 on successful refresh. Returns error code if there was
// an error or the version stored on the OSD is the same as that
// presented in the BucketInfo structure.
#include "services/svc_zone_utils.h"
#include "services/svc_role_rados.h"
#include "services/svc_user.h"
+#include "services/svc_sys_obj_cache.h"
#include "cls/rgw/cls_rgw_client.h"
#include "rgw_pubsub.h"
int RadosBucket::read_topics(rgw_pubsub_bucket_topics& notifications,
RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp)
{
+ // read from cache
+ auto cache = store->getRados()->get_topic_cache();
+ const std::string key = store->svc()->zone->get_zone_params().log_pool.to_str() + topics_oid();
+ if (auto e = cache->find(key)) {
+ notifications = e->info;
+ return 0;
+ }
+
bufferlist bl;
+ rgw_cache_entry_info cache_info;
const int ret = rgw_get_system_obj(store->svc()->sysobj,
store->svc()->zone->get_zone_params().log_pool,
topics_oid(),
bl,
- objv_tracker,
- nullptr, y, dpp, nullptr);
+ objv_tracker, nullptr,
+ y, dpp, nullptr, &cache_info);
if (ret < 0) {
return ret;
}
return -EIO;
}
+ pubsub_bucket_topics_entry e;
+ e.info = notifications;
+ if (!cache->put(dpp, store->getRados()->svc.cache, key, &e, { &cache_info })) {
+ ldpp_dout(dpp, 10) << "couldn't put bucket topics cache entry" << dendl;
+ }
return 0;
}