mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
mdcache->journal_dirty_inode(mut, &le->metablob, in, follows);
- mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, change_max,
- client, ack, releasecap));
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_sync(new C_Locker_FileUpdate_finish(this, in, mut, change_max,
+ client, ack, releasecap));
+ // only flush immediately if the lock is unstable
+ if (!in->filelock.is_stable())
+ mds->mdlog->flush();
} else {
// no update, ack now.
if (releasecap) {
mds->mdlog->submit_entry(le);
mds->mdlog->wait_for_sync(new C_Locker_ScatterWB(this, lock, mut));
+ mds->mdlog->flush();
}
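// A minimal sketch of the recurring pattern in this patch, assuming the
// MDLog interface used above (the function name and fin context are
// hypothetical, not from this patch): completion contexts are no longer
// handed to submit_entry(); they are queued with wait_for_sync() or
// wait_for_safe(), and the caller decides separately whether to kick the
// journal right away or let a later flush() batch several entries.
void example_journal_and_wait(MDLog *mdlog, LogEvent *le, Context *fin)
{
  mdlog->submit_entry(le);        // append the event to the in-memory log
  mdlog->wait_for_sync(fin);      // queue fin at the current write position
  mdlog->flush();                 // explicitly push the log to disk now
}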
void Locker::scatter_writebehind_finish(ScatterLock *lock, Mutation *mut)
// wait
dout(7) << "file_rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
lock->add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mut));
-
+
+ // make sure we aren't waiting on a cap flush
+ if (lock->get_parent()->is_auth() && lock->is_wrlocked())
+ mds->mdlog->flush();
+
return false;
}
dout(7) << "file_wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
+
+ // make sure we aren't waiting on a cap flush
+ if (lock->get_parent()->is_auth() && lock->is_wrlocked())
+ mds->mdlog->flush();
+
return false;
}
mut->locks.insert(lock);
mut->xlocks.insert(lock);
return true;
- } else {
- dout(7) << "file_xlock_start on auth, waiting for write on " << *lock << " on " << *lock->get_parent() << dendl;
- lock->add_waiter(SimpleLock::WAIT_WR, new C_MDS_RetryRequest(mdcache, mut));
- return false;
}
+
+ dout(7) << "file_xlock_start on auth, waiting for write on " << *lock << " on " << *lock->get_parent() << dendl;
+ lock->add_waiter(SimpleLock::WAIT_WR, new C_MDS_RetryRequest(mdcache, mut));
+
+ // make sure we aren't waiting on a cap flush
+ if (lock->get_parent()->is_auth() && lock->is_wrlocked())
+ mds->mdlog->flush();
+
+ return false;
}
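// The same guarded flush now follows each add_waiter() above. A minimal
// sketch of how it could be factored into a helper (the helper name is
// hypothetical; the lock interface is the one used in this patch):
void Locker::kick_log_if_wrlocked(SimpleLock *lock)
{
  // a wrlocked auth lock is typically pinned by a journaled cap or
  // scatter update; flushing the log lets that update commit so the
  // lock can stabilize and our queued waiter can make progress
  if (lock->get_parent()->is_auth() && lock->is_wrlocked())
    mds->mdlog->flush();
}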
mds->mdlog->submit_entry(le);
mds->mdlog->wait_for_sync(new C_MDC_SubtreeMergeWB(this, in, mut));
+ mds->mdlog->flush();
}
}
uncommitted_slave_updates.erase(from);
mds->mdlog->wait_for_sync(new C_MDC_SlaveCommit(this, from, *p));
+ mds->mdlog->flush();
} else {
MDRequest *mdr = request_get(*p);
assert(mdr->slave_request == 0); // shouldn't be doing anything!
{
if (g_conf.mds_log) {
// wait
- journaler->flush(c);
+ journaler->wait_for_flush(c, 0);
} else {
// hack: bypass.
c->finish(0);
{
if (g_conf.mds_log) {
// wait
- journaler->flush(0, c);
+ journaler->wait_for_flush(0, c);
} else {
// hack: bypass.
c->finish(0);
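// A minimal sketch of the full wait_for_sync()/wait_for_safe() wrappers,
// assuming the bypass branch ends the usual way (finish, then delete);
// only the journaler call is what this patch actually changes:
void MDLog::wait_for_sync(Context *c)
{
  if (g_conf.mds_log) {
    journaler->wait_for_flush(c, 0);   // fire once the entry is acked
  } else {
    c->finish(0);                      // hack: bypass
    delete c;
  }
}

void MDLog::wait_for_safe(Context *c)
{
  if (g_conf.mds_log) {
    journaler->wait_for_flush(0, c);   // fire once the entry is safe on disk
  } else {
    c->finish(0);
    delete c;
  }
}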
ESubtreeMap *le = mds->mdcache->create_subtree_map();
submit_entry(le, new C_MDL_WroteSubtreeMap(this, mds->mdlog->get_write_pos()));
- if (onsync)
+ if (onsync) {
wait_for_sync(onsync);
+ flush();
+ }
logger->inc("segadd");
logger->set("seg", segments.size());
if (laggy)
return;
+ // make sure the mds log flushes periodically; queued log waiters only fire on flush
+ mdlog->flush();
+
// log
mds_load_t load = balancer->get_load();
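// Why the tick flushes (a sketch, not code from this patch): contexts
// queued via wait_for_sync()/wait_for_safe() only fire after a flush
// advances the journal, so the periodic call bounds their latency even
// when no request asks for an immediate flush. It is cheap when idle:
void example_tick(MDLog *mdlog)
{
  mdlog->flush();   // no-op if everything written is already flushing
}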
// HACK FOR NOW
if (is_active() || is_stopping()) {
- // flush log to disk after every op. for now.
- //mdlog->flush();
- mdlog->trim();
+ if (is_stopping())
+ mdlog->trim();
// trim cache
mdcache->trim();
pending_for_mds.erase(tid);
mds->mdlog->submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, -1, tid, version));
mds->mdlog->wait_for_sync(new C_Commit(this, req));
+ mds->mdlog->flush();
}
else if (tid <= version) {
dout(0) << "got commit for tid " << tid << " <= " << version
// first sync log to flush out e.g. any cap imports
mds->mdlog->wait_for_sync(new C_M_ExportGo(this, dir));
+ mds->mdlog->flush();
}
void Migrator::export_go_synced(CDir *dir)
// log export completion, then finish (unfreeze, trigger finish context, etc.)
mds->mdlog->submit_entry(le);
mds->mdlog->wait_for_safe(new C_MDS_ExportFinishLogged(this, dir));
+ mds->mdlog->flush();
delete m;
}
// log it
mds->mdlog->submit_entry(le);
mds->mdlog->wait_for_safe(onlogged);
+ mds->mdlog->flush();
// note state
import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART;
mds->mdlog->submit_entry(le);
mds->mdlog->wait_for_safe(finish);
+ mds->mdlog->flush();
delete ex;
}
<< ", waiting for safe journal flush" << dendl;
if (!gather) gather = new C_Gather;
mds->mdlog->wait_for_safe(gather->new_sub());
+ mds->mdlog->flush();
}
}
dout(10) << "_do_flush write pointers now at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl;
}
-
-void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
+
+void Journaler::wait_for_flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
{
// all flushed and acked?
  if (write_pos == ack_pos) {
    // already flushed and acked; complete any waiters immediately
    if (onsync) {
      onsync->finish(0);
      delete onsync;
    }
    if (onsafe) {
      onsafe->finish(0);
      delete onsafe;
    }
    return;
  }
+ // queue waiter (at current write_pos; will go when reached by ack_pos)
+ if (onsync)
+ waitfor_ack[write_pos].push_back(onsync);
+ if (onsafe)
+ waitfor_safe[write_pos].push_back(onsafe);
+ if (add_ack_barrier)
+ ack_barrier.insert(write_pos);
+}
+
+void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
+{
+ wait_for_flush(onsync, onsafe, add_ack_barrier);
+ if (write_pos == ack_pos)
+ return;
+
if (write_pos == flush_pos) {
assert(write_buf.length() == 0);
dout(10) << "flush nothing to flush, write pointers at "
}
}
- // queue waiter (at _new_ write_pos; will go when reached by ack_pos)
- if (onsync)
- waitfor_ack[write_pos].push_back(onsync);
- if (onsafe)
- waitfor_safe[write_pos].push_back(onsafe);
- if (add_ack_barrier)
- ack_barrier.insert(write_pos);
-
// write head?
if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) {
write_head();
// write
__s64 append_entry(bufferlist& bl, Context *onsync = 0);
+ void wait_for_flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false);
void flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false);
// read
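// A minimal usage sketch of the split API declared above (the Context
// subclass and function are hypothetical, not from this patch):
struct C_ExampleFlushed : public Context {
  void finish(int r) { /* the appended entry is now flushed and acked */ }
};

void example_append(Journaler *journaler, bufferlist& bl, bool urgent)
{
  journaler->append_entry(bl);                      // buffer the entry
  journaler->wait_for_flush(new C_ExampleFlushed);  // queue the waiter only
  if (urgent)
    journaler->flush();  // push now; otherwise a periodic flush() gets it
}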