]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: flush log only when we need to
authorSage Weil <sage@newdream.net>
Wed, 24 Dec 2008 20:11:50 +0000 (12:11 -0800)
committerSage Weil <sage@newdream.net>
Wed, 24 Dec 2008 20:11:50 +0000 (12:11 -0800)
In particular, don't flush immediately on cap flush unless the lock is
unstable.  If we later wait on the lock state, make sure the log flushes.

src/mds/Locker.cc
src/mds/MDCache.cc
src/mds/MDLog.cc
src/mds/MDS.cc
src/mds/MDSTableServer.cc
src/mds/Migrator.cc
src/mds/journal.cc
src/osdc/Journaler.cc
src/osdc/Journaler.h

index 2bc0ed94799dd333974a08710ab940e1fbc352ff..c2e70dabc98f946eea1631914c33d14b4fd26f57 100644 (file)
@@ -1257,8 +1257,12 @@ void Locker::_do_cap_update(CInode *in, int had, int all_wanted, snapid_t follow
     mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
     mdcache->journal_dirty_inode(mut, &le->metablob, in, follows);
 
-    mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut, change_max, 
-                                                               client, ack, releasecap));
+    mds->mdlog->submit_entry(le);
+    mds->mdlog->wait_for_sync(new C_Locker_FileUpdate_finish(this, in, mut, change_max, 
+                                                            client, ack, releasecap));
+    // only flush immediately if the lock is unstable
+    if (!in->filelock.is_stable())
+      mds->mdlog->flush();
   } else {
     // no update, ack now.
     if (releasecap) {
@@ -2364,6 +2368,7 @@ void Locker::scatter_writebehind(ScatterLock *lock)
   
   mds->mdlog->submit_entry(le);
   mds->mdlog->wait_for_sync(new C_Locker_ScatterWB(this, lock, mut));
+  mds->mdlog->flush();
 }
 
 void Locker::scatter_writebehind_finish(ScatterLock *lock, Mutation *mut)
@@ -3019,7 +3024,11 @@ bool Locker::file_rdlock_start(FileLock *lock, MDRequest *mut)
   // wait
   dout(7) << "file_rdlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
   lock->add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mut));
-        
+  
+  // make sure we aren't waiting on a cap flush
+  if (lock->get_parent()->is_auth() && lock->is_wrlocked())
+    mds->mdlog->flush();
+
   return false;
 }
 
@@ -3111,6 +3120,11 @@ bool Locker::file_wrlock_start(FileLock *lock, MDRequest *mut)
 
   dout(7) << "file_wrlock_start waiting on " << *lock << " on " << *lock->get_parent() << dendl;
   lock->add_waiter(SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
+
+  // make sure we aren't waiting on a cap flush
+  if (lock->get_parent()->is_auth() && lock->is_wrlocked())
+    mds->mdlog->flush();
+
   return false;
 }
 
@@ -3167,11 +3181,16 @@ bool Locker::file_xlock_start(FileLock *lock, MDRequest *mut)
     mut->locks.insert(lock);
     mut->xlocks.insert(lock);
     return true;
-  } else {
-    dout(7) << "file_xlock_start on auth, waiting for write on " << *lock << " on " << *lock->get_parent() << dendl;
-    lock->add_waiter(SimpleLock::WAIT_WR, new C_MDS_RetryRequest(mdcache, mut));
-    return false;
   }
+
+  dout(7) << "file_xlock_start on auth, waiting for write on " << *lock << " on " << *lock->get_parent() << dendl;
+  lock->add_waiter(SimpleLock::WAIT_WR, new C_MDS_RetryRequest(mdcache, mut));
+
+  // make sure we aren't waiting on a cap flush
+  if (lock->get_parent()->is_auth() && lock->is_wrlocked())
+    mds->mdlog->flush();
+
+  return false;
 }
 
 
index 8e23f347effc12442546b7d3333d6c7f68b14e1a..bba97882c8ca41208f887211b0a9b8a1c5c9236f 100644 (file)
@@ -509,6 +509,7 @@ void MDCache::try_subtree_merge_at(CDir *dir)
       
       mds->mdlog->submit_entry(le);
       mds->mdlog->wait_for_sync(new C_MDC_SubtreeMergeWB(this, in, mut));
+      mds->mdlog->flush();
     }
   } 
 
@@ -2121,6 +2122,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack)
        uncommitted_slave_updates.erase(from);
 
       mds->mdlog->wait_for_sync(new C_MDC_SlaveCommit(this, from, *p));
+      mds->mdlog->flush();
     } else {
       MDRequest *mdr = request_get(*p);
       assert(mdr->slave_request == 0);  // shouldn't be doing anything!
index 8b8445d7fae0d7b2cf8784c3312a52369ed8a151..07951caa69239a1a75e97d18489c441811344e47 100644 (file)
@@ -213,7 +213,7 @@ void MDLog::wait_for_sync( Context *c )
 {
   if (g_conf.mds_log) {
     // wait
-    journaler->flush(c);
+    journaler->wait_for_flush(c, 0);
   } else {
     // hack: bypass.
     c->finish(0);
@@ -224,7 +224,7 @@ void MDLog::wait_for_safe( Context *c )
 {
   if (g_conf.mds_log) {
     // wait
-    journaler->flush(0, c);
+    journaler->wait_for_flush(0, c);
   } else {
     // hack: bypass.
     c->finish(0);
@@ -263,8 +263,10 @@ void MDLog::start_new_segment(Context *onsync)
 
   ESubtreeMap *le = mds->mdcache->create_subtree_map();
   submit_entry(le, new C_MDL_WroteSubtreeMap(this, mds->mdlog->get_write_pos()));
-  if (onsync)
+  if (onsync) {
     wait_for_sync(onsync);  
+    flush();
+  }
 
   logger->inc("segadd");
   logger->set("seg", segments.size());
index c205ef108370eea5b637fb6495a31f442f270d82..21abdb0d8177cf8d8aea2cd5fd7bbd158f9fdca4 100644 (file)
@@ -388,6 +388,9 @@ void MDS::tick()
   if (laggy)
     return;
 
+  // make sure mds log flushes periodically
+  mdlog->flush();
+
   // log
   mds_load_t load = balancer->get_load();
   
@@ -1271,9 +1274,8 @@ bool MDS::_dispatch(Message *m)
 
   // HACK FOR NOW
   if (is_active() || is_stopping()) {
-    // flush log to disk after every op.  for now.
-    //mdlog->flush();
-    mdlog->trim();
+    if (is_stopping())
+      mdlog->trim();
 
     // trim cache
     mdcache->trim();
index c1d65fc0f83df2277d3f291f960506f08ea05d66..c1976264a23df02dfae924caa4071be4b4c1e8b3 100644 (file)
@@ -78,6 +78,7 @@ void MDSTableServer::handle_commit(MMDSTableRequest *req)
     pending_for_mds.erase(tid);
     mds->mdlog->submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, -1, tid, version));
     mds->mdlog->wait_for_sync(new C_Commit(this, req));
+    mds->mdlog->flush();
   }
   else if (tid <= version) {
     dout(0) << "got commit for tid " << tid << " <= " << version 
index 980ed33f8f9b5a33574ce93cadc57740a39dcb58..14c74418d86603dc6969f3797d348d69dc5b75c7 100644 (file)
@@ -802,6 +802,7 @@ void Migrator::export_go(CDir *dir)
 
   // first sync log to flush out e.g. any cap imports
   mds->mdlog->wait_for_sync(new C_M_ExportGo(this, dir));
+  mds->mdlog->flush();
 }
 
 void Migrator::export_go_synced(CDir *dir)
@@ -1137,6 +1138,7 @@ void Migrator::handle_export_ack(MExportDirAck *m)
   // log export completion, then finish (unfreeze, trigger finish context, etc.)
   mds->mdlog->submit_entry(le);
   mds->mdlog->wait_for_safe(new C_MDS_ExportFinishLogged(this, dir));
+  mds->mdlog->flush();
   
   delete m;
 }
@@ -1691,6 +1693,7 @@ void Migrator::handle_export_dir(MExportDir *m)
   // log it
   mds->mdlog->submit_entry(le);
   mds->mdlog->wait_for_safe(onlogged);
+  mds->mdlog->flush();
 
   // note state
   import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART;
@@ -2300,6 +2303,7 @@ void Migrator::handle_export_caps(MExportCaps *ex)
   
   mds->mdlog->submit_entry(le);
   mds->mdlog->wait_for_safe(finish);
+  mds->mdlog->flush();
 
   delete ex;
 }
index 7bd1745ac882bdbf87e9a304d6f04ff8c55e5d82..dddcd59a624a728e02b77fe97fb4bd5eb3a104fa 100644 (file)
@@ -245,6 +245,7 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
              << ", waiting for safe journal flush" << dendl;
       if (!gather) gather = new C_Gather;
       mds->mdlog->wait_for_safe(gather->new_sub());
+      mds->mdlog->flush();
     }
   }
 
index 378a701770d2c6c1bf6307f6ff72e822dca4336a..c3306a6c6eac70ca55fcd0faf47c66fedfc76721 100644 (file)
@@ -360,9 +360,9 @@ void Journaler::_do_flush()
   dout(10) << "_do_flush write pointers now at " << write_pos << "/" << flush_pos << "/" << ack_pos << dendl;
 }
 
-  
 
-void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
+
+void Journaler::wait_for_flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
 {
   // all flushed and acked?
   if (write_pos == ack_pos) {
@@ -386,6 +386,21 @@ void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
     return;
   }
 
+  // queue waiter
+  if (onsync) 
+    waitfor_ack[write_pos].push_back(onsync);
+  if (onsafe) 
+    waitfor_safe[write_pos].push_back(onsafe);
+  if (add_ack_barrier)
+    ack_barrier.insert(write_pos);
+}  
+
+void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
+{
+  wait_for_flush(onsync, onsafe, add_ack_barrier);
+  if (write_pos == ack_pos)
+    return;
+
   if (write_pos == flush_pos) {
     assert(write_buf.length() == 0);
     dout(10) << "flush nothing to flush, write pointers at "
@@ -409,14 +424,6 @@ void Journaler::flush(Context *onsync, Context *onsafe, bool add_ack_barrier)
     }
   }
 
-  // queue waiter (at _new_ write_pos; will go when reached by ack_pos)
-  if (onsync) 
-    waitfor_ack[write_pos].push_back(onsync);
-  if (onsafe) 
-    waitfor_safe[write_pos].push_back(onsafe);
-  if (add_ack_barrier)
-    ack_barrier.insert(write_pos);
-
   // write head?
   if (last_wrote_head.sec() + g_conf.journaler_write_head_interval < g_clock.now().sec()) {
     write_head();
index 12e3c794d602c3be74768921069df62de208a175..ea017f6a8ebf0af503b29d73db4be6dbbf94dc28 100644 (file)
@@ -233,6 +233,7 @@ public:
 
   // write
   __s64 append_entry(bufferlist& bl, Context *onsync = 0);
+  void wait_for_flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false);
   void flush(Context *onsync = 0, Context *onsafe = 0, bool add_ack_barrier=false);
 
   // read