]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* push seq number of mds to client messages, client session close attempts may fail
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 11 Jul 2007 14:37:37 +0000 (14:37 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 11 Jul 2007 14:37:37 +0000 (14:37 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1484 29311d96-e01e-0410-9327-a35deaab8ce9

branches/sage/cephmds2/client/Client.cc
branches/sage/cephmds2/client/Client.h
branches/sage/cephmds2/mds/ClientMap.h
branches/sage/cephmds2/mds/Locker.cc
branches/sage/cephmds2/mds/MDS.cc
branches/sage/cephmds2/mds/MDS.h
branches/sage/cephmds2/mds/Migrator.cc
branches/sage/cephmds2/mds/Server.cc
branches/sage/cephmds2/messages/MClientSession.h

index d8ef4434d824c3c207ca6199a81af864a0fa9c6e..8bf24c014297540b124ac527415c60d846598e91 100644 (file)
@@ -602,7 +602,7 @@ MClientReply *Client::make_request(MClientRequest *req,
       Cond cond;
       if (waiting_for_session.count(mds) == 0) {
        dout(10) << "opening session to mds" << mds << endl;
-       messenger->send_message(new MClientSession(MClientSession::OP_OPEN),
+       messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_OPEN),
                                mdsmap->get_inst(mds), MDS_PORT_SERVER);
       }
       
@@ -675,11 +675,12 @@ void Client::handle_client_session(MClientSession *m)
   int from = m->get_source().num();
 
   switch (m->op) {
-  case MClientSession::OP_OPEN_ACK:
-    mds_sessions.insert(from);
+  case MClientSession::OP_OPEN:
+    assert(mds_sessions.count(from) == 0);
+    mds_sessions[from] = 0;
     break;
 
-  case MClientSession::OP_CLOSE_ACK:
+  case MClientSession::OP_CLOSE:
     mds_sessions.erase(from);
     // FIXME: kick requests (hard) so that they are redirected.  or fail.
     break;
@@ -991,6 +992,10 @@ void Client::handle_file_caps(MClientFileCaps *m)
 
   m->clear_payload();  // for if/when we send back to MDS
 
+  // note push seq increment
+  assert(mds_sessions.count(mds));
+  mds_sessions[mds]++;
+
   // reap?
   if (m->get_special() == MClientFileCaps::OP_REAP) {
     int other = m->get_mds();
@@ -1086,7 +1091,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
             << ", which we don't want caps for, releasing." << endl;
     m->set_caps(0);
     m->set_wanted(0);
-    messenger->send_message(m, m->get_source_inst(), m->get_source_port());
+    messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER);
     return;
   }
 
@@ -1196,7 +1201,7 @@ void Client::implemented_caps(MClientFileCaps *m, Inode *in)
     in->file_wr_size = 0;
   }
 
-  messenger->send_message(m, m->get_source_inst(), m->get_source_port());
+  messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER);
 }
 
 
@@ -1349,12 +1354,13 @@ int Client::unmount()
   }
   
   // send session closes!
-  for (set<int>::iterator p = mds_sessions.begin();
+  for (map<int,version_t>::iterator p = mds_sessions.begin();
        p != mds_sessions.end();
        ++p) {
-    dout(2) << "sending client_session close to mds" << *p << endl;
-    messenger->send_message(new MClientSession(MClientSession::OP_CLOSE),
-                           mdsmap->get_inst(*p), MDS_PORT_SERVER);
+    dout(2) << "sending client_session close to mds" << p->first << " seq " << p->second << endl;
+    messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_CLOSE,
+                                              p->second),
+                           mdsmap->get_inst(p->first), MDS_PORT_SERVER);
   }
 
   // send unmount!
index 2289803ea9594455dbb45921c0a9c5654185b5e9..02601b21e7b2ed9459654aa17ef5f78b24ee7a6c 100644 (file)
@@ -335,7 +335,7 @@ class Client : public Dispatcher {
   MonMap *monmap;
   
   // mds sessions
-  set<int> mds_sessions;
+  map<int, version_t> mds_sessions;  // mds -> push seq
   map<int, list<Cond*> > waiting_for_session;
 
   void handle_client_session(MClientSession *m);
index aa23da1b7f44df2fa88e1673a7eba02e6f784fee..dd291d9a7b1b31466ca0f6d1be5669ccb4ca0308 100644 (file)
@@ -63,8 +63,10 @@ public:
   }
 
 private:
-  // effects version
+  // affects version
   hash_map<int,entity_inst_t> client_inst;
+
+  // does not affect version
   set<int> sessions;
   set<int> opening;
   set<int> closing;
@@ -100,6 +102,19 @@ public:
     version++;
   }
   
+private:
+  // -- push sequence --
+  hash_map<int,version_t> client_push_seq; // seq # for messages pushed to client.
+
+public:
+  version_t inc_push_seq(int client) {
+    return ++client_push_seq[client];
+  }
+  version_t get_push_seq(int client) {
+    return client_push_seq[client];
+  }
+
+
 private:
   // -- completed requests --
   // client id -> tid -> result code
index 551ebe05d44dc057fb53b368c78032856a5acf15..88da6aa6beff49d0eed26a55a666342981a6211f 100644 (file)
@@ -539,12 +539,11 @@ bool Locker::issue_caps(CInode *in)
       if (seq > 0 && 
           !it->second.is_suppress()) {
         dout(7) << "   sending MClientFileCaps to client" << it->first << " seq " << it->second.get_last_seq() << " new pending " << cap_string(it->second.pending()) << " was " << cap_string(before) << endl;
-        mds->messenger->send_message(new MClientFileCaps(in->inode,
-                                                         it->second.get_last_seq(),
-                                                         it->second.pending(),
-                                                         it->second.wanted()),
-                                     mds->clientmap.get_inst(it->first), 
-                                    0, MDS_PORT_LOCKER);
+        mds->send_message_client(new MClientFileCaps(in->inode,
+                                                    it->second.get_last_seq(),
+                                                    it->second.pending(),
+                                                    it->second.wanted()),
+                                it->first);
       }
     }
   }
@@ -688,7 +687,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m)
     MClientFileCaps *r = new MClientFileCaps(in->inode, 
                                              0, 0, 0,
                                              MClientFileCaps::OP_RELEASE);
-    mds->messenger->send_message(r, m->get_source_inst(), 0, MDS_PORT_LOCKER);
+    mds->send_message_client(r, client);
   }
 
   // merge in atime?
index cd29b4cd524fd2a52d0c4ac9aa7fe9f0fa35dcce..5911f63dc4219e2d16f29cdd90a2526f266409fe 100644 (file)
@@ -242,6 +242,13 @@ void MDS::forward_message_mds(Message *req, int mds, int port)
 
 
 
+void MDS::send_message_client(Message *m, int client)
+{
+  version_t seq = clientmap.inc_push_seq(client);
+  dout(10) << "send_message_client client" << client << " seq " << seq << " " << *m << endl;
+  messenger->send_message(m, clientmap.get_inst(client));
+}
+
 
 
 int MDS::init(bool standby)
index 0ef140dc5953f3a436de344c3dcfe7dc74e7e49b..de9937bfff444d000409e53f139050cff090a39f 100644 (file)
@@ -217,6 +217,8 @@ class MDS : public Dispatcher {
   void send_message_mds(Message *m, int mds, int port=0, int fromport=0);
   void forward_message_mds(Message *req, int mds, int port=0);
 
+  void send_message_client(Message *m, int client);
+
 
   // start up, shutdown
   int init(bool standby=false);
index d5188826e65d0cc29999e3fb5a970ffcda87bcc3..c724c95c0af0c589f30f98240c03faca8a927b31 100644 (file)
@@ -766,8 +766,7 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_au
                                              it->second.pending(),
                                              it->second.wanted(),
                                              MClientFileCaps::OP_STALE);
-    mds->messenger->send_message(m, mds->clientmap.get_inst(it->first),
-                                0, MDS_PORT_CACHE);
+    mds->send_message_client(m, it->first);
   }
 
   // relax locks?
@@ -1804,9 +1803,7 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int ol
                                                 in->client_caps[*it].wanted(),
                                                 MClientFileCaps::OP_REAP);
     caps->set_mds( oldauth ); // reap from whom?
-    mds->messenger->send_message(caps, 
-                                mds->clientmap.get_inst(*it),
-                                0, MDS_PORT_CACHE);
+    mds->send_message_client(caps, *it);
   }
 
   // filelock
index ef0522c8fa26985c72a04996c70ec7c6cad7d006..adc3838065087bfa3f2374888e001c4fa0d0c682 100644 (file)
@@ -116,7 +116,7 @@ void Server::handle_client_session(MClientSession *m)
 {
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << endl;
   int from = m->get_source().num();
-  bool open = m->op == MClientSession::OP_OPEN;
+  bool open = m->op == MClientSession::OP_REQUEST_OPEN;
 
   if (open) {
     if (mds->clientmap.is_opening(from)) {
@@ -131,6 +131,14 @@ void Server::handle_client_session(MClientSession *m)
       delete m;
       return;
     }
+    if (m->seq < mds->clientmap.get_push_seq(from)) {
+      dout(10) << "old push seq " << m->seq << " < " << mds->clientmap.get_push_seq(from) 
+              << ", dropping" << endl;
+      delete m;
+      return;
+    }
+    assert(m->seq == mds->clientmap.get_push_seq(from));
+
     mds->clientmap.add_closing(from);
   }
 
@@ -164,9 +172,9 @@ void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cma
   
   // reply
   if (open) 
-    mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN_ACK), client_inst);
+    mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), client_inst);
   else
-    mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK), client_inst);
+    mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst);
 }
 
 
index 90e069ecbc9850a48b6573f0dd25bb1b9d9f77bd..c84eadbccb1175db52dac86361cb22df282fcea6 100644 (file)
 
 class MClientSession : public Message {
 public:
-  const static int OP_OPEN      = 1;
-  const static int OP_OPEN_ACK  = 2;
-  const static int OP_CLOSE     = 3;
-  const static int OP_CLOSE_ACK = 4;
+  const static int OP_REQUEST_OPEN  = 1;
+  const static int OP_OPEN          = 2;
+  const static int OP_REQUEST_CLOSE = 3;
+  const static int OP_CLOSE         = 4;
   static const char *get_opname(int o) {
     switch (o) {
+    case OP_REQUEST_OPEN: return "request_open";
     case OP_OPEN: return "open";
-    case OP_OPEN_ACK: return "open_ack";
+    case OP_REQUEST_CLOSE: return "request_close";
     case OP_CLOSE: return "close";
-    case OP_CLOSE_ACK: return "close_ack";
     default: assert(0);
     }
   }
 
-  __int32_t op;
+  int32_t op;
+  version_t seq;
 
   MClientSession() : Message(MSG_CLIENT_SESSION) { }
-  MClientSession(int o) : Message(MSG_CLIENT_SESSION),
-                         op(o) { }
+  MClientSession(int o, version_t s=0) : 
+    Message(MSG_CLIENT_SESSION),
+    op(o), seq(s) { }
 
   char *get_type_name() { return "client_session"; }
   void print(ostream& out) {
-    out << "client_session(" << get_opname(op) << ")";
+    out << "client_session(" << get_opname(op);
+    if (seq) out << " seq " << seq;
+    out << ")";
   }
 
   void decode_payload() { 
     int off = 0;
     ::_decode(op, payload, off);
+    ::_decode(seq, payload, off);
   }
   void encode_payload() { 
     ::_encode(op, payload);
+    ::_encode(seq, payload);
   }
 };