using namespace std;
-int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask)
+int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask, rgw_cache_entry_info *cache_info)
{
RWLock::RLocker l(lock);
return -ENOENT;
}
- ObjectCacheEntry& entry = iter->second;
+ ObjectCacheEntry *entry = &iter->second;
- if (lru_counter - entry.lru_promotion_ts > lru_window) {
- ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter << " promotion_ts=" << entry.lru_promotion_ts << dendl;
+ if (lru_counter - entry->lru_promotion_ts > lru_window) {
+ ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter << " promotion_ts=" << entry->lru_promotion_ts << dendl;
lock.unlock();
lock.get_write(); /* promote lock to writer */
+ /* need to redo this because entry might have dropped off the cache */
+ iter = cache_map.find(name);
+ if (iter == cache_map.end()) {
+ ldout(cct, 10) << "lost race! cache get: name=" << name << " : miss" << dendl;
+ if(perfcounter) perfcounter->inc(l_rgw_cache_miss);
+ return -ENOENT;
+ }
+
+ entry = &iter->second;
/* check again, we might have lost a race here */
- if (lru_counter - entry.lru_promotion_ts > lru_window) {
- touch_lru(name, entry, iter->second.lru_iter);
+ if (lru_counter - entry->lru_promotion_ts > lru_window) {
+ touch_lru(name, *entry, iter->second.lru_iter);
}
}
ldout(cct, 10) << "cache get: name=" << name << " : hit" << dendl;
info = src;
+ if (cache_info) {
+ cache_info->cache_locator = name;
+ cache_info->gen = entry->gen;
+ }
if(perfcounter) perfcounter->inc(l_rgw_cache_hit);
return 0;
}
-void ObjectCache::put(string& name, ObjectCacheInfo& info)
+bool ObjectCache::chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry)
+{
+ RWLock::WLocker l(lock);
+
+ ldout(cct, 10) << "cache put: name=" << cache_info.cache_locator << dendl;
+ map<string, ObjectCacheEntry>::iterator iter = cache_map.find(cache_info.cache_locator);
+ if (iter == cache_map.end()) {
+ return false;
+ }
+
+ ObjectCacheEntry& entry = iter->second;
+ if (entry.gen != cache_info.gen) {
+ return false;
+ }
+
+ chained_entry->cache->chain_cb(chained_entry->key, chained_entry->data);
+
+ entry.chained_entries.push_back(make_pair<RGWChainedCache *, string>(chained_entry->cache, chained_entry->key));
+
+ return true;
+}
+
+void ObjectCache::put(string& name, ObjectCacheInfo& info, rgw_cache_entry_info *cache_info)
{
RWLock::WLocker l(lock);
ObjectCacheEntry& entry = iter->second;
ObjectCacheInfo& target = entry.info;
+ for (list<pair<RGWChainedCache *, string> >::iterator iiter = entry.chained_entries.begin();
+ iiter != entry.chained_entries.end(); ++iiter) {
+ RGWChainedCache *chained_cache = iiter->first;
+ chained_cache->invalidate(iiter->second);
+ }
+
+ entry.chained_entries.clear();
+ entry.gen++;
+
touch_lru(name, entry, entry.lru_iter);
target.status = info.status;
return;
}
+ if (cache_info) {
+ cache_info->cache_locator = name;
+ cache_info->gen = entry.gen;
+ }
+
target.flags |= info.flags;
if (info.flags & CACHE_FLAG_META)
return;
ldout(cct, 10) << "removing " << name << " from cache" << dendl;
+ ObjectCacheEntry& entry = iter->second;
+
+ for (list<pair<RGWChainedCache *, string> >::iterator iiter = entry.chained_entries.begin();
+ iiter != entry.chained_entries.end(); ++iiter) {
+ RGWChainedCache *chained_cache = iiter->first;
+ chained_cache->invalidate(iiter->second);
+ }
remove_lru(name, iter->second.lru_iter);
cache_map.erase(iter);
ObjectCacheInfo info;
std::list<string>::iterator lru_iter;
uint64_t lru_promotion_ts;
+ uint64_t gen;
+ std::list<pair<RGWChainedCache *, string> > chained_entries;
- ObjectCacheEntry() : lru_promotion_ts(0) {}
+ ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {}
};
class ObjectCache {
void remove_lru(string& name, std::list<string>::iterator& lru_iter);
public:
ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL) { }
- int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask);
- void put(std::string& name, ObjectCacheInfo& bl);
+ int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info);
+ void put(std::string& name, ObjectCacheInfo& bl, rgw_cache_entry_info *cache_info);
void remove(std::string& name);
void set_ctx(CephContext *_cct) {
cct = _cct;
lru_window = cct->_conf->rgw_cache_lru_size / 2;
}
+ bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry);
};
template <class T>
int put_obj_data(void *ctx, rgw_obj& obj, const char *data,
off_t ofs, size_t len, bool exclusive);
- int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& bl, off_t ofs, off_t end);
+ int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& bl, off_t ofs, off_t end,
+ rgw_cache_entry_info *cache_info);
int obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmtime, uint64_t *epoch, map<string, bufferlist> *attrs,
bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker);
int delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& obj, RGWObjVersionTracker *objv_tracker);
+
+ bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry) {
+ return cache.chain_cache_entry(cache_info, chained_entry);
+ }
};
template <class T>
}
template <class T>
-int RGWCache<T>::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& obl, off_t ofs, off_t end)
+int RGWCache<T>::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& obl, off_t ofs, off_t end,
+ rgw_cache_entry_info *cache_info)
{
rgw_bucket bucket;
string oid;
normalize_bucket_and_obj(obj.bucket, obj.object, bucket, oid);
if (bucket.name[0] != '.' || ofs != 0)
- return T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end);
+ return T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end, cache_info);
string name = normal_name(obj.bucket, oid);
if (objv_tracker)
flags |= CACHE_FLAG_OBJV;
- if (cache.get(name, info, flags) == 0) {
+ if (cache.get(name, info, flags, cache_info) == 0) {
if (info.status < 0)
return info.status;
objv_tracker->read_version = info.version;
return bl.length();
}
- int r = T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end);
+ int r = T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end, cache_info);
if (r < 0) {
if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
info.status = r;
- cache.put(name, info);
+ cache.put(name, info, cache_info);
}
return r;
}
if (objv_tracker) {
info.version = objv_tracker->read_version;
}
- cache.put(name, info);
+ cache.put(name, info, cache_info);
return r;
}
if (cacheable) {
string name = normal_name(bucket, oid);
if (ret >= 0) {
- cache.put(name, info);
+ cache.put(name, info, NULL);
int r = distribute_cache(name, obj, info, UPDATE_OBJ);
if (r < 0)
mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
if (cacheable) {
string name = normal_name(bucket, oid);
if (ret >= 0) {
- cache.put(name, info);
+ cache.put(name, info, NULL);
int r = distribute_cache(name, obj, info, UPDATE_OBJ);
if (r < 0)
mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
if (cacheable) {
string name = normal_name(bucket, oid);
if (ret >= 0) {
- cache.put(name, info);
+ cache.put(name, info, NULL);
int r = distribute_cache(name, obj, info, UPDATE_OBJ);
if (r < 0)
mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
if (cacheable) {
string name = normal_name(bucket, oid);
if (ret >= 0) {
- cache.put(name, info);
+ cache.put(name, info, NULL);
int r = distribute_cache(name, obj, info, UPDATE_OBJ);
if (r < 0)
mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
if (objv_tracker)
flags |= CACHE_FLAG_OBJV;
- int r = cache.get(name, info, flags);
+ int r = cache.get(name, info, flags, NULL);
if (r == 0) {
if (info.status < 0)
return info.status;
if (r < 0) {
if (r == -ENOENT) {
info.status = r;
- cache.put(name, info);
+ cache.put(name, info, NULL);
}
return r;
}
info.flags |= CACHE_FLAG_OBJV;
info.version = objv_tracker->read_version;
}
- cache.put(name, info);
+ cache.put(name, info, NULL);
done:
if (psize)
*psize = size;
switch (info.op) {
case UPDATE_OBJ:
- cache.put(name, info.obj_info);
+ cache.put(name, info.obj_info, NULL);
break;
case REMOVE_OBJ:
cache.remove(name);
};
WRITE_CLASS_ENCODER(rgw_obj)
+struct rgw_cache_entry_info {
+ string cache_locator;
+ uint64_t gen;
+
+ rgw_cache_entry_info() : gen(0) {}
+};
+
inline ostream& operator<<(ostream& out, const rgw_obj &o) {
return out << o.bucket.name << ":" << o.object;
}
perfcounter->inc(l_rgw_get_b, cur_end - cur_ofs);
while (cur_ofs <= cur_end) {
bufferlist bl;
- ret = store->get_obj(obj_ctx, NULL, &handle, part, bl, cur_ofs, cur_end);
+ ret = store->get_obj(obj_ctx, NULL, &handle, part, bl, cur_ofs, cur_end, NULL);
if (ret < 0)
goto done_err;
}
if (copy_first) {
- ret = get_obj(ctx, NULL, &handle, src_obj, first_chunk, 0, max_chunk_size);
+ ret = get_obj(ctx, NULL, &handle, src_obj, first_chunk, 0, max_chunk_size, NULL);
if (ret < 0)
goto done_ret;
do {
bufferlist bl;
- ret = get_obj(ctx, NULL, handle, src_obj, bl, ofs, end);
+ ret = get_obj(ctx, NULL, handle, src_obj, bl, ofs, end, NULL);
if (ret < 0)
return ret;
int RGWRados::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj,
- bufferlist& bl, off_t ofs, off_t end)
+ bufferlist& bl, off_t ofs, off_t end, rgw_cache_entry_info *cache_info)
{
rgw_bucket bucket;
std::string oid, key;
#include "include/rados/librados.hpp"
#include "include/Context.h"
#include "common/RefCountedObj.h"
+#include "common/RWLock.h"
#include "rgw_common.h"
#include "cls/rgw/cls_rgw_types.h"
#include "cls/version/cls_version_types.h"
librados::IoCtx ioctx;
};
+class RGWChainedCache {
+public:
+ virtual ~RGWChainedCache() {}
+ virtual void chain_cb(const string& key, void *data) = 0;
+ virtual void invalidate(const string& key) = 0;
+
+ struct Entry {
+ RGWChainedCache *cache;
+ const string& key;
+ void *data;
+
+ Entry(RGWChainedCache *_c, const string& _k, void *_d) : cache(_c), key(_k), data(_d) {}
+ };
+};
+
class RGWRados
{
struct rgw_err *err);
virtual int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj,
- bufferlist& bl, off_t ofs, off_t end);
+ bufferlist& bl, off_t ofs, off_t end, rgw_cache_entry_info *cache_info);
virtual void finish_get_obj(void **handle);
+ virtual bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry) { return false; }
+
int iterate_obj(void *ctx, rgw_obj& obj,
off_t ofs, off_t end,
uint64_t max_chunk_size,
};
+template <class T>
+class RGWChainedCacheImpl : public RGWChainedCache {
+ RWLock lock;
+
+ map<string, T> entries;
+
+public:
+ RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {}
+
+ bool find(const string& key, T *entry) {
+ RWLock::RLocker rl(lock);
+ typename map<string, T>::iterator iter = entries.find(key);
+ if (iter == entries.end()) {
+ return false;
+ }
+
+ *entry = iter->second;
+ return true;
+ }
+
+ bool put(RGWRados *store, const string& key, T *entry, rgw_cache_entry_info& cache_info) {
+ Entry chain_entry(this, key, entry);
+
+ /* we need the store cache to call us under its lock to maintain lock ordering */
+ return store->chain_cache_entry(cache_info, &chain_entry);
+ }
+
+ void chain_cb(const string& key, void *data) {
+ T *entry = (T *)data;
+ RWLock::WLocker wl(lock);
+ entries[key] = *entry;
+ }
+
+ void invalidate(const string& key) {
+ RWLock::WLocker wl(lock);
+ entries.erase(key);
+ }
+};
#endif
}
int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, const string& key, bufferlist& bl,
- RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs)
+ RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs,
+ rgw_cache_entry_info *cache_info)
{
struct rgw_err err;
void *handle = NULL;
if (ret < 0)
return ret;
- ret = rgwstore->get_obj(ctx, objv_tracker, &handle, obj, bl, 0, request_len - 1);
+ ret = rgwstore->get_obj(ctx, objv_tracker, &handle, obj, bl, 0, request_len - 1, cache_info);
rgwstore->finish_get_obj(&handle);
if (ret < 0)
return ret;
int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, const char *data, size_t size, bool exclusive,
RGWObjVersionTracker *objv_tracker, time_t set_mtime, map<string, bufferlist> *pattrs = NULL);
int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, const string& key, bufferlist& bl,
- RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs = NULL);
+ RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs = NULL,
+ rgw_cache_entry_info *cache_info = NULL);
int rgw_tools_init(CephContext *cct);
void rgw_tools_cleanup();