From: Mykola Golub Date: Wed, 23 Sep 2020 17:08:58 +0000 (+0100) Subject: rbd-nbd: make sure all pending io is processed before exiting X-Git-Tag: v16.1.0~754^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=20670fab51de77ffec4203b3117d17e1a4fbb574;p=ceph.git rbd-nbd: make sure all pending io is processed before exiting Signed-off-by: Mykola Golub --- diff --git a/src/tools/rbd_nbd/rbd-nbd.cc b/src/tools/rbd_nbd/rbd-nbd.cc index 6ca68a49c475..13ae55fe88b9 100644 --- a/src/tools/rbd_nbd/rbd-nbd.cc +++ b/src/tools/rbd_nbd/rbd-nbd.cc @@ -224,17 +224,6 @@ private: std::atomic terminated = { false }; std::atomic allow_internal_flush = { false }; - void shutdown() - { - bool expected = false; - if (terminated.compare_exchange_strong(expected, true)) { - ::shutdown(fd, SHUT_RDWR); - - std::lock_guard l{lock}; - cond.notify_all(); - } - } - struct IOContext { xlist::item item; @@ -274,7 +263,10 @@ private: IOContext *wait_io_finish() { std::unique_lock l{lock}; - cond.wait(l, [this] { return !io_finished.empty() || terminated; }); + cond.wait(l, [this] { + return !io_finished.empty() || + (io_pending.empty() && terminated); + }); if (io_finished.empty()) return NULL; @@ -287,7 +279,6 @@ private: void wait_clean() { - ceph_assert(!reader_thread.is_started()); std::unique_lock l{lock}; cond.wait(l, [this] { return io_pending.empty(); }); @@ -297,6 +288,16 @@ private: } } + void assert_clean() + { + std::unique_lock l{lock}; + + ceph_assert(!reader_thread.is_started()); + ceph_assert(!writer_thread.is_started()); + ceph_assert(io_pending.empty()); + ceph_assert(io_finished.empty()); + } + static void aio_callback(librbd::completion_t cb, void *arg) { librbd::RBD::AioCompletion *aio_completion = @@ -339,7 +340,7 @@ private: poll_fds[0].fd = fd; poll_fds[0].events = POLLIN; - while (!terminated) { + while (true) { std::unique_ptr ctx(new IOContext()); ctx->server = this; @@ -435,21 +436,25 @@ private: goto signal; } } - dout(20) << __func__ << ": terminated" << dendl; - signal: - std::lock_guard l{disconnect_lock}; + std::lock_guard l{lock}; + terminated = true; + cond.notify_all(); + + std::lock_guard disconnect_l{disconnect_lock}; disconnect_cond.notify_all(); + + dout(20) << __func__ << ": terminated" << dendl; } void writer_entry() { - while (!terminated) { + while (true) { dout(20) << __func__ << ": waiting for io request" << dendl; std::unique_ptr ctx(wait_io_finish()); if (!ctx) { dout(20) << __func__ << ": no io requests, terminating" << dendl; - return; + goto done; } dout(20) << __func__ << ": got: " << *ctx << dendl; @@ -458,18 +463,23 @@ signal: if (r < 0) { derr << *ctx << ": failed to write reply header: " << cpp_strerror(r) << dendl; - return; + goto error; } if (ctx->command == NBD_CMD_READ && ctx->reply.error == htonl(0)) { r = ctx->data.write_fd(fd); if (r < 0) { derr << *ctx << ": failed to write replay data: " << cpp_strerror(r) << dendl; - return; + goto error; } } dout(20) << *ctx << ": finish" << dendl; } + error: + wait_clean(); + done: + ::shutdown(fd, SHUT_RDWR); + dout(20) << __func__ << ": terminated" << dendl; } @@ -544,6 +554,8 @@ signal: run_quiesce_hook(cfg->quiesce_hook, cfg->devpath, "unquiesce"); } + + dout(20) << __func__ << ": terminated" << dendl; } class ThreadHelper : public Thread @@ -562,7 +574,6 @@ signal: void* entry() override { (server.*func)(); - server.shutdown(); return NULL; } } reader_thread, writer_thread, quiesce_thread; @@ -622,15 +633,14 @@ public: if (started) { dout(10) << __func__ << ": terminating" << dendl; - shutdown(); - + nbd_terminate = true; reader_thread.join(); writer_thread.join(); if (cfg->quiesce) { quiesce_thread.join(); } - wait_clean(); + assert_clean(); started = false; }