git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
store-nbd: support concurrent operations
author Samuel Just <sjust@redhat.com>
Tue, 8 Dec 2020 23:49:34 +0000 (15:49 -0800)
committer Samuel Just <sjust@redhat.com>
Mon, 1 Feb 2021 20:48:39 +0000 (12:48 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/tools/store-nbd.cc

index cdf853d15c1a3e3fe6f34215caea76d2bdd4214d..839daff8dc6607581ea24edd3a2c2ac0a5d2bb3f 100644 (file)
@@ -36,6 +36,7 @@
 #include <linux/fs.h>
 
 #include <seastar/core/byteorder.hh>
+#include <seastar/core/rwlock.hh>
 
 #include "crimson/os/seastore/cache.h"
 #include "crimson/os/seastore/segment_cleaner.h"
@@ -146,6 +147,11 @@ struct request_context_t {
   std::optional<bufferptr> in_buffer;
   std::optional<bufferlist> out_buffer;
 
+  using ref = std::unique_ptr<request_context_t>;
+  static ref make_ref() {
+    return std::make_unique<request_context_t>();
+  }
+
   bool check_magic() const {
     // todo
     return true;
@@ -190,12 +196,14 @@ struct request_context_t {
     auto p = buffer.get_write();
     seastar::produce_be<uint32_t>(p, NBD_REPLY_MAGIC);
     seastar::produce_be<uint32_t>(p, err);
+    logger().debug("write_reply writing err {}", err);
     memcpy(p, handle, sizeof(handle));
     return out.write(std::move(buffer)).then([this, &out] {
       if (out_buffer) {
         return seastar::do_for_each(
           out_buffer->mut_buffers(),
           [&out](bufferptr &ptr) {
+           logger().debug("write_reply writing {}", ptr.length());
             return out.write(
              seastar::temporary_buffer<char>(
                ptr.c_str(),
@@ -212,6 +220,44 @@ struct request_context_t {
   }
 };
 
+struct RequestWriter {  // funnels reply writes from concurrently completing requests onto one output stream
+  seastar::rwlock lock;  // only ever taken in write mode in this struct, so holders are mutually exclusive
+  seastar::output_stream<char> stream;  // stream that replies are written to
+  bool stopped = false;  // set by close(); complete() throws once this is true
+  int pending = 0;  // requests that entered complete() and have not yet finished
+
+  RequestWriter(
+    seastar::output_stream<char> &&stream) : stream(std::move(stream)) {}
+  RequestWriter(RequestWriter &&) = default;
+
+  seastar::future<> complete(request_context_t::ref &&req) {  // writes req's reply while holding the lock; takes ownership of req
+    if (stopped)
+      throw std::system_error(  // thrown synchronously from complete(), not returned as a failed future
+       std::make_error_code(
+         std::errc::operation_canceled));
+    auto &request = *req;  // reference bound before req is moved into the finally() closure below
+    ++pending;
+    return lock.write_lock(
+    ).then([&request, this] {
+      return request.write_reply(stream);
+    }).finally([&, this, req=std::move(req)] {  // closure owns req, keeping request alive until the reply write resolves
+      --pending;
+      lock.write_unlock();
+      logger().debug("complete");
+      return seastar::now();
+    });
+  }
+
+  seastar::future<> close() {  // refuses new completions, waits for the lock, then closes the stream
+    stopped = true;
+    return lock.write_lock(
+    ).then([this] {
+      assert(pending == 0);  // every complete() that passed the stopped check is expected to have finished by now
+      return stream.close();  // NOTE(review): the write lock acquired above is not released on this path
+    });
+  }
+};
+
 /**
  * NBDHandler
  *
@@ -354,21 +400,23 @@ seastar::future<> send_negotiation(
 
 seastar::future<> handle_command(
   BlockDriver &backend,
-  request_context_t &context,
-  seastar::output_stream<char> &out)
+  request_context_t::ref request_ref,
+  RequestWriter &out)
 {
-  logger().debug("got command {}", context.get_command());
+  auto &request = *request_ref;
+  logger().debug("got command {}", request.get_command());
   return ([&] {
-    switch (context.get_command()) {
+    switch (request.get_command()) {
     case NBD_CMD_WRITE:
       return backend.write(
-       context.from,
-       *context.in_buffer);
+       request.from,
+       *request.in_buffer);
     case NBD_CMD_READ:
       return backend.read(
-       context.from,
-       context.len).then([&context] (auto buffer) {
-         context.out_buffer = buffer;
+       request.from,
+       request.len).then([&] (auto buffer) {
+         logger().debug("read returned buffer len {}", buffer.length());
+         request.out_buffer = buffer;
        });
     case NBD_CMD_DISC:
       throw std::system_error(std::make_error_code(std::errc::bad_message));
@@ -377,9 +425,9 @@ seastar::future<> handle_command(
     default:
       throw std::system_error(std::make_error_code(std::errc::bad_message));
     }
-  })().then([&] {
-    logger().debug("Writing reply");
-    return context.write_reply(out);
+  })().then([&, request_ref=std::move(request_ref)]() mutable {
+    logger().debug("handle_command complete");
+    return out.complete(std::move(request_ref));
   });
 }
 
@@ -387,19 +435,19 @@ seastar::future<> handle_command(
 seastar::future<> handle_commands(
   BlockDriver &backend,
   seastar::input_stream<char>& in,
-  seastar::output_stream<char>& out)
+  RequestWriter &out)  // replies now go through the shared RequestWriter rather than the raw output stream
 {
   logger().debug("handle_commands");
   return seastar::keep_doing(
     [&] {
       logger().debug("waiting for command");
-      auto request_ref = std::make_unique<request_context_t>();
+      auto request_ref = request_context_t::make_ref();
       auto &request = *request_ref;
       return request.read_request(in
-      ).then([&] {
-       return handle_command(backend, request, out);
-      }).then([req=std::move(request_ref)] {
-       logger().debug("complete");
+      ).then([&, request_ref=std::move(request_ref)]() mutable {  // ownership of the request travels into the continuation
+       static_cast<void>(handle_command(backend, std::move(request_ref), out));  // future deliberately discarded: the loop does not wait for this command; NOTE(review): a failed handle_command is not observed here
+       logger().debug("handle_commands after fork");
+       return seastar::now();  // resolve immediately so keep_doing proceeds to read the next request
       });
     });
 }
@@ -421,11 +469,11 @@ seastar::future<> NBDHandler::run()
              [this](auto &conn) {
                return seastar::do_with(
                  conn.input(),
-                 conn.output(),
+                 RequestWriter{conn.output()},
                  [&, this](auto &input, auto &output) {
                    return send_negotiation(
                      backend.get_size(),
-                     output
+                     output.stream
                    ).then([&, this] {
                      return handle_commands(backend, input, output);
                    }).finally([&] {
@@ -465,31 +513,41 @@ public:
     logger().debug("Writing offset {}", offset);
     assert(offset % segment_manager->get_block_size() == 0);
     assert(ptr.length() == (size_t)segment_manager->get_block_size());
-    return seastar::do_with(
-      tm->create_transaction(),
-      std::move(ptr),
-      [this, offset](auto &t, auto &ptr) {
-       return tm->dec_ref(
-         *t,
-         offset
-       ).safe_then([](auto){}).handle_error(
-         crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
-         crimson::ct_error::pass_further_all{}
-       ).safe_then([=, &t, &ptr] {
-         logger().debug("dec_ref complete");
-         return tm->alloc_extent<TestBlock>(
-           *t,
-           offset,
-           ptr.length());
-       }).safe_then([=, &t, &ptr](auto ext) mutable {
-         assert(ext->get_laddr() == (size_t)offset);
-         assert(ext->get_bptr().length() == ptr.length());
-         ext->get_bptr().swap(ptr);
-         logger().debug("submitting transaction");
-         return tm->submit_transaction(std::move(t));
-       });
+    return crimson::do_until(
+      [this, offset, ptr=std::move(ptr)] {
+       return seastar::do_with(
+         tm->create_transaction(),
+         ptr,
+         [this, offset](auto &t, auto &ptr) mutable {
+           return tm->dec_ref(
+             *t,
+             offset
+           ).safe_then([](auto){}).handle_error(
+             crimson::ct_error::enoent::handle([](auto) { return seastar::now(); }),
+             crimson::ct_error::pass_further_all{}
+           ).safe_then([=, &t, &ptr] {
+             logger().debug("dec_ref complete");
+             return tm->alloc_extent<TestBlock>(
+               *t,
+               offset,
+               ptr.length());
+           }).safe_then([=, &t, &ptr](auto ext) mutable {
+             assert(ext->get_laddr() == (size_t)offset);
+             assert(ext->get_bptr().length() == ptr.length());
+             ext->get_bptr().swap(ptr);
+             logger().debug("submitting transaction");
+             return tm->submit_transaction(std::move(t));
+           }).safe_then([] {
+             return true;
+           }).handle_error(
+             [](const crimson::ct_error::eagain &e) {
+               return seastar::make_ready_future<bool>(false);
+             },
+             crimson::ct_error::pass_further_all{}
+           );
+         });
       }).handle_error(
-       crimson::ct_error::assert_all{}
+       crimson::ct_error::assert_all{"store-nbd write"}
       );
   }
 
@@ -499,31 +557,43 @@ public:
     logger().debug("Reading offset {}", offset);
     assert(offset % segment_manager->get_block_size() == 0);
     assert(size % (size_t)segment_manager->get_block_size() == 0);
-    return seastar::do_with(
-      tm->create_transaction(),
-      [this, offset, size](auto &t) {
-       return tm->read_extents<TestBlock>(*t, offset, size
-       ).safe_then([=](auto ext_list) mutable {
-         size_t cur = offset;
-         bufferlist bl;
-         for (auto &i: ext_list) {
-           if (cur != i.first) {
-             assert(cur < i.first);
-             bl.append_zero(i.first - cur);
-             cur = i.first;
-           }
-           bl.append(i.second->get_bptr());
-           cur += i.second->get_bptr().length();
-         }
-         if (bl.length() != size) {
-           assert(bl.length() < size);
-           bl.append_zero(size - bl.length());
-         }
-         return seastar::make_ready_future<bufferlist>(std::move(bl));
-       });
+    auto blptrret = std::make_unique<bufferlist>();
+    auto &blret = *blptrret;
+    return crimson::do_until(
+      [=, &blret] {
+       return seastar::do_with(
+         tm->create_transaction(),
+         [=, &blret](auto &t) {
+           return tm->read_extents<TestBlock>(*t, offset, size
+           ).safe_then([=, &blret](auto ext_list) mutable {
+             size_t cur = offset;
+             for (auto &i: ext_list) {
+               if (cur != i.first) {
+                 assert(cur < i.first);
+                 blret.append_zero(i.first - cur);
+                 cur = i.first;
+               }
+               blret.append(i.second->get_bptr());
+               cur += i.second->get_bptr().length();
+             }
+             if (blret.length() != size) {
+               assert(blret.length() < size);
+               blret.append_zero(size - blret.length());
+             }
+             return seastar::make_ready_future<bool>(true);
+           }).handle_error(
+             [](const crimson::ct_error::eagain &e) {
+               return seastar::make_ready_future<bool>(false);
+             },
+             crimson::ct_error::pass_further_all{}
+           );
+         });
       }).handle_error(
-       crimson::ct_error::assert_all{}
-      );
+       crimson::ct_error::assert_all{"store-nbd read"}
+      ).then([blptrret=std::move(blptrret)]() mutable {
+       logger().debug("read complete");
+       return std::move(*blptrret);
+      });
   }
 
   void init() {