From: Yehuda Sadeh Date: Mon, 18 Jun 2012 20:25:44 +0000 (-0700) Subject: rgw: use multiple notification objects X-Git-Tag: v0.51~25 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b28db08ea8b84ec9f1d2df88ac4edd6aea0ba7d4;p=ceph.git rgw: use multiple notification objects Issue #2504. This makes us listen and notify on more than a single object, which reduces the contention of cache notifications. NOTE: This change requires that any radosgw and radosgw-admin use the same 'rgw num control oids' config value. A config value of 0 will maintain old compatibility, and will allow an upgraded process to run in conjunction with an old one. Setting a value other than 0 (or using the non-zero default) will require upgrading and restarting all the gateways together. Failing to do so might lead to inconsistent user and bucket metadata (which will be resolved once gateways are restarted). Signed-off-by: Yehuda Sadeh --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index af9213e07b69..3045dfb8beb3 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -409,6 +409,7 @@ OPTION(rgw_remote_addr_param, OPT_STR, "REMOTE_ADDR") // e.g. 
X-Forwarded-For, OPTION(rgw_op_thread_timeout, OPT_INT, 10*60) OPTION(rgw_op_thread_suicide_timeout, OPT_INT, 0) OPTION(rgw_thread_pool_size, OPT_INT, 100) +OPTION(rgw_num_control_oids, OPT_INT, 8) OPTION(rgw_maintenance_tick_interval, OPT_DOUBLE, 10.0) OPTION(rgw_pools_preallocate_max, OPT_INT, 100) OPTION(rgw_pools_preallocate_threshold, OPT_INT, 70) diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index 1cc319b1fa3f..fa7630e17f8e 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -185,7 +185,7 @@ class RGWCache : public T void finalize() { T::finalize_watch(); } - int distribute(rgw_obj& obj, ObjectCacheInfo& obj_info, int op); + int distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op); int watch_cb(int opcode, uint64_t ver, bufferlist& bl); public: RGWCache() {} @@ -223,7 +223,7 @@ int RGWCache::delete_obj(void *ctx, rgw_obj& obj, bool sync) cache.remove(name); ObjectCacheInfo info; - distribute(obj, info, REMOVE_OBJ); + distribute(name, obj, info, REMOVE_OBJ); return T::delete_obj(ctx, obj, sync); } @@ -292,7 +292,7 @@ int RGWCache::set_attr(void *ctx, rgw_obj& obj, const char *attr_name, buffer string name = normal_name(bucket, oid); if (ret >= 0) { cache.put(name, info); - int r = distribute(obj, info, UPDATE_OBJ); + int r = distribute(name, obj, info, UPDATE_OBJ); if (r < 0) mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; } else { @@ -326,7 +326,7 @@ int RGWCache::set_attrs(void *ctx, rgw_obj& obj, string name = normal_name(bucket, oid); if (ret >= 0) { cache.put(name, info); - int r = distribute(obj, info, UPDATE_OBJ); + int r = distribute(name, obj, info, UPDATE_OBJ); if (r < 0) mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; } else { @@ -362,7 +362,7 @@ int RGWCache::put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, time_t *mt string name = normal_name(bucket, oid); if (ret >= 0) { cache.put(name, info); - int r = distribute(obj, info, UPDATE_OBJ); + int r 
= distribute(name, obj, info, UPDATE_OBJ); if (r < 0) mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; } else { @@ -397,7 +397,7 @@ int RGWCache::put_obj_data(void *ctx, rgw_obj& obj, const char *data, string name = normal_name(bucket, oid); if (ret >= 0) { cache.put(name, info); - int r = distribute(obj, info, UPDATE_OBJ); + int r = distribute(name, obj, info, UPDATE_OBJ); if (r < 0) mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; } else { @@ -456,7 +456,7 @@ done: } template -int RGWCache::distribute(rgw_obj& obj, ObjectCacheInfo& obj_info, int op) +int RGWCache::distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op) { RGWCacheNotifyInfo info; @@ -466,7 +466,7 @@ int RGWCache::distribute(rgw_obj& obj, ObjectCacheInfo& obj_info, int op) info.obj = obj; bufferlist bl; ::encode(info, bl); - int ret = T::distribute(bl); + int ret = T::distribute(normal_name, bl); return ret; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 5676564d6ef7..d9e105074511 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -38,7 +38,8 @@ static RGWRados rados_provider; RGWRados* RGWRados::store; -static string notify_oid = "notify"; +static string notify_oid_prefix = "notify"; +static string *notify_oids = NULL; static string shadow_ns = "shadow"; static string dir_oid_prefix = ".dir."; static string default_storage_pool = ".rgw.buckets"; @@ -120,7 +121,20 @@ int RGWRados::initialize() void RGWRados::finalize_watch() { - control_pool_ctx.unwatch(notify_oid, watch_handle); + for (int i = 0; i < num_watchers; i++) { + string& notify_oid = notify_oids[i]; + if (notify_oid.empty()) + continue; + uint64_t watch_handle = watch_handles[i]; + control_pool_ctx.unwatch(notify_oid, watch_handle); + + RGWWatcher *watcher = watchers[i]; + delete watcher; + } + + delete[] notify_oids; + delete[] watch_handles; + delete[] watchers; } /** @@ -158,14 +172,50 @@ int RGWRados::init_watch() return r; } - r 
= control_pool_ctx.create(notify_oid, false); - if (r < 0 && r != -EEXIST) - return r; + num_watchers = cct->_conf->rgw_num_control_oids; - watcher = new RGWWatcher(this); - r = control_pool_ctx.watch(notify_oid, 0, &watch_handle, watcher); + bool compat_oid = (num_watchers == 0); - return r; + if (num_watchers <= 0) + num_watchers = 1; + + notify_oids = new string[num_watchers]; + watchers = new RGWWatcher *[num_watchers]; + watch_handles = new uint64_t[num_watchers]; + + for (int i=0; i < num_watchers; i++) { + string& notify_oid = notify_oids[i]; + notify_oid = notify_oid_prefix; + if (!compat_oid) { + char buf[16]; + snprintf(buf, sizeof(buf), ".%d", i); + notify_oid.append(buf); + } + r = control_pool_ctx.create(notify_oid, false); + if (r < 0 && r != -EEXIST) + return r; + + RGWWatcher *watcher = new RGWWatcher(this); + watchers[i] = watcher; + + r = control_pool_ctx.watch(notify_oid, 0, &watch_handles[i], watcher); + if (r < 0) + return r; + } + + return 0; +} + +void RGWRados::pick_control_oid(const string& key, string& notify_oid) +{ + uint32_t r = ceph_str_hash_linux(key.c_str(), key.size()); + + int i = r % num_watchers; + char buf[16]; + snprintf(buf, sizeof(buf), ".%d", i); + + notify_oid = notify_oid_prefix; + notify_oid.append(buf); } int RGWRados::open_bucket_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx) @@ -2429,8 +2479,11 @@ int RGWRados::append_async(rgw_obj& obj, size_t size, bufferlist& bl) return r; } -int RGWRados::distribute(bufferlist& bl) +int RGWRados::distribute(const string& key, bufferlist& bl) { + string notify_oid; + pick_control_oid(key, notify_oid); + ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl; int r = control_pool_ctx.notify(notify_oid, 0, bl); return r; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 4acd8e8da800..930c440ed0c2 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -224,8 +224,9 @@ class RGWRados }; - RGWWatcher *watcher; - 
uint64_t watch_handle; + int num_watchers; + RGWWatcher **watchers; + uint64_t *watch_handles; librados::IoCtx root_pool_ctx; // .rgw librados::IoCtx control_pool_ctx; // .rgw.control @@ -280,7 +281,7 @@ protected: CephContext *cct; public: - RGWRados() : lock("rados_timer_lock"), timer(NULL), watcher(NULL), watch_handle(0), + RGWRados() : lock("rados_timer_lock"), timer(NULL), num_watchers(0), watchers(NULL), watch_handles(NULL), bucket_id_lock("rados_bucket_id"), max_bucket_id(0) {} virtual ~RGWRados() {} @@ -521,8 +522,9 @@ public: virtual int init_watch(); virtual void finalize_watch(); - virtual int distribute(bufferlist& bl); + virtual int distribute(const string& key, bufferlist& bl); virtual int watch_cb(int opcode, uint64_t ver, bufferlist& bl) { return 0; } + void pick_control_oid(const string& key, string& notify_oid); void *create_context(void *user_ctx) { RGWRadosCtx *rctx = new RGWRadosCtx();