From 6d7c0531c1565a0b1374e10bc49e673401b52159 Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 14 Jun 2007 20:50:58 +0000 Subject: [PATCH] * fixed client mount race under fakesyn git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1419 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/client/Client.cc | 6 +- trunk/ceph/config.cc | 5 +- trunk/ceph/config.h | 3 + trunk/ceph/messages/MMonPaxos.h | 23 +- trunk/ceph/mon/ClientMonitor.cc | 285 +++++++++++++++---- trunk/ceph/mon/ClientMonitor.h | 181 +++++++++++- trunk/ceph/mon/Monitor.cc | 471 +++++++++++++++++--------------- trunk/ceph/mon/Monitor.h | 14 +- trunk/ceph/mon/OSDMonitor.cc | 16 +- trunk/ceph/mon/Paxos.cc | 209 +++++++++++--- trunk/ceph/mon/Paxos.h | 82 +++++- trunk/ceph/mon/mon_types.h | 33 +++ 12 files changed, 960 insertions(+), 368 deletions(-) create mode 100644 trunk/ceph/mon/mon_types.h diff --git a/trunk/ceph/client/Client.cc b/trunk/ceph/client/Client.cc index 7682524f81325..8b65c76c1ebab 100644 --- a/trunk/ceph/client/Client.cc +++ b/trunk/ceph/client/Client.cc @@ -875,16 +875,14 @@ void Client::handle_mds_map(MMDSMap* m) if (m->get_source().is_mds()) frommds = m->get_source().num(); - if (mdsmap == 0) + if (mdsmap == 0) { mdsmap = new MDSMap; - if (whoami < 0) { - // mounted! assert(m->get_source().is_mon()); whoami = m->get_dest().num(); dout(1) << "handle_mds_map i am now " << m->get_dest() << endl; messenger->reset_myname(m->get_dest()); - + mount_cond.Signal(); // mount might be waiting for this. } diff --git a/trunk/ceph/config.cc b/trunk/ceph/config.cc index 01314f5bfbfd2..467689d28bada 100644 --- a/trunk/ceph/config.cc +++ b/trunk/ceph/config.cc @@ -125,7 +125,10 @@ md_config_t g_conf = { // --- mon --- mon_tick_interval: 5, mon_osd_down_out_interval: 5, // seconds - mon_lease: 2.000, // seconds + mon_lease: 5, // seconds + mon_lease_renew_interval: 3, + mon_lease_ack_timeout: 10.0, + mon_accept_timeout: 10.0, mon_stop_with_last_mds: true, // --- client --- diff --git a/trunk/ceph/config.h b/trunk/ceph/config.h index b8c6d20d7d6a5..232041b717d92 100644 --- a/trunk/ceph/config.h +++ b/trunk/ceph/config.h @@ -114,6 +114,9 @@ struct md_config_t { int mon_tick_interval; int mon_osd_down_out_interval; float mon_lease; + float mon_lease_renew_interval; + float mon_lease_ack_timeout; + float mon_accept_timeout; bool mon_stop_with_last_mds; // client diff --git a/trunk/ceph/messages/MMonPaxos.h b/trunk/ceph/messages/MMonPaxos.h index f766c9254cb4a..b33012336901f 100644 --- a/trunk/ceph/messages/MMonPaxos.h +++ b/trunk/ceph/messages/MMonPaxos.h @@ -17,16 +17,18 @@ #define __MMONPAXOS_H #include "msg/Message.h" +#include "mon/mon_types.h" class MMonPaxos : public Message { public: // op types - const static int OP_COLLECT = 1; // proposer: propose round - const static int OP_LAST = 2; // voter: accept proposed round - const static int OP_BEGIN = 4; // proposer: value proposed for this round - const static int OP_ACCEPT = 5; // voter: accept propsed value - const static int OP_COMMIT = 7; // proposer: notify learners of agreed value - const static int OP_LEASE = 8; // extend reader lease + const static int OP_COLLECT = 1; // proposer: propose round + const static int OP_LAST = 2; // voter: accept proposed round + const static int OP_BEGIN = 3; // proposer: value proposed for this round + const static int OP_ACCEPT = 4; // voter: accept propsed value + const static int OP_COMMIT = 5; // proposer: notify learners of agreed value + const static int OP_LEASE = 6; // leader: extend peon lease + const static int OP_LEASE_ACK = 7; // peon: lease ack const static char *get_opname(int op) { switch (op) { case OP_COLLECT: return "collect"; @@ -35,6 +37,7 @@ class MMonPaxos : public Message { case OP_ACCEPT: return "accept"; case OP_COMMIT: return "commit"; case OP_LEASE: return "lease"; + case OP_LEASE_ACK: return "lease_ack"; default: assert(0); return 0; } } @@ -47,7 +50,7 @@ class MMonPaxos : public Message { version_t pn_from; // i promise to accept after version_t pn; // with with proposal version_t old_accepted_pn; // previous pn, if we are a LAST with an uncommitted value - utime_t lease_timeout; + utime_t lease_expire; map values; @@ -61,7 +64,7 @@ class MMonPaxos : public Message { virtual char *get_type_name() { return "paxos"; } void print(ostream& out) { - out << "paxos(m" << machine_id + out << "paxos(" << get_paxos_name(machine_id) << " " << get_opname(op) << " lc " << last_committed << " pn " << pn << " opn " << old_accepted_pn << ")"; @@ -75,7 +78,7 @@ class MMonPaxos : public Message { ::_encode(pn_from, payload); ::_encode(pn, payload); ::_encode(old_accepted_pn, payload); - ::_encode(lease_timeout, payload); + ::_encode(lease_expire, payload); ::_encode(values, payload); } void decode_payload() { @@ -87,7 +90,7 @@ class MMonPaxos : public Message { ::_decode(pn_from, payload, off); ::_decode(pn, payload, off); ::_decode(old_accepted_pn, payload, off); - ::_decode(lease_timeout, payload, off); + ::_decode(lease_expire, payload, off); ::_decode(values, payload, off); } }; diff --git a/trunk/ceph/mon/ClientMonitor.cc b/trunk/ceph/mon/ClientMonitor.cc index 1b7e4f0f12ac3..c6fe6f5da2b68 100644 --- a/trunk/ceph/mon/ClientMonitor.cc +++ b/trunk/ceph/mon/ClientMonitor.cc @@ -17,6 +17,7 @@ #include "Monitor.h" #include "MDSMonitor.h" #include "OSDMonitor.h" +#include "MonitorStore.h" #include "messages/MClientMount.h" #include "messages/MClientUnmount.h" @@ -36,86 +37,241 @@ void ClientMonitor::dispatch(Message *m) switch (m->get_type()) { case MSG_CLIENT_MOUNT: - handle_client_mount((MClientMount*)m); - break; - case MSG_CLIENT_UNMOUNT: - handle_client_unmount((MClientUnmount*)m); + handle_query(m); break; - default: assert(0); } } -void ClientMonitor::handle_client_mount(MClientMount *m) + +void ClientMonitor::handle_query(Message *m) { - dout(7) << "client_mount from " << m->get_source_inst() << endl; - assert(m->get_source().is_client()); - int from = m->get_source().num(); + dout(10) << "handle_query " << *m << " from " << m->get_source_inst() << endl; - // choose a client id - if (from < 0 || - (client_map.count(from) && - client_map[from] != m->get_source_addr())) { - from = num_clients++; - dout(10) << "client_mount assigned client" << from << endl; + // make sure our map is readable and up to date + if (!paxos->is_readable() || + !update_from_paxos()) { + dout(10) << " waiting for paxos -> readable" << endl; + paxos->wait_for_readable(new C_RetryMessage(this, m)); + return; + } + + // preprocess + if (preprocess_update(m)) + return; // easy! + + // leader? + if (!mon->is_leader()) { + // fw to leader + dout(10) << " fw to leader mon" << mon->get_leader() << endl; + mon->messenger->send_message(m, mon->monmap->get_inst(mon->get_leader())); + return; } - client_map[from] = m->get_source_addr(); - - // reply with latest mds map - entity_inst_t to = m->get_source_inst(); - to.name = MSG_ADDR_CLIENT(from); - mon->mdsmon->send_latest(to); - mon->osdmon->send_latest(to); - delete m; + // writeable? + if (!paxos->is_writeable()) { + dout(10) << " waiting for paxos -> writeable" << endl; + paxos->wait_for_writeable(new C_RetryMessage(this, m)); + return; + } + + prepare_update(m); + + // do it now (for now!) *** + propose_pending(); } -void ClientMonitor::handle_client_unmount(MClientUnmount *m) +bool ClientMonitor::update_from_paxos() { - dout(7) << "client_unmount from " << m->get_source() - << " at " << m->get_source_inst() << endl; - assert(m->get_source().is_client()); - int from = m->get_source().num(); - - if (client_map.count(from)) { - client_map.erase(from); - - if (client_map.empty() && - g_conf.mds_shutdown_on_last_unmount) { - dout(1) << "last client unmounted" << endl; - mon->do_stop(); + assert(paxos->is_active()); + version_t paxosv = paxos->get_version(); + dout(10) << "update_from_paxos paxosv " << paxosv + << ", my v " << client_map.version << endl; + + assert(paxosv >= client_map.version); + while (paxosv > client_map.version) { + bufferlist bl; + bool success = paxos->read(client_map.version+1, bl); + if (success) { + dout(10) << "update_from_paxos applying incremental " << client_map.version+1 << endl; + Incremental inc; + int off = 0; + inc._decode(bl, off); + client_map.apply_incremental(inc); + + } else { + dout(10) << "update_from_paxos couldn't read incremental " << client_map.version+1 << endl; + return false; } + + // save latest + bl.clear(); + client_map._encode(bl); + mon->store->put_bl_ss(bl, "clientmap", "latest"); + + // prepare next inc + prepare_pending(); } - // reply with (same) unmount message to ack - mon->messenger->send_message(m, m->get_source_inst()); + return true; } +void ClientMonitor::prepare_pending() +{ + pending_inc = Incremental(); + pending_inc.version = client_map.version + 1; + pending_inc.next_client = client_map.next_client; + dout(10) << "prepare_pending v " << pending_inc.version + << ", next is " << pending_inc.next_client + << endl; +} +void ClientMonitor::propose_pending() +{ + dout(10) << "propose_pending v " << pending_inc.version + << ", next is " << pending_inc.next_client + << endl; + + // apply to paxos + assert(paxos->get_version() + 1 == pending_inc.version); + bufferlist bl; + pending_inc._encode(bl); + paxos->propose_new_value(bl, new C_Commit(this)); +} -/* -void ClientMonitor::handle_mds_shutdown(Message *m) + +// ------- + + +bool ClientMonitor::preprocess_update(Message *m) { - assert(m->get_source().is_mds()); - int from = m->get_source().num(); + dout(10) << "preprocess_update " << *m << " from " << m->get_source_inst() << endl; - mdsmap.mds_inst.erase(from); - mdsmap.all_mds.erase(from); + switch (m->get_type()) { + case MSG_CLIENT_MOUNT: + { + // already mounted? + entity_addr_t addr = m->get_source_addr(); + if (client_map.addr_client.count(addr)) { + int client = client_map.addr_client[addr]; + dout(7) << " client" << client << " already mounted" << endl; + _mounted(client, m); + return true; + } + } + return false; + + case MSG_CLIENT_UNMOUNT: + { + // already unmounted? + int client = m->get_source().num(); + if (client_map.client_addr.count(client) == 0) { + dout(7) << " client" << client << " not mounted" << endl; + _unmounted(m); + return true; + } + } + return false; + - dout(7) << "mds_shutdown from " << m->get_source() - << ", still have " << mdsmap.all_mds - << endl; - - // tell someone? - // fixme + default: + assert(0); + delete m; + return true; + } +} + +void ClientMonitor::prepare_update(Message *m) +{ + dout(10) << "prepare_update " << *m << " from " << m->get_source_inst() << endl; + + int client = m->get_source().num(); + entity_addr_t addr = m->get_source_addr(); + + switch (m->get_type()) { + case MSG_CLIENT_MOUNT: + { + // choose a client id + if (client < 0 || + (client_map.client_addr.count(client) && + client_map.client_addr[client] != addr)) { + client = pending_inc.next_client; + dout(10) << "mount: assigned client" << client << " to " << addr << endl; + } else { + dout(10) << "mount: client" << client << " requested by " << addr << endl; + } + + pending_inc.add_mount(client, addr); + pending_commit.push_back(new C_Mounted(this, client, m)); + } + break; + + case MSG_CLIENT_UNMOUNT: + { + assert(client_map.client_addr.count(client)); + + pending_inc.add_unmount(client); + pending_commit.push_back(new C_Unmounted(this, m)); + } + break; + + default: + assert(0); + delete m; + } +} + + +// MOUNT + + +void ClientMonitor::_mounted(int client, Message *m) +{ + entity_inst_t to = m->get_source_inst(); + to.name = MSG_ADDR_CLIENT(client); + + dout(10) << "_mounted client" << client << " at " << to << endl; + // reply with latest mds, osd maps + mon->mdsmon->send_latest(to); + mon->osdmon->send_latest(to); + delete m; } -*/ +void ClientMonitor::_unmounted(Message *m) +{ + dout(10) << "_unmounted " << m->get_source() << endl; + + // reply with (same) unmount message + mon->messenger->send_message(m, m->get_source_inst()); + + // auto-shutdown? + if (update_from_paxos() && + mon->is_leader() && + client_map.version > 1 && + client_map.client_addr.empty() && + g_conf.mds_shutdown_on_last_unmount) { + dout(1) << "last client unmounted" << endl; + mon->do_stop(); + } +} + + +void ClientMonitor::_commit(int r) +{ + if (r >= 0) { + dout(10) << "_commit success" << endl; + finish_contexts(pending_commit); + } else { + dout(10) << "_commit failed" << endl; + } + + finish_contexts(pending_commit, r); +} /* void ClientMonitor::bcast_latest_mds() @@ -130,5 +286,30 @@ void ClientMonitor::bcast_latest_mds() send_full(MSG_ADDR_MDS(*p), mdsmap.get_inst(*p)); } } - */ + + +void ClientMonitor::create_initial() +{ + dout(10) << "create_initial" << endl; + + if (!mon->is_leader()) return; + if (paxos->get_version() > 0) return; + + if (paxos->is_writeable()) { + dout(1) << "create_initial -- creating initial map" << endl; + prepare_pending(); + propose_pending(); + } else { + dout(1) << "create_initial -- waiting for writeable" << endl; + paxos->wait_for_writeable(new C_CreateInitial(this)); + } +} + + +void ClientMonitor::election_finished() +{ + + if (mon->is_leader() && g_conf.mkfs) + create_initial(); +} diff --git a/trunk/ceph/mon/ClientMonitor.h b/trunk/ceph/mon/ClientMonitor.h index 1ae9401465c94..f47cf908ae471 100644 --- a/trunk/ceph/mon/ClientMonitor.h +++ b/trunk/ceph/mon/ClientMonitor.h @@ -25,30 +25,187 @@ using namespace std; #include "mds/MDSMap.h" class Monitor; +class Paxos; class ClientMonitor : public Dispatcher { +public: + + struct Incremental { + version_t version; + uint32_t next_client; + map mount; + set unmount; + + Incremental(int nc=0) : next_client(nc) {} + + bool is_empty() { return mount.empty() && unmount.empty(); } + void add_mount(uint32_t client, entity_addr_t addr) { + next_client = MAX(next_client, client+1); + mount[client] = addr; + } + void add_unmount(uint32_t client) { + assert(client < next_client); + if (mount.count(client)) + mount.erase(client); + else + unmount.insert(client); + } + + void _encode(bufferlist &bl) { + ::_encode(version, bl); + ::_encode(next_client, bl); + ::_encode(mount, bl); + ::_encode(unmount, bl); + } + void _decode(bufferlist &bl, int& off) { + ::_decode(version, bl, off); + ::_decode(next_client, bl, off); + ::_decode(mount, bl, off); + ::_decode(unmount, bl, off); + } + }; + + struct Map { + version_t version; + uint32_t next_client; + map client_addr; + hash_map addr_client; + + Map() : next_client(0) {} + + void reverse() { + addr_client.clear(); + for (map::iterator p = client_addr.begin(); + p != client_addr.end(); + ++p) { + addr_client[p->second] = p->first; + } + } + void apply_incremental(Incremental &inc) { + assert(inc.version == version+1); + version = inc.version; + next_client = inc.next_client; + for (map::iterator p = inc.mount.begin(); + p != inc.mount.end(); + ++p) { + client_addr[p->first] = p->second; + addr_client[p->second] = p->first; + } + + for (set::iterator p = inc.unmount.begin(); + p != inc.unmount.end(); + ++p) { + assert(client_addr.count(*p)); + addr_client.erase(client_addr[*p]); + client_addr.erase(*p); + } + } + + void _encode(bufferlist &bl) { + ::_encode(version, bl); + ::_encode(next_client, bl); + ::_encode(client_addr, bl); + } + void _decode(bufferlist &bl, int& off) { + ::_decode(version, bl, off); + ::_decode(next_client, bl, off); + ::_decode(client_addr, bl, off); + reverse(); + } + }; + + class C_CreateInitial : public Context { + ClientMonitor *cmon; + public: + C_CreateInitial(ClientMonitor *cm) : cmon(cm) {} + void finish(int r) { + cmon->create_initial(); + } + }; + + class C_RetryMessage : public Context { + ClientMonitor *cmon; + Message *m; + public: + C_RetryMessage(ClientMonitor *cm, Message *m_) : cmon(cm), m(m_) {} + void finish(int r) { + cmon->dispatch(m); + } + }; + + class C_Mounted : public Context { + ClientMonitor *cmon; + int client; + Message *m; + public: + C_Mounted(ClientMonitor *cm, int c, Message *m_) : + cmon(cm), client(c), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_mounted(client, m); + else + cmon->handle_query(m); + } + }; + + class C_Unmounted : public Context { + ClientMonitor *cmon; + Message *m; + public: + C_Unmounted(ClientMonitor *cm, Message *m_) : + cmon(cm), m(m_) {} + void finish(int r) { + if (r >= 0) + cmon->_unmounted(m); + else + cmon->handle_query(m); + } + }; + + class C_Commit : public Context { + ClientMonitor *cmon; + public: + C_Commit(ClientMonitor *cm) : + cmon(cm) {} + void finish(int r) { + cmon->_commit(r); + } + }; + +private: Monitor *mon; - Messenger *messenger; - Mutex &lock; + Paxos *paxos; - private: - int num_clients; - map client_map; + Map client_map; + list waiting_for_active; - void bcast_latest_mds(); + // leader + Incremental pending_inc; + list pending_commit; // contributers to pending_inc - //void accept_pending(); // accept pending, new map. - //void send_incremental(epoch_t since, msg_addr_t dest); + //void bcast_latest_mds(); - void handle_client_mount(class MClientMount *m); - void handle_client_unmount(class MClientUnmount *m); + void create_initial(); + bool update_from_paxos(); + void prepare_pending(); // prepare a new pending + void propose_pending(); // propose pending update to peers + void _mounted(int c, Message *m); + void _unmounted(Message *m); + void _commit(int r); + + void handle_query(Message *m); + bool preprocess_update(Message *m); // true if processed. + void prepare_update(Message *m); + + public: - ClientMonitor(Monitor *mn, Messenger *m, Mutex& l) : mon(mn), messenger(m), lock(l), - num_clients(0) { } + ClientMonitor(Monitor *mn, Paxos *p) : mon(mn), paxos(p) { } void dispatch(Message *m); void tick(); // check state, take actions + + void election_finished(); }; #endif diff --git a/trunk/ceph/mon/Monitor.cc b/trunk/ceph/mon/Monitor.cc index 1d55b6c7d8496..497f3caf799ac 100644 --- a/trunk/ceph/mon/Monitor.cc +++ b/trunk/ceph/mon/Monitor.cc @@ -1,224 +1,234 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ + // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + // vim: ts=8 sw=2 smarttab + /* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + // TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer + + #include "Monitor.h" + + #include "osd/OSDMap.h" + + #include "MonitorStore.h" + + #include "msg/Message.h" + #include "msg/Messenger.h" + + #include "messages/MPing.h" + #include "messages/MPingAck.h" + #include "messages/MGenericMessage.h" + #include "messages/MMonCommand.h" + #include "messages/MMonCommandAck.h" + + #include "messages/MMonPaxos.h" + + #include "common/Timer.h" + #include "common/Clock.h" + + #include "OSDMonitor.h" + #include "MDSMonitor.h" + #include "ClientMonitor.h" + + #include "config.h" + #undef dout + #define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " + #define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " -// TODO: missing run() method, which creates the two main timers, refreshTimer and readTimer -#include "Monitor.h" -#include "osd/OSDMap.h" - -#include "MonitorStore.h" - -#include "msg/Message.h" -#include "msg/Messenger.h" - -#include "messages/MPing.h" -#include "messages/MPingAck.h" -#include "messages/MGenericMessage.h" -#include "messages/MMonCommand.h" -#include "messages/MMonCommandAck.h" - -#include "messages/MMonPaxos.h" - -#include "common/Timer.h" -#include "common/Clock.h" - -#include "OSDMonitor.h" -#include "MDSMonitor.h" -#include "ClientMonitor.h" - -#include "config.h" -#undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (is_starting() ? (const char*)"(starting)":(is_leader() ? (const char*)"(leader)":(is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << " " - - - -void Monitor::init() -{ - lock.Lock(); - - dout(1) << "init" << endl; - - // store - char s[80]; - sprintf(s, "mondata/mon%d", whoami); - store = new MonitorStore(s); - - if (g_conf.mkfs) - store->mkfs(); - - store->mount(); - - // create - osdmon = new OSDMonitor(this, messenger, lock); - mdsmon = new MDSMonitor(this, messenger, lock); - clientmon = new ClientMonitor(this, messenger, lock); - - // i'm ready! - messenger->set_dispatcher(this); - - // start ticker - reset_tick(); - - // call election? - if (monmap->num_mon > 1) { - assert(monmap->num_mon != 2); - call_election(); - } else { - // we're standalone. - set q; - q.insert(whoami); - win_election(1, q); - } - - lock.Unlock(); -} - -void Monitor::shutdown() -{ - dout(1) << "shutdown" << endl; - - elector.shutdown(); - - // cancel all events - cancel_tick(); - timer.cancel_all(); - timer.join(); - - // stop osds. - for (set::iterator it = osdmon->osdmap.get_osds().begin(); - it != osdmon->osdmap.get_osds().end(); - it++) { - if (osdmon->osdmap.is_down(*it)) continue; - dout(10) << "sending shutdown to osd" << *it << endl; - messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - osdmon->osdmap.get_inst(*it)); - } - osdmon->mark_all_down(); - - // monitors too. - for (int i=0; inum_mon; i++) - if (i != whoami) - messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - monmap->get_inst(i)); - - // unmount my local storage - if (store) - delete store; - - // clean up - if (monmap) delete monmap; - if (osdmon) delete osdmon; - if (mdsmon) delete mdsmon; - if (clientmon) delete clientmon; - - // die. - messenger->shutdown(); - delete messenger; -} - - -void Monitor::call_election() -{ - if (monmap->num_mon == 1) return; - - dout(10) << "call_election" << endl; - state = STATE_STARTING; - - elector.call_election(); - - osdmon->election_starting(); - //mdsmon->election_starting(); -} - -void Monitor::win_election(epoch_t epoch, set& active) -{ - state = STATE_LEADER; - leader = whoami; - mon_epoch = epoch; - quorum = active; - dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << endl; - - // init - osdmon->election_finished(); - mdsmon->election_finished(); - - // init paxos - test_paxos.leader_init(); -} - -void Monitor::lose_election(epoch_t epoch, int l) -{ - state = STATE_PEON; - mon_epoch = epoch; - leader = l; - dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << endl; - - // init paxos - test_paxos.peon_init(); -} - - -void Monitor::handle_command(MMonCommand *m) -{ - dout(0) << "handle_command " << *m << endl; - - int r = -1; - string rs = "unrecognized command"; - - if (!m->cmd.empty()) { - if (m->cmd[0] == "stop") { - r = 0; - rs = "stopping"; - do_stop(); - } - else if (m->cmd[0] == "mds") { - mdsmon->handle_command(m, r, rs); - } - else if (m->cmd[0] == "osd") { - - } - } - - // reply - messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); - delete m; -} - - -void Monitor::do_stop() -{ - dout(0) << "do_stop -- shutting down" << endl; - mdsmon->do_stop(); -} - - -void Monitor::dispatch(Message *m) -{ - lock.Lock(); - { - switch (m->get_type()) { - - // misc - case MSG_PING_ACK: - handle_ping_ack((MPingAck*)m); - break; - - case MSG_SHUTDOWN: - assert(m->get_source().is_osd()); - osdmon->dispatch(m); + void Monitor::init() + { + lock.Lock(); + + dout(1) << "init" << endl; + + // store + char s[80]; + sprintf(s, "mondata/mon%d", whoami); + store = new MonitorStore(s); + + if (g_conf.mkfs) + store->mkfs(); + + store->mount(); + + // create + osdmon = new OSDMonitor(this, messenger, lock); + mdsmon = new MDSMonitor(this, messenger, lock); + clientmon = new ClientMonitor(this, &paxos_clientmap); + + // i'm ready! + messenger->set_dispatcher(this); + + // start ticker + reset_tick(); + + // call election? + if (monmap->num_mon > 1) { + assert(monmap->num_mon != 2); + call_election(); + } else { + // we're standalone. + set q; + q.insert(whoami); + win_election(1, q); + } + + lock.Unlock(); + } + + void Monitor::shutdown() + { + dout(1) << "shutdown" << endl; + + elector.shutdown(); + + // cancel all events + cancel_tick(); + timer.cancel_all(); + timer.join(); + + // stop osds. + for (set::iterator it = osdmon->osdmap.get_osds().begin(); + it != osdmon->osdmap.get_osds().end(); + it++) { + if (osdmon->osdmap.is_down(*it)) continue; + dout(10) << "sending shutdown to osd" << *it << endl; + messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + osdmon->osdmap.get_inst(*it)); + } + osdmon->mark_all_down(); + + // monitors too. + for (int i=0; inum_mon; i++) + if (i != whoami) + messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), + monmap->get_inst(i)); + + // unmount my local storage + if (store) + delete store; + + // clean up + if (monmap) delete monmap; + if (osdmon) delete osdmon; + if (mdsmon) delete mdsmon; + if (clientmon) delete clientmon; + + // die. + messenger->shutdown(); + delete messenger; + } + + + void Monitor::call_election() + { + if (monmap->num_mon == 1) return; + + dout(10) << "call_election" << endl; + state = STATE_STARTING; + + elector.call_election(); + + osdmon->election_starting(); + //mdsmon->election_starting(); + } + + void Monitor::win_election(epoch_t epoch, set& active) + { + state = STATE_LEADER; + leader = whoami; + mon_epoch = epoch; + quorum = active; + dout(10) << "win_election, epoch " << mon_epoch << " quorum is " << quorum << endl; + + // init paxos + paxos_test.leader_init(); + paxos_mdsmap.leader_init(); + paxos_osdmap.leader_init(); + paxos_clientmap.leader_init(); + + // init + osdmon->election_finished(); + mdsmon->election_finished(); + clientmon->election_finished(); + } + + void Monitor::lose_election(epoch_t epoch, int l) + { + state = STATE_PEON; + mon_epoch = epoch; + leader = l; + dout(10) << "lose_election, epoch " << mon_epoch << " leader is mon" << leader << endl; + + // init paxos + paxos_test.peon_init(); + paxos_mdsmap.peon_init(); + paxos_osdmap.peon_init(); + paxos_clientmap.peon_init(); + } + + + void Monitor::handle_command(MMonCommand *m) + { + dout(0) << "handle_command " << *m << endl; + + int r = -1; + string rs = "unrecognized command"; + + if (!m->cmd.empty()) { + if (m->cmd[0] == "stop") { + r = 0; + rs = "stopping"; + do_stop(); + } + else if (m->cmd[0] == "mds") { + mdsmon->handle_command(m, r, rs); + } + else if (m->cmd[0] == "osd") { + + } + } + + // reply + messenger->send_message(new MMonCommandAck(r, rs), m->get_source_inst()); + delete m; + } + + + void Monitor::do_stop() + { + dout(0) << "do_stop -- shutting down" << endl; + mdsmon->do_stop(); + } + + + void Monitor::dispatch(Message *m) + { + lock.Lock(); + { + switch (m->get_type()) { + + // misc + case MSG_PING_ACK: + handle_ping_ack((MPingAck*)m); + break; + + case MSG_SHUTDOWN: + if (m->get_source().is_osd()) + osdmon->dispatch(m); + else + handle_shutdown(m); + break; case MSG_MON_COMMAND: @@ -243,7 +253,8 @@ void Monitor::dispatch(Message *m) // hackish: did all mds's shut down? if (g_conf.mon_stop_with_last_mds && - mdsmon->mdsmap.get_num_up_or_failed_mds() == 0) + mdsmon->mdsmap.get_num_up_or_failed_mds() == 0 && + is_leader()) shutdown(); break; @@ -271,11 +282,17 @@ void Monitor::dispatch(Message *m) // send it to the right paxos instance switch (pm->machine_id) { case PAXOS_TEST: - test_paxos.dispatch(m); + paxos_test.dispatch(m); break; case PAXOS_OSDMAP: - //... - + paxos_osdmap.dispatch(m); + break; + case PAXOS_MDSMAP: + paxos_mdsmap.dispatch(m); + break; + case PAXOS_CLIENTMAP: + paxos_clientmap.dispatch(m); + break; default: assert(0); } @@ -299,9 +316,13 @@ void Monitor::dispatch(Message *m) void Monitor::handle_shutdown(Message *m) { - dout(1) << "shutdown from " << m->get_source() << endl; - - shutdown(); + assert(m->get_source().is_mon()); + if (m->get_source().num() == get_leader()) { + dout(1) << "shutdown from leader " << m->get_source() << endl; + shutdown(); + } else { + dout(1) << "ignoring shutdown from non-leader " << m->get_source() << endl; + } delete m; } diff --git a/trunk/ceph/mon/Monitor.h b/trunk/ceph/mon/Monitor.h index e7e77bca305a4..98ce5857d695a 100644 --- a/trunk/ceph/mon/Monitor.h +++ b/trunk/ceph/mon/Monitor.h @@ -31,10 +31,6 @@ class OSDMonitor; class MDSMonitor; class ClientMonitor; -#define PAXOS_TEST 0 -#define PAXOS_OSDMAP 1 -#define PAXOS_MDSMAP 2 -#define PAXOS_CLIENTMAP 3 class Monitor : public Dispatcher { protected: @@ -90,7 +86,10 @@ public: // -- paxos -- - Paxos test_paxos; + Paxos paxos_test; + Paxos paxos_mdsmap; + Paxos paxos_osdmap; + Paxos paxos_clientmap; friend class Paxos; @@ -125,7 +124,10 @@ public: mon_epoch(0), leader(0), - test_paxos(this, w, PAXOS_TEST, "tester"), // tester state machine + paxos_test(this, w, PAXOS_TEST), + paxos_mdsmap(this, w, PAXOS_MDSMAP), + paxos_osdmap(this, w, PAXOS_OSDMAP), + paxos_clientmap(this, w, PAXOS_CLIENTMAP), osdmon(0), mdsmon(0), clientmon(0) { diff --git a/trunk/ceph/mon/OSDMonitor.cc b/trunk/ceph/mon/OSDMonitor.cc index 25dc11f720de5..26696c929c236 100644 --- a/trunk/ceph/mon/OSDMonitor.cc +++ b/trunk/ceph/mon/OSDMonitor.cc @@ -232,18 +232,18 @@ void OSDMonitor::create_initial() bool OSDMonitor::get_map_bl(epoch_t epoch, bufferlist& bl) { - if (!mon->store->exists_bl_sn("osdmap", epoch)) + if (!mon->store->exists_bl_sn("osdmap_full", epoch)) return false; - int r = mon->store->get_bl_sn(bl, "osdmap", epoch); + int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch); assert(r > 0); return true; } bool OSDMonitor::get_inc_map_bl(epoch_t epoch, bufferlist& bl) { - if (!mon->store->exists_bl_sn("osdincmap", epoch)) + if (!mon->store->exists_bl_sn("osdmap_inc", epoch)) return false; - int r = mon->store->get_bl_sn(bl, "osdincmap", epoch); + int r = mon->store->get_bl_sn(bl, "osdmap_inc", epoch); assert(r > 0); return true; } @@ -254,7 +254,7 @@ void OSDMonitor::save_map() bufferlist bl; osdmap.encode(bl); - mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch()); + mon->store->put_bl_sn(bl, "osdmap_full", osdmap.get_epoch()); mon->store->put_int(osdmap.get_epoch(), "osd_epoch"); } @@ -266,8 +266,8 @@ void OSDMonitor::save_inc_map(OSDMap::Incremental &inc) bufferlist incbl; inc.encode(incbl); - mon->store->put_bl_sn(bl, "osdmap", osdmap.get_epoch()); - mon->store->put_bl_sn(incbl, "osdincmap", osdmap.get_epoch()); + mon->store->put_bl_sn(bl, "osdmap_full", osdmap.get_epoch()); + mon->store->put_bl_sn(incbl, "osdmap_inc", osdmap.get_epoch()); mon->store->put_int(osdmap.get_epoch(), "osd_epoch"); } @@ -664,7 +664,7 @@ void OSDMonitor::election_finished() epoch_t epoch = mon->store->get_int("osd_epoch"); dout(10) << " last epoch was " << epoch << endl; bufferlist bl, blinc; - int r = mon->store->get_bl_sn(bl, "osdmap", epoch); + int r = mon->store->get_bl_sn(bl, "osdmap_full", epoch); assert(r>0); osdmap.decode(bl); diff --git a/trunk/ceph/mon/Paxos.cc b/trunk/ceph/mon/Paxos.cc index a0875e072bba3..60e161ce00fff 100644 --- a/trunk/ceph/mon/Paxos.cc +++ b/trunk/ceph/mon/Paxos.cc @@ -20,8 +20,8 @@ #include "config.h" #undef dout -#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << ") " -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << ") " +#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cerr << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mon) cout << g_clock.now() << " mon" << whoami << (mon->is_starting() ? (const char*)"(starting)":(mon->is_leader() ? (const char*)"(leader)":(mon->is_peon() ? (const char*)"(peon)":(const char*)"(?\?)"))) << ".paxos(" << machine_name << " " << get_statename(state) << " lc " << last_committed << ") " // --------------------------------- @@ -64,11 +64,7 @@ void Paxos::handle_collect(MMonPaxos *collect) { dout(10) << "handle_collect " << *collect << endl; - if (!mon->is_peon()) { - dout(10) << "hmm, not peon, dropping!" << endl; - delete collect; - return; - } + assert(mon->is_peon()); // mon epoch filter should catch strays // we're recoverying, it seems! state = STATE_RECOVERING; @@ -163,12 +159,13 @@ void Paxos::handle_last(MMonPaxos *last) // do they accept your pn? if (last->old_accepted_pn > accepted_pn) { // no, try again. - dout(10) << "uh oh, they have a higher pn than us. pick a new one." << endl; + dout(10) << " they had a higher pn than us, picking a new one." << endl; collect(last->old_accepted_pn); } else { // yes, they accepted our pn. great. num_last++; - dout(10) << "great, they accepted our pn, we now have " << num_last << endl; + dout(10) << " they accepted our pn, we now have " + << num_last << " peons" << endl; // did this person send back an accepted but uncommitted value? if (last->old_accepted_pn && @@ -193,6 +190,7 @@ void Paxos::handle_last(MMonPaxos *last) // active! dout(10) << "that's everyone. active!" << endl; state = STATE_ACTIVE; + finish_contexts(waiting_for_active); extend_lease(); } } @@ -215,8 +213,9 @@ void Paxos::begin(bufferlist& v) state = STATE_UPDATING; // we must already have a majority for this to work. - assert(num_last > (unsigned)mon->monmap->num_mon/2); - + assert(mon->get_quorum().size() == 1 || + num_last > (unsigned)mon->monmap->num_mon/2); + // and no value, yet. assert(new_value.length() == 0); @@ -225,6 +224,15 @@ void Paxos::begin(bufferlist& v) new_value = v; mon->store->put_bl_sn(new_value, machine_name, last_committed+1); + if (mon->get_quorum().size() == 1) { + // we're alone, take it easy + commit(); + state = STATE_ACTIVE; + finish_contexts(waiting_for_commit); + finish_contexts(waiting_for_active); + return; + } + // ask others to accept it to! for (set::const_iterator p = mon->get_quorum().begin(); p != mon->get_quorum().end(); @@ -239,6 +247,10 @@ void Paxos::begin(bufferlist& v) mon->messenger->send_message(begin, mon->monmap->get_inst(*p)); } + + // set timeout event + accept_timeout_event = new C_AcceptTimeout(this); + mon->timer.add_event_after(g_conf.mon_accept_timeout, accept_timeout_event); } // peon @@ -257,6 +269,7 @@ void Paxos::handle_begin(MMonPaxos *begin) // set state. state = STATE_UPDATING; + lease_expire = utime_t(); // cancel lease // yes. version_t v = last_committed+1; @@ -270,7 +283,6 @@ void Paxos::handle_begin(MMonPaxos *begin) mon->messenger->send_message(accept, begin->get_source_inst()); delete begin; - } // leader @@ -284,11 +296,14 @@ void Paxos::handle_accept(MMonPaxos *accept) delete accept; return; } - if (accept->last_committed != last_committed) { - dout(10) << " this is from an old round that's already committed, ignoring" << endl; + if (last_committed > 0 && + accept->last_committed < last_committed-1) { + dout(10) << " this is from an old round, ignoring" << endl; delete accept; return; } + assert(accept->last_committed == last_committed || // not committed + accept->last_committed == last_committed-1); // committed assert(state == STATE_UPDATING); num_accepted++; @@ -297,19 +312,32 @@ void Paxos::handle_accept(MMonPaxos *accept) // new majority? if (num_accepted == (unsigned)mon->monmap->num_mon/2+1) { // yay, commit! - // note: this may happen a bit before the lease is reextended. + // note: this may happen before the lease is reextended (below) dout(10) << "we got a majority, committing too" << endl; commit(); - finish_contexts(waiting_for_commit); } // done? if (num_accepted == mon->get_quorum().size()) { state = STATE_ACTIVE; + finish_contexts(waiting_for_commit); + finish_contexts(waiting_for_active); extend_lease(); + + // cancel timeout event + mon->timer.cancel_event(accept_timeout_event); + accept_timeout_event = 0; } } +void Paxos::accept_timeout() +{ + dout(5) << "accept timeout, calling fresh election" << endl; + assert(mon->is_leader()); + assert(is_updating()); + mon->call_election(); +} + void Paxos::commit() { dout(10) << "commit " << last_committed+1 << endl; @@ -334,7 +362,6 @@ void Paxos::commit() // get ready for a new round. new_value.clear(); - } @@ -367,9 +394,12 @@ void Paxos::extend_lease() assert(mon->is_leader()); assert(is_active()); - lease_timeout = g_clock.now(); - lease_timeout += g_conf.mon_lease; - dout(7) << "extend_lease now+" << g_conf.mon_lease << " (" << lease_timeout << ")" << endl; + lease_expire = g_clock.now(); + lease_expire += g_conf.mon_lease; + acked_lease.clear(); + acked_lease.insert(whoami); + + dout(7) << "extend_lease now+" << g_conf.mon_lease << " (" << lease_expire << ")" << endl; // bcast for (set::const_iterator p = mon->get_quorum().begin(); @@ -378,16 +408,31 @@ void Paxos::extend_lease() if (*p == whoami) continue; MMonPaxos *lease = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE, machine_id); lease->last_committed = last_committed; - lease->lease_timeout = lease_timeout; + lease->lease_expire = lease_expire; mon->messenger->send_message(lease, mon->monmap->get_inst(*p)); } - if (is_readable()) - finish_contexts(waiting_for_readable); - if (is_writeable()) - finish_contexts(waiting_for_readable); + // wake people up + finish_contexts(waiting_for_readable); + finish_contexts(waiting_for_writeable); + + // set renew event + lease_renew_event = new C_LeaseRenew(this); + utime_t at = lease_expire; + at -= g_conf.mon_lease; + at += g_conf.mon_lease_renew_interval; + mon->timer.add_event_at(at, lease_renew_event); + + // set timeout event. + // if old timeout is still in place, leave it. + if (!lease_ack_timeout_event) { + lease_ack_timeout_event = new C_LeaseAckTimeout(this); + mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event); + } } + +// peon void Paxos::handle_lease(MMonPaxos *lease) { // sanity @@ -399,14 +444,21 @@ void Paxos::handle_lease(MMonPaxos *lease) } // extend lease - if (lease_timeout < lease->lease_timeout) - lease_timeout = lease->lease_timeout; + if (lease_expire < lease->lease_expire) + lease_expire = lease->lease_expire; state = STATE_ACTIVE; - + finish_contexts(waiting_for_active); + dout(10) << "handle_lease on " << lease->last_committed - << " now " << lease_timeout << endl; + << " now " << lease_expire << endl; + // ack + MMonPaxos *ack = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_LEASE_ACK, machine_id); + ack->last_committed = last_committed; + ack->lease_expire = lease_expire; + mon->messenger->send_message(ack, lease->get_source_inst()); + // kick waiters if (is_readable()) finish_contexts(waiting_for_readable); @@ -414,6 +466,41 @@ void Paxos::handle_lease(MMonPaxos *lease) delete lease; } +void Paxos::handle_lease_ack(MMonPaxos *ack) +{ + int from = ack->get_source().num(); + + if (acked_lease.count(from) == 0) { + acked_lease.insert(from); + + if (acked_lease == mon->get_quorum()) { + // yay! + dout(10) << "handle_lease_ack from " << ack->get_source() + << " -- got everyone" << endl; + mon->timer.cancel_event(lease_ack_timeout_event); + lease_ack_timeout_event = 0; + } else { + dout(10) << "handle_lease_ack from " << ack->get_source() + << " -- still need " + << mon->get_quorum().size() - acked_lease.size() + << " more" << endl; + } + } else { + dout(10) << "handle_lease_ack from " << ack->get_source() + << " dup (lagging!), ignoring" << endl; + } + + delete ack; +} + +void Paxos::lease_ack_timeout() +{ + dout(5) << "lease_ack_timeout -- calling new election" << endl; + assert(mon->is_leader()); + assert(is_active()); + mon->call_election(); +} + /* * return a globally unique, monotonically increasing proposal number @@ -440,18 +527,40 @@ version_t Paxos::get_new_proposal_number(version_t gt) } +void Paxos::cancel_events() +{ + if (accept_timeout_event) { + mon->timer.cancel_event(accept_timeout_event); + accept_timeout_event = 0; + } + if (lease_renew_event) { + mon->timer.cancel_event(lease_renew_event); + lease_renew_event = 0; + } + if (lease_ack_timeout_event) { + mon->timer.cancel_event(lease_ack_timeout_event); + lease_ack_timeout_event = 0; + } +} + void Paxos::leader_init() { + if (mon->get_quorum().size() == 1) { + state = STATE_ACTIVE; + return; + } + cancel_events(); state = STATE_RECOVERING; - lease_timeout = utime_t(); + lease_expire = utime_t(); dout(10) << "leader_init -- starting paxos recovery" << endl; collect(0); } void Paxos::peon_init() { + cancel_events(); state = STATE_RECOVERING; - lease_timeout = utime_t(); + lease_expire = utime_t(); dout(10) << "peon_init -- i am a peon" << endl; // no chance to write now! @@ -500,6 +609,9 @@ void Paxos::dispatch(Message *m) case MMonPaxos::OP_LEASE: handle_lease(pm); break; + case MMonPaxos::OP_LEASE_ACK: + handle_lease_ack(pm); + break; default: assert(0); } @@ -521,42 +633,49 @@ void Paxos::dispatch(Message *m) bool Paxos::is_readable() { + if (mon->get_quorum().size() == 1) return true; return (mon->is_peon() || mon->is_leader()) && is_active() && - last_committed > 0 && - g_clock.now() < lease_timeout; + g_clock.now() < lease_expire; } -version_t Paxos::read_current(bufferlist &bl) +bool Paxos::read(version_t v, bufferlist &bl) { if (!is_readable()) - return 0; - mon->store->get_bl_sn(bl, machine_name, last_committed); - return last_committed; + return false; + + if (!mon->store->get_bl_sn(bl, machine_name, v)) + return false; + return true; } -void Paxos::wait_for_readable(Context *onreadable) +version_t Paxos::read_current(bufferlist &bl) { - assert(!is_readable()); - waiting_for_readable.push_back(onreadable); + if (!is_readable()) + return 0; + if (read(last_committed, bl)) + return last_committed; + return 0; } + // -- WRITE -- bool Paxos::is_writeable() { + if (mon->get_quorum().size() == 1) return true; return mon->is_leader() && is_active() && - last_committed > 0 && - g_clock.now() < lease_timeout; + g_clock.now() < lease_expire; } bool Paxos::propose_new_value(bufferlist& bl, Context *oncommit) { + /* // writeable? if (!is_writeable()) { dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes" @@ -567,7 +686,13 @@ bool Paxos::propose_new_value(bufferlist& bl, Context *oncommit) } return false; } + */ + assert(mon->is_leader() && is_active()); + + // cancel lease renewal and timeout events. + cancel_events(); + // ok! dout(5) << "propose_new_value " << last_committed+1 << " " << bl.length() << " bytes" << endl; if (oncommit) diff --git a/trunk/ceph/mon/Paxos.h b/trunk/ceph/mon/Paxos.h index 43c1967ca7054..08da005139b09 100644 --- a/trunk/ceph/mon/Paxos.h +++ b/trunk/ceph/mon/Paxos.h @@ -50,6 +50,7 @@ e 12v #define __MON_PAXOS_H #include "include/types.h" +#include "mon_types.h" #include "include/buffer.h" #include "msg/Message.h" @@ -60,6 +61,7 @@ e 12v class Monitor; class MMonPaxos; + // i am one state machine. class Paxos { Monitor *mon; @@ -72,6 +74,7 @@ class Paxos { // LEADER+PEON // -- generic state -- +public: const static int STATE_RECOVERING = 1; // leader|peon: recovering paxos state const static int STATE_ACTIVE = 2; // leader|peon: idle. peon may or may not have valid lease const static int STATE_UPDATING = 3; // leader|peon: updating to new value @@ -84,36 +87,74 @@ class Paxos { } } +private: int state; + +public: bool is_recovering() { return state == STATE_RECOVERING; } bool is_active() { return state == STATE_ACTIVE; } bool is_updating() { return state == STATE_UPDATING; } +private: // recovery (phase 1) version_t last_committed; version_t accepted_pn; version_t accepted_pn_from; // active (phase 2) - utime_t lease_timeout; + utime_t lease_expire; + list waiting_for_active; list waiting_for_readable; // -- leader -- - // recovery (phase 1) + // recovery (paxos phase 1) unsigned num_last; version_t old_accepted_v; version_t old_accepted_pn; bufferlist old_accepted_value; - // updating (phase 2) + // active + set acked_lease; + Context *lease_renew_event; + Context *lease_ack_timeout_event; + + // updating (paxos phase 2) bufferlist new_value; unsigned num_accepted; - utime_t accept_timeout; + + Context *accept_timeout_event; list waiting_for_writeable; list waiting_for_commit; + class C_AcceptTimeout : public Context { + Paxos *paxos; + public: + C_AcceptTimeout(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->accept_timeout(); + } + }; + + class C_LeaseAckTimeout : public Context { + Paxos *paxos; + public: + C_LeaseAckTimeout(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->lease_ack_timeout(); + } + }; + + class C_LeaseRenew : public Context { + Paxos *paxos; + public: + C_LeaseRenew(Paxos *p) : paxos(p) {} + void finish(int r) { + paxos->extend_lease(); + } + }; + void collect(version_t oldpn); void handle_collect(MMonPaxos*); @@ -121,18 +162,26 @@ class Paxos { void begin(bufferlist& value); void handle_begin(MMonPaxos*); void handle_accept(MMonPaxos*); + void accept_timeout(); void commit(); void handle_commit(MMonPaxos*); void extend_lease(); void handle_lease(MMonPaxos*); + void handle_lease_ack(MMonPaxos*); + void lease_ack_timeout(); + + void cancel_events(); version_t get_new_proposal_number(version_t gt=0); public: Paxos(Monitor *m, int w, - int mid,const char *mnm) : mon(m), whoami(w), - machine_id(mid), machine_name(mnm) { - } + int mid) : mon(m), whoami(w), + machine_id(mid), + machine_name(get_paxos_name(mid)), + lease_renew_event(0), + lease_ack_timeout_event(0), + accept_timeout_event(0) { } void dispatch(Message *m); @@ -141,14 +190,31 @@ public: // -- service interface -- + /* + void wait_for_active(Context *c) { + assert(!is_active()); + waiting_for_active.push_back(c); + } + */ + // read + version_t get_version() { return last_committed; } bool is_readable(); + bool read(version_t v, bufferlist &bl); version_t read_current(bufferlist &bl); - void wait_for_readable(Context *onreadable); + void wait_for_readable(Context *onreadable) { + assert(!is_readable()); + waiting_for_readable.push_back(onreadable); + } // write bool is_leader(); bool is_writeable(); + void wait_for_writeable(Context *c) { + assert(!is_writeable()); + waiting_for_writeable.push_back(c); + } + bool propose_new_value(bufferlist& bl, Context *oncommit=0); void wait_for_commit(Context *oncommit) { waiting_for_commit.push_back(oncommit); diff --git a/trunk/ceph/mon/mon_types.h b/trunk/ceph/mon/mon_types.h new file mode 100644 index 0000000000000..852e42b8d983f --- /dev/null +++ b/trunk/ceph/mon/mon_types.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MON_TYPES_H +#define __MON_TYPES_H + +#define PAXOS_TEST 0 +#define PAXOS_MDSMAP 1 +#define PAXOS_OSDMAP 2 +#define PAXOS_CLIENTMAP 3 + +inline const char *get_paxos_name(int p) { + switch (p) { + case PAXOS_TEST: return "test"; + case PAXOS_MDSMAP: return "mdsmap"; + case PAXOS_OSDMAP: return "osdmap"; + case PAXOS_CLIENTMAP: return "clientmap"; + default: assert(0); return 0; + } +} + +#endif -- 2.39.5