From 972fb0ab3c519141b2f07ac4f2d6bbb6b855441a Mon Sep 17 00:00:00 2001 From: Paul Cuzner Date: Mon, 27 Jun 2022 18:41:27 +1200 Subject: [PATCH] cephadm: Add tests for rescan function Adds unittets for the rescan code Signed-off-by: Paul Cuzner --- src/cephadm/tests/test_cephadm.py | 34 +++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/cephadm/tests/test_cephadm.py b/src/cephadm/tests/test_cephadm.py index 95de7facdd55d..e559fb1b734bd 100644 --- a/src/cephadm/tests/test_cephadm.py +++ b/src/cephadm/tests/test_cephadm.py @@ -17,6 +17,8 @@ from .fixtures import ( mock_bad_firewalld, ) +from pyfakefs import fake_filesystem_unittest + with mock.patch('builtins.open', create=True): from importlib.machinery import SourceFileLoader cd = SourceFileLoader('cephadm', 'cephadm').load_module() @@ -2435,3 +2437,35 @@ class TestJaeger: with open(f'/var/lib/ceph/{fsid}/jaeger-agent.daemon_id/unit.run', 'r') as f: run_cmd = f.readlines()[-1].rstrip() assert run_cmd.endswith('quay.io/jaegertracing/jaeger-agent:1.29 --reporter.grpc.host-port=test:14250 --processor.jaeger-compact.server-host-port=6799') + +class TestRescan(fake_filesystem_unittest.TestCase): + + def setUp(self): + self.setUpPyfakefs() + self.fs.create_dir('/sys/class') + self.ctx = cd.CephadmContext() + self.ctx.func = cd.command_rescan_disks + + def test_no_hbas(self): + out = cd.command_rescan_disks(self.ctx) + assert out == 'Ok. No compatible HBAs found' + + def test_success(self): + self.fs.create_file('/sys/class/scsi_host/host0/scan') + self.fs.create_file('/sys/class/scsi_host/host1/scan') + out = cd.command_rescan_disks(self.ctx) + assert out.startswith('Ok. 2 adapters detected: 2 rescanned, 0 skipped, 0 failed') + + def test_skip_usb_adapter(self): + self.fs.create_file('/sys/class/scsi_host/host0/scan') + self.fs.create_file('/sys/class/scsi_host/host1/scan') + self.fs.create_file('/sys/class/scsi_host/host1/proc_name', contents='usb-storage') + out = cd.command_rescan_disks(self.ctx) + assert out.startswith('Ok. 2 adapters detected: 1 rescanned, 1 skipped, 0 failed') + + def test_skip_unknown_adapter(self): + self.fs.create_file('/sys/class/scsi_host/host0/scan') + self.fs.create_file('/sys/class/scsi_host/host1/scan') + self.fs.create_file('/sys/class/scsi_host/host1/proc_name', contents='unknown') + out = cd.command_rescan_disks(self.ctx) + assert out.startswith('Ok. 2 adapters detected: 1 rescanned, 1 skipped, 0 failed') -- 2.39.5