import os
+import re
import sys
import time
import json
+import yaml
+import errno
+import socket
+import base64
import logging
+import tempfile
import threading
-from datetime import datetime, timedelta, timezone
-from urllib3.exceptions import MaxRetryError
+try:
+ # python 3
+ from urllib.parse import urlparse
+except ImportError:
+ # python 2 fallback
+ from urlparse import urlparse
+
+from datetime import tzinfo, datetime, timedelta
+
+from urllib3.exceptions import MaxRetryError, ProtocolError
from collections import OrderedDict
import rados
from mgr_module import MgrModule
+from mgr_util import verify_cacrt, ServerConfigException
try:
import queue
kubernetes_imported = False
client = None
config = None
+ watch = None
else:
kubernetes_imported = True
- # Apply the kubernetes config from the cluster
- config.load_incluster_config()
# The watch.Watch.stream method can provide event objects that have involved_object = None
# which causes an exception in the generator. A workaround is discussed for a similar issue
log = logging.getLogger(__name__)
+# use a simple local class to represent UTC
+# python2 lacks datetime.timezone and pytz is not available on older ceph container
+# images, so we take a pragmatic approach and define UTC ourselves
+class UTC(tzinfo):
+ def utcoffset(self, dt):
+ return timedelta(0)
+
+ def tzname(self, dt):
+ return "UTC"
+
+ def dst(self, dt):
+ return timedelta(0)
+
+
def text_suffix(num):
"""Define a text suffix based on a value i.e. turn host into hosts"""
return '' if num == 1 else 's'
+def create_temp_file(fname, content, suffix=".tmp"):
+ """Create a temp file
+
+    Attempt to create a temporary file containing the given content
+
+ Returns:
+ str .. full path to the temporary file
+
+ Raises:
+ OSError: problems creating the file
+
+ """
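+    # illustrative only - assuming the platform default temp dir is /tmp, a call such as
+    #   create_temp_file("cacrt", pem_text)
+    # would return "/tmp/cacrt.tmp"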
+
+    if content is None:
+        raise OSError("Unable to create temporary file : no content provided")
+
+    file_name = os.path.join(tempfile.gettempdir(), fname + suffix)
+
+    try:
+        with open(file_name, "w") as f:
+            f.write(content)
+    except OSError as e:
+        raise OSError("Unable to create temporary file : {}".format(str(e)))
+
+    return file_name
+
+
class HealthCheck(object):
"""Transform a healthcheck msg into it's component parts"""
def __str__(self):
- return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,self.msg_type,self.msg,self.level,self.tstamp)
+ return "source={}, msg_type={}, msg={}, level={}, tstamp={}".format(self.source,
+ self.msg_type,
+ self.msg,
+ self.level,
+ self.tstamp)
@property
def cmd(self):
else:
return self.msg
-class RookCeph(object):
- """Establish environment defaults when interacting with rook-ceph"""
-
- pod_name = os.environ['POD_NAME']
- host = os.environ['NODE_NAME']
- namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')
- cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', 'rook-ceph')
- api = client.CoreV1Api()
class BaseThread(threading.Thread):
health = 'OK'
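+    # daemon=True so these watcher threads never block ceph-mgr shutdown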
daemon = True
-class NamespaceWatcher(RookCeph, BaseThread):
+class NamespaceWatcher(BaseThread):
"""Watch events in a given namespace
Using the watch package we can listen to event traffic in the namespace to
so this stream will only really show activity inside this retention window.
"""
- def __init__(self, namespace=None):
+ def __init__(self, api_client_config, namespace=None):
super(NamespaceWatcher, self).__init__()
- if namespace: # override the default
- self.namespace = namespace
+
+ if api_client_config:
+ self.api = client.CoreV1Api(api_client_config)
+ else:
+ self.api = client.CoreV1Api()
+
+ self.namespace = namespace
+
self.events = OrderedDict()
self.lock = threading.Lock()
self.active = None
# Attribute error is generated when urllib3 on the system is old and doesn't have a
# read_chunked method
except AttributeError as e:
- self.health = ("Unable to 'watch' events API in namespace {} - "
- "incompatible urllib3? ({})".format(self.namespace, e))
+ self.health = ("Error: Unable to 'watch' events API in namespace '{}' - "
+ "urllib3 too old? ({})".format(self.namespace, e))
self.active = False
log.warning(self.health)
break
except ApiException as e:
# refresh the resource_version & watcher
log.warning("API exception caught in watcher ({})".format(e))
- log.info("Restarting namespace watcher")
+ log.warning("Restarting namespace watcher")
self.fetch()
except Exception:
log.warning("Namespace event watcher stopped")
-class KubernetesEvent(RookCeph):
+class KubernetesEvent(object):
+
+ def __init__(self, log_entry, unique_name=True, api_client_config=None, namespace=None):
+
+ if api_client_config:
+ self.api = client.CoreV1Api(api_client_config)
+ else:
+ self.api = client.CoreV1Api()
- def __init__(self, log_entry, unique_name=True):
- super(KubernetesEvent, self).__init__()
+ self.namespace = namespace
self.event_name = log_entry.event_name
self.message = log_entry.event_msg
self.event_reason = log_entry.event_reason
self.unique_name = unique_name
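+        # NODE_NAME is expected when running under rook (it replaces the old RookCeph defaults);
+        # HOSTNAME is a best-effort fallback when we're running outside the cluster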
+ self.host = os.environ.get('NODE_NAME', os.environ.get('HOSTNAME', 'UNKNOWN'))
+
self.api_status = 200
self.count = 1
self.first_timestamp = None
def write(self):
- now=datetime.now(timezone.utc)
+        now = datetime.now(UTC())
self.first_timestamp = now
self.last_timestamp = now
try:
self.api.create_namespaced_event(self.namespace, self.event_body)
+ except (OSError, ProtocolError):
+            # unable to reach the API server
+ log.error("Unable to reach API server")
+ self.api_status = 400
except MaxRetryError:
- # k8s config has not be defined properly
+            # k8s config has not been defined properly
+ log.error("multiple attempts to connect to the API have failed")
self.api_status = 403 # Forbidden
except ApiException as e:
+ log.debug("event.write status:{}".format(e.status))
self.api_status = e.status
if e.status == 409:
+ log.debug("attempting event update for an existing event")
# 409 means the event is there already, so read it back (v1Event object returned)
# this could happen if the event has been created, and then the k8sevent module
# disabled and reenabled - i.e. the internal event tracking no longer matches k8s
try:
self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
except ApiException as e:
+ log.error("event.patch failed for {} with status code:{}".format(self.event_name, e.status))
self.api_status = e.status
else:
+ log.debug("event {} patched".format(self.event_name))
self.api_status = 200
else:
+ log.debug("event {} created successfully".format(self.event_name))
self.api_status = 200
@property
def update(self, log_entry):
self.message = log_entry.event_msg
self.event_type = log_entry.event_type
- self.last_timestamp = datetime.now(timezone.utc)
+ self.last_timestamp = datetime.now(UTC())
self.count += 1
+ log.debug("performing event update for {}".format(self.event_name))
try:
self.api.patch_namespaced_event(self.event_name, self.namespace, self.event_body)
except ApiException as e:
+ log.error("event patch call failed: {}".format(e.status))
if e.status == 404:
# tried to patch, but hit a 404. The event's TTL must have been reached, and
# pruned by the kube-apiserver
+ log.debug("event not found, so attempting to create it")
try:
self.api.create_namespaced_event(self.namespace, self.event_body)
except ApiException as e:
+ log.error("unable to create the event: {}".format(e.status))
self.api_status = e.status
else:
+ log.debug("event {} created successfully".format(self.event_name))
self.api_status = 200
else:
+ log.debug("event {} updated".format(self.event_name))
self.api_status = 200
can_run = True
- def __init__(self, config_watcher, event_retention_days):
+ def __init__(self, config_watcher, event_retention_days, api_client_config, namespace):
super(EventProcessor, self).__init__()
+
self.events = dict()
self.config_watcher = config_watcher
self.event_retention_days = event_retention_days
+ self.api_client_config = api_client_config
+ self.namespace = namespace
def startup(self):
"""Log an event to show we're active"""
level='INF',
tstamp=None
),
- unique_name=False
+ unique_name=False,
+ api_client_config=self.api_client_config,
+ namespace=self.namespace
)
event.write()
return self.startup()
def prune_events(self):
-
- oldest = datetime.now(timezone.utc) - timedelta(days=self.event_retention_days)
+ log.debug("prune_events - looking for old events to remove from cache")
+ oldest = datetime.now(UTC()) - timedelta(days=self.event_retention_days)
local_events = dict(self.events)
for event_name in sorted(local_events,
break
else:
# drop this event
- log.debug("Removing old event : {}".format(event_name))
+ log.debug("prune_events - removing old event : {}".format(event_name))
del self.events[event_name]
def process(self, log_object):
- log.debug("K8sevents processing : {}".format(str(log_object)))
+ log.debug("log entry being processed : {}".format(str(log_object)))
event_out = False
unique_name = True
log.warning("K8sevents received unknown msg_type - {}".format(log_object.msg_type))
if event_out:
+ log.debug("k8sevents sending event to kubernetes")
# we don't cache non-unique events like heartbeats or config changes
if not unique_name or log_object.event_name not in self.events.keys():
event = KubernetesEvent(log_entry=log_object,
- unique_name=unique_name)
+ unique_name=unique_name,
+ api_client_config=self.api_client_config,
+ namespace=self.namespace)
event.write()
log.debug("event(unique={}) creation ended : {}".format(unique_name, event.api_status))
if event.api_success and unique_name:
else:
event = self.events[log_object.event_name]
event.update(log_object)
- log.debug("Event update ended : {}".format(event.api_status))
+ log.debug("event update ended : {}".format(event.api_status))
self.prune_events()
log.debug("K8sevents ignored message : {}".format(log_object.msg))
def run(self):
- log.info("Ceph event processing thread started, event retention set to {} days".format(self.event_retention_days))
+ log.info("Ceph event processing thread started, "
+ "event retention set to {} days".format(self.event_retention_days))
while True:
if size_diff != 0:
if size_diff < 0:
msg = "Data protection level of pool '{}' reduced to {} copies".format(pool_name,
- pool_map[pool_name]['size'])
+ pool_map[pool_name]['size'])
level = 'WRN'
else:
msg = "Data protection level of pool '{}' increased to {} copies".format(pool_name,
- pool_map[pool_name]['size'])
+ pool_map[pool_name]['size'])
level = 'INF'
changes.append(LogEntry(source="config",
# FUTURE
# Could generate an event if a ceph daemon has moved hosts
- # (assumes the ceph metadata host information is valid - which may not be the case)
+ # (assumes the ceph metadata host information is valid though!)
return changes
"perm": "r"
},
{
- "cmd": "k8sevents list",
+ "cmd": "k8sevents ls",
"desc": "List all current Kuberenetes events from the Ceph namespace",
"perm": "r"
},
"cmd": "k8sevents ceph",
"desc": "List Ceph events tracked & sent to the kubernetes cluster",
"perm": "r"
- }
+ },
+ {
+ "cmd": "k8sevents set-access name=key,type=CephString",
+            "desc": "Set kubernetes access credentials. <key> must be cacrt or token, and the value must be supplied with -i <filename>.\ne.g. ceph k8sevents set-access cacrt -i /root/ca.crt",
+ "perm": "rw"
+ },
+ {
+ "cmd": "k8sevents set-config name=key,type=CephString name=value,type=CephString",
+            "desc": "Set kubernetes config parameters. <key> must be server or namespace.\ne.g. ceph k8sevents set-config server https://localhost:30433",
+ "perm": "rw"
+ },
+ {
+ "cmd": "k8sevents clear-config",
+ "desc": "Clear external kubernetes configuration settings",
+ "perm": "rw"
+ },
]
MODULE_OPTIONS = [
{'name': 'config_check_secs',
def __init__(self, *args, **kwargs):
self.run = True
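+        # POD_NAME is injected into the mgr pod by rook, so its presence tells us whether
+        # we're running inside the kubernetes cluster or alongside it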
+ self.kubernetes_control = 'POD_NAME' in os.environ
self.event_processor = None
self.config_watcher = None
self.ns_watcher = None
self.trackers = list()
+ self.error_msg = None
+ self._api_client_config = None
+ self._namespace = None
# Declare the module options we accept
self.config_check_secs = None
self.ceph_event_retention_days = None
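+        # connection settings used when running outside kubernetes; populated from the KV store
+        # via 'ceph k8sevents set-access' and 'ceph k8sevents set-config'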
+        self.k8s_config = dict(
+            cacrt=None,
+            token=None,
+            server=None,
+            namespace=None
+        )
+
super(Module, self).__init__(*args, **kwargs)
+ def k8s_ready(self):
+ """Validate the k8s_config dict
+
+ Returns:
+ - bool .... indicating whether the config is ready to use
+ - string .. variables that need to be defined before the module will function
+
+ """
+ missing = list()
+ ready = True
+ for k in self.k8s_config:
+ if not self.k8s_config[k]:
+ missing.append(k)
+ ready = False
+ return ready, missing
+
def config_notify(self):
- """Apply module options to runtime"""
+        """Apply runtime module options, and defaults from the module's KV store"""
+ self.log.debug("applying runtime module option settings")
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
self.get_module_option(opt['name']) or opt['default'])
+ if not self.kubernetes_control:
+ # Populate the config
+ self.log.debug("loading config from KV store")
+ for k in self.k8s_config:
+ self.k8s_config[k] = self.get_store(k, default=None)
+
def fetch_events(self, limit=None):
"""Interface to expose current events to another mgr module"""
# FUTURE: Implement this to provide k8s events to the dashboard?
required_fields = ['channel', 'message', 'priority', 'stamp']
_message_attrs = log_message.keys()
if all(_field in _message_attrs for _field in required_fields):
+ self.log.debug("clog entry received - adding to the queue")
if log_message.get('message').startswith('overall HEALTH'):
m_type = 'heartbeat'
else:
)
else:
- log.warning("Unexpected clog message format received - skipped: {}".format(log_message))
+ self.log.warning("Unexpected clog message format received - skipped: {}".format(log_message))
def notify(self, notify_type, notify_id):
"""
``from send_command``.
"""
- # only interested in cluster log messages for now
+ # only interested in cluster log (clog) messages for now
if notify_type == 'clog':
+ self.log.debug("received a clog entry from mgr.notify")
if isinstance(notify_id, dict):
# create a log object to process
self.process_clog(notify_id)
else:
- log.warning("Expecting log record format of dict received {}".format(type(notify_type)))
+                self.log.warning("Expected a 'dict' log record format, received {}".format(type(notify_id)))
def _show_events(self, events):
max_msg_length = max([len(events[k].message) for k in events])
fmt = "{:<20} {:<8} {:>5} {:<" + str(max_msg_length) + "} {}\n"
- s = fmt.format("Last Seen", "Type", "Count", "Message", "Event Object Name")
+ s = fmt.format("Last Seen (UTC)", "Type", "Count", "Message", "Event Object Name")
for event_name in sorted(events,
key = lambda name: events[name].last_timestamp,
return 0, "", "No events emitted yet, local cache is empty"
def show_status(self):
- s = "Tracker Health\n"
+ s = "Kubernetes\n"
+        s += "- Server    : {}\n".format(self.k8s_config['server'])
+ s += "- Namespace : {}\n".format(self._namespace)
+ s += "Tracker Health\n"
for t in self.trackers:
- s += "- {} : {}\n".format(t.__class__.__name__, t.health)
+ s += "- {:<20} : {}\n".format(t.__class__.__name__, t.health)
s += "Tracked Events\n"
- s += "- namespace : {:>3}\n".format(len(self.ns_watcher.events))
- s += "- ceph events: {:>3}\n".format(len(self.event_processor.events))
+ s += "- namespace : {:>3}\n".format(len(self.ns_watcher.events))
+ s += "- ceph events : {:>3}\n".format(len(self.event_processor.events))
return 0, "", s
+ def _valid_server(self, server):
+ # must be a valid server url format
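+        # i.e. an https URL with an explicit port, e.g. https://192.168.0.1:6443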
+ server = server.strip()
+
+ res = urlparse(server)
+ port = res.netloc.split(":")[-1]
+
+ if res.scheme != 'https':
+ return False, "Server URL must use https"
+
+ elif not res.hostname:
+ return False, "Invalid server URL format"
+
+ elif res.hostname:
+ try:
+ socket.gethostbyname(res.hostname)
+ except socket.gaierror:
+ return False, "Unresolvable server URL"
+
+ if not port.isdigit():
+ return False, "Server URL must end in a port number"
+
+ return True, ""
+
+ def _valid_cacrt(self, cacrt_data):
+ """use mgr_util.verify_cacrt to validate the CA file"""
+
+ cacrt_fname = create_temp_file("ca_file", cacrt_data)
+
+ try:
+ verify_cacrt(cacrt_fname)
+ except ServerConfigException as e:
+ return False, "Invalid CA certificate: {}".format(str(e))
+ else:
+ return True, ""
+
+ def _valid_token(self, token_data):
+ """basic checks on the token"""
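+        # service account bearer tokens are JWT-like (base64url segments joined by '.'),
+        # hence the restricted character set below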
+ if not token_data:
+ return False, "Token file is empty"
+
+ pattern = re.compile(r"[a-zA-Z0-9\-\.\_]+$")
+ if not pattern.match(token_data):
+ return False, "Token contains invalid characters"
+
+ return True, ""
+
+ def _valid_namespace(self, namespace):
+        # Simple check - name must be <= 253 chars, lowercase alphanumeric plus '-' and '.'
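+        # (the API server enforces the full RFC 1123 naming rules - this is just a local sanity check)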
+
+ if len(namespace) > 253:
+ return False, "Name too long"
+ if namespace.isdigit():
+ return False, "Invalid name - must be alphanumeric"
+
+ pattern = re.compile(r"^[a-z][a-z0-9\-\.]+$")
+ if not pattern.match(namespace):
+ return False, "Invalid characters in the name"
+
+ return True, ""
+
+ def _config_set(self, key, val):
+ """Attempt to validate the content, then save to KV store"""
+
+ val = val.rstrip() # remove any trailing whitespace/newline
+
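+        # validators follow a naming convention: _valid_<key>, e.g. _valid_server checks the 'server' value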
+ try:
+ checker = getattr(self, "_valid_" + key)
+ except AttributeError:
+ # no checker available, just let it pass
+ self.log.warning("Unable to validate '{}' parameter - checker not implemented".format(key))
+ valid = True
+ else:
+ valid, reason = checker(val)
+
+ if valid:
+ self.set_store(key, val)
+ self.log.info("Updated config KV Store item: " + key)
+ return 0, "", "Config updated for parameter '{}'".format(key)
+ else:
+            return -errno.EINVAL, "", "Invalid value for '{}': {}".format(key, reason)
+
+ def clear_config_settings(self):
+ for k in self.k8s_config:
+ self.set_store(k, None)
+        return 0, "", "{} configuration keys removed".format(len(self.k8s_config))
+
def handle_command(self, inbuf, cmd):
- # FUTURE: Should we implement dynamic options for the monitoring?
+
+ access_options = ['cacrt', 'token']
+ config_options = ['server', 'namespace']
+
+ if cmd['prefix'] == 'k8sevents clear-config':
+ return self.clear_config_settings()
+
+ if cmd['prefix'] == 'k8sevents set-access':
+ if cmd['key'] not in access_options:
+                return -errno.EINVAL, "", "Unknown access option. Must be one of: {}".format(', '.join(access_options))
+
+ if inbuf:
+ return self._config_set(cmd['key'], inbuf)
+ else:
+ return -errno.EINVAL, "", "Command must specify -i <filename>"
+
+ if cmd['prefix'] == 'k8sevents set-config':
+
+ if cmd['key'] not in config_options:
+                return -errno.EINVAL, "", "Unknown config option. Must be one of: {}".format(', '.join(config_options))
+
+ return self._config_set(cmd['key'], cmd['value'])
+
+ # At this point the command is trying to interact with k8sevents, so intercept if the configuration is
+ # not ready
+ if self.error_msg:
+ _msg = "k8sevents unavailable: " + self.error_msg
+ ready, _ = self.k8s_ready()
+ if not self.kubernetes_control and not ready:
+ _msg += "\nOnce all variables have been defined, you must restart the k8sevents module for the changes to take effect"
+ return -errno.ENODATA, "", _msg
+
if cmd["prefix"] == "k8sevents status":
return self.show_status()
- elif cmd["prefix"] == "k8sevents list":
+
+ elif cmd["prefix"] == "k8sevents ls":
return self.show_events(self.ns_watcher.events)
+
elif cmd["prefix"] == "k8sevents ceph":
return self.show_events(self.event_processor.events)
+
else:
raise NotImplementedError(cmd["prefix"])
"""Determine whether the pre-reqs for the module are in place"""
if not kubernetes_imported:
- return False, "kubernetes python client is unavailable"
+ return False, "kubernetes python client is not available"
return True, ""
+ def load_kubernetes_config(self):
+ """Load configuration for remote kubernetes API using KV store values
+
+ Attempt to create an API client configuration from settings stored in
+ KV store.
+
+ Returns:
+ client.ApiClient: kubernetes API client object
+
+ Raises:
+ OSError: unable to create the cacrt file
+ """
+
+ # the kubernetes setting Configuration.ssl_ca_cert is a path, so we have to create a
+ # temporary file containing the cert for the client to load from
+ try:
+ ca_crt_file = create_temp_file('cacrt', self.k8s_config['cacrt'])
+ except OSError as e:
+ self.log.error("Unable to create file to hold cacrt: {}".format(str(e)))
+ raise OSError(str(e))
+ else:
+ self.log.debug("CA certificate from KV store, written to {}".format(ca_crt_file))
+
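+        # build an explicit client Configuration (server URL, CA bundle, bearer token) rather than
+        # relying on config.load_incluster_config(), which only works when running inside a pod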
+ configuration = client.Configuration()
+ configuration.host = self.k8s_config['server']
+ configuration.ssl_ca_cert = ca_crt_file
+ configuration.api_key = { "authorization": "Bearer " + self.k8s_config['token'] }
+ api_client = client.ApiClient(configuration)
+ self.log.info("API client created for remote kubernetes access using cacrt and token from KV store")
+
+ return api_client
+
def serve(self):
+ # apply options set by CLI to this module
self.config_notify()
+ if not kubernetes_imported:
+ self.error_msg = "Unable to start : python kubernetes package is missing"
+ else:
+ if self.kubernetes_control:
+ # running under rook-ceph
+ config.load_incluster_config()
+ self.k8s_config['server'] = "https://{}:{}".format(os.environ.get('KUBERNETES_SERVICE_HOST', 'UNKNOWN'),
+ os.environ.get('KUBERNETES_SERVICE_PORT_HTTPS', 'UNKNOWN'))
+ self._api_client_config = None
+ self._namespace = os.environ.get("POD_NAMESPACE", "rook-ceph")
+ else:
+ # running outside of rook-ceph, so we need additional settings to tell us
+ # how to connect to the kubernetes cluster
+ ready, errors = self.k8s_ready()
+ if not ready:
+ self.error_msg = "Required settings missing. Use ceph k8sevents set-access | set-config to define {}".format(",".join(errors))
+ else:
+ try:
+ self._api_client_config = self.load_kubernetes_config()
+ except OSError as e:
+ self.error_msg = str(e)
+ else:
+ self._namespace = self.k8s_config['namespace']
+ self.log.info("k8sevents configuration loaded from KV store")
+
+ if self.error_msg:
+ self.log.error(self.error_msg)
+ return
+
+ # All checks have passed
self.config_watcher = CephConfigWatcher(self)
+
self.event_processor = EventProcessor(self.config_watcher,
- self.ceph_event_retention_days)
- self.ns_watcher = NamespaceWatcher()
+ self.ceph_event_retention_days,
+ self._api_client_config,
+ self._namespace)
+
+ self.ns_watcher = NamespaceWatcher(api_client_config=self._api_client_config,
+ namespace=self._namespace)
if self.event_processor.ok:
log.info("Ceph Log processor thread starting")
t.reported = True
else:
- log.warning('Unable to access kubernetes event API - check RBAC rules')
+            self.error_msg = "Unable to access the kubernetes API. Is the API server reachable, and are the RBAC rules for our token valid?"
+ log.warning(self.error_msg)
log.warning("k8sevents module exiting")
self.run = False