In the next patch, the new io_uring API will be used instead of libaio.
This patch prepares the abstract io_queue_t interface for that.
Signed-off-by: Roman Penyaev <rpenyaev@suse.de>
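
For context, a minimal sketch of what an io_uring-backed engine could look like behind this interface, built on liburing. The type name uring_queue_t, the use of the aio_t fields (fd, iov, offset, rval, priv), the write-only submission path and the fd registration are illustrative assumptions only; the actual io_uring engine arrives in the next patch.

  #include <liburing.h>
  #include <errno.h>
  #include "aio.h"  // io_queue_t and aio_t from this patch

  // Hypothetical io_uring-backed implementation of io_queue_t.
  // Field usage of aio_t (fd, iov, offset, rval, priv) is assumed here.
  struct uring_queue_t final : public io_queue_t {
    struct io_uring ring;
    unsigned iodepth;
    bool running = false;

    explicit uring_queue_t(unsigned iodepth) : iodepth(iodepth) {}
    ~uring_queue_t() final {}

    int init(std::vector<int> &fds) final {
      int r = io_uring_queue_init(iodepth, &ring, 0);
      if (r < 0)
        return r;
      // Best-effort in this sketch: pre-register the device fds.
      io_uring_register_files(&ring, fds.data(), fds.size());
      running = true;
      return 0;
    }

    void shutdown() final {
      if (running) {
        io_uring_queue_exit(&ring);
        running = false;
      }
    }

    int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
                     void *priv, int *retries) final {
      (void)aios_size;
      int queued = 0;
      for (aio_iter cur = begin; cur != end; ++cur, ++queued) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (!sqe) {
          // Submission ring is full: flush it and try once more.
          io_uring_submit(&ring);
          ++(*retries);
          sqe = io_uring_get_sqe(&ring);
          if (!sqe)
            return -EAGAIN;
        }
        cur->priv = priv;
        // A real engine would dispatch on the operation recorded in aio_t;
        // a vectored write is assumed here for brevity.
        io_uring_prep_writev(sqe, cur->fd, cur->iov.data(), cur->iov.size(),
                             cur->offset);
        io_uring_sqe_set_data(sqe, &*cur);
      }
      int r = io_uring_submit(&ring);
      return r < 0 ? r : queued;
    }

    int get_next_completed(int timeout_ms, aio_t **paio, int max) final {
      struct __kernel_timespec ts = {timeout_ms / 1000,
                                     (timeout_ms % 1000) * 1000000LL};
      struct io_uring_cqe *cqe = nullptr;
      int r = io_uring_wait_cqe_timeout(&ring, &cqe, &ts);
      if (r < 0)
        return r == -ETIME ? 0 : r;  // timeout means nothing completed
      int n = 0;
      while (cqe && n < max) {
        aio_t *aio = static_cast<aio_t*>(io_uring_cqe_get_data(cqe));
        aio->rval = cqe->res;
        paio[n++] = aio;
        io_uring_cqe_seen(&ring, cqe);
        if (io_uring_peek_cqe(&ring, &cqe) < 0)
          cqe = nullptr;
      }
      return n;
    }
  };

With the interface in place, KernelDevice only needs to construct the desired queue type in its constructor; everything else goes through the io_queue_t pointer, as the diff below shows.
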
KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
: BlockDevice(cct, cb, cbpriv),
aio(false), dio(false),
- aio_queue(cct->_conf->bdev_aio_max_queue_depth),
discard_callback(d_cb),
discard_callback_priv(d_cbpriv),
aio_stop(false),
{
fd_directs.resize(WRITE_LIFE_MAX, -1);
fd_buffereds.resize(WRITE_LIFE_MAX, -1);
+
+ unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;
+ io_queue = std::unique_ptr<io_queue_t>(new aio_queue_t(iodepth));
}
int KernelDevice::_aio_start()
{
if (aio) {
dout(10) << __func__ << dendl;
- int r = aio_queue.init();
+ int r = io_queue->init(fd_directs);
if (r < 0) {
if (r == -EAGAIN) {
derr << __func__ << " io_setup(2) failed with EAGAIN; "
aio_stop = true;
aio_thread.join();
aio_stop = false;
- aio_queue.shutdown();
+ io_queue->shutdown();
}
}
dout(40) << __func__ << " polling" << dendl;
int max = cct->_conf->bdev_aio_reap_max;
aio_t *aio[max];
- int r = aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms,
+ int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
aio, max);
if (r < 0) {
derr << __func__ << " got " << cpp_strerror(r) << dendl;
void *priv = static_cast<void*>(ioc);
int r, retries = 0;
- r = aio_queue.submit_batch(ioc->running_aios.begin(), e,
+ r = io_queue->submit_batch(ioc->running_aios.begin(), e,
pending, priv, &retries);
if (retries)
std::atomic<bool> io_since_flush = {false};
ceph::mutex flush_mutex = ceph::make_mutex("KernelDevice::flush_mutex");
- aio_queue_t aio_queue;
+ std::unique_ptr<io_queue_t> io_queue;
aio_callback_t discard_callback;
void *discard_callback_priv;
bool aio_stop;
boost::intrusive::list_member_hook<>,
&aio_t::queue_item> > aio_list_t;
-struct aio_queue_t {
+struct io_queue_t {
+ typedef list<aio_t>::iterator aio_iter;
+
+ virtual ~io_queue_t() {};
+
+ virtual int init(std::vector<int> &fds) = 0;
+ virtual void shutdown() = 0;
+ virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) = 0;
+ virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0;
+};
+
+struct aio_queue_t : public io_queue_t {
int max_iodepth;
#if defined(HAVE_LIBAIO)
  io_context_t ctx;
#elif defined(HAVE_POSIXAIO)
  int ctx;
#endif
- typedef list<aio_t>::iterator aio_iter;
-
explicit aio_queue_t(unsigned max_iodepth)
: max_iodepth(max_iodepth),
ctx(0) {
}
- ~aio_queue_t() {
+ ~aio_queue_t() final {
ceph_assert(ctx == 0);
}
- int init() {
+ int init(std::vector<int> &fds) final {
+ (void)fds;
ceph_assert(ctx == 0);
#if defined(HAVE_LIBAIO)
int r = io_setup(max_iodepth, &ctx);
return 0;
#endif
}
- void shutdown() {
+ void shutdown() final {
if (ctx) {
#if defined(HAVE_LIBAIO)
int r = io_destroy(ctx);
}
}
- int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
- void *priv, int *retries);
- int get_next_completed(int timeout_ms, aio_t **paio, int max);
+ int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) final;
+ int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
};