##################################
+class PortOccupiedError(Error):
+ pass
+
def attempt_bind(ctx, s, address, port):
# type: (CephadmContext, socket.socket, str, int) -> None
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((address, port))
except (socket.error, OSError) as e: # py2 and py3
- msg = 'Cannot bind to IP %s port %d: %s' % (address, port, e)
- logger.warning(msg)
if e.errno == errno.EADDRINUSE:
- raise OSError(msg)
- elif e.errno == errno.EADDRNOTAVAIL:
- pass
+ msg = 'Cannot bind to IP %s port %d: %s' % (address, port, e)
+ logger.warning(msg)
+ raise PortOccupiedError(msg)
+ else:
+ raise e
finally:
s.close()
# type: (CephadmContext, int) -> bool
"""Detect whether a port is in use on the local machine - IPv4 and IPv6"""
logger.info('Verifying port %d ...' % port_num)
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- attempt_bind(ctx, s, '0.0.0.0', port_num)
-
- s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- attempt_bind(ctx, s, '::', port_num)
- except OSError:
- return True
- else:
+ def _port_in_use(af: socket.AddressFamily, address: str) -> bool:
+ try:
+ s = socket.socket(af, socket.SOCK_STREAM)
+ attempt_bind(ctx, s, address, port_num)
+ except PortOccupiedError:
+ return True
+ except OSError as e:
+ if e.errno in (errno.EAFNOSUPPORT, errno.EADDRNOTAVAIL):
+ # Ignore EAFNOSUPPORT and EADDRNOTAVAIL as two interfaces are
+ # being tested here and one might be intentionally be disabled.
+ # In that case no error should be raised.
+ return False
+ else:
+ raise e
return False
+ return any(_port_in_use(af, address) for af, address in (
+ (socket.AF_INET, '0.0.0.0'),
+ (socket.AF_INET6, '::')
+ ))
def check_ip_port(ctx, ip, port):
ip = unwrap_ipv6(ip)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- attempt_bind(ctx, s, ip, port)
- except OSError as e:
- raise Error(e)
+ attempt_bind(ctx, s, ip, port)
##################################
# type: ignore
from typing import List, Optional
import mock
-from mock import patch
+from mock import patch, call
import os
import sys
import unittest
import threading
import time
+import errno
+import socket
from http.server import HTTPServer
from urllib.request import Request, urlopen
from urllib.error import HTTPError
cd = SourceFileLoader('cephadm', 'cephadm').load_module()
class TestCephAdm(object):
+
+ @mock.patch('cephadm.logger')
+ def test_attempt_bind(self, logger):
+ ctx = None
+ address = None
+ port = 0
+
+ def os_error(errno):
+ _os_error = OSError()
+ _os_error.errno = errno
+ return _os_error
+
+ for side_effect, expected_exception in (
+ (os_error(errno.EADDRINUSE), cd.PortOccupiedError),
+ (os_error(errno.EAFNOSUPPORT), OSError),
+ (os_error(errno.EADDRNOTAVAIL), OSError),
+ (None, None),
+ ):
+ _socket = mock.Mock()
+ _socket.bind.side_effect = side_effect
+ try:
+ cd.attempt_bind(ctx, _socket, address, port)
+ except Exception as e:
+ assert isinstance(e, expected_exception)
+ else:
+ if expected_exception is not None:
+ assert False
+
+ @mock.patch('cephadm.attempt_bind')
+ @mock.patch('cephadm.logger')
+ def test_port_in_use(self, logger, attempt_bind):
+ empty_ctx = None
+
+ assert cd.port_in_use(empty_ctx, 9100) == False
+
+ attempt_bind.side_effect = cd.PortOccupiedError('msg')
+ assert cd.port_in_use(empty_ctx, 9100) == True
+
+ os_error = OSError()
+ os_error.errno = errno.EADDRNOTAVAIL
+ attempt_bind.side_effect = os_error
+ assert cd.port_in_use(empty_ctx, 9100) == False
+
+ os_error = OSError()
+ os_error.errno = errno.EAFNOSUPPORT
+ attempt_bind.side_effect = os_error
+ assert cd.port_in_use(empty_ctx, 9100) == False
+
+ @mock.patch('socket.socket')
+ @mock.patch('cephadm.logger')
+ def test_check_ip_port_success(self, logger, _socket):
+ ctx = mock.Mock()
+ ctx.skip_ping_check = False # enables executing port check with `check_ip_port`
+
+ for address, address_family in (
+ ('0.0.0.0', socket.AF_INET),
+ ('::', socket.AF_INET6),
+ ):
+ try:
+ cd.check_ip_port(ctx, address, 9100)
+ except:
+ assert False
+ else:
+ assert _socket.call_args == call(address_family, socket.SOCK_STREAM)
+
+ @mock.patch('socket.socket')
+ @mock.patch('cephadm.logger')
+ def test_check_ip_port_failure(self, logger, _socket):
+ ctx = mock.Mock()
+ ctx.skip_ping_check = False # enables executing port check with `check_ip_port`
+
+ def os_error(errno):
+ _os_error = OSError()
+ _os_error.errno = errno
+ return _os_error
+
+ for address, address_family in (
+ ('0.0.0.0', socket.AF_INET),
+ ('::', socket.AF_INET6),
+ ):
+ for side_effect, expected_exception in (
+ (os_error(errno.EADDRINUSE), cd.PortOccupiedError),
+ (os_error(errno.EADDRNOTAVAIL), OSError),
+ (os_error(errno.EAFNOSUPPORT), OSError),
+ (None, None),
+ ):
+ mock_socket_obj = mock.Mock()
+ mock_socket_obj.bind.side_effect = side_effect
+ _socket.return_value = mock_socket_obj
+ try:
+ cd.check_ip_port(ctx, address, 9100)
+ except Exception as e:
+ assert isinstance(e, expected_exception)
+ else:
+ if side_effect is not None:
+ assert False
+
+
def test_is_not_fsid(self):
assert not cd.is_fsid('no-uuid')