mds: flush session messages before exporting caps
author    Yan, Zheng <zheng.z.yan@intel.com>
          Fri, 25 Oct 2013 08:30:49 +0000 (16:30 +0800)
committer Yan, Zheng <zheng.z.yan@intel.com>
          Mon, 16 Dec 2013 04:15:24 +0000 (12:15 +0800)
The following sequence of events can happen when exporting inodes:

- client sends open file request to mds.0
- mds.0 handles the request and sends inode stat back to the client
- mds.0 exports the inode to mds.1
- mds.1 sends cap import message to the client
- mds.0 sends cap export message to the client
- client receives the cap import message from mds.1, but it still
  doesn't have the corresponding inode in its cache, so the client
  releases the imported caps.
- client receives the open file reply from mds.0
- client receives the cap export message from mds.0.

At the end of this sequence, the client holds no caps at all for the
file it just opened.

To fix this message ordering issue, this patch introduces a new session
operation, FLUSHMSG. Before exporting caps, we send a FLUSHMSG session
message to the client and wait for the acknowledgment. When we receive
the FLUSHMSG_ACK message from the client, we can be sure that the client
has received all previously sent messages.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
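
For context, the client side of this handshake is not part of this patch: on
receiving a FLUSHMSG, the client only has to echo the sequence number back in
a FLUSHMSG_ACK. The sketch below is a minimal illustration of that behaviour;
the function name handle_session_flushmsg and the MetaSession/messenger
members are assumptions for illustration, not the actual Client.cc code.

  // Illustrative sketch only (hypothetical names): acknowledge a FLUSHMSG.
  // Because messages on a session are delivered in order, sending this ack
  // tells the MDS that everything it sent before the FLUSHMSG (open replies,
  // cap import/export messages) has already reached this client.
  void Client::handle_session_flushmsg(MetaSession *session, MClientSession *m)
  {
    messenger->send_message(
        new MClientSession(CEPH_SESSION_FLUSHMSG_ACK, m->get_seq()),
        session->inst);
    m->put();
  }

On the MDS side, Server::flush_client_sessions (below) pairs each FLUSHMSG
with a C_GatherBuilder sub-context, so the export proceeds only after every
affected client has acked.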
src/common/ceph_strings.cc
src/include/ceph_fs.h
src/mds/Migrator.cc
src/mds/Migrator.h
src/mds/Server.cc
src/mds/Server.h
src/mds/SessionMap.h

src/common/ceph_strings.cc
index 3eb5e672d4070e899d14b56178ff818c55696181..c6721d3358ed5140c3d669481bb795efd118cc61 100644 (file)
@@ -151,6 +151,8 @@ const char *ceph_session_op_name(int op)
        case CEPH_SESSION_RENEWCAPS: return "renewcaps";
        case CEPH_SESSION_STALE: return "stale";
        case CEPH_SESSION_RECALL_STATE: return "recall_state";
+       case CEPH_SESSION_FLUSHMSG: return "flushmsg";
+       case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
        }
        return "???";
 }
src/include/ceph_fs.h
index 47ec1f14f6e7d0a4e6400a5fb9eb946a19524584..f1f286faf71d33f36217acbb87b9b1b08aa23250 100644 (file)
@@ -283,6 +283,8 @@ enum {
        CEPH_SESSION_RENEWCAPS,
        CEPH_SESSION_STALE,
        CEPH_SESSION_RECALL_STATE,
+       CEPH_SESSION_FLUSHMSG,
+       CEPH_SESSION_FLUSHMSG_ACK,
 };
 
 extern const char *ceph_session_op_name(int op);
src/mds/Migrator.cc
index 7ca38fef6215919a56917ef3c8495dc958bcbaf7..8cbb07edf42293ec68868b8ab7e5599ddc9e9d3b 100644 (file)
@@ -778,6 +778,36 @@ void Migrator::handle_export_discover_ack(MExportDirDiscoverAck *m)
   m->put();  // done
 }
 
+class C_M_ExportSessionsFlushed : public Context {
+  Migrator *migrator;
+  CDir *dir;
+  uint64_t tid;
+public:
+  C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t) :
+    migrator(m), dir(d), tid(t) {}
+  void finish(int r) {
+    migrator->export_sessions_flushed(dir, tid);
+  }
+};
+
+void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
+{
+  dout(7) << "export_sessions_flushed " << *dir << dendl;
+
+  map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+  if (it == export_state.end() || it->second.tid != tid) {
+    // export must have aborted.
+    dout(7) << "export must have aborted on " << dir << dendl;
+    return;
+  }
+
+  assert(it->second.state == EXPORT_PREPPING || it->second.state == EXPORT_WARNING);
+  assert(it->second.warning_ack_waiting.count(-1) > 0);
+  it->second.warning_ack_waiting.erase(-1);
+  if (it->second.state == EXPORT_WARNING && it->second.warning_ack_waiting.empty())
+    export_go(dir);     // start export.
+}
+
 void Migrator::export_frozen(CDir *dir)
 {
   dout(7) << "export_frozen on " << *dir << dendl;
@@ -913,6 +943,60 @@ void Migrator::export_frozen(CDir *dir)
   it->second.state = EXPORT_PREPPING;
   mds->send_message_mds(prep, it->second.peer);
   assert (g_conf->mds_kill_export_at != 4);
+
+  // make sure any new instantiations of caps are flushed out
+  assert(it->second.warning_ack_waiting.empty());
+
+  set<client_t> export_client_set;
+  get_export_client_set(dir, export_client_set);
+
+  C_GatherBuilder gather(g_ceph_context);
+  mds->server->flush_client_sessions(export_client_set, gather);
+  if (gather.has_subs()) {
+    it->second.warning_ack_waiting.insert(-1);
+    gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
+    gather.activate();
+  }
+}
+
+void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
+{
+  list<CDir*> dfs;
+  dfs.push_back(dir);
+  while (!dfs.empty()) {
+    CDir *dir = dfs.front();
+    dfs.pop_front();
+    for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p) {
+      CDentry *dn = p->second;
+      if (!dn->get_linkage()->is_primary())
+       continue;
+      CInode *in = dn->get_linkage()->get_inode();
+      if (in->is_dir()) {
+       // directory?
+       list<CDir*> ls;
+       in->get_dirfrags(ls);
+       for (list<CDir*>::iterator q = ls.begin(); q != ls.end(); ++q) {
+         if (!(*q)->state_test(CDir::STATE_EXPORTBOUND)) {
+           // include nested dirfrag
+           assert((*q)->get_dir_auth().first == CDIR_AUTH_PARENT);
+           dfs.push_back(*q); // it's ours, recurse (later)
+         }
+       }
+      }
+      for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
+          q != in->client_caps.end();
+          ++q)
+       client_set.insert(q->first);
+    }
+  }
+}
+
+void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
+{
+  for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
+      q != in->client_caps.end();
+      ++q)
+    client_set.insert(q->first);
 }
 
 /* This function DOES put the passed message before returning*/
@@ -939,7 +1023,9 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
   set<CDir*> bounds;
   cache->get_subtree_bounds(dir, bounds);
 
-  assert(it->second.warning_ack_waiting.empty());
+  assert(it->second.warning_ack_waiting.empty() ||
+         (it->second.warning_ack_waiting.size() == 1 &&
+         it->second.warning_ack_waiting.count(-1) > 0));
   assert(it->second.notify_ack_waiting.empty());
 
   for (map<int,unsigned>::iterator p = dir->replicas_begin();
src/mds/Migrator.h
index c616543a5e12184214ec613cdff9e2b1994dfdfd..82132ef60d16fdfd0678ca7cb041eaa7d6908c3d 100644 (file)
@@ -230,6 +230,8 @@ public:
   }
   
   void get_export_lock_set(CDir *dir, set<SimpleLock*>& locks);
+  void get_export_client_set(CDir *dir, set<client_t> &client_set);
+  void get_export_client_set(CInode *in, set<client_t> &client_set);
 
   void encode_export_inode(CInode *in, bufferlist& bl, 
                           map<client_t,entity_inst_t>& exported_client_map);
@@ -257,6 +259,7 @@ public:
   void handle_export_discover_ack(MExportDirDiscoverAck *m);
   void export_frozen(CDir *dir);
   void handle_export_prep_ack(MExportDirPrepAck *m);
+  void export_sessions_flushed(CDir *dir, uint64_t tid);
   void export_go(CDir *dir);
   void export_go_synced(CDir *dir, uint64_t tid);
   void export_try_cancel(CDir *dir);
@@ -279,6 +282,7 @@ public:
   friend class C_MDC_ExportFreeze;
   friend class C_MDS_ExportFinishLogged;
   friend class C_M_ExportGo;
+  friend class C_M_ExportSessionsFlushed;
 
   // importer
   void handle_export_discover(MExportDirDiscover *m);
src/mds/Server.cc
index 193cddb007c4f17c62e85073f8bd932446adc80e..c9266a5dfa45edf1b9a79c669440fb4a73f25aa7 100644 (file)
@@ -256,12 +256,35 @@ void Server::handle_client_session(MClientSession *m)
     }
     break;
 
+  case CEPH_SESSION_FLUSHMSG_ACK:
+    finish_flush_session(session, m->get_seq());
+    break;
+
   default:
     assert(0);
   }
   m->put();
 }
 
+void Server::flush_client_sessions(set<client_t>& client_set, C_GatherBuilder& gather)
+{
+  for (set<client_t>::iterator p = client_set.begin(); p != client_set.end(); ++p) {
+    Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->v));
+    assert(session);
+    if (session->is_stale())
+      continue;
+    version_t seq = session->wait_for_flush(gather.new_sub());
+    mds->send_message_client(new MClientSession(CEPH_SESSION_FLUSHMSG, seq), session);
+  }
+}
+
+void Server::finish_flush_session(Session *session, version_t seq)
+{
+  list<Context*> finished;
+  session->finish_flush(seq, finished);
+  mds->queue_waiters(finished);
+}
+
 void Server::_session_logged(Session *session, uint64_t state_seq, bool open, version_t pv,
                             interval_set<inodeno_t>& inos, version_t piv)
 {
@@ -439,6 +462,7 @@ void Server::find_idle_sessions()
     mds->locker->revoke_stale_caps(session);
     mds->locker->remove_stale_leases(session);
     mds->send_message_client(new MClientSession(CEPH_SESSION_STALE, session->get_push_seq()), session);
+    finish_flush_session(session, session->get_push_seq());
   }
 
   // autoclose
@@ -523,6 +547,8 @@ void Server::journal_close_session(Session *session, int state)
     ++p;
     mdcache->request_kill(mdr);
   }
+
+  finish_flush_session(session, session->get_push_seq());
 }
 
 void Server::reconnect_clients()
@@ -6387,6 +6413,20 @@ public:
   }
 };
 
+class C_MDS_SlaveRenameSessionsFlushed : public Context {
+  Server *server;
+  MDRequest *mdr;
+public:
+  C_MDS_SlaveRenameSessionsFlushed(Server *s, MDRequest *r) :
+    server(s), mdr(r) {
+      mdr->get();
+    }
+  void finish(int r) {
+    server->_slave_rename_sessions_flushed(mdr);
+    mdr->put();
+  }
+};
+
 /* This function DOES put the mdr->slave_request before returning*/
 void Server::handle_slave_rename_prep(MDRequest *mdr)
 {
@@ -6487,6 +6527,18 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
        mds->send_message_mds(notify, *p);
        mdr->more()->waiting_on_slave.insert(*p);
       }
+
+      // make sure clients have received all cap related messages
+      set<client_t> export_client_set;
+      mdcache->migrator->get_export_client_set(srcdnl->get_inode(), export_client_set);
+
+      C_GatherBuilder gather(g_ceph_context);
+      flush_client_sessions(export_client_set, gather);
+      if (gather.has_subs()) {
+       mdr->more()->waiting_on_slave.insert(-1);
+       gather.set_finisher(new C_MDS_SlaveRenameSessionsFlushed(this, mdr));
+       gather.activate();
+      }
     }
 
     // is witness list sufficient?
@@ -7101,7 +7153,21 @@ void Server::handle_slave_rename_notify_ack(MDRequest *mdr, MMDSSlaveRequest *ac
   }
 }
 
+void Server::_slave_rename_sessions_flushed(MDRequest *mdr)
+{
+  dout(10) << "_slave_rename_sessions_flushed " << *mdr << dendl;
 
+  if (mdr->more()->waiting_on_slave.count(-1)) {
+    mdr->more()->waiting_on_slave.erase(-1);
+
+    if (mdr->more()->waiting_on_slave.empty()) {
+      if (mdr->slave_request)
+       dispatch_slave_request(mdr);
+    } else
+      dout(10) << " still waiting for rename notify acks from "
+       << mdr->more()->waiting_on_slave << dendl;
+  }
+}
 
 // snaps
 /* This function takes responsibility for the passed mdr*/
src/mds/Server.h
index cf1ef5b85b5f52c2f4062260bb86fa7d8cc8ee14..091d3d20e293a4f50cdb6905b412197e32cc206c 100644 (file)
@@ -79,6 +79,8 @@ public:
                                        map<client_t,uint64_t>& sseqmap);
   void finish_force_open_sessions(map<client_t,entity_inst_t> &cm,
                                        map<client_t,uint64_t>& sseqmap);
+  void flush_client_sessions(set<client_t>& client_set, C_GatherBuilder& gather);
+  void finish_flush_session(Session *session, version_t seq);
   void terminate_sessions();
   void find_idle_sessions();
   void kill_session(Session *session);
@@ -238,6 +240,7 @@ public:
   void handle_slave_rename_prep(MDRequest *mdr);
   void handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m);
   void handle_slave_rename_notify_ack(MDRequest *mdr, MMDSSlaveRequest *m);
+  void _slave_rename_sessions_flushed(MDRequest *mdr);
   void _logged_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
   void _commit_slave_rename(MDRequest *mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
   void do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr, bool finish_mdr=false);
src/mds/SessionMap.h
index e960ac5a4f938e45793938de87b05347bb039855..ebb1ac5d7817988ac6cb65b434be4cc5a9f250c5 100644 (file)
@@ -144,6 +144,7 @@ public:
   // -- caps --
 private:
   version_t cap_push_seq;        // cap push seq #
+  map<version_t, list<Context*> > waitfor_flush; // flush session messages
 public:
   xlist<Capability*> caps;     // inodes with caps; front=most recently used
   xlist<ClientLease*> leases;  // metadata leases to clients
@@ -153,6 +154,19 @@ public:
   version_t inc_push_seq() { return ++cap_push_seq; }
   version_t get_push_seq() const { return cap_push_seq; }
 
+  version_t wait_for_flush(Context* c) {
+    waitfor_flush[get_push_seq()].push_back(c);
+    return get_push_seq();
+  }
+  void finish_flush(version_t seq, list<Context*>& ls) {
+    while (!waitfor_flush.empty()) {
+      if (waitfor_flush.begin()->first > seq)
+       break;
+      ls.splice(ls.end(), waitfor_flush.begin()->second);
+      waitfor_flush.erase(waitfor_flush.begin());
+    }
+  }
+
   void add_cap(Capability *cap) {
     caps.push_back(&cap->item_session_caps);
   }