]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: support for specifying IP a port will be bound to on
authorAdam King <adking@redhat.com>
Fri, 11 Aug 2023 17:35:10 +0000 (13:35 -0400)
committerAdam King <adking@redhat.com>
Thu, 31 Aug 2023 17:36:15 +0000 (13:36 -0400)
This is mostly for checking for port conflicts.
Currently, we just check if the port is bound to
on any IP on the host. This mechanism should allow
certain daemon types to specify a port -> IP mapping
that will be passed to the cephadm binary. That
mapping will then be used by cephadm to only
check for the port being bound to on that specific
IP rather than any IP on the host. The end result
is we could have daemons bound to the same
port on different IPs on the same node.

It's expected that daemon types will set this
up as part of their prepare_create or generate_config
functions where they may have more info about the
specific IPs and ports they need.

Signed-off-by: Adam King <adking@redhat.com>
(cherry picked from commit 178cbdab33f0eed2d0916524162b2533653865fd)

Conflicts:
src/cephadm/cephadm.py

src/cephadm/cephadm.py
src/cephadm/tests/test_agent.py
src/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/cephadmservice.py

index e35b8a44bd14ac751f9023e4bb3141a30c9f138b..303408757c4a61eddc56689d1e0641ebfe84e39f 100755 (executable)
@@ -481,10 +481,10 @@ class SNMPGateway:
 
     @property
     def port(self) -> int:
-        ports = fetch_tcp_ports(self.ctx)
-        if not ports:
+        endpoints = fetch_tcp_ports(self.ctx)
+        if not endpoints:
             return self.DEFAULT_PORT
-        return ports[0]
+        return endpoints[0].port
 
     def get_daemon_args(self) -> List[str]:
         v3_args = []
@@ -1593,15 +1593,14 @@ def attempt_bind(ctx, s, address, port):
         s.close()
 
 
-def port_in_use(ctx, port_num):
-    # type: (CephadmContext, int) -> bool
+def port_in_use(ctx: CephadmContext, endpoint: EndPoint) -> bool:
     """Detect whether a port is in use on the local machine - IPv4 and IPv6"""
-    logger.info('Verifying port %d ...' % port_num)
+    logger.info('Verifying port %s ...' % str(endpoint))
 
     def _port_in_use(af: socket.AddressFamily, address: str) -> bool:
         try:
             s = socket.socket(af, socket.SOCK_STREAM)
-            attempt_bind(ctx, s, address, port_num)
+            attempt_bind(ctx, s, address, endpoint.port)
         except PortOccupiedError:
             return True
         except OSError as e:
@@ -1613,6 +1612,13 @@ def port_in_use(ctx, port_num):
             else:
                 raise e
         return False
+
+    if endpoint.ip != '0.0.0.0' and endpoint.ip != '::':
+        if is_ipv6(endpoint.ip):
+            return _port_in_use(socket.AF_INET6, endpoint.ip)
+        else:
+            return _port_in_use(socket.AF_INET, endpoint.ip)
+
     return any(_port_in_use(af, address) for af, address in (
         (socket.AF_INET, '0.0.0.0'),
         (socket.AF_INET6, '::')
@@ -3226,15 +3232,30 @@ def fetch_custom_config_files(ctx: CephadmContext) -> List[Dict[str, Any]]:
     return []
 
 
-def fetch_tcp_ports(ctx: CephadmContext) -> List[int]:
-    """Return a list of tcp ports, as integers, stored on the given ctx.
+def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]:
+    """Return a list of Endpoints, which have a port and ip attribute
     """
     ports = getattr(ctx, 'tcp_ports', None)
     if ports is None:
-        return []
+        ports = []
     if isinstance(ports, str):
-        return list(map(int, ports.split()))
-    return ports
+        ports = list(map(int, ports.split()))
+    port_ips: Dict[str, str] = {}
+    port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None)
+    if isinstance(port_ips_attr, str):
+        port_ips = json.loads(port_ips_attr)
+    elif port_ips_attr is not None:
+        # if it's not None or a str, assume it's already the dict we want
+        port_ips = port_ips_attr
+
+    endpoints: List[EndPoint] = []
+    for port in ports:
+        if str(port) in port_ips:
+            endpoints.append(EndPoint(port_ips[str(port)], port))
+        else:
+            endpoints.append(EndPoint('0.0.0.0', port))
+
+    return endpoints
 
 
 def get_config_and_keyring(ctx):
@@ -3644,21 +3665,21 @@ def deploy_daemon(ctx: CephadmContext, fsid: str, daemon_type: str,
                   uid: int, gid: int, config: Optional[str] = None,
                   keyring: Optional[str] = None, osd_fsid: Optional[str] = None,
                   deployment_type: DeploymentType = DeploymentType.DEFAULT,
-                  ports: Optional[List[int]] = None) -> None:
+                  endpoints: Optional[List[EndPoint]] = None) -> None:
 
-    ports = ports or []
+    endpoints = endpoints or []
     # only check port in use if fresh deployment since service
     # we are redeploying/reconfiguring will already be using the port
     if deployment_type == DeploymentType.DEFAULT:
-        if any([port_in_use(ctx, port) for port in ports]):
+        if any([port_in_use(ctx, e) for e in endpoints]):
             if daemon_type == 'mgr':
                 # non-fatal for mgr when we are in mgr_standby_modules=false, but we can't
                 # tell whether that is the case here.
                 logger.warning(
-                    f"ceph-mgr TCP port(s) {','.join(map(str, ports))} already in use"
+                    f"ceph-mgr TCP port(s) {','.join(map(str, endpoints))} already in use"
                 )
             else:
-                raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, ports)), daemon_type))
+                raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, endpoints)), daemon_type))
 
     data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
     if deployment_type == DeploymentType.RECONFIG and not os.path.exists(data_dir):
@@ -3718,7 +3739,7 @@ def deploy_daemon(ctx: CephadmContext, fsid: str, daemon_type: str,
         else:
             if c:
                 deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
-                                    c, osd_fsid=osd_fsid, ports=ports)
+                                    c, osd_fsid=osd_fsid, endpoints=endpoints)
             else:
                 raise RuntimeError('attempting to deploy a daemon without a container image')
 
@@ -3732,9 +3753,9 @@ def deploy_daemon(ctx: CephadmContext, fsid: str, daemon_type: str,
     update_firewalld(ctx, daemon_type)
 
     # Open ports explicitly required for the daemon
-    if ports:
+    if endpoints:
         fw = Firewalld(ctx)
-        fw.open_ports(ports + fw.external_ports.get(daemon_type, []))
+        fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, []))
         fw.apply_rules()
 
     # If this was a reconfig and the daemon is not a Ceph daemon, restart it
@@ -3810,7 +3831,7 @@ def deploy_daemon_units(
     enable: bool = True,
     start: bool = True,
     osd_fsid: Optional[str] = None,
-    ports: Optional[List[int]] = None,
+    endpoints: Optional[List[EndPoint]] = None,
 ) -> None:
     # cmd
 
@@ -3896,7 +3917,10 @@ def deploy_daemon_units(
             'memory_limit': int(ctx.memory_limit) if ctx.memory_limit else None,
         })
         if not meta.get('ports'):
-            meta['ports'] = ports
+            if endpoints:
+                meta['ports'] = [e.port for e in endpoints]
+            else:
+                meta['ports'] = []
         metaf.write(json.dumps(meta, indent=4) + '\n')
 
     timeout = 30 if daemon_type == 'osd' else None
@@ -4805,7 +4829,7 @@ WantedBy=ceph-{fsid}.target
 
         try:
             for _ in range(1001):
-                if not port_in_use(self.ctx, self.starting_port):
+                if not port_in_use(self.ctx, EndPoint('0.0.0.0', self.starting_port)):
                     self.listener_port = str(self.starting_port)
                     break
                 self.starting_port += 1
@@ -5638,11 +5662,11 @@ def create_mgr(
     mgr_c = get_container(ctx, fsid, 'mgr', mgr_id)
     # Note:the default port used by the Prometheus node exporter is opened in fw
     ctx.meta_properties = {'service_name': 'mgr'}
-    ports = [9283, 8765]
+    endpoints = [EndPoint('0.0.0.0', 9283), EndPoint('0.0.0.0', 8765)]
     if not ctx.skip_monitoring_stack:
-        ports.append(8443)
+        endpoints.append(EndPoint('0.0.0.0', 8443))
     deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
-                  config=config, keyring=mgr_keyring, ports=ports)
+                  config=config, keyring=mgr_keyring, endpoints=endpoints)
 
     # wait for the service to become available
     logger.info('Waiting for mgr to start...')
@@ -6505,15 +6529,15 @@ def _common_deploy(ctx: CephadmContext) -> None:
     migrate_sysctl_dir(ctx, ctx.fsid)
 
     # Get and check ports explicitly required to be opened
-    daemon_ports = fetch_tcp_ports(ctx)
-    _dispatch_deploy(ctx, daemon_type, daemon_id, daemon_ports, deployment_type)
+    endpoints = fetch_tcp_ports(ctx)
+    _dispatch_deploy(ctx, daemon_type, daemon_id, endpoints, deployment_type)
 
 
 def _dispatch_deploy(
     ctx: CephadmContext,
     daemon_type: str,
     daemon_id: str,
-    daemon_ports: List[int],
+    daemon_endpoints: List[EndPoint],
     deployment_type: DeploymentType,
 ) -> None:
     if daemon_type in Ceph.daemons:
@@ -6538,7 +6562,7 @@ def _dispatch_deploy(
                       config=config, keyring=keyring,
                       osd_fsid=ctx.osd_fsid,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
 
     elif daemon_type in Monitoring.components:
         # monitoring daemon - prometheus, grafana, alertmanager, node-exporter
@@ -6560,12 +6584,13 @@ def _dispatch_deploy(
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == NFSGanesha.daemon_type:
         # only check ports if this is a fresh deployment
-        if deployment_type == DeploymentType.DEFAULT and not daemon_ports:
-            daemon_ports = list(NFSGanesha.port_map.values())
+        if deployment_type == DeploymentType.DEFAULT and not daemon_endpoints:
+            nfs_ports = list(NFSGanesha.port_map.values())
+            daemon_endpoints = [EndPoint('0.0.0.0', p) for p in nfs_ports]
 
         config, keyring = get_config_and_keyring(ctx)
         # TODO: extract ganesha uid/gid (997, 994) ?
@@ -6574,7 +6599,7 @@ def _dispatch_deploy(
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == CephIscsi.daemon_type:
         config, keyring = get_config_and_keyring(ctx)
@@ -6583,7 +6608,7 @@ def _dispatch_deploy(
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
     elif daemon_type == CephNvmeof.daemon_type:
         config, keyring = get_config_and_keyring(ctx)
         uid, gid = 167, 167  # TODO: need to get properly the uid/gid
@@ -6591,20 +6616,20 @@ def _dispatch_deploy(
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
     elif daemon_type in Tracing.components:
         uid, gid = 65534, 65534
         c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
     elif daemon_type == HAproxy.daemon_type:
         haproxy = HAproxy.init(ctx, ctx.fsid, daemon_id)
         uid, gid = haproxy.extract_uid_gid_haproxy()
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == Keepalived.daemon_type:
         keepalived = Keepalived.init(ctx, ctx.fsid, daemon_id)
@@ -6612,20 +6637,21 @@ def _dispatch_deploy(
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == CustomContainer.daemon_type:
         cc = CustomContainer.init(ctx, ctx.fsid, daemon_id)
         # only check ports if this is a fresh deployment
         if deployment_type == DeploymentType.DEFAULT:
-            daemon_ports.extend(cc.ports)
+            daemon_endpoints.extend([EndPoint('0.0.0.0', p) for p in cc.ports])
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id,
                                      privileged=cc.privileged,
                                      ptrace=ctx.allow_ptrace)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
                       uid=cc.uid, gid=cc.gid, config=None,
-                      keyring=None, deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      keyring=None,
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == CephadmAgent.daemon_type:
         # get current user gid and uid
@@ -6634,7 +6660,7 @@ def _dispatch_deploy(
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
                       uid, gid,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == SNMPGateway.daemon_type:
         sc = SNMPGateway.init(ctx, ctx.fsid, daemon_id)
@@ -6642,7 +6668,7 @@ def _dispatch_deploy(
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
                       sc.uid, sc.gid,
                       deployment_type=deployment_type,
-                      ports=daemon_ports)
+                      endpoints=daemon_endpoints)
 
     else:
         raise Error('daemon type {} not implemented in command_deploy function'
@@ -7572,6 +7598,7 @@ def command_adopt_prometheus(ctx, daemon_id, fsid):
     # should try to set the ports we know cephadm defaults
     # to for these services in the firewall.
     ports = Monitoring.port_map['prometheus']
+    endpoints = [EndPoint('0.0.0.0', p) for p in ports]
 
     _stop_and_disable(ctx, 'prometheus')
 
@@ -7594,7 +7621,7 @@ def command_adopt_prometheus(ctx, daemon_id, fsid):
     make_var_run(ctx, fsid, uid, gid)
     c = get_container(ctx, fsid, daemon_type, daemon_id)
     deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
-                  deployment_type=DeploymentType.REDEPLOY, ports=ports)
+                  deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
     update_firewalld(ctx, daemon_type)
 
 
@@ -7606,6 +7633,7 @@ def command_adopt_grafana(ctx, daemon_id, fsid):
     # should try to set the ports we know cephadm defaults
     # to for these services in the firewall.
     ports = Monitoring.port_map['grafana']
+    endpoints = [EndPoint('0.0.0.0', p) for p in ports]
 
     _stop_and_disable(ctx, 'grafana-server')
 
@@ -7652,7 +7680,7 @@ def command_adopt_grafana(ctx, daemon_id, fsid):
     make_var_run(ctx, fsid, uid, gid)
     c = get_container(ctx, fsid, daemon_type, daemon_id)
     deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
-                  deployment_type=DeploymentType.REDEPLOY, ports=ports)
+                  deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
     update_firewalld(ctx, daemon_type)
 
 
@@ -7664,6 +7692,7 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid):
     # should try to set the ports we know cephadm defaults
     # to for these services in the firewall.
     ports = Monitoring.port_map['alertmanager']
+    endpoints = [EndPoint('0.0.0.0', p) for p in ports]
 
     _stop_and_disable(ctx, 'prometheus-alertmanager')
 
@@ -7686,7 +7715,7 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid):
     make_var_run(ctx, fsid, uid, gid)
     c = get_container(ctx, fsid, daemon_type, daemon_id)
     deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
-                  deployment_type=DeploymentType.REDEPLOY, ports=ports)
+                  deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
     update_firewalld(ctx, daemon_type)
 
 
@@ -7768,7 +7797,8 @@ def command_rm_daemon(ctx):
     else:
         call_throws(ctx, ['rm', '-rf', data_dir])
 
-    ports: List[int] = fetch_tcp_ports(ctx)
+    endpoints = fetch_tcp_ports(ctx)
+    ports: List[int] = [e.port for e in endpoints]
     if ports:
         try:
             fw = Firewalld(ctx)
@@ -9747,6 +9777,10 @@ def _add_deploy_parser_args(
     parser_deploy.add_argument(
         '--tcp-ports',
         help='List of tcp ports to open in the host firewall')
+    parser_deploy.add_argument(
+        '--port-ips',
+        help='JSON dict mapping ports to IPs they need to be bound on'
+    )
     parser_deploy.add_argument(
         '--reconfig',
         action='store_true',
index f579729405a891b87fa387590499fc81bf1a1ec0..f9cf201e27527f3e7deebbbff6dafd235e54c7d7 100644 (file)
@@ -433,8 +433,8 @@ def test_agent_run(_pull_conf_settings, _port_in_use, _gatherer_start,
     host = AGENT_ID
     device_enhanced_scan = False
 
-    def _fake_port_in_use(ctx, port):
-        if port == open_listener_port:
+    def _fake_port_in_use(ctx, endpoint):
+        if endpoint.port == open_listener_port:
             return False
         return True
 
index ed09f91d54c5de8b7301db234386264e3b199420..911ae564656a0a9501e66cc12b0142a9a4048a76 100644 (file)
@@ -77,20 +77,40 @@ class TestCephAdm(object):
     def test_port_in_use(self, _logger, _attempt_bind):
         empty_ctx = None
 
-        assert _cephadm.port_in_use(empty_ctx, 9100) == False
+        assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('0.0.0.0', 9100)) == False
 
         _attempt_bind.side_effect = _cephadm.PortOccupiedError('msg')
-        assert _cephadm.port_in_use(empty_ctx, 9100) == True
+        assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('0.0.0.0', 9100)) == True
 
         os_error = OSError()
         os_error.errno = errno.EADDRNOTAVAIL
         _attempt_bind.side_effect = os_error
-        assert _cephadm.port_in_use(empty_ctx, 9100) == False
+        assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('0.0.0.0', 9100)) == False
 
         os_error = OSError()
         os_error.errno = errno.EAFNOSUPPORT
         _attempt_bind.side_effect = os_error
-        assert _cephadm.port_in_use(empty_ctx, 9100) == False
+        assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('0.0.0.0', 9100)) == False
+
+    @mock.patch('cephadm.attempt_bind')
+    @mock.patch('cephadm.logger')
+    def test_port_in_use_with_specific_ips(self, _logger, _attempt_bind):
+        empty_ctx = None
+
+        def _fake_attempt_bind(ctx, s: socket.socket, addr: str, port: int) -> None:
+            occupied_error = _cephadm.PortOccupiedError('msg')
+            if addr.startswith('200'):
+                raise occupied_error
+            if addr.startswith('100'):
+                if port == 4567:
+                    raise occupied_error
+
+        _attempt_bind.side_effect = _fake_attempt_bind
+
+        assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('200.0.0.0', 9100)) == True
+        assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('100.0.0.0', 9100)) == False
+        assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('100.0.0.0', 4567)) == True
+        assert _cephadm.port_in_use(empty_ctx, _cephadm.EndPoint('155.0.0.0', 4567)) == False
 
     @mock.patch('socket.socket')
     @mock.patch('cephadm.logger')
index 6f557bafdc6fdd3ea70adca7e0e4114600dac8cc..ee6542feed8fc6d5fa681ba4b0298b6194fe48ab 100644 (file)
@@ -1244,6 +1244,7 @@ class CephadmServe:
                 image = ''
                 start_time = datetime_now()
                 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
+                port_ips: Dict[str, str] = daemon_spec.port_ips if daemon_spec.port_ips else {}
 
                 if daemon_spec.daemon_type == 'container':
                     spec = cast(CustomContainerSpec,
@@ -1256,6 +1257,9 @@ class CephadmServe:
                 if len(ports) > 0:
                     daemon_params['tcp_ports'] = list(ports)
 
+                if port_ips:
+                    daemon_params['port_ips'] = port_ips
+
                 # osd deployments needs an --osd-uuid arg
                 if daemon_spec.daemon_type == 'osd':
                     if not osd_uuid_map:
index 73c15e295a4bc14f3cb962ce3d3b73ecebc6e897..7d7a04dad9d9c3aa2ed4490abe92aeba551e4bdc 100644 (file)
@@ -66,6 +66,7 @@ class CephadmDaemonDeploySpec:
                  daemon_type: Optional[str] = None,
                  ip: Optional[str] = None,
                  ports: Optional[List[int]] = None,
+                 port_ips: Optional[Dict[str, str]] = None,
                  rank: Optional[int] = None,
                  rank_generation: Optional[int] = None,
                  extra_container_args: Optional[ArgumentList] = None,
@@ -97,6 +98,11 @@ class CephadmDaemonDeploySpec:
 
         # TCP ports used by the daemon
         self.ports: List[int] = ports or []
+        # mapping of ports to IP addresses for ports
+        # we know we will only bind to on a specific IP.
+        # Useful for allowing multiple daemons to bind
+        # to the same port on different IPs on the same node
+        self.port_ips: Dict[str, str] = port_ips or {}
         self.ip: Optional[str] = ip
 
         # values to be populated during generate_config calls