if (r < 0)
goto out_db;
- r = _replay_wal();
+ r = _wal_replay();
if (r < 0)
goto out_aio;
return txc;
}
+/*
+ * Transaction state machine: advance @txc to its next state(s).
+ *
+ * Called when a txc is first submitted and again from each completion
+ * path (aio, fsync, kv commit, wal).  States that complete
+ * synchronously loop straight into the next one; we return whenever
+ * the txc has been handed off to another queue or thread, which will
+ * re-enter this function when its work is done.
+ */
+void NewStore::_txc_state_proc(TransContext *txc)
+{
+  while (true) {
+    dout(10) << __func__ << " txc " << txc
+             << " " << txc->get_state_name() << dendl;
+    switch (txc->state) {
+    case TransContext::STATE_PREPARE:
+      if (!txc->aios.empty()) {
+        txc->state = TransContext::STATE_AIO_WAIT;
+        _txc_aio_submit(txc);
+        return;
+      }
+      // ** fall-thru **
+
+    case TransContext::STATE_AIO_WAIT:
+      if (!txc->fds.empty()) {
+        txc->state = TransContext::STATE_FSYNC_WAIT;
+        if (!g_conf->newstore_sync_io) {
+          _txc_queue_fsync(txc);
+          return;
+        }
+        // sync io: fsync inline, then fall through to finish the io
+        _txc_do_sync_fsync(txc);
+      }
+      _txc_finish_io(txc);  // may trigger blocked txc's too
+      return;
+
+    case TransContext::STATE_IO_DONE:
+      assert(txc->osr->qlock.is_locked());  // see _txc_finish_io
+      txc->state = TransContext::STATE_KV_QUEUED;
+      if (!g_conf->newstore_sync_transaction) {
+        Mutex::Locker l(kv_lock);
+        db->submit_transaction(txc->t);
+        kv_queue.push_back(txc);
+        kv_cond.SignalOne();
+        return;
+      }
+      // sync transaction: commit here and keep advancing
+      db->submit_transaction_sync(txc->t);
+      break;
+
+    case TransContext::STATE_KV_QUEUED:
+      txc->state = TransContext::STATE_KV_DONE;
+      _txc_finish_kv(txc);
+      // ** fall-thru **
+
+    case TransContext::STATE_KV_DONE:
+      if (txc->wal_txn) {
+        txc->state = TransContext::STATE_WAL_QUEUED;
+        wal_wq.queue(txc);
+        return;
+      }
+      txc->state = TransContext::STATE_FINISHING;
+      break;
+
+    case TransContext::STATE_WAL_APPLYING:
+      if (!txc->aios.empty()) {
+        txc->state = TransContext::STATE_WAL_AIO_WAIT;
+        _txc_aio_submit(txc);
+        return;
+      }
+      // ** fall-thru **
+
+    case TransContext::STATE_WAL_AIO_WAIT:
+      _wal_finish(txc);
+      return;
+
+    case TransContext::STATE_WAL_CLEANUP:
+      txc->state = TransContext::STATE_FINISHING;
+      // ** fall-thru **
+
+    case TransContext::STATE_FINISHING:
+      _txc_finish(txc);
+      return;
+
+    default:
+      derr << __func__ << " unexpected txc " << txc
+           << " state " << txc->get_state_name() << dendl;
+      assert(0 == "unexpected txc state");
+      return;
+    }
+  }
+}
+
void NewStore::_txc_process_fsync(fsync_item *i)
{
dout(20) << __func__ << " txc " << i->txc << dendl;
}
VOID_TEMP_FAILURE_RETRY(::close(i->fd));
if (i->txc->finish_fsync()) {
- _txc_finish_fsync(i->txc);
+ _txc_finish_io(i->txc);
}
dout(20) << __func__ << " txc " << i->txc << " done" << dendl;
}
+/*
+ * Mark txc's io as complete, then walk this txc's OpSequencer queue
+ * and push every txc that is now unblocked (all earlier txcs have at
+ * least reached STATE_IO_DONE) into _txc_state_proc, preserving
+ * submission order.  Holds osr->qlock throughout; _txc_state_proc's
+ * STATE_IO_DONE case asserts this.
+ */
-void NewStore::_txc_finish_fsync(TransContext *txc)
+void NewStore::_txc_finish_io(TransContext *txc)
{
dout(20) << __func__ << " " << txc << dendl;
OpSequencer *osr = txc->osr.get();
Mutex::Locker l(osr->qlock);
- txc->state = TransContext::STATE_FSYNC_DONE;
+ txc->state = TransContext::STATE_IO_DONE;
+ // scan backwards: stop at the first txc whose io is still pending
+ // (we are blocked behind it), or step past any txc that has already
+ // moved beyond STATE_IO_DONE.
OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
while (p != osr->q.begin()) {
--p;
- if (p->state < TransContext::STATE_FSYNC_DONE) {
+ if (p->state < TransContext::STATE_IO_DONE) {
dout(20) << __func__ << " " << txc << " blocked by " << &*p << " "
<< p->get_state_name() << dendl;
return;
}
- if (p->state > TransContext::STATE_FSYNC_DONE) {
+ if (p->state > TransContext::STATE_IO_DONE) {
++p;
break;
}
}
+ // advance every consecutive txc (including this one) that is io-done
do {
- _txc_submit_kv(&*p++);
+ _txc_state_proc(&*p++);
} while (p != osr->q.end() &&
- p->state == TransContext::STATE_FSYNC_DONE);
+ p->state == TransContext::STATE_IO_DONE);
}
int NewStore::_txc_finalize(OpSequencer *osr, TransContext *txc)
void NewStore::_txc_queue_fsync(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << dendl;
- txc->state = TransContext::STATE_FSYNC_QUEUED;
fsync_wq.lock();
for (list<fsync_item>::iterator p = txc->fds.begin();
p != txc->fds.end();
fsync_wq.unlock();
}
-void NewStore::_txc_submit_kv(TransContext *txc)
+/*
+ * Synchronously fdatasync and close every fd queued on this txc.
+ * Used on the newstore_sync_io path (see _txc_state_proc) instead of
+ * queueing the fds on the fsync workqueue.  An fsync failure is fatal
+ * (assert) rather than propagated to the caller.
+ */
+void NewStore::_txc_do_sync_fsync(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << dendl;
- txc->state = TransContext::STATE_KV_QUEUED;
-
- Mutex::Locker l(kv_lock);
- db->submit_transaction(txc->t);
- kv_queue.push_back(txc);
- kv_cond.SignalOne();
+ for (list<fsync_item>::iterator p = txc->fds.begin();
+ p != txc->fds.end(); ++p) {
+ dout(30) << __func__ << " fsync " << p->fd << dendl;
+ int r = ::fdatasync(p->fd);
+ if (r < 0) {
+ r = -errno;
+ derr << __func__ << " fsync: " << cpp_strerror(r) << dendl;
+ assert(0 == "fsync error");
+ }
+ VOID_TEMP_FAILURE_RETRY(::close(p->fd));
+ }
}
void NewStore::_txc_finish_kv(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << dendl;
- txc->osr->qlock.Lock();
- txc->state = TransContext::STATE_KV_DONE;
// warning: we're calling onreadable_sync inside the sequencer lock
if (txc->onreadable_sync) {
finisher.queue(txc->oncommits.front());
txc->oncommits.pop_front();
}
-
- if (txc->wal_txn) {
- dout(20) << __func__ << " starting wal apply" << dendl;
- txc->state = TransContext::STATE_WAL_QUEUED;
- txc->osr->qlock.Unlock();
- wal_wq.queue(txc);
- } else {
- txc->state = TransContext::STATE_FINISHING;
- txc->osr->qlock.Unlock();
- _txc_finish_apply(txc);
- }
}
-void NewStore::_txc_finish_apply(TransContext *txc)
+void NewStore::_txc_finish(TransContext *txc)
{
dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
assert(txc->state == TransContext::STATE_FINISHING);
<< left << " aios left" << dendl;
VOID_TEMP_FAILURE_RETRY(::close(aio->fd));
if (left == 0) {
- switch (txc->state) {
- case TransContext::STATE_AIO_QUEUED:
- txc->state = TransContext::STATE_AIO_DONE;
- if (!txc->fds.empty()) {
- _txc_queue_fsync(txc);
- } else {
- _txc_finish_fsync(txc);
- }
- break;
-
- case TransContext::STATE_WAL_AIO_WAIT:
- _wal_finish(txc);
- break;
-
- default:
- assert(0 == "unexpected txc state on aio completion");
- }
+ _txc_state_proc(txc);
}
}
}
<< " in " << dur << dendl;
while (!kv_committing.empty()) {
TransContext *txc = kv_committing.front();
- if (txc->state == TransContext::STATE_WAL_CLEANUP) {
- txc->osr->qlock.Lock();
- txc->state = TransContext::STATE_FINISHING;
- txc->osr->qlock.Unlock();
- _txc_finish_apply(txc);
- } else if (txc->state == TransContext::STATE_KV_QUEUED) {
- _txc_finish_kv(txc);
- } else {
- derr << __func__ << " unexpected txc state " << txc->get_state_name()
- << dendl;
- assert(0);
- }
+ _txc_state_proc(txc);
kv_committing.pop_front();
}
txc->aios.clear();
int r = _do_wal_transaction(wt, txc);
- if (r < 0)
- return r;
+ assert(r == 0);
- if (!txc->aios.empty()) {
- _txc_aio_submit(txc);
- txc->state = TransContext::STATE_WAL_AIO_WAIT;
- return 0;
- } else {
- return _wal_finish(txc);
- }
+ _txc_state_proc(txc);
+ return 0;
}
int NewStore::_wal_finish(TransContext *txc)
KeyValueDB::Transaction cleanup = db->get_transaction();
cleanup->rmkey(PREFIX_WAL, key);
- txc->osr->qlock.Lock();
txc->state = TransContext::STATE_WAL_CLEANUP;
- txc->osr->qlock.Unlock();
Mutex::Locker l(kv_lock);
db->submit_transaction(cleanup);
return 0;
}
-int NewStore::_replay_wal()
+int NewStore::_wal_replay()
{
dout(10) << __func__ << " start" << dendl;
KeyValueDB::Iterator it = db->get_iterator(PREFIX_WAL);
tls, &onreadable, &ondisk, &onreadable_sync);
int r;
- // throttle wal work
+ // throttle on wal work
wal_wq.throttle(g_conf->newstore_wal_max_ops,
g_conf->newstore_wal_max_bytes);
dout(5) << __func__ << " new " << *osr << "/" << osr->parent << dendl;
}
+ // prepare
TransContext *txc = _txc_create(osr);
+ txc->onreadable = onreadable;
+ txc->onreadable_sync = onreadable_sync;
+ txc->oncommit = ondisk;
- // XXX do it sync for now; this is not crash safe
for (list<Transaction*>::iterator p = tls.begin(); p != tls.end(); ++p) {
(*p)->set_osr(osr);
- _do_transaction(*p, txc, handle);
+ _txc_add_transaction(txc, *p);
}
- txc->onreadable = onreadable;
- txc->onreadable_sync = onreadable_sync;
- txc->oncommit = ondisk;
-
r = _txc_finalize(osr, txc);
assert(r == 0);
- if (g_conf->newstore_sync_queue_transaction) {
- // do it syncrhonously. for example, if we have a *very* fast backend.
-
- // sync
- txc->state = TransContext::STATE_FSYNC_FSYNCING;
- for (list<fsync_item>::iterator p = txc->fds.begin();
- p != txc->fds.end(); ++p) {
- dout(30) << __func__ << " fsync " << p->fd << dendl;
- int r = ::fdatasync(p->fd);
- if (r < 0) {
- r = -errno;
- derr << __func__ << " fsync: " << cpp_strerror(r) << dendl;
- return r;
- }
- VOID_TEMP_FAILURE_RETRY(::close(p->fd));
- }
-
- txc->state = TransContext::STATE_KV_COMMITTING;
- db->submit_transaction_sync(txc->t);
-
- _txc_finish_kv(txc);
- } else {
- // async path
- if (!txc->aios.empty()) {
- _txc_aio_submit(txc);
- txc->state = TransContext::STATE_AIO_QUEUED;
- } else if (!txc->fds.empty()) {
- _txc_queue_fsync(txc);
- } else {
- _txc_finish_fsync(txc);
- }
- }
-
+ // execute (start)
+ _txc_state_proc(txc);
return 0;
}
p != txc->aios.end();
++p) {
FS::aio_t& aio = *p;
- dout(20) << __func__ << " submitting aio " << &aio << dendl;
+ dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd << dendl;
for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
dout(30) << __func__ << " iov " << (void*)q->iov_base
<< " len " << q->iov_len << dendl;
}
}
-int NewStore::_do_transaction(Transaction *t,
- TransContext *txc,
- ThreadPool::TPHandle *handle)
+int NewStore::_txc_add_transaction(TransContext *txc, Transaction *t)
{
Transaction::iterator i = t->begin();
int pos = 0;
struct TransContext {
typedef enum {
STATE_PREPARE,
- STATE_AIO_QUEUED,
- STATE_AIO_DONE,
- STATE_FSYNC_QUEUED,
- STATE_FSYNC_FSYNCING,
- STATE_FSYNC_DONE,
+ STATE_FSYNC_WAIT,
+ STATE_AIO_WAIT,
+ STATE_IO_DONE,
STATE_KV_QUEUED,
STATE_KV_COMMITTING,
STATE_KV_DONE,
const char *get_state_name() {
switch (state) {
case STATE_PREPARE: return "prepare";
- case STATE_FSYNC_QUEUED: return "fsync_queued";
- case STATE_FSYNC_FSYNCING: return "fsync_fsyncing";
- case STATE_FSYNC_DONE: return "fsync_done";
- case STATE_AIO_QUEUED: return "aio_queued";
- case STATE_AIO_DONE: return "aio_done";
+ case STATE_FSYNC_WAIT: return "fsync_wait";
+ case STATE_AIO_WAIT: return "aio_wait";
+ case STATE_IO_DONE: return "io_done";
case STATE_KV_QUEUED: return "kv_queued";
case STATE_KV_COMMITTING: return "kv_committing";
case STATE_KV_DONE: return "kv_done";
int _clean_fid_tail(TransContext *txc, const fragment_t& f);
TransContext *_txc_create(OpSequencer *osr);
+ int _txc_add_transaction(TransContext *txc, Transaction *t);
int _txc_finalize(OpSequencer *osr, TransContext *txc);
+ void _txc_state_proc(TransContext *txc);
void _txc_aio_submit(TransContext *txc);
+ void _txc_do_sync_fsync(TransContext *txc);
void _txc_queue_fsync(TransContext *txc);
void _txc_process_fsync(fsync_item *i);
- void _txc_finish_fsync(TransContext *txc);
- void _txc_submit_kv(TransContext *txc);
+ void _txc_finish_io(TransContext *txc);
void _txc_finish_kv(TransContext *txc);
- void _txc_finish_apply(TransContext *txc);
+ void _txc_finish(TransContext *txc);
void _osr_reap_done(OpSequencer *osr);
int _wal_apply(TransContext *txc);
int _wal_finish(TransContext *txc);
int _do_wal_transaction(wal_transaction_t& wt, TransContext *txc);
- void _wait_object_wal(OnodeRef onode);
- int _replay_wal();
- friend class C_ApplyWAL;
+ int _wal_replay();
public:
NewStore(CephContext *cct, const string& path);