From 51039da1a6ff2fccc1f91f1c01447719c0054c03 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 18 Jul 2011 17:14:25 -0700 Subject: [PATCH] rgw: use thread pool for rgw work --- src/rgw/rgw_main.cc | 177 +++++++++++++++++++++++--------------------- 1 file changed, 92 insertions(+), 85 deletions(-) diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index c9ff45e3a3521..059685e23d4f1 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -16,7 +16,7 @@ #include "global/global_init.h" #include "common/config.h" #include "common/errno.h" -#include "common/Thread.h" +#include "common/WorkQueue.h" #include "rgw_common.h" #include "rgw_access.h" #include "rgw_acl.h" @@ -40,6 +40,10 @@ using namespace std; static sighandler_t sighandler_usr1; static sighandler_t sighandler_alrm; + +#define SOCKET_NAME "/tmp/.radosgw.sock" +#define SOCKET_BACKLOG 20 + static void godown_handler(int signum) { FCGX_ShutdownPending(); @@ -52,123 +56,126 @@ static void godown_alarm(int signum) _exit(0); } -class RGWThread : public Thread { - FCGX_Request *fcgx; -public: - RGWThread(FCGX_Request *_f) : fcgx(_f) {} - ~RGWThread() { delete fcgx; } - void *entry(); -}; - class RGWProcess { - vector m_threads; + deque m_fcgx_queue; + ThreadPool m_tp; + + struct RGWWQ : public ThreadPool::WorkQueue { + RGWProcess *process; + RGWWQ(RGWProcess *p, ThreadPool *tp) : ThreadPool::WorkQueue("RGWWQ", tp), process(p) {} + + bool _enqueue(FCGX_Request *req) { + process->m_fcgx_queue.push_back(req); + return true; + } + void _dequeue(FCGX_Request *req) { + assert(0); + } + bool _empty() { + return process->m_fcgx_queue.empty(); + } + FCGX_Request *_dequeue() { + if (process->m_fcgx_queue.empty()) + return NULL; + FCGX_Request *req = process->m_fcgx_queue.front(); + process->m_fcgx_queue.pop_front(); + return req; + } + void _process(FCGX_Request *fcgx) { + process->handle_request(fcgx); + } + void _clear() { + assert(process->m_fcgx_queue.empty()); + } + } req_wq; public: - RGWProcess() {} - void start(int num_threads); - void join(); + RGWProcess(CephContext *cct, int num_threads) : m_tp(cct, "RGWProcess::m_tp", num_threads), req_wq(this, &m_tp) {} + void run(); + void handle_request(FCGX_Request *fcgx); }; -void RGWProcess::start(int num_threads) +void RGWProcess::run() { -#if 0 - string sock = "/tmp/.radosgw.sock"; - int s = FCGX_OpenSocket(sock.c_str(), 100); + const char *sock = SOCKET_NAME; + int s = FCGX_OpenSocket(sock, SOCKET_BACKLOG); if (s < 0) { RGW_LOG(0) << "ERROR: FCGX_OpenSocket (" << sock << ") returned " << s << dendl; return; } -#endif + if (chmod(sock, 0777) < 0) { + RGW_LOG(0) << "WARNING: couldn't set permissions on unix domain socket" << dendl; + } + + m_tp.start(); for (;;) { FCGX_Request *fcgx = new FCGX_Request; -// FCGX_InitRequest(fcgx, s, 0); - FCGX_InitRequest(fcgx, 0, 0); + FCGX_InitRequest(fcgx, s, 0); int ret = FCGX_Accept_r(fcgx); if (ret < 0) return; - RGWThread *thread = new RGWThread(fcgx); - m_threads.push_back(thread); - thread->create(); + req_wq.queue(fcgx); } } -void RGWProcess::join() -{ - vector::iterator iter; - for (iter = m_threads.begin(); iter != m_threads.end(); ++iter) { - RGWThread *thr = *iter; - int ret = thr->join(); - delete thr; - if (ret < 0) { - cerr << "WARNING: thread join returned " << ret << std::endl; - } - } -} - -void *RGWThread::entry() +void RGWProcess::handle_request(FCGX_Request *fcgx) { RGWRESTMgr rest; + int ret; - RGW_LOG(0) << "thread started thread_id=" << hex << (uint64_t)get_thread_id() << dec << dendl; + rgw_env.reinit(fcgx->envp); - int ret; + struct req_state *s = new req_state; - { - rgw_env.reinit(fcgx->envp); + RGWOp *op = NULL; + int init_error = 0; + RGWHandler *handler = rest.get_handler(s, fcgx, &init_error); - struct req_state *s = new req_state; + if (init_error != 0) { + abort_early(s, init_error); + goto done; + } - RGWOp *op = NULL; - int init_error = 0; - RGWHandler *handler = rest.get_handler(s, fcgx, &init_error); - - if (init_error != 0) { - abort_early(s, init_error); - goto done; - } + if (!handler->authorize()) { + RGW_LOG(10) << "failed to authorize request" << dendl; + abort_early(s, -EPERM); + goto done; + } + if (s->user.suspended) { + RGW_LOG(10) << "user is suspended, uid=" << s->user.user_id << dendl; + abort_early(s, -ERR_USER_SUSPENDED); + goto done; + } + ret = handler->read_permissions(); + if (ret < 0) { + abort_early(s, ret); + goto done; + } - if (!handler->authorize()) { - RGW_LOG(10) << "failed to authorize request" << dendl; - abort_early(s, -EPERM); - goto done; - } - if (s->user.suspended) { - RGW_LOG(10) << "user is suspended, uid=" << s->user.user_id << dendl; - abort_early(s, -ERR_USER_SUSPENDED); - goto done; - } - ret = handler->read_permissions(); + op = handler->get_op(); + if (op) { + ret = op->verify_permission(); if (ret < 0) { abort_early(s, ret); goto done; } - op = handler->get_op(); - if (op) { - ret = op->verify_permission(); - if (ret < 0) { - abort_early(s, ret); - goto done; - } + if (s->expect_cont) + dump_continue(s); - if (s->expect_cont) - dump_continue(s); - - op->execute(); - } else { - abort_early(s, -ERR_METHOD_NOT_ALLOWED); - } -done: - rgw_log_op(s); - - handler->put_op(op); - delete s; - FCGX_Finish_r(fcgx); + op->execute(); + } else { + abort_early(s, -ERR_METHOD_NOT_ALLOWED); } +done: + rgw_log_op(s); - return NULL; + handler->put_op(op); + delete s; + FCGX_Finish_r(fcgx); + delete fcgx; } /* @@ -206,9 +213,9 @@ int main(int argc, const char **argv) return EIO; } - RGWProcess process; - process.start(20); - process.join(); + RGWProcess process(g_ceph_context, 20); + + process.run(); return 0; } -- 2.39.5