...and expose it to python modules.
Also fix some of the code around how we updated
DaemonState.
Signed-off-by: John Spray <john.spray@redhat.com>
configure->stats_period = 5;
m->get_connection()->send_message(configure);
+ if (daemon_state.exists(key)) {
+ dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
+ daemon_state.get(key)->perf_counters.clear();
+ }
+
m->put();
return true;
}
DaemonStatePtr daemon;
if (daemon_state.exists(key)) {
+ dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
daemon = daemon_state.get(key);
} else {
+ dout(4) << "constructing new DaemonState for " << m->daemon_name << dendl;
daemon = std::make_shared<DaemonState>(daemon_state.types);
// FIXME: crap, we don't know the hostname at this stage.
daemon->key = key;
}
assert(daemon != nullptr);
- auto daemon_counters = daemon->perf_counters;
+ auto &daemon_counters = daemon->perf_counters;
daemon_counters.update(m);
m->put();
declared_types.insert(t.path);
}
+ const auto now = ceph_clock_now(g_ceph_context);
+
// Parse packed data according to declared set of types
bufferlist::iterator p = report->packed.begin();
DECODE_START(1, p);
::decode(avgcount, p);
::decode(avgcount2, p);
}
- // TODO: interface for insertion of avgs, add timestamp
- instances[t_path].push(val);
+ // TODO: interface for insertion of avgs
+ instances[t_path].push(now, val);
}
- // TODO: handle badly encoded things without asserting out
DECODE_FINISH(p);
}
+uint64_t PerfCounterInstance::get_current() const
+{
+ return buffer.front().v;
+}
+void PerfCounterInstance::push(utime_t t, uint64_t const &v)
+{
+ buffer.push_back({t, v});
+}
#include <string>
#include <memory>
#include <set>
+#include <boost/circular_buffer.hpp>
#include "common/Mutex.h"
// a particular daemon.
class PerfCounterInstance
{
- // TODO: store some short history or whatever
- uint64_t current;
+ class DataPoint
+ {
+ public:
+ utime_t t;
+ uint64_t v;
+ DataPoint(utime_t t_, uint64_t v_)
+ : t(t_), v(v_)
+ {}
+ };
+
+ boost::circular_buffer<DataPoint> buffer;
+ uint64_t get_current() const;
+
public:
- void push(uint64_t const &v) {current = v;}
+ const boost::circular_buffer<DataPoint> & get_data() const
+ {
+ return buffer;
+ }
+ void push(utime_t t, uint64_t const &v);
+ PerfCounterInstance()
+ : buffer(20) {}
};
std::set<std::string> declared_types;
void update(MMgrReport *report);
+
+ void clear()
+ {
+ instances.clear();
+ declared_types.clear();
+ }
};
// The state that we store about one daemon
return;
}
-
json_spirit::mObject daemon_meta = json_result.get_obj();
- DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
- dm->key = key;
- dm->hostname = daemon_meta.at("hostname").get_str();
- daemon_meta.erase("name");
- daemon_meta.erase("hostname");
+ DaemonStatePtr state;
+ if (daemon_state.exists(key)) {
+ state = daemon_state.get(key);
+ // TODO lock state
+ daemon_meta.erase("name");
+ daemon_meta.erase("hostname");
+ state->metadata.clear();
+ for (const auto &i : daemon_meta) {
+ state->metadata[i.first] = i.second.get_str();
+ }
+ } else {
+ state = std::make_shared<DaemonState>(daemon_state.types);
+ state->key = key;
+ state->hostname = daemon_meta.at("hostname").get_str();
- for (const auto &i : daemon_meta) {
- dm->metadata[i.first] = i.second.get_str();
- }
+ for (const auto &i : daemon_meta) {
+ state->metadata[i.first] = i.second.get_str();
+ }
- daemon_state.insert(dm);
+ daemon_state.insert(state);
+ }
} else if (key.first == CEPH_ENTITY_TYPE_OSD) {
} else {
assert(0);
// FIXME: nothing stopping old daemons being here, they won't have
// addr: need to handle case of pre-ceph-mgr daemons that don't have
// the fields we expect
- auto metadata_addr = metadata->metadata.at("addr");
- const auto map_addr = info.addr;
-
- if (metadata_addr != stringify(map_addr)) {
- dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
- << " != " << stringify(map_addr) << dendl;
+ if (metadata->metadata.empty()) {
update = true;
} else {
- dout(20) << "MDS[" << info.name << "] addr unchanged: "
- << metadata_addr << dendl;
+ auto metadata_addr = metadata->metadata.at("addr");
+ const auto map_addr = info.addr;
+ update = metadata_addr != stringify(map_addr);
+ if (update) {
+ dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
+ << " != " << stringify(map_addr) << dendl;
+ }
}
} else {
update = true;
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "
}
+
+PyObject* PyModules::get_counter_python(
+ const std::string &handle,
+ entity_type_t svc_type,
+ const std::string &svc_id,
+ const std::string &path)
+{
+ PyThreadState *tstate = PyEval_SaveThread();
+ Mutex::Locker l(lock);
+ PyEval_RestoreThread(tstate);
+
+ PyFormatter f;
+ f.open_array_section(path.c_str());
+
+ auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
+
+ // FIXME: this is unsafe, I need to either be inside DaemonStateIndex's
+ // lock or put a lock on individual DaemonStates
+ if (metadata) {
+ if (metadata->perf_counters.instances.count(path)) {
+ auto counter_instance = metadata->perf_counters.instances.at(path);
+ const auto &data = counter_instance.get_data();
+ for (const auto &datapoint : data) {
+ f.open_array_section("datapoint");
+ f.dump_unsigned("t", datapoint.t.sec());
+ f.dump_unsigned("v", datapoint.v);
+ f.close_section();
+
+ }
+ } else {
+ dout(4) << "Missing counter: '" << path << "' ("
+ << ceph_entity_type_name(svc_type) << "."
+ << svc_id << ")" << dendl;
+ dout(20) << "Paths are:" << dendl;
+ for (const auto &i : metadata->perf_counters.instances) {
+ dout(20) << i.first << dendl;
+ }
+ }
+ } else {
+ dout(4) << "No daemon state for "
+ << ceph_entity_type_name(svc_type) << "."
+ << svc_id << ")" << dendl;
+ }
+ f.close_section();
+ return f.get();
+}
+
PyObject *list_servers_python();
PyObject *get_metadata_python(std::string const &handle,
entity_type_t svc_type, const std::string &svc_id);
+ PyObject *get_counter_python(std::string const &handle,
+ entity_type_t svc_type, const std::string &svc_id,
+ const std::string &path);
std::map<std::string, std::string> config_cache;
Py_RETURN_NONE;
}
+static entity_type_t svc_type_from_str(const std::string &type_str)
+{
+ if (type_str == std::string("mds")) {
+ return CEPH_ENTITY_TYPE_MDS;
+ } else if (type_str == std::string("osd")) {
+ return CEPH_ENTITY_TYPE_OSD;
+ } else if (type_str == std::string("mon")) {
+ return CEPH_ENTITY_TYPE_MON;
+ } else {
+ return CEPH_ENTITY_TYPE_ANY;
+ }
+}
+
static PyObject*
get_metadata(PyObject *self, PyObject *args)
{
return nullptr;
}
- entity_type_t svc_type;
- if (type_str == std::string("mds")) {
- svc_type = CEPH_ENTITY_TYPE_MDS;
- } else if (type_str == std::string("osd")) {
- svc_type = CEPH_ENTITY_TYPE_OSD;
- } else if (type_str == std::string("mon")) {
- svc_type = CEPH_ENTITY_TYPE_MON;
- } else {
+ entity_type_t svc_type = svc_type_from_str(type_str);
+ if (svc_type == CEPH_ENTITY_TYPE_ANY) {
// FIXME: form a proper exception
return nullptr;
}
+
return global_handle->get_metadata_python(handle, svc_type, svc_id);
}
Py_RETURN_NONE;
}
+static PyObject*
+get_counter(PyObject *self, PyObject *args)
+{
+ char *handle = nullptr;
+ char *type_str = nullptr;
+ char *svc_id = nullptr;
+ char *counter_path = nullptr;
+ if (!PyArg_ParseTuple(args, "ssss:get_counter", &handle, &type_str,
+ &svc_id, &counter_path)) {
+ return nullptr;
+ }
+ entity_type_t svc_type = svc_type_from_str(type_str);
+ if (svc_type == CEPH_ENTITY_TYPE_ANY) {
+ // FIXME: form a proper exception
+ return nullptr;
+ }
+
+ return global_handle->get_counter_python(
+ handle, svc_type, svc_id, counter_path);
+}
PyMethodDef CephStateMethods[] = {
{"get", ceph_state_get, METH_VARARGS,
"Get a configuration value"},
{"set_config", ceph_config_set, METH_VARARGS,
"Set a configuration value"},
+ {"get_counter", get_counter, METH_VARARGS,
+ "Get a performance counter"},
{"log", ceph_log, METH_VARARGS,
"Emit a (local) log message"},
{NULL, NULL, 0, NULL}