#include "messages/MLock.h"
 #include "messages/MDentryUnlink.h"
 
+#include "messages/MRenameWarning.h"
 #include "messages/MRenameNotify.h"
 #include "messages/MRenameNotifyAck.h"
 #include "messages/MRename.h"
 #include "messages/MRenameAck.h"
 #include "messages/MRenameReq.h"
+#include "messages/MRenamePrep.h"
 
 #include "messages/MClientRequest.h"
 
 void MDCache::fix_renamed_dir(CDir *srcdir,
                                                          CInode *in,
                                                          CDir *destdir,
-                                                         bool authchanged)   // _inode_ auth
+                                                         bool authchanged,   // _inode_ auth
+                                                         int dir_auth)        // dir auth (for certain cases)
 {
+  dout(7) << "fix_renamed_dir on " << *in << endl;
   dout(7) << "fix_renamed_dir on " << *in->dir << endl;
 
   if (in->dir->is_auth()) {
                in->dir->state_clear(CDIR_STATE_IMPORT);
                in->dir->put(CDIR_PIN_IMPORT);
 
+               in->dir->dir_auth = CDIR_AUTH_PARENT;
+               dout(7) << " fixing dir_auth to be " << in->dir->dir_auth << endl;
+
                // move my nested imports to in's containing import
                CDir *con = get_containing_import(in->dir);
                assert(con);
                // inode was ours, still ours.
                dout(7) << "inode was ours, still ours." << endl;
                assert(!in->dir->is_import());
+               assert(in->dir->dir_auth == CDIR_AUTH_PARENT);
                
                // move any exports nested beneath me?
                CDir *newcon = get_containing_import(in->dir);
                if (newcon != oldcon) {
                  dout(7) << "moving nested exports under new container" << endl;
                  set<CDir*> nested;
-                 find_nested_exports(in->dir, nested);
+                 find_nested_exports_under(oldcon, in->dir, nested);
                  for (set<CDir*>::iterator it = nested.begin();
                           it != nested.end();
                           it++) {
                in->dir->state_set(CDIR_STATE_IMPORT);
                in->dir->get(CDIR_PIN_IMPORT);
 
+               in->dir->dir_auth = mds->get_nodeid();
+               dout(7) << " fixing dir_auth to be " << in->dir->dir_auth << endl;
+
                // find old import
                CDir *oldcon = get_containing_import(srcdir);
                assert(oldcon);
+               dout(7) << " oldcon is " << *oldcon << endl;
 
-               // move nested exports under me
+               // move nested exports under me 
                set<CDir*> nested;
-               find_nested_exports(in->dir, nested);
+               find_nested_exports_under(oldcon, in->dir, nested);  
                for (set<CDir*>::iterator it = nested.begin();
                         it != nested.end();
                         it++) {
                  nested_exports[oldcon].erase(*it);
                  nested_exports[in->dir].insert(*it);
                }
+
          } else {
                // inode was replica, still replica
                dout(7) << "inode was replica, still replica.  doing nothing." << endl;
                assert(in->dir->is_import());
 
-               // do nothing.
+               // verify dir_auth
+               assert(in->dir->dir_auth == mds->get_nodeid()); // me, because i'm auth for dir.
+               assert(in->authority() != in->dir->dir_auth);   // inode not me.
          }
 
          assert(in->dir->is_import());
                exports.insert(in->dir);
                in->dir->state_set(CDIR_STATE_EXPORT);
                in->dir->get(CDIR_PIN_EXPORT);
+               
+               assert(dir_auth >= 0);  // better be defined
+               in->dir->dir_auth = dir_auth;
+               dout(7) << " fixing dir_auth to be " << in->dir->dir_auth << endl;
+               
+               CDir *newcon = get_containing_import(in->dir);
+               assert(newcon);
+               nested_exports[newcon].insert(in->dir);
+
          } else {
                // inode was ours, still ours
-               dout(7) << "inode was ours, still ours.  doing nothing." << endl;
-               
-               // do nothing.
+               dout(7) << "inode was ours, still ours.  did my import change?" << endl;
+
+               // sanity
+               assert(in->dir->is_export());
+               assert(in->dir->dir_auth >= 0);              
+               assert(in->dir->dir_auth != in->authority());
+
+               // moved under new import?
+               CDir *oldcon = get_containing_import(srcdir);
+               CDir *newcon = get_containing_import(in->dir);
+               if (oldcon != newcon) {
+                 dout(7) << "moving myself under new import " << *newcon << endl;
+                 nested_exports[oldcon].erase(in->dir);
+                 nested_exports[newcon].insert(in->dir);
+               }
          }
 
          assert(in->dir->is_export());
                exports.erase(in->dir);
                in->dir->state_clear(CDIR_STATE_EXPORT);
                in->dir->put(CDIR_PIN_EXPORT);
-
+               
                CDir *oldcon = get_containing_import(srcdir);
                assert(oldcon);
                assert(nested_exports[oldcon].count(in->dir) == 1);
                nested_exports[oldcon].erase(in->dir);
 
+               // simplify dir_auth
+               if (in->authority() == in->dir->authority()) {
+                 in->dir->dir_auth = CDIR_AUTH_PARENT;
+                 dout(7) << "simplified dir_auth to -1, inode auth is (also) " << in->authority() << endl;
+               } else {
+                 assert(in->dir->dir_auth >= 0);    // someone else's export,
+               }
+
          } else {
                // inode was replica, still replica
                dout(7) << "inode was replica, still replica.  do nothing." << endl;
+               
+               // fix dir_auth?
+               if (in->authority() == dir_auth)
+                 in->dir->dir_auth = CDIR_AUTH_PARENT;
+               else
+                 in->dir->dir_auth = dir_auth;
+               dout(7) << " fixing dir_auth to be " << dir_auth << endl;
 
                // do nothing.
          }
        handle_dentry_unlink((MDentryUnlink*)m);
        break;
 
+  case MSG_MDS_RENAMEWARNING:
+       handle_rename_warning((MRenameWarning*)m);
+       break;
   case MSG_MDS_RENAMENOTIFY:
        handle_rename_notify((MRenameNotify*)m);
        break;
   case MSG_MDS_RENAMEREQ:
        handle_rename_req((MRenameReq*)m);
        break;
+  case MSG_MDS_RENAMEPREP:
+       handle_rename_prep((MRenamePrep*)m);
+       break;
   case MSG_MDS_RENAMEACK:
        handle_rename_ack((MRenameAck*)m);
        break;
        // dentry
        CDentry *dn = cur->dir->lookup(path[depth]);
 
-       // xlocked and null?  ** all wrong, FIXME
-       if (onfail == MDS_TRAVERSE_DISCOVERXLOCK &&
-               dn && !dn->inode && dn->is_xlockedbyme(req) &&
+       // xlocked by me?
+       if (dn && !dn->inode && dn->is_xlockedbyme(req) &&
                depth == path.depth()-1) {
-         dout(10) << "traverse: hit null xlocked dentry at tail of traverse, succeeding" << endl;
+         dout(10) << "traverse: hit (my) xlocked dentry at tail of traverse, succeeding" << endl;
          trace.push_back(dn);
          break; // done!
        }
   }
 
   // foreign xlocks?
-  if (active_requests[req].xlocks.size()) {
+  if (active_requests[req].foreign_xlocks.size()) {
        set<CDentry*> dns = active_requests[req].foreign_xlocks;
        active_requests[req].foreign_xlocks.clear();
        
          CDentry *dn = *it;
          
          dout(7) << "request_cleanup sending unxlock for foreign xlock on " << *dn << endl;
+         assert(dn->is_xlocked());
          int dauth = dn->dir->dentry_authority(dn->name);
          MLock *m = new MLock(LOCK_AC_UNXLOCK, mds->get_nodeid());
          m->set_dn(dn->dir->ino(), dn->name);
 }
 
 
-void MDCache::request_forward(Message *req, int who)
+void MDCache::request_forward(Message *req, int who, int port)
 {
+  if (!port) port = MDS_PORT_SERVER;
+
   dout(7) << "request_forward to " << who << " req " << *req << endl;
   request_cleanup(req);
   mds->messenger->send_message(req,
-                                                          MSG_ADDR_MDS(who), MDS_PORT_SERVER,
-                                                          MDS_PORT_SERVER);
+                                                          MSG_ADDR_MDS(who), port,
+                                                          port);
 }
 
 
                if (m->get_dentry_xlock(i)) {
                  dout(7) << " new dentry is xlock " << *dn << endl;
                  dn->lockstate = DN_LOCK_XLOCK;
+                 dn->xlockedby = 0;
                }
                dout(7) << "added " << *dn << endl;
          }
          assert(in);  // OOPS  i should be authority, or recent authority (and thus frozen).
        }  
        if (!in->is_auth()) {
-         int newauth = ino_proxy_auth(in->ino(), 
-                                                                  from,
-                                                                  export_proxy_inos);
+         int newauth = in->authority();
          dout(7) << "proxy inode expire on " << *in << " to " << newauth << endl;
-         in->is_proxy();
          assert(newauth >= 0);
          assert(in->state_test(CINODE_STATE_PROXY));
          if (proxymap.count(newauth) == 0) proxymap[newauth] = new MCacheExpire(from);
          assert(dir);  // OOPS  i should be authority, or recent authority (and thus frozen).
        }  
        if (!dir->is_auth()) {
-         int newauth = ino_proxy_auth(dir->ino(), 
-                                                                  from,
-                                                                  export_proxy_dirinos);
+         int newauth = dir->authority();
          dout(7) << "proxy dir expire on " << *dir << " to " << newauth << endl;
          assert(dir->is_proxy());
          assert(newauth >= 0);
   int from = m->get_from();
 
   if (in->is_proxy()) {
-       int newauth = ino_proxy_auth(in->ino(), 
-                                                                from,
-                                                                export_proxy_inos);
+       int newauth = in->authority();
        assert(newauth >= 0);
        dout(7) << "handle_inode_writer_closed " << m->get_ino() << " from " << from << ": proxy, fw to " << newauth << endl;
        mds->messenger->send_message(m,
 
 // renaming!
 
+/*
+ * when initiator gets an ack back for a foreign rename
+ */
+
+class C_MDC_RenameNotifyAck : public Context {
+  MDCache *mdc;
+  CInode *in;
+  int initiator;
+
+public:
+  C_MDC_RenameNotifyAck(MDCache *mdc, 
+                                               CInode *in, int initiator) {
+       this->mdc = mdc;
+       this->in = in;
+       this->initiator = initiator;
+  }
+  void finish(int r) {
+       mdc->file_rename_ack(in, initiator);
+  }
+};
+
+
+
+/************** initiator ****************/
+
+/*
+ * when we get MRenameAck (and rename is done, notifies gone out+acked, etc.)
+ */
 class C_MDC_RenameAck : public Context {
   MDCache *mdc;
   CDir *srcdir;
   }
 };
 
-void MDCache::file_rename(CDentry *srcdn, CDentry *destdn, Context *c)
+
+void MDCache::file_rename(CDentry *srcdn, CDentry *destdn, Context *onfinish)
 {
   assert(srcdn->is_xlocked());  // by me
   assert(destdn->is_xlocked());  // by me
 
   CDir *srcdir = srcdn->dir;
   string srcname = srcdn->name;
-  bool srcauth = srcdir->dentry_authority(srcdn->name) == mds->get_nodeid();
-
+  
   CDir *destdir = destdn->dir;
   string destname = destdn->name;
-  bool destauth = destdir->dentry_authority(destdn->name) == mds->get_nodeid();
-  
+
   CInode *in = srcdn->inode;
   Message *req = srcdn->xlockedby;
 
+
+  // determine the players
+  int srcauth = srcdir->dentry_authority(srcdn->name);
+  int destauth = destdir->dentry_authority(destname);
+
+
   // FOREIGN rename?
-  if (srcauth && !destauth) {
-       dout(7) << "file_rename src auth, not dest auth.  sending MRename" << endl;
+  if (srcauth != mds->get_nodeid() ||
+         destauth != mds->get_nodeid()) {
+       dout(7) << "foreign rename.  srcauth " << srcauth << ", destauth " << destauth << ", isdir " << srcdn->inode->is_dir() << endl;
        
-       file_rename_foreign_src(srcdn, destdn, 
-                                                       mds->get_nodeid());  // i'm the initiator
+       string destpath;
+       destdn->make_path(destpath);
 
-       // set waiter on the inode (is this the best place?)
-       in->add_waiter(CINODE_WAIT_RENAMEACK, c);
-       return;
-  }
-  else if (!srcauth) {
-       if (destauth) {
-         dout(7) << "file_rename dest auth, not src auth.  sending MRenameReq" << endl;        
-       } else {
-         dout(7) << "file_rename neither src auth nor dest auth.  sending MRenameReq" << endl; 
+       if (destauth != mds->get_nodeid()) { 
+         // make sure dest has dir open.
+         dout(7) << "file_rename i'm not dest auth.  sending MRenamePrep to " << destauth << endl;
+         
+         // prep dest first, they must have the dir open!  rest will follow.
+         string srcpath;
+         srcdn->make_path(srcpath);
+         
+         MRenamePrep *m = new MRenamePrep(mds->get_nodeid(),  // i'm the initiator
+                                                                          srcdir->ino(), srcname, srcpath, 
+                                                                          destdir->ino(), destname, destpath,
+                                                                          srcauth);  // tell dest who src is (maybe even me)
+         mds->messenger->send_message(m,
+                                                                  MSG_ADDR_MDS(destauth), MDS_PORT_CACHE, MDS_PORT_CACHE);
+         
+         show_imports();
+         
        }
        
-       MRenameReq *m = new MRenameReq(mds->get_nodeid(),  // i'm the initiator
-                                                                  srcdir->ino(), srcname, destdir->ino(), destname);
-       int srcauth = srcdir->dentry_authority(srcdn->name);
-       mds->messenger->send_message(m,
-                                                                MSG_ADDR_MDS(srcauth), MDS_PORT_CACHE, MDS_PORT_CACHE);
+       else if (srcauth != mds->get_nodeid()) {
+         if (destauth == mds->get_nodeid()) {
+               dout(7) << "file_rename dest auth, not src auth.  sending MRenameReq" << endl;  
+         } else {
+               dout(7) << "file_rename neither src auth nor dest auth.  sending MRenameReq" << endl;   
+         }
+         
+         // srcdn not important on destauth, just request
+         MRenameReq *m = new MRenameReq(mds->get_nodeid(),  // i'm the initiator
+                                                                        srcdir->ino(), srcname, 
+                                                                        destdir->ino(), destname, destpath, destauth);  // tell src who dest is (they may not know)
+         mds->messenger->send_message(m,
+                                                                  MSG_ADDR_MDS(srcauth), MDS_PORT_CACHE, MDS_PORT_CACHE);
+       }
+       
+       else
+         assert(0);
 
        // set waiter on the inode (is this the best place?)
-       in->add_waiter(CINODE_WAIT_RENAMEACK, c);
+       in->add_waiter(CINODE_WAIT_RENAMEACK, 
+                                  new C_MDC_RenameAck(this, 
+                                                                          srcdir, in, onfinish));
        return;
-  } 
+  }
 
-  // LOCAL rename
-  assert(srcauth && destauth);
+  // LOCAL rename!
+  assert(srcauth == mds->get_nodeid() && destauth == mds->get_nodeid());
   dout(7) << "file_rename src and dest auth, renaming locally (easy!)" << endl;
   
   // update our cache
   rename_file(srcdn, destdn);
   
-  // mark dentries dirty
-  srcdn->mark_dirty();
-  destdn->mark_dirty();
-  in->mark_dirty();
-  
   // update imports/exports?
   if (in->is_dir() && in->dir) 
        fix_renamed_dir(srcdir, in, destdir, false);  // auth didnt change
 
-  
-  // tell replicas (no need to wait for ack) to do the same, and un-xlock.
-  // make list
-  set<int> notify;
-  notify = srcdir->get_open_by();
+  // mark dentries dirty
+  srcdn->mark_dirty();
+  destdn->mark_dirty();
+  in->mark_dirty();
+ 
+ 
+  // local, restrict notify to ppl with open dirs
+  set<int> notify = srcdir->get_open_by();
   for (set<int>::iterator it = destdir->open_by_begin();
           it != destdir->open_by_end();
           it++)
        if (notify.count(*it) == 0) notify.insert(*it);
-
+  
   if (notify.size()) {
-       // tell
-       string destdirpath;
-       destdir->inode->make_path(destdirpath);
-       
-       for (set<int>::iterator it = notify.begin();
-                it != notify.end();
-                it++) {
-         mds->messenger->send_message(new MRenameNotify(in->ino(),
-                                                                                                        srcdir->ino(),
-                                                                                                        srcname,
-                                                                                                        destdir->ino(),
-                                                                                                        destdirpath,
-                                                                                                        destname),
-                                                                  MSG_ADDR_MDS(*it), MDS_PORT_CACHE, MDS_PORT_CACHE);
-       }
+       // warn + notify
+       file_rename_warn(in, notify);
+       file_rename_notify(in, srcdir, srcname, destdir, destname, notify, mds->get_nodeid());
 
-       // wait for acks
-       in->rename_waiting_for_ack = notify;
+       // wait for MRenameNotifyAck's
+       in->add_waiter(CINODE_WAIT_RENAMENOTIFYACK,
+                                  new C_MDC_RenameNotifyAck(this, in, mds->get_nodeid()));  // i am initiator
+
+       // wait for finish
        in->add_waiter(CINODE_WAIT_RENAMEACK,
-                                  new C_MDC_RenameAck(this, srcdir, in, c));
+                                  new C_MDC_RenameAck(this, srcdir, in, onfinish));
   } else {
-       file_rename_finish(srcdir, in, c);
+       // sweet, no notify necessary, we're done!
+       file_rename_finish(srcdir, in, onfinish);
   }
 }
 
-
-void MDCache::handle_rename_notify_ack(MRenameNotifyAck *m)
+void MDCache::handle_rename_ack(MRenameAck *m)
 {
   CInode *in = get_inode(m->get_ino());
   assert(in);
-  dout(7) << "handle_rename_notify_ack on " << *in << endl;
+  
+  dout(7) << "handle_rename_ack on " << *in << endl;
 
-  in->rename_waiting_for_ack.erase(m->get_source());
-  if (in->rename_waiting_for_ack.empty()) {
-       // last one!
-       in->finish_waiting(CINODE_WAIT_RENAMEACK, 0);
-  } else {
-       dout(7) << "still waiting for " << in->rename_waiting_for_ack << endl;
-  }
-}
+  // all done!
+  in->finish_waiting(CINODE_WAIT_RENAMEACK);
 
+  delete m;
+}
 
 void MDCache::file_rename_finish(CDir *srcdir, CInode *in, Context *c)
 {
   dout(10) << "file_rename_finish on " << *in << endl;
 
-  // did i empty out an imported dir?
+  // did i empty out an imported dir?  FIXME this check should go somewhere else???
   if (srcdir->is_import() && !srcdir->inode->is_root() && srcdir->get_size() == 0) 
        export_empty_import(srcdir);
 
-  /* just let the finished request drop the locks.
-
-  // clean up xlocks
-  dentry_xlock_finish(srcdn);
-  dentry_xlock_finish(destdn);
-
-  // other waiters
-  srcdir->take_waiting(CDIR_WAIT_ANY, srcname, mds->finished_queue);
-  destdir->take_waiting(CDIR_WAIT_DNREAD, destname, mds->finished_queue);
-  */
-  
   // finish our caller
   if (c) {
        c->finish(0);
 }
 
 
-void MDCache::file_rename_foreign_src(CDentry *srcdn, CDentry *destdn, int initiator)
+/************* src **************/
+
+
+/** handle_rename_req
+ * received by auth of src dentry (from init, or destauth if dir).  
+ * src may not have dest dir open.
+ * src will export inode, unlink|rename, and send MRename to dest.
+ */
+void MDCache::handle_rename_req(MRenameReq *m)
+{
+  // i am auth, i will have it.
+  CInode *srcdiri = get_inode(m->get_srcdirino());
+  CDir *srcdir = srcdiri->dir;
+  CDentry *srcdn = srcdir->lookup(m->get_srcname());
+  assert(srcdn);
+  
+  // do it
+  file_rename_foreign_src(srcdn, 
+                                                 m->get_destdirino(), m->get_destname(), m->get_destpath(), m->get_destauth(), 
+                                                 m->get_initiator());
+  delete m;
+}
+
+
+void MDCache::file_rename_foreign_src(CDentry *srcdn, 
+                                                                         inodeno_t destdirino, string& destname, string& destpath, int destauth, 
+                                                                         int initiator)
 {
-  dout(7) << "file_rename_foreign_src " << *srcdn << " to " << *destdn << endl;
+  dout(7) << "file_rename_foreign_src " << *srcdn << endl;
 
   CDir *srcdir = srcdn->dir;
-  CDir *destdir = destdn->dir;
   string srcname = srcdn->name;
 
   // (we're basically exporting this inode)
   assert(in);
   assert(in->is_auth());
 
-  string srcpath;
-  srcdn->inode->make_path(srcpath);
-  
+  if (in->is_dir()) show_imports();
+
   // encode and export inode state
   crope inode_state;
   encode_export_inode(in, inode_state);
-  
+
+  // send
   MRename *m = new MRename(initiator,
-                                                  srcdir->ino(), srcdn->name, srcpath, destdir->ino(), destdn->name,
+                                                  srcdir->ino(), srcdn->name, destdirino, destname,
                                                   inode_state);
-  int destauth = destdir->dentry_authority(destdn->name);
   mds->messenger->send_message(m,
                                                           MSG_ADDR_MDS(destauth), MDS_PORT_CACHE, MDS_PORT_CACHE);
+  
+  // have dest?
+  CInode *destdiri = get_inode(m->get_destdirino());
+  CDir *destdir = 0;
+  if (destdiri) destdir = destdiri->dir;
+  CDentry *destdn = 0;
+  if (destdir) destdn = destdir->lookup(m->get_destname());
+
+  // discover src
+  if (!destdn) {
+       dout(7) << "file_rename_foreign_src doesn't have destdn, discovering " << destpath << endl;
+
+       filepath destfilepath = destpath;
+       vector<CDentry*> trace;
+       int r = path_traverse(destfilepath, trace, true,
+                                                 m, new C_MDS_RetryMessage(mds, m), 
+                                                 MDS_TRAVERSE_DISCOVER);
+       assert(r>0);
+       return;
+  }
+
+  assert(destdn);
 
   // update our cache
   rename_file(srcdn, destdn);
-  srcdn->mark_dirty();
-       
+  
   // update imports/exports?
   if (in->is_dir() && in->dir) 
        fix_renamed_dir(srcdir, in, destdir, true);  // auth changed
 
-  /* let the initiator do this.
-  // drop xlock on src
-  dentry_xlock_finish(srcdn);
-  srcdir->take_waiting(CDIR_WAIT_ANY, srcname, mds->finished_queue);
-  */
+  srcdn->mark_dirty();
+
+  // proxy!
+  in->state_set(CINODE_STATE_PROXY);
+  in->get(CINODE_PIN_PROXY);
+  
+  // generate notify list (everybody but src|dst) and send warnings
+  set<int> notify;
+  for (int i=0; i<mds->get_cluster()->get_num_mds(); i++) {
+       if (i != mds->get_nodeid() &&  // except the source
+               i != destauth)             // and the dest
+         notify.insert(i);
+  }
+  file_rename_warn(in, notify);
+
+
+  // wait for MRenameNotifyAck's
+  in->add_waiter(CINODE_WAIT_RENAMENOTIFYACK,
+                                new C_MDC_RenameNotifyAck(this, in, initiator));
 }
 
-void MDCache::handle_rename_req(MRenameReq *m)
+void MDCache::file_rename_warn(CInode *in,
+                                                          set<int>& notify)
 {
-  CInode *srcdiri = get_inode(m->get_srcdirino());
-  CDir *srcdir = srcdiri->dir;
-  CDentry *srcdn = srcdir->lookup(m->get_srcname());
-  assert(srcdn);
+  // note gather list
+  in->rename_waiting_for_ack = notify;
 
-  CInode *destdiri = get_inode(m->get_destdirino());
-  CDir *destdir = destdiri->dir;
-  CDentry *destdn = destdir->lookup(m->get_destname());
-  assert(destdn);
+  // send
+  for (set<int>::iterator it = notify.begin();
+          it != notify.end();
+          it++) {
+       dout(10) << "file_rename_warn to " << *it << " for " << *in << endl;
+       mds->messenger->send_message(new MRenameWarning(in->ino()),
+                                                                MSG_ADDR_MDS(*it), MDS_PORT_CACHE, MDS_PORT_CACHE);
+  }
+}
+
+
+void MDCache::handle_rename_notify_ack(MRenameNotifyAck *m)
+{
+  CInode *in = get_inode(m->get_ino());
+  assert(in);
+  dout(7) << "handle_rename_notify_ack on " << *in << endl;
+
+  in->rename_waiting_for_ack.erase(m->get_source());
+  if (in->rename_waiting_for_ack.empty()) {
+       // last one!
+       in->finish_waiting(CINODE_WAIT_RENAMENOTIFYACK, 0);
+  } else {
+       dout(7) << "still waiting for " << in->rename_waiting_for_ack << endl;
+  }
+}
+
+
+void MDCache::file_rename_ack(CInode *in, int initiator) 
+{
+  // we got all our MNotifyAck's.
+
+  // was i proxy (if not, it's cuz this was a local rename)
+  if (in->state_test(CINODE_STATE_PROXY)) {
+       dout(10) << "file_rename_ack clearing proxy bit on " << *in << endl;
+       in->state_clear(CINODE_STATE_PROXY);
+       in->put(CINODE_PIN_PROXY);
+  }
+
+  // done!
+  if (initiator == mds->get_nodeid()) {
+       // it's me, finish
+       dout(7) << "file_rename_ack i am initiator, finishing" << endl;
+       in->finish_waiting(CINODE_WAIT_RENAMEACK);
+  } else {
+       // send ack
+       dout(7) << "file_rename_ack sending MRenameAck to initiator " << initiator << endl;
+       mds->messenger->send_message(new MRenameAck(in->ino()),
+                                                                MSG_ADDR_MDS(initiator), MDS_PORT_CACHE, MDS_PORT_CACHE);
+  }  
+}
+
+
+
+
+/************ dest *************/
+
+/** handle_rename_prep
+ * received by auth of dest dentry to make sure they have src + dir open.
+ * this is so that when they get the inode and dir, they can update exports etc properly.
+ * will send MRenameReq to src.
+ */
+void MDCache::handle_rename_prep(MRenamePrep *m)
+{
+  // open src
+  filepath srcpath = m->get_srcpath();
+  vector<CDentry*> trace;
+  int r = path_traverse(srcpath, trace, true,
+                                               m, new C_MDS_RetryMessage(mds, m), 
+                                               MDS_TRAVERSE_DISCOVER);
 
-  file_rename_foreign_src(srcdn, destdn, m->get_initiator());
+  if (r>0) return;
+
+  // ok!
+  CInode *srcin = trace[trace.size()-1]->inode;
+  assert(srcin);
+  
+  dout(7) << "handle_rename_prep have srcin " << *srcin << endl;
+
+  if (srcin->is_dir()) {
+       if (!srcin->dir) {
+         dout(7) << "handle_rename_prep need to open dir" << endl;
+         open_remote_dir(srcin,
+                                         new C_MDS_RetryMessage(mds,m));
+         return;
+       }
+
+       dout(7) << "handle_rename_prep have dir " << *srcin->dir << endl;       
+  }
+
+  // ok!
+
+  // send rename request
+  MRenameReq *req = new MRenameReq(m->get_initiator(),  // i'm the initiator
+                                                                  m->get_srcdirino(), m->get_srcname(), 
+                                                                  m->get_destdirino(), m->get_destname(), m->get_destpath(),
+                                                                  mds->get_nodeid());  // i am dest
+  mds->messenger->send_message(req,
+                                                          MSG_ADDR_MDS(m->get_srcauth()), MDS_PORT_CACHE, MDS_PORT_CACHE);
   delete m;
+  return;
 }
 
+
+
+/** handle_rename
+ * received by auth of dest dentry.   includes exported inode info.
+ * dest may not have srcdir open.
+ */
 void MDCache::handle_rename(MRename *m)
 {
+  // srcdn (required)
   CInode *srcdiri = get_inode(m->get_srcdirino());
-  CDir *srcdir = 0;
-  if (srcdiri) srcdir = srcdiri->dir;
-  CDentry *srcdn = 0;
-  if (srcdir) srcdn = srcdir->lookup(m->get_srcname());
-  
-  if (!srcdn) {
-       dout(7) << "handle_rename don't have src path " << m->get_srcpath() << ", discovering" << endl;
+  CDir *srcdir = srcdiri->dir;
+  CDentry *srcdn = srcdir->lookup(m->get_srcname());
+  string srcname = srcdn->name;
+  assert(srcdn && srcdn->inode);
 
-       vector<CDentry*> trace;
-       filepath srcpath = m->get_srcpath();
-       int r = path_traverse(srcpath, trace, true,
-                                                 m, new C_MDS_RetryMessage(mds, m), 
-                                                 MDS_TRAVERSE_DISCOVERXLOCK);             // ??? FIXME
-       assert(r>0);
-       return;
-  }
-  
+  dout(7) << "handle_rename srcdn " << *srcdn << endl;
 
+  // destdn (required).  i am auth, so i will have it.
   CInode *destdiri = get_inode(m->get_destdirino());
   CDir *destdir = destdiri->dir;
   CDentry *destdn = destdir->lookup(m->get_destname());
-  assert(destdn);
   string destname = destdn->name;
+  assert(destdn);
+  
+  dout(7) << "handle_rename destdn " << *destdn << endl;
 
-  dout(7) << "handle_rename " << *srcdn << " to " << *destdn << endl;
-
-  if (srcdn->inode) {
-       // rename replica into position
-       rename_file(srcdn, destdn);
-  } else {
-       // unlink dest inode if it's there
-       if (destdn->inode) destdir->unlink_inode(destdn);
-  }
+  // note old dir auth
+  int old_dir_auth = -1;
+  if (srcdn->inode->dir) old_dir_auth = srcdn->inode->dir->authority();
+       
+  // rename replica into position
+  rename_file(srcdn, destdn);
 
   // decode + import inode (into new location start)
   int off = 0;
   decode_import_inode(destdn, m->get_inode_state(), off, m->get_source());
 
-  CInode *in = srcdn->inode;
+  CInode *in = destdn->inode;
   assert(in);
 
+  // update imports/exports?
+  if (in->is_dir()) {
+       assert(in->dir);  // i had better already ahve it open.. see MRenamePrep
+       fix_renamed_dir(srcdir, in, destdir, true,  // auth changed
+                                       old_dir_auth);              // src is possibly new dir auth.
+  }
+  
+  // mark dirty
   destdn->mark_dirty();
   in->mark_dirty();
 
-  // update imports/exports?
-  if (in->is_dir() && in->dir) 
-       fix_renamed_dir(srcdir, in, destdir, true);  // auth changed
-
-  // ok, tell the initiator
-  if (m->get_initiator() == mds->get_nodeid()) {
-       // it's me!
-       in->take_waiting(CINODE_WAIT_RENAMEACK, mds->finished_queue);
-  } else {
-       MRenameAck *ack = new MRenameAck(srcdir->ino(), srcdn->name, destdir->ino(), destdn->name);
-       mds->messenger->send_message(ack,
-                                                                MSG_ADDR_MDS(m->get_initiator()), MDS_PORT_CACHE, MDS_PORT_CACHE);
+  // ok, send notifies.
+  set<int> notify;
+  for (int i=0; i<mds->get_cluster()->get_num_mds(); i++) {
+       if (i != m->get_source() &&  // except the source
+               i != mds->get_nodeid())  // and the dest
+         notify.insert(i);
   }
-
-
-  /* let the initiator do this.
-  // drop xlock on dst
-  dentry_xlock_finish(destdn);
-  destdir->take_waiting(CDIR_WAIT_DNREAD, destname, mds->finished_queue);
-  */
+  file_rename_notify(in, srcdir, srcname, destdir, destname, notify, m->get_source());
 
   delete m;
 }
 
 
-void MDCache::handle_rename_ack(MRenameAck *m)
+void MDCache::file_rename_notify(CInode *in, 
+                                                                CDir *srcdir, string& srcname, CDir *destdir, string& destname,
+                                                                set<int>& notify,
+                                                                int srcauth)
 {
-  CInode *destdiri = get_inode(m->get_destdirino());
-  CDir *destdir = destdiri->dir;
-  CDentry *destdn = destdir->lookup(m->get_destname());
-  assert(destdn);
-  CInode *in = destdn->inode;
+  /* NOTE: notify list might include myself */
   
-  dout(7) << "handle_rename_ack on " << *in << endl;
+  // tell
+  string destdirpath;
+  destdir->inode->make_path(destdirpath);
+  
+  for (set<int>::iterator it = notify.begin();
+          it != notify.end();
+          it++) {
+       dout(10) << "file_rename_notify to " << *it << " for " << *in << endl;
+       mds->messenger->send_message(new MRenameNotify(in->ino(),
+                                                                                                  srcdir->ino(),
+                                                                                                  srcname,
+                                                                                                  destdir->ino(),
+                                                                                                  destdirpath,
+                                                                                                  destname,
+                                                                                                  srcauth),
+                                                                MSG_ADDR_MDS(*it), MDS_PORT_CACHE, MDS_PORT_CACHE);
+  }
+}
+
 
-  // all done!
-  in->take_waiting(CINODE_WAIT_RENAMEACK, mds->finished_queue);
 
+/************ bystanders ****************/
+
+void MDCache::handle_rename_warning(MRenameWarning *m)
+{
+  // add to warning list
+  stray_rename_warnings.insert( m->get_ino() );
+  
+  // did i already see the notify?
+  if (stray_rename_notifies.count(m->get_ino())) {
+       // i did, we're good.
+       dout(7) << "handle_rename_warning on " << m->get_ino() << ".  already got notify." << endl;
+       
+       handle_rename_notify(stray_rename_notifies[m->get_ino()]);
+       stray_rename_notifies.erase(m->get_ino());
+  } else {
+       dout(7) << "handle_rename_warning on " << m->get_ino() << ".  waiting for notify." << endl;
+  }
+  
+  // done
   delete m;
 }
 
 
 void MDCache::handle_rename_notify(MRenameNotify *m)
 {
+  // FIXME: when we do hard links, i think we need to 
+  // have srcdn and destdn both, or neither,  always!
+
+  // did i see the warning yet?
+  if (!stray_rename_warnings.count(m->get_ino())) {
+       // wait for it.
+       dout(7) << "handle_rename_notify on " << m->get_ino() << ", waiting for warning." << endl;
+       stray_rename_notifies[m->get_ino()] = m;
+       return;
+  }
+
   dout(7) << "handle_rename_notify dir " << m->get_srcdirino() << " dn " << m->get_srcname() << " to dir " << m->get_destdirino() << " dname " << m->get_destname() << endl;
   
   // src
   if (srcdn && destdir) {
        CInode *in = srcdn->inode;
 
+       int old_dir_auth = -1;
+       if (in && in->dir) old_dir_auth = in->dir->authority();
+
        if (!destdn) {
          destdn = destdir->add_dentry(m->get_destname());  // create null dentry
          destdn->lockstate = DN_LOCK_XLOCK;                // that's xlocked!
        
        if (in) {
          rename_file(srcdn, destdn);
+
+         // update imports/exports?
+         if (in && in->is_dir() && in->dir) {
+               fix_renamed_dir(srcdir, in, destdir, false, old_dir_auth);  // auth didnt change
+         }
        } else {
          dout(7) << " i don't have the inode (just null dentries)" << endl;
        }
        
-       // remove src dentry
-       //srcdn->dir->remove_dentry(srcdn);
-       
-       // update imports/exports?
-       if (in && in->is_dir() && in->dir) 
-         fix_renamed_dir(srcdir, in, destdir, false);  // auth didnt change
-       
-       //srcdir->take_waiting(CDIR_WAIT_ANY, m->get_srcname(), finished);
-       //destdir->take_waiting(CDIR_WAIT_ANY, m->get_destname(), finished);
   }
 
   else if (srcdn) {
-       if (srcdn->inode && srcdn->inode->dir) {
-         dout(7) << "handle_rename_notify no dest, but src is an open dir." << endl;
-         dout(7) << "srcdn is " << *srcdn << endl;
-
-         if (destdiri) {
-               dout(7) << "have destdiri, opening dir " << *destdiri << endl;
-               open_remote_dir(destdiri,
-                                               new C_MDS_RetryMessage(mds,m));
-         } else {
-               filepath destdirpath = m->get_destdirpath();
-               dout(7) << "don't have destdiri even, doing traverse+discover on " << destdirpath << endl;
+       dout(7) << "handle_rename_notify no dest, but have src" << endl;
+       dout(7) << "srcdn is " << *srcdn << endl;
 
-               vector<CDentry*> trace;
-               int r = path_traverse(destdirpath, trace, true,
-                                                         m, new C_MDS_RetryMessage(mds, m), 
-                                                         MDS_TRAVERSE_DISCOVER);
-               assert(r>0);
-         }
-         return;
+       if (destdiri) {
+         dout(7) << "have destdiri, opening dir " << *destdiri << endl;
+         open_remote_dir(destdiri,
+                                         new C_MDS_RetryMessage(mds,m));
        } else {
-         dout(7) << "handle_rename_notify unlinking src only " << *srcdn << endl;
-         if (srcdn->inode) {
-               assert(!srcdn->inode->dir);    // dir shouldn't be open, or we would have discovered dest (above)
-               srcdir->unlink_inode(srcdn);
-         }
+         filepath destdirpath = m->get_destdirpath();
+         dout(7) << "don't have destdiri even, doing traverse+discover on " << destdirpath << endl;
+         
+         vector<CDentry*> trace;
+         int r = path_traverse(destdirpath, trace, true,
+                                                       m, new C_MDS_RetryMessage(mds, m), 
+                                                       MDS_TRAVERSE_DISCOVER);
+         assert(r>0);
        }
+       return;
   }
 
   else if (destdn) {
 
 
   // ack
-  dout(10) << "sending RenameNotifyAck back to initiator" << endl;
+  dout(10) << "sending RenameNotifyAck back to srcauth " << m->get_srcauth() << endl;
   MRenameNotifyAck *ack = new MRenameNotifyAck(m->get_ino());
   mds->messenger->send_message(ack,
-                                                          MSG_ADDR_MDS(m->get_source()), MDS_PORT_CACHE, MDS_PORT_CACHE);
+                                                          MSG_ADDR_MDS(m->get_srcauth()), MDS_PORT_CACHE, MDS_PORT_CACHE);
   
+
+  stray_rename_warnings.erase( m->get_ino() );
   delete m;
 }
 
 
 
 
+
+
 // locks ----------------------------------------------------------------
 
 /*
 
        if (in->is_proxy()) {
          // fw
-         int newauth = ino_proxy_auth(in->ino(), 
-                                                                  from,
-                                                                  export_proxy_inos);
+         int newauth = in->authority();
          assert(newauth >= 0);
          dout(7) << "handle_lock " << m->get_ino() << " from " << from << ": proxy, fw to " << newauth << endl;
          mds->messenger->send_message(m,
        
        if (in->is_proxy()) {
          // fw
-         int newauth = ino_proxy_auth(in->ino(), 
-                                                                  from,
-                                                                  export_proxy_inos);
+         int newauth = in->authority();
          assert(newauth >= 0);
          dout(7) << "handle_lock " << m->get_ino() << " from " << from << ": proxy, fw to " << newauth << endl;
          mds->messenger->send_message(m,
 
 // DENTRY
 
-bool MDCache::dentry_xlock_start(CDentry *dn, Message *m, CInode *ref, bool all_nodes)
+bool MDCache::dentry_xlock_start(CDentry *dn, Message *m, CInode *ref)
 {
   dout(7) << "dentry_xlock_start on " << *dn << endl;
 
   // mine!
   dn->xlockedby = m;
 
-  if (dn->dir->is_open_by_anyone() || all_nodes) {
+  if (dn->dir->is_open_by_anyone()) {
        dn->lockstate = DN_LOCK_PREXLOCK;
        
        // xlock with whom?
-       set<int> who;
-       if (all_nodes) {
-         for (int i=0; i<mds->get_cluster()->get_num_mds(); i++)
-               if (i != mds->get_nodeid()) who.insert(i);
-       } else {
-         who = dn->dir->get_open_by();
-       }
+       set<int> who = dn->dir->get_open_by();
        dn->gather_set = who;
 
        // make path
 
   void finish(int r) {
        cout << "xlockrequest->finish r = " << r << endl;
-       if (r == 0) {
+       if (r == 1) {  // 1 for xlock request success
          CDentry *dn = dir->lookup(dname);
          if (dn && dn->xlockedby == 0) {
-               cout << "xlock request success, now xlocked by " << req << endl;
                // success
                dn->xlockedby = req;   // our request was the winner
+               cout << "xlock request success, now xlocked by req " << req << " dn " << *dn << endl;
 
                // remember!
                mdc->active_requests[req].foreign_xlocks.insert(dn);
          if (dir->is_proxy()) {
 
                assert(dauth >= 0);
+
+               if (dauth == m->get_asker() && 
+                       (m->get_action() == LOCK_AC_REQXLOCK ||
+                        m->get_action() == LOCK_AC_REQXLOCKC)) {
+                 dout(7) << "handle_lock_dn got reqxlock from " << dauth << " and they are auth.. dropping on floor (their import will have woken them up)" << endl;
+                 if (active_requests.count(m)) 
+                       request_finish(m);
+                 else
+                       delete m;
+                 return;
+               }
+
                dout(7) << "handle_lock_dn " << m << " " << m->get_ino() << " dname " << dname << " from " << from << ": proxy, fw to " << dauth << endl;
 
-               // fw
+               // forward
                if (active_requests.count(m)) {
                  // xlock requests are requests, use request_* functions!
                  assert(m->get_action() == LOCK_AC_REQXLOCK ||
                                 m->get_action() == LOCK_AC_REQXLOCKC);
-                 request_forward(m, dauth);
+                 // forward as a request
+                 request_forward(m, dauth, MDS_PORT_CACHE);
                } else {
-                 // not an xlock req (or we just didn't register the request yet)
+                 // not an xlock req, or it is and we just didn't register the request yet
+                 // forward normally
                  mds->messenger->send_message(m,
                                                                           MSG_ADDR_MDS(dauth), MDS_PORT_CACHE,
                                                                           MDS_PORT_CACHE);
          assert(dir);  // we should still have the dir, though!  the requester has the dir open.
          switch (m->get_action()) {
 
+         case LOCK_AC_LOCK:
+               dout(7) << "handle_lock_dn xlock on " << dname << ", adding (null)" << endl;
+               dn = dir->add_dentry(dname);
+               break;
+
          case LOCK_AC_REQXLOCK:
                // send nak
                {
   case LOCK_AC_SYNC:
        assert(dn->lockstate == DN_LOCK_XLOCK);
        dn->lockstate = DN_LOCK_SYNC;
+       dn->xlockedby = 0;
 
        // null?  hose it.
        if (dn->is_null()) {
        {
          dout(10) << "handle_lock_dn got ack/nak on a reqxlock for " << *dn << endl;
          list<Context*> finished;
-         dir->take_waiting(CDIR_WAIT_DNREQXLOCK, m->get_dn(), finished);
+         dir->take_waiting(CDIR_WAIT_DNREQXLOCK, m->get_dn(), finished, 1);  // TAKE ONE ONLY!
          finish_contexts(finished, 
-                                         (m->get_action() == LOCK_AC_REQXLOCKACK) ? 0:-1);
+                                         (m->get_action() == LOCK_AC_REQXLOCKACK) ? 1:-1);
        }
        break;
 
        }
 
        if (dn->xlockedby != m) {
-         if (!dentry_xlock_start(dn, m, dir->inode, true)) {
+         if (!dentry_xlock_start(dn, m, dir->inode)) {
                // hose null dn if we're waiting on something
                if (dn->is_clean() && dn->is_null() && dn->is_sync()) dir->remove_dentry(dn);
                return;    // waiting for xlock
          // successfully xlocked!  on behalf of requestor.
          string path;
          dn->make_path(path);
+
+         dout(7) << "handle_lock_dn reqxlock success for " << m->get_asker() << " on " << *dn << ", acking" << endl;
          
          // ACK xlock request
          MLock *reply = new MLock(LOCK_AC_REQXLOCKACK, mds->get_nodeid());
                                                                   MSG_ADDR_MDS(m->get_asker()), MDS_PORT_CACHE,
                                                                   MDS_PORT_CACHE);
 
+         /* no: keep this around!
+         dn->xlockedby = DN_XLOCK_FOREIGN;  // not 0, not a valid pointer.
+
          // disassociate xlock from MLock
          active_requests[m].xlocks.clear();
          active_requests[m].traces.clear();
-         dn->xlockedby = DN_XLOCK_FOREIGN;  // not 0, not a valid pointer.
 
          // finish request
          request_finish(m);
+         */
          return;
        }
        break;
        {
          CDir *dir = dn->dir;
          string dname = dn->name;
-         dentry_xlock_finish(dn);
-         dir->take_waiting(CDIR_WAIT_ANY, dname, mds->finished_queue);
+
+         Message *m = dn->xlockedby;
+
+         // finish request
+         request_finish(m);  // this will drop the locks (and unpin paths!)
+
+         // unxlock
+         //dentry_xlock_finish(dn);
+         //dir->take_waiting(CDIR_WAIT_ANY, dname, mds->finished_queue);
+
+         return;
        }
        break;
 
 
 
 
-// ino proxy
-
-int MDCache::ino_proxy_auth(inodeno_t ino, 
-                                                       int frommds,
-                                                       map<CDir*, set<inodeno_t> >& inomap) 
-{
-  // check proxy sets for this ino
-  for (map<CDir*, set<inodeno_t> >::iterator wit = inomap.begin();
-          wit != inomap.end();
-          wit++) {
-       CDir *dir = wit->first;
-
-       // does this map apply to this node?
-       if (export_notify_ack_waiting[dir].count(frommds) == 0) continue;
-
-       // is this ino in the set?
-       if (inomap[dir].count(ino)) {
-         int dirauth = dir->authority();
-         assert(dirauth >= 0);
-         return dirauth;
-       }
-  }
-  return -1;   // no proxy
-}
-
-
-void MDCache::do_ino_proxy(CInode *in, Message *m)
-{
-  // check proxy maps
-  int newauth = ino_proxy_auth(in->ino(), 
-                                                          m->get_source(),    // works bc we only every proxy 1 hop
-                                                          export_proxy_inos);
-  dout(7) << "inode " << *in << " proxy, new auth is " << newauth << endl;
-  assert(newauth >= 0);     // we should know the new authority!
-  assert(in->is_frozen());  // i should be frozen right now!
-  assert(in->state_test(CINODE_STATE_PROXY));
-  
-  // forward
-  mds->messenger->send_message(m,
-                                                          MSG_ADDR_MDS(newauth), MDS_PORT_CACHE, MDS_PORT_CACHE);
-  return;
-}
-
-
-void MDCache::do_dir_proxy(CDir *dir, Message *m)
-{
-  // check proxy maps
-  int newauth = ino_proxy_auth(dir->ino(), 
-                                                          m->get_source(),   // works because we only every proxy 1 hop
-                                                          export_proxy_dirinos);
-  dout(7) << "dir " << *dir << " proxy, new auth is " << newauth << endl;
-  assert(newauth >= 0);     // we should know the new authority!
-  assert(dir->is_frozen());  // i should be frozen right now!
-  assert(dir->state_test(CDIR_STATE_PROXY));
-  
-  // forward
-  mds->messenger->send_message(m,
-                                                          MSG_ADDR_MDS(newauth), MDS_PORT_CACHE, MDS_PORT_CACHE);
-  return;
-}
-
-
 
 
 
 
 // IMPORT/EXPORT
 
-void MDCache::find_nested_exports(CDir *dir, set<CDir*>& s)
+void MDCache::find_nested_exports(CDir *dir, set<CDir*>& s) 
 {
   CDir *import = get_containing_import(dir);
+  find_nested_exports_under(import, dir, s);
+}
+
+void MDCache::find_nested_exports_under(CDir *import, CDir *dir, set<CDir*>& s)
+{
+
+  dout(10) << "find_nested_exports for " << *dir << endl;
+  dout(10) << "find_nested_exports under import " << *import << endl;
 
   if (import == dir) {
        // yay, my job is easy!
           p != nested_exports[import].end();
           p++) {
        CDir *nested = *p;
+       
+       dout(12) << "find_nested_exports checking " << *nested << endl;
 
        // trace back to import, or dir
        CDir *cur = nested->get_parent_dir();
-       while (!cur->is_import()) {
+       while (!cur->is_import() || cur == dir) {
          if (cur == dir) {
                s.insert(nested);
                dout(10) << "find_nested_exports " << *dir << " " << *nested << endl;
   
 
   // done
+  stray_export_warnings.erase( m->get_ino() );
   delete m;
 }