// fresh from new mds?
if (!in->caps.count(mds)) {
+ caps_by_mds[mds]++;
if (in->caps.empty()) in->get();
in->caps[mds].seq = m->get_seq();
in->caps[mds].caps = m->get_caps();
assert(in->caps.count(mds));
in->caps.erase(mds);
+ caps_by_mds[mds]--;
if (in->caps.empty()) in->put();
// delayed reap?
dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " release" << dendl;
assert(in->caps.count(mds));
in->caps.erase(mds);
+ caps_by_mds[mds]--;
for (map<int,InodeCap>::iterator p = in->caps.begin();
p != in->caps.end();
p++)
in->get();
}
+ if (in->caps.count(mds) == 0)
+ caps_by_mds[mds]++;
+
int new_caps = reply->get_file_caps();
assert(reply->get_file_caps_seq() >= in->caps[mds].seq);
in->caps[mds].caps = new_caps;
in->caps[mds].seq = reply->get_file_caps_seq();
-
+
// we shouldn't ever lose caps at this point.
// actually, we might...?
assert((old_caps & ~in->caps[mds].caps) == 0);
{
dout(0) << "ms_handle_remote_reset on " << addr << ", last " << last << dendl;
if (last.is_mds()) {
-
+ int mds = last.num();
+ dout(0) << "ms_handle_remote_reset on " << last << ", " << caps_by_mds[mds]
+ << " caps, kicking requests" << dendl;
+ mds_sessions.erase(mds); // "kill" session
+ // reopen if caps
+ if (caps_by_mds[mds] > 0) {
+ waiting_for_session[mds].size(); // make sure entry exists
+ messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_OPEN),
+ mdsmap->get_inst(mds));
+ }
+ // or requests
+ kick_requests(mds);
}
}
// mds sessions
map<int, version_t> mds_sessions; // mds -> push seq
map<int, list<Cond*> > waiting_for_session;
+ map<int, int> caps_by_mds;
list<Cond*> waiting_for_mdsmap;
void handle_client_session(MClientSession *m);