From c2c9ecd1dd7ad9a0942d7f674442709513b3b9b5 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 21 Sep 2009 14:30:19 -0700 Subject: [PATCH] mon: fix up subscribe infrastructure to use msgr connection for sessions Session state is now cleaned up when connections drop. More efficient in-memory structures. --- src/Makefile.am | 2 +- src/TODO | 1 + src/mds/Server.cc | 2 +- src/mon/MDSMonitor.cc | 35 +++++++-------- src/mon/MDSMonitor.h | 6 +-- src/mon/Monitor.cc | 54 +++++++++++++++++++---- src/mon/Monitor.h | 6 +++ src/mon/OSDMonitor.cc | 34 +++++++-------- src/mon/OSDMonitor.h | 5 +-- src/mon/Session.h | 90 +++++++++++++++++++++++++++++++++++++++ src/mon/SubscriptionMap.h | 52 ---------------------- 11 files changed, 179 insertions(+), 108 deletions(-) create mode 100644 src/mon/Session.h delete mode 100644 src/mon/SubscriptionMap.h diff --git a/src/Makefile.am b/src/Makefile.am index eb3a40c724c52..21f2b3d88fc5d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -655,7 +655,7 @@ noinst_HEADERS = \ mon/PGMonitor.h\ mon/Paxos.h\ mon/PaxosService.h\ - mon/SubscriptionMap.h\ + mon/Session.h\ mon/mon_types.h\ mount/mtab.c\ msg/Dispatcher.h\ diff --git a/src/TODO b/src/TODO index 490d5e4592fae..22646e3040a82 100644 --- a/src/TODO +++ b/src/TODO @@ -15,6 +15,7 @@ bugs - weird osd_lock contention during osd restart? later +- mon cluster expansion, contraction - authentication - client reconnect after long eviction; and slow delayed reconnect - ENOSPC diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 8a9a421b7cc4a..20a5288f2eb05 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -169,7 +169,7 @@ Session *Server::get_session(Message *m) session = mds->sessionmap.get_session(m->get_source()); if (session) { dout(20) << "get_session set " << session->inst << " in connection" << dendl; - m->get_connection()->set_priv(session); + m->get_connection()->set_priv(session->get()); } else { dout(20) << "get_session " << m->get_source() << " dne" << dendl; } diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index acb414afab11a..9443332494052 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -529,31 +529,28 @@ void MDSMonitor::send_latest(entity_inst_t dest) waiting_for_map.push_back(dest); } -void MDSMonitor::subscribe(entity_inst_t inst, version_t have, utime_t until) +void MDSMonitor::check_subs() { - if (paxos->is_readable() && - have < mdsmap.get_epoch()) { - send_full(inst); - subs.subscribe(inst, mdsmap.get_epoch(), until); - } else - subs.subscribe(inst, have, until); + nstring type = "mdsmap"; + xlist::iterator p = mon->session_map.subs[type].begin(); + while (!p.end()) { + Subscription *sub = *p; + ++p; + check_sub(sub); + } } -void MDSMonitor::check_subs() +void MDSMonitor::check_sub(Subscription *sub) { - for (map::iterator p = subs.subs.begin(); - p != subs.subs.end(); - p++) { - if (p->second.last < mdsmap.get_epoch()) { - send_full(p->first); - p->second.last = mdsmap.get_epoch(); - } + if (sub->last < mdsmap.get_epoch()) { + send_full(sub->session->inst); + if (sub->onetime) + mon->session_map.remove_sub(sub); + else + sub->last = mdsmap.get_epoch(); } - subs.trim_onetime(); } - - void MDSMonitor::tick() { // make sure mds's are still alive @@ -565,8 +562,6 @@ void MDSMonitor::tick() bool do_propose = false; - subs.trim(g_clock.now()); - if (!mon->is_leader()) return; // expand mds cluster (add new nodes to @in)? diff --git a/src/mon/MDSMonitor.h b/src/mon/MDSMonitor.h index e7ba6cca44e3a..dac42f4ea2d97 100644 --- a/src/mon/MDSMonitor.h +++ b/src/mon/MDSMonitor.h @@ -28,7 +28,7 @@ using namespace std; #include "mds/MDSMap.h" #include "PaxosService.h" -#include "SubscriptionMap.h" +#include "Session.h" class MMDSBeacon; @@ -43,8 +43,6 @@ class MDSMonitor : public PaxosService { MDSMap pending_mdsmap; // current + pending updates - SubscriptionMap subs; - // my helpers void print_map(MDSMap &m, int dbl=7); @@ -107,8 +105,8 @@ public: void tick(); // check state, take actions void do_stop(); - void subscribe(entity_inst_t inst, version_t have, utime_t until); void check_subs(); + void check_sub(Subscription *sub); }; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 0cf27ffe858ab..39a727f285f77 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -470,29 +470,54 @@ void Monitor::handle_subscribe(MMonSubscribe *m) dout(10) << "handle_subscribe " << *m << dendl; bool reply = false; - utime_t until = g_clock.now(); - until += g_conf.mon_subscribe_interval; + + Session *s = (Session *)m->get_connection()->get_priv(); + if (!s) { + s = session_map.new_session(m->get_source_inst()); + m->get_connection()->set_priv(s->get()); + dout(10) << " new session " << s << " for " << s->inst << dendl; + } else { + dout(10) << " existing session " << s << " for " << s->inst << dendl; + } + + s->until = g_clock.now(); + s->until += g_conf.mon_subscribe_interval; for (map::iterator p = m->what.begin(); p != m->what.end(); p++) { if (!p->second.onetime) reply = true; - if (p->first == "osdmap") - osdmon()->subscribe(m->get_source_inst(), p->second.have, p->second.onetime ? utime_t() : until); - else if (p->first == "mdsmap") - mdsmon()->subscribe(m->get_source_inst(), p->second.have, p->second.onetime ? utime_t() : until); - else - dout(10) << " ignoring sub for '" << p->first << "'" << dendl; + session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime); + + if (p->first == "mdsmap") + mdsmon()->check_sub(s->sub_map["mdsmap"]); + else if (p->first == "osdmap") + osdmon()->check_sub(s->sub_map["osdmap"]); } + // ??? + if (reply) messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval), m->get_source_inst()); + s->put(); delete m; } +bool Monitor::ms_handle_reset(Connection *con, const entity_addr_t& peer) +{ + Session *s = (Session *)con->get_priv(); + if (!s) + return false; + + dout(10) << "reset/close on session " << s->inst << dendl; + session_map.remove_session(s); + s->put(); + return true; +} + void Monitor::handle_mon_get_map(MMonGetMap *m) { dout(10) << "handle_mon_get_map" << dendl; @@ -575,6 +600,19 @@ void Monitor::tick() for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); p++) (*p)->tick(); + // trim sessions + utime_t now = g_clock.now(); + xlist::iterator p = session_map.sessions.begin(); + while (!p.end()) { + Session *s = *p; + ++p; + if (s->until < now) { + dout(10) << " trimming session " << s->inst << " (until " << s->until << " < now " << now << ")" << dendl; + messenger->mark_down(s->inst.addr); + session_map.remove_session(s); + } + } + // next tick! reset_tick(); } diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 4413ec9c92e40..96ee4f36f9e56 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -31,6 +31,7 @@ #include "MonMap.h" #include "Elector.h" #include "Paxos.h" +#include "Session.h" #include "osd/OSDMap.h" @@ -127,6 +128,11 @@ public: friend class LogMonitor; + // -- sessions -- + SessionMap session_map; + + + // messages void handle_subscribe(MMonSubscribe *m); void handle_mon_get_map(MMonGetMap *m); diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 2a7c53c456f6f..c725c6ca0af47 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -804,31 +804,29 @@ void OSDMonitor::blacklist(entity_addr_t a, utime_t until) } -void OSDMonitor::subscribe(entity_inst_t inst, version_t have, utime_t until) +void OSDMonitor::check_subs() { - if (paxos->is_readable() && - have < osdmap.get_epoch()) { - send_latest(inst, have); - subs.subscribe(inst, osdmap.get_epoch(), until); - } else - subs.subscribe(inst, have, until); + nstring type = "osdmap"; + xlist::iterator p = mon->session_map.subs[type].begin(); + while (!p.end()) { + Subscription *sub = *p; + ++p; + check_sub(sub); + } } -void OSDMonitor::check_subs() +void OSDMonitor::check_sub(Subscription *sub) { - for (map::iterator p = subs.subs.begin(); - p != subs.subs.end(); - p++) { - if (p->second.last < osdmap.get_epoch()) { - send_latest(p->first, p->second.last); - p->second.last = osdmap.get_epoch(); - } + if (sub->last < osdmap.get_epoch()) { + send_latest(sub->session->inst, sub->last); + if (sub->onetime) + mon->session_map.remove_sub(sub); + else + sub->last = osdmap.get_epoch(); } - subs.trim_onetime(); } - // TICK @@ -839,8 +837,6 @@ void OSDMonitor::tick() update_from_paxos(); dout(10) << osdmap << dendl; - subs.trim(g_clock.now()); - if (!mon->is_leader()) return; bool do_propose = false; diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index f5ac78e1b7a7f..577aa3c79b0e6 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -28,7 +28,7 @@ using namespace std; #include "osd/OSDMap.h" #include "PaxosService.h" -#include "SubscriptionMap.h" +#include "Session.h" class Monitor; class MOSDBoot; @@ -38,7 +38,6 @@ class MPoolSnap; class OSDMonitor : public PaxosService { public: OSDMap osdmap; - SubscriptionMap subs; private: map waiting_for_map; // who -> start epoch @@ -166,8 +165,8 @@ private: void fake_osdmap_update(); void fake_reorg(); - void subscribe(entity_inst_t inst, version_t have, utime_t until); void check_subs(); + void check_sub(Subscription *sub); }; diff --git a/src/mon/Session.h b/src/mon/Session.h new file mode 100644 index 0000000000000..60e2834a442ff --- /dev/null +++ b/src/mon/Session.h @@ -0,0 +1,90 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MON_SESSION_H +#define __MON_SESSION_H + +#include "include/xlist.h" +#include "msg/msg_types.h" + +struct Session; + +struct Subscription { + Session *session; + nstring type; + xlist::item type_item; + version_t last; + bool onetime; + + Subscription(Session *s, const nstring& t) : session(s), type(t), type_item(this) {}; +}; + + +struct Session : public RefCountedObject { + entity_inst_t inst; + utime_t until; + bool closed; + xlist::item item; + + map sub_map; + + Session(entity_inst_t i) : inst(i), closed(false), item(this) {} + ~Session() { + generic_dout(0) << "~Session " << this << dendl; + // we should have been removed before we get destructed; see SessionMap::remove_session() + assert(!item.is_on_xlist()); + assert(sub_map.empty()); + } +}; + +struct SessionMap { + xlist sessions; + map > subs; + + void remove_session(Session *s) { + for (map::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p) + p->second->type_item.remove_myself(); + s->sub_map.clear(); + s->item.remove_myself(); + s->put(); + } + + Session *new_session(entity_inst_t i) { + Session *s = new Session(i); + sessions.push_back(&s->item); + s->get(); // caller gets a ref + return s; + } + + void add_update_sub(Session *s, const nstring& what, version_t have, bool onetime) { + Subscription *sub = 0; + if (s->sub_map.count(what)) { + sub = s->sub_map[what]; + } else { + sub = new Subscription(s, what); + s->sub_map[what] = sub; + subs[what].push_back(&sub->type_item); + } + sub->last = have; + sub->onetime = onetime; + } + + void remove_sub(Subscription *sub) { + sub->session->sub_map.erase(sub->type); + sub->type_item.remove_myself(); + delete sub; + } +}; + +#endif diff --git a/src/mon/SubscriptionMap.h b/src/mon/SubscriptionMap.h deleted file mode 100644 index 1f4eec6ec5890..0000000000000 --- a/src/mon/SubscriptionMap.h +++ /dev/null @@ -1,52 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef __MON_SUBSCRIPTIONMAP_H -#define __MON_SUBSCRIPTIONMAP_H - -#include "msg/msg_types.h" - -struct SubscriptionMap { - struct sub_info { - version_t last; - utime_t until; - }; - map subs; - - void subscribe(entity_inst_t a, version_t h, utime_t u) { - if (!subs.count(a)) - subs[a].last = h; - subs[a].until = u; - } - - void trim(utime_t now) { - map::iterator p = subs.begin(); - while (p != subs.end()) - if (p->second.until != utime_t() && - p->second.until < now) - subs.erase(p++); - else - p++; - } - void trim_onetime() { - map::iterator p = subs.begin(); - while (p != subs.end()) - if (p->second.until == utime_t()) - subs.erase(p++); - else - p++; - } -}; - -#endif -- 2.39.5