From: Paul Cuzner Date: Mon, 27 Jun 2022 06:41:27 +0000 (+1200) Subject: cephadm: Add tests for rescan function X-Git-Tag: v18.0.0~402^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=972fb0ab3c519141b2f07ac4f2d6bbb6b855441a;p=ceph.git cephadm: Add tests for rescan function Adds unittets for the rescan code Signed-off-by: Paul Cuzner --- diff --git a/src/cephadm/tests/test_cephadm.py b/src/cephadm/tests/test_cephadm.py index 95de7facdd55..e559fb1b734b 100644 --- a/src/cephadm/tests/test_cephadm.py +++ b/src/cephadm/tests/test_cephadm.py @@ -17,6 +17,8 @@ from .fixtures import ( mock_bad_firewalld, ) +from pyfakefs import fake_filesystem_unittest + with mock.patch('builtins.open', create=True): from importlib.machinery import SourceFileLoader cd = SourceFileLoader('cephadm', 'cephadm').load_module() @@ -2435,3 +2437,35 @@ class TestJaeger: with open(f'/var/lib/ceph/{fsid}/jaeger-agent.daemon_id/unit.run', 'r') as f: run_cmd = f.readlines()[-1].rstrip() assert run_cmd.endswith('quay.io/jaegertracing/jaeger-agent:1.29 --reporter.grpc.host-port=test:14250 --processor.jaeger-compact.server-host-port=6799') + +class TestRescan(fake_filesystem_unittest.TestCase): + + def setUp(self): + self.setUpPyfakefs() + self.fs.create_dir('/sys/class') + self.ctx = cd.CephadmContext() + self.ctx.func = cd.command_rescan_disks + + def test_no_hbas(self): + out = cd.command_rescan_disks(self.ctx) + assert out == 'Ok. No compatible HBAs found' + + def test_success(self): + self.fs.create_file('/sys/class/scsi_host/host0/scan') + self.fs.create_file('/sys/class/scsi_host/host1/scan') + out = cd.command_rescan_disks(self.ctx) + assert out.startswith('Ok. 2 adapters detected: 2 rescanned, 0 skipped, 0 failed') + + def test_skip_usb_adapter(self): + self.fs.create_file('/sys/class/scsi_host/host0/scan') + self.fs.create_file('/sys/class/scsi_host/host1/scan') + self.fs.create_file('/sys/class/scsi_host/host1/proc_name', contents='usb-storage') + out = cd.command_rescan_disks(self.ctx) + assert out.startswith('Ok. 2 adapters detected: 1 rescanned, 1 skipped, 0 failed') + + def test_skip_unknown_adapter(self): + self.fs.create_file('/sys/class/scsi_host/host0/scan') + self.fs.create_file('/sys/class/scsi_host/host1/scan') + self.fs.create_file('/sys/class/scsi_host/host1/proc_name', contents='unknown') + out = cd.command_rescan_disks(self.ctx) + assert out.startswith('Ok. 2 adapters detected: 1 rescanned, 1 skipped, 0 failed')