]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/tools/store_nbd: ensure RequestWriter outlives any requests on error 41291/head
authorSamuel Just <sjust@redhat.com>
Sat, 8 May 2021 01:58:14 +0000 (18:58 -0700)
committerSamuel Just <sjust@redhat.com>
Wed, 12 May 2021 04:15:57 +0000 (21:15 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/tools/store_nbd/store-nbd.cc

index fa36d03bad70e6f3c80bd2eddc6d2e34506da0ba..e5286d25bc1615d8724c278c79b2f733e006023f 100644 (file)
@@ -162,25 +162,31 @@ struct request_context_t {
 struct RequestWriter {
   seastar::rwlock lock;
   seastar::output_stream<char> stream;
-  bool stopped = false;
+  std::optional<seastar::promise<>> wait_pending;
   int pending = 0;
 
   RequestWriter(
     seastar::output_stream<char> &&stream) : stream(std::move(stream)) {}
   RequestWriter(RequestWriter &&) = default;
 
+  void inc_pending() {
+    ++pending;
+  }
+
+  void dec_pending() {
+    ceph_assert(pending > 0);
+    --pending;
+    if (pending == 0 && wait_pending) {
+      (*wait_pending).set_value();
+    }
+  }
+
   seastar::future<> complete(request_context_t::ref &&req) {
-    if (stopped)
-      throw std::system_error(
-       std::make_error_code(
-         std::errc::operation_canceled));
     auto &request = *req;
-    ++pending;
     return lock.write_lock(
     ).then([&request, this] {
       return request.write_reply(stream);
     }).finally([&, this, req=std::move(req)] {
-      --pending;
       lock.write_unlock();
       logger().debug("complete");
       return seastar::now();
@@ -188,10 +194,14 @@ struct RequestWriter {
   }
 
   seastar::future<> close() {
-    stopped = true;
-    return lock.write_lock(
-    ).then([this] {
-      assert(pending == 0);
+    ceph_assert(!wait_pending);
+    auto do_wait_pending = seastar::now();
+    if (pending > 0) {
+      wait_pending = seastar::promise<>();
+      do_wait_pending = (*wait_pending).get_future();
+    }
+    return do_wait_pending.then([this] {
+      ceph_assert(pending == 0);
       return stream.close();
     });
   }
@@ -393,7 +403,14 @@ seastar::future<> handle_commands(
       auto &request = *request_ref;
       return request.read_request(in
       ).then([&, request_ref=std::move(request_ref)]() mutable {
-       static_cast<void>(handle_command(backend, std::move(request_ref), out));
+       out.inc_pending();
+       static_cast<void>(
+         handle_command(
+           backend, std::move(request_ref), out
+         ).finally([&out] {
+           out.dec_pending();
+         })
+       );
        logger().debug("handle_commands after fork");
        return seastar::now();
       });
@@ -429,6 +446,7 @@ seastar::future<> NBDHandler::run()
                    }).finally([&] {
                      return output.close();
                    }).handle_exception([](auto e) {
+                     logger().error("NBDHandler::run saw exception {}", e);
                      return seastar::now();
                    });
                  });