class RGWProcess;
-static RGWProcess *pprocess = NULL;
+int signal_fd[2] = {0, 0};
#define SOCKET_BACKLOG 1024
dout(1) << __func__ << dendl;
FCGX_ShutdownPending();
- // close the fd, so that accept can't start again.
- pprocess->close_fd();
+ int val = 0;
+ write(signal_fd[0], (char *)&val, sizeof(val));
// send a signal to make fcgi's accept(2) wake up. unfortunately the
// initial signal often isn't sufficient because we race with accept's
return mgr;
}
+
+class RGWFrontend {
+public:
+ virtual ~RGWFrontend() {}
+ virtual int run() = 0;
+ virtual void stop() = 0;
+ virtual void join() = 0;
+};
+
+class RGWFCGXControlThread : public Thread {
+ RGWProcess *pprocess;
+public:
+ RGWFCGXControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
+
+ void *entry() {
+ pprocess->run();
+ return NULL;
+ };
+};
+
+class RGWFCGXFrontend : public RGWFrontend {
+ RGWProcess *pprocess;
+ RGWProcessEnv env;
+ RGWFCGXControlThread *thread;
+
+public:
+ RGWFCGXFrontend(RGWProcessEnv& pe) : env(pe) {
+ pprocess = new RGWProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size);
+ thread = new RGWFCGXControlThread(pprocess);
+ }
+
+ ~RGWFCGXFrontend() {
+ delete thread;
+ }
+
+ int run() {
+ thread->create();
+ return 0;
+ }
+
+ void stop() {
+ pprocess->close_fd();
+ }
+
+ void join() {
+ thread->join();
+ }
+};
+
+class RGWMongooseFrontend : public RGWFrontend {
+ struct mg_context *ctx;
+ RGWProcessEnv env;
+ string port;
+
+public:
+ RGWMongooseFrontend(RGWProcessEnv& pe, const string& config) : ctx(NULL), env(pe) {
+ port = config;
+ }
+
+ int run() {
+
+ char thread_pool_buf[32];
+ snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
+ const char *options[] = {"listening_ports", port.c_str(), "enable_keep_alive", "yes", "num_threads", thread_pool_buf, NULL};
+
+ struct mg_callbacks cb;
+ memset((void *)&cb, 0, sizeof(cb));
+ cb.begin_request = mongoose_callback;
+ ctx = mg_start(&cb, &env, (const char **)&options);
+
+ if (!ctx) {
+ return -EIO;
+ }
+
+ return 0;
+ }
+
+ void stop() {
+ if (ctx) {
+ mg_stop(ctx);
+ }
+ }
+
+ void join() {
+ }
+};
+
+
/*
* start up the RADOS connection and then handle HTTP messages as they come in
*/
olog->init(g_conf->rgw_ops_log_socket_path);
}
+ r = socketpair(AF_UNIX, SOCK_STREAM, 0, signal_fd);
+ if (r < 0) {
+ cerr << "radosgw[" << getpid() << "]: unable to create socketpair: " << cpp_strerror(errno) << std::endl;
+ exit(1);
+ }
+
init_async_signal_handler();
register_async_signal_handler(SIGHUP, sighup_handler);
register_async_signal_handler(SIGTERM, handle_sigterm);
get_str_list(g_conf->rgw_frontends, frontends);
- map<string, int> fe_map;
+ map<string, string> fe_map;
for (list<string>::iterator iter = frontends.begin(); iter != frontends.end(); ++iter) {
string& f = *iter;
string framework;
- int port;
+ string config;
+
int pos = f.find (':');
if (pos >= 0) {
framework = f.substr(0, pos);
- string port_str = f.substr(pos + 1);
+ config = f.substr(pos + 1);
+ } else {
+ framework = f;
+ }
+
+ fe_map[framework] = config;
+ }
+
+ if (fe_map.empty()) {
+ fe_map["fastcgi"] = "";
+ }
+
+ list<RGWFrontend *> fes;
+
+ for (map<string, string>::iterator fiter = fe_map.begin(); fiter != fe_map.end(); ++fiter) {
+ const string& framework = fiter->first;
+ const string& config = fiter->second;
+ dout(0) << "handler: " << fiter->first << ":" << config << dendl;
+ RGWFrontend *fe;
+
+ if (framework == "fastcgi" || framework == "fcgi") {
+ RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
+
+ fe = new RGWFCGXFrontend(fcgi_pe);
+ } else if (framework == "mongoose") {
string err;
- port = strict_strtol(port_str.c_str(), 10, &err);
+ int port = strict_strtol(config.c_str(), 10, &err);
if (!err.empty()) {
cerr << "error parsing frontend config for framework " << framework << ": " << err << std::endl;
return EINVAL;
}
+
+ RGWProcessEnv env = { store, &rest, olog, port };
+
+ fe = new RGWMongooseFrontend(env, config);
} else {
- framework = f;
- port = 0;
+ continue;
}
+ fe->run();
- fe_map[framework] = port;
+ fes.push_back(fe);
}
- struct mg_context *mongoose_ctx = NULL;
- RGWProcessEnv *mongoose_pe = NULL;
-
- RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
-
- RGWProcess *pprocess = new RGWProcess(g_ceph_context, &fcgi_pe, g_conf->rgw_thread_pool_size);
-
- for (map<string, int>::iterator fiter = fe_map.begin(); fiter != fe_map.end(); ++fiter) {
- int port = fiter->second;
- dout(0) << "handler: " << fiter->first << ":" << port << dendl;
- if (fiter->first == "fastcgi" || fiter->first == "fcgi") {
- // later
- } else if (fiter->first == "mongoose") {
- mongoose_pe = new RGWProcessEnv;
- *mongoose_pe = { store, &rest, olog, port };
- char port_buf[32];
- snprintf(port_buf, sizeof(port_buf), "%d", port);
-
- char thread_pool_buf[32];
- snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
- const char *options[] = {"listening_ports", port_buf, "enable_keep_alive", "yes", "num_threads", thread_pool_buf, NULL};
-
- struct mg_callbacks cb;
- memset((void *)&cb, 0, sizeof(cb));
- cb.begin_request = mongoose_callback;
- mongoose_ctx = mg_start(&cb, mongoose_pe, (const char **)&options);
- assert(mongoose_ctx);
- }
- }
+ /* FIXME: exact read */
+ int val;
+ read(signal_fd[1], &val, sizeof(val));
- pprocess->run();
+ derr << "shutting down" << dendl;
- if (mongoose_ctx) {
- mg_stop(mongoose_ctx);
+ for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end(); ++liter) {
+ RGWFrontend *fe = *liter;
+ fe->stop();
}
- derr << "shutting down" << dendl;
+ for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end(); ++liter) {
+ RGWFrontend *fe = *liter;
+ fe->join();
+ }
unregister_async_signal_handler(SIGHUP, sighup_handler);
unregister_async_signal_handler(SIGTERM, handle_sigterm);
unregister_async_signal_handler(SIGUSR1, handle_sigterm);
shutdown_async_signal_handler();
- delete pprocess;
- delete mongoose_pe;
-
if (do_swift) {
swift_finalize();
}