flusher_thread.create();
op_finisher.start();
+ if (journal && g_conf.filestore_journal_writeahead)
+ journal->set_wait_on_full(true);
// is this btrfs?
Transaction empty;
op_lock.Unlock();
}
+struct C_JournaledAhead : public Context {
+ FileStore *fs;
+ __u64 op;
+ list<ObjectStore::Transaction*> tls;
+ Context *onreadable;
+ Context *onjournal;
+ Context *ondisk;
+
+ C_JournaledAhead(FileStore *f, __u64 o, list<ObjectStore::Transaction*>& t,
+ Context *onr, Context *onj, Context *ond) :
+ fs(f), op(o), tls(t), onreadable(onr), onjournal(onj), ondisk(ond) { }
+ void finish(int r) {
+ fs->_journaled_ahead(op, tls, onreadable, onjournal, ondisk);
+ }
+};
int FileStore::queue_transactions(list<Transaction*> &tls,
Context *onreadable,
Context *onjournal,
Context *ondisk)
{
- __u64 op;
+ if (journal && journal->is_writeable()) {
+ if (g_conf.filestore_journal_parallel) {
+ __u64 op = op_journal_start(0);
+ dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
+
+ journal_transactions(tls, op, onjournal);
+
+ // queue inside journal lock, to preserve ordering
+ queue_op(op, tls, onreadable, ondisk);
+
+ op_journal_finish();
+ return 0;
+ }
+ else if (g_conf.filestore_journal_writeahead) {
+ __u64 op = op_journal_start(0);
+ dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
+ journal_transactions(tls, op, new C_JournaledAhead(this, op, tls, onreadable, onjournal, ondisk));
+ op_journal_finish();
+ return 0;
+ }
+ }
- op = op_journal_start(0);
- journal_transactions(tls, op, onjournal);
+ __u64 op_seq = op_apply_start(0, ondisk);
+ dout(10) << "queue_transactions (trailing journal) " << op_seq << " " << tls << dendl;
+ int r = do_transactions(tls, op_seq);
+ op_apply_finish();
+ op_finisher.queue(onreadable, r);
+
+ if (r >= 0) {
+ op_journal_start(op_seq);
+ journal_transactions(tls, op_seq, onjournal);
+ op_journal_finish();
+ } else {
+ delete onjournal;
+ delete ondisk;
+ }
+ return r;
+}
- // queue inside journal lock, to preserve ordering
+void FileStore::_journaled_ahead(__u64 op,
+ list<Transaction*> &tls,
+ Context *onreadable,
+ Context *onjournal,
+ Context *ondisk)
+{
+ dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
+ // this should queue in order because the journal does it's completions in order.
queue_op(op, tls, onreadable, ondisk);
-
- op_journal_finish();
-
- return 0;
+ if (onjournal) {
+ onjournal->finish(0);
+ delete onjournal;
+ }
}
int FileStore::do_transactions(list<Transaction*> &tls, __u64 op_seq)
{
int r = 0;
- if (g_conf.filestore_journal_parallel) {
+ if (journal && journal->is_writeable() &&
+ (g_conf.filestore_journal_parallel || g_conf.filestore_journal_writeahead)) {
// use op pool
Cond my_cond;
Mutex my_lock("FileStore::apply_transaction::my_lock");