@property
def port(self) -> int:
- ports = fetch_tcp_ports(self.ctx)
- if not ports:
+ endpoints = fetch_tcp_ports(self.ctx)
+ if not endpoints:
return self.DEFAULT_PORT
- return ports[0]
+ return endpoints[0].port
def get_daemon_args(self) -> List[str]:
v3_args = []
s.close()
-def port_in_use(ctx, port_num):
- # type: (CephadmContext, int) -> bool
+def port_in_use(ctx: CephadmContext, endpoint: EndPoint) -> bool:
"""Detect whether a port is in use on the local machine - IPv4 and IPv6"""
- logger.info('Verifying port %d ...' % port_num)
+ logger.info('Verifying port %s ...' % str(endpoint))
def _port_in_use(af: socket.AddressFamily, address: str) -> bool:
try:
s = socket.socket(af, socket.SOCK_STREAM)
- attempt_bind(ctx, s, address, port_num)
+ attempt_bind(ctx, s, address, endpoint.port)
except PortOccupiedError:
return True
except OSError as e:
else:
raise e
return False
+
+ if endpoint.ip != '0.0.0.0' and endpoint.ip != '::':
+ if is_ipv6(endpoint.ip):
+ return _port_in_use(socket.AF_INET6, endpoint.ip)
+ else:
+ return _port_in_use(socket.AF_INET, endpoint.ip)
+
return any(_port_in_use(af, address) for af, address in (
(socket.AF_INET, '0.0.0.0'),
(socket.AF_INET6, '::')
return []
-def fetch_tcp_ports(ctx: CephadmContext) -> List[int]:
- """Return a list of tcp ports, as integers, stored on the given ctx.
+def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]:
+ """Return a list of Endpoints, which have a port and ip attribute
"""
ports = getattr(ctx, 'tcp_ports', None)
if ports is None:
- return []
+ ports = []
if isinstance(ports, str):
- return list(map(int, ports.split()))
- return ports
+ ports = list(map(int, ports.split()))
+ port_ips: Dict[str, str] = {}
+ port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None)
+ if isinstance(port_ips_attr, str):
+ port_ips = json.loads(port_ips_attr)
+ elif port_ips_attr is not None:
+ # if it's not None or a str, assume it's already the dict we want
+ port_ips = port_ips_attr
+
+ endpoints: List[EndPoint] = []
+ for port in ports:
+ if str(port) in port_ips:
+ endpoints.append(EndPoint(port_ips[str(port)], port))
+ else:
+ endpoints.append(EndPoint('0.0.0.0', port))
+
+ return endpoints
def get_config_and_keyring(ctx):
uid: int, gid: int, config: Optional[str] = None,
keyring: Optional[str] = None, osd_fsid: Optional[str] = None,
deployment_type: DeploymentType = DeploymentType.DEFAULT,
- ports: Optional[List[int]] = None) -> None:
+ endpoints: Optional[List[EndPoint]] = None) -> None:
- ports = ports or []
+ endpoints = endpoints or []
# only check port in use if fresh deployment since service
# we are redeploying/reconfiguring will already be using the port
if deployment_type == DeploymentType.DEFAULT:
- if any([port_in_use(ctx, port) for port in ports]):
+ if any([port_in_use(ctx, e) for e in endpoints]):
if daemon_type == 'mgr':
# non-fatal for mgr when we are in mgr_standby_modules=false, but we can't
# tell whether that is the case here.
logger.warning(
- f"ceph-mgr TCP port(s) {','.join(map(str, ports))} already in use"
+ f"ceph-mgr TCP port(s) {','.join(map(str, endpoints))} already in use"
)
else:
- raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, ports)), daemon_type))
+ raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, endpoints)), daemon_type))
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
if deployment_type == DeploymentType.RECONFIG and not os.path.exists(data_dir):
else:
if c:
deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
- c, osd_fsid=osd_fsid, ports=ports)
+ c, osd_fsid=osd_fsid, endpoints=endpoints)
else:
raise RuntimeError('attempting to deploy a daemon without a container image')
update_firewalld(ctx, daemon_type)
# Open ports explicitly required for the daemon
- if ports:
+ if endpoints:
fw = Firewalld(ctx)
- fw.open_ports(ports + fw.external_ports.get(daemon_type, []))
+ fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, []))
fw.apply_rules()
# If this was a reconfig and the daemon is not a Ceph daemon, restart it
enable: bool = True,
start: bool = True,
osd_fsid: Optional[str] = None,
- ports: Optional[List[int]] = None,
+ endpoints: Optional[List[EndPoint]] = None,
) -> None:
# cmd
'memory_limit': int(ctx.memory_limit) if ctx.memory_limit else None,
})
if not meta.get('ports'):
- meta['ports'] = ports
+ if endpoints:
+ meta['ports'] = [e.port for e in endpoints]
+ else:
+ meta['ports'] = []
metaf.write(json.dumps(meta, indent=4) + '\n')
timeout = 30 if daemon_type == 'osd' else None
try:
for _ in range(1001):
- if not port_in_use(self.ctx, self.starting_port):
+ if not port_in_use(self.ctx, EndPoint('0.0.0.0', self.starting_port)):
self.listener_port = str(self.starting_port)
break
self.starting_port += 1
mgr_c = get_container(ctx, fsid, 'mgr', mgr_id)
# Note:the default port used by the Prometheus node exporter is opened in fw
ctx.meta_properties = {'service_name': 'mgr'}
- ports = [9283, 8765]
+ endpoints = [EndPoint('0.0.0.0', 9283), EndPoint('0.0.0.0', 8765)]
if not ctx.skip_monitoring_stack:
- ports.append(8443)
+ endpoints.append(EndPoint('0.0.0.0', 8443))
deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
- config=config, keyring=mgr_keyring, ports=ports)
+ config=config, keyring=mgr_keyring, endpoints=endpoints)
# wait for the service to become available
logger.info('Waiting for mgr to start...')
migrate_sysctl_dir(ctx, ctx.fsid)
# Get and check ports explicitly required to be opened
- daemon_ports = fetch_tcp_ports(ctx)
- _dispatch_deploy(ctx, daemon_type, daemon_id, daemon_ports, deployment_type)
+ endpoints = fetch_tcp_ports(ctx)
+ _dispatch_deploy(ctx, daemon_type, daemon_id, endpoints, deployment_type)
def _dispatch_deploy(
ctx: CephadmContext,
daemon_type: str,
daemon_id: str,
- daemon_ports: List[int],
+ daemon_endpoints: List[EndPoint],
deployment_type: DeploymentType,
) -> None:
if daemon_type in Ceph.daemons:
config=config, keyring=keyring,
osd_fsid=ctx.osd_fsid,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type in Monitoring.components:
# monitoring daemon - prometheus, grafana, alertmanager, node-exporter
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type == NFSGanesha.daemon_type:
# only check ports if this is a fresh deployment
- if deployment_type == DeploymentType.DEFAULT and not daemon_ports:
- daemon_ports = list(NFSGanesha.port_map.values())
+ if deployment_type == DeploymentType.DEFAULT and not daemon_endpoints:
+ nfs_ports = list(NFSGanesha.port_map.values())
+ daemon_endpoints = [EndPoint('0.0.0.0', p) for p in nfs_ports]
config, keyring = get_config_and_keyring(ctx)
# TODO: extract ganesha uid/gid (997, 994) ?
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type == CephIscsi.daemon_type:
config, keyring = get_config_and_keyring(ctx)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type == CephNvmeof.daemon_type:
config, keyring = get_config_and_keyring(ctx)
uid, gid = 167, 167 # TODO: need to get properly the uid/gid
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type in Tracing.components:
uid, gid = 65534, 65534
c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type == HAproxy.daemon_type:
haproxy = HAproxy.init(ctx, ctx.fsid, daemon_id)
uid, gid = haproxy.extract_uid_gid_haproxy()
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type == Keepalived.daemon_type:
keepalived = Keepalived.init(ctx, ctx.fsid, daemon_id)
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type == CustomContainer.daemon_type:
cc = CustomContainer.init(ctx, ctx.fsid, daemon_id)
# only check ports if this is a fresh deployment
if deployment_type == DeploymentType.DEFAULT:
- daemon_ports.extend(cc.ports)
+ daemon_endpoints.extend([EndPoint('0.0.0.0', p) for p in cc.ports])
c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id,
privileged=cc.privileged,
ptrace=ctx.allow_ptrace)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
uid=cc.uid, gid=cc.gid, config=None,
- keyring=None, deployment_type=deployment_type,
- ports=daemon_ports)
+ keyring=None,
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints)
elif daemon_type == CephadmAgent.daemon_type:
# get current user gid and uid
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
uid, gid,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
elif daemon_type == SNMPGateway.daemon_type:
sc = SNMPGateway.init(ctx, ctx.fsid, daemon_id)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
sc.uid, sc.gid,
deployment_type=deployment_type,
- ports=daemon_ports)
+ endpoints=daemon_endpoints)
else:
raise Error('daemon type {} not implemented in command_deploy function'
# should try to set the ports we know cephadm defaults
# to for these services in the firewall.
ports = Monitoring.port_map['prometheus']
+ endpoints = [EndPoint('0.0.0.0', p) for p in ports]
_stop_and_disable(ctx, 'prometheus')
make_var_run(ctx, fsid, uid, gid)
c = get_container(ctx, fsid, daemon_type, daemon_id)
deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=DeploymentType.REDEPLOY, ports=ports)
+ deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
update_firewalld(ctx, daemon_type)
# should try to set the ports we know cephadm defaults
# to for these services in the firewall.
ports = Monitoring.port_map['grafana']
+ endpoints = [EndPoint('0.0.0.0', p) for p in ports]
_stop_and_disable(ctx, 'grafana-server')
make_var_run(ctx, fsid, uid, gid)
c = get_container(ctx, fsid, daemon_type, daemon_id)
deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=DeploymentType.REDEPLOY, ports=ports)
+ deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
update_firewalld(ctx, daemon_type)
# should try to set the ports we know cephadm defaults
# to for these services in the firewall.
ports = Monitoring.port_map['alertmanager']
+ endpoints = [EndPoint('0.0.0.0', p) for p in ports]
_stop_and_disable(ctx, 'prometheus-alertmanager')
make_var_run(ctx, fsid, uid, gid)
c = get_container(ctx, fsid, daemon_type, daemon_id)
deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=DeploymentType.REDEPLOY, ports=ports)
+ deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
update_firewalld(ctx, daemon_type)
else:
call_throws(ctx, ['rm', '-rf', data_dir])
- ports: List[int] = fetch_tcp_ports(ctx)
+ endpoints = fetch_tcp_ports(ctx)
+ ports: List[int] = [e.port for e in endpoints]
if ports:
try:
fw = Firewalld(ctx)
parser_deploy.add_argument(
'--tcp-ports',
help='List of tcp ports to open in the host firewall')
+ parser_deploy.add_argument(
+ '--port-ips',
+ help='JSON dict mapping ports to IPs they need to be bound on'
+ )
parser_deploy.add_argument(
'--reconfig',
action='store_true',