Cond cond;
if (waiting_for_session.count(mds) == 0) {
dout(10) << "opening session to mds" << mds << endl;
- messenger->send_message(new MClientSession(MClientSession::OP_OPEN),
+ messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_OPEN),
mdsmap->get_inst(mds), MDS_PORT_SERVER);
}
int from = m->get_source().num();
switch (m->op) {
- case MClientSession::OP_OPEN_ACK:
- mds_sessions.insert(from);
+ case MClientSession::OP_OPEN:
+ assert(mds_sessions.count(from) == 0);
+ mds_sessions[from] = 0;
break;
- case MClientSession::OP_CLOSE_ACK:
+ case MClientSession::OP_CLOSE:
mds_sessions.erase(from);
// FIXME: kick requests (hard) so that they are redirected. or fail.
break;
m->clear_payload(); // for if/when we send back to MDS
+ // note push seq increment
+ assert(mds_sessions.count(mds));
+ mds_sessions[mds]++;
+
// reap?
if (m->get_special() == MClientFileCaps::OP_REAP) {
int other = m->get_mds();
<< ", which we don't want caps for, releasing." << endl;
m->set_caps(0);
m->set_wanted(0);
- messenger->send_message(m, m->get_source_inst(), m->get_source_port());
+ messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER);
return;
}
in->file_wr_size = 0;
}
- messenger->send_message(m, m->get_source_inst(), m->get_source_port());
+ messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER);
}
}
// send session closes!
- for (set<int>::iterator p = mds_sessions.begin();
+ for (map<int,version_t>::iterator p = mds_sessions.begin();
p != mds_sessions.end();
++p) {
- dout(2) << "sending client_session close to mds" << *p << endl;
- messenger->send_message(new MClientSession(MClientSession::OP_CLOSE),
- mdsmap->get_inst(*p), MDS_PORT_SERVER);
+ dout(2) << "sending client_session close to mds" << p->first << " seq " << p->second << endl;
+ messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_CLOSE,
+ p->second),
+ mdsmap->get_inst(p->first), MDS_PORT_SERVER);
}
// send unmount!
MonMap *monmap;
// mds sessions
- set<int> mds_sessions;
+ map<int, version_t> mds_sessions; // mds -> push seq
map<int, list<Cond*> > waiting_for_session;
void handle_client_session(MClientSession *m);
}
private:
- // effects version
+ // affects version
hash_map<int,entity_inst_t> client_inst;
+
+ // does not affect version
set<int> sessions;
set<int> opening;
set<int> closing;
version++;
}
+private:
+ // -- push sequence --
+ hash_map<int,version_t> client_push_seq; // seq # for messages pushed to client.
+
+public:
+ version_t inc_push_seq(int client) {
+ return ++client_push_seq[client];
+ }
+ version_t get_push_seq(int client) {
+ return client_push_seq[client];
+ }
+
+
private:
// -- completed requests --
// client id -> tid -> result code
if (seq > 0 &&
!it->second.is_suppress()) {
dout(7) << " sending MClientFileCaps to client" << it->first << " seq " << it->second.get_last_seq() << " new pending " << cap_string(it->second.pending()) << " was " << cap_string(before) << endl;
- mds->messenger->send_message(new MClientFileCaps(in->inode,
- it->second.get_last_seq(),
- it->second.pending(),
- it->second.wanted()),
- mds->clientmap.get_inst(it->first),
- 0, MDS_PORT_LOCKER);
+ mds->send_message_client(new MClientFileCaps(in->inode,
+ it->second.get_last_seq(),
+ it->second.pending(),
+ it->second.wanted()),
+ it->first);
}
}
}
MClientFileCaps *r = new MClientFileCaps(in->inode,
0, 0, 0,
MClientFileCaps::OP_RELEASE);
- mds->messenger->send_message(r, m->get_source_inst(), 0, MDS_PORT_LOCKER);
+ mds->send_message_client(r, client);
}
// merge in atime?
+void MDS::send_message_client(Message *m, int client)
+{
+ version_t seq = clientmap.inc_push_seq(client);
+ dout(10) << "send_message_client client" << client << " seq " << seq << " " << *m << endl;
+ messenger->send_message(m, clientmap.get_inst(client));
+}
+
int MDS::init(bool standby)
void send_message_mds(Message *m, int mds, int port=0, int fromport=0);
void forward_message_mds(Message *req, int mds, int port=0);
+ void send_message_client(Message *m, int client);
+
// start up, shutdown
int init(bool standby=false);
it->second.pending(),
it->second.wanted(),
MClientFileCaps::OP_STALE);
- mds->messenger->send_message(m, mds->clientmap.get_inst(it->first),
- 0, MDS_PORT_CACHE);
+ mds->send_message_client(m, it->first);
}
// relax locks?
in->client_caps[*it].wanted(),
MClientFileCaps::OP_REAP);
caps->set_mds( oldauth ); // reap from whom?
- mds->messenger->send_message(caps,
- mds->clientmap.get_inst(*it),
- 0, MDS_PORT_CACHE);
+ mds->send_message_client(caps, *it);
}
// filelock
{
dout(3) << "handle_client_session " << *m << " from " << m->get_source() << endl;
int from = m->get_source().num();
- bool open = m->op == MClientSession::OP_OPEN;
+ bool open = m->op == MClientSession::OP_REQUEST_OPEN;
if (open) {
if (mds->clientmap.is_opening(from)) {
delete m;
return;
}
+ if (m->seq < mds->clientmap.get_push_seq(from)) {
+ dout(10) << "old push seq " << m->seq << " < " << mds->clientmap.get_push_seq(from)
+ << ", dropping" << endl;
+ delete m;
+ return;
+ }
+ assert(m->seq == mds->clientmap.get_push_seq(from));
+
mds->clientmap.add_closing(from);
}
// reply
if (open)
- mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN_ACK), client_inst);
+ mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), client_inst);
else
- mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK), client_inst);
+ mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst);
}
class MClientSession : public Message {
public:
- const static int OP_OPEN = 1;
- const static int OP_OPEN_ACK = 2;
- const static int OP_CLOSE = 3;
- const static int OP_CLOSE_ACK = 4;
+ const static int OP_REQUEST_OPEN = 1;
+ const static int OP_OPEN = 2;
+ const static int OP_REQUEST_CLOSE = 3;
+ const static int OP_CLOSE = 4;
static const char *get_opname(int o) {
switch (o) {
+ case OP_REQUEST_OPEN: return "request_open";
case OP_OPEN: return "open";
- case OP_OPEN_ACK: return "open_ack";
+ case OP_REQUEST_CLOSE: return "request_close";
case OP_CLOSE: return "close";
- case OP_CLOSE_ACK: return "close_ack";
default: assert(0);
}
}
- __int32_t op;
+ int32_t op;
+ version_t seq;
MClientSession() : Message(MSG_CLIENT_SESSION) { }
- MClientSession(int o) : Message(MSG_CLIENT_SESSION),
- op(o) { }
+ MClientSession(int o, version_t s=0) :
+ Message(MSG_CLIENT_SESSION),
+ op(o), seq(s) { }
char *get_type_name() { return "client_session"; }
void print(ostream& out) {
- out << "client_session(" << get_opname(op) << ")";
+ out << "client_session(" << get_opname(op);
+ if (seq) out << " seq " << seq;
+ out << ")";
}
void decode_payload() {
int off = 0;
::_decode(op, payload, off);
+ ::_decode(seq, payload, off);
}
void encode_payload() {
::_encode(op, payload);
+ ::_encode(seq, payload);
}
};