- acks: never.
- throttle: ??
- rmw: rmw must block on prior fs writes.
+ * JournalingObjectStore interface needs work?
- separate reads/writes into separate op queues?
-
OPTION(filestore_flusher, 0, OPT_BOOL, true),
OPTION(filestore_flusher_max_fds, 0, OPT_INT, 512),
OPTION(filestore_sync_flush, 0, OPT_BOOL, false),
+ OPTION(filestore_journal_parallel, 0, OPT_BOOL, false),
+ OPTION(filestore_journal_writeahead, 0, OPT_BOOL, false),
OPTION(ebofs, 0, OPT_BOOL, false),
OPTION(ebofs_cloneable, 0, OPT_BOOL, true),
OPTION(ebofs_verify, 0, OPT_BOOL, false),
bool filestore_flusher;
int filestore_flusher_max_fds;
bool filestore_sync_flush;
+ bool filestore_journal_parallel;
+ bool filestore_journal_writeahead;
// ebofs
bool ebofs;
<< " " << h.len << " bytes"
<< dendl;
- if (seq && h.seq != seq) {
+ if (seq && h.seq < seq) {
dout(2) << "read_entry " << read_pos << " : got seq " << h.seq << ", expected " << seq << ", stopping" << dendl;
return false;
}
snprintf(fn, sizeof(fn), "%s/current/commit_op_seq", basedir.c_str());
op_fd = ::open(fn, O_CREAT|O_RDWR, 0644);
assert(op_fd >= 0);
- op_seq = 0;
- ::read(op_fd, &op_seq, sizeof(op_seq));
+ __u64 initial_op_seq = 0;
+ ::read(op_fd, &initial_op_seq, sizeof(initial_op_seq));
- dout(5) << "mount op_seq is " << op_seq << dendl;
+ dout(5) << "mount op_seq is " << initial_op_seq << dendl;
// journal
open_journal();
- r = journal_replay();
+ r = journal_replay(initial_op_seq);
if (r == -EINVAL) {
dout(0) << "mount got EINVAL on journal open, not mounting" << dendl;
return r;
}
journal_start();
sync_thread.create();
+ op_thread.create();
flusher_thread.create();
+ op_finisher.start();
// is this btrfs?
stop = true;
sync_cond.Signal();
flusher_cond.Signal();
+ op_cond.Signal();
lock.Unlock();
sync_thread.join();
+ op_thread.join();
flusher_thread.join();
journal_stop();
+ op_finisher.stop();
+
::close(fsid_fd);
::close(op_fd);
}
+/// -----------------------------
+void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable,
+ Context *oncommit)
+{ // Package the transactions and callbacks into a heap-allocated Op, append it to op_queue and signal op_cond, all while holding op_lock.
+ op_lock.Lock();
+ Op *o = new Op; // freed by op_entry() once the op has been processed
+ dout(10) << "queue_op " << o << " " << op_seq << dendl;
+ o->op = op_seq;
+ o->tls.swap(tls); // moves the pointers into the Op; the caller's list is left empty
+ o->onreadable = onreadable;
+ o->oncommit = oncommit;
+ op_queue.push_back(o);
+ op_cond.Signal(); // op_entry() waits on op_cond when the queue is empty
+ op_lock.Unlock();
+}
+
+void FileStore::op_entry()
+{ // Body of the op thread: pop queued Ops one at a time, apply them, queue the onreadable callback with the result, and exit once stop is set and the queue is empty.
+ op_lock.Lock();
+ while (1) {
+ while (!op_queue.empty()) {
+ Op *o = op_queue.front();
+ op_queue.pop_front();
+ op_lock.Unlock(); // op_lock is not held while the op is applied, so queue_op() can keep enqueueing
-unsigned FileStore::apply_transaction(Transaction &t,
- Context *onjournal,
- Context *ondisk)
+ dout(10) << "op_entry " << o << " " << o->op << " start" << dendl;
+ op_apply_start(o->op, o->oncommit); // uses the seq assigned at queue time; oncommit is registered as a commit waiter
+ int r = do_transactions(o->tls, o->op);
+ op_apply_finish();
+ dout(10) << "op_entry " << o << " " << o->op << " r = " << r
+ << ", finisher " << o->onreadable << dendl;
+
+ op_finisher.queue(o->onreadable, r); // the callback is handed to op_finisher together with the apply result
+
+ delete o; // the Op allocated in queue_op()
+
+ op_lock.Lock(); // re-take before re-testing the queue
+ }
+ if (stop) // only tested when the queue is empty, so pending ops are drained first. NOTE(review): umount() sets stop and signals op_cond without holding op_lock -- confirm a wakeup cannot be missed
+ break;
+ op_cond.Wait(op_lock);
+ }
+ op_lock.Unlock();
+}
+
+
+int FileStore::queue_transactions(list<Transaction*> &tls,
+ Context *onreadable,
+ Context *onjournal,
+ Context *ondisk)
{ // Journal the transactions under a newly allocated op seq and queue them for the op thread; always returns 0. The contents of tls are moved out by queue_op().
- list<Transaction*> tls;
- tls.push_back(&t);
- return apply_transactions(tls, onjournal, ondisk);
+ __u64 op;
+
+ op = op_journal_start(0); // 0 => allocate the next op_seq; returns with journal_lock held
+ journal_transactions(tls, op, onjournal); // must run before queue_op(), which empties tls
+
+ // queue inside journal lock, to preserve ordering
+ queue_op(op, tls, onreadable, ondisk);
+
+ op_journal_finish(); // releases journal_lock
+
+ return 0;
}
-unsigned FileStore::apply_transactions(list<Transaction*> &tls,
- Context *onjournal,
- Context *ondisk)
+int FileStore::do_transactions(list<Transaction*> &tls, __u64 op_seq)
{
- int r = 0;
- op_start();
+ int r;
__u64 bytes = 0, ops = 0;
for (list<Transaction*>::iterator p = tls.begin();
int id = _transaction_start(bytes, ops);
if (id < 0) {
- op_journal_start();
- op_finish();
return id;
}
break;
}
+ ::pwrite(op_fd, &op_seq, sizeof(op_seq), 0);
+
_transaction_finish(id);
+ return r;
+}
- op_journal_start();
- dout(10) << "op_seq is " << op_seq << dendl;
- if (r >= 0) {
- journal_transactions(tls, onjournal, ondisk);
+unsigned FileStore::apply_transaction(Transaction &t,
+ Context *onjournal,
+ Context *ondisk)
+{ // Single-transaction convenience wrapper around apply_transactions(); returns whatever that call returns.
+ list<Transaction*> tls;
+ tls.push_back(&t); // the list holds a pointer to t; t itself is not copied
+ return apply_transactions(tls, onjournal, ondisk);
+}
+
+// Apply a list of transactions and block until they have been applied.
+// Returns the result of do_transactions().
+//
+// With g_conf.filestore_journal_parallel set, the work goes through
+// queue_transactions() (journal submit + op thread) and this call waits on a
+// private condition until the C_SafeCond passed as onreadable sets `done`.
+// Otherwise the transactions are applied in the calling thread and, only if
+// that succeeded, submitted to the journal afterwards.
+//
+// onjournal: handed to journal_transactions(); deleted here when the inline
+// apply fails and nothing is journaled.
+// ondisk: commit callback. In the inline path op_apply_start() stores it in
+// commit_waiters, so it is completed by commit_finish() and must
+// never be freed here.
+unsigned FileStore::apply_transactions(list<Transaction*> &tls,
+ Context *onjournal,
+ Context *ondisk)
+{
+ int r = 0;
- ::pwrite(op_fd, &op_seq, sizeof(op_seq), 0);
+ if (g_conf.filestore_journal_parallel) {
+ // use op pool
+ Cond my_cond;
+ Mutex my_lock("FileStore::apply_transaction::my_lock");
+ // start out false so the wait loop below never reads an indeterminate value
+ bool done = false;
+ C_SafeCond *onreadable = new C_SafeCond(&my_lock, &my_cond, &done, &r);
+ dout(10) << "apply queued" << dendl;
+ queue_transactions(tls, onreadable, onjournal, ondisk);
+
+ my_lock.Lock();
+ while (!done)
+ my_cond.Wait(my_lock);
+ my_lock.Unlock();
+ dout(10) << "apply done r = " << r << dendl;
} else {
- delete onjournal;
- delete ondisk;
+ // op_apply_start() allocates the op seq and registers ondisk in commit_waiters
+ __u64 op_seq = op_apply_start(0, ondisk);
+ r = do_transactions(tls, op_seq);
+ op_apply_finish();
+
+ if (r >= 0) {
+ op_journal_start(op_seq);
+ journal_transactions(tls, op_seq, onjournal);
+ op_journal_finish();
+ } else {
+ // Nothing is journaled on failure, so onjournal has no other owner.
+ // ondisk is different: it already sits in commit_waiters[op_seq] and
+ // commit_finish() will queue it to the finisher, so deleting it here
+ // would leave a dangling pointer behind.
+ delete onjournal;
+ }
}
-
- op_finish();
return r;
}
void sync_fs(); // actually sync underlying fs
+ // op thread
+ struct Op { // One unit of work for the op thread; allocated and filled in by queue_op(), deleted by op_entry().
+ __u64 op; // op sequence number the transactions are applied under
+ list<Transaction*> tls; // transactions to apply (pointers only; swapped out of the caller's list)
+ Context *onreadable, *oncommit; // onreadable: queued to op_finisher with the apply result; oncommit: passed to op_apply_start()
+ };
+
+ Finisher op_finisher;
+ Mutex op_lock;
+ Cond op_cond;
+ list<Op*> op_queue;
+ void op_entry();
+ struct OpThread : public Thread { // Thread whose entry point runs FileStore::op_entry().
+ FileStore *fs; // store whose op_entry() this thread runs
+ OpThread(FileStore *f) : fs(f) {}
+ void *entry() {
+ fs->op_entry(); // returns when op_entry() leaves its loop
+ return 0;
+ }
+ } op_thread;
+ void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable, Context *ondisk);
+
// flusher thread
Cond flusher_cond;
list<__u64> flusher_queue;
}
} flusher_thread;
bool queue_flusher(int fd, __u64 off, __u64 len);
+
int open_journal();
public:
attrs(this), fake_attrs(false),
collections(this), fake_collections(false),
lock("FileStore::lock"),
- sync_epoch(0), stop(false), sync_thread(this), flusher_queue_len(0), flusher_thread(this) { }
+ sync_epoch(0), stop(false), sync_thread(this),
+ op_lock("FileStore::op_lock"), op_thread(this),
+ flusher_queue_len(0), flusher_thread(this) { }
int mount();
int umount();
int statfs(struct statfs *buf);
+ int do_transactions(list<Transaction*> &tls, __u64 op_seq);
unsigned apply_transaction(Transaction& t, Context *onjournal=0, Context *ondisk=0);
unsigned apply_transactions(list<Transaction*>& tls, Context *onjournal=0, Context *ondisk=0);
int _transaction_start(__u64 bytes, __u64 ops);
void _transaction_finish(int id);
unsigned _do_transaction(Transaction& t);
+ int queue_transactions(list<Transaction*>& tls, Context *onreadable,
+ Context *onjournal=0, Context *ondisk=0);
+
// ------------------
// objects
int pick_object_revision_lt(sobject_t& oid) {
#define dout_prefix *_dout << dbeginl << "journal "
-int JournalingObjectStore::journal_replay()
+
+void JournalingObjectStore::journal_start()
+{ // Log the call and start the finisher that commit_finish() queues commit waiters to.
+ dout(10) << "journal_start" << dendl;
+ finisher.start();
+}
+
+void JournalingObjectStore::journal_stop()
{ // Stop the finisher, then close and destroy the journal if one is open.
+ dout(10) << "journal_stop" << dendl;
+ finisher.stop();
+ if (journal) {
+ journal->close();
+ delete journal;
+ journal = 0; // later "if (journal)" checks now see that no journal is open
+ }
+}
+
+int JournalingObjectStore::journal_replay(__u64 fs_op_seq)
+{
+ dout(10) << "journal_replay fs op_seq " << fs_op_seq << dendl;
+ op_seq = applied_seq = fs_op_seq;
+
if (!journal)
return 0;
seq++; // we expect the next op
}
- committed_op_seq = op_seq;
+ committed_seq = op_seq;
// done reading, make writeable.
journal->make_writeable();
return count;
}
+
+
+// ------------------------------------
+
+__u64 JournalingObjectStore::op_apply_start(__u64 op, Context *ondisk)
+{ // Begin applying an op: wait while a commit has ops blocked, count the op in open_ops, record it as applied_seq and return its sequence number (a fresh ++op_seq when op is 0).
+ lock.Lock();
+ while (blocked) { // set by commit_start(); cleared by commit_start() or commit_started()
+ dout(10) << "op_apply_start blocked" << dendl;
+ cond.Wait(lock);
+ }
+ open_ops++; // balanced by op_apply_finish()
+
+ if (!op) // 0 means the caller has not been assigned a sequence number yet
+ op = ++op_seq;
+ assert(op > applied_seq); // !! ops must start applying in strictly increasing seq order
+ applied_seq = op;
+
+ dout(10) << "op_apply_start " << op << dendl;
+
+ if (ondisk) // queued to the finisher by commit_finish() once committing_seq reaches op
+ commit_waiters[op].push_back(ondisk);
+
+ lock.Unlock();
+ return op;
+}
+
+void JournalingObjectStore::op_apply_finish()
+{ // Mark one op begun with op_apply_start() as no longer in flight.
+ dout(10) << "op_apply_finish" << dendl;
+ lock.Lock();
+ if (--open_ops == 0)
+ cond.Signal(); // commit_start() waits on cond until open_ops drops to 0
+ lock.Unlock();
+}
+
+__u64 JournalingObjectStore::op_journal_start(__u64 op)
+{ // Take journal_lock and return the op seq to journal under (a fresh ++op_seq when op is 0). journal_lock is still held on return; op_journal_finish() releases it.
+ journal_lock.Lock();
+ if (!op) {
+ lock.Lock(); // op_seq is also incremented under lock in op_apply_start()
+ op = ++op_seq;
+ lock.Unlock();
+ }
+ return op;
+}
+
+void JournalingObjectStore::op_journal_finish()
+{ // Release the journal_lock acquired by op_journal_start().
+ journal_lock.Unlock();
+}
+
+
+// ------------------------------------------
+
+bool JournalingObjectStore::commit_start()
+{ // Quiesce for a commit: block new ops and wait for in-flight ones to finish. Returns false (with ops unblocked again) when applied_seq equals committed_seq, otherwise true with ops still blocked.
+ // suspend new ops...
+ Mutex::Locker l(lock);
+
+ dout(10) << "commit_start" << dendl;
+ blocked = true; // op_apply_start() waits on cond while this is set
+ while (open_ops > 0) { // op_apply_finish() signals cond when open_ops reaches 0
+ dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
+ cond.Wait(lock);
+ }
+
+ if (applied_seq == committed_seq) { // no op has been applied since the last commit
+ dout(10) << "commit_start nothing to do" << dendl;
+ blocked = false;
+ cond.Signal(); // let op_apply_start() callers that blocked in the meantime proceed
+ assert(commit_waiters.empty());
+ return false;
+ }
+ dout(10) << "commit_start" << dendl;
+ return true; // blocked stays set until commit_started()
+}
+
+void JournalingObjectStore::commit_started()
+{ // Record which seq the commit in progress covers, then let blocked ops resume.
+ Mutex::Locker l(lock);
+ dout(10) << "commit_started" << dendl;
+ // allow new ops. (underlying fs should now be committing all prior ops)
+ committing_seq = applied_seq; // commit_finish() reports this value to the journal and the commit waiters
+ blocked = false;
+ cond.Signal(); // wakes op_apply_start() callers waiting on blocked
+}
+
+// Finish the commit recorded by commit_started(): pass committing_seq to
+// journal->committed_thru() when a journal is open, advance committed_seq,
+// and move every commit_waiters entry whose op seq is <= committing_seq onto
+// the finisher.  Runs with lock held for its whole duration.
+void JournalingObjectStore::commit_finish()
+{
+  Mutex::Locker l(lock);
+  dout(10) << "commit_finish" << dendl;
+
+  if (journal)
+    journal->committed_thru(committing_seq);
+  committed_seq = committing_seq;
+
+  // commit_waiters is keyed by op seq in ascending order, so the walk can
+  // stop at the first entry that is newer than this commit.
+  typedef map<version_t, vector<Context*> >::iterator waiter_iter;
+  for (waiter_iter cur = commit_waiters.begin(); cur != commit_waiters.end(); ) {
+    if (cur->first > committing_seq)
+      break;
+    finisher.queue(cur->second);
+    waiter_iter finished = cur++;   // step past the entry before erasing it
+    commit_waiters.erase(finished);
+  }
+}
+
+// Journal a single transaction under sequence number op.  When a journal is
+// open and writeable the encoded transaction is submitted together with
+// onjournal; otherwise a non-null onjournal is parked in commit_waiters[op].
+// Runs with lock held.
+void JournalingObjectStore::journal_transaction(ObjectStore::Transaction& t, __u64 op,
+						Context *onjournal)
+{
+  Mutex::Locker l(lock);
+  dout(10) << "journal_transaction " << op << dendl;
+
+  const bool can_journal = journal && journal->is_writeable();
+  if (!can_journal) {
+    // no usable journal: the callback becomes a commit waiter instead
+    if (onjournal)
+      commit_waiters[op].push_back(onjournal);
+    return;
+  }
+
+  bufferlist encoded;
+  t.encode(encoded);
+  journal->submit_entry(op, encoded, onjournal);
+}
+
+// Journal a list of transactions as one entry under sequence number op.  When
+// a journal is open and writeable every transaction is encoded, in list order,
+// into a single buffer that is submitted together with onjournal; otherwise a
+// non-null onjournal is parked in commit_waiters[op].  Runs with lock held.
+void JournalingObjectStore::journal_transactions(list<ObjectStore::Transaction*>& tls, __u64 op,
+						 Context *onjournal)
+{
+  Mutex::Locker l(lock);
+  dout(10) << "journal_transactions " << op << dendl;
+
+  const bool can_journal = journal && journal->is_writeable();
+  if (!can_journal) {
+    // no usable journal: the callback becomes a commit waiter instead
+    if (onjournal)
+      commit_waiters[op].push_back(onjournal);
+    return;
+  }
+
+  bufferlist encoded;
+  list<ObjectStore::Transaction*>::iterator it = tls.begin();
+  while (it != tls.end()) {
+    (*it)->encode(encoded);
+    ++it;
+  }
+  journal->submit_entry(op, encoded, onjournal);
+}
class JournalingObjectStore : public ObjectStore {
protected:
- __u64 op_seq;
- __u64 committing_op_seq, committed_op_seq;
- Journal *journal;
- Finisher finisher;
+ __u64 op_seq, applied_seq;
+ __u64 committing_seq, committed_seq;
map<version_t, vector<Context*> > commit_waiters;
- RWLock op_lock;
- Mutex journal_lock;
- Mutex lock;
-
- void journal_start() {
- finisher.start();
- }
- void journal_stop() {
- finisher.stop();
- if (journal) {
- journal->close();
- delete journal;
- journal = 0;
- }
- }
- int journal_replay();
-
- void op_start() {
- op_lock.get_read();
- }
- void op_journal_start() {
- journal_lock.Lock();
- }
- void op_finish() {
- journal_lock.Unlock();
- op_lock.put_read();
- }
-
- bool commit_start() {
- // suspend new ops...
- op_lock.get_write();
- Mutex::Locker l(lock);
- if (op_seq == committed_op_seq) {
- op_lock.put_write();
- assert(commit_waiters.empty());
- return false;
- }
- return true;
- }
- void commit_started() {
- Mutex::Locker l(lock);
- // allow new ops
- // (underlying fs should now be committing all prior ops)
- committing_op_seq = op_seq;
- op_lock.put_write();
- }
- void commit_finish() {
- Mutex::Locker l(lock);
+ int open_ops;
+ bool blocked;
- if (journal)
- journal->committed_thru(committing_op_seq);
- committed_op_seq = committing_op_seq;
-
- map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
- while (p != commit_waiters.end() &&
- p->first <= committing_op_seq) {
- finisher.queue(p->second);
- commit_waiters.erase(p++);
- }
- }
-
- void journal_transaction(ObjectStore::Transaction& t, Context *onjournal, Context *ondisk) {
- Mutex::Locker l(lock);
+ Journal *journal;
+ Finisher finisher;
- ++op_seq;
+ Cond cond;
+ Mutex journal_lock;
+ Mutex lock;
- if (journal && journal->is_writeable()) {
- bufferlist tbl;
- t.encode(tbl);
- journal->submit_entry(op_seq, tbl, onjournal);
- } else if (onjournal)
- commit_waiters[op_seq].push_back(onjournal);
+protected:
+ void journal_start();
+ void journal_stop();
+ int journal_replay(__u64 fs_op_seq);
- if (ondisk)
- commit_waiters[op_seq].push_back(ondisk);
- }
- void journal_transactions(list<ObjectStore::Transaction*>& tls, Context *onjournal, Context *ondisk) {
- Mutex::Locker l(lock);
+ // --
+ __u64 op_apply_start(__u64 op, Context *ondisk);
+ void op_apply_finish();
+ __u64 op_journal_start(__u64 op);
+ void op_journal_finish();
- ++op_seq;
+ void journal_transaction(ObjectStore::Transaction& t, __u64 op, Context *onjournal);
+ void journal_transactions(list<ObjectStore::Transaction*>& tls, __u64 op, Context *onjournal);
- if (journal && journal->is_writeable()) {
- bufferlist tbl;
- for (list<ObjectStore::Transaction*>::iterator p = tls.begin(); p != tls.end(); p++)
- (*p)->encode(tbl);
- journal->submit_entry(op_seq, tbl, onjournal);
- } else if (onjournal)
- commit_waiters[op_seq].push_back(onjournal);
+ bool commit_start();
+ void commit_started(); // allow new ops (underlying fs should now be committing all prior ops)
+ void commit_finish();
- if (ondisk)
- commit_waiters[op_seq].push_back(ondisk);
- }
public:
- JournalingObjectStore() : op_seq(0), committing_op_seq(0), committed_op_seq(0),
- journal(0),
- op_lock("JournalingObjectStore::op_lock"),
+ JournalingObjectStore() : op_seq(0),
+ applied_seq(0), committing_seq(0), committed_seq(0),
+ open_ops(0), blocked(false),
+ journal(NULL),
journal_lock("JournalingObjectStore::journal_lock"),
lock("JournalingObjectStore::lock") { }