#include "include/assert.h"
#include "common/Formatter.h"
+#include "common/Finisher.h"
#include "common/errno.h"
class MonitorDBStore
bool do_dump;
int dump_fd;
+ Finisher io_work;
+
public:
struct Op {
return r;
}
+ struct C_DoTransaction : public Context {
+ MonitorDBStore *store;
+ MonitorDBStore::TransactionRef t;
+ Context *oncommit;
+ C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
+ Context *f)
+ : store(s), t(t), oncommit(f)
+ {}
+ void finish(int r) {
+ int ret = store->apply_transaction(t);
+ oncommit->complete(ret);
+ }
+ };
+
+ /**
+ * queue transaction
+ *
+ * Queue a transaction to commit asynchronously. Trigger a context
+ * on completion (without any locks held).
+ */
+ void queue_transaction(MonitorDBStore::TransactionRef t,
+ Context *oncommit) {
+ io_work.queue(new C_DoTransaction(this, t, oncommit));
+ }
+
+ /**
+ * block and flush all io activity
+ */
+ void flush() {
+ io_work.wait_for_empty();
+ }
+
class StoreIteratorImpl {
protected:
bool done;
int open(ostream &out) {
db->init();
- return db->open(out);
+ int r = db->open(out);
+ if (r < 0)
+ return r;
+ io_work.start();
+ return 0;
}
int create_and_open(ostream &out) {
db->init();
- return db->create_and_open(out);
+ int r = db->create_and_open(out);
+ if (r < 0)
+ return r;
+ io_work.start();
+ return 0;
}
void close() {
+ // there should be no work queued!
+ io_work.stop();
}
void compact() {
return db->get_estimated_size(extras);
}
- MonitorDBStore(const string& path) :
- db(0), do_dump(false), dump_fd(-1) {
+ MonitorDBStore(const string& path)
+ : db(0),
+ do_dump(false),
+ dump_fd(-1),
+ io_work(g_ceph_context, "monstore") {
string::const_reverse_iterator rit;
int pos = 0;
for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {