/// -----------------------------
-void FileStore::queue_op(OpSequencer *osr, uint64_t op_seq, list<Transaction*>& tls,
- Context *onreadable, Context *onreadable_sync)
+FileStore::Op *FileStore::build_op(list<Transaction*>& tls,
+ Context *onreadable, Context *onreadable_sync)
{
uint64_t bytes = 0, ops = 0;
for (list<Transaction*>::iterator p = tls.begin();
ops += (*p)->get_num_ops();
}
- // initialize next_finish on first op
- if (next_finish == 0)
- next_finish = op_seq;
-
- // mark apply start _now_, because we need to drain the entire apply
- // queue during commit in order to put the store in a consistent
- // state.
- _op_apply_start(op_seq);
-
Op *o = new Op;
- o->op = op_seq;
o->tls.swap(tls);
o->onreadable = onreadable;
o->onreadable_sync = onreadable_sync;
o->ops = ops;
o->bytes = bytes;
+ return o;
+}
+
+
+void FileStore::queue_op(OpSequencer *osr, Op *o)
+{
+ assert(journal_lock.is_locked());
+ // initialize next_finish on first op
+ if (next_finish == 0)
+ next_finish = op_seq;
+
+ // mark apply start _now_, because we need to drain the entire apply
+ // queue during commit in order to put the store in a consistent
+ // state.
+ _op_apply_start(o->op);
op_tp.lock();
osr->queue(o);
- _op_queue_throttle("queue_op");
-
- op_queue_len++;
- op_queue_bytes += bytes;
if (logger) {
logger->inc(l_os_ops);
- logger->inc(l_os_bytes, bytes);
+ logger->inc(l_os_bytes, o->bytes);
logger->set(l_os_oq_ops, op_queue_len);
logger->set(l_os_oq_bytes, op_queue_bytes);
}
op_tp.unlock();
- dout(5) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes"
+ dout(5) << "queue_op " << o << " seq " << o->op << " " << o->bytes << " bytes"
<< " (queue has " << op_queue_len << " ops and " << op_queue_bytes << " bytes)"
<< dendl;
op_wq.queue(osr);
}
-void FileStore::op_queue_throttle()
+void FileStore::op_queue_reserve_throttle(Op *o)
{
op_tp.lock();
- _op_queue_throttle("op_queue_throttle");
+ _op_queue_reserve_throttle(o, "op_queue_reserve_throttle");
op_tp.unlock();
}
-void FileStore::_op_queue_throttle(const char *caller)
+void FileStore::_op_queue_reserve_throttle(Op *o, const char *caller)
{
+ // Do not call while holding the journal lock!
uint64_t max_ops = g_conf.filestore_queue_max_ops;
uint64_t max_bytes = g_conf.filestore_queue_max_bytes;
logger->set(l_os_oq_max_bytes, max_bytes);
}
- while ((max_ops && op_queue_len >= max_ops) ||
- (max_bytes && op_queue_bytes >= max_bytes)) {
+ while ((max_ops && (op_queue_len + 1) > max_ops) ||
+ (max_bytes && (op_queue_bytes + o->bytes) > max_bytes)) {
dout(2) << caller << " waiting: "
- << op_queue_len << " > " << max_ops << " ops || "
- << op_queue_bytes << " > " << max_bytes << dendl;
+ << op_queue_len + 1 << " > " << max_ops << " ops || "
+ << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
op_tp.wait(op_throttle_cond);
}
+
+ op_queue_len++;
+ op_queue_bytes += o->bytes;
+}
+
+void FileStore::_op_queue_release_throttle(Op *o)
+{
+ // Called with op_tp lock!
+ op_queue_len--;
+ op_queue_bytes -= o->bytes;
+ op_throttle_cond.Signal();
}
void FileStore::_do_op(OpSequencer *osr)
osr->apply_lock.Unlock(); // locked in _do_op
// called with tp lock held
- op_queue_len--;
- op_queue_bytes -= o->bytes;
- op_throttle_cond.Signal();
+ _op_queue_release_throttle(o);
if (logger) {
logger->inc(l_os_readable_ops);
struct C_JournaledAhead : public Context {
FileStore *fs;
FileStore::OpSequencer *osr;
- uint64_t op;
- list<ObjectStore::Transaction*> tls;
- Context *onreadable, *onreadable_sync;
+ FileStore::Op *o;
Context *ondisk;
- C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, uint64_t o, list<ObjectStore::Transaction*>& t,
- Context *onr, Context *ond, Context *onrsync) :
- fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
+ C_JournaledAhead(FileStore *f, FileStore::OpSequencer *os, FileStore::Op *o, Context *ondisk):
+ fs(f), osr(os), o(o), ondisk(ondisk) { }
void finish(int r) {
- fs->_journaled_ahead(osr, op, tls, onreadable, ondisk, onreadable_sync);
+ fs->_journaled_ahead(osr, o, ondisk);
}
};
//logger->inc(l_os_in_bytes, 1);
}
- if (journal && journal->is_writeable()) {
+ if (journal && journal->is_writeable() && !g_conf.filestore_journal_trailing) {
+ Op *o = build_op(tls, onreadable, onreadable_sync);
+ op_queue_reserve_throttle(o);
+ journal->throttle();
+ o->op = op_submit_start();
if (g_conf.filestore_journal_parallel) {
-
- // FIXME: these throttle blocks can build up many threads, and
- // then let them all (too many!) through when some space is
- // available.
-
- journal->throttle(); // make sure we're not ahead of the jouranl
- op_queue_throttle(); // make sure the journal isn't getting ahead of our op queue.
-
- uint64_t op = op_submit_start();
- dout(5) << "queue_transactions (parallel) " << op << " " << tls << dendl;
+ dout(5) << "queue_transactions (parallel) " << o->op << " " << tls << dendl;
- _op_journal_transactions(tls, op, ondisk);
+ _op_journal_transactions(tls, o->op, ondisk);
// queue inside journal lock, to preserve ordering
- queue_op(osr, op, tls, onreadable, onreadable_sync);
+ queue_op(osr, o);
+ } else if (g_conf.filestore_journal_writeahead) {
+ dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl;
- op_submit_finish(op);
- return 0;
- }
- else if (g_conf.filestore_journal_writeahead) {
-
- journal->throttle(); // make sure we're not ahead of the journal
- op_queue_throttle(); // make sure the journal isn't getting ahead of our op queue.
-
- uint64_t op = op_submit_start();
- dout(5) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
- osr->queue_journal(op);
- _op_journal_transactions(tls, op,
- new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
- op_submit_finish(op);
- return 0;
+ osr->queue_journal(o->op);
+
+ _op_journal_transactions(tls, o->op, new C_JournaledAhead(this, osr, o, ondisk));
+ } else {
+ assert(0);
}
+ op_submit_finish(o->op);
+ return 0;
}
uint64_t op = op_submit_start();
return r;
}
-void FileStore::_journaled_ahead(OpSequencer *osr, uint64_t op,
- list<Transaction*> &tls,
- Context *onreadable, Context *ondisk,
- Context *onreadable_sync)
+void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
{
- dout(5) << "_journaled_ahead " << op << " " << tls << dendl;
-
- op_queue_throttle();
-
+ dout(5) << "_journaled_ahead " << o->op << " " << o->tls << dendl;
// this should queue in order because the journal does it's completions in order.
journal_lock.Lock();
- queue_op(osr, op, tls, onreadable, onreadable_sync);
+ queue_op(osr, o);
journal_lock.Unlock();
osr->dequeue_journal();