]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add support for new watch/notify functionality
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 21 Nov 2014 18:13:29 +0000 (10:13 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 5 Mar 2015 20:29:23 +0000 (12:29 -0800)
Disable and invalidate cache on watch error, then reinitialize watch,
reenable cache.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Conflicts:
src/rgw/rgw_cache.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

src/rgw/rgw_admin.cc
src/rgw/rgw_cache.cc
src/rgw/rgw_cache.h
src/rgw/rgw_main.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_user.cc
src/rgw/rgw_user.h

index 1fa3875c782c326298226b3369c96a92efe98a08..e5afb5aba1e9d3639cf57ab0aaad137eabd13005 100644 (file)
@@ -1242,7 +1242,7 @@ int main(int argc, char **argv)
     return 5; //EIO
   }
 
-  rgw_user_init(store->meta_mgr);
+  rgw_user_init(store);
   rgw_bucket_init(store->meta_mgr);
 
   StoreDestructor store_destructor(store);
index d1e80936b91f0c5291aa48654b385d026c9626d6..c3f3b06ffc88bb654d9320999e3768911bf12331 100644 (file)
@@ -13,6 +13,10 @@ int ObjectCache::get(string& name, ObjectCacheInfo& info, uint32_t mask, rgw_cac
 {
   RWLock::RLocker l(lock);
 
+  if (!enabled) {
+    return -ENOENT;
+  }
+
   map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name);
   if (iter == cache_map.end()) {
     ldout(cct, 10) << "cache get: name=" << name << " : miss" << dendl;
@@ -64,6 +68,10 @@ bool ObjectCache::chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_ent
 {
   RWLock::WLocker l(lock);
 
+  if (!enabled) {
+    return false;
+  }
+
   list<rgw_cache_entry_info *>::iterator citer;
 
   list<ObjectCacheEntry *> cache_entry_list;
@@ -107,6 +115,10 @@ void ObjectCache::put(string& name, ObjectCacheInfo& info, rgw_cache_entry_info
 {
   RWLock::WLocker l(lock);
 
+  if (!enabled) {
+    return;
+  }
+
   ldout(cct, 10) << "cache put: name=" << name << dendl;
   map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name);
   if (iter == cache_map.end()) {
@@ -179,6 +191,10 @@ void ObjectCache::remove(string& name)
 {
   RWLock::WLocker l(lock);
 
+  if (!enabled) {
+    return;
+  }
+
   map<string, ObjectCacheEntry>::iterator iter = cache_map.find(name);
   if (iter == cache_map.end())
     return;
@@ -242,4 +258,40 @@ void ObjectCache::remove_lru(string& name, std::list<string>::iterator& lru_iter
   lru_iter = lru.end();
 }
 
+void ObjectCache::set_enabled(bool status)
+{
+  RWLock::WLocker l(lock);
+
+  enabled = status;
+
+  if (!enabled) {
+    do_invalidate_all();
+  }
+}
+
+void ObjectCache::invalidate_all()
+{
+  RWLock::WLocker l(lock);
+
+  do_invalidate_all();
+}
+
+void ObjectCache::do_invalidate_all()
+{
+  cache_map.clear();
+  lru.clear();
+
+  lru_size = 0;
+  lru_counter = 0;
+  lru_window = 0;
+
+  for (list<RGWChainedCache *>::iterator iter = chained_cache.begin(); iter != chained_cache.end(); ++iter) {
+    (*iter)->invalidate_all();
+  }
+}
+
+void ObjectCache::chain_cache(RGWChainedCache *cache) {
+  RWLock::WLocker l(lock);
+  chained_cache.push_back(cache);
+}
 
index 2446bf0729b9d84b071af1a7d6d5bb22557255e6..5ebb48c688f6619fc0e5aedd6587811813259350 100644 (file)
@@ -146,10 +146,16 @@ class ObjectCache {
   RWLock lock;
   CephContext *cct;
 
+  list<RGWChainedCache *> chained_cache;
+
+  bool enabled;
+
   void touch_lru(string& name, ObjectCacheEntry& entry, std::list<string>::iterator& lru_iter);
   void remove_lru(string& name, std::list<string>::iterator& lru_iter);
+
+  void do_invalidate_all();
 public:
-  ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL) { }
+  ObjectCache() : lru_size(0), lru_counter(0), lru_window(0), lock("ObjectCache"), cct(NULL), enabled(false) { }
   int get(std::string& name, ObjectCacheInfo& bl, uint32_t mask, rgw_cache_entry_info *cache_info);
   void put(std::string& name, ObjectCacheInfo& bl, rgw_cache_entry_info *cache_info);
   void remove(std::string& name);
@@ -158,6 +164,11 @@ public:
     lru_window = cct->_conf->rgw_cache_lru_size / 2;
   }
   bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry);
+
+  void set_enabled(bool status);
+
+  void chain_cache(RGWChainedCache *cache);
+  void invalidate_all();
 };
 
 template <class T>
@@ -205,9 +216,17 @@ class RGWCache  : public T
               uint64_t cookie,
               uint64_t notifier_id,
               bufferlist& bl);
+
+  void set_cache_enabled(bool state) {
+    cache.set_enabled(state);
+  }
 public:
   RGWCache() {}
 
+  void register_chained_cache(RGWChainedCache *cc) {
+    cache.chain_cache(cc);
+  }
+
   int set_attr(void *ctx, rgw_obj& obj, const char *name, bufferlist& bl, RGWObjVersionTracker *objv_tracker);
   int set_attrs(void *ctx, rgw_obj& obj, 
                 map<string, bufferlist>& attrs,
index 92ae502445930742612fde116c5b38e484f2f5c1..727a8adf507785f9fc694857c477b293deaa99fe 100644 (file)
@@ -1060,7 +1060,7 @@ int main(int argc, const char **argv)
   if (r) 
     return 1;
 
-  rgw_user_init(store->meta_mgr);
+  rgw_user_init(store);
   rgw_bucket_init(store->meta_mgr);
   rgw_log_usage_init(g_ceph_context, store);
 
index e9fc30a9d6e312caa51f7b1ca96fd32823f870ca..55d14168a82ec97cce7b300aa638c7fb72bbe9df 100644 (file)
@@ -82,6 +82,14 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN;
 
 #define dout_subsys ceph_subsys_rgw
 
+struct bucket_info_entry {
+  RGWBucketInfo info;
+  time_t mtime;
+  map<string, bufferlist> attrs;
+};
+
+static RGWChainedCacheImpl<bucket_info_entry> binfo_cache;
+
 void RGWDefaultRegionInfo::dump(Formatter *f) const {
   encode_json("default_region", default_region, f);
 }
@@ -1223,6 +1231,13 @@ public:
   void handle_error(uint64_t cookie, int err) {
     lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie
                        << " err " << cpp_strerror(err) << dendl;
+    rados->set_cache_enabled(false);
+    rados->finalize_watch();
+    int ret = rados->init_watch();
+    if (ret < 0) {
+      ldout(rados->ctx(), 0) << "ERROR: init_watch() returned ret=" << ret << ", cache is disabled" << dendl;
+      return;
+    }
   }
 };
 
@@ -1460,6 +1475,8 @@ int RGWRados::init_complete()
   }
   ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl;
 
+  binfo_cache.init(this);
+
   return ret;
 }
 
@@ -1629,6 +1646,8 @@ int RGWRados::init_watch()
 
   watch_initialized = true;
 
+  set_cache_enabled(true);
+
   return 0;
 }
 
@@ -6760,14 +6779,6 @@ int RGWRados::convert_old_bucket_info(RGWObjectCtx& obj_ctx, string& bucket_name
   return 0;
 }
 
-struct bucket_info_entry {
-  RGWBucketInfo info;
-  time_t mtime;
-  map<string, bufferlist> attrs;
-};
-
-static RGWChainedCacheImpl<bucket_info_entry> binfo_cache;
-
 int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx, const string& bucket_name, RGWBucketInfo& info,
                               time_t *pmtime, map<string, bufferlist> *pattrs)
 {
index fa09f0617dd3acad07ec5b31101ce50735ec381d..23707a8a3f15eced0d19d984faeb10b284cc29f0 100644 (file)
@@ -1130,6 +1130,7 @@ public:
   virtual ~RGWChainedCache() {}
   virtual void chain_cb(const string& key, void *data) = 0;
   virtual void invalidate(const string& key) = 0;
+  virtual void invalidate_all() = 0;
 
   struct Entry {
     RGWChainedCache *cache;
@@ -1823,6 +1824,7 @@ public:
                              bufferlist& bl, off_t ofs, off_t end,
                              rgw_cache_entry_info *cache_info);
 
+  virtual void register_chained_cache(RGWChainedCache *cache) {}
   virtual bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry) { return false; }
 
   int iterate_obj(RGWObjectCtx& ctx, rgw_obj& obj,
@@ -1896,6 +1898,8 @@ public:
                       bufferlist& bl) { return 0; }
   void pick_control_oid(const string& key, string& notify_oid);
 
+  virtual void set_cache_enabled(bool state) {}
+
   void set_atomic(void *ctx, rgw_obj& obj) {
     RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
     rctx->set_atomic(obj);
@@ -2129,6 +2133,10 @@ class RGWChainedCacheImpl : public RGWChainedCache {
 public:
   RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {}
 
+  void init(RGWRados *store) {
+    store->register_chained_cache(this);
+  }
+
   bool find(const string& key, T *entry) {
     RWLock::RLocker rl(lock);
     typename map<string, T>::iterator iter = entries.find(key);
@@ -2157,6 +2165,11 @@ public:
     RWLock::WLocker wl(lock);
     entries.erase(key);
   }
+
+  void invalidate_all() {
+    RWLock::WLocker wl(lock);
+    entries.clear();
+  }
 };
 
 class RGWPutObjProcessor
index bdd2133ee116363e3e11b52c3703c040563ce929..24b72fb8572ccc4492bfb4c7351d9db2f51ac7b1 100644 (file)
@@ -2514,8 +2514,10 @@ public:
   }
 };
 
-void rgw_user_init(RGWMetadataManager *mm)
+void rgw_user_init(RGWRados *store)
 {
+  uinfo_cache.init(store);
+
   user_meta_handler = new RGWUserMetadataHandler;
-  mm->register_handler(user_meta_handler);
+  store->meta_mgr->register_handler(user_meta_handler);
 }
index 317cd06cd31127b1c2eb0363ab204b697ee0e3e6..6204b0967322adcd5234424261ff947294147108 100644 (file)
@@ -670,6 +670,6 @@ public:
 
 class RGWMetadataManager;
 
-extern void rgw_user_init(RGWMetadataManager *mm);
+extern void rgw_user_init(RGWRados *store);
 
 #endif