MON_OBJS= \
mon/Monitor.o\
mon/Paxos.o\
+ mon/PaxosService.o\
mon/OSDMonitor.o\
mon/MDSMonitor.o\
mon/ClientMonitor.o\
-
-void ClientMonitor::dispatch(Message *m)
-{
- switch (m->get_type()) {
-
- case MSG_CLIENT_MOUNT:
- case MSG_CLIENT_UNMOUNT:
- handle_query(m);
- break;
-
- default:
- assert(0);
- }
-}
-
-
-void ClientMonitor::handle_query(Message *m)
-{
- dout(10) << "handle_query " << *m << " from " << m->get_source_inst() << endl;
-
- // make sure our map is readable and up to date
- if (!paxos->is_readable() ||
- !update_from_paxos()) {
- dout(10) << " waiting for paxos -> readable" << endl;
- paxos->wait_for_readable(new C_RetryMessage(this, m));
- return;
- }
-
- // preprocess
- if (preprocess_update(m))
- return; // easy!
-
- // leader?
- if (!mon->is_leader()) {
- // fw to leader
- dout(10) << " fw to leader mon" << mon->get_leader() << endl;
- mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
- return;
- }
-
- // writeable?
- if (!paxos->is_writeable()) {
- dout(10) << " waiting for paxos -> writeable" << endl;
- paxos->wait_for_writeable(new C_RetryMessage(this, m));
- return;
- }
-
- prepare_update(m);
-
- // do it now (for now!) ***
- propose_pending();
-}
-
bool ClientMonitor::update_from_paxos()
{
assert(paxos->is_active());
}
-void ClientMonitor::election_finished()
-{
-
- if (mon->is_leader() && g_conf.mkfs)
- create_initial();
-}
#include "mds/MDSMap.h"
+#include "PaxosService.h"
+
class Monitor;
class Paxos;
-class ClientMonitor : public Dispatcher {
+class ClientMonitor : public PaxosService {
public:
struct Incremental {
map<int32_t, entity_addr_t> mount;
set<int32_t> unmount;
- Incremental(int nc=0) : next_client(nc) {}
+ Incremental() : version(0), next_client() {}
bool is_empty() { return mount.empty() && unmount.empty(); }
void add_mount(uint32_t client, entity_addr_t addr) {
map<uint32_t,entity_addr_t> client_addr;
hash_map<entity_addr_t,uint32_t> addr_client;
- Map() : next_client(0) {}
+ Map() : version(0), next_client(0) {}
void reverse() {
addr_client.clear();
}
};
- class C_RetryMessage : public Context {
- ClientMonitor *cmon;
- Message *m;
- public:
- C_RetryMessage(ClientMonitor *cm, Message *m_) : cmon(cm), m(m_) {}
- void finish(int r) {
- cmon->dispatch(m);
- }
- };
-
class C_Mounted : public Context {
ClientMonitor *cmon;
int client;
if (r >= 0)
cmon->_mounted(client, m);
else
- cmon->handle_query(m);
+ cmon->dispatch(m);
}
};
if (r >= 0)
cmon->_unmounted(m);
else
- cmon->handle_query(m);
+ cmon->dispatch(m);
}
};
};
private:
- Monitor *mon;
- Paxos *paxos;
-
Map client_map;
list<Message*> waiting_for_active;
Incremental pending_inc;
list<Context*> pending_commit; // contributers to pending_inc
- //void bcast_latest_mds();
-
void create_initial();
bool update_from_paxos();
void prepare_pending(); // prepare a new pending
public:
- ClientMonitor(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { }
+ ClientMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
- void dispatch(Message *m);
- void tick(); // check state, take actions
+ //void tick(); // check state, take actions
- void election_finished();
};
#endif
/********* MDS map **************/
+ class C_RetryMessage : public Context { // Context whose finish() hands a stored message back to a Dispatcher
+ Dispatcher *svc; // dispatcher that will be given the message again
+ Message *m; // message to re-dispatch
+ public:
+ C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {}
+ void finish(int r) { // r is not consulted; the message is always re-dispatched
+ svc->dispatch(m);
+ }
+ };
+
void MDSMonitor::dispatch(Message *m)
{
+ if (mon->is_peon()) {
+ dout(1) << "peon, fw to leader" << endl;
+ mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader()));
+ return;
+ }
+ if (mon->is_starting()) {
+ dout(1) << "starting, waiting" << endl;
+ waiting_for_active.push_back(new C_RetryMessage(this, m));
+ return;
+ }
+
switch (m->get_type()) {
case MSG_MDS_BEACON:
load_map();
}
}
+
+ finish_contexts(waiting_for_active);
}
private:
bufferlist encoded_map;
+ list<Context*> waiting_for_active;
+
//map<epoch_t, bufferlist> inc_maps;
//MDSMap::Incremental pending_inc;
mdsmon = new MDSMonitor(this, messenger, lock);
clientmon = new ClientMonitor(this, &paxos_clientmap);
+ // init paxos
+ paxos_test.init();
+ paxos_osdmap.init();
+ paxos_mdsmap.init();
+ paxos_clientmap.init();
+
// i'm ready!
messenger->set_dispatcher(this);
class Monitor : public Dispatcher {
-protected:
+public:
// me
int whoami;
Messenger *messenger;
friend class C_Mon_Tick;
// -- local storage --
+public:
MonitorStore *store;
-
// -- monitor state --
private:
const static int STATE_STARTING = 0; // electing
#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") "
+void Paxos::init() // reload this machine's persisted paxos counters from mon->store
+{
+ // load paxos variables from stable storage (every key is scoped by machine_name)
+ last_pn = mon->store->get_int(machine_name, "last_pn"); // written by get_new_proposal_number()
+ accepted_pn = mon->store->get_int(machine_name, "accepted_pn"); // written when a collected pn is accepted
+ last_committed = mon->store->get_int(machine_name, "last_committed"); // written whenever last_committed is updated
+ // NOTE(review): accepted_pn_from is not reloaded here -- confirm that is intended
+ dout(10) << "init" << endl;
+}
+
// ---------------------------------
// PHASE 1
accepted_pn = collect->pn;
accepted_pn_from = collect->pn_from;
dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << endl;
+ mon->store->put_int(accepted_pn, machine_name, "accepted_pn");
} else {
// don't accept!
dout(10) << "NOT accepting pn " << collect->pn << " from " << collect->pn_from
<< last->values[v].length() << " bytes" << endl;
}
last_committed = last->last_committed;
- mon->store->put_int(last_committed, machine_name, "last_commtted");
+ mon->store->put_int(last_committed, machine_name, "last_committed");
dout(10) << "last_committed now " << last_committed << endl;
}
*/
version_t Paxos::get_new_proposal_number(version_t gt)
{
- // read last
- version_t last = mon->store->get_int("last_paxos_proposal");
- if (last < gt)
- last = gt;
+ if (last_pn < gt) // never start below the caller-supplied floor gt
+ last_pn = gt;
- // update
- last /= 100;
- last++;
-
- // make it unique among all monitors.
- version_t pn = last*100 + (version_t)whoami;
+ // update. make it unique among all monitors: bump the counter part (value / 100) and add whoami (assumes whoami < 100 -- TODO confirm).
+ last_pn /= 100; // discard the remainder mod 100 carried by the previous value
+ last_pn++; // advance the counter part
+ last_pn *= 100; // make room for this monitor's id
+ last_pn += (version_t)whoami; // for non-negative whoami the result exceeds both the old last_pn and gt
// write
- mon->store->put_int(pn, "last_paxos_proposal");
+ mon->store->put_int(last_pn, machine_name, "last_pn"); // persisted under the same machine_name/"last_pn" key that Paxos::init() reads
- dout(10) << "get_new_proposal_number = " << pn << endl;
- return pn;
+ dout(10) << "get_new_proposal_number = " << last_pn << endl;
+ return last_pn; // the member last_pn now holds the returned number as well
}
int machine_id;
const char *machine_name;
+ friend class PaxosService;
+
// LEADER+PEON
// -- generic state --
private:
// recovery (phase 1)
+ version_t last_pn;
version_t last_committed;
version_t accepted_pn;
version_t accepted_pn_from;
int mid) : mon(m), whoami(w),
machine_id(mid),
machine_name(get_paxos_name(mid)),
+ state(STATE_RECOVERING),
lease_renew_event(0),
lease_ack_timeout_event(0),
accept_timeout_event(0) { }
void dispatch(Message *m);
+ void init();
+
void leader_init();
void peon_init();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __PAXOSSERVICE_H
+#define __PAXOSSERVICE_H
+
+#include "msg/Dispatcher.h"
+#include "include/Context.h"
+
+class Monitor;
+class Paxos;
+
+class PaxosService : public Dispatcher { // Dispatcher base for services paired with a Paxos instance; subclasses fill in the pure virtual hooks below
+protected:
+ Monitor *mon; // monitor passed to the constructor
+ Paxos *paxos; // paxos instance passed to the constructor
+
+ // Context that re-dispatches a stored message through svc->dispatch(m) when finished.
+ class C_RetryMessage : public Context {
+ Dispatcher *svc; // dispatcher that will be given the message again
+ Message *m; // message to re-dispatch
+ public:
+ C_RetryMessage(Dispatcher *s, Message *m_) : svc(s), m(m_) {}
+ void finish(int r) { // r is not consulted
+ svc->dispatch(m);
+ }
+ };
+
+public:
+ PaxosService(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { }
+
+ // implemented by PaxosService itself (only declared in this header)
+ void dispatch(Message *m);
+ void election_finished();
+
+ // pure virtual hooks: every concrete service must implement these
+ virtual void create_initial() = 0;
+ virtual bool update_from_paxos() = 0;
+ virtual void prepare_pending() = 0;
+ virtual void propose_pending() = 0;
+
+ virtual bool preprocess_update(Message *m) = 0; // returns true if the message was processed.
+ virtual void prepare_update(Message *m)= 0;
+
+ virtual void tick() {}; // check state, take actions; the default does nothing
+
+ // NOTE(review): no virtual destructor is declared in this class -- confirm Dispatcher provides one before deleting through a PaxosService*.
+};
+
+#endif
+