From 3ed25a3b31cd8fd0fcabccc636b2d8414bfbabc8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 24 Dec 2008 12:11:50 -0800 Subject: [PATCH] mds: flush log only when we need to In particular, don't flush immediately on cap flush unless the lock is unstable. If we later wait on the lock state, make sure the log flushes. --- src/mds/Locker.cc | 33 ++++++++++++++++++++++++++------- src/mds/MDCache.cc | 2 ++ src/mds/MDLog.cc | 8 +++++--- src/mds/MDS.cc | 8 +++++--- src/mds/MDSTableServer.cc | 1 + src/mds/Migrator.cc | 4 ++++ src/mds/journal.cc | 1 + src/osdc/Journaler.cc | 27 +++++++++++++++++---------- src/osdc/Journaler.h | 1 + 9 files changed, 62 insertions(+), 23 deletions(-) diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index 2bc0ed94799..c2e70dabc98 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -1257,8 +1257,12 @@ void Locker::_do_cap_update(CInode *in, int had, int all_wanted, snapid_t follow mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows); mdcache->journal_dirty_inode(mut, &le->metablob, in, follows); - mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, change_max, - client, ack, releasecap)); + mds->mdlog->submit_entry(le); + mds->mdlog->wait_for_sync(new C_Locker_FileUpdate_finish(this, in, mut, change_max, + client, ack, releasecap)); + // only flush immediately if the lock is unstable + if (!in->filelock.is_stable()) + mds->mdlog->flush(); } else { // no update, ack now. if (releasecap) { @@ -2364,6 +2368,7 @@ void Locker::scatter_writebehind(ScatterLock *lock) mds->mdlog->submit_entry(le); mds->mdlog->wait_for_sync(new C_Locker_ScatterWB(this, lock, mut)); + mds->mdlog->flush(); } void Locker::scatter_writebehind_finish(ScatterLock *lock, Mutation *mut) @@ -3019,7 +3024,11 @@ bool Locker::file_rdlock_start(FileLock *lock, MDRequest *mut) // wait dout(7) << "file_rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl; lock->add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mut)); - + + // make sure we aren't waiting on a cap flush + if (lock->get_parent()->is_auth() && lock->is_wrlocked()) + mds->mdlog->flush(); + return false; } @@ -3111,6 +3120,11 @@ bool Locker::file_wrlock_start(FileLock *lock, MDRequest *mut) dout(7) << "file_wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl; lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut)); + + // make sure we aren't waiting on a cap flush + if (lock->get_parent()->is_auth() && lock->is_wrlocked()) + mds->mdlog->flush(); + return false; } @@ -3167,11 +3181,16 @@ bool Locker::file_xlock_start(FileLock *lock, MDRequest *mut) mut->locks.insert(lock); mut->xlocks.insert(lock); return true; - } else { - dout(7) << "file_xlock_start on auth, waiting for write on " << *lock << " on " << *lock->get_parent() << dendl; - lock->add_waiter(SimpleLock::WAIT_WR, new C_MDS_RetryRequest(mdcache, mut)); - return false; } + + dout(7) << "file_xlock_start on auth, waiting for write on " << *lock << " on " << *lock->get_parent() << dendl; + lock->add_waiter(SimpleLock::WAIT_WR, new C_MDS_RetryRequest(mdcache, mut)); + + // make sure we aren't waiting on a cap flush + if (lock->get_parent()->is_auth() && lock->is_wrlocked()) + mds->mdlog->flush(); + + return false; } diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 8e23f347eff..bba97882c8c 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -509,6 +509,7 @@ void MDCache::try_subtree_merge_at(CDir *dir) mds->mdlog->submit_entry(le); mds->mdlog->wait_for_sync(new C_MDC_SubtreeMergeWB(this, in, mut)); + mds->mdlog->flush(); } } @@ -2121,6 +2122,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack) uncommitted_slave_updates.erase(from); mds->mdlog->wait_for_sync(new C_MDC_SlaveCommit(this, from, *p)); + mds->mdlog->flush(); } else { MDRequest *mdr = request_get(*p); assert(mdr->slave_request == 0); // shouldn't be doing anything! diff --git a/src/mds/MDLog.cc b/src/mds/MDLog.cc index 8b8445d7fae..07951caa692 100644 --- a/src/mds/MDLog.cc +++ b/src/mds/MDLog.cc @@ -213,7 +213,7 @@ void MDLog::wait_for_sync( Context *c ) { if (g_conf.mds_log) { // wait - journaler->flush(c); + journaler->wait_for_flush(c, 0); } else { // hack: bypass. c->finish(0); @@ -224,7 +224,7 @@ void MDLog::wait_for_safe( Context *c ) { if (g_conf.mds_log) { // wait - journaler->flush(0, c); + journaler->wait_for_flush(0, c); } else { // hack: bypass. c->finish(0); @@ -263,8 +263,10 @@ void MDLog::start_new_segment(Context *onsync) ESubtreeMap *le = mds->mdcache->create_subtree_map(); submit_entry(le, new C_MDL_WroteSubtreeMap(this, mds->mdlog->get_write_pos())); - if (onsync) + if (onsync) { wait_for_sync(onsync); + flush(); + } logger->inc("segadd"); logger->set("seg", segments.size()); diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index c205ef10837..21abdb0d817 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -388,6 +388,9 @@ void MDS::tick() if (laggy) return; + // make sure mds log flushes periodically + mdlog->flush(); + // log mds_load_t load = balancer->get_load(); @@ -1271,9 +1274,8 @@ bool MDS::_dispatch(Message *m) // HACK FOR NOW if (is_active() || is_stopping()) { - // flush log to disk after every op. for now. - //mdlog->flush(); - mdlog->trim(); + if (is_stopping()) + mdlog->trim(); // trim cache mdcache->trim(); diff --git a/src/mds/MDSTableServer.cc b/src/mds/MDSTableServer.cc index c1d65fc0f83..c1976264a23 100644 --- a/src/mds/MDSTableServer.cc +++ b/src/mds/MDSTableServer.cc @@ -78,6 +78,7 @@ void MDSTableServer::handle_commit(MMDSTableRequest *req) pending_for_mds.erase(tid); mds->mdlog->submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, -1, tid, version)); mds->mdlog->wait_for_sync(new C_Commit(this, req)); + mds->mdlog->flush(); } else if (tid <= version) { dout(0) << "got commit for tid " << tid << " <= " << version diff --git a/src/mds/Migrator.cc b/src/mds/Migrator.cc index 980ed33f8f9..14c74418d86 100644 --- a/src/mds/Migrator.cc +++ b/src/mds/Migrator.cc @@ -802,6 +802,7 @@ void Migrator::export_go(CDir *dir) // first sync log to flush out e.g. any cap imports mds->mdlog->wait_for_sync(new C_M_ExportGo(this, dir)); + mds->mdlog->flush(); } void Migrator::export_go_synced(CDir *dir) @@ -1137,6 +1138,7 @@ void Migrator::handle_export_ack(MExportDirAck *m) // log export completion, then finish (unfreeze, trigger finish context, etc.) mds->mdlog->submit_entry(le); mds->mdlog->wait_for_safe(new C_MDS_ExportFinishLogged(this, dir)); + mds->mdlog->flush(); delete m; } @@ -1691,6 +1693,7 @@ void Migrator::handle_export_dir(MExportDir *m) // log it mds->mdlog->submit_entry(le); mds->mdlog->wait_for_safe(onlogged); + mds->mdlog->flush(); // note state import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART; @@ -2300,6 +2303,7 @@ void Migrator::handle_export_caps(MExportCaps *ex) mds->mdlog->submit_entry(le); mds->mdlog->wait_for_safe(finish); + mds->mdlog->flush(); delete ex; } diff --git a/src/mds/journal.cc b/src/mds/journal.cc index 7bd1745ac88..dddcd59a624 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -245,6 +245,7 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) << ", waiting for safe journal flush" << dendl; if (!gather) gather = new C_Gather; mds->mdlog->wait_for_safe(gather->new_sub()); + mds->mdlog->flush(); } } diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index 378a701770d..c3306a6c6ea 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -360,9 +360,9 @@ void Journaler::_do_flush() dout(10) << "_do_flush write pointers now at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl; } - -void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier) + +void Journaler::wait_for_flush(Context *onsync, Context *onsafe, bool add_ack_barrier) { // all flushed and acked? if (write_pos == ack_pos) { @@ -386,6 +386,21 @@ void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier) return; } + // queue waiter + if (onsync) + waitfor_ack[write_pos].push_back(onsync); + if (onsafe) + waitfor_safe[write_pos].push_back(onsafe); + if (add_ack_barrier) + ack_barrier.insert(write_pos); +} + +void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier) +{ + wait_for_flush(onsync, onsafe, add_ack_barrier); + if (write_pos == ack_pos) + return; + if (write_pos == flush_pos) { assert(write_buf.length() == 0); dout(10) << "flush nothing to flush, write pointers at " @@ -409,14 +424,6 @@ void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier) } } - // queue waiter (at _new_ write_pos; will go when reached by ack_pos) - if (onsync) - waitfor_ack[write_pos].push_back(onsync); - if (onsafe) - waitfor_safe[write_pos].push_back(onsafe); - if (add_ack_barrier) - ack_barrier.insert(write_pos); - // write head? if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) { write_head(); diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 12e3c794d60..ea017f6a8eb 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -233,6 +233,7 @@ public: // write __s64 append_entry(bufferlist& bl, Context *onsync = 0); + void wait_for_flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false); void flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false); // read -- 2.47.3