--- /dev/null
+
+#include <string.h>
+
+#include "rgw_loadgen.h"
+
+
+#define dout_subsys ceph_subsys_rgw
+
+int RGWLoadGenIO::write_data(const char *buf, int len)
+{
+ return len;
+}
+
+int RGWLoadGenIO::read_data(char *buf, int len)
+{
+ int read_len = MIN(left_to_read, (uint64_t)len);
+ left_to_read -= read_len;
+ return read_len;
+}
+
+void RGWLoadGenIO::flush()
+{
+}
+
+int RGWLoadGenIO::complete_request()
+{
+ return 0;
+}
+
+void RGWLoadGenIO::init_env(CephContext *cct)
+{
+ env.init(cct);
+
+ left_to_read = req->content_length;
+
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%lld", (long long)req->content_length);
+ env.set("CONTENT_LENGTH", buf);
+
+ env.set("CONTENT_TYPE", req->content_type.c_str());
+
+ for (map<string, string>::iterator iter = req->headers.begin(); iter != req->headers.end(); ++iter) {
+ env.set(iter->first.c_str(), iter->second.c_str());
+ }
+
+ env.set("REQUEST_METHOD", req->request_method.c_str());
+ env.set("REQUEST_URI", req->uri.c_str());
+ env.set("QUERY_STRING", req->query_string.c_str());
+ env.set("SCRIPT_URI", req->uri.c_str());
+
+ char port_buf[16];
+ snprintf(port_buf, sizeof(port_buf), "%d", req->port);
+ env.set("SERVER_PORT", port_buf);
+}
+
+int RGWLoadGenIO::send_status(const char *status, const char *status_name)
+{
+ return 0;
+}
+
+int RGWLoadGenIO::send_100_continue()
+{
+ return 0;
+}
+
+int RGWLoadGenIO::complete_header()
+{
+ return 0;
+}
+
+int RGWLoadGenIO::send_content_length(uint64_t len)
+{
+ return 0;
+}
--- /dev/null
+#ifndef CEPH_RGW_LOADGEN_H
+#define CEPH_RGW_LOADGEN_H
+
+#include "rgw_client_io.h"
+
+
+struct RGWLoadGenRequestEnv {
+ int port;
+ uint64_t content_length;
+ string content_type;
+ string request_method;
+ string uri;
+ string query_string;
+
+ map<string, string> headers;
+
+ RGWLoadGenRequestEnv() : port(0), content_length(0) {}
+};
+
+class RGWLoadGenIO : public RGWClientIO
+{
+ uint64_t left_to_read;
+ RGWLoadGenRequestEnv *req;
+public:
+ void init_env(CephContext *cct);
+
+ int write_data(const char *buf, int len);
+ int read_data(char *buf, int len);
+
+ int send_status(const char *status, const char *status_name);
+ int send_100_continue();
+ int complete_header();
+ int complete_request();
+ int send_content_length(uint64_t len);
+
+ RGWLoadGenIO(RGWLoadGenRequestEnv *_re) : left_to_read(0), req(_re) {}
+ void flush();
+};
+
+
+#endif
#include "rgw_log.h"
#include "rgw_tools.h"
#include "rgw_resolve.h"
+#include "rgw_loadgen.h"
#include "rgw_mongoose.h"
#include "mongoose/mongoose.h"
RGWREST *rest;
RGWFrontendConfig *conf;
- RGWProcessEnv *process_env;
-
struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
RGWProcess *process;
RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
m_tp.stop();
}
+struct RGWLoadGenRequest : public RGWRequest {
+};
+
+class RGWLoadGenProcess : public RGWProcess {
+ int sock_fd;
+public:
+ RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
+ RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {}
+ void run();
+ void handle_request(RGWRequest *req);
+
+ void close_fd() { }
+};
+
+void RGWLoadGenProcess::run()
+{
+ m_tp.start(); /* start thread pool */
+
+ for (;;) {
+ RGWLoadGenRequest *req = new RGWLoadGenRequest;
+ req->id = ++max_req_id;
+ dout(10) << "allocated request req=" << hex << req << dec << dendl;
+ req_throttle.get(1);
+ req_wq.queue(req);
+ }
+
+ m_tp.drain();
+ m_tp.stop();
+}
+
static void signal_shutdown()
{
if (!disable_signal_fd.read()) {
delete req;
}
+void RGWLoadGenProcess::handle_request(RGWRequest *r)
+{
+ RGWLoadGenRequest *req = static_cast<RGWLoadGenRequest *>(r);
+
+ RGWLoadGenRequestEnv env;
+
+ env.port = 80;
+ env.content_length = 0;
+ env.content_type = "binary/octet-stream";
+ env.request_method = "GET";
+ env.uri = "/foo/bar";
+
+ RGWLoadGenIO client_io(&env);
+
+ int ret = process_request(store, rest, req, &client_io, olog);
+ if (ret < 0) {
+ /* we don't really care about return code */
+ dout(20) << "process_request() returned " << ret << dendl;
+ }
+
+ delete req;
+}
+
static int mongoose_callback(struct mg_connection *conn) {
struct mg_request_info *req_info = mg_get_request_info(conn);
virtual void join() = 0;
};
-class RGWFCGXControlThread : public Thread {
+class RGWProcessControlThread : public Thread {
RGWProcess *pprocess;
public:
- RGWFCGXControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
+ RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
void *entry() {
pprocess->run();
};
};
-class RGWFCGXFrontend : public RGWFrontend {
+template <class T>
+class RGWProcessFrontend : public RGWFrontend {
RGWFrontendConfig *conf;
- RGWFCGXProcess *pprocess;
+ T *pprocess;
RGWProcessEnv env;
- RGWFCGXControlThread *thread;
+ RGWProcessControlThread *thread;
public:
- RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) {
- pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
- thread = new RGWFCGXControlThread(pprocess);
+ RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) {
+ pprocess = new T(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
+ thread = new RGWProcessControlThread(pprocess);
}
- ~RGWFCGXFrontend() {
+ ~RGWProcessFrontend() {
delete thread;
}
if (framework == "fastcgi" || framework == "fcgi") {
RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
- fe = new RGWFCGXFrontend(fcgi_pe, config);
+ fe = new RGWProcessFrontend<RGWFCGXProcess>(fcgi_pe, config);
} else if (framework == "mongoose") {
string err;
RGWProcessEnv env = { store, &rest, olog, port };
fe = new RGWMongooseFrontend(env, config);
+ } else if (framework == "loadgen") {
+ int port;
+ config->get_val("port", 80, &port);
+
+ RGWProcessEnv env = { store, &rest, olog, port };
+
+ fe = new RGWProcessFrontend<RGWLoadGenProcess>(env, config);
} else {
dout(0) << "WARNING: skipping unknown framework: " << framework << dendl;
continue;