]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* mds.server: more rewriting, now we explicitly rdlock path always, for proper auth...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 30 Mar 2007 16:10:41 +0000 (16:10 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 30 Mar 2007 16:10:41 +0000 (16:10 +0000)
* mds: remote inode link groundwork, just need remote xlocks
* mds.server: rewrote open O_TRUNC path

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1325 29311d96-e01e-0410-9327-a35deaab8ce9

16 files changed:
branches/sage/cephmds2/TODO
branches/sage/cephmds2/mds/CDir.cc
branches/sage/cephmds2/mds/CDir.h
branches/sage/cephmds2/mds/CInode.cc
branches/sage/cephmds2/mds/CInode.h
branches/sage/cephmds2/mds/Locker.cc
branches/sage/cephmds2/mds/LogEvent.cc
branches/sage/cephmds2/mds/LogEvent.h
branches/sage/cephmds2/mds/MDCache.cc
branches/sage/cephmds2/mds/MDCache.h
branches/sage/cephmds2/mds/Server.cc
branches/sage/cephmds2/mds/Server.h
branches/sage/cephmds2/mds/events/ESlaveUpdate.h [new file with mode: 0644]
branches/sage/cephmds2/mds/journal.cc
branches/sage/cephmds2/messages/MClientRequest.h
branches/sage/cephmds2/messages/MInodeLink.h

index 5623e46d39be8d22df0c484d7c1fb405908db15a..35e09b7e41d49c40b7b5fddd3c34f9f6bd5d3b97 100644 (file)
@@ -38,8 +38,6 @@ mds
     - or, only put frag size/mtime in inode when frag is closed.  otherwise, soft (journaled) state, possibly on another mds.
   - 
 
-- fix auth_pin/export vs mds recovery deadlock: unfrozen exports should abort.  maybe delay exportdiscover until after frozen?
-
 - discover
 /  - hard link dentries
   - open_remote_ino needs major work...
@@ -59,26 +57,24 @@ mds
   - journaled bit in MDRequest
   - journal 
 
-/untested- truncate
-
+/untested- truncate...
 
 - mds failure vs clients
-/  - clean up client op redirection
   - idempotent client ops
     - how to track in memory?
     - how to trim?
     - EMetablob replay, expire logic
 - journal+recovery
-/  - test anchortable, anchorclient
-/  - local link
-/  - local unlink
   - local rename
 /    - fix dir renames vs subtrees
     - how to notify replicas...
 /  - stray purge
   - stray reintegration
   - remote link
+    - impl remote inode xlock
+    - ESlaveUpdate replay, resolution, etc.
   - remote unlink
+    - rewrite to look link _link
   - remote rename
   - open(wr cap), open+create
   - file capabilities i/o
@@ -86,16 +82,11 @@ mds
   - client readdir for dirfrags
 - consistency points/snapshots
   - dentry versions vs dirfrags...
-- sync clients on stat
-  - will need to ditch 10s client metadata caching before this is useful
-  - implement truncate
 - real chdir (directory "open")
   - relative metadata ops
 - statfs?
 
 
-foreign link
-- 
 
 foreign rename
 - question: can we generalize foreign and local rename?
index 1237bd2af252b2c5980969f2b5f31df420ac3262..6281d3102800b2371797c73841992414e334448d 100644 (file)
@@ -275,9 +275,6 @@ void CDir::link_inode_work( CDentry *dn, CInode *in )
   // set inode version
   //in->inode.version = dn->get_version();
   
-  // clear dangling
-  in->state_clear(CInode::STATE_DANGLING);
-
   // pin dentry?
   if (in->get_num_ref())
     dn->get(CDentry::PIN_INODEPIN);
@@ -320,10 +317,6 @@ void CDir::unlink_inode_work( CDentry *dn )
     // primary
     assert(dn->is_primary());
  
-    // explicitly define auth
-    in->dangling_auth = in->authority();
-    //dout(10) << "unlink_inode " << *in << " dangling_auth now " << in->dangling_auth << endl;
-
     // unpin dentry?
     if (in->get_num_ref())
       dn->put(CDentry::PIN_INODEPIN);
@@ -332,9 +325,6 @@ void CDir::unlink_inode_work( CDentry *dn )
     if (in->auth_pins + in->nested_auth_pins)
       adjust_nested_auth_pins( 0 - (in->auth_pins + in->nested_auth_pins) );
     
-    // set dangling flag
-    in->state_set(CInode::STATE_DANGLING);
-
     // detach inode
     in->remove_primary_parent(dn);
     dn->inode = 0;
index 7f3c68a97929fcec86ea546934c492b4b988cccc..27d4902fa1ae8af461eecf2152896a0d32bcc057 100644 (file)
@@ -156,10 +156,10 @@ class CDir : public MDSCacheObject {
 
 
   // -- wait masks --
-  static const int WAIT_DENTRY        = (1<<0);  // wait for item to be in cache
-  static const int WAIT_COMPLETE      = (1<<1);  // wait for complete dir contents
-  static const int WAIT_FREEZEABLE    = (1<<2);  // hard_pins removed
-  static const int WAIT_UNFREEZE      = (1<<3);  // unfreeze
+  static const int WAIT_DENTRY       = (1<<0);  // wait for item to be in cache
+  static const int WAIT_COMPLETE     = (1<<1);  // wait for complete dir contents
+  static const int WAIT_FREEZEABLE   = (1<<2);  // hard_pins removed
+  static const int WAIT_UNFREEZE     = (1<<3);  // unfreeze
   static const int WAIT_AUTHPINNABLE = WAIT_UNFREEZE;
   static const int WAIT_IMPORTED     = (1<<4);  // import finish
   static const int WAIT_SINGLEAUTH   = (1<<5); 
index 9c54939b018bcc6f371ed1aa46e08e5b846a0e54..67af1751904d48610a7079c65834cc796d464fea 100644 (file)
@@ -240,10 +240,6 @@ void CInode::close_dirfrags()
 
 void CInode::set_auth(bool a) 
 {
-  if (!is_dangling() && !is_root() && 
-      is_auth() != a) {
-  }
-  
   if (a) state_set(STATE_AUTH);
   else state_clear(STATE_AUTH);
 }
@@ -273,12 +269,6 @@ void CInode::make_anchor_trace(vector<Anchor>& trace)
     trace.push_back(Anchor(ino(), parent->dir->dirfrag()));
     dout(10) << "make_anchor_trace added " << trace.back() << endl;
   }
-  else if (state_test(STATE_DANGLING)) {
-    dout(10) << "make_anchor_trace dangling " << ino() << " on mds " << dangling_auth << endl;
-    assert(0);
-    //trace.push_back( Anchor(ino(),
-    //MDS_INO_INODEFILE_OFFSET+dangling_auth.first) );
-  }
   else 
     assert(is_root());
 }
@@ -512,9 +502,6 @@ void CInode::adjust_nested_auth_pins(int a)
 
 pair<int,int> CInode::authority() 
 {
-  if (is_dangling()) 
-    return dangling_auth;      // explicit
-
   if (is_root())
     return CDIR_AUTH_ROOTINODE;  // root _inode_ is locked to mds0.
 
index b51c2bb77e5bf4768ea28589d60fd5e1c42ca1a3..6eef2a98182051f5aac6e60dbb57c38059b52c6f 100644 (file)
@@ -92,9 +92,8 @@ class CInode : public MDSCacheObject {
   static const int STATE_AUTH =       (1<<0);
   static const int STATE_ROOT =       (1<<1);
   static const int STATE_DIRTY =      (1<<2);
-  static const int STATE_UNSAFE =     (1<<3);   // not logged yet
-  static const int STATE_DANGLING =   (1<<4);   // delete me when i expire; i have no dentry
-  static const int STATE_UNLINKING =  (1<<5);
+  //static const int STATE_UNSAFE =     (1<<3);   // not logged yet
+  //static const int STATE_DANGLING =   (1<<4);   // delete me when i expire; i have no dentry
   static const int STATE_EXPORTING =  (1<<6);   // on nonauth bystander.
   static const int STATE_ANCHORING =  (1<<7);
   static const int STATE_UNANCHORING = (1<<8);
@@ -103,35 +102,30 @@ class CInode : public MDSCacheObject {
   //static const int STATE_RENAMINGTO = (1<<9);  // rename target (will be unlinked)
 
   // -- waiters --
-  static const int WAIT_AUTHPINNABLE  = (1<<10);
-    // waiters: write_hard_start, read_file_start, write_file_start  (mdcache)
-    //          handle_client_chmod, handle_client_touch             (mds)
-    // trigger: (see CDIR_WAIT_UNFREEZE)
-  static const int WAIT_SINGLEAUTH  = (1<<11);
-
-  static const int WAIT_DIR          = (1<<13);
-    // waiters: traverse_path
-    // triggers: handle_disocver_reply
-  static const int WAIT_LINK        = (1<<14);  // as in remotely nlink++
-  static const int WAIT_ANCHORED    = (1<<15);
-  static const int WAIT_UNANCHORED  = (1<<16);
-  static const int WAIT_UNLINK      = (1<<17);  // as in remotely nlink--
-  static const int WAIT_HARDR       = (1<<18);  // 131072
-  static const int WAIT_HARDW       = (1<<19);  // 262...
-  static const int WAIT_HARDB       = (1<<20);
+  static const int WAIT_SLAVEAGREE  = (1<<0);
+  static const int WAIT_AUTHPINNABLE = (1<<1);
+  static const int WAIT_SINGLEAUTH  = (1<<2);
+  static const int WAIT_DIR         = (1<<3);
+  static const int WAIT_LINK        = (1<<4);  // as in remotely nlink++
+  static const int WAIT_ANCHORED    = (1<<5);
+  static const int WAIT_UNANCHORED  = (1<<6);
+  static const int WAIT_UNLINK      = (1<<7);  // as in remotely nlink--
+  static const int WAIT_HARDR       = (1<<8);  // 131072
+  static const int WAIT_HARDW       = (1<<9);  // 262...
+  static const int WAIT_HARDB       = (1<<10);
   static const int WAIT_HARDRWB     = (WAIT_HARDR|WAIT_HARDW|WAIT_HARDB);
-  static const int WAIT_HARDSTABLE  = (1<<21);
-  static const int WAIT_HARDNORD    = (1<<22);
-  static const int WAIT_FILER       = (1<<23);
-  static const int WAIT_FILEW       = (1<<24);
-  static const int WAIT_FILEB       = (1<<25);
+  static const int WAIT_HARDSTABLE  = (1<<11);
+  static const int WAIT_HARDNORD    = (1<<12);
+  static const int WAIT_FILER       = (1<<13);
+  static const int WAIT_FILEW       = (1<<14);
+  static const int WAIT_FILEB       = (1<<15);
   static const int WAIT_FILERWB     = (WAIT_FILER|WAIT_FILEW|WAIT_FILEB);
-  static const int WAIT_FILESTABLE  = (1<<26);
-  static const int WAIT_FILENORD    = (1<<27);
-  static const int WAIT_FILENOWR    = (1<<28);
-  static const int WAIT_RENAMEACK       =(1<<29);
-  static const int WAIT_RENAMENOTIFYACK =(1<<30);
-  static const int WAIT_CAPS            =(1<<31);
+  static const int WAIT_FILESTABLE  = (1<<16);
+  static const int WAIT_FILENORD    = (1<<17);
+  static const int WAIT_FILENOWR    = (1<<18);
+  static const int WAIT_RENAMEACK       =(1<<19);
+  static const int WAIT_RENAMENOTIFYACK =(1<<20);
+  static const int WAIT_CAPS            =(1<<21);
   static const int WAIT_ANY           = 0xffffffff;
 
   // misc
@@ -171,7 +165,7 @@ class CInode : public MDSCacheObject {
 
   // -- other --
   // distributed caching (old)
-  pair<int,int> dangling_auth;    // explicit auth, when dangling.
+  //pair<int,int> dangling_auth;    // explicit auth, when dangling.
 
   // waiters
   multimap<int, Context*>  waiting;
@@ -248,20 +242,7 @@ protected:
   void name_stray_dentry(string& dname);
 
 
-
-  // -- state --
-  bool is_unsafe() { return state & STATE_UNSAFE; }
-  bool is_dangling() { return state & STATE_DANGLING; }
-  bool is_unlinking() { return state & STATE_UNLINKING; }
-
-  void mark_unsafe() { state |= STATE_UNSAFE; }
-  void mark_safe() { state &= ~STATE_UNSAFE; }
-
   // -- state encoding --
-  //void encode_basic_state(bufferlist& r);
-  //void decode_basic_state(bufferlist& r, int& off);
-
-
   void encode_file_state(bufferlist& r);
   void decode_file_state(bufferlist& r, int& off);
 
index 83761feee9a5d582a07ea18e62b69438aa3c8d1f..714d52ba9ee6f6fff272671f37b9e52f3eb1a6f0 100644 (file)
 #include "MDS.h"
 #include "MDCache.h"
 #include "Locker.h"
-#include "Server.h"
 #include "CInode.h"
 #include "CDir.h"
 #include "CDentry.h"
-#include "Migrator.h"
 
-#include "MDBalancer.h"
 #include "MDLog.h"
 #include "MDSMap.h"
 
@@ -2468,8 +2465,9 @@ void Locker::handle_lock_dn(MLock *m)
     if (dn->gather_set.size() == 0) {
       dout(7) << "handle_lock_dn finish gather, now xlock on " << *dn << endl;
       dn->lockstate = DN_LOCK_XLOCK;
-      mdcache->active_requests[dn->xlockedby->reqid].dentry_xlocks.insert(dn);
-      mdcache->active_requests[dn->xlockedby->reqid].dentry_locks.insert(dn);
+      // FIXME
+      mdcache->slave_requests[dn->xlockedby->reqid]->dentry_xlocks.insert(dn);
+      mdcache->slave_requests[dn->xlockedby->reqid]->dentry_locks.insert(dn);
       dir->finish_waiting(CDir::WAIT_DNLOCK, dname);
     }
     break;
index 4fc9101548f11f601a489d83363a111130770970..43ac0ec9647341cd6ae0a513bccc756833266db3 100644 (file)
@@ -20,6 +20,7 @@
 #include "events/EImportMap.h"
 #include "events/EMetaBlob.h"
 #include "events/EUpdate.h"
+#include "events/ESlaveUpdate.h"
 #include "events/EUnlink.h"
 #include "events/EMount.h"
 #include "events/EClientMap.h"
@@ -50,6 +51,7 @@ LogEvent *LogEvent::decode(bufferlist& bl)
   case EVENT_STRING: le = new EString(); break;
   case EVENT_IMPORTMAP: le = new EImportMap; break;
   case EVENT_UPDATE: le = new EUpdate; break;
+  case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
   case EVENT_UNLINK: le = new EUnlink(); break;
   case EVENT_PURGEFINISH: le = new EPurgeFinish(); break;
   case EVENT_MOUNT: le = new EMount(); break;
index d50edd720207cda7a2a6e1d9e24f22532d9d0f33..8138ab200845502b9a971516f1c515635f934d23 100644 (file)
 
 #define EVENT_IMPORTMAP    4
 #define EVENT_UPDATE       5
+#define EVENT_SLAVEUPDATE  6
 
-#define EVENT_MOUNT        6
-#define EVENT_CLIENTMAP    7
-
-#define EVENT_ANCHOR       8
-#define EVENT_ANCHORCLIENT 9
+#define EVENT_MOUNT        7
+#define EVENT_CLIENTMAP    8
 
 #define EVENT_ALLOC        10
 #define EVENT_MKNOD        11
 #define EVENT_IMPORTSTART  31
 #define EVENT_IMPORTFINISH 32
 
+#define EVENT_ANCHOR       40
+#define EVENT_ANCHORCLIENT 41
+
+
 
 
 #include <string>
index f1801db6a1215fc5f0cdd312d810a92c1ea9bdf2..6ff40c83b7ff01692feb6aecc4de80841e1b8770 100644 (file)
@@ -39,6 +39,7 @@
 
 #include "events/EImportMap.h"
 #include "events/EUpdate.h"
+#include "events/ESlaveUpdate.h"
 #include "events/EString.h"
 #include "events/EUnlink.h"
 #include "events/EPurgeFinish.h"
@@ -2414,9 +2415,6 @@ void MDCache::dispatch(Message *m)
   case MSG_MDS_INODELINK:
     handle_inode_link((MInodeLink*)m);
     break;
-  case MSG_MDS_INODELINKACK:
-    handle_inode_link_ack((MInodeLinkAck*)m);
-    break;
 
   case MSG_MDS_DIRUPDATE:
     handle_dir_update((MDirUpdate*)m);
@@ -2933,18 +2931,15 @@ void MDCache::make_trace(vector<CDentry*>& trace, CInode *in)
 
 MDRequest *MDCache::request_start(metareqid_t ri)
 {
-  assert(active_requests.count(ri) == 0);
-  active_requests[ri].reqid = ri;
-  MDRequest *mdr = &active_requests[ri];
+  MDRequest *mdr = new MDRequest(ri);
   dout(7) << "request_start " << *mdr << endl;
   return mdr;
 }
 
 MDRequest *MDCache::request_start(MClientRequest *req)
 {
-  metareqid_t ri = req->get_reqid();
-  MDRequest *mdr = request_start(ri);
-  mdr->request = req;
+  MDRequest *mdr = new MDRequest(req->get_reqid(), req);
+  dout(7) << "request_start " << *mdr << endl;
   return mdr;
 }
 
@@ -3044,7 +3039,6 @@ void MDCache::request_drop_locks(MDRequest *mdr)
 void MDCache::request_cleanup(MDRequest *mdr)
 {
   metareqid_t ri = mdr->reqid;
-  assert(active_requests.count(ri));
 
   // clear ref, trace
   mdr->ref = 0;
@@ -3073,9 +3067,6 @@ void MDCache::request_cleanup(MDRequest *mdr)
     (*it)->put(CDir::PIN_REQUEST);
   mdr->dir_pins.clear();
 
-  // remove from map
-  active_requests.erase(ri);
-
   // log some stats *****
   if (mds->logger) {
     mds->logger->set("c", lru.lru_get_size());
@@ -3426,47 +3417,81 @@ void MDCache::migrate_stray(CDentry *dn, int dest)
 // HARD LINKS
 
 
+class C_MDC_InodeLinkAgree : public Context {
+  MDS *mds;
+  MInodeLink *m;
+public:
+  C_MDC_InodeLinkAgree(MDS *_mds, MInodeLink *_m) : mds(_mds), m(_m) {}
+  void finish(int r) {
+    mds->send_message_mds(new MInodeLink(MInodeLink::OP_AGREE,
+                                        m->get_ino(), 
+                                        m->get_inc(), 
+                                        m->get_reqid()),
+                         m->get_source().num(),
+                         m->get_source_port());
+    delete m;
+  }
+};
+
 void MDCache::handle_inode_link(MInodeLink *m)
 {
   CInode *in = get_inode(m->get_ino());
   assert(in);
-
-  if (!in->is_auth()) {
-    dout(7) << "handle_inode_link not auth for " << *in << ", fw to auth" << endl;
-    mds->send_message_mds(m, in->authority().first, MDS_PORT_CACHE);
+  dout(7) << "handle_inode_link " << *m << " on " << *in << endl;
+  
+  // get request.
+  // we should have this bc the inode is xlocked.
+  assert(slave_requests.count(m->get_reqid()));
+  MDRequest *mdr = slave_requests[m->get_reqid()];
+
+  switch (m->get_op()) {
+    // auth
+  case MInodeLink::OP_PREPARE:
+    assert(in->is_auth());
+    {
+      version_t pv = in->pre_dirty();
+      ESlaveUpdate *le = new ESlaveUpdate("link_prepare", m->get_reqid(), 0);
+      le->metablob.add_dir_context(in->get_parent_dir());
+      inode_t *pi = le->metablob.add_primary_dentry(in->parent, true, in);
+      if (m->get_inc())
+       pi->nlink++;
+      else
+       pi->nlink--;
+      pi->ctime = m->get_ctime();
+      pi->version = pv;
+      mdr->projected_inode[in->ino()] = *pi;
+      mds->mdlog->submit_entry(le);
+      mds->mdlog->wait_for_sync(new C_MDC_InodeLinkAgree(mds, m));
+    }
     return;
-  }
-
-  dout(7) << "handle_inode_link on " << *in << endl;
-
-  if (!in->is_anchored()) {
-    assert(in->inode.nlink == 1);
-    dout(7) << "needs anchor, nlink=" << in->inode.nlink << ", creating anchor" << endl;
     
-    anchor_create(in, new C_MDS_RetryMessage(mds, m));
+  case MInodeLink::OP_COMMIT:
+    assert(in->is_auth());
+    {
+      // make the update to our cache
+      in->inode = mdr->projected_inode[in->ino()];
+      in->mark_dirty(in->inode.version);
+
+      // journal the commit
+      ESlaveUpdate *le = new ESlaveUpdate("link_commit", m->get_reqid(), 1);
+      mds->mdlog->submit_entry(le);
+    }
+    delete m;
     return;
-  }
-
-  in->inode.nlink++;
-  in->_mark_dirty(); // fixme
 
-  // reply
-  dout(7) << " nlink++, now " << in->inode.nlink++ << endl;
 
-  mds->send_message_mds(new MInodeLinkAck(m->get_ino(), true), m->get_from(), MDS_PORT_CACHE);
-  delete m;
+  case MInodeLink::OP_AGREE:
+    assert(!in->is_auth());
+    in->finish_waiting(CInode::WAIT_SLAVEAGREE);
+    delete m;
+    return;
+    
+  default:
+    assert(0);
+  }
 }
 
 
-void MDCache::handle_inode_link_ack(MInodeLinkAck *m) 
-{
-  CInode *in = get_inode(m->get_ino());
-  assert(in);
-
-  dout(7) << "handle_inode_link_ack success = " << m->is_success() << " on " << *in << endl;
-  in->finish_waiting(CInode::WAIT_LINK,
-                     m->is_success() ? 1:-1);
-}
 
 
 
index dcd6a7d01cf5a22ba96a5b49fd3c975f98f700a4..a07a2ea4d2849848b678182ae89d60370ae8f475 100644 (file)
@@ -93,12 +93,15 @@ struct MDRequest {
   set< CInode* >  inode_file_rdlocks;
   set< CInode* >  inode_file_xlocks;
   
+  // projected updates
+  map< inodeno_t, inode_t > projected_inode;
+
   // old
   set< CDentry* >           xlocks;           // xlocks (local)
   set< CDentry* >           foreign_xlocks;   // xlocks on foreign hosts
 
   MDRequest() : request(0), ref(0) {}
-  MDRequest(metareqid_t ri) : reqid(ri), request(0), ref(0) {}
+  MDRequest(metareqid_t ri, Message *req=0) : reqid(ri), request(req), ref(0) {}
   
   // requeest
   MClientRequest *client_request() {
@@ -239,7 +242,7 @@ public:
 
   
 protected:
-  hash_map<metareqid_t, MDRequest> active_requests; 
+  hash_map<metareqid_t, MDRequest*> slave_requests; 
   
 public:
   MDRequest* request_start(metareqid_t rid);
@@ -458,9 +461,6 @@ protected:
   void reintegrate_stray(CDentry *dn, CDentry *rlink);
   void migrate_stray(CDentry *dn, int dest);
 
-  // -- hard links --
-  void handle_inode_link(class MInodeLink *m);
-  void handle_inode_link_ack(class MInodeLinkAck *m);
 
   // == messages ==
  public:
@@ -477,11 +477,14 @@ protected:
                        list<Context*>& finished);
   CDir* forge_replica_dir(CInode *diri, frag_t fg, int from);
     
+
+  // -- hard links --
+  void handle_inode_link(class MInodeLink *m);
+
   // -- namespace --
   void handle_dentry_unlink(MDentryUnlink *m);
 
 
-
   // -- updates --
   //int send_inode_updates(CInode *in);
   //void handle_inode_update(MInodeUpdate *m);
@@ -489,6 +492,7 @@ protected:
   int send_dir_updates(CDir *in, bool bcast=false);
   void handle_dir_update(MDirUpdate *m);
 
+  // -- cache expiration --
   void handle_cache_expire(MCacheExpire *m);
   void process_delayed_expire(CDir *dir);
   void discard_delayed_expire(CDir *dir);
index 1cad12991f8d756eec772f25e369bbd59ffe2ebf..4be711cb0468753276ba3cb44d341e249f4ecc33 100644 (file)
@@ -237,94 +237,17 @@ void Server::handle_client_request(MClientRequest *req)
     break;
   }
 
+  // register + dispatch
+  MDRequest *mdr = mdcache->request_start(req);
+
   if (ref) {
-    MDRequest *mdr = mdcache->request_start(req);
     dout(10) << "inode op on ref " << *ref << endl;
     mdr->ref = ref;
     mdr->pin(ref);
-    dispatch_request(mdr);
-    return;
-  }
-
-
-  // -----
-  // some ops are on existing inodes
-
-  bool follow_trailing_symlink = false;
-  
-  switch (req->get_op()) {
-  case MDS_OP_LSTAT:
-    follow_trailing_symlink = false;
-  case MDS_OP_OPEN:
-    if (req->args.open.flags & O_CREAT) break;  // handled below.
-  case MDS_OP_STAT:
-  case MDS_OP_UTIME:
-  case MDS_OP_CHMOD:
-  case MDS_OP_CHOWN:
-  case MDS_OP_READDIR:
-    {
-      filepath refpath = req->get_filepath();
-      Context *ondelay = new C_MDS_RetryMessage(mds, req);
-      vector<CDentry*> trace;
-      
-      int r = mdcache->path_traverse(0, 0,
-                                    refpath, trace, follow_trailing_symlink,
-                                    req, ondelay,
-                                    MDS_TRAVERSE_FORWARD,
-                                    true); // is MClientRequest
-      
-      if (r > 0) return; // delayed
-      if (r < 0) {
-       dout(10) << "traverse error " << r << " " << strerror(-r) << endl;
-       
-       // send error.  don't bother registering request.
-       messenger->send_message(new MClientReply(req, r),
-                               req->get_client_inst());
-
-       // <HACK>
-       // is this a special debug command?
-       if (refpath.depth() - 1 == trace.size() &&
-           refpath.last_dentry().find(".ceph.") == 0) {
-         // ...
-       }
-       // </HACK>
-      }
-
-      // can we dnlock whole path?
-      if (!mds->locker->dentry_can_rdlock_trace(trace, req))
-       return;
-
-      // go
-      MDRequest *mdr = mdcache->request_start(req);
-      mds->locker->dentry_anon_rdlock_trace_start(trace);
-      dispatch_request(mdr);
-      return;
-    }
-  }
-
-  
-  // ----
-  // the rest handle things themselves.
-  
-  switch (req->get_op()) {
-  case MDS_OP_OPEN:
-    assert(req->args.open.flags & O_CREAT);
-  case MDS_OP_MKNOD:
-  case MDS_OP_MKDIR:
-  case MDS_OP_SYMLINK:
-  case MDS_OP_LINK:
-  case MDS_OP_UNLINK:
-  case MDS_OP_RMDIR:
-  case MDS_OP_RENAME:
-    {
-      // register request
-      MDRequest *mdr = mdcache->request_start(req);
-      dispatch_request(mdr);
-      return;
-    }
   }
 
-  assert(0);  // we missed something!
+  dispatch_request(mdr);
+  return;
 }
 
 
@@ -408,33 +331,6 @@ void Server::dispatch_request(MDRequest *mdr)
 // HELPERS
 
 
-/** request_pin_ref
- * return the ref inode, referred to by the last dentry in the trace.
- * open if it is remote.
- * pin.
- * return existing, if mdr->ref already set.
- */
-CInode *Server::request_pin_ref(MDRequest *mdr)
-{
-  // already did it?
-  if (mdr->ref)
-    return mdr->ref;
-
-  // open and pin ref inode in cache too
-  CInode *ref = 0;
-  if (mdr->trace.empty())
-    ref = mdcache->get_root();
-  else {
-    ref = mdcache->get_dentry_inode(mdr->trace[mdr->trace.size()-1], mdr);
-    if (!ref) return 0;
-  }
-  mdr->pin(ref);
-  mdr->ref = ref;
-  return ref;
-}
-
-
-
 /** validate_dentry_dir
  *
  * verify that the dir exists and would own the dname.
@@ -579,6 +475,104 @@ CDir *Server::traverse_to_auth_dir(MDRequest *mdr, vector<CDentry*> &trace, file
 }
 
 
+
+CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, bool want_auth)
+{
+  // already got ref?
+  if (mdr->ref) 
+    return mdr->ref;
+
+  MClientRequest *req = mdr->client_request();
+
+  // traverse
+  filepath refpath = req->get_filepath();
+  Context *ondelay = new C_MDS_RetryRequest(mdcache, mdr);
+  vector<CDentry*> trace;
+  int r = mdcache->path_traverse(mdr, 0,
+                                refpath, trace, req->follow_trailing_symlink(),
+                                req, ondelay,
+                                MDS_TRAVERSE_FORWARD,
+                                true); // is MClientRequest
+  if (r > 0) return false; // delayed
+  if (r < 0) {  // error
+    reply_request(mdr, r);
+    return 0;
+  }
+
+  // open ref inode
+  CInode *ref = 0;
+  if (mdr->trace.empty())
+    ref = mdcache->get_root();
+  else {
+    CDentry *dn = mdr->trace[mdr->trace.size()-1];
+
+    // if no inode, fw to dentry auth?
+    if (want_auth && 
+       dn->is_remote() &&
+       !dn->inode && 
+       !dn->is_auth()) {
+      if (dn->dir->auth_is_ambiguous()) {
+       dout(10) << "waiting for single auth on " << *dn << endl;
+       dn->dir->add_waiter(CInode::WAIT_SINGLEAUTH, 
+                           dn->get_name(),
+                           new C_MDS_RetryMessage(mds, req));
+      } else {
+       dout(10) << "fw to auth for " << *dn << endl;
+       mds->forward_message_mds(req, dn->authority().first, MDS_PORT_SERVER);
+      }
+    }
+
+    // open ref inode
+    ref = mdcache->get_dentry_inode(dn, mdr);
+    if (!ref) return 0;
+  }
+
+  // fw to inode auth?
+  if (want_auth && !ref->is_auth()) {
+    if (ref->auth_is_ambiguous()) {
+      dout(10) << "waiting for single auth on " << *ref << endl;
+      ref->add_waiter(CInode::WAIT_SINGLEAUTH, 
+                     new C_MDS_RetryMessage(mds, req));
+    } else {
+      dout(10) << "fw to auth for " << *ref << endl;
+      mds->forward_message_mds(req, ref->authority().first, MDS_PORT_SERVER);
+    }
+  }
+
+  // auth_pin?
+  if (want_auth) {
+    if (!ref->can_auth_pin()) {
+      dout(7) << "waiting for authpinnable on " << *ref << endl;
+      ref->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+      return 0;
+    }
+    mdr->auth_pin(ref);
+  }
+
+  // lock the path
+  set<CDentry*> dentry_rdlocks;
+  set<CDentry*> dentry_xlocks;
+  set<CInode*> inode_empty;
+
+  for (unsigned i=0; i<trace.size(); i++) 
+    dentry_rdlocks.insert(trace[i]);
+
+  if (!mds->locker->acquire_locks(mdr, 
+                                 dentry_rdlocks, dentry_xlocks,
+                                 inode_empty, inode_empty))
+    return 0;
+  
+  // set and pin ref
+  mdr->pin(ref);
+  mdr->ref = ref;
+
+  // save the locked trace.
+  mdr->trace.swap(trace);
+  
+  return ref;
+}
+
+
 /** rdlock_path_xlock_dentry
  * traverse path to the directory that could/would contain dentry.
  * make sure i am auth for that dentry, forward as necessary.
@@ -679,7 +673,7 @@ CDir* Server::try_open_auth_dir(CInode *diri, frag_t fg, MDRequest *mdr)
 
   // not open and inode frozen?
   if (!dir && diri->is_frozen_dir()) {
-    dout(10) << "try_open_dir: dir inode is frozen, waiting " << *diri << endl;
+    dout(10) << "try_open_auth_dir: dir inode is frozen, waiting " << *diri << endl;
     assert(diri->get_parent_dir());
     diri->get_parent_dir()->add_waiter(CDir::WAIT_UNFREEZE,
                                       new C_MDS_RetryRequest(mdcache, mdr));
@@ -705,6 +699,7 @@ CDir* Server::try_open_auth_dir(CInode *diri, frag_t fg, MDRequest *mdr)
   return dir;
 }
 
+/*
 CDir* Server::try_open_dir(CInode *diri, frag_t fg, MDRequest *mdr)
 {
   CDir *dir = diri->get_dirfrag(fg);
@@ -736,7 +731,7 @@ CDir* Server::try_open_dir(CInode *diri, frag_t fg, MDRequest *mdr)
     return 0;
   }
 }
-
+*/
 
 // ===============================================================================
 // STAT
@@ -744,7 +739,7 @@ CDir* Server::try_open_dir(CInode *diri, frag_t fg, MDRequest *mdr)
 void Server::handle_client_stat(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request();
-  CInode *ref = request_pin_ref(mdr);
+  CInode *ref = rdlock_path_pin_ref(mdr, false);
   if (!ref) return;
 
   // FIXME: this is really not the way to handle the statlite mask.
@@ -810,20 +805,12 @@ public:
 void Server::handle_client_utime(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request();
-  CInode *cur = request_pin_ref(mdr);
+  CInode *cur = rdlock_path_pin_ref(mdr, true);
   if (!cur) return;
 
-  // auth pin
-  if (!cur->can_auth_pin()) {
-    dout(7) << "waiting for authpinnable on " << *cur << endl;
-    cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
-    return;
-  }
-  mdr->auth_pin(cur);
-
   // write
   if (!mds->locker->inode_file_xlock_start(cur, mdr))
-    return;  // fw or (wait for) sync
+    return;
 
   mds->balancer->hit_inode(cur, META_POP_IWR);   
 
@@ -884,20 +871,12 @@ public:
 void Server::handle_client_chmod(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request();
-  CInode *cur = request_pin_ref(mdr);
+  CInode *cur = rdlock_path_pin_ref(mdr, true);
   if (!cur) return;
 
-  // auth pin
-  if (!cur->can_auth_pin()) {
-    dout(7) << "waiting for authpinnable on " << *cur << endl;
-    cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
-    return;
-  }
-  mdr->auth_pin(cur);
-
   // write
   if (!mds->locker->inode_hard_xlock_start(cur, mdr))
-    return;  // fw or (wait for) lock
+    return;
 
   mds->balancer->hit_inode(cur, META_POP_IWR);   
 
@@ -951,20 +930,12 @@ public:
 void Server::handle_client_chown(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request();
-  CInode *cur = request_pin_ref(mdr);
+  CInode *cur = rdlock_path_pin_ref(mdr, true);
   if (!cur) return;
 
-  // auth pin
-  if (!cur->can_auth_pin()) {
-    dout(7) << "waiting for authpinnable on " << *cur << endl;
-    cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
-    return;
-  }
-  mdr->auth_pin(cur);
-
   // write
   if (!mds->locker->inode_hard_xlock_start(cur, mdr))
-    return;  // fw or (wait for) lock
+    return;
 
   mds->balancer->hit_inode(cur, META_POP_IWR);   
 
@@ -1029,7 +1000,7 @@ int Server::encode_dir_contents(CDir *dir,
 void Server::handle_client_readdir(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request();
-  CInode *diri = request_pin_ref(mdr);
+  CInode *diri = rdlock_path_pin_ref(mdr, false);
   if (!diri) return;
 
   // it's a directory, right?
@@ -1277,24 +1248,20 @@ void Server::handle_client_link(MDRequest *mdr)
                                 req, ondelay,
                                 MDS_TRAVERSE_DISCOVER);
   if (r > 0) return; // wait
+  if (targettrace.empty()) r = -EINVAL;
   if (r < 0) {
     reply_request(mdr, r);
     return;
   }
   
   // identify target inode
-  CInode *targeti;
-  if (targettrace.empty())
-    targeti = mdcache->get_root();
-  else
-    targeti = targettrace[targettrace.size()-1]->inode;
+  CInode *targeti = targettrace[targettrace.size()-1]->inode;
   assert(targeti);
-  assert(r == 0);
 
   // dir?
   dout(7) << "target is " << *targeti << endl;
   if (targeti->is_dir()) {
-    dout(7) << "target is a dir, failing" << endl;
+    dout(7) << "target is a dir, failing..." << endl;
     reply_request(mdr, -EINVAL);
     return;
   }
@@ -1435,10 +1402,9 @@ void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti)
 {
   dout(10) << "_link_remote " << *dn << " to " << *targeti << endl;
   
-  // ??
-  // 1. send LinkPrepare to dest (lock target on dest, journal target update)
+  // 1. send LinkPrepare to dest (journal nlink++ prepare)
   // 2. create+journal new dentry, as with link_local.
-  // 3. send LinkCommit to dest (unlocks target on dest, journals commit)  
+  // 3. send LinkCommit to dest (journals commit)  
 
   // IMPLEMENT ME
   reply_request(mdr, -EXDEV);
@@ -1534,13 +1500,6 @@ void Server::handle_client_unlink(MDRequest *mdr)
   CDentry *dn = trace[trace.size()-1];
   assert(dn);
 
-  // readable?
-  if (!dn->can_read(mdr)) {
-    dout(10) << "waiting on unreadable dentry " << *dn << endl;
-    dn->dir->add_waiter(CDir::WAIT_DNREAD, dn->name, new C_MDS_RetryRequest(mdcache, mdr));
-    return;
-  }
-
   // rmdir or unlink?
   bool rmdir = false;
   if (req->get_op() == MDS_OP_RMDIR) rmdir = true;
@@ -1550,11 +1509,19 @@ void Server::handle_client_unlink(MDRequest *mdr)
   } else {
     dout(7) << "handle_client_unlink on " << *dn << endl;
   }
+
+  // readable?
+  if (!dn->can_read(mdr)) {
+    dout(10) << "waiting on unreadable dentry " << *dn << endl;
+    dn->dir->add_waiter(CDir::WAIT_DNREAD, dn->name, new C_MDS_RetryRequest(mdcache, mdr));
+    return;
+  }
+
   // dn looks ok.
 
   // get/open inode.
   mdr->trace.swap(trace);
-  CInode *in = request_pin_ref(mdr);
+  CInode *in = mdcache->get_dentry_inode(dn, mdr);
   if (!in) return;
   dout(7) << "dn links to " << *in << endl;
 
@@ -1578,7 +1545,21 @@ void Server::handle_client_unlink(MDRequest *mdr)
     }
   }
 
-  dout(7) << "inode is " << *in << endl;
+  // lock
+  set<CDentry*> dentry_rdlocks;
+  set<CDentry*> dentry_xlocks;
+  set<CInode*> inode_hard_rdlocks;
+  set<CInode*> inode_hard_xlocks;
+
+  for (unsigned i=0; i<trace.size()-1; i++)
+    dentry_rdlocks.insert(trace[i]);
+  dentry_xlocks.insert(dn);
+  inode_hard_xlocks.insert(in);
+  
+  if (!mds->locker->acquire_locks(mdr, 
+                                 dentry_rdlocks, dentry_xlocks,
+                                 inode_hard_rdlocks, inode_hard_xlocks))
+    return;
 
   // ok!
   if (dn->is_remote() && !dn->inode->is_auth()) 
@@ -1614,23 +1595,6 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn)
 {
   dout(10) << "_unlink_local " << *dn << endl;
 
-  // auth pin inode
-  if (!mdr->is_auth_pinned(dn->inode) &&
-      !dn->inode->can_auth_pin()) {
-    dn->inode->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
-    
-    // drop all locks while we wait (racey?)
-    mdcache->request_drop_locks(mdr);
-    mdr->drop_auth_pins();
-    return;
-  }
-  mdr->auth_pin(dn->inode);
-
-  // lock inode
-  if (!mds->locker->inode_hard_xlock_start(dn->inode, mdr))
-    return;
-
-
   // get stray dn ready?
   CDentry *straydn = 0;
   if (dn->is_primary()) {
@@ -2475,20 +2439,7 @@ void Server::handle_client_rename_local(MClientRequest *req,
 // ===================================
 // TRUNCATE, FSYNC
 
-class C_MDS_ReplyRequest : public Context {
-  MDS *mds;
-  MDRequest *mdr;
-public:
-  C_MDS_ReplyRequest(MDS *m, MDRequest *r) : mds(m), mdr(r) {}
-  void finish(int r) {
-    // reply
-    MClientReply *reply = new MClientReply(mdr->client_request(), 0);
-    reply->set_result(0);
-    mds->server->reply_request(mdr, reply, mdr->ref);
-  }
-};
-
-class C_MDS_truncate_finish : public Context {
+class C_MDS_truncate_purged : public Context {
   MDS *mds;
   MDRequest *mdr;
   CInode *in;
@@ -2496,7 +2447,7 @@ class C_MDS_truncate_finish : public Context {
   off_t size;
   time_t ctime;
 public:
-  C_MDS_truncate_finish(MDS *m, MDRequest *r, CInode *i, version_t pdv, off_t sz, time_t ct) :
+  C_MDS_truncate_purged(MDS *m, MDRequest *r, CInode *i, version_t pdv, off_t sz, time_t ct) :
     mds(m), mdr(r), in(i), 
     pv(pdv),
     size(sz), ctime(ct) { }
@@ -2512,34 +2463,39 @@ public:
     // hit pop
     mds->balancer->hit_inode(in, META_POP_IWR);   
 
+    // reply
+    mds->server->reply_request(mdr, 0);
+  }
+};
+
+class C_MDS_truncate_logged : public Context {
+  MDS *mds;
+  MDRequest *mdr;
+  CInode *in;
+  version_t pv;
+  off_t size;
+  time_t ctime;
+public:
+  C_MDS_truncate_logged(MDS *m, MDRequest *r, CInode *i, version_t pdv, off_t sz, time_t ct) :
+    mds(m), mdr(r), in(i), 
+    pv(pdv),
+    size(sz), ctime(ct) { }
+  void finish(int r) {
+    assert(r == 0);
+
     // purge
     mds->mdcache->purge_inode(&in->inode, size);
-    mds->mdcache->wait_for_purge(in->inode.ino, size, new C_MDS_ReplyRequest(mds, mdr));
+    mds->mdcache->wait_for_purge(in->inode.ino, size, 
+                                new C_MDS_truncate_purged(mds, mdr, in, pv, size, ctime));
   }
 };
 
 void Server::handle_client_truncate(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request();
-  CInode *cur = request_pin_ref(mdr);
+  CInode *cur = rdlock_path_pin_ref(mdr, true);
   if (!cur) return;
 
-  // not auth?
-  if (!cur->is_auth()) {
-    mdcache->request_forward(mdr, cur->authority().first);
-    return;
-  }
-
-  // lock inode
-  if (!cur->can_auth_pin()) {
-    dout(7) << "waiting for authpinnable on " << *cur << endl;
-    cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
-    mdcache->request_drop_locks(mdr);
-    mdr->drop_auth_pins();
-    return;
-  }
-  mdr->auth_pin(cur);
-
   // check permissions?  
 
   // xlock inode
@@ -2555,8 +2511,8 @@ void Server::handle_client_truncate(MDRequest *mdr)
   // prepare
   version_t pdv = cur->pre_dirty();
   time_t ctime = g_clock.gettime();
-  C_MDS_truncate_finish *fin = new C_MDS_truncate_finish(mds, mdr, cur, 
-                                                        pdv, req->args.truncate.length, ctime);
+  Context *fin = new C_MDS_truncate_logged(mds, mdr, cur, 
+                                          pdv, req->args.truncate.length, ctime);
   
   // log + wait
   EUpdate *le = new EUpdate("truncate");
@@ -2580,15 +2536,19 @@ void Server::handle_client_truncate(MDRequest *mdr)
 void Server::handle_client_open(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request();
-  CInode *cur = request_pin_ref(mdr);
-  if (!cur) return;
 
   int flags = req->args.open.flags;
   int cmode = req->get_open_file_mode();
-
-  dout(7) << "open " << flags << " on " << *cur << endl;
-  dout(10) << "open flags = " << flags << "  filemode = " << cmode << endl;
-
+  bool need_auth = ((cmode != FILE_MODE_R && cmode != FILE_MODE_LAZY) ||
+                   (flags & O_TRUNC));
+  dout(10) << "open flags = " << flags
+          << ", filemode = " << cmode
+          << ", need_auth = " << need_auth
+          << endl;
+
+  CInode *cur = rdlock_path_pin_ref(mdr, need_auth);
+  if (!cur) return;
+  
   // regular file?
   if ((cur->inode.mode & INODE_TYPE_MASK) != INODE_MODE_FILE) {
     dout(7) << "not a regular file " << *cur << endl;
@@ -2596,49 +2556,42 @@ void Server::handle_client_open(MDRequest *mdr)
     return;
   }
 
-  // auth for write access
-  if (cmode != FILE_MODE_R && cmode != FILE_MODE_LAZY &&
-      !cur->is_auth()) {
-    int auth = cur->authority().first;
-    assert(auth != mds->get_nodeid());
-    dout(9) << "open writeable on replica for " << *cur << " fw to auth " << auth << endl;
-    
-    mdcache->request_forward(mdr, auth);
-    return;
-  }
+  // hmm, check permissions or something.
+
 
   // O_TRUNC
   if (flags & O_TRUNC) {
-    // auth pin
-    if (!cur->can_auth_pin()) {
-      dout(7) << "waiting for authpinnable on " << *cur << endl;
-      cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
-      return;
-    }
-    mdr->auth_pin(cur);
+    assert(cur->is_auth());
 
-    // write
+    // xlock file size
     if (!mds->locker->inode_file_xlock_start(cur, mdr))
-      return;  // fw or (wait for) lock
-    
-    // do update
-    cur->inode.size = 0;
-    cur->_mark_dirty(); // fixme
+      return;
     
-    mds->locker->inode_file_xlock_finish(cur, mdr);
+    if (cur->inode.size > 0) {
+      handle_client_opent(mdr);
+      return;
+    }
   }
+  
+  // do it
+  _do_open(mdr, cur);
+}
 
-
-  // hmm, check permissions or something.
-
+void Server::_do_open(MDRequest *mdr, CInode *cur)
+{
+  MClientRequest *req = mdr->client_request();
+  int cmode = req->get_open_file_mode();
 
   // can we issue the caps they want?
   version_t fdv = mds->locker->issue_file_data_version(cur);
   Capability *cap = mds->locker->issue_new_caps(cur, cmode, req);
   if (!cap) return; // can't issue (yet), so wait!
-
-  dout(12) << "open gets caps " << cap_string(cap->pending()) << " for " << req->get_source() << " on " << *cur << endl;
-
+  
+  dout(12) << "_do_open issuing caps " << cap_string(cap->pending())
+          << " for " << req->get_source()
+          << " on " << *cur << endl;
+  
+  // hit pop
   mds->balancer->hit_inode(cur, META_POP_IRD);
 
   // reply
@@ -2650,6 +2603,84 @@ void Server::handle_client_open(MDRequest *mdr)
 }
 
 
+class C_MDS_open_truncate_purged : public Context {
+  MDS *mds;
+  MDRequest *mdr;
+  CInode *in;
+  version_t pv;
+  time_t ctime;
+public:
+  C_MDS_open_truncate_purged(MDS *m, MDRequest *r, CInode *i, version_t pdv, time_t ct) :
+    mds(m), mdr(r), in(i), 
+    pv(pdv),
+    ctime(ct) { }
+  void finish(int r) {
+    assert(r == 0);
+
+    // apply to cache
+    in->inode.size = 0;
+    in->inode.ctime = ctime;
+    in->inode.mtime = ctime;
+    in->mark_dirty(pv);
+    
+    // hit pop
+    mds->balancer->hit_inode(in, META_POP_IWR);   
+    
+    // do the open
+    mds->server->_do_open(mdr, in);
+  }
+};
+
+class C_MDS_open_truncate_logged : public Context {
+  MDS *mds;
+  MDRequest *mdr;
+  CInode *in;
+  version_t pv;
+  time_t ctime;
+public:
+  C_MDS_open_truncate_logged(MDS *m, MDRequest *r, CInode *i, version_t pdv, time_t ct) :
+    mds(m), mdr(r), in(i), 
+    pv(pdv),
+    ctime(ct) { }
+  void finish(int r) {
+    assert(r == 0);
+
+    // purge also...
+    mds->mdcache->purge_inode(&in->inode, 0);
+    mds->mdcache->wait_for_purge(in->inode.ino, 0,
+                                new C_MDS_open_truncate_purged(mds, mdr, in, pv, ctime));
+  }
+};
+
+
+void Server::handle_client_opent(MDRequest *mdr)
+{
+  CInode *cur = mdr->ref;
+  assert(cur);
+
+  // prepare
+  version_t pdv = cur->pre_dirty();
+  time_t ctime = g_clock.gettime();
+  Context *fin = new C_MDS_open_truncate_logged(mds, mdr, cur, 
+                                               pdv, ctime);
+  
+  // log + wait
+  EUpdate *le = new EUpdate("open_truncate");
+  le->metablob.add_client_req(mdr->reqid);
+  le->metablob.add_dir_context(cur->get_parent_dir());
+  le->metablob.add_inode_truncate(cur->inode, 0);
+  inode_t *pi = le->metablob.add_dentry(cur->parent, true);
+  pi->mtime = ctime;
+  pi->ctime = ctime;
+  pi->version = pdv;
+  pi->size = 0;
+  
+  mdlog->submit_entry(le);
+  mdlog->wait_for_sync(fin);
+}
+
+
+
 class C_MDS_openc_finish : public Context {
   MDS *mds;
   MDRequest *mdr;
index aea76320b51c617b0b9d0806eb8c24339ca79bb1..0bc1deb9b96b0d91ba724634b2cc7cd3d03cff55 100644 (file)
@@ -46,15 +46,16 @@ public:
   void reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei);
 
   // some helpers
-  CInode *request_pin_ref(MDRequest *mdr);
   CDir *validate_dentry_dir(MDRequest *mdr, CInode *diri, const string& dname);
   CDir *traverse_to_auth_dir(MDRequest *mdr, vector<CDentry*> &trace, filepath refpath);
   CDentry *prepare_null_dentry(MDRequest *mdr, CDir *dir, const string& dname, bool okexist=false);
   CInode* prepare_new_inode(MClientRequest *req, CDir *dir);
+
+  CInode* rdlock_path_pin_ref(MDRequest *mdr, bool want_auth);
   CDentry* rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mustexist);
+
   CDir* try_open_auth_dir(CInode *diri, frag_t fg, MDRequest *mdr);
-  
-  CDir* try_open_dir(CInode *diri, frag_t fg, MDRequest *mdr);
+  //CDir* try_open_dir(CInode *diri, frag_t fg, MDRequest *mdr);
 
   // requests on existing inodes.
   void handle_client_stat(MDRequest *mdr);
@@ -69,6 +70,8 @@ public:
   // open
   void handle_client_open(MDRequest *mdr);
   void handle_client_openc(MDRequest *mdr);  // O_CREAT variant.
+  void handle_client_opent(MDRequest *mdr);  // O_TRUNC variant.
+  void _do_open(MDRequest *mdr, CInode *ref);
 
   // namespace changes
   void handle_client_mknod(MDRequest *mdr);
diff --git a/branches/sage/cephmds2/mds/events/ESlaveUpdate.h b/branches/sage/cephmds2/mds/events/ESlaveUpdate.h
new file mode 100644 (file)
index 0000000..fc673d0
--- /dev/null
@@ -0,0 +1,60 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MDS_ESLAVEUPDATE_H
+#define __MDS_ESLAVEUPDATE_H
+
+#include "../LogEvent.h"
+#include "EMetaBlob.h"
+
+class ESlaveUpdate : public LogEvent {
+public:
+  string type;
+  metareqid_t reqid;
+  int op;  // prepare, commit, abort
+  EMetaBlob metablob;
+
+  ESlaveUpdate() : LogEvent(EVENT_SLAVEUPDATE) { }
+  ESlaveUpdate(const char *s, metareqid_t ri, int o) : 
+    LogEvent(EVENT_SLAVEUPDATE),
+    type(s),
+    reqid(ri),
+    op(o) { }
+  
+  void print(ostream& out) {
+    if (type.length())
+      out << type << " ";
+    out << " " << op;
+    out << " " << reqid;
+    out << metablob;
+  }
+
+  void encode_payload(bufferlist& bl) {
+    ::_encode(type, bl);
+    ::_encode(reqid, bl);
+    ::_encode(op, bl);
+    metablob._encode(bl);
+  } 
+  void decode_payload(bufferlist& bl, int& off) {
+    ::_decode(type, bl, off);
+    ::_decode(reqid, bl, off);
+    ::_decode(op, bl, off);
+    metablob._decode(bl, off);
+  }
+
+  bool has_expired(MDS *mds);
+  void expire(MDS *mds, Context *c);
+  void replay(MDS *mds);
+};
+
+#endif
index c6b8eec39047d1b33b4d023f37b4323c57ef6e7b..304b389720c62e65c552e931aee24cf3fb0f39e0 100644 (file)
@@ -18,6 +18,7 @@
 #include "events/EAnchor.h"
 #include "events/EAnchorClient.h"
 #include "events/EUpdate.h"
+#include "events/ESlaveUpdate.h"
 #include "events/EImportMap.h"
 
 #include "events/EMount.h"
@@ -632,6 +633,26 @@ void EUpdate::replay(MDS *mds)
 }
 
 
+// -----------------------
+// EUpdate
+
+bool ESlaveUpdate::has_expired(MDS *mds)
+{
+  return true;
+  //return metablob.has_expired(mds);
+}
+
+void ESlaveUpdate::expire(MDS *mds, Context *c)
+{
+  metablob.expire(mds, c);
+}
+
+void ESlaveUpdate::replay(MDS *mds)
+{
+  //metablob.replay(mds);
+}
+
+
 // -----------------------
 // EImportMap
 
index ebeaa9d496445883dc93be81f50fdac9f9c31e00..bd70fd50a4d5536200562dfd5f2edb8dec5665a9 100644 (file)
@@ -164,6 +164,28 @@ class MClientRequest : public Message {
       return open_file_mode_is_readonly();
     return (st.op < 1000);
   }
+  bool follow_trailing_symlink() {
+    switch (st.op) {
+    case MDS_OP_LSTAT:
+    case MDS_OP_LINK:
+    case MDS_OP_UNLINK:
+    case MDS_OP_RENAME:
+      return false;
+      
+    case MDS_OP_STAT:
+    case MDS_OP_UTIME:
+    case MDS_OP_CHMOD:
+    case MDS_OP_CHOWN:
+    case MDS_OP_READDIR:
+    case MDS_OP_OPEN:
+      return true;
+
+    default:
+      assert(0);
+    }
+  }
+
+
 
   // normal fields
   void set_tid(long t) { st.tid = t; }
index 38b7ce2d585038ca6ba146ff518bd0c6f79458a2..1d03cdf7fc82b01e90e606a3c9e5881217e1e828 100644 (file)
 #ifndef __MINODELINK_H
 #define __MINODELINK_H
 
-typedef struct {
-  inodeno_t ino;
-  int from;
-} MInodeLink_st;
-
 class MInodeLink : public Message {
-  inodeno_t ino;
-  filepath  link_name;
-  bool prepare;
-  MInodeLink_st st;
+public:
+  static const int OP_PREPARE  = 1;
+  static const int OP_AGREE    = 2;
+  static const int OP_COMMIT   = 3;
+  static const int OP_ACK      = 4;
+  static const int OP_ROLLBACK = 5;
+
+  const char *get_opname(int o) {
+    switch (o) {
+    case OP_PREPARE: return "prepare";
+    case OP_AGREE: return "agree";
+    case OP_COMMIT: return "commit";
+    case OP_ACK: return "ack";
+    case OP_ROLLBACK: return "rollback";
+    default: assert(0);
+    }
+  }
+
+private:
+  struct _st {
+    inodeno_t ino;      // inode to nlink++
+    metareqid_t reqid;  // relevant request
+    int op;             // see above
+    bool inc;           // true == ++, false == --
 
- public:
+    time_t ctime;
+  } st;
+
+public:
   inodeno_t get_ino() { return st.ino; }
-  int get_from() { return st.from; }
+  metareqid_t get_reqid() { return st.reqid; }
+  int get_op() { return st.op; }
+  bool get_inc() { return st.inc; }
+
+  time_t get_ctime() { return st.ctime; }
+  void set_ctime(time_t ct) { st.ctime = ct; }
 
   MInodeLink() {}
-  MInodeLink(inodeno_t ino, int from) :
+  MInodeLink(int op, inodeno_t ino, bool inc, metareqid_t ri) :
     Message(MSG_MDS_INODELINK) {
+    st.op = op;
     st.ino = ino;
-    st.from = from;
+    st.inc = inc;
+    st.reqid = ri;
+  }
+
+  virtual char *get_type_name() { return "inode_link"; }
+  void print(ostream& o) {
+    o << "inode_link(" << get_opname(st.op)
+      << " " << st.ino 
+      << " nlink" << (st.inc ? "++":"--")
+      << " " << st.reqid << ")";
   }
-  virtual char *get_type_name() { return "InL";}
   
   virtual void decode_payload() {
     int off = 0;
-    payload.copy(off, sizeof(st), (char*)&st);
-    off += sizeof(st);
+    _decoderaw(st, payload, off);
   }
   virtual void encode_payload() {
-    payload.append((char*)&st,sizeof(st));
+    _encode(st, payload);
   }
 };