import logging
import socket
import json
+import re
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec
@cluster_setter
def create_nfs_cluster(self, cluster_id, placement):
try:
+ invalid_str = re.search('[^A-Za-z0-9-_.]', cluster_id)
+ if invalid_str:
+ raise NFSInvalidOperation(f"cluster id {cluster_id} is invalid. "
+ f"{invalid_str.group()} is char not permitted")
+
pool_list = [p['pool_name'] for p in self.mgr.get_osdmap().dump().get('pools', [])]
if self.pool_name not in pool_list: