]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: wait for journal safe for import/export
authorSage Weil <sage@newdream.net>
Tue, 10 Jun 2008 13:44:43 +0000 (06:44 -0700)
committerSage Weil <sage@newdream.net>
Tue, 10 Jun 2008 13:45:46 +0000 (06:45 -0700)
src/mds/LogSegment.h
src/mds/MDCache.cc
src/mds/MDLog.cc
src/mds/MDLog.h
src/mds/MDS.cc
src/mds/Migrator.cc
src/mds/Server.cc
src/mds/journal.cc

index 8ce8c9740000f6aa9bd328cbcb496ae2ba2ebf49..106973efe8edced841c87472d153461d2296cea7 100644 (file)
@@ -31,8 +31,9 @@ class MDSlaveUpdate;
 
 class LogSegment {
  public:
-  off_t offset, end;
+  loff_t offset, end;
   int num_events;
+  loff_t trimmable_at;
 
   // dirty items
   xlist<CDir*>    dirty_dirfrags;
@@ -63,7 +64,7 @@ class LogSegment {
   C_Gather *try_to_expire(MDS *mds);
 
   // cons
-  LogSegment(off_t off) : offset(off), end(off), num_events(0),
+  LogSegment(loff_t off) : offset(off), end(off), num_events(0), trimmable_at(0),
                          allocv(0), sessionmapv(0), anchortablev(0) 
   { }
 };
index de5ff2958fb76b4790bb4b9b758b6e96ee04ea1a..bd985a5584f25aa7a593a1b26a4f059139f7fe97 100644 (file)
@@ -505,7 +505,7 @@ void MDCache::try_subtree_merge_at(CDir *dir)
       le->metablob.add_primary_dentry(in->get_parent_dn(), true, 0, pi);
       
       mds->mdlog->submit_entry(le);
-      mds->mdlog->wait_for_sync(new C_MDC_SubtreeMergeWB(this, in, 
+      mds->mdlog->wait_for_safe(new C_MDC_SubtreeMergeWB(this, in, 
                                                         mds->mdlog->get_current_segment()));
     }
   } 
index 13592961e7321536485353de3696aa81e1515952..feb728dc07b82ed6602a48b24a7bc1c440088991 100644 (file)
@@ -94,16 +94,21 @@ void MDLog::write_head(Context *c)
   journaler->write_head(c);
 }
 
-off_t MDLog::get_read_pos() 
+loff_t MDLog::get_read_pos() 
 {
   return journaler->get_read_pos(); 
 }
 
-off_t MDLog::get_write_pos() 
+loff_t MDLog::get_write_pos() 
 {
   return journaler->get_write_pos(); 
 }
 
+loff_t MDLog::get_safe_pos() 
+{
+  return journaler->get_write_safe_pos(); 
+}
+
 
 
 void MDLog::create(Context *c)
@@ -212,6 +217,17 @@ void MDLog::wait_for_sync( Context *c )
     delete c;
   }
 }
+void MDLog::wait_for_safe( Context *c )
+{
+  if (g_conf.mds_log) {
+    // wait
+    journaler->flush(0, c);
+  } else {
+    // hack: bypass.
+    c->finish(0);
+    delete c;
+  }
+}
 
 void MDLog::flush()
 {
index c958585b86a4825a2882311615b8aff9c1bf6709..b3580ec8d488b231f1401a3d93fd6965ea08a456 100644 (file)
@@ -153,8 +153,9 @@ public:
   size_t get_num_segments() { return segments.size(); }  
   void set_max_segments(int m) { max_segments = m; }
 
-  off_t get_read_pos();
-  off_t get_write_pos();
+  loff_t get_read_pos();
+  loff_t get_write_pos();
+  loff_t get_safe_pos();
   bool empty() { return segments.empty(); }
 
   bool is_capped() { return capped; }
@@ -162,6 +163,7 @@ public:
 
   void submit_entry( LogEvent *e, Context *c = 0 );
   void wait_for_sync( Context *c );
+  void wait_for_safe( Context *c );
   void flush();
   bool is_flushed() {
     return unflushed == 0;
index 5e53bec9cd1d9a5e2b60e9a309779fe3dada98b0..540e3c88b5fc81e955ae86c051dfecd61b4c3cfc 100644 (file)
@@ -1118,6 +1118,7 @@ void MDS::_dispatch(Message *m)
   if (is_active() || is_stopping()) {
     // flush log to disk after every op.  for now.
     //mdlog->flush();
+    mdlog->trim();
 
     // trim cache
     mdcache->trim();
index da66547bc23e2ee5738b325a1951bcff6ff39870..87314914fd0d48cc464c9bd64bd946d01af42a2a 100644 (file)
@@ -1128,8 +1128,8 @@ void Migrator::handle_export_ack(MExportDirAck *m)
   }
 
   // log export completion, then finish (unfreeze, trigger finish context, etc.)
-  mds->mdlog->submit_entry(le,
-                          new C_MDS_ExportFinishLogged(this, dir));
+  mds->mdlog->submit_entry(le);
+  mds->mdlog->wait_for_safe(new C_MDS_ExportFinishLogged(this, dir));
   
   delete m;
 }
@@ -1693,7 +1693,8 @@ void Migrator::handle_export_dir(MExportDir *m)
   dout(7) << "handle_export_dir did " << *dir << dendl;
 
   // log it
-  mds->mdlog->submit_entry(le, onlogged);
+  mds->mdlog->submit_entry(le);
+  mds->mdlog->wait_for_safe(onlogged);
 
   // note state
   import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART;
@@ -2302,7 +2303,8 @@ void Migrator::handle_export_caps(MExportCaps *ex)
   mds->server->prepare_force_open_sessions(ex->client_map);
   le->client_map.swap(ex->client_map);
   
-  mds->mdlog->submit_entry(le, finish);
+  mds->mdlog->submit_entry(le);
+  mds->mdlog->wait_for_safe(finish);
 
   delete ex;
 }
index ae6267c1f684cf43c80c2fe0497ed4f3e40c58ec..d030a3b4d058551b068e1f2aceaf87beb4a6ef7d 100644 (file)
@@ -4573,11 +4573,6 @@ void Server::handle_client_openc(MDRequest *mdr)
   // log + wait
   C_MDS_openc_finish *fin = new C_MDS_openc_finish(mds, mdr, dn, in);
   mdlog->submit_entry(le, fin);
-  
-  /*
-    FIXME. this needs to be rewritten when the write capability stuff starts
-    getting journaled.  
-  */
 }
 
 
index d75faec618c58e9e1c704d6ae97c63be1987730e..f5a992240d52d0cea1e90f41d104e3844c6f559b 100644 (file)
@@ -192,6 +192,27 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
   // FIXME client requests...?
   // audit handling of anchor transactions?
 
+  // once we are otherwise trimmable, make sure journal is fully safe on disk.
+  if (!gather) {
+    if (trimmable_at &&
+       trimmable_at <= mds->mdlog->get_safe_pos()) {
+      dout(6) << "LogSegment(" << offset << ").try_to_expire trimmable at " << trimmable_at
+             << " <= " << mds->mdlog->get_safe_pos() << dendl;
+    } else {
+      if (trimmable_at == 0) {
+       trimmable_at = mds->mdlog->get_write_pos();
+       dout(6) << "LogSegment(" << offset << ").try_to_expire now trimmable at " << trimmable_at
+               << ", waiting for safe journal flush" << dendl;
+      } else {
+       dout(6) << "LogSegment(" << offset << ").try_to_expire trimmable at " << trimmable_at
+               << " > " << mds->mdlog->get_safe_pos()
+               << ", waiting for safe journal flush" << dendl;
+      }
+      if (!gather) gather = new C_Gather;
+      mds->mdlog->wait_for_safe(gather->new_sub());
+    }
+  }
+
   if (gather) {
     dout(6) << "LogSegment(" << offset << ").try_to_expire waiting" << dendl;
   } else {