LGPL-2.1 or LGPL-3.0. See file COPYING.
"""
+from abc import ABC, abstractmethod
import copy
import enum
import math
+import itertools
import json
import os
import pprint
import uuid
from collections import abc
-from typing import cast, Any, Callable, Dict, Generic, List, Optional, Sequence, Tuple, Union
+from typing import cast, Any, Callable, Dict, Generic, List, Optional, Sequence, Tuple, Union, \
+ Annotated, Generic, TypeVar
+
+S = TypeVar('S') # Input type
+T = TypeVar('T') # Output type
if sys.version_info >= (3, 8):
from typing import get_args, get_origin
pass
+class Converter(ABC, Generic[S, T]):
+ @abstractmethod
+ def convert(self, value: S) -> T: pass
+
+
+def _get_annotation_metadata(tp):
+ if get_origin(tp) is Annotated:
+ annotated_args = get_args(tp)
+ try:
+ return annotated_args[1]
+ except IndexError:
+ return None
+
+
class CephArgtype(object):
"""
Base class for all Ceph argument types
attrs['req'] = 'false'
if not positional:
attrs['positional'] = 'false'
+ if annotation := _get_annotation_metadata(tp):
+ if isinstance(annotation, CephArgtype):
+ return annotation.argdesc(attrs)
+
CEPH_ARG_TYPES = {
str: CephString,
int: CephInt,
@staticmethod
def cast_to(tp, v):
+ if annotation := _get_annotation_metadata(tp):
+ if isinstance(annotation, Converter):
+ return annotation.convert(v)
+
PYTHON_TYPES = (
str,
int,
return super().argdesc(attrs)
+class CephSizeBytes(CephArgtype, Converter[str, int]):
+ """
+ Size in bytes. e.g. 1024, 1KB, 100MB, etc..
+ """
+ MULTIPLES = ['', "K", "M", "G", "T", "P"]
+ UNITS = {
+ f"{prefix}{suffix}": 1024 ** mult
+ for mult, prefix in enumerate(MULTIPLES)
+ for suffix in ['', 'B', 'iB']
+ if not (prefix == '' and suffix == 'iB')
+ }
+
+ def __init__(self, units_types: set = None):
+ self.units = set(CephSizeBytes.UNITS.keys())
+ if units_types :
+ self.units &= units_types
+ self.re_exp = re.compile(f"r'^\d+({'|'.join(self.units)})?$'")
+ self.number_re_exp = re.compile(r'^\d+')
+ self.num_and_unit_re_exp = re.compile(r'(\d+)([A-Za-z]*)$')
+
+
+ def valid(self, s: str, partial: bool = False) -> None:
+ if not s:
+ raise ArgumentValid("Size string not provided.")
+
+ number_str = ''.join(itertools.takewhile(str.isdigit, s))
+ if not number_str:
+ raise ArgumentFormat("Size must start with a positive number.")
+
+ unit = s[len(number_str):]
+ if unit and not unit.isalpha():
+ raise ArgumentFormat("Invalid format. Expected format: <number>[KB|MB|GB] (e.g., 100MB, 10KB, 1000).")
+
+ if unit and unit not in self.units:
+ raise ArgumentValid(f'{unit} is not a valid size unit. Supported units: {self.units}')
+ self.val = s
+
+ def __str__(self) -> str:
+ b = '|'.join(self.units)
+ return '<sizebytes({0})>'.format(b)
+
+ def argdesc(self, attrs):
+ return super().argdesc(attrs)
+
+ @staticmethod
+ def _convert_to_bytes(size: Union[int, str], default_unit=None):
+ if isinstance(size, int):
+ number = size
+ size = str(size)
+ else:
+ num_str = ''.join(filter(str.isdigit, size))
+ number = int(num_str)
+ unit_str = ''.join(filter(str.isalpha, size))
+ if not unit_str:
+ if not default_unit:
+ raise ValueError("No size unit was provided")
+ unit_str = default_unit
+
+ if unit_str in CephSizeBytes.UNITS:
+ return number * CephSizeBytes.UNITS[unit_str]
+ raise ValueError(f"Invalid unit: {unit_str}")
+
+ def convert(self, value: str) -> int:
+ return CephSizeBytes._convert_to_bytes(value, default_unit="B")
+
+
class CephSocketpath(CephArgtype):
"""
Admin socket path; check that it's readable and S_ISSOCK
# -*- coding: utf-8 -*-
import logging
-from typing import Any, Dict, Optional
+from typing import Annotated, Any, Dict, Optional
import cherrypy
+from ceph_argparse import CephSizeBytes
from orchestrator import OrchestratorError
from .. import mgr
rbd_image_name: str,
rbd_pool: str = "rbd",
create_image: Optional[bool] = True,
- size: Optional[int] = 1024,
- rbd_image_size: Optional[int] = None,
+ size: Optional[int] = None,
+ rbd_image_size: Optional[Annotated[int, CephSizeBytes()]] = None,
trash_image: Optional[bool] = False,
block_size: int = 512,
load_balancing_group: Optional[int] = None,
description: RBD pool name
type: string
size:
- default: 1024
description: RBD image size
type: integer
trash_image:
--- /dev/null
+import enum
+import pytest
+from ceph_argparse import CephSizeBytes, CephArgtype
+
+class TestCephArgtypeArgdesc:
+
+ def test_to_argdesc_with_default(self):
+ attrs = {}
+ result = CephArgtype.to_argdesc(str, attrs, has_default=True)
+ assert result == "req=false,type=CephString"
+
+ def test_to_argdesc_without_default(self):
+ attrs = {}
+ result = CephArgtype.to_argdesc(str, attrs, has_default=False)
+ assert result == "type=CephString"
+
+ def test_to_argdesc_positional_false(self):
+ attrs = {}
+ result = CephArgtype.to_argdesc(str, attrs, positional=False)
+ assert result == "positional=false,type=CephString"
+
+ def test_to_argdesc_str(self):
+ attrs = {}
+ result = CephArgtype.to_argdesc(str, attrs)
+ assert result == "type=CephString"
+
+ def test_to_argdesc_int(self):
+ attrs = {}
+ result = CephArgtype.to_argdesc(int, attrs)
+ assert result == "type=CephInt"
+
+ def test_to_argdesc_float(self):
+ attrs = {}
+ result = CephArgtype.to_argdesc(float, attrs)
+ assert result == "type=CephFloat"
+
+ def test_to_argdesc_bool(self):
+ attrs = {}
+ result = CephArgtype.to_argdesc(bool, attrs)
+ assert result == "type=CephBool"
+
+ def test_to_argdesc_invalid_type(self):
+ attrs = {}
+ # Simulate an invalid type that isn't in CEPH_ARG_TYPES
+ with pytest.raises(ValueError):
+ CephArgtype.to_argdesc(object, attrs)
+
+ def test_to_argdesc_with_enum(self):
+ import enum
+ class MyEnum(enum.Enum):
+ A = "one"
+ B = "two"
+
+ attrs = {}
+ result = CephArgtype.to_argdesc(MyEnum, attrs)
+ assert result == "strings=one|two,type=CephChoices"
+
+
+class TestCephArgtypeCastTo:
+ def test_cast_to_with_str(self):
+ result = CephArgtype.cast_to(str, 123)
+ assert result == "123"
+
+ def test_cast_to_with_int(self):
+ result = CephArgtype.cast_to(int, "123")
+ assert result == 123
+
+ def test_cast_to_with_float(self):
+ result = CephArgtype.cast_to(float, "123.45")
+ assert result == 123.45
+
+ def test_cast_to_with_bool(self):
+ result = CephArgtype.cast_to(bool, "True")
+ assert result is True
+
+ def test_cast_to_with_enum(self):
+ class MyEnum(enum.Enum):
+ A = "one"
+ B = "two"
+
+ result = CephArgtype.cast_to(MyEnum, "one")
+ assert result == MyEnum.A
+
+ def test_cast_to_with_unknown_type(self):
+ class UnknownType:
+ pass
+
+ with pytest.raises(ValueError):
+ CephArgtype.cast_to(UnknownType, "value")
+
+ def test_cast_to_invalid_value_for_type(self):
+ with pytest.raises(ValueError):
+ CephArgtype.cast_to(int, "invalid_integer")
+
+
+class TestConvertToBytes:
+ def test_with_kb(self):
+ assert CephSizeBytes._convert_to_bytes(f"100KB") == 102400
+ assert CephSizeBytes._convert_to_bytes(f"100K") == 102400
+
+ def test_with_mb(self):
+ assert CephSizeBytes._convert_to_bytes(f"2MB") == 2 * 1024 ** 2
+ assert CephSizeBytes._convert_to_bytes(f"2M") == 2 * 1024 ** 2
+
+ def test_with_gb(self):
+ assert CephSizeBytes._convert_to_bytes(f"1GB") == 1024 ** 3
+ assert CephSizeBytes._convert_to_bytes(f"1G") == 1024 ** 3
+
+ def test_with_tb(self):
+ assert CephSizeBytes._convert_to_bytes(f"1TB") == 1024 ** 4
+ assert CephSizeBytes._convert_to_bytes(f"1T") == 1024 ** 4
+
+ def test_with_pb(self):
+ assert CephSizeBytes._convert_to_bytes(f"1PB") == 1024 ** 5
+ assert CephSizeBytes._convert_to_bytes(f"1P") == 1024 ** 5
+
+ def test_with_integer(self):
+ assert CephSizeBytes._convert_to_bytes(50, default_unit="B") == 50
+
+ def test_invalid_unit(self):
+ with pytest.raises(ValueError):
+ CephSizeBytes._convert_to_bytes("50XYZ")
+
+ def test_b(self):
+ assert CephSizeBytes._convert_to_bytes(f"500B") == 500
+
+ def test_with_large_number(self):
+ assert CephSizeBytes._convert_to_bytes(f"1000GB") == 1000 * 1024 ** 3
+
+ def test_no_number(self):
+ with pytest.raises(ValueError):
+ CephSizeBytes._convert_to_bytes("GB")
+
+ def test_no_unit_with_default_unit_gb(self):
+ assert CephSizeBytes._convert_to_bytes("500", default_unit="GB") == 500 * 1024 ** 3
+
+ def test_no_unit_with_no_default_unit_raises(self):
+ with pytest.raises(ValueError):
+ CephSizeBytes._convert_to_bytes("500")
+
+ def test_unit_in_input_overrides_default(self):
+ assert CephSizeBytes._convert_to_bytes("50", default_unit="KB") == 50 * 1024
+ assert CephSizeBytes._convert_to_bytes("50KB", default_unit="KB") == 50 * 1024
+ assert CephSizeBytes._convert_to_bytes("50MB", default_unit="KB") == 50 * 1024 ** 2
--- /dev/null
+from unittest.mock import MagicMock
+import json
+import pytest
+from typing import Annotated
+
+from mgr_module import CLICommand
+from ceph_argparse import CephSizeBytes
+
+@pytest.fixture(scope="class", name="command_with_size_annotation_name")
+def fixture_command_with_size_annotation_name():
+ return "test annotated size command"
+
+
+@pytest.fixture(scope="class", name="command_with_size_annotation")
+def fixture_command_with_size_annotation(command_with_size_annotation_name):
+ @CLICommand(command_with_size_annotation_name)
+ def func(_, param: Annotated[int, CephSizeBytes()]): # noqa # pylint: disable=unused-variable
+ return {'a': '1', 'param': param}
+ yield func
+ del CLICommand.COMMANDS[command_with_size_annotation_name]
+ assert command_with_size_annotation_name not in CLICommand.COMMANDS
+
+
+class TestConvertAnnotatedType:
+ def test_command_convert_annotated_parameter(self, command_with_size_annotation, command_with_size_annotation_name):
+ result = CLICommand.COMMANDS[command_with_size_annotation_name].call(MagicMock(), {"param": f"{5 * 1024 ** 2}"})
+ assert result['param'] == 5 * 1024 ** 2
+
+ result = CLICommand.COMMANDS[command_with_size_annotation_name].call(MagicMock(), {"param": f"{5 * 1024}KB"})
+ assert result['param'] == 5 * 1024 ** 2
+
+ result = CLICommand.COMMANDS[command_with_size_annotation_name].call(MagicMock(), {"param": f"5MB"})
+ assert result['param'] == 5 * 1024 ** 2