struct RequestWriter {
seastar::rwlock lock;
seastar::output_stream<char> stream;
- std::optional<seastar::promise<>> wait_pending;
- int pending = 0;
+ seastar::gate gate;
RequestWriter(
seastar::output_stream<char> &&stream) : stream(std::move(stream)) {}
RequestWriter(RequestWriter &&) = default;
- void inc_pending() {
- ++pending;
- }
-
- void dec_pending() {
- ceph_assert(pending > 0);
- --pending;
- if (pending == 0 && wait_pending) {
- (*wait_pending).set_value();
- }
- }
-
seastar::future<> complete(request_context_t::ref &&req) {
auto &request = *req;
return lock.write_lock(
}
seastar::future<> close() {
- ceph_assert(!wait_pending);
- auto do_wait_pending = seastar::now();
- if (pending > 0) {
- wait_pending = seastar::promise<>();
- do_wait_pending = (*wait_pending).get_future();
- }
- return do_wait_pending.then([this] {
- ceph_assert(pending == 0);
+ return gate.close().then([this] {
return stream.close();
});
}
RequestWriter &out)
{
logger().debug("handle_commands");
- return seastar::keep_doing(
- [&] {
- logger().debug("waiting for command");
- auto request_ref = request_context_t::make_ref();
- auto &request = *request_ref;
- return request.read_request(in
- ).then([&, request_ref=std::move(request_ref)]() mutable {
- out.inc_pending();
- static_cast<void>(
- handle_command(
- backend, std::move(request_ref), out
- ).finally([&out] {
- out.dec_pending();
- })
- );
- logger().debug("handle_commands after fork");
- return seastar::now();
+ return seastar::keep_doing([&] {
+ logger().debug("waiting for command");
+ auto request_ref = request_context_t::make_ref();
+ auto &request = *request_ref;
+ return request.read_request(in).then(
+ [&, request_ref=std::move(request_ref)]() mutable {
+ // keep running in background
+ (void)seastar::try_with_gate(out.gate,
+ [&backend, &out, request_ref=std::move(request_ref)]() mutable {
+ return handle_command(backend, std::move(request_ref), out);
});
+ logger().debug("handle_commands after fork");
});
+ }).handle_exception_type([](const seastar::gate_closed_exception&) {});
}
void NBDHandler::run()