struct RequestWriter {
seastar::rwlock lock;
seastar::output_stream<char> stream;
- bool stopped = false;
+ std::optional<seastar::promise<>> wait_pending;
int pending = 0;
RequestWriter(
seastar::output_stream<char> &&stream) : stream(std::move(stream)) {}
RequestWriter(RequestWriter &&) = default;
+ void inc_pending() {
+ ++pending;
+ }
+
+ void dec_pending() {
+ ceph_assert(pending > 0);
+ --pending;
+ if (pending == 0 && wait_pending) {
+ (*wait_pending).set_value();
+ }
+ }
+
seastar::future<> complete(request_context_t::ref &&req) {
- if (stopped)
- throw std::system_error(
- std::make_error_code(
- std::errc::operation_canceled));
auto &request = *req;
- ++pending;
return lock.write_lock(
).then([&request, this] {
return request.write_reply(stream);
}).finally([&, this, req=std::move(req)] {
- --pending;
lock.write_unlock();
logger().debug("complete");
return seastar::now();
}
seastar::future<> close() {
- stopped = true;
- return lock.write_lock(
- ).then([this] {
- assert(pending == 0);
+ ceph_assert(!wait_pending);
+ auto do_wait_pending = seastar::now();
+ if (pending > 0) {
+ wait_pending = seastar::promise<>();
+ do_wait_pending = (*wait_pending).get_future();
+ }
+ return do_wait_pending.then([this] {
+ ceph_assert(pending == 0);
return stream.close();
});
}
auto &request = *request_ref;
return request.read_request(in
).then([&, request_ref=std::move(request_ref)]() mutable {
- static_cast<void>(handle_command(backend, std::move(request_ref), out));
+ out.inc_pending();
+ static_cast<void>(
+ handle_command(
+ backend, std::move(request_ref), out
+ ).finally([&out] {
+ out.dec_pending();
+ })
+ );
logger().debug("handle_commands after fork");
return seastar::now();
});
}).finally([&] {
return output.close();
}).handle_exception([](auto e) {
+ logger().error("NBDHandler::run saw exception {}", e);
return seastar::now();
});
});