static int signal_fd[2] = {0, 0};
static atomic_t disable_signal_fd;
+static void signal_shutdown();
+
#define SOCKET_BACKLOG 1024
Throttle req_throttle;
RGWREST *rest;
RGWFrontendConfig *conf;
+ int sock_fd;
struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
RGWProcess *process;
req_throttle(cct, "rgw_ops", num_threads * 2),
rest(pe->rest),
conf(_conf),
+ sock_fd(-1),
req_wq(this, g_conf->rgw_op_thread_timeout,
g_conf->rgw_op_thread_suicide_timeout, &m_tp),
max_req_id(0) {}
virtual ~RGWProcess() {}
virtual void run() = 0;
virtual void handle_request(RGWRequest *req) = 0;
- virtual void close_fd() {}
+
+ void close_fd() {
+ if (sock_fd >= 0) {
+ ::close(sock_fd);
+ sock_fd = -1;
+ }
+ }
};
class RGWFCGXProcess : public RGWProcess {
- int sock_fd;
public:
RGWFCGXProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
- RGWProcess(cct, pe, num_threads, _conf), sock_fd(-1) {}
+ RGWProcess(cct, pe, num_threads, _conf) {}
void run();
void handle_request(RGWRequest *req);
-
- void close_fd() {
- if (sock_fd >= 0)
- close(sock_fd);
- }
};
void RGWFCGXProcess::run()
{
- sock_fd = 0;
-
string socket_path;
string socket_port;
string socket_host;
string method;
string resource;
int content_length;
+ atomic_t *fail_flag;
- RGWLoadGenRequest(const string& _m, const string& _r, int _cl) : method(_m), resource(_r), content_length(_cl) {}
+ RGWLoadGenRequest(const string& _m, const string& _r, int _cl,
+ atomic_t *ff) : method(_m), resource(_r), content_length(_cl), fail_flag(ff) {}
};
class RGWLoadGenProcess : public RGWProcess {
void run();
void checkpoint();
void handle_request(RGWRequest *req);
- void gen_request(const string& method, const string& resource, int content_length);
-
- void close_fd() { }
+ void gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag);
void set_access_key(RGWAccessKey& key) { access_key = key; }
};
string bucket_resource = string("/") + bucket;
+ atomic_t failed;
+ string *objs = new string[num_objs];
+
/* first create a bucket */
- gen_request("PUT", bucket_resource, 0);
+ gen_request("PUT", bucket_resource, 0, &failed);
checkpoint();
- string *objs = new string[num_objs];
+ if (failed.read()) {
+ derr << "ERROR: bucket creation failed" << dendl;
+ goto done;
+ }
for (i = 0; i < num_objs; i++) {
char buf[32];
}
for (i = 0; i < num_objs; i++) {
- gen_request("PUT", objs[i], 4096);
+ gen_request("PUT", objs[i], 4096, &failed);
+ }
+
+ checkpoint();
+
+ if (failed.read()) {
+ derr << "ERROR: bucket creation failed" << dendl;
+ goto done;
}
-#if 0
- for (i = 0; i < 10; i++) {
- gen_request("GET", 4096);
+
+ for (i = 0; i < num_objs; i++) {
+ gen_request("GET", objs[i], 4096, NULL);
}
- for (i = 0; i < 10; i++) {
- gen_request("DELETE", 4096);
+
+ checkpoint();
+
+ for (i = 0; i < num_objs; i++) {
+ gen_request("DELETE", objs[i], 0, NULL);
}
-#endif
- m_tp.drain();
+
+ gen_request("DELETE", bucket_resource, 0, NULL);
+
+ checkpoint();
m_tp.stop();
+done:
delete[] objs;
+
+ signal_shutdown();
}
-void RGWLoadGenProcess::gen_request(const string& method, const string& resource, int content_length)
+void RGWLoadGenProcess::gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag)
{
- RGWLoadGenRequest *req = new RGWLoadGenRequest(method, resource, content_length);
+ RGWLoadGenRequest *req = new RGWLoadGenRequest(method, resource, content_length, fail_flag);
req->id = ++max_req_id;
dout(10) << "allocated request req=" << hex << req << dec << dendl;
req_throttle.get(1);
dout(1) << "====== req done req=" << hex << req << dec << " http_status=" << http_ret << " ======" << dendl;
- return ret;
+ return (ret < 0 ? ret : s->err.ret);
}
void RGWFCGXProcess::handle_request(RGWRequest *r)
if (ret < 0) {
/* we don't really care about return code */
dout(20) << "process_request() returned " << ret << dendl;
+
+ if (req->fail_flag) {
+ req->fail_flag->inc();
+ }
}
delete req;