]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement Frontend interface for asio
authorCasey Bodley <cbodley@redhat.com>
Sat, 18 Jun 2016 14:15:18 +0000 (10:15 -0400)
committerRadoslaw Zarzynski <rzarzynski@mirantis.com>
Fri, 21 Oct 2016 20:57:16 +0000 (22:57 +0200)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_asio_frontend.cc
src/rgw/rgw_asio_frontend.h
src/rgw/rgw_main.cc

index 2f6a372941f1bccf21aa9e6ebba9e601fc102e93..faec668ac1191c4960521d2a45bbc04c0dbd61b1 100644 (file)
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <boost/asio.hpp>
+#include <boost/optional.hpp>
+
 #include "rgw_asio_frontend.h"
 
-RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env)
+#define dout_subsys ceph_subsys_rgw
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "asio: ")
+
+namespace {
+
+class Pauser {
+  std::mutex mutex;
+  std::condition_variable cond_ready; // signaled on ready==true
+  std::condition_variable cond_paused; // signaled on waiters==thread_count
+  bool ready{false};
+  int waiters{0};
+ public:
+  template <typename Func>
+  void pause(int thread_count, Func&& func);
+  void unpause();
+  void wait();
+};
+
+template <typename Func>
+void Pauser::pause(int thread_count, Func&& func)
 {
+  std::unique_lock<std::mutex> lock(mutex);
+  ready = false;
+  lock.unlock();
+
+  func();
+
+  // wait for all threads to pause
+  lock.lock();
+  cond_paused.wait(lock, [=] { return waiters == thread_count; });
 }
 
-RGWAsioFrontend::~RGWAsioFrontend()
+void Pauser::unpause()
 {
+  std::lock_guard<std::mutex> lock(mutex);
+  ready = true;
+  cond_ready.notify_all();
 }
 
-int RGWAsioFrontend::init()
+void Pauser::wait()
+{
+  std::unique_lock<std::mutex> lock(mutex);
+  ++waiters;
+  cond_paused.notify_one(); // notify pause() that we're waiting
+  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
+  --waiters;
+}
+
+using tcp = boost::asio::ip::tcp;
+
+class AsioFrontend {
+  RGWProcessEnv env;
+  boost::asio::io_service service;
+
+  tcp::acceptor acceptor;
+  tcp::socket peer_socket;
+  tcp::endpoint peer_endpoint;
+
+  std::vector<std::thread> threads;
+  Pauser pauser;
+  std::atomic<bool> going_down{false};
+
+  CephContext* ctx() const { return env.store->ctx(); }
+
+  void accept(boost::system::error_code ec);
+
+ public:
+  AsioFrontend(const RGWProcessEnv& env)
+    : env(env), acceptor(service), peer_socket(service) {}
+
+  int init();
+  int run();
+  void stop();
+  void join();
+  void pause();
+  void unpause(RGWRados *store);
+};
+
+int AsioFrontend::init()
 {
+  auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)};
+  ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
+
+  acceptor.open(ep.protocol());
+  acceptor.set_option(tcp::acceptor::reuse_address(true));
+  acceptor.bind(ep);
+  acceptor.listen(boost::asio::socket_base::max_connections);
+  acceptor.async_accept(peer_socket, peer_endpoint,
+                        [this] (boost::system::error_code ec) {
+                          return accept(ec);
+                        });
   return 0;
 }
 
-int RGWAsioFrontend::run()
+void AsioFrontend::accept(boost::system::error_code ec)
 {
+  if (!acceptor.is_open()) {
+    return;
+  } else if (ec == boost::asio::error::operation_aborted) {
+    return;
+  } else if (ec) {
+    throw ec;
+  }
+  auto socket = std::move(peer_socket);
+  auto endpoint = std::move(peer_endpoint);
+
+  acceptor.async_accept(peer_socket, peer_endpoint,
+                        [this] (boost::system::error_code ec) {
+                          return accept(ec);
+                        });
+
+  ldout(ctx(), 4) << "accept " << endpoint << dendl;
+}
+
+int AsioFrontend::run()
+{
+  auto cct = ctx();
+  const int thread_count = cct->_conf->rgw_thread_pool_size;
+  threads.reserve(thread_count);
+
+  ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
+
+  for (int i = 0; i < thread_count; i++) {
+    threads.emplace_back([=] {
+      for (;;) {
+        service.run();
+        if (going_down) {
+          break;
+        }
+        pauser.wait();
+      }
+    });
+  }
   return 0;
 }
 
+void AsioFrontend::stop()
+{
+  ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
+
+  going_down = true;
+
+  boost::system::error_code ec;
+  acceptor.close(ec); // unblock the run() threads
+}
+
+void AsioFrontend::join()
+{
+  if (!going_down) {
+    stop();
+  }
+  ldout(ctx(), 4) << "frontend joining threads..." << dendl;
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  ldout(ctx(), 4) << "frontend done" << dendl;
+}
+
+void AsioFrontend::pause()
+{
+  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
+  pauser.pause(threads.size(), [=] {
+    // stop accepting but leave the port open
+    boost::system::error_code ec;
+    acceptor.cancel(ec);
+  });
+  ldout(ctx(), 4) << "frontend paused" << dendl;
+}
+
+void AsioFrontend::unpause(RGWRados *store)
+{
+  env.store = store;
+  ldout(ctx(), 4) << "frontend unpaused" << dendl;
+  service.reset();
+  acceptor.async_accept(peer_socket, peer_endpoint,
+                        [this] (boost::system::error_code ec) {
+                          return accept(ec);
+                        });
+  pauser.unpause();
+}
+
+} // anonymous namespace
+
+class RGWAsioFrontend::Impl : public AsioFrontend {
+ public:
+  Impl(const RGWProcessEnv& env) : AsioFrontend(env) {}
+};
+
+RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env)
+  : impl(new Impl(env))
+{
+}
+
+RGWAsioFrontend::~RGWAsioFrontend() = default;
+
+int RGWAsioFrontend::init()
+{
+  return impl->init();
+}
+
+int RGWAsioFrontend::run()
+{
+  return impl->run();
+}
+
 void RGWAsioFrontend::stop()
 {
+  impl->stop();
 }
 
 void RGWAsioFrontend::join()
 {
+  impl->join();
 }
 
 void RGWAsioFrontend::pause_for_new_config()
 {
+  impl->pause();
 }
 
 void RGWAsioFrontend::unpause_with_new_config(RGWRados *store)
 {
+  impl->unpause(store);
 }
index 22d36296198d018caad368cdae31494049ba25d6..c429e19db5c17b7ddb552f88f76ce1d66a41294c 100644 (file)
@@ -4,9 +4,12 @@
 #ifndef RGW_ASIO_FRONTEND_H
 #define RGW_ASIO_FRONTEND_H
 
+#include <memory>
 #include "rgw_frontend.h"
 
 class RGWAsioFrontend : public RGWFrontend {
+  class Impl;
+  std::unique_ptr<Impl> impl;
 public:
   RGWAsioFrontend(const RGWProcessEnv& env);
   ~RGWAsioFrontend();
index a0e08af2ac7579083f089088aa7864133c2cefd9..67724fddc529567d238d3b2b26b08038951914cf 100644 (file)
@@ -53,6 +53,7 @@
 #include "rgw_request.h"
 #include "rgw_process.h"
 #include "rgw_frontend.h"
+#include "rgw_asio_frontend.h"
 
 #include <map>
 #include <string>
@@ -430,7 +431,12 @@ int main(int argc, const char **argv)
     RGWFrontendConfig *config = fiter->second;
     string framework = config->get_framework();
     RGWFrontend *fe;
-    if (framework == "fastcgi" || framework == "fcgi") {
+    if (framework == "asio") {
+      int port;
+      config->get_val("port", 80, &port);
+      RGWProcessEnv env{ store, &rest, olog, port };
+      fe = new RGWAsioFrontend(env);
+    } else if (framework == "fastcgi" || framework == "fcgi") {
       RGWProcessEnv fcgi_pe = { store, &rest, olog, 0 };
 
       fe = new RGWFCGXFrontend(fcgi_pe, config);