]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/MonitorDBStore: add async queue_transaction()
authorSage Weil <sage@redhat.com>
Sun, 10 Aug 2014 22:19:06 +0000 (15:19 -0700)
committerSage Weil <sage@redhat.com>
Wed, 27 Aug 2014 21:36:07 +0000 (14:36 -0700)
Do the transaction async, and then trigger a callback.

Signed-off-by: Sage Weil <sage@redhat.com>
src/mon/MonitorDBStore.h

index 1d7f8259db383469001b3c73839b64bddda4bde3..0410ca2f878a0a050da6ea7360618cf7623babd2 100644 (file)
@@ -24,6 +24,7 @@
 
 #include "include/assert.h"
 #include "common/Formatter.h"
+#include "common/Finisher.h"
 #include "common/errno.h"
 
 class MonitorDBStore
@@ -32,6 +33,8 @@ class MonitorDBStore
   bool do_dump;
   int dump_fd;
 
+  Finisher io_work;
+
  public:
 
   struct Op {
@@ -290,6 +293,38 @@ class MonitorDBStore
     return r;
   }
 
+  struct C_DoTransaction : public Context {
+    MonitorDBStore *store;
+    MonitorDBStore::TransactionRef t;
+    Context *oncommit;
+    C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
+                   Context *f)
+      : store(s), t(t), oncommit(f)
+    {}
+    void finish(int r) {
+      int ret = store->apply_transaction(t);
+      oncommit->complete(ret);
+    }
+  };
+
+  /**
+   * queue transaction
+   *
+   * Queue a transaction to commit asynchronously.  Trigger a context
+   * on completion (without any locks held).
+   */
+  void queue_transaction(MonitorDBStore::TransactionRef t,
+                        Context *oncommit) {
+    io_work.queue(new C_DoTransaction(this, t, oncommit));
+  }
+
+  /**
+   * block and flush all io activity
+   */
+  void flush() {
+    io_work.wait_for_empty();
+  }
+
   class StoreIteratorImpl {
   protected:
     bool done;
@@ -514,15 +549,25 @@ class MonitorDBStore
 
   int open(ostream &out) {
     db->init();
-    return db->open(out);
+    int r = db->open(out);
+    if (r < 0)
+      return r;
+    io_work.start();
+    return 0;
   }
 
   int create_and_open(ostream &out) {
     db->init();
-    return db->create_and_open(out);
+    int r = db->create_and_open(out);
+    if (r < 0)
+      return r;
+    io_work.start();
+    return 0;
   }
 
   void close() {
+    // there should be no work queued!
+    io_work.stop();
   }
 
   void compact() {
@@ -537,8 +582,11 @@ class MonitorDBStore
     return db->get_estimated_size(extras);
   }
 
-  MonitorDBStore(const string& path) :
-    db(0), do_dump(false), dump_fd(-1) {
+  MonitorDBStore(const string& path)
+    : db(0),
+      do_dump(false),
+      dump_fd(-1),
+      io_work(g_ceph_context, "monstore") {
     string::const_reverse_iterator rit;
     int pos = 0;
     for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {