from typing import Dict, List, Optional, Tuple, Union, cast
+import base64
import errno
import json
JoinSourceType,
LoginAccess,
LoginCategory,
+ PasswordFilter,
SMBClustering,
UserGroupSourceType,
)
from .proto import Self, Simplified
from .utils import checked
+ConversionOp = Tuple[PasswordFilter, PasswordFilter]
+
def _get_intent(data: Simplified) -> Intent:
"""Helper function that returns the intent value from a data dict."""
rc = getattr(self, '_resource_config')
return rc.object_to_simplified(self)
+ def convert(self, operation: ConversionOp) -> Self:
+ return self
+
@resourcelib.component()
class CephFSStorage(_RBase):
username: str
password: str
+ def convert(self, operation: ConversionOp) -> Self:
+ return self.__class__(
+ username=self.username,
+ password=_password_convert(self.password, operation),
+ )
+
@resourcelib.component()
class JoinSource(_RBase):
rc.on_construction_error(InvalidResourceError.wrap)
return rc
+ def convert(self, operation: ConversionOp) -> Self:
+ return self.__class__(
+ auth_id=self.auth_id,
+ intent=self.intent,
+ auth=None if not self.auth else self.auth.convert(operation),
+ linked_to_cluster=self.linked_to_cluster,
+ )
+
@resourcelib.resource('ceph.smb.usersgroups')
class UsersAndGroups(_RBase):
structured types.
"""
return resourcelib.load(data)
+
+
+def _password_convert(pvalue: str, operation: ConversionOp) -> str:
+ if operation == (PasswordFilter.NONE, PasswordFilter.BASE64):
+ pvalue = base64.b64encode(pvalue.encode("utf8")).decode("utf8")
+ elif operation == (PasswordFilter.NONE, PasswordFilter.HIDDEN):
+ pvalue = "*" * 16
+ elif operation == (PasswordFilter.BASE64, PasswordFilter.NONE):
+ pvalue = base64.b64decode(pvalue.encode("utf8")).decode("utf8")
+ else:
+ osrc, odst = operation
+ raise ValueError(
+ f"can not convert password value encoding from {osrc} to {odst}"
+ )
+ return pvalue