From: Joao Eduardo Luis Date: Thu, 17 May 2012 20:48:46 +0000 (-0700) Subject: src: get rid of the Observers throughout the code base. X-Git-Tag: v0.48argonaut~137^2~35^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=035bb125a7b6967b94b5d362b18966f4924a89fb;p=ceph.git src: get rid of the Observers throughout the code base. This is a big patch that will remove all references to the observers throughout the code, including a complete removal of the Observer-related messages' source files. Signed-off-by: Joao Eduardo Luis --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 6ae487bf0027..0da12f2631f2 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -127,7 +127,6 @@ OPTION(mon_slurp_bytes, OPT_INT, 256*1024) // limit size of slurp messages OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first slurp OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity -OPTION(paxos_observer_timeout, OPT_DOUBLE, 5*60) // gather updates for this long before proposing a map update OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc OPTION(auth_supported, OPT_STR, "none") OPTION(auth_mon_ticket_ttl, OPT_DOUBLE, 60*60*12) diff --git a/src/messages/MMonObserve.h b/src/messages/MMonObserve.h deleted file mode 100644 index 1228129c16fe..000000000000 --- a/src/messages/MMonObserve.h +++ /dev/null @@ -1,57 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MMONOBSERVE_H -#define CEPH_MMONOBSERVE_H - -#include "msg/Message.h" - -#include -using std::vector; - -class MMonObserve : public PaxosServiceMessage { - public: - uuid_d fsid; - uint32_t machine_id; - version_t ver; - - MMonObserve() : PaxosServiceMessage(MSG_MON_OBSERVE, 0) {} - MMonObserve(uuid_d &f, int mid, version_t v) : - PaxosServiceMessage(MSG_MON_OBSERVE, v), - fsid(f), machine_id(mid), ver(v) { } -private: - ~MMonObserve() {} - -public: - const char *get_type_name() const { return "mon_observe"; } - void print(ostream& o) const { - o << "observe(" << machine_id << " v" << ver << ")"; - } - - void encode_payload(uint64_t features) { - paxos_encode(); - ::encode(fsid, payload); - ::encode(machine_id, payload); - ::encode(ver, payload); - } - void decode_payload() { - bufferlist::iterator p = payload.begin(); - paxos_decode(p); - ::decode(fsid, p); - ::decode(machine_id, p); - ::decode(ver, p); - } -}; - -#endif diff --git a/src/messages/MMonObserveNotify.h b/src/messages/MMonObserveNotify.h deleted file mode 100644 index 794cf141d7b6..000000000000 --- a/src/messages/MMonObserveNotify.h +++ /dev/null @@ -1,62 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MMONOBSERVENOTIFY_H -#define CEPH_MMONOBSERVENOTIFY_H - -#include "msg/Message.h" - -class MMonObserveNotify : public PaxosServiceMessage { - public: - uuid_d fsid; - int32_t machine_id; - bufferlist bl; - version_t ver; - bool is_latest; - - MMonObserveNotify() : PaxosServiceMessage(MSG_MON_OBSERVE_NOTIFY, 0) {} - MMonObserveNotify(uuid_d& f, int id, bufferlist& b, version_t v, bool l) : - PaxosServiceMessage(MSG_MON_OBSERVE_NOTIFY, v), fsid(f), machine_id(id), bl(b), ver(v), is_latest(l) {} -private: - ~MMonObserveNotify() {} - -public: - const char *get_type_name() const { return "mon_observe_notify"; } - void print(ostream& o) const { - o << "mon_observe_notify(v" << ver << " " << bl.length() << " bytes"; - if (is_latest) - o << " latest"; - o << " v" << version << ")"; - } - - void encode_payload(uint64_t features) { - paxos_encode(); - ::encode(fsid, payload); - ::encode(machine_id, payload); - ::encode(bl, payload); - ::encode(ver, payload); - ::encode(is_latest, payload); - } - void decode_payload() { - bufferlist::iterator p = payload.begin(); - paxos_decode(p); - ::decode(fsid, p); - ::decode(machine_id, p); - ::decode(bl, p); - ::decode(ver, p); - ::decode(is_latest, p); - } -}; - -#endif diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index a15882816f7b..a967a88d2351 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -28,8 +28,6 @@ #include "messages/MGenericMessage.h" #include "messages/MMonCommand.h" #include "messages/MMonCommandAck.h" -#include "messages/MMonObserve.h" -#include "messages/MMonObserveNotify.h" #include "messages/MMonProbe.h" #include "messages/MMonJoin.h" #include "messages/MMonPaxos.h" @@ -1328,25 +1326,6 @@ void Monitor::remove_session(MonSession *s) } -void Monitor::handle_observe(MMonObserve *m) -{ - dout(10) << "handle_observe " << *m << " from " << m->get_source_inst() << dendl; - // check that there are perms. Send a response back if they aren't sufficient, - // and delete the message (if it's not deleted for us, which happens when - // we own the connection to the requested observer). - MonSession *session = m->get_session(); - if (!session || !session->caps.check_privileges(PAXOS_MONMAP, MON_CAP_X)) { - send_reply(m, m); - return; - } - if (m->machine_id >= PAXOS_NUM) { - dout(0) << "register_observer: bad monitor id: " << m->machine_id << dendl; - } else { - paxos[m->machine_id]->register_observer(m->get_orig_source_inst(), m->ver); - } - messenger->send_message(m, m->get_orig_source_inst()); -} - void Monitor::send_command(const entity_inst_t& inst, const vector& com, version_t version) { @@ -1563,10 +1542,6 @@ bool Monitor::_ms_dispatch(Message *m) } break; - case MSG_MON_OBSERVE: - handle_observe((MMonObserve *)m); - break; - // elector messages case MSG_MON_ELECTION: //check privileges here for simplicity diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index f5d77406ed76..13407fea559e 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -85,7 +85,6 @@ class AdminSocketHook; class MMonGetMap; class MMonGetVersion; class MMonProbe; -class MMonObserve; class MMonSubscribe; class MAuthRotating; class MRoute; @@ -266,7 +265,6 @@ public: void _mon_status(ostream& ss); void _quorum_status(ostream& ss); void handle_command(class MMonCommand *m); - void handle_observe(MMonObserve *m); void handle_route(MRoute *m); void reply_command(MMonCommand *m, int rc, const string &rs, version_t version); diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index 501476d102a4..6014c4564548 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -17,7 +17,6 @@ #include "MonitorStore.h" #include "messages/MMonPaxos.h" -#include "messages/MMonObserveNotify.h" #include "common/config.h" @@ -381,7 +380,6 @@ void Paxos::begin(bufferlist& v) finish_contexts(g_ceph_context, waiting_for_commit); finish_contexts(g_ceph_context, waiting_for_readable); finish_contexts(g_ceph_context, waiting_for_writeable); - update_observers(); return; } @@ -891,72 +889,8 @@ void Paxos::dispatch(PaxosServiceMessage *m) default: assert(0); } - - if (is_readable()) - update_observers(); } -void Paxos::register_observer(entity_inst_t inst, version_t v) -{ - dout(10) << "register_observer " << inst << " v" << v << dendl; - - Observer *observer; - if (observers.count(inst)) - observer = observers[inst]; - else { - observers[inst] = observer = new Observer(inst, v); - } - - utime_t timeout = ceph_clock_now(g_ceph_context); - timeout += g_conf->paxos_observer_timeout; - observer->timeout = timeout; - - if (is_readable()) - update_observers(); -} - - -void Paxos::update_observers() -{ - dout(10) << "update_observers" << dendl; - - bufferlist bl; - version_t ver; - - map::iterator iter = observers.begin(); - while (iter != observers.end()) { - Observer *observer = iter->second; - - // timed out? - if (ceph_clock_now(g_ceph_context) > observer->timeout) { - delete observer; - observers.erase(iter++); - continue; - } - ++iter; - - if (observer->last_version == 0 || - observer->last_version < first_committed) { - ver = get_stashed(bl); - if (ver) { - dout(10) << " sending summary state v" << ver << " to " << observer->inst << dendl; - mon->messenger->send_message(new MMonObserveNotify(mon->monmap->fsid, machine_id, bl, ver, true), - observer->inst); - observer->last_version = ver; - continue; - } - } - - for (ver = observer->last_version + 1; ver <= last_committed; ver++) { - if (read(ver, bl)) { - dout(10) << " sending state v" << ver << " to " << observer->inst << dendl; - mon->messenger->send_message(new MMonObserveNotify(mon->monmap->fsid, machine_id, bl, ver, false), - observer->inst); - observer->last_version = ver; - } - } - } -} // ----------------- // service interface diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index 79b91916627a..8558fc9502fe 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -75,7 +75,6 @@ class Paxos { friend class Monitor; friend class PaxosService; - friend class PaxosObserver; list extra_state_dirs; @@ -145,15 +144,6 @@ private: list waiting_for_writeable; list waiting_for_commit; - // observers - struct Observer { - entity_inst_t inst; - version_t last_version; - utime_t timeout; - Observer(entity_inst_t& ei, version_t v) : inst(ei), last_version(v) { } - }; - map observers; - //synchronization warnings utime_t last_clock_drift_warn; int clock_drift_warned; @@ -320,9 +310,6 @@ public: version_t get_stashed_version() { return latest_stashed; } version_t get_first_committed() { return first_committed; } - - void register_observer(entity_inst_t inst, version_t v); - void update_observers(); }; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index a6bd1f2976bf..0685f038e978 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -31,8 +31,6 @@ using namespace std; #include "messages/MMonCommand.h" #include "messages/MMonCommandAck.h" #include "messages/MMonPaxos.h" -#include "messages/MMonObserve.h" -#include "messages/MMonObserveNotify.h" #include "messages/MMonProbe.h" #include "messages/MMonJoin.h" @@ -298,12 +296,6 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot m = new MMonElection; break; - case MSG_MON_OBSERVE: - m = new MMonObserve; - break; - case MSG_MON_OBSERVE_NOTIFY: - m = new MMonObserveNotify; - break; case MSG_LOG: m = new MLog; break; diff --git a/src/msg/Message.h b/src/msg/Message.h index 44cc0fea7a50..8244cccb2199 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -30,8 +30,8 @@ #define MSG_MON_COMMAND_ACK 51 #define MSG_LOG 52 #define MSG_LOGACK 53 -#define MSG_MON_OBSERVE 54 -#define MSG_MON_OBSERVE_NOTIFY 55 +//#define MSG_MON_OBSERVE 54 +//#define MSG_MON_OBSERVE_NOTIFY 55 #define MSG_CLASS 56 #define MSG_CLASS_ACK 57 diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index 45fe618faf18..9dec79cbc4c0 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -250,10 +250,6 @@ MESSAGE(MMonGlobalID) MESSAGE(MMonJoin) #include "messages/MMonMap.h" MESSAGE(MMonMap) -#include "messages/MMonObserve.h" -MESSAGE(MMonObserve) -#include "messages/MMonObserveNotify.h" -MESSAGE(MMonObserveNotify) #include "messages/MMonPaxos.h" MESSAGE(MMonPaxos) #include "messages/MMonProbe.h" diff --git a/src/tools/common.cc b/src/tools/common.cc index bb2ab399bdad..367944d994d1 100644 --- a/src/tools/common.cc +++ b/src/tools/common.cc @@ -80,8 +80,6 @@ OSDMap *osdmap = 0; #include "mon/mon_types.h" -#include "messages/MMonObserve.h" -#include "messages/MMonObserveNotify.h" #include "messages/MOSDMap.h" #include "messages/MCommand.h" @@ -525,9 +523,6 @@ bool Admin::ms_dispatch(Message *m) { void Admin::ms_handle_connect(Connection *con) { if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) { ctx->lock.Lock(); - if (ceph_tool_mode != CEPH_TOOL_MODE_CLI_INPUT) { -// send_observe_requests(ctx); - } if (pending_cmd.size()) send_command(ctx); ctx->lock.Unlock();