From ab764f38c1cfb7860a657f3c3557419e8e96f4c2 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 19 Mar 2014 16:34:21 -0700 Subject: [PATCH] rgw: an infrastructure for hooking into the raw cache Extend the RGWCache so that we can chain other caches to it so that when data is invalidated it notifies them. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_cache.cc | 70 +++++++++++++++++++++++++++++++++++++++----- src/rgw/rgw_cache.h | 45 ++++++++++++++++------------ src/rgw/rgw_common.h | 7 +++++ src/rgw/rgw_op.cc | 2 +- src/rgw/rgw_rados.cc | 6 ++-- src/rgw/rgw_rados.h | 58 +++++++++++++++++++++++++++++++++++- src/rgw/rgw_tools.cc | 5 ++-- src/rgw/rgw_tools.h | 3 +- 8 files changed, 163 insertions(+), 33 deletions(-) diff --git a/src/rgw/rgw_cache.cc b/src/rgw/rgw_cache.cc index 94b3d0404edb6..c402352a02ad6 100644 --- a/src/rgw/rgw_cache.cc +++ b/src/rgw/rgw_cache.cc @@ -6,7 +6,7 @@ using namespace std; -int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask) +int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask, rgw_cache_entry_info *cache_info) { RWLock::RLocker l(lock); @@ -17,16 +17,25 @@ int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask) return -ENOENT; } - ObjectCacheEntry& entry = iter->second; + ObjectCacheEntry *entry = &iter->second; - if (lru_counter - entry.lru_promotion_ts > lru_window) { - ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter << " promotion_ts=" << entry.lru_promotion_ts << dendl; + if (lru_counter - entry->lru_promotion_ts > lru_window) { + ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter << " promotion_ts=" << entry->lru_promotion_ts << dendl; lock.unlock(); lock.get_write(); /* promote lock to writer */ + /* need to redo this because entry might have dropped off the cache */ + iter = cache_map.find(name); + if (iter == cache_map.end()) { + ldout(cct, 10) << "lost race! cache get: name=" << name << " : miss" << dendl; + if(perfcounter) perfcounter->inc(l_rgw_cache_miss); + return -ENOENT; + } + + entry = &iter->second; /* check again, we might have lost a race here */ - if (lru_counter - entry.lru_promotion_ts > lru_window) { - touch_lru(name, entry, iter->second.lru_iter); + if (lru_counter - entry->lru_promotion_ts > lru_window) { + touch_lru(name, *entry, iter->second.lru_iter); } } @@ -39,12 +48,38 @@ int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask) ldout(cct, 10) << "cache get: name=" << name << " : hit" << dendl; info = src; + if (cache_info) { + cache_info->cache_locator = name; + cache_info->gen = entry->gen; + } if(perfcounter) perfcounter->inc(l_rgw_cache_hit); return 0; } -void ObjectCache::put(string& name, ObjectCacheInfo& info) +bool ObjectCache::chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry) +{ + RWLock::WLocker l(lock); + + ldout(cct, 10) << "cache put: name=" << cache_info.cache_locator << dendl; + map::iterator iter = cache_map.find(cache_info.cache_locator); + if (iter == cache_map.end()) { + return false; + } + + ObjectCacheEntry& entry = iter->second; + if (entry.gen != cache_info.gen) { + return false; + } + + chained_entry->cache->chain_cb(chained_entry->key, chained_entry->data); + + entry.chained_entries.push_back(make_pair(chained_entry->cache, chained_entry->key)); + + return true; +} + +void ObjectCache::put(string& name, ObjectCacheInfo& info, rgw_cache_entry_info *cache_info) { RWLock::WLocker l(lock); @@ -59,6 +94,15 @@ void ObjectCache::put(string& name, ObjectCacheInfo& info) ObjectCacheEntry& entry = iter->second; ObjectCacheInfo& target = entry.info; + for (list >::iterator iiter = entry.chained_entries.begin(); + iiter != entry.chained_entries.end(); ++iiter) { + RGWChainedCache *chained_cache = iiter->first; + chained_cache->invalidate(iiter->second); + } + + entry.chained_entries.clear(); + entry.gen++; + touch_lru(name, entry, entry.lru_iter); target.status = info.status; @@ -70,6 +114,11 @@ void ObjectCache::put(string& name, ObjectCacheInfo& info) return; } + if (cache_info) { + cache_info->cache_locator = name; + cache_info->gen = entry.gen; + } + target.flags |= info.flags; if (info.flags & CACHE_FLAG_META) @@ -111,6 +160,13 @@ void ObjectCache::remove(string& name) return; ldout(cct, 10) << "removing " << name << " from cache" << dendl; + ObjectCacheEntry& entry = iter->second; + + for (list >::iterator iiter = entry.chained_entries.begin(); + iiter != entry.chained_entries.end(); ++iiter) { + RGWChainedCache *chained_cache = iiter->first; + chained_cache->invalidate(iiter->second); + } remove_lru(name, iter->second.lru_iter); cache_map.erase(iter); diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index 3b793f1400588..e5f349961b1c7 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -128,8 +128,10 @@ struct ObjectCacheEntry { ObjectCacheInfo info; std::list::iterator lru_iter; uint64_t lru_promotion_ts; + uint64_t gen; + std::list > chained_entries; - ObjectCacheEntry() : lru_promotion_ts(0) {} + ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {} }; class ObjectCache { @@ -145,13 +147,14 @@ class ObjectCache { void remove_lru(string& name, std::list::iterator& lru_iter); public: ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL) { } - int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask); - void put(std::string& name, ObjectCacheInfo& bl); + int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info); + void put(std::string& name, ObjectCacheInfo& bl, rgw_cache_entry_info *cache_info); void remove(std::string& name); void set_ctx(CephContext *_cct) { cct = _cct; lru_window = cct->_conf->rgw_cache_lru_size / 2; } + bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry); }; template @@ -213,12 +216,17 @@ public: int put_obj_data(void *ctx, rgw_obj& obj, const char *data, off_t ofs, size_t len, bool exclusive); - int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& bl, off_t ofs, off_t end); + int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& bl, off_t ofs, off_t end, + rgw_cache_entry_info *cache_info); int obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmtime, uint64_t *epoch, map *attrs, bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker); int delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& obj, RGWObjVersionTracker *objv_tracker); + + bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry) { + return cache.chain_cache_entry(cache_info, chained_entry); + } }; template @@ -252,13 +260,14 @@ int RGWCache::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& } template -int RGWCache::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& obl, off_t ofs, off_t end) +int RGWCache::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& obl, off_t ofs, off_t end, + rgw_cache_entry_info *cache_info) { rgw_bucket bucket; string oid; normalize_bucket_and_obj(obj.bucket, obj.object, bucket, oid); if (bucket.name[0] != '.' || ofs != 0) - return T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end); + return T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end, cache_info); string name = normal_name(obj.bucket, oid); @@ -268,7 +277,7 @@ int RGWCache::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **h if (objv_tracker) flags |= CACHE_FLAG_OBJV; - if (cache.get(name, info, flags) == 0) { + if (cache.get(name, info, flags, cache_info) == 0) { if (info.status < 0) return info.status; @@ -283,11 +292,11 @@ int RGWCache::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **h objv_tracker->read_version = info.version; return bl.length(); } - int r = T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end); + int r = T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end, cache_info); if (r < 0) { if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors info.status = r; - cache.put(name, info); + cache.put(name, info, cache_info); } return r; } @@ -307,7 +316,7 @@ int RGWCache::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **h if (objv_tracker) { info.version = objv_tracker->read_version; } - cache.put(name, info); + cache.put(name, info, cache_info); return r; } @@ -333,7 +342,7 @@ int RGWCache::set_attr(void *ctx, rgw_obj& obj, const char *attr_name, buffer if (cacheable) { string name = normal_name(bucket, oid); if (ret >= 0) { - cache.put(name, info); + cache.put(name, info, NULL); int r = distribute_cache(name, obj, info, UPDATE_OBJ); if (r < 0) mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; @@ -372,7 +381,7 @@ int RGWCache::set_attrs(void *ctx, rgw_obj& obj, if (cacheable) { string name = normal_name(bucket, oid); if (ret >= 0) { - cache.put(name, info); + cache.put(name, info, NULL); int r = distribute_cache(name, obj, info, UPDATE_OBJ); if (r < 0) mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; @@ -416,7 +425,7 @@ int RGWCache::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, time_ if (cacheable) { string name = normal_name(bucket, oid); if (ret >= 0) { - cache.put(name, info); + cache.put(name, info, NULL); int r = distribute_cache(name, obj, info, UPDATE_OBJ); if (r < 0) mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; @@ -451,7 +460,7 @@ int RGWCache::put_obj_data(void *ctx, rgw_obj& obj, const char *data, if (cacheable) { string name = normal_name(bucket, oid); if (ret >= 0) { - cache.put(name, info); + cache.put(name, info, NULL); int r = distribute_cache(name, obj, info, UPDATE_OBJ); if (r < 0) mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; @@ -484,7 +493,7 @@ int RGWCache::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmti uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; if (objv_tracker) flags |= CACHE_FLAG_OBJV; - int r = cache.get(name, info, flags); + int r = cache.get(name, info, flags, NULL); if (r == 0) { if (info.status < 0) return info.status; @@ -500,7 +509,7 @@ int RGWCache::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmti if (r < 0) { if (r == -ENOENT) { info.status = r; - cache.put(name, info); + cache.put(name, info, NULL); } return r; } @@ -513,7 +522,7 @@ int RGWCache::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmti info.flags |= CACHE_FLAG_OBJV; info.version = objv_tracker->read_version; } - cache.put(name, info); + cache.put(name, info, NULL); done: if (psize) *psize = size; @@ -564,7 +573,7 @@ int RGWCache::watch_cb(int opcode, uint64_t ver, bufferlist& bl) switch (info.op) { case UPDATE_OBJ: - cache.put(name, info.obj_info); + cache.put(name, info.obj_info, NULL); break; case REMOVE_OBJ: cache.remove(name); diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 9550072af7a0f..8e09d897b9862 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -1226,6 +1226,13 @@ public: }; WRITE_CLASS_ENCODER(rgw_obj) +struct rgw_cache_entry_info { + string cache_locator; + uint64_t gen; + + rgw_cache_entry_info() : gen(0) {} +}; + inline ostream& operator<<(ostream& out, const rgw_obj &o) { return out << o.bucket.name << ":" << o.object; } diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 27c7a72600aac..bde482f410a41 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -657,7 +657,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, RGWObjEnt& ent, RGWAc perfcounter->inc(l_rgw_get_b, cur_end - cur_ofs); while (cur_ofs <= cur_end) { bufferlist bl; - ret = store->get_obj(obj_ctx, NULL, &handle, part, bl, cur_ofs, cur_end); + ret = store->get_obj(obj_ctx, NULL, &handle, part, bl, cur_ofs, cur_end, NULL); if (ret < 0) goto done_err; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 934ca96513138..25bfdc1a9f356 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3389,7 +3389,7 @@ set_err_state: } if (copy_first) { - ret = get_obj(ctx, NULL, &handle, src_obj, first_chunk, 0, max_chunk_size); + ret = get_obj(ctx, NULL, &handle, src_obj, first_chunk, 0, max_chunk_size, NULL); if (ret < 0) goto done_ret; @@ -3462,7 +3462,7 @@ int RGWRados::copy_obj_data(void *ctx, do { bufferlist bl; - ret = get_obj(ctx, NULL, handle, src_obj, bl, ofs, end); + ret = get_obj(ctx, NULL, handle, src_obj, bl, ofs, end, NULL); if (ret < 0) return ret; @@ -4552,7 +4552,7 @@ int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj, int RGWRados::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, - bufferlist& bl, off_t ofs, off_t end) + bufferlist& bl, off_t ofs, off_t end, rgw_cache_entry_info *cache_info) { rgw_bucket bucket; std::string oid, key; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 9ff5f3d2eaeea..ea1d276e65055 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -4,6 +4,7 @@ #include "include/rados/librados.hpp" #include "include/Context.h" #include "common/RefCountedObj.h" +#include "common/RWLock.h" #include "rgw_common.h" #include "cls/rgw/cls_rgw_types.h" #include "cls/version/cls_version_types.h" @@ -1166,6 +1167,21 @@ struct rgw_rados_ref { librados::IoCtx ioctx; }; +class RGWChainedCache { +public: + virtual ~RGWChainedCache() {} + virtual void chain_cb(const string& key, void *data) = 0; + virtual void invalidate(const string& key) = 0; + + struct Entry { + RGWChainedCache *cache; + const string& key; + void *data; + + Entry(RGWChainedCache *_c, const string& _k, void *_d) : cache(_c), key(_k), data(_d) {} + }; +}; + class RGWRados { @@ -1647,10 +1663,12 @@ public: struct rgw_err *err); virtual int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, - bufferlist& bl, off_t ofs, off_t end); + bufferlist& bl, off_t ofs, off_t end, rgw_cache_entry_info *cache_info); virtual void finish_get_obj(void **handle); + virtual bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry) { return false; } + int iterate_obj(void *ctx, rgw_obj& obj, off_t ofs, off_t end, uint64_t max_chunk_size, @@ -1918,6 +1936,44 @@ public: }; +template +class RGWChainedCacheImpl : public RGWChainedCache { + RWLock lock; + + map entries; + +public: + RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {} + + bool find(const string& key, T *entry) { + RWLock::RLocker rl(lock); + typename map::iterator iter = entries.find(key); + if (iter == entries.end()) { + return false; + } + + *entry = iter->second; + return true; + } + + bool put(RGWRados *store, const string& key, T *entry, rgw_cache_entry_info& cache_info) { + Entry chain_entry(this, key, entry); + + /* we need the store cache to call us under its lock to maintain lock ordering */ + return store->chain_cache_entry(cache_info, &chain_entry); + } + + void chain_cb(const string& key, void *data) { + T *entry = (T *)data; + RWLock::WLocker wl(lock); + entries[key] = *entry; + } + + void invalidate(const string& key) { + RWLock::WLocker wl(lock); + entries.erase(key); + } +}; #endif diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc index 49894eae7ced0..ca6d06e75ca47 100644 --- a/src/rgw/rgw_tools.cc +++ b/src/rgw/rgw_tools.cc @@ -35,7 +35,8 @@ int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, cons } int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, const string& key, bufferlist& bl, - RGWObjVersionTracker *objv_tracker, time_t *pmtime, map *pattrs) + RGWObjVersionTracker *objv_tracker, time_t *pmtime, map *pattrs, + rgw_cache_entry_info *cache_info) { struct rgw_err err; void *handle = NULL; @@ -50,7 +51,7 @@ int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, const if (ret < 0) return ret; - ret = rgwstore->get_obj(ctx, objv_tracker, &handle, obj, bl, 0, request_len - 1); + ret = rgwstore->get_obj(ctx, objv_tracker, &handle, obj, bl, 0, request_len - 1, cache_info); rgwstore->finish_get_obj(&handle); if (ret < 0) return ret; diff --git a/src/rgw/rgw_tools.h b/src/rgw/rgw_tools.h index 9adaee484cc1b..1dfebffca4761 100644 --- a/src/rgw/rgw_tools.h +++ b/src/rgw/rgw_tools.h @@ -14,7 +14,8 @@ struct obj_version; int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, const char *data, size_t size, bool exclusive, RGWObjVersionTracker *objv_tracker, time_t set_mtime, map *pattrs = NULL); int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, const string& key, bufferlist& bl, - RGWObjVersionTracker *objv_tracker, time_t *pmtime, map *pattrs = NULL); + RGWObjVersionTracker *objv_tracker, time_t *pmtime, map *pattrs = NULL, + rgw_cache_entry_info *cache_info = NULL); int rgw_tools_init(CephContext *cct); void rgw_tools_cleanup(); -- 2.39.5