mon/AuthMonitor.cc \
mon/Elector.cc \
mon/MonitorStore.cc \
- os/LevelDBStore.cc
+ os/LevelDBStore.cc \
+ mon/HealthMonitor.cc \
+ mon/DataHealthService.cc
libmon_a_CXXFLAGS= ${AM_CXXFLAGS}
noinst_LIBRARIES += libmon.a
messages/MWatchNotify.h\
messages/PaxosServiceMessage.h\
mon/AuthMonitor.h\
+ mon/DataHealthMonitor.h\
mon/Elector.h\
mon/LogMonitor.h\
+ mon/HealthMonitor.h\
mon/MDSMonitor.h\
mon/MonmapMonitor.h\
mon/MonCaps.h\
mon/MonClient.h\
mon/MonMap.h\
mon/Monitor.h\
+ mon/MonitorHealthService.h\
mon/MonitorStore.h\
mon/MonitorDBStore.h\
mon/OSDMonitor.h\
OPTION(mon_client_bytes, OPT_U64, 100ul << 20) // client msg data allowed in memory (in bytes)
OPTION(mon_daemon_bytes, OPT_U64, 400ul << 20) // mds, osd message memory cap (in bytes)
OPTION(mon_max_log_entries_per_event, OPT_INT, 4096)
+OPTION(mon_health_data_update_interval, OPT_FLOAT, 60.0)
+OPTION(mon_data_avail_crit, OPT_INT, 5)
+OPTION(mon_data_avail_warn, OPT_INT, 30)
OPTION(mon_sync_trim_timeout, OPT_DOUBLE, 30.0)
OPTION(mon_sync_heartbeat_timeout, OPT_DOUBLE, 30.0)
OPTION(mon_sync_heartbeat_interval, OPT_DOUBLE, 5.0)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2012 Inktank, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_MMON_HEALTH_H
+#define CEPH_MMON_HEALTH_H
+
+#include "msg/Message.h"
+#include "messages/MMonQuorumService.h"
+#include "mon/mon_types.h"
+
+struct MMonHealth : public MMonQuorumService
+{
+ static const int HEAD_VERSION = 1;
+
+ enum {
+ OP_TELL = 1,
+ };
+
+ static const uint32_t FLAG_DATA = 0x01;
+
+ int service_type;
+ int service_op;
+ uint32_t flags;
+
+ // service specific data
+ DataStats data_stats;
+
+ MMonHealth() : MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION) { }
+ MMonHealth(uint32_t type) :
+ MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION),
+ service_type(type)
+ { }
+ MMonHealth(uint32_t type, int op) :
+ MMonQuorumService(MSG_MON_HEALTH, HEAD_VERSION),
+ service_type(type),
+ service_op(op)
+ { }
+
+private:
+ ~MMonHealth() { }
+
+public:
+ const char *get_type_name() const { return "mon_health"; }
+ const char *get_service_op_name() const {
+ switch (service_op) {
+ case OP_TELL: return "tell";
+ }
+ return "???";
+ }
+ void print(ostream &o) const {
+ o << "mon_health( service " << get_service_type()
+ << " op " << get_service_op_name()
+ << " e " << get_epoch() << " r " << get_round()
+ << " flags";
+ if (!flags) {
+ o << " none";
+ } else {
+ if (has_flag(FLAG_DATA)) {
+ o << " data";
+ }
+ }
+ o << " )";
+ }
+
+ int get_service_type() const {
+ return service_type;
+ }
+
+ int get_service_op() {
+ return service_op;
+ }
+
+ void set_flag(uint32_t f) {
+ flags |= f;
+ }
+
+ bool has_flag(uint32_t f) const {
+ return (flags & f);
+ }
+
+ void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ service_decode(p);
+ ::decode(service_type, p);
+ ::decode(service_op, p);
+ ::decode(data_stats, p);
+ }
+
+ void encode_payload(uint64_t features) {
+ service_encode();
+ ::encode(service_type, payload);
+ ::encode(service_op, payload);
+ ::encode(data_stats, payload);
+ }
+
+};
+
+#endif /* CEPH_MMON_HEALTH_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include <memory>
+#include <tr1/memory>
+#include <errno.h>
+#include <map>
+#include <list>
+#include <string>
+#include <sstream>
+#include <sys/vfs.h>
+
+#include "messages/MMonHealth.h"
+#include "include/types.h"
+#include "include/Context.h"
+#include "include/assert.h"
+#include "common/Formatter.h"
+
+#include "mon/Monitor.h"
+#include "mon/QuorumService.h"
+#include "mon/DataHealthService.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, mon, this)
+static ostream& _prefix(std::ostream *_dout, const Monitor *mon,
+ const DataHealthService *svc) {
+ assert(mon != NULL);
+ assert(svc != NULL);
+ return *_dout << "mon." << mon->name << "@" << mon->rank
+ << "(" << mon->get_state_name() << ")." << svc->get_name()
+ << "(" << svc->get_epoch() << ") ";
+}
+
+void DataHealthService::start_epoch()
+{
+ dout(10) << __func__ << " epoch " << get_epoch() << dendl;
+ // we are not bound by election epochs, but we should clear the stats
+ // everytime an election is triggerd. As far as we know, a monitor might
+ // have been running out of disk space and someone fixed it. We don't want
+ // to hold the cluster back, even confusing the user, due to some possibly
+ // outdated stats.
+ stats.clear();
+}
+
+void DataHealthService::get_health(Formatter *f,
+ list<pair<health_status_t,string> > *detail)
+{
+ dout(10) << __func__ << dendl;
+ assert(f != NULL);
+
+ f->open_object_section("data_health");
+ f->open_array_section("mons");
+
+ for (map<entity_inst_t,DataStats>::iterator it = stats.begin();
+ it != stats.end(); ++it) {
+ string mon_name = mon->monmap->get_name(it->first.addr);
+ DataStats& stats = it->second;
+
+ health_status_t health_status = HEALTH_OK;
+ string health_detail;
+ if (stats.latest_avail_percent <= g_conf->mon_data_avail_crit) {
+ health_status = HEALTH_ERR;
+ health_detail = "shutdown iminent!";
+ } else if (stats.latest_avail_percent <= g_conf->mon_data_avail_warn) {
+ health_status = HEALTH_WARN;
+ health_detail = "low disk space!";
+ }
+
+ if (detail && health_status != HEALTH_OK) {
+ stringstream ss;
+ ss << "mon." << mon_name << " addr " << it->first.addr
+ << " has " << stats.latest_avail_percent
+ << "\% avail disk space -- " << health_detail;
+ detail->push_back(make_pair(health_status, ss.str()));
+ }
+
+ f->open_object_section(mon_name.c_str());
+ f->dump_string("name", mon_name.c_str());
+ f->dump_int("kb_total", stats.kb_total);
+ f->dump_int("kb_used", stats.kb_used);
+ f->dump_int("kb_avail", stats.kb_avail);
+ f->dump_int("avail_percent", stats.latest_avail_percent);
+ f->dump_stream("last_updated") << stats.last_update;
+ f->dump_stream("health") << health_status;
+ if (health_status != HEALTH_OK)
+ f->dump_string("health_detail", health_detail);
+ f->close_section();
+ }
+
+ f->close_section(); // mons
+ f->close_section(); // data_health
+}
+
+void DataHealthService::update_stats()
+{
+ struct statfs stbuf;
+ int err = ::statfs(g_conf->mon_data.c_str(), &stbuf);
+ if (err < 0) {
+ assert(errno != EIO);
+ mon->clog.error() << __func__ << " statfs error: " << cpp_strerror(errno) << "\n";
+ return;
+ }
+
+ entity_inst_t our_inst = mon->monmap->get_inst(mon->name);
+ DataStats& ours = stats[our_inst];
+
+ ours.kb_total = stbuf.f_blocks * stbuf.f_bsize / 1024;
+ ours.kb_used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize / 1024;
+ ours.kb_avail = stbuf.f_bavail * stbuf.f_bsize / 1024;
+ ours.latest_avail_percent = (((float)ours.kb_avail/ours.kb_total)*100);
+ dout(0) << __func__ << " avail " << ours.latest_avail_percent << "%"
+ << " total " << ours.kb_total << " used " << ours.kb_used << " avail " << ours.kb_avail
+ << dendl;
+ ours.last_update = ceph_clock_now(g_ceph_context);
+}
+
+void DataHealthService::share_stats()
+{
+ dout(10) << __func__ << dendl;
+ if (!in_quorum())
+ return;
+
+ assert(!stats.empty());
+ entity_inst_t our_inst = mon->monmap->get_inst(mon->rank);
+ assert(stats.count(our_inst) > 0);
+ DataStats &ours = stats[our_inst];
+ const set<int>& quorum = mon->get_quorum();
+ for (set<int>::const_iterator it = quorum.begin();
+ it != quorum.end(); ++it) {
+ if (mon->monmap->get_name(*it) == mon->name)
+ continue;
+ entity_inst_t inst = mon->monmap->get_inst(*it);
+ MMonHealth *m = new MMonHealth(HealthService::SERVICE_HEALTH_DATA,
+ MMonHealth::OP_TELL);
+ m->data_stats = ours;
+ dout(20) << __func__ << " send " << *m << " to " << inst << dendl;
+ mon->messenger->send_message(m, inst);
+ }
+}
+
+void DataHealthService::service_tick()
+{
+ dout(10) << __func__ << dendl;
+
+ update_stats();
+ if (in_quorum())
+ share_stats();
+
+ DataStats &ours = stats[mon->monmap->get_inst(mon->name)];
+
+ if (ours.latest_avail_percent <= g_conf->mon_data_avail_crit) {
+ mon->clog.error()
+ << "reached critical levels of available space on data store"
+ << " -- shutdown!\n";
+ force_shutdown();
+ return;
+ }
+
+ // we must backoff these warnings, and track how much data is being
+ // consumed in-between reports to assess if it's worth to log this info,
+ // otherwise we may very well contribute to the consumption of the
+ // already low available disk space.
+ if (ours.latest_avail_percent <= g_conf->mon_data_avail_warn) {
+ mon->clog.warn()
+ << "reached concerning levels of available space on data store"
+ << " (" << ours.latest_avail_percent << "\% free)\n";
+ }
+}
+
+void DataHealthService::handle_tell(MMonHealth *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ assert(m->get_service_op() == MMonHealth::OP_TELL);
+
+ stats[m->get_source_inst()] = m->data_stats;
+}
+
+bool DataHealthService::service_dispatch(MMonHealth *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ assert(m->get_service_type() == get_type());
+ if (!in_quorum()) {
+ dout(1) << __func__ << " not in quorum -- drop message" << dendl;
+ m->put();
+ return false;
+ }
+
+ switch (m->service_op) {
+ case MMonHealth::OP_TELL:
+ // someone is telling us their stats
+ handle_tell(m);
+ break;
+ default:
+ dout(0) << __func__ << " unknown op " << m->service_op << dendl;
+ assert(0 == "Unknown service op");
+ break;
+ }
+ m->put();
+ return true;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_MON_DATA_HEALTH_SERVICE_H
+#define CEPH_MON_DATA_HEALTH_SERVICE_H
+
+#include <boost/intrusive_ptr.hpp>
+// Because intusive_ptr clobbers our assert...
+#include "include/assert.h"
+#include <errno.h>
+
+#include "include/types.h"
+#include "include/Context.h"
+#include "mon/mon_types.h"
+#include "mon/QuorumService.h"
+#include "mon/HealthService.h"
+#include "common/Formatter.h"
+#include "common/config.h"
+#include "global/signal_handler.h"
+
+class MMonHealth;
+
+class DataHealthService :
+ public HealthService
+{
+ map<entity_inst_t,DataStats> stats;
+ void handle_tell(MMonHealth *m);
+ void update_stats();
+ void share_stats();
+
+ void force_shutdown() {
+ generic_dout(0) << "** Shutdown via Data Health Service **" << dendl;
+ queue_async_signal(SIGINT);
+ }
+
+protected:
+ virtual void service_tick();
+ virtual bool service_dispatch(Message *m) {
+ assert(0 == "We should never reach this; only the function below");
+ return false;
+ }
+ virtual bool service_dispatch(MMonHealth *m);
+ virtual void service_shutdown() { }
+
+ virtual void start_epoch();
+ virtual void finish_epoch() { }
+ virtual void cleanup() { }
+
+public:
+ DataHealthService(Monitor *m) :
+ HealthService(m)
+ {
+ set_update_period(g_conf->mon_health_data_update_interval);
+ }
+ virtual ~DataHealthService() { }
+ DataHealthService *get() {
+ return static_cast<DataHealthService *>(RefCountedObject::get());
+ }
+
+ virtual void init() {
+ generic_dout(20) << "data_health " << __func__ << dendl;
+ start_tick();
+ }
+
+ virtual void get_health(Formatter *f,
+ list<pair<health_status_t,string> > *detail);
+
+ virtual int get_type() {
+ return HealthService::SERVICE_HEALTH_DATA;
+ }
+
+ virtual string get_name() const {
+ return "data_health";
+ }
+};
+typedef boost::intrusive_ptr<DataHealthService> DataHealthServiceRef;
+
+#endif /* CEPH_MON_DATA_HEALTH_SERVICE_H */
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sstream>
+#include <stdlib.h>
+#include <limits.h>
+
+#include <boost/intrusive_ptr.hpp>
+// Because intusive_ptr clobbers our assert...
+#include "include/assert.h"
+
+#include "mon/Monitor.h"
+#include "mon/QuorumService.h"
+#include "mon/HealthService.h"
+#include "mon/HealthMonitor.h"
+#include "mon/DataHealthService.h"
+
+#include "messages/MMonHealth.h"
+
+#include "common/config.h"
+
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, mon, this)
+static ostream& _prefix(std::ostream *_dout, const Monitor *mon,
+ const HealthMonitor *hmon) {
+ return *_dout << "mon." << mon->name << "@" << mon->rank
+ << "(" << mon->get_state_name() << ")." << hmon->get_name()
+ << "(" << hmon->get_epoch() << ") ";
+}
+
+void HealthMonitor::init()
+{
+ dout(10) << __func__ << dendl;
+ assert(services.empty());
+ services[HealthService::SERVICE_HEALTH_DATA] =
+ HealthServiceRef(new DataHealthService(mon));
+
+ for (map<int,HealthServiceRef>::iterator it = services.begin();
+ it != services.end(); ++it) {
+ it->second->init();
+ }
+}
+
+bool HealthMonitor::service_dispatch(Message *m)
+{
+ assert(m->get_type() == MSG_MON_HEALTH);
+ MMonHealth *hm = (MMonHealth*)m;
+ int service_type = hm->get_service_type();
+ if (services.count(service_type) == 0) {
+ dout(1) << __func__ << " service type " << service_type
+ << " not registered -- drop message!" << dendl;
+ m->put();
+ return false;
+ }
+ return services[service_type]->service_dispatch(hm);
+}
+
+void HealthMonitor::service_shutdown()
+{
+ dout(0) << "HealthMonitor::service_shutdown "
+ << services.size() << " services" << dendl;
+ for (map<int,HealthServiceRef>::iterator it = services.begin();
+ it != services.end(); ++it) {
+ it->second->shutdown();
+ }
+ services.clear();
+}
+
+void HealthMonitor::get_health(Formatter *f,
+ list<pair<health_status_t,string> > *detail) {
+ assert(f != NULL);
+ f->open_object_section("health");
+ f->open_array_section("health_services");
+
+ for (map<int,HealthServiceRef>::iterator it = services.begin();
+ it != services.end(); ++it) {
+ it->second->get_health(f, detail);
+ }
+
+ f->close_section(); // health_services
+ f->close_section(); // health
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_HEALTH_MONITOR_H
+#define CEPH_HEALTH_MONITOR_H
+
+#include <boost/intrusive_ptr.hpp>
+// Because intusive_ptr clobbers our assert...
+#include "include/assert.h"
+
+#include "mon/Monitor.h"
+#include "mon/QuorumService.h"
+#include "mon/HealthService.h"
+
+#include "messages/MMonHealth.h"
+
+#include "common/config.h"
+#include "common/Formatter.h"
+
+class HealthMonitor : public QuorumService
+{
+ map<int,HealthServiceRef> services;
+
+protected:
+ virtual void service_shutdown();
+
+public:
+ HealthMonitor(Monitor *m) : QuorumService(m) { }
+ virtual ~HealthMonitor() { }
+ HealthMonitor *get() {
+ return static_cast<HealthMonitor *>(RefCountedObject::get());
+ }
+
+
+ /**
+ * @defgroup HealthMonitor_Inherited_h Inherited abstract methods
+ * @{
+ */
+ virtual void init();
+ virtual void get_health(Formatter *f,
+ list<pair<health_status_t,string> > *detail);
+ virtual bool service_dispatch(Message *m);
+
+ virtual void start_epoch() {
+ for (map<int,HealthServiceRef>::iterator it = services.begin();
+ it != services.end(); ++it) {
+ it->second->start(get_epoch());
+ }
+ }
+
+ virtual void finish_epoch() {
+ generic_dout(20) << "HealthMonitor::finish_epoch()" << dendl;
+ for (map<int,HealthServiceRef>::iterator it = services.begin();
+ it != services.end(); ++it) {
+ assert(it->second.get() != NULL);
+ it->second->finish();
+ }
+ }
+
+ virtual void cleanup() { }
+ virtual void service_tick() { }
+
+ virtual int get_type() {
+ return QuorumService::SERVICE_HEALTH;
+ }
+
+ virtual string get_name() const {
+ return "health";
+ }
+
+ /**
+ * @} // HealthMonitor_Inherited_h
+ */
+};
+typedef boost::intrusive_ptr<HealthMonitor> HealthMonitorRef;
+
+#endif // CEPH_HEALTH_MONITOR_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#ifndef CEPH_MON_HEALTH_SERVICE_H
+#define CEPH_MON_HEALTH_SERVICE_H
+
+#include <boost/intrusive_ptr.hpp>
+// Because intusive_ptr clobbers our assert...
+#include "include/assert.h"
+
+#include "mon/Monitor.h"
+#include "mon/QuorumService.h"
+
+#include "messages/MMonHealth.h"
+
+#include "common/config.h"
+
+struct HealthService : public QuorumService
+{
+ static const int SERVICE_HEALTH_DATA = 0x01;
+
+ HealthService(Monitor *m) : QuorumService(m) { }
+ virtual ~HealthService() { }
+
+ virtual bool service_dispatch(Message *m) {
+ return service_dispatch(static_cast<MMonHealth*>(m));
+ }
+
+ virtual bool service_dispatch(MMonHealth *m) = 0;
+
+public:
+ HealthService *get() {
+ return static_cast<HealthService *>(RefCountedObject::get());
+ }
+ virtual void get_health(Formatter *f,
+ list<pair<health_status_t,string> > *detail) = 0;
+ virtual int get_type() = 0;
+ virtual string get_name() const = 0;
+};
+typedef boost::intrusive_ptr<HealthService> HealthServiceRef;
+
+#endif // CEPH_MON_HEALTH_SERVICE_H
#ifndef CEPH_MON_TYPES_H
#define CEPH_MON_TYPES_H
+#include "include/utime.h"
+
#define PAXOS_PGMAP 0 // before osd, for pg kick to behave
#define PAXOS_MDSMAP 1
#define PAXOS_OSDMAP 2
#define CEPH_MON_ONDISK_MAGIC "ceph mon volume v012"
+// data stats
+
+struct DataStats {
+ // data dir
+ uint64_t kb_total;
+ uint64_t kb_used;
+ uint64_t kb_avail;
+ int latest_avail_percent;
+ utime_t last_update;
+
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(kb_total, bl);
+ ::encode(kb_used, bl);
+ ::encode(kb_avail, bl);
+ ::encode(latest_avail_percent, bl);
+ ::encode(last_update, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator &p) {
+ DECODE_START(1, p);
+ ::decode(kb_total, p);
+ ::decode(kb_used, p);
+ ::decode(kb_avail, p);
+ ::decode(latest_avail_percent, p);
+ ::decode(last_update, p);
+ DECODE_FINISH(p);
+ }
+};
+
+WRITE_CLASS_ENCODER(DataStats);
+
#endif
#include "messages/MMonGetMap.h"
#include "messages/MMonGetVersion.h"
#include "messages/MMonGetVersionReply.h"
+#include "messages/MMonHealth.h"
#include "messages/MAuth.h"
#include "messages/MAuthReply.h"
m = new MTimeCheck();
break;
+ case MSG_MON_HEALTH:
+ m = new MMonHealth();
+ break;
+
// -- simple messages without payload --
case CEPH_MSG_SHUTDOWN:
// *** generic ***
#define MSG_TIMECHECK 0x600
+#define MSG_MON_HEALTH 0x601