--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2012 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#include "common/dout.h"
+#include "messages/MMDSBeacon.h"
+#include "mon/MonClient.h"
+
+#include "Beacon.h"
+
+#define dout_subsys ceph_subsys_mds
+#undef dout_prefix
+#define dout_prefix *_dout << "mds.beacon." << name << ' '
+
+
+Beacon::Beacon(MonClient *monc_, std::string name_) :
+ lock("Beacon"), monc(monc_), timer(g_ceph_context, lock), name(name_)
+{
+ last_seq = 0;
+ sender = NULL;
+ was_laggy = false;
+
+ standby_for_rank = MDSMap::MDS_NO_STANDBY_PREF;
+ epoch = 0;
+}
+
+
+Beacon::~Beacon()
+{
+}
+
+
+void Beacon::init(MDSMap const *mdsmap, MDSMap::DaemonState want_state_, int standby_rank_, std::string const & standby_name_)
+{
+ Mutex::Locker l(lock);
+ assert(mdsmap != NULL);
+
+ // Initialize copies of MDS state
+ want_state = want_state_;
+ _notify_mdsmap(mdsmap);
+ standby_for_rank = standby_rank_;
+ standby_for_name = standby_name_;
+
+ // Spawn threads and start messaging
+ timer.init();
+ _send();
+}
+
+
+void Beacon::shutdown()
+{
+ Mutex::Locker l(lock);
+ if (sender) {
+ timer.cancel_event(sender);
+ sender = NULL;
+ }
+ timer.shutdown();
+}
+
+
+/**
+ * Update lagginess state based on response from remote MDSMonitor
+ *
+ * This function puts the passed message before returning
+ */
+void Beacon::handle_mds_beacon(MMDSBeacon *m)
+{
+ Mutex::Locker l(lock);
+ assert(m != NULL);
+
+ version_t seq = m->get_seq();
+
+ // update lab
+ if (seq_stamp.count(seq)) {
+ assert(seq_stamp[seq] > last_acked_stamp);
+ last_acked_stamp = seq_stamp[seq];
+ utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t rtt = now - last_acked_stamp;
+
+ dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
+ << " seq " << m->get_seq()
+ << " rtt " << rtt << dendl;
+
+ if (was_laggy && rtt < g_conf->mds_beacon_grace) {
+ dout(0) << "handle_mds_beacon no longer laggy" << dendl;
+ was_laggy = false;
+ laggy_until = now;
+ }
+
+ // clean up seq_stamp map
+ while (!seq_stamp.empty() &&
+ seq_stamp.begin()->first <= seq)
+ seq_stamp.erase(seq_stamp.begin());
+ } else {
+ dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
+ << " seq " << m->get_seq() << " dne" << dendl;
+ }
+
+ m->put();
+}
+
+
+void Beacon::send()
+{
+ Mutex::Locker l(lock);
+ _send();
+}
+
+
+/**
+ * Call periodically, or when you have updated the desired state
+ */
+void Beacon::_send()
+{
+ if (sender) {
+ timer.cancel_event(sender);
+ }
+ sender = new C_MDS_BeaconSender(this);
+ timer.add_event_after(g_conf->mds_beacon_interval, sender);
+
+ ++last_seq;
+ dout(10) << __func__ << " " << ceph_mds_state_name(want_state)
+ << " seq " << last_seq
+ << dendl;
+
+ seq_stamp[last_seq] = ceph_clock_now(g_ceph_context);
+
+ MMDSBeacon *beacon = new MMDSBeacon(
+ monc->get_fsid(), monc->get_global_id(),
+ name,
+ epoch,
+ want_state,
+ last_seq);
+
+ beacon->set_standby_for_rank(standby_for_rank);
+ beacon->set_standby_for_name(standby_for_name);
+
+ // include _my_ feature set
+ beacon->set_compat(compat);
+
+ monc->send_mon_message(beacon);
+}
+
+/**
+ * Call this when there is a new MDSMap available
+ */
+void Beacon::notify_mdsmap(MDSMap const *mdsmap)
+{
+ Mutex::Locker l(lock);
+ assert(mdsmap != NULL);
+
+ _notify_mdsmap(mdsmap);
+}
+
+void Beacon::_notify_mdsmap(MDSMap const *mdsmap)
+{
+ assert(mdsmap != NULL);
+
+ epoch = mdsmap->get_epoch();
+ compat = get_mdsmap_compat_set_default();
+ compat.merge(mdsmap->compat);
+}
+
+
+bool Beacon::is_laggy()
+{
+ Mutex::Locker l(lock);
+
+ if (last_acked_stamp == utime_t())
+ return false;
+
+ utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t since = now - last_acked_stamp;
+ if (since > g_conf->mds_beacon_grace) {
+ dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
+ << " since last acked beacon" << dendl;
+ was_laggy = true;
+ if (since > (g_conf->mds_beacon_grace*2)) {
+ // maybe it's not us?
+ dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
+ << dendl;
+ monc->reopen_session();
+ }
+ return true;
+ }
+ return false;
+}
+
+utime_t Beacon::get_laggy_until() const
+{
+ Mutex::Locker l(lock);
+
+ return laggy_until;
+}
+
+void Beacon::notify_want_state(MDSMap::DaemonState const newstate)
+{
+ Mutex::Locker l(lock);
+
+ want_state = newstate;
+}
+
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2012 Red Hat
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef BEACON_STATE_H
+#define BEACON_STATE_H
+
+#include "include/types.h"
+#include "include/Context.h"
+#include "common/Mutex.h"
+
+class MonClient;
+class MMDSBeacon;
+
+
+/**
+ * One of these per MDS. Handle beacon logic in this separate class so
+ * that a busy MDS holding its own lock does not hold up sending beacon
+ * messages to the mon and cause false lagginess.
+ *
+ * So that we can continue to operate while the MDS is holding its own lock,
+ * we keep copies of the data needed to generate beacon messages. The MDS is
+ * responsible for calling Beacon::notify_* when things change.
+ */
+class Beacon
+{
+ mutable Mutex lock;
+ MonClient* monc;
+ SafeTimer timer;
+
+ // Items we duplicate from the MDS to have access under our own lock
+ std::string name;
+ version_t epoch;
+ CompatSet compat;
+ int standby_for_rank;
+ std::string standby_for_name;
+ MDSMap::DaemonState want_state;
+
+ // Internal beacon state
+ version_t last_send;
+ version_t last_seq; // last seq sent to monitor
+ std::map<version_t,utime_t> seq_stamp; // seq # -> time sent
+ utime_t last_acked_stamp; // last time we sent a beacon that got acked
+ bool was_laggy;
+ utime_t laggy_until;
+
+ // Ticker
+ class C_MDS_BeaconSender : public Context {
+ Beacon *beacon;
+ public:
+ C_MDS_BeaconSender(Beacon *beacon_) : beacon(beacon_) {}
+ void finish(int r) {
+ assert(beacon->lock.is_locked_by_me());
+ beacon->sender = NULL;
+ beacon->_send();
+ }
+ } *sender;
+
+ void _notify_mdsmap(MDSMap const *mdsmap);
+ void _send();
+
+public:
+ Beacon(MonClient *monc_, std::string name);
+ ~Beacon();
+
+ void init(MDSMap const *mdsmap, MDSMap::DaemonState want_state_, int standby_rank_, std::string const &standby_name_);
+ void shutdown();
+
+ void notify_mdsmap(MDSMap const *mdsmap);
+ void notify_want_state(MDSMap::DaemonState const newstate);
+
+ void set_standby_for(int rank_, std::string const &name_);
+
+ void handle_mds_beacon(MMDSBeacon *m);
+ void send();
+
+ bool is_laggy();
+ utime_t get_laggy_until() const;
+};
+
+#endif // BEACON_STATE_H
+
#define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
-
// cons/des
MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
Dispatcher(m->cct),
mds_lock("MDS::mds_lock"),
timer(m->cct, mds_lock),
+ beacon(mc, n),
authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(m->cct,
m->cct->_conf->auth_supported.length() ?
m->cct->_conf->auth_supported :
// clients
last_client_mdsmap_bcast = 0;
- // beacon
- beacon_last_seq = 0;
- beacon_sender = 0;
- was_laggy = false;
-
// tick
tick_event = 0;
} else if (standby_type == MDSMap::STATE_NULL && !standby_for_name.empty())
standby_for_rank = MDSMap::MDS_MATCHED_ACTIVE;
- beacon_start();
+ beacon.init(mdsmap, want_state, standby_for_rank, standby_for_name);
whoami = -1;
messenger->set_myname(entity_name_t::MDS(whoami));
// reschedule
reset_tick();
- if (is_laggy()) {
+ if (beacon.is_laggy()) {
dout(5) << "tick bailing out since we seem laggy" << dendl;
return;
}
}
-// -----------------------
-// beacons
-
-void MDS::beacon_start()
-{
- beacon_send(); // send first beacon
-}
-
-
-
-void MDS::beacon_send()
-{
- ++beacon_last_seq;
- dout(10) << "beacon_send " << ceph_mds_state_name(want_state)
- << " seq " << beacon_last_seq
- << " (currently " << ceph_mds_state_name(state) << ")"
- << dendl;
-
- beacon_seq_stamp[beacon_last_seq] = ceph_clock_now(g_ceph_context);
-
- MMDSBeacon *beacon = new MMDSBeacon(monc->get_fsid(), monc->get_global_id(), name, mdsmap->get_epoch(),
- want_state, beacon_last_seq);
- beacon->set_standby_for_rank(standby_for_rank);
- beacon->set_standby_for_name(standby_for_name);
-
- // include _my_ feature set
- CompatSet mdsmap_compat(get_mdsmap_compat_set_default());
- mdsmap_compat.merge(mdsmap->compat);
- beacon->set_compat(mdsmap_compat);
-
- monc->send_mon_message(beacon);
-
- // schedule next sender
- if (beacon_sender) timer.cancel_event(beacon_sender);
- beacon_sender = new C_MDS_BeaconSender(this);
- timer.add_event_after(g_conf->mds_beacon_interval, beacon_sender);
-}
-
-
-bool MDS::is_laggy()
-{
- if (beacon_last_acked_stamp == utime_t())
- return false;
-
- utime_t now = ceph_clock_now(g_ceph_context);
- utime_t since = now - beacon_last_acked_stamp;
- if (since > g_conf->mds_beacon_grace) {
- dout(5) << "is_laggy " << since << " > " << g_conf->mds_beacon_grace
- << " since last acked beacon" << dendl;
- was_laggy = true;
- if (since > (g_conf->mds_beacon_grace*2)) {
- // maybe it's not us?
- dout(5) << "initiating monitor reconnect; maybe we're not the slow one"
- << dendl;
- monc->reopen_session();
- }
- return true;
- }
- return false;
-}
-
-
-/* This fuction puts the passed message before returning */
-void MDS::handle_mds_beacon(MMDSBeacon *m)
-{
- version_t seq = m->get_seq();
-
- // update lab
- if (beacon_seq_stamp.count(seq)) {
- assert(beacon_seq_stamp[seq] > beacon_last_acked_stamp);
- beacon_last_acked_stamp = beacon_seq_stamp[seq];
- utime_t now = ceph_clock_now(g_ceph_context);
- utime_t rtt = now - beacon_last_acked_stamp;
-
- dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
- << " seq " << m->get_seq()
- << " rtt " << rtt << dendl;
-
- if (was_laggy && rtt < g_conf->mds_beacon_grace) {
- dout(0) << "handle_mds_beacon no longer laggy" << dendl;
- was_laggy = false;
- laggy_until = now;
- }
-
- // clean up seq_stamp map
- while (!beacon_seq_stamp.empty() &&
- beacon_seq_stamp.begin()->first <= seq)
- beacon_seq_stamp.erase(beacon_seq_stamp.begin());
- } else {
- dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
- << " seq " << m->get_seq() << " dne" << dendl;
- }
-
- m->put();
-}
-
/* This function DOES put the passed message before returning*/
void MDS::handle_command(MMonCommand *m)
{
last_state = oldstate;
if (state == MDSMap::STATE_STANDBY) {
- want_state = state = MDSMap::STATE_STANDBY;
+ state = MDSMap::STATE_STANDBY;
+ set_want_state(state);
dout(1) << "handle_mds_map standby" << dendl;
if (standby_type) // we want to be in standby_replay or oneshot_replay!
goto out;
} else if (state == MDSMap::STATE_STANDBY_REPLAY) {
if (standby_type != MDSMap::STATE_NULL && standby_type != MDSMap::STATE_STANDBY_REPLAY) {
- want_state = standby_type;
- beacon_send();
+ set_want_state(standby_type);
+ beacon.send();
state = oldstate;
goto out;
}
} else {
if (want_state == MDSMap::STATE_STANDBY) {
dout(10) << "dropped out of mdsmap, try to re-add myself" << dendl;
- want_state = state = MDSMap::STATE_BOOT;
+ state = MDSMap::STATE_BOOT;
+ set_want_state(state);
goto out;
}
if (want_state == MDSMap::STATE_BOOT) {
dout(1) << "handle_mds_map state change "
<< ceph_mds_state_name(oldstate) << " --> "
<< ceph_mds_state_name(state) << dendl;
- want_state = state;
+ set_want_state(state);
if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
}
out:
+ beacon.notify_mdsmap(mdsmap);
+
m->put();
delete oldmap;
}
void MDS::request_state(MDSMap::DaemonState s)
{
dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
- want_state = s;
- beacon_send();
+ set_want_state(s);
+ beacon.send();
}
void MDS::suicide()
{
assert(mds_lock.is_locked());
- want_state = MDSMap::STATE_DNE; // whatever.
+ set_want_state(MDSMap::STATE_DNE); // whatever.
dout(1) << "suicide. wanted " << ceph_mds_state_name(want_state)
<< ", now " << ceph_mds_state_name(state) << dendl;
finisher.stop(); // no flushing
// stop timers
- if (beacon_sender) {
- timer.cancel_event(beacon_sender);
- beacon_sender = 0;
- }
+ beacon.shutdown();
if (tick_event) {
timer.cancel_event(tick_event);
tick_event = 0;
break;
case MSG_MDS_BEACON:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
- handle_mds_beacon(static_cast<MMDSBeacon*>(m));
+ beacon.handle_mds_beacon(static_cast<MMDSBeacon*>(m));
break;
// misc
// core
if (!handle_core_message(m)) {
- if (is_laggy()) {
+ if (beacon.is_laggy()) {
dout(10) << " laggy, deferring " << *m << dendl;
waiting_for_nolaggy.push_back(m);
} else {
dout(1) << " " << this->mds_lock.is_locked_by_me() << dendl;
ls.front()->complete(0);
ls.pop_front();
-
- // give other threads (beacon!) a chance
- mds_lock.Unlock();
- mds_lock.Lock();
}
}
while (!waiting_for_nolaggy.empty()) {
// stop if we're laggy now!
- if (is_laggy())
+ if (beacon.is_laggy())
return true;
Message *old = waiting_for_nolaggy.front();
dout(7) << " processing laggy deferred " << *old << dendl;
handle_deferrable_message(old);
}
-
- // give other threads (beacon!) a chance
- mds_lock.Unlock();
- mds_lock.Lock();
}
// done with all client replayed requests?
#include "MDSMap.h"
#include "SessionMap.h"
+#include "Beacon.h"
#define CEPH_MDS_PROTOCOL 24 /* cluster internal */
Mutex mds_lock;
SafeTimer timer;
+ private:
+ Beacon beacon;
+ void set_want_state(MDSMap::DaemonState newstate)
+ {
+ want_state = newstate;
+ beacon.notify_want_state(newstate);
+ }
+ public:
+ utime_t get_laggy_until() {return beacon.get_laggy_until();}
+
AuthAuthorizeHandlerRegistry *authorize_handler_cluster_registry;
AuthAuthorizeHandlerRegistry *authorize_handler_service_registry;
replay_queue.push_back(c);
}
- int get_state() { return state; }
- int get_want_state() { return want_state; }
+ MDSMap::DaemonState get_state() { return state; }
+ MDSMap::DaemonState get_want_state() { return want_state; }
bool is_creating() { return state == MDSMap::STATE_CREATING; }
bool is_starting() { return state == MDSMap::STATE_STARTING; }
bool is_standby() { return state == MDSMap::STATE_STANDBY; }
return true;
}
- // -- keepalive beacon --
- version_t beacon_last_seq; // last seq sent to monitor
- map<version_t,utime_t> beacon_seq_stamp; // seq # -> time sent
- utime_t beacon_last_acked_stamp; // last time we sent a beacon that got acked
- bool was_laggy;
- utime_t laggy_until;
-
- bool is_laggy();
- utime_t get_laggy_until() { return laggy_until; }
-
- class C_MDS_BeaconSender : public MDSInternalContext {
- public:
- C_MDS_BeaconSender(MDS *m) : MDSInternalContext(m) {}
- void finish(int r) {
- mds->beacon_sender = 0;
- mds->beacon_send();
- }
- } *beacon_sender;
-
// tick and other timer fun
class C_MDS_Tick : public MDSInternalContext {
public:
void tick();
- void beacon_start();
- void beacon_send();
- void handle_mds_beacon(MMDSBeacon *m);
void inc_dispatch_depth() { ++dispatch_depth; }
void dec_dispatch_depth() { --dispatch_depth; }
libmds_la_SOURCES = \
mds/Capability.cc \
mds/MDS.cc \
+ mds/Beacon.cc \
mds/flock.cc \
mds/locks.c \
mds/journal.cc \
mds/RecoveryQueue.h \
mds/MDLog.h \
mds/MDS.h \
+ mds/Beacon.h \
mds/MDSContext.h \
mds/MDSMap.h \
mds/MDSTable.h \
void Server::find_idle_sessions()
{
- dout(10) << "find_idle_sessions. laggy until " << mds->laggy_until << dendl;
+ dout(10) << "find_idle_sessions. laggy until " << mds->get_laggy_until() << dendl;
// timeout/stale
// (caps go stale, lease die)
cutoff -= g_conf->mds_session_autoclose;
// don't kick clients if we've been laggy
- if (mds->laggy_until > cutoff) {
- dout(10) << " laggy_until " << mds->laggy_until << " > cutoff " << cutoff
+ if (mds->get_laggy_until() > cutoff) {
+ dout(10) << " laggy_until " << mds->get_laggy_until() << " > cutoff " << cutoff
<< ", not kicking any clients to be safe" << dendl;
return;
}