]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: Add tests for rescan function 46420/head
authorPaul Cuzner <pcuzner@redhat.com>
Mon, 27 Jun 2022 06:41:27 +0000 (18:41 +1200)
committerPaul Cuzner <pcuzner@redhat.com>
Mon, 25 Jul 2022 02:53:24 +0000 (14:53 +1200)
Adds unittets for the rescan code

Signed-off-by: Paul Cuzner <pcuzner@redhat.com>
src/cephadm/tests/test_cephadm.py

index 95de7facdd55dd6e6aeb9e488e2058fa67d3609a..e559fb1b734bd9b3e37c2ae60987d1019fefaefb 100644 (file)
@@ -17,6 +17,8 @@ from .fixtures import (
     mock_bad_firewalld,
 )
 
+from pyfakefs import fake_filesystem_unittest
+
 with mock.patch('builtins.open', create=True):
     from importlib.machinery import SourceFileLoader
     cd = SourceFileLoader('cephadm', 'cephadm').load_module()
@@ -2435,3 +2437,35 @@ class TestJaeger:
             with open(f'/var/lib/ceph/{fsid}/jaeger-agent.daemon_id/unit.run', 'r') as f:
                 run_cmd = f.readlines()[-1].rstrip()
                 assert run_cmd.endswith('quay.io/jaegertracing/jaeger-agent:1.29 --reporter.grpc.host-port=test:14250 --processor.jaeger-compact.server-host-port=6799')
+
+class TestRescan(fake_filesystem_unittest.TestCase):
+
+    def setUp(self):
+        self.setUpPyfakefs()
+        self.fs.create_dir('/sys/class')
+        self.ctx = cd.CephadmContext()
+        self.ctx.func = cd.command_rescan_disks
+
+    def test_no_hbas(self):
+        out = cd.command_rescan_disks(self.ctx)
+        assert out == 'Ok. No compatible HBAs found'
+
+    def test_success(self):
+        self.fs.create_file('/sys/class/scsi_host/host0/scan')
+        self.fs.create_file('/sys/class/scsi_host/host1/scan')
+        out = cd.command_rescan_disks(self.ctx)
+        assert out.startswith('Ok. 2 adapters detected: 2 rescanned, 0 skipped, 0 failed')
+
+    def test_skip_usb_adapter(self):
+        self.fs.create_file('/sys/class/scsi_host/host0/scan')
+        self.fs.create_file('/sys/class/scsi_host/host1/scan')
+        self.fs.create_file('/sys/class/scsi_host/host1/proc_name', contents='usb-storage')
+        out = cd.command_rescan_disks(self.ctx)
+        assert out.startswith('Ok. 2 adapters detected: 1 rescanned, 1 skipped, 0 failed')
+
+    def test_skip_unknown_adapter(self):
+        self.fs.create_file('/sys/class/scsi_host/host0/scan')
+        self.fs.create_file('/sys/class/scsi_host/host1/scan')
+        self.fs.create_file('/sys/class/scsi_host/host1/proc_name', contents='unknown')
+        out = cd.command_rescan_disks(self.ctx)
+        assert out.startswith('Ok. 2 adapters detected: 1 rescanned, 1 skipped, 0 failed')