OPTION(rgw_op_thread_timeout, OPT_INT, 10*60),
OPTION(rgw_op_thread_suicide_timeout, OPT_INT, 60*60),
OPTION(rgw_thread_pool_size, OPT_INT, 100),
+ OPTION(rgw_maintenance_tick_interval, OPT_DOUBLE, 10.0),
+ OPTION(rgw_pools_preallocate_max, OPT_INT, 100),
+ OPTION(rgw_pools_preallocate_threshold, OPT_INT, 70),
// see config.h
OPTION(internal_safe_to_start_threads, OPT_BOOL, false),
int rgw_op_thread_timeout;
int rgw_op_thread_suicide_timeout;
int rgw_thread_pool_size;
+ double rgw_maintenance_tick_interval;
+ int rgw_pools_preallocate_max;
+ int rgw_pools_preallocate_threshold;
// This will be set to true when it is safe to start threads.
// Once it is true, it will never change.
static string avail_pools = ".pools.avail";
static string pool_name_prefix = "p";
-#define POOLS_PREALLOCATE_NUM 100
-
int rgw_store_bucket_info(string& bucket_name, RGWBucketInfo& info)
{
return ret;
}
-static int generate_preallocated_pools(vector<string>& pools)
+static int generate_preallocated_pools(vector<string>& pools, int num)
{
vector<string> names;
- for (int i = 0; i < POOLS_PREALLOCATE_NUM; i++) {
+ for (int i = 0; i < num; i++) {
string name = pool_name_prefix;
append_rand_alpha(pool_name_prefix, name, 8);
names.push_back(name);
return 0;
}
-static int generate_pool(string& bucket_name, rgw_bucket& bucket)
+static int register_available_pools(vector<string>& pools)
{
- vector<string> pools;
- int ret = generate_preallocated_pools(pools);
- if (ret < 0) {
- RGW_LOG(0) << "generate_preallocad_pools returned " << ret << dendl;
- return ret;
- }
- bucket.pool = pools.back();
- pools.pop_back();
- bucket.name = bucket_name;
-
map<string, bufferlist> m;
vector<string>::iterator iter;
m[name] = bl;
}
rgw_obj obj(pi_buckets, avail_pools);
- ret = rgwstore->tmap_set(obj, m);
+ int ret = rgwstore->tmap_set(obj, m);
if (ret == -ENOENT) {
rgw_bucket new_bucket;
map<string,bufferlist> attrs;
return 0;
}
+static int generate_pool(string& bucket_name, rgw_bucket& bucket)
+{
+ vector<string> pools;
+ int ret = generate_preallocated_pools(pools, g_conf->rgw_pools_preallocate_max);
+ if (ret < 0) {
+ RGW_LOG(0) << "generate_preallocad_pools returned " << ret << dendl;
+ return ret;
+ }
+ bucket.pool = pools.back();
+ pools.pop_back();
+ bucket.name = bucket_name;
+
+ ret = register_available_pools(pools);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return 0;
+}
+
static int withdraw_pool(string& pool_name)
{
rgw_obj obj(pi_buckets, avail_pools);
return rgwstore->tmap_set(obj, pool_name, bl);
}
+int rgw_bucket_maintain_pools()
+{
+ bufferlist header;
+ map<string, bufferlist> m;
+ string pool_name;
+
+ rgw_obj obj(pi_buckets, avail_pools);
+ int ret = rgwstore->tmap_get(obj, header, m);
+ if (ret < 0 && ret != -ENOENT) {
+ return ret;
+ }
+
+ if ((int)m.size() < g_conf->rgw_pools_preallocate_threshold) {
+ RGW_LOG(0) << "rgw_bucket_maintain_pools allocating pools (m.size()=" << m.size() << " threshold="
+ << g_conf->rgw_pools_preallocate_threshold << ")" << dendl;
+ vector<string> pools;
+ ret = generate_preallocated_pools(pools, g_conf->rgw_pools_preallocate_max - m.size());
+ if (ret < 0) {
+ RGW_LOG(0) << "failed to preallocate pools" << dendl;
+ return ret;
+ }
+ ret = register_available_pools(pools);
+ if (ret < 0) {
+ RGW_LOG(0) << "failed to register available pools" << dendl;
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
int rgw_bucket_allocate_pool(string& bucket_name, rgw_bucket& bucket)
{
bufferlist header;
extern int rgw_create_bucket(std::string& id, string& bucket_name, rgw_bucket& bucket,
map<std::string, bufferlist>& attrs, bool exclusive = true, uint64_t auid = 0);
+extern int rgw_bucket_maintain_pools(void);
#endif
#include "common/config.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
+#include "common/Timer.h"
#include "rgw_common.h"
#include "rgw_access.h"
#include "rgw_acl.h"
#include "rgw_rest.h"
#include "rgw_os.h"
#include "rgw_log.h"
+#include "rgw_bucket.h"
#include <map>
#include <string>
RGW_LOG(0) << "====== req done fcgx=" << hex << fcgx << dec << " http_status=" << http_ret << " ======" << dendl;
}
+class C_RGWMaintenanceTick : public Context {
+ SafeTimer *timer;
+public:
+ C_RGWMaintenanceTick(SafeTimer *t) : timer(t) {}
+ void finish(int r) {
+ rgw_bucket_maintain_pools();
+ RGW_LOG(20) << "C_RGWMaintenanceTick::finish()" << dendl;
+ timer->add_event_after(g_conf->rgw_maintenance_tick_interval, new C_RGWMaintenanceTick(timer));
+ }
+};
/*
* start up the RADOS connection and then handle HTTP messages as they come in
*/
RGWProcess process(g_ceph_context, g_conf->rgw_thread_pool_size);
+ Mutex lock("rgw_timer_lock");
+ SafeTimer timer(g_ceph_context, lock);
+
+ lock.Lock();
+ timer.init();
+ timer.add_event_after(g_conf->rgw_maintenance_tick_interval, new C_RGWMaintenanceTick(&timer));
+ lock.Unlock();
+
process.run();
+ lock.Lock();
+ timer.shutdown();
+ lock.Unlock();
+
return 0;
}
#define CEPH_RGWRADOS_H
#include "include/rados/librados.hpp"
+#include "include/Context.h"
#include "rgw_access.h"
#include "rgw_common.h"
class RGWWatcher;
+class SafeTimer;
struct RGWObjState {
bool is_atomic;
int set_buckets_auid(vector<rgw_bucket>& buckets, uint64_t auid);
+ Mutex lock;
+ SafeTimer *timer;
+
+ class C_Tick : public Context {
+ RGWRados *rados;
+ public:
+ C_Tick(RGWRados *_r) : rados(_r) {}
+ void finish(int r) {
+ rados->tick();
+ }
+ };
+
+
RGWWatcher *watcher;
uint64_t watch_handle;
librados::IoCtx root_pool_ctx;
pair<string, bufferlist> *cmp_xattr);
int delete_obj_impl(void *ctx, std::string& id, rgw_obj& src_obj, bool sync);
public:
- RGWRados() : watcher(NULL), watch_handle(0) {}
+ RGWRados() : lock("rados_timer_lock"), timer(NULL), watcher(NULL), watch_handle(0) {}
+
+ void tick();
/** Initialize the RADOS instance and prepare to do other ops */
virtual int initialize(CephContext *cct);