if (changed.count("log_to_graylog") || changed.count("err_to_graylog")) {
int l = conf->log_to_graylog ? 99 : (conf->err_to_graylog ? -1 : -2);
log->set_graylog_level(l, l);
+
+ if (conf->log_to_graylog || conf->err_to_graylog) {
+ log->start_graylog();
+ } else if (! (conf->log_to_graylog && conf->err_to_graylog)) {
+ log->stop_graylog();
+ }
}
- if (changed.count("log_graylog_host") || changed.count("log_graylog_port")) {
- log->set_graylog_destination(conf->log_graylog_host, conf->log_graylog_port);
+ if (log->graylog() && (changed.count("log_graylog_host") || changed.count("log_graylog_port"))) {
+ log->graylog()->set_destination(conf->log_graylog_host, conf->log_graylog_port);
}
// metadata
- if (changed.count("host")) {
- log->set_host(conf->host);
+ if (log->graylog() && changed.count("host")) {
+ log->graylog()->set_hostname(conf->host);
}
- if (changed.count("fsid")) {
- log->set_fsid(conf->fsid);
+ if (log->graylog() && changed.count("fsid")) {
+ log->graylog()->set_fsid(conf->fsid);
}
-
}
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Graylog.h"
+
+#include <iostream>
+#include <sstream>
+#include <memory>
+
+#include <boost/asio.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+#include <boost/iostreams/filter/zlib.hpp>
+#include <boost/lexical_cast.hpp>
+
+#include "common/Formatter.h"
+#include "include/uuid.h"
+
+namespace ceph {
+namespace log {
+
+Graylog::Graylog(const SubsystemMap * const s)
+ : m_subs(s),
+ m_log_dst_valid(false),
+ m_hostname(""),
+ m_fsid(""),
+ m_ostream_compressed(std::stringstream::in |
+ std::stringstream::out |
+ std::stringstream::binary)
+{
+ m_formatter = std::auto_ptr<Formatter>(Formatter::create("json"));
+}
+
+Graylog::~Graylog()
+{
+}
+
+void Graylog::set_destination(const std::string& host, int port)
+{
+ try {
+ boost::asio::ip::udp::resolver resolver(m_io_service);
+ boost::asio::ip::udp::resolver::query query(host,
+ boost::lexical_cast<std::string>(port));
+ m_endpoint = *resolver.resolve(query);
+ m_log_dst_valid = true;
+ } catch (boost::system::system_error const& e) {
+ cerr << "Error resolving graylog destination: " << e.what() << std::endl;
+ m_log_dst_valid = false;
+ }
+}
+
+void Graylog::set_hostname(const std::string& host)
+{
+ m_hostname = host;
+}
+
+void Graylog::set_fsid(uuid_d fsid)
+{
+ std::vector<char> buf(40);
+ fsid.print(&buf[0]);
+ m_fsid = std::string(&buf[0]);
+}
+
+void Graylog::log_entry(Entry const * const e)
+{
+ if (m_log_dst_valid) {
+ std::string s = e->get_str();
+
+ // GELF format
+ // http://www.graylog2.org/resources/gelf/specification
+ m_formatter->open_object_section("");
+ m_formatter->dump_string("version", "1.1");
+ m_formatter->dump_string("host", m_hostname);
+ m_formatter->dump_string("short_message", s);
+ m_formatter->dump_string("_app", "ceph");
+ m_formatter->dump_float("timestamp", e->m_stamp.sec() + (e->m_stamp.usec() / 1000000.0));
+ m_formatter->dump_int("_thread", e->m_thread);
+ m_formatter->dump_int("_prio", e->m_prio);
+ m_formatter->dump_string("_subsys_name", m_subs->get_name(e->m_subsys));
+ m_formatter->dump_int("_subsys_id", e->m_subsys);
+ m_formatter->dump_string("_fsid", m_fsid);
+ m_formatter->close_section();
+
+ m_ostream_compressed.clear();
+ m_ostream_compressed.str("");
+
+ m_ostream.reset();
+
+ m_ostream.push(m_compressor);
+ m_ostream.push(m_ostream_compressed);
+
+ m_formatter->flush(m_ostream);
+ m_ostream << std::endl;
+
+ m_ostream.reset();
+
+ try {
+ boost::asio::ip::udp::socket socket(m_io_service);
+ socket.open(m_endpoint.protocol());
+ socket.send_to(boost::asio::buffer(m_ostream_compressed.str()), m_endpoint);
+ } catch (boost::system::system_error const& e) {
+ cerr << "Error sending graylog message: " << e.what() << std::endl;
+ }
+ }
+}
+
+} // ceph::log::
+} // ceph::
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef __CEPH_LOG_GRAYLOG_H
+#define __CEPH_LOG_GRAYLOG_H
+
+
+#include <memory>
+
+#include <boost/thread/mutex.hpp>
+#include <boost/asio.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+#include <boost/iostreams/filter/zlib.hpp>
+
+#include "Entry.h"
+#include "SubsystemMap.h"
+
+namespace ceph {
+namespace log {
+
+class Graylog
+{
+ public:
+
+ Graylog(const SubsystemMap * const s);
+ virtual ~Graylog();
+
+ void set_hostname(const std::string& host);
+ void set_fsid(uuid_d fsid);
+
+ void set_destination(const std::string& host, int port);
+
+ void log_entry(Entry const * const e);
+
+ private:
+ SubsystemMap const * const m_subs;
+
+ bool m_log_dst_valid;
+
+ std::string m_hostname;
+ std::string m_fsid;
+
+ boost::asio::ip::udp::endpoint m_endpoint;
+ boost::asio::io_service m_io_service;
+
+ std::auto_ptr<Formatter> m_formatter;
+ std::stringstream m_ostream_compressed;
+ boost::iostreams::filtering_ostream m_ostream;
+ boost::iostreams::zlib_compressor m_compressor;
+};
+
+}
+}
+
+#endif
#include <iostream>
#include <sstream>
-#include <memory>
-#include <boost/lexical_cast.hpp>
#include <boost/asio.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
+#include <boost/shared_ptr.hpp>
#include "common/errno.h"
#include "common/safe_io.h"
m_syslog_log(-2), m_syslog_crash(-2),
m_stderr_log(1), m_stderr_crash(-1),
m_graylog_log(-3), m_graylog_crash(-3),
- m_host(""), m_fsid(),
- m_graylog_endpoint(),
- m_graylog_io_service(),
m_stop(false),
m_max_new(DEFAULT_MAX_NEW),
m_max_recent(DEFAULT_MAX_RECENT),
pthread_mutex_unlock(&m_flush_mutex);
}
-void Log::set_host(string host)
+void Log::start_graylog()
{
- m_host = host;
+ pthread_mutex_lock(&m_flush_mutex);
+ if (! m_graylog.get())
+ m_graylog = boost::shared_ptr<Graylog>(new Graylog(m_subs));
+ pthread_mutex_unlock(&m_flush_mutex);
}
-void Log::set_fsid(uuid_d fsid)
-{
- m_fsid = fsid;
-}
-void Log::set_graylog_destination(string host, int port)
+void Log::stop_graylog()
{
- boost::asio::ip::udp::resolver resolver(m_graylog_io_service);
- boost::asio::ip::udp::resolver::query query(host,
- boost::lexical_cast<string>(port));
- m_graylog_endpoint = *resolver.resolve(query);
+ pthread_mutex_lock(&m_flush_mutex);
+ m_graylog.reset();
+ pthread_mutex_unlock(&m_flush_mutex);
}
void Log::submit_entry(Entry *e)
cerr << "problem writing to " << m_log_file << ": " << cpp_strerror(r) << std::endl;
}
- if (do_graylog2) {
- std::auto_ptr<Formatter> f(Formatter::create("json"));
-
- char fsid_str[40];
- m_fsid.print(fsid_str);
-
- // GELF format
- // http://www.graylog2.org/resources/gelf/specification
- f->open_object_section("");
- f->dump_string("version", "1.1");
- f->dump_string("host", m_host);
- f->dump_string("short_message", s);
- f->dump_string("_app", "ceph");
- f->dump_float("timestamp", e->m_stamp.sec() + (e->m_stamp.usec() / 1000000.0));
- f->dump_int("_thread", e->m_thread);
- f->dump_int("_prio", e->m_prio);
- f->dump_string("_subsys_name", m_subs->get_name(sub));
- f->dump_int("_subsys_id", sub);
- f->dump_int("_crash", crash);
- f->dump_string("_fsid", fsid_str);
- if (crash) f->dump_int("_crash_entry_queue_len", -t->m_len);
- f->close_section();
-
- std::stringstream zos(std::stringstream::in |
- std::stringstream::out |
- std::stringstream::binary);
-
- {
- boost::iostreams::filtering_ostream os;
- os.push(boost::iostreams::zlib_compressor());
- os.push(zos);
-
- f->flush(os);
- os << std::endl;
- }
-
- try {
- boost::asio::ip::udp::socket socket(m_graylog_io_service);
- socket.open(m_graylog_endpoint.protocol());
- socket.send_to(boost::asio::buffer(zos.str()), m_graylog_endpoint);
- } catch (boost::system::system_error const& e) {
- /* The above code fails until the configuration is set */
- }
- }
}
+ if (do_graylog2 && m_graylog) {
+ m_graylog->log_entry(e);
+ }
+
requeue->enqueue(e);
}
}
#include "common/Thread.h"
+#include <assert.h>
#include <pthread.h>
#include <boost/asio.hpp>
#include "Entry.h"
#include "EntryQueue.h"
#include "SubsystemMap.h"
+#include "Graylog.h"
namespace ceph {
namespace log {
Log **m_indirect_this;
SubsystemMap *m_subs;
-
+
pthread_mutex_t m_queue_mutex;
pthread_mutex_t m_flush_mutex;
pthread_cond_t m_cond_loggers;
int m_stderr_log, m_stderr_crash;
int m_graylog_log, m_graylog_crash;
- std::string m_host;
- uuid_d m_fsid;
-
- boost::asio::ip::udp::endpoint m_graylog_endpoint;
- boost::asio::io_service m_graylog_io_service;
+ boost::shared_ptr<Graylog> m_graylog;
bool m_stop;
void set_stderr_level(int log, int crash);
void set_graylog_level(int log, int crash);
+ void start_graylog();
+ void stop_graylog();
+ void update_graylog(const std::string& host, int port);
+
+ Graylog::Ref graylog() { return m_graylog; }
+
Entry *create_entry(int level, int subsys);
Entry *create_entry(int level, int subsys, size_t* expected_size);
void submit_entry(Entry *e);
void start();
void stop();
- void set_host(std::string host);
- void set_fsid(uuid_d fdid);
-
- void set_graylog_destination(std::string host, int port);
-
/// true if the log lock is held by our thread
bool is_inside_log_lock();
liblog_la_SOURCES = \
log/Log.cc \
- log/SubsystemMap.cc
+ log/SubsystemMap.cc \
+ log/Graylog.cc
noinst_LTLIBRARIES += liblog.la
noinst_HEADERS += \
log/Entry.h \
log/EntryQueue.h \
log/Log.h \
- log/SubsystemMap.h
-
+ log/SubsystemMap.h \
+ log/Graylog.h