]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: Add ability to trigger a cluster/audit log message from Python
authorVolker Theile <vtheile@suse.com>
Mon, 24 Sep 2018 10:18:56 +0000 (12:18 +0200)
committerVolker Theile <vtheile@suse.com>
Thu, 4 Oct 2018 11:33:18 +0000 (13:33 +0200)
Fixes: https://tracker.ceph.com/issues/36194
Signed-off-by: Volker Theile <vtheile@suse.com>
18 files changed:
qa/tasks/ceph_manager.py
qa/tasks/ceph_test_case.py
qa/tasks/mgr/test_module_selftest.py
qa/tasks/vstart_runner.py
src/mgr/ActivePyModule.h
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/BaseMgrModule.cc
src/mgr/Mgr.cc
src/mgr/MgrStandby.cc
src/mgr/PyModuleRegistry.cc
src/mgr/PyModuleRegistry.h
src/mgr/PyModuleRunner.cc
src/mgr/PyModuleRunner.h
src/mgr/StandbyPyModules.cc
src/mgr/StandbyPyModules.h
src/pybind/mgr/mgr_module.py
src/pybind/mgr/selftest/module.py

index 9638670a1574689ef55549d4cd4684fa1588f0fc..232c3f79bb355ca1a71bafc919e121a9bb0424a0 100644 (file)
@@ -1179,20 +1179,26 @@ class CephManager:
         proc = self.controller.run(**kwargs)
         return proc.exitstatus
 
-    def run_ceph_w(self):
+    def run_ceph_w(self, watch_channel=None):
         """
         Execute "ceph -w" in the background with stdout connected to a StringIO,
         and return the RemoteProcess.
-        """
-        return self.controller.run(
-            args=["sudo",
-                  "daemon-helper",
-                  "kill",
-                  "ceph",
-                  '--cluster',
-                  self.cluster,
-                  "-w"],
-            wait=False, stdout=StringIO(), stdin=run.PIPE)
+
+        :param watch_channel: Specifies the channel to be watched. This can be
+                              'cluster', 'audit', ...
+        :type watch_channel: str
+        """
+        args = ["sudo",
+                "daemon-helper",
+                "kill",
+                "ceph",
+                '--cluster',
+                self.cluster,
+                "-w"]
+        if watch_channel is not None:
+            args.append("--watch-channel")
+            args.append(watch_channel)
+        return self.controller.run(args, wait=False, stdout=StringIO(), stdin=run.PIPE)
 
     def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
         """
index 1c6bc04ef8a3e94265f3f5b189ebecc9d47db283..05e22722deb0c9a8348b57a7768591a230c463c3 100644 (file)
@@ -34,12 +34,17 @@ class CephTestCase(unittest.TestCase):
         self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
             "Ended test {0}".format(self.id()))
 
-    def assert_cluster_log(self, expected_pattern, invert_match=False, timeout=10):
+    def assert_cluster_log(self, expected_pattern, invert_match=False,
+                           timeout=10, watch_channel=None):
         """
         Context manager.  Assert that during execution, or up to 5 seconds later,
         the Ceph cluster log emits a message matching the expected pattern.
 
-        :param expected_pattern: a string that you expect to see in the log output
+        :param expected_pattern: A string that you expect to see in the log output
+        :type expected_pattern: str
+        :param watch_channel: Specifies the channel to be watched. This can be
+                              'cluster', 'audit', ...
+        :type watch_channel: str
         """
 
         ceph_manager = self.ceph_cluster.mon_manager
@@ -53,7 +58,7 @@ class CephTestCase(unittest.TestCase):
                 return found
 
             def __enter__(self):
-                self.watcher_process = ceph_manager.run_ceph_w()
+                self.watcher_process = ceph_manager.run_ceph_w(watch_channel)
 
             def __exit__(self, exc_type, exc_val, exc_tb):
                 if not self.watcher_process.finished:
index fa636d05ea3bcec4fc16c5327bca16ee44c7abe7..44c38b04deddb977a4a512dbeb12a98cf2b85202 100644 (file)
@@ -291,3 +291,30 @@ class TestModuleSelftest(MgrTestCase):
         self.mgr_cluster.mon_manager.raw_cluster_cmd(
             "mgr", "self-test", "remote")
 
+    def test_selftest_cluster_log(self):
+        """
+        Use the selftest module to test the cluster/audit log interface.
+        """
+        priority_map = {
+            'info': 'INF',
+            'security': 'SEC',
+            'warning': 'WRN',
+            'error': 'ERR'
+        }
+        self._load_module("selftest")
+        for priority in priority_map.keys():
+            message = "foo bar {}".format(priority)
+            log_message = "[{}] {}".format(priority_map[priority], message)
+            # Check for cluster/audit logs:
+            # 2018-09-24 09:37:10.977858 mgr.x [INF] foo bar info
+            # 2018-09-24 09:37:10.977860 mgr.x [SEC] foo bar security
+            # 2018-09-24 09:37:10.977863 mgr.x [WRN] foo bar warning
+            # 2018-09-24 09:37:10.977866 mgr.x [ERR] foo bar error
+            with self.assert_cluster_log(log_message):
+                self.mgr_cluster.mon_manager.raw_cluster_cmd(
+                    "mgr", "self-test", "cluster-log", "cluster",
+                    priority, message)
+            with self.assert_cluster_log(log_message, watch_channel="audit"):
+                self.mgr_cluster.mon_manager.raw_cluster_cmd(
+                    "mgr", "self-test", "cluster-log", "audit",
+                    priority, message)
index 5e7278b36e20fde8615776848807e7a24aac9cf2..46828429bed890a49dc5a530d5e0255f31112cfc 100644 (file)
@@ -577,8 +577,17 @@ class LocalCephManager(CephManager):
         """
         return LocalRemote()
 
-    def run_ceph_w(self):
-        proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
+    def run_ceph_w(self, watch_channel=None):
+        """
+        :param watch_channel: Specifies the channel to be watched.
+                              This can be 'cluster', 'audit', ...
+        :type watch_channel: str
+        """
+        args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
+        if watch_channel is not None:
+            args.append("--watch-channel")
+            args.append(watch_channel)
+        proc = self.controller.run(args, wait=False, stdout=StringIO())
         return proc
 
     def raw_cluster_cmd(self, *args, **kwargs):
index 149d297faf13ef37e2872dc6eb43569dcad5d903..6b382bdc863db7187c809b58c844eec3b9aa7bd8 100644 (file)
@@ -44,8 +44,8 @@ private:
 
 public:
   ActivePyModule(const PyModuleRef &py_module_,
-      LogChannelRef clog_)
-    : PyModuleRunner(py_module_, clog_)
+      LogChannelRef clog_, LogChannelRef audit_clog_)
+    : PyModuleRunner(py_module_, clog_, audit_clog_)
   {}
 
   int load(ActivePyModules *py_modules);
index 1c89dcc3188a24cf81536b1c275fb04ca680db83..7e22775bac7ced5d6a6c68cbb20d33953d4c7eb6 100644 (file)
 ActivePyModules::ActivePyModules(PyModuleConfig &module_config_,
           std::map<std::string, std::string> store_data,
           DaemonStateIndex &ds, ClusterState &cs,
-         MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
+          MonClient &mc, LogChannelRef clog_,
+          LogChannelRef audit_clog_, Objecter &objecter_,
           Client &client_, Finisher &f, DaemonServer &server)
   : module_config(module_config_), daemon_state(ds), cluster_state(cs),
-    monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
+    monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
+    client(client_), finisher(f),
     server(server), lock("ActivePyModules")
 {
   store_cache = std::move(store_data);
@@ -375,7 +377,7 @@ int ActivePyModules::start_one(PyModuleRef py_module)
 
   ceph_assert(modules.count(py_module->get_name()) == 0);
 
-  modules[py_module->get_name()].reset(new ActivePyModule(py_module, clog));
+  modules[py_module->get_name()].reset(new ActivePyModule(py_module, clog, audit_clog));
   auto active_module = modules.at(py_module->get_name()).get();
 
   int r = active_module->load(this);
index 35476c1bbc801f9b9a39fa9b2b01bdc8eef6fc23..48302d63ac5b7f33b6e70d788866379c46707cd5 100644 (file)
@@ -42,7 +42,7 @@ class ActivePyModules
   DaemonStateIndex &daemon_state;
   ClusterState &cluster_state;
   MonClient &monc;
-  LogChannelRef clog;
+  LogChannelRef clog, audit_clog;
   Objecter &objecter;
   Client   &client;
   Finisher &finisher;
@@ -55,7 +55,7 @@ public:
   ActivePyModules(PyModuleConfig &module_config,
             std::map<std::string, std::string> store_data,
             DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
-            LogChannelRef clog_, Objecter &objecter_, Client &client_,
+            LogChannelRef clog_, LogChannelRef audit_clog_, Objecter &objecter_, Client &client_,
             Finisher &f, DaemonServer &server);
 
   ~ActivePyModules();
index a6fe20d8f3ddbd13c5930228d55e61f1fdc079ed..91687b6c8a108dacb9cf6a13b9bfe14349587b02 100644 (file)
@@ -465,7 +465,6 @@ get_daemon_status(BaseMgrModule *self, PyObject *args)
 static PyObject*
 ceph_log(BaseMgrModule *self, PyObject *args)
 {
-
   int level = 0;
   char *record = nullptr;
   if (!PyArg_ParseTuple(args, "is:log", &level, &record)) {
@@ -479,6 +478,23 @@ ceph_log(BaseMgrModule *self, PyObject *args)
   Py_RETURN_NONE;
 }
 
+static PyObject*
+ceph_cluster_log(BaseMgrModule *self, PyObject *args)
+{
+  int prio = 0;
+  char *channel = nullptr;
+  char *message = nullptr;
+  if (!PyArg_ParseTuple(args, "sis:ceph_cluster_log", &channel, &prio, &message)) {
+    return nullptr;
+  }
+
+  ceph_assert(self->this_module);
+
+  self->this_module->cluster_log(channel, (clog_type)prio, message);
+
+  Py_RETURN_NONE;
+}
+
 static PyObject *
 ceph_get_version(BaseMgrModule *self, PyObject *args)
 {
@@ -692,6 +708,9 @@ PyMethodDef BaseMgrModule_methods[] = {
   {"_ceph_log", (PyCFunction)ceph_log, METH_VARARGS,
    "Emit a (local) log message"},
 
+  {"_ceph_cluster_log", (PyCFunction)ceph_cluster_log, METH_VARARGS,
+   "Emit an cluster log message"},
+
   {"_ceph_get_version", (PyCFunction)ceph_get_version, METH_VARARGS,
    "Get the ceph version of this process"},
 
index eb60ee245384f2d8de272a40443c20b357b927d7..573975988f3df2311b0ae2c95f78bb99e24c9bce 100644 (file)
@@ -286,7 +286,8 @@ void Mgr::init()
   // assume finisher already initialized in background_init
   dout(4) << "starting python modules..." << dendl;
   py_module_registry->active_start(daemon_state, cluster_state,
-      kv_store, *monc, clog, *objecter, *client, finisher, server);
+      kv_store, *monc, clog, audit_clog, *objecter, *client,
+      finisher, server);
 
   dout(4) << "Complete." << dendl;
   initializing = false;
index d90b799f57772346683299a5f58affee13375ab5..ae4b853d15928ba33ed1713875eaf49c05736ec7 100644 (file)
@@ -54,7 +54,7 @@ MgrStandby::MgrStandby(int argc, const char **argv) :
   audit_clog(log_client.create_channel(CLOG_CHANNEL_AUDIT)),
   lock("MgrStandby::lock"),
   timer(g_ceph_context, lock),
-  py_module_registry(clog),
+  py_module_registry(clog, audit_clog),
   active_mgr(nullptr),
   orig_argc(argc),
   orig_argv(argv),
index 3e96639e8385250ff15bd2e96ddaed6d647d50e4..0f48d6bbd0e7de7132f3f66c5ce31ed0160d7eee 100644 (file)
@@ -141,7 +141,7 @@ void PyModuleRegistry::standby_start(MonClient &mc)
   dout(4) << "Starting modules in standby mode" << dendl;
 
   standby_modules.reset(new StandbyPyModules(
-        mgr_map, module_config, clog, mc));
+        mgr_map, module_config, clog, audit_clog, mc));
 
   std::set<std::string> failed_modules;
   for (const auto &i : modules) {
@@ -178,8 +178,9 @@ void PyModuleRegistry::standby_start(MonClient &mc)
 void PyModuleRegistry::active_start(
             DaemonStateIndex &ds, ClusterState &cs,
             const std::map<std::string, std::string> &kv_store,
-            MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
-            Client &client_, Finisher &f, DaemonServer &server)
+            MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
+            Objecter &objecter_, Client &client_, Finisher &f,
+            DaemonServer &server)
 {
   Mutex::Locker locker(lock);
 
@@ -198,7 +199,7 @@ void PyModuleRegistry::active_start(
 
   active_modules.reset(new ActivePyModules(
               module_config, kv_store, ds, cs, mc,
-              clog_, objecter_, client_, f, server));
+              clog_, audit_clog_, objecter_, client_, f, server));
 
   for (const auto &i : modules) {
     // Anything we're skipping because of !can_run will be flagged
index 089044433edd0a488867036f4ff52e9179d368b2..06e49b2e2ad34489eb22f9292e8cc1e5659f73ae 100644 (file)
@@ -38,7 +38,7 @@ class PyModuleRegistry
 {
 private:
   mutable Mutex lock{"PyModuleRegistry::lock"};
-  LogChannelRef clog;
+  LogChannelRef clog, audit_clog;
 
   std::map<std::string, PyModuleRef> modules;
 
@@ -76,8 +76,8 @@ public:
     return modules_out;
   }
 
-  explicit PyModuleRegistry(LogChannelRef clog_)
-    : clog(clog_)
+  explicit PyModuleRegistry(LogChannelRef clog_, LogChannelRef audit_clog_)
+    : clog(clog_), audit_clog(audit_clog_)
   {}
 
   /**
@@ -94,8 +94,9 @@ public:
   void active_start(
                 DaemonStateIndex &ds, ClusterState &cs,
                 const std::map<std::string, std::string> &kv_store,
-                MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
-                Client &client_, Finisher &f, DaemonServer &server);
+                MonClient &mc, LogChannelRef clog_, LogChannelRef audit_clog_,
+                Objecter &objecter_, Client &client_, Finisher &f,
+                DaemonServer &server);
   void standby_start(MonClient &mc);
 
   bool is_standby_running() const
index 403c8a9f183f30c4972e9c4d93d4f44229fb09cc..b6f7f07853fecb10550e7b6ca7dc62c0ea769162 100644 (file)
@@ -99,6 +99,16 @@ void PyModuleRunner::log(int level, const std::string &record)
 #define dout_prefix *_dout << "mgr " << __func__ << " "
 }
 
+void PyModuleRunner::cluster_log(const std::string &channel, clog_type prio,
+    const std::string &message)
+{
+    if (std::string(channel) == "audit") {
+        audit_clog->do_log(prio, message);
+    } else {
+        clog->do_log(prio, message);
+    }
+}
+
 void* PyModuleRunner::PyModuleRunnerThread::entry()
 {
   // No need to acquire the GIL here; the module does it.
index fb58bf0c2323f1a0920d8034d80fce261068cb9c..550f544d196ebf51983345bb2e60fb472a902660 100644 (file)
@@ -33,7 +33,7 @@ protected:
   // Populated by descendent class
   PyObject *pClassInstance = nullptr;
 
-  LogChannelRef clog;
+  LogChannelRef clog, audit_clog;
 
   class PyModuleRunnerThread : public Thread
   {
@@ -52,6 +52,8 @@ public:
   int serve();
   void shutdown();
   void log(int level, const std::string &record);
+  void cluster_log(const std::string &channel, clog_type prio,
+    const std::string &message);
 
   const char *get_thread_name() const
   {
@@ -60,10 +62,12 @@ public:
 
   PyModuleRunner(
       const PyModuleRef &py_module_,
-      LogChannelRef clog_)
+      LogChannelRef clog_,
+      LogChannelRef audit_clog_)
     : 
       py_module(py_module_),
       clog(clog_),
+      audit_clog(audit_clog_),
       thread(this)
   {
     // Shortened name for use as thread name, because thread names
index f2d1eeb45b8d4a01ba6cf65d0f6ba4f9d7ee864b..259d70fe0b12c80571528b3b4d3533178af0daa1 100644 (file)
@@ -36,9 +36,11 @@ StandbyPyModules::StandbyPyModules(
     const MgrMap &mgr_map_,
     PyModuleConfig &module_config,
     LogChannelRef clog_,
+    LogChannelRef audit_clog_,
     MonClient &monc_)
     : state(module_config, monc_),
-      clog(clog_)
+      clog(clog_),
+      audit_clog(audit_clog_)
 {
   state.set_mgr_map(mgr_map_);
 }
@@ -81,7 +83,7 @@ int StandbyPyModules::start_one(PyModuleRef py_module)
 
   modules[module_name].reset(new StandbyPyModule(
       state,
-      py_module, clog));
+      py_module, clog, audit_clog));
 
   int r = modules[module_name]->load();
   if (r != 0) {
index f443ba67e7947ebeda2ceeb5ae00e279398c048e..28abc2593ac88b59bb8ff509922000520cb99c3a 100644 (file)
@@ -81,9 +81,10 @@ class StandbyPyModule : public PyModuleRunner
   StandbyPyModule(
       StandbyPyModuleState &state_,
       const PyModuleRef &py_module_,
-      LogChannelRef clog_)
+      LogChannelRef clog_,
+      LogChannelRef audit_clog_)
     :
-      PyModuleRunner(py_module_, clog_),
+      PyModuleRunner(py_module_, clog_, audit_clog_),
       state(state_)
   {
   }
@@ -103,7 +104,7 @@ private:
 
   StandbyPyModuleState state;
 
-  LogChannelRef clog;
+  LogChannelRef clog, audit_clog;
 
 public:
 
@@ -111,6 +112,7 @@ public:
       const MgrMap &mgr_map_,
       PyModuleConfig &module_config,
       LogChannelRef clog_,
+      LogChannelRef audit_clog_,
       MonClient &monc);
 
   int start_one(PyModuleRef py_module);
index c920df1a1027fbc7e6221643659a12e4385298b4..cc5079c5ad9f0288ed79089f9ca125396b1963cb 100644 (file)
@@ -1,7 +1,6 @@
 
 import ceph_module  # noqa
 
-import json
 import logging
 import six
 import threading
@@ -279,7 +278,14 @@ class MgrModule(ceph_module.BaseMgrModule):
     # units supported
     BYTES = 0
     NONE = 1
-    
+
+    # Cluster log priorities
+    CLUSTER_LOG_PRIO_DEBUG = 0
+    CLUSTER_LOG_PRIO_INFO = 1
+    CLUSTER_LOG_PRIO_SEC = 2
+    CLUSTER_LOG_PRIO_WARN = 3
+    CLUSTER_LOG_PRIO_ERROR = 4
+
     def __init__(self, module_name, py_modules_ptr, this_ptr):
         self.module_name = module_name
 
@@ -304,6 +310,20 @@ class MgrModule(ceph_module.BaseMgrModule):
     def log(self):
         return self._logger
 
+    def cluster_log(self, channel, priority, message):
+        """
+        :param channel: The log channel. This can be 'cluster', 'audit', ...
+        :type channel: str
+        :param priority: The log message priority. This can be
+                         CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
+                         CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
+                         CLUSTER_LOG_PRIO_ERROR.
+        :type priority: int
+        :param message: The message to log.
+        :type message: str
+        """
+        self._ceph_cluster_log(channel, priority, message)
+
     @property
     def version(self):
         return self._version
index 3f48ad180eb8338ee83576edd6bf7bd2c168ef2d..e29fe639d55a4c9b9cfa29700056dbbdd7d8e56f 100644 (file)
@@ -86,6 +86,13 @@ class Module(MgrModule):
                 "desc": "Set the now time for the insights module.",
                 "perm": "rw"
             },
+            {
+                "cmd": "mgr self-test cluster-log name=channel,type=CephString "
+                       "name=priority,type=CephString "
+                       "name=message,type=CephString",
+                "desc": "Create an audit log record.",
+                "perm": "rw"
+            },
             ]
 
     def __init__(self, *args, **kwargs):
@@ -136,6 +143,17 @@ class Module(MgrModule):
             return self._health_clear(inbuf, command)
         elif command['prefix'] == 'mgr self-test insights_set_now_offset':
             return self._insights_set_now_offset(inbuf, command)
+        elif command['prefix'] == 'mgr self-test cluster-log':
+            priority_map = {
+                'info': self.CLUSTER_LOG_PRIO_INFO,
+                'security': self.CLUSTER_LOG_PRIO_SEC,
+                'warning': self.CLUSTER_LOG_PRIO_WARN,
+                'error': self.CLUSTER_LOG_PRIO_ERROR
+            }
+            self.cluster_log(command['channel'],
+                             priority_map[command['priority']],
+                             command['message'])
+            return 0, '', 'Successfully called'
         else:
             return (-errno.EINVAL, '',
                     "Command not found '{0}'".format(command['prefix']))