/// -----------------------------
-void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable,
- Context *oncommit)
+void FileStore::queue_op(__u64 op_seq, list<Transaction*>& tls, Context *onreadable)
{
__u64 bytes = 0, ops = 0;
for (list<Transaction*>::iterator p = tls.begin();
o->op = op_seq;
o->tls.swap(tls);
o->onreadable = onreadable;
- o->oncommit = oncommit;
o->ops = ops;
o->bytes = bytes;
void FileStore::_do_op(Op *o)
{
dout(10) << "_do_op " << o << " " << o->op << " start" << dendl;
- op_apply_start(o->op, o->oncommit);
+ op_apply_start(o->op);
int r = do_transactions(o->tls, o->op);
op_apply_finish();
dout(10) << "_do_op " << o << " " << o->op << " r = " << r
__u64 op;
list<ObjectStore::Transaction*> tls;
Context *onreadable;
- Context *onjournal;
Context *ondisk;
C_JournaledAhead(FileStore *f, __u64 o, list<ObjectStore::Transaction*>& t,
- Context *onr, Context *onj, Context *ond) :
- fs(f), op(o), tls(t), onreadable(onr), onjournal(onj), ondisk(ond) { }
+ Context *onr, Context *ond) :
+ fs(f), op(o), tls(t), onreadable(onr), ondisk(ond) { }
void finish(int r) {
- fs->_journaled_ahead(op, tls, onreadable, onjournal, ondisk);
+ fs->_journaled_ahead(op, tls, onreadable, ondisk);
}
};
int FileStore::queue_transactions(list<Transaction*> &tls,
Context *onreadable,
- Context *onjournal,
Context *ondisk)
{
if (journal && journal->is_writeable()) {
__u64 op = op_journal_start(0);
dout(10) << "queue_transactions (parallel) " << op << " " << tls << dendl;
- journal_transactions(tls, op, onjournal);
+ journal_transactions(tls, op, ondisk);
// queue inside journal lock, to preserve ordering
- queue_op(op, tls, onreadable, ondisk);
+ queue_op(op, tls, onreadable);
op_journal_finish();
return 0;
else if (g_conf.filestore_journal_writeahead) {
__u64 op = op_journal_start(0);
dout(10) << "queue_transactions (writeahead) " << op << " " << tls << dendl;
- journal_transactions(tls, op, new C_JournaledAhead(this, op, tls, onreadable, onjournal, ondisk));
+ journal_transactions(tls, op, new C_JournaledAhead(this, op, tls, onreadable, ondisk));
op_journal_finish();
return 0;
}
}
- __u64 op_seq = op_apply_start(0, ondisk);
+ __u64 op_seq = op_apply_start(0);
dout(10) << "queue_transactions (trailing journal) " << op_seq << " " << tls << dendl;
int r = do_transactions(tls, op_seq);
op_apply_finish();
if (r >= 0) {
op_journal_start(op_seq);
- journal_transactions(tls, op_seq, onjournal);
+ journal_transactions(tls, op_seq, ondisk);
op_journal_finish();
} else {
- delete onjournal;
delete ondisk;
}
void FileStore::_journaled_ahead(__u64 op,
list<Transaction*> &tls,
Context *onreadable,
- Context *onjournal,
Context *ondisk)
{
dout(10) << "_journaled_ahead " << op << " " << tls << dendl;
// this should queue in order because the journal does it's completions in order.
- queue_op(op, tls, onreadable, ondisk);
- if (onjournal) {
- onjournal->finish(0);
- delete onjournal;
+ queue_op(op, tls, onreadable);
+ if (ondisk) {
+ ondisk->finish(0);
+ delete ondisk;
}
}
}
unsigned FileStore::apply_transaction(Transaction &t,
- Context *onjournal,
Context *ondisk)
{
list<Transaction*> tls;
tls.push_back(&t);
- return apply_transactions(tls, onjournal, ondisk);
+ return apply_transactions(tls, ondisk);
}
unsigned FileStore::apply_transactions(list<Transaction*> &tls,
- Context *onjournal,
Context *ondisk)
{
int r = 0;
C_SafeCond *onreadable = new C_SafeCond(&my_lock, &my_cond, &done, &r);
dout(10) << "apply queued" << dendl;
- queue_transactions(tls, onreadable, onjournal, ondisk);
+ queue_transactions(tls, onreadable, ondisk);
my_lock.Lock();
while (!done)
my_lock.Unlock();
dout(10) << "apply done r = " << r << dendl;
} else {
- __u64 op_seq = op_apply_start(0, ondisk);
+ __u64 op_seq = op_apply_start(0);
r = do_transactions(tls, op_seq);
op_apply_finish();
if (r >= 0) {
op_journal_start(op_seq);
- journal_transactions(tls, op_seq, onjournal);
+ journal_transactions(tls, op_seq, ondisk);
op_journal_finish();
} else {
- delete onjournal;
delete ondisk;
}
}
struct Op {
__u64 op;
list<Transaction*> tls;
- Context *onreadable, *oncommit;
+ Context *onreadable;
__u64 ops, bytes;
};
deque<Op*> op_queue;
void _do_op(Op *o);
void _finish_op(Op *o);
- void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable, Context *ondisk);
+ void queue_op(__u64 op, list<Transaction*>& tls, Context *onreadable);
void _journaled_ahead(__u64 op, list<Transaction*> &tls,
- Context *onreadable, Context *onjournal, Context *ondisk);
+ Context *onreadable, Context *ondisk);
friend class C_JournaledAhead;
// flusher thread
int statfs(struct statfs *buf);
int do_transactions(list<Transaction*> &tls, __u64 op_seq);
- unsigned apply_transaction(Transaction& t, Context *onjournal=0, Context *ondisk=0);
- unsigned apply_transactions(list<Transaction*>& tls, Context *onjournal=0, Context *ondisk=0);
+ unsigned apply_transaction(Transaction& t, Context *ondisk=0);
+ unsigned apply_transactions(list<Transaction*>& tls, Context *ondisk=0);
int _transaction_start(__u64 bytes, __u64 ops);
void _transaction_finish(int id);
unsigned _do_transaction(Transaction& t);
int queue_transaction(Transaction* t);
- int queue_transactions(list<Transaction*>& tls, Context *onreadable,
- Context *onjournal=0, Context *ondisk=0);
+ int queue_transactions(list<Transaction*>& tls, Context *onreadable, Context *ondisk=0);
// ------------------
// objects
};
- virtual unsigned apply_transaction(Transaction& t, Context *onjournal=0, Context *ondisk=0) = 0;
- virtual unsigned apply_transactions(list<Transaction*>& tls, Context *onjournal=0, Context *ondisk=0) = 0;
+ virtual unsigned apply_transaction(Transaction& t, Context *ondisk=0) = 0;
+ virtual unsigned apply_transactions(list<Transaction*>& tls, Context *ondisk=0) = 0;
virtual int queue_transaction(Transaction* t) = 0;
- virtual int queue_transaction(Transaction *t, Context *onreadable, Context *onjournal=0, Context *ondisk=0) {
+ virtual int queue_transaction(Transaction *t, Context *onreadable, Context *ondisk=0) {
list<Transaction*> tls;
tls.push_back(t);
- return queue_transactions(tls, onreadable, onjournal, ondisk);
+ return queue_transactions(tls, onreadable, ondisk);
}
virtual int queue_transactions(list<Transaction*>& tls, Context *onreadable,
- Context *onjournal=0, Context *ondisk=0) = 0;
+ Context *ondisk=0) = 0;