// mark apply start _now_, because we need to drain the entire apply
// queue during commit in order to put the store in a consistent
// state.
- op_apply_start(o->op);
+ apply_manager.op_apply_start(o->op);
op_tp.lock();
osr->queue(o);
dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
int r = do_transactions(o->tls, o->op);
- op_apply_finish(o->op);
+ apply_manager.op_apply_finish(o->op);
dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
if (m_filestore_do_dump)
dump_transactions(tls, op, osr);
- op_apply_start(op);
+ apply_manager.op_apply_start(op);
int r = do_transactions(tls, op);
if (r >= 0) {
op_finisher.queue(onreadable, r);
submit_manager.op_submit_finish(op);
- op_apply_finish(op);
+ apply_manager.op_apply_finish(op);
return r;
}
fin.swap(sync_waiters);
lock.Unlock();
- if (commit_start()) {
+ if (apply_manager.commit_start()) {
utime_t start = ceph_clock_now(g_ceph_context);
- uint64_t cp = committing_seq;
+ uint64_t cp = apply_manager.get_committing_seq();
sync_entry_timeo_lock.Lock();
SyncEntryTimeout *sync_entry_timeo =
snaps.push_back(cp);
- commit_started();
+ apply_manager.commit_started();
// wait for commit
dout(20) << " waiting for transid " << async_args.transid << " to complete" << dendl;
assert(r == 0);
snaps.push_back(cp);
- commit_started();
+ apply_manager.commit_started();
}
} else
{
- commit_started();
+ apply_manager.commit_started();
if (btrfs) {
dout(15) << "sync_entry doing btrfs SYNC" << dendl;
logger->finc(l_os_commit_lat, lat);
logger->finc(l_os_commit_len, dur);
- commit_finish();
+ apply_manager.commit_finish();
logger->set(l_os_committing, 0);
fs_op_seq = g_conf->journal_replay_from - 1;
}
- journal_lock.Lock();
uint64_t op_seq = fs_op_seq;
- committed_seq = fs_op_seq;
- committing_seq = fs_op_seq;
- applied_seq = fs_op_seq;
- journal_lock.Unlock();
+ apply_manager.init_seq(fs_op_seq);
if (!journal)
return 0;
return err;
}
- journal_lock.Lock();
-
replaying = true;
int count = 0;
tls.push_back(t);
}
- open_ops++;
- journal_lock.Unlock();
+ apply_manager.op_apply_start(seq);
int r = do_transactions(tls, seq);
- journal_lock.Lock();
- open_ops--;
- cond.Signal();
+ apply_manager.op_apply_finish(seq);
- op_seq = applied_seq = seq;
+ op_seq = seq;
while (!tls.empty()) {
delete tls.front();
}
dout(3) << "journal_replay: r = " << r << ", op_seq now " << op_seq << dendl;
- assert(op_seq == seq);
- seq++; // we expect the next op
}
replaying = false;
submit_manager.set_op_seq(op_seq);
- journal_lock.Unlock();
-
// done reading, make writeable.
journal->make_writeable();
// ------------------------------------
-uint64_t JournalingObjectStore::op_apply_start(uint64_t op)
-{
- Mutex::Locker l(journal_lock);
- return _op_apply_start(op);
-}
-
-uint64_t JournalingObjectStore::_op_apply_start(uint64_t op)
+uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
{
- assert(journal_lock.is_locked());
-
+ Mutex::Locker l(apply_lock);
// if we ops are blocked, or there are already people (left) in
// line, get in line.
if (blocked || !ops_apply_blocked.empty()) {
dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
// sleep until we are not blocked AND we are at the front of line
while (blocked || ops_apply_blocked.front() != &cond)
- cond.Wait(journal_lock);
+ cond.Wait(apply_lock);
dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl;
ops_apply_blocked.pop_front();
if (!ops_apply_blocked.empty()) {
}
dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
assert(!blocked);
+
open_ops++;
return op;
}
-void JournalingObjectStore::op_apply_finish(uint64_t op)
+void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
{
- journal_lock.Lock();
- dout(10) << "op_apply_finish " << op << " open_ops " << open_ops << " -> " << (open_ops-1) << dendl;
+ Mutex::Locker l(apply_lock);
+ dout(10) << "op_apply_finish " << op << " open_ops " << open_ops
+ << " -> " << (open_ops-1) << dendl;
if (--open_ops == 0)
- cond.Signal();
+ open_ops_cond.Signal();
// there can be multiple applies in flight; track the max value we
// note. note that we can't _read_ this value and learn anything
// meaningful unless/until we've quiesced all in-flight applies.
if (op > applied_seq)
applied_seq = op;
-
- journal_lock.Unlock();
}
uint64_t JournalingObjectStore::SubmitManager::op_submit_start()
// ------------------------------------------
-bool JournalingObjectStore::commit_start()
+void JournalingObjectStore::ApplyManager::add_waiter(uint64_t op, Context *c)
{
- bool ret = false;
-
- journal_lock.Lock();
- dout(10) << "commit_start op_seq " << submit_manager.get_op_seq()
- << ", applied_seq " << applied_seq
- << ", committed_seq " << committed_seq << dendl;
- blocked = true;
- while (open_ops > 0) {
- dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
- cond.Wait(journal_lock);
- }
- dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
- assert(open_ops == 0);
+ Mutex::Locker l(com_lock);
+ assert(c);
+ commit_waiters[op].push_back(c);
+}
- if (applied_seq == committed_seq) {
- dout(10) << "commit_start nothing to do" << dendl;
- blocked = false;
- if (!ops_apply_blocked.empty())
- ops_apply_blocked.front()->Signal();
- assert(commit_waiters.empty());
- goto out;
- }
+bool JournalingObjectStore::ApplyManager::commit_start()
+{
+ bool ret = false;
+ {
+ Mutex::Locker l(apply_lock);
+ dout(10) << "commit_start "
+ << ", applied_seq " << applied_seq << dendl;
+ blocked = true;
+ while (open_ops > 0) {
+ dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops" << dendl;
+ open_ops_cond.Wait(apply_lock);
+ }
+ assert(open_ops == 0);
+
+ dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
+ {
+ Mutex::Locker l(com_lock);
+ if (applied_seq == committed_seq) {
+ dout(10) << "commit_start nothing to do" << dendl;
+ blocked = false;
+ if (!ops_apply_blocked.empty())
+ ops_apply_blocked.front()->Signal();
+ assert(commit_waiters.empty());
+ goto out;
+ }
- com_lock.Lock();
- // we can _only_ read applied_seq here because open_ops == 0 (we've
- // quiesced all in-flight applies).
- committing_seq = applied_seq;
- com_lock.Unlock();
+ committing_seq = applied_seq;
- dout(10) << "commit_start committing " << committing_seq << ", still blocked" << dendl;
+ dout(10) << "commit_start committing " << committing_seq
+ << ", still blocked" << dendl;
+ }
+ }
ret = true;
out:
if (journal)
journal->commit_start(); // tell the journal too
- journal_lock.Unlock();
return ret;
}
-void JournalingObjectStore::commit_started()
+void JournalingObjectStore::ApplyManager::commit_started()
{
- Mutex::Locker l(journal_lock);
+ Mutex::Locker l(apply_lock);
// allow new ops. (underlying fs should now be committing all prior ops)
dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
blocked = false;
ops_apply_blocked.front()->Signal();
}
-void JournalingObjectStore::commit_finish()
+void JournalingObjectStore::ApplyManager::commit_finish()
{
- Mutex::Locker l(journal_lock);
+ Mutex::Locker l(com_lock);
dout(10) << "commit_finish thru " << committing_seq << dendl;
if (journal)
journal->committed_thru(committing_seq);
- com_lock.Lock();
committed_seq = committing_seq;
- com_lock.Unlock();
map<version_t, vector<Context*> >::iterator p = commit_waiters.begin();
while (p != commit_waiters.end() &&
- p->first <= committing_seq) {
+ p->first <= committing_seq) {
finisher.queue(p->second);
commit_waiters.erase(p++);
}
::encode(*t, tbl);
}
journal->submit_entry(op, tbl, data_align, onjournal, osd_op);
- } else if (onjournal)
- commit_waiters[op].push_back(onjournal);
+ } else if (onjournal) {
+ apply_manager.add_waiter(op, onjournal);
+ }
}
class JournalingObjectStore : public ObjectStore {
protected:
- uint64_t applied_seq;
- uint64_t committing_seq, committed_seq;
- map<version_t, vector<Context*> > commit_waiters;
+ Journal *journal;
+ Finisher finisher;
+
class SubmitManager {
Mutex lock;
void op_submit_finish(uint64_t op);
void set_op_seq(uint64_t seq) {
Mutex::Locker l(lock);
- seq = op_seq;
+ op_seq = seq;
}
uint64_t get_op_seq() {
return op_seq;
}
} submit_manager;
- int open_ops;
- bool blocked;
+ class ApplyManager {
+ Journal *&journal;
+ Finisher &finisher;
- Journal *journal;
- Finisher finisher;
+ Mutex apply_lock;
+ bool blocked;
+ Cond blocked_cond;
+ int open_ops;
+ Cond open_ops_cond;
+ uint64_t applied_seq;
- Cond cond;
- Mutex journal_lock;
- Mutex com_lock;
+ Mutex com_lock;
+ map<version_t, vector<Context*> > commit_waiters;
+ uint64_t committing_seq, committed_seq;
+ list<uint64_t> ops_submitting;
+ list<Cond*> ops_apply_blocked;
- list<uint64_t> ops_submitting;
- list<Cond*> ops_apply_blocked;
+ public:
+ ApplyManager(Journal *&j, Finisher &f) :
+ journal(j), finisher(f),
+ apply_lock("JOS::ApplyManager::apply_lock"),
+ blocked(false),
+ open_ops(0),
+ applied_seq(0),
+ com_lock("JOS::ApplyManager::com_lock"),
+ committing_seq(0), committed_seq(0) {}
+ void add_waiter(uint64_t, Context*);
+ uint64_t op_apply_start(uint64_t op);
+ void op_apply_finish(uint64_t op);
+ bool commit_start();
+ void commit_started();
+ void commit_finish();
+ bool is_committing() {
+ Mutex::Locker l(com_lock);
+ return committing_seq != committed_seq;
+ }
+ uint64_t get_committed_seq() {
+ Mutex::Locker l(com_lock);
+ return committed_seq;
+ }
+ uint64_t get_committing_seq() {
+ Mutex::Locker l(com_lock);
+ return committing_seq;
+ }
+ void init_seq(uint64_t fs_op_seq) {
+ {
+ Mutex::Locker l(com_lock);
+ committed_seq = fs_op_seq;
+ committing_seq = fs_op_seq;
+ }
+ {
+ Mutex::Locker l(apply_lock);
+ applied_seq = fs_op_seq;
+ }
+ }
+ } apply_manager;
bool replaying;
void journal_stop();
int journal_replay(uint64_t fs_op_seq);
- // --
- uint64_t op_submit_start();
- void op_submit_finish(uint64_t op_seq);
-
- uint64_t op_apply_start(uint64_t op);
- uint64_t _op_apply_start(uint64_t op);
- void op_apply_finish(uint64_t op);
-
void _op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
Context *onjournal, TrackedOpRef osd_op);
virtual int do_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op_seq) = 0;
- bool commit_start();
- void commit_started(); // allow new ops (underlying fs should now be committing all prior ops)
- void commit_finish();
-
public:
bool is_committing() {
- Mutex::Locker l(com_lock);
- return committing_seq != committed_seq;
+ return apply_manager.is_committing();
}
uint64_t get_committed_seq() {
- Mutex::Locker l(com_lock);
- return committed_seq;
+ return apply_manager.get_committed_seq();
}
public:
- JournalingObjectStore() : applied_seq(0), committing_seq(0), committed_seq(0),
- open_ops(0), blocked(false),
- journal(NULL), finisher(g_ceph_context),
- journal_lock("JournalingObjectStore::journal_lock"),
- com_lock("JournalingObjectStore::com_lock"),
+ JournalingObjectStore() : journal(NULL), finisher(g_ceph_context),
+ apply_manager(journal, finisher),
replaying(false) {}
};