# type: (str, int) -> None
if not args.skip_ping_check:
logger.info('Verifying IP %s port %d ...' % (ip, port))
- if ip.startswith('[') or '::' in ip:
+ if is_ipv6(ip):
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- if ip.startswith('[') and ip.endswith(']'):
- ip = ip[1:-1]
+ ip = unwrap_ipv6(ip)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
##################################
+
def unwrap_ipv6(address):
# type: (str) -> str
if address.startswith('[') and address.endswith(']'):
return address
+def wrap_ipv6(address):
+ # type: (str) -> str
+
+ # We cannot assume it's already wrapped or even an IPv6 address if
+ # it's already wrapped it'll not pass (like if it's a hostname) and trigger
+ # the ValueError
+ try:
+ if ipaddress.ip_address(unicode(address)).version == 6:
+ return f"[{address}]"
+ except ValueError:
+ pass
+
+ return address
+
+
def is_ipv6(address):
# type: (str) -> bool
address = unwrap_ipv6(address)
base_ip = ''
if args.mon_ip:
ipv6 = is_ipv6(args.mon_ip)
+ if ipv6:
+ args.mon_ip = wrap_ipv6(args.mon_ip)
hasport = r.findall(args.mon_ip)
if hasport:
port = int(hasport[0])
for address, expected in tests:
unwrap_test(address, expected)
+ def test_wrap_ipv6(self):
+ def wrap_test(address, expected):
+ assert cd.wrap_ipv6(address) == expected
+
+ tests = [
+ ('::1', '[::1]'), ('[::1]', '[::1]'),
+ ('fde4:8dba:82e1:0:5054:ff:fe6a:357',
+ '[fde4:8dba:82e1:0:5054:ff:fe6a:357]'),
+ ('myhost.example.com', 'myhost.example.com'),
+ ('192.168.0.1', '192.168.0.1'),
+ ('', ''), ('fd00::1::1', 'fd00::1::1')]
+ for address, expected in tests:
+ wrap_test(address, expected)
+
@mock.patch('cephadm.call_throws')
@mock.patch('cephadm.get_parm')
def test_registry_login(self, get_parm, call_throws):
from mgr_module import HandleCommandResult, MonCommandFailed
from ceph.deployment.service_spec import ServiceSpec, RGWSpec
+from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from orchestrator import OrchestratorError, DaemonDescription
from cephadm import utils
extra_config += 'public network = %s\n' % network
elif network.startswith('[v') and network.endswith(']'):
extra_config += 'public addrv = %s\n' % network
+ elif is_ipv6(network):
+ extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
elif ':' not in network:
extra_config += 'public addr = %s\n' % network
else:
import inspect
import json
import functools
-import ipaddress
import logging
import collections
import fnmatch
import time
import threading
-import six
from six.moves import urllib
import cherrypy
except ImportError:
from urllib.parse import urljoin
+from ceph.deployment.utils import wrap_ipv6
+
from . import mgr
from .exceptions import ViewCacheNoDataException
from .settings import Settings
:type port: int
:rtype: str
"""
- try:
- try:
- u_host = six.u(host)
- except TypeError:
- u_host = host
-
- ipaddress.IPv6Address(u_host)
- netloc = '[{}]'.format(host)
- except ValueError:
- netloc = host
+ netloc = wrap_ipv6(host)
if port:
netloc += ':{}'.format(port)
pr = urllib.parse.ParseResult(
import yaml
from ceph.deployment.hostspec import HostSpec
+from ceph.deployment.utils import unwrap_ipv6
class ServiceSpecValidationError(Exception):
for network in networks:
# only if we have versioned network configs
if network.startswith('v') or network.startswith('[v'):
- network = network.split(':')[1]
+ # if this is ipv6 we can't just simply split on ':' so do
+ # a split once and rsplit once to leave us with just ipv6 addr
+ network = network.split(':', 1)[1]
+ network = network.rsplit(':', 1)[0]
try:
# if subnets are defined, also verify the validity
if '/' in network:
ip_network(six.text_type(network))
else:
- ip_address(six.text_type(network))
+ ip_address(unwrap_ipv6(network))
except ValueError as e:
# logging?
raise e
--- /dev/null
+import ipaddress
+import sys
+
+if sys.version_info > (3, 0):
+ unicode = str
+
+
+def unwrap_ipv6(address):
+ # type: (str) -> str
+ if address.startswith('[') and address.endswith(']'):
+ return address[1:-1]
+ return address
+
+
+def wrap_ipv6(address):
+ # type: (str) -> str
+
+ # We cannot assume it's already wrapped or even an IPv6 address if
+ # it's already wrapped it'll not pass (like if it's a hostname) and trigger
+ # the ValueError
+ try:
+ if ipaddress.ip_address(unicode(address)).version == 6:
+ return f"[{address}]"
+ except ValueError:
+ pass
+
+ return address
+
+
+def is_ipv6(address):
+ # type: (str) -> bool
+ address = unwrap_ipv6(address)
+ try:
+ return ipaddress.ip_address(unicode(address)).version == 6
+ except ValueError:
+ return False
--- /dev/null
+from ceph.deployment.utils import is_ipv6, unwrap_ipv6, wrap_ipv6
+
+
+def test_is_ipv6():
+ for good in ("[::1]", "::1",
+ "fff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"):
+ assert is_ipv6(good)
+ for bad in ("127.0.0.1",
+ "ffff:ffff:ffff:ffff:ffff:ffff:ffff:fffg",
+ "1:2:3:4:5:6:7:8:9", "fd00::1::1", "[fg::1]"):
+ assert not is_ipv6(bad)
+
+
+def test_unwrap_ipv6():
+ def unwrap_test(address, expected):
+ assert unwrap_ipv6(address) == expected
+
+ tests = [
+ ('::1', '::1'), ('[::1]', '::1'),
+ ('[fde4:8dba:82e1:0:5054:ff:fe6a:357]', 'fde4:8dba:82e1:0:5054:ff:fe6a:357'),
+ ('can actually be any string', 'can actually be any string'),
+ ('[but needs to be stripped] ', '[but needs to be stripped] ')]
+ for address, expected in tests:
+ unwrap_test(address, expected)
+
+
+def test_wrap_ipv6():
+ def wrap_test(address, expected):
+ assert wrap_ipv6(address) == expected
+
+ tests = [
+ ('::1', '[::1]'), ('[::1]', '[::1]'),
+ ('fde4:8dba:82e1:0:5054:ff:fe6a:357', '[fde4:8dba:82e1:0:5054:ff:fe6a:357]'),
+ ('myhost.example.com', 'myhost.example.com'), ('192.168.0.1', '192.168.0.1'),
+ ('', ''), ('fd00::1::1', 'fd00::1::1')]
+ for address, expected in tests:
+ wrap_test(address, expected)