]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: use thread pool for rgw work
authorYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 19 Jul 2011 00:14:25 +0000 (17:14 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 19 Jul 2011 00:14:25 +0000 (17:14 -0700)
src/rgw/rgw_main.cc

index c9ff45e3a35211e07c30a3669ce71b79b57714bb..059685e23d4f11e1b2ba8daaf456ffea336b750b 100644 (file)
@@ -16,7 +16,7 @@
 #include "global/global_init.h"
 #include "common/config.h"
 #include "common/errno.h"
-#include "common/Thread.h"
+#include "common/WorkQueue.h"
 #include "rgw_common.h"
 #include "rgw_access.h"
 #include "rgw_acl.h"
@@ -40,6 +40,10 @@ using namespace std;
 static sighandler_t sighandler_usr1;
 static sighandler_t sighandler_alrm;
 
+
+#define SOCKET_NAME "/tmp/.radosgw.sock"
+#define SOCKET_BACKLOG 20
+
 static void godown_handler(int signum)
 {
   FCGX_ShutdownPending();
@@ -52,123 +56,126 @@ static void godown_alarm(int signum)
   _exit(0);
 }
 
-class RGWThread : public Thread {
-  FCGX_Request *fcgx;
-public:
-  RGWThread(FCGX_Request *_f) : fcgx(_f) {}
-  ~RGWThread() { delete fcgx; }
-  void *entry();
-};
-
 class RGWProcess {
-  vector<RGWThread *> m_threads;
+  deque<FCGX_Request*> m_fcgx_queue;
+  ThreadPool m_tp;
+
+  struct RGWWQ : public ThreadPool::WorkQueue<FCGX_Request> {
+    RGWProcess *process;
+    RGWWQ(RGWProcess *p, ThreadPool *tp) : ThreadPool::WorkQueue<FCGX_Request>("RGWWQ", tp), process(p) {}
+
+    bool _enqueue(FCGX_Request *req) {
+      process->m_fcgx_queue.push_back(req);
+      return true;
+    }
+    void _dequeue(FCGX_Request *req) {
+      assert(0);
+    }
+    bool _empty() {
+      return process->m_fcgx_queue.empty();
+    }
+    FCGX_Request *_dequeue() {
+      if (process->m_fcgx_queue.empty())
+       return NULL;
+      FCGX_Request *req = process->m_fcgx_queue.front();
+      process->m_fcgx_queue.pop_front();
+      return req;
+    }
+    void _process(FCGX_Request *fcgx) {
+      process->handle_request(fcgx);
+    }
+    void _clear() {
+      assert(process->m_fcgx_queue.empty());
+    }
+  } req_wq;
 
 public:
-  RGWProcess() {}
-  void start(int num_threads);
-  void join();
+  RGWProcess(CephContext *cct, int num_threads) : m_tp(cct, "RGWProcess::m_tp", num_threads), req_wq(this, &m_tp) {}
+  void run();
+  void handle_request(FCGX_Request *fcgx);
 };
 
-void RGWProcess::start(int num_threads)
+void RGWProcess::run()
 {
-#if 0
-  string sock = "/tmp/.radosgw.sock";
-  int s = FCGX_OpenSocket(sock.c_str(), 100);
+  const char *sock = SOCKET_NAME;
+  int s = FCGX_OpenSocket(sock, SOCKET_BACKLOG);
   if (s < 0) {
     RGW_LOG(0) << "ERROR: FCGX_OpenSocket (" << sock << ") returned " << s << dendl;
     return;
   }
-#endif
+  if (chmod(sock, 0777) < 0) {
+    RGW_LOG(0) << "WARNING: couldn't set permissions on unix domain socket" << dendl;
+  }
+
+  m_tp.start();
 
   for (;;) {
     FCGX_Request *fcgx = new FCGX_Request;
-//    FCGX_InitRequest(fcgx, s, 0);
-    FCGX_InitRequest(fcgx, 0, 0);
+    FCGX_InitRequest(fcgx, s, 0);
     int ret = FCGX_Accept_r(fcgx);
     if (ret < 0)
       return;
 
-    RGWThread *thread = new RGWThread(fcgx);
-    m_threads.push_back(thread);
-    thread->create();
+    req_wq.queue(fcgx);
   }
 }
 
-void RGWProcess::join()
-{
-  vector<RGWThread *>::iterator iter;
-  for (iter = m_threads.begin(); iter != m_threads.end(); ++iter) {
-    RGWThread *thr = *iter;
-    int ret = thr->join();
-    delete thr;
-    if (ret < 0) {
-      cerr << "WARNING: thread join returned " << ret << std::endl;
-    }
-  }
-}
-
-void *RGWThread::entry()
+void RGWProcess::handle_request(FCGX_Request *fcgx)
 {
   RGWRESTMgr rest;
+  int ret;
 
-  RGW_LOG(0) << "thread started thread_id=" << hex << (uint64_t)get_thread_id() << dec << dendl;
+  rgw_env.reinit(fcgx->envp);
 
-  int ret;
+  struct req_state *s = new req_state;
 
-  {
-    rgw_env.reinit(fcgx->envp);
+  RGWOp *op = NULL;
+  int init_error = 0;
+  RGWHandler *handler = rest.get_handler(s, fcgx, &init_error);
 
-    struct req_state *s = new req_state;
+  if (init_error != 0) {
+    abort_early(s, init_error);
+    goto done;
+  }
 
-    RGWOp *op = NULL;
-    int init_error = 0;
-    RGWHandler *handler = rest.get_handler(s, fcgx, &init_error);
-    
-    if (init_error != 0) {
-      abort_early(s, init_error);
-      goto done;
-    }
+  if (!handler->authorize()) {
+    RGW_LOG(10) << "failed to authorize request" << dendl;
+    abort_early(s, -EPERM);
+    goto done;
+  }
+  if (s->user.suspended) {
+    RGW_LOG(10) << "user is suspended, uid=" << s->user.user_id << dendl;
+    abort_early(s, -ERR_USER_SUSPENDED);
+    goto done;
+  }
+  ret = handler->read_permissions();
+  if (ret < 0) {
+    abort_early(s, ret);
+    goto done;
+  }
 
-    if (!handler->authorize()) {
-      RGW_LOG(10) << "failed to authorize request" << dendl;
-      abort_early(s, -EPERM);
-      goto done;
-    }
-    if (s->user.suspended) {
-      RGW_LOG(10) << "user is suspended, uid=" << s->user.user_id << dendl;
-      abort_early(s, -ERR_USER_SUSPENDED);
-      goto done;
-    }
-    ret = handler->read_permissions();
+  op = handler->get_op();
+  if (op) {
+    ret = op->verify_permission();
     if (ret < 0) {
       abort_early(s, ret);
       goto done;
     }
 
-    op = handler->get_op();
-    if (op) {
-      ret = op->verify_permission();
-      if (ret < 0) {
-        abort_early(s, ret);
-        goto done;
-      }
+    if (s->expect_cont)
+      dump_continue(s);
 
-      if (s->expect_cont)
-        dump_continue(s);
-
-      op->execute();
-    } else {
-      abort_early(s, -ERR_METHOD_NOT_ALLOWED);
-    }
-done:
-    rgw_log_op(s);
-
-    handler->put_op(op);
-    delete s;
-    FCGX_Finish_r(fcgx);
+    op->execute();
+  } else {
+    abort_early(s, -ERR_METHOD_NOT_ALLOWED);
   }
+done:
+  rgw_log_op(s);
 
-  return NULL;
+  handler->put_op(op);
+  delete s;
+  FCGX_Finish_r(fcgx);
+  delete fcgx;
 }
 
 /*
@@ -206,9 +213,9 @@ int main(int argc, const char **argv)
     return EIO;
   }
 
-  RGWProcess process;
-  process.start(20);
-  process.join();
+  RGWProcess process(g_ceph_context, 20);
+
+  process.run();
 
   return 0;
 }