]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: refactor rgw_main
authorMatt Benjamin <mbenjamin@redhat.com>
Fri, 8 May 2015 18:07:01 +0000 (20:07 +0200)
committerMatt Benjamin <mbenjamin@redhat.com>
Fri, 12 Feb 2016 16:54:17 +0000 (11:54 -0500)
This change is an extended version of work by Orit Wasserman
moving some request-processing class headers and definition to
their own files (RGWProcess, RGWProcessEnv, RGWProcessFrontend),
along with process_request.

I have taken the refactoring further, so as to remove all class
declarations and member definitions from rgw_main.cc, so that the
file retains just:

* the responsibility to parse arguments and supervise the daemon
* the responsibility to select from front-ends/configure the RGW service
* usage

I have made small adjustments:
* moving some processing out of headers, to remove circular deps
* removed unused headers from rgw_main.cc
* added editor boilerplate and tweaked include guards in some files

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
src/CMakeLists.txt
src/rgw/Makefile.am
src/rgw/rgw_fcgi.h
src/rgw/rgw_frontend.cc [new file with mode: 0644]
src/rgw/rgw_frontend.h [new file with mode: 0644]
src/rgw/rgw_main.cc
src/rgw/rgw_op.cc
src/rgw/rgw_process.cc [new file with mode: 0644]
src/rgw/rgw_process.h [new file with mode: 0644]
src/rgw/rgw_request.cc [new file with mode: 0644]
src/rgw/rgw_request.h [new file with mode: 0644]

index ff86860390802537e5b99b154561d889e26c7867..39f77a3934f35f56981a59201b85af14e78a1def 100644 (file)
@@ -1166,6 +1166,10 @@ if(${WITH_RADOSGW})
     rgw/rgw_keystone.cc
     rgw/rgw_quota.cc
     rgw/rgw_dencoder.cc
+    rgw/rgw_dencoder.cc
+    rgw/rgw_request.cc
+    rgw/rgw_process.cc
+    rgw/rgw_frontend.cc
     rgw/rgw_object_expirer_core.cc
     rgw/rgw_website.cc
     rgw/rgw_xml_enc.cc)
index 0fcc93df2b6f946a9d73d74211cf0b758367eded..d2bb9ac0e4944edda400260d5c6c190c59e1e629 100644 (file)
@@ -49,8 +49,12 @@ librgw_la_SOURCES =  \
        rgw/rgw_keystone.cc \
        rgw/rgw_quota.cc \
        rgw/rgw_dencoder.cc \
+       rgw/rgw_request.cc \
+       rgw/rgw_process.cc \
+       rgw/rgw_frontend.cc \
        rgw/rgw_object_expirer_core.cc \
        rgw/rgw_website.cc
+
 librgw_la_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS}
 noinst_LTLIBRARIES += librgw.la
 
@@ -144,6 +148,9 @@ noinst_HEADERS += \
        rgw/rgw_http_errors.h \
        rgw/rgw_log.h \
        rgw/rgw_loadgen.h \
+       rgw/rgw_process.h \
+       rgw/rgw_request.h \
+       rgw/rgw_frontend.h \
        rgw/rgw_multi.h \
        rgw/rgw_policy_s3.h \
        rgw/rgw_gc.h \
index 92171f4780d236bbe08dc59ea8dcdf2403a790b6..9a585c817fb9b532febccd6f997e65dd02547ec0 100644 (file)
@@ -4,8 +4,13 @@
 #ifndef CEPH_RGW_FCGI_H
 #define CEPH_RGW_FCGI_H
 
-#include "rgw_client_io.h"
+#ifdef FASTCGI_INCLUDE_DIR
+# include "fastcgi/fcgiapp.h"
+#else
+# include "fcgiapp.h"
+#endif
 
+#include "rgw_client_io.h"
 
 struct FCGX_Request;
 
diff --git a/src/rgw/rgw_frontend.cc b/src/rgw/rgw_frontend.cc
new file mode 100644 (file)
index 0000000..860a09b
--- /dev/null
@@ -0,0 +1,137 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_frontend.h"
+
+#include "include/str_list.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+static int civetweb_callback(struct mg_connection *conn) {
+  struct mg_request_info *req_info = mg_get_request_info(conn);
+  RGWProcessEnv *pe = static_cast<RGWProcessEnv *>(req_info->user_data);
+  RGWRados *store = pe->store;
+  RGWREST *rest = pe->rest;
+  OpsLogSocket *olog = pe->olog;
+
+  RGWRequest *req = new RGWRequest(store->get_new_req_id());
+  RGWMongoose client_io(conn, pe->port);
+
+  int ret = process_request(store, rest, req, &client_io, olog);
+  if (ret < 0) {
+    /* we don't really care about return code */
+    dout(20) << "process_request() returned " << ret << dendl;
+  }
+
+  delete req;
+
+// Mark as processed
+  return 1;
+}
+
+int RGWFrontendConfig::parse_config(const string& config,
+                                   map<string, string>& config_map)
+{
+  list<string> config_list;
+  get_str_list(config, " ", config_list);
+
+  list<string>::iterator iter;
+  for (iter = config_list.begin(); iter != config_list.end(); ++iter) {
+    string& entry = *iter;
+    string key;
+    string val;
+
+    if (framework.empty()) {
+      framework = entry;
+      dout(0) << "framework: " << framework << dendl;
+      continue;
+    }
+
+    ssize_t pos = entry.find('=');
+    if (pos < 0) {
+      dout(0) << "framework conf key: " << entry << dendl;
+      config_map[entry] = "";
+      continue;
+    }
+
+    int ret = parse_key_value(entry, key, val);
+    if (ret < 0) {
+      cerr << "ERROR: can't parse " << entry << std::endl;
+      return ret;
+    }
+
+    dout(0) << "framework conf key: " << key << ", val: " << val << dendl;
+    config_map[key] = val;
+  }
+
+  return 0;
+}
+
+bool RGWFrontendConfig::get_val(const string& key, const string& def_val,
+                               string *out)
+{
+ map<string, string>::iterator iter = config_map.find(key);
+ if (iter == config_map.end()) {
+   *out = def_val;
+   return false;
+ }
+
+ *out = iter->second;
+ return true;
+}
+
+bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out)
+{
+  string str;
+  bool found = get_val(key, "", &str);
+  if (!found) {
+    *out = def_val;
+    return false;
+  }
+  string err;
+  *out = strict_strtol(str.c_str(), 10, &err);
+  if (!err.empty()) {
+    cerr << "error parsing int: " << str << ": " << err << std::endl;
+    return -EINVAL;
+  }
+  return 0;
+}
+
+int RGWMongooseFrontend::run() {
+  char thread_pool_buf[32];
+  snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d",
+          (int)g_conf->rgw_thread_pool_size);
+  string port_str;
+  map<string, string> conf_map = conf->get_config_map();
+  conf->get_val("port", "80", &port_str);
+  conf_map.erase("port");
+  conf_map["listening_ports"] = port_str;
+  set_conf_default(conf_map, "enable_keep_alive", "yes");
+  set_conf_default(conf_map, "num_threads", thread_pool_buf);
+  set_conf_default(conf_map, "decode_url", "no");
+
+  const char *options[conf_map.size() * 2 + 1];
+  int i = 0;
+  for (map<string, string>::iterator iter = conf_map.begin();
+       iter != conf_map.end(); ++iter) {
+    options[i] = iter->first.c_str();
+    options[i + 1] = iter->second.c_str();
+    dout(20)<< "civetweb config: " << options[i] << ": "
+           << (options[i + 1] ? options[i + 1] : "<null>") << dendl;
+    i += 2;
+  }
+  options[i] = NULL;
+
+  struct mg_callbacks cb;
+  memset((void *)&cb, 0, sizeof(cb));
+  cb.begin_request = civetweb_callback;
+  cb.log_message = rgw_civetweb_log_callback;
+  cb.log_access = rgw_civetweb_log_access_callback;
+  ctx = mg_start(&cb, &env, (const char **)&options);
+
+  if (!ctx) {
+    return -EIO;
+  }
+
+  return 0;
+} /* RGWMongooseFrontend::run */
diff --git a/src/rgw/rgw_frontend.h b/src/rgw/rgw_frontend.h
new file mode 100644 (file)
index 0000000..836e1e7
--- /dev/null
@@ -0,0 +1,170 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_FRONTEND_H
+#define RGW_FRONTEND_H
+
+#include "rgw_request.h"
+#include "rgw_process.h"
+
+#include "rgw_civetweb.h"
+#include "rgw_civetweb_log.h"
+#include "civetweb/civetweb.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+class RGWFrontendConfig {
+  string config;
+  map<string, string> config_map;
+  int parse_config(const string& config, map<string, string>& config_map);
+  string framework;
+public:
+  RGWFrontendConfig(const string& _conf) : config(_conf) {}
+  int init() {
+    int ret = parse_config(config, config_map);
+    if (ret < 0)
+      return ret;
+    return 0;
+  }
+  bool get_val(const string& key, const string& def_val, string *out);
+  bool get_val(const string& key, int def_val, int *out);
+
+  map<string, string>& get_config_map() { return config_map; }
+
+  string get_framework() { return framework; }
+};
+
+class RGWFrontend {
+public:
+  virtual ~RGWFrontend() {}
+
+  virtual int init() = 0;
+
+  virtual int run() = 0;
+  virtual void stop() = 0;
+  virtual void join() = 0;
+};
+
+class RGWMongooseFrontend : public RGWFrontend {
+  RGWFrontendConfig* conf;
+  struct mg_context* ctx;
+  RGWProcessEnv env;
+
+  void set_conf_default(map<string, string>& m, const string& key,
+                       const string& def_val) {
+    if (m.find(key) == m.end()) {
+      m[key] = def_val;
+    }
+  }
+
+public:
+  RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
+    : conf(_conf), ctx(nullptr), env(pe) {
+  }
+
+  int init() {
+    return 0;
+  }
+
+  int run();
+
+  void stop() {
+    if (ctx) {
+      mg_stop(ctx);
+    }
+  }
+
+  void join() {
+  }
+}; /* RGWMongooseFrontend */
+
+class RGWProcessFrontend : public RGWFrontend {
+protected:
+  RGWFrontendConfig* conf;
+  RGWProcess* pprocess;
+  RGWProcessEnv env;
+  RGWProcessControlThread* thread;
+
+public:
+  RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
+    : conf(_conf), pprocess(nullptr), env(pe), thread(nullptr) {
+  }
+
+  ~RGWProcessFrontend() {
+    delete thread;
+    delete pprocess;
+  }
+
+  int run() {
+    assert(pprocess); /* should have initialized by init() */
+    thread = new RGWProcessControlThread(pprocess);
+    thread->create("rgw_frontend");
+    return 0;
+  }
+
+  void stop() {
+    pprocess->close_fd();
+    thread->kill(SIGUSR1);
+  }
+
+  void join() {
+    thread->join();
+  }
+}; /* RGWProcessFrontend */
+
+class RGWFCGXFrontend : public RGWProcessFrontend {
+public:
+  RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig* _conf)
+    : RGWProcessFrontend(pe, _conf) {}
+
+  int init() {
+    pprocess = new RGWFCGXProcess(g_ceph_context, &env,
+                                 g_conf->rgw_thread_pool_size, conf);
+    return 0;
+  }
+}; /* RGWFCGXFrontend */
+
+class RGWLoadGenFrontend : public RGWProcessFrontend {
+public:
+  RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf)
+    : RGWProcessFrontend(pe, _conf) {}
+
+  int init() {
+    int num_threads;
+    conf->get_val("num_threads", g_conf->rgw_thread_pool_size, &num_threads);
+    RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env,
+                                                 num_threads, conf);
+
+    pprocess = pp;
+
+    string uid_str;
+    conf->get_val("uid", "", &uid_str);
+    if (uid_str.empty()) {
+      derr << "ERROR: uid param must be specified for loadgen frontend"
+          << dendl;
+      return EINVAL;
+    }
+
+    rgw_user uid(uid_str);
+
+    RGWUserInfo user_info;
+    int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
+    if (ret < 0) {
+      derr << "ERROR: failed reading user info: uid=" << uid << " ret="
+          << ret << dendl;
+      return ret;
+    }
+
+    map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
+    if (aiter == user_info.access_keys.end()) {
+      derr << "ERROR: user has no S3 access keys set" << dendl;
+      return -EINVAL;
+    }
+
+    pp->set_access_key(aiter->second);
+
+    return 0;
+  }
+}; /* RGWLoadGenFrontend */
+
+#endif /* RGW_FRONTEND_H */
index 6666ef267a30a0f4c72c01ce80b6143e370d116d..6162baf92e13e04d96acc592acce22214561e35d 100644 (file)
 #include <curl/curl.h>
 
 #include "acconfig.h"
-#ifdef FASTCGI_INCLUDE_DIR
-# include "fastcgi/fcgiapp.h"
-#else
-# include "fcgiapp.h"
-#endif
-
-#include "rgw_fcgi.h"
 
 #include "common/ceph_argparse.h"
 #include "global/global_init.h"
 #include "global/signal_handler.h"
 #include "common/config.h"
 #include "common/errno.h"
-#include "common/WorkQueue.h"
 #include "common/Timer.h"
-#include "common/Throttle.h"
-#include "common/QueueRing.h"
 #include "common/safe_io.h"
 #include "include/compat.h"
 #include "include/str_list.h"
 #include "rgw_common.h"
 #include "rgw_rados.h"
-#include "rgw_acl.h"
 #include "rgw_user.h"
-#include "rgw_op.h"
 #include "rgw_rest.h"
 #include "rgw_rest_s3.h"
 #include "rgw_rest_swift.h"
 #include "rgw_log.h"
 #include "rgw_tools.h"
 #include "rgw_resolve.h"
-#include "rgw_loadgen.h"
-#include "rgw_civetweb.h"
-#include "rgw_civetweb_log.h"
 
-#include "civetweb/civetweb.h"
+#include "rgw_request.h"
+#include "rgw_process.h"
+#include "rgw_frontend.h"
 
 #include <map>
 #include <string>
 #include <vector>
-#include <iostream>
-#include <sstream>
 
 #include "include/types.h"
 #include "common/BackTrace.h"
@@ -83,409 +68,15 @@ class RGWProcess;
 static int signal_fd[2] = {0, 0};
 static atomic_t disable_signal_fd;
 
-static void signal_shutdown();
-
-
-#define SOCKET_BACKLOG 1024
-
-struct RGWRequest
-{
-  uint64_t id;
-  struct req_state *s;
-  string req_str;
-  RGWOp *op;
-  utime_t ts;
-
-  explicit RGWRequest(uint64_t id) : id(id), s(NULL), op(NULL) {
-  }
-
-  virtual ~RGWRequest() {}
-
-  void init_state(req_state *_s) {
-    s = _s;
-  }
-
-  void log_format(struct req_state *s, const char *fmt, ...)
-  {
-#define LARGE_SIZE 1024
-    char buf[LARGE_SIZE];
-    va_list ap;
-
-    va_start(ap, fmt);
-    vsnprintf(buf, sizeof(buf), fmt, ap);
-    va_end(ap);
-
-    log(s, buf);
-  }
-
-  void log_init() {
-    ts = ceph_clock_now(g_ceph_context);
-  }
-
-  void log(struct req_state *s, const char *msg) {
-    if (s->info.method && req_str.size() == 0) {
-      req_str = s->info.method;
-      req_str.append(" ");
-      req_str.append(s->info.request_uri);
-    }
-    utime_t t = ceph_clock_now(g_ceph_context) - ts;
-    dout(2) << "req " << id << ":" << t << ":" << s->dialect << ":" << req_str << ":" << (op ? op->name() : "") << ":" << msg << dendl;
-  }
-};
-
-class RGWFrontendConfig {
-  string config;
-  map<string, string> config_map;
-  int parse_config(const string& config, map<string, string>& config_map);
-  string framework;
-public:
-  explicit RGWFrontendConfig(const string& _conf) : config(_conf) {}
-  int init() {
-    int ret = parse_config(config, config_map);
-    if (ret < 0)
-      return ret;
-    return 0;
-  }
-  bool get_val(const string& key, const string& def_val, string *out);
-  bool get_val(const string& key, int def_val, int *out);
-
-  map<string, string>& get_config_map() { return config_map; }
-
-  string get_framework() { return framework; }
-};
-
-
-struct RGWFCGXRequest : public RGWRequest {
-  FCGX_Request *fcgx;
-  QueueRing<FCGX_Request *> *qr;
-
-  RGWFCGXRequest(uint64_t req_id, QueueRing<FCGX_Request *> *_qr) : RGWRequest(req_id), qr(_qr) {
-    qr->dequeue(&fcgx);
-  }
-
-  ~RGWFCGXRequest() {
-    FCGX_Finish_r(fcgx);
-    qr->enqueue(fcgx);
-  }
-};
-
-struct RGWProcessEnv {
-  RGWRados *store;
-  RGWREST *rest;
-  OpsLogSocket *olog;
-  int port;
-};
-
-class RGWProcess {
-  deque<RGWRequest *> m_req_queue;
-protected:
-  RGWRados *store;
-  OpsLogSocket *olog;
-  ThreadPool m_tp;
-  Throttle req_throttle;
-  RGWREST *rest;
-  RGWFrontendConfig *conf;
-  int sock_fd;
-
-  struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
-    RGWProcess *process;
-    RGWWQ(RGWProcess *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
-      : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout, tp), process(p) {}
-
-    bool _enqueue(RGWRequest *req) {
-      process->m_req_queue.push_back(req);
-      perfcounter->inc(l_rgw_qlen);
-      dout(20) << "enqueued request req=" << hex << req << dec << dendl;
-      _dump_queue();
-      return true;
-    }
-    void _dequeue(RGWRequest *req) {
-      assert(0);
-    }
-    bool _empty() {
-      return process->m_req_queue.empty();
-    }
-    RGWRequest *_dequeue() {
-      if (process->m_req_queue.empty())
-       return NULL;
-      RGWRequest *req = process->m_req_queue.front();
-      process->m_req_queue.pop_front();
-      dout(20) << "dequeued request req=" << hex << req << dec << dendl;
-      _dump_queue();
-      perfcounter->inc(l_rgw_qlen, -1);
-      return req;
-    }
-    void _process(RGWRequest *req, ThreadPool::TPHandle &) override {
-      perfcounter->inc(l_rgw_qactive);
-      process->handle_request(req);
-      process->req_throttle.put(1);
-      perfcounter->inc(l_rgw_qactive, -1);
-    }
-    void _dump_queue() {
-      if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
-        return;
-      }
-      deque<RGWRequest *>::iterator iter;
-      if (process->m_req_queue.empty()) {
-        dout(20) << "RGWWQ: empty" << dendl;
-        return;
-      }
-      dout(20) << "RGWWQ:" << dendl;
-      for (iter = process->m_req_queue.begin(); iter != process->m_req_queue.end(); ++iter) {
-        dout(20) << "req: " << hex << *iter << dec << dendl;
-      }
-    }
-    void _clear() {
-      assert(process->m_req_queue.empty());
-    }
-  } req_wq;
-
-public:
-  RGWProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf)
-    : store(pe->store), olog(pe->olog), m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
-      req_throttle(cct, "rgw_ops", num_threads * 2),
-      rest(pe->rest),
-      conf(_conf),
-      sock_fd(-1),
-      req_wq(this, g_conf->rgw_op_thread_timeout,
-            g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
-  virtual ~RGWProcess() {}
-  virtual void run() = 0;
-  virtual void handle_request(RGWRequest *req) = 0;
-
-  void close_fd() {
-    if (sock_fd >= 0) {
-      ::close(sock_fd);
-      sock_fd = -1;
-    }
-  }
-};
-
-
-class RGWFCGXProcess : public RGWProcess {
-  int max_connections;
-public:
-  RGWFCGXProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
-    RGWProcess(cct, pe, num_threads, _conf),
-    max_connections(num_threads + (num_threads >> 3)) /* have a bit more connections than threads so that requests
-                                                       are still accepted even if we're still processing older requests */
-    {}
-  void run();
-  void handle_request(RGWRequest *req);
-};
-
-void RGWFCGXProcess::run()
-{
-  string socket_path;
-  string socket_port;
-  string socket_host;
-
-  conf->get_val("socket_path", "", &socket_path);
-  conf->get_val("socket_port", g_conf->rgw_port, &socket_port);
-  conf->get_val("socket_host", g_conf->rgw_host, &socket_host);
-
-  if (socket_path.empty() && socket_port.empty() && socket_host.empty()) {
-    socket_path = g_conf->rgw_socket_path;
-    if (socket_path.empty()) {
-      dout(0) << "ERROR: no socket server point defined, cannot start fcgi frontend" << dendl;
-      return;
-    }
-  }
-
-  if (!socket_path.empty()) {
-    string path_str = socket_path;
-
-    /* this is necessary, as FCGX_OpenSocket might not return an error, but rather ungracefully exit */
-    int fd = open(path_str.c_str(), O_CREAT, 0644);
-    if (fd < 0) {
-      int err = errno;
-      /* ENXIO is actually expected, we'll get that if we try to open a unix domain socket */
-      if (err != ENXIO) {
-        dout(0) << "ERROR: cannot create socket: path=" << path_str << " error=" << cpp_strerror(err) << dendl;
-        return;
-      }
-    } else {
-      close(fd);
-    }
-
-    const char *path = path_str.c_str();
-    sock_fd = FCGX_OpenSocket(path, SOCKET_BACKLOG);
-    if (sock_fd < 0) {
-      dout(0) << "ERROR: FCGX_OpenSocket (" << path << ") returned " << sock_fd << dendl;
-      return;
-    }
-    if (chmod(path, 0777) < 0) {
-      dout(0) << "WARNING: couldn't set permissions on unix domain socket" << dendl;
-    }
-  } else if (!socket_port.empty()) {
-    string bind = socket_host + ":" + socket_port;
-    sock_fd = FCGX_OpenSocket(bind.c_str(), SOCKET_BACKLOG);
-    if (sock_fd < 0) {
-      dout(0) << "ERROR: FCGX_OpenSocket (" << bind.c_str() << ") returned " << sock_fd << dendl;
-      return;
-    }
-  }
-
-  m_tp.start();
-
-  FCGX_Request fcgx_reqs[max_connections];
-
-  QueueRing<FCGX_Request *> qr(max_connections);
-  for (int i = 0; i < max_connections; i++) {
-    FCGX_Request *fcgx = &fcgx_reqs[i];
-    FCGX_InitRequest(fcgx, sock_fd, 0);
-    qr.enqueue(fcgx);
-  }
-
-  for (;;) {
-    RGWFCGXRequest *req = new RGWFCGXRequest(store->get_new_req_id(), &qr);
-    dout(10) << "allocated request req=" << hex << req << dec << dendl;
-    req_throttle.get(1);
-    int ret = FCGX_Accept_r(req->fcgx);
-    if (ret < 0) {
-      delete req;
-      dout(0) << "ERROR: FCGX_Accept_r returned " << ret << dendl;
-      req_throttle.put(1);
-      break;
-    }
-
-    req_wq.queue(req);
-  }
-
-  m_tp.drain(&req_wq);
-  m_tp.stop();
-
-  dout(20) << "cleaning up fcgx connections" << dendl;
-
-  for (int i = 0; i < max_connections; i++) {
-    FCGX_Finish_r(&fcgx_reqs[i]);
-  }
-}
-
-struct RGWLoadGenRequest : public RGWRequest {
-  string method;
-  string resource;
-  int content_length;
-  atomic_t *fail_flag;
-
-
-  RGWLoadGenRequest(uint64_t req_id, const string& _m, const  string& _r, int _cl,
-                    atomic_t *ff) : RGWRequest(req_id), method(_m), resource(_r), content_length(_cl), fail_flag(ff) {}
-};
-
-class RGWLoadGenProcess : public RGWProcess {
-  RGWAccessKey access_key;
-public:
-  RGWLoadGenProcess(CephContext *cct, RGWProcessEnv *pe, int num_threads, RGWFrontendConfig *_conf) :
-    RGWProcess(cct, pe, num_threads, _conf) {}
-  void run();
-  void checkpoint();
-  void handle_request(RGWRequest *req);
-  void gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag);
-
-  void set_access_key(RGWAccessKey& key) { access_key = key; }
-};
-
-void RGWLoadGenProcess::checkpoint()
-{
-  m_tp.drain(&req_wq);
-}
-
-void RGWLoadGenProcess::run()
-{
-  m_tp.start(); /* start thread pool */
-
-  int i;
-
-  int num_objs;
-
-  conf->get_val("num_objs", 1000, &num_objs);
-
-  int num_buckets;
-  conf->get_val("num_buckets", 1, &num_buckets);
-
-  vector<string> buckets(num_buckets);
-
-  atomic_t failed;
-
-  for (i = 0; i < num_buckets; i++) {
-    buckets[i] = "/loadgen";
-    string& bucket = buckets[i];
-    append_rand_alpha(NULL, bucket, bucket, 16);
-
-    /* first create a bucket */
-    gen_request("PUT", bucket, 0, &failed);
-    checkpoint();
-  }
-
-  string *objs = new string[num_objs];
-
-  if (failed.read()) {
-    derr << "ERROR: bucket creation failed" << dendl;
-    goto done;
-  }
-
-  for (i = 0; i < num_objs; i++) {
-    char buf[16 + 1];
-    gen_rand_alphanumeric(NULL, buf, sizeof(buf));
-    buf[16] = '\0';
-    objs[i] = buckets[i % num_buckets] + "/" + buf;
-  }
-
-  for (i = 0; i < num_objs; i++) {
-    gen_request("PUT", objs[i], 4096, &failed);
-  }
-
-  checkpoint();
-
-  if (failed.read()) {
-    derr << "ERROR: bucket creation failed" << dendl;
-    goto done;
-  }
-
-  for (i = 0; i < num_objs; i++) {
-    gen_request("GET", objs[i], 4096, NULL);
-  }
-
-  checkpoint();
-
-  for (i = 0; i < num_objs; i++) {
-    gen_request("DELETE", objs[i], 0, NULL);
-  }
-
-  checkpoint();
-
-  for (i = 0; i < num_buckets; i++) {
-    gen_request("DELETE", buckets[i], 0, NULL);
-  }
-
-done:
-  checkpoint();
-
-  m_tp.stop();
-
-  delete[] objs;
-
-  signal_shutdown();
-}
-
-void RGWLoadGenProcess::gen_request(const string& method, const string& resource, int content_length, atomic_t *fail_flag)
-{
-  RGWLoadGenRequest *req = new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
-                                                content_length, fail_flag);
-  dout(10) << "allocated request req=" << hex << req << dec << dendl;
-  req_throttle.get(1);
-  req_wq.queue(req);
-}
-
-static void signal_shutdown()
+void signal_shutdown()
 {
   if (!disable_signal_fd.read()) {
     int val = 0;
     int ret = write(signal_fd[0], (char *)&val, sizeof(val));
     if (ret < 0) {
       int err = -errno;
-      derr << "ERROR: " << __func__ << ": write() returned " << cpp_strerror(-err) << dendl;
+      derr << "ERROR: " << __func__ << ": write() returned "
+          << cpp_strerror(-err) << dendl;
     }
   }
 }
@@ -535,246 +126,6 @@ static void godown_alarm(int signum)
   _exit(0);
 }
 
-static int process_request(RGWRados *store, RGWREST *rest, RGWRequest *req, RGWClientIO *client_io, OpsLogSocket *olog)
-{
-  int ret = 0;
-
-  client_io->init(g_ceph_context);
-
-  req->log_init();
-
-  dout(1) << "====== starting new request req=" << hex << req << dec << " =====" << dendl;
-  perfcounter->inc(l_rgw_req);
-
-  RGWEnv& rgw_env = client_io->get_env();
-
-  struct req_state rstate(g_ceph_context, &rgw_env);
-
-  struct req_state *s = &rstate;
-
-  RGWObjectCtx rados_ctx(store, s);
-  s->obj_ctx = &rados_ctx;
-
-  s->req_id = store->unique_id(req->id);
-  s->trans_id = store->unique_trans_id(req->id);
-  s->host_id = store->host_id;
-
-  req->log_format(s, "initializing for trans_id = %s", s->trans_id.c_str());
-
-  RGWOp *op = NULL;
-  int init_error = 0;
-  bool should_log = false;
-  RGWRESTMgr *mgr;
-  RGWHandler *handler = rest->get_handler(store, s, client_io, &mgr, &init_error);
-  if (init_error != 0) {
-    abort_early(s, NULL, init_error, NULL);
-    goto done;
-  }
-  dout(10) << "handler=" << typeid(*handler).name() << dendl;
-
-  should_log = mgr->get_logging();
-
-  req->log_format(s, "getting op %d", s->op);
-  op = handler->get_op(store);
-  if (!op) {
-    abort_early(s, NULL, -ERR_METHOD_NOT_ALLOWED, handler);
-    goto done;
-  }
-  req->op = op;
-  dout(10) << "op=" << typeid(*op).name() << dendl;
-
-  req->log(s, "authorizing");
-  ret = handler->authorize();
-  if (ret < 0) {
-    dout(10) << "failed to authorize request" << dendl;
-    abort_early(s, NULL, ret, handler);
-    goto done;
-  }
-
-  req->log(s, "normalizing buckets and tenants");
-  ret = handler->postauth_init();
-  if (ret < 0) {
-    dout(10) << "failed to run post-auth init" << dendl;
-    abort_early(s, op, ret, handler);
-    goto done;
-  }
-
-  if (s->user.suspended) {
-    dout(10) << "user is suspended, uid=" << s->user.user_id << dendl;
-    abort_early(s, op, -ERR_USER_SUSPENDED, handler);
-    goto done;
-  }
-
-  req->log(s, "init permissions");
-  ret = handler->init_permissions(op);
-  if (ret < 0) {
-    abort_early(s, op, ret, handler);
-    goto done;
-  }
-
-  /**
-   * Only some accesses support website mode, and website mode does NOT apply
-   * if you are using the REST endpoint either (ergo, no authenticated access)
-   */
-  req->log(s, "recalculating target");
-  ret = handler->retarget(op, &op);
-  if (ret < 0) {
-    abort_early(s, op, ret, handler);
-    goto done;
-  }
-  req->op = op;
-
-  req->log(s, "reading permissions");
-  ret = handler->read_permissions(op);
-  if (ret < 0) {
-    abort_early(s, op, ret, handler);
-    goto done;
-  }
-
-  req->log(s, "init op");
-  ret = op->init_processing();
-  if (ret < 0) {
-    abort_early(s, op, ret, handler);
-    goto done;
-  }
-
-  req->log(s, "verifying op mask");
-  ret = op->verify_op_mask();
-  if (ret < 0) {
-    abort_early(s, op, ret, handler);
-    goto done;
-  }
-
-  req->log(s, "verifying op permissions");
-  ret = op->verify_permission();
-  if (ret < 0) {
-    if (s->system_request) {
-      dout(2) << "overriding permissions due to system operation" << dendl;
-    } else {
-      abort_early(s, op, ret, handler);
-      goto done;
-    }
-  }
-
-  req->log(s, "verifying op params");
-  ret = op->verify_params();
-  if (ret < 0) {
-    abort_early(s, op, ret, handler);
-    goto done;
-  }
-
-  req->log(s, "pre-executing");
-  op->pre_exec();
-
-  req->log(s, "executing");
-  op->execute();
-
-  req->log(s, "completing");
-  op->complete();
-done:
-  int r = client_io->complete_request();
-  if (r < 0) {
-    dout(0) << "ERROR: client_io->complete_request() returned " << r << dendl;
-  }
-  if (should_log) {
-    rgw_log_op(store, s, (op ? op->name() : "unknown"), olog);
-  }
-
-  int http_ret = s->err.http_ret;
-
-  int op_ret = 0;
-  if (op) {
-    op_ret = op->get_ret();
-  }
-
-  req->log_format(s, "op status=%d", op_ret);
-  req->log_format(s, "http status=%d", http_ret);
-
-  if (handler)
-    handler->put_op(op);
-  rest->put_handler(handler);
-
-  dout(1) << "====== req done req=" << hex << req << dec
-         << " op status=" << op_ret
-         << " http_status=" << http_ret
-         << " ======"
-         << dendl;
-
-  return (ret < 0 ? ret : s->err.ret);
-}
-
-void RGWFCGXProcess::handle_request(RGWRequest *r)
-{
-  RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
-  FCGX_Request *fcgx = req->fcgx;
-  RGWFCGX client_io(fcgx);
-
-  int ret = process_request(store, rest, req, &client_io, olog);
-  if (ret < 0) {
-    /* we don't really care about return code */
-    dout(20) << "process_request() returned " << ret << dendl;
-  }
-
-  FCGX_Finish_r(fcgx);
-
-  delete req;
-}
-
-void RGWLoadGenProcess::handle_request(RGWRequest *r)
-{
-  RGWLoadGenRequest *req = static_cast<RGWLoadGenRequest *>(r);
-
-  RGWLoadGenRequestEnv env;
-
-  utime_t tm = ceph_clock_now(NULL);
-
-  env.port = 80;
-  env.content_length = req->content_length;
-  env.content_type = "binary/octet-stream";
-  env.request_method = req->method;
-  env.uri = req->resource;
-  env.set_date(tm);
-  env.sign(access_key);
-
-  RGWLoadGenIO client_io(&env);
-
-  int ret = process_request(store, rest, req, &client_io, olog);
-  if (ret < 0) {
-    /* we don't really care about return code */
-    dout(20) << "process_request() returned " << ret << dendl;
-
-    if (req->fail_flag) {
-      req->fail_flag->inc();
-    }
-  }
-
-  delete req;
-}
-
-
-static int civetweb_callback(struct mg_connection *conn) {
-  struct mg_request_info *req_info = mg_get_request_info(conn);
-  RGWProcessEnv *pe = static_cast<RGWProcessEnv *>(req_info->user_data);
-  RGWRados *store = pe->store;
-  RGWREST *rest = pe->rest;
-  OpsLogSocket *olog = pe->olog;
-
-  RGWRequest *req = new RGWRequest(store->get_new_req_id());
-  RGWMongoose client_io(conn, pe->port);
-
-  int ret = process_request(store, rest, req, &client_io, olog);
-  if (ret < 0) {
-    /* we don't really care about return code */
-    dout(20) << "process_request() returned " << ret << dendl;
-  }
-
-  delete req;
-
-// Mark as processed
-  return 1;
-}
-
 #ifdef HAVE_CURL_MULTI_WAIT
 static void check_curl()
 {
@@ -818,244 +169,6 @@ static RGWRESTMgr *set_logging(RGWRESTMgr *mgr)
   return mgr;
 }
 
-
-int RGWFrontendConfig::parse_config(const string& config, map<string, string>& config_map)
-{
-  list<string> config_list;
-  get_str_list(config, " ", config_list);
-
-  list<string>::iterator iter;
-  for (iter = config_list.begin(); iter != config_list.end(); ++iter) {
-    string& entry = *iter;
-    string key;
-    string val;
-
-    if (framework.empty()) {
-      framework = entry;
-      dout(0) << "framework: " << framework << dendl;
-      continue;
-    }
-
-    ssize_t pos = entry.find('=');
-    if (pos < 0) {
-      dout(0) << "framework conf key: " << entry << dendl;
-      config_map[entry] = "";
-      continue;
-    }
-
-    int ret = parse_key_value(entry, key, val);
-    if (ret < 0) {
-      cerr << "ERROR: can't parse " << entry << std::endl;
-      return ret;
-    }
-
-    dout(0) << "framework conf key: " << key << ", val: " << val << dendl;
-    config_map[key] = val;
-  }
-
-  return 0;
-}
-
-
-bool RGWFrontendConfig::get_val(const string& key, const string& def_val, string *out)
-{
- map<string, string>::iterator iter = config_map.find(key);
- if (iter == config_map.end()) {
-   *out = def_val;
-   return false;
- }
-
- *out = iter->second;
- return true;
-}
-
-
-bool RGWFrontendConfig::get_val(const string& key, int def_val, int *out)
-{
-  string str;
-  bool found = get_val(key, "", &str);
-  if (!found) {
-    *out = def_val;
-    return false;
-  }
-  string err;
-  *out = strict_strtol(str.c_str(), 10, &err);
-  if (!err.empty()) {
-    cerr << "error parsing int: " << str << ": " << err << std::endl;
-    return -EINVAL;
-  }
-  return 0;
-}
-
-class RGWFrontend {
-public:
-  virtual ~RGWFrontend() {}
-
-  virtual int init() = 0;
-
-  virtual int run() = 0;
-  virtual void stop() = 0;
-  virtual void join() = 0;
-};
-
-class RGWProcessControlThread : public Thread {
-  RGWProcess *pprocess;
-public:
-  explicit RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
-
-  void *entry() {
-    pprocess->run();
-    return NULL;
-  }
-};
-
-class RGWProcessFrontend : public RGWFrontend {
-protected:
-  RGWFrontendConfig *conf;
-  RGWProcess *pprocess;
-  RGWProcessEnv env;
-  RGWProcessControlThread *thread;
-
-public:
-  RGWProcessFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), pprocess(NULL), env(pe), thread(NULL) {
-  }
-
-  ~RGWProcessFrontend() {
-    delete thread;
-    delete pprocess;
-  }
-
-  int run() {
-    assert(pprocess); /* should have initialized by init() */
-    thread = new RGWProcessControlThread(pprocess);
-    thread->create("rgw_frontend");
-    return 0;
-  }
-
-  void stop() {
-    pprocess->close_fd();
-    thread->kill(SIGUSR1);
-  }
-
-  void join() {
-    thread->join();
-  }
-};
-
-class RGWFCGXFrontend : public RGWProcessFrontend {
-public:
-  RGWFCGXFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
-
-  int init() {
-    pprocess = new RGWFCGXProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size, conf);
-    return 0;
-  }
-};
-
-class RGWLoadGenFrontend : public RGWProcessFrontend {
-public:
-  RGWLoadGenFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : RGWProcessFrontend(pe, _conf) {}
-
-  int init() {
-    int num_threads;
-    conf->get_val("num_threads", g_conf->rgw_thread_pool_size, &num_threads);
-    RGWLoadGenProcess *pp = new RGWLoadGenProcess(g_ceph_context, &env, num_threads, conf);
-
-    pprocess = pp;
-
-    string uid_str;
-    conf->get_val("uid", "", &uid_str);
-    if (uid_str.empty()) {
-      derr << "ERROR: uid param must be specified for loadgen frontend" << dendl;
-      return EINVAL;
-    }
-
-    rgw_user uid(uid_str);
-
-    RGWUserInfo user_info;
-    int ret = rgw_get_user_info_by_uid(env.store, uid, user_info, NULL);
-    if (ret < 0) {
-      derr << "ERROR: failed reading user info: uid=" << uid << " ret=" << ret << dendl;
-      return ret;
-    }
-
-    map<string, RGWAccessKey>::iterator aiter = user_info.access_keys.begin();
-    if (aiter == user_info.access_keys.end()) {
-      derr << "ERROR: user has no S3 access keys set" << dendl;
-      return -EINVAL;
-    }
-
-    pp->set_access_key(aiter->second);
-
-    return 0;
-  }
-};
-
-class RGWMongooseFrontend : public RGWFrontend {
-  RGWFrontendConfig *conf;
-  struct mg_context *ctx;
-  RGWProcessEnv env;
-
-  void set_conf_default(map<string, string>& m, const string& key, const string& def_val) {
-    if (m.find(key) == m.end()) {
-      m[key] = def_val;
-    }
-  }
-
-public:
-  RGWMongooseFrontend(RGWProcessEnv& pe, RGWFrontendConfig *_conf) : conf(_conf), ctx(NULL), env(pe) {
-  }
-
-  int init() {
-    return 0;
-  }
-
-  int run() {
-    char thread_pool_buf[32];
-    snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
-    string port_str;
-    map<string, string> conf_map = conf->get_config_map();
-    conf->get_val("port", "80", &port_str);
-    conf_map.erase("port");
-    conf_map["listening_ports"] = port_str;
-    set_conf_default(conf_map, "enable_keep_alive", "yes");
-    set_conf_default(conf_map, "num_threads", thread_pool_buf);
-    set_conf_default(conf_map, "decode_url", "no");
-
-    const char *options[conf_map.size() * 2 + 1];
-    int i = 0;
-    for (map<string, string>::iterator iter = conf_map.begin(); iter != conf_map.end(); ++iter) {
-      options[i] = iter->first.c_str();
-      options[i + 1] = iter->second.c_str();
-      dout(20)<< "civetweb config: " << options[i] << ": " << (options[i + 1] ? options[i + 1] : "<null>") << dendl;
-      i += 2;
-    }
-    options[i] = NULL;
-
-    struct mg_callbacks cb;
-    memset((void *)&cb, 0, sizeof(cb));
-    cb.begin_request = civetweb_callback;
-    cb.log_message = rgw_civetweb_log_callback;
-    cb.log_access = rgw_civetweb_log_access_callback;
-    ctx = mg_start(&cb, &env, (const char **)&options);
-
-    if (!ctx) {
-      return -EIO;
-    }
-
-    return 0;
-  }
-
-  void stop() {
-    if (ctx) {
-      mg_stop(ctx);
-    }
-  }
-
-  void join() {
-  }
-};
-
 /*
  * start up the RADOS connection and then handle HTTP messages as they come in
  */
@@ -1162,11 +275,13 @@ int main(int argc, const char **argv)
   if (apis_map.count("swift") > 0) {
     do_swift = true;
     swift_init(g_ceph_context);
-    rest.register_resource(g_conf->rgw_swift_url_prefix, set_logging(new RGWRESTMgr_SWIFT));
+    rest.register_resource(g_conf->rgw_swift_url_prefix,
+                          set_logging(new RGWRESTMgr_SWIFT));
   }
 
   if (apis_map.count("swift_auth") > 0)
-    rest.register_resource(g_conf->rgw_swift_auth_entry, set_logging(new RGWRESTMgr_SWIFT_Auth));
+    rest.register_resource(g_conf->rgw_swift_auth_entry,
+                          set_logging(new RGWRESTMgr_SWIFT_Auth));
 
   if (apis_map.count("admin") > 0) {
     RGWRESTMgr_Admin *admin_resource = new RGWRESTMgr_Admin;
@@ -1211,7 +326,8 @@ int main(int argc, const char **argv)
   if (frontends.empty()) {
     frontends.push_back("fastcgi");
   }
-  for (list<string>::iterator iter = frontends.begin(); iter != frontends.end(); ++iter) {
+  for (list<string>::iterator iter = frontends.begin();
+       iter != frontends.end(); ++iter) {
     string& f = *iter;
 
     if (f.find("civetweb") != string::npos) {
@@ -1239,7 +355,8 @@ int main(int argc, const char **argv)
 
   list<RGWFrontend *> fes;
 
-  for (multimap<string, RGWFrontendConfig *>::iterator fiter = fe_map.begin(); fiter != fe_map.end(); ++fiter) {
+  for (multimap<string, RGWFrontendConfig *>::iterator fiter = fe_map.begin();
+       fiter != fe_map.end(); ++fiter) {
     RGWFrontendConfig *config = fiter->second;
     string framework = config->get_framework();
     RGWFrontend *fe;
@@ -1280,18 +397,21 @@ int main(int argc, const char **argv)
 
   derr << "shutting down" << dendl;
 
-  for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end(); ++liter) {
+  for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end();
+       ++liter) {
     RGWFrontend *fe = *liter;
     fe->stop();
   }
 
-  for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end(); ++liter) {
+  for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end();
+       ++liter) {
     RGWFrontend *fe = *liter;
     fe->join();
     delete fe;
   }
 
-  for (list<RGWFrontendConfig *>::iterator liter = configs.begin(); liter != configs.end(); ++liter) {
+  for (list<RGWFrontendConfig *>::iterator liter = configs.begin();
+       liter != configs.end(); ++liter) {
     RGWFrontendConfig *fec = *liter;
     delete fec;
   }
@@ -1325,4 +445,3 @@ int main(int argc, const char **argv)
 
   return 0;
 }
-
index e5a6af131be7735e8b56a985a5be42c6bb9e8c50..2ceebaa8059fa3f69461b271b0d58881fe03b13e 100644 (file)
@@ -1878,7 +1878,7 @@ void RGWDeleteBucket::execute()
     }
   }
 
-  op_ret = rgw_bucket_sync_user_stats(store, s->user.user_id, s->bucket);
+  op_ret = rgw_bucket_sync_user_stats(store, s->user->user_id, s->bucket);
   if ( op_ret < 0) {
      ldout(s->cct, 1) << "WARNING: failed to sync user stats before bucket delete: op_ret= " << op_ret << dendl;
   }
diff --git a/src/rgw/rgw_process.cc b/src/rgw/rgw_process.cc
new file mode 100644 (file)
index 0000000..75d1162
--- /dev/null
@@ -0,0 +1,460 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "common/errno.h"
+#include "common/Throttle.h"
+#include "common/WorkQueue.h"
+
+#include "rgw_rados.h"
+#include "rgw_rest.h"
+#include "rgw_frontend.h"
+#include "rgw_request.h"
+#include "rgw_process.h"
+#include "rgw_loadgen.h"
+#include "rgw_client_io.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+void RGWProcess::RGWWQ::_dump_queue()
+{
+  if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
+    return;
+  }
+  deque<RGWRequest *>::iterator iter;
+  if (process->m_req_queue.empty()) {
+    dout(20) << "RGWWQ: empty" << dendl;
+    return;
+  }
+  dout(20) << "RGWWQ:" << dendl;
+  for (iter = process->m_req_queue.begin();
+       iter != process->m_req_queue.end(); ++iter) {
+    dout(20) << "req: " << hex << *iter << dec << dendl;
+  }
+} /* RGWProcess::RGWWQ::_dump_queue */
+
+void RGWFCGXProcess::run()
+{
+  string socket_path;
+  string socket_port;
+  string socket_host;
+
+  conf->get_val("socket_path", "", &socket_path);
+  conf->get_val("socket_port", g_conf->rgw_port, &socket_port);
+  conf->get_val("socket_host", g_conf->rgw_host, &socket_host);
+
+  if (socket_path.empty() && socket_port.empty() && socket_host.empty()) {
+    socket_path = g_conf->rgw_socket_path;
+    if (socket_path.empty()) {
+      dout(0) << "ERROR: no socket server point defined, cannot "
+       "start fcgi frontend" << dendl;
+      return;
+    }
+  }
+
+  if (!socket_path.empty()) {
+    string path_str = socket_path;
+
+    /* this is necessary, as FCGX_OpenSocket might not return an
+        * error, but rather ungracefully exit */
+    int fd = open(path_str.c_str(), O_CREAT, 0644);
+    if (fd < 0) {
+      int err = errno;
+      /* ENXIO is actually expected, we'll get that if we try to open
+          * a unix domain socket */
+      if (err != ENXIO) {
+                 dout(0) << "ERROR: cannot create socket: path=" << path_str
+                                 << " error=" << cpp_strerror(err) << dendl;
+                 return;
+      }
+    } else {
+               close(fd);
+    }
+
+    const char *path = path_str.c_str();
+    sock_fd = FCGX_OpenSocket(path, SOCKET_BACKLOG);
+    if (sock_fd < 0) {
+      dout(0) << "ERROR: FCGX_OpenSocket (" << path << ") returned "
+             << sock_fd << dendl;
+      return;
+    }
+    if (chmod(path, 0777) < 0) {
+      dout(0) << "WARNING: couldn't set permissions on unix domain socket"
+             << dendl;
+    }
+  } else if (!socket_port.empty()) {
+    string bind = socket_host + ":" + socket_port;
+    sock_fd = FCGX_OpenSocket(bind.c_str(), SOCKET_BACKLOG);
+    if (sock_fd < 0) {
+      dout(0) << "ERROR: FCGX_OpenSocket (" << bind.c_str() << ") returned "
+             << sock_fd << dendl;
+      return;
+    }
+  }
+
+  m_tp.start();
+
+  FCGX_Request fcgx_reqs[max_connections];
+
+  QueueRing<FCGX_Request*> qr(max_connections);
+  for (int i = 0; i < max_connections; i++) {
+    FCGX_Request* fcgx = &fcgx_reqs[i];
+    FCGX_InitRequest(fcgx, sock_fd, 0);
+    qr.enqueue(fcgx);
+  }
+
+  for (;;) {
+    RGWFCGXRequest* req = new RGWFCGXRequest(store->get_new_req_id(), &qr);
+    dout(10) << "allocated request req=" << hex << req << dec << dendl;
+    req_throttle.get(1);
+    int ret = FCGX_Accept_r(req->fcgx);
+    if (ret < 0) {
+      delete req;
+      dout(0) << "ERROR: FCGX_Accept_r returned " << ret << dendl;
+      req_throttle.put(1);
+      break;
+    }
+    req_wq.queue(req);
+  }
+
+  m_tp.drain(&req_wq);
+  m_tp.stop();
+
+  dout(20) << "cleaning up fcgx connections" << dendl;
+
+  for (int i = 0; i < max_connections; i++) {
+    FCGX_Finish_r(&fcgx_reqs[i]);
+  }
+} /* RGWFCGXProcess::run */
+
+void RGWFCGXProcess::handle_request(RGWRequest *r)
+{
+  RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
+  FCGX_Request *fcgx = req->fcgx;
+  RGWFCGX client_io(fcgx);
+
+  int ret = process_request(store, rest, req, &client_io, olog);
+  if (ret < 0) {
+    /* we don't really care about return code */
+    dout(20) << "process_request() returned " << ret << dendl;
+  }
+
+  FCGX_Finish_r(fcgx);
+
+  delete req;
+}
+
+void RGWFCGXProcess::handle_request(RGWRequest *r)
+{
+  RGWFCGXRequest *req = static_cast<RGWFCGXRequest *>(r);
+  FCGX_Request *fcgx = req->fcgx;
+  RGWFCGX client_io(fcgx);
+
+  int ret = process_request(store, rest, req, &client_io, olog);
+  if (ret < 0) {
+    /* we don't really care about return code */
+    dout(20) << "process_request() returned " << ret << dendl;
+  }
+
+  FCGX_Finish_r(fcgx);
+
+  delete req;
+} /* RGWFCGXProcess::handle_request */
+
+void RGWLoadGenProcess::checkpoint()
+{
+  m_tp.drain(&req_wq);
+}
+
+void RGWLoadGenProcess::run()
+{
+  m_tp.start(); /* start thread pool */
+
+  int i;
+
+  int num_objs;
+
+  conf->get_val("num_objs", 1000, &num_objs);
+
+  int num_buckets;
+  conf->get_val("num_buckets", 1, &num_buckets);
+
+  vector<string> buckets(num_buckets);
+
+  atomic_t failed;
+
+  for (i = 0; i < num_buckets; i++) {
+    buckets[i] = "/loadgen";
+    string& bucket = buckets[i];
+    append_rand_alpha(NULL, bucket, bucket, 16);
+
+    /* first create a bucket */
+    gen_request("PUT", bucket, 0, &failed);
+    checkpoint();
+  }
+
+  string *objs = new string[num_objs];
+
+  if (failed.read()) {
+    derr << "ERROR: bucket creation failed" << dendl;
+    goto done;
+  }
+
+  for (i = 0; i < num_objs; i++) {
+    char buf[16 + 1];
+    gen_rand_alphanumeric(NULL, buf, sizeof(buf));
+    buf[16] = '\0';
+    objs[i] = buckets[i % num_buckets] + "/" + buf;
+  }
+
+  for (i = 0; i < num_objs; i++) {
+    gen_request("PUT", objs[i], 4096, &failed);
+  }
+
+  checkpoint();
+
+  if (failed.read()) {
+    derr << "ERROR: bucket creation failed" << dendl;
+    goto done;
+  }
+
+  for (i = 0; i < num_objs; i++) {
+    gen_request("GET", objs[i], 4096, NULL);
+  }
+
+  checkpoint();
+
+  for (i = 0; i < num_objs; i++) {
+    gen_request("DELETE", objs[i], 0, NULL);
+  }
+
+  checkpoint();
+
+  for (i = 0; i < num_buckets; i++) {
+    gen_request("DELETE", buckets[i], 0, NULL);
+  }
+
+done:
+  checkpoint();
+
+  m_tp.stop();
+
+  delete[] objs;
+
+  signal_shutdown();
+} /* RGWLoadGenProcess::run() */
+
+void RGWLoadGenProcess::gen_request(const string& method,
+                                   const string& resource,
+                                   int content_length, atomic_t* fail_flag)
+{
+  RGWLoadGenRequest* req =
+    new RGWLoadGenRequest(store->get_new_req_id(), method, resource,
+                         content_length, fail_flag);
+  dout(10) << "allocated request req=" << hex << req << dec << dendl;
+  req_throttle.get(1);
+  req_wq.queue(req);
+} /* RGWLoadGenProcess::gen_request */
+
+void RGWLoadGenProcess::handle_request(RGWRequest *r)
+{
+  RGWLoadGenRequest *req = static_cast<RGWLoadGenRequest *>(r);
+
+  RGWLoadGenRequestEnv env;
+
+  utime_t tm = ceph_clock_now(NULL);
+
+  env.port = 80;
+  env.content_length = req->content_length;
+  env.content_type = "binary/octet-stream";
+  env.request_method = req->method;
+  env.uri = req->resource;
+  env.set_date(tm);
+  env.sign(access_key);
+
+  RGWLoadGenIO client_io(&env);
+
+  int ret = process_request(store, rest, req, &client_io, olog);
+  if (ret < 0) {
+    /* we don't really care about return code */
+    dout(20) << "process_request() returned " << ret << dendl;
+
+    if (req->fail_flag) {
+      req->fail_flag->inc();
+    }
+  }
+
+  delete req;
+} /* RGWLoadGenProcess::handle_request */
+
+int process_request(RGWRados* store, RGWREST* rest, RGWRequest* req,
+                   RGWClientIO* client_io, OpsLogSocket* olog)
+{
+  int ret = 0;
+
+  client_io->init(g_ceph_context);
+
+  req->log_init();
+
+  dout(1) << "====== starting new request req=" << hex << req << dec
+         << " =====" << dendl;
+  perfcounter->inc(l_rgw_req);
+
+  RGWEnv& rgw_env = client_io->get_env();
+
+  struct req_state rstate(g_ceph_context, &rgw_env);
+
+  struct req_state *s = &rstate;
+
+  RGWObjectCtx rados_ctx(store, s);
+  s->obj_ctx = &rados_ctx;
+
+  s->req_id = store->unique_id(req->id);
+  s->trans_id = store->unique_trans_id(req->id);
+  s->host_id = store->host_id;
+
+  req->log_format(s, "initializing for trans_id = %s", s->trans_id.c_str());
+
+  RGWOp *op = NULL;
+  int init_error = 0;
+  bool should_log = false;
+  RGWRESTMgr *mgr;
+  RGWHandler *handler = rest->get_handler(store, s, client_io, &mgr,
+                                         &init_error);
+  if (init_error != 0) {
+    abort_early(s, NULL, init_error, NULL);
+    goto done;
+  }
+  dout(10) << "handler=" << typeid(*handler).name() << dendl;
+
+  should_log = mgr->get_logging();
+
+  req->log_format(s, "getting op %d", s->op);
+  op = handler->get_op(store);
+  if (!op) {
+    abort_early(s, NULL, -ERR_METHOD_NOT_ALLOWED, handler);
+    goto done;
+  }
+  req->op = op;
+  dout(10) << "op=" << typeid(*op).name() << dendl;
+
+  req->log(s, "authorizing");
+  ret = handler->authorize();
+  if (ret < 0) {
+    dout(10) << "failed to authorize request" << dendl;
+    abort_early(s, NULL, ret, handler);
+    goto done;
+  }
+
+  req->log(s, "normalizing buckets and tenants");
+  ret = handler->postauth_init();
+  if (ret < 0) {
+    dout(10) << "failed to run post-auth init" << dendl;
+    abort_early(s, op, ret, handler);
+    goto done;
+  }
+
+  if (s->user.suspended) {
+    dout(10) << "user is suspended, uid=" << s->user.user_id << dendl;
+    abort_early(s, op, -ERR_USER_SUSPENDED, handler);
+    goto done;
+  }
+
+  req->log(s, "init permissions");
+  ret = handler->init_permissions(op);
+  if (ret < 0) {
+    abort_early(s, op, ret, handler);
+    goto done;
+  }
+
+  /**
+   * Only some accesses support website mode, and website mode does NOT apply
+   * if you are using the REST endpoint either (ergo, no authenticated access)
+   */
+  req->log(s, "recalculating target");
+  ret = handler->retarget(op, &op);
+  if (ret < 0) {
+    abort_early(s, op, ret, handler);
+    goto done;
+  }
+  req->op = op;
+
+  req->log(s, "reading permissions");
+  ret = handler->read_permissions(op);
+  if (ret < 0) {
+    abort_early(s, op, ret, handler);
+    goto done;
+  }
+
+  req->log(s, "init op");
+  ret = op->init_processing();
+  if (ret < 0) {
+    abort_early(s, op, ret, handler);
+    goto done;
+  }
+
+  req->log(s, "verifying op mask");
+  ret = op->verify_op_mask();
+  if (ret < 0) {
+    abort_early(s, op, ret, handler);
+    goto done;
+  }
+
+  req->log(s, "verifying op permissions");
+  ret = op->verify_permission();
+  if (ret < 0) {
+    if (s->system_request) {
+      dout(2) << "overriding permissions due to system operation" << dendl;
+    } else {
+      abort_early(s, op, ret, handler);
+      goto done;
+    }
+  }
+
+  req->log(s, "verifying op params");
+  ret = op->verify_params();
+  if (ret < 0) {
+    abort_early(s, op, ret, handler);
+    goto done;
+  }
+
+  req->log(s, "pre-executing");
+  op->pre_exec();
+
+  req->log(s, "executing");
+  op->execute();
+
+  req->log(s, "completing");
+  op->complete();
+done:
+  int r = client_io->complete_request();
+  if (r < 0) {
+    dout(0) << "ERROR: client_io->complete_request() returned " << r << dendl;
+  }
+  if (should_log) {
+    rgw_log_op(store, s, (op ? op->name() : "unknown"), olog);
+  }
+
+  int http_ret = s->err.http_ret;
+
+  int op_ret = 0;
+  if (op) {
+    op_ret = op->get_ret();
+  }
+
+  req->log_format(s, "op status=%d", op_ret);
+  req->log_format(s, "http status=%d", http_ret);
+
+  if (handler)
+    handler->put_op(op);
+  rest->put_handler(handler);
+
+  dout(1) << "====== req done req=" << hex << req << dec
+         << " op status=" << op_ret
+         << " http_status=" << http_ret
+         << " ======"
+         << dendl;
+
+  return (ret < 0 ? ret : s->err.ret);
+} /* process_request */
diff --git a/src/rgw/rgw_process.h b/src/rgw/rgw_process.h
new file mode 100644 (file)
index 0000000..0901131
--- /dev/null
@@ -0,0 +1,170 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_PROCESS_H
+#define RGW_PROCESS_H
+
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "rgw_acl.h"
+#include "rgw_user.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+
+#include "common/WorkQueue.h"
+#include "common/Throttle.h"
+
+#if !defined(dout_subsys)
+#define dout_subsys ceph_subsys_rgw
+#define def_dout_subsys
+#endif
+
+#define SOCKET_BACKLOG 1024
+
+extern void signal_shutdown();
+
+struct RGWProcessEnv {
+  RGWRados *store;
+  RGWREST *rest;
+  OpsLogSocket *olog;
+  int port;
+};
+
+class RGWFrontendConfig;
+
+class RGWProcess {
+  deque<RGWRequest*> m_req_queue;
+protected:
+  RGWRados* store;
+  OpsLogSocket* olog;
+  ThreadPool m_tp;
+  Throttle req_throttle;
+  RGWREST* rest;
+  RGWFrontendConfig* conf;
+  int sock_fd;
+
+  struct RGWWQ : public ThreadPool::WorkQueue<RGWRequest> {
+    RGWProcess* process;
+    RGWWQ(RGWProcess* p, time_t timeout, time_t suicide_timeout, ThreadPool* tp)
+      : ThreadPool::WorkQueue<RGWRequest>("RGWWQ", timeout, suicide_timeout,
+                                         tp), process(p) {}
+
+    bool _enqueue(RGWRequest* req) {
+      process->m_req_queue.push_back(req);
+      perfcounter->inc(l_rgw_qlen);
+      dout(20) << "enqueued request req=" << hex << req << dec << dendl;
+      _dump_queue();
+      return true;
+    }
+
+    void _dequeue(RGWRequest* req) {
+      assert(0);
+    }
+
+    bool _empty() {
+      return process->m_req_queue.empty();
+    }
+
+    RGWRequest* _dequeue() {
+      if (process->m_req_queue.empty())
+       return NULL;
+      RGWRequest *req = process->m_req_queue.front();
+      process->m_req_queue.pop_front();
+      dout(20) << "dequeued request req=" << hex << req << dec << dendl;
+      _dump_queue();
+      perfcounter->inc(l_rgw_qlen, -1);
+      return req;
+    }
+
+    using ThreadPool::WorkQueue<RGWRequest>::_process;
+
+    void _process(RGWRequest *req, ThreadPool::TPHandle &) override  {
+      perfcounter->inc(l_rgw_qactive);
+      process->handle_request(req);
+      process->req_throttle.put(1);
+      perfcounter->inc(l_rgw_qactive, -1);
+    }
+
+    void _dump_queue();
+
+    void _clear() {
+      assert(process->m_req_queue.empty());
+    }
+  } req_wq;
+
+public:
+  RGWProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
+            RGWFrontendConfig* _conf)
+    : store(pe->store), olog(pe->olog),
+      m_tp(cct, "RGWProcess::m_tp", "tp_rgw_process", num_threads),
+      req_throttle(cct, "rgw_ops", num_threads * 2),
+      rest(pe->rest),
+      conf(_conf),
+      sock_fd(-1),
+      req_wq(this, g_conf->rgw_op_thread_timeout,
+            g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
+  
+  virtual ~RGWProcess() {}
+
+  virtual void run() = 0;
+  virtual void handle_request(RGWRequest *req) = 0;
+
+  void close_fd() {
+    if (sock_fd >= 0) {
+      ::close(sock_fd);
+      sock_fd = -1;
+    }
+  }
+}; /* RGWProcess */
+
+class RGWFCGXProcess : public RGWProcess {
+       int max_connections;
+public:
+
+  /* have a bit more connections than threads so that requests are
+   * still accepted even if we're still processing older requests */
+  RGWFCGXProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
+                RGWFrontendConfig* _conf)
+    : RGWProcess(cct, pe, num_threads, _conf),
+      max_connections(num_threads + (num_threads >> 3))
+    {}
+
+  void run();
+  void handle_request(RGWRequest* req);
+};
+
+class RGWProcessControlThread : public Thread {
+  RGWProcess *pprocess;
+public:
+  RGWProcessControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
+
+  void *entry() {
+    pprocess->run();
+    return NULL;
+  }
+};
+
+class RGWLoadGenProcess : public RGWProcess {
+  RGWAccessKey access_key;
+public:
+  RGWLoadGenProcess(CephContext* cct, RGWProcessEnv* pe, int num_threads,
+                 RGWFrontendConfig* _conf) :
+  RGWProcess(cct, pe, num_threads, _conf) {}
+  void run();
+  void checkpoint();
+  void handle_request(RGWRequest* req);
+  void gen_request(const string& method, const string& resource,
+                 int content_length, atomic_t* fail_flag);
+
+  void set_access_key(RGWAccessKey& key) { access_key = key; }
+};
+
+int process_request(RGWRados* store, RGWREST* rest, RGWRequest* req,
+                   RGWClientIO* client_io, OpsLogSocket* olog);
+
+#if defined(def_dout_subsys)
+#undef def_dout_subsys
+#undef dout_subsys
+#endif
+
+#endif /* RGW_PROCESS_H */
diff --git a/src/rgw/rgw_request.cc b/src/rgw/rgw_request.cc
new file mode 100644 (file)
index 0000000..e8e5640
--- /dev/null
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_op.h"
+#include "rgw_request.h"
+
+#define dout_subsys ceph_subsys_auth
+
+/* XXX */
+void RGWRequest::log_format(struct req_state *s, const char *fmt, ...)
+{
+#define LARGE_SIZE 1024
+  char buf[LARGE_SIZE];
+  va_list ap;
+
+  va_start(ap, fmt);
+  vsnprintf(buf, sizeof(buf), fmt, ap);
+  va_end(ap);
+
+  log(s, buf);
+} /* RGWRequest::log_format */
+
+void RGWRequest::log_init() {
+  ts = ceph_clock_now(g_ceph_context);
+}
+
+void RGWRequest::log(struct req_state *s, const char *msg) {
+  if (s->info.method && req_str.size() == 0) {
+    req_str = s->info.method;
+    req_str.append(" ");
+    req_str.append(s->info.request_uri);
+  }
+  utime_t t = ceph_clock_now(g_ceph_context) - ts;
+  dout(2) << "req " << id << ":" << t << ":" << s->dialect << ":"
+         << req_str << ":" << (op ? op->name() : "") << ":" << msg
+         << dendl;
+}
diff --git a/src/rgw/rgw_request.h b/src/rgw/rgw_request.h
new file mode 100644 (file)
index 0000000..29060d0
--- /dev/null
@@ -0,0 +1,64 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef RGW_REQUEST_H
+#define RGW_REQUEST_H
+
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "rgw_acl.h"
+#include "rgw_user.h"
+#include "rgw_op.h"
+#include "rgw_fcgi.h"
+
+#include "common/QueueRing.h"
+
+struct RGWRequest
+{
+  uint64_t id;
+  struct req_state *s;
+  string req_str;
+  RGWOp *op;
+  utime_t ts;
+
+  explicit RGWRequest(uint64_t id) : id(id), s(NULL), op(NULL) {}
+
+  virtual ~RGWRequest() {}
+
+  void init_state(req_state *_s) {
+    s = _s;
+  }
+
+  void log_format(struct req_state *s, const char *fmt, ...);
+  void log_init();
+  void log(struct req_state *s, const char *msg);
+}; /* RGWRequest */
+
+struct RGWFCGXRequest : public RGWRequest {
+  FCGX_Request *fcgx;
+  QueueRing<FCGX_Request *> *qr;
+
+  RGWFCGXRequest(uint64_t req_id, QueueRing<FCGX_Request *> *_qr)
+         : RGWRequest(req_id), qr(_qr) {
+    qr->dequeue(&fcgx);
+  }
+
+  ~RGWFCGXRequest() {
+    FCGX_Finish_r(fcgx);
+    qr->enqueue(fcgx);
+  }
+};
+
+struct RGWLoadGenRequest : public RGWRequest {
+       string method;
+       string resource;
+       int content_length;
+       atomic_t* fail_flag;
+
+RGWLoadGenRequest(uint64_t req_id, const string& _m, const  string& _r, int _cl,
+               atomic_t *ff)
+       : RGWRequest(req_id), method(_m), resource(_r), content_length(_cl),
+               fail_flag(ff) {}
+};
+
+#endif /* RGW_REQUEST_H */