From 37bc5b348682a69553ac733fa25a7ebafcf339e8 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 7 May 2021 18:58:14 -0700 Subject: [PATCH] crimson/tools/store_nbd: ensure RequestWriter outlives any requests on error Signed-off-by: Samuel Just --- src/crimson/tools/store_nbd/store-nbd.cc | 42 +++++++++++++++++------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/src/crimson/tools/store_nbd/store-nbd.cc b/src/crimson/tools/store_nbd/store-nbd.cc index fa36d03bad7..e5286d25bc1 100644 --- a/src/crimson/tools/store_nbd/store-nbd.cc +++ b/src/crimson/tools/store_nbd/store-nbd.cc @@ -162,25 +162,31 @@ struct request_context_t { struct RequestWriter { seastar::rwlock lock; seastar::output_stream stream; - bool stopped = false; + std::optional> wait_pending; int pending = 0; RequestWriter( seastar::output_stream &&stream) : stream(std::move(stream)) {} RequestWriter(RequestWriter &&) = default; + void inc_pending() { + ++pending; + } + + void dec_pending() { + ceph_assert(pending > 0); + --pending; + if (pending == 0 && wait_pending) { + (*wait_pending).set_value(); + } + } + seastar::future<> complete(request_context_t::ref &&req) { - if (stopped) - throw std::system_error( - std::make_error_code( - std::errc::operation_canceled)); auto &request = *req; - ++pending; return lock.write_lock( ).then([&request, this] { return request.write_reply(stream); }).finally([&, this, req=std::move(req)] { - --pending; lock.write_unlock(); logger().debug("complete"); return seastar::now(); @@ -188,10 +194,14 @@ struct RequestWriter { } seastar::future<> close() { - stopped = true; - return lock.write_lock( - ).then([this] { - assert(pending == 0); + ceph_assert(!wait_pending); + auto do_wait_pending = seastar::now(); + if (pending > 0) { + wait_pending = seastar::promise<>(); + do_wait_pending = (*wait_pending).get_future(); + } + return do_wait_pending.then([this] { + ceph_assert(pending == 0); return stream.close(); }); } @@ -393,7 +403,14 @@ seastar::future<> handle_commands( auto &request = *request_ref; return request.read_request(in ).then([&, request_ref=std::move(request_ref)]() mutable { - static_cast(handle_command(backend, std::move(request_ref), out)); + out.inc_pending(); + static_cast( + handle_command( + backend, std::move(request_ref), out + ).finally([&out] { + out.dec_pending(); + }) + ); logger().debug("handle_commands after fork"); return seastar::now(); }); @@ -429,6 +446,7 @@ seastar::future<> NBDHandler::run() }).finally([&] { return output.close(); }).handle_exception([](auto e) { + logger().error("NBDHandler::run saw exception {}", e); return seastar::now(); }); }); -- 2.39.5