/// -----------------------------
-void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync)
+void FileStore::queue_op(Sequencer *posr, __u64 op_seq, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync)
{
__u64 bytes = 0, ops = 0;
for (list<Transaction*>::iterator p = tls.begin();
o->bytes = bytes;
op_tp.lock();
+
+ OpSequencer *osr;
+ if (!posr)
+ posr = &default_osr;
+ if (posr->p) {
+ osr = (OpSequencer *)posr->p;
+ dout(10) << "queue_op existing osr " << osr << "/" << osr->parent << " w/ q " << osr->q << dendl;
+ } else {
+ osr = new OpSequencer;
+ osr->parent = posr;
+ posr->p = (void *)osr;
+ dout(10) << "queue_op new osr " << osr << "/" << osr->parent << dendl;
+ }
+ osr->q.push_back(o);
+
while ((g_conf.filestore_queue_max_ops && op_queue_len >= (unsigned)g_conf.filestore_queue_max_ops) ||
(g_conf.filestore_queue_max_bytes && op_queue_bytes >= (unsigned)g_conf.filestore_queue_max_bytes)) {
dout(2) << "queue_op " << o << " throttle: "
op_tp.unlock();
dout(10) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes" << dendl;
- op_wq.queue(o);
+ op_wq.queue(osr);
}
-void FileStore::_do_op(Op *o)
+void FileStore::_do_op(OpSequencer *osr)
{
- dout(10) << "_do_op " << o << " " << o->op << " start" << dendl;
+ osr->lock.Lock();
+ Op *o = osr->q.front();
+
+ dout(10) << "_do_op " << o << " " << o->op << " osr " << osr << "/" << osr->parent << " start" << dendl;
op_apply_start(o->op);
int r = do_transactions(o->tls, o->op);
op_apply_finish();
*/
}
-void FileStore::_finish_op(Op *o)
+void FileStore::_finish_op(OpSequencer *osr)
{
+ Op *o = osr->q.front();
+ osr->q.pop_front();
+
+ if (osr->q.empty()) {
+ dout(10) << "_finish_op last op " << o << " on osr " << osr << "/" << osr->parent << dendl;
+ osr->parent->p = NULL;
+ osr->lock.Unlock(); // locked in _do_op
+ delete osr;
+ } else {
+ dout(10) << "_finish_op on osr " << osr << "/" << osr->parent << " q now " << osr->q << dendl;
+ osr->lock.Unlock(); // locked in _do_op
+ }
+
// called with tp lock held
op_queue_len--;
op_queue_bytes -= o->bytes;
struct C_JournaledAhead : public Context {
FileStore *fs;
+ ObjectStore::Sequencer *osr;
__u64 op;
list<ObjectStore::Transaction*> tls;
Context *onreadable, *onreadable_sync;
Context *ondisk;
- C_JournaledAhead(FileStore *f, __u64 o, list<ObjectStore::Transaction*>& t,
+ C_JournaledAhead(FileStore *f, ObjectStore::Sequencer *os, __u64 o, list<ObjectStore::Transaction*>& t,
Context *onr, Context *ond, Context *onrsync) :
- fs(f), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
+ fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
void finish(int r) {
- fs->_journaled_ahead(op, tls, onreadable, ondisk, onreadable_sync);
+ fs->_journaled_ahead(osr, op, tls, onreadable, ondisk, onreadable_sync);
}
};
-int FileStore::queue_transaction(Transaction *t)
+int FileStore::queue_transaction(Sequencer *osr, Transaction *t)
{
list<Transaction*> tls;
tls.push_back(t);
- return queue_transactions(tls, new C_DeleteTransaction(t));
+ return queue_transactions(osr, tls, new C_DeleteTransaction(t));
}
-int FileStore::queue_transactions(list<Transaction*> &tls,
+int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
Context *onreadable, Context *ondisk,
Context *onreadable_sync)
{
journal_transactions(tls, op, ondisk);
// queue inside journal lock, to preserve ordering
- queue_op(op, tls, onreadable, onreadable_sync);
+ queue_op(osr, op, tls, onreadable, onreadable_sync);
op_journal_finish();
return 0;
__u64 op = op_journal_start(0);
dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
journal_transactions(tls, op,
- new C_JournaledAhead(this, op, tls, onreadable, ondisk, onreadable_sync));
+ new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
op_journal_finish();
return 0;
}
return r;
}
-void FileStore::_journaled_ahead(__u64 op,
+void FileStore::_journaled_ahead(Sequencer *osr, __u64 op,
list<Transaction*> &tls,
Context *onreadable, Context *ondisk,
Context *onreadable_sync)
{
dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
// this should queue in order because the journal does it's completions in order.
- queue_op(op, tls, onreadable, onreadable_sync);
+ queue_op(osr, op, tls, onreadable, onreadable_sync);
// do ondisk completions async, to prevent any onreadable_sync completions
// getting blocked behind an ondisk completion.
C_SafeCond *onreadable = new C_SafeCond(&my_lock, &my_cond, &done, &r);
dout(10) << "apply queued" << dendl;
- queue_transactions(tls, onreadable, ondisk);
+ queue_transactions(NULL, tls, onreadable, ondisk);
my_lock.Lock();
while (!done)
Context *onreadable, *onreadable_sync;
__u64 ops, bytes;
};
- deque<Op*> op_queue;
+ struct OpSequencer {
+ Sequencer *parent;
+ list<Op*> q;
+ Mutex lock;
+ OpSequencer() : lock("FileStore::OpSequencer::lock", false, false) {}
+ };
+ Sequencer default_osr;
+ deque<OpSequencer*> op_queue;
__u64 op_queue_len, op_queue_bytes;
Cond op_throttle_cond;
Finisher op_finisher;
map<__u64, pair<Context*,Context*> > finish_queue;
ThreadPool op_tp;
- struct OpWQ : public ThreadPool::WorkQueue<Op> {
+ struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
FileStore *store;
- OpWQ(FileStore *fs, ThreadPool *tp) : ThreadPool::WorkQueue<Op>("FileStore::OpWQ", tp), store(fs) {}
+ OpWQ(FileStore *fs, ThreadPool *tp) : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", tp), store(fs) {}
- bool _enqueue(Op *o) {
- store->op_queue.push_back(o);
+ bool _enqueue(OpSequencer *osr) {
+ store->op_queue.push_back(osr);
store->op_queue_len++;
- store->op_queue_bytes += o->bytes;
+ store->op_queue_bytes += osr->q.back()->bytes;
return true;
}
- void _dequeue(Op *o) {
+ void _dequeue(OpSequencer *o) {
assert(0);
}
bool _empty() {
return store->op_queue.empty();
}
- Op *_dequeue() {
+ OpSequencer *_dequeue() {
if (store->op_queue.empty())
return NULL;
- Op *o = store->op_queue.front();
+ OpSequencer *osr = store->op_queue.front();
store->op_queue.pop_front();
- return o;
+ return osr;
}
- void _process(Op *o) {
- store->_do_op(o);
+ void _process(OpSequencer *osr) {
+ store->_do_op(osr);
}
- void _process_finish(Op *o) {
- store->_finish_op(o);
+ void _process_finish(OpSequencer *osr) {
+ store->_finish_op(osr);
}
void _clear() {
assert(store->op_queue.empty());
}
} op_wq;
- void _do_op(Op *o);
- void _finish_op(Op *o);
- void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
- void _journaled_ahead(__u64 op, list<Transaction*> &tls,
+ void _do_op(OpSequencer *o);
+ void _finish_op(OpSequencer *o);
+ void queue_op(Sequencer *osr, __u64 op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
+ void _journaled_ahead(Sequencer *osr, __u64 op, list<Transaction*> &tls,
Context *onreadable, Context *ondisk, Context *onreadable_sync);
friend class C_JournaledAhead;
void _transaction_finish(int id);
unsigned _do_transaction(Transaction& t);
- int queue_transaction(Transaction* t);
- int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
+ int queue_transaction(Sequencer *osr, Transaction* t);
+ int queue_transactions(Sequencer *osr, list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0);
// ------------------
map<int,int> free_extent_dist_sum; // powers of two
};
+
+ struct Sequencer {
+ void *p;
+ Sequencer() : p(NULL) {}
+ ~Sequencer() {
+ assert(p == NULL);
+ }
+ };
/*********************************
virtual unsigned apply_transaction(Transaction& t, Context *ondisk=0) = 0;
virtual unsigned apply_transactions(list<Transaction*>& tls, Context *ondisk=0) = 0;
- virtual int queue_transaction(Transaction* t) = 0;
- virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0,
+ virtual int queue_transaction(Sequencer *osr, Transaction* t) = 0;
+ virtual int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0) {
list<Transaction*> tls;
tls.push_back(t);
- return queue_transactions(tls, onreadable, ondisk, onreadable_sync);
+ return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync);
}
- virtual int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
+ virtual int queue_transactions(Sequencer *osr, list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0) = 0;