fin.swap(sync_waiters);
lock.Unlock();
+ op_tp.pause();
if (apply_manager.commit_start()) {
utime_t start = ceph_clock_now(g_ceph_context);
uint64_t cp = apply_manager.get_committing_seq();
snaps.push_back(cp);
apply_manager.commit_started();
+ op_tp.unpause();
// wait for commit
dout(20) << " waiting for transid " << async_args.transid << " to complete" << dendl;
snaps.push_back(cp);
apply_manager.commit_started();
+ op_tp.unpause();
}
} else
{
apply_manager.commit_started();
+ op_tp.unpause();
if (btrfs) {
dout(15) << "sync_entry doing btrfs SYNC" << dendl;
sync_entry_timeo_lock.Lock();
timer.cancel_event(sync_entry_timeo);
sync_entry_timeo_lock.Unlock();
+ } else {
+ op_tp.unpause();
}
lock.Lock();
uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
{
Mutex::Locker l(apply_lock);
- // if we ops are blocked, or there are already people (left) in
- // line, get in line.
- if (op > max_applying_seq &&
- (blocked || !ops_apply_blocked.empty())) {
- Cond cond;
- ops_apply_blocked.push_back(&cond);
- dout(10) << "op_apply_start " << op << " blocked (getting in back of line)" << dendl;
- // sleep until we are not blocked AND we are at the front of line
- while (blocked || ops_apply_blocked.front() != &cond)
- cond.Wait(apply_lock);
- dout(10) << "op_apply_start " << op << " woke (at front of line)" << dendl;
- ops_apply_blocked.pop_front();
- if (!ops_apply_blocked.empty()) {
- dout(10) << "op_apply_start " << op << " ...and kicking next in line" << dendl;
- ops_apply_blocked.front()->Signal();
- }
- }
dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1)
<< ", max_applying_seq " << max_applying_seq << " -> " << MAX(op, max_applying_seq) << dendl;
-
+ assert(!blocked);
if (op > max_applying_seq)
max_applying_seq = op;
assert(op > committed_seq);
<< ", max_applying_seq " << max_applying_seq
<< ", max_applied_seq " << max_applied_seq << " -> " << MAX(op, max_applied_seq)
<< dendl;
- if (--open_ops == 0)
- open_ops_cond.Signal();
+ --open_ops;
+ assert(open_ops >= 0);
// there can be multiple applies in flight; track the max value we
// note. note that we can't _read_ this value and learn anything
Mutex::Locker l(apply_lock);
dout(10) << "commit_start max_applying_seq " << max_applying_seq
<< ", max_applied_seq " << max_applied_seq
+ << ", open_ops " << open_ops
<< dendl;
blocked = true;
- while (open_ops > 0) {
- dout(10) << "commit_start blocked, waiting for " << open_ops << " open ops, "
- << " max_applying_seq " << max_applying_seq << " max_applied_seq " << max_applied_seq << dendl;
- open_ops_cond.Wait(apply_lock);
- }
assert(open_ops == 0);
assert(max_applied_seq == max_applying_seq);
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
if (max_applied_seq == committed_seq) {
dout(10) << "commit_start nothing to do" << dendl;
blocked = false;
- if (!ops_apply_blocked.empty())
- ops_apply_blocked.front()->Signal();
assert(commit_waiters.empty());
goto out;
}
// allow new ops. (underlying fs should now be committing all prior ops)
dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
blocked = false;
- if (!ops_apply_blocked.empty())
- ops_apply_blocked.front()->Signal();
}
void JournalingObjectStore::ApplyManager::commit_finish()