OPTION(rgw_op_thread_timeout, OPT_INT, 10*60)
OPTION(rgw_op_thread_suicide_timeout, OPT_INT, 0)
OPTION(rgw_thread_pool_size, OPT_INT, 100)
+OPTION(rgw_num_control_oids, OPT_INT, 8)
OPTION(rgw_maintenance_tick_interval, OPT_DOUBLE, 10.0)
OPTION(rgw_pools_preallocate_max, OPT_INT, 100)
OPTION(rgw_pools_preallocate_threshold, OPT_INT, 70)
void finalize() {
T::finalize_watch();
}
- int distribute(rgw_obj& obj, ObjectCacheInfo& obj_info, int op);
+ int distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op);
int watch_cb(int opcode, uint64_t ver, bufferlist& bl);
public:
RGWCache() {}
cache.remove(name);
ObjectCacheInfo info;
- distribute(obj, info, REMOVE_OBJ);
+ distribute(name, obj, info, REMOVE_OBJ);
return T::delete_obj(ctx, obj, sync);
}
string name = normal_name(bucket, oid);
if (ret >= 0) {
cache.put(name, info);
- int r = distribute(obj, info, UPDATE_OBJ);
+ int r = distribute(name, obj, info, UPDATE_OBJ);
if (r < 0)
mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
} else {
string name = normal_name(bucket, oid);
if (ret >= 0) {
cache.put(name, info);
- int r = distribute(obj, info, UPDATE_OBJ);
+ int r = distribute(name, obj, info, UPDATE_OBJ);
if (r < 0)
mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
} else {
string name = normal_name(bucket, oid);
if (ret >= 0) {
cache.put(name, info);
- int r = distribute(obj, info, UPDATE_OBJ);
+ int r = distribute(name, obj, info, UPDATE_OBJ);
if (r < 0)
mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
} else {
string name = normal_name(bucket, oid);
if (ret >= 0) {
cache.put(name, info);
- int r = distribute(obj, info, UPDATE_OBJ);
+ int r = distribute(name, obj, info, UPDATE_OBJ);
if (r < 0)
mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
} else {
}
template <class T>
-int RGWCache<T>::distribute(rgw_obj& obj, ObjectCacheInfo& obj_info, int op)
+int RGWCache<T>::distribute(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op)
{
RGWCacheNotifyInfo info;
info.obj = obj;
bufferlist bl;
::encode(info, bl);
- int ret = T::distribute(bl);
+ int ret = T::distribute(normal_name, bl);
return ret;
}
RGWRados* RGWRados::store;
-static string notify_oid = "notify";
+static string notify_oid_prefix = "notify";
+static string *notify_oids = NULL;
static string shadow_ns = "shadow";
static string dir_oid_prefix = ".dir.";
static string default_storage_pool = ".rgw.buckets";
void RGWRados::finalize_watch()
{
- control_pool_ctx.unwatch(notify_oid, watch_handle);
+ for (int i = 0; i < num_watchers; i++) {
+ string& notify_oid = notify_oids[i];
+ if (notify_oid.empty())
+ continue;
+ uint64_t watch_handle = watch_handles[i];
+ control_pool_ctx.unwatch(notify_oid, watch_handle);
+
+ RGWWatcher *watcher = watchers[i];
+ delete watcher;
+ }
+
+ delete[] notify_oids;
+ delete[] watch_handles;
+ delete[] watchers;
}
/**
return r;
}
- r = control_pool_ctx.create(notify_oid, false);
- if (r < 0 && r != -EEXIST)
- return r;
+ num_watchers = cct->_conf->rgw_num_control_oids;
- watcher = new RGWWatcher(this);
- r = control_pool_ctx.watch(notify_oid, 0, &watch_handle, watcher);
+ bool compat_oid = (num_watchers == 0);
- return r;
+ if (num_watchers <= 0)
+ num_watchers = 1;
+
+ notify_oids = new string[num_watchers];
+ watchers = new RGWWatcher *[num_watchers];
+ watch_handles = new uint64_t[num_watchers];
+
+ for (int i=0; i < num_watchers; i++) {
+ string& notify_oid = notify_oids[i];
+ notify_oid = notify_oid_prefix;
+ if (!compat_oid) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), ".%d", i);
+ notify_oid.append(buf);
+ }
+ r = control_pool_ctx.create(notify_oid, false);
+ if (r < 0 && r != -EEXIST)
+ return r;
+
+ RGWWatcher *watcher = new RGWWatcher(this);
+ watchers[i] = watcher;
+
+ r = control_pool_ctx.watch(notify_oid, 0, &watch_handles[i], watcher);
+ if (r < 0)
+ return r;
+ }
+
+ return 0;
+}
+
+void RGWRados::pick_control_oid(const string& key, string& notify_oid)
+{
+ uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
+
+ int i = r % num_watchers;
+ char buf[16];
+ snprintf(buf, sizeof(buf), ".%d", i);
+
+ notify_oid = notify_oid_prefix;
+ notify_oid.append(buf);
}
int RGWRados::open_bucket_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx)
return r;
}
-int RGWRados::distribute(bufferlist& bl)
+int RGWRados::distribute(const string& key, bufferlist& bl)
{
+ string notify_oid;
+ pick_control_oid(key, notify_oid);
+
ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
int r = control_pool_ctx.notify(notify_oid, 0, bl);
return r;
};
- RGWWatcher *watcher;
- uint64_t watch_handle;
+ int num_watchers;
+ RGWWatcher **watchers;
+ uint64_t *watch_handles;
librados::IoCtx root_pool_ctx; // .rgw
librados::IoCtx control_pool_ctx; // .rgw.control
CephContext *cct;
public:
- RGWRados() : lock("rados_timer_lock"), timer(NULL), watcher(NULL), watch_handle(0),
+ RGWRados() : lock("rados_timer_lock"), timer(NULL), num_watchers(0), watchers(NULL), watch_handles(NULL),
bucket_id_lock("rados_bucket_id"), max_bucket_id(0) {}
virtual ~RGWRados() {}
virtual int init_watch();
virtual void finalize_watch();
- virtual int distribute(bufferlist& bl);
+ virtual int distribute(const string& key, bufferlist& bl);
virtual int watch_cb(int opcode, uint64_t ver, bufferlist& bl) { return 0; }
+ void pick_control_oid(const string& key, string& notify_oid);
void *create_context(void *user_ctx) {
RGWRadosCtx *rctx = new RGWRadosCtx();