]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
migrator export_caps
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 19 Oct 2007 17:04:00 +0000 (17:04 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 19 Oct 2007 17:04:00 +0000 (17:04 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1971 29311d96-e01e-0410-9327-a35deaab8ce9

23 files changed:
branches/sage/mds/TODO
branches/sage/mds/client/Client.cc
branches/sage/mds/client/SyntheticClient.cc
branches/sage/mds/mds/CInode.cc
branches/sage/mds/mds/CInode.h
branches/sage/mds/mds/LogEvent.cc
branches/sage/mds/mds/LogEvent.h
branches/sage/mds/mds/MDCache.cc
branches/sage/mds/mds/MDLog.h
branches/sage/mds/mds/Migrator.cc
branches/sage/mds/mds/Migrator.h
branches/sage/mds/mds/Server.cc
branches/sage/mds/mds/events/ESessions.h [new file with mode: 0644]
branches/sage/mds/mds/journal.cc
branches/sage/mds/messages/MClientFileCaps.h
branches/sage/mds/messages/MExportCaps.h [new file with mode: 0644]
branches/sage/mds/messages/MExportCapsAck.h [new file with mode: 0644]
branches/sage/mds/messages/MExportStrays.h [new file with mode: 0644]
branches/sage/mds/messages/MMDSGetMap.h
branches/sage/mds/mon/MDSMonitor.cc
branches/sage/mds/mon/MDSMonitor.h
branches/sage/mds/msg/Message.cc
branches/sage/mds/msg/Message.h

index ed581ab0b350b9d98959f5a3adc6c45636073385..5e1a37b7441a4fd19b1ae14e0843f990c4a19b0f 100644 (file)
@@ -53,10 +53,12 @@ mdsmon
 
 
 
+- clean up client mds session vs mdsmap behavior
+
+
 mds bugs
 - open file rejournaling vs capped log...
   - open files vs shutdown in general!  need to export any caps on replicated metadata
-- export caps to auth on unlinked inodes
 - stray purge on shutdown
 
 - rename slave in-memory rollback on failure
@@ -64,22 +66,16 @@ mds bugs
 - fix purge_stray bug
 - try_remove_unlinked_dn thing
 
-- client session open from locker.. doesn't work properly with delays
- -> journal the session open _with_ the import(start)
-
 - proper handling of cache expire messages during rejoin phase?
 
-- verify once-per-segment jouranl context is working...
-
 mds
 - extend/clean up filepath to allow paths relative to an ino
   - fix path_traverse
   - fix reconnect/rejoin open file weirdness
 
-- get rid of replicate objects for replicate_to .. encode to bufferlists directly
+- get rid of C*Discover objects for replicate_to .. encode to bufferlists directly
 
 - stray reintegration
-- verify stray is empty on shutdown
 
 - real chdir (directory "open")
   - relative metadata ops
index 67c5af7101ed5fe1b49405cc11c7f588697beb0d..117b2b08e401de6964c5983a1ad2f1954b592d76 100644 (file)
@@ -577,6 +577,7 @@ int Client::choose_target_mds(MClientRequest *req)
   if (!diri || g_conf.client_use_random_mds) {
     // no root info, pick a random MDS
     mds = mdsmap->get_random_in_mds();
+    dout(0) << "random mds" << mds << dendl;
     if (mds < 0) mds = 0;
 
     if (0) {
@@ -673,15 +674,15 @@ MClientReply *Client::make_request(MClientRequest *req,
     if (mds_sessions.count(mds) == 0) {
       Cond cond;
 
-      if (!mdsmap->have_inst(mds)) {
+      if (!mdsmap->is_active(mds)) {
        dout(10) << "no address for mds" << mds << ", requesting new mdsmap" << dendl;
        int mon = monmap->pick_mon();
-       messenger->send_message(new MMDSGetMap(),
+       messenger->send_message(new MMDSGetMap(mdsmap->get_epoch()),
                                monmap->get_inst(mon));
        waiting_for_mdsmap.push_back(&cond);
        cond.Wait(client_lock);
 
-       if (!mdsmap->have_inst(mds)) {
+       if (!mdsmap->is_active(mds)) {
          dout(10) << "hmm, still have no address for mds" << mds << ", trying a random mds" << dendl;
          request.resend_mds = mdsmap->get_random_in_mds();
          continue;
@@ -1103,11 +1104,11 @@ void Client::handle_file_caps(MClientFileCaps *m)
   mds_sessions[mds]++;
 
   // reap?
-  if (m->get_op() == MClientFileCaps::OP_REAP) {
+  if (m->get_op() == MClientFileCaps::OP_IMPORT) {
     int other = m->get_mds();
 
     if (in && in->stale_caps.count(other)) {
-      dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " reap on mds" << other << dendl;
+      dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " imported from mds" << other << dendl;
 
       // fresh from new mds?
       if (!in->caps.count(mds)) {
@@ -1122,7 +1123,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
       
       // fall-thru!
     } else {
-      dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " premature (!!) reap on mds" << other << dendl;
+      dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " premature (!!) import from mds" << other << dendl;
       // delay!
       cap_reap_queue[in->ino()][other] = m;
       return;
@@ -1132,8 +1133,8 @@ void Client::handle_file_caps(MClientFileCaps *m)
   assert(in);
   
   // stale?
-  if (m->get_op() == MClientFileCaps::OP_STALE) {
-    dout(5) << "handle_file_caps on ino " << m->get_ino() << " seq " << m->get_seq() << " from mds" << mds << " now stale" << dendl;
+  if (m->get_op() == MClientFileCaps::OP_EXPORT) {
+    dout(5) << "handle_file_caps on ino " << m->get_ino() << " seq " << m->get_seq() << " from mds" << mds << " now exported/stale" << dendl;
     
     // move to stale list
     assert(in->caps.count(mds));
index 3df7f5cf734f522075ab45342915b2ec5ee02f0a..270ebafec010f7b590cc789468b4124e6a38a2d1 100644 (file)
@@ -1578,8 +1578,8 @@ int SyntheticClient::make_files(int num, int count, int priv, bool more)
       if (more) {
         client->lstat(d, &st);
         int fd = client->open(d, O_RDONLY);
-        //client->unlink(d);
-        //client->close(fd);
+        client->unlink(d);
+        client->close(fd);
       }
 
       if (time_to_stop()) return 0;
@@ -1653,6 +1653,12 @@ int SyntheticClient::open_shared(int num, int count)
       fds.push_back(fd);
     }
 
+    if (1)
+      for (int n=0; n<num; n++) {
+       sprintf(d,"file.%d", n);
+       client->unlink(d);
+      }
+
     while (!fds.empty()) {
       int fd = fds.front();
       fds.pop_front();
index ccb1359329aba07fd22515bf87c74726e28bd0b4..d5981eea5ca3bb1d7d22e0cf777bb6a50507efd5 100644 (file)
@@ -779,10 +779,6 @@ void CInode::encode_export(bufferlist& bl)
  
   ::_encode_simple(replica_map, bl);
 
-  map<int,Capability::Export>  cap_map;
-  export_client_caps(cap_map);
-  ::_encode_simple(cap_map, bl);
-
   authlock._encode(bl);
   linklock._encode(bl);
   dirfragtreelock._encode(bl);
@@ -803,7 +799,6 @@ void CInode::finish_export(utime_t now)
 }
 
 void CInode::decode_import(bufferlist::iterator& p,
-                          map<CInode*, map<int,Capability::Export> >& imported_cap_map,
                           LogSegment *ls)
 {
   utime_t old_mtime = inode.mtime;
@@ -826,13 +821,6 @@ void CInode::decode_import(bufferlist::iterator& p,
   ::_decode_simple(replica_map, p);
   if (!replica_map.empty()) get(PIN_REPLICATED);
 
-  map<int,Capability::Export>  cap_map;
-  ::_decode_simple(cap_map, p);
-  if (!cap_map.empty()) {
-    imported_cap_map[this].swap(cap_map);
-    get(PIN_IMPORTINGCAPS);
-  }
-
   authlock._decode(p);
   linklock._decode(p);
   dirfragtreelock._decode(p);
@@ -840,14 +828,5 @@ void CInode::decode_import(bufferlist::iterator& p,
   dirlock._decode(p);
 }
 
-/*
- * the cap import is delayed so that we can journal before 
- * contacting clients
- */
-void CInode::import_caps(map<int,Capability::Export>& cap_map,
-                        set<int>& new_caps)
-{
-  merge_client_caps(cap_map, new_caps);
-  put(PIN_IMPORTINGCAPS);
-}
+
 
index 2089ee18e1e5f50ee83a23d482a6c894ed03134e..ef05b81b2c6cfaa4ecaab6fba1f6846515c638d6 100644 (file)
@@ -98,6 +98,7 @@ class CInode : public MDSCacheObject {
   static const int STATE_FREEZING =    (1<<7);
   static const int STATE_FROZEN =      (1<<8);
   static const int STATE_AMBIGUOUSAUTH = (1<<9);
+  static const int STATE_EXPORTINGCAPS = (1<<10);
 
   // -- waiters --
   //static const int WAIT_SLAVEAGREE  = (1<<0);
@@ -307,12 +308,8 @@ public:
   void abort_export() {
     put(PIN_TEMPEXPORTING);
   }
-  void decode_import(bufferlist::iterator& p,
-                    map<CInode*, map<int,Capability::Export> >& imported_cap_map,
-                    LogSegment *ls);
-  void import_caps(map<int,Capability::Export>& cap_map,
-                  set<int>& new_caps);
-
+  void decode_import(bufferlist::iterator& p, LogSegment *ls);
+  
 
   // -- locks --
 public:
index 05b4336c52f05dd0fba7a75d3a122b54c6eb4fe6..65b0bb2ec1322631dc824c512ed73541ae455829 100644 (file)
 // events i know of
 #include "events/EString.h"
 
-#include "events/ESession.h"
 #include "events/ESubtreeMap.h"
 #include "events/EExport.h"
 #include "events/EImportStart.h"
 #include "events/EImportFinish.h"
 #include "events/EFragment.h"
 
+#include "events/ESession.h"
+#include "events/ESessions.h"
+
 #include "events/EUpdate.h"
 #include "events/ESlaveUpdate.h"
 #include "events/EOpen.h"
@@ -55,13 +57,15 @@ LogEvent *LogEvent::decode(bufferlist& bl)
   switch (type) {
   case EVENT_STRING: le = new EString; break;
 
-  case EVENT_SESSION: le = new ESession; break;
   case EVENT_SUBTREEMAP: le = new ESubtreeMap; break;
   case EVENT_EXPORT: le = new EExport; break;
   case EVENT_IMPORTSTART: le = new EImportStart; break;
   case EVENT_IMPORTFINISH: le = new EImportFinish; break;
   case EVENT_FRAGMENT: le = new EFragment; break;
 
+  case EVENT_SESSION: le = new ESession; break;
+  case EVENT_SESSIONS: le = new ESessions; break;
+
   case EVENT_UPDATE: le = new EUpdate; break;
   case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
   case EVENT_OPEN: le = new EOpen; break;
index 8f2f55f342bb3f959d4c5727ce3bad6bdcf2cfd5..8d36a1d515c1c03ad3a89376187de3d9b500ac09 100644 (file)
 
 #define EVENT_STRING       1
 
-#define EVENT_SESSION      7
 #define EVENT_SUBTREEMAP   2
-#define EVENT_EXPORT       30
-#define EVENT_IMPORTSTART  31
-#define EVENT_IMPORTFINISH 32
-#define EVENT_FRAGMENT     33
+#define EVENT_EXPORT       3
+#define EVENT_IMPORTSTART  4
+#define EVENT_IMPORTFINISH 5
+#define EVENT_FRAGMENT     6
 
-#define EVENT_UPDATE       3
-#define EVENT_SLAVEUPDATE  4
-#define EVENT_OPEN         5
+#define EVENT_SESSION      10
+#define EVENT_SESSIONS     11
 
-#define EVENT_PURGEFINISH  22
+#define EVENT_UPDATE       20
+#define EVENT_SLAVEUPDATE  21
+#define EVENT_OPEN         22
+
+#define EVENT_PURGEFINISH  30
 
 #define EVENT_ANCHOR       40
 #define EVENT_ANCHORCLIENT 41
index 32201986d9f40dbb639e87109a627e45fc4ddf52..8456b5bb555b1fedf0e3a17c146926d7f160e419 100644 (file)
@@ -2651,7 +2651,7 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t&
   
   // send REAP
   // FIXME client session weirdness.
-  MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_REAP,
+  MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_IMPORT,
                                              in->inode,
                                              in->client_caps[client].get_last_seq(),
                                              in->client_caps[client].pending(),
@@ -5683,6 +5683,14 @@ void MDCache::handle_dentry_unlink(MDentryUnlink *m)
        dn->dir->unlink_inode(dn);
        assert(straydn);
        straydn->dir->link_primary_inode(straydn, in);
+
+       // send caps to auth (if we're not already)
+       if (in->is_any_caps() &&
+           !in->state_test(CInode::STATE_EXPORTINGCAPS))
+         migrator->export_caps(in, in->authority().first);
+       
+       lru.lru_bottouch(straydn);  // move stray to end of lru
+
       } else {
        assert(dn->is_remote());
        dn->dir->unlink_inode(dn);
index f7bdcd21a53031a5ab1c58e0aa94a5e1b672e17a..c958585b86a4825a2882311615b8aff9c1bf6709 100644 (file)
@@ -163,6 +163,9 @@ public:
   void submit_entry( LogEvent *e, Context *c = 0 );
   void wait_for_sync( Context *c );
   void flush();
+  bool is_flushed() {
+    return unflushed == 0;
+  }
 
 private:
   class C_MaybeExpiredSegment : public Context {
index e76fcec0298153b59a288f3aa9e482595006c101..bc7c82396fe55f09a850a217a240388d3e0e5457 100644 (file)
@@ -32,6 +32,7 @@
 #include "events/EExport.h"
 #include "events/EImportStart.h"
 #include "events/EImportFinish.h"
+#include "events/ESessions.h"
 
 #include "msg/Messenger.h"
 
@@ -48,6 +49,9 @@
 #include "messages/MExportDirNotifyAck.h"
 #include "messages/MExportDirFinish.h"
 
+#include "messages/MExportCaps.h"
+#include "messages/MExportCapsAck.h"
+
 #include "config.h"
 
 #define  dout(l)    if (l<=g_conf.debug || l <= g_conf.debug_mds || l <= g_conf.debug_mds_migrator) *_dout << dbeginl << g_clock.now() << " mds" << mds->get_nodeid() << ".migrator "
@@ -93,6 +97,14 @@ void Migrator::dispatch(Message *m)
     handle_export_notify((MExportDirNotify*)m);
     break;
 
+    // caps
+  case MSG_MDS_EXPORTCAPS:
+    handle_export_caps((MExportCaps*)m);
+    break;
+  case MSG_MDS_EXPORTCAPSACK:
+    handle_export_caps_ack((MExportCapsAck*)m);
+    break;
+
   default:
     assert(0);
   }
@@ -759,12 +771,32 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
 }
 
 
+class C_M_ExportGo : public Context {
+  Migrator *migrator;
+  CDir *dir;
+public:
+  C_M_ExportGo(Migrator *m, CDir *d) : migrator(m), dir(d) {}
+  void finish(int r) {
+    migrator->export_go_synced(dir);
+  }
+};
+
 void Migrator::export_go(CDir *dir)
-{  
+{
   assert(export_peer.count(dir));
   int dest = export_peer[dir];
   dout(7) << "export_go " << *dir << " to " << dest << dendl;
 
+  // first sync log to flush out e.g. any cap imports
+  mds->mdlog->wait_for_sync(new C_M_ExportGo(this, dir));
+}
+
+void Migrator::export_go_synced(CDir *dir)
+{  
+  assert(export_peer.count(dir));
+  int dest = export_peer[dir];
+  dout(7) << "export_go_synced " << *dir << " to " << dest << dendl;
+
   cache->show_subtrees();
   
   export_warning_ack_waiting.erase(dir);
@@ -831,6 +863,20 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
   ::_encode_simple(in->inode.ino, enc_state);
   in->encode_export(enc_state);
 
+  // caps 
+  encode_export_inode_caps(in, enc_state, exported_client_map);
+}
+
+void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl, 
+                                       map<int,entity_inst_t>& exported_client_map)
+{
+  // encode caps
+  map<int,Capability::Export> cap_map;
+  in->export_client_caps(cap_map);
+  ::_encode_simple(cap_map, bl);
+
+  in->state_set(CInode::STATE_EXPORTINGCAPS);
+
   // make note of clients named by exported capabilities
   for (map<int, Capability>::iterator it = in->client_caps.begin();
        it != in->client_caps.end();
@@ -840,13 +886,15 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
 
 void Migrator::finish_export_inode_caps(CInode *in)
 {
-  // tell (all) clients about migrating caps.. mark STALE
+  in->state_clear(CInode::STATE_EXPORTINGCAPS);
+
+  // tell (all) clients about migrating caps.. 
   for (map<int, Capability>::iterator it = in->client_caps.begin();
        it != in->client_caps.end();
        it++) {
     dout(7) << "finish_export_inode telling client" << it->first
-           << " stale caps on " << *in << dendl;
-    MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_STALE,
+           << " exported caps on " << *in << dendl;
+    MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_EXPORT,
                                             in->inode, 
                                              it->second.get_last_seq(), 
                                              it->second.pending(),
@@ -1765,10 +1813,10 @@ void Migrator::import_reverse(CDir *dir)
     in->export_client_caps(cap_map);
     finish_export_inode_caps(in);
   }
-
+        
   // log our failure
   mds->mdlog->submit_entry(new EImportFinish(dir, false));     // log failure
-
+       
   // bystanders?
   if (import_bystanders[dir].empty()) {
     dout(7) << "no bystanders, finishing reverse now" << dendl;
@@ -1827,27 +1875,10 @@ void Migrator::import_reverse_final(CDir *dir)
 }
 
 
-void Migrator::finish_import_caps(CInode *in, int from, map<int,Capability::Export> &cap_map)
-{
-  set<int> new_caps;
-  in->import_caps(cap_map, new_caps);
-  
-  for (set<int>::iterator it = new_caps.begin();
-       it != new_caps.end();
-       it++) {
-    dout(0) << "merged caps for client" << *it << " on " << *in << dendl;
-    MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_REAP,
-                                               in->inode,
-                                               in->client_caps[*it].get_last_seq(),
-                                               in->client_caps[*it].pending(),
-                                               in->client_caps[*it].wanted());
-    caps->set_mds(from); // reap from whom?
-    mds->send_message_client(caps, *it);
-  }
-}
+
 
 void Migrator::import_logged_start(CDir *dir, int from,
-                                  map<int,entity_inst_t> &imported_client_map)
+                                  map<int,entity_inst_t>imported_client_map)
 {
   dout(7) << "import_logged " << *dir << dendl;
 
@@ -1860,7 +1891,7 @@ void Migrator::import_logged_start(CDir *dir, int from,
   for (map<CInode*, map<int,Capability::Export> >::iterator p = import_caps[dir].begin();
        p != import_caps[dir].end();
        ++p) {
-        finish_import_caps(p->first, from, p->second);
+    finish_import_inode_caps(p->first, from, p->second);
   }
   
   // send notify's etc.
@@ -1952,8 +1983,11 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int o
   }
 
   // state after link  -- or not!  -sage
-  in->decode_import(blp, cap_imports, ls);  // cap imports are noted for later action
+  in->decode_import(blp, ls);  // cap imports are noted for later action
+
+  // caps
+  decode_import_inode_caps(in, blp, cap_imports);
+
   // link before state  -- or not!  -sage
   if (dn->inode != in) {
     assert(!dn->inode);
@@ -1986,6 +2020,40 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int o
   
 }
 
+void Migrator::decode_import_inode_caps(CInode *in,
+                                       bufferlist::iterator &blp,
+                                       map<CInode*, map<int,Capability::Export> >& cap_imports)
+{
+  map<int,Capability::Export> cap_map;
+  ::_decode_simple(cap_map, blp);
+  if (!cap_map.empty()) {
+    cap_imports[in].swap(cap_map);
+    in->get(CInode::PIN_IMPORTINGCAPS);
+  }
+}
+
+void Migrator::finish_import_inode_caps(CInode *in, int from, 
+                                       map<int,Capability::Export> &cap_map)
+{
+  assert(!cap_map.empty());
+
+  set<int> new_caps;
+  in->merge_client_caps(cap_map, new_caps);
+  in->put(CInode::PIN_IMPORTINGCAPS);
+  
+  for (set<int>::iterator it = new_caps.begin();
+       it != new_caps.end();
+       it++) {
+    dout(0) << "finish_import_inode_caps for client" << *it << " on " << *in << dendl;
+    MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_IMPORT,
+                                               in->inode,
+                                               in->client_caps[*it].get_last_seq(),
+                                               in->client_caps[*it].pending(),
+                                               in->client_caps[*it].wanted());
+    caps->set_mds(from); // from whom?
+    mds->send_message_client(caps, *it);
+  }
+}
 
 int Migrator::decode_import_dir(bufferlist::iterator& blp,
                                int oldauth,
@@ -2151,8 +2219,90 @@ void Migrator::handle_export_notify(MExportDirNotify *m)
 
 
 
+/** cap exports **/
+
+
+
+void Migrator::export_caps(CInode *in, int dest)
+{
+  dout(7) << "export_caps to mds" << dest << " " << *in << dendl;
+
+  assert(in->is_any_caps());
+  assert(!in->is_auth());
+  assert(!in->is_ambiguous_auth());
+  assert(!in->state_test(CInode::STATE_EXPORTINGCAPS));
+
+  MExportCaps *ex = new MExportCaps;
+  ex->ino = in->ino();
+
+  encode_export_inode_caps(in, ex->cap_bl, ex->client_map);
+
+  mds->send_message_mds(ex, dest, MDS_PORT_MIGRATOR);
+}
+
+void Migrator::handle_export_caps_ack(MExportCapsAck *ack)
+{
+  CInode *in = cache->get_inode(ack->ino);
+  assert(in);
+  dout(10) << "handle_export_caps_ack " << *ack << " from " << ack->get_source() 
+          << " on " << *in
+          << dendl;
+  
+  finish_export_inode_caps(in);
+  delete ack;
+}
+
+
+class C_M_LoggedImportCaps : public Context {
+  Migrator *migrator;
+  CInode *in;
+  int from;
+public:
+  map<CInode*, map<int,Capability::Export> > cap_imports;
+
+  C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : migrator(m), in(i), from(f) {}
+  void finish(int r) {
+    migrator->logged_import_caps(in, from, cap_imports);
+  }  
+};
+
+void Migrator::handle_export_caps(MExportCaps *ex)
+{
+  dout(10) << "handle_export_caps " << *ex << " from " << ex->get_source() << dendl;
+  CInode *in = cache->get_inode(ex->ino);
+  
+  assert(in->is_auth());
+  /*
+   * note: i may be frozen, but i won't have been encoded for export (yet)!
+   *  see export_go() vs export_go_synced().
+   */
+
+  C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(this, in, ex->get_source().num());
+  ESessions *le = new ESessions(mds->clientmap.inc_projected());
+
+  // decode new caps
+  bufferlist::iterator blp = ex->cap_bl.begin();
+  decode_import_inode_caps(in, blp, finish->cap_imports);
+  assert(!finish->cap_imports.empty());   // thus, inode is pinned.
+  
+  // journal open client sessions
+  mds->server->prepare_force_open_sessions(ex->client_map);
+  le->client_map.swap(ex->client_map);
+  
+  mds->mdlog->submit_entry(le, finish);
 
+  delete ex;
+}
 
 
+void Migrator::logged_import_caps(CInode *in, 
+                                 int from,
+                                 map<CInode*, map<int,Capability::Export> >& cap_imports) 
+{
+  dout(10) << "logged_import_caps on " << *in << dendl;
+  assert(cap_imports.count(in));
+  finish_import_inode_caps(in, from, cap_imports[in]);  
 
+  mds->send_message_mds(new MExportCapsAck(in->ino()), from, MDS_PORT_MIGRATOR);
+}
 
index 26dadbb15ed68464a895498aa65d516004d3b8c2..ca84968ac5d57ba193b54c7d6f07955876a536ba 100644 (file)
@@ -41,6 +41,9 @@ class MExportDirNotify;
 class MExportDirNotifyAck;
 class MExportDirFinish;
 
+class MExportCaps;
+class MExportCapsAck;
+
 class EImportStart;
 
 
@@ -178,9 +181,11 @@ public:
   void clear_export_queue() {
     export_queue.clear();
   }
-
-  void encode_export_inode(CInode *in, bufferlist& enc_state
+  
+  void encode_export_inode(CInode *in, bufferlist& bl
                           map<int,entity_inst_t>& exported_client_map);
+  void encode_export_inode_caps(CInode *in, bufferlist& bl,
+                               map<int,entity_inst_t>& exported_client_map);
   void finish_export_inode(CInode *in, utime_t now, list<Context*>& finished);
   void finish_export_inode_caps(CInode *in);
 
@@ -195,20 +200,25 @@ public:
   }
   void clear_export_proxy_pins(CDir *dir);
 
+  void export_caps(CInode *in, int dest);
+
  protected:
   void handle_export_discover_ack(MExportDirDiscoverAck *m);
   void export_frozen(CDir *dir);
   void handle_export_prep_ack(MExportDirPrepAck *m);
   void export_go(CDir *dir);
+  void export_go_synced(CDir *dir);
   void export_reverse(CDir *dir);
   void handle_export_ack(MExportDirAck *m);
   void export_logged_finish(CDir *dir);
   void handle_export_notify_ack(MExportDirNotifyAck *m);
   void export_finish(CDir *dir);
 
+  void handle_export_caps_ack(MExportCapsAck *m);
+
   friend class C_MDC_ExportFreeze;
   friend class C_MDS_ExportFinishLogged;
-
+  friend class C_M_ExportGo;
 
   // importer
   void handle_export_discover(MExportDirDiscover *m);
@@ -221,6 +231,10 @@ public:
                           LogSegment *ls,
                           map<CInode*, map<int,Capability::Export> >& cap_imports,
                           list<ScatterLock*>& updated_scatterlocks);
+  void decode_import_inode_caps(CInode *in,
+                               bufferlist::iterator &blp,
+                               map<CInode*, map<int,Capability::Export> >& cap_imports);
+  void finish_import_inode_caps(CInode *in, int from, map<int,Capability::Export> &cap_map);
   int decode_import_dir(bufferlist::iterator& blp,
                        int oldauth,
                        CDir *import_root,
@@ -228,7 +242,6 @@ public:
                        LogSegment *ls,
                        map<CInode*, map<int,Capability::Export> >& cap_imports,
                        list<ScatterLock*>& updated_scatterlocks);
-  void finish_import_caps(CInode *in, int from, map<int,Capability::Export> &cap_map);
 
 public:
   void import_reverse(CDir *dir);
@@ -244,8 +257,15 @@ public:
   void import_finish(CDir *dir);
 protected:
 
+  void handle_export_caps(MExportCaps *m);
+  void logged_import_caps(CInode *in, 
+                         int from,
+                         map<CInode*, map<int,Capability::Export> >& cap_imports);
+
+
   friend class C_MDS_ImportDirLoggedStart;
   friend class C_MDS_ImportDirLoggedFinish;
+  friend class C_M_LoggedImportCaps;
 
   // bystander
   void handle_export_notify(MExportDirNotify *m);
index 92b71888e28e3da1566c510a383082be315cc3b4..3a45b625f8a530cfa9ca370f80fab0ebd8409f94 100644 (file)
@@ -321,7 +321,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
        // mark client caps stale.
        inode_t fake_inode;
        fake_inode.ino = p->first;
-       MClientFileCaps *stale = new MClientFileCaps(MClientFileCaps::OP_STALE,
+       MClientFileCaps *stale = new MClientFileCaps(MClientFileCaps::OP_EXPORT,
                                                     fake_inode, 
                                                     0,
                                                     0,                // doesn't matter.
@@ -3262,8 +3262,8 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen
        // finish cap imports
        finish_force_open_sessions(mdr->more()->imported_client_map);
        if (mdr->more()->cap_imports.count(destdn->inode))
-         mds->mdcache->migrator->finish_import_caps(destdn->inode, srcdn->authority().first, 
-                                                    mdr->more()->cap_imports[destdn->inode]);
+         mds->mdcache->migrator->finish_import_inode_caps(destdn->inode, srcdn->authority().first, 
+                                                          mdr->more()->cap_imports[destdn->inode]);
        
        // hack: fix auth bit
        destdn->inode->state_set(CInode::STATE_AUTH);
diff --git a/branches/sage/mds/mds/events/ESessions.h b/branches/sage/mds/mds/events/ESessions.h
new file mode 100644 (file)
index 0000000..0db175c
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MDS_ESESSIONS_H
+#define __MDS_ESESSIONS_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../LogEvent.h"
+
+class ESessions : public LogEvent {
+protected:
+  version_t cmapv;  // client map version
+
+public:
+  map<int,entity_inst_t> client_map;
+
+  ESessions() : LogEvent(EVENT_SESSION) { }
+  ESessions(version_t v) :
+    LogEvent(EVENT_SESSION),
+    cmapv(v) {
+  }
+  
+  void encode_payload(bufferlist& bl) {
+    ::_encode(client_map, bl);
+    ::_encode(cmapv, bl);
+  }
+  void decode_payload(bufferlist& bl, int& off) {
+    ::_decode(client_map, bl, off);
+    ::_decode(cmapv, bl, off);
+  }
+
+
+  void print(ostream& out) {
+    out << "ESessions " << client_map.size() << " opens cmapv " << cmapv;
+  }
+  
+  void update_segment();
+  void replay(MDS *mds);  
+};
+
+#endif
index 94e7a451a1a7ab14cb376efeafdfeea9af89c234..2cf32d99ca3372baeb0bb77ae7b7a5a0d7f488b2 100644 (file)
@@ -15,6 +15,7 @@
 #include "events/EString.h"
 #include "events/ESubtreeMap.h"
 #include "events/ESession.h"
+#include "events/ESessions.h"
 
 #include "events/EMetaBlob.h"
 
@@ -771,6 +772,25 @@ void ESession::replay(MDS *mds)
   }
 }
 
+void ESessions::update_segment()
+{
+  _segment->clientmapv = cmapv;
+}
+
+void ESessions::replay(MDS *mds)
+{
+  if (mds->clientmap.get_version() >= cmapv) {
+    dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version() 
+            << " >= " << cmapv << ", noop" << dendl;
+  } else {
+    dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version() 
+            << " < " << cmapv << dendl;
+    mds->clientmap.open_sessions(client_map);
+    assert(mds->clientmap.get_version() == cmapv);
+    mds->clientmap.reset_projected(); // make it follow version.
+  }
+}
+
 
 
 // -----------------------
@@ -1061,6 +1081,7 @@ void EImportStart::replay(MDS *mds)
     ::_decode_simple(cm, blp);
     mds->clientmap.open_sessions(cm);
     assert(mds->clientmap.get_version() == cmapv);
+    mds->clientmap.reset_projected(); // make it follow version.
   }
 }
 
index 979be331e5ce88d3d52565ca410f201408944492..e6fe60b37343b5836d0ca9420ed91113f317f987 100644 (file)
@@ -23,15 +23,15 @@ class MClientFileCaps : public Message {
   static const int OP_GRANT   = 0;  // mds->client grant.
   static const int OP_ACK     = 1;  // client->mds ack (if prior grant was a recall)
   static const int OP_RELEASE = 2;  // mds closed the cap
-  static const int OP_STALE   = 3;  // mds has exported the cap
-  static const int OP_REAP    = 4;  // mds has imported the cap from get_mds()
+  static const int OP_EXPORT  = 3;  // mds has exported the cap
+  static const int OP_IMPORT  = 4;  // mds has imported the cap from get_mds()
   static const char* get_opname(int op) {
     switch (op) {
     case OP_GRANT: return "grant";
     case OP_ACK: return "ack";
     case OP_RELEASE: return "release";
-    case OP_STALE: return "stale";
-    case OP_REAP: return "reap";
+    case OP_EXPORT: return "export";
+    case OP_IMPORT: return "import";
     default: assert(0); return 0;
     }
   }
diff --git a/branches/sage/mds/messages/MExportCaps.h b/branches/sage/mds/messages/MExportCaps.h
new file mode 100644 (file)
index 0000000..f2057bf
--- /dev/null
@@ -0,0 +1,50 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MEXPORTCAPS_H
+#define __MEXPORTCAPS_H
+
+#include "msg/Message.h"
+
+
+class MExportCaps : public Message {
+ public:  
+  inodeno_t ino;
+  bufferlist cap_bl;
+  map<int,entity_inst_t> client_map;
+
+  MExportCaps() :
+    Message(MSG_MDS_EXPORTCAPS) {}
+
+  virtual char *get_type_name() { return "export_caps"; }
+  void print(ostream& o) {
+    o << "export_caps(" << ino << ")";
+  }
+
+  virtual void decode_payload() {
+    int off = 0;
+    ::_decode(ino, payload, off);
+    ::_decode(cap_bl, payload, off);
+    ::_decode(client_map, payload, off);
+  }
+  virtual void encode_payload() {
+    ::_encode(ino, payload);
+    ::_encode(cap_bl, payload);
+    ::_encode(client_map, payload);
+  }
+
+};
+
+#endif
diff --git a/branches/sage/mds/messages/MExportCapsAck.h b/branches/sage/mds/messages/MExportCapsAck.h
new file mode 100644 (file)
index 0000000..dd5e212
--- /dev/null
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MEXPORTCAPSACK_H
+#define __MEXPORTCAPSACK_H
+
+#include "msg/Message.h"
+
+
+class MExportCapsAck : public Message {
+ public:  
+  inodeno_t ino;
+
+  MExportCapsAck() :
+    Message(MSG_MDS_EXPORTCAPSACK) {}
+  MExportCapsAck(inodeno_t i) :
+    Message(MSG_MDS_EXPORTCAPSACK), ino(i) {}
+
+  virtual char *get_type_name() { return "export_caps_ack"; }
+  void print(ostream& o) {
+    o << "export_caps_ack(" << ino << ")";
+  }
+
+  virtual void decode_payload() {
+    int off = 0;
+    ::_decode(ino, payload, off);
+  }
+  virtual void encode_payload() {
+    ::_encode(ino, payload);
+  }
+
+};
+
+#endif
diff --git a/branches/sage/mds/messages/MExportStrays.h b/branches/sage/mds/messages/MExportStrays.h
new file mode 100644 (file)
index 0000000..be055c0
--- /dev/null
@@ -0,0 +1,43 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MEXPORTSTRAYS_H
+#define __MEXPORTSTRAYS_H
+
+#include "msg/Message.h"
+
+
+class MExportStrays : public Message {
+ public:  
+  bufferlist state;
+
+  MExportStrays() :
+    Message(MSG_MDS_EXPORTSTRAYS) {}
+
+  virtual char *get_type_name() { return "SEx"; }
+  void print(ostream& o) {
+    o << "export_strays";
+  }
+
+  virtual void decode_payload() {
+    state = payload;
+  }
+  virtual void encode_payload() {
+    payload = state;
+  }
+
+};
+
+#endif
index eab9a3506a40b7e70f4b285ac1265b296a37e0f6..e762760acf224083597325b154d26a75945c2f74 100644 (file)
 #include "msg/Message.h"
 
 #include "include/types.h"
+#include "include/encodable.h"
 
 class MMDSGetMap : public Message {
  public:
-  MMDSGetMap() : Message(MSG_MDS_GETMAP) {
-  }
+  epoch_t have;
+
+  MMDSGetMap(epoch_t h=0) : Message(MSG_MDS_GETMAP), have (h) { }
 
   char *get_type_name() { return "mdsgetmap"; }
   
   void encode_payload() {
-    //payload.append((char*)&sb, sizeof(sb));
+    ::_encode_simple(have, payload);
   }
   void decode_payload() {
-    //int off = 0;
-    //payload.copy(off, sizeof(sb), (char*)&sb);
-    //off += sizeof(sb);
+    bufferlist::iterator p = payload.begin();
+    ::_decode_simple(have, p);
   }
 };
 
index 645f029f6b2034778b0d29d8fe36c38401296655..24c5fc76e75efcf930021bef35d9300c24599e9d 100644 (file)
@@ -41,7 +41,7 @@
 
 // my methods
 
-void MDSMonitor::print_map(MDSMap &m)
+void MDSMonitor::print_map(MDSMap &m, int dbl)
 {
   dout(7) << "print_map epoch " << m.get_epoch() << " target_num " << m.target_num << dendl;
   entity_inst_t blank;
@@ -50,7 +50,7 @@ void MDSMonitor::print_map(MDSMap &m)
   for (set<int>::iterator p = all.begin();
        p != all.end();
        ++p) {
-    dout(7) << " mds" << *p << "." << m.mds_inc[*p]
+    dout(dbl) << " mds" << *p << "." << m.mds_inc[*p]
            << " : " << MDSMap::get_state_name(m.get_state(*p))
            << " : " << (m.have_inst(*p) ? m.get_inst(*p) : blank)
            << dendl;
@@ -88,8 +88,8 @@ bool MDSMonitor::update_from_paxos()
   mdsmap.decode(mdsmap_bl);
 
   // new map
-  dout(7) << "new map:" << dendl;
-  print_map(mdsmap);
+  dout(0) << "new map" << dendl;
+  print_map(mdsmap, 0);
 
   // bcast map to mds, waiters
   if (mon->is_leader())
@@ -128,7 +128,7 @@ bool MDSMonitor::preprocess_query(Message *m)
     return preprocess_beacon((MMDSBeacon*)m);
     
   case MSG_MDS_GETMAP:
-    send_full(m->get_source_inst());
+    handle_mds_getmap((MMDSGetMap*)m);
     return true;
 
   case MSG_MON_COMMAND:
@@ -141,6 +141,14 @@ bool MDSMonitor::preprocess_query(Message *m)
   }
 }
 
+void MDSMonitor::handle_mds_getmap(MMDSGetMap *m)
+{
+  if (m->have < mdsmap.get_epoch())
+    send_full(m->get_source_inst());
+  else
+    waiting_for_map.push_back(m->get_source_inst());
+}
+
 
 bool MDSMonitor::preprocess_beacon(MMDSBeacon *m)
 {
index 4c8fc91abcbf7a5c00ea93dc3318e3a31c92d327..49e8f680c7b411e5fa42b8d64a73fbb241cc495b 100644 (file)
@@ -27,6 +27,7 @@ using namespace std;
 #include "PaxosService.h"
 
 class MMDSBeacon;
+class MMDSGetMap;
 
 class MDSMonitor : public PaxosService {
  public:
@@ -37,7 +38,7 @@ class MDSMonitor : public PaxosService {
   MDSMap pending_mdsmap;  // current + pending updates
 
   // my helpers
-  void print_map(MDSMap &m);
+  void print_map(MDSMap &m, int dbl=7);
 
   class C_Updated : public Context {
     MDSMonitor *mm;
@@ -72,6 +73,7 @@ class MDSMonitor : public PaxosService {
   bool preprocess_beacon(class MMDSBeacon *m);
   bool handle_beacon(class MMDSBeacon *m);
   bool handle_command(class MMonCommand *m);
+  void handle_mds_getmap(MMDSGetMap *m);
 
   // beacons
   map<int, utime_t> last_beacon;
index e3c7ce827ac614740652b387ea6765c9d1b190e0..ebffe647c62f08553efb4c17d27f6d05b9b37eaf 100644 (file)
@@ -81,6 +81,9 @@ using namespace std;
 #include "messages/MExportDirNotifyAck.h"
 #include "messages/MExportDirFinish.h"
 
+#include "messages/MExportCaps.h"
+#include "messages/MExportCapsAck.h"
+
 #include "messages/MDentryUnlink.h"
 
 #include "messages/MHeartbeat.h"
@@ -311,6 +314,13 @@ decode_message(ceph_message_header& env, bufferlist& payload)
     m = new MExportDirWarningAck;
     break;
 
+    
+  case MSG_MDS_EXPORTCAPS:
+    m = new MExportCaps;
+    break;
+  case MSG_MDS_EXPORTCAPSACK:
+    m = new MExportCapsAck;
+    break;
 
 
   case MSG_MDS_DENTRYUNLINK:
index 9f0175e7a7d1e2f7a32547f5212fbda43f19edb8..b570722ad1888bc57d00bfe9f1ab695b68fb1db3 100644 (file)
 #define MSG_MDS_EXPORTDIRNOTIFYACK    159
 #define MSG_MDS_EXPORTDIRFINISH       160
 
+#define MSG_MDS_EXPORTSTRAY           161
+#define MSG_MDS_EXPORTSTRAYNOTIFY     162
+#define MSG_MDS_EXPORTSTRAYNOTIFYACK  163
+#define MSG_MDS_EXPORTSTRAYACK        164
+#define MSG_MDS_EXPORTSTRAYFINISH     165
+
+#define MSG_MDS_EXPORTCAPS            166
+#define MSG_MDS_EXPORTCAPSACK         167
+
 #define MSG_MDS_SLAVE_REQUEST         170
 
 #define MSG_MDS_DENTRYUNLINK      200