Adds graylog/GELF logging support to clog and monitor log.
Changes to Graylog class:
- Support Logging LogEntry classes
- Add parameter "logger" to indicate the log source
(e.g dlog, clog, mon)
New config opts:
- clog to graylog
- clog to graylog host
- clog to graylog port
- mon cluster log to graylog
- mon cluster log to graylog host
- mon cluster log to graylog port
Signed-off-by: Marcel Lauhoff <ml@irq0.org>
su_sbin_PROGRAMS =
su_sbin_SCRIPTS =
dist_bin_SCRIPTS =
-lib_LTLIBRARIES =
-noinst_LTLIBRARIES =
+lib_LTLIBRARIES =
+noinst_LTLIBRARIES =
noinst_LIBRARIES =
radoslib_LTLIBRARIES =
EXTRALIBS += -lgcov
endif # ENABLE_COVERAGE
+LIBCOMMON += -luuid
+
# Libosd always needs osdc and os
LIBOSD += $(LIBOSDC) $(LIBOS)
radoslibdir = $(libdir)/rados-classes
-
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Graylog.h"
+
+#include <iostream>
+#include <sstream>
+#include <memory>
+
+#include <arpa/inet.h>
+
+#include <boost/asio.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+#include <boost/iostreams/filter/zlib.hpp>
+#include <boost/lexical_cast.hpp>
+
+#include "common/Formatter.h"
+#include "include/uuid.h"
+
+namespace ceph {
+namespace log {
+
+Graylog::Graylog(const SubsystemMap * const s, std::string logger)
+ : m_subs(s),
+ m_log_dst_valid(false),
+ m_hostname(""),
+ m_fsid(""),
+ m_logger(logger),
+ m_ostream_compressed(std::stringstream::in |
+ std::stringstream::out |
+ std::stringstream::binary)
+{
+ m_formatter = std::unique_ptr<Formatter>(Formatter::create("json"));
+ m_formatter_section = std::unique_ptr<Formatter>(Formatter::create("json"));
+}
+
+Graylog::Graylog(std::string logger)
+ : m_subs(NULL),
+ m_log_dst_valid(false),
+ m_hostname(""),
+ m_fsid(""),
+ m_logger(logger),
+ m_ostream_compressed(std::stringstream::in |
+ std::stringstream::out |
+ std::stringstream::binary)
+{
+ m_formatter = std::unique_ptr<Formatter>(Formatter::create("json"));
+ m_formatter_section = std::unique_ptr<Formatter>(Formatter::create("json"));
+}
+
+Graylog::~Graylog()
+{
+}
+
+void Graylog::set_destination(const std::string& host, int port)
+{
+ try {
+ boost::asio::ip::udp::resolver resolver(m_io_service);
+ boost::asio::ip::udp::resolver::query query(host,
+ boost::lexical_cast<std::string>(port));
+ m_endpoint = *resolver.resolve(query);
+ m_log_dst_valid = true;
+ } catch (boost::system::system_error const& e) {
+ cerr << "Error resolving graylog destination: " << e.what() << std::endl;
+ m_log_dst_valid = false;
+ }
+}
+
+void Graylog::set_hostname(const std::string& host)
+{
+ m_hostname = host;
+}
+
+void Graylog::set_fsid(uuid_d fsid)
+{
+ std::vector<char> buf(40);
+ fsid.print(&buf[0]);
+ m_fsid = std::string(&buf[0]);
+}
+
+void Graylog::log_entry(Entry const * const e)
+{
+ if (m_log_dst_valid) {
+ std::string s = e->get_str();
+
+ m_formatter->open_object_section("");
+ m_formatter->dump_string("version", "1.1");
+ m_formatter->dump_string("host", m_hostname);
+ m_formatter->dump_string("short_message", s);
+ m_formatter->dump_string("_app", "ceph");
+ m_formatter->dump_float("timestamp", e->m_stamp.sec() + (e->m_stamp.usec() / 1000000.0));
+ m_formatter->dump_int("_thread", e->m_thread);
+ m_formatter->dump_int("_level", e->m_prio);
+ if (m_subs != NULL)
+ m_formatter->dump_string("_subsys_name", m_subs->get_name(e->m_subsys));
+ m_formatter->dump_int("_subsys_id", e->m_subsys);
+ m_formatter->dump_string("_fsid", m_fsid);
+ m_formatter->dump_string("_logger", m_logger);
+ m_formatter->close_section();
+
+ m_ostream_compressed.clear();
+ m_ostream_compressed.str("");
+
+ m_ostream.reset();
+
+ m_ostream.push(m_compressor);
+ m_ostream.push(m_ostream_compressed);
+
+ m_formatter->flush(m_ostream);
+ m_ostream << std::endl;
+
+ m_ostream.reset();
+
+ try {
+ boost::asio::ip::udp::socket socket(m_io_service);
+ socket.open(m_endpoint.protocol());
+ socket.send_to(boost::asio::buffer(m_ostream_compressed.str()), m_endpoint);
+ } catch (boost::system::system_error const& e) {
+ cerr << "Error sending graylog message: " << e.what() << std::endl;
+ }
+ }
+}
+
+void Graylog::log_log_entry(LogEntry const * const e)
+{
+ if (m_log_dst_valid) {
+ m_formatter->open_object_section("");
+ m_formatter->dump_string("version", "1.1");
+ m_formatter->dump_string("host", m_hostname);
+ m_formatter->dump_string("short_message", e->msg);
+ m_formatter->dump_float("timestamp", e->stamp.sec() + (e->stamp.usec() / 1000000.0));
+ m_formatter->dump_string("_app", "ceph");
+
+ m_formatter_section->open_object_section("");
+ e->who.addr.dump(m_formatter_section.get());
+ e->who.name.dump(m_formatter_section.get());
+ m_formatter_section->close_section();
+
+ m_ostream_section.clear();
+ m_ostream_section.str("");
+ m_formatter_section->flush(m_ostream_section);
+ m_formatter->dump_string("_who", m_ostream_section.str());
+
+ m_formatter->dump_int("_seq", e->seq);
+ m_formatter->dump_string("_prio", clog_type_to_string(e->prio));
+ m_formatter->dump_string("_channel", e->channel);
+ m_formatter->dump_string("_fsid", m_fsid);
+ m_formatter->dump_string("_logger", m_logger);
+ m_formatter->close_section();
+
+ m_ostream_compressed.clear();
+ m_ostream_compressed.str("");
+
+ m_ostream.reset();
+
+ m_ostream.push(m_compressor);
+ m_ostream.push(m_ostream_compressed);
+
+ m_formatter->flush(m_ostream);
+ m_ostream << std::endl;
+
+ m_ostream.reset();
+
+ try {
+ boost::asio::ip::udp::socket socket(m_io_service);
+ socket.open(m_endpoint.protocol());
+ socket.send_to(boost::asio::buffer(m_ostream_compressed.str()), m_endpoint);
+ } catch (boost::system::system_error const& e) {
+ cerr << "Error sending graylog message: " << e.what() << std::endl;
+ }
+ }
+}
+
+} // ceph::log::
+} // ceph::
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef __CEPH_LOG_GRAYLOG_H
+#define __CEPH_LOG_GRAYLOG_H
+
+
+#include <memory>
+
+#include <boost/thread/mutex.hpp>
+#include <boost/asio.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+#include <boost/iostreams/filter/zlib.hpp>
+
+#include "log/Entry.h"
+#include "log/SubsystemMap.h"
+#include "common/LogEntry.h"
+#include "include/memory.h"
+
+namespace ceph {
+namespace log {
+
+// Graylog logging backend: Convert log datastructures (LogEntry, Entry) to
+// GELF (http://www.graylog2.org/resources/gelf/specification) and send it
+// to a GELF UDP receiver
+
+class Graylog
+{
+ public:
+
+ /**
+ * Create Graylog with SubsystemMap. log_entry will resolve the subsystem
+ * id to string. Logging will not be ready until set_destination is called
+ * @param s SubsystemMap
+ * @param logger Value for key "_logger" in GELF
+ */
+ Graylog(const SubsystemMap * const s, std::string logger);
+
+ /**
+ * Create Graylog without SubsystemMap. Logging will not be ready
+ * until set_destination is called
+ * @param logger Value for key "_logger" in GELF
+ */
+ Graylog(std::string logger);
+ virtual ~Graylog();
+
+ void set_hostname(const std::string& host);
+ void set_fsid(uuid_d fsid);
+
+ void set_destination(const std::string& host, int port);
+
+ void log_entry(Entry const * const e);
+ void log_log_entry(LogEntry const * const e);
+
+ typedef ceph::shared_ptr<Graylog> Ref;
+
+ private:
+ SubsystemMap const * const m_subs;
+
+ bool m_log_dst_valid;
+
+ std::string m_hostname;
+ std::string m_fsid;
+ std::string m_logger;
+
+ boost::asio::ip::udp::endpoint m_endpoint;
+ boost::asio::io_service m_io_service;
+
+ std::unique_ptr<Formatter> m_formatter;
+ std::unique_ptr<Formatter> m_formatter_section;
+ std::stringstream m_ostream_section;
+ std::stringstream m_ostream_compressed;
+ boost::iostreams::filtering_ostream m_ostream;
+ boost::iostreams::zlib_compressor m_compressor;
+
+};
+
+}
+}
+
+#endif
map<string,string> &log_to_monitors,
map<string,string> &log_to_syslog,
map<string,string> &log_channels,
- map<string,string> &log_prios)
+ map<string,string> &log_prios,
+ map<string,string> &log_to_graylog,
+ map<string,string> &log_to_graylog_host,
+ map<string,string> &log_to_graylog_port,
+ uuid_d &fsid,
+ string &host)
{
ostringstream oss;
lderr(cct) << __func__ << " error parsing 'clog_to_syslog_level'" << dendl;
return r;
}
+
+ r = get_conf_str_map_helper(cct->_conf->clog_to_graylog, oss,
+ &log_to_graylog, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ lderr(cct) << __func__ << " error parsing 'clog_to_graylog'" << dendl;
+ return r;
+ }
+
+ r = get_conf_str_map_helper(cct->_conf->clog_to_graylog_host, oss,
+ &log_to_graylog_host, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ lderr(cct) << __func__ << " error parsing 'clog_to_graylog_host'" << dendl;
+ return r;
+ }
+
+ r = get_conf_str_map_helper(cct->_conf->clog_to_graylog_port, oss,
+ &log_to_graylog_port, CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ lderr(cct) << __func__ << " error parsing 'clog_to_graylog_port'" << dendl;
+ return r;
+ }
+
+ fsid = cct->_conf->fsid;
+ host = cct->_conf->host;
return 0;
}
void LogChannel::update_config(map<string,string> &log_to_monitors,
map<string,string> &log_to_syslog,
map<string,string> &log_channels,
- map<string,string> &log_prios)
+ map<string,string> &log_prios,
+ map<string,string> &log_to_graylog,
+ map<string,string> &log_to_graylog_host,
+ map<string,string> &log_to_graylog_port,
+ uuid_d &fsid,
+ string &host)
{
ldout(cct, 20) << __func__ << " log_to_monitors " << log_to_monitors
<< " log_to_syslog " << log_to_syslog
&CLOG_CONFIG_DEFAULT_KEY);
string prio = get_str_map_key(log_prios, log_channel,
&CLOG_CONFIG_DEFAULT_KEY);
+ bool to_graylog = (get_str_map_key(log_to_graylog, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY) == "true");
+ string graylog_host = get_str_map_key(log_to_graylog_host, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY);
+ string graylog_port_str = get_str_map_key(log_to_graylog_port, log_channel,
+ &CLOG_CONFIG_DEFAULT_KEY);
+ int graylog_port = atoi(graylog_port_str.c_str());
set_log_to_monitors(to_monitors);
set_log_to_syslog(to_syslog);
set_syslog_facility(syslog_facility);
set_log_prio(prio);
+ if (to_graylog && !graylog) { /* should but isn't */
+ graylog = ceph::log::Graylog::Ref(new ceph::log::Graylog("clog"));
+ } else if (!to_graylog && graylog) { /* shouldn't but is */
+ graylog.reset();
+ }
+
+ if (to_graylog && graylog) {
+ graylog->set_fsid(fsid);
+ graylog->set_hostname(host);
+ }
+
+ if (graylog && (!graylog_host.empty()) && (graylog_port != 0)) {
+ graylog->set_destination(graylog_host, graylog_port);
+ }
+
ldout(cct, 10) << __func__
<< " to_monitors: " << (to_monitors ? "true" : "false")
<< " to_syslog: " << (to_syslog ? "true" : "false")
<< " syslog_facility: " << syslog_facility
- << " prio: " << prio << ")" << dendl;
+ << " prio: " << prio
+ << " to_graylog: " << (to_graylog ? "true" : "false")
+ << " graylog_host: " << graylog_host
+ << " graylog_port: " << graylog_port
+ << ")" << dendl;
}
void LogChannel::do_log(clog_type prio, std::stringstream& ss)
e.log_to_syslog(get_log_prio(), get_syslog_facility());
}
+ // log to graylog?
+ if (do_log_to_graylog()) {
+ ldout(cct,0) << __func__ << " log to graylog" << dendl;
+ graylog->log_log_entry(&e);
+ }
+
// log to monitor?
if (log_to_monitors) {
parent->queue(e);
#include "common/LogEntry.h"
#include "common/Mutex.h"
+#include "include/uuid.h"
+#include "common/Graylog.h"
#include <iosfwd>
#include <sstream>
map<string,string> &log_to_monitors,
map<string,string> &log_to_syslog,
map<string,string> &log_channels,
- map<string,string> &log_prios);
+ map<string,string> &log_prios,
+ map<string,string> &log_to_graylog,
+ map<string,string> &log_to_graylog_host,
+ map<string,string> &log_to_graylog_port,
+ uuid_d &fsid,
+ string &host);
class LogClientTemp
{
}
bool must_log_to_monitors() { return log_to_monitors; }
+ bool do_log_to_graylog() {
+ return graylog;
+ }
+
typedef shared_ptr<LogChannel> Ref;
/**
void update_config(map<string,string> &log_to_monitors,
map<string,string> &log_to_syslog,
map<string,string> &log_channels,
- map<string,string> &log_prios);
+ map<string,string> &log_prios,
+ map<string,string> &log_to_graylog,
+ map<string,string> &log_to_graylog_host,
+ map<string,string> &log_to_graylog_port,
+ uuid_d &fsid,
+ string &host);
void do_log(clog_type prio, std::stringstream& ss);
void do_log(clog_type prio, const std::string& s);
std::string syslog_facility;
bool log_to_syslog;
bool log_to_monitors;
+ ceph::log::Graylog::Ref graylog;
friend class LogClientTemp;
common/DecayCounter.cc \
common/LogClient.cc \
common/LogEntry.cc \
+ common/Graylog.cc \
common/PrebufferedStreambuf.cc \
common/SloppyCRCMap.cc \
common/BackTrace.cc \
common/HeartbeatMap.h \
common/LogClient.h \
common/LogEntry.h \
+ common/Graylog.h \
common/Preforker.h \
common/SloppyCRCMap.h \
common/WorkQueue.h \
OPTION(clog_to_syslog, OPT_STR, "false")
OPTION(clog_to_syslog_level, OPT_STR, "info") // this level and above
OPTION(clog_to_syslog_facility, OPT_STR, "default=daemon audit=local0")
+OPTION(clog_to_graylog, OPT_STR, "false")
+OPTION(clog_to_graylog_host, OPT_STR, "127.0.0.1")
+OPTION(clog_to_graylog_port, OPT_STR, "12201")
OPTION(mon_cluster_log_to_syslog, OPT_STR, "default=false")
OPTION(mon_cluster_log_to_syslog_level, OPT_STR, "info") // this level and above
OPTION(mon_cluster_log_file, OPT_STR,
"default=/var/log/ceph/$cluster.$channel.log cluster=/var/log/ceph/$cluster.log")
OPTION(mon_cluster_log_file_level, OPT_STR, "info")
+OPTION(mon_cluster_log_to_graylog, OPT_STR, "false")
+OPTION(mon_cluster_log_to_graylog_host, OPT_STR, "127.0.0.1")
+OPTION(mon_cluster_log_to_graylog_port, OPT_STR, "12201")
OPTION(enable_experimental_unrecoverable_data_corrupting_features, OPT_STR, "")
{
pthread_mutex_lock(&m_flush_mutex);
if (! m_graylog.get())
- m_graylog = Graylog::Ref(new Graylog(m_subs));
+ m_graylog = Graylog::Ref(new Graylog(m_subs, "dlog"));
pthread_mutex_unlock(&m_flush_mutex);
}
#include "Entry.h"
#include "EntryQueue.h"
#include "SubsystemMap.h"
-#include "Graylog.h"
+#include "common/Graylog.h"
namespace ceph {
namespace log {
liblog_la_SOURCES = \
log/Log.cc \
- log/SubsystemMap.cc \
- log/Graylog.cc
+ log/SubsystemMap.cc
+
noinst_LTLIBRARIES += liblog.la
noinst_HEADERS += \
log/Entry.h \
log/EntryQueue.h \
log/Log.h \
- log/SubsystemMap.h \
- log/Graylog.h
+ log/SubsystemMap.h
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
+ * License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
- *
+ *
*/
#include <unistd.h>
};
// cons/des
-MDSDaemon::MDSDaemon(const std::string &n, Messenger *m, MonClient *mc) :
+MDSDaemon::MDSDaemon(const std::string &n, Messenger *m, MonClient *mc) :
Dispatcher(m->cct),
mds_lock("MDSDaemon::mds_lock"),
stopping(false),
MDSDaemon::~MDSDaemon() {
Mutex::Locker lock(mds_lock);
- delete mds_rank;
- mds_rank = NULL;
- delete objecter;
+ delete mds_rank;
+ mds_rank = NULL;
+ delete objecter;
objecter = NULL;
delete mdsmap;
mdsmap = NULL;
}
f->flush(ss);
delete f;
-
+
dout(1) << "asok_command: " << command << " (complete)" << dendl;
-
+
return handled;
}
// StrayManager
"mds_max_purge_ops",
"mds_max_purge_ops_per_pg",
+ "clog_to_graylog",
+ "clog_to_graylog_host",
+ "clog_to_graylog_port",
+ "host",
+ "fsid",
NULL
};
return KEYS;
if (changed.count("clog_to_monitors") ||
changed.count("clog_to_syslog") ||
changed.count("clog_to_syslog_level") ||
- changed.count("clog_to_syslog_facility")) {
+ changed.count("clog_to_syslog_facility") ||
+ changed.count("clog_to_graylog") ||
+ changed.count("clog_to_graylog_host") ||
+ changed.count("clog_to_graylog_port") ||
+ changed.count("host") ||
+ changed.count("fsid")) {
if (mds_rank) {
mds_rank->update_log_config();
}
// tell monc about log_client so it will know about mon session resets
monc->set_log_client(&log_client);
-
+
int r = monc->authenticate();
if (r < 0) {
derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl;
}
beacon.init(mdsmap, wanted_state, standby_for_rank, standby_for_name);
messenger->set_myname(entity_name_t::MDS(MDS_RANK_NONE));
-
+
// schedule tick
reset_tick();
g_conf->add_observer(this);
// is it new?
if (epoch <= mdsmap->get_epoch()) {
- dout(5) << " old map epoch " << epoch << " <= " << mdsmap->get_epoch()
+ dout(5) << " old map epoch " << epoch << " <= " << mdsmap->get_epoch()
<< ", discarding" << dendl;
m->put();
return;
}
// MDSRank is active: let him process the map, we have no say.
- dout(10) << __func__ << ": handling map as rank "
+ dout(10) << __func__ << ": handling map as rank "
<< mds_rank->get_nodeid() << dendl;
mds_rank->handle_mds_map(m, oldmap);
}
beacon.set_want_state(mdsmap, new_state);
return;
}
-
+
// Case where we have sent a boot beacon that isn't reflected yet
if (beacon.get_want_state() == MDSMap::STATE_BOOT) {
dout(10) << "not in map yet" << dendl;
//because add_observer is called after set_up_admin_socket
//so we can use asok_hook to avoid assert in the remove_observer
- if (asok_hook != NULL)
+ if (asok_hook != NULL)
g_conf->remove_observer(this);
clean_up_admin_socket();
}
// First see if it's a daemon message
- const bool handled_core = handle_core_message(m);
+ const bool handled_core = handle_core_message(m);
if (handled_core) {
return true;
}
case MSG_MON_COMMAND:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
handle_command(static_cast<MMonCommand*>(m));
- break;
+ break;
// OSD
case MSG_COMMAND:
return true;
}
-void MDSDaemon::ms_handle_connect(Connection *con)
+void MDSDaemon::ms_handle_connect(Connection *con)
{
}
-bool MDSDaemon::ms_handle_reset(Connection *con)
+bool MDSDaemon::ms_handle_reset(Connection *con)
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
return false;
}
-void MDSDaemon::ms_handle_remote_reset(Connection *con)
+void MDSDaemon::ms_handle_remote_reset(Connection *con)
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
return;
// a new connection, rather than a new client
s = mds_rank->sessionmap.get_session(n);
}
-
+
// Wire up a Session* to this connection
// It doesn't go into a SessionMap instance until it sends an explicit
// request to open a session (initial state of Session is `closed`)
return true;
}
}
-
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
+ * License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
- *
+ *
*/
#include "common/debug.h"
balancer(NULL), scrubstack(NULL),
inotable(NULL), snapserver(NULL), snapclient(NULL),
sessionmap(this), logger(NULL), mlogger(NULL),
- op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
+ op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
g_conf->osd_num_op_tracker_shard),
last_state(MDSMap::STATE_BOOT),
state(MDSMap::STATE_BOOT),
// log
utime_t now = ceph_clock_now(g_ceph_context);
mds_load_t load = balancer->get_load(now);
-
+
if (logger) {
logger->set(l_mds_load_cent, 100 * load.mds_load());
logger->set(l_mds_dispatch_queue_len, messenger->get_dispatch_queue_len());
server->find_idle_sessions();
locker->tick();
}
-
+
if (is_reconnect())
server->reconnect_tick();
-
+
if (is_active()) {
balancer->tick();
mdcache->find_stale_fragment_freeze();
{
typedef void (MDSRank::*fn_ptr)();
protected:
- fn_ptr fn;
+ fn_ptr fn;
public:
C_VoidFn(MDSRank *mds_, fn_ptr fn_)
: MDSInternalContext(mds_), fn(fn_)
// hack: thrash exports
static utime_t start;
utime_t now = ceph_clock_now(g_ceph_context);
- if (start == utime_t())
+ if (start == utime_t())
start = now;
/*double el = now - start;
if (el > 30.0 &&
set<mds_rank_t> s;
if (!is_active()) break;
mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
- if (s.size() < 2 || mdcache->get_num_inodes() < 10)
+ if (s.size() < 2 || mdcache->get_num_inodes() < 10)
break; // need peers for this to work.
dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf->mds_thrash_exports << dendl;
-
+
// pick a random dir inode
CInode *in = mdcache->hack_pick_random_inode();
CDir *dir = *p;
if (!dir->get_parent_dir()) continue; // must be linked.
if (!dir->is_auth()) continue; // must be auth.
-
+
mds_rank_t dest;
do {
int k = rand() % s.size();
if (!is_active()) break;
if (mdcache->get_num_fragmenting_dirs() > 5) break;
dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf->mds_thrash_fragments << dendl;
-
+
// pick a random dir inode
CInode *in = mdcache->hack_pick_random_inode();
if (false &&
mdcache->get_root() &&
mdcache->get_root()->dir &&
- !(mdcache->get_root()->dir->is_hashed() ||
+ !(mdcache->get_root()->dir->is_hashed() ||
mdcache->get_root()->dir->is_hashing())) {
dout(0) << "hashing root" << dendl;
mdcache->migrator->hash_dir(mdcache->get_root()->dir);
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
mdcache->dispatch(m);
break;
-
+
case MDS_PORT_MIGRATOR:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
mdcache->migrator->dispatch(m);
break;
-
+
default:
switch (m->get_type()) {
// SERVER
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
server->dispatch(m);
break;
-
+
case MSG_MDS_HEARTBEAT:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
balancer->proc_message(m);
break;
-
+
case MSG_MDS_TABLE_REQUEST:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
{
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
locker->dispatch(m);
break;
-
+
case CEPH_MSG_CLIENT_CAPS:
case CEPH_MSG_CLIENT_CAPRELEASE:
case CEPH_MSG_CLIENT_LEASE:
ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
locker->dispatch(m);
break;
-
+
default:
return false;
}
void MDSRank::send_message(Message *m, Connection *c)
-{
+{
assert(c);
c->send_message(m);
}
// send mdsmap first?
if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
- messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
+ messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
mdsmap->get_inst(mds));
peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
}
* the affected metadata may migrate, in which case the new authority
* won't have the metareq_id in the completed request map.
*/
- // NEW: always make the client resend!
+ // NEW: always make the client resend!
bool client_must_resend = true; //!creq->can_forward();
// tell the client where it should go
messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
client_must_resend),
creq->get_source_inst());
-
+
if (client_must_resend) {
m->put();
- return;
+ return;
}
}
// send mdsmap first?
if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
- messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
+ messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
mdsmap->get_inst(mds));
peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
}
if (is_standby_replay())
standby_replaying = true;
-
+
calc_recovery_set();
// Check if we need to wait for a newer OSD map before starting
delete fin;
boot_start();
} else {
- dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
+ dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
<< " (which blacklists prior instance)" << dendl;
}
}
this,
mdlog->get_journaler()->get_read_pos()));
} else {
- dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
+ dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
<< " (which blacklists prior instance)" << dendl;
}
}
if (mdsmap->get_num_in_mds() == 1 &&
mdsmap->get_num_failed_mds() == 0) { // just me!
- dout(2) << "i am alone, moving to state reconnect" << dendl;
+ dout(2) << "i am alone, moving to state reconnect" << dendl;
request_state(MDSMap::STATE_RECONNECT);
} else {
dout(2) << "i am not alone, moving to state resolve" << dendl;
{
dout(1) << "recovery_done -- successful recovery!" << dendl;
assert(is_clientreplay() || is_active());
-
+
// kick snaptable (resent AGREEs)
if (mdsmap->get_tableserver() == whoami) {
set<mds_rank_t> active;
version_t epoch = m->get_epoch();
// note source's map version
- if (m->get_source().is_mds() &&
+ if (m->get_source().is_mds() &&
peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
dout(15) << " peer " << m->get_source()
<< " has mdsmap epoch >= " << epoch
}
}
}
-
+
// RESOLVE
// is someone else newly resolving?
if (is_resolve() || is_reconnect() || is_rejoin() ||
mdcache->send_resolves();
}
}
-
+
// REJOIN
// is everybody finally rejoining?
if (is_rejoin() || is_clientreplay() || is_active() || is_stopping()) {
// did we finish?
if (g_conf->mds_dump_cache_after_rejoin &&
- oldmap->is_rejoining() && !mdsmap->is_rejoining())
+ oldmap->is_rejoining() && !mdsmap->is_rejoining())
mdcache->dump_cache(); // for DEBUG only
if (oldstate >= MDSMap::STATE_REJOIN) {
oldmap->get_mds_set(oldactive, MDSMap::STATE_CLIENTREPLAY);
mdsmap->get_mds_set(active, MDSMap::STATE_ACTIVE);
mdsmap->get_mds_set(active, MDSMap::STATE_CLIENTREPLAY);
- for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
+ for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
if (*p != whoami && // not me
oldactive.count(*p) == 0) // newly so?
handle_mds_recovery(*p);
set<mds_rank_t> oldstopped, stopped;
oldmap->get_stopped_mds_set(oldstopped);
mdsmap->get_stopped_mds_set(stopped);
- for (set<mds_rank_t>::iterator p = stopped.begin(); p != stopped.end(); ++p)
+ for (set<mds_rank_t>::iterator p = stopped.begin(); p != stopped.end(); ++p)
if (oldstopped.count(*p) == 0) // newly so?
mdcache->migrator->handle_mds_failure_or_stop(*p);
}
mdcache->notify_mdsmap_changed();
}
-void MDSRank::handle_mds_recovery(mds_rank_t who)
+void MDSRank::handle_mds_recovery(mds_rank_t who)
{
dout(5) << "handle_mds_recovery mds." << who << dendl;
-
+
mdcache->handle_mds_recovery(who);
if (mdsmap->get_tableserver() == whoami) {
} else if (command == "osdmap barrier") {
int64_t target_epoch = 0;
bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);
-
+
if (!got_val) {
ss << "no target epoch given";
delete f;
return true;
}
-
+
mds_lock.Lock();
set_osd_epoch_barrier(target_epoch);
mds_lock.Unlock();
-
+
C_SaferCond cond;
bool already_got = objecter->wait_for_map(target_epoch, &cond);
if (!already_got) {
}
} else if (command == "session ls") {
mds_lock.Lock();
-
+
heartbeat_reset();
-
+
dump_sessions(SessionFilter(), f);
-
+
mds_lock.Unlock();
} else if (command == "session evict") {
std::string client_id;
const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
assert(got_arg == true);
-
+
mds_lock.Lock();
Session *session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
strtol(client_id.c_str(), 0, 10)));
if (session) {
C_SaferCond on_safe;
server->kill_session(session, &on_safe);
-
+
mds_lock.Unlock();
on_safe.wait();
} else {
if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
continue;
}
-
+
f->open_object_section("session");
f->dump_int("id", p->first.num());
-
+
f->dump_int("num_leases", s->leases.size());
f->dump_int("num_caps", s->caps.size());
-
+
f->dump_string("state", s->get_state_name());
f->dump_int("replay_requests", is_clientreplay() ? s->get_request_count() : 0);
f->dump_unsigned("completed_requests", s->get_num_completed_requests());
map<string,string> log_to_syslog;
map<string,string> log_channel;
map<string,string> log_prio;
+ map<string,string> log_to_graylog;
+ map<string,string> log_to_graylog_host;
+ map<string,string> log_to_graylog_port;
+ uuid_d fsid;
+ string host;
+
if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
- log_channel, log_prio) == 0)
+ log_channel, log_prio, log_to_graylog,
+ log_to_graylog_host, log_to_graylog_port,
+ fsid, host) == 0)
clog->update_config(log_to_monitors, log_to_syslog,
- log_channel, log_prio);
+ log_channel, log_prio, log_to_graylog,
+ log_to_graylog_host, log_to_graylog_port,
+ fsid, host);
dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
}
mds_plb.add_time_avg(l_mds_reply_latency, "reply_latency",
"Reply latency", "rlat");
mds_plb.add_u64_counter(l_mds_forward, "forward", "Forwarding request");
-
+
mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos");
mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
- mds_plb.add_u64(l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
+ mds_plb.add_u64(l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
mds_plb.add_u64(l_mds_inodes_with_caps, "inodes_with_caps", "Inodes with capabilities");
mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps");
mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
-
- mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
+
+ mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward", "Traverse forwards");
mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover", "Traverse directory discovers");
mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch", "Traverse incomplete directory content fetchings");
mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino", "Traverse remote dentries");
mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock", "Traverse locks");
-
+
mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");
mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
-
+
mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
mds_plb.add_u64_counter(l_mds_exported_inodes, "exported_inodes", "Exported inodes");
mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");
sessionmap.get_client_session_set(clients);
for (set<Session*>::const_iterator p = clients.begin();
p != clients.end();
- ++p)
+ ++p)
(*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
last_client_mdsmap_bcast = mdsmap->get_epoch();
}
return false;
}
}
-
channels.get_facility(channel));
}
+ if (channels.do_log_to_graylog(channel)) {
+ ceph::log::Graylog::Ref graylog = channels.get_graylog(channel);
+ if (graylog) {
+ graylog->log_log_entry(&le);
+ }
+ dout(7) << "graylog: " << channel << " " << graylog
+ << " host:" << channels.log_to_graylog_host << dendl;
+ }
+
string log_file = channels.get_log_file(channel);
dout(20) << __func__ << " logging for channel '" << channel
<< "' to file '" << log_file << "'" << dendl;
return;
}
+ r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss,
+ &channels.log_to_graylog,
+ CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
+ << dendl;
+ return;
+ }
+
+ r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss,
+ &channels.log_to_graylog_host,
+ CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
+ << dendl;
+ return;
+ }
+
+ r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss,
+ &channels.log_to_graylog_port,
+ CLOG_CONFIG_DEFAULT_KEY);
+ if (r < 0) {
+ derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
+ << dendl;
+ return;
+ }
+
channels.expand_channel_meta();
}
changed.count("mon_cluster_log_to_syslog_level") ||
changed.count("mon_cluster_log_to_syslog_facility") ||
changed.count("mon_cluster_log_file") ||
- changed.count("mon_cluster_log_file_level")) {
+ changed.count("mon_cluster_log_file_level") ||
+ changed.count("mon_cluster_log_to_graylog") ||
+ changed.count("mon_cluster_log_to_graylog_host") ||
+ changed.count("mon_cluster_log_to_graylog_port") ||
+ changed.count("fsid") ||
+ changed.count("host")) {
update_log_channels();
}
}
#include "common/LogEntry.h"
#include "messages/MLog.h"
+#include "common/Graylog.h"
class MMonCommand;
map<string,string> log_file;
map<string,string> expanded_log_file;
map<string,string> log_file_level;
+ map<string,string> log_to_graylog;
+ map<string,string> log_to_graylog_host;
+ map<string,string> log_to_graylog_port;
+
+ map<string, ceph::log::Graylog::Ref> graylogs;
+ uuid_d fsid;
+ string host;
void clear() {
log_to_syslog.clear();
log_file.clear();
expanded_log_file.clear();
log_file_level.clear();
+ log_to_graylog.clear();
+ log_to_graylog_host.clear();
+ log_to_graylog_port.clear();
+ graylogs.clear();
}
/** expands $channel meta variable on all maps *EXCEPT* log_file
return get_str_map_key(log_file_level, channel,
&CLOG_CONFIG_DEFAULT_KEY);
}
+
+ bool do_log_to_graylog(const string &channel) {
+ return (get_str_map_key(log_to_graylog, channel,
+ &CLOG_CONFIG_DEFAULT_KEY) == "true");
+ }
+
+ ceph::log::Graylog::Ref get_graylog(const string &channel) {
+ generic_dout(25) << __func__ << " for channel '"
+ << channel << "'" << dendl;
+
+ if (graylogs.count(channel) == 0) {
+ ceph::log::Graylog::Ref graylog = ceph::log::Graylog::Ref(new ceph::log::Graylog("mon"));
+
+ graylog->set_fsid(g_conf->fsid);
+ graylog->set_hostname(g_conf->host);
+ graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
+ &CLOG_CONFIG_DEFAULT_KEY),
+ atoi(get_str_map_key(log_to_graylog_port, channel,
+ &CLOG_CONFIG_DEFAULT_KEY).c_str()));
+
+ graylogs[channel] = graylog;
+ generic_dout(20) << __func__ << " for channel '"
+ << channel << "' to graylog host '"
+ << log_to_graylog_host[channel] << ":"
+ << log_to_graylog_port[channel]
+ << "'" << dendl;
+ }
+ return graylogs[channel];
+ }
} channels;
void update_log_channels();
"mon_cluster_log_to_syslog_facility",
"mon_cluster_log_file",
"mon_cluster_log_file_level",
+ "mon_cluster_log_to_graylog",
+ "mon_cluster_log_to_graylog_host",
+ "mon_cluster_log_to_graylog_port",
NULL
};
return KEYS;
"clog_to_syslog",
"clog_to_syslog_facility",
"clog_to_syslog_level",
+ "clog_to_graylog",
+ "clog_to_graylog_host",
+ "clog_to_graylog_port",
+ "fsid",
// periodic health to clog
"mon_health_to_clog",
"mon_health_to_clog_interval",
if (changed.count("clog_to_monitors") ||
changed.count("clog_to_syslog") ||
changed.count("clog_to_syslog_level") ||
- changed.count("clog_to_syslog_facility")) {
+ changed.count("clog_to_syslog_facility") ||
+ changed.count("clog_to_graylog") ||
+ changed.count("clog_to_graylog_host") ||
+ changed.count("clog_to_graylog_port") ||
+ changed.count("host") ||
+ changed.count("fsid")) {
update_log_clients();
}
map<string,string> log_to_syslog;
map<string,string> log_channel;
map<string,string> log_prio;
+ map<string,string> log_to_graylog;
+ map<string,string> log_to_graylog_host;
+ map<string,string> log_to_graylog_port;
+ uuid_d fsid;
+ string host;
if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
- log_channel, log_prio))
+ log_channel, log_prio, log_to_graylog,
+ log_to_graylog_host, log_to_graylog_port,
+ fsid, host))
return;
clog->update_config(log_to_monitors, log_to_syslog,
- log_channel, log_prio);
+ log_channel, log_prio, log_to_graylog,
+ log_to_graylog_host, log_to_graylog_port,
+ fsid, host);
+
audit_clog->update_config(log_to_monitors, log_to_syslog,
- log_channel, log_prio);
+ log_channel, log_prio, log_to_graylog,
+ log_to_graylog_host, log_to_graylog_port,
+ fsid, host);
}
int Monitor::sanitize_options()
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
+ * License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
- *
+ *
*/
#include "acconfig.h"
struct statfs stbuf;
int r = osd->store->statfs(&stbuf);
if (r < 0) {
- derr << "statfs() failed: " << cpp_strerror(r) << dendl;
+ derr << "statfs() failed: " << cpp_strerror(r) << dendl;
return;
}
return e;
}
}
-
+
void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
{
Mutex::Locker l(peer_map_epoch_lock);
MOSDMap *m = new MOSDMap(monc->get_fsid());
m->oldest_map = sblock.oldest_map;
m->newest_map = sblock.newest_map;
-
+
for (epoch_t e = to; e > since; e--) {
bufferlist bl;
if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
send_map(m, con);
return;
}
-
+
if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
<< ", only sending most recent" << dendl;
since = to - cct->_conf->osd_map_share_max_epochs;
}
-
+
if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
to = since + cct->_conf->osd_map_message_max;
m = build_incremental_map_msg(since, to, sblock);
{
char val[80];
int r;
-
+
snprintf(val, sizeof(val), "%s", CEPH_OSD_ONDISK_MAGIC);
r = store->write_meta("magic", val);
if (r < 0)
heartbeat_thread(this),
heartbeat_dispatcher(this),
finished_lock("OSD::finished_lock"),
- op_tracker(cct, cct->_conf->osd_enable_op_tracker,
+ op_tracker(cct, cct->_conf->osd_enable_op_tracker,
cct->_conf->osd_num_op_tracker_shard),
test_ops_hook(NULL),
op_shardedwq(
Mutex::Locker lock(osd_lock);
if (is_stopping())
return 0;
-
+
if (store->test_mount_in_use()) {
derr << "OSD::pre_init: object store '" << dev_path << "' is "
<< "currently in use. (Is ceph-osd already running?)" << dendl;
"Client operations total write size", "wr"); // client op in bytes (writes)
osd_plb.add_u64_counter(l_osd_op_outb, "op_out_bytes",
"Client operations total read size", "rd"); // client op out bytes (reads)
- osd_plb.add_time_avg(l_osd_op_lat, "op_latency",
+ osd_plb.add_time_avg(l_osd_op_lat, "op_latency",
"Latency of client operations (including queue time)", "lat"); // client op latency
- osd_plb.add_time_avg(l_osd_op_process_lat, "op_process_latency",
+ osd_plb.add_time_avg(l_osd_op_process_lat, "op_process_latency",
"Latency of client operations (excluding queue time)"); // client op process latency
osd_plb.add_time_avg(l_osd_op_prepare_lat, "op_prepare_latency",
"Latency of client operations (excluding queue time and wait for finished)"); // client op prepare latency
- osd_plb.add_u64_counter(l_osd_op_r, "op_r",
+ osd_plb.add_u64_counter(l_osd_op_r, "op_r",
"Client read operations"); // client reads
- osd_plb.add_u64_counter(l_osd_op_r_outb, "op_r_out_bytes",
+ osd_plb.add_u64_counter(l_osd_op_r_outb, "op_r_out_bytes",
"Client data read"); // client read out bytes
- osd_plb.add_time_avg(l_osd_op_r_lat, "op_r_latency",
+ osd_plb.add_time_avg(l_osd_op_r_lat, "op_r_latency",
"Latency of read operation (including queue time)"); // client read latency
- osd_plb.add_time_avg(l_osd_op_r_process_lat, "op_r_process_latency",
+ osd_plb.add_time_avg(l_osd_op_r_process_lat, "op_r_process_latency",
"Latency of read operation (excluding queue time)"); // client read process latency
osd_plb.add_time_avg(l_osd_op_r_prepare_lat, "op_r_prepare_latency",
"Latency of read operations (excluding queue time and wait for finished)"); // client read prepare latency
- osd_plb.add_u64_counter(l_osd_op_w, "op_w",
+ osd_plb.add_u64_counter(l_osd_op_w, "op_w",
"Client write operations"); // client writes
- osd_plb.add_u64_counter(l_osd_op_w_inb, "op_w_in_bytes",
+ osd_plb.add_u64_counter(l_osd_op_w_inb, "op_w_in_bytes",
"Client data written"); // client write in bytes
- osd_plb.add_time_avg(l_osd_op_w_rlat, "op_w_rlat",
+ osd_plb.add_time_avg(l_osd_op_w_rlat, "op_w_rlat",
"Client write operation readable/applied latency"); // client write readable/applied latency
- osd_plb.add_time_avg(l_osd_op_w_lat, "op_w_latency",
+ osd_plb.add_time_avg(l_osd_op_w_lat, "op_w_latency",
"Latency of write operation (including queue time)"); // client write latency
- osd_plb.add_time_avg(l_osd_op_w_process_lat, "op_w_process_latency",
+ osd_plb.add_time_avg(l_osd_op_w_process_lat, "op_w_process_latency",
"Latency of write operation (excluding queue time)"); // client write process latency
osd_plb.add_time_avg(l_osd_op_w_prepare_lat, "op_w_prepare_latency",
"Latency of write operations (excluding queue time and wait for finished)"); // client write prepare latency
- osd_plb.add_u64_counter(l_osd_op_rw, "op_rw",
+ osd_plb.add_u64_counter(l_osd_op_rw, "op_rw",
"Client read-modify-write operations"); // client rmw
- osd_plb.add_u64_counter(l_osd_op_rw_inb, "op_rw_in_bytes",
+ osd_plb.add_u64_counter(l_osd_op_rw_inb, "op_rw_in_bytes",
"Client read-modify-write operations write in"); // client rmw in bytes
- osd_plb.add_u64_counter(l_osd_op_rw_outb,"op_rw_out_bytes",
+ osd_plb.add_u64_counter(l_osd_op_rw_outb,"op_rw_out_bytes",
"Client read-modify-write operations read out "); // client rmw out bytes
- osd_plb.add_time_avg(l_osd_op_rw_rlat,"op_rw_rlat",
+ osd_plb.add_time_avg(l_osd_op_rw_rlat,"op_rw_rlat",
"Client read-modify-write operation readable/applied latency"); // client rmw readable/applied latency
- osd_plb.add_time_avg(l_osd_op_rw_lat, "op_rw_latency",
+ osd_plb.add_time_avg(l_osd_op_rw_lat, "op_rw_latency",
"Latency of read-modify-write operation (including queue time)"); // client rmw latency
- osd_plb.add_time_avg(l_osd_op_rw_process_lat, "op_rw_process_latency",
+ osd_plb.add_time_avg(l_osd_op_rw_process_lat, "op_rw_process_latency",
"Latency of read-modify-write operation (excluding queue time)"); // client rmw process latency
- osd_plb.add_time_avg(l_osd_op_rw_prepare_lat, "op_rw_prepare_latency",
+ osd_plb.add_time_avg(l_osd_op_rw_prepare_lat, "op_rw_prepare_latency",
"Latency of read-modify-write operations (excluding queue time and wait for finished)"); // client rmw prepare latency
osd_plb.add_u64_counter(l_osd_sop, "subop", "Suboperations"); // subops
}
}
clear_pg_stat_queue();
-
+
// finish ops
op_shardedwq.drain(); // should already be empty except for lagard PGs
{
::decode(superblock, p);
dout(10) << "read_superblock " << superblock << dendl;
-
+
return 0;
}
PGPool p = PGPool(id, createmap->get_pool_name(id),
createmap->get_pg_pool(id)->auid);
-
+
const pg_pool_t *pi = createmap->get_pg_pool(id);
p.info = *pi;
p.snapc = pi->get_snap_context();
if (createmap->get_pg_type(pgid.pgid) == pg_pool_t::TYPE_REPLICATED ||
createmap->get_pg_type(pgid.pgid) == pg_pool_t::TYPE_ERASURE)
pg = new ReplicatedPG(&service, createmap, pool, pgid);
- else
+ else
assert(0);
return pg;
assert(0);
}
}
-
+
build_past_intervals_parallel();
}
upprimary != currentupprimary ||
acting != currentacting ||
up != currentup) && e > h.same_interval_since) {
- dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e
+ dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e
<< " from " << acting << "/" << up
<< " " << actingprimary << "/" << upprimary
<< " -> " << currentacting << "/" << currentup
- << " " << currentactingprimary << "/" << currentupprimary
+ << " " << currentactingprimary << "/" << currentupprimary
<< dendl;
h.same_interval_since = e;
}
// up set change?
if ((up != currentup || upprimary != currentupprimary)
&& e > h.same_up_since) {
- dout(15) << "project_pg_history " << pgid << " up changed in " << e
+ dout(15) << "project_pg_history " << pgid << " up changed in " << e
<< " from " << up << " " << upprimary
<< " -> " << currentup << " " << currentupprimary << dendl;
h.same_up_since = e;
}
OSDMapRef curmap = service.get_osdmap();
-
+
switch (m->op) {
case MOSDPing::PING:
logger->set(l_osd_hb_to, heartbeat_peers.size());
logger->set(l_osd_hb_from, 0);
-
+
// hmm.. am i all alone?
dout(30) << "heartbeat lonely?" << dendl;
if (heartbeat_peers.empty()) {
_send_boot();
return;
}
-
+
// get all the latest maps
if (osdmap->get_epoch() + 1 >= oldest)
osdmap_subscribe(osdmap->get_epoch() + 1, false);
epoch_t cur = osdmap->get_up_thru(whoami);
Mutex::Locker l(mon_report_lock);
if (want > up_thru_wanted) {
- dout(10) << "queue_want_up_thru now " << want << " (was " << up_thru_wanted << ")"
+ dout(10) << "queue_want_up_thru now " << want << " (was " << up_thru_wanted << ")"
<< ", currently " << cur
<< dendl;
up_thru_wanted = want;
send_alive();
} else {
- dout(10) << "queue_want_up_thru want " << want << " <= queued " << up_thru_wanted
+ dout(10) << "queue_want_up_thru want " << want << " <= queued " << up_thru_wanted
<< ", currently " << cur
<< dendl;
}
osd_stat_t cur_stat = service.get_osd_stat();
cur_stat.fs_perf_stat = store->get_cur_stats();
-
+
pg_stat_queue_lock.Lock();
if (osd_stat_updated || !pg_stat_queue.empty()) {
// yes, these are really pg commands, but there's a limit to how
// much work it's worth. The OSD returns all of them. Make this
-// form (pg <pgid> <cmd>) valid only for the cli.
+// form (pg <pgid> <cmd>) valid only for the cli.
// Rest uses "tell <pgid> <cmd>"
COMMAND("pg " \
"list missing objects on this pg, perhaps starting at an offset given in JSON",
"osd", "r", "cli")
-// new form: tell <pgid> <cmd> for both cli and rest
+// new form: tell <pgid> <cmd> for both cli and rest
COMMAND("query",
"show details of a specific pg", "osd", "r", "cli,rest")
else if (prefix == "flush_pg_stats") {
flush_pg_stats();
}
-
+
else if (prefix == "heap") {
if (!ceph_using_tcmalloc()) {
r = -EOPNOTSUPP;
{
dout(30) << "heartbeat_dispatch " << m << dendl;
switch (m->get_type()) {
-
+
case CEPH_MSG_PING:
dout(10) << "ping from " << m->get_source_inst() << dendl;
m->put();
if (caps_info.allow_all)
s->caps.set_allow_all();
s->auid = auid;
-
+
if (caps_info.caps.length() > 0) {
bufferlist::iterator p = caps_info.caps.begin();
string str;
else
dout(10) << " session " << s << " " << s->entity_name << " failed to parse caps '" << str << "'" << dendl;
}
-
+
s->put();
}
return true;
void OSD::do_waiters()
{
assert(osd_lock.is_locked());
-
+
dout(10) << "do_waiters -- start" << dendl;
finished_lock.Lock();
while (!finished.empty()) {
switch (m->get_type()) {
- // -- don't need lock --
+ // -- don't need lock --
case CEPH_MSG_PING:
dout(10) << "ping from " << m->get_source() << dendl;
m->put();
op->mark_delayed("no osdmap");
break;
}
-
+
// need OSDMap
dispatch_op(op);
}
}
}
}
-
+
m->put();
}
bool OSD::scrub_time_permit(utime_t now)
{
- struct tm bdt;
+ struct tm bdt;
time_t tt = now.sec();
localtime_r(&tt, &bdt);
bool time_permit = false;
if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
time_permit = true;
- }
+ }
} else {
if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour || bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
time_permit = true;
- }
+ }
}
if (!time_permit) {
dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
}
pg->unlock();
} while (service.next_scrub_stamp(scrub, &scrub));
- }
+ }
dout(20) << "sched_scrub done" << dendl;
}
if (waiting_for_osdmap.empty()) {
osdmap_subscribe(osdmap->get_epoch() + 1, false);
}
-
+
logger->inc(l_osd_waiting_for_map);
waiting_for_osdmap.push_back(op);
op->mark_delayed("wait for new map");
dout(10) << "handle_osd_map got full map for epoch " << e << dendl;
OSDMap *o = new OSDMap;
bufferlist& bl = p->second;
-
+
o->decode(bl);
ghobject_t fulloid = get_osdmap_pobject_name(e);
note_down_osd(*p);
}
}
-
+
osdmap = newmap;
superblock.current_epoch = cur;
clog->error() << "map e" << osdmap->get_epoch()
<< " had wrong hb front addr (" << osdmap->get_hb_front_addr(whoami)
<< " != my " << hb_front_server_messenger->get_myaddr() << ")";
-
+
if (!service.is_stopping()) {
epoch_t up_epoch = 0;
epoch_t bind_epoch = osdmap->get_epoch();
}
else if (do_shutdown) {
if (network_error) {
- Mutex::Locker l(heartbeat_lock);
+ Mutex::Locker l(heartbeat_lock);
map<int,pair<utime_t,entity_inst_t>>::iterator it = failure_pending.begin();
while (it != failure_pending.end()) {
dout(10) << "handle_osd_ping canceling in-flight failure report for osd." << it->first << dendl;
send_still_alive(osdmap->get_epoch(), it->second.second);
failure_pending.erase(it++);
}
- }
+ }
osd_lock.Unlock();
shutdown();
osd_lock.Lock();
return true;
}
-/**
+/**
* update service map; check pg creations
*/
void OSD::advance_map()
bool is_fast_dispatch)
{
int from = m->get_source().num();
-
+
if (!map->have_inst(from) ||
(map->get_cluster_addr(from) != m->get_source_inst().addr)) {
dout(5) << "from dead osd." << from << ", marking down, "
parent->info.stats.stats.sum = *stat_iter;
parent->write_if_dirty(*(rctx->transaction));
}
-
+
/*
* holding osd_lock
*/
}
dout(20) << "mkpg " << on << " e" << created << "@" << ci->second << dendl;
-
+
// is it still ours?
vector<int> up, acting;
int up_primary = -1;
vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator p =
info_map.begin();
p != info_map.end();
- ++p) {
+ ++p) {
if (!curmap->is_up(p->first)) {
dout(20) << __func__ << " skipping down osd." << p->first << dendl;
continue;
dout(7) << "handle_pg_query from " << m->get_source() << " epoch " << m->get_epoch() << dendl;
int from = m->get_source().num();
-
+
if (!require_same_or_newer_map(op, m->get_epoch(), false))
return;
op->mark_started();
map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
-
+
for (map<spg_t,pg_query_t>::iterator it = m->pg_list.begin();
it != m->pg_list.end();
++it) {
dout(7) << "handle_pg_remove from " << m->get_source() << " on "
<< m->pg_list.size() << " pgs" << dendl;
-
+
if (!require_same_or_newer_map(op, m->get_epoch(), false))
return;
-
+
op->mark_started();
for (vector<spg_t>::iterator it = m->pg_list.begin();
dout(10) << "ignoring localized pg " << pgid << dendl;
continue;
}
-
+
RWLock::WLocker l(pg_map_lock);
if (pg_map.count(pgid) == 0) {
dout(10) << " don't have pg " << pgid << dendl;
pg->unlock();
goto out;
}
-
+
dout(10) << "do_recovery starting " << max << " " << *pg << dendl;
#ifdef DEBUG_RECOVERY_OIDS
dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl;
#endif
-
+
int started = 0;
bool more = pg->start_recovery_ops(max, handle, &started);
dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
if (session) {
session->sent_epoch_lock.lock();
if (session->last_sent_epoch < last_sent_epoch) {
- session->last_sent_epoch = last_sent_epoch;
+ session->last_sent_epoch = last_sent_epoch;
}
session->sent_epoch_lock.unlock();
session->put();
// must be a rep op.
assert(m->get_source().is_osd());
-
+
// share our map with sender, if they're old
bool should_share_map = false;
Session *peer_session =
pair<PGRef, PGQueueable> item = sdata->pqueue.dequeue();
sdata->pg_for_processing[&*(item.first)].push_back(item.second);
sdata->sdata_op_ordering_lock.Unlock();
- ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+ ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
suicide_interval);
(item.first)->lock_suspend_timeout(tp_handle);
sdata->pg_for_processing[&*(item.first)].pop_front();
if (!(sdata->pg_for_processing[&*(item.first)].size()))
sdata->pg_for_processing.erase(&*(item.first));
- }
+ }
// osd:opwq_process marks the point at which an operation has been dequeued
// and will begin to be handled by a worker thread.
unsigned priority = item.second.get_priority();
unsigned cost = item.second.get_cost();
sdata->sdata_op_ordering_lock.Lock();
-
+
if (priority >= CEPH_MSG_PRIO_LOW)
sdata->pqueue.enqueue_strict(
item.second.get_owner(), priority, item);
"clog_to_syslog_facility",
"clog_to_syslog_level",
"osd_objectstore_fuse",
+ "clog_to_graylog",
+ "clog_to_graylog_host",
+ "clog_to_graylog_port",
+ "host",
+ "fsid",
NULL
};
return KEYS;
if (changed.count("clog_to_monitors") ||
changed.count("clog_to_syslog") ||
changed.count("clog_to_syslog_level") ||
- changed.count("clog_to_syslog_facility")) {
+ changed.count("clog_to_syslog_facility") ||
+ changed.count("clog_to_graylog") ||
+ changed.count("clog_to_graylog_host") ||
+ changed.count("clog_to_graylog_port") ||
+ changed.count("host") ||
+ changed.count("fsid")) {
update_log_config();
}
#ifdef HAVE_LIBFUSE
map<string,string> log_to_syslog;
map<string,string> log_channel;
map<string,string> log_prio;
+ map<string,string> log_to_graylog;
+ map<string,string> log_to_graylog_host;
+ map<string,string> log_to_graylog_port;
+ uuid_d fsid;
+ string host;
+
if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
- log_channel, log_prio) == 0)
+ log_channel, log_prio, log_to_graylog,
+ log_to_graylog_host, log_to_graylog_port,
+ fsid, host) == 0)
clog->update_config(log_to_monitors, log_to_syslog,
- log_channel, log_prio);
+ log_channel, log_prio, log_to_graylog,
+ log_to_graylog_host, log_to_graylog_port,
+ fsid, host);
derr << "log_to_monitors " << log_to_monitors << dendl;
}
$(top_builddir)/src/gmock/lib/libgmock_main.la \
$(top_builddir)/src/gmock/lib/libgmock.la \
$(top_builddir)/src/gmock/gtest/lib/libgtest.la \
- $(PTHREAD_LIBS) \
- -luuid
+ $(PTHREAD_LIBS)
if SOLARIS
UNITTEST_LDADD += \