]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-nbd: make sure all pending io is processed before exiting
authorMykola Golub <mgolub@suse.com>
Wed, 23 Sep 2020 17:08:58 +0000 (18:08 +0100)
committerMykola Golub <mgolub@suse.com>
Wed, 7 Oct 2020 12:26:07 +0000 (13:26 +0100)
Signed-off-by: Mykola Golub <mgolub@suse.com>
src/tools/rbd_nbd/rbd-nbd.cc

index 6ca68a49c475eb599501c6282ada1267b4af42da..13ae55fe88b9a1cca42d7d35198972213104d79e 100644 (file)
@@ -224,17 +224,6 @@ private:
   std::atomic<bool> terminated = { false };
   std::atomic<bool> allow_internal_flush = { false };
 
-  void shutdown()
-  {
-    bool expected = false;
-    if (terminated.compare_exchange_strong(expected, true)) {
-      ::shutdown(fd, SHUT_RDWR);
-
-      std::lock_guard l{lock};
-      cond.notify_all();
-    }
-  }
-
   struct IOContext
   {
     xlist<IOContext*>::item item;
@@ -274,7 +263,10 @@ private:
   IOContext *wait_io_finish()
   {
     std::unique_lock l{lock};
-    cond.wait(l, [this] { return !io_finished.empty() || terminated; });
+    cond.wait(l, [this] {
+                   return !io_finished.empty() ||
+                          (io_pending.empty() && terminated);
+                 });
 
     if (io_finished.empty())
       return NULL;
@@ -287,7 +279,6 @@ private:
 
   void wait_clean()
   {
-    ceph_assert(!reader_thread.is_started());
     std::unique_lock l{lock};
     cond.wait(l, [this] { return io_pending.empty(); });
 
@@ -297,6 +288,16 @@ private:
     }
   }
 
+  void assert_clean()
+  {
+    std::unique_lock l{lock};
+
+    ceph_assert(!reader_thread.is_started());
+    ceph_assert(!writer_thread.is_started());
+    ceph_assert(io_pending.empty());
+    ceph_assert(io_finished.empty());
+  }
+
   static void aio_callback(librbd::completion_t cb, void *arg)
   {
     librbd::RBD::AioCompletion *aio_completion =
@@ -339,7 +340,7 @@ private:
     poll_fds[0].fd = fd;
     poll_fds[0].events = POLLIN;
 
-    while (!terminated) {
+    while (true) {
       std::unique_ptr<IOContext> ctx(new IOContext());
       ctx->server = this;
 
@@ -435,21 +436,25 @@ private:
           goto signal;
       }
     }
-    dout(20) << __func__ << ": terminated" << dendl;
-
 signal:
-    std::lock_guard l{disconnect_lock};
+    std::lock_guard l{lock};
+    terminated = true;
+    cond.notify_all();
+
+    std::lock_guard disconnect_l{disconnect_lock};
     disconnect_cond.notify_all();
+
+    dout(20) << __func__ << ": terminated" << dendl;
   }
 
   void writer_entry()
   {
-    while (!terminated) {
+    while (true) {
       dout(20) << __func__ << ": waiting for io request" << dendl;
       std::unique_ptr<IOContext> ctx(wait_io_finish());
       if (!ctx) {
        dout(20) << __func__ << ": no io requests, terminating" << dendl;
-        return;
+        goto done;
       }
 
       dout(20) << __func__ << ": got: " << *ctx << dendl;
@@ -458,18 +463,23 @@ signal:
       if (r < 0) {
        derr << *ctx << ": failed to write reply header: " << cpp_strerror(r)
             << dendl;
-        return;
+        goto error;
       }
       if (ctx->command == NBD_CMD_READ && ctx->reply.error == htonl(0)) {
        r = ctx->data.write_fd(fd);
         if (r < 0) {
          derr << *ctx << ": failed to write replay data: " << cpp_strerror(r)
               << dendl;
-          return;
+          goto error;
        }
       }
       dout(20) << *ctx << ": finish" << dendl;
     }
+  error:
+    wait_clean();
+  done:
+    ::shutdown(fd, SHUT_RDWR);
+
     dout(20) << __func__ << ": terminated" << dendl;
   }
 
@@ -544,6 +554,8 @@ signal:
 
       run_quiesce_hook(cfg->quiesce_hook, cfg->devpath, "unquiesce");
     }
+
+    dout(20) << __func__ << ": terminated" << dendl;
   }
 
   class ThreadHelper : public Thread
@@ -562,7 +574,6 @@ signal:
     void* entry() override
     {
       (server.*func)();
-      server.shutdown();
       return NULL;
     }
   } reader_thread, writer_thread, quiesce_thread;
@@ -622,15 +633,14 @@ public:
     if (started) {
       dout(10) << __func__ << ": terminating" << dendl;
 
-      shutdown();
-
+      nbd_terminate = true;
       reader_thread.join();
       writer_thread.join();
       if (cfg->quiesce) {
         quiesce_thread.join();
       }
 
-      wait_clean();
+      assert_clean();
 
       started = false;
     }