#include "global/global_init.h"
#include "common/config.h"
#include "common/errno.h"
-#include "common/Thread.h"
+#include "common/WorkQueue.h"
#include "rgw_common.h"
#include "rgw_access.h"
#include "rgw_acl.h"
static sighandler_t sighandler_usr1;
static sighandler_t sighandler_alrm;
+
+#define SOCKET_NAME "/tmp/.radosgw.sock"
+#define SOCKET_BACKLOG 20
+
static void godown_handler(int signum)
{
FCGX_ShutdownPending();
_exit(0);
}
-class RGWThread : public Thread {
- FCGX_Request *fcgx;
-public:
- RGWThread(FCGX_Request *_f) : fcgx(_f) {}
- ~RGWThread() { delete fcgx; }
- void *entry();
-};
-
class RGWProcess {
- vector<RGWThread *> m_threads;
+ deque<FCGX_Request*> m_fcgx_queue;
+ ThreadPool m_tp;
+
+ struct RGWWQ : public ThreadPool::WorkQueue<FCGX_Request> {
+ RGWProcess *process;
+ RGWWQ(RGWProcess *p, ThreadPool *tp) : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", tp), process(p) {}
+
+ bool _enqueue(FCGX_Request *req) {
+ process->m_fcgx_queue.push_back(req);
+ return true;
+ }
+ void _dequeue(FCGX_Request *req) {
+ assert(0);
+ }
+ bool _empty() {
+ return process->m_fcgx_queue.empty();
+ }
+ FCGX_Request *_dequeue() {
+ if (process->m_fcgx_queue.empty())
+ return NULL;
+ FCGX_Request *req = process->m_fcgx_queue.front();
+ process->m_fcgx_queue.pop_front();
+ return req;
+ }
+ void _process(FCGX_Request *fcgx) {
+ process->handle_request(fcgx);
+ }
+ void _clear() {
+ assert(process->m_fcgx_queue.empty());
+ }
+ } req_wq;
public:
- RGWProcess() {}
- void start(int num_threads);
- void join();
+ RGWProcess(CephContext *cct, int num_threads) : m_tp(cct, "RGWProcess::m_tp", num_threads), req_wq(this, &m_tp) {}
+ void run();
+ void handle_request(FCGX_Request *fcgx);
};
-void RGWProcess::start(int num_threads)
+void RGWProcess::run()
{
-#if 0
- string sock = "/tmp/.radosgw.sock";
- int s = FCGX_OpenSocket(sock.c_str(), 100);
+ const char *sock = SOCKET_NAME;
+ int s = FCGX_OpenSocket(sock, SOCKET_BACKLOG);
if (s < 0) {
RGW_LOG(0) << "ERROR: FCGX_OpenSocket (" << sock << ") returned " << s << dendl;
return;
}
-#endif
+ if (chmod(sock, 0777) < 0) {
+ RGW_LOG(0) << "WARNING: couldn't set permissions on unix domain socket" << dendl;
+ }
+
+ m_tp.start();
for (;;) {
FCGX_Request *fcgx = new FCGX_Request;
-// FCGX_InitRequest(fcgx, s, 0);
- FCGX_InitRequest(fcgx, 0, 0);
+ FCGX_InitRequest(fcgx, s, 0);
int ret = FCGX_Accept_r(fcgx);
if (ret < 0)
return;
- RGWThread *thread = new RGWThread(fcgx);
- m_threads.push_back(thread);
- thread->create();
+ req_wq.queue(fcgx);
}
}
-void RGWProcess::join()
-{
- vector<RGWThread *>::iterator iter;
- for (iter = m_threads.begin(); iter != m_threads.end(); ++iter) {
- RGWThread *thr = *iter;
- int ret = thr->join();
- delete thr;
- if (ret < 0) {
- cerr << "WARNING: thread join returned " << ret << std::endl;
- }
- }
-}
-
-void *RGWThread::entry()
+void RGWProcess::handle_request(FCGX_Request *fcgx)
{
RGWRESTMgr rest;
+ int ret;
- RGW_LOG(0) << "thread started thread_id=" << hex << (uint64_t)get_thread_id() << dec << dendl;
+ rgw_env.reinit(fcgx->envp);
- int ret;
+ struct req_state *s = new req_state;
- {
- rgw_env.reinit(fcgx->envp);
+ RGWOp *op = NULL;
+ int init_error = 0;
+ RGWHandler *handler = rest.get_handler(s, fcgx, &init_error);
- struct req_state *s = new req_state;
+ if (init_error != 0) {
+ abort_early(s, init_error);
+ goto done;
+ }
- RGWOp *op = NULL;
- int init_error = 0;
- RGWHandler *handler = rest.get_handler(s, fcgx, &init_error);
-
- if (init_error != 0) {
- abort_early(s, init_error);
- goto done;
- }
+ if (!handler->authorize()) {
+ RGW_LOG(10) << "failed to authorize request" << dendl;
+ abort_early(s, -EPERM);
+ goto done;
+ }
+ if (s->user.suspended) {
+ RGW_LOG(10) << "user is suspended, uid=" << s->user.user_id << dendl;
+ abort_early(s, -ERR_USER_SUSPENDED);
+ goto done;
+ }
+ ret = handler->read_permissions();
+ if (ret < 0) {
+ abort_early(s, ret);
+ goto done;
+ }
- if (!handler->authorize()) {
- RGW_LOG(10) << "failed to authorize request" << dendl;
- abort_early(s, -EPERM);
- goto done;
- }
- if (s->user.suspended) {
- RGW_LOG(10) << "user is suspended, uid=" << s->user.user_id << dendl;
- abort_early(s, -ERR_USER_SUSPENDED);
- goto done;
- }
- ret = handler->read_permissions();
+ op = handler->get_op();
+ if (op) {
+ ret = op->verify_permission();
if (ret < 0) {
abort_early(s, ret);
goto done;
}
- op = handler->get_op();
- if (op) {
- ret = op->verify_permission();
- if (ret < 0) {
- abort_early(s, ret);
- goto done;
- }
+ if (s->expect_cont)
+ dump_continue(s);
- if (s->expect_cont)
- dump_continue(s);
-
- op->execute();
- } else {
- abort_early(s, -ERR_METHOD_NOT_ALLOWED);
- }
-done:
- rgw_log_op(s);
-
- handler->put_op(op);
- delete s;
- FCGX_Finish_r(fcgx);
+ op->execute();
+ } else {
+ abort_early(s, -ERR_METHOD_NOT_ALLOWED);
}
+done:
+ rgw_log_op(s);
- return NULL;
+ handler->put_op(op);
+ delete s;
+ FCGX_Finish_r(fcgx);
+ delete fcgx;
}
/*
return EIO;
}
- RGWProcess process;
- process.start(20);
- process.join();
+ RGWProcess process(g_ceph_context, 20);
+
+ process.run();
return 0;
}