]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* session recovery cleanup
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 5 Apr 2007 20:29:13 +0000 (20:29 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 5 Apr 2007 20:29:13 +0000 (20:29 +0000)
* EOpen batching

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1341 29311d96-e01e-0410-9327-a35deaab8ce9

19 files changed:
branches/sage/cephmds2/TODO
branches/sage/cephmds2/client/Client.cc
branches/sage/cephmds2/client/SyntheticClient.cc
branches/sage/cephmds2/config.cc
branches/sage/cephmds2/config.h
branches/sage/cephmds2/mds/AnchorClient.cc
branches/sage/cephmds2/mds/CDir.cc
branches/sage/cephmds2/mds/CInode.h
branches/sage/cephmds2/mds/ClientMap.h
branches/sage/cephmds2/mds/MDCache.cc
branches/sage/cephmds2/mds/MDS.h
branches/sage/cephmds2/mds/Migrator.cc
branches/sage/cephmds2/mds/Server.cc
branches/sage/cephmds2/mds/Server.h
branches/sage/cephmds2/mds/events/EOpen.h
branches/sage/cephmds2/mds/journal.cc
branches/sage/cephmds2/messages/MClientReconnect.h
branches/sage/cephmds2/mon/ClientMonitor.cc
branches/sage/cephmds2/msg/SimpleMessenger.cc

index 04a5e01a813e0e89c2ca49ded41f1879d8d49bf8..00c87fe4cfa262196e5335f6fae785c1777c342c 100644 (file)
@@ -40,15 +40,12 @@ mds
     - ScatterLock or something?  hrm.
 
 - discover
-/  - hard link dentries
   - open_remote_ino needs major work...
 
 - FIXME how to journal root and stray inode content? 
   - in particular, i care about dirfragtree.. get it on rejoin?
   - and dir sizes, if i add that... also on rejoin?
 
-- rejoin and replicas that are not in recovered node's cache...  fetch storm?
-
 - remote xlocks
   - drop remote locks on request finish
     - handled by individual MDSCacheObject _finish()'s
@@ -58,30 +55,13 @@ mds
     - replicas will tell it when they hold an xlock
   - surviving mds rejoins replicas from a recovering mds
     - will tell auth if it holds an xlock
+
 - recovering open files
-/  - need to journal EOpen
-/  - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs
-  - need to batch EOpen events when rejournaling to avoid looping
-    - or something.........
   - recovery will either have inode (from EOpen), or will provide path+cap to reassert open state.
-    - path+cap window will require fetching and metadata from disk before doing the rejoin
-- client reconnect stage after resolve, but before rejoin.
-
-/- clientmap request history
-/  - journaled bit in MDRequest
-/  - complete request list in MClientMap
-/    - populated on replay
-/    - periodic client->mds MClientLastRetry -> trim request list
-
-
-/untested- truncate...
+    - path+cap window will require some fetching of metadata from disk before doing the rejoin
 
-- mds failure vs clients
-  - idempotent client ops
-    - EMetablob replay, expire logic
 - journal+recovery
   - local rename
-/    - fix dir renames vs subtrees
     - how to notify replicas...
 /  - stray purge
   - stray reintegration
index 53598bfcd548ed4e0e647e64f1617459b9d186a2..3cb83b84781ef2cf415d7ec975dcb56a76869118 100644 (file)
@@ -892,28 +892,34 @@ void Client::send_reconnect(int mds)
 
   MClientReconnect *m = new MClientReconnect;
 
-  for (hash_map<inodeno_t, Inode*>::iterator p = inode_map.begin();
-       p != inode_map.end();
-       p++) {
-    if (p->second->caps.count(mds)) {
-      dout(10) << " caps on " << p->first
-              << " " << cap_string(p->second->caps[mds].caps)
-              << " wants " << cap_string(p->second->file_caps_wanted())
-              << endl;
-      m->add_inode_caps(p->first, 
-                       p->second->caps[mds].caps,
-                       p->second->caps[mds].seq,
-                       p->second->file_caps_wanted());
-      string path;
-      p->second->make_path(path);
-      dout(10) << " path on " << p->first << " is " << path << endl;
-      m->add_inode_path(p->first, path);
-    }
-    if (p->second->stale_caps.count(mds)) {
-      dout(10) << " clearing stale caps on " << p->first << endl;
-      p->second->stale_caps.erase(mds);         // hrm, is this right?
-    }
-  }    
+  if (mds_sessions.count(mds)) {
+    // i have an open session.
+    for (hash_map<inodeno_t, Inode*>::iterator p = inode_map.begin();
+        p != inode_map.end();
+        p++) {
+      if (p->second->caps.count(mds)) {
+       dout(10) << " caps on " << p->first
+                << " " << cap_string(p->second->caps[mds].caps)
+                << " wants " << cap_string(p->second->file_caps_wanted())
+                << endl;
+       m->add_inode_caps(p->first, 
+                         p->second->caps[mds].caps,
+                         p->second->caps[mds].seq,
+                         p->second->file_caps_wanted());
+       string path;
+       p->second->make_path(path);
+       dout(10) << " path on " << p->first << " is " << path << endl;
+       m->add_inode_path(p->first, path);
+      }
+      if (p->second->stale_caps.count(mds)) {
+       dout(10) << " clearing stale caps on " << p->first << endl;
+       p->second->stale_caps.erase(mds);         // hrm, is this right?
+      }
+    }    
+  } else {
+    dout(10) << " i had no session with this mds";
+    m->closed = true;
+  }
 
   messenger->send_message(m, mdsmap->get_inst(mds), MDS_PORT_SERVER);
 }
index bb7d6540f7998a8609fb96fd023f841a2c7d7475..5564cf90a6d722b9d2830f71f6879c596049a26e 100644 (file)
@@ -20,7 +20,8 @@ using namespace std;
 #include "SyntheticClient.h"
 
 #include "include/filepath.h"
-#include "mds/MDS.h"
+#include "mds/mdstypes.h"
+#include "common/Logger.h"
 
 #include <sys/types.h>
 #include <sys/stat.h>
index 51394988bc8ed47d0e5c4e312b1ed15231a9a3e9..379df1fb68859e364995a19fa69dfcd7b4bd895b 100644 (file)
@@ -176,6 +176,8 @@ md_config_t g_conf = {
   mds_log_before_reply: true,
   mds_log_flush_on_shutdown: true,
   mds_log_import_map_interval: 1024*1024,  // frequency (in bytes) of EImportMap in log
+  mds_log_eopen_size: 100,
+
   mds_bal_replicate_threshold: 2000,
   mds_bal_unreplicate_threshold: 0,//500,
   mds_bal_hash_rd: 10000,
index 8d01d06b4ed0bde04d6f6d57386f878d09f7950d..11bc3c2827048b1f5ecdd40302500a8246f113aa 100644 (file)
@@ -168,6 +168,7 @@ struct md_config_t {
   bool  mds_log_before_reply;
   bool  mds_log_flush_on_shutdown;
   off_t mds_log_import_map_interval;
+  int mds_log_eopen_size;
   
   float mds_bal_replicate_threshold;
   float mds_bal_unreplicate_threshold;
index 7b6eac2d8df1b046cf226f6ace09781382cfcaef..d7bfb655f06d807493083c75161e5dcea45af547 100644 (file)
@@ -195,7 +195,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
       // kick any waiters
       if (ack_waiters.count(atid)) {
        dout(15) << "kicking waiters on atid " << atid << endl;
-       mds->queue_finished(ack_waiters[atid]);
+       mds->queue_waiters(ack_waiters[atid]);
        ack_waiters.erase(atid);
       }
     }
index 6a754a5f50bd86299a44a9e29268e9e1912e0bb3..b13359fb916a1cce8af0042ad4f4aef28980241f 100644 (file)
@@ -452,7 +452,7 @@ void CDir::finish_waiting(int mask, int result)
   list<Context*> finished;
   take_waiting(mask, finished);
   //finish_contexts(finished, result);
-  cache->mds->queue_finished(finished);
+  cache->mds->queue_waiters(finished);
 }
 
 
@@ -944,7 +944,7 @@ void CDir::_committed(version_t v)
     map<version_t, list<Context*> >::iterator n = p;
     n++;
     if (p->first > committed_version) break; // haven't committed this far yet.
-    cache->mds->queue_finished(p->second);
+    cache->mds->queue_waiters(p->second);
     waiting_for_commit.erase(p);
     p = n;
   } 
@@ -1048,7 +1048,7 @@ void CDir::set_dir_auth(pair<int,int> a, bool iamauth)
   if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
     list<Context*> ls;
     take_waiting(WAIT_SINGLEAUTH, ls);
-    cache->mds->queue_finished(ls);
+    cache->mds->queue_waiters(ls);
   }
 }
 
index a1021d3bb81e1b9a0281585d263991ced3a5a702..7ecc8ac292e41df2db55dc4ae08d8967371f0a67 100644 (file)
@@ -63,6 +63,7 @@ class CInode : public MDSCacheObject {
   static const int PIN_UNANCHORING = 13;
   static const int PIN_OPENINGDIR = 14;
   static const int PIN_REMOTEPARENT = 15;
+  static const int PIN_BATCHOPENJOURNAL = 16;
 
   const char *pin_name(int p) {
     switch (p) {
@@ -76,6 +77,7 @@ class CInode : public MDSCacheObject {
     case PIN_UNANCHORING: return "unanchoring";
     case PIN_OPENINGDIR: return "openingdir";
     case PIN_REMOTEPARENT: return "remoteparent";
+    case PIN_BATCHOPENJOURNAL: return "batchopenjournal";
     default: return generic_pin_name(p);
     }
   }
index 065cd684a5dee020ef72ba7ce4d947a7a2acd222..6100b38beaffc769f7c111777e8072686b02ee97 100644 (file)
@@ -64,7 +64,9 @@ public:
 private:
   // effects version
   hash_map<int,entity_inst_t> client_inst;
-  set<int>                    sessions;
+  set<int> sessions;
+  set<int> opening;
+  set<int> closing;
 
 public:
   bool empty() {
@@ -77,15 +79,21 @@ public:
   }
   const set<int>& get_session_set() { return sessions; }
   
+  bool is_opening(int c) { return opening.count(c); }
+  void add_opening(int c) { opening.insert(c); }
+  bool is_closing(int c) { return closing.count(c); }
+  void add_closing(int c) { closing.insert(c); }
   bool have_session(int client) {
     return client_inst.count(client);
   }
-  void add_session(const entity_inst_t& inst) {
+  void open_session(const entity_inst_t& inst) {
+    opening.erase(inst.name.num());
     client_inst[inst.name.num()] = inst;
     sessions.insert(inst.name.num());
     version++;
   }
-  void rem_session(int client) {
+  void close_session(int client) {
+    closing.erase(client);
     sessions.erase(client);
     client_inst.erase(client);
     version++;
@@ -117,7 +125,8 @@ public:
     map<int, map<tid_t,Context*> >::iterator q = waiting_for_trim.find(client);
     if (q != waiting_for_trim.end()) {
       list<Context*> fls;
-      while (q->second.begin()->first < mintid) {
+      while (!q->second.empty() &&
+            (mintid == 0 || q->second.begin()->first < mintid)) {
        fls.push_back(q->second.begin()->second);
        q->second.erase(q->second.begin());
       }
index b4b39b058a9304c81835fdd189f02581c6843fff..202a04564d3ec930fd98396139eef1099d6cd3ce 100644 (file)
@@ -987,7 +987,7 @@ void MDCache::handle_mds_failure(int who)
       // take waiters
       list<Context*> waiters;
       in->take_waiting(CInode::WAIT_DIR, waiters);
-      mds->queue_finished(waiters);
+      mds->queue_waiters(waiters);
       dout(10) << "kicking WAIT_DIR on " << *in << endl;
       
       // remove from mds list
@@ -1053,7 +1053,7 @@ void MDCache::handle_mds_recovery(int who)
   }
 
   // queue them up.
-  mds->queue_finished(waiters);
+  mds->queue_waiters(waiters);
 }
 
 void MDCache::set_recovery_set(set<int>& s) 
@@ -2489,6 +2489,11 @@ bool MDCache::shutdown_pass()
   trim(0);
   dout(5) << "lru size now " << lru.lru_get_size() << endl;
 
+  // flush batching eopens, so that we can properly expire them.
+  mds->server->journal_opens();    // hrm, this is sort of a hack.
+
+  // flush what we can from the log
+  mds->mdlog->trim(0);
 
   // SUBTREES
   // send all imports back to 0.
@@ -2531,9 +2536,6 @@ bool MDCache::shutdown_pass()
   // FIXME
   dout(7) << "FIXME: i need to empty out stray dir contents..." << endl;
 
-  // LOG
-  mds->mdlog->trim(0);
-
   // (wait for) flush log?
   if (g_conf.mds_log_flush_on_shutdown) {
     if (mds->mdlog->get_non_importmap_events()) {
@@ -4077,7 +4079,7 @@ void MDCache::handle_discover_reply(MDiscoverReply *m)
   // finish errors directly
   finish_contexts(error, -ENOENT);
 
-  mds->queue_finished(finished);
+  mds->queue_waiters(finished);
 
   // done
   delete m;
@@ -4304,7 +4306,7 @@ void MDCache::handle_dentry_unlink(MDentryUnlink *m)
        list<Context*> finished;
        CDir *dir = add_replica_dir(in, m->straydir->get_dirfrag().frag, *m->straydir,
                                    m->get_source().num(), finished);
-       if (!finished.empty()) mds->queue_finished(finished);
+       if (!finished.empty()) mds->queue_waiters(finished);
        
        // dentry
        straydn = dir->add_dentry( m->straydn->get_dname(), 0, false );
index a1d6d8201c00cbdb4890821e30185ab4f0a1e6d2..d7645c322a518d4b595e3a84c3e142e99c1ad3ff 100644 (file)
@@ -138,10 +138,10 @@ class MDS : public Dispatcher {
   // -- waiters --
   list<Context*> finished_queue;
 
-  void queue_finished(Context *c) {
+  void queue_waiter(Context *c) {
     finished_queue.push_back(c);
   }
-  void queue_finished(list<Context*>& ls) {
+  void queue_waiters(list<Context*>& ls) {
     finished_queue.splice( finished_queue.end(), ls );
   }
   
index eebbaca6f8e3332b0113e15cca6dc486d8699853..9319b78dcd4bd85130eb58a10e21b08211c19915 100644 (file)
@@ -259,7 +259,7 @@ void Migrator::handle_mds_failure(int who)
        mds->locker->dentry_anon_rdlock_trace_finish(trace);
        
        // wake up any waiters
-       mds->queue_finished(export_finish_waiters[dir]);
+       mds->queue_waiters(export_finish_waiters[dir]);
        export_finish_waiters.erase(dir);
        
        // send pending import_maps?  (these need to go out when all exports have finished.)
@@ -1170,7 +1170,7 @@ void Migrator::export_finish(CDir *dir)
   export_notify_ack_waiting.erase(dir);
 
   // queue finishers
-  mds->queue_finished(export_finish_waiters[dir]);
+  mds->queue_waiters(export_finish_waiters[dir]);
   export_finish_waiters.erase(dir);
 
   // stats
index 2dbde64253775c32b3057278ba8c05e9a9dfffed..de5e88f494e843cb933b969ced4c344aa1a6bb59 100644 (file)
@@ -91,30 +91,15 @@ void Server::dispatch(Message *m)
 
 class C_MDS_session_finish : public Context {
   MDS *mds;
-  MClientSession *m;
+  entity_inst_t client_inst;
   bool open;
   version_t cmapv;
 public:
-  C_MDS_session_finish(MDS *m, MClientSession *msg, bool s, version_t mv) :
-    mds(m), m(msg), open(s), cmapv(mv) { }
+  C_MDS_session_finish(MDS *m, entity_inst_t ci, bool s, version_t mv) :
+    mds(m), client_inst(ci), open(s), cmapv(mv) { }
   void finish(int r) {
     assert(r == 0);
-
-    // apply
-    if (open)
-      mds->clientmap.add_session(m->get_source_inst());
-    else
-      mds->clientmap.rem_session(m->get_source().num());
-    
-    assert(cmapv == mds->clientmap.get_version());
-    
-    // purge completed requests from clientmap?
-    if (!open) 
-      mds->clientmap.trim_completed_requests(m->get_source().num(), 0);
-    
-    // reply
-    mds->messenger->send_message(new MClientSession(m->op+1), m->get_source_inst());
-    delete m;
+    mds->server->_session_logged(client_inst, open, cmapv);
   }
 };
 
@@ -122,13 +107,58 @@ public:
 void Server::handle_client_session(MClientSession *m)
 {
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << endl;
-
+  int from = m->get_source().num();
   bool open = m->op == MClientSession::OP_OPEN;
-  
+
+  if (open) {
+    if (mds->clientmap.is_opening(from)) {
+      dout(10) << "already opening, dropping this req" << endl;
+      delete m;
+      return;
+    }
+    mds->clientmap.add_opening(from);
+  } else {
+    if (mds->clientmap.is_closing(from)) {
+      dout(10) << "already closing, dropping this req" << endl;
+      delete m;
+      return;
+    }
+    mds->clientmap.add_closing(from);
+  }
+
   // journal it
   version_t cmapv = mds->clientmap.inc_projected();
   mdlog->submit_entry(new ESession(m->get_source_inst(), open, cmapv),
-                     new C_MDS_session_finish(mds, m, open, cmapv));
+                     new C_MDS_session_finish(mds, m->get_source_inst(), open, cmapv));
+  delete m;
+}
+
+void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cmapv)
+{
+  dout(10) << "_session_logged " << client_inst << " " << (open ? "open":"close")
+          << " " << cmapv 
+          << endl;
+
+  // apply
+  int from = client_inst.name.num();
+  if (open) {
+    assert(mds->clientmap.is_opening(from));
+    mds->clientmap.open_session(client_inst);
+  } else {
+    assert(mds->clientmap.is_closing(from));
+    mds->clientmap.close_session(from);
+    
+    // purge completed requests from clientmap
+    mds->clientmap.trim_completed_requests(from, 0);
+  }
+  
+  assert(cmapv == mds->clientmap.get_version());
+  
+  // reply
+  if (open) 
+    mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN_ACK), client_inst);
+  else
+    mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK), client_inst);
 }
 
 
@@ -137,19 +167,16 @@ void Server::terminate_sessions()
   dout(2) << "terminate_sessions" << endl;
 
   // kill them off.  clients will retry etc.
-  while (!mds->clientmap.get_session_set().empty()) {
-    int client = *mds->clientmap.get_session_set().begin();
-    dout(10) << "terminating session for client" << client << endl;
-
-    mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK),
-                                mds->clientmap.get_inst(client));
-    mds->clientmap.rem_session(client);
-
-    // trim requests
-    mds->clientmap.trim_completed_requests(client, 0);
+  for (set<int>::const_iterator p = mds->clientmap.get_session_set().begin();
+       p != mds->clientmap.get_session_set().end();
+       ++p) {
+    if (mds->clientmap.is_closing(*p)) 
+      continue;
+    mds->clientmap.add_closing(*p);
+    version_t cmapv = mds->clientmap.inc_projected();
+    mdlog->submit_entry(new ESession(mds->clientmap.get_inst(*p), false, cmapv),
+                       new C_MDS_session_finish(mds, mds->clientmap.get_inst(*p), false, cmapv));
   }
-  
-  // FIXME hrm, should i journal this?
 }
 
 
@@ -178,24 +205,35 @@ void Server::handle_client_reconnect(MClientReconnect *m)
   dout(7) << "handle_client_reconnect " << m->get_source() << endl;
   int from = m->get_source().num();
 
-  // caps
-  for (map<inodeno_t, MClientReconnect::inode_caps_t>::iterator p = m->inode_caps.begin();
-       p != m->inode_caps.end();
-       ++p) {
-    CInode *in = mdcache->get_inode(p->first);
-    if (!in) {
-      dout(0) << "missing " << p->first << ", fetching via " << m->inode_path[p->first] << endl;
-      assert(0);
-      continue;
-    }
+  if (m->closed) {
+    dout(7) << " client had no session, removing from clientmap" << endl;
 
-    dout(10) << " client cap " << cap_string(p->second.wanted)
-            << " seq " << p->second.seq 
-            << " on " << *in << endl;
-    Capability cap(p->second.wanted, p->second.seq);
-    in->add_client_cap(from, cap);
+    mds->clientmap.add_closing(from);
+    version_t cmapv = mds->clientmap.inc_projected();
+    mdlog->submit_entry(new ESession(mds->clientmap.get_inst(from), false, cmapv),
+                       new C_MDS_session_finish(mds, mds->clientmap.get_inst(from), false, cmapv));
 
-    reconnected_open_files.insert(in);
+  } else {
+
+    // caps
+    for (map<inodeno_t, MClientReconnect::inode_caps_t>::iterator p = m->inode_caps.begin();
+        p != m->inode_caps.end();
+        ++p) {
+      CInode *in = mdcache->get_inode(p->first);
+      if (!in) {
+       dout(0) << "missing " << p->first << ", fetching via " << m->inode_path[p->first] << endl;
+       assert(0);
+       continue;
+      }
+      
+      dout(10) << " client cap " << cap_string(p->second.wanted)
+              << " seq " << p->second.seq 
+              << " on " << *in << endl;
+      Capability cap(p->second.wanted, p->second.seq);
+      in->add_client_cap(from, cap);
+      
+      reconnected_open_files.insert(in);
+    }
   }
 
   // remove from gather set
@@ -1727,6 +1765,8 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn)
   C_MDS_unlink_local_finish *fin = new C_MDS_unlink_local_finish(mds, mdr, dn, straydn, 
                                                                 ipv, pi->ctime);
   
+  journal_opens();  // journal pending opens, just in case
+  
   // log + wait
   mdlog->submit_entry(le);
   mdlog->wait_for_sync(fin);
@@ -2224,6 +2264,8 @@ void Server::_rename_local(MDRequest *mdr,
   C_MDS_rename_local_finish *fin = new C_MDS_rename_local_finish(mds, mdr, 
                                                                 srcdn, destdn, straydn,
                                                                 ipv, pi ? pi->ctime:utime_t());
+
+  journal_opens();  // journal pending opens, just in case
   
   if (anchorfin) {
     // doing anchor update prepare first
@@ -2681,12 +2723,62 @@ void Server::_do_open(MDRequest *mdr, CInode *cur)
 
   // journal?
   if (cur->last_open_journaled == 0) {
-    cur->last_open_journaled = mdlog->get_write_pos();
-    mdlog->submit_entry(new EOpen(cur));
+    queue_journal_open(cur);
+    maybe_journal_opens();
   }
 
 }
 
+void Server::queue_journal_open(CInode *in)
+{
+  dout(10) << "queue_journal_open on " << *in << endl;
+
+  // pin so our pointer stays valid
+  in->get(CInode::PIN_BATCHOPENJOURNAL);
+
+  // queue it up for a bit
+  journal_open_queue.insert(in);
+}
+
+
+void Server::journal_opens()
+{
+  dout(10) << "journal_opens " << journal_open_queue.size() << " inodes" << endl;
+  if (journal_open_queue.empty()) return;
+
+  EOpen *le = 0;
+
+  // check queued inodes
+  for (set<CInode*>::iterator p = journal_open_queue.begin();
+       p != journal_open_queue.end();
+       ++p) {
+    (*p)->put(CInode::PIN_BATCHOPENJOURNAL);
+    if ((*p)->is_any_caps()) {
+      if (!le) le = new EOpen;
+      le->add_inode(*p);
+      (*p)->last_open_journaled = mds->mdlog->get_write_pos();
+    }
+  }
+  journal_open_queue.clear();
+  
+  if (le) {
+    // journal
+    mds->mdlog->submit_entry(le);
+  
+    // add waiters to journal entry
+    for (list<Context*>::iterator p = journal_open_waiters.begin();
+        p != journal_open_waiters.end();
+        ++p) 
+      mds->mdlog->wait_for_sync(*p);
+    journal_open_waiters.clear();
+  } else {
+    // nothing worth journaling here, just kick the waiters.
+    mds->queue_waiters(journal_open_waiters);
+  }
+}
+
+
+
 
 class C_MDS_open_truncate_purged : public Context {
   MDS *mds;
index 2cd8a073e64ffb79b521d7603f75f375420b2791..24a7d19d139228a71be1b9297dba9411e6d5be41 100644 (file)
@@ -43,6 +43,7 @@ public:
   set<CInode*> reconnected_open_files;
   
   void handle_client_session(class MClientSession *m);
+  void _session_logged(entity_inst_t ci, bool open, version_t cmapv);
   void reconnect_clients();
   void handle_client_reconnect(class MClientReconnect *m);
   void client_reconnect_failure(int from);
@@ -84,6 +85,18 @@ public:
   void handle_client_opent(MDRequest *mdr);  // O_TRUNC variant.
   void _do_open(MDRequest *mdr, CInode *ref);
 
+  set<CInode*> journal_open_queue; // to be journal
+  list<Context*> journal_open_waiters;
+  void queue_journal_open(CInode *in);
+  void add_journal_open_waiter(Context *c) {
+    journal_open_waiters.push_back(c);
+  }
+  void maybe_journal_opens() {
+    if (journal_open_queue.size() >= (unsigned)g_conf.mds_log_eopen_size)
+      journal_opens();
+  }
+  void journal_opens();
+
   // namespace changes
   void handle_client_mknod(MDRequest *mdr);
   void handle_client_mkdir(MDRequest *mdr);
index 6af9dc61810d61024c6724dd0844430cc5aaffd6..3ddfabf330f15a9d646150ec455f85c005cd98de 100644 (file)
 class EOpen : public LogEvent {
 public:
   EMetaBlob metablob;
-  inodeno_t ino;
+  list<inodeno_t> inos;
 
   EOpen() : LogEvent(EVENT_OPEN) { }
-  EOpen(CInode *in) : LogEvent(EVENT_OPEN),
-                     ino(in->ino()) {
-    metablob.add_primary_dentry(in->get_parent_dn(), false);
+  EOpen(CInode *in) : LogEvent(EVENT_OPEN) {
+    add_inode(in);
   }
   void print(ostream& out) {
-    out << "EOpen " << ino << " " << metablob;
+    out << "EOpen " << metablob;
+  }
+
+  void add_inode(CInode *in) {
+    inos.push_back(in->ino());
+    metablob.add_primary_dentry(in->get_parent_dn(), false);
   }
 
   void encode_payload(bufferlist& bl) {
-    ::_encode(ino, bl);
+    ::_encode(inos, bl);
     metablob._encode(bl);
   } 
   void decode_payload(bufferlist& bl, int& off) {
-    ::_decode(ino, bl, off);
+    ::_decode(inos, bl, off);
     metablob._decode(bl, off);
   }
 
index 200e3168f0438884aae1d4a3f7c383a5366b6894..a298ee8cb8520bfe6a4d8e0eecf756f4ed77458e 100644 (file)
@@ -35,6 +35,7 @@
 #include "MDS.h"
 #include "MDLog.h"
 #include "MDCache.h"
+#include "Server.h"
 #include "Migrator.h"
 #include "AnchorTable.h"
 #include "AnchorClient.h"
@@ -504,9 +505,9 @@ void ESession::replay(MDS *mds)
 {
   dout(10) << "ESession.replay" << endl;
   if (open)
-    mds->clientmap.add_session(client_inst);
+    mds->clientmap.open_session(client_inst);
   else
-    mds->clientmap.rem_session(client_inst.name.num());
+    mds->clientmap.close_session(client_inst.name.num());
   mds->clientmap.reset_projected(); // make it follow version.
 }
 
@@ -669,42 +670,45 @@ void EUpdate::replay(MDS *mds)
 
 bool EOpen::has_expired(MDS *mds)
 {
-  CInode *in = mds->mdcache->get_inode(ino);
-  if (!in) return true;
-  if (!in->is_any_caps()) return true;
-  if (in->last_open_journaled > get_start_off() ||
-      in->last_open_journaled == 0) return true;
-  return false;
+  for (list<inodeno_t>::iterator p = inos.begin(); p != inos.end(); ++p) {
+    CInode *in = mds->mdcache->get_inode(*p);
+    if (in &&
+       in->is_any_caps() &&
+       !(in->last_open_journaled > get_start_off() ||
+         in->last_open_journaled == 0)) {
+      dout(10) << "EOpen.has_expired still refer to caps on " << *in << endl;
+      return false;
+    }
+  }
+  return true;
 }
 
 void EOpen::expire(MDS *mds, Context *c)
 {
-  CInode *in = mds->mdcache->get_inode(ino);
-  assert(in);
-
-  dout(10) << "EOpen.expire " << ino
-          << " last_open_journaled " << in->last_open_journaled << endl;
+  dout(10) << "EOpen.expire " << endl;
   
-  // wait?
-  // FIXME this is stupid.
-  if (in->last_open_journaled == get_start_off()) {
-    //||
-    //(get_start_off() < mds->mdlog->last_import_map &&
-    //in->last_open_journaled < mds->mdlog->last_import_map)) {
-    dout(10) << "waiting." << endl;
-    // wait
-    mds->mdlog->add_import_map_expire_waiter(c);
-  } else {
-    // rejournal now.
-    dout(10) << "rejournaling" << endl;
-    in->last_open_journaled = mds->mdlog->get_write_pos();
-    mds->mdlog->submit_entry(new EOpen(in));
+  if (mds->mdlog->is_capped()) {
+    dout(0) << "uh oh, log is capped, but i have unexpired opens." << endl;
+    assert(0);
+  }
+
+  for (list<inodeno_t>::iterator p = inos.begin(); p != inos.end(); ++p) {
+    CInode *in = mds->mdcache->get_inode(*p);
+    if (!in) continue;
+    if (!in->is_any_caps()) continue;
+    
+    dout(10) << "EOpen.expire " << in->ino()
+            << " last_open_journaled " << in->last_open_journaled << endl;
+
+    mds->server->queue_journal_open(in);
   }
+  mds->server->add_journal_open_waiter(c);
+  mds->server->maybe_journal_opens();
 }
 
 void EOpen::replay(MDS *mds)
 {
-  dout(10) << "EOpen.replay " << ino << endl;
+  dout(10) << "EOpen.replay " << endl;
   metablob.replay(mds);
 }
 
index 0b612d62dfdc6285bb54cecc59124c2d29e8ae34..ef6a0dc96bcc368b2e0df6dea76623d8b8f21192 100644 (file)
@@ -29,8 +29,10 @@ public:
 
   map<inodeno_t, inode_caps_t>  inode_caps;
   map<inodeno_t, string>        inode_path;
+  bool closed;
 
-  MClientReconnect() : Message(MSG_CLIENT_RECONNECT) { }
+  MClientReconnect() : Message(MSG_CLIENT_RECONNECT),
+                      closed(false) { }
 
   char *get_type_name() { return "client_reconnect"; }
   void print(ostream& out) {
@@ -48,11 +50,13 @@ public:
   }
 
   void encode_payload() {
+    ::_encode(closed, payload);
     ::_encode(inode_caps, payload);
     ::_encode(inode_path, payload);
   }
   void decode_payload() {
     int off = 0;
+    ::_decode(closed, payload, off);
     ::_decode(inode_caps, payload, off);
     ::_decode(inode_path, payload, off);
   }
index 2e3b5282364bba6b224989160923e1f208a36269..055370c971c488d821a5739edbab73e35790c95a 100644 (file)
@@ -59,7 +59,7 @@ void ClientMonitor::handle_client_mount(MClientMount *m)
       (client_map.count(from) && 
        client_map[from] != m->get_source_addr())) {
     from = num_clients++;
-    dout(10) << "client_boot assigned client" << from << endl;
+    dout(10) << "client_mount assigned client" << from << endl;
   }
   
   client_map[from] = m->get_source_addr();
index 1b15c85e2d3cccd0354aa892556e94f225e3f2be..ca702b62ed363061496fd9e3a9b8ea6a8ea9647d 100644 (file)
@@ -1016,6 +1016,7 @@ void Rank::wait()
   lock.Unlock();
 
   dout(10) << "wait: done." << endl;
+  dout(1) << "shutdown complete." << endl;
 }
 
 
@@ -1101,10 +1102,10 @@ int Rank::EntityMessenger::shutdown()
   
   // stop my dispatch thread
   if (dispatch_thread.am_self()) {
-    dout(1) << "shutdown i am dispatch, setting stop flag" << endl;
+    dout(10) << "shutdown i am dispatch, setting stop flag" << endl;
     stop = true;
   } else {
-    dout(1) << "shutdown i am not dispatch, setting stop flag and joining thread." << endl;
+    dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << endl;
     lock.Lock();
     stop = true;
     cond.Signal();