From 568a332fba8022f463b41ccdabf41a6ecf346c2d Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 8 Dec 2020 15:49:34 -0800 Subject: [PATCH] store-nbd: support concurrent operations Signed-off-by: Samuel Just --- src/crimson/tools/store-nbd.cc | 206 ++++++++++++++++++++++----------- 1 file changed, 138 insertions(+), 68 deletions(-) diff --git a/src/crimson/tools/store-nbd.cc b/src/crimson/tools/store-nbd.cc index cdf853d15c1..839daff8dc6 100644 --- a/src/crimson/tools/store-nbd.cc +++ b/src/crimson/tools/store-nbd.cc @@ -36,6 +36,7 @@ #include #include +#include #include "crimson/os/seastore/cache.h" #include "crimson/os/seastore/segment_cleaner.h" @@ -146,6 +147,11 @@ struct request_context_t { std::optional in_buffer; std::optional out_buffer; + using ref = std::unique_ptr; + static ref make_ref() { + return std::make_unique(); + } + bool check_magic() const { // todo return true; @@ -190,12 +196,14 @@ struct request_context_t { auto p = buffer.get_write(); seastar::produce_be(p, NBD_REPLY_MAGIC); seastar::produce_be(p, err); + logger().debug("write_reply writing err {}", err); memcpy(p, handle, sizeof(handle)); return out.write(std::move(buffer)).then([this, &out] { if (out_buffer) { return seastar::do_for_each( out_buffer->mut_buffers(), [&out](bufferptr &ptr) { + logger().debug("write_reply writing {}", ptr.length()); return out.write( seastar::temporary_buffer( ptr.c_str(), @@ -212,6 +220,44 @@ struct request_context_t { } }; +struct RequestWriter { + seastar::rwlock lock; + seastar::output_stream stream; + bool stopped = false; + int pending = 0; + + RequestWriter( + seastar::output_stream &&stream) : stream(std::move(stream)) {} + RequestWriter(RequestWriter &&) = default; + + seastar::future<> complete(request_context_t::ref &&req) { + if (stopped) + throw std::system_error( + std::make_error_code( + std::errc::operation_canceled)); + auto &request = *req; + ++pending; + return lock.write_lock( + ).then([&request, this] { + return request.write_reply(stream); + }).finally([&, this, req=std::move(req)] { + --pending; + lock.write_unlock(); + logger().debug("complete"); + return seastar::now(); + }); + } + + seastar::future<> close() { + stopped = true; + return lock.write_lock( + ).then([this] { + assert(pending == 0); + return stream.close(); + }); + } +}; + /** * NBDHandler * @@ -354,21 +400,23 @@ seastar::future<> send_negotiation( seastar::future<> handle_command( BlockDriver &backend, - request_context_t &context, - seastar::output_stream &out) + request_context_t::ref request_ref, + RequestWriter &out) { - logger().debug("got command {}", context.get_command()); + auto &request = *request_ref; + logger().debug("got command {}", request.get_command()); return ([&] { - switch (context.get_command()) { + switch (request.get_command()) { case NBD_CMD_WRITE: return backend.write( - context.from, - *context.in_buffer); + request.from, + *request.in_buffer); case NBD_CMD_READ: return backend.read( - context.from, - context.len).then([&context] (auto buffer) { - context.out_buffer = buffer; + request.from, + request.len).then([&] (auto buffer) { + logger().debug("read returned buffer len {}", buffer.length()); + request.out_buffer = buffer; }); case NBD_CMD_DISC: throw std::system_error(std::make_error_code(std::errc::bad_message)); @@ -377,9 +425,9 @@ seastar::future<> handle_command( default: throw std::system_error(std::make_error_code(std::errc::bad_message)); } - })().then([&] { - logger().debug("Writing reply"); - return context.write_reply(out); + })().then([&, request_ref=std::move(request_ref)]() mutable { + logger().debug("handle_command complete"); + return out.complete(std::move(request_ref)); }); } @@ -387,19 +435,19 @@ seastar::future<> handle_command( seastar::future<> handle_commands( BlockDriver &backend, seastar::input_stream& in, - seastar::output_stream& out) + RequestWriter &out) { logger().debug("handle_commands"); return seastar::keep_doing( [&] { logger().debug("waiting for command"); - auto request_ref = std::make_unique(); + auto request_ref = request_context_t::make_ref(); auto &request = *request_ref; return request.read_request(in - ).then([&] { - return handle_command(backend, request, out); - }).then([req=std::move(request_ref)] { - logger().debug("complete"); + ).then([&, request_ref=std::move(request_ref)]() mutable { + static_cast(handle_command(backend, std::move(request_ref), out)); + logger().debug("handle_commands after fork"); + return seastar::now(); }); }); } @@ -421,11 +469,11 @@ seastar::future<> NBDHandler::run() [this](auto &conn) { return seastar::do_with( conn.input(), - conn.output(), + RequestWriter{conn.output()}, [&, this](auto &input, auto &output) { return send_negotiation( backend.get_size(), - output + output.stream ).then([&, this] { return handle_commands(backend, input, output); }).finally([&] { @@ -465,31 +513,41 @@ public: logger().debug("Writing offset {}", offset); assert(offset % segment_manager->get_block_size() == 0); assert(ptr.length() == (size_t)segment_manager->get_block_size()); - return seastar::do_with( - tm->create_transaction(), - std::move(ptr), - [this, offset](auto &t, auto &ptr) { - return tm->dec_ref( - *t, - offset - ).safe_then([](auto){}).handle_error( - crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }), - crimson::ct_error::pass_further_all{} - ).safe_then([=, &t, &ptr] { - logger().debug("dec_ref complete"); - return tm->alloc_extent( - *t, - offset, - ptr.length()); - }).safe_then([=, &t, &ptr](auto ext) mutable { - assert(ext->get_laddr() == (size_t)offset); - assert(ext->get_bptr().length() == ptr.length()); - ext->get_bptr().swap(ptr); - logger().debug("submitting transaction"); - return tm->submit_transaction(std::move(t)); - }); + return crimson::do_until( + [this, offset, ptr=std::move(ptr)] { + return seastar::do_with( + tm->create_transaction(), + ptr, + [this, offset](auto &t, auto &ptr) mutable { + return tm->dec_ref( + *t, + offset + ).safe_then([](auto){}).handle_error( + crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }), + crimson::ct_error::pass_further_all{} + ).safe_then([=, &t, &ptr] { + logger().debug("dec_ref complete"); + return tm->alloc_extent( + *t, + offset, + ptr.length()); + }).safe_then([=, &t, &ptr](auto ext) mutable { + assert(ext->get_laddr() == (size_t)offset); + assert(ext->get_bptr().length() == ptr.length()); + ext->get_bptr().swap(ptr); + logger().debug("submitting transaction"); + return tm->submit_transaction(std::move(t)); + }).safe_then([] { + return true; + }).handle_error( + [](const crimson::ct_error::eagain &e) { + return seastar::make_ready_future(false); + }, + crimson::ct_error::pass_further_all{} + ); + }); }).handle_error( - crimson::ct_error::assert_all{} + crimson::ct_error::assert_all{"store-nbd write"} ); } @@ -499,31 +557,43 @@ public: logger().debug("Reading offset {}", offset); assert(offset % segment_manager->get_block_size() == 0); assert(size % (size_t)segment_manager->get_block_size() == 0); - return seastar::do_with( - tm->create_transaction(), - [this, offset, size](auto &t) { - return tm->read_extents(*t, offset, size - ).safe_then([=](auto ext_list) mutable { - size_t cur = offset; - bufferlist bl; - for (auto &i: ext_list) { - if (cur != i.first) { - assert(cur < i.first); - bl.append_zero(i.first - cur); - cur = i.first; - } - bl.append(i.second->get_bptr()); - cur += i.second->get_bptr().length(); - } - if (bl.length() != size) { - assert(bl.length() < size); - bl.append_zero(size - bl.length()); - } - return seastar::make_ready_future(std::move(bl)); - }); + auto blptrret = std::make_unique(); + auto &blret = *blptrret; + return crimson::do_until( + [=, &blret] { + return seastar::do_with( + tm->create_transaction(), + [=, &blret](auto &t) { + return tm->read_extents(*t, offset, size + ).safe_then([=, &blret](auto ext_list) mutable { + size_t cur = offset; + for (auto &i: ext_list) { + if (cur != i.first) { + assert(cur < i.first); + blret.append_zero(i.first - cur); + cur = i.first; + } + blret.append(i.second->get_bptr()); + cur += i.second->get_bptr().length(); + } + if (blret.length() != size) { + assert(blret.length() < size); + blret.append_zero(size - blret.length()); + } + return seastar::make_ready_future(true); + }).handle_error( + [](const crimson::ct_error::eagain &e) { + return seastar::make_ready_future(false); + }, + crimson::ct_error::pass_further_all{} + ); + }); }).handle_error( - crimson::ct_error::assert_all{} - ); + crimson::ct_error::assert_all{"store-nbd read"} + ).then([blptrret=std::move(blptrret)]() mutable { + logger().debug("read complete"); + return std::move(*blptrret); + }); } void init() { -- 2.39.5