]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: make abstract interface for any sorts of AIO queues
authorRoman Penyaev <rpenyaev@suse.de>
Tue, 26 Mar 2019 09:10:50 +0000 (10:10 +0100)
committerKefu Chai <kchai@redhat.com>
Tue, 10 Dec 2019 09:21:38 +0000 (17:21 +0800)
In the next patch new io_uring API will be used instead of libaio.
So this prepares the abstract interface.

Signed-off-by: Roman Penyaev <rpenyaev@suse.de>
src/os/bluestore/KernelDevice.cc
src/os/bluestore/KernelDevice.h
src/os/bluestore/ceph_aio.h

index de638661399987541fd0b449fc8d67cfec5bdcf0..dce701f6928c2a6a4e004fa6af0e49031c769aed 100644 (file)
@@ -42,7 +42,6 @@
 KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
   : BlockDevice(cct, cb, cbpriv),
     aio(false), dio(false),
-    aio_queue(cct->_conf->bdev_aio_max_queue_depth),
     discard_callback(d_cb),
     discard_callback_priv(d_cbpriv),
     aio_stop(false),
@@ -54,6 +53,9 @@ KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, ai
 {
   fd_directs.resize(WRITE_LIFE_MAX, -1);
   fd_buffereds.resize(WRITE_LIFE_MAX, -1);
+
+  unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;
+  io_queue = std::unique_ptr<io_queue_t>(new aio_queue_t(iodepth));
 }
 
 int KernelDevice::_lock()
@@ -414,7 +416,7 @@ int KernelDevice::_aio_start()
 {
   if (aio) {
     dout(10) << __func__ << dendl;
-    int r = aio_queue.init();
+    int r = io_queue->init(fd_directs);
     if (r < 0) {
       if (r == -EAGAIN) {
        derr << __func__ << " io_setup(2) failed with EAGAIN; "
@@ -436,7 +438,7 @@ void KernelDevice::_aio_stop()
     aio_stop = true;
     aio_thread.join();
     aio_stop = false;
-    aio_queue.shutdown();
+    io_queue->shutdown();
   }
 }
 
@@ -496,7 +498,7 @@ void KernelDevice::_aio_thread()
     dout(40) << __func__ << " polling" << dendl;
     int max = cct->_conf->bdev_aio_reap_max;
     aio_t *aio[max];
-    int r = aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms,
+    int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
                                         aio, max);
     if (r < 0) {
       derr << __func__ << " got " << cpp_strerror(r) << dendl;
@@ -761,7 +763,7 @@ void KernelDevice::aio_submit(IOContext *ioc)
 
   void *priv = static_cast<void*>(ioc);
   int r, retries = 0;
-  r = aio_queue.submit_batch(ioc->running_aios.begin(), e,
+  r = io_queue->submit_batch(ioc->running_aios.begin(), e,
                             pending, priv, &retries);
 
   if (retries)
index 5cf59ed1986a3591e997eef3e418a6aa86d75b8f..dfa58d982f2aadda24be087c038fecd3748e3933 100644 (file)
@@ -45,7 +45,7 @@ class KernelDevice : public BlockDevice {
   std::atomic<bool> io_since_flush = {false};
   ceph::mutex flush_mutex = ceph::make_mutex("KernelDevice::flush_mutex");
 
-  aio_queue_t aio_queue;
+  std::unique_ptr<io_queue_t> io_queue;
   aio_callback_t discard_callback;
   void *discard_callback_priv;
   bool aio_stop;
index 9d814a6379b376a3c1a8786b3f4cb3bc35a40b8b..17c7989203fbd6c03666e4829d8a22a4f92be934 100644 (file)
@@ -93,7 +93,19 @@ typedef boost::intrusive::list<
     boost::intrusive::list_member_hook<>,
     &aio_t::queue_item> > aio_list_t;
 
-struct aio_queue_t {
+struct io_queue_t {
+  typedef list<aio_t>::iterator aio_iter;
+
+  virtual ~io_queue_t() {};
+
+  virtual int init(std::vector<int> &fds) = 0;
+  virtual void shutdown() = 0;
+  virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+                          void *priv, int *retries) = 0;
+  virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0;
+};
+
+struct aio_queue_t : public io_queue_t {
   int max_iodepth;
 #if defined(HAVE_LIBAIO)
   io_context_t ctx;
@@ -101,17 +113,16 @@ struct aio_queue_t {
   int ctx;
 #endif
 
-  typedef list<aio_t>::iterator aio_iter;
-
   explicit aio_queue_t(unsigned max_iodepth)
     : max_iodepth(max_iodepth),
       ctx(0) {
   }
-  ~aio_queue_t() {
+  ~aio_queue_t() final {
     ceph_assert(ctx == 0);
   }
 
-  int init() {
+  int init(std::vector<int> &fds) final {
+    (void)fds;
     ceph_assert(ctx == 0);
 #if defined(HAVE_LIBAIO)
     int r = io_setup(max_iodepth, &ctx);
@@ -130,7 +141,7 @@ struct aio_queue_t {
       return 0;
 #endif
   }
-  void shutdown() {
+  void shutdown() final {
     if (ctx) {
 #if defined(HAVE_LIBAIO)
       int r = io_destroy(ctx);
@@ -142,7 +153,7 @@ struct aio_queue_t {
     }
   }
 
-  int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size, 
-                  void *priv, int *retries);
-  int get_next_completed(int timeout_ms, aio_t **paio, int max);
+  int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+                  void *priv, int *retries) final;
+  int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
 };