]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: an infrastructure for hooking into the raw cache
authorYehuda Sadeh <yehuda@inktank.com>
Wed, 19 Mar 2014 23:34:21 +0000 (16:34 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Wed, 11 Jun 2014 06:08:57 +0000 (23:08 -0700)
Extend the RGWCache so that we can chain other caches to it so that when
data is invalidated it notifies them.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_cache.cc
src/rgw/rgw_cache.h
src/rgw/rgw_common.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_tools.cc
src/rgw/rgw_tools.h

index 94b3d0404edb623f1484308383fafe31bb495253..c402352a02ad65ae46da3d302ba44ef8a4ed37e0 100644 (file)
@@ -6,7 +6,7 @@
 
 using namespace std;
 
-int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask)
+int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask, rgw_cache_entry_info *cache_info)
 {
   RWLock::RLocker l(lock);
 
@@ -17,16 +17,25 @@ int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask)
     return -ENOENT;
   }
 
-  ObjectCacheEntry& entry = iter->second;
+  ObjectCacheEntry *entry = &iter->second;
 
-  if (lru_counter - entry.lru_promotion_ts > lru_window) {
-    ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter << " promotion_ts=" << entry.lru_promotion_ts << dendl;
+  if (lru_counter - entry->lru_promotion_ts > lru_window) {
+    ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter << " promotion_ts=" << entry->lru_promotion_ts << dendl;
     lock.unlock();
     lock.get_write(); /* promote lock to writer */
 
+    /* need to redo this because entry might have dropped off the cache */
+    iter = cache_map.find(name);
+    if (iter == cache_map.end()) {
+      ldout(cct, 10) << "lost race! cache get: name=" << name << " : miss" << dendl;
+      if(perfcounter) perfcounter->inc(l_rgw_cache_miss);
+      return -ENOENT;
+    }
+
+    entry = &iter->second;
     /* check again, we might have lost a race here */
-    if (lru_counter - entry.lru_promotion_ts > lru_window) {
-      touch_lru(name, entry, iter->second.lru_iter);
+    if (lru_counter - entry->lru_promotion_ts > lru_window) {
+      touch_lru(name, *entry, iter->second.lru_iter);
     }
   }
 
@@ -39,12 +48,38 @@ int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask)
   ldout(cct, 10) << "cache get: name=" << name << " : hit" << dendl;
 
   info = src;
+  if (cache_info) {
+    cache_info->cache_locator = name;
+    cache_info->gen = entry->gen;
+  }
   if(perfcounter) perfcounter->inc(l_rgw_cache_hit);
 
   return 0;
 }
 
-void ObjectCache::put(string& name, ObjectCacheInfo& info)
+bool ObjectCache::chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry)
+{
+  RWLock::WLocker l(lock);
+
+  ldout(cct, 10) << "cache put: name=" << cache_info.cache_locator << dendl;
+  map<string, ObjectCacheEntry>::iterator iter = cache_map.find(cache_info.cache_locator);
+  if (iter == cache_map.end()) {
+    return false;
+  }
+
+  ObjectCacheEntry& entry = iter->second;
+  if (entry.gen != cache_info.gen) {
+    return false;
+  }
+
+  chained_entry->cache->chain_cb(chained_entry->key, chained_entry->data);
+
+  entry.chained_entries.push_back(make_pair<RGWChainedCache *, string>(chained_entry->cache, chained_entry->key));
+
+  return true;
+}
+
+void ObjectCache::put(string& name, ObjectCacheInfo& info, rgw_cache_entry_info *cache_info)
 {
   RWLock::WLocker l(lock);
 
@@ -59,6 +94,15 @@ void ObjectCache::put(string& name, ObjectCacheInfo& info)
   ObjectCacheEntry& entry = iter->second;
   ObjectCacheInfo& target = entry.info;
 
+  for (list<pair<RGWChainedCache *, string> >::iterator iiter = entry.chained_entries.begin();
+       iiter != entry.chained_entries.end(); ++iiter) {
+    RGWChainedCache *chained_cache = iiter->first;
+    chained_cache->invalidate(iiter->second);
+  }
+
+  entry.chained_entries.clear();
+  entry.gen++;
+
   touch_lru(name, entry, entry.lru_iter);
 
   target.status = info.status;
@@ -70,6 +114,11 @@ void ObjectCache::put(string& name, ObjectCacheInfo& info)
     return;
   }
 
+  if (cache_info) {
+    cache_info->cache_locator = name;
+    cache_info->gen = entry.gen;
+  }
+
   target.flags |= info.flags;
 
   if (info.flags & CACHE_FLAG_META)
@@ -111,6 +160,13 @@ void ObjectCache::remove(string& name)
     return;
 
   ldout(cct, 10) << "removing " << name << " from cache" << dendl;
+  ObjectCacheEntry& entry = iter->second;
+
+  for (list<pair<RGWChainedCache *, string> >::iterator iiter = entry.chained_entries.begin();
+       iiter != entry.chained_entries.end(); ++iiter) {
+    RGWChainedCache *chained_cache = iiter->first;
+    chained_cache->invalidate(iiter->second);
+  }
 
   remove_lru(name, iter->second.lru_iter);
   cache_map.erase(iter);
index 3b793f1400588a3e3b0aa8b42eb8ef267706039b..e5f349961b1c771b5d21e6f7369cdcf6768ebcf8 100644 (file)
@@ -128,8 +128,10 @@ struct ObjectCacheEntry {
   ObjectCacheInfo info;
   std::list<string>::iterator lru_iter;
   uint64_t lru_promotion_ts;
+  uint64_t gen;
+  std::list<pair<RGWChainedCache *, string> > chained_entries;
 
-  ObjectCacheEntry() : lru_promotion_ts(0) {}
+  ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {}
 };
 
 class ObjectCache {
@@ -145,13 +147,14 @@ class ObjectCache {
   void remove_lru(string& name, std::list<string>::iterator& lru_iter);
 public:
   ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL) { }
-  int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask);
-  void put(std::string& name, ObjectCacheInfo& bl);
+  int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info);
+  void put(std::string& name, ObjectCacheInfo& bl, rgw_cache_entry_info *cache_info);
   void remove(std::string& name);
   void set_ctx(CephContext *_cct) {
     cct = _cct;
     lru_window = cct->_conf->rgw_cache_lru_size / 2;
   }
+  bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry);
 };
 
 template <class T>
@@ -213,12 +216,17 @@ public:
   int put_obj_data(void *ctx, rgw_obj& obj, const char *data,
               off_t ofs, size_t len, bool exclusive);
 
-  int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& bl, off_t ofs, off_t end);
+  int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& bl, off_t ofs, off_t end,
+              rgw_cache_entry_info *cache_info);
 
   int obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmtime, uint64_t *epoch, map<string, bufferlist> *attrs,
                bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker);
 
   int delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& obj, RGWObjVersionTracker *objv_tracker);
+
+  bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry) {
+    return cache.chain_cache_entry(cache_info, chained_entry);
+  }
 };
 
 template <class T>
@@ -252,13 +260,14 @@ int RGWCache<T>::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj&
 }
 
 template <class T>
-int RGWCache<T>::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& obl, off_t ofs, off_t end)
+int RGWCache<T>::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj, bufferlist& obl, off_t ofs, off_t end,
+                         rgw_cache_entry_info *cache_info)
 {
   rgw_bucket bucket;
   string oid;
   normalize_bucket_and_obj(obj.bucket, obj.object, bucket, oid);
   if (bucket.name[0] != '.' || ofs != 0)
-    return T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end);
+    return T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end, cache_info);
 
   string name = normal_name(obj.bucket, oid);
 
@@ -268,7 +277,7 @@ int RGWCache<T>::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **h
   if (objv_tracker)
     flags |= CACHE_FLAG_OBJV;
   
-  if (cache.get(name, info, flags) == 0) {
+  if (cache.get(name, info, flags, cache_info) == 0) {
     if (info.status < 0)
       return info.status;
 
@@ -283,11 +292,11 @@ int RGWCache<T>::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **h
       objv_tracker->read_version = info.version;
     return bl.length();
   }
-  int r = T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end);
+  int r = T::get_obj(ctx, objv_tracker, handle, obj, obl, ofs, end, cache_info);
   if (r < 0) {
     if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
       info.status = r;
-      cache.put(name, info);
+      cache.put(name, info, cache_info);
     }
     return r;
   }
@@ -307,7 +316,7 @@ int RGWCache<T>::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **h
   if (objv_tracker) {
     info.version = objv_tracker->read_version;
   }
-  cache.put(name, info);
+  cache.put(name, info, cache_info);
   return r;
 }
 
@@ -333,7 +342,7 @@ int RGWCache<T>::set_attr(void *ctx, rgw_obj& obj, const char *attr_name, buffer
   if (cacheable) {
     string name = normal_name(bucket, oid);
     if (ret >= 0) {
-      cache.put(name, info);
+      cache.put(name, info, NULL);
       int r = distribute_cache(name, obj, info, UPDATE_OBJ);
       if (r < 0)
         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
@@ -372,7 +381,7 @@ int RGWCache<T>::set_attrs(void *ctx, rgw_obj& obj,
   if (cacheable) {
     string name = normal_name(bucket, oid);
     if (ret >= 0) {
-      cache.put(name, info);
+      cache.put(name, info, NULL);
       int r = distribute_cache(name, obj, info, UPDATE_OBJ);
       if (r < 0)
         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
@@ -416,7 +425,7 @@ int RGWCache<T>::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, time_
   if (cacheable) {
     string name = normal_name(bucket, oid);
     if (ret >= 0) {
-      cache.put(name, info);
+      cache.put(name, info, NULL);
       int r = distribute_cache(name, obj, info, UPDATE_OBJ);
       if (r < 0)
         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
@@ -451,7 +460,7 @@ int RGWCache<T>::put_obj_data(void *ctx, rgw_obj& obj, const char *data,
   if (cacheable) {
     string name = normal_name(bucket, oid);
     if (ret >= 0) {
-      cache.put(name, info);
+      cache.put(name, info, NULL);
       int r = distribute_cache(name, obj, info, UPDATE_OBJ);
       if (r < 0)
         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
@@ -484,7 +493,7 @@ int RGWCache<T>::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmti
   uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
   if (objv_tracker)
     flags |= CACHE_FLAG_OBJV;
-  int r = cache.get(name, info, flags);
+  int r = cache.get(name, info, flags, NULL);
   if (r == 0) {
     if (info.status < 0)
       return info.status;
@@ -500,7 +509,7 @@ int RGWCache<T>::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmti
   if (r < 0) {
     if (r == -ENOENT) {
       info.status = r;
-      cache.put(name, info);
+      cache.put(name, info, NULL);
     }
     return r;
   }
@@ -513,7 +522,7 @@ int RGWCache<T>::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmti
     info.flags |= CACHE_FLAG_OBJV;
     info.version = objv_tracker->read_version;
   }
-  cache.put(name, info);
+  cache.put(name, info, NULL);
 done:
   if (psize)
     *psize = size;
@@ -564,7 +573,7 @@ int RGWCache<T>::watch_cb(int opcode, uint64_t ver, bufferlist& bl)
   
   switch (info.op) {
   case UPDATE_OBJ:
-    cache.put(name, info.obj_info);
+    cache.put(name, info.obj_info, NULL);
     break;
   case REMOVE_OBJ:
     cache.remove(name);
index 9550072af7a0f6e08306c66e48b49da3becabd68..8e09d897b986285db75b7f37c583b4951eb4d7b4 100644 (file)
@@ -1226,6 +1226,13 @@ public:
 };
 WRITE_CLASS_ENCODER(rgw_obj)
 
+struct rgw_cache_entry_info {
+  string cache_locator;
+  uint64_t gen;
+
+  rgw_cache_entry_info() : gen(0) {}
+};
+
 inline ostream& operator<<(ostream& out, const rgw_obj &o) {
   return out << o.bucket.name << ":" << o.object;
 }
index 27c7a72600aacb54f28c5999b9f328e4178a586b..bde482f410a4119e30e9c6aeb21ef1b1eac9e8ce 100644 (file)
@@ -657,7 +657,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, RGWObjEnt& ent, RGWAc
   perfcounter->inc(l_rgw_get_b, cur_end - cur_ofs);
   while (cur_ofs <= cur_end) {
     bufferlist bl;
-    ret = store->get_obj(obj_ctx, NULL, &handle, part, bl, cur_ofs, cur_end);
+    ret = store->get_obj(obj_ctx, NULL, &handle, part, bl, cur_ofs, cur_end, NULL);
     if (ret < 0)
       goto done_err;
 
index 934ca9651313839cbf333568b5700639da680220..25bfdc1a9f356974c34eb84f85ca9068aad874df 100644 (file)
@@ -3389,7 +3389,7 @@ set_err_state:
   }
 
   if (copy_first) {
-    ret = get_obj(ctx, NULL, &handle, src_obj, first_chunk, 0, max_chunk_size);
+    ret = get_obj(ctx, NULL, &handle, src_obj, first_chunk, 0, max_chunk_size, NULL);
     if (ret < 0)
       goto done_ret;
 
@@ -3462,7 +3462,7 @@ int RGWRados::copy_obj_data(void *ctx,
 
   do {
     bufferlist bl;
-    ret = get_obj(ctx, NULL, handle, src_obj, bl, ofs, end);
+    ret = get_obj(ctx, NULL, handle, src_obj, bl, ofs, end, NULL);
     if (ret < 0)
       return ret;
 
@@ -4552,7 +4552,7 @@ int RGWRados::clone_objs(void *ctx, rgw_obj& dst_obj,
 
 
 int RGWRados::get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj,
-                      bufferlist& bl, off_t ofs, off_t end)
+                      bufferlist& bl, off_t ofs, off_t end, rgw_cache_entry_info *cache_info)
 {
   rgw_bucket bucket;
   std::string oid, key;
index 9ff5f3d2eaeea99d9a8e65d8d0af7b18266260bb..ea1d276e650557832979917573dde09f5ce3d34e 100644 (file)
@@ -4,6 +4,7 @@
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
 #include "common/RefCountedObj.h"
+#include "common/RWLock.h"
 #include "rgw_common.h"
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/version/cls_version_types.h"
@@ -1166,6 +1167,21 @@ struct rgw_rados_ref {
   librados::IoCtx ioctx;
 };
 
+class RGWChainedCache {
+public:
+  virtual ~RGWChainedCache() {}
+  virtual void chain_cb(const string& key, void *data) = 0;
+  virtual void invalidate(const string& key) = 0;
+
+  struct Entry {
+    RGWChainedCache *cache;
+    const string& key;
+    void *data;
+
+    Entry(RGWChainedCache *_c, const string& _k, void *_d) : cache(_c), key(_k), data(_d) {}
+  };
+};
+
 
 class RGWRados
 {
@@ -1647,10 +1663,12 @@ public:
             struct rgw_err *err);
 
   virtual int get_obj(void *ctx, RGWObjVersionTracker *objv_tracker, void **handle, rgw_obj& obj,
-                      bufferlist& bl, off_t ofs, off_t end);
+                      bufferlist& bl, off_t ofs, off_t end, rgw_cache_entry_info *cache_info);
 
   virtual void finish_get_obj(void **handle);
 
+  virtual bool chain_cache_entry(rgw_cache_entry_info& cache_info, RGWChainedCache::Entry *chained_entry) { return false; }
+
   int iterate_obj(void *ctx, rgw_obj& obj,
                   off_t ofs, off_t end,
                   uint64_t max_chunk_size,
@@ -1918,6 +1936,44 @@ public:
 
 };
 
+template <class T>
+class RGWChainedCacheImpl : public RGWChainedCache {
+  RWLock lock;
+
+  map<string, T> entries;
+
+public:
+  RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {}
+
+  bool find(const string& key, T *entry) {
+    RWLock::RLocker rl(lock);
+    typename map<string, T>::iterator iter = entries.find(key);
+    if (iter == entries.end()) {
+      return false;
+    }
+
+    *entry = iter->second;
+    return true;
+  }
+
+  bool put(RGWRados *store, const string& key, T *entry, rgw_cache_entry_info& cache_info) {
+    Entry chain_entry(this, key, entry);
+
+    /* we need the store cache to call us under its lock to maintain lock ordering */
+    return store->chain_cache_entry(cache_info, &chain_entry);
+  }
+
+  void chain_cb(const string& key, void *data) {
+    T *entry = (T *)data;
+    RWLock::WLocker wl(lock);
+    entries[key] = *entry;
+  }
+
+  void invalidate(const string& key) {
+    RWLock::WLocker wl(lock);
+    entries.erase(key);
+  }
+};
 
 
 #endif
index 49894eae7ced03d0a78e0368c68ba9a2a5933f08..ca6d06e75ca4786b827fa5585c3b3122c6a5c6d3 100644 (file)
@@ -35,7 +35,8 @@ int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, cons
 }
 
 int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, const string& key, bufferlist& bl,
-                       RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs)
+                       RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs,
+                       rgw_cache_entry_info *cache_info)
 {
   struct rgw_err err;
   void *handle = NULL;
@@ -50,7 +51,7 @@ int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, const
     if (ret < 0)
       return ret;
 
-    ret = rgwstore->get_obj(ctx, objv_tracker, &handle, obj, bl, 0, request_len - 1);
+    ret = rgwstore->get_obj(ctx, objv_tracker, &handle, obj, bl, 0, request_len - 1, cache_info);
     rgwstore->finish_get_obj(&handle);
     if (ret < 0)
       return ret;
index 9adaee484cc1b0a5ac316d9564970bd42b8a8b2f..1dfebffca4761cef3685ba7d3c79b3ab1ef4143f 100644 (file)
@@ -14,7 +14,8 @@ struct obj_version;
 int rgw_put_system_obj(RGWRados *rgwstore, rgw_bucket& bucket, string& oid, const char *data, size_t size, bool exclusive,
                        RGWObjVersionTracker *objv_tracker, time_t set_mtime, map<string, bufferlist> *pattrs = NULL);
 int rgw_get_system_obj(RGWRados *rgwstore, void *ctx, rgw_bucket& bucket, const string& key, bufferlist& bl,
-                       RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs = NULL);
+                       RGWObjVersionTracker *objv_tracker, time_t *pmtime, map<string, bufferlist> *pattrs = NULL,
+                       rgw_cache_entry_info *cache_info = NULL);
 
 int rgw_tools_init(CephContext *cct);
 void rgw_tools_cleanup();