From 56587118782f4365f7d2464f26a08731e22379c3 Mon Sep 17 00:00:00 2001 From: Paul Cuzner Date: Fri, 20 Sep 2019 16:55:06 +1200 Subject: [PATCH] mgr/k8sevents: Add support for remote kubernetes The initial implementation sent events to the kubernetes cluster Ceph is running under managed by rook-ceph. This patch extends this support to include sending events to an external kubernetes cluster, that may just be consuming ceph resources Additional docs added to help people use the module, either with or without rook-ceph. It also address a bug where the module was loaded outside of a rook-ceph environment. Fixes: https://tracker.ceph.com/issues/41737 Signed-off-by: Paul Cuzner --- src/pybind/mgr/k8sevents/README.md | 81 ++++ src/pybind/mgr/k8sevents/module.py | 442 +++++++++++++++++++--- src/pybind/mgr/k8sevents/rbac_sample.yaml | 45 +++ src/pybind/mgr/mgr_util.py | 34 +- 4 files changed, 536 insertions(+), 66 deletions(-) create mode 100644 src/pybind/mgr/k8sevents/README.md create mode 100644 src/pybind/mgr/k8sevents/rbac_sample.yaml diff --git a/src/pybind/mgr/k8sevents/README.md b/src/pybind/mgr/k8sevents/README.md new file mode 100644 index 00000000000..7398096ea59 --- /dev/null +++ b/src/pybind/mgr/k8sevents/README.md @@ -0,0 +1,81 @@ +# Testing + +## To test the k8sevents module +enable the module with `ceph mgr module enable k8sevents` +check that it's working `ceph k8sevents status`, you should see something like this; +``` +[root@ceph-mgr ~]# ceph k8sevents status +Kubernetes +- Hostname : https://localhost:30443 +- Namespace: ceph +Tracker Health +- EventProcessor : OK +- CephConfigWatcher : OK +- NamespaceWatcher : OK +Tracked Events +- namespace : 5 +- ceph events: 0 + +``` +Now run some commands to generate healthchecks and admin level events; +- ```ceph osd set noout``` +- ```ceph osd unset noout``` +- ```ceph osd pool create mypool 4 4 replicated``` +- ```ceph osd pool delete mypool mypool --yes-i-really-really-mean-it``` + +In addition to tracking audit, healthchecks and configuration changes if you have the environment up for >1 hr you should also see and event that shows the clusters health and configuration overview. + +As well as status, you can use k8sevents to see event activity in the target kubernetes namespace +``` +[root@rhcs4-3 kube]# ceph k8sevents ls +Last Seen (UTC) Type Count Message Event Object Name +2019/09/20 04:33:00 Normal 1 Pool 'mypool' has been removed from the cluster mgr.ConfigurationChangeql2hj +2019/09/20 04:32:55 Normal 1 Client 'client.admin' issued: ceph osd pool delete mgr.audit.osd_pool_delete_ +2019/09/20 04:13:23 Normal 2 Client 'mds.rhcs4-2' issued: ceph osd blacklist mgr.audit.osd_blacklist_ +2019/09/20 04:08:28 Normal 1 Ceph log -> event tracking started mgr.k8sevents-moduleq74k7 +Total : 4 +``` +or, focus on the ceph specific events(audit & healthcheck) that are being tracked by the k8sevents module. +``` +[root@rhcs4-3 kube]# ceph k8sevents ceph +Last Seen (UTC) Type Count Message Event Object Name +2019/09/20 04:32:55 Normal 1 Client 'client.admin' issued: ceph osd pool delete mgr.audit.osd_pool_delete_ +2019/09/20 04:13:23 Normal 2 Client 'mds.rhcs4-2' issued: ceph osd blacklist mgr.audit.osd_blacklist_ +Total : 2 +``` + +## Sending events from a standalone Ceph cluster to remote Kubernetes cluster +To test interaction from a standalone ceph cluster to a kubernetes environment, you need to make changes on the kubernetes cluster **and** on one of the mgr hosts. +### kubernetes (minikube) +We need some basic RBAC in place to define a serviceaccount(and token) that we can use to push events into kubernetes. The `rbac_sample.yaml` file provides a quick means to create the required resources. Create them with `kubectl create -f rbac_sample.yaml` + +Once the resources are defined inside kubernetes, we need a couple of things copied over to the Ceph mgr's filesystem. +### ceph admin host +We need to run some commands against the cluster, so you'll needs access to a ceph admin host. If you don't have a dedicated admin host, you can use a mon or mgr machine. We'll need the root ca.crt of the kubernetes API, and the token associated with the service account we're using to access the kubernetes API. + +1. Download/fetch the root ca.crt for the kubernetes cluster (on minikube this can be found at ~/minikube/ca.crt) +2. Copy the ca.crt to your ceph admin host +3. Extract the token from the service account we're going to use +``` +kubectl -n ceph get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='ceph-mgr')].data.token}"|base64 -d > mytoken +``` +4. Copy the token to your ceph admin host +5. On the ceph admin host, enable the module with `ceph mgr module enable k8sevents` +6. Set up the configuration +``` +ceph k8sevents set-access cacrt -i +ceph k8sevents set-access token -i +ceph k8sevents set-config server https://: +ceph k8sevents set-config namespace ceph +``` +7. Restart the module with `ceph mgr module disable k8sevents && ceph mgr module enable k8sevents` +8. Check state with the `ceph k8sevents status` command +9. Remove the ca.crt and mytoken files from your admin host + +To remove the configuration keys used for external kubernetes access, run the following command +``` +ceph k8sevents clear-config +``` + +## Networking +You can use the above approach with a minikube based target from a standalone ceph cluster, but you'll need to have a tunnel/routing defined from the mgr host(s) to the minikube machine to make the kubernetes API accessible to the mgr/k8sevents module. This can just be a simple ssh tunnel. diff --git a/src/pybind/mgr/k8sevents/module.py b/src/pybind/mgr/k8sevents/module.py index 88a28d5d354..f5f2042fd00 100644 --- a/src/pybind/mgr/k8sevents/module.py +++ b/src/pybind/mgr/k8sevents/module.py @@ -21,18 +21,33 @@ import os +import re import sys import time import json +import yaml +import errno +import socket +import base64 import logging +import tempfile import threading -from datetime import datetime, timedelta, timezone -from urllib3.exceptions import MaxRetryError +try: + # python 3 + from urllib.parse import urlparse +except ImportError: + # python 2 fallback + from urlparse import urlparse + +from datetime import tzinfo, datetime, timedelta + +from urllib3.exceptions import MaxRetryError,ProtocolError from collections import OrderedDict import rados from mgr_module import MgrModule +from mgr_util import verify_cacrt, ServerConfigException try: import queue @@ -50,10 +65,9 @@ except ImportError: kubernetes_imported = False client = None config = None + watch = None else: kubernetes_imported = True - # Apply the kubernetes config from the cluster - config.load_incluster_config() # The watch.Watch.stream method can provide event objects that have involved_object = None # which causes an exception in the generator. A workaround is discussed for a similar issue @@ -68,11 +82,50 @@ else: log = logging.getLogger(__name__) +# use a simple local class to represent UTC +# datetime pkg modules vary between python2 and 3 and pytz is not available on older +# ceph container images, so taking a pragmatic approach! +class UTC(tzinfo): + def utcoffset(self, dt): + return timedelta(0) + + def tzname(self, dt): + return "UTC" + + def dst(self, dt): + return timedelta(0) + + def text_suffix(num): """Define a text suffix based on a value i.e. turn host into hosts""" return '' if num == 1 else 's' +def create_temp_file(fname, content, suffix=".tmp"): + """Create a temp file + + Attempt to create an temporary file containing the given content + + Returns: + str .. full path to the temporary file + + Raises: + OSError: problems creating the file + + """ + + if content is not None: + file_name = os.path.join(tempfile.gettempdir(), fname + suffix) + + try: + with open(file_name, "w") as f: + f.write(content) + except OSError as e: + raise OSError("Unable to create temporary file : {}".format(str(e))) + + return file_name + + class HealthCheck(object): """Transform a healthcheck msg into it's component parts""" @@ -128,7 +181,11 @@ class LogEntry(object): def __str__(self): - return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,self.msg_type,self.msg,self.level,self.tstamp) + return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source, + self.msg_type, + self.msg, + self.level, + self.tstamp) @property def cmd(self): @@ -186,14 +243,6 @@ class LogEntry(object): else: return self.msg -class RookCeph(object): - """Establish environment defaults when interacting with rook-ceph""" - - pod_name = os.environ['POD_NAME'] - host = os.environ['NODE_NAME'] - namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph') - cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', 'rook-ceph') - api = client.CoreV1Api() class BaseThread(threading.Thread): health = 'OK' @@ -201,7 +250,7 @@ class BaseThread(threading.Thread): daemon = True -class NamespaceWatcher(RookCeph, BaseThread): +class NamespaceWatcher(BaseThread): """Watch events in a given namespace Using the watch package we can listen to event traffic in the namespace to @@ -210,10 +259,16 @@ class NamespaceWatcher(RookCeph, BaseThread): so this stream will only really show activity inside this retention window. """ - def __init__(self, namespace=None): + def __init__(self, api_client_config, namespace=None): super(NamespaceWatcher, self).__init__() - if namespace: # override the default - self.namespace = namespace + + if api_client_config: + self.api = client.CoreV1Api(api_client_config) + else: + self.api = client.CoreV1Api() + + self.namespace = namespace + self.events = OrderedDict() self.lock = threading.Lock() self.active = None @@ -266,8 +321,8 @@ class NamespaceWatcher(RookCeph, BaseThread): # Attribute error is generated when urllib3 on the system is old and doesn't have a # read_chunked method except AttributeError as e: - self.health = ("Unable to 'watch' events API in namespace {} - " - "incompatible urllib3? ({})".format(self.namespace, e)) + self.health = ("Error: Unable to 'watch' events API in namespace '{}' - " + "urllib3 too old? ({})".format(self.namespace, e)) self.active = False log.warning(self.health) break @@ -275,7 +330,7 @@ class NamespaceWatcher(RookCeph, BaseThread): except ApiException as e: # refresh the resource_version & watcher log.warning("API exception caught in watcher ({})".format(e)) - log.info("Restarting namespace watcher") + log.warning("Restarting namespace watcher") self.fetch() except Exception: @@ -290,10 +345,16 @@ class NamespaceWatcher(RookCeph, BaseThread): log.warning("Namespace event watcher stopped") -class KubernetesEvent(RookCeph): +class KubernetesEvent(object): + + def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None): + + if api_client_config: + self.api = client.CoreV1Api(api_client_config) + else: + self.api = client.CoreV1Api() - def __init__(self, log_entry, unique_name=True): - super(KubernetesEvent, self).__init__() + self.namespace = namespace self.event_name = log_entry.event_name self.message = log_entry.event_msg @@ -301,6 +362,8 @@ class KubernetesEvent(RookCeph): self.event_reason = log_entry.event_reason self.unique_name = unique_name + self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN')) + self.api_status = 200 self.count = 1 self.first_timestamp = None @@ -341,19 +404,26 @@ class KubernetesEvent(RookCeph): def write(self): - now=datetime.now(timezone.utc) + now=datetime.now(UTC()) self.first_timestamp = now self.last_timestamp = now try: self.api.create_namespaced_event(self.namespace, self.event_body) + except (OSError, ProtocolError): + # unable to reach to the API server + log.error("Unable to reach API server") + self.api_status = 400 except MaxRetryError: - # k8s config has not be defined properly + # k8s config has not be defined properly + log.error("multiple attempts to connect to the API have failed") self.api_status = 403 # Forbidden except ApiException as e: + log.debug("event.write status:{}".format(e.status)) self.api_status = e.status if e.status == 409: + log.debug("attempting event update for an existing event") # 409 means the event is there already, so read it back (v1Event object returned) # this could happen if the event has been created, and then the k8sevent module # disabled and reenabled - i.e. the internal event tracking no longer matches k8s @@ -409,11 +479,14 @@ class KubernetesEvent(RookCeph): try: self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body) except ApiException as e: + log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status)) self.api_status = e.status else: + log.debug("event {} patched".format(self.event_name)) self.api_status = 200 else: + log.debug("event {} created successfully".format(self.event_name)) self.api_status = 200 @property @@ -423,22 +496,28 @@ class KubernetesEvent(RookCeph): def update(self, log_entry): self.message = log_entry.event_msg self.event_type = log_entry.event_type - self.last_timestamp = datetime.now(timezone.utc) + self.last_timestamp = datetime.now(UTC()) self.count += 1 + log.debug("performing event update for {}".format(self.event_name)) try: self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body) except ApiException as e: + log.error("event patch call failed: {}".format(e.status)) if e.status == 404: # tried to patch, but hit a 404. The event's TTL must have been reached, and # pruned by the kube-apiserver + log.debug("event not found, so attempting to create it") try: self.api.create_namespaced_event(self.namespace, self.event_body) except ApiException as e: + log.error("unable to create the event: {}".format(e.status)) self.api_status = e.status else: + log.debug("event {} created successfully".format(self.event_name)) self.api_status = 200 else: + log.debug("event {} updated".format(self.event_name)) self.api_status = 200 @@ -447,11 +526,14 @@ class EventProcessor(BaseThread): can_run = True - def __init__(self, config_watcher, event_retention_days): + def __init__(self, config_watcher, event_retention_days, api_client_config, namespace): super(EventProcessor, self).__init__() + self.events = dict() self.config_watcher = config_watcher self.event_retention_days = event_retention_days + self.api_client_config = api_client_config + self.namespace = namespace def startup(self): """Log an event to show we're active""" @@ -464,7 +546,9 @@ class EventProcessor(BaseThread): level='INF', tstamp=None ), - unique_name=False + unique_name=False, + api_client_config=self.api_client_config, + namespace=self.namespace ) event.write() @@ -475,8 +559,8 @@ class EventProcessor(BaseThread): return self.startup() def prune_events(self): - - oldest = datetime.now(timezone.utc) - timedelta(days=self.event_retention_days) + log.debug("prune_events - looking for old events to remove from cache") + oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days) local_events = dict(self.events) for event_name in sorted(local_events, @@ -486,12 +570,12 @@ class EventProcessor(BaseThread): break else: # drop this event - log.debug("Removing old event : {}".format(event_name)) + log.debug("prune_events - removing old event : {}".format(event_name)) del self.events[event_name] def process(self, log_object): - log.debug("K8sevents processing : {}".format(str(log_object))) + log.debug("log entry being processed : {}".format(str(log_object))) event_out = False unique_name = True @@ -525,10 +609,13 @@ class EventProcessor(BaseThread): log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type)) if event_out: + log.debug("k8sevents sending event to kubernetes") # we don't cache non-unique events like heartbeats or config changes if not unique_name or log_object.event_name not in self.events.keys(): event = KubernetesEvent(log_entry=log_object, - unique_name=unique_name) + unique_name=unique_name, + api_client_config=self.api_client_config, + namespace=self.namespace) event.write() log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status)) if event.api_success and unique_name: @@ -536,7 +623,7 @@ class EventProcessor(BaseThread): else: event = self.events[log_object.event_name] event.update(log_object) - log.debug("Event update ended : {}".format(event.api_status)) + log.debug("event update ended : {}".format(event.api_status)) self.prune_events() @@ -544,7 +631,8 @@ class EventProcessor(BaseThread): log.debug("K8sevents ignored message : {}".format(log_object.msg)) def run(self): - log.info("Ceph event processing thread started, event retention set to {} days".format(self.event_retention_days)) + log.info("Ceph event processing thread started, " + "event retention set to {} days".format(self.event_retention_days)) while True: @@ -808,11 +896,11 @@ class CephConfigWatcher(BaseThread): if size_diff != 0: if size_diff < 0: msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name, - pool_map[pool_name]['size']) + pool_map[pool_name]['size']) level = 'WRN' else: msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name, - pool_map[pool_name]['size']) + pool_map[pool_name]['size']) level = 'INF' changes.append(LogEntry(source="config", @@ -843,7 +931,7 @@ class CephConfigWatcher(BaseThread): # FUTURE # Could generate an event if a ceph daemon has moved hosts - # (assumes the ceph metadata host information is valid - which may not be the case) + # (assumes the ceph metadata host information is valid though!) return changes @@ -908,7 +996,7 @@ class Module(MgrModule): "perm": "r" }, { - "cmd": "k8sevents list", + "cmd": "k8sevents ls", "desc": "List all current Kuberenetes events from the Ceph namespace", "perm": "r" }, @@ -916,7 +1004,22 @@ class Module(MgrModule): "cmd": "k8sevents ceph", "desc": "List Ceph events tracked & sent to the kubernetes cluster", "perm": "r" - } + }, + { + "cmd": "k8sevents set-access name=key,type=CephString", + "desc": "Set kubernetes access credentials. must be cacrt or token and use -i syntax.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt", + "perm": "rw" + }, + { + "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString", + "desc": "Set kubernetes config paramters. must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433", + "perm": "rw" + }, + { + "cmd": "k8sevents clear-config", + "desc": "Clear external kubernetes configuration settings", + "perm": "rw" + }, ] MODULE_OPTIONS = [ {'name': 'config_check_secs', @@ -932,24 +1035,58 @@ class Module(MgrModule): def __init__(self, *args, **kwargs): self.run = True + self.kubernetes_control = 'POD_NAME' in os.environ self.event_processor = None self.config_watcher = None self.ns_watcher = None self.trackers = list() + self.error_msg = None + self._api_client_config = None + self._namespace = None # Declare the module options we accept self.config_check_secs = None self.ceph_event_retention_days = None + self.k8s_config = dict( + cacrt = None, + token = None, + server = None, + namespace = None + ) + super(Module, self).__init__(*args, **kwargs) + def k8s_ready(self): + """Validate the k8s_config dict + + Returns: + - bool .... indicating whether the config is ready to use + - string .. variables that need to be defined before the module will function + + """ + missing = list() + ready = True + for k in self.k8s_config: + if not self.k8s_config[k]: + missing.append(k) + ready = False + return ready, missing + def config_notify(self): - """Apply module options to runtime""" + """Apply runtime module options, and defaults from the modules KV store""" + self.log.debug("applying runtime module option settings") for opt in self.MODULE_OPTIONS: setattr(self, opt['name'], self.get_module_option(opt['name']) or opt['default']) + if not self.kubernetes_control: + # Populate the config + self.log.debug("loading config from KV store") + for k in self.k8s_config: + self.k8s_config[k] = self.get_store(k, default=None) + def fetch_events(self, limit=None): """Interface to expose current events to another mgr module""" # FUTURE: Implement this to provide k8s events to the dashboard? @@ -963,6 +1100,7 @@ class Module(MgrModule): required_fields = ['channel', 'message', 'priority', 'stamp'] _message_attrs = log_message.keys() if all(_field in _message_attrs for _field in required_fields): + self.log.debug("clog entry received - adding to the queue") if log_message.get('message').startswith('overall HEALTH'): m_type = 'heartbeat' else: @@ -979,7 +1117,7 @@ class Module(MgrModule): ) else: - log.warning("Unexpected clog message format received - skipped: {}".format(log_message)) + self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message)) def notify(self, notify_type, notify_id): """ @@ -995,19 +1133,20 @@ class Module(MgrModule): ``from send_command``. """ - # only interested in cluster log messages for now + # only interested in cluster log (clog) messages for now if notify_type == 'clog': + self.log.debug("received a clog entry from mgr.notify") if isinstance(notify_id, dict): # create a log object to process self.process_clog(notify_id) else: - log.warning("Expecting log record format of dict received {}".format(type(notify_type))) + self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_type))) def _show_events(self, events): max_msg_length = max([len(events[k].message) for k in events]) fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n" - s = fmt.format("Last Seen", "Type", "Count", "Message", "Event Object Name") + s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name") for event_name in sorted(events, key = lambda name: events[name].last_timestamp, @@ -1034,22 +1173,146 @@ class Module(MgrModule): return 0, "", "No events emitted yet, local cache is empty" def show_status(self): - s = "Tracker Health\n" + s = "Kubernetes\n" + s += "- Hostname : {}\n".format(self.k8s_config['server']) + s += "- Namespace : {}\n".format(self._namespace) + s += "Tracker Health\n" for t in self.trackers: - s += "- {} : {}\n".format(t.__class__.__name__, t.health) + s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health) s += "Tracked Events\n" - s += "- namespace : {:>3}\n".format(len(self.ns_watcher.events)) - s += "- ceph events: {:>3}\n".format(len(self.event_processor.events)) + s += "- namespace : {:>3}\n".format(len(self.ns_watcher.events)) + s += "- ceph events : {:>3}\n".format(len(self.event_processor.events)) return 0, "", s + def _valid_server(self, server): + # must be a valid server url format + server = server.strip() + + res = urlparse(server) + port = res.netloc.split(":")[-1] + + if res.scheme != 'https': + return False, "Server URL must use https" + + elif not res.hostname: + return False, "Invalid server URL format" + + elif res.hostname: + try: + socket.gethostbyname(res.hostname) + except socket.gaierror: + return False, "Unresolvable server URL" + + if not port.isdigit(): + return False, "Server URL must end in a port number" + + return True, "" + + def _valid_cacrt(self, cacrt_data): + """use mgr_util.verify_cacrt to validate the CA file""" + + cacrt_fname = create_temp_file("ca_file", cacrt_data) + + try: + verify_cacrt(cacrt_fname) + except ServerConfigException as e: + return False, "Invalid CA certificate: {}".format(str(e)) + else: + return True, "" + + def _valid_token(self, token_data): + """basic checks on the token""" + if not token_data: + return False, "Token file is empty" + + pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$") + if not pattern.match(token_data): + return False, "Token contains invalid characters" + + return True, "" + + def _valid_namespace(self, namespace): + # Simple check - name must be a string <= 253 in length, alphanumeric with '.' and '-' symbols + + if len(namespace) > 253: + return False, "Name too long" + if namespace.isdigit(): + return False, "Invalid name - must be alphanumeric" + + pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$") + if not pattern.match(namespace): + return False, "Invalid characters in the name" + + return True, "" + + def _config_set(self, key, val): + """Attempt to validate the content, then save to KV store""" + + val = val.rstrip() # remove any trailing whitespace/newline + + try: + checker = getattr(self, "_valid_" + key) + except AttributeError: + # no checker available, just let it pass + self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key)) + valid = True + else: + valid, reason = checker(val) + + if valid: + self.set_store(key, val) + self.log.info("Updated config KV Store item: " + key) + return 0, "", "Config updated for parameter '{}'".format(key) + else: + return -22, "", "Invalid value for '{}' :{}".format(key, reason) + + def clear_config_settings(self): + for k in self.k8s_config: + self.set_store(k, None) + return 0,"","{} configuration keys removed".format(len(self.k8s_config.keys())) + def handle_command(self, inbuf, cmd): - # FUTURE: Should we implement dynamic options for the monitoring? + + access_options = ['cacrt', 'token'] + config_options = ['server', 'namespace'] + + if cmd['prefix'] == 'k8sevents clear-config': + return self.clear_config_settings() + + if cmd['prefix'] == 'k8sevents set-access': + if cmd['key'] not in access_options: + return -errno.EINVAL, "", "Unknown access option. Must be one of; {}".format(','.join(access_options)) + + if inbuf: + return self._config_set(cmd['key'], inbuf) + else: + return -errno.EINVAL, "", "Command must specify -i " + + if cmd['prefix'] == 'k8sevents set-config': + + if cmd['key'] not in config_options: + return -errno.EINVAL, "", "Unknown config option. Must be one of; {}".format(','.join(config_options)) + + return self._config_set(cmd['key'], cmd['value']) + + # At this point the command is trying to interact with k8sevents, so intercept if the configuration is + # not ready + if self.error_msg: + _msg = "k8sevents unavailable: " + self.error_msg + ready, _ = self.k8s_ready() + if not self.kubernetes_control and not ready: + _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect" + return -errno.ENODATA, "", _msg + if cmd["prefix"] == "k8sevents status": return self.show_status() - elif cmd["prefix"] == "k8sevents list": + + elif cmd["prefix"] == "k8sevents ls": return self.show_events(self.ns_watcher.events) + elif cmd["prefix"] == "k8sevents ceph": return self.show_events(self.event_processor.events) + else: raise NotImplementedError(cmd["prefix"]) @@ -1058,16 +1321,84 @@ class Module(MgrModule): """Determine whether the pre-reqs for the module are in place""" if not kubernetes_imported: - return False, "kubernetes python client is unavailable" + return False, "kubernetes python client is not available" return True, "" + def load_kubernetes_config(self): + """Load configuration for remote kubernetes API using KV store values + + Attempt to create an API client configuration from settings stored in + KV store. + + Returns: + client.ApiClient: kubernetes API client object + + Raises: + OSError: unable to create the cacrt file + """ + + # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a + # temporary file containing the cert for the client to load from + try: + ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt']) + except OSError as e: + self.log.error("Unable to create file to hold cacrt: {}".format(str(e))) + raise OSError(str(e)) + else: + self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file)) + + configuration = client.Configuration() + configuration.host = self.k8s_config['server'] + configuration.ssl_ca_cert = ca_crt_file + configuration.api_key = { "authorization": "Bearer " + self.k8s_config['token'] } + api_client = client.ApiClient(configuration) + self.log.info("API client created for remote kubernetes access using cacrt and token from KV store") + + return api_client + def serve(self): + # apply options set by CLI to this module self.config_notify() + if not kubernetes_imported: + self.error_msg = "Unable to start : python kubernetes package is missing" + else: + if self.kubernetes_control: + # running under rook-ceph + config.load_incluster_config() + self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'), + os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN')) + self._api_client_config = None + self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph") + else: + # running outside of rook-ceph, so we need additional settings to tell us + # how to connect to the kubernetes cluster + ready, errors = self.k8s_ready() + if not ready: + self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors)) + else: + try: + self._api_client_config = self.load_kubernetes_config() + except OSError as e: + self.error_msg = str(e) + else: + self._namespace = self.k8s_config['namespace'] + self.log.info("k8sevents configuration loaded from KV store") + + if self.error_msg: + self.log.error(self.error_msg) + return + + # All checks have passed self.config_watcher = CephConfigWatcher(self) + self.event_processor = EventProcessor(self.config_watcher, - self.ceph_event_retention_days) - self.ns_watcher = NamespaceWatcher() + self.ceph_event_retention_days, + self._api_client_config, + self._namespace) + + self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config, + namespace=self._namespace) if self.event_processor.ok: log.info("Ceph Log processor thread starting") @@ -1090,7 +1421,8 @@ class Module(MgrModule): t.reported = True else: - log.warning('Unable to access kubernetes event API - check RBAC rules') + self.error_msg = "Unable to access kubernetes API. Is it accessible? Are RBAC rules for our token valid?" + log.warning(self.error_msg) log.warning("k8sevents module exiting") self.run = False diff --git a/src/pybind/mgr/k8sevents/rbac_sample.yaml b/src/pybind/mgr/k8sevents/rbac_sample.yaml new file mode 100644 index 00000000000..56392202298 --- /dev/null +++ b/src/pybind/mgr/k8sevents/rbac_sample.yaml @@ -0,0 +1,45 @@ +--- +# Create a namespace to receive our test events +apiVersion: v1 +kind: Namespace +metadata: + name: ceph +--- +# Define the access rules to open the events API to k8sevents +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1beta1 +metadata: + name: ceph-mgr-events-rules +rules: +- apiGroups: + - "" + resources: + - events + verbs: + - create + - list + - watch + - patch + - get +--- +# Define a service account to associate with our event stream +apiVersion: v1 +kind: ServiceAccount +metadata: + name: ceph-mgr + namespace: ceph +--- +# Allow the ceph-mgr service account access to the events api +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1beta1 +metadata: + name: ceph-mgr + namespace: ceph +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: ceph-mgr-events-rules +subjects: +- kind: ServiceAccount + name: ceph-mgr + namespace: ceph diff --git a/src/pybind/mgr/mgr_util.py b/src/pybind/mgr/mgr_util.py index 48ee3b30426..42e00da2aa0 100644 --- a/src/pybind/mgr/mgr_util.py +++ b/src/pybind/mgr/mgr_util.py @@ -114,6 +114,25 @@ def get_default_addr(): class ServerConfigException(Exception): pass +def verify_cacrt(cert_fname): + """Basic validation of a ca cert""" + + if not cert_fname: + raise ServerConfigException("CA cert not configured") + if not os.path.isfile(cert_fname): + raise ServerConfigException("Certificate {} does not exist".format(cert_fname)) + + from OpenSSL import crypto + try: + with open(cert_fname) as f: + x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) + if x509.has_expired(): + logger.warning( + 'Certificate {} has expired'.format(cert_fname)) + except (ValueError, crypto.Error) as e: + raise ServerConfigException( + 'Invalid certificate {}: {}'.format(cert_fname, str(e))) + def verify_tls_files(cert_fname, pkey_fname): """Basic checks for TLS certificate and key files @@ -133,21 +152,14 @@ def verify_tls_files(cert_fname, pkey_fname): if not cert_fname or not pkey_fname: raise ServerConfigException('no certificate configured') - if not os.path.isfile(cert_fname): - raise ServerConfigException('certificate %s does not exist' % cert_fname) + + verify_cacrt(cert_fname) + if not os.path.isfile(pkey_fname): raise ServerConfigException('private key %s does not exist' % pkey_fname) from OpenSSL import crypto, SSL - try: - with open(cert_fname) as f: - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) - if x509.has_expired(): - logger.warning( - 'Certificate {} has been expired'.format(cert_fname)) - except (ValueError, crypto.Error) as e: - raise ServerConfigException( - 'Invalid certificate {}: {}'.format(cert_fname, str(e))) + try: with open(pkey_fname) as f: pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read()) -- 2.39.5