]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: clean up front end configuration
authorYehuda Sadeh <yehuda@inktank.com>
Mon, 4 Nov 2013 22:54:19 +0000 (14:54 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Tue, 5 Nov 2013 18:20:31 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_main.cc

index 40fd3c5985fea5325d9fcb71b7537ba0a4a1b7ef..6ae958e31295d1c0c92be178c9526994a519341f 100644 (file)
@@ -72,7 +72,7 @@ static sighandler_t sighandler_alrm;
 
 class RGWProcess;
 
-static RGWProcess *pprocess = NULL;
+int signal_fd[2] = {0, 0};
 
 
 #define SOCKET_BACKLOG 1024
@@ -282,8 +282,8 @@ static void handle_sigterm(int signum)
   dout(1) << __func__ << dendl;
   FCGX_ShutdownPending();
 
-  // close the fd, so that accept can't start again.
-  pprocess->close_fd();
+  int val = 0;
+  write(signal_fd[0], (char *)&val, sizeof(val));
 
   // send a signal to make fcgi's accept(2) wake up.  unfortunately the
   // initial signal often isn't sufficient because we race with accept's
@@ -514,6 +514,94 @@ static RGWRESTMgr *set_logging(RGWRESTMgr *mgr)
   return mgr;
 }
 
+
+class RGWFrontend {
+public:
+  virtual ~RGWFrontend() {}
+  virtual int run() = 0;
+  virtual void stop() = 0;
+  virtual void join() = 0;
+};
+
+class RGWFCGXControlThread : public Thread {
+  RGWProcess *pprocess;
+public:
+  RGWFCGXControlThread(RGWProcess *_pprocess) : pprocess(_pprocess) {}
+
+  void *entry() {
+    pprocess->run();
+    return NULL;
+  };
+};
+
+class RGWFCGXFrontend : public RGWFrontend {
+  RGWProcess *pprocess;
+  RGWProcessEnv env;
+  RGWFCGXControlThread *thread;
+
+public:
+  RGWFCGXFrontend(RGWProcessEnv& pe) : env(pe) {
+    pprocess = new RGWProcess(g_ceph_context, &env, g_conf->rgw_thread_pool_size);
+    thread = new RGWFCGXControlThread(pprocess);
+  }
+
+  ~RGWFCGXFrontend() {
+    delete thread;
+  }
+
+  int run() {
+    thread->create();
+    return 0;
+  }
+
+  void stop() {
+    pprocess->close_fd();
+  }
+
+  void join() {
+    thread->join();
+  }
+};
+
+class RGWMongooseFrontend : public RGWFrontend {
+  struct mg_context *ctx;
+  RGWProcessEnv env;
+  string port;
+
+public:
+  RGWMongooseFrontend(RGWProcessEnv& pe, const string& config) : ctx(NULL), env(pe) {
+    port = config;
+  }
+
+  int run() {
+
+    char thread_pool_buf[32];
+    snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
+    const char *options[] = {"listening_ports", port.c_str(), "enable_keep_alive", "yes", "num_threads", thread_pool_buf, NULL};
+
+    struct mg_callbacks cb;
+    memset((void *)&cb, 0, sizeof(cb));
+    cb.begin_request = mongoose_callback;
+    ctx = mg_start(&cb, &env, (const char **)&options);
+
+    if (!ctx) {
+      return -EIO;
+    }
+
+    return 0;
+  }
+
+  void stop() {
+    if (ctx) {
+      mg_stop(ctx);
+    }
+  }
+
+  void join() {
+  }
+};
+
+
 /*
  * start up the RADOS connection and then handle HTTP messages as they come in
  */
@@ -643,6 +731,12 @@ int main(int argc, const char **argv)
     olog->init(g_conf->rgw_ops_log_socket_path);
   }
 
+  r = socketpair(AF_UNIX, SOCK_STREAM, 0, signal_fd);
+  if (r < 0) {
+    cerr << "radosgw[" << getpid() << "]: unable to create socketpair: " << cpp_strerror(errno) << std::endl;
+    exit(1);
+  }
+
   init_async_signal_handler();
   register_async_signal_handler(SIGHUP, sighup_handler);
   register_async_signal_handler(SIGTERM, handle_sigterm);
@@ -656,68 +750,75 @@ int main(int argc, const char **argv)
 
   get_str_list(g_conf->rgw_frontends, frontends);
 
-  map<string, int> fe_map;
+  map<string, string> fe_map;
 
   for (list<string>::iterator iter = frontends.begin(); iter != frontends.end(); ++iter) {
     string& f = *iter;
     string framework;
-    int port;
+    string config;
+
     int pos = f.find (':');
     if (pos >= 0) {
       framework = f.substr(0, pos);
-      string port_str = f.substr(pos + 1);
+      config = f.substr(pos + 1);
+    } else {
+      framework = f;
+    }
+
+    fe_map[framework] = config;
+  }
+
+  if (fe_map.empty()) {
+    fe_map["fastcgi"] = "";
+  }
+
+  list<RGWFrontend *> fes;
+
+  for (map<string, string>::iterator fiter = fe_map.begin(); fiter != fe_map.end(); ++fiter) {
+    const string& framework = fiter->first;
+    const string& config = fiter->second;
+    dout(0) << "handler: " << fiter->first << ":" << config << dendl;
+    RGWFrontend *fe;
+
+    if (framework == "fastcgi" || framework == "fcgi") {
+      RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
+
+      fe = new RGWFCGXFrontend(fcgi_pe);
+    } else if (framework == "mongoose") {
       string err;
-      port = strict_strtol(port_str.c_str(), 10, &err);
+      int port = strict_strtol(config.c_str(), 10, &err);
       if (!err.empty()) {
         cerr << "error parsing frontend config for framework " << framework << ": " << err << std::endl;
         return EINVAL;
       }
+
+      RGWProcessEnv env = { store, &rest, olog, port };
+
+      fe = new RGWMongooseFrontend(env, config);
     } else {
-      framework = f;
-      port = 0;
+      continue;
     }
+    fe->run();
 
-    fe_map[framework] = port;
+    fes.push_back(fe);
   }
 
-  struct mg_context *mongoose_ctx = NULL;
-  RGWProcessEnv *mongoose_pe = NULL;
-  
-  RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
-
-  RGWProcess *pprocess = new RGWProcess(g_ceph_context, &fcgi_pe, g_conf->rgw_thread_pool_size);
-
-  for (map<string, int>::iterator fiter = fe_map.begin(); fiter != fe_map.end(); ++fiter) {
-    int port = fiter->second;
-    dout(0) << "handler: " << fiter->first << ":" << port << dendl;
-    if (fiter->first == "fastcgi" || fiter->first == "fcgi") {
-      // later
-    } else if (fiter->first == "mongoose") {
-      mongoose_pe = new RGWProcessEnv;
-      *mongoose_pe = { store, &rest, olog, port };
-      char port_buf[32];
-      snprintf(port_buf, sizeof(port_buf), "%d", port);
-
-      char thread_pool_buf[32];
-      snprintf(thread_pool_buf, sizeof(thread_pool_buf), "%d", (int)g_conf->rgw_thread_pool_size);
-      const char *options[] = {"listening_ports", port_buf, "enable_keep_alive", "yes", "num_threads", thread_pool_buf, NULL};
-
-      struct mg_callbacks cb;
-      memset((void *)&cb, 0, sizeof(cb));
-      cb.begin_request = mongoose_callback;
-      mongoose_ctx = mg_start(&cb, mongoose_pe, (const char **)&options);
-      assert(mongoose_ctx);
-    }
-  }
 
+  /* FIXME: exact read */
+  int val;
+  read(signal_fd[1], &val, sizeof(val));
 
-  pprocess->run();
+  derr << "shutting down" << dendl;
 
-  if (mongoose_ctx) {
-    mg_stop(mongoose_ctx);
+  for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end(); ++liter) {
+    RGWFrontend *fe = *liter;
+    fe->stop();
   }
 
-  derr << "shutting down" << dendl;
+  for (list<RGWFrontend *>::iterator liter = fes.begin(); liter != fes.end(); ++liter) {
+    RGWFrontend *fe = *liter;
+    fe->join();
+  }
 
   unregister_async_signal_handler(SIGHUP, sighup_handler);
   unregister_async_signal_handler(SIGTERM, handle_sigterm);
@@ -725,9 +826,6 @@ int main(int argc, const char **argv)
   unregister_async_signal_handler(SIGUSR1, handle_sigterm);
   shutdown_async_signal_handler();
 
-  delete pprocess;
-  delete mongoose_pe;
-
   if (do_swift) {
     swift_finalize();
   }