write_tmp,
)
from cephadmlib.net_utils import (
+ build_addrv_params,
EndPoint,
check_ip_port,
check_subnet,
get_short_hostname,
ip_in_subnets,
is_ipv6,
+ parse_mon_addrv,
+ parse_mon_ip,
port_in_use,
unwrap_ipv6,
wrap_ipv6,
##################################
-def parse_mon_addrv(addrv_arg: str) -> List[EndPoint]:
- """Parse mon-addrv param into a list of mon end points."""
- r = re.compile(r':(\d+)$')
- addrv_args = []
- addr_arg = addrv_arg
- if addr_arg[0] != '[' or addr_arg[-1] != ']':
- raise Error(f'--mon-addrv value {addr_arg} must use square brackets')
-
- for addr in addr_arg[1: -1].split(','):
- hasport = r.findall(addr)
- if not hasport:
- raise Error(f'--mon-addrv value {addr_arg} must include port number')
- port_str = hasport[0]
- addr = re.sub(r'^v\d+:', '', addr) # strip off v1: or v2: prefix
- base_ip = addr[0:-(len(port_str)) - 1]
- addrv_args.append(EndPoint(base_ip, int(port_str)))
-
- return addrv_args
-
-
-def parse_mon_ip(mon_ip: str) -> List[EndPoint]:
- """Parse mon-ip param into a list of mon end points."""
- r = re.compile(r':(\d+)$')
- addrv_args = []
- hasport = r.findall(mon_ip)
- if hasport:
- port_str = hasport[0]
- base_ip = mon_ip[0:-(len(port_str)) - 1]
- addrv_args.append(EndPoint(base_ip, int(port_str)))
- else:
- # No port provided: use fixed ports for ceph monitor
- addrv_args.append(EndPoint(mon_ip, 3300))
- addrv_args.append(EndPoint(mon_ip, 6789))
-
- return addrv_args
-
-
-def build_addrv_params(addrv: List[EndPoint]) -> str:
- """Convert mon end-points (ip:port) into the format: [v[1|2]:ip:port1]"""
- if len(addrv) > 2:
- raise Error('Detected a local mon-addrv list with more than 2 entries.')
- port_to_ver: Dict[int, str] = {6789: 'v1', 3300: 'v2'}
- addr_arg_list: List[str] = []
- for ep in addrv:
- if ep.port in port_to_ver:
- ver = port_to_ver[ep.port]
- else:
- ver = 'v2' # default mon protocol version if port is not provided
- logger.warning(f'Using msgr2 protocol for unrecognized port {ep}')
- addr_arg_list.append(f'{ver}:{ep.ip}:{ep.port}')
-
- addr_arg = '[{0}]'.format(','.join(addr_arg_list))
- return addr_arg
-
-
def get_public_net_from_cfg(ctx: CephadmContext) -> Optional[str]:
"""Get mon public network from configuration file."""
cp = read_config(ctx.config)
import socket
import struct
-from typing import Tuple, List
+from typing import Dict, Tuple, List
from .context import CephadmContext
from .exceptions import Error, PortOccupiedError
ipv4_addresses = [i[4][0] for i in items if i[0] == socket.AF_INET]
ipv6_addresses = [i[4][0] for i in items if i[0] == socket.AF_INET6]
return ipv4_addresses, ipv6_addresses
+
+
+def parse_mon_addrv(addrv_arg: str) -> List[EndPoint]:
+ """Parse mon-addrv param into a list of mon end points."""
+ r = re.compile(r':(\d+)$')
+ addrv_args = []
+ addr_arg = addrv_arg
+ if addr_arg[0] != '[' or addr_arg[-1] != ']':
+ raise Error(f'--mon-addrv value {addr_arg} must use square brackets')
+
+ for addr in addr_arg[1: -1].split(','):
+ hasport = r.findall(addr)
+ if not hasport:
+ raise Error(f'--mon-addrv value {addr_arg} must include port number')
+ port_str = hasport[0]
+ addr = re.sub(r'^v\d+:', '', addr) # strip off v1: or v2: prefix
+ base_ip = addr[0:-(len(port_str)) - 1]
+ addrv_args.append(EndPoint(base_ip, int(port_str)))
+
+ return addrv_args
+
+
+def parse_mon_ip(mon_ip: str) -> List[EndPoint]:
+ """Parse mon-ip param into a list of mon end points."""
+ r = re.compile(r':(\d+)$')
+ addrv_args = []
+ hasport = r.findall(mon_ip)
+ if hasport:
+ port_str = hasport[0]
+ base_ip = mon_ip[0:-(len(port_str)) - 1]
+ addrv_args.append(EndPoint(base_ip, int(port_str)))
+ else:
+ # No port provided: use fixed ports for ceph monitor
+ addrv_args.append(EndPoint(mon_ip, 3300))
+ addrv_args.append(EndPoint(mon_ip, 6789))
+
+ return addrv_args
+
+
+def build_addrv_params(addrv: List[EndPoint]) -> str:
+ """Convert mon end-points (ip:port) into the format: [v[1|2]:ip:port1]"""
+ if len(addrv) > 2:
+ raise Error('Detected a local mon-addrv list with more than 2 entries.')
+ port_to_ver: Dict[int, str] = {6789: 'v1', 3300: 'v2'}
+ addr_arg_list: List[str] = []
+ for ep in addrv:
+ if ep.port in port_to_ver:
+ ver = port_to_ver[ep.port]
+ else:
+ ver = 'v2' # default mon protocol version if port is not provided
+ logger.warning(f'Using msgr2 protocol for unrecognized port {ep}')
+ addr_arg_list.append(f'{ver}:{ep.ip}:{ep.port}')
+
+ addr_arg = '[{0}]'.format(','.join(addr_arg_list))
+ return addr_arg