]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cobserver: utility, observe changes in different maps
authorYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 9 Dec 2008 21:39:10 +0000 (13:39 -0800)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 9 Dec 2008 22:12:31 +0000 (14:12 -0800)
src/Makefile.am
src/config.cc
src/config.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/Paxos.cc
src/mon/Paxos.h
src/msg/Message.cc
src/msg/Message.h

index c392141b420901521878eab9cd67287e1d2c4fe2..f3bb88331ec8385bafe18568f3fada2ebca3ccad 100644 (file)
@@ -9,6 +9,8 @@ cmon_SOURCES = cmon.cc msg/SimpleMessenger.cc
 cmon_LDADD = libcommon.a libmon.a libcrush.a libcommon.a 
 
 # admin tools
+cobserver_SOURCES = cobserver.cc msg/SimpleMessenger.cc
+cobserver_LDADD = libcrush.a libcommon.a
 cmonctl_SOURCES = cmonctl.cc msg/SimpleMessenger.cc
 cmonctl_LDADD = libcommon.a -ledit
 mkmonfs_SOURCES = mkmonfs.cc
@@ -41,7 +43,7 @@ csyn_LDADD = libcommon.a libclient.a libosdc.a libcrush.a libcommon.a
 
 bin_PROGRAMS = \
        cmon cmds cosd csyn \
-       cmonctl \
+       cmonctl cobserver \
        mkmonfs monmaptool osdmaptool crushtool \
        streamtest dupstore dumpjournal testmsgr
 
index d5d2545d4e09d6a7b3b6202815b896fa8cf9ab7c..673000fc854726e8be013e1af42b311c44de5fe9 100644 (file)
@@ -277,6 +277,7 @@ md_config_t g_conf = {
   mon_pg_create_interval: 30.0, // no more than every 30s
 
   paxos_propose_interval: 1.0,  // gather updates for this long before proposing a map update
+  paxos_observer_timeout: 5*60,  // gather updates for this long before proposing a map update
 
   // --- client ---
   client_cache_size: 1000,
index 2de072f144bf0febc5665e26c0c241fc1276cc81..d51a70e054a65763af6d2d3566c0bba2819b7e0d 100644 (file)
@@ -149,6 +149,7 @@ struct md_config_t {
   float mon_pg_create_interval;
 
   double paxos_propose_interval;
+  double paxos_observer_timeout;
 
   // client
   int      client_cache_size;
index 636878056fdd027de5452f5a23b018ce55b82433..fc9aa47343e783992373b7460f34c03b45a64b45 100644 (file)
@@ -28,6 +28,8 @@
 #include "messages/MGenericMessage.h"
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
+#include "messages/MMonObserve.h"
+#include "messages/MMonObserveNotify.h"
 
 #include "messages/MMonPaxos.h"
 
@@ -281,6 +283,21 @@ void Monitor::reply_command(MMonCommand *m, int rc, const string &rs, bufferlist
   delete m;
 }
 
+void Monitor::register_observer(MMonObserve *m)
+{
+  if (m->monitor_id >= PAXOS_NUM) {
+    dout(0) << "register_observer: wrong monitor id: " << m->monitor_id << dendl;
+    delete m;
+    return;
+  }
+  Paxos *paxos = paxos_service[m->monitor_id]->paxos;
+  assert(paxos);
+  entity_inst_t inst=m->get_orig_source_inst();
+  PaxosObserver *observer = new PaxosObserver(paxos, inst, m->ver);
+  paxos->register_observer(observer);
+  delete m;
+}
+
 
 void Monitor::inject_args(const entity_inst_t& inst, vector<string>& args)
 {
@@ -338,6 +355,10 @@ bool Monitor::dispatch_impl(Message *m)
       handle_command((MMonCommand*)m);
       break;
 
+    case MSG_MON_OBSERVE:
+      register_observer((MMonObserve *)m);
+      break;
+
 
       // OSDs
     case CEPH_MSG_OSD_GETMAP:
index 8ce7559e3e8229ff59ee9b9f624762bcf322e10e..f8db47dc63ca13f298699fbd9c465a1fffbe932a 100644 (file)
@@ -38,6 +38,7 @@ class MonitorStore;
 class PaxosService;
 
 class MMonGetMap;
+class MMonObserve;
 
 class Monitor : public Dispatcher {
 public:
@@ -123,6 +124,7 @@ public:
 
   void reply_command(MMonCommand *m, int rc, const string &rs);
   void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata);
+  void register_observer(MMonObserve *m);
 
   void inject_args(const entity_inst_t& inst, string& args) {
     vector<string> a(1);
index 591d7e31f916373457dec9165109e0e881db5a96..ff1b78d825b1948e2adc909b2b51a57f5b1afe85 100644 (file)
@@ -17,6 +17,7 @@
 #include "MonitorStore.h"
 
 #include "messages/MMonPaxos.h"
+#include "messages/MMonObserveNotify.h"
 
 #include "config.h"
 
@@ -36,6 +37,14 @@ static ostream& _prefix(Monitor *mon, int whoami, const char *machine_name, int
                << ") ";
 }
 
+void PaxosObserver::notify(bufferlist& bl, version_t ver, Monitor *mon, bool is_incremental)
+{
+  MMonObserveNotify *msg = new MMonObserveNotify(paxos->machine_id, bl, ver, is_incremental);
+  last_version = ver;
+   
+  mon->messenger->send_message(msg, inst);
+}
+
 
 void Paxos::init()
 {
@@ -768,7 +777,7 @@ void Paxos::dispatch(Message *m)
   case MSG_MON_PAXOS:
     {
       MMonPaxos *pm = (MMonPaxos*)m;
-      
+
       // NOTE: these ops are defined in messages/MMonPaxos.h
       switch (pm->op) {
        // learner
@@ -802,10 +811,80 @@ void Paxos::dispatch(Message *m)
   default:
     assert(0);
   }
+
+  if (is_readable()) {
+    update_observers();
+  }
 }
 
+void Paxos::register_observer(PaxosObserver *observer)
+{
+  utime_t timeout=g_clock.now();
+  map<entity_inst_t, PaxosObserver *>::iterator iter;
+
+  observers_lock.Lock();
+
+  iter = observers.find(observer->inst);
+
+  if (iter != observers.end()) {
+    PaxosObserver *o = iter->second;
+
+    observers.erase(iter);
+
+    delete o;
+  }
 
+  timeout += g_conf.paxos_observer_timeout;
+  observer->set_timeout(timeout);
 
+  observers[observer->inst] = observer;
+
+  observers_lock.Unlock();
+}
+
+void Paxos::update_observers()
+{
+  bufferlist bl;
+  version_t ver;
+  map<entity_inst_t, PaxosObserver *>::iterator iter, last;
+
+  observers_lock.Lock();
+
+  last = observers.end();
+  iter = observers.begin();
+  while (iter != last) {
+    int done = 0;
+    PaxosObserver *observer = iter->second;
+    while (g_clock.now() > observer->timeout) {
+       delete observer;
+       observers.erase(iter++);
+       if (iter == last) {
+               observers_lock.Unlock();
+               return;
+       }
+       observer = iter->second;        
+    }
+
+    if (observer->get_ver() == 0) {
+        ver = get_latest(bl);
+        if (ver) {
+          observer->notify(bl, ver, mon, false);
+          done=1;
+        }
+      }
+
+      if (!done) {
+        for (ver = observer->get_ver() + 1; ver <=last_committed; ver++) {
+          if (read(ver, bl)) {
+             observer->notify(bl, ver, mon, true);
+          }
+        }
+      }
+
+      ++iter;
+    }
+    observers_lock.Unlock();
+}
 
 // -----------------
 // service interface
index 7507318ab4d4117e2c38500be4c95415415533a8..c00aa9944a350311a078f8d602868e69983514da 100644 (file)
@@ -54,6 +54,7 @@ e 12v
 #include "mon_types.h"
 #include "include/buffer.h"
 #include "msg/Message.h"
+#include "msg/msg_types.h"
 
 #include "include/Context.h"
 
@@ -61,6 +62,23 @@ e 12v
 
 class Monitor;
 class MMonPaxos;
+class Paxos;
+
+class PaxosObserver {
+  friend class Paxos;
+
+  Paxos *paxos;
+  int machine_id;
+  entity_inst_t inst;
+  version_t last_version;
+  utime_t timeout;
+public:
+  PaxosObserver(Paxos *px, entity_inst_t& ei, version_t v) : paxos(px), inst(ei), last_version(v) { }
+  void notify(bufferlist& bl, version_t ver, Monitor *mon, bool is_incremental);
+  version_t get_ver() { return last_version; }
+  int get_machine_id() { return machine_id; }
+  void set_timeout(utime_t to) { timeout = to; }
+};
 
 
 // i am one state machine.
@@ -73,6 +91,7 @@ class Paxos {
   const char *machine_name;
 
   friend class PaxosService;
+  friend class PaxosObserver;
 
   // LEADER+PEON
 
@@ -137,6 +156,8 @@ private:
 
   list<Context*> waiting_for_writeable;
   list<Context*> waiting_for_commit;
+  map<entity_inst_t, PaxosObserver *> observers;
+  Mutex observers_lock;
 
   class C_CollectTimeout : public Context {
     Paxos *paxos;
@@ -218,7 +239,8 @@ public:
                   lease_renew_event(0),
                   lease_ack_timeout_event(0),
                   lease_timeout_event(0),
-                  accept_timeout_event(0) { }
+                  accept_timeout_event(0),
+                   observers_lock("observers_lock") { }
 
   const char *get_machine_name() const {
     return machine_name;
@@ -271,6 +293,8 @@ public:
   void stash_latest(version_t v, bufferlist& bl);
   version_t get_latest(bufferlist& bl);
 
+  void register_observer(PaxosObserver *observer);
+  void update_observers();
 };
 
 
index 3e322a1b40bda0c20db694dbe1470a5bf5867488..62cac0de1c1d617fa28a6f16296f6cb0ecee1fe6 100644 (file)
@@ -19,6 +19,8 @@ using namespace std;
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
 #include "messages/MMonPaxos.h"
+#include "messages/MMonObserve.h"
+#include "messages/MMonObserveNotify.h"
 
 #include "messages/MMonElection.h"
 #include "messages/MLog.h"
@@ -165,6 +167,12 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
     m = new MMonElection;
     break;
 
+  case MSG_MON_OBSERVE:
+    m = new MMonObserve;
+    break;
+  case MSG_MON_OBSERVE_NOTIFY:
+    m = new MMonObserveNotify;
+    break;
   case MSG_LOG:
     m = new MLog;
     break;
index bf6eca6d497b56e90be25042f0a5725e45f0e50f..a661569cddd7e759c480cdc2a93da0acac16792e 100644 (file)
@@ -25,8 +25,9 @@
 /* monitor <-> mon admin tool */
 #define MSG_MON_COMMAND            50
 #define MSG_MON_COMMAND_ACK        51
-
-#define MSG_LOG              52
+#define MSG_LOG                    52
+#define MSG_MON_OBSERVE            53
+#define MSG_MON_OBSERVE_NOTIFY     54
 
 // osd internal
 #define MSG_OSD_PING         70