OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first slurp
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity
-OPTION(paxos_observer_timeout, OPT_DOUBLE, 5*60) // gather updates for this long before proposing a map update
OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc
OPTION(auth_supported, OPT_STR, "none")
OPTION(auth_mon_ticket_ttl, OPT_DOUBLE, 60*60*12)
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_MMONOBSERVE_H
-#define CEPH_MMONOBSERVE_H
-
-#include "msg/Message.h"
-
-#include <vector>
-using std::vector;
-
-class MMonObserve : public PaxosServiceMessage {
- public:
- uuid_d fsid;
- uint32_t machine_id;
- version_t ver;
-
- MMonObserve() : PaxosServiceMessage(MSG_MON_OBSERVE, 0) {}
- MMonObserve(uuid_d &f, int mid, version_t v) :
- PaxosServiceMessage(MSG_MON_OBSERVE, v),
- fsid(f), machine_id(mid), ver(v) { }
-private:
- ~MMonObserve() {}
-
-public:
- const char *get_type_name() const { return "mon_observe"; }
- void print(ostream& o) const {
- o << "observe(" << machine_id << " v" << ver << ")";
- }
-
- void encode_payload(uint64_t features) {
- paxos_encode();
- ::encode(fsid, payload);
- ::encode(machine_id, payload);
- ::encode(ver, payload);
- }
- void decode_payload() {
- bufferlist::iterator p = payload.begin();
- paxos_decode(p);
- ::decode(fsid, p);
- ::decode(machine_id, p);
- ::decode(ver, p);
- }
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_MMONOBSERVENOTIFY_H
-#define CEPH_MMONOBSERVENOTIFY_H
-
-#include "msg/Message.h"
-
-class MMonObserveNotify : public PaxosServiceMessage {
- public:
- uuid_d fsid;
- int32_t machine_id;
- bufferlist bl;
- version_t ver;
- bool is_latest;
-
- MMonObserveNotify() : PaxosServiceMessage(MSG_MON_OBSERVE_NOTIFY, 0) {}
- MMonObserveNotify(uuid_d& f, int id, bufferlist& b, version_t v, bool l) :
- PaxosServiceMessage(MSG_MON_OBSERVE_NOTIFY, v), fsid(f), machine_id(id), bl(b), ver(v), is_latest(l) {}
-private:
- ~MMonObserveNotify() {}
-
-public:
- const char *get_type_name() const { return "mon_observe_notify"; }
- void print(ostream& o) const {
- o << "mon_observe_notify(v" << ver << " " << bl.length() << " bytes";
- if (is_latest)
- o << " latest";
- o << " v" << version << ")";
- }
-
- void encode_payload(uint64_t features) {
- paxos_encode();
- ::encode(fsid, payload);
- ::encode(machine_id, payload);
- ::encode(bl, payload);
- ::encode(ver, payload);
- ::encode(is_latest, payload);
- }
- void decode_payload() {
- bufferlist::iterator p = payload.begin();
- paxos_decode(p);
- ::decode(fsid, p);
- ::decode(machine_id, p);
- ::decode(bl, p);
- ::decode(ver, p);
- ::decode(is_latest, p);
- }
-};
-
-#endif
#include "messages/MGenericMessage.h"
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
-#include "messages/MMonObserve.h"
-#include "messages/MMonObserveNotify.h"
#include "messages/MMonProbe.h"
#include "messages/MMonJoin.h"
#include "messages/MMonPaxos.h"
}
-void Monitor::handle_observe(MMonObserve *m)
-{
- dout(10) << "handle_observe " << *m << " from " << m->get_source_inst() << dendl;
- // check that there are perms. Send a response back if they aren't sufficient,
- // and delete the message (if it's not deleted for us, which happens when
- // we own the connection to the requested observer).
- MonSession *session = m->get_session();
- if (!session || !session->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) {
- send_reply(m, m);
- return;
- }
- if (m->machine_id >= PAXOS_NUM) {
- dout(0) << "register_observer: bad monitor id: " << m->machine_id << dendl;
- } else {
- paxos[m->machine_id]->register_observer(m->get_orig_source_inst(), m->ver);
- }
- messenger->send_message(m, m->get_orig_source_inst());
-}
-
void Monitor::send_command(const entity_inst_t& inst,
const vector<string>& com, version_t version)
{
}
break;
- case MSG_MON_OBSERVE:
- handle_observe((MMonObserve *)m);
- break;
-
// elector messages
case MSG_MON_ELECTION:
//check privileges here for simplicity
class MMonGetMap;
class MMonGetVersion;
class MMonProbe;
-class MMonObserve;
class MMonSubscribe;
class MAuthRotating;
class MRoute;
void _mon_status(ostream& ss);
void _quorum_status(ostream& ss);
void handle_command(class MMonCommand *m);
- void handle_observe(MMonObserve *m);
void handle_route(MRoute *m);
void reply_command(MMonCommand *m, int rc, const string &rs, version_t version);
#include "MonitorStore.h"
#include "messages/MMonPaxos.h"
-#include "messages/MMonObserveNotify.h"
#include "common/config.h"
finish_contexts(g_ceph_context, waiting_for_commit);
finish_contexts(g_ceph_context, waiting_for_readable);
finish_contexts(g_ceph_context, waiting_for_writeable);
- update_observers();
return;
}
default:
assert(0);
}
-
- if (is_readable())
- update_observers();
}
-void Paxos::register_observer(entity_inst_t inst, version_t v)
-{
- dout(10) << "register_observer " << inst << " v" << v << dendl;
-
- Observer *observer;
- if (observers.count(inst))
- observer = observers[inst];
- else {
- observers[inst] = observer = new Observer(inst, v);
- }
-
- utime_t timeout = ceph_clock_now(g_ceph_context);
- timeout += g_conf->paxos_observer_timeout;
- observer->timeout = timeout;
-
- if (is_readable())
- update_observers();
-}
-
-
-void Paxos::update_observers()
-{
- dout(10) << "update_observers" << dendl;
-
- bufferlist bl;
- version_t ver;
-
- map<entity_inst_t, Observer *>::iterator iter = observers.begin();
- while (iter != observers.end()) {
- Observer *observer = iter->second;
-
- // timed out?
- if (ceph_clock_now(g_ceph_context) > observer->timeout) {
- delete observer;
- observers.erase(iter++);
- continue;
- }
- ++iter;
-
- if (observer->last_version == 0 ||
- observer->last_version < first_committed) {
- ver = get_stashed(bl);
- if (ver) {
- dout(10) << " sending summary state v" << ver << " to " << observer->inst << dendl;
- mon->messenger->send_message(new MMonObserveNotify(mon->monmap->fsid, machine_id, bl, ver, true),
- observer->inst);
- observer->last_version = ver;
- continue;
- }
- }
-
- for (ver = observer->last_version + 1; ver <= last_committed; ver++) {
- if (read(ver, bl)) {
- dout(10) << " sending state v" << ver << " to " << observer->inst << dendl;
- mon->messenger->send_message(new MMonObserveNotify(mon->monmap->fsid, machine_id, bl, ver, false),
- observer->inst);
- observer->last_version = ver;
- }
- }
- }
-}
// -----------------
// service interface
friend class Monitor;
friend class PaxosService;
- friend class PaxosObserver;
list<std::string> extra_state_dirs;
list<Context*> waiting_for_writeable;
list<Context*> waiting_for_commit;
- // observers
- struct Observer {
- entity_inst_t inst;
- version_t last_version;
- utime_t timeout;
- Observer(entity_inst_t& ei, version_t v) : inst(ei), last_version(v) { }
- };
- map<entity_inst_t, Observer *> observers;
-
//synchronization warnings
utime_t last_clock_drift_warn;
int clock_drift_warned;
version_t get_stashed_version() { return latest_stashed; }
version_t get_first_committed() { return first_committed; }
-
- void register_observer(entity_inst_t inst, version_t v);
- void update_observers();
};
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonPaxos.h"
-#include "messages/MMonObserve.h"
-#include "messages/MMonObserveNotify.h"
#include "messages/MMonProbe.h"
#include "messages/MMonJoin.h"
m = new MMonElection;
break;
- case MSG_MON_OBSERVE:
- m = new MMonObserve;
- break;
- case MSG_MON_OBSERVE_NOTIFY:
- m = new MMonObserveNotify;
- break;
case MSG_LOG:
m = new MLog;
break;
#define MSG_MON_COMMAND_ACK 51
#define MSG_LOG 52
#define MSG_LOGACK 53
-#define MSG_MON_OBSERVE 54
-#define MSG_MON_OBSERVE_NOTIFY 55
+//#define MSG_MON_OBSERVE 54
+//#define MSG_MON_OBSERVE_NOTIFY 55
#define MSG_CLASS 56
#define MSG_CLASS_ACK 57
MESSAGE(MMonJoin)
#include "messages/MMonMap.h"
MESSAGE(MMonMap)
-#include "messages/MMonObserve.h"
-MESSAGE(MMonObserve)
-#include "messages/MMonObserveNotify.h"
-MESSAGE(MMonObserveNotify)
#include "messages/MMonPaxos.h"
MESSAGE(MMonPaxos)
#include "messages/MMonProbe.h"
#include "mon/mon_types.h"
-#include "messages/MMonObserve.h"
-#include "messages/MMonObserveNotify.h"
#include "messages/MOSDMap.h"
#include "messages/MCommand.h"
void Admin::ms_handle_connect(Connection *con) {
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
ctx->lock.Lock();
- if (ceph_tool_mode != CEPH_TOOL_MODE_CLI_INPUT) {
-// send_observe_requests(ctx);
- }
if (pending_cmd.size())
send_command(ctx);
ctx->lock.Unlock();