From 159536b8a0df570a7e97f09cfa26287a2696697f Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 14 Jun 2007 21:39:59 +0000 Subject: [PATCH] * fixed mdsmon startup race * pulled out generic PaxosService stuff (wasn't that much it turns out) * some paxos bug fixes git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1420 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/Makefile | 1 + trunk/ceph/mon/ClientMonitor.cc | 59 ------------------------------- trunk/ceph/mon/ClientMonitor.h | 33 +++++------------- trunk/ceph/mon/MDSMonitor.cc | 23 ++++++++++++ trunk/ceph/mon/MDSMonitor.h | 2 ++ trunk/ceph/mon/Monitor.cc | 6 ++++ trunk/ceph/mon/Monitor.h | 4 +-- trunk/ceph/mon/Paxos.cc | 36 +++++++++++-------- trunk/ceph/mon/Paxos.h | 6 ++++ trunk/ceph/mon/PaxosService.h | 62 +++++++++++++++++++++++++++++++++ 10 files changed, 133 insertions(+), 99 deletions(-) create mode 100644 trunk/ceph/mon/PaxosService.h diff --git a/trunk/ceph/Makefile b/trunk/ceph/Makefile index 0402c724895ab..96c7a57d55227 100644 --- a/trunk/ceph/Makefile +++ b/trunk/ceph/Makefile @@ -72,6 +72,7 @@ OSDC_OBJS= \ MON_OBJS= \ mon/Monitor.o\ mon/Paxos.o\ + mon/PaxosService.o\ mon/OSDMonitor.o\ mon/MDSMonitor.o\ mon/ClientMonitor.o\ diff --git a/trunk/ceph/mon/ClientMonitor.cc b/trunk/ceph/mon/ClientMonitor.cc index c6fe6f5da2b68..51686e8f2bed4 100644 --- a/trunk/ceph/mon/ClientMonitor.cc +++ b/trunk/ceph/mon/ClientMonitor.cc @@ -31,59 +31,6 @@ - -void ClientMonitor::dispatch(Message *m) -{ - switch (m->get_type()) { - - case MSG_CLIENT_MOUNT: - case MSG_CLIENT_UNMOUNT: - handle_query(m); - break; - - default: - assert(0); - } -} - - -void ClientMonitor::handle_query(Message *m) -{ - dout(10) << "handle_query " << *m << " from " << m->get_source_inst() << endl; - - // make sure our map is readable and up to date - if (!paxos->is_readable() || - !update_from_paxos()) { - dout(10) << " waiting for paxos -> readable" << endl; - paxos->wait_for_readable(new C_RetryMessage(this, m)); - return; - } - - // preprocess - if (preprocess_update(m)) - return; // easy! - - // leader? - if (!mon->is_leader()) { - // fw to leader - dout(10) << " fw to leader mon" << mon->get_leader() << endl; - mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); - return; - } - - // writeable? - if (!paxos->is_writeable()) { - dout(10) << " waiting for paxos -> writeable" << endl; - paxos->wait_for_writeable(new C_RetryMessage(this, m)); - return; - } - - prepare_update(m); - - // do it now (for now!) *** - propose_pending(); -} - bool ClientMonitor::update_from_paxos() { assert(paxos->is_active()); @@ -307,9 +254,3 @@ void ClientMonitor::create_initial() } -void ClientMonitor::election_finished() -{ - - if (mon->is_leader() && g_conf.mkfs) - create_initial(); -} diff --git a/trunk/ceph/mon/ClientMonitor.h b/trunk/ceph/mon/ClientMonitor.h index f47cf908ae471..ebb1f05e9c8c6 100644 --- a/trunk/ceph/mon/ClientMonitor.h +++ b/trunk/ceph/mon/ClientMonitor.h @@ -24,10 +24,12 @@ using namespace std; #include "mds/MDSMap.h" +#include "PaxosService.h" + class Monitor; class Paxos; -class ClientMonitor : public Dispatcher { +class ClientMonitor : public PaxosService { public: struct Incremental { @@ -36,7 +38,7 @@ public: map mount; set unmount; - Incremental(int nc=0) : next_client(nc) {} + Incremental() : version(0), next_client() {} bool is_empty() { return mount.empty() && unmount.empty(); } void add_mount(uint32_t client, entity_addr_t addr) { @@ -71,7 +73,7 @@ public: map client_addr; hash_map addr_client; - Map() : next_client(0) {} + Map() : version(0), next_client(0) {} void reverse() { addr_client.clear(); @@ -123,16 +125,6 @@ public: } }; - class C_RetryMessage : public Context { - ClientMonitor *cmon; - Message *m; - public: - C_RetryMessage(ClientMonitor *cm, Message *m_) : cmon(cm), m(m_) {} - void finish(int r) { - cmon->dispatch(m); - } - }; - class C_Mounted : public Context { ClientMonitor *cmon; int client; @@ -144,7 +136,7 @@ public: if (r >= 0) cmon->_mounted(client, m); else - cmon->handle_query(m); + cmon->dispatch(m); } }; @@ -158,7 +150,7 @@ public: if (r >= 0) cmon->_unmounted(m); else - cmon->handle_query(m); + cmon->dispatch(m); } }; @@ -173,9 +165,6 @@ public: }; private: - Monitor *mon; - Paxos *paxos; - Map client_map; list waiting_for_active; @@ -183,8 +172,6 @@ private: Incremental pending_inc; list pending_commit; // contributers to pending_inc - //void bcast_latest_mds(); - void create_initial(); bool update_from_paxos(); void prepare_pending(); // prepare a new pending @@ -200,12 +187,10 @@ private: public: - ClientMonitor(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { } + ClientMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { } - void dispatch(Message *m); - void tick(); // check state, take actions + //void tick(); // check state, take actions - void election_finished(); }; #endif diff --git a/trunk/ceph/mon/MDSMonitor.cc b/trunk/ceph/mon/MDSMonitor.cc index c9a680d36a244..8644f769eaaed 100644 --- a/trunk/ceph/mon/MDSMonitor.cc +++ b/trunk/ceph/mon/MDSMonitor.cc @@ -36,8 +36,29 @@ /********* MDS map **************/ + class C_RetryMessage : public Context { + Dispatcher *svc; + Message *m; + public: + C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {} + void finish(int r) { + svc->dispatch(m); + } + }; + void MDSMonitor::dispatch(Message *m) { + if (mon->is_peon()) { + dout(1) << "peon, fw to leader" << endl; + mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); + return; + } + if (mon->is_starting()) { + dout(1) << "starting, waiting" << endl; + waiting_for_active.push_back(new C_RetryMessage(this, m)); + return; + } + switch (m->get_type()) { case MSG_MDS_BEACON: @@ -68,6 +89,8 @@ void MDSMonitor::election_finished() load_map(); } } + + finish_contexts(waiting_for_active); } diff --git a/trunk/ceph/mon/MDSMonitor.h b/trunk/ceph/mon/MDSMonitor.h index 658ba50855b29..5a9dcb65c8484 100644 --- a/trunk/ceph/mon/MDSMonitor.h +++ b/trunk/ceph/mon/MDSMonitor.h @@ -38,6 +38,8 @@ class MDSMonitor : public Dispatcher { private: bufferlist encoded_map; + list waiting_for_active; + //map inc_maps; //MDSMap::Incremental pending_inc; diff --git a/trunk/ceph/mon/Monitor.cc b/trunk/ceph/mon/Monitor.cc index 497f3caf799ac..e92756ba084f0 100644 --- a/trunk/ceph/mon/Monitor.cc +++ b/trunk/ceph/mon/Monitor.cc @@ -66,6 +66,12 @@ mdsmon = new MDSMonitor(this, messenger, lock); clientmon = new ClientMonitor(this, &paxos_clientmap); + // init paxos + paxos_test.init(); + paxos_osdmap.init(); + paxos_mdsmap.init(); + paxos_clientmap.init(); + // i'm ready! messenger->set_dispatcher(this); diff --git a/trunk/ceph/mon/Monitor.h b/trunk/ceph/mon/Monitor.h index 98ce5857d695a..015e5797ca6df 100644 --- a/trunk/ceph/mon/Monitor.h +++ b/trunk/ceph/mon/Monitor.h @@ -33,7 +33,7 @@ class ClientMonitor; class Monitor : public Dispatcher { -protected: +public: // me int whoami; Messenger *messenger; @@ -49,9 +49,9 @@ protected: friend class C_Mon_Tick; // -- local storage -- +public: MonitorStore *store; - // -- monitor state -- private: const static int STATE_STARTING = 0; // electing diff --git a/trunk/ceph/mon/Paxos.cc b/trunk/ceph/mon/Paxos.cc index 60e161ce00fff..f83bdc57b21ba 100644 --- a/trunk/ceph/mon/Paxos.cc +++ b/trunk/ceph/mon/Paxos.cc @@ -24,6 +24,16 @@ #define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " +void Paxos::init() +{ + // load paxos variables from stable storage + last_pn = mon->store->get_int(machine_name, "last_pn"); + accepted_pn = mon->store->get_int(machine_name, "accepted_pn"); + last_committed = mon->store->get_int(machine_name, "last_committed"); + + dout(10) << "init" << endl; +} + // --------------------------------- // PHASE 1 @@ -90,6 +100,7 @@ void Paxos::handle_collect(MMonPaxos *collect) accepted_pn = collect->pn; accepted_pn_from = collect->pn_from; dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << endl; + mon->store->put_int(accepted_pn, machine_name, "accepted_pn"); } else { // don't accept! dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from @@ -152,7 +163,7 @@ void Paxos::handle_last(MMonPaxos *last) << last->values[v].length() << " bytes" << endl; } last_committed = last->last_committed; - mon->store->put_int(last_committed, machine_name, "last_commtted"); + mon->store->put_int(last_committed, machine_name, "last_committed"); dout(10) << "last_committed now " << last_committed << endl; } @@ -507,23 +518,20 @@ void Paxos::lease_ack_timeout() */ version_t Paxos::get_new_proposal_number(version_t gt) { - // read last - version_t last = mon->store->get_int("last_paxos_proposal"); - if (last < gt) - last = gt; + if (last_pn < gt) + last_pn = gt; - // update - last /= 100; - last++; - - // make it unique among all monitors. - version_t pn = last*100 + (version_t)whoami; + // update. make it unique among all monitors. + last_pn /= 100; + last_pn++; + last_pn *= 100; + last_pn += (version_t)whoami; // write - mon->store->put_int(pn, "last_paxos_proposal"); + mon->store->put_int(last_pn, machine_name, "last_pn"); - dout(10) << "get_new_proposal_number = " << pn << endl; - return pn; + dout(10) << "get_new_proposal_number = " << last_pn << endl; + return last_pn; } diff --git a/trunk/ceph/mon/Paxos.h b/trunk/ceph/mon/Paxos.h index 08da005139b09..6699cc5ad33ad 100644 --- a/trunk/ceph/mon/Paxos.h +++ b/trunk/ceph/mon/Paxos.h @@ -71,6 +71,8 @@ class Paxos { int machine_id; const char *machine_name; + friend class PaxosService; + // LEADER+PEON // -- generic state -- @@ -97,6 +99,7 @@ public: private: // recovery (phase 1) + version_t last_pn; version_t last_committed; version_t accepted_pn; version_t accepted_pn_from; @@ -179,12 +182,15 @@ public: int mid) : mon(m), whoami(w), machine_id(mid), machine_name(get_paxos_name(mid)), + state(STATE_RECOVERING), lease_renew_event(0), lease_ack_timeout_event(0), accept_timeout_event(0) { } void dispatch(Message *m); + void init(); + void leader_init(); void peon_init(); diff --git a/trunk/ceph/mon/PaxosService.h b/trunk/ceph/mon/PaxosService.h new file mode 100644 index 0000000000000..59bcc770d108f --- /dev/null +++ b/trunk/ceph/mon/PaxosService.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __PAXOSSERVICE_H +#define __PAXOSSERVICE_H + +#include "msg/Dispatcher.h" +#include "include/Context.h" + +class Monitor; +class Paxos; + +class PaxosService : public Dispatcher { +protected: + Monitor *mon; + Paxos *paxos; + + + class C_RetryMessage : public Context { + Dispatcher *svc; + Message *m; + public: + C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {} + void finish(int r) { + svc->dispatch(m); + } + }; + +public: + PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { } + + // i implement + void dispatch(Message *m); + void election_finished(); + + // you implement + virtual void create_initial() = 0; + virtual bool update_from_paxos() = 0; + virtual void prepare_pending() = 0; + virtual void propose_pending() = 0; + + virtual bool preprocess_update(Message *m) = 0; // true if processed. + virtual void prepare_update(Message *m)= 0; + + virtual void tick() {}; // check state, take actions + + +}; + +#endif + -- 2.39.5