]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: loadgen frontend read uid, init access key
authorYehuda Sadeh <yehuda@inktank.com>
Tue, 17 Dec 2013 01:05:18 +0000 (17:05 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 17 Jan 2014 18:14:42 +0000 (10:14 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_main.cc

index f28272c62122bbcffa33afa472e0cea8310cdf54..3990950bed72cd164127622d30d8a59925612221 100644 (file)
@@ -233,6 +233,7 @@ public:
   virtual ~RGWProcess() {}
   virtual void run() = 0;
   virtual void handle_request(RGWRequest *req) = 0;
+  virtual void close_fd() {}
 };
 
 
@@ -321,35 +322,47 @@ void RGWFCGXProcess::run()
 }
 
 struct RGWLoadGenRequest : public RGWRequest {
+  const char *method;
+  int content_length;
+
+  RGWLoadGenRequest(const char *_m, int _cl) : method(_m), content_length(_cl) {}
 };
 
 class RGWLoadGenProcess : public RGWProcess {
-  int sock_fd;
+  RGWAccessKey access_key;
 public:
   RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
-    RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {}
+    RGWProcess(cct, pe, num_threads, _conf) {}
   void run();
   void handle_request(RGWRequest *req);
+  void gen_request(const char *method, int content_length);
 
   void close_fd() { }
+
+  void set_access_key(RGWAccessKey& key) { access_key = key; }
 };
 
 void RGWLoadGenProcess::run()
 {
   m_tp.start(); /* start thread pool */
 
-  for (;;) {
-    RGWLoadGenRequest *req = new RGWLoadGenRequest;
-    req->id = ++max_req_id;
-    dout(10) << "allocated request req=" << hex << req << dec << dendl;
-    req_throttle.get(1);
-    req_wq.queue(req);
+  for (int i = 0; i < 1000; i++) {
+    gen_request("PUT", 4096);
   }
-
   m_tp.drain();
+
   m_tp.stop();
 }
 
+void RGWLoadGenProcess::gen_request(const char *method, int content_length)
+{
+  RGWLoadGenRequest *req = new RGWLoadGenRequest(method, content_length);
+  req->id = ++max_req_id;
+  dout(10) << "allocated request req=" << hex << req << dec << dendl;
+  req_throttle.get(1);
+  req_wq.queue(req);
+}
+
 static void signal_shutdown()
 {
   if (!disable_signal_fd.read()) {
@@ -561,9 +574,9 @@ void RGWLoadGenProcess::handle_request(RGWRequest *r)
   RGWLoadGenRequestEnv env;
 
   env.port = 80;
-  env.content_length = 0;
+  env.content_length = req->content_length;
   env.content_type = "binary/octet-stream";
-  env.request_method = "GET";
+  env.request_method = req->method;
   env.uri = "/foo/bar";
 
   RGWLoadGenIO client_io(&env);
@@ -712,6 +725,9 @@ bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out)
 class RGWFrontend {
 public:
   virtual ~RGWFrontend() {}
+
+  virtual int init() = 0;
+
   virtual int run() = 0;
   virtual void stop() = 0;
   virtual void join() = 0;
@@ -728,17 +744,15 @@ public:
   };
 };
 
-template <class T>
 class RGWProcessFrontend : public RGWFrontend {
+protected:
   RGWFrontendConfig *conf;
-  T *pprocess;
+  RGWProcess *pprocess;
   RGWProcessEnv env;
   RGWProcessControlThread *thread;
 
 public:
-  RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) {
-    pprocess = new T(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
-    thread = new RGWProcessControlThread(pprocess);
+  RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), pprocess(NULL), env(pe), thread(NULL) {
   }
 
   ~RGWProcessFrontend() {
@@ -746,6 +760,8 @@ public:
   }
 
   int run() {
+    assert(pprocess); /* should have initialized by init() */
+    thread = new RGWProcessControlThread(pprocess);
     thread->create();
     return 0;
   }
@@ -760,6 +776,51 @@ public:
   }
 };
 
+class RGWFCGXFrontend : public RGWProcessFrontend {
+public:
+  RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
+
+  int init() {
+    pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
+    return 0;
+  }
+};
+
+class RGWLoadGenFrontend : public RGWProcessFrontend {
+public:
+  RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
+
+  int init() {
+    RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
+
+    pprocess = pp;
+
+    string uid;
+    conf->get_val("uid", "", &uid);
+    if (uid.empty()) {
+      derr << "ERROR: uid param must be specified for loadgen frontend" << dendl;
+      return EINVAL;
+    }
+
+    RGWUserInfo user_info;
+    int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
+    if (ret < 0) {
+      derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl;
+      return ret;
+    }
+
+    map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
+    if (aiter == user_info.access_keys.end()) {
+      derr << "ERROR: user has no S3 access keys set" << dendl;
+      return -EINVAL;
+    }
+
+    pp->set_access_key(aiter->second);
+
+    return 0;
+  }
+};
+
 class RGWMongooseFrontend : public RGWFrontend {
   RGWFrontendConfig *conf;
   struct mg_context *ctx;
@@ -769,6 +830,10 @@ public:
   RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), ctx(NULL), env(pe) {
   }
 
+  int init() {
+    return 0;
+  }
+
   int run() {
     char thread_pool_buf[32];
     snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
@@ -971,7 +1036,7 @@ int main(int argc, const char **argv)
     if (framework == "fastcgi" || framework == "fcgi") {
       RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
 
-      fe = new RGWProcessFrontend<RGWFCGXProcess>(fcgi_pe, config);
+      fe = new RGWFCGXFrontend(fcgi_pe, config);
     } else if (framework == "mongoose") {
       string err;
 
@@ -987,12 +1052,17 @@ int main(int argc, const char **argv)
 
       RGWProcessEnv env = { store, &rest, olog, port };
 
-      fe = new RGWProcessFrontend<RGWLoadGenProcess>(env, config);
+      fe = new RGWLoadGenFrontend(env, config);
     } else {
       dout(0) << "WARNING: skipping unknown framework: " << framework << dendl;
       continue;
     }
     dout(0) << "starting handler: " << fiter->first << dendl;
+    int r = fe->init();
+    if (r < 0) {
+      derr << "ERROR: failed initializing frontend" << dendl;
+      return -r;
+    }
     fe->run();
 
     fes.push_back(fe);