From: Yehuda Sadeh Date: Mon, 16 Dec 2013 21:18:41 +0000 (-0800) Subject: rgw: abstract RGWProcess X-Git-Tag: v0.78~307^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cf53d8a672d05311f86ad9b5a9f6be6d2f15b962;p=ceph.git rgw: abstract RGWProcess RGWProcess is a generic request handler that sends request to the workqueue for processing. Add RGWFCGXProcess specialization. Later we'll add a new frontend that will leverage that. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 3fcbaf72c9c4..5d7d64022098 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -159,53 +159,53 @@ struct RGWProcessEnv { }; class RGWProcess { + deque m_req_queue; +protected: RGWRados *store; OpsLogSocket *olog; - deque m_req_queue; ThreadPool m_tp; Throttle req_throttle; RGWREST *rest; - int sock_fd; RGWFrontendConfig *conf; RGWProcessEnv *process_env; - struct RGWWQ : public ThreadPool::WorkQueue { + struct RGWWQ : public ThreadPool::WorkQueue { RGWProcess *process; RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) - : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), process(p) {} + : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), process(p) {} - bool _enqueue(RGWFCGXRequest *req) { + bool _enqueue(RGWRequest *req) { process->m_req_queue.push_back(req); perfcounter->inc(l_rgw_qlen); dout(20) << "enqueued request req=" << hex << req << dec << dendl; _dump_queue(); return true; } - void _dequeue(RGWFCGXRequest *req) { + void _dequeue(RGWRequest *req) { assert(0); } bool _empty() { return process->m_req_queue.empty(); } - RGWFCGXRequest *_dequeue() { + RGWRequest *_dequeue() { if (process->m_req_queue.empty()) return NULL; - RGWFCGXRequest *req = process->m_req_queue.front(); + RGWRequest *req = process->m_req_queue.front(); process->m_req_queue.pop_front(); dout(20) << "dequeued request req=" << hex << req << dec << dendl; _dump_queue(); perfcounter->inc(l_rgw_qlen, -1); return req; } - void _process(RGWFCGXRequest *req) { + void _process(RGWRequest *req) { perfcounter->inc(l_rgw_qactive); process->handle_request(req); process->req_throttle.put(1); perfcounter->inc(l_rgw_qactive, -1); } void _dump_queue() { - deque::iterator iter; + deque::iterator iter; if (process->m_req_queue.empty()) { dout(20) << "RGWWQ: empty" << dendl; return; @@ -226,13 +226,24 @@ public: RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) : store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", num_threads), req_throttle(cct, "rgw_ops", num_threads * 2), - rest(pe->rest), sock_fd(-1), + rest(pe->rest), conf(_conf), req_wq(this, g_conf->rgw_op_thread_timeout, g_conf->rgw_op_thread_suicide_timeout, &m_tp), max_req_id(0) {} + virtual ~RGWProcess() {} + virtual void run() = 0; + virtual void handle_request(RGWRequest *req) = 0; +}; + + +class RGWFCGXProcess : public RGWProcess { + int sock_fd; +public: + RGWFCGXProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) : + RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {} void run(); - void handle_request(RGWFCGXRequest *req); + void handle_request(RGWRequest *req); void close_fd() { if (sock_fd >= 0) @@ -240,7 +251,7 @@ public: } }; -void RGWProcess::run() +void RGWFCGXProcess::run() { sock_fd = 0; @@ -496,8 +507,9 @@ done: return ret; } -void RGWProcess::handle_request(RGWFCGXRequest *req) +void RGWFCGXProcess::handle_request(RGWRequest *r) { + RGWFCGXRequest *req = static_cast(r); FCGX_Request *fcgx = &req->fcgx; RGWFCGX client_io(fcgx); @@ -666,13 +678,13 @@ public: class RGWFCGXFrontend : public RGWFrontend { RGWFrontendConfig *conf; - RGWProcess *pprocess; + RGWFCGXProcess *pprocess; RGWProcessEnv env; RGWFCGXControlThread *thread; public: RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) { - pprocess = new RGWProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); + pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf); thread = new RGWFCGXControlThread(pprocess); }