)
import logging
-import random
-import string
import time
from ceph.deployment.service_spec import SMBSpec
OrchSubmitter,
PathResolver,
Simplified,
- checked,
)
from .resources import SMBResource
from .results import ErrorResult, Result, ResultGroup
+from .utils import checked, ynbool
ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
ShareRef = Union[resources.Share, resources.RemovedShare]
}
-def _ynbool(value: bool) -> str:
- """Convert a bool to an smb.conf compatible string."""
- return 'Yes' if value else 'No'
-
-
def _generate_share(
share: resources.Share, resolver: PathResolver, cephx_entity: str
) -> Dict[str, Dict[str, str]]:
'ceph:config_file': '/etc/ceph/ceph.conf',
'ceph:filesystem': share.cephfs.volume,
'ceph:user_id': cephx_entity,
- 'read only': _ynbool(share.readonly),
- 'browseable': _ynbool(share.browseable),
+ 'read only': ynbool(share.readonly),
+ 'browseable': ynbool(share.browseable),
'kernel share modes': 'no',
'x:ceph:id': f'{share.cluster_id}.{share.share_id}',
}
use for data access.
"""
return f'client.smb.fs.cluster.{cluster_id}'
-
-
-def rand_name(prefix: str, max_len: int = 18, suffix_len: int = 8) -> str:
- trunc = prefix[: (max_len - suffix_len)]
- suffix = ''.join(
- random.choice(string.ascii_lowercase) for _ in range(suffix_len)
- )
- return f'{trunc}{suffix}'
EntryKey,
Self,
Simplifiable,
- one,
)
from .resources import SMBResource
from .results import ErrorResult
+from .utils import one
T = TypeVar('T')
from ceph.deployment.service_spec import PlacementSpec, SMBSpec
from mgr_module import MgrModule, Option
-from . import cli, fs, handler, mon_store, rados_store, resources, results
+from . import (
+ cli,
+ fs,
+ handler,
+ mon_store,
+ rados_store,
+ resources,
+ results,
+ utils,
+)
from .enums import AuthMode, JoinSourceType, UserGroupSourceType
from .proto import AccessAuthorizer, Simplified
'a domain join username & password value'
' must contain a "%" separator'
)
- rname = handler.rand_name(cluster_id)
+ rname = utils.rand_name(cluster_id)
join_sources.append(
resources.JoinSource(
source_type=JoinSourceType.RESOURCE,
for unpw in define_user_pass or []:
username, password = unpw.split('%', 1)
users.append({'name': username, 'password': password})
- rname = handler.rand_name(cluster_id)
+ rname = utils.rand_name(cluster_id)
user_group_settings.append(
resources.UserGroupSource(
source_type=UserGroupSourceType.RESOURCE, ref=rname
List,
Optional,
Tuple,
- TypeVar,
)
import sys
self, volume: str, entity: str, caps: str = ''
) -> None:
... # pragma: no cover
-
-
-T = TypeVar('T')
-
-
-# TODO: move to a utils.py
-def one(lst: List[T]) -> T:
- if len(lst) != 1:
- raise ValueError("list does not contain exactly one element")
- return lst[0]
-
-
-class IsNoneError(ValueError):
- pass
-
-
-def checked(v: Optional[T]) -> T:
- """Ensures the provided value is not a None or raises a IsNoneError.
- Intended use is similar to an `assert v is not None` but more usable in
- one-liners and list/dict/etc comprehensions.
- """
- if v is None:
- raise IsNoneError('value is None')
- return v
LoginCategory,
UserGroupSourceType,
)
-from .proto import Self, Simplified, checked
+from .proto import Self, Simplified
+from .utils import checked
def _get_intent(data: Simplified) -> Intent:
import errno
-from .proto import Simplified, one
+from .proto import Simplified
from .resources import SMBResource
+from .utils import one
_DOMAIN = 'domain'
assert not rg.success
-def test_one():
- assert smb.proto.one(['a']) == 'a'
- with pytest.raises(ValueError):
- smb.proto.one([])
- with pytest.raises(ValueError):
- smb.proto.one(['a', 'b'])
-
-
def test_apply_full_cluster_create(thandler):
to_apply = [
smb.resources.JoinAuth(
assert rs['results'][1]['msg'] == 'join auth linked to different cluster'
-def test_rand_name():
- name = smb.handler.rand_name('bob')
- assert name.startswith('bob')
- assert len(name) == 11
- name = smb.handler.rand_name('carla')
- assert name.startswith('carla')
- assert len(name) == 13
- name = smb.handler.rand_name('dangeresque')
- assert name.startswith('dangeresqu')
- assert len(name) == 18
- name = smb.handler.rand_name('fhqwhgadsfhqwhgadsfhqwhgads')
- assert name.startswith('fhqwhgadsf')
- assert len(name) == 18
- name = smb.handler.rand_name('')
- assert len(name) == 8
-
-
def test_apply_with_create_only(thandler):
test_apply_full_cluster_create(thandler)
--- /dev/null
+import pytest
+
+import smb.utils
+
+
+def test_one():
+ assert smb.utils.one(['a']) == 'a'
+ with pytest.raises(ValueError):
+ smb.utils.one([])
+ with pytest.raises(ValueError):
+ smb.utils.one(['a', 'b'])
+
+
+def test_rand_name():
+ name = smb.utils.rand_name('bob')
+ assert name.startswith('bob')
+ assert len(name) == 11
+ name = smb.utils.rand_name('carla')
+ assert name.startswith('carla')
+ assert len(name) == 13
+ name = smb.utils.rand_name('dangeresque')
+ assert name.startswith('dangeresqu')
+ assert len(name) == 18
+ name = smb.utils.rand_name('fhqwhgadsfhqwhgadsfhqwhgads')
+ assert name.startswith('fhqwhgadsf')
+ assert len(name) == 18
+ name = smb.utils.rand_name('')
+ assert len(name) == 8
--- /dev/null
+"""Assorted utility functions for smb mgr module."""
+from typing import List, Optional, TypeVar
+
+import random
+import string
+
+T = TypeVar('T')
+
+
+def one(lst: List[T]) -> T:
+ """Given a list, ensure that the list contains exactly one item and return
+ it. A ValueError will be raised in the case that the list does not contain
+ exactly one item.
+ """
+ if len(lst) != 1:
+ raise ValueError("list does not contain exactly one element")
+ return lst[0]
+
+
+class IsNoneError(ValueError):
+ """A ValueError subclass raised by ``checked`` function."""
+
+ pass
+
+
+def checked(v: Optional[T]) -> T:
+ """Ensures the provided value is not a None or raises a IsNoneError.
+ Intended use is similar to an `assert v is not None` but more usable in
+ one-liners and list/dict/etc comprehensions.
+ """
+ if v is None:
+ raise IsNoneError('value is None')
+ return v
+
+
+def ynbool(value: bool) -> str:
+ """Convert a bool to an smb.conf-style boolean string."""
+ return 'Yes' if value else 'No'
+
+
+def rand_name(prefix: str, max_len: int = 18, suffix_len: int = 8) -> str:
+ trunc = prefix[: (max_len - suffix_len)]
+ suffix = ''.join(
+ random.choice(string.ascii_lowercase) for _ in range(suffix_len)
+ )
+ return f'{trunc}{suffix}'