};
class RGWProcess {
+ deque<RGWRequest *> m_req_queue;
+protected:
RGWRados *store;
OpsLogSocket *olog;
- deque<RGWFCGXRequest *> m_req_queue;
ThreadPool m_tp;
Throttle req_throttle;
RGWREST *rest;
- int sock_fd;
RGWFrontendConfig *conf;
RGWProcessEnv *process_env;
- struct RGWWQ : public ThreadPool::WorkQueue<RGWFCGXRequest> {
+ struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
RGWProcess *process;
RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
- : ThreadPool::WorkQueue<RGWFCGXRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
+ : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
- bool _enqueue(RGWFCGXRequest *req) {
+ bool _enqueue(RGWRequest *req) {
process->m_req_queue.push_back(req);
perfcounter->inc(l_rgw_qlen);
dout(20) << "enqueued request req=" << hex << req << dec << dendl;
_dump_queue();
return true;
}
- void _dequeue(RGWFCGXRequest *req) {
+ void _dequeue(RGWRequest *req) {
assert(0);
}
bool _empty() {
return process->m_req_queue.empty();
}
- RGWFCGXRequest *_dequeue() {
+ RGWRequest *_dequeue() {
if (process->m_req_queue.empty())
return NULL;
- RGWFCGXRequest *req = process->m_req_queue.front();
+ RGWRequest *req = process->m_req_queue.front();
process->m_req_queue.pop_front();
dout(20) << "dequeued request req=" << hex << req << dec << dendl;
_dump_queue();
perfcounter->inc(l_rgw_qlen, -1);
return req;
}
- void _process(RGWFCGXRequest *req) {
+ void _process(RGWRequest *req) {
perfcounter->inc(l_rgw_qactive);
process->handle_request(req);
process->req_throttle.put(1);
perfcounter->inc(l_rgw_qactive, -1);
}
void _dump_queue() {
- deque<RGWFCGXRequest *>::iterator iter;
+ deque<RGWRequest *>::iterator iter;
if (process->m_req_queue.empty()) {
dout(20) << "RGWWQ: empty" << dendl;
return;
RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf)
: store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", num_threads),
req_throttle(cct, "rgw_ops", num_threads * 2),
- rest(pe->rest), sock_fd(-1),
+ rest(pe->rest),
conf(_conf),
req_wq(this, g_conf->rgw_op_thread_timeout,
g_conf->rgw_op_thread_suicide_timeout, &m_tp),
max_req_id(0) {}
+ virtual ~RGWProcess() {}
+ virtual void run() = 0;
+ virtual void handle_request(RGWRequest *req) = 0;
+};
+
+
+class RGWFCGXProcess : public RGWProcess {
+ int sock_fd;
+public:
+ RGWFCGXProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
+ RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {}
void run();
- void handle_request(RGWFCGXRequest *req);
+ void handle_request(RGWRequest *req);
void close_fd() {
if (sock_fd >= 0)
}
};
-void RGWProcess::run()
+void RGWFCGXProcess::run()
{
sock_fd = 0;
return ret;
}
-void RGWProcess::handle_request(RGWFCGXRequest *req)
+void RGWFCGXProcess::handle_request(RGWRequest *r)
{
+ RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
FCGX_Request *fcgx = &req->fcgx;
RGWFCGX client_io(fcgx);
class RGWFCGXFrontend : public RGWFrontend {
RGWFrontendConfig *conf;
- RGWProcess *pprocess;
+ RGWFCGXProcess *pprocess;
RGWProcessEnv env;
RGWFCGXControlThread *thread;
public:
RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), env(pe) {
- pprocess = new RGWProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
+ pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
thread = new RGWFCGXControlThread(pprocess);
}