]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* prelim work for EOpen journaling
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 4 Apr 2007 20:07:32 +0000 (20:07 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 4 Apr 2007 20:07:32 +0000 (20:07 +0000)
* cache rejoin rewrite
* export dir prep cleanup
* thread cleanup

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1336 29311d96-e01e-0410-9327-a35deaab8ce9

24 files changed:
branches/sage/cephmds2/TODO
branches/sage/cephmds2/common/Thread.h
branches/sage/cephmds2/common/Timer.h
branches/sage/cephmds2/config.cc
branches/sage/cephmds2/mds/CInode.cc
branches/sage/cephmds2/mds/CInode.h
branches/sage/cephmds2/mds/FileLock.h
branches/sage/cephmds2/mds/Locker.cc
branches/sage/cephmds2/mds/LogEvent.cc
branches/sage/cephmds2/mds/LogEvent.h
branches/sage/cephmds2/mds/MDCache.cc
branches/sage/cephmds2/mds/MDCache.h
branches/sage/cephmds2/mds/MDLog.cc
branches/sage/cephmds2/mds/MDLog.h
branches/sage/cephmds2/mds/Migrator.cc
branches/sage/cephmds2/mds/Server.cc
branches/sage/cephmds2/mds/SimpleLock.h
branches/sage/cephmds2/mds/events/EOpen.h [new file with mode: 0644]
branches/sage/cephmds2/mds/events/EUnlink.h [deleted file]
branches/sage/cephmds2/mds/journal.cc
branches/sage/cephmds2/messages/MExportDirPrep.h
branches/sage/cephmds2/messages/MMDSCacheRejoin.h
branches/sage/cephmds2/msg/Message.cc
branches/sage/cephmds2/msg/Message.h

index 8f705b5521d55aec6bf91c56d901e3fe6b611d1a..3ef48543d45bb477c8f0e162d517c4fc9a73c579 100644 (file)
@@ -54,6 +54,16 @@ mds
     - handled by individual MDSCacheObject _finish()'s
 
 - properly recover lock state on rejoin...
+  - recovering mds rejoins replicas it pulled out of its journal
+    - replicas will tell it when they hold an xlock
+  - surviving mds rejoins replicas from a recovering mds
+    - will tell auth if it holds an xlock
+- recovering open files
+  - need to journal EOpen
+  - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs
+  - need to batch EOpen events when rejournaling to avoid looping
+  - recovery will either have inode (from EOpen), or will provide path+cap to reassert open state.
+    - path+cap window will require fetching and metadata from disk before doing the rejoin
 
 - clientmap request history
   - hmm, so we need completion codes, but only after recovery.
index 8565ce9effd925e8a44d38e7291bf35c0c98b237..2d8a393498479fbf579f0858ed819b77e4f50bf9 100644 (file)
@@ -16,6 +16,7 @@
 #define __THREAD_H
 
 #include <pthread.h>
+#include <errno.h>
 
 class Thread {
  private:
@@ -25,9 +26,7 @@ class Thread {
   Thread() : thread_id(0) {}
   virtual ~Thread() {}
 
-  pthread_t &get_thread_id() { return thread_id; }
-  bool is_started() { return thread_id != 0; }
-
+ protected:
   virtual void *entry() = 0;
 
  private:
@@ -36,27 +35,42 @@ class Thread {
   }
 
  public:
+  pthread_t &get_thread_id() { return thread_id; }
+  bool is_started() { return thread_id != 0; }
+  bool am_self() { return (pthread_self() == thread_id); }
+
   int create() {
     return pthread_create( &thread_id, NULL, _entry_func, (void*)this );
   }
-
-  bool am_self() {
-    return (pthread_self() == thread_id);
-  }
-
   int join(void **prval = 0) {
-    assert(thread_id);
-    //if (thread_id == 0) return -1;   // never started.
-
+    if (thread_id == 0) {
+      cerr << "WARNING: join on thread that was never started" << endl;
+      //assert(0);
+      return -EINVAL;   // never started.
+    }
+    
     int status = pthread_join(thread_id, prval);
-    if (status == 0) 
-      thread_id = 0;
-    else {
-      cout << "join status = " << status << endl;
-      assert(0);
+    if (status != 0) {
+      switch (status) {
+      case -EINVAL:
+       cerr << "thread " << thread_id << " join status = EINVAL" << endl;
+       break;
+      case -ESRCH:
+       cerr << "thread " << thread_id << " join status = ESRCH" << endl;
+       assert(0);
+       break;
+      case -EDEADLK:
+       cerr << "thread " << thread_id << " join status = EDEADLK" << endl;
+       break;
+      default:
+       cerr << "thread " << thread_id << " join status = " << status << endl;
+      }
+      assert(0); // none of these should happen.
     }
+    thread_id = 0;
     return status;
   }
+
 };
 
 #endif
index 88d9929ac5ae1725ed8e85312db18e96a28ca295..80470c361573735c10a3fbc656cf3389fb2dc265 100644 (file)
@@ -53,15 +53,11 @@ class Timer {
   map< utime_t, set<Context*> >  scheduled;    // time -> (context ...)
   hash_map< Context*, utime_t >  event_times;  // event -> time
 
-  // get time of the next event
-  //Context* get_next_scheduled(utime_t& when);
-
   bool get_next_due(utime_t &when);
 
   void register_timer();  // make sure i get a callback
   void cancel_timer();    // make sure i get a callback
 
-  //pthread_t thread_id;
   bool      thread_stop;
   Mutex     lock;
   bool      timed_sleep;
index ddbc606b726c100079d71cde3e78918e94575106..50937c5edc294eab30d5470ba67fecc92cf35022 100644 (file)
@@ -164,7 +164,7 @@ md_config_t g_conf = {
   mds_decay_halflife: 30,
 
   mds_beacon_interval: 5.0,
-  mds_beacon_grace: 10.0,
+  mds_beacon_grace: 15.0,
 
   mds_log: true,
   mds_log_max_len:  MDS_CACHE_SIZE / 3,
index 7496852637c48435a4400f74061be85e2c328608..a0ee7cb0251d94757b2aa098c240cf78a017b81b 100644 (file)
@@ -94,33 +94,6 @@ void CInode::print(ostream& out)
 
 
 // ====== CInode =======
-CInode::CInode(MDCache *c, bool auth) : 
-  authlock(this, LOCK_OTYPE_IAUTH, WAIT_AUTHLOCK_OFFSET),
-  linklock(this, LOCK_OTYPE_ILINK, WAIT_LINKLOCK_OFFSET),
-  dirfragtreelock(this, LOCK_OTYPE_IDIRFRAGTREE, WAIT_DIRFRAGTREELOCK_OFFSET),
-  filelock(this, LOCK_OTYPE_IFILE, WAIT_FILELOCK_OFFSET)
-{
-  mdcache = c;
-
-  //num_parents = 0;
-  parent = NULL;
-  
-  auth_pins = 0;
-  nested_auth_pins = 0;
-  //num_request_pins = 0;
-
-  state = 0;  
-
-  if (auth) state_set(STATE_AUTH);
-}
-
-CInode::~CInode() {
-  for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
-       p != dirfrags.end();
-       ++p)
-    delete p->second;
-}
-
 
 // dirfrags
 
@@ -135,13 +108,15 @@ frag_t CInode::pick_dirfrag(const string& dn)
 
 void CInode::get_dirfrags(list<CDir*>& ls) 
 {
+  // all dirfrags
   for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
        p != dirfrags.end();
        ++p)
     ls.push_back(p->second);
 }
 void CInode::get_nested_dirfrags(list<CDir*>& ls) 
-{  // same subtree
+{  
+  // dirfrags in same subtree
   for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
        p != dirfrags.end();
        ++p)
@@ -149,7 +124,8 @@ void CInode::get_nested_dirfrags(list<CDir*>& ls)
       ls.push_back(p->second);
 }
 void CInode::get_subtree_dirfrags(list<CDir*>& ls) 
-{  // new subtree
+{ 
+  // dirfrags that are roots of new subtrees
   for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
        p != dirfrags.end();
        ++p)
index 4f600f6cfb963cefeb2e60377285a2ddc983745a..a1021d3bb81e1b9a0281585d263991ced3a5a702 100644 (file)
@@ -55,7 +55,7 @@ class CInode : public MDSCacheObject {
   //static const int PIN_REPLICATED =     1;
   static const int PIN_DIR =        2;
   static const int PIN_PROXY =      5;  // can't expire yet
-  static const int PIN_CAPS =       7;  // local fh's
+  static const int PIN_CAPS =       7;  // client caps
   static const int PIN_AUTHPIN =    8;
   static const int PIN_IMPORTING =  -9;  // importing
   static const int PIN_RENAMESRC = 11;  // pinned on dest for foreign rename
@@ -122,11 +122,12 @@ class CInode : public MDSCacheObject {
   string           symlink;      // symlink dest, if symlink
   fragtree_t       dirfragtree;  // dir frag tree, if any
 
-  frag_t pick_dirfrag(const string &dn);
+  off_t            last_open_journaled;  // log offset for the last journaled EOpen
 
   // -- cache infrastructure --
   map<frag_t,CDir*> dirfrags; // cached dir fragments
 
+  frag_t pick_dirfrag(const string &dn);
   CDir* get_dirfrag(frag_t fg) {
     if (dirfrags.count(fg)) 
       return dirfrags[fg];
@@ -175,8 +176,23 @@ protected:
 
  public:
   // ---------------------------
-  CInode(MDCache *c, bool auth=true);
-  ~CInode();
+  CInode(MDCache *c, bool auth=true) : 
+    mdcache(c),
+    last_open_journaled(0),
+    parent(0),
+    replica_caps_wanted(0),
+    auth_pins(0), nested_auth_pins(0),
+    authlock(this, LOCK_OTYPE_IAUTH, WAIT_AUTHLOCK_OFFSET),
+    linklock(this, LOCK_OTYPE_ILINK, WAIT_LINKLOCK_OFFSET),
+    dirfragtreelock(this, LOCK_OTYPE_IDIRFRAGTREE, WAIT_DIRFRAGTREELOCK_OFFSET),
+    filelock(this, LOCK_OTYPE_IFILE, WAIT_FILELOCK_OFFSET)
+  {
+    state = 0;  
+    if (auth) state_set(STATE_AUTH);
+  };
+  ~CInode() {
+    close_dirfrags();
+  }
   
 
   // -- accessors --
index 192f0b9b38f50e17912f1c15e381f5ec9313417a..cef3384010c0ae39e0ed98b9cac36e5ad8d24596 100644 (file)
@@ -28,22 +28,22 @@ using namespace std;
 //  C = cache reads, R = read, W = write, A = append, B = buffer writes, L = lazyio
 
 //                               -----auth--------   ---replica-------
-#define LOCK_SYNC_    0  // AR   R . / C R . . . L   R . / C R . . . L   stat()
-#define LOCK_GSYNCL  -11 // A    . . / C ? . . . L                       loner -> sync    (*) FIXME: let old loner keep R, somehow...
-#define LOCK_GSYNCM  -12 // A    . . / . R . . . L
+#define LOCK_SYNC_    1  // AR   R . / C R . . . L   R . / C R . . . L   stat()
+#define LOCK_GSYNCL  -12 // A    . . / C ? . . . L                       loner -> sync    (*) FIXME: let old loner keep R, somehow...
+#define LOCK_GSYNCM  -13 // A    . . / . R . . . L
 
-#define LOCK_LOCK_    1  // AR   R W / C . . . . .   . . / C . . . . .   truncate()
-#define LOCK_GLOCKR_ -2  // AR   R . / C . . . . .   . . / C . . . . .
-#define LOCK_GLOCKL  -3  // A    . . / . . . . . .                       loner -> lock
-#define LOCK_GLOCKM  -4  // A    . . / . . . . . .
+#define LOCK_LOCK_    2  // AR   R W / C . . . . .   . . / C . . . . .   truncate()
+#define LOCK_GLOCKR_ -3  // AR   R . / C . . . . .   . . / C . . . . .
+#define LOCK_GLOCKL  -4  // A    . . / . . . . . .                       loner -> lock
+#define LOCK_GLOCKM  -5  // A    . . / . . . . . .
 
-#define LOCK_MIXED    5  // AR   . . / . R W A . L   . . / . R . . . L
-#define LOCK_GMIXEDR -6  // AR   R . / . R . . . L   . . / . R . . . L 
-#define LOCK_GMIXEDL -7  // A    . . / . . . . . L                       loner -> mixed
+#define LOCK_MIXED    6  // AR   . . / . R W A . L   . . / . R . . . L
+#define LOCK_GMIXEDR -7  // AR   R . / . R . . . L   . . / . R . . . L 
+#define LOCK_GMIXEDL -8  // A    . . / . . . . . L                       loner -> mixed
 
-#define LOCK_LONER    8  // A    . . / C R W A B L        (lock)      
-#define LOCK_GLONERR - // A    . . / . R . . . L
-#define LOCK_GLONERM -10 // A    . . / . R W A . L
+#define LOCK_LONER    9  // A    . . / C R W A B L        (lock)      
+#define LOCK_GLONERR -10 // A    . . / . R . . . L
+#define LOCK_GLONERM -11 // A    . . / . R W A . L
 
 
 //   4 stable
@@ -207,8 +207,9 @@ class FileLock : public SimpleLock {
 
 inline ostream& operator<<(ostream& out, FileLock& l) 
 {
-  out << "(" << get_lock_type_name(l.get_type())
-      << " " << get_filelock_state_name(l.get_state());
+  out << "(";
+  //out << get_lock_type_name(l.get_type()) << " ";
+  out << get_filelock_state_name(l.get_state());
   if (!l.get_gather_set().empty()) out << " g=" << l.get_gather_set();
   if (l.get_num_rdlock()) 
     out << " r=" << l.get_num_rdlock();
index c6977e8a1a3730555bdedf57de6a6aff1f10f445..68d25a94078b68a7b84368724e5ec3bbb8e73a52 100644 (file)
@@ -26,7 +26,6 @@
 
 #include "events/EString.h"
 #include "events/EUpdate.h"
-#include "events/EUnlink.h"
 
 #include "msg/Messenger.h"
 
index 43ac0ec9647341cd6ae0a513bccc756833266db3..de0e6eeb3afb8205d523aaa9b0a499acd7dcefbd 100644 (file)
 
 // events i know of
 #include "events/EString.h"
-#include "events/EImportMap.h"
-#include "events/EMetaBlob.h"
-#include "events/EUpdate.h"
-#include "events/ESlaveUpdate.h"
-#include "events/EUnlink.h"
+
 #include "events/EMount.h"
 #include "events/EClientMap.h"
-#include "events/EAnchor.h"
-#include "events/EAnchorClient.h"
-#include "events/EAlloc.h"
-#include "events/EPurgeFinish.h"
+#include "events/EImportMap.h"
 #include "events/EExport.h"
 #include "events/EImportStart.h"
 #include "events/EImportFinish.h"
 
+#include "events/EUpdate.h"
+#include "events/ESlaveUpdate.h"
+#include "events/EOpen.h"
+
+#include "events/EAlloc.h"
+#include "events/EPurgeFinish.h"
+
+#include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
+
+
 LogEvent *LogEvent::decode(bufferlist& bl)
 {
   // parse type, length
@@ -48,20 +52,24 @@ LogEvent *LogEvent::decode(bufferlist& bl)
   // create event
   LogEvent *le;
   switch (type) {
-  case EVENT_STRING: le = new EString(); break;
+  case EVENT_STRING: le = new EString; break;
+
+  case EVENT_MOUNT: le = new EMount; break;
+  case EVENT_CLIENTMAP: le = new EClientMap; break;
   case EVENT_IMPORTMAP: le = new EImportMap; break;
-  case EVENT_UPDATE: le = new EUpdate; break;
-  case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
-  case EVENT_UNLINK: le = new EUnlink(); break;
-  case EVENT_PURGEFINISH: le = new EPurgeFinish(); break;
-  case EVENT_MOUNT: le = new EMount(); break;
-  case EVENT_CLIENTMAP: le = new EClientMap(); break;
-  case EVENT_ANCHOR: le = new EAnchor(); break;
-  case EVENT_ANCHORCLIENT: le = new EAnchorClient(); break;
-  case EVENT_ALLOC: le = new EAlloc(); break;
   case EVENT_EXPORT: le = new EExport; break;
   case EVENT_IMPORTSTART: le = new EImportStart; break;
   case EVENT_IMPORTFINISH: le = new EImportFinish; break;
+
+  case EVENT_UPDATE: le = new EUpdate; break;
+  case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
+  case EVENT_OPEN: le = new EOpen; break;
+
+  case EVENT_ALLOC: le = new EAlloc; break;
+  case EVENT_PURGEFINISH: le = new EPurgeFinish; break;
+
+  case EVENT_ANCHOR: le = new EAnchor; break;
+  case EVENT_ANCHORCLIENT: le = new EAnchorClient; break;
   default:
     dout(1) << "uh oh, unknown log event type " << type << endl;
     assert(0);
index 8138ab200845502b9a971516f1c515635f934d23..f4791dbc7ddca1157532ed423d82d42139c53412 100644 (file)
 
 #define EVENT_STRING       1
 
-#define EVENT_INODEUPDATE  2
-#define EVENT_DIRUPDATE    3
-
-#define EVENT_IMPORTMAP    4
-#define EVENT_UPDATE       5
-#define EVENT_SLAVEUPDATE  6
-
 #define EVENT_MOUNT        7
 #define EVENT_CLIENTMAP    8
+#define EVENT_IMPORTMAP    2
+#define EVENT_EXPORT       30
+#define EVENT_IMPORTSTART  31
+#define EVENT_IMPORTFINISH 32
 
-#define EVENT_ALLOC        10
-#define EVENT_MKNOD        11
-#define EVENT_MKDIR        12
-#define EVENT_LINK         13
+#define EVENT_UPDATE       3
+#define EVENT_SLAVEUPDATE  4
+#define EVENT_OPEN         5
 
-#define EVENT_UNLINK       20
-#define EVENT_RMDIR        21
+#define EVENT_ALLOC        10
 #define EVENT_PURGEFINISH  22
 
-#define EVENT_EXPORT  30
-#define EVENT_IMPORTSTART  31
-#define EVENT_IMPORTFINISH 32
-
 #define EVENT_ANCHOR       40
 #define EVENT_ANCHORCLIENT 41
 
index 41547e8f6dae76f24439457c83cd283af4122a99..044670253049b2fb8c3a5bb34c5b2f373e07cda9 100644 (file)
 #include "events/EUpdate.h"
 #include "events/ESlaveUpdate.h"
 #include "events/EString.h"
-#include "events/EUnlink.h"
 #include "events/EPurgeFinish.h"
 
 #include "messages/MGenericMessage.h"
 
 #include "messages/MMDSImportMap.h"
 #include "messages/MMDSCacheRejoin.h"
-#include "messages/MMDSCacheRejoinAck.h"
 
 #include "messages/MDiscover.h"
 #include "messages/MDiscoverReply.h"
@@ -1303,7 +1301,7 @@ void MDCache::recalc_auth_bits()
  * rejoin phase!
  * we start out by sending rejoins to everyone in the recovery set.
  *
- * if _were_ are rejoining, send for all regions in our cache.
+ * if we are rejoin, send for all regions in our cache.
  * if we are active|stopping, send only to nodes that are are rejoining.
  */
 void MDCache::send_cache_rejoins()
@@ -1320,12 +1318,12 @@ void MDCache::send_cache_rejoins()
     if (*p == mds->get_nodeid())  continue;  // nothing to myself!
     if (mds->is_rejoin() ||
        mds->mdsmap->is_rejoin(*p))
-      rejoins[*p] = new MMDSCacheRejoin;
+      rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_REJOIN);
   }    
 
   assert(!migrator->is_importing());
   assert(!migrator->is_exporting());
-
+  
   // check all subtrees
   for (map<CDir*, set<CDir*> >::iterator p = subtrees.begin();
        p != subtrees.end();
@@ -1353,7 +1351,7 @@ void MDCache::send_cache_rejoins()
   }
 
   // nothing?
-  if (rejoins.empty()) {
+  if (mds->is_rejoin() && rejoins.empty()) {
     dout(10) << "nothing to rejoin, going active" << endl;
     mds->set_want_state(MDSMap::STATE_ACTIVE);
   }
@@ -1363,8 +1361,12 @@ void MDCache::send_cache_rejoins()
 void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
 {
   dout(10) << "cache_rejoin_walk " << *dir << endl;
-  rejoin->add_dirfrag(dir->dirfrag());
 
+  //if (mds->is_rejoin()) 
+  rejoin->add_weak_dirfrag(dir->dirfrag());
+  //else
+  //rejoin->add_strong_dirfrag(dir->dirfrag(), dir->get_replica_nonce());
+  
   list<CDir*> nested;  // finish this dir, then do nested items
   
   // walk dentries
@@ -1372,13 +1374,43 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
        p != dir->items.end();
        ++p) {
     // dentry
-    rejoin->add_dentry(dir->dirfrag(), p->first);
+    CDentry *dn = p->second;
+    if (mds->is_rejoin())
+      rejoin->add_weak_dentry(dir->dirfrag(), p->first);
+    else {
+      rejoin->add_strong_dentry(dir->dirfrag(), p->first,
+                               dn->get_replica_nonce(),
+                               dn->lock.get_state());
+      if (dn->lock.is_xlocked())
+       rejoin->add_dentry_xlock(dir->dirfrag(), p->first, 
+                                dn->lock.get_xlocked_by()->reqid);
+    }
     
     // inode?
-    if (p->second->is_primary() && p->second->get_inode()) {
-      CInode *in = p->second->get_inode();
-      rejoin->add_inode(in->ino(), 
-                       in->get_caps_wanted());
+    if (dn->is_primary() && dn->get_inode()) {
+      CInode *in = dn->get_inode();
+      if (mds->is_rejoin() && in->get_caps_wanted() == 0)
+       rejoin->add_weak_inode(in->ino());
+      else {
+       rejoin->add_strong_inode(in->ino(), in->get_replica_nonce(),
+                                in->get_caps_wanted(),
+                                in->authlock.get_state(),
+                                in->linklock.get_state(),
+                                in->dirfragtreelock.get_state(),
+                                in->filelock.get_state());
+       if (in->authlock.is_xlocked())
+         rejoin->add_inode_xlock(in->ino(), in->authlock.get_type(),
+                                 in->authlock.get_xlocked_by()->reqid);
+       if (in->linklock.is_xlocked())
+         rejoin->add_inode_xlock(in->ino(), in->linklock.get_type(),
+                                 in->linklock.get_xlocked_by()->reqid);
+       if (in->dirfragtreelock.is_xlocked())
+         rejoin->add_inode_xlock(in->ino(), in->dirfragtreelock.get_type(),
+                                 in->dirfragtreelock.get_xlocked_by()->reqid);
+       if (in->filelock.is_xlocked())
+         rejoin->add_inode_xlock(in->ino(), in->filelock.get_type(),
+                                 in->filelock.get_xlocked_by()->reqid);
+      }
       
       // dirfrags in this subtree?
       list<CDir*> dfs;
@@ -1401,7 +1433,6 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
 
 /*
  * i got a rejoin.
- * 
  *  - reply with the lockstate
  *
  * if i am active|stopping, 
@@ -1409,30 +1440,59 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
  */
 void MDCache::handle_cache_rejoin(MMDSCacheRejoin *m)
 {
-  dout(7) << "handle_cache_rejoin from " << m->get_source() << endl;
+  dout(7) << "handle_cache_rejoin " << *m << " from " << m->get_source() << endl;
+
+  switch (m->op) {
+  case MMDSCacheRejoin::OP_REJOIN:
+    handle_cache_rejoin_rejoin(m);
+    break;
+
+  case MMDSCacheRejoin::OP_ACK:
+    handle_cache_rejoin_ack(m);
+    break;
+
+  case MMDSCacheRejoin::OP_MISSING:
+    handle_cache_rejoin_missing(m);
+    break;
+
+  case MMDSCacheRejoin::OP_FULL:
+    handle_cache_rejoin_full(m);
+    break;
+
+  default: 
+    assert(0);
+  }
+  delete m;
+}
+
+void MDCache::handle_cache_rejoin_rejoin(MMDSCacheRejoin *m)
+{
   int from = m->get_source().num();
 
-  MMDSCacheRejoinAck *ack = new MMDSCacheRejoinAck;
+  // do immediate ack?
+  MMDSCacheRejoin *ack = 0;
+  MMDSCacheRejoin *missing = 0;
 
   if (mds->is_active() || mds->is_stopping()) {
-    dout(10) << "removing stale cache replicas" << endl;
+    dout(10) << "i am active.  removing stale cache replicas" << endl;
+    
     // first, scour cache of replica references
     for (hash_map<inodeno_t,CInode*>::iterator p = inode_map.begin();
         p != inode_map.end();
         ++p) {
       // inode
       CInode *in = p->second;
-      if (in->is_replica(from) && m->inodes.count(p->first) == 0) {
+      if (in->is_replica(from) && m->weak_inodes.count(p->first) == 0) {
        inode_remove_replica(in, from);
        dout(10) << " rem " << *in << endl;
       }
-
+      
       // dentry
       if (in->parent) {
        CDentry *dn = in->parent;
        if (dn->is_replica(from) &&
-           (m->dentries.count(dn->get_dir()->dirfrag()) == 0 ||
-            m->dentries[dn->get_dir()->dirfrag()].count(dn->get_name()) == 0)) {
+           (m->weak_dentries.count(dn->get_dir()->dirfrag()) == 0 ||
+            m->weak_dentries[dn->get_dir()->dirfrag()].count(dn->get_name()) == 0)) {
          dn->remove_replica(from);
          dout(10) << " rem " << *dn << endl;
        }
@@ -1445,87 +1505,179 @@ void MDCache::handle_cache_rejoin(MMDSCacheRejoin *m)
           p != dfs.end();
           ++p) {
        CDir *dir = *p;
-       if (dir->is_replica(from) && m->dirfrags.count(dir->dirfrag()) == 0) {
+       if (dir->is_replica(from) && m->weak_dirfrags.count(dir->dirfrag()) == 0) {
          dir->remove_replica(from);
          dout(10) << " rem " << *dir << endl;
        }
       }
     }
-  } else {
-    assert(mds->is_rejoin());
+    
+    // do immediate ack.
+    ack = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
   }
-
+  
   // dirs
-  for (set<dirfrag_t>::iterator p = m->dirfrags.begin();
-       p != m->dirfrags.end();
+  for (set<dirfrag_t>::iterator p = m->weak_dirfrags.begin();
+       p != m->weak_dirfrags.end();
        ++p) {
     CDir *dir = get_dirfrag(*p);
-    assert(dir);
-    int nonce = dir->add_replica(from);
-    dout(10) << " has " << *dir << endl;
-    ack->add_dirfrag(*p, nonce);
+    if (dir) {
+      int nonce = dir->add_replica(from);
+      dout(10) << " have " << *dir << endl;
+      if (ack) 
+       ack->add_strong_dirfrag(*p, nonce);
     
-    // dentries
-    for (set<string>::iterator q = m->dentries[*p].begin();
-        q != m->dentries[*p].end();
-        ++q) {
-      CDentry *dn = dir->lookup(*q);
-      assert(dn);
-      int nonce = dn->add_replica(from);
-      dout(10) << " has " << *dn << endl;
-      ack->add_dentry(*p, *q, dn->lock.get_state(), nonce);
+      // dentries
+      for (set<string>::iterator q = m->weak_dentries[*p].begin();
+          q != m->weak_dentries[*p].end();
+          ++q) {
+       CDentry *dn = dir->lookup(*q);
+       if (dn) {
+         int nonce = dn->add_replica(from);
+         dout(10) << " have " << *dn << endl;
+         ack->add_strong_dentry(*p, *q, dn->lock.get_state(), nonce);
+       } else {
+         dout(10) << " missing " << *p << " " << *q << endl;
+         if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
+         missing->add_weak_dentry(*p, *q);
+       }
+       if (ack)
+         ack->add_strong_dentry(*p, *q, nonce, dn->lock.get_state());
+      }
+    } else {
+      dout(10) << " missing " << *p << endl;
+      if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
+      missing->add_weak_dirfrag(*p);
+      
+      // dentries
+      for (set<string>::iterator q = m->weak_dentries[*p].begin();
+          q != m->weak_dentries[*p].end();
+          ++q) 
+       missing->add_weak_dentry(*p, *q);
     }
   }
 
   // inodes
-  for (map<inodeno_t,int>::iterator p = m->inodes.begin();
-       p != m->inodes.end();
+  for (set<inodeno_t>::iterator p = m->weak_inodes.begin();
+       p != m->weak_inodes.end();
        ++p) {
-    CInode *in = get_inode(p->first);
-    assert(in);
-    int nonce = in->add_replica(from);
-    if (p->second)
-      in->mds_caps_wanted[from] = p->second;
-    else
+    CInode *in = get_inode(*p);
+    if (in) {
+      int nonce = in->add_replica(from);
       in->mds_caps_wanted.erase(from);
-    in->authlock.remove_gather(from);  // just in case
-    in->linklock.remove_gather(from);  // just in case
-    in->dirfragtreelock.remove_gather(from);  // just in case
-    in->filelock.remove_gather(from);  // just in case
-    dout(10) << " has " << *in << endl;
-    ack->add_inode(p->first, 
-                  in->authlock.get_replica_state(), 
-                  in->linklock.get_replica_state(), 
-                  in->dirfragtreelock.get_replica_state(), 
-                  in->filelock.get_replica_state(), 
-                  nonce);
+      in->authlock.remove_gather(from);  // just in case
+      in->linklock.remove_gather(from);  // just in case
+      in->dirfragtreelock.remove_gather(from);  // just in case
+      in->filelock.remove_gather(from);  // just in case
+      dout(10) << " have (weak) " << *in << endl;
+      if (ack) 
+       ack->add_strong_inode(in->ino(), 
+                             nonce,
+                             0,
+                             in->authlock.get_replica_state(), 
+                             in->linklock.get_replica_state(), 
+                             in->dirfragtreelock.get_replica_state(), 
+                             in->filelock.get_replica_state());
+    } else {
+      dout(10) << " missing " << *p << endl;
+      if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
+      missing->add_weak_inode(*p);
+    }
   }
 
-  // send ack
-  mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+  // strong inodes too?
+  for (map<inodeno_t, MMDSCacheRejoin::inode_strong>::iterator p = m->strong_inodes.begin();
+       p != m->strong_inodes.end();
+       ++p) {
+    CInode *in = get_inode(p->first);
+    if (in) {
+      dout(10) << " have (strong) " << *in << endl;
+      int nonce = in->add_replica(from);
+      if (p->second.caps_wanted)
+       in->mds_caps_wanted[from] = p->second.caps_wanted;
+      else
+       in->mds_caps_wanted.erase(from);
+      in->authlock.remove_gather(from);  // just in case
+      in->linklock.remove_gather(from);  // just in case
+      in->dirfragtreelock.remove_gather(from);  // just in case
+      in->filelock.remove_gather(from);  // just in case
+      dout(10) << " have (weak) " << *in << endl;
+      if (ack) 
+       ack->add_strong_inode(in->ino(), 
+                             nonce,
+                             0,
+                             in->authlock.get_replica_state(), 
+                             in->linklock.get_replica_state(), 
+                             in->dirfragtreelock.get_replica_state(), 
+                             in->filelock.get_replica_state());
+    } else {
+      dout(10) << " missing " << p->first << endl;
+      if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
+      missing->add_weak_inode(p->first);
+    }
+  }
   
-  delete m;
+  // xlocks
+  for (list<MMDSCacheRejoin::inode_xlock>::iterator p = m->xlocked_inodes.begin();
+       p != m->xlocked_inodes.end();
+       ++p) {
+    CInode *in = get_inode(p->ino);
+    if (!in) continue;  // already missing, from strong_inodes list above.
+    
+    dout(10) << " inode xlock by " << p->reqid << " on " << *in << endl;
+    assert(0);    
+    //SimpleLock *lock = in->get_lock(p->locktype);
+    // .. FIXME IMPLEMENT ME ..
+      
+    
+  }
+  for (map<dirfrag_t, map<string, metareqid_t> >::iterator p = m->xlocked_dentries.begin();
+       p != m->xlocked_dentries.end();
+       ++p) {
+    CDir *dir = get_dirfrag(p->first);
+    if (!dir) continue;  // already missing, from above.
+    for (map<string, metareqid_t>::iterator q = p->second.begin();
+        q != p->second.end();
+        ++q) {
+      CDentry *dn = dir->lookup(q->first);
+      if (!dn) continue;  // already missing, from above.
+      dout(10) << " dn xlock by " << q->second << " on " << *dn << endl;
+      
+      // FIXME IMPLEMENT ME
+      assert(0);
+    }
+  }
+  
+  // send ack?
+  if (ack)
+    mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+  else
+    want_rejoin_ack.insert(from);
+
+  // send missing?
+  if (missing)
+    mds->send_message_mds(missing, from, MDS_PORT_CACHE);
 }
 
 
-void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m)
+void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *m)
 {
   dout(7) << "handle_cache_rejoin_ack from " << m->get_source() << endl;
   int from = m->get_source().num();
   
   // dirs
-  for (list<MMDSCacheRejoinAck::dirinfo>::iterator p = m->dirfrags.begin();
-       p != m->dirfrags.end();
+  for (map<dirfrag_t, MMDSCacheRejoin::dirfrag_strong>::iterator p = m->strong_dirfrags.begin();
+       p != m->strong_dirfrags.end();
        ++p) {
-    CDir *dir = get_dirfrag(p->dirfrag);
+    CDir *dir = get_dirfrag(p->first);
     assert(dir);
 
-    dir->set_replica_nonce(p->nonce);
+    dir->set_replica_nonce(p->second.nonce);
     dout(10) << " got " << *dir << endl;
 
     // dentries
-    for (map<string,MMDSCacheRejoinAck::dninfo>::iterator q = m->dentries[p->dirfrag].begin();
-        q != m->dentries[p->dirfrag].end();
+    for (map<string,MMDSCacheRejoin::dn_strong>::iterator q = m->strong_dentries[p->first].begin();
+        q != m->strong_dentries[p->first].end();
         ++q) {
       CDentry *dn = dir->lookup(q->first);
       assert(dn);
@@ -1536,25 +1688,25 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m)
   }
 
   // inodes
-  for (list<MMDSCacheRejoinAck::inodeinfo>::iterator p = m->inodes.begin();
-       p != m->inodes.end();
+  for (map<inodeno_t, MMDSCacheRejoin::inode_strong>::iterator p = m->strong_inodes.begin();
+       p != m->strong_inodes.end();
        ++p) {
-    CInode *in = get_inode(p->ino);
+    CInode *in = get_inode(p->first);
     assert(in);
-    in->set_replica_nonce(p->nonce);
-    in->authlock.set_state(p->authlock);
-    in->linklock.set_state(p->linklock);
-    in->dirfragtreelock.set_state(p->dirfragtreelock);
-    in->filelock.set_state(p->filelock);
+    in->set_replica_nonce(p->second.nonce);
+    in->authlock.set_state(p->second.authlock);
+    in->linklock.set_state(p->second.linklock);
+    in->dirfragtreelock.set_state(p->second.dirfragtreelock);
+    in->filelock.set_state(p->second.filelock);
     dout(10) << " got " << *in << endl;
   }
 
-  delete m;
-
   // done?
   rejoin_ack_gather.erase(from);
   if (rejoin_ack_gather.empty()) {
     dout(7) << "all done, going active!" << endl;
+    send_cache_rejoin_acks();
+
     show_subtrees();
     show_cache();
     mds->set_want_state(MDSMap::STATE_ACTIVE);
@@ -1565,6 +1717,68 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m)
 }
 
 
+void MDCache::handle_cache_rejoin_missing(MMDSCacheRejoin *m)
+{
+  dout(7) << "handle_cache_rejoin_missing from " << m->get_source() << endl;
+
+  MMDSCacheRejoin *full = new MMDSCacheRejoin(MMDSCacheRejoin::OP_FULL);
+
+  // dirs
+  for (set<dirfrag_t>::iterator p = m->weak_dirfrags.begin();
+       p != m->weak_dirfrags.end();
+       ++p) {
+    CDir *dir = get_dirfrag(*p);
+    assert(dir);
+    dout(10) << " sending " << *dir << endl;
+    
+    // dentries
+    for (set<string>::iterator q = m->weak_dentries[*p].begin();
+        q != m->weak_dentries[*p].end();
+        ++q) {
+      CDentry *dn = dir->lookup(*q);
+      assert(dn);
+      dout(10) << " sending " << *dn << endl;
+      if (mds->is_rejoin())
+       full->add_weak_dentry(*p, *q);
+      else
+       full->add_strong_dentry(*p, *q, dn->get_replica_nonce(), dn->lock.get_state());
+    }
+  }
+    
+  // inodes
+  for (set<inodeno_t>::iterator p = m->weak_inodes.begin();
+       p != m->weak_inodes.end();
+       ++p) {
+    CInode *in = get_inode(*p);
+    assert(in);
+    
+    dout(10) << " sending " << *in << endl;
+    full->add_full_inode(in->inode, in->symlink, in->dirfragtree);
+    if (mds->is_rejoin())
+      full->add_weak_inode(in->ino());
+    else
+      full->add_strong_inode(in->ino(),
+                            in->get_replica_nonce(),
+                            in->get_caps_wanted(),
+                            in->authlock.get_replica_state(), 
+                            in->linklock.get_replica_state(), 
+                            in->dirfragtreelock.get_replica_state(), 
+                            in->filelock.get_replica_state());
+  }
+
+  mds->send_message_mds(full, m->get_source().num(), MDS_PORT_CACHE);
+}
+
+void MDCache::handle_cache_rejoin_full(MMDSCacheRejoin *m)
+{
+  assert(0); // write me
+}
+
+void MDCache::send_cache_rejoin_acks()
+{
+  dout(7) << "send_cache_rejoin_acks to " << want_rejoin_ack << endl;
+  
+}
 
 
 
@@ -2398,9 +2612,11 @@ void MDCache::dispatch(Message *m)
   case MSG_MDS_CACHEREJOIN:
     handle_cache_rejoin((MMDSCacheRejoin*)m);
     break;
+    /*
   case MSG_MDS_CACHEREJOINACK:
     handle_cache_rejoin_ack((MMDSCacheRejoinAck*)m);
     break;
+    */
 
 
   case MSG_MDS_DISCOVER:
index fb62b48564089efcd9712fe4ef11298ecd3cb5af..052f380cd381604019d103ab98ea3e32fb6dcd58 100644 (file)
@@ -243,21 +243,29 @@ public:
 
   // -- recovery --
 protected:
+  set<int> recovery_set;
+
   // from EImportStart w/o EImportFinish during journal replay
   map<dirfrag_t, list<dirfrag_t> >            my_ambiguous_imports;  
   // from MMDSImportMaps
   map<int, map<dirfrag_t, list<dirfrag_t> > > other_ambiguous_imports;  
 
-  set<int> recovery_set;
   set<int> wants_import_map;   // nodes i need to send my import map to
   set<int> got_import_map;     // nodes i got import_maps from
-  set<int> rejoin_ack_gather;  // nodes i need a rejoin ack from
   
   void handle_import_map(MMDSImportMap *m);
-  void handle_cache_rejoin(MMDSCacheRejoin *m);
-  void handle_cache_rejoin_ack(MMDSCacheRejoinAck *m);
   void disambiguate_imports();
+
+  set<int> rejoin_gather;      // nodes from whom i need a rejoin
+  set<int> rejoin_ack_gather;  // nodes from whom i need a rejoin ack
+  set<int> want_rejoin_ack;    // nodes to whom i need to send a rejoin ack
+
   void cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin);
+  void handle_cache_rejoin(MMDSCacheRejoin *m);
+  void handle_cache_rejoin_rejoin(MMDSCacheRejoin *m);
+  void handle_cache_rejoin_ack(MMDSCacheRejoin *m);
+  void handle_cache_rejoin_missing(MMDSCacheRejoin *m);
+  void handle_cache_rejoin_full(MMDSCacheRejoin *m);
   void send_cache_rejoin_acks();
   void recalc_auth_bits();
 
index f030d8b76c286481ba3b7b0cfe4b4a470962744b..f577e29417e1e1e659c61041532e2f96fc63484b 100644 (file)
@@ -129,8 +129,7 @@ off_t MDLog::get_write_pos()
 
 
 
-void MDLog::submit_entry( LogEvent *le,
-                         Context *c ) 
+void MDLog::submit_entry( LogEvent *le, Context *c ) 
 {
   if (g_conf.mds_log) {
     dout(5) << "submit_entry " << journaler->get_write_pos() << " : " << *le << endl;
index 4383fd6b6307771c0ad7de64b4109ea70ecfed1a..f06a2eea21427736414403ca86c5d34674b2ac12 100644 (file)
@@ -102,9 +102,13 @@ class MDLog {
   friend class MDCache;
 
   void init_journaler();
+ public:
+  void add_import_map_expire_waiter(Context *c) {
+    import_map_expire_waiters.push_back(c);
+  }
+
 
 
- public:
   // replay state
   map<inodeno_t, set<inodeno_t> >   pending_exports;
 
index 450797605f28402f649866da89e53724592f1abb..1bcdda631abb113105323dee8eb927bd5f38ce19 100644 (file)
@@ -552,10 +552,12 @@ void Migrator::export_frozen(CDir *dir)
   // include spanning tree for all nested exports.
   // these need to be on the destination _before_ the final export so that
   // dir_auth updates on any nested exports are properly absorbed.
+  // this includes inodes and dirfrags included in the subtree, but
+  // only the inodes at the bounds.
   set<inodeno_t> inodes_added;
 
-  // include base dir
-  prep->add_dir( new CDirDiscover(dir, dir->add_replica(dest)) );
+  // include base dirfrag
+  prep->add_dirfrag( new CDirDiscover(dir, dir->add_replica(dest)) );
   
   // check bounds
   for (set<CDir*>::iterator it = bounds.begin();
@@ -580,23 +582,20 @@ void Migrator::export_frozen(CDir *dir)
       // don't repeat ourselves
       if (inodes_added.count(cur->ino())) break;   // did already!
       inodes_added.insert(cur->ino());
-      
-      CDir *parent_dir = cur->get_parent_dir();
 
-      // inode?
+      // inode
       assert(cur->inode->is_auth());
       inode_trace.push_front(cur->inode);
       dout(7) << "  will add " << *cur->inode << endl;
       
-      // include dir?
-      // note: don't replicate ambiguous auth items!  they're
-      //    frozen anyway.
-      if (cur->is_auth() && !cur->is_ambiguous_auth()) {
-        prep->add_dir( new CDirDiscover(cur, cur->add_replica(dest)) );  // yay!
+      // include the dirfrag?  only if it's not the bounding subtree root.
+      if (cur != bound) {
+       assert(cur->is_auth());
+        prep->add_dirfrag( new CDirDiscover(cur, cur->add_replica(dest)) );  // yay!
         dout(7) << "  added " << *cur << endl;
       }
       
-      cur = parent_dir;      
+      cur = cur->get_parent_dir();
     }
 
     for (list<CInode*>::iterator it = inode_trace.begin();
index d8fb638c47adaf0b6bbf5bdd9bbad03305f05010..7a84a758376f9e59bede423eca10325564ce839f 100644 (file)
@@ -35,6 +35,7 @@
 #include "events/EString.h"
 #include "events/EUpdate.h"
 #include "events/EMount.h"
+#include "events/EOpen.h"
 
 #include "include/filepath.h"
 #include "common/Timer.h"
@@ -2568,6 +2569,13 @@ void Server::_do_open(MDRequest *mdr, CInode *cur)
   reply->set_file_caps_seq(cap->get_last_seq());
   reply->set_file_data_version(fdv);
   reply_request(mdr, reply, cur);
+
+  // journal?
+  if (cur->last_open_journaled == 0) {
+    cur->last_open_journaled = mdlog->get_write_pos();
+    mdlog->submit_entry(new EOpen(cur));
+  }
+
 }
 
 
index 3062809428b9d7b484d063a3b1aec8daf84aa5d0..cd27319e99c63f9101d4e56cf3daed19d4c7111c 100644 (file)
@@ -38,13 +38,15 @@ inline const char *get_lock_type_name(int t) {
 }
 
 // -- lock states --
+#define LOCK_UNDEF    0
 //                                auth   rep
-#define LOCK_SYNC     0  // AR   R .    R .
-#define LOCK_LOCK     1  // AR   R W    . .
-#define LOCK_GLOCKR   2  // AR   R .    . .
+#define LOCK_SYNC     1  // AR   R .    R .
+#define LOCK_LOCK     2  // AR   R W    . .
+#define LOCK_GLOCKR   3  // AR   R .    . .
 
 inline const char *get_simplelock_state_name(int n) {
   switch (n) {
+  case LOCK_UNDEF: return "undef";
   case LOCK_SYNC: return "sync";
   case LOCK_LOCK: return "lock";
   case LOCK_GLOCKR: return "glockr";
@@ -242,8 +244,9 @@ public:
 
 inline ostream& operator<<(ostream& out, SimpleLock& l) 
 {
-  out << "(" << get_lock_type_name(l.get_type())
-      << " " << get_simplelock_state_name(l.get_state());
+  out << "(";
+  //out << get_lock_type_name(l.get_type()) << " ";
+  out << get_simplelock_state_name(l.get_state());
   if (!l.get_gather_set().empty()) out << " g=" << l.get_gather_set();
   if (l.get_num_rdlock()) 
     out << " r=" << l.get_num_rdlock();
diff --git a/branches/sage/cephmds2/mds/events/EOpen.h b/branches/sage/cephmds2/mds/events/EOpen.h
new file mode 100644 (file)
index 0000000..729fea9
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MDS_EOPEN_H
+#define __MDS_EOPEN_H
+
+#include "../LogEvent.h"
+#include "EMetaBlob.h"
+
+class EOpen : public LogEvent {
+public:
+  EMetaBlob metablob;
+  inodeno_t ino;
+
+  EOpen() : LogEvent(EVENT_OPEN) { }
+  EOpen(CInode *in) : LogEvent(EVENT_OPEN),
+                     ino(in->ino()) {
+    metablob.add_primary_dentry(in->get_parent_dn(), false);
+  }
+  void print(ostream& out) {
+    out << "open " << metablob;
+  }
+
+  void encode_payload(bufferlist& bl) {
+    ::_encode(ino, bl);
+    metablob._encode(bl);
+  } 
+  void decode_payload(bufferlist& bl, int& off) {
+    ::_decode(ino, bl, off);
+    metablob._decode(bl, off);
+  }
+
+  bool has_expired(MDS *mds);
+  void expire(MDS *mds, Context *c);
+  void replay(MDS *mds);
+};
+
+#endif
diff --git a/branches/sage/cephmds2/mds/events/EUnlink.h b/branches/sage/cephmds2/mds/events/EUnlink.h
deleted file mode 100644 (file)
index 7d97248..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#ifndef __EUNLINK_H
-#define __EUNLINK_H
-
-#include <assert.h>
-#include "config.h"
-#include "include/types.h"
-
-#include "../LogEvent.h"
-#include "EMetaBlob.h"
-
-#include "../CInode.h"
-#include "../CDentry.h"
-#include "../CDir.h"
-
-/// help rewrite me
-
-class EUnlink : public LogEvent {
- protected:
-  version_t dirv;
-  string dname;
-
- public:
-  EMetaBlob metaglob;
-  
-  /*
-  EUnlink(CDir *dir, CDentry* dn, CInode *in) :
-    LogEvent(EVENT_UNLINK),
-    diritrace(dir->inode), 
-    dirv(dir->get_version()),
-    dname(dn->get_name()),
-    inodetrace(in) {}
-  */
-  EUnlink() : LogEvent(EVENT_UNLINK) { }
-  
-  virtual void encode_payload(bufferlist& bl) {
-  /*
-    diritrace.encode(bl);
-    bl.append((char*)&dirv, sizeof(dirv));
-    ::_encode(dname, bl);
-    inodetrace.encode(bl);
-  */
-  }
-  void decode_payload(bufferlist& bl, int& off) {
-    /*
-    diritrace.decode(bl,off);
-    bl.copy(off, sizeof(dirv), (char*)&dirv);
-    off += sizeof(dirv);
-    ::_decode(dname, bl, off);
-    inodetrace.decode(bl, off);
-  */
-  }
-  
-  bool has_expired(MDS *mds);
-  void expire(MDS *mds, Context *c);
-  void replay(MDS *mds);
-};
-
-#endif
index d1e8e0cd178f75953f40108b8ca7ebfb6c4d2d8d..73e13dcab52d282a143fd25acbd10b4fa890964d 100644 (file)
  */
 
 #include "events/EString.h"
+#include "events/EImportMap.h"
+#include "events/EMount.h"
+#include "events/EClientMap.h"
 
 #include "events/EMetaBlob.h"
-#include "events/EAlloc.h"
-#include "events/EAnchor.h"
-#include "events/EAnchorClient.h"
+
 #include "events/EUpdate.h"
 #include "events/ESlaveUpdate.h"
-#include "events/EImportMap.h"
-
-#include "events/EMount.h"
-#include "events/EClientMap.h"
+#include "events/EOpen.h"
 
+#include "events/EAlloc.h"
 #include "events/EPurgeFinish.h"
-#include "events/EUnlink.h"
+
 #include "events/EExport.h"
 #include "events/EImportStart.h"
 #include "events/EImportFinish.h"
 
+#include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
+
 #include "MDS.h"
 #include "MDLog.h"
 #include "MDCache.h"
@@ -633,6 +635,51 @@ void EUpdate::replay(MDS *mds)
 }
 
 
+// ------------------------
+// EOpen
+
+bool EOpen::has_expired(MDS *mds)
+{
+  CInode *in = mds->mdcache->get_inode(ino);
+  if (!in) return true;
+  if (!in->is_any_caps()) return true;
+  if (in->last_open_journaled > get_start_off() ||
+      in->last_open_journaled == 0) return true;
+  return false;
+}
+
+void EOpen::expire(MDS *mds, Context *c)
+{
+  CInode *in = mds->mdcache->get_inode(ino);
+  assert(in);
+
+  dout(10) << "EOpen.expire " << ino
+          << " last_open_journaled " << in->last_open_journaled << endl;
+  
+  // wait?
+  // FIXME this is stupid.
+  if (in->last_open_journaled == get_start_off()) {
+    //||
+    //(get_start_off() < mds->mdlog->last_import_map &&
+    //in->last_open_journaled < mds->mdlog->last_import_map)) {
+    dout(10) << "waiting." << endl;
+    // wait
+    mds->mdlog->add_import_map_expire_waiter(c);
+  } else {
+    // rejournal now.
+    dout(10) << "rejournaling" << endl;
+    in->last_open_journaled = mds->mdlog->get_write_pos();
+    mds->mdlog->submit_entry(new EOpen(in));
+  }
+}
+
+void EOpen::replay(MDS *mds)
+{
+  dout(10) << "EOpen.replay " << ino << endl;
+  metablob.replay(mds);
+}
+
+
 // -----------------------
 // EUpdate
 
@@ -717,48 +764,6 @@ void EImportMap::replay(MDS *mds)
 
 
 
-// -----------------------
-// EUnlink
-
-bool EUnlink::has_expired(MDS *mds)
-{
-  /*
-  // dir
-  CInode *diri = mds->mdcache->get_inode( diritrace.back().inode.ino );
-  CDir *dir = 0;
-  if (diri) dir = diri->dir;
-
-  if (dir && dir->get_last_committed_version() < dirv) return false;
-
-  if (!inodetrace.trace.empty()) {
-    // inode
-    CInode *in = mds->mdcache->get_inode( inodetrace.back().inode.ino );
-    if (in && in->get_last_committed_version() < inodetrace.back().inode.version)
-      return false;
-  }
-  */
-  return true;
-}
-
-void EUnlink::expire(MDS *mds, Context *c)
-{
-  /*
-  CInode *diri = mds->mdcache->get_inode( diritrace.back().inode.ino );
-  CDir *dir = diri->dir;
-  assert(dir);
-  
-  // okay!
-  dout(7) << "commiting dirty (from unlink) dir " << *dir << endl;
-  mds->mdstore->commit_dir(dir, dirv, c);
-  */
-}
-
-void EUnlink::replay(MDS *mds)
-{
-}
-
-
-
 
 // -----------------------
 // EPurgeFinish
index 9959089ae80b9a853554ee8b7b1043db5f11f608..fce07df7958b75124c1d61d5fd48e680197cf619 100644 (file)
@@ -34,7 +34,7 @@ class MExportDirPrep : public Message {
   map<inodeno_t,string>          inode_dentry;
 
   map<inodeno_t,list<frag_t> >   frags_by_ino;
-  map<dirfrag_t,CDirDiscover*>   dirs;
+  map<dirfrag_t,CDirDiscover*>   dirfrags;
 
   set<int>                       bystanders;
 
@@ -53,11 +53,11 @@ class MExportDirPrep : public Message {
   string& get_dentry(inodeno_t ino) {
     return inode_dentry[ino];
   }
-  bool have_dir(dirfrag_t df) {
-    return dirs.count(df);
+  bool have_dirfrag(dirfrag_t df) {
+    return dirfrags.count(df);
   }
   CDirDiscover* get_dirfrag_discover(dirfrag_t df) {
-    return dirs[df];
+    return dirfrags[df];
   }
   set<int> &get_bystanders() { return bystanders; }
 
@@ -76,8 +76,8 @@ class MExportDirPrep : public Message {
          iit != inodes.end();
          iit++)
       delete *iit;
-    for (map<dirfrag_t,CDirDiscover*>::iterator dit = dirs.begin();
-         dit != dirs.end();
+    for (map<dirfrag_t,CDirDiscover*>::iterator dit = dirfrags.begin();
+         dit != dirfrags.end();
          dit++) 
       delete dit->second;
   }
@@ -96,8 +96,8 @@ class MExportDirPrep : public Message {
     inode_dirfrag[in->get_ino()] = df;
     inode_dentry[in->get_ino()] = dentry;
   }
-  void add_dir(CDirDiscover *dir) {
-    dirs[dir->get_dirfrag()] = dir;
+  void add_dirfrag(CDirDiscover *dir) {
+    dirfrags[dir->get_dirfrag()] = dir;
     frags_by_ino[dir->get_dirfrag().ino].push_back(dir->get_dirfrag().frag);
   }
   void add_bystander(int who) {
@@ -143,7 +143,7 @@ class MExportDirPrep : public Message {
     for (int i=0; i<nd; i++) {
       CDirDiscover *dir = new CDirDiscover;
       dir->_decode(payload, off);
-      dirs[dir->get_dirfrag()] = dir;
+      dirfrags[dir->get_dirfrag()] = dir;
     }
     
     ::_decode(bystanders, payload, off);
@@ -174,10 +174,10 @@ class MExportDirPrep : public Message {
     }
 
     // dirs
-    int nd = dirs.size();
+    int nd = dirfrags.size();
     payload.append((char*)&nd, sizeof(int));
-    for (map<dirfrag_t,CDirDiscover*>::iterator dit = dirs.begin();
-         dit != dirs.end();
+    for (map<dirfrag_t,CDirDiscover*>::iterator dit = dirfrags.begin();
+         dit != dirfrags.end();
          dit++)
       dit->second->_encode(payload);
 
index f75900defd2f845558a7c7d6e85d48af291d4ebb..2d1d858958ce80d1e9f8679371d1757868a963bc 100644 (file)
 
 class MMDSCacheRejoin : public Message {
  public:
-  map<inodeno_t,int> inodes; // ino -> caps_wanted
-  set<dirfrag_t> dirfrags;
-  map<dirfrag_t, set<string> > dentries;   // dir -> (dentries...)
+  static const int OP_REJOIN  = 1;  // replica -> auth, i exist.  and maybe my lock state.
+  static const int OP_ACK     = 3;  // auth -> replica, here is your lock state.
+  static const int OP_MISSING = 4;  // auth -> replica, i am missing these items
+  static const int OP_FULL    = 5;  // replica -> auth, here is the full object.
+  static const char *get_opname(int op) {
+    switch (op) {
+    case OP_REJOIN: return "rejoin";
+    case OP_ACK: return "ack";
+    case OP_MISSING: return "missing";
+    case OP_FULL: return "full";
+    default: assert(0);
+    }
+  }
+
+  // -- types --
+  struct inode_strong { 
+    int caps_wanted;
+    int nonce;
+    int authlock;
+    int linklock;
+    int dirfragtreelock;
+    int filelock;
+    inode_strong() {}
+    inode_strong(int n, int cw=0, int a=0, int l=0, int dft=0, int f=0) : 
+      caps_wanted(cw),
+      nonce(n),
+      authlock(a), linklock(l), dirfragtreelock(dft), filelock(f) { }
+  };
+  struct inode_full {
+    inode_t inode;
+    string symlink;
+    fragtree_t dirfragtree;
+    inode_full() {}
+    inode_full(const inode_t& i, const string& s, const fragtree_t& f) :
+      inode(i), symlink(s), dirfragtree(f) {}
+    inode_full(bufferlist& bl, int& off) {
+      ::_decode(inode, bl, off);
+      ::_decode(symlink, bl, off);
+      ::_decode(dirfragtree, bl, off);
+    }
+    void _encode(bufferlist& bl) {
+      ::_encode(inode, bl);
+      ::_encode(symlink, bl);
+      ::_encode(dirfragtree, bl);
+    }
+  };
+  struct inode_xlock {
+    inodeno_t ino;
+    int locktype;
+    metareqid_t reqid;
+    inode_xlock() {}
+    inode_xlock(inodeno_t i, int lt, const metareqid_t& ri) :
+      ino(i), locktype(lt), reqid(ri) {}
+  };
+
+  struct dirfrag_strong {
+    int nonce;
+    dirfrag_strong() {}
+    dirfrag_strong(int n) : nonce(n) {}
+  };
+  struct dn_strong {
+    int nonce;
+    int lock;
+    dn_strong() {}
+    dn_strong(int n, int l) : nonce(n), lock(l) {}
+  };
+
+  // -- data --
+  int op;
+
+  set<inodeno_t> weak_inodes;
+  map<inodeno_t, inode_strong> strong_inodes;
+  list<inode_full> full_inodes;
+  list<inode_xlock> xlocked_inodes;
+
+  set<dirfrag_t> weak_dirfrags;
+  map<dirfrag_t, dirfrag_strong> strong_dirfrags;
+
+  map<dirfrag_t, set<string> > weak_dentries;
+  map<dirfrag_t, map<string, dn_strong> > strong_dentries;
+  map<dirfrag_t, map<string, metareqid_t> > xlocked_dentries;
 
   MMDSCacheRejoin() : Message(MSG_MDS_CACHEREJOIN) {}
+  MMDSCacheRejoin(int o) : 
+    Message(MSG_MDS_CACHEREJOIN),
+    op(o) {}
 
   char *get_type_name() { return "cache_rejoin"; }
+  void print(ostream& out) {
+    out << "cache_rejoin " << get_opname(op);
+  }
 
-  void add_dirfrag(dirfrag_t dirfrag) {
-    dirfrags.insert(dirfrag);
+  // -- builders --
+  // inodes
+  void add_weak_inode(inodeno_t ino) {
+    weak_inodes.insert(ino);
   }
-  void add_dentry(dirfrag_t dirfrag, const string& dn) {
-    dentries[dirfrag].insert(dn);
+  void add_strong_inode(inodeno_t i, int n, int cw, int a, int l, int dft, int f) {
+    strong_inodes[i] = inode_strong(n, cw, a, l, dft, f);
   }
-  void add_inode(inodeno_t ino, int cw) {
-    inodes[ino] = cw;
+  void add_full_inode(inode_t &i, const string& s, const fragtree_t &f) {
+    full_inodes.push_back(inode_full(i, s, f));
+  }
+  void add_inode_xlock(inodeno_t ino, int lt, const metareqid_t& ri) {
+    xlocked_inodes.push_back(inode_xlock(ino, lt, ri));
   }
   
+  // dirfrags
+  void add_weak_dirfrag(dirfrag_t df) {
+    weak_dirfrags.insert(df);
+  }
+  void add_strong_dirfrag(dirfrag_t df, int n) {
+    strong_dirfrags[df] = dirfrag_strong(n);
+  }
+   
+  // dentries
+  void add_weak_dentry(dirfrag_t df, const string& dname) {
+    weak_dentries[df].insert(dname);
+  }
+  void add_strong_dentry(dirfrag_t df, const string& dname, int n, int ls) {
+    strong_dentries[df][dname] = dn_strong(n, ls);
+  }
+  void add_dentry_xlock(dirfrag_t df, const string& dname, const metareqid_t& ri) {
+    xlocked_dentries[df][dname] = ri;
+  }
+
+  // -- encoding --
   void encode_payload() {
-    ::_encode(inodes, payload);
-    ::_encode(dirfrags, payload);
-    for (set<dirfrag_t>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p)
-      ::_encode(dentries[*p], payload);
+    ::_encode(weak_inodes, payload);
+    ::_encode(strong_inodes, payload);
+
+    __uint32_t nfull = full_inodes.size();
+    ::_encode(nfull, payload);
+    for (list<inode_full>::iterator p = full_inodes.begin(); p != full_inodes.end(); ++p)
+      p->_encode(payload);
+
+    ::_encode(xlocked_inodes, payload);
+    ::_encode(weak_dirfrags, payload);
+    //::_encode(strong_dirfrags, payload);
+    ::_encode(weak_dentries, payload);
+    ::_encode(strong_dentries, payload);
+    ::_encode(xlocked_dentries, payload);
   }
   void decode_payload() {
     int off = 0;
-    ::_decode(inodes, payload, off);
-    ::_decode(dirfrags, payload, off);
-    for (set<dirfrag_t>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p)
-      ::_decode(dentries[*p], payload, off);
+    ::_decode(weak_inodes, payload, off);
+    ::_decode(strong_inodes, payload, off);
+
+    __uint32_t nfull;
+    ::_decode(nfull, payload, off);
+    for (unsigned i=0; i<nfull; i++) 
+      full_inodes.push_back(inode_full(payload, off));
+
+    ::_decode(xlocked_inodes, payload, off);
+    ::_decode(weak_dirfrags, payload, off);
+    //::_decode(strong_dirfrags, payload, off);
+    ::_decode(weak_dentries, payload, off);
+    ::_decode(strong_dentries, payload, off);
+    ::_decode(xlocked_dentries, payload, off);
   }
+
 };
 
 #endif
index facf6f2423c4055209a9286ac1956494a0778b75..8bdb5f173bba04da99ae6c28ab5bd1e9de347f8b 100644 (file)
@@ -59,7 +59,7 @@ using namespace std;
 #include "messages/MMDSBeacon.h"
 #include "messages/MMDSImportMap.h"
 #include "messages/MMDSCacheRejoin.h"
-#include "messages/MMDSCacheRejoinAck.h"
+//#include "messages/MMDSCacheRejoinAck.h"
 
 #include "messages/MDirUpdate.h"
 #include "messages/MDiscover.h"
@@ -259,9 +259,11 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
   case MSG_MDS_CACHEREJOIN:
        m = new MMDSCacheRejoin;
        break;
+       /*
   case MSG_MDS_CACHEREJOINACK:
        m = new MMDSCacheRejoinAck;
        break;
+       */
 
   case MSG_MDS_DIRUPDATE:
     m = new MDirUpdate();
index ee88ed69ac289b8ed3181ee40df20ced541e5898..4c758f4194608dd33ae0501182815b16b653f37c 100644 (file)
@@ -93,7 +93,7 @@
 
 #define MSG_MDS_IMPORTMAP          106
 #define MSG_MDS_CACHEREJOIN        107
-#define MSG_MDS_CACHEREJOINACK     108
+//#define MSG_MDS_CACHEREJOINACK     108
 
 #define MSG_MDS_DISCOVER           110
 #define MSG_MDS_DISCOVERREPLY      111