CephFSStorageProvider,
Intent,
JoinSourceType,
+ LoginAccess,
+ LoginCategory,
State,
UserGroupSourceType,
)
'x:ceph:id': f'{share.cluster_id}.{share.share_id}',
}
}
+ # extend share with user+group login access lists
+ _generate_share_login_control(share, cfg)
# extend share with custom options
custom_opts = share.cleaned_custom_smb_share_options
if custom_opts:
return cfg
+def _generate_share_login_control(
+ share: resources.Share, cfg: Simplified
+) -> None:
+ valid_users: List[str] = []
+ invalid_users: List[str] = []
+ read_list: List[str] = []
+ write_list: List[str] = []
+ admin_users: List[str] = []
+ for entry in share.login_control or []:
+ if entry.category == LoginCategory.GROUP:
+ name = f'@{entry.name}'
+ else:
+ name = entry.name
+ if entry.access == LoginAccess.NONE:
+ invalid_users.append(name)
+ continue
+ elif entry.access == LoginAccess.ADMIN:
+ admin_users.append(name)
+ elif entry.access == LoginAccess.READ_ONLY:
+ read_list.append(name)
+ elif entry.access == LoginAccess.READ_WRITE:
+ write_list.append(name)
+ if share.restrict_access:
+ valid_users.append(name)
+ if valid_users:
+ cfg['options']['valid users'] = ' '.join(valid_users)
+ if invalid_users:
+ cfg['options']['invalid users'] = ' '.join(invalid_users)
+ if read_list:
+ cfg['options']['read list'] = ' '.join(read_list)
+ if write_list:
+ cfg['options']['write list'] = ' '.join(write_list)
+ if admin_users:
+ cfg['options']['admin users'] = ' '.join(admin_users)
+
+
def _generate_config(
cluster: resources.Cluster,
shares: Iterable[resources.Share],