if (r == 1) {
TransContext *txc = static_cast<TransContext*>(aio->priv);
int left = txc->num_aio.dec();
- dout(10) << __func__ << " finished aio on " << txc << ", "
- << left << " left" << dendl;
+ dout(10) << __func__ << " finished aio on " << txc << " state "
+ << txc->get_state_name() << ", "
+ << left << " aios left" << dendl;
+ VOID_TEMP_FAILURE_RETRY(::close(aio->fd));
if (left == 0) {
- txc->state = TransContext::STATE_AIO_DONE;
- if (!txc->fds.empty()) {
- _txc_queue_fsync(txc);
- } else {
- _txc_finish_fsync(txc);
+ switch (txc->state) {
+ case TransContext::STATE_AIO_QUEUED:
+ txc->state = TransContext::STATE_AIO_DONE;
+ if (!txc->fds.empty()) {
+ _txc_queue_fsync(txc);
+ } else {
+ _txc_finish_fsync(txc);
+ }
+ break;
+
+ case TransContext::STATE_WAL_AIO_WAIT:
+ _wal_finish(txc);
+ break;
+
+ default:
+ assert(0 == "unexpected txc state on aio completion");
}
}
}
return &txc->wal_txn->ops.back();
}
-int NewStore::_apply_wal_transaction(TransContext *txc)
+int NewStore::_wal_apply(TransContext *txc)
{
wal_transaction_t& wt = *txc->wal_txn;
dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
txc->state = TransContext::STATE_WAL_APPLYING;
- int r = _do_wal_transaction(wt);
+ txc->aios.clear();
+ int r = _do_wal_transaction(wt, txc);
if (r < 0)
return r;
+ if (!txc->aios.empty()) {
+ _txc_aio_submit(txc);
+ txc->state = TransContext::STATE_WAL_AIO_WAIT;
+ return 0;
+ } else {
+ return _wal_finish(txc);
+ }
+}
+
+int NewStore::_wal_finish(TransContext *txc)
+{
+ wal_transaction_t& wt = *txc->wal_txn;
+ dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
+
string key;
get_wal_key(wt.seq, &key);
KeyValueDB::Transaction cleanup = db->get_transaction();
return 0;
}
-int NewStore::_do_wal_transaction(wal_transaction_t& wt)
+int NewStore::_do_wal_transaction(wal_transaction_t& wt,
+ TransContext *txc)
{
vector<int> sync_fds;
sync_fds.reserve(wt.ops.size());
int fd = _open_fid(p->fid, flags);
if (fd < 0)
return fd;
- int r = ::lseek64(fd, p->offset, SEEK_SET);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " lseek64 on " << fd << " got: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- r = p->data.write_fd(fd);
- if (r < 0) {
- derr << __func__ << " write_fd on " << fd << " got: "
- << cpp_strerror(r) << dendl;
- return r;
- }
- if (!(flags & O_DSYNC)) {
+#ifdef HAVE_LIBAIO
+ if (g_conf->newstore_aio && txc && (flags & O_DIRECT)) {
+ txc->aios.push_back(FS::aio_t(txc, fd));
+ FS::aio_t& aio = txc->aios.back();
+ p->data.prepare_iov(&aio.iov);
+ aio.pwritev(p->offset);
+ dout(2) << __func__ << " prepared aio " << &aio << dendl;
+ } else
+#endif
+ {
+ int r = ::lseek64(fd, p->offset, SEEK_SET);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " lseek64 on " << fd << " got: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+ r = p->data.write_fd(fd);
+ if (r < 0) {
+ derr << __func__ << " write_fd on " << fd << " got: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
sync_fds.push_back(fd);
}
}
<< cpp_strerror(r) << dendl;
return r;
}
+ // FIXME: do aio fdatasync?
sync_fds.push_back(fd);
}
break;
return -EIO;
}
dout(20) << __func__ << " replay " << it->key() << dendl;
- int r = _do_wal_transaction(wt);
+ int r = _do_wal_transaction(wt, NULL); // don't bother with aio here
if (r < 0)
return r;
cleanup->rmkey(PREFIX_WAL, it->key());
} else {
// async path
if (!txc->aios.empty()) {
+ _txc_aio_submit(txc);
txc->state = TransContext::STATE_AIO_QUEUED;
- dout(20) << __func__ << " submitting " << txc->num_aio.read() << " aios"
- << dendl;
- for (list<FS::aio_t>::iterator p = txc->aios.begin();
- p != txc->aios.end();
- ++p) {
- FS::aio_t& aio = *p;
- dout(20) << __func__ << " submitting aio " << &aio << dendl;
- for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
- dout(30) << __func__ << " iov " << (void*)q->iov_base
- << " len " << q->iov_len << dendl;
- dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR)
- << dendl;
- int r = aio_queue.submit(*p);
- if (r) {
- derr << " aio submit got " << cpp_strerror(r) << dendl;
- assert(r == 0);
- }
- }
} else if (!txc->fds.empty()) {
_txc_queue_fsync(txc);
} else {
return 0;
}
+void NewStore::_txc_aio_submit(TransContext *txc)
+{
+ int num = txc->aios.size();
+ dout(10) << __func__ << " submitting " << num << " aios" << dendl;
+ txc->num_aio.set(num);
+ for (list<FS::aio_t>::iterator p = txc->aios.begin();
+ p != txc->aios.end();
+ ++p) {
+ FS::aio_t& aio = *p;
+ dout(20) << __func__ << " submitting aio " << &aio << dendl;
+ for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
+ dout(30) << __func__ << " iov " << (void*)q->iov_base
+ << " len " << q->iov_len << dendl;
+ dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR)
+ << dendl;
+ int r = aio_queue.submit(*p);
+ if (r) {
+ derr << " aio submit got " << cpp_strerror(r) << dendl;
+ assert(r == 0);
+ }
+ }
+}
+
int NewStore::_do_transaction(Transaction *t,
TransContext *txc,
ThreadPool::TPHandle *handle)
#ifdef HAVE_LIBAIO
if (g_conf->newstore_aio && (flags & O_DIRECT)) {
txc->aios.push_back(FS::aio_t(txc, fd));
- txc->num_aio.inc();
FS::aio_t& aio = txc->aios.back();
bl.prepare_iov(&aio.iov);
txc->aio_bl.append(bl);
aio.pwritev(x_offset);
-
dout(2) << __func__ << " prepared aio " << &aio << dendl;
} else
#endif
derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
goto out;
}
- }
- if (!(flags & O_DSYNC)) {
txc->sync_fd(fd);
}
r = 0;
#ifdef HAVE_LIBAIO
if (g_conf->newstore_aio && (flags & O_DIRECT)) {
txc->aios.push_back(FS::aio_t(txc, fd));
- txc->num_aio.inc();
FS::aio_t& aio = txc->aios.back();
bl.prepare_iov(&aio.iov);
txc->aio_bl.append(bl);
STATE_KV_DONE,
STATE_WAL_QUEUED,
STATE_WAL_APPLYING,
+ STATE_WAL_AIO_WAIT,
STATE_WAL_CLEANUP, // remove wal kv record
STATE_WAL_DONE,
STATE_FINISHING,
case STATE_KV_DONE: return "kv_done";
case STATE_WAL_QUEUED: return "wal_queued";
case STATE_WAL_APPLYING: return "wal_applying";
+ case STATE_WAL_AIO_WAIT: return "wal_aio_wait";
case STATE_WAL_CLEANUP: return "wal_cleanup";
case STATE_WAL_DONE: return "wal_done";
case STATE_FINISHING: return "finishing";
return i;
}
void _process(TransContext *i, ThreadPool::TPHandle &handle) {
- store->_apply_wal_transaction(i);
+ store->_wal_apply(i);
i->osr->wal_apply_lock.Unlock();
}
void _clear() {
TransContext *_txc_create(OpSequencer *osr);
int _txc_finalize(OpSequencer *osr, TransContext *txc);
+ void _txc_aio_submit(TransContext *txc);
void _txc_queue_fsync(TransContext *txc);
void _txc_process_fsync(fsync_item *i);
void _txc_finish_fsync(TransContext *txc);
}
wal_op_t *_get_wal_op(TransContext *txc);
- int _apply_wal_transaction(TransContext *txc);
- int _do_wal_transaction(wal_transaction_t& wt);
+ int _wal_apply(TransContext *txc);
+ int _wal_finish(TransContext *txc);
+ int _do_wal_transaction(wal_transaction_t& wt, TransContext *txc);
void _wait_object_wal(OnodeRef onode);
int _replay_wal();
friend class C_ApplyWAL;