} else {
osr = new OpSequencer;
osr->parent = posr;
- posr->p = (void *)osr;
+ posr->p = osr;
dout(10) << "queue_op new osr " << osr << "/" << osr->parent << dendl;
}
- osr->q.push_back(o);
+ osr->queue(o);
op_queue_len++;
op_queue_bytes += bytes;
void FileStore::_do_op(OpSequencer *osr)
{
- osr->lock.Lock();
- Op *o = osr->q.front();
+ osr->apply_lock.Lock();
+ Op *o = osr->peek_queue();
dout(10) << "_do_op " << o << " " << o->op << " osr " << osr << "/" << osr->parent << " start" << dendl;
int r = do_transactions(o->tls, o->op);
void FileStore::_finish_op(OpSequencer *osr)
{
- Op *o = osr->q.front();
- osr->q.pop_front();
+ Op *o = osr->dequeue();
- if (osr->q.empty()) {
- dout(10) << "_finish_op last op " << o << " on osr " << osr << "/" << osr->parent << dendl;
- osr->parent->p = NULL;
- osr->lock.Unlock(); // locked in _do_op
- delete osr;
- } else {
- dout(10) << "_finish_op on osr " << osr << "/" << osr->parent << dendl; // << " q now " << osr->q << dendl;
- osr->lock.Unlock(); // locked in _do_op
- }
+ dout(10) << "_finish_op on osr " << osr << "/" << osr->parent << dendl;
+ osr->apply_lock.Unlock(); // locked in _do_op
// called with tp lock held
op_queue_len--;
Context *onreadable, *onreadable_sync;
uint64_t ops, bytes;
};
- struct OpSequencer {
- Sequencer *parent;
+ class OpSequencer : public Sequencer_impl {
+ Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
list<Op*> q;
- Mutex lock;
- OpSequencer() : lock("FileStore::OpSequencer::lock", false, false) {}
+ Cond cond;
+ public:
+ Sequencer *parent;
+ Mutex apply_lock; // for apply mutual exclusion
+
+ void queue(Op *o) {
+ Mutex::Locker l(qlock);
+ q.push_back(o);
+ }
+ Op *peek_queue() {
+ assert(apply_lock.is_locked());
+ return q.front();
+ }
+ Op *dequeue() {
+ assert(apply_lock.is_locked());
+ Mutex::Locker l(qlock);
+ Op *o = q.front();
+ q.pop_front();
+ cond.Signal();
+ return o;
+ }
+ void flush() {
+ Mutex::Locker l(qlock);
+ if (!q.empty()) {
+ uint64_t seq = q.back()->op;
+ while (!q.empty() && q.front()->op <= seq)
+ cond.Wait(qlock);
+ }
+ }
+
+ OpSequencer() : qlock("FileStore::OpSequencer::qlock", false, false),
+ apply_lock("FileStore::OpSequencer::apply_lock", false, false) {}
+ ~OpSequencer() {
+ assert(q.empty());
+ }
};
Sequencer default_osr;
deque<OpSequencer*> op_queue;
};
+ struct Sequencer_impl {
+ virtual void flush() = 0;
+ virtual ~Sequencer_impl() {}
+ };
struct Sequencer {
- void *p;
+ Sequencer_impl *p;
Sequencer() : p(NULL) {}
~Sequencer() {
- assert(p == NULL);
+ delete p;
+ }
+ void flush() {
+ if (p)
+ p->flush();
}
};