]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: use multiple notification objects
authorYehuda Sadeh <yehuda@inktank.com>
Mon, 18 Jun 2012 20:25:44 +0000 (13:25 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Thu, 9 Aug 2012 20:33:37 +0000 (13:33 -0700)
Issue #2504. This makes us listen and notify on more than
a single object, which reduces the contention of cache
notifications.

NOTE: This change requires that any radosgw and radosgw-admin
use the same 'rgw num control oids' config value. A config value
of 0 will maintain old compatibility, and will allow an upgraded
process run in conjuction with an old one. Setting value other
than 0 (or using the non-zero default) will require upgrading
and restarting all the gateways together. Failing to do so
might lead to inconsistent user and buckets metadata (which
will be resolved once gateways are restarted).

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/config_opts.h
src/rgw/rgw_cache.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index af9213e07b69be4db10b2e81bcde1b875512b4e9..3045dfb8beb3b8358cbcdc89f92ba0e3e2d3f3ad 100644 (file)
@@ -409,6 +409,7 @@ OPTION(rgw_remote_addr_param, OPT_STR, "REMOTE_ADDR")  // e.g. X-Forwarded-For,
 OPTION(rgw_op_thread_timeout, OPT_INT, 10*60)
 OPTION(rgw_op_thread_suicide_timeout, OPT_INT, 0)
 OPTION(rgw_thread_pool_size, OPT_INT, 100)
+OPTION(rgw_num_control_oids, OPT_INT, 8)
 OPTION(rgw_maintenance_tick_interval, OPT_DOUBLE, 10.0)
 OPTION(rgw_pools_preallocate_max, OPT_INT, 100)
 OPTION(rgw_pools_preallocate_threshold, OPT_INT, 70)
index 1cc319b1fa3f8ab0eb8a1c3bd2998a8585fb2522..fa7630e17f8ef77186b018b4600db9b4999c072e 100644 (file)
@@ -185,7 +185,7 @@ class RGWCache  : public T
   void finalize() {
     T::finalize_watch();
   }
-  int distribute(rgw_obj& obj, ObjectCacheInfo& obj_info, int op);
+  int distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op);
   int watch_cb(int opcode, uint64_t ver, bufferlist& bl);
 public:
   RGWCache() {}
@@ -223,7 +223,7 @@ int RGWCache<T>::delete_obj(void *ctx, rgw_obj& obj, bool sync)
   cache.remove(name);
 
   ObjectCacheInfo info;
-  distribute(obj, info, REMOVE_OBJ);
+  distribute(name, obj, info, REMOVE_OBJ);
 
   return T::delete_obj(ctx, obj, sync);
 }
@@ -292,7 +292,7 @@ int RGWCache<T>::set_attr(void *ctx, rgw_obj& obj, const char *attr_name, buffer
     string name = normal_name(bucket, oid);
     if (ret >= 0) {
       cache.put(name, info);
-      int r = distribute(obj, info, UPDATE_OBJ);
+      int r = distribute(name, obj, info, UPDATE_OBJ);
       if (r < 0)
         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
     } else {
@@ -326,7 +326,7 @@ int RGWCache<T>::set_attrs(void *ctx, rgw_obj& obj,
     string name = normal_name(bucket, oid);
     if (ret >= 0) {
       cache.put(name, info);
-      int r = distribute(obj, info, UPDATE_OBJ);
+      int r = distribute(name, obj, info, UPDATE_OBJ);
       if (r < 0)
         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
     } else {
@@ -362,7 +362,7 @@ int RGWCache<T>::put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, time_t *mt
     string name = normal_name(bucket, oid);
     if (ret >= 0) {
       cache.put(name, info);
-      int r = distribute(obj, info, UPDATE_OBJ);
+      int r = distribute(name, obj, info, UPDATE_OBJ);
       if (r < 0)
         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
     } else {
@@ -397,7 +397,7 @@ int RGWCache<T>::put_obj_data(void *ctx, rgw_obj& obj, const char *data,
     string name = normal_name(bucket, oid);
     if (ret >= 0) {
       cache.put(name, info);
-      int r = distribute(obj, info, UPDATE_OBJ);
+      int r = distribute(name, obj, info, UPDATE_OBJ);
       if (r < 0)
         mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
     } else {
@@ -456,7 +456,7 @@ done:
 }
 
 template <class T>
-int RGWCache<T>::distribute(rgw_obj& obj, ObjectCacheInfo& obj_info, int op)
+int RGWCache<T>::distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op)
 {
   RGWCacheNotifyInfo info;
 
@@ -466,7 +466,7 @@ int RGWCache<T>::distribute(rgw_obj& obj, ObjectCacheInfo& obj_info, int op)
   info.obj = obj;
   bufferlist bl;
   ::encode(info, bl);
-  int ret = T::distribute(bl);
+  int ret = T::distribute(normal_name, bl);
   return ret;
 }
 
index 5676564d6ef787a67309a330b5aa4270be461371..d9e105074511596ee76b22e1c221725a0b5cc733 100644 (file)
@@ -38,7 +38,8 @@ static RGWRados rados_provider;
 RGWRados* RGWRados::store;
 
 
-static string notify_oid = "notify";
+static string notify_oid_prefix = "notify";
+static string *notify_oids = NULL;
 static string shadow_ns = "shadow";
 static string dir_oid_prefix = ".dir.";
 static string default_storage_pool = ".rgw.buckets";
@@ -120,7 +121,20 @@ int RGWRados::initialize()
 
 void RGWRados::finalize_watch()
 {
-  control_pool_ctx.unwatch(notify_oid, watch_handle);
+  for (int i = 0; i < num_watchers; i++) {
+    string& notify_oid = notify_oids[i];
+    if (notify_oid.empty())
+      continue;
+    uint64_t watch_handle = watch_handles[i];
+    control_pool_ctx.unwatch(notify_oid, watch_handle);
+
+    RGWWatcher *watcher = watchers[i];
+    delete watcher;
+  }
+
+  delete[] notify_oids;
+  delete[] watch_handles;
+  delete[] watchers;
 }
 
 /**
@@ -158,14 +172,50 @@ int RGWRados::init_watch()
       return r;
   }
 
-  r = control_pool_ctx.create(notify_oid, false);
-  if (r < 0 && r != -EEXIST)
-    return r;
+  num_watchers = cct->_conf->rgw_num_control_oids;
 
-  watcher = new RGWWatcher(this);
-  r = control_pool_ctx.watch(notify_oid, 0, &watch_handle, watcher);
+  bool compat_oid = (num_watchers == 0);
 
-  return r;
+  if (num_watchers <= 0)
+    num_watchers = 1;
+
+  notify_oids = new string[num_watchers];
+  watchers = new RGWWatcher *[num_watchers];
+  watch_handles = new uint64_t[num_watchers];
+
+  for (int i=0; i < num_watchers; i++) {
+    string& notify_oid = notify_oids[i];
+    notify_oid = notify_oid_prefix;
+    if (!compat_oid) {
+      char buf[16];
+      snprintf(buf, sizeof(buf), ".%d", i);
+      notify_oid.append(buf);
+    }
+    r = control_pool_ctx.create(notify_oid, false);
+    if (r < 0 && r != -EEXIST)
+      return r;
+
+    RGWWatcher *watcher = new RGWWatcher(this);
+    watchers[i] = watcher;
+
+    r = control_pool_ctx.watch(notify_oid, 0, &watch_handles[i], watcher);
+    if (r < 0)
+      return r;
+  }
+
+  return 0;
+}
+
+void RGWRados::pick_control_oid(const string& key, string& notify_oid)
+{
+  uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
+
+  int i = r % num_watchers;
+  char buf[16];
+  snprintf(buf, sizeof(buf), ".%d", i);
+
+  notify_oid = notify_oid_prefix;
+  notify_oid.append(buf);
 }
 
 int RGWRados::open_bucket_ctx(rgw_bucket& bucket, librados::IoCtx&  io_ctx)
@@ -2429,8 +2479,11 @@ int RGWRados::append_async(rgw_obj& obj, size_t size, bufferlist& bl)
   return r;
 }
 
-int RGWRados::distribute(bufferlist& bl)
+int RGWRados::distribute(const string& key, bufferlist& bl)
 {
+  string notify_oid;
+  pick_control_oid(key, notify_oid);
+
   ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
   int r = control_pool_ctx.notify(notify_oid, 0, bl);
   return r;
index 4acd8e8da800edb51053f6e8f7b9c3358793d32d..930c440ed0c2dcc807bca447ef67c4b225ec1a1b 100644 (file)
@@ -224,8 +224,9 @@ class RGWRados
   };
 
 
-  RGWWatcher *watcher;
-  uint64_t watch_handle;
+  int num_watchers;
+  RGWWatcher **watchers;
+  uint64_t *watch_handles;
   librados::IoCtx root_pool_ctx;      // .rgw
   librados::IoCtx control_pool_ctx;   // .rgw.control
 
@@ -280,7 +281,7 @@ protected:
   CephContext *cct;
 
 public:
-  RGWRados() : lock("rados_timer_lock"), timer(NULL), watcher(NULL), watch_handle(0),
+  RGWRados() : lock("rados_timer_lock"), timer(NULL), num_watchers(0), watchers(NULL), watch_handles(NULL),
                bucket_id_lock("rados_bucket_id"), max_bucket_id(0) {}
   virtual ~RGWRados() {}
 
@@ -521,8 +522,9 @@ public:
 
   virtual int init_watch();
   virtual void finalize_watch();
-  virtual int distribute(bufferlist& bl);
+  virtual int distribute(const string& key, bufferlist& bl);
   virtual int watch_cb(int opcode, uint64_t ver, bufferlist& bl) { return 0; }
+  void pick_control_oid(const string& key, string& notify_oid);
 
   void *create_context(void *user_ctx) {
     RGWRadosCtx *rctx = new RGWRadosCtx();