// vim: ts=8 sw=2 smarttab
#include <atomic>
-#include <condition_variable>
-#include <mutex>
#include <thread>
#include <vector>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
+#include "common/async/shared_mutex.h"
+
#include "rgw_asio_client.h"
#include "rgw_asio_frontend.h"
namespace {
-class Pauser {
- std::mutex mutex;
- std::condition_variable cond_ready; // signaled on ready==true
- std::condition_variable cond_paused; // signaled on waiters==thread_count
- bool ready{false};
- int waiters{0};
- public:
- template <typename Func>
- void pause(int thread_count, Func&& func);
- void unpause();
- void wait();
-};
-
-template <typename Func>
-void Pauser::pause(int thread_count, Func&& func)
-{
- std::unique_lock<std::mutex> lock(mutex);
- ready = false;
- lock.unlock();
-
- func();
-
- // wait for all threads to pause
- lock.lock();
- cond_paused.wait(lock, [=] { return waiters == thread_count; });
-}
-
-void Pauser::unpause()
-{
- std::lock_guard<std::mutex> lock(mutex);
- ready = true;
- cond_ready.notify_all();
-}
-
-void Pauser::wait()
-{
- std::unique_lock<std::mutex> lock(mutex);
- ++waiters;
- cond_paused.notify_one(); // notify pause() that we're waiting
- cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
- --waiters;
-}
-
using tcp = boost::asio::ip::tcp;
namespace beast = boost::beast;
#ifdef WITH_RADOSGW_BEAST_OPENSSL
}
};
+using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
+
template <typename Stream>
void handle_connection(RGWProcessEnv& env, Stream& stream,
beast::flat_buffer& buffer, bool is_ssl,
+ SharedMutex& pause_mutex,
boost::system::error_code& ec,
boost::asio::yield_context yield)
{
return;
}
- // process the request
- RGWRequest req{env.store->get_new_req_id()};
-
- auto& socket = stream.lowest_layer();
- StreamIO real_client{stream, parser, buffer, is_ssl,
- socket.local_endpoint(),
- socket.remote_endpoint()};
+ {
+ // take the pause mutex in shared mode for the duration of request
+ // processing; pause() acquires it exclusively, so it waits for us
+ auto lock = pause_mutex.async_lock_shared(yield[ec]);
+ if (ec == boost::asio::error::operation_aborted) {
+ return;
+ } else if (ec) {
+ ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
+ return;
+ }
- auto real_client_io = rgw::io::add_reordering(
- rgw::io::add_buffering(cct,
- rgw::io::add_chunking(
- rgw::io::add_conlen_controlling(
- &real_client))));
- RGWRestfulIO client(cct, &real_client_io);
- process_request(env.store, env.rest, &req, env.uri_prefix,
- *env.auth_registry, &client, env.olog);
+ // process the request
+ RGWRequest req{env.store->get_new_req_id()};
+
+ auto& socket = stream.lowest_layer();
+ StreamIO real_client{stream, parser, buffer, is_ssl,
+ socket.local_endpoint(),
+ socket.remote_endpoint()};
+
+ auto real_client_io = rgw::io::add_reordering(
+ rgw::io::add_buffering(cct,
+ rgw::io::add_chunking(
+ rgw::io::add_conlen_controlling(
+ &real_client))));
+ RGWRestfulIO client(cct, &real_client_io);
+ process_request(env.store, env.rest, &req, env.uri_prefix,
+ *env.auth_registry, &client, env.olog);
+ // leaving this scope releases the shared lock, allowing pause() to proceed
+ }
if (!parser.keep_alive()) {
return;
class AsioFrontend {
RGWProcessEnv env;
RGWFrontendConfig* conf;
- boost::asio::io_service service;
+ boost::asio::io_context context;
#ifdef WITH_RADOSGW_BEAST_OPENSSL
boost::optional<ssl::context> ssl_context;
int init_ssl();
#endif
+ // held shared by connections while they process requests, and taken
+ // exclusively by pause() to wait for outstanding requests to drain
+ SharedMutex pause_mutex;
struct Listener {
tcp::endpoint endpoint;
tcp::socket socket;
bool use_ssl = false;
- explicit Listener(boost::asio::io_service& service)
- : acceptor(service), socket(service) {}
+ explicit Listener(boost::asio::io_context& context)
+ : acceptor(context), socket(context) {}
};
std::vector<Listener> listeners;
+ // work guard to keep run() threads busy while listeners are paused
+ using Executor = boost::asio::io_context::executor_type;
+ std::optional<boost::asio::executor_work_guard<Executor>> work;
+
std::vector<std::thread> threads;
- Pauser pauser;
std::atomic<bool> going_down{false};
CephContext* ctx() const { return env.store->ctx(); }
public:
AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
- : env(env), conf(conf) {}
+ : env(env), conf(conf),
+ // the pause mutex completes its waiters on the frontend's io_context
+ pause_mutex(context.get_executor())
+ {}
int init();
int run();
lderr(ctx()) << "failed to parse port=" << i->second << dendl;
return -ec.value();
}
- listeners.emplace_back(service);
+ listeners.emplace_back(context);
listeners.back().endpoint.port(port);
}
lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
return -ec.value();
}
- listeners.emplace_back(service);
+ listeners.emplace_back(context);
listeners.back().endpoint = endpoint;
}
lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
return -ec.value();
}
- listeners.emplace_back(service);
+ listeners.emplace_back(context);
listeners.back().endpoint.port(port);
listeners.back().use_ssl = true;
}
lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
return -ec.value();
}
- listeners.emplace_back(service);
+ listeners.emplace_back(context);
listeners.back().endpoint = endpoint;
listeners.back().use_ssl = true;
}
// spawn a coroutine to handle the connection
#ifdef WITH_RADOSGW_BEAST_OPENSSL
if (l.use_ssl) {
- boost::asio::spawn(service,
+ boost::asio::spawn(context,
[this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
// wrap the socket in an ssl stream
ssl::stream<tcp::socket&> stream{s, *ssl_context};
return;
}
buffer.consume(bytes);
- handle_connection(env, stream, buffer, true, ec, yield);
+ handle_connection(env, stream, buffer, true, pause_mutex, ec, yield);
if (!ec) {
// ssl shutdown (ignoring errors)
stream.async_shutdown(yield[ec]);
#else
{
#endif // WITH_RADOSGW_BEAST_OPENSSL
- boost::asio::spawn(service,
+ boost::asio::spawn(context,
[this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
beast::flat_buffer buffer;
boost::system::error_code ec;
- handle_connection(env, s, buffer, false, ec, yield);
+ handle_connection(env, s, buffer, false, pause_mutex, ec, yield);
s.shutdown(tcp::socket::shutdown_both, ec);
});
}
ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
+ // the worker threads call io_context::run(), which will return when there's
+ // no work left. hold a work guard to keep these threads going until join()
+ work.emplace(boost::asio::make_work_guard(context));
+
for (int i = 0; i < thread_count; i++) {
threads.emplace_back([=] {
- for (;;) {
- service.run();
- if (going_down) {
- break;
- }
- pauser.wait();
- }
+ // error_code overload: run() reports failure via ec instead of
+ // letting an exception escape the worker thread
+ boost::system::error_code ec;
+ context.run(ec);
});
}
return 0;
}
// unblock the run() threads
- service.stop();
+ context.stop(); // XXX: kill connections instead
}
void AsioFrontend::join()
if (!going_down) {
stop();
}
+ work.reset();
+
ldout(ctx(), 4) << "frontend joining threads..." << dendl;
for (auto& thread : threads) {
thread.join();
void AsioFrontend::pause()
{
- ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
- pauser.pause(threads.size(), [=] {
- // unblock the run() threads
- service.stop();
- });
- ldout(ctx(), 4) << "frontend paused" << dendl;
+ ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
+
+ // cancel pending calls to accept(), but don't close the sockets
+ boost::system::error_code ec;
+ for (auto& l : listeners) {
+ l.acceptor.cancel(ec);
+ }
+
+ // pause and wait for outstanding requests to complete
+ // (the exclusive lock blocks until every connection holding the shared
+ // lock in handle_connection() finishes its request)
+ pause_mutex.lock(ec);
+
+ if (ec) {
+ ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
+ } else {
+ ldout(ctx(), 4) << "frontend paused" << dendl;
+ }
}
void AsioFrontend::unpause(RGWRados* const store,
{
env.store = store;
env.auth_registry = std::move(auth_registry);
+
+ // unpause to unblock connections
+ // (releases the exclusive lock taken in pause(), resuming the coroutines
+ // waiting on async_lock_shared)
+ pause_mutex.unlock();
+
+ // start accepting connections again
+ for (auto& l : listeners) {
+ l.acceptor.async_accept(l.socket,
+ [this, &l] (boost::system::error_code ec) {
+ accept(l, ec);
+ });
+ }
+
ldout(ctx(), 4) << "frontend unpaused" << dendl;
- service.reset();
- pauser.unpause();
}
} // anonymous namespace