From: Samuel Just Date: Sat, 8 May 2021 01:58:14 +0000 (-0700) Subject: crimson/tools/store_nbd: ensure RequestWriter outlives any requests on error X-Git-Tag: v17.1.0~2000^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F41291%2Fhead;p=ceph.git crimson/tools/store_nbd: ensure RequestWriter outlives any requests on error Signed-off-by: Samuel Just --- diff --git a/src/crimson/tools/store_nbd/store-nbd.cc b/src/crimson/tools/store_nbd/store-nbd.cc index fa36d03bad70..e5286d25bc16 100644 --- a/src/crimson/tools/store_nbd/store-nbd.cc +++ b/src/crimson/tools/store_nbd/store-nbd.cc @@ -162,25 +162,31 @@ struct request_context_t { struct RequestWriter { seastar::rwlock lock; seastar::output_stream stream; - bool stopped = false; + std::optional> wait_pending; int pending = 0; RequestWriter( seastar::output_stream &&stream) : stream(std::move(stream)) {} RequestWriter(RequestWriter &&) = default; + void inc_pending() { + ++pending; + } + + void dec_pending() { + ceph_assert(pending > 0); + --pending; + if (pending == 0 && wait_pending) { + (*wait_pending).set_value(); + } + } + seastar::future<> complete(request_context_t::ref &&req) { - if (stopped) - throw std::system_error( - std::make_error_code( - std::errc::operation_canceled)); auto &request = *req; - ++pending; return lock.write_lock( ).then([&request, this] { return request.write_reply(stream); }).finally([&, this, req=std::move(req)] { - --pending; lock.write_unlock(); logger().debug("complete"); return seastar::now(); @@ -188,10 +194,14 @@ struct RequestWriter { } seastar::future<> close() { - stopped = true; - return lock.write_lock( - ).then([this] { - assert(pending == 0); + ceph_assert(!wait_pending); + auto do_wait_pending = seastar::now(); + if (pending > 0) { + wait_pending = seastar::promise<>(); + do_wait_pending = (*wait_pending).get_future(); + } + return do_wait_pending.then([this] { + ceph_assert(pending == 0); return stream.close(); }); } @@ -393,7 +403,14 @@ seastar::future<> handle_commands( auto &request = *request_ref; return request.read_request(in ).then([&, request_ref=std::move(request_ref)]() mutable { - static_cast(handle_command(backend, std::move(request_ref), out)); + out.inc_pending(); + static_cast( + handle_command( + backend, std::move(request_ref), out + ).finally([&out] { + out.dec_pending(); + }) + ); logger().debug("handle_commands after fork"); return seastar::now(); }); @@ -429,6 +446,7 @@ seastar::future<> NBDHandler::run() }).finally([&] { return output.close(); }).handle_exception([](auto e) { + logger().error("NBDHandler::run saw exception {}", e); return seastar::now(); }); });