if self == self.READ_WRITE_SHORT:
return LoginAccess(self.READ_WRITE)
return self
+
+
+class SMBClustering(_StrEnum):
+ DEFAULT = 'default'
+ ALWAYS = 'always'
+ NEVER = 'never'
sqlite_store,
utils,
)
-from .enums import AuthMode, JoinSourceType, UserGroupSourceType
+from .enums import (
+ AuthMode,
+ JoinSourceType,
+ SMBClustering,
+ UserGroupSourceType,
+)
from .proto import AccessAuthorizer, ConfigStore, Simplified
if TYPE_CHECKING:
define_user_pass: Optional[List[str]] = None,
custom_dns: Optional[List[str]] = None,
placement: Optional[str] = None,
+ clustering: Optional[SMBClustering] = None,
) -> results.Result:
"""Create an smb cluster"""
domain_settings = None
user_group_settings=user_group_settings,
custom_dns=custom_dns,
placement=pspec,
+ clustering=clustering,
)
to_apply.append(cluster)
return self._handler.apply(to_apply, create_only=True).squash(cluster)
JoinSourceType,
LoginAccess,
LoginCategory,
+ SMBClustering,
UserGroupSourceType,
)
from .proto import Self, Simplified
custom_smb_global_options: Optional[Dict[str, str]] = None
# embedded orchestration placement spec
placement: Optional[WrappedPlacementSpec] = None
+ # control if the cluster is really a cluster
+ clustering: Optional[SMBClustering] = None
def validate(self) -> None:
if not self.cluster_id:
def cleaned_custom_smb_global_options(self) -> Optional[Dict[str, str]]:
return validation.clean_custom_options(self.custom_smb_global_options)
+ @property
+ def clustering_mode(self) -> SMBClustering:
+ return self.clustering if self.clustering else SMBClustering.DEFAULT
+
+ def is_clustered(self) -> bool:
+ """Return true if smbd instance should use (CTDB) clustering."""
+ if self.clustering_mode == SMBClustering.ALWAYS:
+ return True
+ if self.clustering_mode == SMBClustering.NEVER:
+ return False
+ # do clustering automatically, based on the placement spec's count value
+ count = 0
+ if self.placement and self.placement.count:
+ count = self.placement.count
+ # clustering enabled unless we're deploying a single instance "cluster"
+ return count != 1
+
@resourcelib.resource('ceph.smb.join.auth')
class JoinAuth(_RBase):
import smb
+def _cluster(**kwargs):
+ if 'clustering' not in kwargs:
+ kwargs['clustering'] = smb.enums.SMBClustering.NEVER
+ return smb.resources.Cluster(**kwargs)
+
+
@pytest.fixture
def thandler():
ext_store = smb.config_store.MemConfigStore()
def test_internal_apply_cluster(thandler):
- cluster = smb.resources.Cluster(
+ cluster = _cluster(
cluster_id='foo',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
def test_cluster_add(thandler):
- cluster = smb.resources.Cluster(
+ cluster = _cluster(
cluster_id='foo',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
def test_internal_apply_cluster_and_share(thandler):
- cluster = smb.resources.Cluster(
+ cluster = _cluster(
cluster_id='foo',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
'cluster_id': 'foo',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'empty',
'cluster_id': 'foo',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'empty',
'cluster_id': 'foo',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'empty',
'cluster_id': 'foo',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'empty',
password='Passw0rd',
),
),
- smb.resources.Cluster(
+ _cluster(
cluster_id='mycluster1',
auth_mode=smb.enums.AuthMode.ACTIVE_DIRECTORY,
domain_settings=smb.resources.DomainSettings(
def test_apply_add_second_cluster(thandler):
test_apply_full_cluster_create(thandler)
to_apply = [
- smb.resources.Cluster(
+ _cluster(
cluster_id='coolcluster',
auth_mode=smb.enums.AuthMode.ACTIVE_DIRECTORY,
domain_settings=smb.resources.DomainSettings(
groups=[],
),
),
- smb.resources.Cluster(
+ _cluster(
cluster_id='mycluster2',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
),
],
),
- smb.resources.Cluster(
+ _cluster(
cluster_id='mycluster3',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
),
linked_to_cluster='mycluster1',
),
- smb.resources.Cluster(
+ _cluster(
cluster_id='mycluster1',
auth_mode=smb.enums.AuthMode.ACTIVE_DIRECTORY,
domain_settings=smb.resources.DomainSettings(
),
linked_to_cluster='mycluster2',
),
- smb.resources.Cluster(
+ _cluster(
cluster_id='mycluster1',
auth_mode=smb.enums.AuthMode.ACTIVE_DIRECTORY,
domain_settings=smb.resources.DomainSettings(
),
linked_to_cluster='mycluster2',
),
- smb.resources.Cluster(
+ _cluster(
cluster_id='mycluster1',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
test_apply_full_cluster_create(thandler)
to_apply = [
- smb.resources.Cluster(
+ _cluster(
cluster_id='mycluster1',
auth_mode=smb.enums.AuthMode.ACTIVE_DIRECTORY,
domain_settings=smb.resources.DomainSettings(
'cluster_id': 'c1',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'resource',
'cluster_id': 'c2',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'resource',
import smb
+def _cluster(**kwargs):
+ if 'clustering' not in kwargs:
+ kwargs['clustering'] = smb.enums.SMBClustering.NEVER
+ return smb.resources.Cluster(**kwargs)
+
+
@pytest.fixture
def tmodule():
internal_store = smb.config_store.MemConfigStore()
def test_internal_apply_cluster(tmodule):
- cluster = smb.resources.Cluster(
+ cluster = _cluster(
cluster_id='foo',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
def test_cluster_add_cluster_ls(tmodule):
- cluster = smb.resources.Cluster(
+ cluster = _cluster(
cluster_id='foo',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
def test_internal_apply_cluster_and_share(tmodule):
- cluster = smb.resources.Cluster(
+ cluster = _cluster(
cluster_id='foo',
auth_mode=smb.enums.AuthMode.USER,
user_group_settings=[
'cluster_id': 'foo',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'empty',
'cluster_id': 'foo',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'empty',
'cluster_id': 'foo',
'auth_mode': 'user',
'intent': 'present',
+ 'clustering': 'never',
'user_group_settings': [
{
'source_type': 'empty',
'cluster_id': 'foo',
'auth_mode': 'active-directory',
'intent': 'present',
+ 'clustering': 'never',
'domain_settings': {
'realm': 'dom1.example.com',
'join_sources': [
smb.enums.AuthMode.ACTIVE_DIRECTORY,
domain_realm='fizzle.example.net',
domain_join_user_pass=['Administrator%Passw0rd'],
+ clustering='never',
)
assert result.success
assert result.status['state'] == 'created'
smb.enums.AuthMode.ACTIVE_DIRECTORY,
domain_realm='sizzle.example.net',
domain_join_ref=['jaad2'],
+ clustering='never',
)
assert result.success
assert result.status['state'] == 'created'
'dizzle',
smb.enums.AuthMode.USER,
user_group_ref=['ug1'],
+ clustering='never',
)
assert result.success
assert result.status['state'] == 'created'
'dizzle',
smb.enums.AuthMode.USER,
define_user_pass=['alice%123letmein', 'bob%1n0wh4t1t15'],
+ clustering='never',
)
assert result.success
assert result.status['state'] == 'created'
smb.enums.AuthMode.ACTIVE_DIRECTORY,
domain_realm='fizzle.example.net',
domain_join_user_pass=['Administrator'],
+ clustering='never',
)
"ref": "foo"
}
]
- }
+ },
+ "clustering": "never"
}
""".strip()
)
join_sources:
- source_type: resource
ref: foo
+clustering: never
""".strip()
)