"""Create an smb cluster"""
domain_settings = None
user_group_settings = None
+ to_apply: List[resources.SMBResource] = []
if domain_realm or domain_join_ref or domain_join_user_pass:
join_sources: List[resources.JoinSource] = []
'a domain join username & password value'
' must contain a "%" separator'
)
+ rname = handler.rand_name(cluster_id)
join_sources.append(
resources.JoinSource(
- source_type=JoinSourceType.PASSWORD,
+ source_type=JoinSourceType.RESOURCE,
+ ref=rname,
+ )
+ )
+ to_apply.append(
+ resources.JoinAuth(
+ auth_id=rname,
auth=resources.JoinAuthValues(
username=username,
password=password,
),
+ linked_to_cluster=cluster_id,
)
)
domain_settings = resources.DomainSettings(
for unpw in define_user_pass or []:
username, password = unpw.split('%', 1)
users.append({'name': username, 'password': password})
- user_group_settings += [
+ rname = handler.rand_name(cluster_id)
+ user_group_settings.append(
resources.UserGroupSource(
- source_type=UserGroupSourceType.INLINE,
+ source_type=UserGroupSourceType.RESOURCE, ref=rname
+ )
+ )
+ to_apply.append(
+ resources.UsersAndGroups(
+ users_groups_id=rname,
values=resources.UserGroupSettings(
users=users,
groups=[],
),
+ linked_to_cluster=cluster_id,
)
- ]
+ )
pspec = resources.WrappedPlacementSpec.wrap(
PlacementSpec.from_string(placement)
custom_dns=custom_dns,
placement=pspec,
)
- return self._handler.apply([cluster]).one()
+ to_apply.append(cluster)
+ return self._handler.apply(to_apply).squash(cluster)
@cli.SMBCommand('cluster rm', perm='rw')
def cluster_rm(self, cluster_id: str) -> handler.Result:
assert len(result.src.domain_settings.join_sources) == 1
assert (
result.src.domain_settings.join_sources[0].source_type
- == smb.enums.JoinSourceType.PASSWORD
+ == smb.enums.JoinSourceType.RESOURCE
)
+ assert result.src.domain_settings.join_sources[0].ref.startswith('fizzle')
+ assert 'additional_results' in result.status
+ assert len(result.status['additional_results']) == 1
assert (
- result.src.domain_settings.join_sources[0].auth.username
- == 'Administrator'
+ result.status['additional_results'][0]['resource']['resource_type']
+ == 'ceph.smb.join.auth'
)
assert (
- result.src.domain_settings.join_sources[0].auth.password == 'Passw0rd'
+ result.status['additional_results'][0]['resource'][
+ 'linked_to_cluster'
+ ]
+ == 'fizzle'
)
+ assert result.status['additional_results'][0]['resource'][
+ 'auth_id'
+ ].startswith('fizzle')
def test_cluster_create_ad2(tmodule):