assert rs['results'][1]['msg'] == 'join auth linked to different cluster'
+def test_apply_cluster_bad_linked_ug(thandler):
+ to_apply = [
+ smb.resources.UsersAndGroups(
+ users_groups_id='ug1',
+ values=smb.resources.UserGroupSettings(
+ users=[{"username": "foo"}],
+ groups=[],
+ ),
+ linked_to_cluster='mycluster2',
+ ),
+ smb.resources.Cluster(
+ cluster_id='mycluster1',
+ auth_mode=smb.enums.AuthMode.USER,
+ user_group_settings=[
+ smb.resources.UserGroupSource(
+ source_type=smb.resources.UserGroupSourceType.RESOURCE,
+ ref='ug1',
+ ),
+ ],
+ ),
+ ]
+ results = thandler.apply(to_apply)
+ assert not results.success
+ rs = results.to_simplified()
+ assert len(rs['results']) == 2
+ assert rs['results'][0]['msg'] == 'linked_to_cluster id not valid'
+ assert (
+ rs['results'][1]['msg']
+ == 'users and groups linked to different cluster'
+ )
+
+
def test_apply_with_create_only(thandler):
test_apply_full_cluster_create(thandler)
'shares',
'mycluster1.foodirs',
) in thandler.internal_store.data
+
+
+def test_remove_in_use_cluster(thandler):
+ thandler.internal_store.overwrite(
+ {
+ 'clusters.foo': {
+ 'resource_type': 'ceph.smb.cluster',
+ 'cluster_id': 'foo',
+ 'auth_mode': 'active-directory',
+ 'intent': 'present',
+ 'domain_settings': {
+ 'realm': 'dom1.example.com',
+ 'join_sources': [
+ {
+ 'source_type': 'resource',
+ 'ref': 'foo1',
+ }
+ ],
+ },
+ },
+ 'join_auths.foo1': {
+ 'resource_type': 'ceph.smb.join.auth',
+ 'auth_id': 'foo1',
+ 'intent': 'present',
+ 'auth': {
+ 'username': 'testadmin',
+ 'password': 'Passw0rd',
+ },
+ },
+ 'shares.foo.s1': {
+ 'resource_type': 'ceph.smb.share',
+ 'cluster_id': 'foo',
+ 'share_id': 's1',
+ 'intent': 'present',
+ 'name': 'Ess One',
+ 'readonly': False,
+ 'browseable': True,
+ 'cephfs': {
+ 'volume': 'cephfs',
+ 'path': '/',
+ 'provider': 'samba-vfs',
+ },
+ },
+ }
+ )
+
+ to_apply = [
+ smb.resources.RemovedCluster(
+ cluster_id='foo',
+ ),
+ ]
+ results = thandler.apply(to_apply)
+ rs = results.to_simplified()
+ assert not results.success
+ assert 'cluster in use' in rs['results'][0]['msg']
+
+
+def test_remove_in_use_join_auth(thandler):
+ thandler.internal_store.overwrite(
+ {
+ 'clusters.foo': {
+ 'resource_type': 'ceph.smb.cluster',
+ 'cluster_id': 'foo',
+ 'auth_mode': 'active-directory',
+ 'intent': 'present',
+ 'domain_settings': {
+ 'realm': 'dom1.example.com',
+ 'join_sources': [
+ {
+ 'source_type': 'resource',
+ 'ref': 'foo1',
+ }
+ ],
+ },
+ },
+ 'join_auths.foo1': {
+ 'resource_type': 'ceph.smb.join.auth',
+ 'auth_id': 'foo1',
+ 'intent': 'present',
+ 'auth': {
+ 'username': 'testadmin',
+ 'password': 'Passw0rd',
+ },
+ },
+ 'shares.foo.s1': {
+ 'resource_type': 'ceph.smb.share',
+ 'cluster_id': 'foo',
+ 'share_id': 's1',
+ 'intent': 'present',
+ 'name': 'Ess One',
+ 'readonly': False,
+ 'browseable': True,
+ 'cephfs': {
+ 'volume': 'cephfs',
+ 'path': '/',
+ 'provider': 'samba-vfs',
+ },
+ },
+ }
+ )
+
+ to_apply = [
+ smb.resources.JoinAuth(
+ auth_id='foo1',
+ intent=smb.enums.Intent.REMOVED,
+ ),
+ ]
+ results = thandler.apply(to_apply)
+ rs = results.to_simplified()
+ assert not results.success
+ assert 'resource in use' in rs['results'][0]['msg']
+
+
+def test_remove_in_use_ug(thandler):
+ thandler.internal_store.overwrite(
+ {
+ 'clusters.foo': {
+ 'resource_type': 'ceph.smb.cluster',
+ 'cluster_id': 'foo',
+ 'auth_mode': 'user',
+ 'intent': 'present',
+ 'user_group_settings': [
+ {
+ 'source_type': 'resource',
+ 'ref': 'foo1',
+ }
+ ],
+ },
+ 'users_and_groups.foo1': {
+ 'resource_type': 'ceph.smb.usersgroups',
+ 'users_groups_id': 'foo1',
+ 'intent': 'present',
+ 'values': {
+ 'users': [{"username": "foo"}],
+ 'groups': [],
+ },
+ },
+ 'shares.foo.s1': {
+ 'resource_type': 'ceph.smb.share',
+ 'cluster_id': 'foo',
+ 'share_id': 's1',
+ 'intent': 'present',
+ 'name': 'Ess One',
+ 'readonly': False,
+ 'browseable': True,
+ 'cephfs': {
+ 'volume': 'cephfs',
+ 'path': '/',
+ 'provider': 'samba-vfs',
+ },
+ },
+ }
+ )
+
+ to_apply = [
+ smb.resources.UsersAndGroups(
+ users_groups_id='foo1',
+ intent=smb.enums.Intent.REMOVED,
+ ),
+ ]
+ results = thandler.apply(to_apply)
+ rs = results.to_simplified()
+ assert not results.success
+ assert 'resource in use' in rs['results'][0]['msg']