From: Yehuda Sadeh Date: Mon, 4 Nov 2013 22:54:19 +0000 (-0800) Subject: rgw: clean up front end configuration X-Git-Tag: v0.75~60^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6d737ce5eb62eb9984f27e18532efef72a03e326;p=ceph.git rgw: clean up front end configuration Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 40fd3c5985fe..6ae958e31295 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -72,7 +72,7 @@ static sighandler_t sighandler_alrm; class RGWProcess; -static RGWProcess *pprocess = NULL; +int signal_fd[2] = {0, 0}; #define SOCKET_BACKLOG 1024 @@ -282,8 +282,8 @@ static void handle_sigterm(int signum) dout(1) << __func__ << dendl; FCGX_ShutdownPending(); - // close the fd, so that accept can't start again. - pprocess->close_fd(); + int val = 0; + write(signal_fd[0], (char *)&val, sizeof(val)); // send a signal to make fcgi's accept(2) wake up. unfortunately the // initial signal often isn't sufficient because we race with accept's @@ -514,6 +514,94 @@ static RGWRESTMgr *set_logging(RGWRESTMgr *mgr) return mgr; } + +class RGWFrontend { +public: + virtual ~RGWFrontend() {} + virtual int run() = 0; + virtual void stop() = 0; + virtual void join() = 0; +}; + +class RGWFCGXControlThread : public Thread { + RGWProcess *pprocess; +public: + RGWFCGXControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {} + + void *entry() { + pprocess->run(); + return NULL; + }; +}; + +class RGWFCGXFrontend : public RGWFrontend { + RGWProcess *pprocess; + RGWProcessEnv env; + RGWFCGXControlThread *thread; + +public: + RGWFCGXFrontend(RGWProcessEnv& pe) : env(pe) { + pprocess = new RGWProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size); + thread = new RGWFCGXControlThread(pprocess); + } + + ~RGWFCGXFrontend() { + delete thread; + } + + int run() { + thread->create(); + return 0; + } + + void stop() { + pprocess->close_fd(); + } + + void join() { + thread->join(); + } +}; + +class RGWMongooseFrontend : public RGWFrontend { + struct mg_context *ctx; + RGWProcessEnv env; + string port; + +public: + RGWMongooseFrontend(RGWProcessEnv& pe, const string& config) : ctx(NULL), env(pe) { + port = config; + } + + int run() { + + char thread_pool_buf[32]; + snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size); + const char *options[] = {"listening_ports", port.c_str(), "enable_keep_alive", "yes", "num_threads", thread_pool_buf, NULL}; + + struct mg_callbacks cb; + memset((void *)&cb, 0, sizeof(cb)); + cb.begin_request = mongoose_callback; + ctx = mg_start(&cb, &env, (const char **)&options); + + if (!ctx) { + return -EIO; + } + + return 0; + } + + void stop() { + if (ctx) { + mg_stop(ctx); + } + } + + void join() { + } +}; + + /* * start up the RADOS connection and then handle HTTP messages as they come in */ @@ -643,6 +731,12 @@ int main(int argc, const char **argv) olog->init(g_conf->rgw_ops_log_socket_path); } + r = socketpair(AF_UNIX, SOCK_STREAM, 0, signal_fd); + if (r < 0) { + cerr << "radosgw[" << getpid() << "]: unable to create socketpair: " << cpp_strerror(errno) << std::endl; + exit(1); + } + init_async_signal_handler(); register_async_signal_handler(SIGHUP, sighup_handler); register_async_signal_handler(SIGTERM, handle_sigterm); @@ -656,68 +750,75 @@ int main(int argc, const char **argv) get_str_list(g_conf->rgw_frontends, frontends); - map fe_map; + map fe_map; for (list::iterator iter = frontends.begin(); iter != frontends.end(); ++iter) { string& f = *iter; string framework; - int port; + string config; + int pos = f.find (':'); if (pos >= 0) { framework = f.substr(0, pos); - string port_str = f.substr(pos + 1); + config = f.substr(pos + 1); + } else { + framework = f; + } + + fe_map[framework] = config; + } + + if (fe_map.empty()) { + fe_map["fastcgi"] = ""; + } + + list fes; + + for (map::iterator fiter = fe_map.begin(); fiter != fe_map.end(); ++fiter) { + const string& framework = fiter->first; + const string& config = fiter->second; + dout(0) << "handler: " << fiter->first << ":" << config << dendl; + RGWFrontend *fe; + + if (framework == "fastcgi" || framework == "fcgi") { + RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 }; + + fe = new RGWFCGXFrontend(fcgi_pe); + } else if (framework == "mongoose") { string err; - port = strict_strtol(port_str.c_str(), 10, &err); + int port = strict_strtol(config.c_str(), 10, &err); if (!err.empty()) { cerr << "error parsing frontend config for framework " << framework << ": " << err << std::endl; return EINVAL; } + + RGWProcessEnv env = { store, &rest, olog, port }; + + fe = new RGWMongooseFrontend(env, config); } else { - framework = f; - port = 0; + continue; } + fe->run(); - fe_map[framework] = port; + fes.push_back(fe); } - struct mg_context *mongoose_ctx = NULL; - RGWProcessEnv *mongoose_pe = NULL; - - RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 }; - - RGWProcess *pprocess = new RGWProcess(g_ceph_context, &fcgi_pe, g_conf->rgw_thread_pool_size); - - for (map::iterator fiter = fe_map.begin(); fiter != fe_map.end(); ++fiter) { - int port = fiter->second; - dout(0) << "handler: " << fiter->first << ":" << port << dendl; - if (fiter->first == "fastcgi" || fiter->first == "fcgi") { - // later - } else if (fiter->first == "mongoose") { - mongoose_pe = new RGWProcessEnv; - *mongoose_pe = { store, &rest, olog, port }; - char port_buf[32]; - snprintf(port_buf, sizeof(port_buf), "%d", port); - - char thread_pool_buf[32]; - snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size); - const char *options[] = {"listening_ports", port_buf, "enable_keep_alive", "yes", "num_threads", thread_pool_buf, NULL}; - - struct mg_callbacks cb; - memset((void *)&cb, 0, sizeof(cb)); - cb.begin_request = mongoose_callback; - mongoose_ctx = mg_start(&cb, mongoose_pe, (const char **)&options); - assert(mongoose_ctx); - } - } + /* FIXME: exact read */ + int val; + read(signal_fd[1], &val, sizeof(val)); - pprocess->run(); + derr << "shutting down" << dendl; - if (mongoose_ctx) { - mg_stop(mongoose_ctx); + for (list::iterator liter = fes.begin(); liter != fes.end(); ++liter) { + RGWFrontend *fe = *liter; + fe->stop(); } - derr << "shutting down" << dendl; + for (list::iterator liter = fes.begin(); liter != fes.end(); ++liter) { + RGWFrontend *fe = *liter; + fe->join(); + } unregister_async_signal_handler(SIGHUP, sighup_handler); unregister_async_signal_handler(SIGTERM, handle_sigterm); @@ -725,9 +826,6 @@ int main(int argc, const char **argv) unregister_async_signal_handler(SIGUSR1, handle_sigterm); shutdown_async_signal_handler(); - delete pprocess; - delete mongoose_pe; - if (do_swift) { swift_finalize(); }