]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: send cap import messages to clients after importing subtree succeeds
authorYan, Zheng <zheng.z.yan@intel.com>
Tue, 26 Nov 2013 09:19:04 +0000 (17:19 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Mon, 16 Dec 2013 04:15:25 +0000 (12:15 +0800)
When importing subtree, the importer sends cap import messages to clients
before the import subtree operation is considered as success. If the
exporter crashes before EExport event is journalled, the importer needs to
re-export client caps. This confuses clients, and makes them lose track of
auth caps.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/Capability.h
src/mds/Locker.cc
src/mds/Migrator.cc
src/mds/Migrator.h
src/mds/Server.cc
src/mds/Server.h
src/messages/MExportDirFinish.h

index c04a05285cf335750b20e9c07c062c414443a3a5..6e35ba815da98d9baeaf21ba72e4626212d05fe3 100644 (file)
@@ -217,7 +217,10 @@ private:
   ceph_seq_t mseq;
 
   int suppress;
-  bool stale;
+  unsigned state;
+
+  const static unsigned STATE_STALE            = (1<<0);
+  const static unsigned STATE_NEW              = (1<<1);
 
 public:
   snapid_t client_follows;
@@ -234,7 +237,7 @@ public:
     last_sent(0),
     last_issue(0),
     mseq(0),
-    suppress(0), stale(false),
+    suppress(0), state(0),
     client_follows(0), client_xattr_version(0),
     item_session_caps(this), item_snaprealm_caps(this) {
     g_num_cap++;
@@ -263,8 +266,12 @@ public:
   void inc_suppress() { suppress++; }
   void dec_suppress() { suppress--; }
 
-  bool is_stale() { return stale; }
-  void set_stale(bool b) { stale = b; }
+  bool is_stale() { return state & STATE_STALE; }
+  void mark_stale() { state |= STATE_STALE; }
+  void clear_stale() { state &= ~STATE_STALE; }
+  bool is_new() { return state & STATE_NEW; }
+  void mark_new() { state |= STATE_NEW; }
+  void clear_new() { state &= ~STATE_NEW; }
 
   CInode *get_inode() { return inode; }
   client_t get_client() { return client; }
index 7365a47bf1f352234059987b0734e9aae68389af..eb30055933245456ddf6247a1ef49d3db22ab188 100644 (file)
@@ -1852,7 +1852,7 @@ void Locker::revoke_stale_caps(Session *session)
 
   for (xlist<Capability*>::iterator p = session->caps.begin(); !p.end(); ++p) {
     Capability *cap = *p;
-    cap->set_stale(true);
+    cap->mark_stale();
     CInode *in = cap->get_inode();
     int issued = cap->issued();
     if (issued) {
@@ -1889,7 +1889,7 @@ void Locker::resume_stale_caps(Session *session)
     assert(in->is_head());
     if (cap->is_stale()) {
       dout(10) << " clearing stale flag on " << *in << dendl;
-      cap->set_stale(false);
+      cap->clear_stale();
       if (!in->is_auth() || !eval(in, CEPH_CAP_LOCKS))
        issue_caps(in, cap);
     }
index 30b037cc505ebc355f802ba2a41d092203b1629e..d2e97d234e63251dd0610c44fa8718c2f4034f90 100644 (file)
@@ -478,6 +478,12 @@ void Migrator::handle_mds_failure_or_stop(int who)
        }
        break;
        
+      case IMPORT_FINISHING:
+       assert(dir);
+       dout(10) << "import state=finishing : finishing import on " << *dir << dendl;
+       import_finish(dir, true);
+       break;
+
       case IMPORT_ABORTING:
        assert(dir);
        dout(10) << "import state=aborting : ignoring repeat failure " << *dir << dendl;
@@ -1582,8 +1588,16 @@ void Migrator::export_logged_finish(CDir *dir)
   assert (g_conf->mds_kill_export_at != 11);
   
   // no notifies to wait for?
-  if (stat.notify_ack_waiting.empty())
+  if (stat.notify_ack_waiting.empty()) {
     export_finish(dir);  // skip notify/notify_ack stage.
+  } else {
+    // notify peer to send cap import messages to clients
+    if (mds->mdsmap->is_clientreplay_or_active_or_stopping(stat.peer)) {
+      mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), false, stat.tid), stat.peer);
+    } else {
+      dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
+    }
+  }
 }
 
 /*
@@ -1662,9 +1676,9 @@ void Migrator::export_finish(CDir *dir)
 
   // send finish/commit to new auth
   if (mds->mdsmap->is_clientreplay_or_active_or_stopping(it->second.peer)) {
-    mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), it->second.tid), it->second.peer);
+    mds->send_message_mds(new MExportDirFinish(dir->dirfrag(), true, it->second.tid), it->second.peer);
   } else {
-    dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
+    dout(7) << "not sending MExportDirFinish last, dest has failed" << dendl;
   }
   assert(g_conf->mds_kill_export_at != 13);
   
@@ -2264,20 +2278,26 @@ void Migrator::import_reverse(CDir *dir)
     }
   }
 
-  // reexport caps
-  for (map<CInode*, map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
-       p != stat.peer_exports.end();
-       ++p) {
-    CInode *in = p->first;
-    dout(20) << " reexporting caps on " << *in << dendl;
-    /*
-     * bleh.. just export all caps for this inode.  the auth mds
-     * will pick them up during recovery.
-     */
-    bufferlist bl; // throw this away
-    map<client_t,entity_inst_t> exported_client_map;  // throw this away too
-    encode_export_inode_caps(in, bl, exported_client_map);
-    finish_export_inode_caps(in);
+  if (stat.state == IMPORT_ACKING) {
+    // remove imported caps
+    for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
+       p != stat.peer_exports.end();
+       ++p) {
+      CInode *in = p->first;
+      for (map<client_t,Capability::Export>::iterator q = p->second.begin();
+         q != p->second.end();
+         ++q) {
+       Capability *cap = in->get_client_cap(q->first);
+       if (cap->is_new())
+         in->remove_client_cap(q->first);
+      }
+    }
+    for (map<client_t,entity_inst_t>::iterator p = stat.client_map.begin();
+        p != stat.client_map.end();
+        ++p) {
+      Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
+      session->dec_importing();
+    }
   }
         
   // log our failure
@@ -2381,15 +2401,15 @@ void Migrator::import_logged_start(dirfrag_t df, CDir *dir, int from,
   assert (g_conf->mds_kill_import_at != 7);
 
   // force open client sessions and finish cap import
-  mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
+  mds->server->finish_force_open_sessions(imported_client_map, sseqmap, false);
+  it->second.client_map.swap(imported_client_map);
   
   map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
-
   for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
        p != it->second.peer_exports.end();
        ++p) {
-
-    finish_import_inode_caps(p->first, from, true, p->second, imported_caps[p->first->ino()]);
+    // parameter 'peer' is -1, delay sending cap import messages to client
+    finish_import_inode_caps(p->first, -1, true, p->second, imported_caps[p->first->ino()]);
   }
   
   // send notify's etc.
@@ -2412,24 +2432,62 @@ void Migrator::handle_export_finish(MExportDirFinish *m)
 {
   CDir *dir = cache->get_dirfrag(m->get_dirfrag());
   assert(dir);
-  dout(7) << "handle_export_finish on " << *dir << dendl;
+  dout(7) << "handle_export_finish on " << *dir << (m->is_last() ? " last" : "") << dendl;
 
   map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
   assert(it != import_state.end());
-  assert(it->second.state == IMPORT_ACKING);
   assert(it->second.tid == m->get_tid());
 
-  import_finish(dir, false);
+  import_finish(dir, false, m->is_last());
+
   m->put();
 }
 
-void Migrator::import_finish(CDir *dir, bool notify)
+void Migrator::import_finish(CDir *dir, bool notify, bool last)
 {
   dout(7) << "import_finish on " << *dir << dendl;
 
+  map<dirfrag_t,import_state_t>::iterator it = import_state.find(dir->dirfrag());
+  assert(it != import_state.end());
+  assert(it->second.state == IMPORT_ACKING || it->second.state == IMPORT_FINISHING);
+
   // log finish
   assert(g_conf->mds_kill_import_at != 9);
 
+  if (it->second.state == IMPORT_ACKING) {
+    for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
+       p != it->second.peer_exports.end();
+       ++p) {
+      CInode *in = p->first;
+      assert(in->is_auth());
+      for (map<client_t,Capability::Export>::iterator q = p->second.begin();
+         q != p->second.end();
+         ++q) {
+       Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+       assert(session);
+       Capability *cap = in->get_client_cap(q->first);
+       assert(cap);
+       cap->clear_new();
+       cap->merge(q->second, true);
+       mds->mdcache->do_cap_import(session, in, cap, q->second.cap_id, q->second.seq,
+                                   q->second.mseq - 1, it->second.peer, CEPH_CAP_FLAG_AUTH);
+      }
+      p->second.clear();
+    }
+    for (map<client_t,entity_inst_t>::iterator p = it->second.client_map.begin();
+        p != it->second.client_map.end();
+        ++p) {
+      Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
+      session->dec_importing();
+    }
+  }
+
+  if (!last) {
+    assert(it->second.state == IMPORT_ACKING);
+    it->second.state = IMPORT_FINISHING;
+    return;
+  }
+
   // clear updated scatterlocks
   /*
   for (list<ScatterLock*>::iterator p = import_updated_scatterlocks[dir].begin();
@@ -2448,10 +2506,10 @@ void Migrator::import_finish(CDir *dir, bool notify)
   import_remove_pins(dir, bounds);
 
   map<CInode*, map<client_t,Capability::Export> > peer_exports;
-  import_state[dir->dirfrag()].peer_exports.swap(peer_exports);
+  it->second.peer_exports.swap(peer_exports);
 
   // clear import state (we're done!)
-  import_state.erase(dir->dirfrag());
+  import_state.erase(it);
 
   mds->mdlog->start_submit_entry(new EImportFinish(dir, true));
 
@@ -2582,6 +2640,8 @@ void Migrator::finish_import_inode_caps(CInode *in, int peer, bool auth_cap,
     Capability *cap = in->get_client_cap(it->first);
     if (!cap) {
       cap = in->add_client_cap(it->first, session);
+      if (peer < 0)
+       cap->mark_new();
     }
 
     Capability::Import& im = import_map[it->first];
@@ -2589,10 +2649,12 @@ void Migrator::finish_import_inode_caps(CInode *in, int peer, bool auth_cap,
     im.mseq = auth_cap ? it->second.mseq : cap->get_mseq();
     im.issue_seq = cap->get_last_seq() + 1;
 
-    cap->merge(it->second, auth_cap);
-    mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
-                               it->second.seq, it->second.mseq - 1, peer,
-                               auth_cap ? CEPH_CAP_FLAG_AUTH : 0);
+    if (peer >= 0) {
+      cap->merge(it->second, auth_cap);
+      mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
+                                 it->second.seq, it->second.mseq - 1, peer,
+                                 auth_cap ? CEPH_CAP_FLAG_AUTH : 0);
+    }
   }
 
   in->replica_caps_wanted = 0;
index 58dce1cb67a228dbdf494c0ece27692602a0354a..9007a85019df702e21e8242320ce52e72959cdcf 100644 (file)
@@ -113,7 +113,8 @@ public:
   const static int IMPORT_PREPPED       = 4; // opened bounds, waiting for import
   const static int IMPORT_LOGGINGSTART  = 5; // got import, logging EImportStart
   const static int IMPORT_ACKING        = 6; // logged EImportStart, sent ack, waiting for finish
-  const static int IMPORT_ABORTING      = 7; // notifying bystanders of an abort before unfreezing
+  const static int IMPORT_FINISHING     = 7; // sent cap imports, waiting for finish
+  const static int IMPORT_ABORTING      = 8; // notifying bystanders of an abort before unfreezing
   static const char *get_import_statename(int s) {
     switch (s) {
     case IMPORT_DISCOVERING: return "discovering";
@@ -122,6 +123,7 @@ public:
     case IMPORT_PREPPED: return "prepped";
     case IMPORT_LOGGINGSTART: return "loggingstart";
     case IMPORT_ACKING: return "acking";
+    case IMPORT_FINISHING: return "finishing";
     case IMPORT_ABORTING: return "aborting";
     default: assert(0); return 0;
     }
@@ -135,6 +137,7 @@ protected:
     set<int> bystanders;
     list<dirfrag_t> bound_ls;
     list<ScatterLock*> updated_scatterlocks;
+    map<client_t,entity_inst_t> client_map;
     map<CInode*, map<client_t,Capability::Export> > peer_exports;
   };
 
@@ -332,7 +335,7 @@ protected:
                           map<client_t,uint64_t>& sseqmap);
   void handle_export_finish(MExportDirFinish *m);
 public:
-  void import_finish(CDir *dir, bool notify);
+  void import_finish(CDir *dir, bool notify, bool last=true);
 protected:
 
   void handle_export_caps(MExportCaps *m);
index 8242d226ff57facc5922a31b4fcd634ac29fbc8e..a34b082e033f640a48bb3a0260e1e847713e320c 100644 (file)
@@ -372,7 +372,8 @@ version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
 }
 
 void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
-                                       map<client_t,uint64_t>& sseqmap)
+                                       map<client_t,uint64_t>& sseqmap,
+                                       bool dec_import)
 {
   /*
    * FIXME: need to carefully consider the race conditions between a
@@ -403,7 +404,8 @@ void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
       dout(10) << "force_open_sessions skipping already-open " << session->info.inst << dendl;
       assert(session->is_open() || session->is_stale());
     }
-    session->dec_importing();
+    if (dec_import)
+      session->dec_importing();
   }
   mds->sessionmap.version++;
 }
index 091d3d20e293a4f50cdb6905b412197e32cc206c..d7253f1f9da63e170354ecfbe7f529eff4c4d9c3 100644 (file)
@@ -78,7 +78,8 @@ public:
   version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm,
                                        map<client_t,uint64_t>& sseqmap);
   void finish_force_open_sessions(map<client_t,entity_inst_t> &cm,
-                                       map<client_t,uint64_t>& sseqmap);
+                                 map<client_t,uint64_t>& sseqmap,
+                                 bool dec_import=true);
   void flush_client_sessions(set<client_t>& client_set, C_GatherBuilder& gather);
   void finish_flush_session(Session *session, version_t seq);
   void terminate_sessions();
index 66d6f27eae26c1b8e5505b0a10fc60ce0a0828ed..dd78dda538c8028e0431d406e5659fb427a5c888 100644 (file)
 
 class MExportDirFinish : public Message {
   dirfrag_t dirfrag;
+  bool last;
 
  public:
   dirfrag_t get_dirfrag() { return dirfrag; }
+  bool is_last() { return last; }
   
   MExportDirFinish() {}
-  MExportDirFinish(dirfrag_t df, uint64_t tid) :
-    Message(MSG_MDS_EXPORTDIRFINISH), dirfrag(df) {
+  MExportDirFinish(dirfrag_t df, bool l, uint64_t tid) :
+    Message(MSG_MDS_EXPORTDIRFINISH), dirfrag(df), last(l) {
     set_tid(tid);
   }
 private:
@@ -34,15 +36,17 @@ private:
 public:
   const char *get_type_name() const { return "ExFin"; }
   void print(ostream& o) const {
-    o << "export_finish(" << dirfrag << ")";
+    o << "export_finish(" << dirfrag << (last ? " last" : "") << ")";
   }
   
   void encode_payload(uint64_t features) {
     ::encode(dirfrag, payload);
+    ::encode(last, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(dirfrag, p);
+    ::decode(last, p);
   }
 
 };