cmon_LDADD = libcommon.a libmon.a libcrush.a libcommon.a
# admin tools
+cobserver_SOURCES = cobserver.cc msg/SimpleMessenger.cc
+cobserver_LDADD = libcrush.a libcommon.a
cmonctl_SOURCES = cmonctl.cc msg/SimpleMessenger.cc
cmonctl_LDADD = libcommon.a -ledit
mkmonfs_SOURCES = mkmonfs.cc
bin_PROGRAMS = \
cmon cmds cosd csyn \
- cmonctl \
+ cmonctl cobserver \
mkmonfs monmaptool osdmaptool crushtool \
streamtest dupstore dumpjournal testmsgr
mon_pg_create_interval: 30.0, // no more than every 30s
paxos_propose_interval: 1.0, // gather updates for this long before proposing a map update
+ paxos_observer_timeout: 5*60, // gather updates for this long before proposing a map update
// --- client ---
client_cache_size: 1000,
float mon_pg_create_interval;
double paxos_propose_interval;
+ double paxos_observer_timeout;
// client
int client_cache_size;
#include "messages/MGenericMessage.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
+#include "messages/MMonObserve.h"
+#include "messages/MMonObserveNotify.h"
#include "messages/MMonPaxos.h"
delete m;
}
+void Monitor::register_observer(MMonObserve *m)
+{
+ if (m->monitor_id >= PAXOS_NUM) {
+ dout(0) << "register_observer: wrong monitor id: " << m->monitor_id << dendl;
+ delete m;
+ return;
+ }
+ Paxos *paxos = paxos_service[m->monitor_id]->paxos;
+ assert(paxos);
+ entity_inst_t inst=m->get_orig_source_inst();
+ PaxosObserver *observer = new PaxosObserver(paxos, inst, m->ver);
+ paxos->register_observer(observer);
+ delete m;
+}
+
void Monitor::inject_args(const entity_inst_t& inst, vector<string>& args)
{
handle_command((MMonCommand*)m);
break;
+ case MSG_MON_OBSERVE:
+ register_observer((MMonObserve *)m);
+ break;
+
// OSDs
case CEPH_MSG_OSD_GETMAP:
class PaxosService;
class MMonGetMap;
+class MMonObserve;
class Monitor : public Dispatcher {
public:
void reply_command(MMonCommand *m, int rc, const string &rs);
void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata);
+ void register_observer(MMonObserve *m);
void inject_args(const entity_inst_t& inst, string& args) {
vector<string> a(1);
#include "MonitorStore.h"
#include "messages/MMonPaxos.h"
+#include "messages/MMonObserveNotify.h"
#include "config.h"
<< ") ";
}
+void PaxosObserver::notify(bufferlist& bl, version_t ver, Monitor *mon, bool is_incremental)
+{
+ MMonObserveNotify *msg = new MMonObserveNotify(paxos->machine_id, bl, ver, is_incremental);
+ last_version = ver;
+
+ mon->messenger->send_message(msg, inst);
+}
+
void Paxos::init()
{
case MSG_MON_PAXOS:
{
MMonPaxos *pm = (MMonPaxos*)m;
-
+
// NOTE: these ops are defined in messages/MMonPaxos.h
switch (pm->op) {
// learner
default:
assert(0);
}
+
+ if (is_readable()) {
+ update_observers();
+ }
}
+void Paxos::register_observer(PaxosObserver *observer)
+{
+ utime_t timeout=g_clock.now();
+ map<entity_inst_t, PaxosObserver *>::iterator iter;
+
+ observers_lock.Lock();
+
+ iter = observers.find(observer->inst);
+
+ if (iter != observers.end()) {
+ PaxosObserver *o = iter->second;
+
+ observers.erase(iter);
+
+ delete o;
+ }
+ timeout += g_conf.paxos_observer_timeout;
+ observer->set_timeout(timeout);
+ observers[observer->inst] = observer;
+
+ observers_lock.Unlock();
+}
+
+void Paxos::update_observers()
+{
+ bufferlist bl;
+ version_t ver;
+ map<entity_inst_t, PaxosObserver *>::iterator iter, last;
+
+ observers_lock.Lock();
+
+ last = observers.end();
+ iter = observers.begin();
+ while (iter != last) {
+ int done = 0;
+ PaxosObserver *observer = iter->second;
+ while (g_clock.now() > observer->timeout) {
+ delete observer;
+ observers.erase(iter++);
+ if (iter == last) {
+ observers_lock.Unlock();
+ return;
+ }
+ observer = iter->second;
+ }
+
+ if (observer->get_ver() == 0) {
+ ver = get_latest(bl);
+ if (ver) {
+ observer->notify(bl, ver, mon, false);
+ done=1;
+ }
+ }
+
+ if (!done) {
+ for (ver = observer->get_ver() + 1; ver <=last_committed; ver++) {
+ if (read(ver, bl)) {
+ observer->notify(bl, ver, mon, true);
+ }
+ }
+ }
+
+ ++iter;
+ }
+ observers_lock.Unlock();
+}
// -----------------
// service interface
#include "mon_types.h"
#include "include/buffer.h"
#include "msg/Message.h"
+#include "msg/msg_types.h"
#include "include/Context.h"
class Monitor;
class MMonPaxos;
+class Paxos;
+
+class PaxosObserver {
+ friend class Paxos;
+
+ Paxos *paxos;
+ int machine_id;
+ entity_inst_t inst;
+ version_t last_version;
+ utime_t timeout;
+public:
+ PaxosObserver(Paxos *px, entity_inst_t& ei, version_t v) : paxos(px), inst(ei), last_version(v) { }
+ void notify(bufferlist& bl, version_t ver, Monitor *mon, bool is_incremental);
+ version_t get_ver() { return last_version; }
+ int get_machine_id() { return machine_id; }
+ void set_timeout(utime_t to) { timeout = to; }
+};
// i am one state machine.
const char *machine_name;
friend class PaxosService;
+ friend class PaxosObserver;
// LEADER+PEON
list<Context*> waiting_for_writeable;
list<Context*> waiting_for_commit;
+ map<entity_inst_t, PaxosObserver *> observers;
+ Mutex observers_lock;
class C_CollectTimeout : public Context {
Paxos *paxos;
lease_renew_event(0),
lease_ack_timeout_event(0),
lease_timeout_event(0),
- accept_timeout_event(0) { }
+ accept_timeout_event(0),
+ observers_lock("observers_lock") { }
const char *get_machine_name() const {
return machine_name;
void stash_latest(version_t v, bufferlist& bl);
version_t get_latest(bufferlist& bl);
+ void register_observer(PaxosObserver *observer);
+ void update_observers();
};
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
+#include "messages/MMonObserve.h"
+#include "messages/MMonObserveNotify.h"
#include "messages/MMonElection.h"
#include "messages/MLog.h"
m = new MMonElection;
break;
+ case MSG_MON_OBSERVE:
+ m = new MMonObserve;
+ break;
+ case MSG_MON_OBSERVE_NOTIFY:
+ m = new MMonObserveNotify;
+ break;
case MSG_LOG:
m = new MLog;
break;
/* monitor <-> mon admin tool */
#define MSG_MON_COMMAND 50
#define MSG_MON_COMMAND_ACK 51
-
-#define MSG_LOG 52
+#define MSG_LOG 52
+#define MSG_MON_OBSERVE 53
+#define MSG_MON_OBSERVE_NOTIFY 54
// osd internal
#define MSG_OSD_PING 70