class SNMPGateway:
"""Defines an SNMP gateway between Prometheus and SNMP monitoring Frameworks"""
daemon_type = 'snmp-gateway'
- supported_versions = ['V2c']
+ SUPPORTED_VERSIONS = ['V2c', 'V3']
default_image = DEFAULT_SNMP_GATEWAY_IMAGE
+ DEFAULT_PORT = 9464
env_filename = 'snmp-gateway.conf'
def __init__(self,
fsid: str,
daemon_id: Union[int, str],
config_json: Dict[str, Any],
- image: Optional[str] = None ) -> None:
+ image: Optional[str] = None) -> None:
self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
self.uid = config_json.get('uid', 0)
self.gid = config_json.get('gid', 0)
- self.ports = list([config_json.get('listen_port', 9464)])
+
self.destination = config_json.get('destination', '')
self.snmp_version = config_json.get('snmp_version', 'V2c')
self.snmp_community = config_json.get('snmp_community', 'public')
self.log_level = config_json.get('log_level', 'info')
- self.snmp_v3_auth_user = config_json.get('snmp_v3_auth_user', '')
+ self.snmp_v3_auth_username = config_json.get('snmp_v3_auth_username', '')
self.snmp_v3_auth_password = config_json.get('snmp_v3_auth_password', '')
+ self.snmp_v3_auth_protocol = config_json.get('snmp_v3_auth_protocol', '')
+ self.snmp_v3_priv_protocol = config_json.get('snmp_v3_priv_protocol', '')
self.snmp_v3_priv_password = config_json.get('snmp_v3_priv_password', '')
-
- # TODO Add SNMP V3 parameters
+ self.snmp_v3_engine_id = config_json.get('snmp_v3_engine_id', '')
self.validate()
get_parm(ctx.config_json), ctx.image)
@staticmethod
- def get_version(ctx: CephadmContext, fsid, daemon_id: str) -> Optional[str]:
+ def get_version(ctx: CephadmContext, fsid: str, daemon_id: str) -> Optional[str]:
"""Return the version of the notifer from it's http endpoint"""
- path = os.path.join(ctx.data_dir, fsid, f"snmp-gateway.{daemon_id}", 'unit.meta')
+ path = os.path.join(ctx.data_dir, fsid, f'snmp-gateway.{daemon_id}', 'unit.meta')
try:
with open(path, 'r') as env:
metadata = json.loads(env.read())
return None
try:
- with urlopen(f"http://0.0.0.0:{ports[0]}/") as r:
+ with urlopen(f'http://127.0.0.1:{ports[0]}/') as r:
html = r.read().decode('utf-8').split('\n')
except (HTTPError, URLError):
return None
return None
- def get_daemon_args(self):
+ @property
+ def port(self) -> int:
+ if not self.ctx.tcp_ports:
+ return self.DEFAULT_PORT
+ else:
+ if len(self.ctx.tcp_ports) > 0:
+ return int(self.ctx.tcp_ports.split()[0])
+ else:
+ return self.DEFAULT_PORT
- args = [
- f'--web.listen-address=:{self.ports[0]}',
+ def get_daemon_args(self) -> List[str]:
+ v3_args = []
+ base_args = [
+ f'--web.listen-address=:{self.port}',
f'--snmp.destination={self.destination}',
f'--snmp.version={self.snmp_version}',
f'--log.level={self.log_level}',
'--snmp.trap-description-template=/etc/snmp_notifier/description-template.tpl'
]
- return args
+ if self.snmp_version == 'V3':
+ # common auth settings
+ v3_args.extend([
+ '--snmp.authentication-enabled',
+ f'--snmp.authentication-protocol={self.snmp_v3_auth_protocol}',
+ f'--snmp.security-engine-id={self.snmp_v3_engine_id}'
+ ])
+ # authPriv setting is applied if we have a privacy protocol setting
+ if self.snmp_v3_priv_protocol:
+ v3_args.extend([
+ '--snmp.private-enabled',
+ f'--snmp.private-protocol={self.snmp_v3_priv_protocol}'
+ ])
+
+ return base_args + v3_args
@property
def data_dir(self) -> str:
- return os.path.join(self.ctx.data_dir, self.ctx.fsid, f"{self.daemon_type}.{self.daemon_id}")
+ return os.path.join(self.ctx.data_dir, self.ctx.fsid, f'{self.daemon_type}.{self.daemon_id}')
@property
def conf_file_path(self) -> str:
return os.path.join(self.data_dir, self.env_filename)
- def create_daemon_conf(self):
+ def create_daemon_conf(self) -> None:
"""Creates the environment file holding 'secrets' passed to the snmp-notifier daemon"""
with open(os.open(self.conf_file_path, os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
- f.write(f"PORT={self.ports[0]}\n")
if self.snmp_version == 'V2c':
- f.write(f"SNMP_NOTIFIER_COMMUNITY={self.snmp_community}\n")
+ f.write(f'SNMP_NOTIFIER_COMMUNITY={self.snmp_community}\n')
else:
- # add snmp v3 settings here
- pass
+ f.write(f'SNMP_NOTIFIER_AUTH_USERNAME={self.snmp_v3_auth_username}\n')
+ f.write(f'SNMP_NOTIFIER_AUTH_PASSWORD={self.snmp_v3_auth_password}\n')
+ if self.snmp_v3_priv_password:
+ f.write(f'SNMP_NOTIFIER_PRIV_PASSWORD={self.snmp_v3_priv_password}\n')
- def validate(self):
+ def validate(self) -> None:
"""Validate the settings
Raises:
if not is_fsid(self.fsid):
raise Error(f'not a valid fsid: {self.fsid}')
- if self.snmp_version not in SNMPGateway.supported_versions:
- raise Error(f'not a valid snmp version: {self.fsid}')
+ if self.snmp_version not in SNMPGateway.SUPPORTED_VERSIONS:
+ raise Error(f'not a valid snmp version: {self.snmp_version}')
if not self.destination:
raise Error('config is missing destination attribute(<ip>:<port>) of the target SNMP listener')
elif daemon_type == SNMPGateway.daemon_type:
sg = SNMPGateway.init(ctx, fsid, daemon_id)
container_args.append(
- f"--env-file={sg.conf_file_path}"
+ f'--env-file={sg.conf_file_path}'
)
# if using podman, set -d, --conmon-pidfile & --cidfile flags
c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
sc.uid, sc.gid,
- ports=sc.ports)
+ ports=daemon_ports)
else:
raise Error('daemon type {} not implemented in command_deploy function'
from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
- NodeExporterService
+ NodeExporterService, SNMPGatewayService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
from .upgrade import CephadmUpgrade
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:6.7.4'
DEFAULT_HAPROXY_IMAGE = 'docker.io/library/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'docker.io/arcts/keepalived'
+DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
# ------------------------------------------------------------------------------
default=DEFAULT_KEEPALIVED_IMAGE,
desc='Keepalived container image',
),
+ Option(
+ 'container_image_snmp_gateway',
+ default=DEFAULT_SNMP_GATEWAY_IMAGE,
+ desc='SNMP Gateway container image',
+ ),
Option(
'warn_on_stray_hosts',
type='bool',
self.container_image_node_exporter = ''
self.container_image_haproxy = ''
self.container_image_keepalived = ''
+ self.container_image_snmp_gateway = ''
self.warn_on_stray_hosts = True
self.warn_on_stray_daemons = True
self.warn_on_failed_host_check = True
RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
PrometheusService, NodeExporterService, CrashService, IscsiService,
IngressService, CustomContainerService, CephfsMirrorService,
- CephadmAgent
+ CephadmAgent, SNMPGatewayService
]
# https://github.com/python/mypy/issues/8993
suffix = daemon_type not in [
'mon', 'crash',
'prometheus', 'node-exporter', 'grafana', 'alertmanager',
- 'container', 'agent'
+ 'container', 'agent', 'snmp-gateway'
]
if forcename:
if len([d for d in existing if d.daemon_id == forcename]):
# is only available when a container is deployed (given
# via spec).
image = None
+ elif daemon_type == 'snmp-gateway':
+ image = self.container_image_snmp_gateway
else:
assert False, daemon_type
need = {
'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
'grafana': ['prometheus'],
- 'alertmanager': ['mgr', 'alertmanager'],
+ 'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'],
}
for dep_type in need.get(daemon_type, []):
for dd in self.cache.get_daemons_by_type(dep_type):
'node-exporter': PlacementSpec(host_pattern='*'),
'crash': PlacementSpec(host_pattern='*'),
'container': PlacementSpec(count=1),
+ 'snmp-gateway': PlacementSpec(count=1),
}
spec.placement = defaults[spec.service_type]
elif spec.service_type in ['mon', 'mgr'] and \
def apply_container(self, spec: ServiceSpec) -> str:
return self._apply(spec)
+ @handle_orch_error
+ def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
+ return self._apply(spec)
+
@handle_orch_error
def upgrade_check(self, image: str, version: str) -> str:
if self.inventory.get_host_with_state("maintenance"):
from mgr_module import HandleCommandResult
from orchestrator import DaemonDescription
-from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec
+from ceph.deployment.service_spec import AlertManagerSpec, GrafanaSpec, ServiceSpec, SNMPGatewaySpec
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
from cephadm.services.ingress import IngressSpec
from mgr_util import verify_tls, ServerConfigException, create_self_signed_cert, build_url
# dashboard(s)
dashboard_urls: List[str] = []
+ snmp_gateway_urls: List[str] = []
mgr_map = self.mgr.get('mgr_map')
port = None
proto = None # http: or https:
addr = self.mgr.inventory.get_addr(dd.hostname)
dashboard_urls.append(build_url(scheme=proto, host=addr, port=port))
+ for dd in self.mgr.cache.get_daemons_by_service('snmp-gateway'):
+ assert dd.hostname is not None
+ assert dd.ports
+ addr = dd.ip if dd.ip else self._inventory_get_addr(dd.hostname)
+ deps.append(dd.name())
+ snmp_gateway_urls.append(f"http://{addr}:{dd.ports[0]}/alerts")
+
context = {
'dashboard_urls': dashboard_urls,
- 'default_webhook_urls': default_webhook_urls
+ 'default_webhook_urls': default_webhook_urls,
+ 'snmp_gateway_urls': snmp_gateway_urls,
}
yml = self.mgr.template.render('services/alertmanager/alertmanager.yml.j2', context)
deps.append(dd.name())
addr = self.mgr.inventory.get_addr(dd.hostname)
peers.append(build_url(host=addr, port=port).lstrip('/'))
+
return {
"files": {
"alertmanager.yml": yml
names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
out = f'It is presumed safe to stop {names}'
return HandleCommandResult(0, out, '')
+
+
+class SNMPGatewayService(CephadmService):
+ TYPE = 'snmp-gateway'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ assert self.TYPE == daemon_spec.daemon_type
+ deps: List[str] = []
+
+ spec = cast(SNMPGatewaySpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+ config = {
+ "destination": spec.snmp_destination,
+ "snmp_version": spec.snmp_version,
+ }
+ if spec.snmp_version == 'V2c':
+ community = spec.credentials.get('snmp_community', None)
+ assert community is not None
+
+ config.update({
+ "snmp_community": community
+ })
+ else:
+ # SNMP v3 settings can be either authNoPriv or authPriv
+ auth_protocol = 'SHA' if not spec.auth_protocol else spec.auth_protocol
+
+ auth_username = spec.credentials.get('snmp_v3_auth_username', None)
+ auth_password = spec.credentials.get('snmp_v3_auth_password', None)
+ assert auth_username is not None
+ assert auth_password is not None
+ assert spec.engine_id is not None
+
+ config.update({
+ "snmp_v3_auth_protocol": auth_protocol,
+ "snmp_v3_auth_username": auth_username,
+ "snmp_v3_auth_password": auth_password,
+ "snmp_v3_engine_id": spec.engine_id,
+ })
+ # authPriv adds encryption
+ if spec.privacy_protocol:
+ priv_password = spec.credentials.get('snmp_v3_priv_password', None)
+ assert priv_password is not None
+
+ config.update({
+ "snmp_v3_priv_protocol": spec.privacy_protocol,
+ "snmp_v3_priv_password": priv_password,
+ })
+
+ logger.debug(
+ f"Generated configuration for '{self.TYPE}' service. Dependencies={deps}")
+
+ return config, sorted(deps)
group_interval: 10s
repeat_interval: 1h
receiver: 'ceph-dashboard'
+{% if snmp_gateway_urls %}
+ continue: true
+ - receiver: 'snmp-gateway'
+ repeat_interval: 1h
+ group_interval: 10s
+ group_by: ['alertname']
+ match_re:
+ oid: "(1.3.6.1.4.1.50495.).*"
+{% endif %}
receivers:
- name: 'default'
{% for url in dashboard_urls %}
- url: '{{ url }}/api/prometheus_receiver'
{% endfor %}
+{% if snmp_gateway_urls %}
+- name: 'snmp-gateway'
+ webhook_configs:
+{% for url in snmp_gateway_urls %}
+ - url: '{{ url }}'
+{% endfor %}
+{% endif %}
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- IscsiServiceSpec, IngressSpec
+ IscsiServiceSpec, IngressSpec, SNMPGatewaySpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime
'rbd-mirror': self.apply_rbd_mirror,
'rgw': self.apply_rgw,
'ingress': self.apply_ingress,
+ 'snmp-gateway': self.apply_snmp_gateway,
'host': self.add_host,
}
"""Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
+ def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+ """Update an existing snmp gateway service"""
+ raise NotImplementedError()
+
def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
'crash': 'crash',
'crashcollector': 'crash', # Specific Rook Daemon
'container': 'container',
- 'agent': 'agent'
+ 'agent': 'agent',
+ 'snmp-gateway': 'snmp-gateway',
}
return mapping[dtype]
'node-exporter': ['node-exporter'],
'crash': ['crash'],
'container': ['container'],
- 'agent': ['agent']
+ 'agent': ['agent'],
+ 'snmp-gateway': ['snmp-gateway'],
}
return mapping[stype]
from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection, OSDMethod
-from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, service_spec_allow_invalid_from_json
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, service_spec_allow_invalid_from_json, \
+ SNMPGatewaySpec, SNMPVersion, SNMPAuthType, SNMPPrivacyType
from ceph.deployment.hostspec import SpecValidationError
from ceph.utils import datetime_now
rgw = 'rgw'
nfs = 'nfs'
iscsi = 'iscsi'
+ snmp_gateway = 'snmp-gateway'
class ServiceAction(enum.Enum):
return self._apply_misc([spec], dry_run, format, no_overwrite)
+ @_cli_write_command('orch apply snmp-gateway')
+ def _apply_snmp_gateway(self,
+ snmp_version: SNMPVersion,
+ destination: str,
+ port: int = 9464,
+ engine_id: Optional[str] = None,
+ auth_protocol: Optional[SNMPAuthType] = None,
+ privacy_protocol: Optional[SNMPPrivacyType] = None,
+ placement: Optional[str] = None,
+ unmanaged: bool = False,
+ dry_run: bool = False,
+ format: Format = Format.plain,
+ no_overwrite: bool = False,
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ """Add a Prometheus to SNMP gateway service (cephadm only)"""
+
+ if not inbuf:
+ raise OrchestratorValidationError(
+ 'missing credential configuration file. Retry with -i <filename>')
+
+ try:
+ # load inbuf
+ credentials = yaml.safe_load(inbuf)
+ except (OSError, yaml.YAMLError):
+ raise OrchestratorValidationError('credentials file must be valid YAML')
+
+ auth = None if not auth_protocol else auth_protocol.value
+ priv = None if not privacy_protocol else privacy_protocol.value
+
+ spec = SNMPGatewaySpec(
+ snmp_version=snmp_version.value,
+ port=port,
+ credentials=credentials,
+ snmp_destination=destination,
+ engine_id=engine_id,
+ auth_protocol=auth,
+ privacy_protocol=priv,
+ placement=PlacementSpec.from_string(placement),
+ unmanaged=unmanaged,
+ preview_only=dry_run
+ )
+
+ spec.validate() # force any validation exceptions to be caught correctly
+
+ return self._apply_misc([spec], dry_run, format, no_overwrite)
+
@_cli_write_command('orch set backend')
def _set_backend(self, module_name: Optional[str] = None) -> HandleCommandResult:
"""
import fnmatch
import re
+import enum
from collections import OrderedDict
from contextlib import contextmanager
from functools import wraps
import yaml
from ceph.deployment.hostspec import HostSpec, SpecValidationError, assert_valid_host
-from ceph.deployment.utils import unwrap_ipv6
+from ceph.deployment.utils import unwrap_ipv6, valid_addr
+from ceph.utils import is_hex
ServiceSpecT = TypeVar('ServiceSpecT', bound='ServiceSpec')
FuncT = TypeVar('FuncT', bound=Callable)
+class SNMPVersion(enum.Enum):
+ V2c = 'V2c'
+ V3 = 'V3'
+
+
+class SNMPAuthType(enum.Enum):
+ MD5 = 'MD5'
+ SHA = 'SHA'
+
+
+class SNMPPrivacyType(enum.Enum):
+ DES = 'DES'
+ AES = 'AES'
+
+
def handle_type_error(method: FuncT) -> FuncT:
@wraps(method)
def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
"""
KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi mds mgr mon nfs ' \
'node-exporter osd prometheus rbd-mirror rgw agent ' \
- 'container ingress cephfs-mirror'.split()
+ 'container ingress cephfs-mirror snmp-gateway'.split()
REQUIRES_SERVICE_ID = 'iscsi mds nfs rgw container ingress '.split()
MANAGED_CONFIG_OPTIONS = [
'mds_join_fs',
'grafana': GrafanaSpec,
'node-exporter': MonitoringSpec,
'prometheus': MonitoringSpec,
+ 'snmp-gateway': SNMPGatewaySpec,
}.get(service_type, cls)
if ret == ServiceSpec and not service_type:
raise SpecValidationError('Spec needs a "service_type" key.')
yaml.add_representer(GrafanaSpec, ServiceSpec.yaml_representer)
+
+
+class SNMPGatewaySpec(ServiceSpec):
+ valid_destination_types = [
+ 'Name:Port',
+ 'IPv4:Port'
+ ]
+
+ def __init__(self,
+ service_type: str = 'snmp-gateway',
+ snmp_version: Optional[str] = None,
+ snmp_destination: str = '',
+ credentials: Dict[str, str] = {},
+ engine_id: Optional[str] = None,
+ auth_protocol: Optional[str] = None,
+ privacy_protocol: Optional[str] = None,
+ placement: Optional[PlacementSpec] = None,
+ unmanaged: bool = False,
+ preview_only: bool = False,
+ port: int = 9464,
+ ):
+ assert service_type == 'snmp-gateway'
+
+ super(SNMPGatewaySpec, self).__init__(
+ service_type,
+ placement=placement,
+ unmanaged=unmanaged,
+ preview_only=preview_only)
+
+ self.service_type = service_type
+ self.snmp_version = snmp_version
+ self.snmp_destination = snmp_destination
+ self.port = port
+ self.credentials = credentials
+ self.engine_id = engine_id
+ self.auth_protocol = auth_protocol
+ self.privacy_protocol = privacy_protocol
+
+ @property
+ def ports(self) -> List[int]:
+ return [self.port]
+
+ def get_port_start(self) -> List[int]:
+ return self.ports
+
+ def validate(self) -> None:
+ super(SNMPGatewaySpec, self).validate()
+
+ def _check_type(name: str, value: Optional[str], options: List[str]) -> None:
+ if not value:
+ return
+ if value not in options:
+ raise SpecValidationError(
+ f'{name} unsupported. Must be one of {", ".join(sorted(options))}'
+ )
+
+ if not self.credentials:
+ raise SpecValidationError(
+ 'Missing authentication information (credentials). '
+ 'SNMP V2c and V3 require credential information'
+ )
+ elif not self.snmp_version:
+ raise SpecValidationError(
+ 'Missing SNMP version (snmp_version)'
+ )
+
+ _check_type('snmp_version',
+ self.snmp_version,
+ list(set(opt.value for opt in SNMPVersion)))
+ _check_type('auth_protocol',
+ self.auth_protocol,
+ list(set(opt.value for opt in SNMPAuthType)))
+ _check_type('privacy_protocol',
+ self.privacy_protocol,
+ list(set(opt.value for opt in SNMPPrivacyType)))
+
+ creds_requirement = {
+ 'V2c': ['snmp_community'],
+ 'V3': ['snmp_v3_auth_username', 'snmp_v3_auth_password']
+ }
+ if self.privacy_protocol:
+ creds_requirement['V3'].append('snmp_v3_priv_password')
+
+ missing = [parm for parm in creds_requirement[self.snmp_version]
+ if parm not in self.credentials]
+ # check that credentials are correct for the version
+ if missing:
+ raise SpecValidationError(
+ f'SNMP {self.snmp_version} credentials are incomplete. Missing {", ".join(missing)}'
+ )
+
+ if self.engine_id:
+ if 10 <= len(self.engine_id) <= 64 and \
+ is_hex(self.engine_id) and \
+ len(self.engine_id) % 2 == 0:
+ pass
+ else:
+ raise SpecValidationError(
+ 'engine_id must be a string containing 10-64 hex characters. '
+ 'Its length must be divisible by 2'
+ )
+
+ else:
+ if self.snmp_version == 'V3':
+ raise SpecValidationError(
+ 'Must provide an engine_id for SNMP V3 notifications'
+ )
+
+ if not self.snmp_destination:
+ raise SpecValidationError(
+ 'SNMP destination (snmp_destination) must be provided'
+ )
+ else:
+ valid, description = valid_addr(self.snmp_destination)
+ if not valid:
+ raise SpecValidationError(
+ f'SNMP destination (snmp_destination) is invalid: {description}'
+ )
+ if description not in self.valid_destination_types:
+ raise SpecValidationError(
+ f'SNMP destination (snmp_destination) type ({description}) is invalid. '
+ f'Must be either: {", ".join(sorted(self.valid_destination_types))}'
+ )
+
+
+yaml.add_representer(SNMPGatewaySpec, ServiceSpec.yaml_representer)
import ipaddress
+import socket
+from typing import Tuple, Optional
+from urllib.parse import urlparse
def unwrap_ipv6(address):
return ipaddress.ip_address(address).version == 6
except ValueError:
return False
+
+
+def valid_addr(addr: str) -> Tuple[bool, str]:
+ """check that an address string is valid
+ Valid in this case means that a name is resolvable, or the
+ IP address string is a correctly formed IPv4 or IPv6 address,
+ with or without a port
+
+ Args:
+ addr (str): address
+
+ Returns:
+ Tuple[bool, str]: Validity of the address, either
+ True, address type (IPv4[:Port], IPv6[:Port], Name[:Port])
+ False, <error description>
+ """
+
+ def _dns_lookup(addr: str, port: Optional[int]) -> Tuple[bool, str]:
+ try:
+ socket.getaddrinfo(addr, None)
+ except socket.gaierror:
+ # not resolvable
+ return False, 'DNS lookup failed'
+ return True, 'Name:Port' if port else 'Name'
+
+ def _ip_lookup(addr: str, port: Optional[int]) -> Tuple[bool, str]:
+ unwrapped = unwrap_ipv6(addr)
+ try:
+ ip_addr = ipaddress.ip_address(unwrapped)
+ except ValueError:
+ return False, 'Invalid IP v4 or v6 address format'
+ return True, f'IPv{ip_addr.version}:Port' if port else f'IPv{ip_addr.version}'
+
+ dots = addr.count('.')
+ colons = addr.count(':')
+ addr_as_url = f'http://{addr}'
+
+ try:
+ res = urlparse(addr_as_url)
+ except ValueError as e:
+ if str(e) == 'Invalid IPv6 URL':
+ return False, 'Address has incorrect/incomplete use of enclosing brackets'
+ return False, f'Unknown urlparse error {str(e)} for {addr_as_url}'
+
+ addr = res.netloc
+ port = None
+ try:
+ port = res.port
+ if port:
+ addr = addr[:-len(f':{port}')]
+ except ValueError:
+ if colons == 1:
+ return False, 'Port must be numeric'
+ elif ']:' in addr:
+ return False, 'Port must be numeric'
+
+ if addr.startswith('[') and dots:
+ return False, "IPv4 address wrapped in brackets is invalid"
+
+ # catch partial address like 10.8 which would be valid IPaddress schemes
+ # but are classed as invalid here since they're not usable
+ if dots and addr[0].isdigit() and dots != 3:
+ return False, 'Invalid partial IPv4 address'
+
+ if addr[0].isalpha() and '.' in addr:
+ return _dns_lookup(addr, port)
+ return _ip_lookup(addr, port)
import datetime
import re
+import string
from typing import Optional
parts = parts.groupdict()
args = {name: int(param) for name, param in parts.items() if param}
return datetime.timedelta(**args)
+
+
+def is_hex(s: str, strict: bool = True) -> bool:
+ """Simple check that a string contains only hex chars"""
+ try:
+ int(s, 16)
+ except ValueError:
+ return False
+
+ # s is multiple chars, but we should catch a '+/-' prefix too.
+ if strict:
+ if s[0] not in string.hexdigits:
+ return False
+
+ return True