]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: abstract RGWProcess
authorYehuda Sadeh <yehuda@inktank.com>
Mon, 16 Dec 2013 21:18:41 +0000 (13:18 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 20 Dec 2013 21:50:39 +0000 (13:50 -0800)
RGWProcess is a generic request handler that sends request to the
workqueue for processing.
Add RGWFCGXProcess specialization. Later we'll add a new frontend that
will leverage that.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_main.cc

index 3fcbaf72c9c4829e6ea871998b454a6c6d45c6cc..5d7d64022098d6d4142d1fbf54a978dc0e9b9bb7 100644 (file)
@@ -159,53 +159,53 @@ struct RGWProcessEnv {
 };
 
 class RGWProcess {
+  deque<RGWRequest *> m_req_queue;
+protected:
   RGWRados *store;
   OpsLogSocket *olog;
-  deque<RGWFCGXRequest *> m_req_queue;
   ThreadPool m_tp;
   Throttle req_throttle;
   RGWREST *rest;
-  int sock_fd;
   RGWFrontendConfig *conf;
 
   RGWProcessEnv *process_env;
 
-  struct RGWWQ : public ThreadPool::WorkQueue<RGWFCGXRequest> {
+  struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
     RGWProcess *process;
     RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
-      : ThreadPool::WorkQueue<RGWFCGXRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
+      : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
 
-    bool _enqueue(RGWFCGXRequest *req) {
+    bool _enqueue(RGWRequest *req) {
       process->m_req_queue.push_back(req);
       perfcounter->inc(l_rgw_qlen);
       dout(20) << "enqueued request req=" << hex << req << dec << dendl;
       _dump_queue();
       return true;
     }
-    void _dequeue(RGWFCGXRequest *req) {
+    void _dequeue(RGWRequest *req) {
       assert(0);
     }
     bool _empty() {
       return process->m_req_queue.empty();
     }
-    RGWFCGXRequest *_dequeue() {
+    RGWRequest *_dequeue() {
       if (process->m_req_queue.empty())
        return NULL;
-      RGWFCGXRequest *req = process->m_req_queue.front();
+      RGWRequest *req = process->m_req_queue.front();
       process->m_req_queue.pop_front();
       dout(20) << "dequeued request req=" << hex << req << dec << dendl;
       _dump_queue();
       perfcounter->inc(l_rgw_qlen, -1);
       return req;
     }
-    void _process(RGWFCGXRequest *req) {
+    void _process(RGWRequest *req) {
       perfcounter->inc(l_rgw_qactive);
       process->handle_request(req);
       process->req_throttle.put(1);
       perfcounter->inc(l_rgw_qactive, -1);
     }
     void _dump_queue() {
-      deque<RGWFCGXRequest *>::iterator iter;
+      deque<RGWRequest *>::iterator iter;
       if (process->m_req_queue.empty()) {
         dout(20) << "RGWWQ: empty" << dendl;
         return;
@@ -226,13 +226,24 @@ public:
   RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf)
     : store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", num_threads),
       req_throttle(cct, "rgw_ops", num_threads * 2),
-      rest(pe->rest), sock_fd(-1),
+      rest(pe->rest),
       conf(_conf),
       req_wq(this, g_conf->rgw_op_thread_timeout,
             g_conf->rgw_op_thread_suicide_timeout, &m_tp),
       max_req_id(0) {}
+  virtual ~RGWProcess() {}
+  virtual void run() = 0;
+  virtual void handle_request(RGWRequest *req) = 0;
+};
+
+
+class RGWFCGXProcess : public RGWProcess {
+  int sock_fd;
+public:
+  RGWFCGXProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
+    RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {}
   void run();
-  void handle_request(RGWFCGXRequest *req);
+  void handle_request(RGWRequest *req);
 
   void close_fd() {
     if (sock_fd >= 0)
@@ -240,7 +251,7 @@ public:
   }
 };
 
-void RGWProcess::run()
+void RGWFCGXProcess::run()
 {
   sock_fd = 0;
 
@@ -496,8 +507,9 @@ done:
   return ret;
 }
 
-void RGWProcess::handle_request(RGWFCGXRequest *req)
+void RGWFCGXProcess::handle_request(RGWRequest *r)
 {
+  RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
   FCGX_Request *fcgx = &req->fcgx;
   RGWFCGX client_io(fcgx);
 
@@ -666,13 +678,13 @@ public:
 
 class RGWFCGXFrontend : public RGWFrontend {
   RGWFrontendConfig *conf;
-  RGWProcess *pprocess;
+  RGWFCGXProcess *pprocess;
   RGWProcessEnv env;
   RGWFCGXControlThread *thread;
 
 public:
   RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) {
-    pprocess = new RGWProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
+    pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
     thread = new RGWFCGXControlThread(pprocess);
   }