bool MDSRankDispatcher::ms_dispatch(Message *m)
{
- bool ret;
+ if (m->get_source().is_client()) {
+ Session *session = static_cast<Session*>(m->get_connection()->get_priv());
+ if (session)
+ session->last_seen = Session::clock::now();
+ }
+
inc_dispatch_depth();
- ret = _dispatch(m, true);
+ bool ret = _dispatch(m, true);
dec_dispatch_depth();
return ret;
}
imported_session->auth_caps = session->auth_caps;
assert(session->get_nref() == 1);
imported_session->connection->set_priv(imported_session->get());
+ imported_session->last_seen = session->last_seen;
session = imported_session;
}
}
// (caps go stale, lease die)
double queue_max_age = mds->get_dispatch_queue_max_age(ceph_clock_now());
double cutoff = queue_max_age + mds->mdsmap->get_session_timeout();
- while (1) {
- Session *session = mds->sessionmap.get_oldest_session(Session::STATE_OPEN);
- if (!session) break;
- auto last_cap_renew_span = std::chrono::duration<double>(now-session->last_cap_renew).count();
- if (last_cap_renew_span < cutoff) {
- dout(20) << "laggiest active session is " << session->info.inst << " and renewed caps recently (" << last_cap_renew_span << "s ago)" << dendl;
- break;
+
+ const auto sessions_p1 = mds->sessionmap.by_state.find(Session::STATE_OPEN);
+ if (sessions_p1 != mds->sessionmap.by_state.end() && !sessions_p1->second->empty()) {
+ std::vector<Session*> new_stale;
+
+ for (auto session : *(sessions_p1->second)) {
+ auto last_cap_renew_span = std::chrono::duration<double>(now - session->last_cap_renew).count();
+ if (last_cap_renew_span < cutoff) {
+ dout(20) << "laggiest active session is " << session->info.inst
+ << " and renewed caps recently (" << last_cap_renew_span << "s ago)" << dendl;
+ break;
+ }
+
+ if (session->last_seen > session->last_cap_renew) {
+ last_cap_renew_span = std::chrono::duration<double>(now - session->last_seen).count();
+ if (last_cap_renew_span < cutoff) {
+ dout(20) << "laggiest active session is " << session->info.inst
+ << " and renewed caps recently (" << last_cap_renew_span << "s ago)" << dendl;
+ continue;
+ }
+ }
+
+ dout(10) << "new stale session " << session->info.inst
+ << " last renewed caps " << last_cap_renew_span << "s ago" << dendl;
+ new_stale.push_back(session);
}
- dout(10) << "new stale session " << session->info.inst << " last renewed caps " << last_cap_renew_span << "s ago" << dendl;
- mds->sessionmap.set_state(session, Session::STATE_STALE);
- mds->locker->revoke_stale_caps(session);
- mds->locker->remove_stale_leases(session);
- mds->send_message_client(new MClientSession(CEPH_SESSION_STALE, session->get_push_seq()), session);
- finish_flush_session(session, session->get_push_seq());
+ for (auto session : new_stale) {
+ mds->sessionmap.set_state(session, Session::STATE_STALE);
+ mds->locker->revoke_stale_caps(session);
+ mds->locker->remove_stale_leases(session);
+ mds->send_message_client(new MClientSession(CEPH_SESSION_STALE, session->get_push_seq()), session);
+ finish_flush_session(session, session->get_push_seq());
+ }
}
// autoclose
// Collect a list of sessions exceeding the autoclose threshold
std::vector<Session *> to_evict;
- const auto sessions_p = mds->sessionmap.by_state.find(Session::STATE_STALE);
- if (sessions_p == mds->sessionmap.by_state.end() || sessions_p->second->empty()) {
+ const auto sessions_p2 = mds->sessionmap.by_state.find(Session::STATE_STALE);
+ if (sessions_p2 == mds->sessionmap.by_state.end() || sessions_p2->second->empty()) {
return;
}
- const auto &stale_sessions = sessions_p->second;
+ const auto &stale_sessions = sessions_p2->second;
assert(stale_sessions != nullptr);
for (const auto &session: *stale_sessions) {