]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: add Sequencer to queue_transaction interface
authorSage Weil <sage@newdream.net>
Tue, 2 Mar 2010 23:07:29 +0000 (15:07 -0800)
committerSage Weil <sage@newdream.net>
Tue, 2 Mar 2010 23:07:29 +0000 (15:07 -0800)
The (optional) sequencer parameter lets you order operations
submitted via the same Sequencer relative to each other.  If no
sequencer is specified, a default one is used.  Operations are
single-threaded with respect to an individual sequencer, so you
effectively get single threaded io submission to btrfs/whatever
if you do not specify a sequencer.

src/os/FileStore.cc
src/os/FileStore.h
src/os/ObjectStore.h

index 6f4e016d8ffd244f2aef928ab3a0da8cb8f13784..8dd7998f6e13f8b8afd447f550b2c836d71242e6 100644 (file)
@@ -651,7 +651,7 @@ int FileStore::umount()
 
 /// -----------------------------
 
-void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync)
+void FileStore::queue_op(Sequencer *posr, __u64 op_seq, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync)
 {
   __u64 bytes = 0, ops = 0;
   for (list<Transaction*>::iterator p = tls.begin();
@@ -674,6 +674,21 @@ void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreada
   o->bytes = bytes;
 
   op_tp.lock();
+
+  OpSequencer *osr;
+  if (!posr)
+    posr = &default_osr;
+  if (posr->p) {
+    osr = (OpSequencer *)posr->p;
+    dout(10) << "queue_op existing osr " << osr << "/" << osr->parent << " w/ q " << osr->q << dendl;
+  } else {
+    osr = new OpSequencer;
+    osr->parent = posr;
+    posr->p = (void *)osr;
+    dout(10) << "queue_op new osr " << osr << "/" << osr->parent << dendl;
+  }
+  osr->q.push_back(o);
+
   while ((g_conf.filestore_queue_max_ops && op_queue_len >= (unsigned)g_conf.filestore_queue_max_ops) ||
         (g_conf.filestore_queue_max_bytes && op_queue_bytes >= (unsigned)g_conf.filestore_queue_max_bytes)) {
     dout(2) << "queue_op " << o << " throttle: "
@@ -684,12 +699,15 @@ void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreada
   op_tp.unlock();
 
   dout(10) << "queue_op " << o << " seq " << op_seq << " " << bytes << " bytes" << dendl;
-  op_wq.queue(o);
+  op_wq.queue(osr);
 }
 
-void FileStore::_do_op(Op *o)
+void FileStore::_do_op(OpSequencer *osr)
 {
-  dout(10) << "_do_op " << o << " " << o->op << " start" << dendl;
+  osr->lock.Lock();
+  Op *o = osr->q.front();
+
+  dout(10) << "_do_op " << o << " " << o->op << " osr " << osr << "/" << osr->parent << " start" << dendl;
   op_apply_start(o->op);
   int r = do_transactions(o->tls, o->op);
   op_apply_finish();
@@ -701,8 +719,21 @@ void FileStore::_do_op(Op *o)
   */
 }
 
-void FileStore::_finish_op(Op *o)
+void FileStore::_finish_op(OpSequencer *osr)
 {
+  Op *o = osr->q.front();
+  osr->q.pop_front();
+  
+  if (osr->q.empty()) {
+    dout(10) << "_finish_op last op " << o << " on osr " << osr << "/" << osr->parent << dendl;
+    osr->parent->p = NULL;
+    osr->lock.Unlock();  // locked in _do_op
+    delete osr;
+  } else {
+    dout(10) << "_finish_op on osr " << osr << "/" << osr->parent << " q now " << osr->q << dendl;
+    osr->lock.Unlock();  // locked in _do_op
+  }
+
   // called with tp lock held
   op_queue_len--;
   op_queue_bytes -= o->bytes;
@@ -743,27 +774,28 @@ void FileStore::_finish_op(Op *o)
 
 struct C_JournaledAhead : public Context {
   FileStore *fs;
+  ObjectStore::Sequencer *osr;
   __u64 op;
   list<ObjectStore::Transaction*> tls;
   Context *onreadable, *onreadable_sync;
   Context *ondisk;
 
-  C_JournaledAhead(FileStore *f, __u64 o, list<ObjectStore::Transaction*>& t,
+  C_JournaledAhead(FileStore *f, ObjectStore::Sequencer *os, __u64 o, list<ObjectStore::Transaction*>& t,
                   Context *onr, Context *ond, Context *onrsync) :
-    fs(f), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
+    fs(f), osr(os), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
   void finish(int r) {
-    fs->_journaled_ahead(op, tls, onreadable, ondisk, onreadable_sync);
+    fs->_journaled_ahead(osr, op, tls, onreadable, ondisk, onreadable_sync);
   }
 };
 
-int FileStore::queue_transaction(Transaction *t)
+int FileStore::queue_transaction(Sequencer *osr, Transaction *t)
 {
   list<Transaction*> tls;
   tls.push_back(t);
-  return queue_transactions(tls, new C_DeleteTransaction(t));
+  return queue_transactions(osr, tls, new C_DeleteTransaction(t));
 }
 
-int FileStore::queue_transactions(list<Transaction*> &tls,
+int FileStore::queue_transactions(Sequencer *osr, list<Transaction*> &tls,
                                  Context *onreadable, Context *ondisk,
                                  Context *onreadable_sync)
 {
@@ -778,7 +810,7 @@ int FileStore::queue_transactions(list<Transaction*> &tls,
       journal_transactions(tls, op, ondisk);
       
       // queue inside journal lock, to preserve ordering
-      queue_op(op, tls, onreadable, onreadable_sync);
+      queue_op(osr, op, tls, onreadable, onreadable_sync);
       
       op_journal_finish();
       return 0;
@@ -787,7 +819,7 @@ int FileStore::queue_transactions(list<Transaction*> &tls,
       __u64 op = op_journal_start(0);
       dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
       journal_transactions(tls, op,
-                          new C_JournaledAhead(this, op, tls, onreadable, ondisk, onreadable_sync));
+                          new C_JournaledAhead(this, osr, op, tls, onreadable, ondisk, onreadable_sync));
       op_journal_finish();
       return 0;
     }
@@ -817,14 +849,14 @@ int FileStore::queue_transactions(list<Transaction*> &tls,
   return r;
 }
 
-void FileStore::_journaled_ahead(__u64 op,
+void FileStore::_journaled_ahead(Sequencer *osr, __u64 op,
                                 list<Transaction*> &tls,
                                 Context *onreadable, Context *ondisk,
                                 Context *onreadable_sync)
 {
   dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
   // this should queue in order because the journal does it's completions in order.
-  queue_op(op, tls, onreadable, onreadable_sync);
+  queue_op(osr, op, tls, onreadable, onreadable_sync);
 
   // do ondisk completions async, to prevent any onreadable_sync completions
   // getting blocked behind an ondisk completion.
@@ -885,7 +917,7 @@ unsigned FileStore::apply_transactions(list<Transaction*> &tls,
     C_SafeCond *onreadable = new C_SafeCond(&my_lock, &my_cond, &done, &r);
 
     dout(10) << "apply queued" << dendl;
-    queue_transactions(tls, onreadable, ondisk);
+    queue_transactions(NULL, tls, onreadable, ondisk);
     
     my_lock.Lock();
     while (!done)
index ed6038faa8ec3e253d958e2c0f65b01ffd15f9c6..bd5267a5e3d67ab78bfc1f5b65d975b5d65d496d 100644 (file)
@@ -94,7 +94,14 @@ class FileStore : public JournalingObjectStore {
     Context *onreadable, *onreadable_sync;
     __u64 ops, bytes;
   };
-  deque<Op*> op_queue;
+  struct OpSequencer {
+    Sequencer *parent;
+    list<Op*> q;
+    Mutex lock;
+    OpSequencer() : lock("FileStore::OpSequencer::lock", false, false) {}
+  };
+  Sequencer default_osr;
+  deque<OpSequencer*> op_queue;
   __u64 op_queue_len, op_queue_bytes;
   Cond op_throttle_cond;
   Finisher op_finisher;
@@ -102,44 +109,44 @@ class FileStore : public JournalingObjectStore {
   map<__u64, pair<Context*,Context*> > finish_queue;
 
   ThreadPool op_tp;
-  struct OpWQ : public ThreadPool::WorkQueue<Op> {
+  struct OpWQ : public ThreadPool::WorkQueue<OpSequencer> {
     FileStore *store;
-    OpWQ(FileStore *fs, ThreadPool *tp) : ThreadPool::WorkQueue<Op>("FileStore::OpWQ", tp), store(fs) {}
+    OpWQ(FileStore *fs, ThreadPool *tp) : ThreadPool::WorkQueue<OpSequencer>("FileStore::OpWQ", tp), store(fs) {}
 
-    bool _enqueue(Op *o) {
-      store->op_queue.push_back(o);
+    bool _enqueue(OpSequencer *osr) {
+      store->op_queue.push_back(osr);
       store->op_queue_len++;
-      store->op_queue_bytes += o->bytes;
+      store->op_queue_bytes += osr->q.back()->bytes;
       return true;
     }
-    void _dequeue(Op *o) {
+    void _dequeue(OpSequencer *o) {
       assert(0);
     }
     bool _empty() {
       return store->op_queue.empty();
     }
-    Op *_dequeue() {
+    OpSequencer *_dequeue() {
       if (store->op_queue.empty())
        return NULL;
-      Op *o = store->op_queue.front();
+      OpSequencer *osr = store->op_queue.front();
       store->op_queue.pop_front();
-      return o;
+      return osr;
     }
-    void _process(Op *o) {
-      store->_do_op(o);
+    void _process(OpSequencer *osr) {
+      store->_do_op(osr);
     }
-    void _process_finish(Op *o) {
-      store->_finish_op(o);
+    void _process_finish(OpSequencer *osr) {
+      store->_finish_op(osr);
     }
     void _clear() {
       assert(store->op_queue.empty());
     }
   } op_wq;
 
-  void _do_op(Op *o);
-  void _finish_op(Op *o);
-  void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
-  void _journaled_ahead(__u64 op, list<Transaction*> &tls,
+  void _do_op(OpSequencer *o);
+  void _finish_op(OpSequencer *o);
+  void queue_op(Sequencer *osr, __u64 op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
+  void _journaled_ahead(Sequencer *osr, __u64 op, list<Transaction*> &tls,
                        Context *onreadable, Context *ondisk, Context *onreadable_sync);
   friend class C_JournaledAhead;
 
@@ -195,8 +202,8 @@ class FileStore : public JournalingObjectStore {
   void _transaction_finish(int id);
   unsigned _do_transaction(Transaction& t);
 
-  int queue_transaction(Transaction* t);
-  int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
+  int queue_transaction(Sequencer *osr, Transaction* t);
+  int queue_transactions(Sequencer *osr, list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
                         Context *onreadable_sync=0);
 
   // ------------------
index 409a6db68d546deb6a23bdab83d1fde88bb6acbd..beb32d572438e2c90ecf68a382aa8e1a929ecabd 100644 (file)
@@ -73,6 +73,14 @@ public:
     map<int,int> free_extent_dist_sum;     // powers of two
   };
   
+
+  struct Sequencer {
+    void *p;
+    Sequencer() : p(NULL) {}
+    ~Sequencer() {
+      assert(p == NULL);
+    }
+  };
   
 
   /*********************************
@@ -467,14 +475,14 @@ public:
   virtual unsigned apply_transaction(Transaction& t, Context *ondisk=0) = 0;
   virtual unsigned apply_transactions(list<Transaction*>& tls, Context *ondisk=0) = 0;
 
-  virtual int queue_transaction(Transaction* t) = 0;
-  virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0,
+  virtual int queue_transaction(Sequencer *osr, Transaction* t) = 0;
+  virtual int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0,
                                Context *onreadable_sync=0) {
     list<Transaction*> tls;
     tls.push_back(t);
-    return queue_transactions(tls, onreadable, ondisk, onreadable_sync);
+    return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync);
   }
-  virtual int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
+  virtual int queue_transactions(Sequencer *osr, list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
                                 Context *onreadable_sync=0) = 0;