]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/argparse: blocklist ip validation 51812/head
authorNitzan Mordechai <nmordech@redhat.com>
Tue, 7 Mar 2023 06:36:21 +0000 (06:36 +0000)
committerNitzan Mordechai <nmordech@redhat.com>
Mon, 29 May 2023 12:26:40 +0000 (12:26 +0000)
ceph_argparse ip validation needed to be match the msg_type ip validation
adding v1\v2\any leading strings and additional tests

Fixes: https://tracker.ceph.com/issues/58884
Signed-off-by: Nitzan Mordechai <nmordech@redhat.com>
(cherry picked from commit c95106423568a660acbe08361d9df908fe188c2d)

src/pybind/ceph_argparse.py
src/test/pybind/test_ceph_argparse.py

index a8dd2451fcc3cd35854cd3536343329ad9f23dd5..c544378c33cb619852a908a1069e5a8d1bf403ad 100644 (file)
@@ -386,10 +386,21 @@ class CephIPAddr(CephArgtype):
     def valid(self, s, partial=False):
         # parse off port, use socket to validate addr
         type = 6
-        if s.startswith('['):
+        p: Optional[str] = None
+        if s.startswith('v1:'):
+            s = s[3:]
+            type = 4
+        elif s.startswith('v2:'):
+            s = s[3:]
+            type = 6
+        elif s.startswith('any:'):
+            s = s[4:]
+            type = 4
+        elif s.startswith('['):
             type = 6
         elif s.find('.') != -1:
             type = 4
+
         if type == 4:
             port = s.find(':')
             if port != -1:
@@ -441,6 +452,9 @@ class CephEntityAddr(CephIPAddr):
         nonce = None
         if '/' in s:
             ip, nonce = s.split('/')
+            if nonce.endswith(']'):
+                nonce = nonce[:-1]
+                ip += ']'
         else:
             ip = s
         super(self.__class__, self).valid(ip)
index 6b0d065dbc65fa48eca46c2822e94b639e63d90e..369acb0968faec54bd5c05e8b1068a54ceb90a5d 100755 (executable)
@@ -32,8 +32,11 @@ except ImportError:
 
 
 def get_command_descriptions(what):
+    print ("get_command_descriptions --" + what)
     CEPH_BIN = os.environ.get('CEPH_BIN', ".")
-    return os.popen(CEPH_BIN + "/get_command_descriptions " + "--" + what).read()
+    with os.popen(CEPH_BIN + "/get_command_descriptions " + "--" + what) as output_file:
+        output_contents = output_file.read()
+    return output_contents
 
 
 class ParseJsonFuncsigs(unittest.TestCase):
@@ -45,7 +48,6 @@ class ParseJsonFuncsigs(unittest.TestCase):
         commands = get_command_descriptions("pull585")
         self.assertRaises(TypeError, parse_json_funcsigs, commands, 'cli')
 
-
 sigdict = parse_json_funcsigs(get_command_descriptions("all"), 'cli')
 
 
@@ -910,6 +912,20 @@ class TestOSD(TestArgparse):
                                         '1.2.3.4/567', '600.40'])
             self._assert_valid_command(['osd', 'blocklist', action,
                                         '1.2.3.4', '600.40'])
+            
+            self._assert_valid_command(['osd', 'blocklist', action,
+                                        'v1:1.2.3.4', '600.40'])
+            self._assert_valid_command(['osd', 'blocklist', action,
+                                        'v1:1.2.3.4/0', '600.40'])
+            self._assert_valid_command(['osd', 'blocklist', action,
+                                        'v2:2001:0db8:85a3:0000:0000:8a2e:0370:7334', '600.40'])
+            self._assert_valid_command(['osd', 'blocklist', action,
+                                        'v2:fe80::1/0', '600.40'])
+            self._assert_valid_command(['osd', 'blocklist', action,
+                                        'v2:[2607:f298:4:2243::5522]:0/0', '600.40'])
+            self._assert_valid_command(['osd', 'blocklist', action,
+                                        '[2001:0db8::85a3:0000:8a2e:0370:7334]:0/0', '600.40'])
+            
             self.assertEqual({}, validate_command(sigdict, ['osd', 'blocklist',
                                                             action,
                                                             'invalid',
@@ -923,6 +939,14 @@ class TestOSD(TestArgparse):
                                                             '1.2.3.4/567',
                                                             '600.40',
                                                             'toomany']))
+            self.assertEqual({}, validate_command(sigdict, ['osd', 'blocklist',
+                                                            action,
+                                                            'v2:1.2.3.4/567',
+                                                            '600.40']))
+            self.assertEqual({}, validate_command(sigdict, ['osd', 'blocklist',
+                                                            action,
+                                                            'v1:1.2.3.4:65536/567',
+                                                            '600.40']))
 
     def test_pool_mksnap(self):
         self._assert_valid_command(['osd', 'pool', 'mksnap',