mon/PGMonitor.h\
mon/Paxos.h\
mon/PaxosService.h\
- mon/SubscriptionMap.h\
+ mon/Session.h\
mon/mon_types.h\
mount/mtab.c\
msg/Dispatcher.h\
- weird osd_lock contention during osd restart?
later
+- mon cluster expansion, contraction
- authentication
- client reconnect after long eviction; and slow delayed reconnect
- ENOSPC
session = mds->sessionmap.get_session(m->get_source());
if (session) {
dout(20) << "get_session set " << session->inst << " in connection" << dendl;
- m->get_connection()->set_priv(session);
+ m->get_connection()->set_priv(session->get());
} else {
dout(20) << "get_session " << m->get_source() << " dne" << dendl;
}
waiting_for_map.push_back(dest);
}
-void MDSMonitor::subscribe(entity_inst_t inst, version_t have, utime_t until)
+void MDSMonitor::check_subs()
{
- if (paxos->is_readable() &&
- have < mdsmap.get_epoch()) {
- send_full(inst);
- subs.subscribe(inst, mdsmap.get_epoch(), until);
- } else
- subs.subscribe(inst, have, until);
+ nstring type = "mdsmap";
+ xlist<Subscription*>::iterator p = mon->session_map.subs[type].begin();
+ while (!p.end()) {
+ Subscription *sub = *p;
+ ++p;
+ check_sub(sub);
+ }
}
-void MDSMonitor::check_subs()
+void MDSMonitor::check_sub(Subscription *sub)
{
- for (map<entity_inst_t, SubscriptionMap::sub_info>::iterator p = subs.subs.begin();
- p != subs.subs.end();
- p++) {
- if (p->second.last < mdsmap.get_epoch()) {
- send_full(p->first);
- p->second.last = mdsmap.get_epoch();
- }
+ if (sub->last < mdsmap.get_epoch()) {
+ send_full(sub->session->inst);
+ if (sub->onetime)
+ mon->session_map.remove_sub(sub);
+ else
+ sub->last = mdsmap.get_epoch();
}
- subs.trim_onetime();
}
-
-
void MDSMonitor::tick()
{
// make sure mds's are still alive
bool do_propose = false;
- subs.trim(g_clock.now());
-
if (!mon->is_leader()) return;
// expand mds cluster (add new nodes to @in)?
#include "mds/MDSMap.h"
#include "PaxosService.h"
-#include "SubscriptionMap.h"
+#include "Session.h"
class MMDSBeacon;
MDSMap pending_mdsmap; // current + pending updates
- SubscriptionMap subs;
-
// my helpers
void print_map(MDSMap &m, int dbl=7);
void tick(); // check state, take actions
void do_stop();
- void subscribe(entity_inst_t inst, version_t have, utime_t until);
void check_subs();
+ void check_sub(Subscription *sub);
};
dout(10) << "handle_subscribe " << *m << dendl;
bool reply = false;
- utime_t until = g_clock.now();
- until += g_conf.mon_subscribe_interval;
+
+ Session *s = (Session *)m->get_connection()->get_priv();
+ if (!s) {
+ s = session_map.new_session(m->get_source_inst());
+ m->get_connection()->set_priv(s->get());
+ dout(10) << " new session " << s << " for " << s->inst << dendl;
+ } else {
+ dout(10) << " existing session " << s << " for " << s->inst << dendl;
+ }
+
+ s->until = g_clock.now();
+ s->until += g_conf.mon_subscribe_interval;
for (map<nstring,ceph_mon_subscribe_item>::iterator p = m->what.begin();
p != m->what.end();
p++) {
if (!p->second.onetime)
reply = true;
- if (p->first == "osdmap")
- osdmon()->subscribe(m->get_source_inst(), p->second.have, p->second.onetime ? utime_t() : until);
- else if (p->first == "mdsmap")
- mdsmon()->subscribe(m->get_source_inst(), p->second.have, p->second.onetime ? utime_t() : until);
- else
- dout(10) << " ignoring sub for '" << p->first << "'" << dendl;
+ session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime);
+
+ if (p->first == "mdsmap")
+ mdsmon()->check_sub(s->sub_map["mdsmap"]);
+ else if (p->first == "osdmap")
+ osdmon()->check_sub(s->sub_map["osdmap"]);
}
+ // ???
+
if (reply)
messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval),
m->get_source_inst());
+ s->put();
delete m;
}
+bool Monitor::ms_handle_reset(Connection *con, const entity_addr_t& peer)
+{
+ Session *s = (Session *)con->get_priv();
+ if (!s)
+ return false;
+
+ dout(10) << "reset/close on session " << s->inst << dendl;
+ session_map.remove_session(s);
+ s->put();
+ return true;
+}
+
void Monitor::handle_mon_get_map(MMonGetMap *m)
{
dout(10) << "handle_mon_get_map" << dendl;
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
(*p)->tick();
+ // trim sessions
+ utime_t now = g_clock.now();
+ xlist<Session*>::iterator p = session_map.sessions.begin();
+ while (!p.end()) {
+ Session *s = *p;
+ ++p;
+ if (s->until < now) {
+ dout(10) << " trimming session " << s->inst << " (until " << s->until << " < now " << now << ")" << dendl;
+ messenger->mark_down(s->inst.addr);
+ session_map.remove_session(s);
+ }
+ }
+
// next tick!
reset_tick();
}
#include "MonMap.h"
#include "Elector.h"
#include "Paxos.h"
+#include "Session.h"
#include "osd/OSDMap.h"
friend class LogMonitor;
+ // -- sessions --
+ SessionMap session_map;
+
+
+
// messages
void handle_subscribe(MMonSubscribe *m);
void handle_mon_get_map(MMonGetMap *m);
}
-void OSDMonitor::subscribe(entity_inst_t inst, version_t have, utime_t until)
+void OSDMonitor::check_subs()
{
- if (paxos->is_readable() &&
- have < osdmap.get_epoch()) {
- send_latest(inst, have);
- subs.subscribe(inst, osdmap.get_epoch(), until);
- } else
- subs.subscribe(inst, have, until);
+ nstring type = "osdmap";
+ xlist<Subscription*>::iterator p = mon->session_map.subs[type].begin();
+ while (!p.end()) {
+ Subscription *sub = *p;
+ ++p;
+ check_sub(sub);
+ }
}
-void OSDMonitor::check_subs()
+void OSDMonitor::check_sub(Subscription *sub)
{
- for (map<entity_inst_t, SubscriptionMap::sub_info>::iterator p = subs.subs.begin();
- p != subs.subs.end();
- p++) {
- if (p->second.last < osdmap.get_epoch()) {
- send_latest(p->first, p->second.last);
- p->second.last = osdmap.get_epoch();
- }
+ if (sub->last < osdmap.get_epoch()) {
+ send_latest(sub->session->inst, sub->last);
+ if (sub->onetime)
+ mon->session_map.remove_sub(sub);
+ else
+ sub->last = osdmap.get_epoch();
}
- subs.trim_onetime();
}
-
// TICK
update_from_paxos();
dout(10) << osdmap << dendl;
- subs.trim(g_clock.now());
-
if (!mon->is_leader()) return;
bool do_propose = false;
#include "osd/OSDMap.h"
#include "PaxosService.h"
-#include "SubscriptionMap.h"
+#include "Session.h"
class Monitor;
class MOSDBoot;
class OSDMonitor : public PaxosService {
public:
OSDMap osdmap;
- SubscriptionMap subs;
private:
map<entity_inst_t, epoch_t> waiting_for_map; // who -> start epoch
void fake_osdmap_update();
void fake_reorg();
- void subscribe(entity_inst_t inst, version_t have, utime_t until);
void check_subs();
+ void check_sub(Subscription *sub);
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MON_SESSION_H
+#define __MON_SESSION_H
+
+#include "include/xlist.h"
+#include "msg/msg_types.h"
+
+struct Session;
+
+struct Subscription {
+ Session *session;
+ nstring type;
+ xlist<Subscription*>::item type_item;
+ version_t last;
+ bool onetime;
+
+ Subscription(Session *s, const nstring& t) : session(s), type(t), type_item(this) {};
+};
+
+
+struct Session : public RefCountedObject {
+ entity_inst_t inst;
+ utime_t until;
+ bool closed;
+ xlist<Session*>::item item;
+
+ map<nstring, Subscription*> sub_map;
+
+ Session(entity_inst_t i) : inst(i), closed(false), item(this) {}
+ ~Session() {
+ generic_dout(0) << "~Session " << this << dendl;
+ // we should have been removed before we get destructed; see SessionMap::remove_session()
+ assert(!item.is_on_xlist());
+ assert(sub_map.empty());
+ }
+};
+
+struct SessionMap {
+ xlist<Session*> sessions;
+ map<nstring, xlist<Subscription*> > subs;
+
+ void remove_session(Session *s) {
+ for (map<nstring,Subscription*>::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p)
+ p->second->type_item.remove_myself();
+ s->sub_map.clear();
+ s->item.remove_myself();
+ s->put();
+ }
+
+ Session *new_session(entity_inst_t i) {
+ Session *s = new Session(i);
+ sessions.push_back(&s->item);
+ s->get(); // caller gets a ref
+ return s;
+ }
+
+ void add_update_sub(Session *s, const nstring& what, version_t have, bool onetime) {
+ Subscription *sub = 0;
+ if (s->sub_map.count(what)) {
+ sub = s->sub_map[what];
+ } else {
+ sub = new Subscription(s, what);
+ s->sub_map[what] = sub;
+ subs[what].push_back(&sub->type_item);
+ }
+ sub->last = have;
+ sub->onetime = onetime;
+ }
+
+ void remove_sub(Subscription *sub) {
+ sub->session->sub_map.erase(sub->type);
+ sub->type_item.remove_myself();
+ delete sub;
+ }
+};
+
+#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef __MON_SUBSCRIPTIONMAP_H
-#define __MON_SUBSCRIPTIONMAP_H
-
-#include "msg/msg_types.h"
-
-struct SubscriptionMap {
- struct sub_info {
- version_t last;
- utime_t until;
- };
- map<entity_inst_t, sub_info> subs;
-
- void subscribe(entity_inst_t a, version_t h, utime_t u) {
- if (!subs.count(a))
- subs[a].last = h;
- subs[a].until = u;
- }
-
- void trim(utime_t now) {
- map<entity_inst_t, sub_info>::iterator p = subs.begin();
- while (p != subs.end())
- if (p->second.until != utime_t() &&
- p->second.until < now)
- subs.erase(p++);
- else
- p++;
- }
- void trim_onetime() {
- map<entity_inst_t, sub_info>::iterator p = subs.begin();
- while (p != subs.end())
- if (p->second.until == utime_t())
- subs.erase(p++);
- else
- p++;
- }
-};
-
-#endif