void Monitor::remove_all_sessions()
{
+ Mutex::Locker l(session_map_lock);
while (!session_map.sessions.empty()) {
MonSession *s = session_map.sessions.front();
remove_session(s);
con->mark_down();
// proxied sessions aren't registered and don't have a con; don't remove
// those.
- if (!s->proxy_con)
+ if (!s->proxy_con) {
+ Mutex::Locker l(session_map_lock);
remove_session(s);
+ }
op->mark_zap();
}
}
}
ConnectionRef con = m->get_connection();
- s = session_map.new_session(m->get_source_inst(), con.get());
+ {
+ Mutex::Locker l(session_map_lock);
+ s = session_map.new_session(m->get_source_inst(), con.get());
+ }
assert(s);
con->set_priv(s->get());
dout(10) << __func__ << " new session " << s << " " << *s << dendl;
for (map<string, Subscription*>::iterator it = s->sub_map.begin();
it != s->sub_map.end(); ) {
if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
+ Mutex::Locker l(session_map_lock);
session_map.remove_sub((it++)->second);
} else {
++it;
}
}
- session_map.add_update_sub(s, p->first, p->second.start,
- p->second.flags & CEPH_SUBSCRIBE_ONETIME,
- m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
+ {
+ Mutex::Locker l(session_map_lock);
+ session_map.add_update_sub(s, p->first, p->second.start,
+ p->second.flags & CEPH_SUBSCRIBE_ONETIME,
+ m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
+ }
if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
Mutex::Locker l(lock);
dout(10) << "reset/close on session " << s->inst << dendl;
- if (!s->closed)
+ if (!s->closed) {
+ Mutex::Locker l(session_map_lock);
remove_session(s);
+ }
s->put();
return true;
}
// trim sessions
utime_t now = ceph_clock_now();
- xlist<MonSession*>::iterator p = session_map.sessions.begin();
+ {
+ Mutex::Locker l(session_map_lock);
+ auto p = session_map.sessions.begin();
- bool out_for_too_long = (!exited_quorum.is_zero()
- && now > (exited_quorum + 2*g_conf->mon_lease));
+ bool out_for_too_long = (!exited_quorum.is_zero() &&
+ now > (exited_quorum + 2*g_conf->mon_lease));
- while (!p.end()) {
- MonSession *s = *p;
- ++p;
+ while (!p.end()) {
+ MonSession *s = *p;
+ ++p;
- // don't trim monitors
- if (s->inst.name.is_mon())
- continue;
+ // don't trim monitors
+ if (s->inst.name.is_mon())
+ continue;
- if (s->session_timeout < now && s->con) {
- // check keepalive, too
- s->session_timeout = s->con->get_last_keepalive();
- s->session_timeout += g_conf->mon_session_timeout;
- }
- if (s->session_timeout < now) {
- dout(10) << " trimming session " << s->con << " " << s->inst
- << " (timeout " << s->session_timeout
- << " < now " << now << ")" << dendl;
- } else if (out_for_too_long) {
- // boot the client Session because we've taken too long getting back in
- dout(10) << " trimming session " << s->con << " " << s->inst
- << " because we've been out of quorum too long" << dendl;
- } else {
- continue;
- }
+ if (s->session_timeout < now && s->con) {
+ // check keepalive, too
+ s->session_timeout = s->con->get_last_keepalive();
+ s->session_timeout += g_conf->mon_session_timeout;
+ }
+ if (s->session_timeout < now) {
+ dout(10) << " trimming session " << s->con << " " << s->inst
+ << " (timeout " << s->session_timeout
+ << " < now " << now << ")" << dendl;
+ } else if (out_for_too_long) {
+ // boot the client Session because we've taken too long getting back in
+ dout(10) << " trimming session " << s->con << " " << s->inst
+ << " because we've been out of quorum too long" << dendl;
+ } else {
+ continue;
+ }
- s->con->mark_down();
- remove_session(s);
- logger->inc(l_mon_session_trim);
+ s->con->mark_down();
+ remove_session(s);
+ logger->inc(l_mon_session_trim);
+ }
}
-
sync_trim_providers();
if (!maybe_wait_for_quorum.empty()) {
void MonmapMonitor::check_subs()
{
const string type = "monmap";
- auto subs = mon->session_map.subs.find(type);
- if (subs == mon->session_map.subs.end())
- return;
- for (auto sub : *subs->second) {
- check_sub(sub);
- }
+ mon->with_session_map([this, &type](const MonSessionMap& session_map) {
+ auto subs = session_map.subs.find(type);
+ if (subs == session_map.subs.end())
+ return;
+ for (auto sub : *subs->second) {
+ check_sub(sub);
+ }
+ });
}
void MonmapMonitor::check_sub(Subscription *sub)
<< " have " << epoch << dendl;
if (sub->next <= epoch) {
mon->send_latest_monmap(sub->session->con.get());
- if (sub->onetime)
- mon->session_map.remove_sub(sub);
- else
+ if (sub->onetime) {
+ mon->with_session_map([this, sub](MonSessionMap& session_map) {
+ session_map.remove_sub(sub);
+ });
+ } else {
sub->next = epoch + 1;
+ }
}
}
void PGMonitor::check_subs()
{
dout(10) << __func__ << dendl;
- string type = "osd_pg_creates";
- if (mon->session_map.subs.count(type) == 0)
- return;
-
- xlist<Subscription*>::iterator p = mon->session_map.subs[type]->begin();
- while (!p.end()) {
- Subscription *sub = *p;
- ++p;
- dout(20) << __func__ << " .. " << sub->session->inst << dendl;
- check_sub(sub);
- }
+ const string type = "osd_pg_creates";
+
+ mon->with_session_map([this, &type](const MonSessionMap& session_map) {
+ if (mon->session_map.subs.count(type) == 0)
+ return;
+
+ auto p = mon->session_map.subs[type]->begin();
+ while (!p.end()) {
+ Subscription *sub = *p;
+ ++p;
+ dout(20) << __func__ << " .. " << sub->session->inst << dendl;
+ check_sub(sub);
+ }
+ });
}
void PGMonitor::check_sub(Subscription *sub)