]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: fix up subscribe infrastructure to use msgr connection for sessions
authorSage Weil <sage@newdream.net>
Mon, 21 Sep 2009 21:30:19 +0000 (14:30 -0700)
committerSage Weil <sage@newdream.net>
Mon, 21 Sep 2009 21:30:19 +0000 (14:30 -0700)
Session state is now cleaned up when connections drop.  More
efficient in-memory structures.

src/Makefile.am
src/TODO
src/mds/Server.cc
src/mon/MDSMonitor.cc
src/mon/MDSMonitor.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/mon/Session.h [new file with mode: 0644]
src/mon/SubscriptionMap.h [deleted file]

index eb3a40c724c5210be79c2fa13efc37cb0873f296..21f2b3d88fc5dc74220fdced604309b1ae8a6783 100644 (file)
@@ -655,7 +655,7 @@ noinst_HEADERS = \
         mon/PGMonitor.h\
         mon/Paxos.h\
         mon/PaxosService.h\
-        mon/SubscriptionMap.h\
+        mon/Session.h\
         mon/mon_types.h\
        mount/mtab.c\
         msg/Dispatcher.h\
index 490d5e4592faed6e42cd5b8e8a4c8908488fb842..22646e3040a82d920a4b51d350862605153b359e 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -15,6 +15,7 @@ bugs
 - weird osd_lock contention during osd restart?
 
 later
+- mon cluster expansion, contraction
 - authentication
 - client reconnect after long eviction; and slow delayed reconnect
 - ENOSPC
index 8a9a421b7cc4aee6b41c377151a5cd9cb29e97bc..20a5288f2eb059989e10c05d3c3602198f97b518 100644 (file)
@@ -169,7 +169,7 @@ Session *Server::get_session(Message *m)
     session = mds->sessionmap.get_session(m->get_source());
     if (session) {
       dout(20) << "get_session set " << session->inst << " in connection" << dendl;
-      m->get_connection()->set_priv(session);
+      m->get_connection()->set_priv(session->get());
     } else {
       dout(20) << "get_session " << m->get_source() << " dne" << dendl;
     }
index acb414afab11abcad46b6557d404699b5e4fab7f..9443332494052ea1fb21e7d0acac83340594daf8 100644 (file)
@@ -529,31 +529,28 @@ void MDSMonitor::send_latest(entity_inst_t dest)
     waiting_for_map.push_back(dest);
 }
 
-void MDSMonitor::subscribe(entity_inst_t inst, version_t have, utime_t until)
+void MDSMonitor::check_subs()
 {
-  if (paxos->is_readable() &&
-      have < mdsmap.get_epoch()) {
-    send_full(inst);
-    subs.subscribe(inst, mdsmap.get_epoch(), until);    
-  } else
-    subs.subscribe(inst, have, until);
+  nstring type = "mdsmap";
+  xlist<Subscription*>::iterator p = mon->session_map.subs[type].begin();
+  while (!p.end()) {
+    Subscription *sub = *p;
+    ++p;
+    check_sub(sub);
+  }
 }
 
-void MDSMonitor::check_subs()
+void MDSMonitor::check_sub(Subscription *sub)
 {
-  for (map<entity_inst_t, SubscriptionMap::sub_info>::iterator p = subs.subs.begin();
-       p != subs.subs.end();
-       p++) {
-    if (p->second.last < mdsmap.get_epoch()) {
-      send_full(p->first);
-      p->second.last = mdsmap.get_epoch();
-    }
+  if (sub->last < mdsmap.get_epoch()) {
+    send_full(sub->session->inst);
+    if (sub->onetime)
+      mon->session_map.remove_sub(sub);
+    else
+      sub->last = mdsmap.get_epoch();
   }
-  subs.trim_onetime();
 }
 
-
-
 void MDSMonitor::tick()
 {
   // make sure mds's are still alive
@@ -565,8 +562,6 @@ void MDSMonitor::tick()
   
   bool do_propose = false;
 
-  subs.trim(g_clock.now());
-
   if (!mon->is_leader()) return;
 
   // expand mds cluster (add new nodes to @in)?
index e7ba6cca44e3ad5a438e8d331bed5032bca82477..dac42f4ea2d97de3b766f78eced9fcab0a81d689 100644 (file)
@@ -28,7 +28,7 @@ using namespace std;
 #include "mds/MDSMap.h"
 
 #include "PaxosService.h"
-#include "SubscriptionMap.h"
+#include "Session.h"
 
 
 class MMDSBeacon;
@@ -43,8 +43,6 @@ class MDSMonitor : public PaxosService {
 
   MDSMap pending_mdsmap;  // current + pending updates
 
-  SubscriptionMap subs;
-
   // my helpers
   void print_map(MDSMap &m, int dbl=7);
 
@@ -107,8 +105,8 @@ public:
   void tick();     // check state, take actions
   void do_stop();
 
-  void subscribe(entity_inst_t inst, version_t have, utime_t until);
   void check_subs();
+  void check_sub(Subscription *sub);
 
 };
 
index 0cf27ffe858ab33c7e3f1eb75f34c7daf79ec480..39a727f285f774bd6bb3847254e2dcaef0f019c1 100644 (file)
@@ -470,29 +470,54 @@ void Monitor::handle_subscribe(MMonSubscribe *m)
   dout(10) << "handle_subscribe " << *m << dendl;
   
   bool reply = false;
-  utime_t until = g_clock.now();
-  until += g_conf.mon_subscribe_interval;
+
+  Session *s = (Session *)m->get_connection()->get_priv();
+  if (!s) {
+    s = session_map.new_session(m->get_source_inst());
+    m->get_connection()->set_priv(s->get());
+    dout(10) << " new session " << s << " for " << s->inst << dendl;
+  } else {
+    dout(10) << " existing session " << s << " for " << s->inst << dendl;
+  }
+
+  s->until = g_clock.now();
+  s->until += g_conf.mon_subscribe_interval;
 
   for (map<nstring,ceph_mon_subscribe_item>::iterator p = m->what.begin();
        p != m->what.end();
        p++) {
     if (!p->second.onetime)
       reply = true;
-    if (p->first == "osdmap")
-      osdmon()->subscribe(m->get_source_inst(), p->second.have, p->second.onetime ? utime_t() : until);
-    else if (p->first == "mdsmap")
-      mdsmon()->subscribe(m->get_source_inst(), p->second.have, p->second.onetime ? utime_t() : until);
-    else
-      dout(10) << " ignoring sub for '" << p->first << "'" << dendl;
+    session_map.add_update_sub(s, p->first, p->second.have, p->second.onetime);
+
+    if (p->first == "mdsmap")
+      mdsmon()->check_sub(s->sub_map["mdsmap"]);
+    else if (p->first == "osdmap")
+      osdmon()->check_sub(s->sub_map["osdmap"]);
   }
 
+  // ???
+
   if (reply)
     messenger->send_message(new MMonSubscribeAck(g_conf.mon_subscribe_interval),
                            m->get_source_inst());
 
+  s->put();
   delete m;
 }
 
+bool Monitor::ms_handle_reset(Connection *con, const entity_addr_t& peer)
+{
+  Session *s = (Session *)con->get_priv();
+  if (!s)
+    return false;
+
+  dout(10) << "reset/close on session " << s->inst << dendl;
+  session_map.remove_session(s);
+  s->put();
+  return true;
+}
+
 void Monitor::handle_mon_get_map(MMonGetMap *m)
 {
   dout(10) << "handle_mon_get_map" << dendl;
@@ -575,6 +600,19 @@ void Monitor::tick()
   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
     (*p)->tick();
   
+  // trim sessions
+  utime_t now = g_clock.now();
+  xlist<Session*>::iterator p = session_map.sessions.begin();
+  while (!p.end()) {
+    Session *s = *p;
+    ++p;
+    if (s->until < now) {
+      dout(10) << " trimming session " << s->inst << " (until " << s->until << " < now " << now << ")" << dendl;
+      messenger->mark_down(s->inst.addr);
+      session_map.remove_session(s);
+    }
+  }
+
   // next tick!
   reset_tick();
 }
index 4413ec9c92e40a332d28b61b40ee11809d1cea64..96ee4f36f9e5665ddd79be59dbf9c6fc21c0d8fb 100644 (file)
@@ -31,6 +31,7 @@
 #include "MonMap.h"
 #include "Elector.h"
 #include "Paxos.h"
+#include "Session.h"
 
 #include "osd/OSDMap.h"
 
@@ -127,6 +128,11 @@ public:
   friend class LogMonitor;
 
 
+  // -- sessions --
+  SessionMap session_map;
+
+  
+
   // messages
   void handle_subscribe(MMonSubscribe *m);
   void handle_mon_get_map(MMonGetMap *m);
index 2a7c53c456f6f1936d3291401891f05fa41a0131..c725c6ca0af47b082508ace211c1ec649b3f167a 100644 (file)
@@ -804,31 +804,29 @@ void OSDMonitor::blacklist(entity_addr_t a, utime_t until)
 }
 
 
-void OSDMonitor::subscribe(entity_inst_t inst, version_t have, utime_t until)
+void OSDMonitor::check_subs()
 {
-  if (paxos->is_readable() &&
-      have < osdmap.get_epoch()) {
-    send_latest(inst, have);
-    subs.subscribe(inst, osdmap.get_epoch(), until);    
-  } else
-    subs.subscribe(inst, have, until);
+  nstring type = "osdmap";
+  xlist<Subscription*>::iterator p = mon->session_map.subs[type].begin();
+  while (!p.end()) {
+    Subscription *sub = *p;
+    ++p;
+    check_sub(sub);
+  }
 }
 
-void OSDMonitor::check_subs()
+void OSDMonitor::check_sub(Subscription *sub)
 {
-  for (map<entity_inst_t, SubscriptionMap::sub_info>::iterator p = subs.subs.begin();
-       p != subs.subs.end();
-       p++) {
-    if (p->second.last < osdmap.get_epoch()) {
-      send_latest(p->first, p->second.last);
-      p->second.last = osdmap.get_epoch();
-    }
+  if (sub->last < osdmap.get_epoch()) {
+    send_latest(sub->session->inst, sub->last);
+    if (sub->onetime)
+      mon->session_map.remove_sub(sub);
+    else
+      sub->last = osdmap.get_epoch();
   }
-  subs.trim_onetime();
 }
 
 
-
 // TICK
 
 
@@ -839,8 +837,6 @@ void OSDMonitor::tick()
   update_from_paxos();
   dout(10) << osdmap << dendl;
 
-  subs.trim(g_clock.now());
-
   if (!mon->is_leader()) return;
 
   bool do_propose = false;
index f5ac78e1b7a7f02dadda88ff5a3291ac84273c2d..577aa3c79b0e6acb2dd113136df4108426f7c0fb 100644 (file)
@@ -28,7 +28,7 @@ using namespace std;
 #include "osd/OSDMap.h"
 
 #include "PaxosService.h"
-#include "SubscriptionMap.h"
+#include "Session.h"
 
 class Monitor;
 class MOSDBoot;
@@ -38,7 +38,6 @@ class MPoolSnap;
 class OSDMonitor : public PaxosService {
 public:
   OSDMap osdmap;
-  SubscriptionMap subs;
 
 private:
   map<entity_inst_t, epoch_t> waiting_for_map;  // who -> start epoch
@@ -166,8 +165,8 @@ private:
   void fake_osdmap_update();
   void fake_reorg();
 
-  void subscribe(entity_inst_t inst, version_t have, utime_t until);
   void check_subs();
+  void check_sub(Subscription *sub);
 
 };
 
diff --git a/src/mon/Session.h b/src/mon/Session.h
new file mode 100644 (file)
index 0000000..60e2834
--- /dev/null
@@ -0,0 +1,90 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MON_SESSION_H
+#define __MON_SESSION_H
+
+#include "include/xlist.h"
+#include "msg/msg_types.h"
+
+struct Session;
+
+struct Subscription {
+  Session *session;
+  nstring type;
+  xlist<Subscription*>::item type_item;
+  version_t last;
+  bool onetime;
+  
+  Subscription(Session *s, const nstring& t) : session(s), type(t), type_item(this) {};
+};
+
+
+struct Session : public RefCountedObject {
+  entity_inst_t inst;
+  utime_t until;
+  bool closed;
+  xlist<Session*>::item item;
+
+  map<nstring, Subscription*> sub_map;
+
+  Session(entity_inst_t i) : inst(i), closed(false), item(this) {}
+  ~Session() {
+    generic_dout(0) << "~Session " << this << dendl;
+    // we should have been removed before we get destructed; see SessionMap::remove_session()
+    assert(!item.is_on_xlist());
+    assert(sub_map.empty());
+  }
+};
+
+struct SessionMap {
+  xlist<Session*> sessions;
+  map<nstring, xlist<Subscription*> > subs;
+
+  void remove_session(Session *s) {
+    for (map<nstring,Subscription*>::iterator p = s->sub_map.begin(); p != s->sub_map.end(); ++p)
+      p->second->type_item.remove_myself();
+    s->sub_map.clear();
+    s->item.remove_myself();
+    s->put();
+  }
+
+  Session *new_session(entity_inst_t i) {
+    Session *s = new Session(i);
+    sessions.push_back(&s->item);
+    s->get();  // caller gets a ref
+    return s;
+  }
+
+  void add_update_sub(Session *s, const nstring& what, version_t have, bool onetime) {
+    Subscription *sub = 0;
+    if (s->sub_map.count(what)) {
+      sub = s->sub_map[what];
+    } else {
+      sub = new Subscription(s, what);
+      s->sub_map[what] = sub;
+      subs[what].push_back(&sub->type_item);
+    }
+    sub->last = have;
+    sub->onetime = onetime;
+  }
+
+  void remove_sub(Subscription *sub) {
+    sub->session->sub_map.erase(sub->type);
+    sub->type_item.remove_myself();
+    delete sub;
+  }
+};
+
+#endif
diff --git a/src/mon/SubscriptionMap.h b/src/mon/SubscriptionMap.h
deleted file mode 100644 (file)
index 1f4eec6..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#ifndef __MON_SUBSCRIPTIONMAP_H
-#define __MON_SUBSCRIPTIONMAP_H
-
-#include "msg/msg_types.h"
-
-struct SubscriptionMap {
-  struct sub_info {
-    version_t last;
-    utime_t until;
-  };
-  map<entity_inst_t, sub_info> subs;
-
-  void subscribe(entity_inst_t a, version_t h, utime_t u) {
-    if (!subs.count(a))
-      subs[a].last = h;
-    subs[a].until = u;
-  }
-
-  void trim(utime_t now) {
-    map<entity_inst_t, sub_info>::iterator p = subs.begin();
-    while (p != subs.end())
-      if (p->second.until != utime_t() &&
-         p->second.until < now)
-       subs.erase(p++);
-      else
-       p++;
-  }
-  void trim_onetime() {
-    map<entity_inst_t, sub_info>::iterator p = subs.begin();
-    while (p != subs.end())
-      if (p->second.until == utime_t())
-       subs.erase(p++);
-      else
-       p++;
-  }
-};
-
-#endif