RGWOp *op;
utime_t ts;
- RGWRequest() : id(0), s(NULL), op(NULL) {
+ RGWRequest(uint64_t id) : id(id), s(NULL), op(NULL) {
}
virtual ~RGWRequest() {}
struct RGWFCGXRequest : public RGWRequest {
FCGX_Request fcgx;
+
+ RGWFCGXRequest(uint64_t req_id) : RGWRequest(req_id) {}
+
};
struct RGWProcessEnv {
}
} req_wq;
- uint64_t max_req_id;
-
public:
RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf)
: store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", num_threads),
conf(_conf),
sock_fd(-1),
req_wq(this, g_conf->rgw_op_thread_timeout,
- g_conf->rgw_op_thread_suicide_timeout, &m_tp),
- max_req_id(0) {}
+ g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
virtual ~RGWProcess() {}
virtual void run() = 0;
virtual void handle_request(RGWRequest *req) = 0;
m_tp.start();
for (;;) {
- RGWFCGXRequest *req = new RGWFCGXRequest;
- req->id = ++max_req_id;
+ RGWFCGXRequest *req = new RGWFCGXRequest(store->get_new_req_id());
dout(10) << "allocated request req=" << hex << req << dec << dendl;
FCGX_InitRequest(&req->fcgx, sock_fd, 0);
req_throttle.get(1);
atomic_t *fail_flag;
- RGWLoadGenRequest(const string& _m, const string& _r, int _cl,
- atomic_t *ff) : method(_m), resource(_r), content_length(_cl), fail_flag(ff) {}
+ RGWLoadGenRequest(uint64_t req_id, const string& _m, const string& _r, int _cl,
+ atomic_t *ff) : RGWRequest(req_id), method(_m), resource(_r), content_length(_cl), fail_flag(ff) {}
};
class RGWLoadGenProcess : public RGWProcess {
void RGWLoadGenProcess::gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag)
{
- RGWLoadGenRequest *req = new RGWLoadGenRequest(method, resource, content_length, fail_flag);
- req->id = ++max_req_id;
+ RGWLoadGenRequest *req = new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
+ content_length, fail_flag);
dout(10) << "allocated request req=" << hex << req << dec << dendl;
req_throttle.get(1);
req_wq.queue(req);
RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
FCGX_Request *fcgx = &req->fcgx;
RGWFCGX client_io(fcgx);
-
+
int ret = process_request(store, rest, req, &client_io, olog);
if (ret < 0) {
RGWREST *rest = pe->rest;
OpsLogSocket *olog = pe->olog;
- RGWRequest *req = new RGWRequest;
+ RGWRequest *req = new RGWRequest(store->get_new_req_id());
RGWMongoose client_io(conn, pe->port);
client_io.init(g_ceph_context);
GetObjState() : sent_data(false) {}
};
+ atomic64_t max_req_id;
Mutex lock;
SafeTimer *timer;
RGWQuotaHandler *quota_handler;
public:
- RGWRados() : lock("rados_timer_lock"), timer(NULL),
+ RGWRados() : max_req_id(0), lock("rados_timer_lock"), timer(NULL),
gc(NULL), use_gc_thread(false), quota_threads(false),
num_watchers(0), watchers(NULL), watch_handles(NULL),
watch_initialized(false),
rest_master_conn(NULL),
meta_mgr(NULL), data_log(NULL) {}
+ uint64_t get_new_req_id() {
+ return max_req_id.inc();
+ }
+
void set_context(CephContext *_cct) {
cct = _cct;
}