void FileStore::_finish_op(OpSequencer *osr)
{
- Op *o = osr->dequeue();
+ // Collect any flush_commit() waiter contexts that become runnable once
+ // this op is dequeued; dequeue() fills to_queue under the sequencer's qlock.
+ list<Context*> to_queue;
+ Op *o = osr->dequeue(&to_queue);
dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl;
osr->apply_lock.Unlock(); // locked in _do_op
if (o->onreadable) {
op_finisher.queue(o->onreadable);
}
+ // Run the woken flush_commit waiters on the same finisher as onreadable,
+ // after it, so completion ordering is preserved.
+ op_finisher.queue(to_queue);
delete o;
}
// this should queue in order because the journal does its completions in order.
queue_op(osr, o);
- osr->dequeue_journal();
+ list<Context*> to_queue;
+ osr->dequeue_journal(&to_queue);
// do ondisk completions async, to prevent any onreadable_sync completions
// getting blocked behind an ondisk completion.
dout(10) << " queueing ondisk " << ondisk << dendl;
ondisk_finisher.queue(ondisk);
}
+ ondisk_finisher.queue(to_queue);
}
int FileStore::_do_transactions(
Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
list<Op*> q;
list<uint64_t> jq;
+ list<pair<uint64_t, Context*> > flush_commit_waiters;
Cond cond;
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
+ /// get_max_uncompleted
+ /// Caller must hold qlock; inspects both the op queue and the journal queue.
+ bool _get_max_uncompleted(
+ uint64_t *seq ///< [out] max uncompleted seq
+ ) {
+ assert(qlock.is_locked());
+ assert(seq);
+ *seq = 0;
+ if (q.empty() && jq.empty())
+ return true;
+
+ // newest entries are at the back of each queue
+ if (!q.empty())
+ *seq = q.back()->op;
+ if (!jq.empty() && jq.back() > *seq)
+ *seq = jq.back();
+
+ return false;
+ } /// @returns true if both queues are empty
+
+ /// get_min_uncompleted
+ /// Caller must hold qlock; inspects both the op queue and the journal queue.
+ bool _get_min_uncompleted(
+ uint64_t *seq ///< [out] min uncompleted seq
+ ) {
+ assert(qlock.is_locked());
+ assert(seq);
+ *seq = 0;
+ if (q.empty() && jq.empty())
+ return true;
+
+ // oldest entries are at the front of each queue
+ if (!q.empty())
+ *seq = q.front()->op;
+ if (!jq.empty() && jq.front() < *seq)
+ *seq = jq.front();
+
+ return false;
+ } /// @returns true if both queues are empty
+
+ /// Move flush_commit waiters whose recorded seq has now completed into
+ /// *to_queue; caller must hold qlock and is responsible for queueing them.
+ void _wake_flush_waiters(list<Context*> *to_queue) {
+ uint64_t seq;
+ // if both queues are empty, -1 wraps to UINT64_MAX so every waiter fires
+ if (_get_min_uncompleted(&seq))
+ seq = -1;
+
+ // waiters are kept in seq order, so we can stop at the first one still
+ // pending; erase(i++) is safe on std::list (only the erased node's
+ // iterator is invalidated)
+ for (list<pair<uint64_t, Context*> >::iterator i =
+ flush_commit_waiters.begin();
+ i != flush_commit_waiters.end() && i->first < seq;
+ flush_commit_waiters.erase(i++)) {
+ to_queue->push_back(i->second);
+ }
+ }
+
void queue_journal(uint64_t s) {
Mutex::Locker l(qlock);
jq.push_back(s);
}
- void dequeue_journal() {
+ void dequeue_journal(list<Context*> *to_queue) {
Mutex::Locker l(qlock);
jq.pop_front();
cond.Signal();
+ // a journal entry completed: hand any satisfied flush_commit waiters
+ // back to the caller via *to_queue
+ _wake_flush_waiters(to_queue);
}
void queue(Op *o) {
Mutex::Locker l(qlock);
assert(apply_lock.is_locked());
return q.front();
}
- Op *dequeue() {
+
+ /// Pop the front op; satisfied flush_commit waiters are returned in *to_queue.
+ Op *dequeue(list<Context*> *to_queue) {
+ assert(to_queue);
assert(apply_lock.is_locked());
Mutex::Locker l(qlock);
Op *o = q.front();
q.pop_front();
cond.Signal();
+
+ // hand newly-satisfied flush_commit waiters back to the caller
+ _wake_flush_waiters(to_queue);
return o;
}
+
void flush() {
Mutex::Locker l(qlock);
while (g_conf->filestore_blackhole)
cond.Wait(qlock); // wait forever
+
// get max for journal _or_ op queues
uint64_t seq = 0;
if (!q.empty())
cond.Wait(qlock);
}
}
+ /// Register c to run once everything currently queued commits.
+ /// @returns true (and deletes c) if the sequencer is already idle.
+ bool flush_commit(Context *c) {
+ Mutex::Locker l(qlock);
+ uint64_t seq = 0;
+ if (_get_max_uncompleted(&seq)) {
+ // idle: per the Sequencer_impl contract, c is not called, just freed
+ delete c;
+ return true;
+ } else {
+ // remember the newest uncompleted seq; c fires once it has completed
+ flush_commit_waiters.push_back(make_pair(seq, c));
+ return false;
+ }
+ }
OpSequencer()
: qlock("FileStore::OpSequencer::qlock", false, false),
void KeyValueStore::_finish_op(OpSequencer *osr)
{
- Op *o = osr->dequeue();
+ // Collect flush_commit() waiter contexts made runnable by this dequeue.
+ list<Context*> to_queue;
+ Op *o = osr->dequeue(&to_queue);
dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl;
osr->apply_lock.Unlock(); // locked in _do_op
o->onreadable_sync->complete(0);
}
op_finisher.queue(o->onreadable);
+ // flush_commit waiters run on the same finisher, after onreadable
+ op_finisher.queue(to_queue);
delete o;
}
list<Op*> q;
list<uint64_t> jq;
Cond cond;
+ list<pair<uint64_t, Context*> > flush_commit_waiters;
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
+
+ /// get_max_uncompleted
+ /// Caller must hold qlock. Unlike the FileStore variant, only the op
+ /// queue q is consulted (no journal queue here).
+ bool _get_max_uncompleted(
+ uint64_t *seq ///< [out] max uncompleted seq
+ ) {
+ assert(qlock.is_locked());
+ assert(seq);
+ *seq = 0;
+ if (q.empty()) {
+ return true;
+ } else {
+ *seq = q.back()->op;
+ return false;
+ }
+ } /// @returns true if the queue is empty
+
+ /// get_min_uncompleted
+ /// Caller must hold qlock. Only the op queue q is consulted.
+ bool _get_min_uncompleted(
+ uint64_t *seq ///< [out] min uncompleted seq
+ ) {
+ assert(qlock.is_locked());
+ assert(seq);
+ *seq = 0;
+ if (q.empty()) {
+ return true;
+ } else {
+ *seq = q.front()->op;
+ return false;
+ }
+ } /// @returns true if the queue is empty
+
+ /// Move flush_commit waiters whose recorded seq has now completed into
+ /// *to_queue; caller must hold qlock and is responsible for queueing them.
+ void _wake_flush_waiters(list<Context*> *to_queue) {
+ uint64_t seq;
+ // if the queue is empty, -1 wraps to UINT64_MAX so every waiter fires
+ if (_get_min_uncompleted(&seq))
+ seq = -1;
+
+ // waiters are kept in seq order, so we can stop at the first one still
+ // pending; erase(i++) is safe on std::list (only the erased node's
+ // iterator is invalidated)
+ for (list<pair<uint64_t, Context*> >::iterator i =
+ flush_commit_waiters.begin();
+ i != flush_commit_waiters.end() && i->first < seq;
+ flush_commit_waiters.erase(i++)) {
+ to_queue->push_back(i->second);
+ }
+ }
void queue(Op *o) {
Mutex::Locker l(qlock);
assert(apply_lock.is_locked());
return q.front();
}
- Op *dequeue() {
+
+ /// Pop the front op; satisfied flush_commit waiters are returned in *to_queue.
+ Op *dequeue(list<Context*> *to_queue) {
+ assert(to_queue);
assert(apply_lock.is_locked());
Mutex::Locker l(qlock);
Op *o = q.front();
q.pop_front();
cond.Signal();
+
+ // hand newly-satisfied flush_commit waiters back to the caller
+ _wake_flush_waiters(to_queue);
return o;
}
+
void flush() {
Mutex::Locker l(qlock);
cond.Wait(qlock);
}
}
+ /// Register c to run once everything currently queued commits.
+ /// @returns true (and deletes c) if the sequencer is already idle.
+ bool flush_commit(Context *c) {
+ Mutex::Locker l(qlock);
+ uint64_t seq = 0;
+ if (_get_max_uncompleted(&seq)) {
+ // idle: per the Sequencer_impl contract, c is not called, just freed
+ delete c;
+ return true;
+ } else {
+ // remember the newest uncompleted seq; c fires once it has completed
+ flush_commit_waiters.push_back(make_pair(seq, c));
+ return false;
+ }
+ }
OpSequencer()
: qlock("KeyValueStore::OpSequencer::qlock", false, false),
*/
struct Sequencer_impl {
virtual void flush() = 0;
+
+ /**
+ * Async flush_commit
+ *
+ * There are two cases:
+ * 1) sequencer is currently idle: the method returns true and
+ * c is deleted
+ * 2) sequencer is not idle: the method returns false and c is
+ * called asynchronously with a value of 0 once all transactions
+ * queued on this sequencer prior to the call have been applied
+ * and committed.
+ */
+ virtual bool flush_commit(
+ Context *c ///< [in] context to call upon flush/commit
+ ) = 0; ///< @return true if idle, false otherwise
+
virtual ~Sequencer_impl() {}
};
if (p)
p->flush();
}
+
+ /// @see Sequencer_impl::flush_commit()
+ bool flush_commit(Context *c) {
+ if (!p) {
+ // no impl bound yet: nothing can be pending, so report idle and
+ // free c per the contract (c is never called in the idle case)
+ delete c;
+ return true;
+ } else {
+ return p->flush_commit(c);
+ }
+ }
};
/*********************************