]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: poll allocation thread
authorYehuda Sadeh <yehuda@hq.newdream.net>
Thu, 1 Sep 2011 23:00:39 +0000 (16:00 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Thu, 1 Sep 2011 23:00:39 +0000 (16:00 -0700)
src/common/config.cc
src/common/config.h
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_main.cc
src/rgw/rgw_rados.h

index c1d99d1412ddafcc94a693a582e41354dd6415c2..8bfcc21423a601f430a17e3e2eecf6c07abed4c9 100644 (file)
@@ -434,6 +434,9 @@ struct config_option config_optionsp[] = {
   OPTION(rgw_op_thread_timeout, OPT_INT, 10*60),
   OPTION(rgw_op_thread_suicide_timeout, OPT_INT, 60*60),
   OPTION(rgw_thread_pool_size, OPT_INT, 100),
+  OPTION(rgw_maintenance_tick_interval, OPT_DOUBLE, 10.0),
+  OPTION(rgw_pools_preallocate_max, OPT_INT, 100),
+  OPTION(rgw_pools_preallocate_threshold, OPT_INT, 70),
 
   // see config.h
   OPTION(internal_safe_to_start_threads, OPT_BOOL, false),
index 644cc1d6be1a6d7acc1a150761bf6539d7fa2d53..467789c7994443129697b127932affd13119ab12 100644 (file)
@@ -568,6 +568,9 @@ public:
   int rgw_op_thread_timeout;
   int rgw_op_thread_suicide_timeout;
   int rgw_thread_pool_size;
+  double rgw_maintenance_tick_interval;
+  int rgw_pools_preallocate_max;
+  int rgw_pools_preallocate_threshold;
 
   // This will be set to true when it is safe to start threads.
   // Once it is true, it will never change.
index c8ceccde9b07261a12614223cc2ace298705ee3b..8069dbd9027709bfe3209a87230553cc7e73a64b 100644 (file)
@@ -15,8 +15,6 @@ static rgw_bucket pi_buckets(BUCKETS_POOL_NAME);
 static string avail_pools = ".pools.avail";
 static string pool_name_prefix = "p";
 
-#define POOLS_PREALLOCATE_NUM 100
-
 
 int rgw_store_bucket_info(string& bucket_name, RGWBucketInfo& info)
 {
@@ -64,11 +62,11 @@ int rgw_remove_bucket_info(string& bucket_name)
   return ret;
 }
 
-static int generate_preallocated_pools(vector<string>& pools)
+static int generate_preallocated_pools(vector<string>& pools, int num)
 {
   vector<string> names;
 
-  for (int i = 0; i < POOLS_PREALLOCATE_NUM; i++) {
+  for (int i = 0; i < num; i++) {
     string name = pool_name_prefix;
     append_rand_alpha(pool_name_prefix, name, 8);
     names.push_back(name);
@@ -98,18 +96,8 @@ static int generate_preallocated_pools(vector<string>& pools)
   return 0;
 }
 
-static int generate_pool(string& bucket_name, rgw_bucket& bucket)
+static int register_available_pools(vector<string>& pools)
 {
-  vector<string> pools;
-  int ret = generate_preallocated_pools(pools);
-  if (ret < 0) {
-    RGW_LOG(0) << "generate_preallocad_pools returned " << ret << dendl;
-    return ret;
-  }
-  bucket.pool = pools.back();
-  pools.pop_back();
-  bucket.name = bucket_name;
-
   map<string, bufferlist> m;
   vector<string>::iterator iter;
 
@@ -119,7 +107,7 @@ static int generate_pool(string& bucket_name, rgw_bucket& bucket)
     m[name] = bl;
   }
   rgw_obj obj(pi_buckets, avail_pools);
-  ret = rgwstore->tmap_set(obj, m);
+  int ret = rgwstore->tmap_set(obj, m);
   if (ret == -ENOENT) {
     rgw_bucket new_bucket;
     map<string,bufferlist> attrs;
@@ -136,6 +124,26 @@ static int generate_pool(string& bucket_name, rgw_bucket& bucket)
   return 0;
 }
 
+static int generate_pool(string& bucket_name, rgw_bucket& bucket)
+{
+  vector<string> pools;
+  int ret = generate_preallocated_pools(pools, g_conf->rgw_pools_preallocate_max);
+  if (ret < 0) {
+    RGW_LOG(0) << "generate_preallocad_pools returned " << ret << dendl;
+    return ret;
+  }
+  bucket.pool = pools.back();
+  pools.pop_back();
+  bucket.name = bucket_name;
+
+  ret = register_available_pools(pools);
+  if (ret < 0) {
+    return ret;
+  }
+
+  return 0;
+}
+
 static int withdraw_pool(string& pool_name)
 {
   rgw_obj obj(pi_buckets, avail_pools);
@@ -143,6 +151,37 @@ static int withdraw_pool(string& pool_name)
   return rgwstore->tmap_set(obj, pool_name, bl);
 }
 
+int rgw_bucket_maintain_pools()
+{
+  bufferlist header;
+  map<string, bufferlist> m;
+  string pool_name;
+
+  rgw_obj obj(pi_buckets, avail_pools);
+  int ret = rgwstore->tmap_get(obj, header, m);
+  if (ret < 0 && ret != -ENOENT) {
+      return ret;
+  }
+
+  if ((int)m.size() < g_conf->rgw_pools_preallocate_threshold) {
+    RGW_LOG(0) << "rgw_bucket_maintain_pools allocating pools (m.size()=" << m.size() << " threshold="
+               << g_conf->rgw_pools_preallocate_threshold << ")" << dendl;
+    vector<string> pools;
+    ret = generate_preallocated_pools(pools, g_conf->rgw_pools_preallocate_max - m.size());
+    if (ret < 0) {
+      RGW_LOG(0) << "failed to preallocate pools" << dendl;
+      return ret;
+    }
+    ret = register_available_pools(pools);
+    if (ret < 0) {
+      RGW_LOG(0) << "failed to register available pools" << dendl;
+      return ret;
+    }
+  }
+
+  return 0;
+}
+
 int rgw_bucket_allocate_pool(string& bucket_name, rgw_bucket& bucket)
 {
   bufferlist header;
index 63c339c1402e21f545c2dfac4de44eb3bb66bb98..91706f56455ea60cbac7fa2872badf831f24e3e4 100644 (file)
@@ -17,6 +17,7 @@ extern int rgw_bucket_allocate_pool(string& bucket_name, rgw_bucket& bucket);
 extern int rgw_create_bucket(std::string& id, string& bucket_name, rgw_bucket& bucket,
                       map<std::string, bufferlist>& attrs, bool exclusive = true, uint64_t auid = 0);
 
+extern int rgw_bucket_maintain_pools(void);
 
 #endif
 
index 1ab2219e556b10d46665b68538a7311acf78f195..a67d8dd915121bdfaefe3e3dafb2b95103dd60db 100644 (file)
@@ -17,6 +17,7 @@
 #include "common/config.h"
 #include "common/errno.h"
 #include "common/WorkQueue.h"
+#include "common/Timer.h"
 #include "rgw_common.h"
 #include "rgw_access.h"
 #include "rgw_acl.h"
@@ -25,6 +26,7 @@
 #include "rgw_rest.h"
 #include "rgw_os.h"
 #include "rgw_log.h"
+#include "rgw_bucket.h"
 
 #include <map>
 #include <string>
@@ -218,6 +220,16 @@ done:
   RGW_LOG(0) << "====== req done fcgx=" << hex << fcgx << dec << " http_status=" << http_ret << " ======" << dendl;
 }
 
+class C_RGWMaintenanceTick : public Context {
+  SafeTimer *timer;
+public:
+  C_RGWMaintenanceTick(SafeTimer *t) : timer(t) {}
+  void finish(int r) {
+    rgw_bucket_maintain_pools();
+    RGW_LOG(20) << "C_RGWMaintenanceTick::finish()" << dendl;
+    timer->add_event_after(g_conf->rgw_maintenance_tick_interval, new C_RGWMaintenanceTick(timer));
+  }
+};
 /*
  * start up the RADOS connection and then handle HTTP messages as they come in
  */
@@ -255,8 +267,20 @@ int main(int argc, const char **argv)
 
   RGWProcess process(g_ceph_context, g_conf->rgw_thread_pool_size);
 
+  Mutex lock("rgw_timer_lock");
+  SafeTimer timer(g_ceph_context, lock);
+
+  lock.Lock();
+  timer.init();
+  timer.add_event_after(g_conf->rgw_maintenance_tick_interval, new C_RGWMaintenanceTick(&timer));
+  lock.Unlock();
+
   process.run();
 
+  lock.Lock();
+  timer.shutdown();
+  lock.Unlock();
+
   return 0;
 }
 
index 2cbaaa703407df362f3d50acb31d38a804ac520d..bcc96835e29ce380d5bc2f2bf0a5b341174d6a85 100644 (file)
@@ -2,10 +2,12 @@
 #define CEPH_RGWRADOS_H
 
 #include "include/rados/librados.hpp"
+#include "include/Context.h"
 #include "rgw_access.h"
 #include "rgw_common.h"
 
 class RGWWatcher;
+class SafeTimer;
 
 struct RGWObjState {
   bool is_atomic;
@@ -77,6 +79,19 @@ class RGWRados  : public RGWAccess
 
   int set_buckets_auid(vector<rgw_bucket>& buckets, uint64_t auid);
 
+  Mutex lock;
+  SafeTimer *timer;
+
+  class C_Tick : public Context {
+    RGWRados *rados;
+  public:
+    C_Tick(RGWRados *_r) : rados(_r) {}
+    void finish(int r) {
+      rados->tick();
+    }
+  };
+
+
   RGWWatcher *watcher;
   uint64_t watch_handle;
   librados::IoCtx root_pool_ctx;
@@ -106,7 +121,9 @@ class RGWRados  : public RGWAccess
                  pair<string, bufferlist> *cmp_xattr);
   int delete_obj_impl(void *ctx, std::string& id, rgw_obj& src_obj, bool sync);
 public:
-  RGWRados() : watcher(NULL), watch_handle(0) {}
+  RGWRados() : lock("rados_timer_lock"), timer(NULL), watcher(NULL), watch_handle(0) {}
+
+  void tick();
 
   /** Initialize the RADOS instance and prepare to do other ops */
   virtual int initialize(CephContext *cct);