void FileStore::queue_op(OpSequencer *osr, Op *o)
{
- assert(journal_lock.is_locked());
-
// mark apply start _now_, because we need to drain the entire apply
// queue during commit in order to put the store in a consistent
// state.
- _op_apply_start(o->op);
+ op_apply_start(o->op);
op_tp.lock();
osr->queue(o);
Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
op_queue_reserve_throttle(o);
journal->throttle();
- o->op = op_submit_start();
+ uint64_t op_num = submit_manager.op_submit_start();
+ o->op = op_num;
if (m_filestore_do_dump)
dump_transactions(o->tls, o->op, osr);
_op_journal_transactions(o->tls, o->op, ondisk, osd_op);
- // queue inside journal lock, to preserve ordering
+ // queue inside submit_manager op submission lock
queue_op(osr, o);
} else if (m_filestore_journal_writeahead) {
dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl;
} else {
assert(0);
}
- op_submit_finish(o->op);
+ submit_manager.op_submit_finish(op_num);
return 0;
}
- uint64_t op = op_submit_start();
+ uint64_t op = submit_manager.op_submit_start();
dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
if (m_filestore_do_dump)
dump_transactions(tls, op, osr);
- _op_apply_start(op);
+ op_apply_start(op);
int r = do_transactions(tls, op);
if (r >= 0) {
}
op_finisher.queue(onreadable, r);
- op_submit_finish(op);
+ submit_manager.op_submit_finish(op);
op_apply_finish(op);
return r;
dout(5) << "_journaled_ahead " << o << " seq " << o->op << " " << *osr << " " << o->tls << dendl;
// this should queue in order because the journal does it's completions in order.
- journal_lock.Lock();
queue_op(osr, o);
- journal_lock.Unlock();
osr->dequeue_journal();
}
journal_lock.Lock();
- op_seq = fs_op_seq;
+ uint64_t op_seq = fs_op_seq;
committed_seq = fs_op_seq;
committing_seq = fs_op_seq;
applied_seq = fs_op_seq;
replaying = false;
+ submit_manager.set_op_seq(op_seq);
+
journal_lock.Unlock();
// done reading, make writeable.
journal_lock.Unlock();
}
-uint64_t JournalingObjectStore::op_submit_start()
+uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
{
- journal_lock.Lock();
+ lock.Lock();
uint64_t op = ++op_seq;
dout(10) << "op_submit_start " << op << dendl;
ops_submitting.push_back(op);
return op;
}
-void JournalingObjectStore::op_submit_finish(uint64_t op)
+void JournalingObjectStore::SubmitManager::op_submit_finish(uint64_t op)
{
dout(10) << "op_submit_finish " << op << dendl;
if (op != ops_submitting.front()) {
- dout(0) << "op_submit_finish " << op << " expected " << ops_submitting.front()
+ dout(0) << "op_submit_finish " << op << " expected "
+ << ops_submitting.front()
<< ", OUT OF ORDER" << dendl;
assert(0 == "out of order op_submit_finish");
}
ops_submitting.pop_front();
- journal_lock.Unlock();
+ lock.Unlock();
}
bool ret = false;
journal_lock.Lock();
- dout(10) << "commit_start op_seq " << op_seq
- << ", applied_seq " << applied_seq
- << ", committed_seq " << committed_seq << dendl;
+ dout(10) << "commit_start op_seq " << submit_manager.get_op_seq()
+ << ", applied_seq " << applied_seq
+ << ", committed_seq " << committed_seq << dendl;
blocked = true;
while (open_ops > 0) {
dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
list<ObjectStore::Transaction*>& tls, uint64_t op,
Context *onjournal, TrackedOpRef osd_op)
{
- assert(journal_lock.is_locked());
dout(10) << "op_journal_transactions " << op << " " << tls << dendl;
if (journal && journal->is_writeable()) {
class JournalingObjectStore : public ObjectStore {
protected:
- uint64_t op_seq, applied_seq;
+ uint64_t applied_seq;
uint64_t committing_seq, committed_seq;
map<version_t, vector<Context*> > commit_waiters;
+ class SubmitManager {
+ Mutex lock;
+ uint64_t op_seq;
+ list<uint64_t> ops_submitting;
+ public:
+ SubmitManager() :
+ lock("JOS::SubmitManager::lock"),
+ op_seq(0)
+ {}
+ uint64_t op_submit_start();
+ void op_submit_finish(uint64_t op);
+ void set_op_seq(uint64_t seq) {
+ Mutex::Locker l(lock);
+ seq = op_seq;
+ }
+ uint64_t get_op_seq() {
+ return op_seq;
+ }
+ } submit_manager;
+
int open_ops;
bool blocked;
}
public:
- JournalingObjectStore() : op_seq(0),
- applied_seq(0), committing_seq(0), committed_seq(0),
+ JournalingObjectStore() : applied_seq(0), committing_seq(0), committed_seq(0),
open_ops(0), blocked(false),
journal(NULL), finisher(g_ceph_context),
journal_lock("JournalingObjectStore::journal_lock"),