- Combine journal_lock and lock.
- Move throttling outside of the lock (this fixes potential deadlock in
parallel journal mode)
- Make interface nomenclature a bit more helpful
Signed-off-by: Sage Weil <sage@newdream.net>
// mark apply start _now_, because we need to drain the entire apply
// queue during commit in order to put the store in a consistent
// state.
- op_apply_start(op_seq);
+ _op_apply_start(op_seq);
Op *o = new Op;
o->op = op_seq;
journal->throttle(); // make sure we're not ahead of the jouranl
op_queue_throttle(); // make sure the journal isn't getting ahead of our op queue.
- uint64_t op = op_journal_start(0);
+ uint64_t op = op_submit_start();
dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
- journal_transactions(tls, op, ondisk);
+ _op_journal_transactions(tls, op, ondisk);
// queue inside journal lock, to preserve ordering
queue_op(osr, op, tls, onreadable, onreadable_sync);
- op_journal_finish();
+ op_submit_finish();
return 0;
}
else if (g_conf.filestore_journal_writeahead) {
journal->throttle(); // make sure we're not ahead of the journal
op_queue_throttle(); // make sure the journal isn't getting ahead of our op queue.
- uint64_t op = op_journal_start(0);
+ uint64_t op = op_submit_start();
dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
- journal_transactions(tls, op,
- new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
- op_journal_finish();
+ _op_journal_transactions(tls, op,
+ new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
+ op_submit_finish();
return 0;
}
}
op_apply_finish(op_seq);
if (r >= 0) {
- op_journal_start(op_seq);
- journal_transactions(tls, op_seq, ondisk);
- op_journal_finish();
+ op_journal_transactions(tls, op_seq, ondisk);
} else {
delete ondisk;
}
op_queue_throttle();
// this should queue in order because the journal does it's completions in order.
+ journal_lock.Lock();
queue_op(osr, op, tls, onreadable, onreadable_sync);
+ journal_lock.Unlock();
// do ondisk completions async, to prevent any onreadable_sync completions
// getting blocked behind an ondisk completion.
uint64_t JournalingObjectStore::op_apply_start(uint64_t op)
{
- lock.Lock();
+ Mutex::Locker l(journal_lock);
+ return _op_apply_start(op);
+}
+
+uint64_t JournalingObjectStore::_op_apply_start(uint64_t op)
+{
+ assert(journal_lock.is_locked());
while (blocked) {
dout(10) << "op_apply_start blocked" << dendl;
- cond.Wait(lock);
+ cond.Wait(journal_lock);
}
open_ops++;
op = ++op_seq;
dout(10) << "op_apply_start " << op << dendl;
- lock.Unlock();
return op;
}
void JournalingObjectStore::op_apply_finish(uint64_t op)
{
dout(10) << "op_apply_finish" << dendl;
- lock.Lock();
+ journal_lock.Lock();
if (--open_ops == 0)
cond.Signal();
if (op > applied_seq)
applied_seq = op;
- lock.Unlock();
+ journal_lock.Unlock();
}
-uint64_t JournalingObjectStore::op_journal_start(uint64_t op)
+uint64_t JournalingObjectStore::op_submit_start()
{
journal_lock.Lock();
- if (!op) {
- lock.Lock();
- op = ++op_seq;
- lock.Unlock();
- }
- return op;
+ return ++op_seq;
}
-void JournalingObjectStore::op_journal_finish()
+void JournalingObjectStore::op_submit_finish()
{
journal_lock.Unlock();
}
bool JournalingObjectStore::commit_start()
{
- // suspend new ops...
- Mutex::Locker l(lock);
+ bool ret = false;
+ journal_lock.Lock();
dout(10) << "commit_start op_seq " << op_seq
<< ", applied_seq " << applied_seq
<< ", committed_seq " << committed_seq << dendl;
blocked = true;
while (open_ops > 0) {
dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
- cond.Wait(lock);
+ cond.Wait(journal_lock);
}
if (applied_seq == committed_seq) {
blocked = false;
cond.Signal();
assert(commit_waiters.empty());
- return false;
+ goto out;
}
// we can _only_ read applied_seq here because open_ops == 0 (we've
committing_seq = applied_seq;
dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl;
- return true;
+ ret = true;
+
+ out:
+ journal_lock.Unlock();
+ return ret;
}
void JournalingObjectStore::commit_started()
{
- Mutex::Locker l(lock);
+ Mutex::Locker l(journal_lock);
// allow new ops. (underlying fs should now be committing all prior ops)
dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
blocked = false;
void JournalingObjectStore::commit_finish()
{
- Mutex::Locker l(lock);
+ Mutex::Locker l(journal_lock);
dout(10) << "commit_finish thru " << committing_seq << dendl;
if (journal)
}
}
-void JournalingObjectStore::journal_transaction(ObjectStore::Transaction& t, uint64_t op,
- Context *onjournal)
+void JournalingObjectStore::op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
+ Context *onjournal)
{
- Mutex::Locker l(lock);
- dout(10) << "journal_transaction " << op << dendl;
- if (journal && journal->is_writeable()) {
- bufferlist tbl;
- t.encode(tbl);
-
- int alignment = -1;
- if ((int)t.get_data_length() >= g_conf.journal_align_min_size)
- alignment = t.get_data_alignment();
-
- journal->submit_entry(op, tbl, alignment, onjournal);
- } else if (onjournal)
- commit_waiters[op].push_back(onjournal);
+ Mutex::Locker l(journal_lock);
+ _op_journal_transactions(tls, op, onjournal);
}
-void JournalingObjectStore::journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
- Context *onjournal)
+void JournalingObjectStore::_op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
+ Context *onjournal)
{
- Mutex::Locker l(lock);
- dout(10) << "journal_transactions " << op << dendl;
+ assert(journal_lock.is_locked());
+ dout(10) << "op_journal_transactions " << op << dendl;
if (journal && journal->is_writeable()) {
bufferlist tbl;
Cond cond;
Mutex journal_lock;
- Mutex lock;
protected:
void journal_start();
int journal_replay(uint64_t fs_op_seq);
// --
+ uint64_t op_submit_start();
+ void op_submit_finish();
+
uint64_t op_apply_start(uint64_t op);
+ uint64_t _op_apply_start(uint64_t op);
void op_apply_finish(uint64_t op);
- uint64_t op_journal_start(uint64_t op);
- void op_journal_finish();
- void journal_transaction(ObjectStore::Transaction& t, uint64_t op, Context *onjournal);
- void journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op, Context *onjournal);
+ void op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op, Context *onjournal);
+ void _op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op, Context *onjournal);
bool commit_start();
void commit_started(); // allow new ops (underlying fs should now be committing all prior ops)
applied_seq(0), committing_seq(0), committed_seq(0),
open_ops(0), blocked(false),
journal(NULL),
- journal_lock("JournalingObjectStore::journal_lock"),
- lock("JournalingObjectStore::lock") { }
+ journal_lock("JournalingObjectStore::journal_lock") { }
};