]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/progress: Add test_progress.py to tox 32985/head
authorSebastian Wagner <sebastian.wagner@suse.com>
Thu, 30 Jan 2020 16:23:24 +0000 (17:23 +0100)
committerSebastian Wagner <sebastian.wagner@suse.com>
Thu, 30 Jan 2020 16:23:24 +0000 (17:23 +0100)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/progress/__init__.py
src/pybind/mgr/progress/test_progress.py [new file with mode: 0644]
src/pybind/mgr/tox.ini
src/test/mgr/test_progress.py [deleted file]

index fcbdd8fbdc25d75a24c97ff7e34a01c931924fcc..e3379608af763aaea15df615d9ee398eb9b7d7ed 100644 (file)
@@ -2,10 +2,6 @@ import os
 if 'UNITTEST' not in os.environ:
     from .module import *
 else:
-    import sys
-    import mock
-    sys.modules['ceph_module'] = mock.Mock()
-    sys.modules['rados'] = mock.Mock()
-    from .module import *
-   
+    import tests
+
 
diff --git a/src/pybind/mgr/progress/test_progress.py b/src/pybind/mgr/progress/test_progress.py
new file mode 100644 (file)
index 0000000..db2013d
--- /dev/null
@@ -0,0 +1,204 @@
+#python unit test
+import unittest
+import os
+import sys
+from tests import mock
+
+import pytest
+import json
+os.environ['UNITTEST'] = "1"
+sys.path.insert(0, "../../pybind/mgr")
+from progress import module
+
+class TestPgRecoveryEvent(object):
+    # Testing PgRecoveryEvent class
+
+    def setup(self):
+        # Creating the class and Mocking 
+        # a bunch of attributes for testing
+        module._module = mock.Mock() # just so Event._refresh() works
+        self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30)
+
+    def test_pg_update(self):
+        # Test for a completed event when the pg states show active+clear
+        pg_dump = {
+                "pg_stats":[
+        {
+          "state": "active+clean",
+          "stat_sum": {
+            "num_bytes": 10,
+            "num_bytes_recovered": 10
+          },
+          "up": [
+            3,
+            1
+          ],
+          "acting": [
+            3,
+            1
+          ],
+          "pgid": "1.0",
+          "reported_epoch": "30"
+        },
+       {
+          "state": "active+clean",
+          "stat_sum": {
+            "num_bytes": 10,
+            "num_bytes_recovered": 10
+          },
+          "up": [
+            3,
+            1
+          ],
+          "acting": [
+            3,
+            1
+          ],
+          "pgid": "1.1",
+          "reported_epoch": "30"
+        },
+       {
+          "state": "active+clean",
+          "stat_sum": {
+            "num_bytes": 10,
+            "num_bytes_recovered": 10
+          },
+          "up": [
+            3,
+            1
+          ],
+          "acting": [
+            3,
+            1
+          ],
+          "pgid": "1.2",
+          "reported_epoch": "30"
+        }
+        ]
+        }
+
+        self.test_event.pg_update(pg_dump, mock.Mock())
+        assert self.test_event._progress == 1.0
+       
+class OSDMap: 
+    
+    # This is an artificial class to help
+    # _osd_in_out function have all the 
+    # necessary characteristics, some
+    # of the funcitons are copied from
+    # mgr_module
+
+    def __init__(self, dump, pg_stats):
+        self._dump = dump
+        self._pg_stats = pg_stats
+        
+    def _pg_to_up_acting_osds(self, pool_id, ps):
+        pg_id = str(pool_id) + "." + str(ps)
+        for pg in self._pg_stats["pg_stats"]:
+            if pg["pg_id"] == pg_id:
+                ret = {
+                        "up_primary": pg["up_primary"],
+                        "acting_primary": pg["acting_primary"],
+                        "up": pg["up"],
+                        "acting": pg["acting"]
+                        }
+                return ret
+
+    def dump(self):
+        return self._dump
+
+    def get_pools(self):
+        d = self._dump()
+        return dict([(p['pool'], p) for p in d['pools']])
+
+    def get_pools_by_name(self):
+        d = self._dump()
+        return dict([(p['pool_name'], p) for p in d['pools']])
+
+    def pg_to_up_acting_osds(self, pool_id, ps):
+        return self._pg_to_up_acting_osds(pool_id, ps)
+
+class TestModule(object):
+    # Testing Module Class
+    
+    def setup(self):
+        # Creating the class and Mocking a
+        # bunch of attributes for testing
+
+        module.PgRecoveryEvent.pg_update = mock.Mock()
+        self.test_module = module.Module() # so we can see if an event gets created
+        self.test_module.log = mock.Mock() # we don't need to log anything
+        self.test_module.get = mock.Mock() # so we can call pg_update
+        self.test_module._complete = mock.Mock() # we want just to see if this event gets called
+        self.test_module.get_osdmap = mock.Mock() # so that self.get_osdmap().get_epoch() works
+        module._module = mock.Mock() # so that Event.refresh() works
+
+    def test_osd_in_out(self):
+        # test for the correct event being
+        # triggered and completed.
+
+        old_pg_stats = {
+            "pg_stats":[
+                {
+                "pg_id": "1.0",
+                "up_primary": 3,
+                "acting_primary": 3,
+                "up": [
+                    3,
+                    0
+                    ],
+                "acting": [
+                    3,
+                    0
+                    ]
+        
+                },
+
+                ]
+            }
+        new_pg_stats = {
+            "pg_stats":[
+                {
+              "pg_id": "1.0",
+              "up_primary": 0,
+              "acting_primary": 0,
+              "up": [
+                0,
+                2
+              ],
+              "acting": [
+                0,
+                2
+              ]
+            },
+                ]
+            }
+
+        old_dump ={ 
+            "pools": [
+                {
+                    "pool": 1,
+                    "pg_num": 1
+                    }
+                ]
+            }
+
+        new_dump = {
+                "pools": [
+                    {
+                        "pool": 1,
+                        "pg_num": 1
+                        }
+                    ]
+                }
+
+        new_map = OSDMap(new_dump, new_pg_stats)
+        old_map = OSDMap(old_dump, old_pg_stats)
+        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out")
+        # check if only one event is created
+        assert len(self.test_module._events) == 1
+        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in")
+        # check if complete function is called
+        assert self.test_module._complete.call_count == 1
+        # check if a PgRecovery Event was created and pg_update gets triggered
+        assert module.PgRecoveryEvent.pg_update.call_count == 2
index 81ae3752e795edb217f09cced7044bccbefb972d..e6e3441e270c84fc917ab1577f3f2e7b90782e99 100644 (file)
@@ -5,7 +5,7 @@ skipsdist = true
 [testenv]
 setenv = UNITTEST = true
 deps = -r requirements.txt
-commands = pytest -v --cov --cov-append --cov-report=term --doctest-modules {posargs:mgr_util.py tests/ cephadm/ ansible/}
+commands = pytest -v --cov --cov-append --cov-report=term --doctest-modules {posargs:mgr_util.py tests/ cephadm/ ansible/ progress/}
 
 [testenv:mypy]
 basepython = python3
diff --git a/src/test/mgr/test_progress.py b/src/test/mgr/test_progress.py
deleted file mode 100644 (file)
index 80f51a1..0000000
+++ /dev/null
@@ -1,203 +0,0 @@
-#python unit test
-import unittest
-import os
-import sys
-from mock import Mock
-import pytest
-import json
-os.environ['UNITTEST'] = "1"
-sys.path.insert(0, "../../pybind/mgr")
-from progress import module
-
-class TestPgRecoveryEvent(object):
-    # Testing PgRecoveryEvent class
-
-    def setup(self):
-        # Creating the class and Mocking 
-        # a bunch of attributes for testing
-        module._module = Mock() # just so Event._refresh() works
-        self.test_event = module.PgRecoveryEvent(None, None, [module.PgId(1,i) for i in range(3)], [0], 30)
-
-    def test_pg_update(self):
-        # Test for a completed event when the pg states show active+clear
-        pg_dump = {
-                "pg_stats":[
-        {
-          "state": "active+clean",
-          "stat_sum": {
-            "num_bytes": 10,
-            "num_bytes_recovered": 10
-          },
-          "up": [
-            3,
-            1
-          ],
-          "acting": [
-            3,
-            1
-          ],
-          "pgid": "1.0",
-          "reported_epoch": "30"
-        },
-       {
-          "state": "active+clean",
-          "stat_sum": {
-            "num_bytes": 10,
-            "num_bytes_recovered": 10
-          },
-          "up": [
-            3,
-            1
-          ],
-          "acting": [
-            3,
-            1
-          ],
-          "pgid": "1.1",
-          "reported_epoch": "30"
-        },
-       {
-          "state": "active+clean",
-          "stat_sum": {
-            "num_bytes": 10,
-            "num_bytes_recovered": 10
-          },
-          "up": [
-            3,
-            1
-          ],
-          "acting": [
-            3,
-            1
-          ],
-          "pgid": "1.2",
-          "reported_epoch": "30"
-        }
-        ]
-        }
-
-        self.test_event.pg_update(pg_dump, Mock())
-        assert self.test_event._progress == 1.0
-       
-class OSDMap: 
-    
-    # This is an artificial class to help
-    # _osd_in_out function have all the 
-    # necessary characteristics, some
-    # of the funcitons are copied from
-    # mgr_module
-
-    def __init__(self, dump, pg_stats):
-        self._dump = dump
-        self._pg_stats = pg_stats
-        
-    def _pg_to_up_acting_osds(self, pool_id, ps):
-        pg_id = str(pool_id) + "." + str(ps)
-        for pg in self._pg_stats["pg_stats"]:
-            if pg["pg_id"] == pg_id:
-                ret = {
-                        "up_primary": pg["up_primary"],
-                        "acting_primary": pg["acting_primary"],
-                        "up": pg["up"],
-                        "acting": pg["acting"]
-                        }
-                return ret
-
-    def dump(self):
-        return self._dump
-
-    def get_pools(self):
-        d = self._dump()
-        return dict([(p['pool'], p) for p in d['pools']])
-
-    def get_pools_by_name(self):
-        d = self._dump()
-        return dict([(p['pool_name'], p) for p in d['pools']])
-
-    def pg_to_up_acting_osds(self, pool_id, ps):
-        return self._pg_to_up_acting_osds(pool_id, ps)
-
-class TestModule(object):
-    # Testing Module Class
-    
-    def setup(self):
-        # Creating the class and Mocking a
-        # bunch of attributes for testing
-
-        module.PgRecoveryEvent.pg_update = Mock()
-        self.test_module = module.Module() # so we can see if an event gets created
-        self.test_module.log = Mock() # we don't need to log anything
-        self.test_module.get = Mock() # so we can call pg_update
-        self.test_module._complete = Mock() # we want just to see if this event gets called
-        self.test_module.get_osdmap = Mock() # so that self.get_osdmap().get_epoch() works
-        module._module = Mock() # so that Event.refresh() works
-
-    def test_osd_in_out(self):
-        # test for the correct event being
-        # triggered and completed.
-
-        old_pg_stats = {
-            "pg_stats":[
-                {
-                "pg_id": "1.0",
-                "up_primary": 3,
-                "acting_primary": 3,
-                "up": [
-                    3,
-                    0
-                    ],
-                "acting": [
-                    3,
-                    0
-                    ]
-        
-                },
-
-                ]
-            }
-        new_pg_stats = {
-            "pg_stats":[
-                {
-              "pg_id": "1.0",
-              "up_primary": 0,
-              "acting_primary": 0,
-              "up": [
-                0,
-                2
-              ],
-              "acting": [
-                0,
-                2
-              ]
-            },
-                ]
-            }
-
-        old_dump ={ 
-            "pools": [
-                {
-                    "pool": 1,
-                    "pg_num": 1
-                    }
-                ]
-            }
-
-        new_dump = {
-                "pools": [
-                    {
-                        "pool": 1,
-                        "pg_num": 1
-                        }
-                    ]
-                }
-
-        new_map = OSDMap(new_dump, new_pg_stats)
-        old_map = OSDMap(old_dump, old_pg_stats)
-        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "out")
-        # check if only one event is created
-        assert len(self.test_module._events) == 1
-        self.test_module._osd_in_out(old_map, old_dump, new_map, 3, "in")
-        # check if complete function is called
-        assert self.test_module._complete.call_count == 1
-        # check if a PgRecovery Event was created and pg_update gets triggered
-        assert module.PgRecoveryEvent.pg_update.call_count == 2