RGWOp *op;
utime_t ts;
- RGWRequest() : id(0), s(NULL), op(NULL) {
+ RGWRequest(uint64_t id) : id(id), s(NULL), op(NULL) {
}
virtual ~RGWRequest() {}
FCGX_Request *fcgx;
QueueRing<FCGX_Request *> *qr;
- RGWFCGXRequest(QueueRing<FCGX_Request *> *_qr) : qr(_qr) {
+ RGWFCGXRequest(uint64_t req_id, QueueRing<FCGX_Request *> *_qr) : RGWRequest(req_id), qr(_qr) {
qr->dequeue(&fcgx);
}
}
} req_wq;
- uint64_t max_req_id;
-
public:
RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf)
: store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", num_threads),
conf(_conf),
sock_fd(-1),
req_wq(this, g_conf->rgw_op_thread_timeout,
- g_conf->rgw_op_thread_suicide_timeout, &m_tp),
- max_req_id(0) {}
+ g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
virtual ~RGWProcess() {}
virtual void run() = 0;
virtual void handle_request(RGWRequest *req) = 0;
}
for (;;) {
- RGWFCGXRequest *req = new RGWFCGXRequest(&qr);
- req->id = ++max_req_id;
+ RGWFCGXRequest *req = new RGWFCGXRequest(store->get_new_req_id(), &qr);
dout(10) << "allocated request req=" << hex << req << dec << dendl;
req_throttle.get(1);
int ret = FCGX_Accept_r(req->fcgx);
atomic_t *fail_flag;
- RGWLoadGenRequest(const string& _m, const string& _r, int _cl,
- atomic_t *ff) : method(_m), resource(_r), content_length(_cl), fail_flag(ff) {}
+ RGWLoadGenRequest(uint64_t req_id, const string& _m, const string& _r, int _cl,
+ atomic_t *ff) : RGWRequest(req_id), method(_m), resource(_r), content_length(_cl), fail_flag(ff) {}
};
class RGWLoadGenProcess : public RGWProcess {
void RGWLoadGenProcess::gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag)
{
- RGWLoadGenRequest *req = new RGWLoadGenRequest(method, resource, content_length, fail_flag);
- req->id = ++max_req_id;
+ RGWLoadGenRequest *req = new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
+ content_length, fail_flag);
dout(10) << "allocated request req=" << hex << req << dec << dendl;
req_throttle.get(1);
req_wq.queue(req);
RGWREST *rest = pe->rest;
OpsLogSocket *olog = pe->olog;
- RGWRequest *req = new RGWRequest;
+ RGWRequest *req = new RGWRequest(store->get_new_req_id());
RGWMongoose client_io(conn, pe->port);
client_io.init(g_ceph_context);
void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
+ atomic64_t max_req_id;
Mutex lock;
Mutex watchers_lock;
SafeTimer *timer;
Finisher *finisher;
public:
- RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
+ RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
gc(NULL), use_gc_thread(false), quota_threads(false),
num_watchers(0), watchers(NULL),
watch_initialized(false),
rest_master_conn(NULL),
meta_mgr(NULL), data_log(NULL) {}
+ uint64_t get_new_req_id() {
+ return max_req_id.inc();
+ }
+
void set_context(CephContext *_cct) {
cct = _cct;
}