/*
* Ceph - scalable distributed file system
*
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Copyright (C) 2011 New Dream Network
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
*
*/
+#define __STDC_FORMAT_MACROS // for PRId64, etc.
-
+#include "common/ProfLogger.h"
+#include "common/Thread.h"
+#include "common/config.h"
+#include "common/config_obs.h"
+#include "common/dout.h"
+#include "common/errno.h"
+#include "common/safe_io.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <map>
+#include <poll.h>
+#include <sstream>
+#include <stdint.h>
+#include <string.h>
#include <string>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
-#include "ProfLogType.h"
-#include "ProfLogger.h"
-
-#include <iostream>
-#include <memory>
-#include "Clock.h"
-
-#include "common/config.h"
+#define PFL_SUCCESS ((void*)(intptr_t)0)
+#define PFL_FAIL ((void*)(intptr_t)1)
+#define COUNT_DISABLED ((uint64_t)(int64_t)-1)
-#include <sys/stat.h>
-#include <sys/types.h>
+using std::ostringstream;
-#include "common/Timer.h"
+enum prof_log_data_any_t {
+ PROF_LOG_DATA_ANY_NONE,
+ PROF_LOG_DATA_ANY_U64,
+ PROF_LOG_DATA_ANY_DOUBLE
+};
-//////////////// C_FlushProfLoggers ////////////////
-class C_FlushProfLoggers : public Context
+class ProfLogThread : public Thread
{
public:
- C_FlushProfLoggers(ProfLoggerCollection *coll_)
- : coll(coll_)
+
+ static std::string create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
{
+ int pipefd[2];
+ int ret = pipe2(pipefd, O_CLOEXEC);
+ if (ret < 0) {
+ int err = errno;
+ ostringstream oss;
+ oss << "ProfLogThread::create_shutdown_pipe error: "
+ << cpp_strerror(err);
+ return oss.str();
+ }
+
+ *pipe_rd = pipefd[0];
+ *pipe_wr = pipefd[1];
+ return "";
}
- void finish(int r) {
- coll->flush_all_loggers();
+ static std::string bind_and_listen(const std::string &sock_path, int *fd)
+ {
+ int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if (sock_fd < 0) {
+ int err = errno;
+ ostringstream oss;
+ oss << "ProfLogThread::bind_and_listen: "
+ << "failed to create socket: " << cpp_strerror(err);
+ return oss.str();
+ }
+ struct sockaddr_un address;
+ memset(&address, 0, sizeof(struct sockaddr_un));
+ address.sun_family = AF_UNIX;
+ snprintf(address.sun_path, sizeof(address.sun_path), sock_path.c_str());
+ if (bind(sock_fd, (struct sockaddr*)&address,
+ sizeof(struct sockaddr_un)) != 0) {
+ int err = errno;
+ ostringstream oss;
+ oss << "ProfLogThread::bind_and_listen: "
+ << "failed to bind socket: " << cpp_strerror(err);
+ close(sock_fd);
+ return oss.str();
+ }
+ if (listen(sock_fd, 5) != 0) {
+ int err = errno;
+ ostringstream oss;
+ oss << "ProfLogThread::bind_and_listen: "
+ << "failed to listen to socket: " << cpp_strerror(err);
+ close(sock_fd);
+ return oss.str();
+ }
+ *fd = sock_fd;
+ return "";
+ }
+
+ ProfLogThread(int sock_fd, int shutdown_fd, ProfLoggerCollection *parent)
+ : m_sock_fd(sock_fd),
+ m_shutdown_fd(shutdown_fd),
+ m_parent(parent)
+ {
+ }
+
+ virtual ~ProfLogThread()
+ {
+ if (m_sock_fd != -1)
+ close(m_sock_fd);
+ if (m_shutdown_fd != -1)
+ close(m_shutdown_fd);
+ }
+
+ virtual void* entry()
+ {
+ while (true) {
+ struct pollfd fds[2];
+ memset(fds, 0, sizeof(fds));
+ fds[0].fd = m_sock_fd;
+ fds[0].events = POLLOUT | POLLWRBAND;
+ fds[1].fd = m_shutdown_fd;
+ fds[1].events = POLLIN | POLLRDBAND;
+
+ int ret = poll(fds, 2, NULL);
+ if (ret < 0) {
+ if (ret == -EINTR) {
+ continue;
+ }
+ int err = errno;
+ lderr(m_parent->m_cct) << "ProfLogThread: poll(2) error: '"
+ << cpp_strerror(err) << dendl;
+ return PFL_FAIL;
+ }
+
+ if (fds[0].revents & POLLOUT) {
+ // Send out some data
+ if (!do_accept())
+ return PFL_FAIL;
+ }
+ if (fds[1].revents & POLLIN) {
+ // Parent wants us to shut down
+ return PFL_SUCCESS;
+ }
+ }
}
private:
- ProfLoggerCollection *coll;
+ const static int MAX_PFL_RETRIES = 10;
+
+ bool do_accept()
+ {
+ struct sockaddr_un address;
+ socklen_t address_length;
+ int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
+ &address_length);
+ if (connection_fd < 0) {
+ int err = errno;
+ lderr(m_parent->m_cct) << "ProfLogThread: do_accept error: '"
+ << cpp_strerror(err) << dendl;
+ return false;
+ }
+ FILE *fp = fdopen(m_sock_fd, "w");
+ if (!fp) {
+ int err = errno;
+ lderr(m_parent->m_cct) << "ProfLogThread: failed to fdopen '"
+ << m_sock_fd << "'. error " << cpp_strerror(err) << dendl;
+ close(connection_fd);
+ return false;
+ }
+ fprintf(fp, "{");
+
+ {
+ Mutex::Locker lck(m_parent->m_lock); // Take lock to access m_loggers
+ for (std::set <ProfLogger*>::iterator log = m_parent->m_loggers.begin();
+ log != m_parent->m_loggers.end(); ++log)
+ {
+ // This will take the logger's lock for short period of time,
+ // then release it.
+ (*log)->write_json_to_fp(fp);
+ }
+ }
+
+ fprintf(fp, "}");
+ fflush(fp);
+ fclose(fp); // calls close(connection_fd)
+ return true;
+ }
+
+ ProfLogThread(ProfLogThread &rhs);
+ const ProfLogThread &operator=(const ProfLogThread &rhs);
+
+ int m_sock_fd;
+ int m_shutdown_fd;
+ ProfLoggerCollection *m_parent;
};
-//////////////// ProfLoggerCollection ////////////////
ProfLoggerCollection::
-ProfLoggerCollection(CephContext *cct_)
- : lock("ProfLoggerCollection::lock"),
- logger_timer(cct_, lock), logger_event(NULL),
- last_flush(0), need_reopen(true), need_reset(false), cct(cct_)
+ProfLoggerCollection(CephContext *cct)
+ : m_cct(cct),
+ m_thread(NULL),
+ m_lock("ProfLoggerCollection"),
+ m_shutdown_fd(-1)
{
}
ProfLoggerCollection::
~ProfLoggerCollection()
{
- try {
- lock.Lock();
- logger_timer.shutdown();
- lock.Unlock();
- }
- catch (...) {
+ Mutex::Locker lck(m_lock);
+ shutdown();
+ for (std::set <ProfLogger*>::iterator l = m_loggers.begin();
+ l != m_loggers.end(); ++l) {
+ delete *l;
}
+ m_loggers.clear();
}
-void ProfLoggerCollection::
-logger_reopen_all()
+const char** ProfLoggerCollection::
+get_tracked_conf_keys() const
{
- Mutex::Locker l(lock);
- need_reopen = true;
+ static const char *KEYS[] =
+ { "profiling_logger_uri", NULL };
+ return KEYS;
}
void ProfLoggerCollection::
-logger_reset_all()
+handle_conf_change(const md_config_t *conf,
+ const std::set <std::string> &changed)
{
- Mutex::Locker l(lock);
- need_reopen = true;
- need_reset = true;
+ Mutex::Locker lck(m_lock);
+ if (conf->profiling_logger_uri.empty()) {
+ shutdown();
+ }
+ else {
+ if (!init(conf->profiling_logger_uri)) {
+ lderr(m_cct) << "Initializing profiling logger failed!" << dendl;
+ }
+ }
}
void ProfLoggerCollection::
-logger_start()
+logger_add(class ProfLogger *l)
{
- Mutex::Locker l(lock);
- logger_timer.init();
- if (!logger_event)
- flush_all_loggers();
+ Mutex::Locker lck(m_lock);
+ std::set<ProfLogger*>::iterator i = m_loggers.find(l);
+ assert(i == m_loggers.end());
+ m_loggers.insert(l);
}
void ProfLoggerCollection::
-logger_tare(utime_t s)
+logger_remove(class ProfLogger *l)
{
- Mutex::Locker l(lock);
-
- ldout(cct, 10) << "logger_tare " << s << dendl;
-
- start = s;
-
- utime_t fromstart = ceph_clock_now(cct);
- if (fromstart < start) {
- lderr(cct) << "logger_tare time jumped backwards from "
- << start << " to " << fromstart << dendl;
- fromstart = start;
- }
- fromstart -= start;
- last_flush = fromstart.sec();
+ Mutex::Locker lck(m_lock);
+ std::set<ProfLogger*>::iterator i = m_loggers.find(l);
+ assert(i != m_loggers.end());
+ m_loggers.erase(i);
}
-void ProfLoggerCollection::
-logger_add(ProfLogger *logger)
+bool ProfLoggerCollection::
+init(const std::string &uri)
{
- Mutex::Locker l(lock);
+ /* Shut down old thread, if it exists. */
+ shutdown();
+
+ /* Set up things for the new thread */
+ std::string err;
+ int pipe_rd, pipe_wr;
+ err = ProfLogThread::create_shutdown_pipe(&pipe_rd, &pipe_wr);
+ if (!err.empty()) {
+ lderr(m_cct) << "ProfLoggerCollection::init: error: " << err << dendl;
+ return false;
+ }
+ int sock_fd;
+ err = ProfLogThread::bind_and_listen(uri, &sock_fd);
+ if (!err.empty()) {
+ lderr(m_cct) << "ProfLoggerCollection::init: failed: " << err << dendl;
+ close(pipe_rd);
+ close(pipe_wr);
+ return false;
+ }
- if (logger_list.empty()) {
- if (start == utime_t())
- start = ceph_clock_now(cct);
- last_flush = 0;
+ /* Create new thread */
+ m_thread = new (std::nothrow) ProfLogThread(sock_fd, pipe_rd, this);
+ if (!m_thread) {
+ close(sock_fd);
+ close(pipe_rd);
+ close(pipe_wr);
+ return false;
}
- logger_list.push_back(logger);
- logger->lock = &lock;
+ m_thread->create();
+ m_shutdown_fd = pipe_wr;
+ return 0;
}
void ProfLoggerCollection::
-logger_remove(ProfLogger *logger)
+shutdown()
{
- Mutex::Locker l(lock);
-
- for (list<ProfLogger*>::iterator p = logger_list.begin();
- p != logger_list.end();
- p++) {
- if (*p == logger) {
- logger_list.erase(p);
- delete logger;
- if (logger_list.empty() && logger_event) {
- // If there are no timers, stop the timer events.
- logger_timer.cancel_event(logger_event);
- logger_event = 0;
- }
- return;
+ if (m_thread) {
+ // Send a byte to the shutdown pipe that the thread is listening to
+ char buf[1] = { 0x0 };
+ int ret = safe_write(m_shutdown_fd, buf, sizeof(buf));
+ m_shutdown_fd = -1;
+
+ if (ret == 0) {
+ // Join and delete the thread
+ m_thread->join();
+ delete m_thread;
+ }
+ else {
+ lderr(m_cct) << "ProfLoggerCollection::shutdown: failed to write "
+ "to thread shutdown pipe: error " << ret << dendl;
}
+ m_thread = NULL;
}
}
-void ProfLoggerCollection::
-flush_all_loggers()
+ProfLogger::
+~ProfLogger()
{
- // ProfLoggerCollection lock must be held here.
- ldout(cct, 20) << "flush_all_loggers" << dendl;
+}
- if (!cct->_conf->profiling_logger)
+void ProfLogger::
+inc(int idx, uint64_t amt)
+{
+ Mutex::Locker lck(m_lock);
+ assert(idx > m_lower_bound);
+ assert(idx < m_upper_bound);
+ prof_log_data_any_d& data(m_data[idx - m_lower_bound - 1]);
+ if (data.type != PROF_LOG_DATA_ANY_U64)
return;
-
- utime_t now = ceph_clock_now(cct);
- utime_t fromstart = now;
- if (fromstart < start) {
- lderr(cct) << "logger time jumped backwards from " << start << " to "
- << fromstart << dendl;
- //assert(0);
- start = fromstart;
- }
- fromstart -= start;
- int now_sec = fromstart.sec();
-
- // do any catching up we need to
- bool twice = now_sec - last_flush >= 2 * cct->_conf->profiling_logger_interval;
- again:
- ldout(cct, 20) << "fromstart " << fromstart << " last_flush " << last_flush << " flushing" << dendl;
-
- // This logic seems unecessary. We're holding the mutex the whole time here,
- // so need_reopen and need_reset can't change unless we change them.
- // TODO: clean this up slightly
- bool reopen = need_reopen;
- bool reset = need_reset;
-
- for (list<ProfLogger*>::iterator p = logger_list.begin();
- p != logger_list.end();
- ++p)
- (*p)->_flush(need_reopen, need_reset, last_flush);
-
- // did full pass while true?
- if (reopen && need_reopen)
- need_reopen = false;
- if (reset && need_reset)
- need_reset = false;
-
- last_flush = now_sec - (now_sec % cct->_conf->profiling_logger_interval);
- if (twice) {
- twice = false;
- goto again;
- }
-
- // schedule next flush event
- utime_t next;
- next.sec_ref() = start.sec() + last_flush + cct->_conf->profiling_logger_interval;
- next.nsec_ref() = start.nsec();
- ldout(cct, 20) << "logger now=" << now
- << " start=" << start
- << " next=" << next
- << dendl;
- logger_event = new C_FlushProfLoggers(this);
- logger_timer.add_event_at(next, logger_event);
+ data.u.u64 += amt;
+ if (data.count != COUNT_DISABLED)
+ data.count++;
}
-//////////////// ProfLoggerConfObs ////////////////
-ProfLoggerConfObs::ProfLoggerConfObs(ProfLoggerCollection *coll_)
- : coll(coll_)
+void ProfLogger::
+set(int idx, uint64_t amt)
{
+ Mutex::Locker lck(m_lock);
+ assert(idx > m_lower_bound);
+ assert(idx < m_upper_bound);
+ prof_log_data_any_d& data(m_data[idx - m_lower_bound - 1]);
+ if (data.type != PROF_LOG_DATA_ANY_U64)
+ return;
+ data.u.u64 = amt;
+ if (data.count != COUNT_DISABLED)
+ data.count++;
}
-ProfLoggerConfObs::~ProfLoggerConfObs()
+uint64_t ProfLogger::
+get(int idx)
{
+ Mutex::Locker lck(m_lock);
+ assert(idx > m_lower_bound);
+ assert(idx < m_upper_bound);
+ prof_log_data_any_d& data(m_data[idx - m_lower_bound - 1]);
+ if (data.type != PROF_LOG_DATA_ANY_DOUBLE)
+ return 0;
+ return data.u.u64;
}
-const char **ProfLoggerConfObs::get_tracked_conf_keys() const
+void ProfLogger::
+finc(int idx, double amt)
{
- static const char *KEYS[] = {
- "profiling_logger", "profiling_logger_interval", "profiling_logger_calc_variance",
- "profiling_logger_subdir", "profiling_logger_dir", NULL
- };
- return KEYS;
+ Mutex::Locker lck(m_lock);
+ assert(idx > m_lower_bound);
+ assert(idx < m_upper_bound);
+ prof_log_data_any_d& data(m_data[idx - m_lower_bound - 1]);
+ if (data.type != PROF_LOG_DATA_ANY_DOUBLE)
+ return;
+ data.u.dbl += amt;
+ if (data.count != COUNT_DISABLED)
+ data.count++;
}
-void ProfLoggerConfObs::handle_conf_change(const md_config_t *conf,
- const std::set <std::string> &changed)
+void ProfLogger::
+fset(int idx, double amt)
{
- // This could be done a *lot* smarter, if anyone cares to spend time
- // fixing this up.
- // We could probably just take the mutex and call _open_log from here.
- coll->logger_reopen_all();
+ Mutex::Locker lck(m_lock);
+ assert(idx > m_lower_bound);
+ assert(idx < m_upper_bound);
+ prof_log_data_any_d& data(m_data[idx - m_lower_bound - 1]);
+ if (data.type != PROF_LOG_DATA_ANY_DOUBLE)
+ return;
+ data.u.dbl = amt;
+ if (data.count != COUNT_DISABLED)
+ data.count++;
}
-//////////////// ProfLogger ////////////////
-void ProfLogger::_open_log()
+double ProfLogger::
+fget(int idx)
{
- struct stat st;
-
- filename = "";
- if ((!cct->_conf->chdir.empty()) &&
- (cct->_conf->profiling_logger_dir.substr(0,1) != "/")) {
- char cwd[PATH_MAX];
- char *c = getcwd(cwd, sizeof(cwd));
- assert(c);
- filename = c;
- filename += "/";
- }
-
- filename = cct->_conf->profiling_logger_dir;
-
- // make (feeble) attempt to create logger_dir
- if (::stat(filename.c_str(), &st))
- ::mkdir(filename.c_str(), 0750);
-
- filename += "/";
- if (!cct->_conf->profiling_logger_subdir.empty()) {
- filename += cct->_conf->profiling_logger_subdir;
- ::mkdir( filename.c_str(), 0755 ); // make sure dir exists
- filename += "/";
- }
- filename += name;
-
- ldout(cct, 10) << "ProfLogger::_open " << filename << dendl;
- if (out.is_open())
- out.close();
- out.open(filename.c_str(),
- (need_reset || need_reset) ? ofstream::out : ofstream::out|ofstream::app);
- if (!out.is_open()) {
- ldout(cct, 10) << "failed to open '" << filename << "'" << dendl;
- return; // we fail
- }
-
- // success
- need_open = false;
+ Mutex::Locker lck(m_lock);
+ assert(idx > m_lower_bound);
+ assert(idx < m_upper_bound);
+ prof_log_data_any_d& data(m_data[idx - m_lower_bound - 1]);
+ if (data.type != PROF_LOG_DATA_ANY_DOUBLE)
+ return 0.0;
+ return data.u.dbl;
}
-ProfLogger::~ProfLogger()
+void ProfLogger::
+write_json_to_fp(FILE *fp)
{
- out.close();
+ Mutex::Locker lck(m_lock);
+
+ prof_log_data_vec_t::const_iterator d = m_data.begin();
+ prof_log_data_vec_t::const_iterator d_end = m_data.end();
+ for (; d != d_end; ++d) {
+ const prof_log_data_any_d &data(*d);
+ if (d->count != COUNT_DISABLED) {
+ switch (d->type) {
+ case PROF_LOG_DATA_ANY_U64:
+ fprintf(fp, "\"%s\" : { \"count\" : %" PRId64 ", "
+ "\"sum\" : %" PRId64 " },\n",
+ data.name, data.count, data.u.u64);
+ break;
+ case PROF_LOG_DATA_ANY_DOUBLE:
+ fprintf(fp, "\"%s\" : { \"count\" : %" PRId64 ", "
+ "\"sum\" : %g },\n",
+ data.name, data.count, data.u.dbl);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ }
+ else {
+ switch (d->type) {
+ case PROF_LOG_DATA_ANY_U64:
+ fprintf(fp, "\"%s\" : %" PRId64 ",\n", data.name, data.u.u64);
+ break;
+ case PROF_LOG_DATA_ANY_DOUBLE:
+ fprintf(fp, "\"%s\" : %g,\n", data.name, data.u.dbl);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ }
+ }
}
-void ProfLogger::reopen()
+ProfLogger::
+ProfLogger(CephContext *cct, const std::string &name,
+ int lower_bound, int upper_bound)
+ : m_cct(cct),
+ m_lower_bound(lower_bound),
+ m_upper_bound(upper_bound),
+ m_name(std::string("ProfLogger::") + name.c_str()),
+ m_lock(m_name.c_str())
{
- Mutex::Locker l(*lock);
- need_open = true;
+ m_data.resize(upper_bound - lower_bound - 1);
}
-void ProfLogger::reset()
+ProfLogger::prof_log_data_any_d::
+prof_log_data_any_d()
+ : name(NULL),
+ type(PROF_LOG_DATA_ANY_NONE),
+ count(COUNT_DISABLED)
{
- Mutex::Locker l(*lock);
- need_open = true;
- need_reset = true;
+ memset(&u, 0, sizeof(u));
}
-
-void ProfLogger::_flush(bool need_reopen, bool need_reset, int last_flush)
+ProfLoggerBuilder::
+ProfLoggerBuilder(CephContext *cct, const std::string &name,
+ int first, int last)
+ : m_prof_logger(new ProfLogger(cct, name, first, last))
{
- if (need_reopen)
- _open_log();
- if (need_reset) {
- // reset the counters
- for (int i=0; i<type->num_keys; i++) {
- this->vals[i] = 0;
- this->fvals[i] = 0;
- }
- need_reset = false;
- }
-
- ldout(cct, 20) << "ProfLogger::_flush on " << this << dendl;
-
- // header?
- wrote_header_last++;
- if (wrote_header_last > 10) {
- out << "#" << type->num_keys;
- for (int i=0; i<type->num_keys; i++) {
- out << "\t" << (type->key_name[i] ? type->key_name[i] : "???");
- if (type->avg_keys[i])
- out << "\t(n)\t(var)";
- }
- out << std::endl; //out << "\t (" << type->keymap.size() << ")" << endl;
- wrote_header_last = 0;
- }
-
- // write line to log
- out << last_flush;
- for (int i=0; i<type->num_keys; i++) {
- if (type->avg_keys[i]) {
- if (vals[i] > 0) {
- double avg = (fvals[i] / (double)vals[i]);
- double var = 0.0;
- if (cct->_conf->profiling_logger_calc_variance &&
- (unsigned)vals[i] == vals_to_avg[i].size()) {
- for (vector<double>::iterator p = vals_to_avg[i].begin(); p != vals_to_avg[i].end(); ++p)
- var += (avg - *p) * (avg - *p);
- }
- char s[256];
- snprintf(s, sizeof(s), "\t%.5lf\t%lld\t%.5lf", avg, (long long int)vals[i], var);
- out << s;
- } else
- out << "\t0\t0\t0";
- } else {
- if (fvals[i] > 0 && vals[i] == 0)
- out << "\t" << fvals[i];
- else {
- //cout << this << " p " << i << " and size is " << vals.size() << std::endl;
- out << "\t" << vals[i];
- }
- }
- }
-
- // reset the counters
- for (int i=0; i<type->num_keys; i++) {
- if (type->inc_keys[i]) {
- this->vals[i] = 0;
- this->fvals[i] = 0;
- }
- }
-
- out << std::endl;
}
-
-
-void ProfLogger::inc(int key, int64_t v)
+ProfLoggerBuilder::
+~ProfLoggerBuilder()
{
- if (!cct->_conf->profiling_logger)
- return;
- lock->Lock();
- int i = type->lookup_key(key);
- vals[i] += v;
- lock->Unlock();
+ if (m_prof_logger)
+ delete m_prof_logger;
+ m_prof_logger = NULL;
}
-void ProfLogger::finc(int key, double v)
+void ProfLoggerBuilder::
+add_u64(int idx, const char *name)
{
- if (!cct->_conf->profiling_logger)
- return;
- lock->Lock();
- int i = type->lookup_key(key);
- fvals[i] += v;
- lock->Unlock();
+ add_impl(idx, name, PROF_LOG_DATA_ANY_U64, COUNT_DISABLED);
}
-void ProfLogger::set(int key, int64_t v)
+void ProfLoggerBuilder::
+add_fl(int idx, const char *name)
{
- if (!cct->_conf->profiling_logger)
- return;
- lock->Lock();
- int i = type->lookup_key(key);
- vals[i] = v;
- lock->Unlock();
+ add_impl(idx, name, PROF_LOG_DATA_ANY_DOUBLE, COUNT_DISABLED);
}
-
-void ProfLogger::fset(int key, double v)
+void ProfLoggerBuilder::
+add_fl_avg(int idx, const char *name)
{
- if (!cct->_conf->profiling_logger)
- return;
- lock->Lock();
- int i = type->lookup_key(key);
- fvals[i] = v;
- lock->Unlock();
+ add_impl(idx, name, PROF_LOG_DATA_ANY_DOUBLE, 0);
}
-void ProfLogger::favg(int key, double v)
+void ProfLoggerBuilder::
+add_impl(int idx, const char *name, int ty, uint64_t count)
{
- if (!cct->_conf->profiling_logger)
- return;
- lock->Lock();
- int i = type->lookup_key(key);
- vals[i]++;
- fvals[i] += v;
- if (cct->_conf->profiling_logger_calc_variance)
- vals_to_avg[i].push_back(v);
- lock->Unlock();
+ assert(idx > m_prof_logger->m_lower_bound);
+ assert(idx < m_prof_logger->m_upper_bound);
+ ProfLogger::prof_log_data_vec_t &vec(m_prof_logger->m_data);
+ ProfLogger::prof_log_data_any_d
+ &data(vec[idx - m_prof_logger->m_lower_bound - 1]);
+ data.name = name;
+ data.type = ty;
+ data.count = count;
}
-int64_t ProfLogger::get(int key)
+ProfLogger *ProfLoggerBuilder::
+create_proflogger()
{
- if (!cct->_conf->profiling_logger)
- return 0;
- lock->Lock();
- int i = type->lookup_key(key);
- int64_t r = 0;
- if (i >= 0 && i < (int)vals.size())
- r = vals[i];
- lock->Unlock();
- return r;
+ ProfLogger::prof_log_data_vec_t::const_iterator d = m_prof_logger->m_data.begin();
+ ProfLogger::prof_log_data_vec_t::const_iterator d_end = m_prof_logger->m_data.end();
+ for (; d != d_end; ++d) {
+ assert(d->type != PROF_LOG_DATA_ANY_NONE);
+ }
+ ProfLogger *ret = m_prof_logger;
+ m_prof_logger = NULL;
+ return ret;
}
-
/*
* Ceph - scalable distributed file system
*
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Copyright (C) 2011 New Dream Network
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
*/
-#ifndef CEPH_LOGGER_H
-#define CEPH_LOGGER_H
+#ifndef CEPH_PROF_LOG_H
+#define CEPH_PROF_LOG_H
-#include "common/config.h"
-#include "common/Clock.h"
-#include "common/ProfLogType.h"
-#include "common/Timer.h"
-#include "include/types.h"
+#include "common/config_obs.h"
+#include "common/Mutex.h"
+#include <stdint.h>
#include <string>
-#include <fstream>
#include <vector>
-class ProfLogger;
+class ProfLoggerBuilder;
+class CephContext;
+class Thread;
-class ProfLoggerCollection
+/*
+ * ProfLog manages the profiler logging for a Ceph process.
+ */
+class ProfLoggerCollection : public md_config_obs_t
{
public:
- ProfLoggerCollection(CephContext *cct_);
+ ProfLoggerCollection(CephContext *cct);
~ProfLoggerCollection();
- void logger_reopen_all();
- void logger_reset_all();
- void logger_add(class ProfLogger *l);
- void logger_remove(class ProfLogger *l);
- void flush_all_loggers();
- void logger_tare(utime_t when);
- void logger_start();
-private:
- Mutex lock; // big lock. lame, but this way I protect ProfLogType too!
- SafeTimer logger_timer;
- Context *logger_event;
- list<ProfLogger*> logger_list;
- utime_t start;
- int last_flush; // in seconds since start
- bool need_reopen;
- bool need_reset;
- CephContext *cct;
-};
-
-class ProfLoggerConfObs : public md_config_obs_t {
-public:
- ProfLoggerConfObs(ProfLoggerCollection *coll_);
- ~ProfLoggerConfObs();
virtual const char** get_tracked_conf_keys() const;
virtual void handle_conf_change(const md_config_t *conf,
const std::set <std::string> &changed);
+ void logger_add(class ProfLogger *l);
+ void logger_remove(class ProfLogger *l);
private:
- ProfLoggerCollection *coll;
-};
+ bool init(const std::string &uri);
+ void shutdown();
-class ProfLogger {
- protected:
- CephContext *cct;
- // my type
- std::string name, filename;
- ProfLogType *type;
+ CephContext *m_cct;
+ Thread* m_thread;
- bool need_open;
- bool need_reset;
- bool need_close;
+ /** Protects m_loggers */
+ Mutex m_lock;
- // values for this instance
- std::vector<int64_t> vals;
- std::vector<double> fvals;
- std::vector< std::vector<double> > vals_to_avg; // for calculating variance
+ int m_shutdown_fd;
+ std::set <ProfLogger*> m_loggers;
- std::ofstream out;
+ friend class ProfLogThread;
+};
- // what i've written
- //int last_logged;
- int wrote_header_last;
+class ProfLogger
+{
+public:
+ ~ProfLogger();
- void _open_log();
+ void inc(int idx, uint64_t v = 1);
+ void set(int idx, uint64_t v);
+ uint64_t get(int idx);
- private:
- Mutex *lock;
+ void fset(int idx, double v);
+ void finc(int idx, double v);
+ double fget(int idx);
- public:
- ProfLogger(CephContext *cct_, const std::string &n, ProfLogType *t) :
- cct(cct_), name(n), type(t),
- need_open(true), need_reset(false), need_close(false),
- vals(t->num_keys), fvals(t->num_keys), vals_to_avg(t->num_keys),
- wrote_header_last(10000), lock(NULL) { }
- ~ProfLogger();
+ void write_json_to_fp(FILE *fp);
- void inc(int f, int64_t v = 1);
- void set(int f, int64_t v);
- int64_t get(int f);
+private:
+ ProfLogger(CephContext *cct, const std::string &name,
+ int lower_bound, int upper_bound);
+ ProfLogger(const ProfLogger &rhs);
+ ProfLogger& operator=(const ProfLogger &rhs);
+
+ /** Represents a ProfLogger data element. */
+ struct prof_log_data_any_d {
+ prof_log_data_any_d();
+ const char *name;
+ int type;
+ union {
+ uint64_t u64;
+ double dbl;
+ } u;
+ uint64_t count;
+ };
+ typedef std::vector<prof_log_data_any_d> prof_log_data_vec_t;
- void fset(int f, double v);
- void finc(int f, double v);
- void favg(int f, double v);
+ CephContext *m_cct;
+ int m_lower_bound;
+ int m_upper_bound;
+ const std::string m_name;
- void _flush(bool need_reopen, bool need_reset, int last_flush);
+ /** Protects m_data */
+ Mutex m_lock;
- void reopen();
- void reset();
- void close();
+ prof_log_data_vec_t m_data;
- friend class ProfLoggerCollection;
+ friend class ProfLoggerBuilder;
};
+/* Class for constructing ProfLoggers.
+ *
+ * This class peforms some validation that the parameters we have supplied are
+ * correct in create_proflogger().
+ *
+ * In the future, we will probably get rid of the first/last arguments, since
+ * ProfLoggerBuilder can deduce them itself.
+ */
class ProfLoggerBuilder
{
public:
ProfLoggerBuilder(CephContext *cct, const std::string &name,
- int first, int last)
- : m_cct(cct),
- m_name(name)
- {
- m_plt = new ProfLogType(first, last);
- }
-
- void add_u64(int key, const char *name) {
- m_plt->add_inc(key, name);
- }
- void add_fl(int key, const char *name) {
- m_plt->add_inc(key, name);
- }
- void add_fl_avg(int key, const char *name) {
- m_plt->add_avg(key, name);
- }
- ProfLogger* create_proflogger() {
- // TODO: remove m_plt
- m_plt->validate();
- return new ProfLogger(m_cct, m_name, m_plt);
- }
-
+ int first, int last);
+ ~ProfLoggerBuilder();
+ void add_u64(int key, const char *name);
+ void add_fl(int key, const char *name);
+ void add_fl_avg(int key, const char *name);
+ ProfLogger* create_proflogger();
private:
ProfLoggerBuilder(const ProfLoggerBuilder &rhs);
ProfLoggerBuilder& operator=(const ProfLoggerBuilder &rhs);
+ void add_impl(int idx, const char *name, int ty, uint64_t count);
- CephContext *m_cct;
- std::string m_name;
- ProfLogType *m_plt;
+ ProfLogger *m_prof_logger;
};
#endif