virtual ~RGWProcess() {}
virtual void run() = 0;
virtual void handle_request(RGWRequest *req) = 0;
+ virtual void close_fd() {}
};
}
struct RGWLoadGenRequest : public RGWRequest {
+ const char *method;
+ int content_length;
+
+ RGWLoadGenRequest(const char *_m, int _cl) : method(_m), content_length(_cl) {}
};
class RGWLoadGenProcess : public RGWProcess {
- int sock_fd;
+ RGWAccessKey access_key;
public:
RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
- RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {}
+ RGWProcess(cct, pe, num_threads, _conf) {}
void run();
void handle_request(RGWRequest *req);
+ void gen_request(const char *method, int content_length);
void close_fd() { }
+
+ void set_access_key(RGWAccessKey& key) { access_key = key; }
};
void RGWLoadGenProcess::run()
{
m_tp.start(); /* start thread pool */
- for (;;) {
- RGWLoadGenRequest *req = new RGWLoadGenRequest;
- req->id = ++max_req_id;
- dout(10) << "allocated request req=" << hex << req << dec << dendl;
- req_throttle.get(1);
- req_wq.queue(req);
+ for (int i = 0; i < 1000; i++) {
+ gen_request("PUT", 4096);
}
-
m_tp.drain();
+
m_tp.stop();
}
+void RGWLoadGenProcess::gen_request(const char *method, int content_length)
+{
+ RGWLoadGenRequest *req = new RGWLoadGenRequest(method, content_length);
+ req->id = ++max_req_id;
+ dout(10) << "allocated request req=" << hex << req << dec << dendl;
+ req_throttle.get(1);
+ req_wq.queue(req);
+}
+
static void signal_shutdown()
{
if (!disable_signal_fd.read()) {
RGWLoadGenRequestEnv env;
env.port = 80;
- env.content_length = 0;
+ env.content_length = req->content_length;
env.content_type = "binary/octet-stream";
- env.request_method = "GET";
+ env.request_method = req->method;
env.uri = "/foo/bar";
RGWLoadGenIO client_io(&env);
class RGWFrontend {
public:
virtual ~RGWFrontend() {}
+
+ virtual int init() = 0;
+
virtual int run() = 0;
virtual void stop() = 0;
virtual void join() = 0;
};
};
-template <class T>
class RGWProcessFrontend : public RGWFrontend {
+protected:
RGWFrontendConfig *conf;
- T *pprocess;
+ RGWProcess *pprocess;
RGWProcessEnv env;
RGWProcessControlThread *thread;
public:
- RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) {
- pprocess = new T(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
- thread = new RGWProcessControlThread(pprocess);
+ RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), pprocess(NULL), env(pe), thread(NULL) {
}
~RGWProcessFrontend() {
}
int run() {
+ assert(pprocess); /* should have initialized by init() */
+ thread = new RGWProcessControlThread(pprocess);
thread->create();
return 0;
}
}
};
+class RGWFCGXFrontend : public RGWProcessFrontend {
+public:
+ RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
+
+ int init() {
+ pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
+ return 0;
+ }
+};
+
+class RGWLoadGenFrontend : public RGWProcessFrontend {
+public:
+ RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
+
+ int init() {
+ RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
+
+ pprocess = pp;
+
+ string uid;
+ conf->get_val("uid", "", &uid);
+ if (uid.empty()) {
+ derr << "ERROR: uid param must be specified for loadgen frontend" << dendl;
+ return EINVAL;
+ }
+
+ RGWUserInfo user_info;
+ int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
+ if (ret < 0) {
+ derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl;
+ return ret;
+ }
+
+ map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
+ if (aiter == user_info.access_keys.end()) {
+ derr << "ERROR: user has no S3 access keys set" << dendl;
+ return -EINVAL;
+ }
+
+ pp->set_access_key(aiter->second);
+
+ return 0;
+ }
+};
+
class RGWMongooseFrontend : public RGWFrontend {
RGWFrontendConfig *conf;
struct mg_context *ctx;
RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), ctx(NULL), env(pe) {
}
+ int init() {
+ return 0;
+ }
+
int run() {
char thread_pool_buf[32];
snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
if (framework == "fastcgi" || framework == "fcgi") {
RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
- fe = new RGWProcessFrontend<RGWFCGXProcess>(fcgi_pe, config);
+ fe = new RGWFCGXFrontend(fcgi_pe, config);
} else if (framework == "mongoose") {
string err;
RGWProcessEnv env = { store, &rest, olog, port };
- fe = new RGWProcessFrontend<RGWLoadGenProcess>(env, config);
+ fe = new RGWLoadGenFrontend(env, config);
} else {
dout(0) << "WARNING: skipping unknown framework: " << framework << dendl;
continue;
}
dout(0) << "starting handler: " << fiter->first << dendl;
+ int r = fe->init();
+ if (r < 0) {
+ derr << "ERROR: failed initializing frontend" << dendl;
+ return -r;
+ }
fe->run();
fes.push_back(fe);