From: Yehuda Sadeh Date: Mon, 16 Dec 2013 23:30:57 +0000 (-0800) Subject: rgw: add a load generation frontend X-Git-Tag: v0.78~307^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=57137cb4b0083ccc906d7f43ed5e0e716d68e889;p=ceph.git rgw: add a load generation frontend Still missing some pieces, but already generates requests. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/Makefile.am b/src/rgw/Makefile.am index a2c06abe260e..9bcd939c2a17 100644 --- a/src/rgw/Makefile.am +++ b/src/rgw/Makefile.am @@ -67,6 +67,7 @@ radosgw_SOURCES = \ rgw/rgw_http_client.cc \ rgw/rgw_swift.cc \ rgw/rgw_swift_auth.cc \ + rgw/rgw_loadgen.cc \ rgw/rgw_mongoose.cc \ mongoose/mongoose.c \ rgw/rgw_main.cc diff --git a/src/rgw/rgw_loadgen.cc b/src/rgw/rgw_loadgen.cc new file mode 100644 index 000000000000..56c96b078715 --- /dev/null +++ b/src/rgw/rgw_loadgen.cc @@ -0,0 +1,74 @@ + +#include + +#include "rgw_loadgen.h" + + +#define dout_subsys ceph_subsys_rgw + +int RGWLoadGenIO::write_data(const char *buf, int len) +{ + return len; +} + +int RGWLoadGenIO::read_data(char *buf, int len) +{ + int read_len = MIN(left_to_read, (uint64_t)len); + left_to_read -= read_len; + return read_len; +} + +void RGWLoadGenIO::flush() +{ +} + +int RGWLoadGenIO::complete_request() +{ + return 0; +} + +void RGWLoadGenIO::init_env(CephContext *cct) +{ + env.init(cct); + + left_to_read = req->content_length; + + char buf[32]; + snprintf(buf, sizeof(buf), "%lld", (long long)req->content_length); + env.set("CONTENT_LENGTH", buf); + + env.set("CONTENT_TYPE", req->content_type.c_str()); + + for (map::iterator iter = req->headers.begin(); iter != req->headers.end(); ++iter) { + env.set(iter->first.c_str(), iter->second.c_str()); + } + + env.set("REQUEST_METHOD", req->request_method.c_str()); + env.set("REQUEST_URI", req->uri.c_str()); + env.set("QUERY_STRING", req->query_string.c_str()); + env.set("SCRIPT_URI", req->uri.c_str()); + + char port_buf[16]; + snprintf(port_buf, sizeof(port_buf), "%d", req->port); + env.set("SERVER_PORT", port_buf); +} + +int RGWLoadGenIO::send_status(const char *status, const char *status_name) +{ + return 0; +} + +int RGWLoadGenIO::send_100_continue() +{ + return 0; +} + +int RGWLoadGenIO::complete_header() +{ + return 0; +} + +int RGWLoadGenIO::send_content_length(uint64_t len) +{ + return 0; +} diff --git a/src/rgw/rgw_loadgen.h b/src/rgw/rgw_loadgen.h new file mode 100644 index 000000000000..4a7f16120dee --- /dev/null +++ b/src/rgw/rgw_loadgen.h @@ -0,0 +1,41 @@ +#ifndef CEPH_RGW_LOADGEN_H +#define CEPH_RGW_LOADGEN_H + +#include "rgw_client_io.h" + + +struct RGWLoadGenRequestEnv { + int port; + uint64_t content_length; + string content_type; + string request_method; + string uri; + string query_string; + + map headers; + + RGWLoadGenRequestEnv() : port(0), content_length(0) {} +}; + +class RGWLoadGenIO : public RGWClientIO +{ + uint64_t left_to_read; + RGWLoadGenRequestEnv *req; +public: + void init_env(CephContext *cct); + + int write_data(const char *buf, int len); + int read_data(char *buf, int len); + + int send_status(const char *status, const char *status_name); + int send_100_continue(); + int complete_header(); + int complete_request(); + int send_content_length(uint64_t len); + + RGWLoadGenIO(RGWLoadGenRequestEnv *_re) : left_to_read(0), req(_re) {} + void flush(); +}; + + +#endif diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 5d7d64022098..f28272c62122 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -52,6 +52,7 @@ #include "rgw_log.h" #include "rgw_tools.h" #include "rgw_resolve.h" +#include "rgw_loadgen.h" #include "rgw_mongoose.h" #include "mongoose/mongoose.h" @@ -168,8 +169,6 @@ protected: RGWREST *rest; RGWFrontendConfig *conf; - RGWProcessEnv *process_env; - struct RGWWQ : public ThreadPool::WorkQueue { RGWProcess *process; RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) @@ -321,6 +320,36 @@ void RGWFCGXProcess::run() m_tp.stop(); } +struct RGWLoadGenRequest : public RGWRequest { +}; + +class RGWLoadGenProcess : public RGWProcess { + int sock_fd; +public: + RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) : + RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {} + void run(); + void handle_request(RGWRequest *req); + + void close_fd() { } +}; + +void RGWLoadGenProcess::run() +{ + m_tp.start(); /* start thread pool */ + + for (;;) { + RGWLoadGenRequest *req = new RGWLoadGenRequest; + req->id = ++max_req_id; + dout(10) << "allocated request req=" << hex << req << dec << dendl; + req_throttle.get(1); + req_wq.queue(req); + } + + m_tp.drain(); + m_tp.stop(); +} + static void signal_shutdown() { if (!disable_signal_fd.read()) { @@ -525,6 +554,29 @@ void RGWFCGXProcess::handle_request(RGWRequest *r) delete req; } +void RGWLoadGenProcess::handle_request(RGWRequest *r) +{ + RGWLoadGenRequest *req = static_cast(r); + + RGWLoadGenRequestEnv env; + + env.port = 80; + env.content_length = 0; + env.content_type = "binary/octet-stream"; + env.request_method = "GET"; + env.uri = "/foo/bar"; + + RGWLoadGenIO client_io(&env); + + int ret = process_request(store, rest, req, &client_io, olog); + if (ret < 0) { + /* we don't really care about return code */ + dout(20) << "process_request() returned " << ret << dendl; + } + + delete req; +} + static int mongoose_callback(struct mg_connection *conn) { struct mg_request_info *req_info = mg_get_request_info(conn); @@ -665,10 +717,10 @@ public: virtual void join() = 0; }; -class RGWFCGXControlThread : public Thread { +class RGWProcessControlThread : public Thread { RGWProcess *pprocess; public: - RGWFCGXControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} + RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} void *entry() { pprocess->run(); @@ -676,19 +728,20 @@ public: }; }; -class RGWFCGXFrontend : public RGWFrontend { +template +class RGWProcessFrontend : public RGWFrontend { RGWFrontendConfig *conf; - RGWFCGXProcess *pprocess; + T *pprocess; RGWProcessEnv env; - RGWFCGXControlThread *thread; + RGWProcessControlThread *thread; public: - RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) { - pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); - thread = new RGWFCGXControlThread(pprocess); + RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) { + pprocess = new T(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); + thread = new RGWProcessControlThread(pprocess); } - ~RGWFCGXFrontend() { + ~RGWProcessFrontend() { delete thread; } @@ -918,7 +971,7 @@ int main(int argc, const char **argv) if (framework == "fastcgi" || framework == "fcgi") { RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 }; - fe = new RGWFCGXFrontend(fcgi_pe, config); + fe = new RGWProcessFrontend(fcgi_pe, config); } else if (framework == "mongoose") { string err; @@ -928,6 +981,13 @@ int main(int argc, const char **argv) RGWProcessEnv env = { store, &rest, olog, port }; fe = new RGWMongooseFrontend(env, config); + } else if (framework == "loadgen") { + int port; + config->get_val("port", 80, &port); + + RGWProcessEnv env = { store, &rest, olog, port }; + + fe = new RGWProcessFrontend(env, config); } else { dout(0) << "WARNING: skipping unknown framework: " << framework << dendl; continue;