}).discard_result();
}
-/**
- * \brief continuously run a block of code "within a gate"
- *
- * Neither gate closure, nor a failure of the code block we are running, will
- * cause an exception to be thrown by safe_action_gate_func()
- */
-template <typename AsyncAction>
-seastar::future<> do_until_gate(seastar::gate& gt, AsyncAction action)
-{
- auto stop_cond = [>] { return gt.is_closed(); };
- auto safe_action{ [act = std::move(action), >]() mutable {
- if (gt.is_closed())
- return seastar::make_ready_future<>();
- return with_gate(gt, [act = std::move(act)] {
- return act().handle_exception([](auto e) {});
- });
- } };
-
- return seastar::do_until(stop_cond, std::move(safe_action)).discard_result();
+namespace {
+struct connection_t {
+ seastar::connected_socket cs;
+ seastar::input_stream<char> input;
+ seastar::output_stream<char> output;
+ connection_t(seastar::connected_socket&& s) :
+ cs(std::move(s)), input(cs.input()), output(cs.output())
+ {}
+};
}
seastar::future<> AdminSocket::start(const std::string& path)
logger().debug("{}: asok socket path={}", __func__, path);
auto sock_path = seastar::socket_address{ seastar::unix_domain_addr{ path } };
-
- std::ignore = return seastar::do_with(
- seastar::engine().listen(sock_path),
- [this](seastar::server_socket& lstn) {
- m_server_sock = &lstn; // used for 'abort_accept()'
- return do_until_gate(arrivals_gate, [&lstn, this] {
- return lstn.accept().then(
- [this](seastar::accept_result from_accept) {
- seastar::connected_socket cn =
- std::move(from_accept.connection);
- return do_with(cn.input(), cn.output(), std::move(cn),
- [this](auto& inp, auto& out, auto& cn) {
- return handle_client(inp, out).finally([] {
- ; // left for debugging
- });
- });
- });
- }).then([] {
- logger().debug("AdminSocket::init(): admin-sock thread terminated");
- return seastar::now();
+ server_sock = seastar::engine().listen(sock_path);
+ // listen in background
+ std::ignore = seastar::do_until(
+ [this] { return stop_gate.is_closed(); },
+ [this] {
+ return seastar::with_gate(stop_gate, [this] {
+ return server_sock->accept().then([this](seastar::accept_result acc) {
+ return seastar::do_with(connection_t{std::move(acc.connection)},
+ [this](auto& conn) mutable {
+ return handle_client(conn.input, conn.output);
});
+ }).handle_exception([this](auto ep) {
+ if (!stop_gate.is_closed()) {
+ logger().error("AdminSocket: terminated: {}", ep);
+ }
+ });
});
- });
+ }).then([] {
+ logger().debug("AdminSocket::init(): admin-sock thread terminated");
+ return seastar::now();
+ });
return seastar::make_ready_future<>();
}
seastar::future<> AdminSocket::stop()
{
- if (m_server_sock && !arrivals_gate.is_closed()) {
- // note that we check 'is_closed()' as if already closed - the server-sock
- // may have already been discarded
- m_server_sock->abort_accept();
- m_server_sock = nullptr;
+ if (!server_sock) {
+ return seastar::now();
}
-
- return seastar::futurize_apply([this] { return arrivals_gate.close(); })
- .then_wrapped([](seastar::future<> res) {
- if (res.failed()) {
- std::ignore = res.handle_exception([](std::exception_ptr eptr) {
- return seastar::make_ready_future<>();
- });
- }
- return seastar::make_ready_future<>();
- }).handle_exception(
- [](std::exception_ptr eptr) { return seastar::make_ready_future<>();
- }).finally([] { return seastar::make_ready_future<>(); });
+ server_sock->abort_accept();
+ return stop_gate.close();
}
/////////////////////////////////////////