]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: add onreadable_sync callback
authorSage Weil <sage@newdream.net>
Fri, 26 Feb 2010 20:29:17 +0000 (12:29 -0800)
committerSage Weil <sage@newdream.net>
Fri, 26 Feb 2010 21:14:33 +0000 (13:14 -0800)
Add an additional completion context that gets called
synchronously when the operation completes, instead of getting
shunted to the async finisher thread.  This allows us to make
sure certain completion events happen without getting 'stuck
in line' behind other completions with conflicting locks.

src/os/FileStore.cc
src/os/FileStore.h
src/os/ObjectStore.h

index c6707b13f9e9b7b5d157999b4c4634dfea59d951..8541addf79f7cdf861cfe8f018666b6c72addba9 100644 (file)
@@ -649,7 +649,7 @@ int FileStore::umount()
 
 /// -----------------------------
 
-void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable)
+void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync)
 {
   __u64 bytes = 0, ops = 0;
   for (list<Transaction*>::iterator p = tls.begin();
@@ -667,6 +667,7 @@ void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreada
   o->op = op_seq;
   o->tls.swap(tls);
   o->onreadable = onreadable;
+  o->onreadable_sync = onreadable_sync;
   o->ops = ops;
   o->bytes = bytes;
 
@@ -691,7 +692,7 @@ void FileStore::_do_op(Op *o)
   int r = do_transactions(o->tls, o->op);
   op_apply_finish();
   dout(10) << "_do_op " << o << " " << o->op << " r = " << r
-          << ", finisher " << o->onreadable << dendl;
+          << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
   
   /*dout(10) << "op_entry finished " << o->bytes << " bytes, queue now "
           << op_queue_len << " ops, " << op_queue_bytes << " bytes" << dendl;
@@ -707,22 +708,31 @@ void FileStore::_finish_op(Op *o)
 
   if (next_finish == o->op) {
     dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
-            << " queueing " << o->onreadable << dendl;
+            << " queueing " << o->onreadable << " doing " << o->onreadable_sync << dendl;
     next_finish++;
+    if (o->onreadable_sync) {
+      o->onreadable_sync->finish(0);
+      delete o->onreadable_sync;
+    }
     op_finisher.queue(o->onreadable);
 
     while (finish_queue.begin()->first == next_finish) {
-      Context *c = finish_queue.begin()->second;
+      Context *c = finish_queue.begin()->second.first;
+      Context *s = finish_queue.begin()->second.second;
       finish_queue.erase(finish_queue.begin());
       dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
-              << " queueing delayed " << c << dendl;
+              << " queueing delayed " << c << " doing " << s << dendl;
+      if (s) {
+       s->finish(0);
+       delete s;
+      }
       op_finisher.queue(c);
       next_finish++;
     }
   } else {
     dout(10) << "_finish_op " << o->op << " next_finish " << next_finish
             << ", delaying " << o->onreadable << dendl;
-    finish_queue[o->op] = o->onreadable;
+    finish_queue[o->op] = pair<Context*,Context*>(o->onreadable, o->onreadable_sync);
   }
 
   delete o;
@@ -733,14 +743,14 @@ struct C_JournaledAhead : public Context {
   FileStore *fs;
   __u64 op;
   list<ObjectStore::Transaction*> tls;
-  Context *onreadable;
+  Context *onreadable, *onreadable_sync;
   Context *ondisk;
 
   C_JournaledAhead(FileStore *f, __u64 o, list<ObjectStore::Transaction*>& t,
-             Context *onr, Context *ond) :
-    fs(f), op(o), tls(t), onreadable(onr), ondisk(ond) { }
+                  Context *onr, Context *ond, Context *onrsync) :
+    fs(f), op(o), tls(t), onreadable(onr), onreadable_sync(onrsync), ondisk(ond) { }
   void finish(int r) {
-    fs->_journaled_ahead(op, tls, onreadable, ondisk);
+    fs->_journaled_ahead(op, tls, onreadable, ondisk, onreadable_sync);
   }
 };
 
@@ -752,8 +762,8 @@ int FileStore::queue_transaction(Transaction *t)
 }
 
 int FileStore::queue_transactions(list<Transaction*> &tls,
-                                 Context *onreadable,
-                                 Context *ondisk)
+                                 Context *onreadable, Context *ondisk,
+                                 Context *onreadable_sync)
 {
   if (journal && journal->is_writeable()) {
     if (g_conf.filestore_journal_parallel) {
@@ -766,7 +776,7 @@ int FileStore::queue_transactions(list<Transaction*> &tls,
       journal_transactions(tls, op, ondisk);
       
       // queue inside journal lock, to preserve ordering
-      queue_op(op, tls, onreadable);
+      queue_op(op, tls, onreadable, onreadable_sync);
       
       op_journal_finish();
       return 0;
@@ -774,7 +784,8 @@ int FileStore::queue_transactions(list<Transaction*> &tls,
     else if (g_conf.filestore_journal_writeahead) {
       __u64 op = op_journal_start(0);
       dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
-      journal_transactions(tls, op, new C_JournaledAhead(this, op, tls, onreadable, ondisk));
+      journal_transactions(tls, op,
+                          new C_JournaledAhead(this, op, tls, onreadable, ondisk, onreadable_sync));
       op_journal_finish();
       return 0;
     }
@@ -795,6 +806,10 @@ int FileStore::queue_transactions(list<Transaction*> &tls,
 
   // start on_readable finisher after we queue journal item, as on_readable callback
   // is allowed to delete the Transaction
+  if (onreadable_sync) {
+    onreadable_sync->finish(r);
+    delete onreadable_sync;
+  }
   op_finisher.queue(onreadable, r);
 
   return r;
@@ -802,12 +817,12 @@ int FileStore::queue_transactions(list<Transaction*> &tls,
 
 void FileStore::_journaled_ahead(__u64 op,
                                 list<Transaction*> &tls,
-                                Context *onreadable,
-                                Context *ondisk)
+                                Context *onreadable, Context *ondisk,
+                                Context *onreadable_sync)
 {
   dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
   // this should queue in order because the journal does it's completions in order.
-  queue_op(op, tls, onreadable);
+  queue_op(op, tls, onreadable, onreadable_sync);
   if (ondisk) {
     ondisk->finish(0);
     delete ondisk;
index dcff5dbae74ec7cc72544647be0db4b366145ede..85527c84665324da1cd045576355feaff8c7f801 100644 (file)
@@ -89,7 +89,7 @@ class FileStore : public JournalingObjectStore {
   struct Op {
     __u64 op;
     list<Transaction*> tls;
-    Context *onreadable;
+    Context *onreadable, *onreadable_sync;
     __u64 ops, bytes;
   };
   deque<Op*> op_queue;
@@ -97,7 +97,7 @@ class FileStore : public JournalingObjectStore {
   Cond op_throttle_cond;
   Finisher op_finisher;
   __u64 next_finish;
-  map<__u64, Context*> finish_queue;
+  map<__u64, pair<Context*,Context*> > finish_queue;
 
   ThreadPool op_tp;
   struct OpWQ : public ThreadPool::WorkQueue<Op> {
@@ -136,9 +136,9 @@ class FileStore : public JournalingObjectStore {
 
   void _do_op(Op *o);
   void _finish_op(Op *o);
-  void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable);
+  void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable, Context *onreadable_sync);
   void _journaled_ahead(__u64 op, list<Transaction*> &tls,
-                       Context *onreadable, Context *ondisk);
+                       Context *onreadable, Context *ondisk, Context *onreadable_sync);
   friend class C_JournaledAhead;
 
   // flusher thread
@@ -194,7 +194,8 @@ class FileStore : public JournalingObjectStore {
   unsigned _do_transaction(Transaction& t);
 
   int queue_transaction(Transaction* t);
-  int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0);
+  int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
+                        Context *onreadable_sync=0);
 
   // ------------------
   // objects
index 34a6155edc1223f6c60866cdaf74e98b3d79c606..409a6db68d546deb6a23bdab83d1fde88bb6acbd 100644 (file)
@@ -468,13 +468,14 @@ public:
   virtual unsigned apply_transactions(list<Transaction*>& tls, Context *ondisk=0) = 0;
 
   virtual int queue_transaction(Transaction* t) = 0;
-  virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0) {
+  virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0,
+                               Context *onreadable_sync=0) {
     list<Transaction*> tls;
     tls.push_back(t);
-    return queue_transactions(tls, onreadable, ondisk);
+    return queue_transactions(tls, onreadable, ondisk, onreadable_sync);
   }
-  virtual int queue_transactions(list<Transaction*>& tls, Context *onreadable,
-                                Context *ondisk=0) = 0;
+  virtual int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
+                                Context *onreadable_sync=0) = 0;