import ssl
from enum import Enum
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set
import re
import uuid
def list_networks(ctx):
- # type: (CephadmContext) -> Dict[str,Dict[str,List[str]]]
+ # type: (CephadmContext) -> Dict[str,Dict[str, Set[str]]]
# sadly, 18.04's iproute2 4.15.0-2ubun doesn't support the -j flag,
# so we'll need to use a regex to parse 'ip' command output.
# out, _, _ = call_throws(['ip', '-j', 'route', 'ls'])
# j = json.loads(out)
# for x in j:
-
res = _list_ipv4_networks(ctx)
res.update(_list_ipv6_networks(ctx))
return res
-def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, List[str]]]:
+def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
execstr: Optional[str] = find_executable('ip')
if not execstr:
raise FileNotFoundError("unable to find 'ip' command")
return _parse_ipv4_route(out)
-def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, List[str]]]:
- r = {} # type: Dict[str,Dict[str,List[str]]]
+def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]:
+ r = {} # type: Dict[str, Dict[str, Set[str]]]
p = re.compile(r'^(\S+) dev (\S+) (.*)scope link (.*)src (\S+)')
for line in out.splitlines():
m = p.findall(line)
if net not in r:
r[net] = {}
if iface not in r[net]:
- r[net][iface] = []
- r[net][iface].append(ip)
+ r[net][iface] = set()
+ r[net][iface].add(ip)
return r
-def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, List[str]]]:
+def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
execstr: Optional[str] = find_executable('ip')
if not execstr:
raise FileNotFoundError("unable to find 'ip' command")
return _parse_ipv6_route(routes, ips)
-def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, List[str]]]:
- r = {} # type: Dict[str,Dict[str,List[str]]]
+def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]:
+ r = {} # type: Dict[str, Dict[str, Set[str]]]
route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
iface_p = re.compile(r'^(\d+): (\S+): (.*)$')
if net not in r:
r[net] = {}
if iface not in r[net]:
- r[net][iface] = []
+ r[net][iface] = set()
iface = None
for line in ips.splitlines():
if ipaddress.ip_address(ip) in ipaddress.ip_network(n)]
if net:
assert(iface)
- r[net[0]][iface].append(ip)
+ r[net[0]][iface].add(ip)
return r
def command_list_networks(ctx):
# type: (CephadmContext) -> None
r = list_networks(ctx)
- print(json.dumps(r, indent=4))
+
+ def serialize_sets(obj: Any) -> Any:
+ return list(obj) if isinstance(obj, set) else obj
+
+ print(json.dumps(r, indent=4, default=serialize_sets))
##################################