From: Sage Weil Date: Mon, 20 Apr 2015 22:33:11 +0000 (-0700) Subject: os/newstore: use aio for wal writes, too X-Git-Tag: v9.1.0~242^2~52 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2317e446c5e86f5ec6c74e6b59d8c4d6981ba62d;p=ceph.git os/newstore: use aio for wal writes, too Signed-off-by: Sage Weil --- diff --git a/src/os/newstore/NewStore.cc b/src/os/newstore/NewStore.cc index 8dd10d6c5a9a..246ff1869926 100644 --- a/src/os/newstore/NewStore.cc +++ b/src/os/newstore/NewStore.cc @@ -2245,14 +2245,27 @@ void NewStore::_aio_thread() if (r == 1) { TransContext *txc = static_cast<TransContext*>(aio->priv); int left = txc->num_aio.dec(); - dout(10) << __func__ << " finished aio on " << txc << ", " - << left << " left" << dendl; + dout(10) << __func__ << " finished aio on " << txc << " state " + << txc->get_state_name() << ", " + << left << " aios left" << dendl; + VOID_TEMP_FAILURE_RETRY(::close(aio->fd)); if (left == 0) { - txc->state = TransContext::STATE_AIO_DONE; - if (!txc->fds.empty()) { - _txc_queue_fsync(txc); - } else { - _txc_finish_fsync(txc); + switch (txc->state) { + case TransContext::STATE_AIO_QUEUED: + txc->state = TransContext::STATE_AIO_DONE; + if (!txc->fds.empty()) { + _txc_queue_fsync(txc); + } else { + _txc_finish_fsync(txc); + } + break; + + case TransContext::STATE_WAL_AIO_WAIT: + _wal_finish(txc); + break; + + default: + assert(0 == "unexpected txc state on aio completion"); } } } @@ -2319,16 +2332,31 @@ wal_op_t *NewStore::_get_wal_op(TransContext *txc) return &txc->wal_txn->ops.back(); } -int NewStore::_apply_wal_transaction(TransContext *txc) +int NewStore::_wal_apply(TransContext *txc) { wal_transaction_t& wt = *txc->wal_txn; dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl; txc->state = TransContext::STATE_WAL_APPLYING; - int r = _do_wal_transaction(wt); + txc->aios.clear(); + int r = _do_wal_transaction(wt, txc); if (r < 0) return r; + if (!txc->aios.empty()) { 
+ txc->state = TransContext::STATE_WAL_AIO_WAIT; /* set the state before submitting: _aio_thread switches on txc->state as soon as the last aio completes */ + _txc_aio_submit(txc); + return 0; + } else { + return _wal_finish(txc); + } +} + +int NewStore::_wal_finish(TransContext *txc) +{ + wal_transaction_t& wt = *txc->wal_txn; + dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl; /* same field order as the _wal_apply log line */ + string key; get_wal_key(wt.seq, &key); KeyValueDB::Transaction cleanup = db->get_transaction(); @@ -2345,7 +2373,8 @@ int NewStore::_apply_wal_transaction(TransContext *txc) return 0; } -int NewStore::_do_wal_transaction(wal_transaction_t& wt) +int NewStore::_do_wal_transaction(wal_transaction_t& wt, + TransContext *txc) { vector<int> sync_fds; sync_fds.reserve(wt.ops.size()); @@ -2372,20 +2401,29 @@ int NewStore::_do_wal_transaction(wal_transaction_t& wt) int fd = _open_fid(p->fid, flags); if (fd < 0) return fd; - int r = ::lseek64(fd, p->offset, SEEK_SET); - if (r < 0) { - r = -errno; - derr << __func__ << " lseek64 on " << fd << " got: " - << cpp_strerror(r) << dendl; - return r; - } - r = p->data.write_fd(fd); - if (r < 0) { - derr << __func__ << " write_fd on " << fd << " got: " - << cpp_strerror(r) << dendl; - return r; - } - if (!(flags & O_DSYNC)) { +#ifdef HAVE_LIBAIO + if (g_conf->newstore_aio && txc && (flags & O_DIRECT)) { + txc->aios.push_back(FS::aio_t(txc, fd)); + FS::aio_t& aio = txc->aios.back(); + p->data.prepare_iov(&aio.iov); + aio.pwritev(p->offset); + dout(2) << __func__ << " prepared aio " << &aio << dendl; + } else +#endif + { + int r = ::lseek64(fd, p->offset, SEEK_SET); + if (r < 0) { + r = -errno; + derr << __func__ << " lseek64 on " << fd << " got: " + << cpp_strerror(r) << dendl; + return r; + } + r = p->data.write_fd(fd); + if (r < 0) { + derr << __func__ << " write_fd on " << fd << " got: " + << cpp_strerror(r) << dendl; + return r; + } sync_fds.push_back(fd); } } @@ -2403,6 +2441,7 @@ int NewStore::_do_wal_transaction(wal_transaction_t& wt) << cpp_strerror(r) << dendl; return r; } + // FIXME: do aio fdatasync? 
sync_fds.push_back(fd); } break; @@ -2469,7 +2508,7 @@ int NewStore::_replay_wal() return -EIO; } dout(20) << __func__ << " replay " << it->key() << dendl; - int r = _do_wal_transaction(wt); + int r = _do_wal_transaction(wt, NULL); // don't bother with aio here if (r < 0) return r; cleanup->rmkey(PREFIX_WAL, it->key()); @@ -2557,25 +2596,8 @@ int NewStore::queue_transactions( } else { // async path if (!txc->aios.empty()) { + _txc_aio_submit(txc); txc->state = TransContext::STATE_AIO_QUEUED; - dout(20) << __func__ << " submitting " << txc->num_aio.read() << " aios" - << dendl; - for (list::iterator p = txc->aios.begin(); - p != txc->aios.end(); - ++p) { - FS::aio_t& aio = *p; - dout(20) << __func__ << " submitting aio " << &aio << dendl; - for (vector::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q) - dout(30) << __func__ << " iov " << (void*)q->iov_base - << " len " << q->iov_len << dendl; - dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR) - << dendl; - int r = aio_queue.submit(*p); - if (r) { - derr << " aio submit got " << cpp_strerror(r) << dendl; - assert(r == 0); - } - } } else if (!txc->fds.empty()) { _txc_queue_fsync(txc); } else { @@ -2586,6 +2608,29 @@ int NewStore::queue_transactions( return 0; } +void NewStore::_txc_aio_submit(TransContext *txc) +{ + int num = txc->aios.size(); + dout(10) << __func__ << " submitting " << num << " aios" << dendl; + txc->num_aio.set(num); + for (list::iterator p = txc->aios.begin(); + p != txc->aios.end(); + ++p) { + FS::aio_t& aio = *p; + dout(20) << __func__ << " submitting aio " << &aio << dendl; + for (vector::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q) + dout(30) << __func__ << " iov " << (void*)q->iov_base + << " len " << q->iov_len << dendl; + dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR) + << dendl; + int r = aio_queue.submit(*p); + if (r) { + derr << " aio submit got " << cpp_strerror(r) << dendl; + assert(r == 0); + } + } +} + int 
NewStore::_do_transaction(Transaction *t, TransContext *txc, ThreadPool::TPHandle *handle) @@ -3193,12 +3238,10 @@ int NewStore::_do_write(TransContext *txc, #ifdef HAVE_LIBAIO if (g_conf->newstore_aio && (flags & O_DIRECT)) { txc->aios.push_back(FS::aio_t(txc, fd)); - txc->num_aio.inc(); FS::aio_t& aio = txc->aios.back(); bl.prepare_iov(&aio.iov); txc->aio_bl.append(bl); aio.pwritev(x_offset); - dout(2) << __func__ << " prepared aio " << &aio << dendl; } else #endif @@ -3209,8 +3252,6 @@ int NewStore::_do_write(TransContext *txc, derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl; goto out; } - } - if (!(flags & O_DSYNC)) { txc->sync_fd(fd); } r = 0; @@ -3245,7 +3286,6 @@ int NewStore::_do_write(TransContext *txc, #ifdef HAVE_LIBAIO if (g_conf->newstore_aio && (flags & O_DIRECT)) { txc->aios.push_back(FS::aio_t(txc, fd)); - txc->num_aio.inc(); FS::aio_t& aio = txc->aios.back(); bl.prepare_iov(&aio.iov); txc->aio_bl.append(bl); diff --git a/src/os/newstore/NewStore.h b/src/os/newstore/NewStore.h index f96f85270c59..93e547566b3e 100644 --- a/src/os/newstore/NewStore.h +++ b/src/os/newstore/NewStore.h @@ -155,6 +155,7 @@ public: STATE_KV_DONE, STATE_WAL_QUEUED, STATE_WAL_APPLYING, + STATE_WAL_AIO_WAIT, /* waiting for wal aio completion; _aio_thread then calls _wal_finish */ STATE_WAL_CLEANUP, // remove wal kv record STATE_WAL_DONE, STATE_FINISHING, @@ -176,6 +177,7 @@ public: case STATE_KV_DONE: return "kv_done"; case STATE_WAL_QUEUED: return "wal_queued"; case STATE_WAL_APPLYING: return "wal_applying"; + case STATE_WAL_AIO_WAIT: return "wal_aio_wait"; case STATE_WAL_CLEANUP: return "wal_cleanup"; case STATE_WAL_DONE: return "wal_done"; case STATE_FINISHING: return "finishing"; @@ -424,7 +426,7 @@ public: return i; } void _process(TransContext *i, ThreadPool::TPHandle &handle) { - store->_apply_wal_transaction(i); + store->_wal_apply(i); i->osr->wal_apply_lock.Unlock(); } void _clear() { @@ -557,6 +559,7 @@ private: TransContext *_txc_create(OpSequencer *osr); int _txc_finalize(OpSequencer *osr, TransContext *txc); + 
void _txc_aio_submit(TransContext *txc); /* sets txc->num_aio to the number of queued aios, then submits each one through aio_queue */ void _txc_queue_fsync(TransContext *txc); void _txc_process_fsync(fsync_item *i); void _txc_finish_fsync(TransContext *txc); @@ -582,8 +585,9 @@ private: } wal_op_t *_get_wal_op(TransContext *txc); - int _apply_wal_transaction(TransContext *txc); - int _do_wal_transaction(wal_transaction_t& wt); + int _wal_apply(TransContext *txc); /* runs the wal ops; calls _wal_finish itself when no aio was queued, otherwise submits the aios and returns 0 */ + int _wal_finish(TransContext *txc); /* called by _wal_apply, or by _aio_thread once the last wal aio completes */ + int _do_wal_transaction(wal_transaction_t& wt, TransContext *txc); /* txc may be NULL (as in _replay_wal), which disables the aio write path */ void _wait_object_wal(OnodeRef onode); int _replay_wal(); friend class C_ApplyWAL;