]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/k8sevents: Add support for remote kubernetes 30482/head
authorPaul Cuzner <pcuzner@redhat.com>
Fri, 20 Sep 2019 04:55:06 +0000 (16:55 +1200)
committerPaul Cuzner <pcuzner@redhat.com>
Thu, 3 Oct 2019 20:26:23 +0000 (09:26 +1300)
The initial implementation sent events to the
kubernetes cluster Ceph is running under managed
by rook-ceph. This patch extends this support to
include sending events to an external kubernetes
cluster, that may just be consuming ceph resources

Additional docs added to help people use the
module, either with or without rook-ceph.

It also address a bug where the module was loaded
outside of a rook-ceph environment.

Fixes: https://tracker.ceph.com/issues/41737
Signed-off-by: Paul Cuzner <pcuzner@redhat.com>
src/pybind/mgr/k8sevents/README.md [new file with mode: 0644]
src/pybind/mgr/k8sevents/module.py
src/pybind/mgr/k8sevents/rbac_sample.yaml [new file with mode: 0644]
src/pybind/mgr/mgr_util.py

diff --git a/src/pybind/mgr/k8sevents/README.md b/src/pybind/mgr/k8sevents/README.md
new file mode 100644 (file)
index 0000000..7398096
--- /dev/null
@@ -0,0 +1,81 @@
+# Testing
+
+## To test the k8sevents module  
+enable the module with `ceph mgr module enable k8sevents`  
+check that it's working `ceph k8sevents status`, you should see something like this;  
+```
+[root@ceph-mgr ~]# ceph k8sevents status
+Kubernetes
+- Hostname : https://localhost:30443
+- Namespace: ceph
+Tracker Health
+- EventProcessor : OK
+- CephConfigWatcher : OK
+- NamespaceWatcher : OK
+Tracked Events
+- namespace  :   5
+- ceph events:   0
+
+```  
+Now run some commands to generate healthchecks and admin level events;  
+- ```ceph osd set noout```
+- ```ceph osd unset noout```
+- ```ceph osd pool create mypool 4 4 replicated```
+- ```ceph osd pool delete mypool mypool --yes-i-really-really-mean-it```  
+
+In addition to tracking audit, healthchecks and configuration changes if you have the environment up for >1 hr you should also see and event that shows the clusters health and configuration overview.
+
+As well as status, you can use k8sevents to see event activity in the target kubernetes namespace
+```
+[root@rhcs4-3 kube]# ceph k8sevents ls 
+Last Seen (UTC)       Type      Count  Message                                              Event Object Name
+2019/09/20 04:33:00   Normal        1  Pool 'mypool' has been removed from the cluster      mgr.ConfigurationChangeql2hj
+2019/09/20 04:32:55   Normal        1  Client 'client.admin' issued: ceph osd pool delete   mgr.audit.osd_pool_delete_
+2019/09/20 04:13:23   Normal        2  Client 'mds.rhcs4-2' issued: ceph osd blacklist      mgr.audit.osd_blacklist_
+2019/09/20 04:08:28   Normal        1  Ceph log -> event tracking started                   mgr.k8sevents-moduleq74k7
+Total :   4
+```  
+or, focus on the ceph specific events(audit & healthcheck) that are being tracked by the k8sevents module.
+```
+[root@rhcs4-3 kube]# ceph k8sevents ceph
+Last Seen (UTC)       Type      Count  Message                                              Event Object Name
+2019/09/20 04:32:55   Normal        1  Client 'client.admin' issued: ceph osd pool delete   mgr.audit.osd_pool_delete_
+2019/09/20 04:13:23   Normal        2  Client 'mds.rhcs4-2' issued: ceph osd blacklist      mgr.audit.osd_blacklist_
+Total :   2
+```
+
+## Sending events from a standalone Ceph cluster to remote Kubernetes cluster
+To test interaction from a standalone ceph cluster to a kubernetes environment, you need to make changes on the kubernetes cluster **and** on one of the mgr hosts.
+### kubernetes (minikube)
+We need some basic RBAC in place to define a serviceaccount(and token) that we can use to push events into kubernetes. The `rbac_sample.yaml` file provides a quick means to create the required resources. Create them with `kubectl create -f rbac_sample.yaml`
+  
+Once the resources are defined inside kubernetes, we need a couple of things copied over to the Ceph mgr's filesystem.
+### ceph admin host
+We need to run some commands against the cluster, so you'll needs access to a ceph admin host. If you don't have a dedicated admin host, you can use a mon or mgr machine. We'll need the root ca.crt of the kubernetes API, and the token associated with the service account we're using to access the kubernetes API.  
+
+1. Download/fetch the root ca.crt for the kubernetes cluster (on minikube this can be found at ~/minikube/ca.crt)
+2. Copy the ca.crt to your ceph admin host
+3. Extract the token from the service account we're going to use
+```
+kubectl -n ceph get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='ceph-mgr')].data.token}"|base64 -d > mytoken
+```  
+4. Copy the token to your ceph admin host
+5. On the ceph admin host, enable the module with `ceph mgr module enable k8sevents`
+6. Set up the configuration
+```
+ceph k8sevents set-access cacrt -i <path to ca.crt file>
+ceph k8sevents set-access token -i <path to mytoken>
+ceph k8sevents set-config server https://<kubernetes api host>:<api_port>
+ceph k8sevents set-config namespace ceph
+```
+7. Restart the module with `ceph mgr module disable k8sevents && ceph mgr module enable k8sevents`
+8. Check state with the `ceph k8sevents status` command
+9. Remove the ca.crt and mytoken files from your admin host
+
+To remove the configuration keys used for external kubernetes access, run the following command
+```
+ceph k8sevents clear-config  
+```
+
+## Networking
+You can use the above approach with a minikube based target from a standalone ceph cluster, but you'll need to have a tunnel/routing defined from the mgr host(s) to the minikube machine to make the kubernetes API accessible to the mgr/k8sevents module. This can just be a simple ssh tunnel.    
index 88a28d5d354e2f51ab800bb99364f1daa76c959d..f5f2042fd00f6cfc92fd5b49a88dc87f635ff39c 100644 (file)
 
 
 import os
+import re
 import sys
 import time
 import json
+import yaml
+import errno
+import socket
+import base64
 import logging
+import tempfile
 import threading
 
-from datetime import datetime, timedelta, timezone
-from urllib3.exceptions import MaxRetryError
+try:
+    # python 3 
+    from urllib.parse import urlparse
+except ImportError:
+    # python 2 fallback
+    from urlparse import urlparse
+
+from datetime import tzinfo, datetime, timedelta
+   
+from urllib3.exceptions import MaxRetryError,ProtocolError
 from collections import OrderedDict
 
 import rados
 from mgr_module import MgrModule
+from mgr_util import verify_cacrt, ServerConfigException
 
 try:
     import queue
@@ -50,10 +65,9 @@ except ImportError:
     kubernetes_imported = False
     client = None
     config = None
+    watch = None
 else:
     kubernetes_imported = True
-    # Apply the kubernetes config from the cluster 
-    config.load_incluster_config()
 
     # The watch.Watch.stream method can provide event objects that have involved_object = None
     # which causes an exception in the generator. A workaround is discussed for a similar issue
@@ -68,11 +82,50 @@ else:
 
 log = logging.getLogger(__name__)
 
+# use a simple local class to represent UTC
+# datetime pkg modules vary between python2 and 3 and pytz is not available on older
+# ceph container images, so taking a pragmatic approach!
+class UTC(tzinfo):
+    def utcoffset(self, dt):
+        return timedelta(0)
+
+    def tzname(self, dt):
+        return "UTC"
+
+    def dst(self, dt):
+        return timedelta(0)
+
+
 def text_suffix(num):
     """Define a text suffix based on a value i.e. turn host into hosts"""
     return '' if num == 1 else 's'
 
 
+def create_temp_file(fname, content, suffix=".tmp"):
+    """Create a temp file 
+
+    Attempt to create an temporary file containing the given content
+
+    Returns:
+    str .. full path to the temporary file
+
+    Raises:
+    OSError: problems creating the file
+
+    """
+
+    if content is not None:
+        file_name = os.path.join(tempfile.gettempdir(), fname + suffix)
+
+        try:
+            with open(file_name, "w") as f:
+                f.write(content)
+        except OSError as e:
+            raise OSError("Unable to create temporary file : {}".format(str(e)))
+
+    return file_name
+
+
 class HealthCheck(object):
     """Transform a healthcheck msg into it's component parts"""
 
@@ -128,7 +181,11 @@ class LogEntry(object):
 
     
     def __str__(self):
-        return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,self.msg_type,self.msg,self.level,self.tstamp)
+        return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,
+                                                                            self.msg_type,
+                                                                            self.msg,
+                                                                            self.level,
+                                                                            self.tstamp)
 
     @property
     def cmd(self):
@@ -186,14 +243,6 @@ class LogEntry(object):
         else:
             return self.msg
 
-class RookCeph(object):
-    """Establish environment defaults when interacting with rook-ceph"""
-
-    pod_name = os.environ['POD_NAME']
-    host = os.environ['NODE_NAME']
-    namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')
-    cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', 'rook-ceph')
-    api = client.CoreV1Api()
 
 class BaseThread(threading.Thread):
     health = 'OK'
@@ -201,7 +250,7 @@ class BaseThread(threading.Thread):
     daemon = True
 
 
-class NamespaceWatcher(RookCeph, BaseThread):
+class NamespaceWatcher(BaseThread):
     """Watch events in a given namespace 
     
     Using the watch package we can listen to event traffic in the namespace to 
@@ -210,10 +259,16 @@ class NamespaceWatcher(RookCeph, BaseThread):
     so this stream will only really show activity inside this retention window.
     """
 
-    def __init__(self, namespace=None):
+    def __init__(self, api_client_config, namespace=None):
         super(NamespaceWatcher, self).__init__()
-        if namespace:                       # override the default
-            self.namespace = namespace
+
+        if api_client_config:
+            self.api = client.CoreV1Api(api_client_config)
+        else:
+            self.api = client.CoreV1Api()
+
+        self.namespace = namespace
+
         self.events = OrderedDict()
         self.lock = threading.Lock()
         self.active = None
@@ -266,8 +321,8 @@ class NamespaceWatcher(RookCeph, BaseThread):
                 # Attribute error is generated when urllib3 on the system is old and doesn't have a
                 # read_chunked method
                 except AttributeError as e:
-                    self.health = ("Unable to 'watch' events API in namespace {} - "
-                                "incompatible urllib3? ({})".format(self.namespace, e))
+                    self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
+                                "urllib3 too old? ({})".format(self.namespace, e))
                     self.active = False
                     log.warning(self.health)
                     break
@@ -275,7 +330,7 @@ class NamespaceWatcher(RookCeph, BaseThread):
                 except ApiException as e:
                     # refresh the resource_version & watcher
                     log.warning("API exception caught in watcher ({})".format(e))
-                    log.info("Restarting namespace watcher")
+                    log.warning("Restarting namespace watcher")
                     self.fetch()
 
                 except Exception:
@@ -290,10 +345,16 @@ class NamespaceWatcher(RookCeph, BaseThread):
             log.warning("Namespace event watcher stopped")
 
 
-class KubernetesEvent(RookCeph):
+class KubernetesEvent(object):
+
+    def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):
+
+        if api_client_config:
+            self.api = client.CoreV1Api(api_client_config)
+        else:
+            self.api = client.CoreV1Api()
 
-    def __init__(self, log_entry, unique_name=True):
-        super(KubernetesEvent, self).__init__()
+        self.namespace = namespace
 
         self.event_name = log_entry.event_name
         self.message = log_entry.event_msg
@@ -301,6 +362,8 @@ class KubernetesEvent(RookCeph):
         self.event_reason = log_entry.event_reason
         self.unique_name = unique_name
 
+        self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))
+
         self.api_status = 200
         self.count = 1
         self.first_timestamp = None
@@ -341,19 +404,26 @@ class KubernetesEvent(RookCeph):
 
     def write(self):
 
-        now=datetime.now(timezone.utc)
+        now=datetime.now(UTC())
 
         self.first_timestamp = now
         self.last_timestamp = now
 
         try:
             self.api.create_namespaced_event(self.namespace, self.event_body)
+        except (OSError, ProtocolError):
+            # unable to reach to the API server
+            log.error("Unable to reach API server")
+            self.api_status = 400
         except MaxRetryError:
-            # k8s config has not be defined properly 
+            # k8s config has not be defined properly
+            log.error("multiple attempts to connect to the API have failed") 
             self.api_status = 403       # Forbidden
         except ApiException as e:
+            log.debug("event.write status:{}".format(e.status))
             self.api_status = e.status
             if e.status == 409:
+                log.debug("attempting event update for an existing event")
                 # 409 means the event is there already, so read it back (v1Event object returned)
                 # this could happen if the event has been created, and then the k8sevent module
                 # disabled and reenabled - i.e. the internal event tracking no longer matches k8s
@@ -409,11 +479,14 @@ class KubernetesEvent(RookCeph):
                 try:
                     self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
                 except ApiException as e:
+                    log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
                     self.api_status = e.status
                 else:
+                    log.debug("event {} patched".format(self.event_name))
                     self.api_status = 200
 
         else:
+            log.debug("event {} created successfully".format(self.event_name))
             self.api_status = 200
 
     @property
@@ -423,22 +496,28 @@ class KubernetesEvent(RookCeph):
     def update(self, log_entry):
         self.message = log_entry.event_msg
         self.event_type = log_entry.event_type
-        self.last_timestamp = datetime.now(timezone.utc)
+        self.last_timestamp = datetime.now(UTC())
         self.count += 1
+        log.debug("performing event update for {}".format(self.event_name))
 
         try:
             self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
         except ApiException as e:
+            log.error("event patch call failed: {}".format(e.status))
             if e.status == 404:
                 # tried to patch, but hit a 404. The event's TTL must have been reached, and 
                 # pruned by the kube-apiserver
+                log.debug("event not found, so attempting to create it")
                 try:
                     self.api.create_namespaced_event(self.namespace, self.event_body)
                 except ApiException as e:
+                    log.error("unable to create the event: {}".format(e.status))
                     self.api_status = e.status
                 else:
+                    log.debug("event {} created successfully".format(self.event_name))
                     self.api_status = 200
         else:
+            log.debug("event {} updated".format(self.event_name))
             self.api_status = 200 
 
 
@@ -447,11 +526,14 @@ class EventProcessor(BaseThread):
 
     can_run = True
 
-    def __init__(self, config_watcher, event_retention_days):
+    def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
         super(EventProcessor, self).__init__()
+
         self.events = dict()
         self.config_watcher = config_watcher
         self.event_retention_days = event_retention_days
+        self.api_client_config = api_client_config
+        self.namespace = namespace
 
     def startup(self):
         """Log an event to show we're active"""
@@ -464,7 +546,9 @@ class EventProcessor(BaseThread):
                 level='INF',
                 tstamp=None
             ),
-            unique_name=False
+            unique_name=False,
+            api_client_config=self.api_client_config,
+            namespace=self.namespace
         )
         
         event.write()
@@ -475,8 +559,8 @@ class EventProcessor(BaseThread):
         return self.startup()
 
     def prune_events(self):
-
-        oldest = datetime.now(timezone.utc) - timedelta(days=self.event_retention_days)
+        log.debug("prune_events - looking for old events to remove from cache")
+        oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
         local_events = dict(self.events)
 
         for event_name in sorted(local_events,
@@ -486,12 +570,12 @@ class EventProcessor(BaseThread):
                 break
             else:
                 # drop this event
-                log.debug("Removing old event : {}".format(event_name))
+                log.debug("prune_events - removing old event : {}".format(event_name))
                 del self.events[event_name]
 
     def process(self, log_object):
         
-        log.debug("K8sevents processing : {}".format(str(log_object)))
+        log.debug("log entry being processed : {}".format(str(log_object)))
 
         event_out = False
         unique_name = True
@@ -525,10 +609,13 @@ class EventProcessor(BaseThread):
             log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))
 
         if event_out:
+            log.debug("k8sevents sending event to kubernetes")
             # we don't cache non-unique events like heartbeats or config changes
             if not unique_name or log_object.event_name not in self.events.keys():
                 event = KubernetesEvent(log_entry=log_object,
-                                        unique_name=unique_name)
+                                        unique_name=unique_name,
+                                        api_client_config=self.api_client_config,
+                                        namespace=self.namespace)
                 event.write()
                 log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
                 if event.api_success and unique_name:
@@ -536,7 +623,7 @@ class EventProcessor(BaseThread):
             else:
                 event = self.events[log_object.event_name]
                 event.update(log_object)
-                log.debug("Event update ended : {}".format(event.api_status))
+                log.debug("event update ended : {}".format(event.api_status))
 
             self.prune_events()
 
@@ -544,7 +631,8 @@ class EventProcessor(BaseThread):
             log.debug("K8sevents ignored message : {}".format(log_object.msg))
 
     def run(self):
-        log.info("Ceph event processing thread started, event retention set to {} days".format(self.event_retention_days))
+        log.info("Ceph event processing thread started, "
+                 "event retention set to {} days".format(self.event_retention_days))
 
         while True:
 
@@ -808,11 +896,11 @@ class CephConfigWatcher(BaseThread):
                 if size_diff != 0:
                     if size_diff < 0:
                         msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name,
-                                                                                            pool_map[pool_name]['size'])
+                                                                                               pool_map[pool_name]['size'])
                         level = 'WRN'
                     else:
                         msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name,
-                                                                                                pool_map[pool_name]['size'])
+                                                                                                 pool_map[pool_name]['size'])
                         level = 'INF'
 
                     changes.append(LogEntry(source="config",
@@ -843,7 +931,7 @@ class CephConfigWatcher(BaseThread):
 
         # FUTURE
         # Could generate an event if a ceph daemon has moved hosts
-        # (assumes the ceph metadata host information is valid - which may not be the case)
+        # (assumes the ceph metadata host information is valid though!)
 
         return changes
 
@@ -908,7 +996,7 @@ class Module(MgrModule):
             "perm": "r"
         },
         {
-            "cmd": "k8sevents list",
+            "cmd": "k8sevents ls",
             "desc": "List all current Kuberenetes events from the Ceph namespace",
             "perm": "r"
         },
@@ -916,7 +1004,22 @@ class Module(MgrModule):
             "cmd": "k8sevents ceph",
             "desc": "List Ceph events tracked & sent to the kubernetes cluster",
             "perm": "r"
-        }
+        },
+        {
+            "cmd": "k8sevents set-access name=key,type=CephString",
+            "desc": "Set kubernetes access credentials. <key> must be cacrt or token and use -i <filename> syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt",
+            "perm": "rw"
+        },
+        {
+            "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
+            "desc": "Set kubernetes config paramters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433",
+            "perm": "rw"
+        },
+        {
+            "cmd": "k8sevents clear-config",
+            "desc": "Clear external kubernetes configuration settings",
+            "perm": "rw"
+        },
     ]
     MODULE_OPTIONS = [
         {'name': 'config_check_secs',
@@ -932,24 +1035,58 @@ class Module(MgrModule):
 
     def __init__(self, *args, **kwargs):
         self.run = True
+        self.kubernetes_control = 'POD_NAME' in os.environ
         self.event_processor = None
         self.config_watcher = None
         self.ns_watcher = None
         self.trackers = list()
+        self.error_msg = None
+        self._api_client_config = None
+        self._namespace = None
         
         # Declare the module options we accept
         self.config_check_secs = None
         self.ceph_event_retention_days = None
 
+        self.k8s_config = dict(
+            cacrt = None,
+            token = None,
+            server = None,
+            namespace = None
+        )
+
         super(Module, self).__init__(*args, **kwargs)
 
+    def k8s_ready(self):
+        """Validate the k8s_config dict 
+
+        Returns:
+         - bool .... indicating whether the config is ready to use
+         - string .. variables that need to be defined before the module will function
+
+        """
+        missing = list()
+        ready = True
+        for k in self.k8s_config:
+            if not self.k8s_config[k]:
+                missing.append(k)
+                ready = False
+        return ready, missing
+
     def config_notify(self):
-        """Apply module options to runtime"""
+        """Apply runtime module options, and defaults from the modules KV store"""
+        self.log.debug("applying runtime module option settings")
         for opt in self.MODULE_OPTIONS:
             setattr(self,
                     opt['name'],
                     self.get_module_option(opt['name']) or opt['default'])
 
+        if not self.kubernetes_control:
+            # Populate the config
+            self.log.debug("loading config from KV store")
+            for k in self.k8s_config:
+                self.k8s_config[k] = self.get_store(k, default=None)
+
     def fetch_events(self, limit=None):
         """Interface to expose current events to another mgr module"""
         # FUTURE: Implement this to provide k8s events to the dashboard?
@@ -963,6 +1100,7 @@ class Module(MgrModule):
         required_fields = ['channel', 'message', 'priority', 'stamp']
         _message_attrs = log_message.keys()
         if all(_field in _message_attrs for _field in required_fields):
+            self.log.debug("clog entry received - adding to the queue")
             if log_message.get('message').startswith('overall HEALTH'):
                 m_type = 'heartbeat'
             else:
@@ -979,7 +1117,7 @@ class Module(MgrModule):
             )
             
         else:
-            log.warning("Unexpected clog message format received - skipped: {}".format(log_message))
+            self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))
 
     def notify(self, notify_type, notify_id):
         """
@@ -995,19 +1133,20 @@ class Module(MgrModule):
                             ``from send_command``.
         """
         
-        # only interested in cluster log messages for now
+        # only interested in cluster log (clog) messages for now
         if notify_type == 'clog':
+            self.log.debug("received a clog entry from mgr.notify")
             if isinstance(notify_id, dict):
                 # create a log object to process
                 self.process_clog(notify_id)
             else:
-                log.warning("Expecting log record format of dict received {}".format(type(notify_type)))
+                self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_type)))
 
     def _show_events(self, events):
 
         max_msg_length = max([len(events[k].message) for k in events])
         fmt = "{:<20}  {:<8}  {:>5}  {:<" + str(max_msg_length) + "}  {}\n"
-        s = fmt.format("Last Seen", "Type", "Count", "Message", "Event Object Name")
+        s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name")
 
         for event_name in sorted(events, 
                                  key = lambda name: events[name].last_timestamp,
@@ -1034,22 +1173,146 @@ class Module(MgrModule):
             return 0, "", "No events emitted yet, local cache is empty"
 
     def show_status(self):
-        s = "Tracker Health\n"
+        s = "Kubernetes\n"
+        s += "- Hostname  : {}\n".format(self.k8s_config['server'])
+        s += "- Namespace : {}\n".format(self._namespace)
+        s += "Tracker Health\n"
         for t in self.trackers:
-            s += "- {} : {}\n".format(t.__class__.__name__, t.health)
+            s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health)
         s += "Tracked Events\n"
-        s += "- namespace  : {:>3}\n".format(len(self.ns_watcher.events))
-        s += "- ceph events: {:>3}\n".format(len(self.event_processor.events))
+        s += "- namespace   : {:>3}\n".format(len(self.ns_watcher.events))
+        s += "- ceph events : {:>3}\n".format(len(self.event_processor.events))
         return 0, "", s
 
+    def _valid_server(self, server):
+        # must be a valid server url format
+        server = server.strip()
+
+        res = urlparse(server)
+        port = res.netloc.split(":")[-1]
+
+        if res.scheme != 'https':
+            return False, "Server URL must use https"
+
+        elif not res.hostname:
+            return False, "Invalid server URL format"
+
+        elif res.hostname:
+            try:
+                socket.gethostbyname(res.hostname)
+            except socket.gaierror:
+                return False, "Unresolvable server URL"
+
+        if not port.isdigit():
+            return False, "Server URL must end in a port number"
+
+        return True, ""
+
+    def _valid_cacrt(self, cacrt_data):
+        """use mgr_util.verify_cacrt to validate the CA file"""
+
+        cacrt_fname = create_temp_file("ca_file", cacrt_data)
+
+        try:
+            verify_cacrt(cacrt_fname)
+        except ServerConfigException as e:
+            return False, "Invalid CA certificate: {}".format(str(e))
+        else:
+            return True, ""
+
+    def _valid_token(self, token_data):
+        """basic checks on the token"""
+        if not token_data:
+            return False, "Token file is empty"
+
+        pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$")
+        if not pattern.match(token_data):
+            return False, "Token contains invalid characters"
+
+        return True, ""
+
+    def _valid_namespace(self, namespace):
+        # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols
+
+        if len(namespace) > 253:
+            return False, "Name too long"
+        if namespace.isdigit():
+            return False, "Invalid name - must be alphanumeric"
+
+        pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$")
+        if not pattern.match(namespace):
+            return False, "Invalid characters in the name"
+        
+        return True, ""
+
+    def _config_set(self, key, val):
+        """Attempt to validate the content, then save to KV store"""
+
+        val = val.rstrip()                      # remove any trailing whitespace/newline 
+
+        try:
+            checker = getattr(self, "_valid_" + key)
+        except AttributeError:
+            # no checker available, just let it pass
+            self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key))
+            valid = True
+        else:
+            valid, reason = checker(val)
+
+        if valid:
+            self.set_store(key, val)
+            self.log.info("Updated config KV Store item: " + key)
+            return 0, "", "Config updated for parameter '{}'".format(key)
+        else:
+            return -22, "", "Invalid value for '{}' :{}".format(key, reason)
+
+    def clear_config_settings(self):
+        for k in self.k8s_config:
+            self.set_store(k, None)
+        return 0,"","{} configuration keys removed".format(len(self.k8s_config.keys()))
+
     def handle_command(self, inbuf, cmd):
-        # FUTURE: Should we implement dynamic options for the monitoring?
+
+        access_options = ['cacrt', 'token']
+        config_options = ['server', 'namespace']
+
+        if cmd['prefix'] == 'k8sevents clear-config':
+            return self.clear_config_settings()
+        
+        if cmd['prefix'] == 'k8sevents set-access':
+            if cmd['key'] not in access_options:
+                return -errno.EINVAL, "", "Unknown access option. Must be one of; {}".format(','.join(access_options))
+
+            if inbuf:
+                return self._config_set(cmd['key'], inbuf)
+            else:
+                return -errno.EINVAL, "", "Command must specify -i <filename>"
+
+        if cmd['prefix'] == 'k8sevents set-config':
+
+            if cmd['key'] not in config_options:
+                return -errno.EINVAL, "", "Unknown config option. Must be one of; {}".format(','.join(config_options))
+
+            return self._config_set(cmd['key'], cmd['value'])
+
+        # At this point the command is trying to interact with k8sevents, so intercept if the configuration is 
+        # not ready
+        if self.error_msg:
+            _msg = "k8sevents unavailable: " + self.error_msg
+            ready, _ = self.k8s_ready()
+            if not self.kubernetes_control and not ready:
+                _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
+            return -errno.ENODATA, "", _msg
+
         if cmd["prefix"] == "k8sevents status":
             return self.show_status()
-        elif cmd["prefix"] == "k8sevents list":
+
+        elif cmd["prefix"] == "k8sevents ls":
             return self.show_events(self.ns_watcher.events)
+
         elif cmd["prefix"] == "k8sevents ceph":
             return self.show_events(self.event_processor.events)
+
         else:
             raise NotImplementedError(cmd["prefix"])
 
@@ -1058,16 +1321,84 @@ class Module(MgrModule):
         """Determine whether the pre-reqs for the module are in place"""
 
         if not kubernetes_imported:
-            return False, "kubernetes python client is unavailable"
+            return False, "kubernetes python client is not available"
         return True, ""
 
+    def load_kubernetes_config(self):
+        """Load configuration for remote kubernetes API using KV store values
+
+        Attempt to create an API client configuration from settings stored in 
+        KV store.
+
+        Returns:
+        client.ApiClient: kubernetes API client object
+
+        Raises:
+        OSError: unable to create the cacrt file
+        """
+
+        # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a 
+        # temporary file containing the cert for the client to load from
+        try:
+            ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt'])
+        except OSError as e:
+            self.log.error("Unable to create file to hold cacrt: {}".format(str(e)))
+            raise OSError(str(e))
+        else:
+            self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file))
+
+        configuration = client.Configuration()
+        configuration.host = self.k8s_config['server']
+        configuration.ssl_ca_cert = ca_crt_file
+        configuration.api_key = { "authorization": "Bearer " + self.k8s_config['token'] }
+        api_client = client.ApiClient(configuration)
+        self.log.info("API client created for remote kubernetes access using cacrt and token from KV store")
+
+        return api_client
+
     def serve(self):
+        # apply options set by CLI to this module
         self.config_notify()
 
+        if not kubernetes_imported:
+            self.error_msg = "Unable to start : python kubernetes package is missing"
+        else:
+            if self.kubernetes_control:
+                # running under rook-ceph
+                config.load_incluster_config()
+                self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
+                                                                   os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
+                self._api_client_config = None
+                self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
+            else:
+                # running outside of rook-ceph, so we need additional settings to tell us
+                # how to connect to the kubernetes cluster
+                ready, errors = self.k8s_ready()
+                if not ready:
+                    self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
+                else:
+                    try:
+                        self._api_client_config = self.load_kubernetes_config()
+                    except OSError as e:
+                        self.error_msg = str(e)
+                    else:
+                        self._namespace = self.k8s_config['namespace']
+                        self.log.info("k8sevents configuration loaded from KV store")
+
+        if self.error_msg:
+            self.log.error(self.error_msg)
+            return
+
+        # All checks have passed
         self.config_watcher = CephConfigWatcher(self)
+
         self.event_processor = EventProcessor(self.config_watcher, 
-                                              self.ceph_event_retention_days)
-        self.ns_watcher = NamespaceWatcher()
+                                              self.ceph_event_retention_days,
+                                              self._api_client_config,
+                                              self._namespace)
+
+        self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config, 
+                                           namespace=self._namespace)
 
         if self.event_processor.ok:
             log.info("Ceph Log processor thread starting")
@@ -1090,7 +1421,8 @@ class Module(MgrModule):
                         t.reported = True
 
         else:
-            log.warning('Unable to access kubernetes event API - check RBAC rules')
+            self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?"
+            log.warning(self.error_msg)
             log.warning("k8sevents module exiting")
             self.run = False
 
diff --git a/src/pybind/mgr/k8sevents/rbac_sample.yaml b/src/pybind/mgr/k8sevents/rbac_sample.yaml
new file mode 100644 (file)
index 0000000..5639220
--- /dev/null
@@ -0,0 +1,45 @@
+---
+# Create a namespace to receive our test events
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: ceph
+---
+# Define the access rules to open the events API to k8sevents
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1beta1
+metadata:
+  name: ceph-mgr-events-rules
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - events
+  verbs:
+  - create
+  - list
+  - watch
+  - patch
+  - get
+---
+# Define a service account to associate with our event stream
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: ceph-mgr
+  namespace: ceph
+---
+# Allow the ceph-mgr service account access to the events api
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1beta1
+metadata:
+  name: ceph-mgr
+  namespace: ceph
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: ceph-mgr-events-rules
+subjects:
+- kind: ServiceAccount
+  name: ceph-mgr
+  namespace: ceph
index 48ee3b3042658f044982f00df9930af5c907aad5..42e00da2aa08a20997e13e2dcbdb87b22fb28e8c 100644 (file)
@@ -114,6 +114,25 @@ def get_default_addr():
 class ServerConfigException(Exception):
     pass
 
+def verify_cacrt(cert_fname):
+    """Basic validation of a ca cert"""
+
+    if not cert_fname:
+        raise ServerConfigException("CA cert not configured")
+    if not os.path.isfile(cert_fname):
+        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))
+
+    from OpenSSL import crypto
+    try:
+        with open(cert_fname) as f:
+            x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
+            if x509.has_expired():
+                logger.warning(
+                    'Certificate {} has expired'.format(cert_fname))
+    except (ValueError, crypto.Error) as e:
+        raise ServerConfigException(
+            'Invalid certificate {}: {}'.format(cert_fname, str(e)))
+
 
 def verify_tls_files(cert_fname, pkey_fname):
     """Basic checks for TLS certificate and key files
@@ -133,21 +152,14 @@ def verify_tls_files(cert_fname, pkey_fname):
 
     if not cert_fname or not pkey_fname:
         raise ServerConfigException('no certificate configured')
-    if not os.path.isfile(cert_fname):
-        raise ServerConfigException('certificate %s does not exist' % cert_fname)
+
+    verify_cacrt(cert_fname)
+
     if not os.path.isfile(pkey_fname):
         raise ServerConfigException('private key %s does not exist' % pkey_fname)
 
     from OpenSSL import crypto, SSL
-    try:
-        with open(cert_fname) as f:
-            x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
-            if x509.has_expired():
-                logger.warning(
-                    'Certificate {} has been expired'.format(cert_fname))
-    except (ValueError, crypto.Error) as e:
-        raise ServerConfigException(
-            'Invalid certificate {}: {}'.format(cert_fname, str(e)))
+
     try:
         with open(pkey_fname) as f:
             pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())