From b7a53b58741deff5d10b92f719fc42a351d4d0f4 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 20 Apr 2015 10:34:04 -0700 Subject: [PATCH] os/newstore: basic aio support Signed-off-by: Sage Weil --- src/common/config_opts.h | 3 + src/os/fs/FS.h | 83 +++++++++++++++++++++ src/os/newstore/NewStore.cc | 145 ++++++++++++++++++++++++++++++++---- src/os/newstore/NewStore.h | 32 +++++++- 4 files changed, 248 insertions(+), 15 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index ac913c2b0a68c..8d26707c79b36 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -810,6 +810,9 @@ OPTION(newstore_overlay_max, OPT_INT, 32) OPTION(newstore_open_by_handle, OPT_BOOL, true) OPTION(newstore_o_direct, OPT_BOOL, true) OPTION(newstore_db_path, OPT_STR, "") +OPTION(newstore_aio, OPT_BOOL, true) +OPTION(newstore_aio_poll_ms, OPT_INT, 250) // milliseconds +OPTION(newstore_aio_max_queue_depth, OPT_INT, 64) OPTION(filestore_omap_backend, OPT_STR, "leveldb") diff --git a/src/os/fs/FS.h b/src/os/fs/FS.h index a9d8100fafcc8..51c6363f13fcd 100644 --- a/src/os/fs/FS.h +++ b/src/os/fs/FS.h @@ -15,9 +15,19 @@ #ifndef CEPH_OS_FS_H #define CEPH_OS_FS_H +#include +#include + +#include "acconfig.h" +#ifdef HAVE_LIBAIO +# include +#endif + #include #include "include/types.h" +#include "common/Mutex.h" +#include "common/Cond.h" class FS { public: @@ -39,6 +49,79 @@ public: int from_fd, uint64_t from_offset, uint64_t from_len); virtual int zero(int fd, uint64_t offset, uint64_t length); + + // -- aio -- + + struct aio_t { + struct iocb iocb; // must be first element; see shenanigans in aio_queue_t + void *priv; + int fd; + vector iov; + + aio_t(void *p, int f) : priv(p), fd(f) { + memset(&iocb, 0, sizeof(iocb)); + } + + void pwritev(uint64_t offset) { + io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset); + } + }; + + struct aio_queue_t { + int max_iodepth; + io_context_t ctx; + + aio_queue_t(unsigned max_iodepth = 8) + : max_iodepth(max_iodepth), + ctx(0) { + } + ~aio_queue_t() { + assert(ctx == 0); + } + + int init() { + assert(ctx == 0); + return io_setup(max_iodepth, &ctx); + } + void shutdown() { + if (ctx) { + int r = io_destroy(ctx); + assert(r == 0); + ctx = 0; + } + } + + int submit(aio_t &aio) { + int attempts = 10; + iocb *piocb = &aio.iocb; + do { + int r = io_submit(ctx, 1, &piocb); + if (r < 0) { + if (r == -EAGAIN && attempts-- > 0) { + usleep(500); + continue; + } + return r; + } + } while (false); + return 0; + } + + int get_next_completed(int timeout_ms, aio_t **paio) { + io_event event[1]; + struct timespec t = { + timeout_ms / 1000, + (timeout_ms % 1000) * 1000 * 1000 + }; + int r = io_getevents(ctx, 1, 1, event, &t); + if (r <= 0) { + return r; + } + *paio = (aio_t *)event[0].obj; + return 1; + } + }; + }; #endif diff --git a/src/os/newstore/NewStore.cc b/src/os/newstore/NewStore.cc index 9db13d368c8fb..7de2092308d91 100644 --- a/src/os/newstore/NewStore.cc +++ b/src/os/newstore/NewStore.cc @@ -598,6 +598,9 @@ NewStore::NewStore(CephContext *cct, const string& path) cct->_conf->newstore_fsync_thread_timeout, cct->_conf->newstore_fsync_thread_suicide_timeout, &fsync_tp), + aio_thread(this), + aio_stop(false), + aio_queue(cct->_conf->newstore_aio_max_queue_depth), kv_sync_thread(this), kv_lock("NewStore::kv_lock"), kv_stop(false), @@ -830,6 +833,29 @@ void NewStore::_close_db() db = NULL; } +int NewStore::_aio_start() +{ + if (g_conf->newstore_aio) { + dout(10) << __func__ << dendl; + int r = aio_queue.init(); + if (r < 0) + return r; + aio_thread.create(); + } + return 0; +} + +void NewStore::_aio_stop() +{ + if (g_conf->newstore_aio) { + dout(10) << __func__ << dendl; + aio_stop = true; + aio_thread.join(); + aio_stop = false; + aio_queue.shutdown(); + } +} + int NewStore::_open_collections() { KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL); @@ -961,10 +987,14 @@ int NewStore::mount() if (r < 0) goto out_db; - r = _replay_wal(); + r = _aio_start(); if (r < 0) goto out_db; + r = _replay_wal(); + if (r < 0) + goto out_aio; + finisher.start(); fsync_tp.start(); wal_tp.start(); @@ -973,6 +1003,8 @@ int NewStore::mount() mounted = true; return 0; + out_aio: + _aio_stop(); out_db: _close_db(); out_frag: @@ -994,6 +1026,8 @@ int NewStore::umount() dout(20) << __func__ << " stopping fsync_wq" << dendl; fsync_tp.stop(); + dout(20) << __func__ << " stopping aio" << dendl; + _aio_stop(); dout(20) << __func__ << " stopping kv thread" << dendl; _kv_stop(); dout(20) << __func__ << " draining wal_wq" << dendl; @@ -2198,6 +2232,34 @@ void NewStore::_osr_reap_done(OpSequencer *osr) } } +void NewStore::_aio_thread() +{ + dout(10) << __func__ << " start" << dendl; + while (!aio_stop) { + dout(40) << __func__ << " polling" << dendl; + FS::aio_t *aio; + int r = aio_queue.get_next_completed(g_conf->newstore_aio_poll_ms, &aio); + if (r < 0) { + derr << __func__ << " got " << cpp_strerror(r) << dendl; + } + if (r == 1) { + TransContext *txc = static_cast(aio->priv); + int left = txc->num_aio.dec(); + dout(10) << __func__ << " finished aio on " << txc << ", " + << left << " left" << dendl; + if (left == 0) { + txc->state = TransContext::STATE_AIO_DONE; + if (!txc->fds.empty()) { + _txc_queue_fsync(txc); + } else { + _txc_finish_fsync(txc); + } + } + } + } + dout(10) << __func__ << " end" << dendl; +} + void NewStore::_kv_sync_thread() { dout(10) << __func__ << " start" << dendl; @@ -2317,7 +2379,12 @@ int NewStore::_do_wal_transaction(wal_transaction_t& wt) << cpp_strerror(r) << dendl; return r; } - p->data.write_fd(fd); + r = p->data.write_fd(fd); + if (r < 0) { + derr << __func__ << " write_fd on " << fd << " got: " + << cpp_strerror(r) << dendl; + return r; + } sync_fds.push_back(fd); } break; @@ -2481,7 +2548,27 @@ int NewStore::queue_transactions( _txc_finish_kv(txc); } else { // async path - if (!txc->fds.empty()) { + if (!txc->aios.empty()) { + txc->state = TransContext::STATE_AIO_QUEUED; + dout(20) << __func__ << " submitting " << txc->num_aio.read() << " aios" + << dendl; + for (list::iterator p = txc->aios.begin(); + p != txc->aios.end(); + ++p) { + FS::aio_t& aio = *p; + dout(20) << __func__ << " submitting aio " << &aio << dendl; + for (vector::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q) + dout(30) << __func__ << " iov " << (void*)q->iov_base + << " len " << q->iov_len << dendl; + dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR) + << dendl; + int r = aio_queue.submit(*p); + if (r) { + derr << " aio submit got " << cpp_strerror(r) << dendl; + assert(r == 0); + } + } + } else if (!txc->fds.empty()) { _txc_queue_fsync(txc); } else { _txc_finish_fsync(txc); @@ -3063,6 +3150,7 @@ int NewStore::_do_write(TransContext *txc, o->onode.size == 0 || o->onode.data_map.empty()) { _do_overlay_clear(txc, o); + uint64_t x_offset; if (o->onode.data_map.empty()) { // create fragment_t &f = o->onode.data_map[0]; @@ -3073,7 +3161,7 @@ int NewStore::_do_write(TransContext *txc, r = fd; goto out; } - ::lseek64(fd, offset, SEEK_SET); + x_offset = offset; dout(20) << __func__ << " create " << f.fid << " writing " << offset << "~" << length << dendl; } else { @@ -3087,17 +3175,32 @@ int NewStore::_do_write(TransContext *txc, } ::ftruncate(fd, f.length); // in case there is trailing crap f.length = (offset + length) - f.offset; - ::lseek64(fd, offset - f.offset, SEEK_SET); + x_offset = offset - f.offset; dout(20) << __func__ << " append " << f.fid << " writing " << (offset - f.offset) << "~" << length << dendl; } if (offset + length > o->onode.size) { o->onode.size = offset + length; } - r = bl.write_fd(fd); - if (r < 0) { - derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl; - goto out; +#ifdef HAVE_LIBAIO + if (g_conf->newstore_aio && (flags & O_DIRECT)) { + txc->aios.push_back(FS::aio_t(txc, fd)); + txc->num_aio.inc(); + FS::aio_t& aio = txc->aios.back(); + bl.prepare_iov(&aio.iov); + txc->aio_bl.append(bl); + aio.pwritev(x_offset); + + dout(2) << __func__ << " prepared aio " << &aio << dendl; + } else +#endif + { + ::lseek64(fd, x_offset, SEEK_SET); + r = bl.write_fd(fd); + if (r < 0) { + derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl; + goto out; + } } txc->sync_fd(fd); r = 0; @@ -3128,12 +3231,26 @@ int NewStore::_do_write(TransContext *txc, dout(20) << __func__ << " replace old fid " << op->fid << " with new fid " << f.fid << ", writing " << offset << "~" << length << dendl; - r = bl.write_fd(fd); - if (r < 0) { - derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl; - goto out; + +#ifdef HAVE_LIBAIO + if (g_conf->newstore_aio && (flags & O_DIRECT)) { + txc->aios.push_back(FS::aio_t(txc, fd)); + txc->num_aio.inc(); + FS::aio_t& aio = txc->aios.back(); + bl.prepare_iov(&aio.iov); + txc->aio_bl.append(bl); + aio.pwritev(0); + dout(2) << __func__ << " prepared aio " << &aio << dendl; + } else +#endif + { + r = bl.write_fd(fd); + if (r < 0) { + derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl; + goto out; + } + txc->sync_fd(fd); } - txc->sync_fd(fd); r = 0; goto out; } diff --git a/src/os/newstore/NewStore.h b/src/os/newstore/NewStore.h index 35b73770d63fc..f96f85270c594 100644 --- a/src/os/newstore/NewStore.h +++ b/src/os/newstore/NewStore.h @@ -15,6 +15,8 @@ #ifndef CEPH_OSD_NEWSTORE_H #define CEPH_OSD_NEWSTORE_H +#include "acconfig.h" + #include #include "include/assert.h" @@ -143,6 +145,8 @@ public: struct TransContext { typedef enum { STATE_PREPARE, + STATE_AIO_QUEUED, + STATE_AIO_DONE, STATE_FSYNC_QUEUED, STATE_FSYNC_FSYNCING, STATE_FSYNC_DONE, @@ -165,6 +169,8 @@ public: case STATE_FSYNC_QUEUED: return "fsync_queued"; case STATE_FSYNC_FSYNCING: return "fsync_fsyncing"; case STATE_FSYNC_DONE: return "fsync_done"; + case STATE_AIO_QUEUED: return "aio_queued"; + case STATE_AIO_DONE: return "aio_done"; case STATE_KV_QUEUED: return "kv_queued"; case STATE_KV_COMMITTING: return "kv_committing"; case STATE_KV_DONE: return "kv_done"; @@ -194,6 +200,10 @@ public: wal_transaction_t *wal_txn; ///< wal transaction (if any) unsigned num_fsyncs_completed; + list aios; + bufferlist aio_bl; // just a pile of refs + atomic_t num_aio; + Mutex lock; Cond cond; @@ -207,6 +217,7 @@ public: onreadable_sync(NULL), wal_txn(NULL), num_fsyncs_completed(0), + num_aio(0), lock("NewStore::TransContext::lock") { //cout << "txc new " << this << std::endl; } @@ -373,7 +384,9 @@ public: public: WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp) : ThreadPool::WorkQueue("NewStore::WALWQ", ti, sti, tp), - store(s) { + store(s), + ops(0), + bytes(0) { } bool _empty() { return wal_queue.empty(); @@ -445,6 +458,15 @@ public: } }; + struct AioCompletionThread : public Thread { + NewStore *store; + AioCompletionThread(NewStore *s) : store(s) {} + void *entry() { + store->_aio_thread(); + return NULL; + } + }; + // -------------------------------------------------------- // members private: @@ -479,6 +501,10 @@ private: ThreadPool fsync_tp; FsyncWQ fsync_wq; + AioCompletionThread aio_thread; + bool aio_stop; + FS::aio_queue_t aio_queue; + KVSyncThread kv_sync_thread; Mutex kv_lock; Cond kv_cond, kv_sync_cond; @@ -540,6 +566,10 @@ private: void _osr_reap_done(OpSequencer *osr); + void _aio_thread(); + int _aio_start(); + void _aio_stop(); + void _kv_sync_thread(); void _kv_stop() { { -- 2.39.5