From 619afbb40c14585070af8ecdd57b1e2bf9764739 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 11 Jul 2007 14:37:37 +0000 Subject: [PATCH] * push seq number of mds to client messages, client session close attempts may fail git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1484 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/client/Client.cc | 26 ++++++++++++------- branches/sage/cephmds2/client/Client.h | 2 +- branches/sage/cephmds2/mds/ClientMap.h | 17 +++++++++++- branches/sage/cephmds2/mds/Locker.cc | 13 +++++----- branches/sage/cephmds2/mds/MDS.cc | 7 +++++ branches/sage/cephmds2/mds/MDS.h | 2 ++ branches/sage/cephmds2/mds/Migrator.cc | 7 ++--- branches/sage/cephmds2/mds/Server.cc | 14 +++++++--- .../sage/cephmds2/messages/MClientSession.h | 26 ++++++++++++------- 9 files changed, 77 insertions(+), 37 deletions(-) diff --git a/branches/sage/cephmds2/client/Client.cc b/branches/sage/cephmds2/client/Client.cc index d8ef4434d824c..8bf24c0142975 100644 --- a/branches/sage/cephmds2/client/Client.cc +++ b/branches/sage/cephmds2/client/Client.cc @@ -602,7 +602,7 @@ MClientReply *Client::make_request(MClientRequest *req, Cond cond; if (waiting_for_session.count(mds) == 0) { dout(10) << "opening session to mds" << mds << endl; - messenger->send_message(new MClientSession(MClientSession::OP_OPEN), + messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_OPEN), mdsmap->get_inst(mds), MDS_PORT_SERVER); } @@ -675,11 +675,12 @@ void Client::handle_client_session(MClientSession *m) int from = m->get_source().num(); switch (m->op) { - case MClientSession::OP_OPEN_ACK: - mds_sessions.insert(from); + case MClientSession::OP_OPEN: + assert(mds_sessions.count(from) == 0); + mds_sessions[from] = 0; break; - case MClientSession::OP_CLOSE_ACK: + case MClientSession::OP_CLOSE: mds_sessions.erase(from); // FIXME: kick requests (hard) so that they are redirected. or fail. break; @@ -991,6 +992,10 @@ void Client::handle_file_caps(MClientFileCaps *m) m->clear_payload(); // for if/when we send back to MDS + // note push seq increment + assert(mds_sessions.count(mds)); + mds_sessions[mds]++; + // reap? if (m->get_special() == MClientFileCaps::OP_REAP) { int other = m->get_mds(); @@ -1086,7 +1091,7 @@ void Client::handle_file_caps(MClientFileCaps *m) << ", which we don't want caps for, releasing." << endl; m->set_caps(0); m->set_wanted(0); - messenger->send_message(m, m->get_source_inst(), m->get_source_port()); + messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER); return; } @@ -1196,7 +1201,7 @@ void Client::implemented_caps(MClientFileCaps *m, Inode *in) in->file_wr_size = 0; } - messenger->send_message(m, m->get_source_inst(), m->get_source_port()); + messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER); } @@ -1349,12 +1354,13 @@ int Client::unmount() } // send session closes! - for (set::iterator p = mds_sessions.begin(); + for (map::iterator p = mds_sessions.begin(); p != mds_sessions.end(); ++p) { - dout(2) << "sending client_session close to mds" << *p << endl; - messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), - mdsmap->get_inst(*p), MDS_PORT_SERVER); + dout(2) << "sending client_session close to mds" << p->first << " seq " << p->second << endl; + messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_CLOSE, + p->second), + mdsmap->get_inst(p->first), MDS_PORT_SERVER); } // send unmount! diff --git a/branches/sage/cephmds2/client/Client.h b/branches/sage/cephmds2/client/Client.h index 2289803ea9594..02601b21e7b2e 100644 --- a/branches/sage/cephmds2/client/Client.h +++ b/branches/sage/cephmds2/client/Client.h @@ -335,7 +335,7 @@ class Client : public Dispatcher { MonMap *monmap; // mds sessions - set mds_sessions; + map mds_sessions; // mds -> push seq map > waiting_for_session; void handle_client_session(MClientSession *m); diff --git a/branches/sage/cephmds2/mds/ClientMap.h b/branches/sage/cephmds2/mds/ClientMap.h index aa23da1b7f44d..dd291d9a7b1b3 100644 --- a/branches/sage/cephmds2/mds/ClientMap.h +++ b/branches/sage/cephmds2/mds/ClientMap.h @@ -63,8 +63,10 @@ public: } private: - // effects version + // affects version hash_map client_inst; + + // does not affect version set sessions; set opening; set closing; @@ -100,6 +102,19 @@ public: version++; } +private: + // -- push sequence -- + hash_map client_push_seq; // seq # for messages pushed to client. + +public: + version_t inc_push_seq(int client) { + return ++client_push_seq[client]; + } + version_t get_push_seq(int client) { + return client_push_seq[client]; + } + + private: // -- completed requests -- // client id -> tid -> result code diff --git a/branches/sage/cephmds2/mds/Locker.cc b/branches/sage/cephmds2/mds/Locker.cc index 551ebe05d44dc..88da6aa6beff4 100644 --- a/branches/sage/cephmds2/mds/Locker.cc +++ b/branches/sage/cephmds2/mds/Locker.cc @@ -539,12 +539,11 @@ bool Locker::issue_caps(CInode *in) if (seq > 0 && !it->second.is_suppress()) { dout(7) << " sending MClientFileCaps to client" << it->first << " seq " << it->second.get_last_seq() << " new pending " << cap_string(it->second.pending()) << " was " << cap_string(before) << endl; - mds->messenger->send_message(new MClientFileCaps(in->inode, - it->second.get_last_seq(), - it->second.pending(), - it->second.wanted()), - mds->clientmap.get_inst(it->first), - 0, MDS_PORT_LOCKER); + mds->send_message_client(new MClientFileCaps(in->inode, + it->second.get_last_seq(), + it->second.pending(), + it->second.wanted()), + it->first); } } } @@ -688,7 +687,7 @@ void Locker::handle_client_file_caps(MClientFileCaps *m) MClientFileCaps *r = new MClientFileCaps(in->inode, 0, 0, 0, MClientFileCaps::OP_RELEASE); - mds->messenger->send_message(r, m->get_source_inst(), 0, MDS_PORT_LOCKER); + mds->send_message_client(r, client); } // merge in atime? diff --git a/branches/sage/cephmds2/mds/MDS.cc b/branches/sage/cephmds2/mds/MDS.cc index cd29b4cd524fd..5911f63dc4219 100644 --- a/branches/sage/cephmds2/mds/MDS.cc +++ b/branches/sage/cephmds2/mds/MDS.cc @@ -242,6 +242,13 @@ void MDS::forward_message_mds(Message *req, int mds, int port) +void MDS::send_message_client(Message *m, int client) +{ + version_t seq = clientmap.inc_push_seq(client); + dout(10) << "send_message_client client" << client << " seq " << seq << " " << *m << endl; + messenger->send_message(m, clientmap.get_inst(client)); +} + int MDS::init(bool standby) diff --git a/branches/sage/cephmds2/mds/MDS.h b/branches/sage/cephmds2/mds/MDS.h index 0ef140dc5953f..de9937bfff444 100644 --- a/branches/sage/cephmds2/mds/MDS.h +++ b/branches/sage/cephmds2/mds/MDS.h @@ -217,6 +217,8 @@ class MDS : public Dispatcher { void send_message_mds(Message *m, int mds, int port=0, int fromport=0); void forward_message_mds(Message *req, int mds, int port=0); + void send_message_client(Message *m, int client); + // start up, shutdown int init(bool standby=false); diff --git a/branches/sage/cephmds2/mds/Migrator.cc b/branches/sage/cephmds2/mds/Migrator.cc index d5188826e65d0..c724c95c0af0c 100644 --- a/branches/sage/cephmds2/mds/Migrator.cc +++ b/branches/sage/cephmds2/mds/Migrator.cc @@ -766,8 +766,7 @@ void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_au it->second.pending(), it->second.wanted(), MClientFileCaps::OP_STALE); - mds->messenger->send_message(m, mds->clientmap.get_inst(it->first), - 0, MDS_PORT_CACHE); + mds->send_message_client(m, it->first); } // relax locks? @@ -1804,9 +1803,7 @@ void Migrator::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int ol in->client_caps[*it].wanted(), MClientFileCaps::OP_REAP); caps->set_mds( oldauth ); // reap from whom? - mds->messenger->send_message(caps, - mds->clientmap.get_inst(*it), - 0, MDS_PORT_CACHE); + mds->send_message_client(caps, *it); } // filelock diff --git a/branches/sage/cephmds2/mds/Server.cc b/branches/sage/cephmds2/mds/Server.cc index ef0522c8fa269..adc3838065087 100644 --- a/branches/sage/cephmds2/mds/Server.cc +++ b/branches/sage/cephmds2/mds/Server.cc @@ -116,7 +116,7 @@ void Server::handle_client_session(MClientSession *m) { dout(3) << "handle_client_session " << *m << " from " << m->get_source() << endl; int from = m->get_source().num(); - bool open = m->op == MClientSession::OP_OPEN; + bool open = m->op == MClientSession::OP_REQUEST_OPEN; if (open) { if (mds->clientmap.is_opening(from)) { @@ -131,6 +131,14 @@ void Server::handle_client_session(MClientSession *m) delete m; return; } + if (m->seq < mds->clientmap.get_push_seq(from)) { + dout(10) << "old push seq " << m->seq << " < " << mds->clientmap.get_push_seq(from) + << ", dropping" << endl; + delete m; + return; + } + assert(m->seq == mds->clientmap.get_push_seq(from)); + mds->clientmap.add_closing(from); } @@ -164,9 +172,9 @@ void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cma // reply if (open) - mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN_ACK), client_inst); + mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), client_inst); else - mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK), client_inst); + mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst); } diff --git a/branches/sage/cephmds2/messages/MClientSession.h b/branches/sage/cephmds2/messages/MClientSession.h index 90e069ecbc985..c84eadbccb117 100644 --- a/branches/sage/cephmds2/messages/MClientSession.h +++ b/branches/sage/cephmds2/messages/MClientSession.h @@ -19,37 +19,43 @@ class MClientSession : public Message { public: - const static int OP_OPEN = 1; - const static int OP_OPEN_ACK = 2; - const static int OP_CLOSE = 3; - const static int OP_CLOSE_ACK = 4; + const static int OP_REQUEST_OPEN = 1; + const static int OP_OPEN = 2; + const static int OP_REQUEST_CLOSE = 3; + const static int OP_CLOSE = 4; static const char *get_opname(int o) { switch (o) { + case OP_REQUEST_OPEN: return "request_open"; case OP_OPEN: return "open"; - case OP_OPEN_ACK: return "open_ack"; + case OP_REQUEST_CLOSE: return "request_close"; case OP_CLOSE: return "close"; - case OP_CLOSE_ACK: return "close_ack"; default: assert(0); } } - __int32_t op; + int32_t op; + version_t seq; MClientSession() : Message(MSG_CLIENT_SESSION) { } - MClientSession(int o) : Message(MSG_CLIENT_SESSION), - op(o) { } + MClientSession(int o, version_t s=0) : + Message(MSG_CLIENT_SESSION), + op(o), seq(s) { } char *get_type_name() { return "client_session"; } void print(ostream& out) { - out << "client_session(" << get_opname(op) << ")"; + out << "client_session(" << get_opname(op); + if (seq) out << " seq " << seq; + out << ")"; } void decode_payload() { int off = 0; ::_decode(op, payload, off); + ::_decode(seq, payload, off); } void encode_payload() { ::_encode(op, payload); + ::_encode(seq, payload); } }; -- 2.39.5