rgw/rgw_keystone.cc
rgw/rgw_quota.cc
rgw/rgw_dencoder.cc
+ rgw/rgw_dencoder.cc
+ rgw/rgw_request.cc
+ rgw/rgw_process.cc
+ rgw/rgw_frontend.cc
rgw/rgw_object_expirer_core.cc
rgw/rgw_website.cc
rgw/rgw_xml_enc.cc)
rgw/rgw_keystone.cc \
rgw/rgw_quota.cc \
rgw/rgw_dencoder.cc \
+ rgw/rgw_request.cc \
+ rgw/rgw_process.cc \
+ rgw/rgw_frontend.cc \
rgw/rgw_object_expirer_core.cc \
rgw/rgw_website.cc
+
librgw_la_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS}
noinst_LTLIBRARIES += librgw.la
rgw/rgw_http_errors.h \
rgw/rgw_log.h \
rgw/rgw_loadgen.h \
+ rgw/rgw_process.h \
+ rgw/rgw_request.h \
+ rgw/rgw_frontend.h \
rgw/rgw_multi.h \
rgw/rgw_policy_s3.h \
rgw/rgw_gc.h \
#ifndef CEPH_RGW_FCGI_H
#define CEPH_RGW_FCGI_H
-#include "rgw_client_io.h"
+#ifdef FASTCGI_INCLUDE_DIR
+# include "fastcgi/fcgiapp.h"
+#else
+# include "fcgiapp.h"
+#endif
+#include "rgw_client_io.h"
struct FCGX_Request;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_frontend.h"
+
+#include "include/str_list.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+static int civetweb_callback(struct mg_connection *conn) {
+ struct mg_request_info *req_info = mg_get_request_info(conn);
+ RGWProcessEnv *pe = static_cast<RGWProcessEnv *>(req_info->user_data);
+ RGWRados *store = pe->store;
+ RGWREST *rest = pe->rest;
+ OpsLogSocket *olog = pe->olog;
+
+ RGWRequest *req = new RGWRequest(store->get_new_req_id());
+ RGWMongoose client_io(conn, pe->port);
+
+ int ret = process_request(store, rest, req, &client_io, olog);
+ if (ret < 0) {
+ /* we don't really care about return code */
+ dout(20) << "process_request() returned " << ret << dendl;
+ }
+
+ delete req;
+
+// Mark as processed
+ return 1;
+}
+
+int RGWFrontendConfig::parse_config(const string& config,
+ map<string, string>& config_map)
+{
+ list<string> config_list;
+ get_str_list(config, " ", config_list);
+
+ list<string>::iterator iter;
+ for (iter = config_list.begin(); iter != config_list.end(); ++iter) {
+ string& entry = *iter;
+ string key;
+ string val;
+
+ if (framework.empty()) {
+ framework = entry;
+ dout(0) << "framework: " << framework << dendl;
+ continue;
+ }
+
+ ssize_t pos = entry.find('=');
+ if (pos < 0) {
+ dout(0) << "framework conf key: " << entry << dendl;
+ config_map[entry] = "";
+ continue;
+ }
+
+ int ret = parse_key_value(entry, key, val);
+ if (ret < 0) {
+ cerr << "ERROR: can't parse " << entry << std::endl;
+ return ret;
+ }
+
+ dout(0) << "framework conf key: " << key << ", val: " << val << dendl;
+ config_map[key] = val;
+ }
+
+ return 0;
+}
+
+bool RGWFrontendConfig::get_val(const string& key, const string& def_val,
+ string *out)
+{
+ map<string, string>::iterator iter = config_map.find(key);
+ if (iter == config_map.end()) {
+ *out = def_val;
+ return false;
+ }
+
+ *out = iter->second;
+ return true;
+}
+
+bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out)
+{
+ string str;
+ bool found = get_val(key, "", &str);
+ if (!found) {
+ *out = def_val;
+ return false;
+ }
+ string err;
+ *out = strict_strtol(str.c_str(), 10, &err);
+ if (!err.empty()) {
+ cerr << "error parsing int: " << str << ": " << err << std::endl;
+ return -EINVAL;
+ }
+ return 0;
+}
+
+int RGWMongooseFrontend::run() {
+ char thread_pool_buf[32];
+ snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d",
+ (int)g_conf->rgw_thread_pool_size);
+ string port_str;
+ map<string, string> conf_map = conf->get_config_map();
+ conf->get_val("port", "80", &port_str);
+ conf_map.erase("port");
+ conf_map["listening_ports"] = port_str;
+ set_conf_default(conf_map, "enable_keep_alive", "yes");
+ set_conf_default(conf_map, "num_threads", thread_pool_buf);
+ set_conf_default(conf_map, "decode_url", "no");
+
+ const char *options[conf_map.size() * 2 + 1];
+ int i = 0;
+ for (map<string, string>::iterator iter = conf_map.begin();
+ iter != conf_map.end(); ++iter) {
+ options[i] = iter->first.c_str();
+ options[i + 1] = iter->second.c_str();
+ dout(20)<< "civetweb config: " << options[i] << ": "
+ << (options[i + 1] ? options[i + 1] : "<null>") << dendl;
+ i += 2;
+ }
+ options[i] = NULL;
+
+ struct mg_callbacks cb;
+ memset((void *)&cb, 0, sizeof(cb));
+ cb.begin_request = civetweb_callback;
+ cb.log_message = rgw_civetweb_log_callback;
+ cb.log_access = rgw_civetweb_log_access_callback;
+ ctx = mg_start(&cb, &env, (const char **)&options);
+
+ if (!ctx) {
+ return -EIO;
+ }
+
+ return 0;
+} /* RGWMongooseFrontend::run */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_FRONTEND_H
+#define RGW_FRONTEND_H
+
+#include "rgw_request.h"
+#include "rgw_process.h"
+
+#include "rgw_civetweb.h"
+#include "rgw_civetweb_log.h"
+#include "civetweb/civetweb.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+class RGWFrontendConfig {
+ string config;
+ map<string, string> config_map;
+ int parse_config(const string& config, map<string, string>& config_map);
+ string framework;
+public:
+ RGWFrontendConfig(const string& _conf) : config(_conf) {}
+ int init() {
+ int ret = parse_config(config, config_map);
+ if (ret < 0)
+ return ret;
+ return 0;
+ }
+ bool get_val(const string& key, const string& def_val, string *out);
+ bool get_val(const string& key, int def_val, int *out);
+
+ map<string, string>& get_config_map() { return config_map; }
+
+ string get_framework() { return framework; }
+};
+
+class RGWFrontend {
+public:
+ virtual ~RGWFrontend() {}
+
+ virtual int init() = 0;
+
+ virtual int run() = 0;
+ virtual void stop() = 0;
+ virtual void join() = 0;
+};
+
+class RGWMongooseFrontend : public RGWFrontend {
+ RGWFrontendConfig* conf;
+ struct mg_context* ctx;
+ RGWProcessEnv env;
+
+ void set_conf_default(map<string, string>& m, const string& key,
+ const string& def_val) {
+ if (m.find(key) == m.end()) {
+ m[key] = def_val;
+ }
+ }
+
+public:
+ RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
+ : conf(_conf), ctx(nullptr), env(pe) {
+ }
+
+ int init() {
+ return 0;
+ }
+
+ int run();
+
+ void stop() {
+ if (ctx) {
+ mg_stop(ctx);
+ }
+ }
+
+ void join() {
+ }
+}; /* RGWMongooseFrontend */
+
+class RGWProcessFrontend : public RGWFrontend {
+protected:
+ RGWFrontendConfig* conf;
+ RGWProcess* pprocess;
+ RGWProcessEnv env;
+ RGWProcessControlThread* thread;
+
+public:
+ RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
+ : conf(_conf), pprocess(nullptr), env(pe), thread(nullptr) {
+ }
+
+ ~RGWProcessFrontend() {
+ delete thread;
+ delete pprocess;
+ }
+
+ int run() {
+ assert(pprocess); /* should have initialized by init() */
+ thread = new RGWProcessControlThread(pprocess);
+ thread->create("rgw_frontend");
+ return 0;
+ }
+
+ void stop() {
+ pprocess->close_fd();
+ thread->kill(SIGUSR1);
+ }
+
+ void join() {
+ thread->join();
+ }
+}; /* RGWProcessFrontend */
+
+class RGWFCGXFrontend : public RGWProcessFrontend {
+public:
+ RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
+ : RGWProcessFrontend(pe, _conf) {}
+
+ int init() {
+ pprocess = new RGWFCGXProcess(g_ceph_context, &env,
+ g_conf->rgw_thread_pool_size, conf);
+ return 0;
+ }
+}; /* RGWFCGXFrontend */
+
+class RGWLoadGenFrontend : public RGWProcessFrontend {
+public:
+ RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf)
+ : RGWProcessFrontend(pe, _conf) {}
+
+ int init() {
+ int num_threads;
+ conf->get_val("num_threads", g_conf->rgw_thread_pool_size, &num_threads);
+ RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env,
+ num_threads, conf);
+
+ pprocess = pp;
+
+ string uid_str;
+ conf->get_val("uid", "", &uid_str);
+ if (uid_str.empty()) {
+ derr << "ERROR: uid param must be specified for loadgen frontend"
+ << dendl;
+ return EINVAL;
+ }
+
+ rgw_user uid(uid_str);
+
+ RGWUserInfo user_info;
+ int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
+ if (ret < 0) {
+ derr << "ERROR: failed reading user info: uid=" << uid << " ret="
+ << ret << dendl;
+ return ret;
+ }
+
+ map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
+ if (aiter == user_info.access_keys.end()) {
+ derr << "ERROR: user has no S3 access keys set" << dendl;
+ return -EINVAL;
+ }
+
+ pp->set_access_key(aiter->second);
+
+ return 0;
+ }
+}; /* RGWLoadGenFrontend */
+
+#endif /* RGW_FRONTEND_H */
#include <curl/curl.h>
#include "acconfig.h"
-#ifdef FASTCGI_INCLUDE_DIR
-# include "fastcgi/fcgiapp.h"
-#else
-# include "fcgiapp.h"
-#endif
-
-#include "rgw_fcgi.h"
#include "common/ceph_argparse.h"
#include "global/global_init.h"
#include "global/signal_handler.h"
#include "common/config.h"
#include "common/errno.h"
-#include "common/WorkQueue.h"
#include "common/Timer.h"
-#include "common/Throttle.h"
-#include "common/QueueRing.h"
#include "common/safe_io.h"
#include "include/compat.h"
#include "include/str_list.h"
#include "rgw_common.h"
#include "rgw_rados.h"
-#include "rgw_acl.h"
#include "rgw_user.h"
-#include "rgw_op.h"
#include "rgw_rest.h"
#include "rgw_rest_s3.h"
#include "rgw_rest_swift.h"
#include "rgw_log.h"
#include "rgw_tools.h"
#include "rgw_resolve.h"
-#include "rgw_loadgen.h"
-#include "rgw_civetweb.h"
-#include "rgw_civetweb_log.h"
-#include "civetweb/civetweb.h"
+#include "rgw_request.h"
+#include "rgw_process.h"
+#include "rgw_frontend.h"
#include <map>
#include <string>
#include <vector>
-#include <iostream>
-#include <sstream>
#include "include/types.h"
#include "common/BackTrace.h"
static int signal_fd[2] = {0, 0};
static atomic_t disable_signal_fd;
-static void signal_shutdown();
-
-
-#define SOCKET_BACKLOG 1024
-
-struct RGWRequest
-{
- uint64_t id;
- struct req_state *s;
- string req_str;
- RGWOp *op;
- utime_t ts;
-
- explicit RGWRequest(uint64_t id) : id(id), s(NULL), op(NULL) {
- }
-
- virtual ~RGWRequest() {}
-
- void init_state(req_state *_s) {
- s = _s;
- }
-
- void log_format(struct req_state *s, const char *fmt, ...)
- {
-#define LARGE_SIZE 1024
- char buf[LARGE_SIZE];
- va_list ap;
-
- va_start(ap, fmt);
- vsnprintf(buf, sizeof(buf), fmt, ap);
- va_end(ap);
-
- log(s, buf);
- }
-
- void log_init() {
- ts = ceph_clock_now(g_ceph_context);
- }
-
- void log(struct req_state *s, const char *msg) {
- if (s->info.method && req_str.size() == 0) {
- req_str = s->info.method;
- req_str.append(" ");
- req_str.append(s->info.request_uri);
- }
- utime_t t = ceph_clock_now(g_ceph_context) - ts;
- dout(2) << "req " << id << ":" << t << ":" << s->dialect << ":" << req_str << ":" << (op ? op->name() : "") << ":" << msg << dendl;
- }
-};
-
-class RGWFrontendConfig {
- string config;
- map<string, string> config_map;
- int parse_config(const string& config, map<string, string>& config_map);
- string framework;
-public:
- explicit RGWFrontendConfig(const string& _conf) : config(_conf) {}
- int init() {
- int ret = parse_config(config, config_map);
- if (ret < 0)
- return ret;
- return 0;
- }
- bool get_val(const string& key, const string& def_val, string *out);
- bool get_val(const string& key, int def_val, int *out);
-
- map<string, string>& get_config_map() { return config_map; }
-
- string get_framework() { return framework; }
-};
-
-
-struct RGWFCGXRequest : public RGWRequest {
- FCGX_Request *fcgx;
- QueueRing<FCGX_Request *> *qr;
-
- RGWFCGXRequest(uint64_t req_id, QueueRing<FCGX_Request *> *_qr) : RGWRequest(req_id), qr(_qr) {
- qr->dequeue(&fcgx);
- }
-
- ~RGWFCGXRequest() {
- FCGX_Finish_r(fcgx);
- qr->enqueue(fcgx);
- }
-};
-
-struct RGWProcessEnv {
- RGWRados *store;
- RGWREST *rest;
- OpsLogSocket *olog;
- int port;
-};
-
-class RGWProcess {
- deque<RGWRequest *> m_req_queue;
-protected:
- RGWRados *store;
- OpsLogSocket *olog;
- ThreadPool m_tp;
- Throttle req_throttle;
- RGWREST *rest;
- RGWFrontendConfig *conf;
- int sock_fd;
-
- struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
- RGWProcess *process;
- RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
- : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
-
- bool _enqueue(RGWRequest *req) {
- process->m_req_queue.push_back(req);
- perfcounter->inc(l_rgw_qlen);
- dout(20) << "enqueued request req=" << hex << req << dec << dendl;
- _dump_queue();
- return true;
- }
- void _dequeue(RGWRequest *req) {
- assert(0);
- }
- bool _empty() {
- return process->m_req_queue.empty();
- }
- RGWRequest *_dequeue() {
- if (process->m_req_queue.empty())
- return NULL;
- RGWRequest *req = process->m_req_queue.front();
- process->m_req_queue.pop_front();
- dout(20) << "dequeued request req=" << hex << req << dec << dendl;
- _dump_queue();
- perfcounter->inc(l_rgw_qlen, -1);
- return req;
- }
- void _process(RGWRequest *req, ThreadPool::TPHandle &) override {
- perfcounter->inc(l_rgw_qactive);
- process->handle_request(req);
- process->req_throttle.put(1);
- perfcounter->inc(l_rgw_qactive, -1);
- }
- void _dump_queue() {
- if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
- return;
- }
- deque<RGWRequest *>::iterator iter;
- if (process->m_req_queue.empty()) {
- dout(20) << "RGWWQ: empty" << dendl;
- return;
- }
- dout(20) << "RGWWQ:" << dendl;
- for (iter = process->m_req_queue.begin(); iter != process->m_req_queue.end(); ++iter) {
- dout(20) << "req: " << hex << *iter << dec << dendl;
- }
- }
- void _clear() {
- assert(process->m_req_queue.empty());
- }
- } req_wq;
-
-public:
- RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf)
- : store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
- req_throttle(cct, "rgw_ops", num_threads * 2),
- rest(pe->rest),
- conf(_conf),
- sock_fd(-1),
- req_wq(this, g_conf->rgw_op_thread_timeout,
- g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
- virtual ~RGWProcess() {}
- virtual void run() = 0;
- virtual void handle_request(RGWRequest *req) = 0;
-
- void close_fd() {
- if (sock_fd >= 0) {
- ::close(sock_fd);
- sock_fd = -1;
- }
- }
-};
-
-
-class RGWFCGXProcess : public RGWProcess {
- int max_connections;
-public:
- RGWFCGXProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
- RGWProcess(cct, pe, num_threads, _conf),
- max_connections(num_threads + (num_threads >> 3)) /* have a bit more connections than threads so that requests
- are still accepted even if we're still processing older requests */
- {}
- void run();
- void handle_request(RGWRequest *req);
-};
-
-void RGWFCGXProcess::run()
-{
- string socket_path;
- string socket_port;
- string socket_host;
-
- conf->get_val("socket_path", "", &socket_path);
- conf->get_val("socket_port", g_conf->rgw_port, &socket_port);
- conf->get_val("socket_host", g_conf->rgw_host, &socket_host);
-
- if (socket_path.empty() && socket_port.empty() && socket_host.empty()) {
- socket_path = g_conf->rgw_socket_path;
- if (socket_path.empty()) {
- dout(0) << "ERROR: no socket server point defined, cannot start fcgi frontend" << dendl;
- return;
- }
- }
-
- if (!socket_path.empty()) {
- string path_str = socket_path;
-
- /* this is necessary, as FCGX_OpenSocket might not return an error, but rather ungracefully exit */
- int fd = open(path_str.c_str(), O_CREAT, 0644);
- if (fd < 0) {
- int err = errno;
- /* ENXIO is actually expected, we'll get that if we try to open a unix domain socket */
- if (err != ENXIO) {
- dout(0) << "ERROR: cannot create socket: path=" << path_str << " error=" << cpp_strerror(err) << dendl;
- return;
- }
- } else {
- close(fd);
- }
-
- const char *path = path_str.c_str();
- sock_fd = FCGX_OpenSocket(path, SOCKET_BACKLOG);
- if (sock_fd < 0) {
- dout(0) << "ERROR: FCGX_OpenSocket (" << path << ") returned " << sock_fd << dendl;
- return;
- }
- if (chmod(path, 0777) < 0) {
- dout(0) << "WARNING: couldn't set permissions on unix domain socket" << dendl;
- }
- } else if (!socket_port.empty()) {
- string bind = socket_host + ":" + socket_port;
- sock_fd = FCGX_OpenSocket(bind.c_str(), SOCKET_BACKLOG);
- if (sock_fd < 0) {
- dout(0) << "ERROR: FCGX_OpenSocket (" << bind.c_str() << ") returned " << sock_fd << dendl;
- return;
- }
- }
-
- m_tp.start();
-
- FCGX_Request fcgx_reqs[max_connections];
-
- QueueRing<FCGX_Request *> qr(max_connections);
- for (int i = 0; i < max_connections; i++) {
- FCGX_Request *fcgx = &fcgx_reqs[i];
- FCGX_InitRequest(fcgx, sock_fd, 0);
- qr.enqueue(fcgx);
- }
-
- for (;;) {
- RGWFCGXRequest *req = new RGWFCGXRequest(store->get_new_req_id(), &qr);
- dout(10) << "allocated request req=" << hex << req << dec << dendl;
- req_throttle.get(1);
- int ret = FCGX_Accept_r(req->fcgx);
- if (ret < 0) {
- delete req;
- dout(0) << "ERROR: FCGX_Accept_r returned " << ret << dendl;
- req_throttle.put(1);
- break;
- }
-
- req_wq.queue(req);
- }
-
- m_tp.drain(&req_wq);
- m_tp.stop();
-
- dout(20) << "cleaning up fcgx connections" << dendl;
-
- for (int i = 0; i < max_connections; i++) {
- FCGX_Finish_r(&fcgx_reqs[i]);
- }
-}
-
-struct RGWLoadGenRequest : public RGWRequest {
- string method;
- string resource;
- int content_length;
- atomic_t *fail_flag;
-
-
- RGWLoadGenRequest(uint64_t req_id, const string& _m, const string& _r, int _cl,
- atomic_t *ff) : RGWRequest(req_id), method(_m), resource(_r), content_length(_cl), fail_flag(ff) {}
-};
-
-class RGWLoadGenProcess : public RGWProcess {
- RGWAccessKey access_key;
-public:
- RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
- RGWProcess(cct, pe, num_threads, _conf) {}
- void run();
- void checkpoint();
- void handle_request(RGWRequest *req);
- void gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag);
-
- void set_access_key(RGWAccessKey& key) { access_key = key; }
-};
-
-void RGWLoadGenProcess::checkpoint()
-{
- m_tp.drain(&req_wq);
-}
-
-void RGWLoadGenProcess::run()
-{
- m_tp.start(); /* start thread pool */
-
- int i;
-
- int num_objs;
-
- conf->get_val("num_objs", 1000, &num_objs);
-
- int num_buckets;
- conf->get_val("num_buckets", 1, &num_buckets);
-
- vector<string> buckets(num_buckets);
-
- atomic_t failed;
-
- for (i = 0; i < num_buckets; i++) {
- buckets[i] = "/loadgen";
- string& bucket = buckets[i];
- append_rand_alpha(NULL, bucket, bucket, 16);
-
- /* first create a bucket */
- gen_request("PUT", bucket, 0, &failed);
- checkpoint();
- }
-
- string *objs = new string[num_objs];
-
- if (failed.read()) {
- derr << "ERROR: bucket creation failed" << dendl;
- goto done;
- }
-
- for (i = 0; i < num_objs; i++) {
- char buf[16 + 1];
- gen_rand_alphanumeric(NULL, buf, sizeof(buf));
- buf[16] = '\0';
- objs[i] = buckets[i % num_buckets] + "/" + buf;
- }
-
- for (i = 0; i < num_objs; i++) {
- gen_request("PUT", objs[i], 4096, &failed);
- }
-
- checkpoint();
-
- if (failed.read()) {
- derr << "ERROR: bucket creation failed" << dendl;
- goto done;
- }
-
- for (i = 0; i < num_objs; i++) {
- gen_request("GET", objs[i], 4096, NULL);
- }
-
- checkpoint();
-
- for (i = 0; i < num_objs; i++) {
- gen_request("DELETE", objs[i], 0, NULL);
- }
-
- checkpoint();
-
- for (i = 0; i < num_buckets; i++) {
- gen_request("DELETE", buckets[i], 0, NULL);
- }
-
-done:
- checkpoint();
-
- m_tp.stop();
-
- delete[] objs;
-
- signal_shutdown();
-}
-
-void RGWLoadGenProcess::gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag)
-{
- RGWLoadGenRequest *req = new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
- content_length, fail_flag);
- dout(10) << "allocated request req=" << hex << req << dec << dendl;
- req_throttle.get(1);
- req_wq.queue(req);
-}
-
-static void signal_shutdown()
+void signal_shutdown()
{
if (!disable_signal_fd.read()) {
int val = 0;
int ret = write(signal_fd[0], (char *)&val, sizeof(val));
if (ret < 0) {
int err = -errno;
- derr << "ERROR: " << __func__ << ": write() returned " << cpp_strerror(-err) << dendl;
+ derr << "ERROR: " << __func__ << ": write() returned "
+ << cpp_strerror(-err) << dendl;
}
}
}
_exit(0);
}
-static int process_request(RGWRados *store, RGWREST *rest, RGWRequest *req, RGWClientIO *client_io, OpsLogSocket *olog)
-{
- int ret = 0;
-
- client_io->init(g_ceph_context);
-
- req->log_init();
-
- dout(1) << "====== starting new request req=" << hex << req << dec << " =====" << dendl;
- perfcounter->inc(l_rgw_req);
-
- RGWEnv& rgw_env = client_io->get_env();
-
- struct req_state rstate(g_ceph_context, &rgw_env);
-
- struct req_state *s = &rstate;
-
- RGWObjectCtx rados_ctx(store, s);
- s->obj_ctx = &rados_ctx;
-
- s->req_id = store->unique_id(req->id);
- s->trans_id = store->unique_trans_id(req->id);
- s->host_id = store->host_id;
-
- req->log_format(s, "initializing for trans_id = %s", s->trans_id.c_str());
-
- RGWOp *op = NULL;
- int init_error = 0;
- bool should_log = false;
- RGWRESTMgr *mgr;
- RGWHandler *handler = rest->get_handler(store, s, client_io, &mgr, &init_error);
- if (init_error != 0) {
- abort_early(s, NULL, init_error, NULL);
- goto done;
- }
- dout(10) << "handler=" << typeid(*handler).name() << dendl;
-
- should_log = mgr->get_logging();
-
- req->log_format(s, "getting op %d", s->op);
- op = handler->get_op(store);
- if (!op) {
- abort_early(s, NULL, -ERR_METHOD_NOT_ALLOWED, handler);
- goto done;
- }
- req->op = op;
- dout(10) << "op=" << typeid(*op).name() << dendl;
-
- req->log(s, "authorizing");
- ret = handler->authorize();
- if (ret < 0) {
- dout(10) << "failed to authorize request" << dendl;
- abort_early(s, NULL, ret, handler);
- goto done;
- }
-
- req->log(s, "normalizing buckets and tenants");
- ret = handler->postauth_init();
- if (ret < 0) {
- dout(10) << "failed to run post-auth init" << dendl;
- abort_early(s, op, ret, handler);
- goto done;
- }
-
- if (s->user.suspended) {
- dout(10) << "user is suspended, uid=" << s->user.user_id << dendl;
- abort_early(s, op, -ERR_USER_SUSPENDED, handler);
- goto done;
- }
-
- req->log(s, "init permissions");
- ret = handler->init_permissions(op);
- if (ret < 0) {
- abort_early(s, op, ret, handler);
- goto done;
- }
-
- /**
- * Only some accesses support website mode, and website mode does NOT apply
- * if you are using the REST endpoint either (ergo, no authenticated access)
- */
- req->log(s, "recalculating target");
- ret = handler->retarget(op, &op);
- if (ret < 0) {
- abort_early(s, op, ret, handler);
- goto done;
- }
- req->op = op;
-
- req->log(s, "reading permissions");
- ret = handler->read_permissions(op);
- if (ret < 0) {
- abort_early(s, op, ret, handler);
- goto done;
- }
-
- req->log(s, "init op");
- ret = op->init_processing();
- if (ret < 0) {
- abort_early(s, op, ret, handler);
- goto done;
- }
-
- req->log(s, "verifying op mask");
- ret = op->verify_op_mask();
- if (ret < 0) {
- abort_early(s, op, ret, handler);
- goto done;
- }
-
- req->log(s, "verifying op permissions");
- ret = op->verify_permission();
- if (ret < 0) {
- if (s->system_request) {
- dout(2) << "overriding permissions due to system operation" << dendl;
- } else {
- abort_early(s, op, ret, handler);
- goto done;
- }
- }
-
- req->log(s, "verifying op params");
- ret = op->verify_params();
- if (ret < 0) {
- abort_early(s, op, ret, handler);
- goto done;
- }
-
- req->log(s, "pre-executing");
- op->pre_exec();
-
- req->log(s, "executing");
- op->execute();
-
- req->log(s, "completing");
- op->complete();
-done:
- int r = client_io->complete_request();
- if (r < 0) {
- dout(0) << "ERROR: client_io->complete_request() returned " << r << dendl;
- }
- if (should_log) {
- rgw_log_op(store, s, (op ? op->name() : "unknown"), olog);
- }
-
- int http_ret = s->err.http_ret;
-
- int op_ret = 0;
- if (op) {
- op_ret = op->get_ret();
- }
-
- req->log_format(s, "op status=%d", op_ret);
- req->log_format(s, "http status=%d", http_ret);
-
- if (handler)
- handler->put_op(op);
- rest->put_handler(handler);
-
- dout(1) << "====== req done req=" << hex << req << dec
- << " op status=" << op_ret
- << " http_status=" << http_ret
- << " ======"
- << dendl;
-
- return (ret < 0 ? ret : s->err.ret);
-}
-
-void RGWFCGXProcess::handle_request(RGWRequest *r)
-{
- RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
- FCGX_Request *fcgx = req->fcgx;
- RGWFCGX client_io(fcgx);
-
-
- int ret = process_request(store, rest, req, &client_io, olog);
- if (ret < 0) {
- /* we don't really care about return code */
- dout(20) << "process_request() returned " << ret << dendl;
- }
-
- FCGX_Finish_r(fcgx);
-
- delete req;
-}
-
-void RGWLoadGenProcess::handle_request(RGWRequest *r)
-{
- RGWLoadGenRequest *req = static_cast<RGWLoadGenRequest *>(r);
-
- RGWLoadGenRequestEnv env;
-
- utime_t tm = ceph_clock_now(NULL);
-
- env.port = 80;
- env.content_length = req->content_length;
- env.content_type = "binary/octet-stream";
- env.request_method = req->method;
- env.uri = req->resource;
- env.set_date(tm);
- env.sign(access_key);
-
- RGWLoadGenIO client_io(&env);
-
- int ret = process_request(store, rest, req, &client_io, olog);
- if (ret < 0) {
- /* we don't really care about return code */
- dout(20) << "process_request() returned " << ret << dendl;
-
- if (req->fail_flag) {
- req->fail_flag->inc();
- }
- }
-
- delete req;
-}
-
-
-static int civetweb_callback(struct mg_connection *conn) {
- struct mg_request_info *req_info = mg_get_request_info(conn);
- RGWProcessEnv *pe = static_cast<RGWProcessEnv *>(req_info->user_data);
- RGWRados *store = pe->store;
- RGWREST *rest = pe->rest;
- OpsLogSocket *olog = pe->olog;
-
- RGWRequest *req = new RGWRequest(store->get_new_req_id());
- RGWMongoose client_io(conn, pe->port);
-
- int ret = process_request(store, rest, req, &client_io, olog);
- if (ret < 0) {
- /* we don't really care about return code */
- dout(20) << "process_request() returned " << ret << dendl;
- }
-
- delete req;
-
-// Mark as processed
- return 1;
-}
-
#ifdef HAVE_CURL_MULTI_WAIT
static void check_curl()
{
return mgr;
}
-
-int RGWFrontendConfig::parse_config(const string& config, map<string, string>& config_map)
-{
- list<string> config_list;
- get_str_list(config, " ", config_list);
-
- list<string>::iterator iter;
- for (iter = config_list.begin(); iter != config_list.end(); ++iter) {
- string& entry = *iter;
- string key;
- string val;
-
- if (framework.empty()) {
- framework = entry;
- dout(0) << "framework: " << framework << dendl;
- continue;
- }
-
- ssize_t pos = entry.find('=');
- if (pos < 0) {
- dout(0) << "framework conf key: " << entry << dendl;
- config_map[entry] = "";
- continue;
- }
-
- int ret = parse_key_value(entry, key, val);
- if (ret < 0) {
- cerr << "ERROR: can't parse " << entry << std::endl;
- return ret;
- }
-
- dout(0) << "framework conf key: " << key << ", val: " << val << dendl;
- config_map[key] = val;
- }
-
- return 0;
-}
-
-
-bool RGWFrontendConfig::get_val(const string& key, const string& def_val, string *out)
-{
- map<string, string>::iterator iter = config_map.find(key);
- if (iter == config_map.end()) {
- *out = def_val;
- return false;
- }
-
- *out = iter->second;
- return true;
-}
-
-
-bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out)
-{
- string str;
- bool found = get_val(key, "", &str);
- if (!found) {
- *out = def_val;
- return false;
- }
- string err;
- *out = strict_strtol(str.c_str(), 10, &err);
- if (!err.empty()) {
- cerr << "error parsing int: " << str << ": " << err << std::endl;
- return -EINVAL;
- }
- return 0;
-}
-
-class RGWFrontend {
-public:
- virtual ~RGWFrontend() {}
-
- virtual int init() = 0;
-
- virtual int run() = 0;
- virtual void stop() = 0;
- virtual void join() = 0;
-};
-
-class RGWProcessControlThread : public Thread {
- RGWProcess *pprocess;
-public:
- explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
-
- void *entry() {
- pprocess->run();
- return NULL;
- }
-};
-
-class RGWProcessFrontend : public RGWFrontend {
-protected:
- RGWFrontendConfig *conf;
- RGWProcess *pprocess;
- RGWProcessEnv env;
- RGWProcessControlThread *thread;
-
-public:
- RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), pprocess(NULL), env(pe), thread(NULL) {
- }
-
- ~RGWProcessFrontend() {
- delete thread;
- delete pprocess;
- }
-
- int run() {
- assert(pprocess); /* should have initialized by init() */
- thread = new RGWProcessControlThread(pprocess);
- thread->create("rgw_frontend");
- return 0;
- }
-
- void stop() {
- pprocess->close_fd();
- thread->kill(SIGUSR1);
- }
-
- void join() {
- thread->join();
- }
-};
-
-class RGWFCGXFrontend : public RGWProcessFrontend {
-public:
- RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
-
- int init() {
- pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
- return 0;
- }
-};
-
-class RGWLoadGenFrontend : public RGWProcessFrontend {
-public:
- RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
-
- int init() {
- int num_threads;
- conf->get_val("num_threads", g_conf->rgw_thread_pool_size, &num_threads);
- RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env, num_threads, conf);
-
- pprocess = pp;
-
- string uid_str;
- conf->get_val("uid", "", &uid_str);
- if (uid_str.empty()) {
- derr << "ERROR: uid param must be specified for loadgen frontend" << dendl;
- return EINVAL;
- }
-
- rgw_user uid(uid_str);
-
- RGWUserInfo user_info;
- int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
- if (ret < 0) {
- derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl;
- return ret;
- }
-
- map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
- if (aiter == user_info.access_keys.end()) {
- derr << "ERROR: user has no S3 access keys set" << dendl;
- return -EINVAL;
- }
-
- pp->set_access_key(aiter->second);
-
- return 0;
- }
-};
-
-class RGWMongooseFrontend : public RGWFrontend {
- RGWFrontendConfig *conf;
- struct mg_context *ctx;
- RGWProcessEnv env;
-
- void set_conf_default(map<string, string>& m, const string& key, const string& def_val) {
- if (m.find(key) == m.end()) {
- m[key] = def_val;
- }
- }
-
-public:
- RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), ctx(NULL), env(pe) {
- }
-
- int init() {
- return 0;
- }
-
- int run() {
- char thread_pool_buf[32];
- snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
- string port_str;
- map<string, string> conf_map = conf->get_config_map();
- conf->get_val("port", "80", &port_str);
- conf_map.erase("port");
- conf_map["listening_ports"] = port_str;
- set_conf_default(conf_map, "enable_keep_alive", "yes");
- set_conf_default(conf_map, "num_threads", thread_pool_buf);
- set_conf_default(conf_map, "decode_url", "no");
-
- const char *options[conf_map.size() * 2 + 1];
- int i = 0;
- for (map<string, string>::iterator iter = conf_map.begin(); iter != conf_map.end(); ++iter) {
- options[i] = iter->first.c_str();
- options[i + 1] = iter->second.c_str();
- dout(20)<< "civetweb config: " << options[i] << ": " << (options[i + 1] ? options[i + 1] : "<null>") << dendl;
- i += 2;
- }
- options[i] = NULL;
-
- struct mg_callbacks cb;
- memset((void *)&cb, 0, sizeof(cb));
- cb.begin_request = civetweb_callback;
- cb.log_message = rgw_civetweb_log_callback;
- cb.log_access = rgw_civetweb_log_access_callback;
- ctx = mg_start(&cb, &env, (const char **)&options);
-
- if (!ctx) {
- return -EIO;
- }
-
- return 0;
- }
-
- void stop() {
- if (ctx) {
- mg_stop(ctx);
- }
- }
-
- void join() {
- }
-};
-
/*
* start up the RADOS connection and then handle HTTP messages as they come in
*/
if (apis_map.count("swift") > 0) {
do_swift = true;
swift_init(g_ceph_context);
- rest.register_resource(g_conf->rgw_swift_url_prefix, set_logging(new RGWRESTMgr_SWIFT));
+ rest.register_resource(g_conf->rgw_swift_url_prefix,
+ set_logging(new RGWRESTMgr_SWIFT));
}
if (apis_map.count("swift_auth") > 0)
- rest.register_resource(g_conf->rgw_swift_auth_entry, set_logging(new RGWRESTMgr_SWIFT_Auth));
+ rest.register_resource(g_conf->rgw_swift_auth_entry,
+ set_logging(new RGWRESTMgr_SWIFT_Auth));
if (apis_map.count("admin") > 0) {
RGWRESTMgr_Admin *admin_resource = new RGWRESTMgr_Admin;
if (frontends.empty()) {
frontends.push_back("fastcgi");
}
- for (list<string>::iterator iter = frontends.begin(); iter != frontends.end(); ++iter) {
+ for (list<string>::iterator iter = frontends.begin();
+ iter != frontends.end(); ++iter) {
string& f = *iter;
if (f.find("civetweb") != string::npos) {
list<RGWFrontend *> fes;
- for (multimap<string, RGWFrontendConfig *>::iterator fiter = fe_map.begin(); fiter != fe_map.end(); ++fiter) {
+ for (multimap<string, RGWFrontendConfig *>::iterator fiter = fe_map.begin();
+ fiter != fe_map.end(); ++fiter) {
RGWFrontendConfig *config = fiter->second;
string framework = config->get_framework();
RGWFrontend *fe;
derr << "shutting down" << dendl;
- for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end(); ++liter) {
+ for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end();
+ ++liter) {
RGWFrontend *fe = *liter;
fe->stop();
}
- for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end(); ++liter) {
+ for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end();
+ ++liter) {
RGWFrontend *fe = *liter;
fe->join();
delete fe;
}
- for (list<RGWFrontendConfig *>::iterator liter = configs.begin(); liter != configs.end(); ++liter) {
+ for (list<RGWFrontendConfig *>::iterator liter = configs.begin();
+ liter != configs.end(); ++liter) {
RGWFrontendConfig *fec = *liter;
delete fec;
}
return 0;
}
-
}
}
- op_ret = rgw_bucket_sync_user_stats(store, s->user.user_id, s->bucket);
+ op_ret = rgw_bucket_sync_user_stats(store, s->user->user_id, s->bucket);
if ( op_ret < 0) {
ldout(s->cct, 1) << "WARNING: failed to sync user stats before bucket delete: op_ret= " << op_ret << dendl;
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/errno.h"
+#include "common/Throttle.h"
+#include "common/WorkQueue.h"
+
+#include "rgw_rados.h"
+#include "rgw_rest.h"
+#include "rgw_frontend.h"
+#include "rgw_request.h"
+#include "rgw_process.h"
+#include "rgw_loadgen.h"
+#include "rgw_client_io.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+void RGWProcess::RGWWQ::_dump_queue()
+{
+ if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
+ return;
+ }
+ deque<RGWRequest *>::iterator iter;
+ if (process->m_req_queue.empty()) {
+ dout(20) << "RGWWQ: empty" << dendl;
+ return;
+ }
+ dout(20) << "RGWWQ:" << dendl;
+ for (iter = process->m_req_queue.begin();
+ iter != process->m_req_queue.end(); ++iter) {
+ dout(20) << "req: " << hex << *iter << dec << dendl;
+ }
+} /* RGWProcess::RGWWQ::_dump_queue */
+
+void RGWFCGXProcess::run()
+{
+ string socket_path;
+ string socket_port;
+ string socket_host;
+
+ conf->get_val("socket_path", "", &socket_path);
+ conf->get_val("socket_port", g_conf->rgw_port, &socket_port);
+ conf->get_val("socket_host", g_conf->rgw_host, &socket_host);
+
+ if (socket_path.empty() && socket_port.empty() && socket_host.empty()) {
+ socket_path = g_conf->rgw_socket_path;
+ if (socket_path.empty()) {
+ dout(0) << "ERROR: no socket server point defined, cannot "
+ "start fcgi frontend" << dendl;
+ return;
+ }
+ }
+
+ if (!socket_path.empty()) {
+ string path_str = socket_path;
+
+ /* this is necessary, as FCGX_OpenSocket might not return an
+ * error, but rather ungracefully exit */
+ int fd = open(path_str.c_str(), O_CREAT, 0644);
+ if (fd < 0) {
+ int err = errno;
+ /* ENXIO is actually expected, we'll get that if we try to open
+ * a unix domain socket */
+ if (err != ENXIO) {
+ dout(0) << "ERROR: cannot create socket: path=" << path_str
+ << " error=" << cpp_strerror(err) << dendl;
+ return;
+ }
+ } else {
+ close(fd);
+ }
+
+ const char *path = path_str.c_str();
+ sock_fd = FCGX_OpenSocket(path, SOCKET_BACKLOG);
+ if (sock_fd < 0) {
+ dout(0) << "ERROR: FCGX_OpenSocket (" << path << ") returned "
+ << sock_fd << dendl;
+ return;
+ }
+ if (chmod(path, 0777) < 0) {
+ dout(0) << "WARNING: couldn't set permissions on unix domain socket"
+ << dendl;
+ }
+ } else if (!socket_port.empty()) {
+ string bind = socket_host + ":" + socket_port;
+ sock_fd = FCGX_OpenSocket(bind.c_str(), SOCKET_BACKLOG);
+ if (sock_fd < 0) {
+ dout(0) << "ERROR: FCGX_OpenSocket (" << bind.c_str() << ") returned "
+ << sock_fd << dendl;
+ return;
+ }
+ }
+
+ m_tp.start();
+
+ FCGX_Request fcgx_reqs[max_connections];
+
+ QueueRing<FCGX_Request*> qr(max_connections);
+ for (int i = 0; i < max_connections; i++) {
+ FCGX_Request* fcgx = &fcgx_reqs[i];
+ FCGX_InitRequest(fcgx, sock_fd, 0);
+ qr.enqueue(fcgx);
+ }
+
+ for (;;) {
+ RGWFCGXRequest* req = new RGWFCGXRequest(store->get_new_req_id(), &qr);
+ dout(10) << "allocated request req=" << hex << req << dec << dendl;
+ req_throttle.get(1);
+ int ret = FCGX_Accept_r(req->fcgx);
+ if (ret < 0) {
+ delete req;
+ dout(0) << "ERROR: FCGX_Accept_r returned " << ret << dendl;
+ req_throttle.put(1);
+ break;
+ }
+ req_wq.queue(req);
+ }
+
+ m_tp.drain(&req_wq);
+ m_tp.stop();
+
+ dout(20) << "cleaning up fcgx connections" << dendl;
+
+ for (int i = 0; i < max_connections; i++) {
+ FCGX_Finish_r(&fcgx_reqs[i]);
+ }
+} /* RGWFCGXProcess::run */
+
+void RGWFCGXProcess::handle_request(RGWRequest *r)
+{
+ RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
+ FCGX_Request *fcgx = req->fcgx;
+ RGWFCGX client_io(fcgx);
+
+
+ int ret = process_request(store, rest, req, &client_io, olog);
+ if (ret < 0) {
+ /* we don't really care about return code */
+ dout(20) << "process_request() returned " << ret << dendl;
+ }
+
+ FCGX_Finish_r(fcgx);
+
+ delete req;
+}
+
+void RGWFCGXProcess::handle_request(RGWRequest *r)
+{
+ RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
+ FCGX_Request *fcgx = req->fcgx;
+ RGWFCGX client_io(fcgx);
+
+
+ int ret = process_request(store, rest, req, &client_io, olog);
+ if (ret < 0) {
+ /* we don't really care about return code */
+ dout(20) << "process_request() returned " << ret << dendl;
+ }
+
+ FCGX_Finish_r(fcgx);
+
+ delete req;
+} /* RGWFCGXProcess::handle_request */
+
+void RGWLoadGenProcess::checkpoint()
+{
+ m_tp.drain(&req_wq);
+}
+
+void RGWLoadGenProcess::run()
+{
+ m_tp.start(); /* start thread pool */
+
+ int i;
+
+ int num_objs;
+
+ conf->get_val("num_objs", 1000, &num_objs);
+
+ int num_buckets;
+ conf->get_val("num_buckets", 1, &num_buckets);
+
+ vector<string> buckets(num_buckets);
+
+ atomic_t failed;
+
+ for (i = 0; i < num_buckets; i++) {
+ buckets[i] = "/loadgen";
+ string& bucket = buckets[i];
+ append_rand_alpha(NULL, bucket, bucket, 16);
+
+ /* first create a bucket */
+ gen_request("PUT", bucket, 0, &failed);
+ checkpoint();
+ }
+
+ string *objs = new string[num_objs];
+
+ if (failed.read()) {
+ derr << "ERROR: bucket creation failed" << dendl;
+ goto done;
+ }
+
+ for (i = 0; i < num_objs; i++) {
+ char buf[16 + 1];
+ gen_rand_alphanumeric(NULL, buf, sizeof(buf));
+ buf[16] = '\0';
+ objs[i] = buckets[i % num_buckets] + "/" + buf;
+ }
+
+ for (i = 0; i < num_objs; i++) {
+ gen_request("PUT", objs[i], 4096, &failed);
+ }
+
+ checkpoint();
+
+ if (failed.read()) {
+ derr << "ERROR: bucket creation failed" << dendl;
+ goto done;
+ }
+
+ for (i = 0; i < num_objs; i++) {
+ gen_request("GET", objs[i], 4096, NULL);
+ }
+
+ checkpoint();
+
+ for (i = 0; i < num_objs; i++) {
+ gen_request("DELETE", objs[i], 0, NULL);
+ }
+
+ checkpoint();
+
+ for (i = 0; i < num_buckets; i++) {
+ gen_request("DELETE", buckets[i], 0, NULL);
+ }
+
+done:
+ checkpoint();
+
+ m_tp.stop();
+
+ delete[] objs;
+
+ signal_shutdown();
+} /* RGWLoadGenProcess::run() */
+
+void RGWLoadGenProcess::gen_request(const string& method,
+ const string& resource,
+ int content_length, atomic_t* fail_flag)
+{
+ RGWLoadGenRequest* req =
+ new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
+ content_length, fail_flag);
+ dout(10) << "allocated request req=" << hex << req << dec << dendl;
+ req_throttle.get(1);
+ req_wq.queue(req);
+} /* RGWLoadGenProcess::gen_request */
+
+void RGWLoadGenProcess::handle_request(RGWRequest *r)
+{
+ RGWLoadGenRequest *req = static_cast<RGWLoadGenRequest *>(r);
+
+ RGWLoadGenRequestEnv env;
+
+ utime_t tm = ceph_clock_now(NULL);
+
+ env.port = 80;
+ env.content_length = req->content_length;
+ env.content_type = "binary/octet-stream";
+ env.request_method = req->method;
+ env.uri = req->resource;
+ env.set_date(tm);
+ env.sign(access_key);
+
+ RGWLoadGenIO client_io(&env);
+
+ int ret = process_request(store, rest, req, &client_io, olog);
+ if (ret < 0) {
+ /* we don't really care about return code */
+ dout(20) << "process_request() returned " << ret << dendl;
+
+ if (req->fail_flag) {
+ req->fail_flag->inc();
+ }
+ }
+
+ delete req;
+} /* RGWLoadGenProcess::handle_request */
+
+int process_request(RGWRados* store, RGWREST* rest, RGWRequest* req,
+ RGWClientIO* client_io, OpsLogSocket* olog)
+{
+ int ret = 0;
+
+ client_io->init(g_ceph_context);
+
+ req->log_init();
+
+ dout(1) << "====== starting new request req=" << hex << req << dec
+ << " =====" << dendl;
+ perfcounter->inc(l_rgw_req);
+
+ RGWEnv& rgw_env = client_io->get_env();
+
+ struct req_state rstate(g_ceph_context, &rgw_env);
+
+ struct req_state *s = &rstate;
+
+ RGWObjectCtx rados_ctx(store, s);
+ s->obj_ctx = &rados_ctx;
+
+ s->req_id = store->unique_id(req->id);
+ s->trans_id = store->unique_trans_id(req->id);
+ s->host_id = store->host_id;
+
+ req->log_format(s, "initializing for trans_id = %s", s->trans_id.c_str());
+
+ RGWOp *op = NULL;
+ int init_error = 0;
+ bool should_log = false;
+ RGWRESTMgr *mgr;
+ RGWHandler *handler = rest->get_handler(store, s, client_io, &mgr,
+ &init_error);
+ if (init_error != 0) {
+ abort_early(s, NULL, init_error, NULL);
+ goto done;
+ }
+ dout(10) << "handler=" << typeid(*handler).name() << dendl;
+
+ should_log = mgr->get_logging();
+
+ req->log_format(s, "getting op %d", s->op);
+ op = handler->get_op(store);
+ if (!op) {
+ abort_early(s, NULL, -ERR_METHOD_NOT_ALLOWED, handler);
+ goto done;
+ }
+ req->op = op;
+ dout(10) << "op=" << typeid(*op).name() << dendl;
+
+ req->log(s, "authorizing");
+ ret = handler->authorize();
+ if (ret < 0) {
+ dout(10) << "failed to authorize request" << dendl;
+ abort_early(s, NULL, ret, handler);
+ goto done;
+ }
+
+ req->log(s, "normalizing buckets and tenants");
+ ret = handler->postauth_init();
+ if (ret < 0) {
+ dout(10) << "failed to run post-auth init" << dendl;
+ abort_early(s, op, ret, handler);
+ goto done;
+ }
+
+ if (s->user.suspended) {
+ dout(10) << "user is suspended, uid=" << s->user.user_id << dendl;
+ abort_early(s, op, -ERR_USER_SUSPENDED, handler);
+ goto done;
+ }
+
+ req->log(s, "init permissions");
+ ret = handler->init_permissions(op);
+ if (ret < 0) {
+ abort_early(s, op, ret, handler);
+ goto done;
+ }
+
+ /**
+ * Only some accesses support website mode, and website mode does NOT apply
+ * if you are using the REST endpoint either (ergo, no authenticated access)
+ */
+ req->log(s, "recalculating target");
+ ret = handler->retarget(op, &op);
+ if (ret < 0) {
+ abort_early(s, op, ret, handler);
+ goto done;
+ }
+ req->op = op;
+
+ req->log(s, "reading permissions");
+ ret = handler->read_permissions(op);
+ if (ret < 0) {
+ abort_early(s, op, ret, handler);
+ goto done;
+ }
+
+ req->log(s, "init op");
+ ret = op->init_processing();
+ if (ret < 0) {
+ abort_early(s, op, ret, handler);
+ goto done;
+ }
+
+ req->log(s, "verifying op mask");
+ ret = op->verify_op_mask();
+ if (ret < 0) {
+ abort_early(s, op, ret, handler);
+ goto done;
+ }
+
+ req->log(s, "verifying op permissions");
+ ret = op->verify_permission();
+ if (ret < 0) {
+ if (s->system_request) {
+ dout(2) << "overriding permissions due to system operation" << dendl;
+ } else {
+ abort_early(s, op, ret, handler);
+ goto done;
+ }
+ }
+
+ req->log(s, "verifying op params");
+ ret = op->verify_params();
+ if (ret < 0) {
+ abort_early(s, op, ret, handler);
+ goto done;
+ }
+
+ req->log(s, "pre-executing");
+ op->pre_exec();
+
+ req->log(s, "executing");
+ op->execute();
+
+ req->log(s, "completing");
+ op->complete();
+done:
+ int r = client_io->complete_request();
+ if (r < 0) {
+ dout(0) << "ERROR: client_io->complete_request() returned " << r << dendl;
+ }
+ if (should_log) {
+ rgw_log_op(store, s, (op ? op->name() : "unknown"), olog);
+ }
+
+ int http_ret = s->err.http_ret;
+
+ int op_ret = 0;
+ if (op) {
+ op_ret = op->get_ret();
+ }
+
+ req->log_format(s, "op status=%d", op_ret);
+ req->log_format(s, "http status=%d", http_ret);
+
+ if (handler)
+ handler->put_op(op);
+ rest->put_handler(handler);
+
+ dout(1) << "====== req done req=" << hex << req << dec
+ << " op status=" << op_ret
+ << " http_status=" << http_ret
+ << " ======"
+ << dendl;
+
+ return (ret < 0 ? ret : s->err.ret);
+} /* process_request */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_PROCESS_H
+#define RGW_PROCESS_H
+
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "rgw_acl.h"
+#include "rgw_user.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+
+#include "common/WorkQueue.h"
+#include "common/Throttle.h"
+
+#if !defined(dout_subsys)
+#define dout_subsys ceph_subsys_rgw
+#define def_dout_subsys
+#endif
+
+#define SOCKET_BACKLOG 1024
+
+extern void signal_shutdown();
+
+struct RGWProcessEnv {
+ RGWRados *store;
+ RGWREST *rest;
+ OpsLogSocket *olog;
+ int port;
+};
+
+class RGWFrontendConfig;
+
+class RGWProcess {
+ deque<RGWRequest*> m_req_queue;
+protected:
+ RGWRados* store;
+ OpsLogSocket* olog;
+ ThreadPool m_tp;
+ Throttle req_throttle;
+ RGWREST* rest;
+ RGWFrontendConfig* conf;
+ int sock_fd;
+
+ struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
+ RGWProcess* process;
+ RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
+ : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
+ tp), process(p) {}
+
+ bool _enqueue(RGWRequest* req) {
+ process->m_req_queue.push_back(req);
+ perfcounter->inc(l_rgw_qlen);
+ dout(20) << "enqueued request req=" << hex << req << dec << dendl;
+ _dump_queue();
+ return true;
+ }
+
+ void _dequeue(RGWRequest* req) {
+ assert(0);
+ }
+
+ bool _empty() {
+ return process->m_req_queue.empty();
+ }
+
+ RGWRequest* _dequeue() {
+ if (process->m_req_queue.empty())
+ return NULL;
+ RGWRequest *req = process->m_req_queue.front();
+ process->m_req_queue.pop_front();
+ dout(20) << "dequeued request req=" << hex << req << dec << dendl;
+ _dump_queue();
+ perfcounter->inc(l_rgw_qlen, -1);
+ return req;
+ }
+
+ using ThreadPool::WorkQueue<RGWRequest>::_process;
+
+ void _process(RGWRequest *req, ThreadPool::TPHandle &) override {
+ perfcounter->inc(l_rgw_qactive);
+ process->handle_request(req);
+ process->req_throttle.put(1);
+ perfcounter->inc(l_rgw_qactive, -1);
+ }
+
+ void _dump_queue();
+
+ void _clear() {
+ assert(process->m_req_queue.empty());
+ }
+ } req_wq;
+
+public:
+ RGWProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
+ RGWFrontendConfig* _conf)
+ : store(pe->store), olog(pe->olog),
+ m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
+ req_throttle(cct, "rgw_ops", num_threads * 2),
+ rest(pe->rest),
+ conf(_conf),
+ sock_fd(-1),
+ req_wq(this, g_conf->rgw_op_thread_timeout,
+ g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
+
+ virtual ~RGWProcess() {}
+
+ virtual void run() = 0;
+ virtual void handle_request(RGWRequest *req) = 0;
+
+ void close_fd() {
+ if (sock_fd >= 0) {
+ ::close(sock_fd);
+ sock_fd = -1;
+ }
+ }
+}; /* RGWProcess */
+
+class RGWFCGXProcess : public RGWProcess {
+ int max_connections;
+public:
+
+ /* have a bit more connections than threads so that requests are
+ * still accepted even if we're still processing older requests */
+ RGWFCGXProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
+ RGWFrontendConfig* _conf)
+ : RGWProcess(cct, pe, num_threads, _conf),
+ max_connections(num_threads + (num_threads >> 3))
+ {}
+
+ void run();
+ void handle_request(RGWRequest* req);
+};
+
+class RGWProcessControlThread : public Thread {
+ RGWProcess *pprocess;
+public:
+ RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
+
+ void *entry() {
+ pprocess->run();
+ return NULL;
+ }
+};
+
+class RGWLoadGenProcess : public RGWProcess {
+ RGWAccessKey access_key;
+public:
+ RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
+ RGWFrontendConfig* _conf) :
+ RGWProcess(cct, pe, num_threads, _conf) {}
+ void run();
+ void checkpoint();
+ void handle_request(RGWRequest* req);
+ void gen_request(const string& method, const string& resource,
+ int content_length, atomic_t* fail_flag);
+
+ void set_access_key(RGWAccessKey& key) { access_key = key; }
+};
+
+int process_request(RGWRados* store, RGWREST* rest, RGWRequest* req,
+ RGWClientIO* client_io, OpsLogSocket* olog);
+
+#if defined(def_dout_subsys)
+#undef def_dout_subsys
+#undef dout_subsys
+#endif
+
+#endif /* RGW_PROCESS_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_op.h"
+#include "rgw_request.h"
+
+#define dout_subsys ceph_subsys_auth
+
+/* XXX */
+void RGWRequest::log_format(struct req_state *s, const char *fmt, ...)
+{
+#define LARGE_SIZE 1024
+ char buf[LARGE_SIZE];
+ va_list ap;
+
+ va_start(ap, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+
+ log(s, buf);
+} /* RGWRequest::log_format */
+
+void RGWRequest::log_init() {
+ ts = ceph_clock_now(g_ceph_context);
+}
+
+void RGWRequest::log(struct req_state *s, const char *msg) {
+ if (s->info.method && req_str.size() == 0) {
+ req_str = s->info.method;
+ req_str.append(" ");
+ req_str.append(s->info.request_uri);
+ }
+ utime_t t = ceph_clock_now(g_ceph_context) - ts;
+ dout(2) << "req " << id << ":" << t << ":" << s->dialect << ":"
+ << req_str << ":" << (op ? op->name() : "") << ":" << msg
+ << dendl;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_REQUEST_H
+#define RGW_REQUEST_H
+
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "rgw_acl.h"
+#include "rgw_user.h"
+#include "rgw_op.h"
+#include "rgw_fcgi.h"
+
+#include "common/QueueRing.h"
+
+struct RGWRequest
+{
+ uint64_t id;
+ struct req_state *s;
+ string req_str;
+ RGWOp *op;
+ utime_t ts;
+
+ explicit RGWRequest(uint64_t id) : id(id), s(NULL), op(NULL) {}
+
+ virtual ~RGWRequest() {}
+
+ void init_state(req_state *_s) {
+ s = _s;
+ }
+
+ void log_format(struct req_state *s, const char *fmt, ...);
+ void log_init();
+ void log(struct req_state *s, const char *msg);
+}; /* RGWRequest */
+
+struct RGWFCGXRequest : public RGWRequest {
+ FCGX_Request *fcgx;
+ QueueRing<FCGX_Request *> *qr;
+
+ RGWFCGXRequest(uint64_t req_id, QueueRing<FCGX_Request *> *_qr)
+ : RGWRequest(req_id), qr(_qr) {
+ qr->dequeue(&fcgx);
+ }
+
+ ~RGWFCGXRequest() {
+ FCGX_Finish_r(fcgx);
+ qr->enqueue(fcgx);
+ }
+};
+
+struct RGWLoadGenRequest : public RGWRequest {
+ string method;
+ string resource;
+ int content_length;
+ atomic_t* fail_flag;
+
+RGWLoadGenRequest(uint64_t req_id, const string& _m, const string& _r, int _cl,
+ atomic_t *ff)
+ : RGWRequest(req_id), method(_m), resource(_r), content_length(_cl),
+ fail_flag(ff) {}
+};
+
+#endif /* RGW_REQUEST_H */