proc = self.controller.run(**kwargs)
return proc.exitstatus
- def run_ceph_w(self):
+ def run_ceph_w(self, watch_channel=None):
"""
Execute "ceph -w" in the background with stdout connected to a StringIO,
and return the RemoteProcess.
- """
- return self.controller.run(
- args=["sudo",
- "daemon-helper",
- "kill",
- "ceph",
- '--cluster',
- self.cluster,
- "-w"],
- wait=False, stdout=StringIO(), stdin=run.PIPE)
+
+ :param watch_channel: Specifies the channel to be watched. This can be
+ 'cluster', 'audit', ...
+ :type watch_channel: str
+ """
+ args = ["sudo",
+ "daemon-helper",
+ "kill",
+ "ceph",
+ '--cluster',
+ self.cluster,
+ "-w"]
+ if watch_channel is not None:
+ args.append("--watch-channel")
+ args.append(watch_channel)
+ return self.controller.run(args, wait=False, stdout=StringIO(), stdin=run.PIPE)
def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
"""
self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
"Ended test {0}".format(self.id()))
- def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
+ def assert_cluster_log(self, expected_pattern, invert_match=False,
+ timeout=10, watch_channel=None):
"""
Context manager. Assert that during execution, or up to 5 seconds later,
the Ceph cluster log emits a message matching the expected pattern.
- :param expected_pattern: a string that you expect to see in the log output
+ :param expected_pattern: A string that you expect to see in the log output
+ :type expected_pattern: str
+ :param watch_channel: Specifies the channel to be watched. This can be
+ 'cluster', 'audit', ...
+ :type watch_channel: str
"""
ceph_manager = self.ceph_cluster.mon_manager
return found
def __enter__(self):
- self.watcher_process = ceph_manager.run_ceph_w()
+ self.watcher_process = ceph_manager.run_ceph_w(watch_channel)
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.watcher_process.finished:
self.mgr_cluster.mon_manager.raw_cluster_cmd(
"mgr", "self-test", "remote")
+ def test_selftest_cluster_log(self):
+ """
+ Use the selftest module to test the cluster/audit log interface.
+ """
+ priority_map = {
+ 'info': 'INF',
+ 'security': 'SEC',
+ 'warning': 'WRN',
+ 'error': 'ERR'
+ }
+ self._load_module("selftest")
+ for priority in priority_map.keys():
+ message = "foo bar {}".format(priority)
+ log_message = "[{}] {}".format(priority_map[priority], message)
+ # Check for cluster/audit logs:
+ # 2018-09-24 09:37:10.977858 mgr.x [INF] foo bar info
+ # 2018-09-24 09:37:10.977860 mgr.x [SEC] foo bar security
+ # 2018-09-24 09:37:10.977863 mgr.x [WRN] foo bar warning
+ # 2018-09-24 09:37:10.977866 mgr.x [ERR] foo bar error
+ with self.assert_cluster_log(log_message):
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ "mgr", "self-test", "cluster-log", "cluster",
+ priority, message)
+ with self.assert_cluster_log(log_message, watch_channel="audit"):
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ "mgr", "self-test", "cluster-log", "audit",
+ priority, message)
"""
return LocalRemote()
- def run_ceph_w(self):
- proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
+ def run_ceph_w(self, watch_channel=None):
+ """
+ :param watch_channel: Specifies the channel to be watched.
+ This can be 'cluster', 'audit', ...
+ :type watch_channel: str
+ """
+ args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
+ if watch_channel is not None:
+ args.append("--watch-channel")
+ args.append(watch_channel)
+ proc = self.controller.run(args, wait=False, stdout=StringIO())
return proc
def raw_cluster_cmd(self, *args, **kwargs):
public:
ActivePyModule(const PyModuleRef &py_module_,
- LogChannelRef clog_)
- : PyModuleRunner(py_module_, clog_)
+ LogChannelRef clog_, LogChannelRef audit_clog_)
+ : PyModuleRunner(py_module_, clog_, audit_clog_)
{}
int load(ActivePyModules *py_modules);
ActivePyModules::ActivePyModules(PyModuleConfig &module_config_,
std::map<std::string, std::string> store_data,
DaemonStateIndex &ds, ClusterState &cs,
- MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
+ MonClient &mc, LogChannelRef clog_,
+ LogChannelRef audit_clog_, Objecter &objecter_,
Client &client_, Finisher &f, DaemonServer &server)
: module_config(module_config_), daemon_state(ds), cluster_state(cs),
- monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
+ monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
+ client(client_), finisher(f),
server(server), lock("ActivePyModules")
{
store_cache = std::move(store_data);
ceph_assert(modules.count(py_module->get_name()) == 0);
- modules[py_module->get_name()].reset(new ActivePyModule(py_module, clog));
+ modules[py_module->get_name()].reset(new ActivePyModule(py_module, clog, audit_clog));
auto active_module = modules.at(py_module->get_name()).get();
int r = active_module->load(this);
DaemonStateIndex &daemon_state;
ClusterState &cluster_state;
MonClient &monc;
- LogChannelRef clog;
+ LogChannelRef clog, audit_clog;
Objecter &objecter;
Client &client;
Finisher &finisher;
ActivePyModules(PyModuleConfig &module_config,
std::map<std::string, std::string> store_data,
DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
- LogChannelRef clog_, Objecter &objecter_, Client &client_,
+ LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_,
Finisher &f, DaemonServer &server);
~ActivePyModules();
static PyObject*
ceph_log(BaseMgrModule *self, PyObject *args)
{
-
int level = 0;
char *record = nullptr;
if (!PyArg_ParseTuple(args, "is:log", &level, &record)) {
Py_RETURN_NONE;
}
+static PyObject*
+ceph_cluster_log(BaseMgrModule *self, PyObject *args)
+{
+ int prio = 0;
+ char *channel = nullptr;
+ char *message = nullptr;
+ if (!PyArg_ParseTuple(args, "sis:ceph_cluster_log", &channel, &prio, &message)) {
+ return nullptr;
+ }
+
+ ceph_assert(self->this_module);
+
+ self->this_module->cluster_log(channel, (clog_type)prio, message);
+
+ Py_RETURN_NONE;
+}
+
static PyObject *
ceph_get_version(BaseMgrModule *self, PyObject *args)
{
{"_ceph_log", (PyCFunction)ceph_log, METH_VARARGS,
"Emit a (local) log message"},
+ {"_ceph_cluster_log", (PyCFunction)ceph_cluster_log, METH_VARARGS,
+ "Emit an cluster log message"},
+
{"_ceph_get_version", (PyCFunction)ceph_get_version, METH_VARARGS,
"Get the ceph version of this process"},
// assume finisher already initialized in background_init
dout(4) << "starting python modules..." << dendl;
py_module_registry->active_start(daemon_state, cluster_state,
- kv_store, *monc, clog, *objecter, *client, finisher, server);
+ kv_store, *monc, clog, audit_clog, *objecter, *client,
+ finisher, server);
dout(4) << "Complete." << dendl;
initializing = false;
audit_clog(log_client.create_channel(CLOG_CHANNEL_AUDIT)),
lock("MgrStandby::lock"),
timer(g_ceph_context, lock),
- py_module_registry(clog),
+ py_module_registry(clog, audit_clog),
active_mgr(nullptr),
orig_argc(argc),
orig_argv(argv),
dout(4) << "Starting modules in standby mode" << dendl;
standby_modules.reset(new StandbyPyModules(
- mgr_map, module_config, clog, mc));
+ mgr_map, module_config, clog, audit_clog, mc));
std::set<std::string> failed_modules;
for (const auto &i : modules) {
void PyModuleRegistry::active_start(
DaemonStateIndex &ds, ClusterState &cs,
const std::map<std::string, std::string> &kv_store,
- MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
- Client &client_, Finisher &f, DaemonServer &server)
+ MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
+ Objecter &objecter_, Client &client_, Finisher &f,
+ DaemonServer &server)
{
Mutex::Locker locker(lock);
active_modules.reset(new ActivePyModules(
module_config, kv_store, ds, cs, mc,
- clog_, objecter_, client_, f, server));
+ clog_, audit_clog_, objecter_, client_, f, server));
for (const auto &i : modules) {
// Anything we're skipping because of !can_run will be flagged
{
private:
mutable Mutex lock{"PyModuleRegistry::lock"};
- LogChannelRef clog;
+ LogChannelRef clog, audit_clog;
std::map<std::string, PyModuleRef> modules;
return modules_out;
}
- explicit PyModuleRegistry(LogChannelRef clog_)
- : clog(clog_)
+ explicit PyModuleRegistry(LogChannelRef clog_, LogChannelRef audit_clog_)
+ : clog(clog_), audit_clog(audit_clog_)
{}
/**
void active_start(
DaemonStateIndex &ds, ClusterState &cs,
const std::map<std::string, std::string> &kv_store,
- MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
- Client &client_, Finisher &f, DaemonServer &server);
+ MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
+ Objecter &objecter_, Client &client_, Finisher &f,
+ DaemonServer &server);
void standby_start(MonClient &mc);
bool is_standby_running() const
#define dout_prefix *_dout << "mgr " << __func__ << " "
}
+void PyModuleRunner::cluster_log(const std::string &channel, clog_type prio,
+ const std::string &message)
+{
+ if (std::string(channel) == "audit") {
+ audit_clog->do_log(prio, message);
+ } else {
+ clog->do_log(prio, message);
+ }
+}
+
void* PyModuleRunner::PyModuleRunnerThread::entry()
{
// No need to acquire the GIL here; the module does it.
// Populated by descendent class
PyObject *pClassInstance = nullptr;
- LogChannelRef clog;
+ LogChannelRef clog, audit_clog;
class PyModuleRunnerThread : public Thread
{
int serve();
void shutdown();
void log(int level, const std::string &record);
+ void cluster_log(const std::string &channel, clog_type prio,
+ const std::string &message);
const char *get_thread_name() const
{
PyModuleRunner(
const PyModuleRef &py_module_,
- LogChannelRef clog_)
+ LogChannelRef clog_,
+ LogChannelRef audit_clog_)
:
py_module(py_module_),
clog(clog_),
+ audit_clog(audit_clog_),
thread(this)
{
// Shortened name for use as thread name, because thread names
const MgrMap &mgr_map_,
PyModuleConfig &module_config,
LogChannelRef clog_,
+ LogChannelRef audit_clog_,
MonClient &monc_)
: state(module_config, monc_),
- clog(clog_)
+ clog(clog_),
+ audit_clog(audit_clog_)
{
state.set_mgr_map(mgr_map_);
}
modules[module_name].reset(new StandbyPyModule(
state,
- py_module, clog));
+ py_module, clog, audit_clog));
int r = modules[module_name]->load();
if (r != 0) {
StandbyPyModule(
StandbyPyModuleState &state_,
const PyModuleRef &py_module_,
- LogChannelRef clog_)
+ LogChannelRef clog_,
+ LogChannelRef audit_clog_)
:
- PyModuleRunner(py_module_, clog_),
+ PyModuleRunner(py_module_, clog_, audit_clog_),
state(state_)
{
}
StandbyPyModuleState state;
- LogChannelRef clog;
+ LogChannelRef clog, audit_clog;
public:
const MgrMap &mgr_map_,
PyModuleConfig &module_config,
LogChannelRef clog_,
+ LogChannelRef audit_clog_,
MonClient &monc);
int start_one(PyModuleRef py_module);
import ceph_module # noqa
-import json
import logging
import six
import threading
# units supported
BYTES = 0
NONE = 1
-
+
+ # Cluster log priorities
+ CLUSTER_LOG_PRIO_DEBUG = 0
+ CLUSTER_LOG_PRIO_INFO = 1
+ CLUSTER_LOG_PRIO_SEC = 2
+ CLUSTER_LOG_PRIO_WARN = 3
+ CLUSTER_LOG_PRIO_ERROR = 4
+
def __init__(self, module_name, py_modules_ptr, this_ptr):
self.module_name = module_name
def log(self):
return self._logger
+ def cluster_log(self, channel, priority, message):
+ """
+ :param channel: The log channel. This can be 'cluster', 'audit', ...
+ :type channel: str
+ :param priority: The log message priority. This can be
+ CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
+ CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
+ CLUSTER_LOG_PRIO_ERROR.
+ :type priority: int
+ :param message: The message to log.
+ :type message: str
+ """
+ self._ceph_cluster_log(channel, priority, message)
+
@property
def version(self):
return self._version
"desc": "Set the now time for the insights module.",
"perm": "rw"
},
+ {
+ "cmd": "mgr self-test cluster-log name=channel,type=CephString "
+ "name=priority,type=CephString "
+ "name=message,type=CephString",
+ "desc": "Create an audit log record.",
+ "perm": "rw"
+ },
]
def __init__(self, *args, **kwargs):
return self._health_clear(inbuf, command)
elif command['prefix'] == 'mgr self-test insights_set_now_offset':
return self._insights_set_now_offset(inbuf, command)
+ elif command['prefix'] == 'mgr self-test cluster-log':
+ priority_map = {
+ 'info': self.CLUSTER_LOG_PRIO_INFO,
+ 'security': self.CLUSTER_LOG_PRIO_SEC,
+ 'warning': self.CLUSTER_LOG_PRIO_WARN,
+ 'error': self.CLUSTER_LOG_PRIO_ERROR
+ }
+ self.cluster_log(command['channel'],
+ priority_map[command['priority']],
+ command['message'])
+ return 0, '', 'Successfully called'
else:
return (-errno.EINVAL, '',
"Command not found '{0}'".format(command['prefix']))