]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* fixed mdsmon startup race
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 14 Jun 2007 21:39:59 +0000 (21:39 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 14 Jun 2007 21:39:59 +0000 (21:39 +0000)
* pulled out generic PaxosService stuff (wasn't that much it turns out)
* some paxos bug fixes

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1420 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/Makefile
trunk/ceph/mon/ClientMonitor.cc
trunk/ceph/mon/ClientMonitor.h
trunk/ceph/mon/MDSMonitor.cc
trunk/ceph/mon/MDSMonitor.h
trunk/ceph/mon/Monitor.cc
trunk/ceph/mon/Monitor.h
trunk/ceph/mon/Paxos.cc
trunk/ceph/mon/Paxos.h
trunk/ceph/mon/PaxosService.h [new file with mode: 0644]

index 0402c724895ab51ffa458f49427bebe57f12a9d4..96c7a57d55227f0ea03e0cf45802aac4e8818368 100644 (file)
@@ -72,6 +72,7 @@ OSDC_OBJS= \
 MON_OBJS= \
        mon/Monitor.o\
        mon/Paxos.o\
+       mon/PaxosService.o\
        mon/OSDMonitor.o\
        mon/MDSMonitor.o\
        mon/ClientMonitor.o\
index c6fe6f5da2b68f989557856b49bb937c00cb3907..51686e8f2bed4cb8a857defe8ced3d946b9dca9e 100644 (file)
 
 
 
-
-void ClientMonitor::dispatch(Message *m)
-{
-  switch (m->get_type()) {
-
-  case MSG_CLIENT_MOUNT:
-  case MSG_CLIENT_UNMOUNT:
-    handle_query(m);
-    break;
-       
-  default:
-    assert(0);
-  }  
-}
-
-
-void ClientMonitor::handle_query(Message *m)
-{
-  dout(10) << "handle_query " << *m << " from " << m->get_source_inst() << endl;
-  
-  // make sure our map is readable and up to date
-  if (!paxos->is_readable() ||
-      !update_from_paxos()) {
-    dout(10) << " waiting for paxos -> readable" << endl;
-    paxos->wait_for_readable(new C_RetryMessage(this, m));
-    return;
-  }
-
-  // preprocess
-  if (preprocess_update(m)) 
-    return;  // easy!
-
-  // leader?
-  if (!mon->is_leader()) {
-    // fw to leader
-    dout(10) << " fw to leader mon" << mon->get_leader() << endl;
-    mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
-    return;
-  }
-  
-  // writeable?
-  if (!paxos->is_writeable()) {
-    dout(10) << " waiting for paxos -> writeable" << endl;
-    paxos->wait_for_writeable(new C_RetryMessage(this, m));
-    return;
-  }
-
-  prepare_update(m);
-
-  // do it now (for now!) ***
-  propose_pending();
-}
-
 bool ClientMonitor::update_from_paxos()
 {
   assert(paxos->is_active());
@@ -307,9 +254,3 @@ void ClientMonitor::create_initial()
 }
 
 
-void ClientMonitor::election_finished()
-{
-  if (mon->is_leader() && g_conf.mkfs)
-    create_initial();
-}
index f47cf908ae471fa621b7f60ae0ca2e8c1ab5389f..ebb1f05e9c8c61ff21153fdf7f1b638bfddefb71 100644 (file)
@@ -24,10 +24,12 @@ using namespace std;
 
 #include "mds/MDSMap.h"
 
+#include "PaxosService.h"
+
 class Monitor;
 class Paxos;
 
-class ClientMonitor : public Dispatcher {
+class ClientMonitor : public PaxosService {
 public:
 
   struct Incremental {
@@ -36,7 +38,7 @@ public:
     map<int32_t, entity_addr_t> mount;
     set<int32_t> unmount;
     
-    Incremental(int nc=0) : next_client(nc) {}
+    Incremental() : version(0), next_client() {}
 
     bool is_empty() { return mount.empty() && unmount.empty(); }
     void add_mount(uint32_t client, entity_addr_t addr) {
@@ -71,7 +73,7 @@ public:
     map<uint32_t,entity_addr_t> client_addr;
     hash_map<entity_addr_t,uint32_t> addr_client;
 
-    Map() : next_client(0) {}
+    Map() : version(0), next_client(0) {}
 
     void reverse() {
       addr_client.clear();
@@ -123,16 +125,6 @@ public:
     }
   };
 
-  class C_RetryMessage : public Context {
-    ClientMonitor *cmon;
-    Message *m;
-  public:
-    C_RetryMessage(ClientMonitor *cm, Message *m_) : cmon(cm), m(m_) {}
-    void finish(int r) {
-      cmon->dispatch(m);
-    }
-  };
-
   class C_Mounted : public Context {
     ClientMonitor *cmon;
     int client;
@@ -144,7 +136,7 @@ public:
       if (r >= 0)
        cmon->_mounted(client, m);
       else
-       cmon->handle_query(m);
+       cmon->dispatch(m);
     }
   };
 
@@ -158,7 +150,7 @@ public:
       if (r >= 0)
        cmon->_unmounted(m);
       else
-       cmon->handle_query(m);
+       cmon->dispatch(m);
     }
   };
 
@@ -173,9 +165,6 @@ public:
   };
 
 private:
-  Monitor *mon;
-  Paxos *paxos;
-
   Map client_map;
   list<Message*> waiting_for_active;
 
@@ -183,8 +172,6 @@ private:
   Incremental pending_inc;
   list<Context*> pending_commit;   // contributers to pending_inc
 
-  //void bcast_latest_mds();
-
   void create_initial();
   bool update_from_paxos();
   void prepare_pending();  // prepare a new pending
@@ -200,12 +187,10 @@ private:
 
   
  public:
-  ClientMonitor(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { }
+  ClientMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
   
-  void dispatch(Message *m);
-  void tick();  // check state, take actions
+  //void tick();  // check state, take actions
 
-  void election_finished();
 };
 
 #endif
index c9a680d36a244191559f40ba70b81c506789abda..8644f769eaaed938125163f143a8973fb45f755e 100644 (file)
 
 /********* MDS map **************/
 
+  class C_RetryMessage : public Context {
+    Dispatcher *svc;
+    Message *m;
+  public:
+    C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {}
+    void finish(int r) {
+      svc->dispatch(m);
+    }
+  };
+
 void MDSMonitor::dispatch(Message *m)
 {
+  if (mon->is_peon()) {
+    dout(1) << "peon, fw to leader" << endl;
+    mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
+    return;
+  }
+  if (mon->is_starting()) {
+    dout(1) << "starting, waiting" << endl;
+    waiting_for_active.push_back(new C_RetryMessage(this, m));
+    return;
+  }
+
   switch (m->get_type()) {
 
   case MSG_MDS_BEACON:
@@ -68,6 +89,8 @@ void MDSMonitor::election_finished()
       load_map();
     }
   }
+
+  finish_contexts(waiting_for_active);
 }
 
 
index 658ba50855b29d75083d6fa093ddbcadc0940f06..5a9dcb65c8484078a96a95c4a3b7c98f266f2462 100644 (file)
@@ -38,6 +38,8 @@ class MDSMonitor : public Dispatcher {
  private:
   bufferlist encoded_map;
 
+  list<Context*> waiting_for_active;
+
   //map<epoch_t, bufferlist> inc_maps;
   //MDSMap::Incremental pending_inc;
   
index 497f3caf799acfc636daa414571218b5411fceb8..e92756ba084f0b67fb05a3ac648aa85a4c8ca297 100644 (file)
    mdsmon = new MDSMonitor(this, messenger, lock);
    clientmon = new ClientMonitor(this, &paxos_clientmap);
 
+   // init paxos
+   paxos_test.init();
+   paxos_osdmap.init();
+   paxos_mdsmap.init();
+   paxos_clientmap.init();
+
    // i'm ready!
    messenger->set_dispatcher(this);
 
index 98ce5857d695ada2d8189f17062a195e95bc86e1..015e5797ca6dfe2b0342f0d9c3caf4d6d5509a07 100644 (file)
@@ -33,7 +33,7 @@ class ClientMonitor;
 
 
 class Monitor : public Dispatcher {
-protected:
+public:
   // me
   int whoami;
   Messenger *messenger;
@@ -49,9 +49,9 @@ protected:
   friend class C_Mon_Tick;
 
   // -- local storage --
+public:
   MonitorStore *store;
 
-
   // -- monitor state --
 private:
   const static int STATE_STARTING = 0; // electing
index 60e161ce00fff1d7923bf2d5394d9d1320b35724..f83bdc57b21ba7629f9e1473c248e97f6e5c031f 100644 (file)
 #define  dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") "
 
 
+void Paxos::init()
+{
+  // load paxos variables from stable storage
+  last_pn = mon->store->get_int(machine_name, "last_pn");
+  accepted_pn = mon->store->get_int(machine_name, "accepted_pn");
+  last_committed = mon->store->get_int(machine_name, "last_committed");
+
+  dout(10) << "init" << endl;
+}
+
 // ---------------------------------
 
 // PHASE 1
@@ -90,6 +100,7 @@ void Paxos::handle_collect(MMonPaxos *collect)
     accepted_pn = collect->pn;
     accepted_pn_from = collect->pn_from;
     dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << endl;
+    mon->store->put_int(accepted_pn, machine_name, "accepted_pn");
   } else {
     // don't accept!
     dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from 
@@ -152,7 +163,7 @@ void Paxos::handle_last(MMonPaxos *last)
               << last->values[v].length() << " bytes" << endl;
     }
     last_committed = last->last_committed;
-    mon->store->put_int(last_committed, machine_name, "last_commtted");
+    mon->store->put_int(last_committed, machine_name, "last_committed");
     dout(10) << "last_committed now " << last_committed << endl;
   }
       
@@ -507,23 +518,20 @@ void Paxos::lease_ack_timeout()
  */
 version_t Paxos::get_new_proposal_number(version_t gt)
 {
-  // read last
-  version_t last = mon->store->get_int("last_paxos_proposal");
-  if (last < gt) 
-    last = gt;
+  if (last_pn < gt) 
+    last_pn = gt;
   
-  // update
-  last /= 100;
-  last++;
-
-  // make it unique among all monitors.
-  version_t pn = last*100 + (version_t)whoami;
+  // update. make it unique among all monitors.
+  last_pn /= 100;
+  last_pn++;
+  last_pn *= 100;
+  last_pn += (version_t)whoami;
   
   // write
-  mon->store->put_int(pn, "last_paxos_proposal");
+  mon->store->put_int(last_pn, machine_name, "last_pn");
 
-  dout(10) << "get_new_proposal_number = " << pn << endl;
-  return pn;
+  dout(10) << "get_new_proposal_number = " << last_pn << endl;
+  return last_pn;
 }
 
 
index 08da005139b094b01d72a262541b235d7f49b791..6699cc5ad33ad6c32f8e8eca7025877797f51f32 100644 (file)
@@ -71,6 +71,8 @@ class Paxos {
   int machine_id;
   const char *machine_name;
 
+  friend class PaxosService;
+
   // LEADER+PEON
 
   // -- generic state --
@@ -97,6 +99,7 @@ public:
 
 private:
   // recovery (phase 1)
+  version_t last_pn;
   version_t last_committed;
   version_t accepted_pn;
   version_t accepted_pn_from;
@@ -179,12 +182,15 @@ public:
        int mid) : mon(m), whoami(w), 
                   machine_id(mid), 
                   machine_name(get_paxos_name(mid)),
+                  state(STATE_RECOVERING),
                   lease_renew_event(0),
                   lease_ack_timeout_event(0),
                   accept_timeout_event(0) { }
 
   void dispatch(Message *m);
 
+  void init();
+
   void leader_init();
   void peon_init();
 
diff --git a/trunk/ceph/mon/PaxosService.h b/trunk/ceph/mon/PaxosService.h
new file mode 100644 (file)
index 0000000..59bcc77
--- /dev/null
@@ -0,0 +1,62 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __PAXOSSERVICE_H
+#define __PAXOSSERVICE_H
+
+#include "msg/Dispatcher.h"
+#include "include/Context.h"
+
+class Monitor;
+class Paxos;
+
+class PaxosService : public Dispatcher {
+protected:
+  Monitor *mon;
+  Paxos *paxos;
+
+  
+  class C_RetryMessage : public Context {
+    Dispatcher *svc;
+    Message *m;
+  public:
+    C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {}
+    void finish(int r) {
+      svc->dispatch(m);
+    }
+  };
+
+public:
+  PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { }
+  
+  // i implement
+  void dispatch(Message *m);
+  void election_finished();
+
+  // you implement
+  virtual void create_initial() = 0;
+  virtual bool update_from_paxos() = 0;
+  virtual void prepare_pending() = 0;
+  virtual void propose_pending() = 0;
+
+  virtual bool preprocess_update(Message *m) = 0;  // true if processed.
+  virtual void prepare_update(Message *m)= 0;
+
+  virtual void tick() {};  // check state, take actions
+
+
+};
+
+#endif
+