from ..awsauth import S3Auth
from ..settings import Settings, Options
from ..rest_client import RestClient, RequestException
-from ..tools import build_url, dict_contains_path
+from ..tools import build_url, dict_contains_path, is_valid_ip_address
from .. import mgr, logger
'summary': '',
'0': {
...
+ 'addr': '[2001:db8:85a3::8a2e:370:7334]:49774/1534999298',
'metadata': {
'frontend_config#0': 'civetweb port=7280',
}
'summary': '',
'rgw': {
...
+ 'addr': '192.168.178.3:49774/1534999298',
'metadata': {
'frontend_config#0': 'civetweb port=8000',
}
"""
service_map = mgr.get('service_map')
if not dict_contains_path(service_map, ['services', 'rgw', 'daemons']):
- raise LookupError('No RGW found.')
+ raise LookupError('No RGW found')
daemon = None
daemons = service_map['services']['rgw']['daemons']
for key in daemons.keys():
daemon = daemons[key]
break
if daemon is None:
- raise LookupError('No RGW daemon found.')
+ raise LookupError('No RGW daemon found')
- addr = daemon['addr'].split(':')[0]
+ addr = _parse_addr(daemon['addr'])
port, ssl = _parse_frontend_config(daemon['metadata']['frontend_config#0'])
return addr, port, ssl
+def _parse_addr(value):
+ """
+ Get the IP address the RGW is running on.
+
+ >>> _parse_addr('192.168.178.3:49774/1534999298')
+ '192.168.178.3'
+
+ >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298')
+ '2001:db8:85a3::8a2e:370:7334'
+
+ >>> _parse_addr('xyz')
+ Traceback (most recent call last):
+ ...
+ LookupError: Failed to determine RGW address
+
+ >>> _parse_addr('192.168.178.a:8080/123456789')
+ Traceback (most recent call last):
+ ...
+ LookupError: Invalid RGW address '192.168.178.a' found
+
+ >>> _parse_addr('[2001:0db8:1234]:443/123456789')
+ Traceback (most recent call last):
+ ...
+ LookupError: Invalid RGW address '2001:0db8:1234' found
+
+ >>> _parse_addr('2001:0db8::1234:49774/1534999298')
+ Traceback (most recent call last):
+ ...
+ LookupError: Failed to determine RGW address
+
+ :param value: The string to process. The syntax is '<HOST>:<PORT>/<NONCE>'.
+ :type: str
+ :raises LookupError if parsing fails to determine the IP address.
+ :return: The IP address.
+ :rtype: str
+ """
+ match = re.search(r'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value)
+ if match:
+ # IPv4:
+ # Group 0: 192.168.178.3:49774/1534999298
+ # Group 3: 192.168.178.3
+ # IPv6:
+ # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298
+ # Group 1: [
+ # Group 2: 2001:db8:85a3::8a2e:370:7334
+ addr = match.group(3) if match.group(3) else match.group(2)
+ if not is_valid_ip_address(addr):
+ raise LookupError('Invalid RGW address \'{}\' found'.format(addr))
+ return addr
+ raise LookupError('Failed to determine RGW address')
+
+
def _parse_frontend_config(config):
"""
Get the port the RGW is running on. Due the complexity of the
self.lock.release()
+def is_valid_ip_address(addr):
+ """
+ Validate the given IPv4 or IPv6 address.
+
+ >>> is_valid_ip_address('2001:0db8::1234')
+ True
+
+ >>> is_valid_ip_address('192.168.121.1')
+ True
+
+ >>> is_valid_ip_address('1:::1')
+ False
+
+ >>> is_valid_ip_address('8.1.0')
+ False
+
+ >>> is_valid_ip_address('260.1.0.1')
+ False
+
+ :param addr:
+ :type addr: str
+ :return: Returns ``True`` if the IP address is valid,
+ otherwise ``False``.
+ :rtype: bool
+ """
+ return is_valid_ipv4_address(addr) or is_valid_ipv6_address(addr)
+
+
+def is_valid_ipv4_address(addr):
+ """
+ Validate the given IPv4 address.
+
+ >>> is_valid_ipv4_address('0.0.0.0')
+ True
+
+ >>> is_valid_ipv4_address('192.168.121.1')
+ True
+
+ >>> is_valid_ipv4_address('a.b.c.d')
+ False
+
+ >>> is_valid_ipv4_address('172.1.0.a')
+ False
+
+ >>> is_valid_ipv4_address('2001:0db8::1234')
+ False
+
+ >>> is_valid_ipv4_address(None)
+ False
+
+ >>> is_valid_ipv4_address(123456)
+ False
+
+ :param addr:
+ :type addr: str
+ :return: Returns ``True`` if the IPv4 address is valid,
+ otherwise ``False``.
+ :rtype: bool
+ """
+ try:
+ socket.inet_pton(socket.AF_INET, addr)
+ return True
+ except (socket.error, TypeError):
+ return False
+
+
def is_valid_ipv6_address(addr):
+ """
+ Validate the given IPv6 address.
+
+ >>> is_valid_ipv6_address('2001:0db8::1234')
+ True
+
+ >>> is_valid_ipv6_address('fe80::bc6c:66b0:5af8:f44')
+ True
+
+ >>> is_valid_ipv6_address('192.168.121.1')
+ False
+
+ >>> is_valid_ipv6_address('a:x::1')
+ False
+
+ >>> is_valid_ipv6_address('1200:0000:AB00:1234:O000:2552:7777:1313')
+ False
+
+ >>> is_valid_ipv6_address(None)
+ False
+
+ >>> is_valid_ipv6_address(123456)
+ False
+
+ :param addr:
+ :type addr: str
+ :return: Returns ``True`` if the IPv6 address is valid,
+ otherwise ``False``.
+ :rtype: bool
+ """
try:
socket.inet_pton(socket.AF_INET6, addr)
return True
- except socket.error:
+ except (socket.error, TypeError):
return False