#include "common/config.h"
#include "common/dout.h"
#include "common/errno.h"
+#include "common/event_socket.h"
#include "common/module.h"
#include "common/safe_io.h"
#include "common/version.h"
static int nbd = -1;
static int nbd_index = -1;
-static bool nbd_terminate = false;
+static EventSocket terminate_event_sock;
#define RBD_NBD_BLKSIZE 512UL
}
private:
+ int terminate_event_fd = -1;
ceph::mutex disconnect_lock =
ceph::make_mutex("NBDServer::DisconnectLocker");
ceph::condition_variable disconnect_cond;
void reader_entry()
{
- struct pollfd poll_fds[1];
- memset(poll_fds, 0, sizeof(struct pollfd));
+ struct pollfd poll_fds[2];
+ memset(poll_fds, 0, sizeof(struct pollfd) * 2);
poll_fds[0].fd = fd;
poll_fds[0].events = POLLIN;
+ poll_fds[1].fd = terminate_event_fd;
+ poll_fds[1].events = POLLIN;
while (true) {
std::unique_ptr<IOContext> ctx(new IOContext());
dout(20) << __func__ << ": waiting for nbd request" << dendl;
- // We can not block on read because we have to check periodically
- // nbd_terminate flag set by the signal handler.
- // So to prevent blocking use poll with timeout to ensure we
- // have some data comes before calling read.
-
- int r = poll(poll_fds, 1, 1000);
+ int r = poll(poll_fds, 2, -1);
if (r == -1) {
if (errno == EINTR) {
continue;
goto signal;
}
- if (nbd_terminate) {
- dout(0) << "terminate received" << dendl;
+ if ((poll_fds[1].revents & POLLIN) != 0) {
+ dout(0) << __func__ << ": terminate received" << dendl;
goto signal;
}
if ((poll_fds[0].revents & POLLIN) == 0) {
- dout(20) << __func__ << ": poll timed out" << dendl;
+ dout(20) << __func__ << ": nothing to read" << dendl;
continue;
}
started = true;
+ terminate_event_fd = eventfd(0, EFD_NONBLOCK);
+ ceph_assert(terminate_event_fd > 0);
+ int r = terminate_event_sock.init(terminate_event_fd,
+ EVENT_SOCKET_TYPE_EVENTFD);
+ ceph_assert(r >= 0);
+
reader_thread.create("rbd_reader");
writer_thread.create("rbd_writer");
if (cfg->quiesce) {
if (started) {
dout(10) << __func__ << ": terminating" << dendl;
- nbd_terminate = true;
+ terminate_event_sock.notify();
+
reader_thread.join();
writer_thread.join();
if (cfg->quiesce) {
assert_clean();
+ close(terminate_event_fd);
started = false;
}
}
ceph_assert(signum == SIGINT || signum == SIGTERM);
derr << "*** Got signal " << sig_str(signum) << " ***" << dendl;
- dout(20) << __func__ << ": " << "setting terminate flag" << dendl;
- nbd_terminate = true;
+ dout(20) << __func__ << ": " << "notifying terminate" << dendl;
+
+ ceph_assert(terminate_event_sock.is_valid());
+ terminate_event_sock.notify();
}
static NBDServer *start_server(int fd, librbd::Image& image, Config *cfg)