os/bluestore: introduce new io_uring IO engine 27392/head
author    Roman Penyaev <rpenyaev@suse.de>
Thu, 21 Mar 2019 17:10:11 +0000 (18:10 +0100)
committer Kefu Chai <kchai@redhat.com>
Tue, 10 Dec 2019 09:22:15 +0000 (17:22 +0800)
This implements a low-level IO engine that utilizes the brand-new
io_uring IO interface: https://lwn.net/Articles/776428/
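
For background (this is not part of the patch), a minimal sketch of the
submit/complete cycle that interface exposes through liburing; the function
name read_once and its arguments are illustrative:

    // Illustrative only: create a small ring, queue one readv, submit it,
    // and block for its completion.
    #include <liburing.h>
    #include <sys/uio.h>

    int read_once(int fd, void *buf, unsigned len, off_t off)
    {
      struct io_uring ring;
      int ret = io_uring_queue_init(8, &ring, 0);   // set up SQ/CQ rings
      if (ret < 0)
        return ret;

      struct iovec iov = { buf, len };
      struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
      io_uring_prep_readv(sqe, fd, &iov, 1, off);   // describe the I/O
      io_uring_submit(&ring);                       // hand it to the kernel

      struct io_uring_cqe *cqe;
      ret = io_uring_wait_cqe(&ring, &cqe);         // wait for completion
      int res = ret < 0 ? ret : cqe->res;           // bytes read or -errno
      if (ret == 0)
        io_uring_cqe_seen(&ring, cqe);              // mark CQE as consumed

      io_uring_queue_exit(&ring);
      return res;
    }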

By default libaio is used.  If bluestore_ioring=true is set but the kernel
does not support io_uring, or the architecture is not x86-64, libaio will
be used instead.
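
A sketch of the kind of runtime probe behind that fallback (the patch's
ioring_queue_t::supported() below does the equivalent with a bare
io_uring_setup() call):

    // Sketch only: try to create a small ring; on kernels without io_uring
    // this fails (e.g. with -ENOSYS) and the caller falls back to libaio.
    #include <liburing.h>

    static bool kernel_supports_io_uring()
    {
      struct io_uring ring;
      if (io_uring_queue_init(16, &ring, 0) < 0)
        return false;                 // fall back to libaio
      io_uring_queue_exit(&ring);
      return true;
    }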

In the current patch the liburing library is used in order not to open-code
everything.

In order to compile with liburing, WITH_LIBURING=ON should be specified
(e.g. cmake -DWITH_LIBURING=ON).

Signed-off-by: Roman Penyaev <rpenyaev@suse.de>
CMakeLists.txt
src/common/options.cc
src/include/config-h.in.cmake
src/os/CMakeLists.txt
src/os/bluestore/KernelDevice.cc
src/os/bluestore/ceph_io_uring.h [new file with mode: 0644]
src/os/bluestore/io_uring.cc [new file with mode: 0644]

index 49243d0670dba28579eaf1e2acfcc2fd479b03e5..2fe35f0091a712371387661ec5b23757ecf7ee7c 100644 (file)
@@ -186,6 +186,10 @@ if(WITH_BLUESTORE)
 endif()
 
 include(CMakeDependentOption)
+CMAKE_DEPENDENT_OPTION(WITH_LIBURING "Build with liburing library support" OFF
+  "WITH_BLUESTORE;HAVE_LIBAIO" OFF)
+set(HAVE_LIBURING ${WITH_LIBURING})
+
 CMAKE_DEPENDENT_OPTION(WITH_BLUESTORE_PMEM "Enable PMDK libraries" OFF
   "WITH_BLUESTORE" OFF)
 
index 3cd01fe85969e274fc4bcf63f94d2407b034fea2..367877296ac9419d1566bdc0e1998f7aadfcefd6 100644 (file)
@@ -4624,6 +4624,10 @@ std::vector<Option> get_global_options() {
       .set_default(0)
       .set_description("Space reserved at DB device and not allowed for 'use some extra' policy usage. Overrides 'bluestore_volume_selection_reserved_factor' setting and introduces straightforward limit."),
 
+    Option("bluestore_ioring", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+    .set_default(false)
+    .set_description("Enables Linux io_uring API instead of libaio"),
+
     // -----------------------------------------
     // kstore
 
index 22f8518121176e77d0b844b634d70916cf8b7cbf..1eb7807ed4acd992029261186bb90a3c2345fc32 100644 (file)
@@ -75,6 +75,9 @@
 /* Defined if you have libaio */
 #cmakedefine HAVE_LIBAIO
 
+/* Defined if you have liburing */
+#cmakedefine HAVE_LIBURING
+
 /* Defind if you have POSIX AIO */
 #cmakedefine HAVE_POSIXAIO
 
index 801c3efc72b588c695b0409dcbf9c5121cb11416..c9de652ee3e97803c7e3e01a7e163a6e848d3219 100644 (file)
@@ -33,6 +33,7 @@ if(WITH_BLUESTORE)
     bluestore/StupidAllocator.cc
     bluestore/BitmapAllocator.cc
     bluestore/AvlAllocator.cc
+    bluestore/io_uring.cc
   )
 endif(WITH_BLUESTORE)
 
@@ -130,3 +131,29 @@ endif()
 if(WITH_EVENTTRACE)
   add_dependencies(os eventtrace_tp)
 endif()
+
+if(WITH_LIBURING)
+  include(ExternalProject)
+  if("${CMAKE_GENERATOR}" MATCHES "Make")
+    set(make_cmd "$(MAKE)")
+  else()
+    set(make_cmd "make")
+  endif()
+  ExternalProject_Add(liburing_ext
+    DOWNLOAD_DIR ${CMAKE_BINARY_DIR}/src/
+    GIT_REPOSITORY http://git.kernel.dk/liburing
+    GIT_TAG "4e360f71131918c36774f51688e5c65dea8d43f2"
+    SOURCE_DIR ${CMAKE_BINARY_DIR}/src/liburing
+    CONFIGURE_COMMAND <SOURCE_DIR>/configure
+    BUILD_COMMAND env CC=${CMAKE_C_COMPILER} ${make_cmd} -C src -s
+    BUILD_IN_SOURCE 1
+    INSTALL_COMMAND "")
+  unset(make_cmd)
+  add_library(liburing STATIC IMPORTED GLOBAL)
+  add_dependencies(liburing liburing_ext)
+  set_target_properties(liburing PROPERTIES
+    IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+    IMPORTED_LOCATION "${CMAKE_BINARY_DIR}/src/liburing/src/liburing.a")
+  target_link_libraries(os liburing)
+  target_include_directories(os SYSTEM PRIVATE "${CMAKE_BINARY_DIR}/src/liburing/src/include")
+endif(WITH_LIBURING)
index dce701f6928c2a6a4e004fa6af0e49031c769aed..990e0ce90999d9bca1888323da6574a1ceb9e7fe 100644 (file)
@@ -33,6 +33,7 @@
 #include "common/numa.h"
 
 #include "global/global_context.h"
+#include "ceph_io_uring.h"
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_bdev
@@ -54,8 +55,20 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
   fd_directs.resize(WRITE_LIFE_MAX, -1);
   fd_buffereds.resize(WRITE_LIFE_MAX, -1);
 
+  bool use_ioring = g_ceph_context->_conf.get_val<bool>("bluestore_ioring");
   unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;
-  io_queue = std::unique_ptr<io_queue_t>(new aio_queue_t(iodepth));
+
+  if (use_ioring && ioring_queue_t::supported()) {
+    io_queue = std::make_unique<ioring_queue_t>(iodepth);
+  } else {
+    static bool once;
+    if (use_ioring && !once) {
+      derr << "WARNING: io_uring API is not supported! Fallback to libaio!"
+           << dendl;
+      once = true;
+    }
+    io_queue = std::make_unique<aio_queue_t>(iodepth);
+  }
 }
 
 int KernelDevice::_lock()
diff --git a/src/os/bluestore/ceph_io_uring.h b/src/os/bluestore/ceph_io_uring.h
new file mode 100644 (file)
index 0000000..f14135a
--- /dev/null
@@ -0,0 +1,31 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "acconfig.h"
+
+#include "include/types.h"
+#include "ceph_aio.h"
+
+struct ioring_data;
+
+struct ioring_queue_t final : public io_queue_t {
+  std::unique_ptr<ioring_data> d;
+  unsigned iodepth = 0;
+
+  typedef std::list<aio_t>::iterator aio_iter;
+
+  // Returns true if arch is x86-64 and kernel supports io_uring
+  static bool supported();
+
+  ioring_queue_t(unsigned iodepth_);
+  ~ioring_queue_t() final;
+
+  int init(std::vector<int> &fds) final;
+  void shutdown() final;
+
+  int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+                   void *priv, int *retries) final;
+  int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
+};
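
The header above is the whole surface the rest of BlueStore sees.  A
hypothetical usage sketch of that lifecycle (the real caller is KernelDevice
and its aio threads; drive_ioring, pending_ios and priv are made up for
illustration):

    // Hypothetical driver, for illustration only.  `fds` are the device file
    // descriptors and the entries of `pending_ios` already carry fd, iov and
    // offset, as KernelDevice prepares them.
    #include "ceph_io_uring.h"

    int drive_ioring(unsigned iodepth, std::vector<int> &fds,
                     std::list<aio_t> &pending_ios, void *priv)
    {
      ioring_queue_t q(iodepth);

      int r = q.init(fds);       // create the ring, register fds, epoll fd
      if (r < 0)
        return r;

      int retries = 0;
      r = q.submit_batch(pending_ios.begin(), pending_ios.end(),
                         pending_ios.size(), priv, &retries);
      if (r < 0) {
        q.shutdown();
        return r;
      }

      aio_t *completed[16];
      int n = q.get_next_completed(/*timeout_ms=*/100, completed, 16);
      for (int i = 0; i < n; i++) {
        // completed[i]->rval now holds the result (bytes or -errno)
      }

      q.shutdown();
      return n;
    }
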
diff --git a/src/os/bluestore/io_uring.cc b/src/os/bluestore/io_uring.cc
new file mode 100644 (file)
index 0000000..54fa0f9
--- /dev/null
@@ -0,0 +1,267 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "ceph_io_uring.h"
+
+#if defined(HAVE_LIBURING) && defined(__x86_64__)
+
+#include "liburing.h"
+#include <sys/epoll.h>
+
+/* Options */
+
+static bool hipri = false;      /* use IO polling */
+static bool sq_thread = false;  /* use kernel submission/poller thread */
+
+struct ioring_data {
+  struct io_uring io_uring;
+  pthread_mutex_t cq_mutex;
+  pthread_mutex_t sq_mutex;
+  int epoll_fd = -1;
+  std::map<int, int> fixed_fds_map;
+};
+
+static int ioring_get_cqe(struct ioring_data *d, unsigned int max,
+                         struct aio_t **paio)
+{
+  struct io_uring *ring = &d->io_uring;
+  struct io_uring_cqe *cqe;
+
+  unsigned nr = 0;
+  unsigned head;
+  io_uring_for_each_cqe(ring, head, cqe) {
+    struct aio_t *io = (struct aio_t *)(uintptr_t) io_uring_cqe_get_data(cqe);
+    io->rval = cqe->res;
+
+    paio[nr++] = io;
+
+    if (nr == max)
+      break;
+  }
+  io_uring_cq_advance(ring, nr);
+
+  return nr;
+}
+
+static int find_fixed_fd(struct ioring_data *d, int real_fd)
+{
+  auto it = d->fixed_fds_map.find(real_fd);
+  if (it == d->fixed_fds_map.end())
+    return -1;
+
+  return it->second;
+}
+
+static void init_sqe(struct ioring_data *d, struct io_uring_sqe *sqe,
+                    struct aio_t *io)
+{
+  int fixed_fd = find_fixed_fd(d, io->fd);
+
+  ceph_assert(fixed_fd != -1);
+
+  if (io->iocb.aio_lio_opcode == IO_CMD_PWRITEV)
+    io_uring_prep_writev(sqe, fixed_fd, &io->iov[0],
+                        io->iov.size(), io->offset);
+  else if (io->iocb.aio_lio_opcode == IO_CMD_PREADV)
+    io_uring_prep_readv(sqe, fixed_fd, &io->iov[0],
+                       io->iov.size(), io->offset);
+  else
+    ceph_assert(0);
+
+  io_uring_sqe_set_data(sqe, io);
+  io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
+}
+
+static int ioring_queue(struct ioring_data *d, void *priv,
+                       list<aio_t>::iterator beg, list<aio_t>::iterator end)
+{
+  struct io_uring *ring = &d->io_uring;
+  struct aio_t *io = nullptr;
+
+  ceph_assert(beg != end);
+
+  do {
+    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+    if (!sqe)
+      break;
+
+    io = &*beg;
+    io->priv = priv;
+
+    init_sqe(d, sqe, io);
+
+  } while (++beg != end);
+
+  if (!io)
+    /* Queue is full, go and reap something first */
+    return 0;
+
+  return io_uring_submit(ring);
+}
+
+static void build_fixed_fds_map(struct ioring_data *d,
+                               std::vector<int> &fds)
+{
+  int fixed_fd = 0;
+  for (int real_fd : fds) {
+    d->fixed_fds_map[real_fd] = fixed_fd++;
+  }
+}
+
+ioring_queue_t::ioring_queue_t(unsigned iodepth_) :
+  d(make_unique<ioring_data>()),
+  iodepth(iodepth_)
+{
+}
+
+ioring_queue_t::~ioring_queue_t()
+{
+}
+
+int ioring_queue_t::init(std::vector<int> &fds)
+{
+  unsigned flags = 0;
+
+  pthread_mutex_init(&d->cq_mutex, NULL);
+  pthread_mutex_init(&d->sq_mutex, NULL);
+
+  if (hipri)
+    flags |= IORING_SETUP_IOPOLL;
+  if (sq_thread)
+    flags |= IORING_SETUP_SQPOLL;
+
+  int ret = io_uring_queue_init(iodepth, &d->io_uring, flags);
+  if (ret < 0)
+    return ret;
+
+  ret = io_uring_register(d->io_uring.ring_fd, IORING_REGISTER_FILES,
+                         &fds[0], fds.size());
+  if (ret < 0) {
+    ret = -errno;
+    goto close_ring_fd;
+  }
+
+  build_fixed_fds_map(d.get(), fds);
+
+  d->epoll_fd = epoll_create1(0);
+  if (d->epoll_fd < 0) {
+    ret = -errno;
+    goto close_ring_fd;
+  }
+
+  struct epoll_event ev;
+  ev.events = EPOLLIN;
+  ret = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, d->io_uring.ring_fd, &ev);
+  if (ret < 0) {
+    ret = -errno;
+    goto close_epoll_fd;
+  }
+
+  return 0;
+
+close_epoll_fd:
+  close(d->epoll_fd);
+close_ring_fd:
+  io_uring_queue_exit(&d->io_uring);
+
+  return ret;
+}
+
+void ioring_queue_t::shutdown()
+{
+  d->fixed_fds_map.clear();
+  close(d->epoll_fd);
+  d->epoll_fd = -1;
+  io_uring_queue_exit(&d->io_uring);
+}
+
+int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
+                                 uint16_t aios_size, void *priv,
+                                 int *retries)
+{
+  (void)aios_size;
+  (void)retries;
+
+  pthread_mutex_lock(&d->sq_mutex);
+  int rc = ioring_queue(d.get(), priv, beg, end);
+  pthread_mutex_unlock(&d->sq_mutex);
+
+  return rc;
+}
+
+int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+get_cqe:
+  pthread_mutex_lock(&d->cq_mutex);
+  int events = ioring_get_cqe(d.get(), max, paio);
+  pthread_mutex_unlock(&d->cq_mutex);
+
+  if (events == 0) {
+    struct epoll_event ev;
+    int ret = epoll_wait(d->epoll_fd, &ev, 1, timeout_ms);
+    if (ret < 0)
+      events = -errno;
+    else if (ret > 0)
+      /* Time to reap */
+      goto get_cqe;
+  }
+
+  return events;
+}
+
+bool ioring_queue_t::supported()
+{
+  struct io_uring_params p;
+
+  memset(&p, 0, sizeof(p));
+  int fd = io_uring_setup(16, &p);
+  if (fd < 0)
+    return false;
+
+  close(fd);
+
+  return true;
+}
+
+#else // #if defined(HAVE_LIBURING) && defined(__x86_64__)
+
+struct ioring_data {};
+
+ioring_queue_t::ioring_queue_t(unsigned iodepth_)
+{
+  ceph_assert(0);
+}
+
+ioring_queue_t::~ioring_queue_t()
+{
+  ceph_assert(0);
+}
+
+int ioring_queue_t::init(std::vector<int> &fds)
+{
+  ceph_assert(0);
+}
+
+void ioring_queue_t::shutdown()
+{
+  ceph_assert(0);
+}
+
+int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
+                                 uint16_t aios_size, void *priv,
+                                 int *retries)
+{
+  ceph_assert(0);
+}
+
+int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+  ceph_assert(0);
+}
+
+bool ioring_queue_t::supported()
+{
+  return false;
+}
+
+#endif // #if defined(HAVE_LIBURING) && defined(__x86_64__)
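
Two implementation details above are worth noting: the device fds are
registered once via IORING_REGISTER_FILES and every SQE then refers to them
by index with IOSQE_FIXED_FILE, which avoids a per-I/O fd lookup in the
kernel; and get_next_completed() waits for completions with epoll_wait() on
the ring fd instead of blocking inside the ring, so cq_mutex is never held
while sleeping.  A standalone sketch of the fixed-file pattern, assuming a
liburing new enough to provide the io_uring_register_files() helper (the
patch calls the raw io_uring_register() instead):

    // Illustrative only: register one fd up front, then submit a readv that
    // refers to it by its fixed index instead of the real fd number.
    #include <liburing.h>
    #include <sys/uio.h>

    int read_with_fixed_file(int fd, void *buf, unsigned len, off_t off)
    {
      struct io_uring ring;
      int ret = io_uring_queue_init(8, &ring, 0);
      if (ret < 0)
        return ret;

      int fds[1] = { fd };
      ret = io_uring_register_files(&ring, fds, 1);   // fd becomes index 0
      if (ret < 0) {
        io_uring_queue_exit(&ring);
        return ret;
      }

      struct iovec iov = { buf, len };
      struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
      io_uring_prep_readv(sqe, 0 /* fixed index */, &iov, 1, off);
      io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);  // fd field is an index
      io_uring_submit(&ring);

      struct io_uring_cqe *cqe;
      ret = io_uring_wait_cqe(&ring, &cqe);
      int res = ret < 0 ? ret : cqe->res;
      if (ret == 0)
        io_uring_cqe_seen(&ring, cqe);

      io_uring_unregister_files(&ring);
      io_uring_queue_exit(&ring);
      return res;
    }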