]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ObjectStore: add Context management to Transaction
authorSamuel Just <sam.just@inktank.com>
Sun, 3 Mar 2013 23:47:48 +0000 (15:47 -0800)
committerSamuel Just <sam.just@inktank.com>
Thu, 14 Mar 2013 02:45:12 +0000 (19:45 -0700)
ObjectStore now appends passed contexts in queue_transaction
to the Transaction contexts and uses that to pass into
the virtual queue_transactions.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/os/FileStore.cc
src/os/FileStore.h
src/os/ObjectStore.h

index cff138b97f46be0e2993f2ffa871220f8fd41451..bd235586f90ac091c07b00300ba7ef9cda10e383 100644 (file)
@@ -2008,10 +2008,13 @@ struct C_JournaledAhead : public Context {
 };
 
 int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
-                                 Context *onreadable, Context *ondisk,
-                                 Context *onreadable_sync,
                                  TrackedOpRef osd_op)
 {
+  Context *onreadable;
+  Context *ondisk;
+  Context *onreadable_sync;
+  ObjectStore::Transaction::collect_contexts(
+    tls, &onreadable, &ondisk, &onreadable_sync);
   if (g_conf->filestore_blackhole) {
     dout(0) << "queue_transactions filestore_blackhole = TRUE, dropping transaction" << dendl;
     return 0;
index 144939a26679b960d3a761074e3f261f52b71ef7..f1b9c1b04c9f5dc38327b0064b862fabf0cbf0d4 100644 (file)
@@ -315,8 +315,6 @@ public:
   unsigned _do_transaction(Transaction& t, uint64_t op_seq, int trans_num);
 
   int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
-                        Context *onreadable, Context *ondisk=0,
-                        Context *onreadable_sync=0,
                         TrackedOpRef op = TrackedOpRef());
 
   /**
index a36a7fbb3c4b802b15f3efd485e755bef08e6d2f..23265041f29a4ca1c6df8e4567a2443ab0493606 100644 (file)
@@ -167,7 +167,52 @@ public:
     bool use_pool_override;
     bool replica;
 
+    list<Context *> on_applied;
+    list<Context *> on_commit;
+    list<Context *> on_applied_sync;
+
   public:
+    void register_on_applied(Context *c) {
+      on_applied.push_back(c);
+    }
+    void register_on_commit(Context *c) {
+      on_commit.push_back(c);
+    }
+    void register_on_applied_sync(Context *c) {
+      on_applied_sync.push_back(c);
+    }
+
+    static void collect_contexts(
+      list<Transaction *> &t,
+      Context **out_on_applied,
+      Context **out_on_commit,
+      Context **out_on_applied_sync) {
+      assert(out_on_applied);
+      assert(out_on_commit);
+      assert(out_on_applied_sync);
+      list<Context *> on_applied, on_commit, on_applied_sync;
+      for (list<Transaction *>::iterator i = t.begin();
+          i != t.end();
+          ++i) {
+       on_applied.splice(on_applied.end(), (*i)->on_applied);
+       on_commit.splice(on_commit.end(), (*i)->on_commit);
+       on_applied_sync.splice(on_applied_sync.end(), (*i)->on_applied_sync);
+      }
+      *out_on_applied = C_Contexts::list_to_context(on_applied);
+      *out_on_commit = C_Contexts::list_to_context(on_commit);
+      *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync);
+    }
+
+    Context *get_on_applied() {
+      return C_Contexts::list_to_context(on_applied);
+    }
+    Context *get_on_commit() {
+      return C_Contexts::list_to_context(on_commit);
+    }
+    Context *get_on_applied_sync() {
+      return C_Contexts::list_to_context(on_applied_sync);
+    }
+
     void set_pool_override(int64_t pool) {
       pool_override = pool;
     }
@@ -181,6 +226,9 @@ public:
       std::swap(largest_data_len, other.largest_data_len);
       std::swap(largest_data_off, other.largest_data_off);
       std::swap(largest_data_off_in_tbl, other.largest_data_off_in_tbl);
+      std::swap(on_applied, other.on_applied);
+      std::swap(on_commit, other.on_commit);
+      std::swap(on_applied_sync, other.on_applied_sync);
       tbl.swap(other.tbl);
     }
 
@@ -194,6 +242,9 @@ public:
        largest_data_off_in_tbl = tbl.length() + other.largest_data_off_in_tbl;
       }
       tbl.append(other.tbl);
+      on_applied.splice(on_applied.end(), other.on_applied);
+      on_commit.splice(on_commit.end(), other.on_commit);
+      on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync);
     }
 
     uint64_t get_encoded_bytes() {
@@ -664,6 +715,7 @@ public:
     tls.push_back(t);
     return queue_transactions(osr, tls, new C_DeleteTransaction(t));
   }
+
   int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0,
                                Context *onreadable_sync=0,
                                TrackedOpRef op = TrackedOpRef()) {
@@ -671,9 +723,22 @@ public:
     tls.push_back(t);
     return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op);
   }
-  virtual int queue_transactions(Sequencer *osr, list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
-                                Context *onreadable_sync=0,
-                                TrackedOpRef op = TrackedOpRef()) = 0;
+
+  int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
+                        Context *onreadable, Context *ondisk=0,
+                        Context *onreadable_sync=0,
+                        TrackedOpRef op = TrackedOpRef()) {
+    assert(!tls.empty());
+    tls.back()->register_on_applied(onreadable);
+    tls.back()->register_on_commit(ondisk);
+    tls.back()->register_on_applied_sync(onreadable_sync);
+    return queue_transactions(osr, tls, op);
+  }
+
+  virtual int queue_transactions(
+    Sequencer *osr, list<Transaction*>& tls,
+    TrackedOpRef op = TrackedOpRef()) = 0;
+
 
   int queue_transactions(
     Sequencer *osr,