]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/: add async flush_commit() method to Sequencer and implementations
authorSamuel Just <sam.just@inktank.com>
Mon, 9 Jun 2014 18:00:32 +0000 (11:00 -0700)
committerSamuel Just <sam.just@inktank.com>
Fri, 27 Jun 2014 17:50:50 +0000 (10:50 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/os/FileStore.cc
src/os/FileStore.h
src/os/KeyValueStore.cc
src/os/KeyValueStore.h
src/os/ObjectStore.h

index 9d6252ca4ca9c41a7f593abab2e0ff563a866151..31bd6a6045453896e59bb140baad2b2e7cc58a2d 100644 (file)
@@ -1703,7 +1703,8 @@ void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
 
 void FileStore::_finish_op(OpSequencer *osr)
 {
-  Op *o = osr->dequeue();
+  list<Context*> to_queue;
+  Op *o = osr->dequeue(&to_queue);
   
   dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl;
   osr->apply_lock.Unlock();  // locked in _do_op
@@ -1721,6 +1722,7 @@ void FileStore::_finish_op(OpSequencer *osr)
   if (o->onreadable) {
     op_finisher.queue(o->onreadable);
   }
+  op_finisher.queue(to_queue);
   delete o;
 }
 
@@ -1836,7 +1838,8 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
   // this should queue in order because the journal does it's completions in order.
   queue_op(osr, o);
 
-  osr->dequeue_journal();
+  list<Context*> to_queue;
+  osr->dequeue_journal(&to_queue);
 
   // do ondisk completions async, to prevent any onreadable_sync completions
   // getting blocked behind an ondisk completion.
@@ -1844,6 +1847,7 @@ void FileStore::_journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk)
     dout(10) << " queueing ondisk " << ondisk << dendl;
     ondisk_finisher.queue(ondisk);
   }
+  ondisk_finisher.queue(to_queue);
 }
 
 int FileStore::_do_transactions(
index 4c9ffdba2f3e87915e1f7b7474cb87b30954d70d..3fcd89a265091ade7f739d8543d8c8a0d558bd8b 100644 (file)
@@ -193,19 +193,70 @@ private:
     Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
     list<Op*> q;
     list<uint64_t> jq;
+    list<pair<uint64_t, Context*> > flush_commit_waiters;
     Cond cond;
   public:
     Sequencer *parent;
     Mutex apply_lock;  // for apply mutual exclusion
     
+    /// get_max_uncompleted
+    bool _get_max_uncompleted(
+      uint64_t *seq ///< [out] max uncompleted seq
+      ) {
+      assert(qlock.is_locked());
+      assert(seq);
+      *seq = 0;
+      if (q.empty() && jq.empty())
+       return true;
+
+      if (!q.empty())
+       *seq = q.back()->op;
+      if (!jq.empty() && jq.back() > *seq)
+       *seq = jq.back();
+
+      return false;
+    } /// @returns true if both queues are empty
+
+    /// get_min_uncompleted
+    bool _get_min_uncompleted(
+      uint64_t *seq ///< [out] min uncompleted seq
+      ) {
+      assert(qlock.is_locked());
+      assert(seq);
+      *seq = 0;
+      if (q.empty() && jq.empty())
+       return true;
+
+      if (!q.empty())
+       *seq = q.front()->op;
+      if (!jq.empty() && jq.front() < *seq)
+       *seq = jq.front();
+
+      return false;
+    } /// @returns true if both queues are empty
+
+    void _wake_flush_waiters(list<Context*> *to_queue) {
+      uint64_t seq;
+      if (_get_min_uncompleted(&seq))
+       seq = -1;
+
+      for (list<pair<uint64_t, Context*> >::iterator i =
+            flush_commit_waiters.begin();
+          i != flush_commit_waiters.end() && i->first < seq;
+          flush_commit_waiters.erase(i++)) {
+       to_queue->push_back(i->second);
+      }
+    }
+
     void queue_journal(uint64_t s) {
       Mutex::Locker l(qlock);
       jq.push_back(s);
     }
-    void dequeue_journal() {
+    void dequeue_journal(list<Context*> *to_queue) {
       Mutex::Locker l(qlock);
       jq.pop_front();
       cond.Signal();
+      _wake_flush_waiters(to_queue);
     }
     void queue(Op *o) {
       Mutex::Locker l(qlock);
@@ -215,20 +266,26 @@ private:
       assert(apply_lock.is_locked());
       return q.front();
     }
-    Op *dequeue() {
+
+    Op *dequeue(list<Context*> *to_queue) {
+      assert(to_queue);
       assert(apply_lock.is_locked());
       Mutex::Locker l(qlock);
       Op *o = q.front();
       q.pop_front();
       cond.Signal();
+
+      _wake_flush_waiters(to_queue);
       return o;
     }
+
     void flush() {
       Mutex::Locker l(qlock);
 
       while (g_conf->filestore_blackhole)
        cond.Wait(qlock);  // wait forever
 
+
       // get max for journal _or_ op queues
       uint64_t seq = 0;
       if (!q.empty())
@@ -243,6 +300,17 @@ private:
          cond.Wait(qlock);
       }
     }
+    bool flush_commit(Context *c) {
+      Mutex::Locker l(qlock);
+      uint64_t seq = 0;
+      if (_get_max_uncompleted(&seq)) {
+       delete c;
+       return true;
+      } else {
+       flush_commit_waiters.push_back(make_pair(seq, c));
+       return false;
+      }
+    }
 
     OpSequencer()
       : qlock("FileStore::OpSequencer::qlock", false, false),
index cc117fa3a6718b49e61bc0b6cb537317fe8ea149..3255ad4ed06c981e7987c86f73e7ef9218c7a9b7 100644 (file)
@@ -1085,7 +1085,8 @@ void KeyValueStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
 
 void KeyValueStore::_finish_op(OpSequencer *osr)
 {
-  Op *o = osr->dequeue();
+  list<Context*> to_queue;
+  Op *o = osr->dequeue(&to_queue);
 
   dout(10) << "_finish_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << dendl;
   osr->apply_lock.Unlock();  // locked in _do_op
@@ -1099,6 +1100,7 @@ void KeyValueStore::_finish_op(OpSequencer *osr)
     o->onreadable_sync->complete(0);
   }
   op_finisher.queue(o->onreadable);
+  op_finisher.queue(to_queue);
   delete o;
 }
 
index d7b9c0a11644aad34cb46fd948b6f314b781f35f..402b4cba92b656729336740b180d6ff299276647 100644 (file)
@@ -259,9 +259,53 @@ class KeyValueStore : public ObjectStore,
     list<Op*> q;
     list<uint64_t> jq;
     Cond cond;
+    list<pair<uint64_t, Context*> > flush_commit_waiters;
    public:
     Sequencer *parent;
     Mutex apply_lock;  // for apply mutual exclusion
+    
+    /// get_max_uncompleted
+    bool _get_max_uncompleted(
+      uint64_t *seq ///< [out] max uncompleted seq
+      ) {
+      assert(qlock.is_locked());
+      assert(seq);
+      *seq = 0;
+      if (q.empty()) {
+       return true;
+      } else {
+       *seq = q.back()->op;
+       return false;
+      }
+    } /// @returns true if the queue is empty
+
+    /// get_min_uncompleted
+    bool _get_min_uncompleted(
+      uint64_t *seq ///< [out] min uncompleted seq
+      ) {
+      assert(qlock.is_locked());
+      assert(seq);
+      *seq = 0;
+      if (q.empty()) {
+       return true;
+      } else {
+       *seq = q.front()->op;
+       return false;
+      }
+    } /// @returns true if both queues are empty
+
+    void _wake_flush_waiters(list<Context*> *to_queue) {
+      uint64_t seq;
+      if (_get_min_uncompleted(&seq))
+       seq = -1;
+
+      for (list<pair<uint64_t, Context*> >::iterator i =
+            flush_commit_waiters.begin();
+          i != flush_commit_waiters.end() && i->first < seq;
+          flush_commit_waiters.erase(i++)) {
+       to_queue->push_back(i->second);
+      }
+    }
 
     void queue(Op *o) {
       Mutex::Locker l(qlock);
@@ -271,14 +315,19 @@ class KeyValueStore : public ObjectStore,
       assert(apply_lock.is_locked());
       return q.front();
     }
-    Op *dequeue() {
+
+    Op *dequeue(list<Context*> *to_queue) {
+      assert(to_queue);
       assert(apply_lock.is_locked());
       Mutex::Locker l(qlock);
       Op *o = q.front();
       q.pop_front();
       cond.Signal();
+
+      _wake_flush_waiters(to_queue);
       return o;
     }
+
     void flush() {
       Mutex::Locker l(qlock);
 
@@ -297,6 +346,17 @@ class KeyValueStore : public ObjectStore,
           cond.Wait(qlock);
       }
     }
+    bool flush_commit(Context *c) {
+      Mutex::Locker l(qlock);
+      uint64_t seq = 0;
+      if (_get_max_uncompleted(&seq)) {
+       delete c;
+       return true;
+      } else {
+       flush_commit_waiters.push_back(make_pair(seq, c));
+       return false;
+      }
+    }
 
     OpSequencer()
       : qlock("KeyValueStore::OpSequencer::qlock", false, false),
index 15ed31f0311aef9dcf2438bf6ad3675afdb06b3e..a5f5fcb0b44180c835325ad0fef8cca7284ff5ad 100644 (file)
@@ -128,6 +128,22 @@ public:
    */
   struct Sequencer_impl {
     virtual void flush() = 0;
+
+    /**
+     * Async flush_commit
+     *
+     * There are two cases:
+     * 1) sequencer is currently idle: the method returns true and
+     *    c is deleted
+     * 2) sequencer is not idle: the method returns false and c is
+     *    called asyncronously with a value of 0 once all transactions
+     *    queued on this sequencer prior to the call have been applied
+     *    and committed.
+     */
+    virtual bool flush_commit(
+      Context *c ///< [in] context to call upon flush/commit
+      ) = 0; ///< @return true if idle, false otherwise
+
     virtual ~Sequencer_impl() {}
   };
 
@@ -153,6 +169,16 @@ public:
       if (p)
        p->flush();
     }
+
+    /// @see Sequencer_impl::flush_commit()
+    bool flush_commit(Context *c) {
+      if (!p) {
+       delete c;
+       return true;
+      } else {
+       return p->flush_commit(c);
+      }
+    }
   };
 
   /*********************************