From c866536bff5bdd9c31772dfa1e63e26faba49b4c Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 22 Mar 2011 14:52:15 -0700 Subject: [PATCH] FileStore: replace op_queue_throttle with op_queue_reserve_throttle Previously, queue_op would call op_queue_throttle while holding the journal_lock. op_queue_throttle, however, can sleep. We fix the problem by: 1) Factor build_op out of queue_op 2) op_queue_throttle is now op_queue_reserve_throttle and takes an op as an argument. op_queue_reserve_throttle can be called before the journal lock is taken. This also avoids the race between calling throttle and incrementing op_queue_bytes and op_queue_len. 3) queue_op now takes the op generated using build_op as an argument. 4) _journaled_ahead no longer needs to call throttle as queue_transactions has already reserved space. Signed-off-by: Samuel Just --- src/os/FileStore.cc | 135 +++++++++++++++++++++----------------------- src/os/FileStore.h | 12 ++-- 2 files changed, 70 insertions(+), 77 deletions(-) diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index bd3aa35171501..c3a7b2e983da8 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1493,8 +1493,8 @@ void FileStore::stop_logger() /// ----------------------------- -void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list& tls, - Context *onreadable, Context *onreadable_sync) +FileStore::Op *FileStore::build_op(list& tls, + Context *onreadable, Context *onreadable_sync) { uint64_t bytes = 0, ops = 0; for (list::iterator p = tls.begin(); @@ -1504,55 +1504,57 @@ void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list& ops += (*p)->get_num_ops(); } - // initialize next_finish on first op - if (next_finish == 0) - next_finish = op_seq; - - // mark apply start _now_, because we need to drain the entire apply - // queue during commit in order to put the store in a consistent - // state. - _op_apply_start(op_seq); - Op *o = new Op; - o->op = op_seq; o->tls.swap(tls); o->onreadable = onreadable; o->onreadable_sync = onreadable_sync; o->ops = ops; o->bytes = bytes; + return o; +} + + +void FileStore::queue_op(OpSequencer *osr, Op *o) +{ + assert(journal_lock.is_locked()); + // initialize next_finish on first op + if (next_finish == 0) + next_finish = op_seq; + + // mark apply start _now_, because we need to drain the entire apply + // queue during commit in order to put the store in a consistent + // state. + _op_apply_start(o->op); op_tp.lock(); osr->queue(o); - _op_queue_throttle("queue_op"); - - op_queue_len++; - op_queue_bytes += bytes; if (logger) { logger->inc(l_os_ops); - logger->inc(l_os_bytes, bytes); + logger->inc(l_os_bytes, o->bytes); logger->set(l_os_oq_ops, op_queue_len); logger->set(l_os_oq_bytes, op_queue_bytes); } op_tp.unlock(); - dout(5) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes" + dout(5) << "queue_op " << o << " seq " << o->op << " " << o->bytes << " bytes" << " (queue has " << op_queue_len << " ops and " << op_queue_bytes << " bytes)" << dendl; op_wq.queue(osr); } -void FileStore::op_queue_throttle() +void FileStore::op_queue_reserve_throttle(Op *o) { op_tp.lock(); - _op_queue_throttle("op_queue_throttle"); + _op_queue_reserve_throttle(o, "op_queue_reserve_throttle"); op_tp.unlock(); } -void FileStore::_op_queue_throttle(const char *caller) +void FileStore::_op_queue_reserve_throttle(Op *o, const char *caller) { + // Do not call while holding the journal lock! uint64_t max_ops = g_conf.filestore_queue_max_ops; uint64_t max_bytes = g_conf.filestore_queue_max_bytes; @@ -1566,13 +1568,24 @@ void FileStore::_op_queue_throttle(const char *caller) logger->set(l_os_oq_max_bytes, max_bytes); } - while ((max_ops && op_queue_len >= max_ops) || - (max_bytes && op_queue_bytes >= max_bytes)) { + while ((max_ops && (op_queue_len + 1) > max_ops) || + (max_bytes && (op_queue_bytes + o->bytes) > max_bytes)) { dout(2) << caller << " waiting: " - << op_queue_len << " > " << max_ops << " ops || " - << op_queue_bytes << " > " << max_bytes << dendl; + << op_queue_len + 1 << " > " << max_ops << " ops || " + << op_queue_bytes + o->bytes << " > " << max_bytes << dendl; op_tp.wait(op_throttle_cond); } + + op_queue_len++; + op_queue_bytes += o->bytes; +} + +void FileStore::_op_queue_release_throttle(Op *o) +{ + // Called with op_tp lock! + op_queue_len--; + op_queue_bytes -= o->bytes; + op_throttle_cond.Signal(); } void FileStore::_do_op(OpSequencer *osr) @@ -1599,9 +1612,7 @@ void FileStore::_finish_op(OpSequencer *osr) osr->apply_lock.Unlock(); // locked in _do_op // called with tp lock held - op_queue_len--; - op_queue_bytes -= o->bytes; - op_throttle_cond.Signal(); + _op_queue_release_throttle(o); if (logger) { logger->inc(l_os_readable_ops); @@ -1644,16 +1655,13 @@ void FileStore::_finish_op(OpSequencer *osr) struct C_JournaledAhead : public Context { FileStore *fs; FileStore::OpSequencer *osr; - uint64_t op; - list tls; - Context *onreadable, *onreadable_sync; + FileStore::Op *o; Context *ondisk; - C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, uint64_t o, list& t, - Context *onr, Context *ond, Context *onrsync) : - fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { } + C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk): + fs(f), osr(os), o(o), ondisk(ondisk) { } void finish(int r) { - fs->_journaled_ahead(osr, op, tls, onreadable, ondisk, onreadable_sync); + fs->_journaled_ahead(osr, o, ondisk); } }; @@ -1687,40 +1695,29 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, //logger->inc(l_os_in_bytes, 1); } - if (journal && journal->is_writeable()) { + if (journal && journal->is_writeable() && !g_conf.filestore_journal_trailing) { + Op *o = build_op(tls, onreadable, onreadable_sync); + op_queue_reserve_throttle(o); + journal->throttle(); + o->op = op_submit_start(); if (g_conf.filestore_journal_parallel) { - - // FIXME: these throttle blocks can build up many threads, and - // then let them all (too many!) through when some space is - // available. - - journal->throttle(); // make sure we're not ahead of the jouranl - op_queue_throttle(); // make sure the journal isn't getting ahead of our op queue. - - uint64_t op = op_submit_start(); - dout(5) << "queue_transactions (parallel) " << op << " " << tls << dendl; + dout(5) << "queue_transactions (parallel) " << o->op << " " << tls << dendl; - _op_journal_transactions(tls, op, ondisk); + _op_journal_transactions(tls, o->op, ondisk); // queue inside journal lock, to preserve ordering - queue_op(osr, op, tls, onreadable, onreadable_sync); + queue_op(osr, o); + } else if (g_conf.filestore_journal_writeahead) { + dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl; - op_submit_finish(op); - return 0; - } - else if (g_conf.filestore_journal_writeahead) { - - journal->throttle(); // make sure we're not ahead of the journal - op_queue_throttle(); // make sure the journal isn't getting ahead of our op queue. - - uint64_t op = op_submit_start(); - dout(5) << "queue_transactions (writeahead) " << op << " " << tls << dendl; - osr->queue_journal(op); - _op_journal_transactions(tls, op, - new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync)); - op_submit_finish(op); - return 0; + osr->queue_journal(o->op); + + _op_journal_transactions(tls, o->op, new C_JournaledAhead(this, osr, o, ondisk)); + } else { + assert(0); } + op_submit_finish(o->op); + return 0; } uint64_t op = op_submit_start(); @@ -1754,19 +1751,13 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, return r; } -void FileStore::_journaled_ahead(OpSequencer *osr, uint64_t op, - list &tls, - Context *onreadable, Context *ondisk, - Context *onreadable_sync) +void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk) { - dout(5) << "_journaled_ahead " << op << " " << tls << dendl; - - op_queue_throttle(); - + dout(5) << "_journaled_ahead " << o->op << " " << o->tls << dendl; // this should queue in order because the journal does it's completions in order. journal_lock.Lock(); - queue_op(osr, op, tls, onreadable, onreadable_sync); + queue_op(osr, o); journal_lock.Unlock(); osr->dequeue_journal(); diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 20af722639e0e..4c961fa6f7b37 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -206,11 +206,13 @@ class FileStore : public JournalingObjectStore { void _do_op(OpSequencer *o); void _finish_op(OpSequencer *o); - void queue_op(OpSequencer *osr, uint64_t op, list& tls, Context *onreadable, Context *onreadable_sync); - void op_queue_throttle(); - void _op_queue_throttle(const char *caller = 0); - void _journaled_ahead(OpSequencer *osr, uint64_t op, list &tls, - Context *onreadable, Context *ondisk, Context *onreadable_sync); + Op *build_op(list& tls, + Context *onreadable, Context *onreadable_sync); + void queue_op(OpSequencer *osr, Op *o); + void op_queue_reserve_throttle(Op *o); + void _op_queue_reserve_throttle(Op *o, const char *caller = 0); + void _op_queue_release_throttle(Op *o); + void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk); friend class C_JournaledAhead; // flusher thread -- 2.39.5