]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: simplify+fix open file journaling, replay
authorSage Weil <sage@newdream.net>
Thu, 31 Jan 2008 19:18:18 +0000 (11:18 -0800)
committerSage Weil <sage@newdream.net>
Thu, 31 Jan 2008 19:18:18 +0000 (11:18 -0800)
src/TODO
src/mds/Locker.cc
src/mds/MDCache.cc
src/mds/Server.cc
src/mds/Server.h
src/mds/events/EOpen.h
src/mds/journal.cc
src/msg/SimpleMessenger.cc

index a61782984259e8dd9de19dfea247ce82596f962e..b8513b55aa6c6eee468459bdd3336bcc9ee98b62 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -25,6 +25,7 @@ kernel client
  - echo blah >> mnt/blah "appends" from offset zero, regardless of file size
  - cap handler probably needs i_mutex or something when updating file sizes
    - which means it probably needs to be done in a different worker thread (NOT the messenger's)
+ - prevent client_reply from racing against a subsequent file_caps.
 - msgr layer
   - callbacks for 'remote reset'
   - idle state, to keep connect_seq around after a disconnect
@@ -36,7 +37,6 @@ kernel client
  - getattr should do an lstat?
  - d_revalidate?
  - test truncate
-- is ino_t really still 32 bits on i386??  hrm!
 - fix file open vs file_cap race
   - preemptively release caps as part of request if doing utimes/etc. on an open file?
 - mds client
@@ -80,7 +80,6 @@ mon
 
 
 mds mustfix
-- journal cap mtime/size updates
 - rename slave in-memory rollback on failure
 - proper handling of cache expire messages during rejoin phase?
   -> i think cache expires are fine; the rejoin_ack handler just has to behave if rejoining items go missing
@@ -91,6 +90,8 @@ mds mustfix
 - EOpen vs other journal events... update ordering problem?
 
 mds
+- fix file_data_version
+
 - client cap timeouts
 /  - stale -> resume
   - tolerate connection break
@@ -113,9 +114,6 @@ mds
 - inode.rmtime (recursive mtime)
 - make inode.size reflect directory size (number of entries)
 
-- inode.max_size
-- inode.allocated_size
 - osd needs a set_floor_and_read op for safe failover/STOGITH-like semantics.
 
 - could mark dir complete in EMetaBlob by counting how many dentries are dirtied in the current log epoch in CDir...
index c6d8514fc99dbb405c89fc8810a99a58f64b14e5..7fb7f6bc4e65f6d3d901f7d77665ab4baffcf822 100644 (file)
@@ -734,8 +734,9 @@ void Locker::handle_inode_file_caps(MInodeFileCaps *m)
 
 
 
-
-
+/*
+ * helper: journal size, max_size, mtime, atime updates, as needed.
+ */
 void Locker::maybe_journal_inode_update(CInode *in, bool had_or_has_wr, 
                                        int64_t size, utime_t mtime, utime_t atime)
 {
index ee07fefd47b2f8d3bbff41683f0269735eac12b3..dd6c894d20a0db9e07e87446c1f37685dac4d93f 100644 (file)
@@ -3550,9 +3550,6 @@ bool MDCache::shutdown_pass()
     return true;
   }
 
-  // flush batching eopens, so that we can properly expire them.
-  mds->server->journal_opens();    // hrm, this is sort of a hack.
-
   // flush what we can from the log
   mds->mdlog->set_max_events(0);
   mds->mdlog->trim();
index b957f3b776b1e210c5ca16fd3d8c7abacbe92d87..ab9fe550daec0d1ddc6320ab3fd50a5908d6c7f3 100644 (file)
@@ -2515,7 +2515,6 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn)
     le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
 
   // log + wait
-  journal_opens();  // journal pending opens, just in case
   mdlog->submit_entry(le, new C_MDS_unlink_local_finish(mds, mdr, dn, straydn, 
                                                        dirpv));
 }
@@ -2632,8 +2631,6 @@ void Server::_unlink_remote(MDRequest *mdr, CDentry *dn)
   // finisher
   C_MDS_unlink_remote_finish *fin = new C_MDS_unlink_remote_finish(mds, mdr, dn, dirpv);
   
-  journal_opens();  // journal pending opens, just in case
-  
   // mark committing (needed for proper recovery)
   mdr->committing = true;
 
@@ -3034,8 +3031,6 @@ void Server::handle_client_rename(MDRequest *mdr)
   // -- commit locally --
   C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn);
 
-  journal_opens();  // journal pending opens, just in case
-
   // mark committing (needed for proper recovery)
   mdr->committing = true;
   
@@ -3891,69 +3886,16 @@ void Server::_do_open(MDRequest *mdr, CInode *cur)
   //reply->set_file_data_version(fdv);
   reply_request(mdr, reply, cur);
 
-  // journal?
-  if (cur->last_open_journaled == 0) {
-    queue_journal_open(cur);
-    maybe_journal_opens();
+  // make sure this inode gets into the journal
+  if (cur->xlist_open_file.get_xlist() == 0) {
+    LogSegment *ls = mds->mdlog->get_current_segment();
+    EOpen *le = new EOpen(mds->mdlog);
+    le->add_inode(cur);
+    ls->open_files.push_back(&cur->xlist_open_file);
+    mds->mdlog->submit_entry(le);
   }
-
 }
 
-void Server::queue_journal_open(CInode *in)
-{
-  dout(10) << "queue_journal_open on " << *in << dendl;
-
-  if (journal_open_queue.count(in) == 0) {
-    // pin so our pointer stays valid
-    in->get(CInode::PIN_BATCHOPENJOURNAL);
-    
-    // queue it up for a bit
-    journal_open_queue.insert(in);
-  }
-}
-
-
-void Server::journal_opens()
-{
-  dout(10) << "journal_opens " << journal_open_queue.size() << " inodes" << dendl;
-  if (journal_open_queue.empty()) return;
-
-  EOpen *le = 0;
-
-  // check queued inodes
-  LogSegment *ls = mdlog->get_current_segment();
-  for (set<CInode*>::iterator p = journal_open_queue.begin();
-       p != journal_open_queue.end();
-       ++p) {
-    CInode *in = *p;
-    in->put(CInode::PIN_BATCHOPENJOURNAL);
-    if (in->is_any_caps()) {
-      if (!le) le = new EOpen(mdlog);
-      le->add_inode(in);
-      in->last_open_journaled = mds->mdlog->get_write_pos();
-      ls->open_files.push_back(&in->xlist_open_file);
-    }
-  }
-  journal_open_queue.clear();
-  
-  if (le) {
-    // journal
-    mdlog->submit_entry(le);
-  
-    // add waiters to journal entry
-    for (list<Context*>::iterator p = journal_open_waiters.begin();
-        p != journal_open_waiters.end();
-        ++p) 
-      mds->mdlog->wait_for_sync(*p);
-    journal_open_waiters.clear();
-  } else {
-    // nothing worth journaling here, just kick the waiters.
-    mds->queue_waiters(journal_open_waiters);
-  }
-}
-
-
-
 
 class C_MDS_open_truncate_purged : public Context {
   MDS *mds;
index 10e96870c4c3bb95278e8625a105e6ef249524c3..b8a9817c954e8b35d360e4a8749143772f5ffdfd 100644 (file)
@@ -114,18 +114,6 @@ public:
   void handle_client_opent(MDRequest *mdr);  // O_TRUNC variant.
   void _do_open(MDRequest *mdr, CInode *ref);
 
-  set<CInode*> journal_open_queue; // to be journal
-  list<Context*> journal_open_waiters;
-  void queue_journal_open(CInode *in);
-  void add_journal_open_waiter(Context *c) {
-    journal_open_waiters.push_back(c);
-  }
-  void maybe_journal_opens() {
-    if (journal_open_queue.size() >= (unsigned)g_conf.mds_log_eopen_size)
-      journal_opens();
-  }
-  void journal_opens();
-
   // namespace changes
   void handle_client_mknod(MDRequest *mdr);
   void handle_client_mkdir(MDRequest *mdr);
index eb492cd52574314c23d1402e17057fdb61dee4eb..1cd94fcc75621bc970e2d551d03fc3c950a05eb6 100644 (file)
@@ -22,7 +22,6 @@ class EOpen : public LogEvent {
 public:
   EMetaBlob metablob;
   list<inodeno_t> inos;
-  //list<inodeno_t> wr_inos;
 
   EOpen() : LogEvent(EVENT_OPEN) { }
   EOpen(MDLog *mdlog) : 
@@ -33,20 +32,21 @@ public:
   }
 
   void add_inode(CInode *in) {
-    inos.push_back(in->ino());
     if (!in->is_base()) {
+      inode_t *pi = in->get_projected_inode();
       metablob.add_dir_context(in->get_parent_dn()->get_dir());
-      metablob.add_primary_dentry(in->get_parent_dn(), false);
+      metablob.add_primary_dentry(in->get_parent_dn(), false, 0, pi);
+      inos.push_back(in->ino());
     }
   }
 
   void encode_payload(bufferlist& bl) {
-    ::_encode(inos, bl);
     metablob._encode(bl);
+    ::_encode(inos, bl);
   } 
   void decode_payload(bufferlist& bl, int& off) {
-    ::_decode(inos, bl, off);
     metablob._decode(bl, off);
+    ::_decode(inos, bl, off);
   }
 
   void update_segment();
index 8bb875aea5e71db28ba7868968c6c1e99daff866..c2bf14cec408f9cc108b425f706c8613b8462274 100644 (file)
@@ -129,14 +129,20 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
   // open files
   if (!open_files.empty()) {
     assert(!mds->mdlog->is_capped()); // hmm FIXME
+    EOpen *le = 0;
+    LogSegment *ls = mds->mdlog->get_current_segment();
     for (xlist<CInode*>::iterator p = open_files.begin(); !p.end(); ++p) {
-      dout(20) << "try_to_expire requeueing open file " << **p << dendl;
-      mds->server->queue_journal_open(*p);
+      CInode *in = *p;
+      dout(20) << "try_to_expire requeueing open file " << *in << dendl;
+      if (!le) le = new EOpen(mds->mdlog);
+      le->add_inode(in);
+      ls->open_files.push_back(&in->xlist_open_file);
+    }
+    if (le) {
+      if (!gather) gather = new C_Gather;
+      mds->mdlog->submit_entry(le, gather->new_sub());
+      dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
     }
-    if (!gather) gather = new C_Gather;
-    mds->server->add_journal_open_waiter(gather->new_sub());
-    mds->server->maybe_journal_opens();
-    dout(10) << "try_to_expire waiting for open files to rejournal" << dendl;
   }
 
   // slave updates
@@ -597,6 +603,15 @@ void EOpen::replay(MDS *mds)
 {
   dout(10) << "EOpen.replay " << dendl;
   metablob.replay(mds, _segment);
+
+  // note which segments inodes belong to, so we don't have to start rejournaling them
+  for (list<inodeno_t>::iterator p = inos.begin();
+       p != inos.end();
+       p++) {
+    CInode *in = mds->mdcache->get_inode(*p);
+    assert(in); 
+    _segment->open_files.push_back(&in->xlist_open_file);
+  }
 }
 
 
index d3176d54f17f2578e4606f3ce9592e091f2dd8e6..40d58d2c31d6e579e5575701765aa3e1fc4a3691 100644 (file)
@@ -815,7 +815,7 @@ int Rank::Pipe::accept()
       // if open race, low addr's pipe "wins".
       // otherwise, look at connect_seq
       if ((other->state == STATE_CONNECTING && peer_addr < rank.rank_addr) ||
-         (other->state == STATE_OPEN && cseq == other->connect_seq)) {
+         (other->state == STATE_OPEN && cseq >= other->connect_seq)) {
        dout(10) << "accept already had pipe " << other
                 << ", but switching to this new one" << dendl;
        // switch to this new Pipe