From: Sage Weil Date: Thu, 27 Apr 2017 22:08:00 +0000 (-0400) Subject: common/aio: move aio out of its wierd home in os/fs/FS.h X-Git-Tag: v12.0.3~137^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f8df7d1a02cae17ec7fd3c083dacf8139da7b267;p=ceph.git common/aio: move aio out of its wierd home in os/fs/FS.h This never made sense. Move it. Fix users. Signed-off-by: Sage Weil --- diff --git a/src/os/CMakeLists.txt b/src/os/CMakeLists.txt index 7bd358122f2e..b24686d81170 100644 --- a/src/os/CMakeLists.txt +++ b/src/os/CMakeLists.txt @@ -24,6 +24,7 @@ set(libos_srcs kstore/KStore.cc kstore/kstore_types.cc fs/FS.cc + fs/aio.cc ${libos_xfs_srcs}) if(HAVE_LIBAIO) diff --git a/src/os/bluestore/BlockDevice.h b/src/os/bluestore/BlockDevice.h index 613a73afa45f..157add61fcc1 100644 --- a/src/os/bluestore/BlockDevice.h +++ b/src/os/bluestore/BlockDevice.h @@ -20,9 +20,10 @@ #include #include #include +#include #include "acconfig.h" -#include "os/fs/FS.h" +#include "os/fs/aio.h" #define SPDK_PREFIX "spdk:" @@ -38,8 +39,8 @@ struct IOContext { std::mutex lock; std::condition_variable cond; - list pending_aios; ///< not yet submitted - list running_aios; ///< submitting or submitted + std::list pending_aios; ///< not yet submitted + std::list running_aios; ///< submitting or submitted std::atomic_int num_pending = {0}; std::atomic_int num_running = {0}; std::atomic_int num_reading = {0}; @@ -73,7 +74,7 @@ public: CephContext* cct; private: std::mutex ioc_reap_lock; - vector ioc_reap_queue; + std::vector ioc_reap_queue; std::atomic_int ioc_reap_count = {0}; protected: @@ -85,7 +86,7 @@ public: typedef void (*aio_callback_t)(void *handle, void *aio); static BlockDevice *create( - CephContext* cct, const string& path, aio_callback_t cb, void *cbpriv); + CephContext* cct, const std::string& path, aio_callback_t cb, void *cbpriv); virtual bool supported_bdev_label() { return true; } virtual bool is_rotational() { return rotational; } @@ -94,7 +95,7 @@ public: virtual uint64_t get_size() const = 0; virtual uint64_t get_block_size() const = 0; - virtual int collect_metadata(string prefix, map *pm) const = 0; + virtual int collect_metadata(std::string prefix, std::map *pm) const = 0; virtual int read( uint64_t off, @@ -125,7 +126,7 @@ public: // for managing buffered readers/writers virtual int invalidate_cache(uint64_t off, uint64_t len) = 0; - virtual int open(const string& path) = 0; + virtual int open(const std::string& path) = 0; virtual void close() = 0; }; diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc index 4cc34f7f036b..f5adab3465a9 100644 --- a/src/os/bluestore/BlueFS.cc +++ b/src/os/bluestore/BlueFS.cc @@ -1400,7 +1400,7 @@ int BlueFS::_flush_and_sync_log(std::unique_lock& l, } // drop lock while we wait for io - list completed_ios; + list completed_ios; _claim_completed_aios(log_writer, &completed_ios); l.unlock(); wait_for_aio(log_writer); @@ -1643,7 +1643,7 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length) // we need to retire old completed aios so they don't stick around in // memory indefinitely (along with their bufferlist refs). -void BlueFS::_claim_completed_aios(FileWriter *h, list *ls) +void BlueFS::_claim_completed_aios(FileWriter *h, list *ls) { for (auto p : h->iocv) { if (p) { @@ -1742,7 +1742,7 @@ int BlueFS::_fsync(FileWriter *h, std::unique_lock& l) return r; uint64_t old_dirty_seq = h->file->dirty_seq; - list completed_ios; + list completed_ios; _claim_completed_aios(h, &completed_ios); lock.unlock(); wait_for_aio(h); diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h index 030cf1709d20..be845d8500eb 100644 --- a/src/os/bluestore/BlueFS.h +++ b/src/os/bluestore/BlueFS.h @@ -273,7 +273,7 @@ private: int _flush(FileWriter *h, bool force); int _fsync(FileWriter *h, std::unique_lock& l); - void _claim_completed_aios(FileWriter *h, list *ls); + void _claim_completed_aios(FileWriter *h, list *ls); void wait_for_aio(FileWriter *h); // safe to call without a lock int _flush_and_sync_log(std::unique_lock& l, diff --git a/src/os/bluestore/KernelDevice.cc b/src/os/bluestore/KernelDevice.cc index e73c439566e6..77eaf3602bc8 100644 --- a/src/os/bluestore/KernelDevice.cc +++ b/src/os/bluestore/KernelDevice.cc @@ -334,7 +334,7 @@ void KernelDevice::_aio_thread() while (!aio_stop) { dout(40) << __func__ << " polling" << dendl; int max = 16; - FS::aio_t *aio[max]; + aio_t *aio[max]; int r = aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms, aio, max); if (r < 0) { @@ -435,7 +435,7 @@ void KernelDevice::_aio_log_start( } } -void KernelDevice::debug_aio_link(FS::aio_t& aio) +void KernelDevice::debug_aio_link(aio_t& aio) { if (debug_queue.empty()) { debug_oldest = &aio; @@ -443,7 +443,7 @@ void KernelDevice::debug_aio_link(FS::aio_t& aio) debug_queue.push_back(aio); } -void KernelDevice::debug_aio_unlink(FS::aio_t& aio) +void KernelDevice::debug_aio_unlink(aio_t& aio) { if (aio.queue_item.is_linked()) { debug_queue.erase(debug_queue.iterator_to(aio)); @@ -483,9 +483,9 @@ void KernelDevice::aio_submit(IOContext *ioc) // move these aside, and get our end iterator position now, as the // aios might complete as soon as they are submitted and queue more // wal aio's. - list::iterator e = ioc->running_aios.begin(); + list::iterator e = ioc->running_aios.begin(); ioc->running_aios.splice(e, ioc->pending_aios); - list::iterator p = ioc->running_aios.begin(); + list::iterator p = ioc->running_aios.begin(); int pending = ioc->num_pending.load(); ioc->num_running += pending; @@ -494,7 +494,7 @@ void KernelDevice::aio_submit(IOContext *ioc) bool done = false; while (!done) { - FS::aio_t& aio = *p; + aio_t& aio = *p; aio.priv = static_cast(ioc); dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd << " 0x" << std::hex << aio.offset << "~" << aio.length @@ -506,7 +506,7 @@ void KernelDevice::aio_submit(IOContext *ioc) // be careful: as soon as we submit aio we race with completion. // since we are holding a ref take care not to dereference txc at // all after that point. - list::iterator cur = p; + list::iterator cur = p; ++p; done = (p == e); @@ -555,9 +555,9 @@ int KernelDevice::aio_write( #ifdef HAVE_LIBAIO if (aio && dio && !buffered) { - ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct)); + ioc->pending_aios.push_back(aio_t(ioc, fd_direct)); ++ioc->num_pending; - FS::aio_t& aio = ioc->pending_aios.back(); + aio_t& aio = ioc->pending_aios.back(); if (cct->_conf->bdev_inject_crash && rand() % cct->_conf->bdev_inject_crash == 0) { derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex @@ -664,9 +664,9 @@ int KernelDevice::aio_read( #ifdef HAVE_LIBAIO if (aio && dio) { _aio_log_start(ioc, off, len); - ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct)); + ioc->pending_aios.push_back(aio_t(ioc, fd_direct)); ++ioc->num_pending; - FS::aio_t& aio = ioc->pending_aios.back(); + aio_t& aio = ioc->pending_aios.back(); aio.pread(off, len); for (unsigned i=0; i #include "os/fs/FS.h" +#include "os/fs/aio.h" #include "include/interval_set.h" #include "BlockDevice.h" @@ -26,7 +27,7 @@ class KernelDevice : public BlockDevice { int fd_direct, fd_buffered; uint64_t size; uint64_t block_size; - string path; + std::string path; FS *fs; bool aio, dio; @@ -36,7 +37,7 @@ class KernelDevice : public BlockDevice { std::atomic io_since_flush = {false}; std::mutex flush_mutex; - FS::aio_queue_t aio_queue; + aio_queue_t aio_queue; aio_callback_t aio_callback; void *aio_callback_priv; bool aio_stop; @@ -64,12 +65,12 @@ class KernelDevice : public BlockDevice { int direct_read_unaligned(uint64_t off, uint64_t len, char *buf); // stalled aio debugging - FS::aio_list_t debug_queue; + aio_list_t debug_queue; std::mutex debug_queue_lock; - FS::aio_t *debug_oldest = nullptr; + aio_t *debug_oldest = nullptr; utime_t debug_stall_since; - void debug_aio_link(FS::aio_t& aio); - void debug_aio_unlink(FS::aio_t& aio); + void debug_aio_link(aio_t& aio); + void debug_aio_unlink(aio_t& aio); public: KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv); @@ -83,7 +84,7 @@ public: return block_size; } - int collect_metadata(string prefix, map *pm) const override; + int collect_metadata(std::string prefix, map *pm) const override; int read(uint64_t off, uint64_t len, bufferlist *pbl, IOContext *ioc, @@ -99,7 +100,7 @@ public: // for managing buffered readers/writers int invalidate_cache(uint64_t off, uint64_t len) override; - int open(const string& path) override; + int open(const std::string& path) override; void close() override; }; diff --git a/src/os/fs/FS.cc b/src/os/fs/FS.cc index b9798872b1c3..d15a6bf82f28 100644 --- a/src/os/fs/FS.cc +++ b/src/os/fs/FS.cc @@ -185,47 +185,3 @@ int FS::zero(int fd, uint64_t offset, uint64_t length) // --------------- -#if defined(HAVE_LIBAIO) -int FS::aio_queue_t::submit(aio_t &aio, int *retries) -{ - // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds - int attempts = 16; - int delay = 125; - iocb *piocb = &aio.iocb; - while (true) { - int r = io_submit(ctx, 1, &piocb); - if (r < 0) { - if (r == -EAGAIN && attempts-- > 0) { - usleep(delay); - delay *= 2; - (*retries)++; - continue; - } - return r; - } - assert(r == 1); - break; - } - return 0; -} - -int FS::aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max) -{ - io_event event[max]; - struct timespec t = { - timeout_ms / 1000, - (timeout_ms % 1000) * 1000 * 1000 - }; - - int r = 0; - do { - r = io_getevents(ctx, 1, max, event, &t); - } while (r == -EINTR); - - for (int i=0; irval = event[i].res; - } - return r; -} -#endif diff --git a/src/os/fs/FS.h b/src/os/fs/FS.h index 68ff65368025..aafa64e5350b 100644 --- a/src/os/fs/FS.h +++ b/src/os/fs/FS.h @@ -18,18 +18,11 @@ #include #include -#include "acconfig.h" -#ifdef HAVE_LIBAIO -# include -#endif - #include #include "include/types.h" #include "common/Mutex.h" #include "common/Cond.h" -#include -#include class FS { public: @@ -53,75 +46,6 @@ public: virtual int zero(int fd, uint64_t offset, uint64_t length); // -- aio -- -#if defined(HAVE_LIBAIO) - struct aio_t { - struct iocb iocb; // must be first element; see shenanigans in aio_queue_t - void *priv; - int fd; - boost::container::small_vector iov; - uint64_t offset, length; - int rval; - bufferlist bl; ///< write payload (so that it remains stable for duration) - - boost::intrusive::list_member_hook<> queue_item; - - aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) { - } - - void pwritev(uint64_t _offset, uint64_t len) { - offset = _offset; - length = len; - io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset); - } - void pread(uint64_t _offset, uint64_t len) { - offset = _offset; - length = len; - bufferptr p = buffer::create_page_aligned(length); - io_prep_pread(&iocb, fd, p.c_str(), length, offset); - bl.append(std::move(p)); - } - - int get_return_value() { - return rval; - } - }; - - typedef boost::intrusive::list< - aio_t, - boost::intrusive::member_hook< - aio_t, - boost::intrusive::list_member_hook<>, - &aio_t::queue_item> > aio_list_t; - - struct aio_queue_t { - int max_iodepth; - io_context_t ctx; - - - explicit aio_queue_t(unsigned max_iodepth) - : max_iodepth(max_iodepth), - ctx(0) { - } - ~aio_queue_t() { - assert(ctx == 0); - } - - int init() { - assert(ctx == 0); - return io_setup(max_iodepth, &ctx); - } - void shutdown() { - if (ctx) { - int r = io_destroy(ctx); - assert(r == 0); - ctx = 0; - } - } - - int submit(aio_t &aio, int *retries); - int get_next_completed(int timeout_ms, aio_t **paio, int max); - }; -#endif }; #endif diff --git a/src/os/fs/aio.cc b/src/os/fs/aio.cc new file mode 100644 index 000000000000..a5edf6266655 --- /dev/null +++ b/src/os/fs/aio.cc @@ -0,0 +1,51 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "aio.h" + +#if defined(HAVE_LIBAIO) + +int aio_queue_t::submit(aio_t &aio, int *retries) +{ + // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds + int attempts = 16; + int delay = 125; + iocb *piocb = &aio.iocb; + while (true) { + int r = io_submit(ctx, 1, &piocb); + if (r < 0) { + if (r == -EAGAIN && attempts-- > 0) { + usleep(delay); + delay *= 2; + (*retries)++; + continue; + } + return r; + } + assert(r == 1); + break; + } + return 0; +} + +int aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max) +{ + io_event event[max]; + struct timespec t = { + timeout_ms / 1000, + (timeout_ms % 1000) * 1000 * 1000 + }; + + int r = 0; + do { + r = io_getevents(ctx, 1, max, event, &t); + } while (r == -EINTR); + + for (int i=0; irval = event[i].res; + } + return r; +} + +#endif diff --git a/src/os/fs/aio.h b/src/os/fs/aio.h new file mode 100644 index 000000000000..35d99f2c94f5 --- /dev/null +++ b/src/os/fs/aio.h @@ -0,0 +1,82 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "acconfig.h" +#ifdef HAVE_LIBAIO +# include + +#include +#include + +#include "include/buffer.h" + +struct aio_t { + struct iocb iocb; // must be first element; see shenanigans in aio_queue_t + void *priv; + int fd; + boost::container::small_vector iov; + uint64_t offset, length; + int rval; + bufferlist bl; ///< write payload (so that it remains stable for duration) + + boost::intrusive::list_member_hook<> queue_item; + + aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) { + } + + void pwritev(uint64_t _offset, uint64_t len) { + offset = _offset; + length = len; + io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset); + } + void pread(uint64_t _offset, uint64_t len) { + offset = _offset; + length = len; + bufferptr p = buffer::create_page_aligned(length); + io_prep_pread(&iocb, fd, p.c_str(), length, offset); + bl.append(std::move(p)); + } + + int get_return_value() { + return rval; + } +}; + +typedef boost::intrusive::list< + aio_t, + boost::intrusive::member_hook< + aio_t, + boost::intrusive::list_member_hook<>, + &aio_t::queue_item> > aio_list_t; + +struct aio_queue_t { + int max_iodepth; + io_context_t ctx; + + explicit aio_queue_t(unsigned max_iodepth) + : max_iodepth(max_iodepth), + ctx(0) { + } + ~aio_queue_t() { + assert(ctx == 0); + } + + int init() { + assert(ctx == 0); + return io_setup(max_iodepth, &ctx); + } + void shutdown() { + if (ctx) { + int r = io_destroy(ctx); + assert(r == 0); + ctx = 0; + } + } + + int submit(aio_t &aio, int *retries); + int get_next_completed(int timeout_ms, aio_t **paio, int max); +}; + +#endif