]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/progress: Created first unit test for progress module
authorKamoltat (Junior) Sirivadhna <ksirivad@redhat.com>
Wed, 26 Jun 2019 18:01:21 +0000 (14:01 -0400)
committerKamoltat (Junior) Sirivadhna <ksirivad@redhat.com>
Wed, 26 Jun 2019 18:01:21 +0000 (14:01 -0400)
Created a unit test using pytest and mock for
PgRecovery Class and Module Class. Test suite
includes the function: pg_update and _osd_in_out

Signed-off-by: Kamoltat (Junior) Sirivadhna <ksirivad@redhat.com>
src/pybind/mgr/progress/__init__.py
src/pybind/mgr/progress/module.py
src/test/mgr/test_progress.py [new file with mode: 0644]

index 0f7dab556dcd2e0b17460ace3259669c53b13b0e..fcbdd8fbdc25d75a24c97ff7e34a01c931924fcc 100644 (file)
@@ -1,2 +1,11 @@
+import os
+if 'UNITTEST' not in os.environ:
+    from .module import *
+else:
+    import sys
+    import mock
+    sys.modules['ceph_module'] = mock.Mock()
+    sys.modules['rados'] = mock.Mock()
+    from .module import *
+   
 
-from .module import *
index 46ab9e7d03914a303f01e9ce0ef907802803b790..1bfe28ff862a4651fad25166195aa8a3a7bfb7c2 100644 (file)
@@ -1,4 +1,5 @@
 from mgr_module import MgrModule
+import os
 import threading
 import datetime
 import uuid
@@ -11,6 +12,9 @@ ENCODING_VERSION = 1
 # keep a global reference to the module so we can use it from Event methods
 _module = None
 
+# if unit test we want MgrModule to be blank
+if 'UNITTEST' in os.environ:
+    MgrModule = object
 
 class Event(object):
     """
diff --git a/src/test/mgr/test_progress.py b/src/test/mgr/test_progress.py
new file mode 100644 (file)
index 0000000..2e13987
--- /dev/null
@@ -0,0 +1,212 @@
+#python unit test
+import unittest
+import os
+import sys
+from mock import Mock
+import pytest
+import json
+os.environ['UNITTEST'] = "1"
+sys.path.insert(0, "../../pybind/mgr")
+from progress import module
+
+class TestPgRecoveryEvent(object):
+    # Testing PgRecoveryEvent class
+
+    def setup(self):
+        # Creating the class and Mocking 
+        # a bunch of attributes for testing
+        self.test_class = module.PgRecoveryEvent
+        self._pgs = [module.PgId(1,i) for i in range(3)]
+        self._evacuate_osds = [0]
+        self._original_pg_count = len(self._pgs)
+        self._original_bytes_recovered = None
+        self.module_class = Mock()
+        self.module_class.info = Mock()
+        self._refresh = Mock()
+        self._progress = 0.0
+        self._message = None
+        self._start_epoch = 30
+
+    def test_pg_update(self):
+        # Test for a completed event when the pg states show active+clear
+        pg_dump = {
+                "pg_stats":[
+        {
+          "state": "active+clean",
+          "stat_sum": {
+            "num_bytes": 10,
+            "num_bytes_recovered": 10
+          },
+          "up": [
+            3,
+            1
+          ],
+          "acting": [
+            3,
+            1
+          ],
+          "pgid": "1.0",
+          "reported_epoch": "30"
+        },
+       {
+          "state": "active+clean",
+          "stat_sum": {
+            "num_bytes": 10,
+            "num_bytes_recovered": 10
+          },
+          "up": [
+            3,
+            1
+          ],
+          "acting": [
+            3,
+            1
+          ],
+          "pgid": "1.1",
+          "reported_epoch": "30"
+        },
+       {
+          "state": "active+clean",
+          "stat_sum": {
+            "num_bytes": 10,
+            "num_bytes_recovered": 10
+          },
+          "up": [
+            3,
+            1
+          ],
+          "acting": [
+            3,
+            1
+          ],
+          "pgid": "1.2",
+          "reported_epoch": "30"
+        }
+        ]
+        }
+
+        self.test_class.pg_update(self,pg_dump,self.module_class)
+        assert self._progress == 1.0
+
+class OSDMap: 
+    
+    # This is an artificial class to help
+    # _osd_in_out function have all the 
+    # necessary characteristics, some
+    # of the funcitons are copied from
+    # mgr_module
+
+    def __init__(self, dump, pg_stats):
+        self._dump = dump
+        self._pg_stats = pg_stats
+        
+    def _pg_to_up_acting_osds(self, pool_id, ps):
+        pg_id = str(pool_id) + "." + str(ps)
+        for pg in self._pg_stats["pg_stats"]:
+            if pg["pg_id"] == pg_id:
+                ret = {
+                        "up_primary": pg["up_primary"],
+                        "acting_primary": pg["acting_primary"],
+                        "up": pg["up"],
+                        "acting": pg["acting"]
+                        }
+                return ret
+
+    def dump(self):
+        return self._dump
+
+    def get_pools(self):
+        d = self._dump()
+        return dict([(p['pool'], p) for p in d['pools']])
+
+    def get_pools_by_name(self):
+        d = self._dump()
+        return dict([(p['pool_name'], p) for p in d['pools']])
+
+    def pg_to_up_acting_osds(self, pool_id, ps):
+        return self._pg_to_up_acting_osds(pool_id, ps)
+
+class TestModule(object):
+    # Testing Module Class
+    
+    def setup(self):
+        # Creating the class and Mocking
+        # attributes for testing
+        self.test_class = module
+        self.test_class._module = Mock()
+        self.test_class.PgRecoveryEvent.pg_update = Mock()
+        self.test_module = module.Module
+        self.log = Mock()
+        self.get = Mock()
+        self._complete = Mock()
+        self._events = {}
+
+    def test_osd_in_out(self):
+        # test for the correct event being
+        # triggered and completed.
+        old_pg_stats = {
+            "pg_stats":[
+                {
+                "pg_id": "1.0",
+                "up_primary": 3,
+                "acting_primary": 3,
+                "up": [
+                    3,
+                    0
+                    ],
+                "acting": [
+                    3,
+                    0
+                    ]
+        
+                },
+
+                ]
+            }
+        new_pg_stats = {
+            "pg_stats":[
+                {
+              "pg_id": "1.0",
+              "up_primary": 0,
+              "acting_primary": 0,
+              "up": [
+                0,
+                2
+              ],
+              "acting": [
+                0,
+                2
+              ]
+            },
+                ]
+            }
+
+        old_dump ={ 
+            "pools": [
+                {
+                    "pool": 1,
+                    "pg_num": 1
+                    }
+                ]
+            }
+
+        new_dump = {
+                "pools": [
+                    {
+                        "pool": 1,
+                        "pg_num": 1
+                        }
+                    ]
+                }
+
+        new_map = OSDMap(new_dump, new_pg_stats)
+        old_map = OSDMap(old_dump, old_pg_stats)
+        marked_in = "in"
+        marked_out = "out"
+        osd_id = 3
+        self.test_module._osd_in_out(self, old_map, old_dump, new_map, osd_id, marked_out)
+        self.test_module._osd_in_out(self, old_map, old_dump, new_map, osd_id, marked_in)
+        # check if complete function is called
+        self._complete.assert_called_once() 
+        # check if a PgRecovery Event was created and pg_update gets triggered
+        self.test_class.PgRecoveryEvent.pg_update.asset_called_once()