From c95106423568a660acbe08361d9df908fe188c2d Mon Sep 17 00:00:00 2001 From: Nitzan Mordechai Date: Tue, 7 Mar 2023 06:36:21 +0000 Subject: [PATCH] pybind/argparse: blocklist ip validation ceph_argparse ip validation needed to be match the msg_type ip validation adding v1\v2\any leading strings and additional tests Fixes: https://tracker.ceph.com/issues/58884 Signed-off-by: Nitzan Mordechai --- src/pybind/ceph_argparse.py | 15 +++++++++++++- src/test/pybind/test_ceph_argparse.py | 28 +++++++++++++++++++++++++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/pybind/ceph_argparse.py b/src/pybind/ceph_argparse.py index b4aace3dfb8..29b82be7986 100644 --- a/src/pybind/ceph_argparse.py +++ b/src/pybind/ceph_argparse.py @@ -386,10 +386,20 @@ class CephIPAddr(CephArgtype): # parse off port, use socket to validate addr type = 6 p: Optional[str] = None - if s.startswith('['): + if s.startswith('v1:'): + s = s[3:] + type = 4 + elif s.startswith('v2:'): + s = s[3:] + type = 6 + elif s.startswith('any:'): + s = s[4:] + type = 4 + elif s.startswith('['): type = 6 elif s.find('.') != -1: type = 4 + if type == 4: port = s.find(':') if port != -1: @@ -441,6 +451,9 @@ class CephEntityAddr(CephIPAddr): nonce = None if '/' in s: ip, nonce = s.split('/') + if nonce.endswith(']'): + nonce = nonce[:-1] + ip += ']' else: ip = s super(self.__class__, self).valid(ip) diff --git a/src/test/pybind/test_ceph_argparse.py b/src/test/pybind/test_ceph_argparse.py index 6fe56bf9810..3039223abdf 100755 --- a/src/test/pybind/test_ceph_argparse.py +++ b/src/test/pybind/test_ceph_argparse.py @@ -32,8 +32,11 @@ except ImportError: def get_command_descriptions(what): + print ("get_command_descriptions --" + what) CEPH_BIN = os.environ.get('CEPH_BIN', ".") - return os.popen(CEPH_BIN + "/get_command_descriptions " + "--" + what).read() + with os.popen(CEPH_BIN + "/get_command_descriptions " + "--" + what) as output_file: + output_contents = output_file.read() + return output_contents class ParseJsonFuncsigs(unittest.TestCase): @@ -45,7 +48,6 @@ class ParseJsonFuncsigs(unittest.TestCase): commands = get_command_descriptions("pull585") self.assertRaises(TypeError, parse_json_funcsigs, commands, 'cli') - sigdict = parse_json_funcsigs(get_command_descriptions("all"), 'cli') @@ -910,6 +912,20 @@ class TestOSD(TestArgparse): '1.2.3.4/567', '600.40']) self._assert_valid_command(['osd', 'blocklist', action, '1.2.3.4', '600.40']) + + self._assert_valid_command(['osd', 'blocklist', action, + 'v1:1.2.3.4', '600.40']) + self._assert_valid_command(['osd', 'blocklist', action, + 'v1:1.2.3.4/0', '600.40']) + self._assert_valid_command(['osd', 'blocklist', action, + 'v2:2001:0db8:85a3:0000:0000:8a2e:0370:7334', '600.40']) + self._assert_valid_command(['osd', 'blocklist', action, + 'v2:fe80::1/0', '600.40']) + self._assert_valid_command(['osd', 'blocklist', action, + 'v2:[2607:f298:4:2243::5522]:0/0', '600.40']) + self._assert_valid_command(['osd', 'blocklist', action, + '[2001:0db8::85a3:0000:8a2e:0370:7334]:0/0', '600.40']) + self.assertEqual({}, validate_command(sigdict, ['osd', 'blocklist', action, 'invalid', @@ -923,6 +939,14 @@ class TestOSD(TestArgparse): '1.2.3.4/567', '600.40', 'toomany'])) + self.assertEqual({}, validate_command(sigdict, ['osd', 'blocklist', + action, + 'v2:1.2.3.4/567', + '600.40'])) + self.assertEqual({}, validate_command(sigdict, ['osd', 'blocklist', + action, + 'v1:1.2.3.4:65536/567', + '600.40'])) def test_pool_mksnap(self): self._assert_valid_command(['osd', 'pool', 'mksnap', -- 2.39.5