]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: add per-sequencer flush operation
authorSage Weil <sage@newdream.net>
Fri, 17 Dec 2010 20:51:19 +0000 (12:51 -0800)
committerSage Weil <sage@newdream.net>
Fri, 17 Dec 2010 22:15:35 +0000 (14:15 -0800)
Signed-off-by: Sage Weil <sage@newdream.net>
src/os/FileStore.cc
src/os/FileStore.h
src/os/ObjectStore.h

index 8cf22e97233f8d29d5d45c434e3dc64f2b1c4eb2..d180f9daa666a8399c3b822f24e8eda20b7517f0 100644 (file)
@@ -1269,10 +1269,10 @@ void FileStore::queue_op(Sequencer *posr, uint64_t op_seq, list<Transaction*>& t
   } else {
     osr = new OpSequencer;
     osr->parent = posr;
-    posr->p = (void *)osr;
+    posr->p = osr;
     dout(10) << "queue_op new osr " << osr << "/" << osr->parent << dendl;
   }
-  osr->q.push_back(o);
+  osr->queue(o);
 
   op_queue_len++;
   op_queue_bytes += bytes;
@@ -1300,8 +1300,8 @@ void FileStore::op_queue_throttle()
 
 void FileStore::_do_op(OpSequencer *osr)
 {
-  osr->lock.Lock();
-  Op *o = osr->q.front();
+  osr->apply_lock.Lock();
+  Op *o = osr->peek_queue();
 
   dout(10) << "_do_op " << o << " " << o->op << " osr " << osr << "/" << osr->parent << " start" << dendl;
   int r = do_transactions(o->tls, o->op);
@@ -1316,18 +1316,10 @@ void FileStore::_do_op(OpSequencer *osr)
 
 void FileStore::_finish_op(OpSequencer *osr)
 {
-  Op *o = osr->q.front();
-  osr->q.pop_front();
+  Op *o = osr->dequeue();
   
-  if (osr->q.empty()) {
-    dout(10) << "_finish_op last op " << o << " on osr " << osr << "/" << osr->parent << dendl;
-    osr->parent->p = NULL;
-    osr->lock.Unlock();  // locked in _do_op
-    delete osr;
-  } else {
-    dout(10) << "_finish_op on osr " << osr << "/" << osr->parent << dendl; // << " q now " << osr->q << dendl;
-    osr->lock.Unlock();  // locked in _do_op
-  }
+  dout(10) << "_finish_op on osr " << osr << "/" << osr->parent << dendl;
+  osr->apply_lock.Unlock();  // locked in _do_op
 
   // called with tp lock held
   op_queue_len--;
index 79d43e908c0e3e0682da56bc56b3d707f7d147bb..2c062bee146dda9effe191303c02b0823ca6575c 100644 (file)
@@ -98,11 +98,44 @@ class FileStore : public JournalingObjectStore {
     Context *onreadable, *onreadable_sync;
     uint64_t ops, bytes;
   };
-  struct OpSequencer {
-    Sequencer *parent;
+  class OpSequencer : public Sequencer_impl {
+    Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
     list<Op*> q;
-    Mutex lock;
-    OpSequencer() : lock("FileStore::OpSequencer::lock", false, false) {}
+    Cond cond;
+  public:
+    Sequencer *parent;
+    Mutex apply_lock;  // for apply mutual exclusion
+    
+    void queue(Op *o) {
+      Mutex::Locker l(qlock);
+      q.push_back(o);
+    }
+    Op *peek_queue() {
+      assert(apply_lock.is_locked());
+      return q.front();
+    }
+    Op *dequeue() {
+      assert(apply_lock.is_locked());
+      Mutex::Locker l(qlock);
+      Op *o = q.front();
+      q.pop_front();
+      cond.Signal();
+      return o;
+    }
+    void flush() {
+      Mutex::Locker l(qlock);
+      if (!q.empty()) {
+       uint64_t seq = q.back()->op;
+       while (!q.empty() && q.front()->op <= seq)
+         cond.Wait(qlock);
+      }
+    }
+
+    OpSequencer() : qlock("FileStore::OpSequencer::qlock", false, false),
+                   apply_lock("FileStore::OpSequencer::apply_lock", false, false) {}
+    ~OpSequencer() {
+      assert(q.empty());
+    }
   };
   Sequencer default_osr;
   deque<OpSequencer*> op_queue;
index 71fcf73154e821261cfaecfa937278c0c4fe0520..e3a6e06a28d338ad5f1999322f6b4c6a273f26a9 100644 (file)
@@ -74,11 +74,19 @@ public:
   };
   
 
+  struct Sequencer_impl {
+    virtual void flush() = 0;
+    virtual ~Sequencer_impl() {}
+  };
   struct Sequencer {
-    void *p;
+    Sequencer_impl *p;
     Sequencer() : p(NULL) {}
     ~Sequencer() {
-      assert(p == NULL);
+      delete p;
+    }
+    void flush() {
+      if (p)
+       p->flush();
     }
   };