std::atomic<bool> terminated = { false };
std::atomic<bool> allow_internal_flush = { false };
- void shutdown()
- {
- bool expected = false;
- if (terminated.compare_exchange_strong(expected, true)) {
- ::shutdown(fd, SHUT_RDWR);
-
- std::lock_guard l{lock};
- cond.notify_all();
- }
- }
-
struct IOContext
{
xlist<IOContext*>::item item;
IOContext *wait_io_finish()
{
std::unique_lock l{lock};
- cond.wait(l, [this] { return !io_finished.empty() || terminated; });
+ cond.wait(l, [this] {
+ return !io_finished.empty() ||
+ (io_pending.empty() && terminated);
+ });
if (io_finished.empty())
return NULL;
void wait_clean()
{
- ceph_assert(!reader_thread.is_started());
std::unique_lock l{lock};
cond.wait(l, [this] { return io_pending.empty(); });
}
}
+ void assert_clean()
+ {
+ std::unique_lock l{lock};
+
+ ceph_assert(!reader_thread.is_started());
+ ceph_assert(!writer_thread.is_started());
+ ceph_assert(io_pending.empty());
+ ceph_assert(io_finished.empty());
+ }
+
static void aio_callback(librbd::completion_t cb, void *arg)
{
librbd::RBD::AioCompletion *aio_completion =
poll_fds[0].fd = fd;
poll_fds[0].events = POLLIN;
- while (!terminated) {
+ while (true) {
std::unique_ptr<IOContext> ctx(new IOContext());
ctx->server = this;
goto signal;
}
}
- dout(20) << __func__ << ": terminated" << dendl;
-
signal:
- std::lock_guard l{disconnect_lock};
+ std::lock_guard l{lock};
+ terminated = true;
+ cond.notify_all();
+
+ std::lock_guard disconnect_l{disconnect_lock};
disconnect_cond.notify_all();
+
+ dout(20) << __func__ << ": terminated" << dendl;
}
void writer_entry()
{
- while (!terminated) {
+ while (true) {
dout(20) << __func__ << ": waiting for io request" << dendl;
std::unique_ptr<IOContext> ctx(wait_io_finish());
if (!ctx) {
dout(20) << __func__ << ": no io requests, terminating" << dendl;
- return;
+ goto done;
}
dout(20) << __func__ << ": got: " << *ctx << dendl;
if (r < 0) {
derr << *ctx << ": failed to write reply header: " << cpp_strerror(r)
<< dendl;
- return;
+ goto error;
}
if (ctx->command == NBD_CMD_READ && ctx->reply.error == htonl(0)) {
r = ctx->data.write_fd(fd);
if (r < 0) {
derr << *ctx << ": failed to write replay data: " << cpp_strerror(r)
<< dendl;
- return;
+ goto error;
}
}
dout(20) << *ctx << ": finish" << dendl;
}
+ error:
+ wait_clean();
+ done:
+ ::shutdown(fd, SHUT_RDWR);
+
dout(20) << __func__ << ": terminated" << dendl;
}
run_quiesce_hook(cfg->quiesce_hook, cfg->devpath, "unquiesce");
}
+
+ dout(20) << __func__ << ": terminated" << dendl;
}
class ThreadHelper : public Thread
void* entry() override
{
(server.*func)();
- server.shutdown();
return NULL;
}
} reader_thread, writer_thread, quiesce_thread;
if (started) {
dout(10) << __func__ << ": terminating" << dendl;
- shutdown();
-
+ nbd_terminate = true;
reader_thread.join();
writer_thread.join();
if (cfg->quiesce) {
quiesce_thread.join();
}
- wait_clean();
+ assert_clean();
started = false;
}