]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
some cleanup to better force open client sessions on cap imports; import on rename...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 17 Oct 2007 20:59:49 +0000 (20:59 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 17 Oct 2007 20:59:49 +0000 (20:59 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1959 29311d96-e01e-0410-9327-a35deaab8ce9

14 files changed:
branches/sage/mds/Makefile
branches/sage/mds/client/SyntheticClient.cc
branches/sage/mds/mds/ClientMap.h
branches/sage/mds/mds/Locker.cc
branches/sage/mds/mds/MDS.cc
branches/sage/mds/mds/MDS.h
branches/sage/mds/mds/Migrator.cc
branches/sage/mds/mds/Migrator.h
branches/sage/mds/mds/Server.cc
branches/sage/mds/mds/Server.h
branches/sage/mds/mds/events/EImportStart.h
branches/sage/mds/mds/events/ESession.h
branches/sage/mds/mds/events/EUpdate.h
branches/sage/mds/mds/journal.cc

index f1dc7b3d4d60a7ac7d45301d03e8efe4a6be4859..fd0c3623f0a6a06b8f68909ada0945fb980a1d50 100644 (file)
@@ -16,7 +16,7 @@
 EXTRA_CFLAGS = #-I${HOME}/include -L${HOME}/lib
 EXTRA_CFLAGS += -g
 EXTRA_CFLAGS += -pg
-EXTRA_CFLAGS += -O3
+#EXTRA_CFLAGS += -O3
 
 # base
 CFLAGS = -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE ${EXTRA_CFLAGS}
index 1695631b8b8cb8035128da8f4b0f582d69bf105f..3df7f5cf734f522075ab45342915b2ec5ee02f0a 100644 (file)
@@ -118,6 +118,11 @@ void parse_syn_options(vector<char*>& args)
         syn_iargs.push_back( atoi(args[++i]) );
         syn_iargs.push_back( atoi(args[++i]) );
         syn_iargs.push_back( atoi(args[++i]) );
+      } else if (strcmp(args[i],"makefiles2") == 0) {
+        syn_modes.push_back( SYNCLIENT_MODE_MAKEFILES2 );
+        syn_iargs.push_back( atoi(args[++i]) );
+        syn_iargs.push_back( atoi(args[++i]) );
+        syn_iargs.push_back( atoi(args[++i]) );
       } else if (strcmp(args[i],"linktest") == 0) {
         syn_modes.push_back( SYNCLIENT_MODE_LINKTEST );
       } else if (strcmp(args[i],"createshared") == 0) {
@@ -1573,8 +1578,8 @@ int SyntheticClient::make_files(int num, int count, int priv, bool more)
       if (more) {
         client->lstat(d, &st);
         int fd = client->open(d, O_RDONLY);
-        client->unlink(d);
-        client->close(fd);
+        //client->unlink(d);
+        //client->close(fd);
       }
 
       if (time_to_stop()) return 0;
index c36e66d240a338d5ba7c5f7ebaa655efa8e1cccf..b16abdbef7b9150698f89d1c376b2e3798aac5ee 100644 (file)
@@ -53,6 +53,7 @@ public:
   version_t get_committing() { return committing; }
   version_t get_committed() { return committed; }
 
+  void set_version(version_t v) { version = v; }
   version_t inc_projected() { return ++projected; }
   void reset_projected() { projected = version; }
   void set_committing(version_t v) { committing = v; }
@@ -82,6 +83,7 @@ public:
   void add_opening(int c) { opening.insert(c); }
   bool is_closing(int c) { return closing.count(c); }
   void add_closing(int c) { closing.insert(c); }
+  void remove_closing(int c) { closing.erase(c); }
   bool have_session(int client) {
     return client_inst.count(client);
   }
@@ -97,6 +99,16 @@ public:
     client_inst.erase(client);
     version++;
   }
+  void noop() {
+    version++;
+  }
+  void open_sessions(map<int,entity_inst_t>& cm) {
+    for (map<int,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+      client_inst[p->first] = p->second;
+      sessions.insert(p->first);
+    }
+    version++;
+  }
   
 private:
   // -- push sequence --
index 55f38cd799b5fab3bbe9bc32dc6471522a8817cc..f5503d1345f3189e368075201a7230b3b4a38721 100644 (file)
@@ -546,7 +546,7 @@ bool Locker::issue_caps(CInode *in)
       if (seq > 0 && 
           !it->second.is_suppress()) {
         dout(7) << "   sending MClientFileCaps to client" << it->first << " seq " << it->second.get_last_seq() << " new pending " << cap_string(it->second.pending()) << " was " << cap_string(before) << dendl;
-        mds->send_message_client_maybe_opening(new MClientFileCaps(MClientFileCaps::OP_GRANT,
+        mds->send_message_client(new MClientFileCaps(MClientFileCaps::OP_GRANT,
                                                     in->inode,
                                                     it->second.get_last_seq(),
                                                     it->second.pending(),
@@ -729,7 +729,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m)
     MClientFileCaps *r = new MClientFileCaps(MClientFileCaps::OP_RELEASE,
                                             in->inode, 
                                              0, 0, 0);
-    mds->send_message_client_maybe_open(r, m->get_source_inst());
+    mds->send_message_client(r, m->get_source_inst());
   }
 
   // merge in atime?
index 6fc8ef46d903921e6050dd00acc1edb45db974f6..bf9f41cfac8519045f0237ee1a7e456c25f99761 100644 (file)
@@ -286,44 +286,6 @@ void MDS::send_message_client(Message *m, entity_inst_t clientinst)
 }
 
 
-class C_MDS_SendMessageClientSession : public Context {
-  MDS *mds;
-  Message *msg;
-  entity_inst_t clientinst;
-public:
-  C_MDS_SendMessageClientSession(MDS *md, Message *ms, entity_inst_t& ci) :
-    mds(md), msg(ms), clientinst(ci) {}
-  void finish(int r) {
-    mds->clientmap.open_session(clientinst);
-    mds->send_message_client(msg, clientinst.name.num());
-  }
-};
-
-void MDS::send_message_client_maybe_opening(Message *m, int c)
-{
-  send_message_client_maybe_open(m, clientmap.get_inst(c));
-}
-
-void MDS::send_message_client_maybe_open(Message *m, entity_inst_t clientinst)
-{
-  // FIXME
-  //  _most_ ppl shoudl check for a client session, since migration may call this,
-  //  start opening, and then e.g. locker sends something else (through non-maybe_open 
-  //  version)
-  int client = clientinst.name.num();
-  if (!clientmap.have_session(client)) {
-    // no session!
-    dout(10) << "send_message_client opening session with " << clientinst << dendl;
-    clientmap.add_opening(client);
-    mdlog->submit_entry(new ESession(clientinst, true, clientmap.inc_projected()),
-                       new C_MDS_SendMessageClientSession(this, m, clientinst));
-  } else {
-    // we have a session.
-    send_message_client(m, clientinst);
-  }
-}
-
-
 
 int MDS::init(bool standby)
 {
index 4dcd73662dbe8252fa10ded12844a7711359062a..6d3f90dc465af2358955664afcda9401caa0b7e9 100644 (file)
@@ -222,8 +222,6 @@ class MDS : public Dispatcher {
 
   void send_message_client(Message *m, int client);
   void send_message_client(Message *m, entity_inst_t clientinst);
-  void send_message_client_maybe_opening(Message *m, int);
-  void send_message_client_maybe_open(Message *m, entity_inst_t clientinst);
 
 
   // start up, shutdown
index ac02938ddbe886962ce096d59bc557c451f1923b..6ef64436318ebc87a1f8c801d309f750988884ba 100644 (file)
@@ -20,6 +20,7 @@
 #include "Migrator.h"
 #include "Locker.h"
 #include "Migrator.h"
+#include "Server.h"
 
 #include "MDBalancer.h"
 #include "MDLog.h"
@@ -854,8 +855,7 @@ void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& fini
                                              it->second.get_last_seq(), 
                                              it->second.pending(),
                                              it->second.wanted());
-    entity_inst_t inst = mds->clientmap.get_inst(it->first);
-    mds->send_message_client_maybe_open(m, inst);
+    mds->send_message_client(m, it->first);
   }
   in->clear_client_caps();
 
@@ -1602,6 +1602,8 @@ void Migrator::handle_export_dir(MExportDir *m)
   bufferlist::iterator blp = m->get_dirstate().begin();
   ::_decode_simple(imported_client_map, blp);
 
+  mds->server->force_open_sessions(imported_client_map);
+
   int num_imported_inodes = 0;
   while (!blp.end()) {
     num_imported_inodes += 
@@ -1609,12 +1611,14 @@ void Migrator::handle_export_dir(MExportDir *m)
                        oldauth, 
                        dir,                 // import root
                        le,
-                       imported_client_map,
                        mds->mdlog->get_current_segment(),
                        import_updated_scatterlocks[dir]);
   }
   dout(10) << " " << m->get_bounds().size() << " imported bounds" << dendl;
   
+  // include imported sessions in EImportStart
+  le->client_map.claim(m->get_dirstate());
+
   // include bounds in EImportStart
   set<CDir*> import_bounds;
   cache->get_subtree_bounds(dir, import_bounds);
@@ -1875,7 +1879,6 @@ void Migrator::import_finish(CDir *dir)
 
 
 void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth,
-                                  map<int,entity_inst_t>& imported_client_map, 
                                   LogSegment *ls,
                                   list<ScatterLock*>& updated_scatterlocks)
 {  
@@ -1923,7 +1926,7 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int o
   
   // adjust replica list
   //assert(!in->is_replica(oldauth));  // not true on failed export
-  in->add_replica( oldauth, CInode::EXPORT_NONCE );
+  in->add_replica(oldauth, CInode::EXPORT_NONCE);
   if (in->is_replica(mds->get_nodeid()))
     in->remove_replica(mds->get_nodeid());
   
@@ -1938,7 +1941,7 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int o
                                                 in->client_caps[*it].pending(),
                                                 in->client_caps[*it].wanted());
     caps->set_mds( oldauth ); // reap from whom?
-    mds->send_message_client_maybe_open(caps, imported_client_map[*it]);
+    mds->send_message_client(caps, *it);
   }
 }
 
@@ -1947,7 +1950,6 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp,
                                int oldauth,
                                CDir *import_root,
                                EImportStart *le,
-                               map<int,entity_inst_t>& imported_client_map,
                                LogSegment *ls,
                                list<ScatterLock*>& updated_scatterlocks)
 {
@@ -2043,7 +2045,7 @@ int Migrator::decode_import_dir(bufferlist::iterator& blp,
     }
     else if (icode == 'I') {
       // inode
-      decode_import_inode(dn, blp, oldauth, imported_client_map, ls, updated_scatterlocks);
+      decode_import_inode(dn, blp, oldauth, ls, updated_scatterlocks);
     }
     
     // add dentry to journal entry
index 07a8731868a928dcc635dc368c5ffa8bde6fe552..92f824e691c558cbbb711add2978d1122c5cebad 100644 (file)
@@ -223,14 +223,12 @@ public:
 
 public:
   void decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth, 
-                          map<int,entity_inst_t>& imported_client_map,
                           LogSegment *ls,
                           list<ScatterLock*>& updated_scatterlocks);
   int decode_import_dir(bufferlist::iterator& blp,
                        int oldauth,
                        CDir *import_root,
                        EImportStart *le, 
-                       map<int,entity_inst_t>& imported_client_map,
                        LogSegment *ls,
                        list<ScatterLock*>& updated_scatterlocks);
 
index 3be92948cf0b3f20f57699c6f5c4479efe3668c4..b60159269a6c953abd1b7bad7cab0b4169124364 100644 (file)
@@ -191,12 +191,15 @@ void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cma
   if (open) {
     assert(mds->clientmap.is_opening(from));
     mds->clientmap.open_session(client_inst);
-  } else {
-    assert(mds->clientmap.is_closing(from));
+  } else if (mds->clientmap.is_closing(from)) {
     mds->clientmap.close_session(from);
     
     // purge completed requests from clientmap
     mds->clientmap.trim_completed_requests(from, 0);
+  } else {
+    // close must have been canceled (by an import?) ...
+    assert(!open);
+    mds->clientmap.noop();
   }
   
   assert(cmapv == mds->clientmap.get_version());
@@ -208,6 +211,28 @@ void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cma
     mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst);
 }
 
+void Server::force_open_sessions(map<int,entity_inst_t>& cm)
+{
+  dout(10) << "force_open_sessions on " << cm.size() << " clients" << dendl;
+  version_t v = mds->clientmap.get_version();
+  for (map<int,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+    if (mds->clientmap.is_closing(p->first)) {
+      dout(15) << "force_open_sessions canceling close on " << p->second << dendl;
+      mds->clientmap.remove_closing(p->first);
+      continue;
+    }
+    if (mds->clientmap.have_session(p->first)) {
+      dout(15) << "force_open_sessions have session " << p->second << dendl;
+      continue;
+    }
+
+    dout(10) << "force_open_sessions opening " << p->second << dendl;
+    mds->clientmap.open_session(p->second);
+    mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), p->second);
+   }
+  mds->clientmap.set_version(v+1);
+}
+
 
 void Server::terminate_sessions()
 {
@@ -2904,6 +2929,12 @@ void Server::handle_client_rename(MDRequest *mdr)
   
   _rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn);
 
+  if (!srcdn->is_auth() && srcdn->is_primary()) {
+    // importing inode; also journal imported client map
+    
+    // ** DER FIXME **
+  }
+
   // -- commit locally --
   C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn);
 
@@ -3048,7 +3079,7 @@ void Server::_rename_prepare(MDRequest *mdr,
        version_t siv;
        if (srcdn->is_auth())
          siv = srcdn->inode->get_projected_version();
-       else
+       else 
          siv = mdr->more()->inode_import_v;
        mdr->more()->pvmap[destdn] = destdn->pre_dirty(siv+1);
       }
@@ -3196,9 +3227,9 @@ void Server::_rename_apply(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDen
        map<int,entity_inst_t> imported_client_map;
        list<ScatterLock*> updated_scatterlocks;  // we clear_updated explicitly below
        ::_decode_simple(imported_client_map, blp);
+       force_open_sessions(imported_client_map);
        mdcache->migrator->decode_import_inode(destdn, blp, 
                                               srcdn->authority().first,
-                                              imported_client_map,
                                               mdr->ls,
                                               updated_scatterlocks);
        destdn->inode->dirlock.clear_updated();   
index 281fd13ca259330549a5faec22fde4402a60b167..6eda7251c4025c25d6ee8e932fc6f8297df505c2 100644 (file)
@@ -56,6 +56,7 @@ public:
 
   void handle_client_session(class MClientSession *m);
   void _session_logged(entity_inst_t ci, bool open, version_t cmapv);
+  void force_open_sessions(map<int,entity_inst_t> &cm);
   void terminate_sessions();
   void reconnect_clients();
   void handle_client_reconnect(class MClientReconnect *m);
index aa1902576542d4d1b8c9cdc7928a309f66ee2a83..5671e404298a4cd2cb6b897dd599b64ef9586438 100644 (file)
@@ -30,6 +30,8 @@ protected:
 
  public:
   EMetaBlob metablob;
+  bufferlist client_map;  // encoded map<int,entity_inst_t>
+  version_t cmapv;
 
   EImportStart(dirfrag_t di,
               list<dirfrag_t>& b) : LogEvent(EVENT_IMPORTSTART), 
@@ -44,12 +46,16 @@ protected:
     bl.append((char*)&base, sizeof(base));
     metablob._encode(bl);
     ::_encode(bounds, bl);
+    ::_encode(cmapv, bl);
+    ::_encode(client_map, bl);
   }
   void decode_payload(bufferlist& bl, int& off) {
     bl.copy(off, sizeof(base), (char*)&base);
     off += sizeof(base);
     metablob._decode(bl, off);
     ::_decode(bounds, bl, off);
+    ::_decode(cmapv, bl, off);
+    ::_decode(client_map, bl, off);
   }
   
   bool has_expired(MDS *mds);
index a8f9992486a180ac763947ce4ec52a7e24796df7..3aba5559aac1ccb217794d301c5a35063fdf97fc 100644 (file)
@@ -37,14 +37,14 @@ class ESession : public LogEvent {
   }
   
   void encode_payload(bufferlist& bl) {
-       ::_encode(client_inst, bl);
-       ::_encode(open, bl);
-       ::_encode(cmapv, bl);
+    ::_encode(client_inst, bl);
+    ::_encode(open, bl);
+    ::_encode(cmapv, bl);
   }
   void decode_payload(bufferlist& bl, int& off) {
-       ::_decode(client_inst, bl, off);
-       ::_decode(open, bl, off);
-       ::_decode(cmapv, bl, off);
+    ::_decode(client_inst, bl, off);
+    ::_decode(open, bl, off);
+    ::_decode(cmapv, bl, off);
   }
 
 
index de965429f9bdd5a7df47695b50fabdf86208be31..3939527cef41ccae7c32c9bb3410c2295d89dd95 100644 (file)
@@ -22,6 +22,7 @@ class EUpdate : public LogEvent {
 public:
   EMetaBlob metablob;
   string type;
+  bufferlist client_map;
 
   EUpdate() : LogEvent(EVENT_UPDATE) { }
   EUpdate(MDLog *mdlog, const char *s) : 
@@ -37,10 +38,12 @@ public:
   void encode_payload(bufferlist& bl) {
     ::_encode(type, bl);
     metablob._encode(bl);
+    ::_encode(client_map, bl);
   } 
   void decode_payload(bufferlist& bl, int& off) {
     ::_decode(type, bl, off);
     metablob._decode(bl, off);
+    ::_decode(client_map, bl, off);
   }
 
   void update_segment();
index 1f27cf713a0782de7aceb80dabd417831f3e5339..94e7a451a1a7ab14cb376efeafdfeea9af89c234 100644 (file)
@@ -1048,6 +1048,20 @@ void EImportStart::replay(MDS *mds)
 
   // put in ambiguous import list
   mds->mdcache->add_ambiguous_import(base, bounds);
+
+  // open client sessions?
+  if (mds->clientmap.get_version() >= cmapv) {
+    dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version() 
+            << " >= " << cmapv << ", noop" << dendl;
+  } else {
+    dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version() 
+            << " < " << cmapv << dendl;
+    map<int,entity_inst_t> cm;
+    bufferlist::iterator blp = client_map.begin();
+    ::_decode_simple(cm, blp);
+    mds->clientmap.open_sessions(cm);
+    assert(mds->clientmap.get_version() == cmapv);
+  }
 }
 
 // -----------------------