From 97fcefb56fe201788cc45c211192170ebde8ca37 Mon Sep 17 00:00:00 2001 From: Adam King Date: Fri, 11 Aug 2023 13:35:10 -0400 Subject: [PATCH] cephadm: support for specifying IP a port will be bound to on This is mostly for checking for port conflicts. Currently, we just check if the port is bound to on any IP on the host. This mechanism should allow certain daemon types to specify a port -> IP mapping that will be passed to the cephadm binary. That mapping will then be used by cephadm to only check for the port being bound to on that specific IP rather than any IP on the host. The end result is we could have daemons bound to the same port on different IPs on the same node. It's expected that daemon types will set this up as part of their prepare_create or generate_config functions where they may have more info about the specific IPs and ports they need. Signed-off-by: Adam King (cherry picked from commit 178cbdab33f0eed2d0916524162b2533653865fd) Conflicts: src/cephadm/cephadm.py --- src/cephadm/cephadm.py | 130 +++++++++++------- src/cephadm/tests/test_agent.py | 4 +- src/cephadm/tests/test_cephadm.py | 28 +++- src/pybind/mgr/cephadm/serve.py | 4 + .../mgr/cephadm/services/cephadmservice.py | 6 + 5 files changed, 118 insertions(+), 54 deletions(-) diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index e35b8a44bd1..303408757c4 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -481,10 +481,10 @@ class SNMPGateway: @property def port(self) -> int: - ports = fetch_tcp_ports(self.ctx) - if not ports: + endpoints = fetch_tcp_ports(self.ctx) + if not endpoints: return self.DEFAULT_PORT - return ports[0] + return endpoints[0].port def get_daemon_args(self) -> List[str]: v3_args = [] @@ -1593,15 +1593,14 @@ def attempt_bind(ctx, s, address, port): s.close() -def port_in_use(ctx, port_num): - # type: (CephadmContext, int) -> bool +def port_in_use(ctx: CephadmContext, endpoint: EndPoint) -> bool: """Detect whether a port is in use on the local machine - IPv4 and IPv6""" - logger.info('Verifying port %d ...' % port_num) + logger.info('Verifying port %s ...' % str(endpoint)) def _port_in_use(af: socket.AddressFamily, address: str) -> bool: try: s = socket.socket(af, socket.SOCK_STREAM) - attempt_bind(ctx, s, address, port_num) + attempt_bind(ctx, s, address, endpoint.port) except PortOccupiedError: return True except OSError as e: @@ -1613,6 +1612,13 @@ def port_in_use(ctx, port_num): else: raise e return False + + if endpoint.ip != '0.0.0.0' and endpoint.ip != '::': + if is_ipv6(endpoint.ip): + return _port_in_use(socket.AF_INET6, endpoint.ip) + else: + return _port_in_use(socket.AF_INET, endpoint.ip) + return any(_port_in_use(af, address) for af, address in ( (socket.AF_INET, '0.0.0.0'), (socket.AF_INET6, '::') @@ -3226,15 +3232,30 @@ def fetch_custom_config_files(ctx: CephadmContext) -> List[Dict[str, Any]]: return [] -def fetch_tcp_ports(ctx: CephadmContext) -> List[int]: - """Return a list of tcp ports, as integers, stored on the given ctx. +def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]: + """Return a list of Endpoints, which have a port and ip attribute """ ports = getattr(ctx, 'tcp_ports', None) if ports is None: - return [] + ports = [] if isinstance(ports, str): - return list(map(int, ports.split())) - return ports + ports = list(map(int, ports.split())) + port_ips: Dict[str, str] = {} + port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None) + if isinstance(port_ips_attr, str): + port_ips = json.loads(port_ips_attr) + elif port_ips_attr is not None: + # if it's not None or a str, assume it's already the dict we want + port_ips = port_ips_attr + + endpoints: List[EndPoint] = [] + for port in ports: + if str(port) in port_ips: + endpoints.append(EndPoint(port_ips[str(port)], port)) + else: + endpoints.append(EndPoint('0.0.0.0', port)) + + return endpoints def get_config_and_keyring(ctx): @@ -3644,21 +3665,21 @@ def deploy_daemon(ctx: CephadmContext, fsid: str, daemon_type: str, uid: int, gid: int, config: Optional[str] = None, keyring: Optional[str] = None, osd_fsid: Optional[str] = None, deployment_type: DeploymentType = DeploymentType.DEFAULT, - ports: Optional[List[int]] = None) -> None: + endpoints: Optional[List[EndPoint]] = None) -> None: - ports = ports or [] + endpoints = endpoints or [] # only check port in use if fresh deployment since service # we are redeploying/reconfiguring will already be using the port if deployment_type == DeploymentType.DEFAULT: - if any([port_in_use(ctx, port) for port in ports]): + if any([port_in_use(ctx, e) for e in endpoints]): if daemon_type == 'mgr': # non-fatal for mgr when we are in mgr_standby_modules=false, but we can't # tell whether that is the case here. logger.warning( - f"ceph-mgr TCP port(s) {','.join(map(str, ports))} already in use" + f"ceph-mgr TCP port(s) {','.join(map(str, endpoints))} already in use" ) else: - raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, ports)), daemon_type)) + raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, endpoints)), daemon_type)) data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) if deployment_type == DeploymentType.RECONFIG and not os.path.exists(data_dir): @@ -3718,7 +3739,7 @@ def deploy_daemon(ctx: CephadmContext, fsid: str, daemon_type: str, else: if c: deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, - c, osd_fsid=osd_fsid, ports=ports) + c, osd_fsid=osd_fsid, endpoints=endpoints) else: raise RuntimeError('attempting to deploy a daemon without a container image') @@ -3732,9 +3753,9 @@ def deploy_daemon(ctx: CephadmContext, fsid: str, daemon_type: str, update_firewalld(ctx, daemon_type) # Open ports explicitly required for the daemon - if ports: + if endpoints: fw = Firewalld(ctx) - fw.open_ports(ports + fw.external_ports.get(daemon_type, [])) + fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, [])) fw.apply_rules() # If this was a reconfig and the daemon is not a Ceph daemon, restart it @@ -3810,7 +3831,7 @@ def deploy_daemon_units( enable: bool = True, start: bool = True, osd_fsid: Optional[str] = None, - ports: Optional[List[int]] = None, + endpoints: Optional[List[EndPoint]] = None, ) -> None: # cmd @@ -3896,7 +3917,10 @@ def deploy_daemon_units( 'memory_limit': int(ctx.memory_limit) if ctx.memory_limit else None, }) if not meta.get('ports'): - meta['ports'] = ports + if endpoints: + meta['ports'] = [e.port for e in endpoints] + else: + meta['ports'] = [] metaf.write(json.dumps(meta, indent=4) + '\n') timeout = 30 if daemon_type == 'osd' else None @@ -4805,7 +4829,7 @@ WantedBy=ceph-{fsid}.target try: for _ in range(1001): - if not port_in_use(self.ctx, self.starting_port): + if not port_in_use(self.ctx, EndPoint('0.0.0.0', self.starting_port)): self.listener_port = str(self.starting_port) break self.starting_port += 1 @@ -5638,11 +5662,11 @@ def create_mgr( mgr_c = get_container(ctx, fsid, 'mgr', mgr_id) # Note:the default port used by the Prometheus node exporter is opened in fw ctx.meta_properties = {'service_name': 'mgr'} - ports = [9283, 8765] + endpoints = [EndPoint('0.0.0.0', 9283), EndPoint('0.0.0.0', 8765)] if not ctx.skip_monitoring_stack: - ports.append(8443) + endpoints.append(EndPoint('0.0.0.0', 8443)) deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid, - config=config, keyring=mgr_keyring, ports=ports) + config=config, keyring=mgr_keyring, endpoints=endpoints) # wait for the service to become available logger.info('Waiting for mgr to start...') @@ -6505,15 +6529,15 @@ def _common_deploy(ctx: CephadmContext) -> None: migrate_sysctl_dir(ctx, ctx.fsid) # Get and check ports explicitly required to be opened - daemon_ports = fetch_tcp_ports(ctx) - _dispatch_deploy(ctx, daemon_type, daemon_id, daemon_ports, deployment_type) + endpoints = fetch_tcp_ports(ctx) + _dispatch_deploy(ctx, daemon_type, daemon_id, endpoints, deployment_type) def _dispatch_deploy( ctx: CephadmContext, daemon_type: str, daemon_id: str, - daemon_ports: List[int], + daemon_endpoints: List[EndPoint], deployment_type: DeploymentType, ) -> None: if daemon_type in Ceph.daemons: @@ -6538,7 +6562,7 @@ def _dispatch_deploy( config=config, keyring=keyring, osd_fsid=ctx.osd_fsid, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type in Monitoring.components: # monitoring daemon - prometheus, grafana, alertmanager, node-exporter @@ -6560,12 +6584,13 @@ def _dispatch_deploy( c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type == NFSGanesha.daemon_type: # only check ports if this is a fresh deployment - if deployment_type == DeploymentType.DEFAULT and not daemon_ports: - daemon_ports = list(NFSGanesha.port_map.values()) + if deployment_type == DeploymentType.DEFAULT and not daemon_endpoints: + nfs_ports = list(NFSGanesha.port_map.values()) + daemon_endpoints = [EndPoint('0.0.0.0', p) for p in nfs_ports] config, keyring = get_config_and_keyring(ctx) # TODO: extract ganesha uid/gid (997, 994) ? @@ -6574,7 +6599,7 @@ def _dispatch_deploy( deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, config=config, keyring=keyring, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type == CephIscsi.daemon_type: config, keyring = get_config_and_keyring(ctx) @@ -6583,7 +6608,7 @@ def _dispatch_deploy( deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, config=config, keyring=keyring, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type == CephNvmeof.daemon_type: config, keyring = get_config_and_keyring(ctx) uid, gid = 167, 167 # TODO: need to get properly the uid/gid @@ -6591,20 +6616,20 @@ def _dispatch_deploy( deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, config=config, keyring=keyring, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type in Tracing.components: uid, gid = 65534, 65534 c = get_container(ctx, ctx.fsid, daemon_type, daemon_id) deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type == HAproxy.daemon_type: haproxy = HAproxy.init(ctx, ctx.fsid, daemon_id) uid, gid = haproxy.extract_uid_gid_haproxy() c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type == Keepalived.daemon_type: keepalived = Keepalived.init(ctx, ctx.fsid, daemon_id) @@ -6612,20 +6637,21 @@ def _dispatch_deploy( c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type == CustomContainer.daemon_type: cc = CustomContainer.init(ctx, ctx.fsid, daemon_id) # only check ports if this is a fresh deployment if deployment_type == DeploymentType.DEFAULT: - daemon_ports.extend(cc.ports) + daemon_endpoints.extend([EndPoint('0.0.0.0', p) for p in cc.ports]) c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id, privileged=cc.privileged, ptrace=ctx.allow_ptrace) deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid=cc.uid, gid=cc.gid, config=None, - keyring=None, deployment_type=deployment_type, - ports=daemon_ports) + keyring=None, + deployment_type=deployment_type, + endpoints=daemon_endpoints) elif daemon_type == CephadmAgent.daemon_type: # get current user gid and uid @@ -6634,7 +6660,7 @@ def _dispatch_deploy( deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None, uid, gid, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) elif daemon_type == SNMPGateway.daemon_type: sc = SNMPGateway.init(ctx, ctx.fsid, daemon_id) @@ -6642,7 +6668,7 @@ def _dispatch_deploy( deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, sc.uid, sc.gid, deployment_type=deployment_type, - ports=daemon_ports) + endpoints=daemon_endpoints) else: raise Error('daemon type {} not implemented in command_deploy function' @@ -7572,6 +7598,7 @@ def command_adopt_prometheus(ctx, daemon_id, fsid): # should try to set the ports we know cephadm defaults # to for these services in the firewall. ports = Monitoring.port_map['prometheus'] + endpoints = [EndPoint('0.0.0.0', p) for p in ports] _stop_and_disable(ctx, 'prometheus') @@ -7594,7 +7621,7 @@ def command_adopt_prometheus(ctx, daemon_id, fsid): make_var_run(ctx, fsid, uid, gid) c = get_container(ctx, fsid, daemon_type, daemon_id) deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=DeploymentType.REDEPLOY, ports=ports) + deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints) update_firewalld(ctx, daemon_type) @@ -7606,6 +7633,7 @@ def command_adopt_grafana(ctx, daemon_id, fsid): # should try to set the ports we know cephadm defaults # to for these services in the firewall. ports = Monitoring.port_map['grafana'] + endpoints = [EndPoint('0.0.0.0', p) for p in ports] _stop_and_disable(ctx, 'grafana-server') @@ -7652,7 +7680,7 @@ def command_adopt_grafana(ctx, daemon_id, fsid): make_var_run(ctx, fsid, uid, gid) c = get_container(ctx, fsid, daemon_type, daemon_id) deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=DeploymentType.REDEPLOY, ports=ports) + deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints) update_firewalld(ctx, daemon_type) @@ -7664,6 +7692,7 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid): # should try to set the ports we know cephadm defaults # to for these services in the firewall. ports = Monitoring.port_map['alertmanager'] + endpoints = [EndPoint('0.0.0.0', p) for p in ports] _stop_and_disable(ctx, 'prometheus-alertmanager') @@ -7686,7 +7715,7 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid): make_var_run(ctx, fsid, uid, gid) c = get_container(ctx, fsid, daemon_type, daemon_id) deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=DeploymentType.REDEPLOY, ports=ports) + deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints) update_firewalld(ctx, daemon_type) @@ -7768,7 +7797,8 @@ def command_rm_daemon(ctx): else: call_throws(ctx, ['rm', '-rf', data_dir]) - ports: List[int] = fetch_tcp_ports(ctx) + endpoints = fetch_tcp_ports(ctx) + ports: List[int] = [e.port for e in endpoints] if ports: try: fw = Firewalld(ctx) @@ -9747,6 +9777,10 @@ def _add_deploy_parser_args( parser_deploy.add_argument( '--tcp-ports', help='List of tcp ports to open in the host firewall') + parser_deploy.add_argument( + '--port-ips', + help='JSON dict mapping ports to IPs they need to be bound on' + ) parser_deploy.add_argument( '--reconfig', action='store_true', diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py index f579729405a..f9cf201e275 100644 --- a/src/cephadm/tests/test_agent.py +++ b/src/cephadm/tests/test_agent.py @@ -433,8 +433,8 @@ def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start, host = AGENT_ID device_enhanced_scan = False - def _fake_port_in_use(ctx, port): - if port == open_listener_port: + def _fake_port_in_use(ctx, endpoint): + if endpoint.port == open_listener_port: return False return True diff --git a/src/cephadm/tests/test_cephadm.py b/src/cephadm/tests/test_cephadm.py index ed09f91d54c..911ae564656 100644 --- a/src/cephadm/tests/test_cephadm.py +++ b/src/cephadm/tests/test_cephadm.py @@ -77,20 +77,40 @@ class TestCephAdm(object): def test_port_in_use(self, _logger, _attempt_bind): empty_ctx = None - assert _cephadm.port_in_use(empty_ctx, 9100) == False + assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('0.0.0.0', 9100)) == False _attempt_bind.side_effect = _cephadm.PortOccupiedError('msg') - assert _cephadm.port_in_use(empty_ctx, 9100) == True + assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('0.0.0.0', 9100)) == True os_error = OSError() os_error.errno = errno.EADDRNOTAVAIL _attempt_bind.side_effect = os_error - assert _cephadm.port_in_use(empty_ctx, 9100) == False + assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('0.0.0.0', 9100)) == False os_error = OSError() os_error.errno = errno.EAFNOSUPPORT _attempt_bind.side_effect = os_error - assert _cephadm.port_in_use(empty_ctx, 9100) == False + assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('0.0.0.0', 9100)) == False + + @mock.patch('cephadm.attempt_bind') + @mock.patch('cephadm.logger') + def test_port_in_use_with_specific_ips(self, _logger, _attempt_bind): + empty_ctx = None + + def _fake_attempt_bind(ctx, s: socket.socket, addr: str, port: int) -> None: + occupied_error = _cephadm.PortOccupiedError('msg') + if addr.startswith('200'): + raise occupied_error + if addr.startswith('100'): + if port == 4567: + raise occupied_error + + _attempt_bind.side_effect = _fake_attempt_bind + + assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('200.0.0.0', 9100)) == True + assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('100.0.0.0', 9100)) == False + assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('100.0.0.0', 4567)) == True + assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('155.0.0.0', 4567)) == False @mock.patch('socket.socket') @mock.patch('cephadm.logger') diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 6f557bafdc6..ee6542feed8 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -1244,6 +1244,7 @@ class CephadmServe: image = '' start_time = datetime_now() ports: List[int] = daemon_spec.ports if daemon_spec.ports else [] + port_ips: Dict[str, str] = daemon_spec.port_ips if daemon_spec.port_ips else {} if daemon_spec.daemon_type == 'container': spec = cast(CustomContainerSpec, @@ -1256,6 +1257,9 @@ class CephadmServe: if len(ports) > 0: daemon_params['tcp_ports'] = list(ports) + if port_ips: + daemon_params['port_ips'] = port_ips + # osd deployments needs an --osd-uuid arg if daemon_spec.daemon_type == 'osd': if not osd_uuid_map: diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index 73c15e295a4..7d7a04dad9d 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -66,6 +66,7 @@ class CephadmDaemonDeploySpec: daemon_type: Optional[str] = None, ip: Optional[str] = None, ports: Optional[List[int]] = None, + port_ips: Optional[Dict[str, str]] = None, rank: Optional[int] = None, rank_generation: Optional[int] = None, extra_container_args: Optional[ArgumentList] = None, @@ -97,6 +98,11 @@ class CephadmDaemonDeploySpec: # TCP ports used by the daemon self.ports: List[int] = ports or [] + # mapping of ports to IP addresses for ports + # we know we will only bind to on a specific IP. + # Useful for allowing multiple daemons to bind + # to the same port on different IPs on the same node + self.port_ips: Dict[str, str] = port_ips or {} self.ip: Optional[str] = ip # values to be populated during generate_config calls -- 2.39.5