import errno
import json
import logging
+from typing import List
from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec
from rados import TimedOut
log = logging.getLogger(__name__)
+def available_clusters(mgr):
+ completion = mgr.describe_service(service_type='nfs')
+ mgr._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ return [cluster.spec.service_id for cluster in completion.result]
+
class GaneshaConfParser(object):
def __init__(self, raw_config):
self.pos = 0
self.text = ""
+ self.clean_config(raw_config)
+
+ def clean_config(self, raw_config):
+ for line in raw_config.split("\n"):
+ self.text += line
+ if line.startswith("%"):
+ self.text += "\n"
+
+ def remove_whitespaces_quotes(self):
+ if self.text.startswith("%url"):
+ self.text = self.text.replace('"', "")
+ else:
+ self.text = "".join(self.text.split())
+
+ def stream(self):
+ return self.text[self.pos:]
+
+ def parse_block_name(self):
+ idx = self.stream().find('{')
+ if idx == -1:
+ raise Exception("Cannot find block name")
+ block_name = self.stream()[:idx]
+ self.pos += idx+1
+ return block_name
+
+ def parse_block_or_section(self):
+ if self.stream().startswith("%url "):
+ # section line
+ self.pos += 5
+ idx = self.stream().find('\n')
+ if idx == -1:
+ value = self.stream()
+ self.pos += len(value)
+ else:
+ value = self.stream()[:idx]
+ self.pos += idx+1
+ block_dict = {'block_name': '%url', 'value': value}
+ return block_dict
+
+ block_dict = {'block_name': self.parse_block_name().upper()}
+ self.parse_block_body(block_dict)
+ if self.stream()[0] != '}':
+ raise Exception("No closing bracket '}' found at the end of block")
+ self.pos += 1
+ return block_dict
+
+ def parse_parameter_value(self, raw_value):
+ if raw_value.find(',') != -1:
+ return [self.parse_parameter_value(v.strip())
+ for v in raw_value.split(',')]
+ try:
+ return int(raw_value)
+ except ValueError:
+ if raw_value == "true":
+ return True
+ if raw_value == "false":
+ return False
+ if raw_value.find('"') == 0:
+ return raw_value[1:-1]
+ return raw_value
+
+ def parse_stanza(self, block_dict):
+ equal_idx = self.stream().find('=')
+ if equal_idx == -1:
+ raise Exception("Malformed stanza: no equal symbol found.")
+ semicolon_idx = self.stream().find(';')
+ parameter_name = self.stream()[:equal_idx].lower()
+ parameter_value = self.stream()[equal_idx+1:semicolon_idx]
+ block_dict[parameter_name] = self.parse_parameter_value(parameter_value)
+ self.pos += semicolon_idx+1
+
+ def parse_block_body(self, block_dict):
+ while True:
+ if self.stream().find('}') == 0:
+ # block end
+ return
+
+ last_pos = self.pos
+ semicolon_idx = self.stream().find(';')
+ lbracket_idx = self.stream().find('{')
+ is_semicolon = (semicolon_idx != -1)
+ is_lbracket = (lbracket_idx != -1)
+ is_semicolon_lt_lbracket = (semicolon_idx < lbracket_idx)
+
+ if is_semicolon and ((is_lbracket and is_semicolon_lt_lbracket) or not is_lbracket):
+ self.parse_stanza(block_dict)
+ elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket) or
+ (not is_semicolon)):
+ if '_blocks_' not in block_dict:
+ block_dict['_blocks_'] = []
+ block_dict['_blocks_'].append(self.parse_block_or_section())
+ else:
+ raise Exception("Malformed stanza: no semicolon found.")
+
+ if last_pos == self.pos:
+ raise Exception("Infinite loop while parsing block content")
+
+ def parse(self):
+ self.remove_whitespaces_quotes()
+ blocks = []
+ while self.stream():
+ blocks.append(self.parse_block_or_section())
+ return blocks
@staticmethod
def _indentation(depth, size=4):
return conf_str
@staticmethod
- def write_block(block, depth):
+ def write_block(block, depth=0):
if block['block_name'] == "%url":
return '%url "{}"\n\n'.format(block['value'])
conf_str += " {\n"
conf_str += GaneshaConfParser.write_block_body(block, depth+1)
conf_str += GaneshaConfParser._indentation(depth)
- conf_str += "}\n\n"
+ conf_str += "}\n"
return conf_str
- @staticmethod
- def write_conf(blocks):
- if not isinstance(blocks, list):
- blocks = [blocks]
- conf_str = ""
- for block in blocks:
- conf_str += GaneshaConfParser.write_block(block, 0)
- return conf_str
class CephFSFSal():
def __init__(self, name, user_id=None, fs_name=None, sec_label_xattr=None,
self.transports = ["TCP"]
self.clients = clients
+ @classmethod
+ def from_export_block(cls, export_block, cluster_id):
+ log.debug("parsing export block: %s", export_block)
+
+ fsal_block = [b for b in export_block['_blocks_']
+ if b['block_name'] == "FSAL"]
+
+ client_blocks = [b for b in export_block['_blocks_']
+ if b['block_name'] == "CLIENT"]
+
+ return cls(export_block['export_id'],
+ export_block['path'],
+ CephFSFSal.from_fsal_block(fsal_block[0]),
+ cluster_id,
+ export_block['pseudo'],
+ export_block['access_type'],
+ [Client.from_client_block(client)
+ for client in client_blocks])
+
def to_export_block(self):
# pylint: disable=too-many-branches
result = {
'attr_expiration_time': self.attr_expiration_time,
'security_label': self.security_label,
'protocols': self.protocols,
- 'transports': [self.transports],
+ 'transports': self.transports,
}
result['_blocks_'] = [self.fsal.to_fsal_block()]
result['_blocks_'].extend([client.to_client_block()
self.mgr = mgr
self.rados_pool = 'nfs-ganesha'
self.rados_namespace = namespace #TODO check if cluster exists
- self.export_conf_blocks = []
self.exports = {}
+ try:
+ log.info("Begin export parsing")
+ for cluster_id in available_clusters(self.mgr):
+ # Removes 'ganesha-' prefixes from cluster ids.
+ cluster_id = cluster_id[cluster_id.index('-')+1:]
+ self.export_conf_objs = [] # type: List[Export]
+ self._read_raw_config(cluster_id)
+ self.exports[cluster_id] = self.export_conf_objs
+ log.info(f"Exports parsed successfully {self.exports.items()}")
+ except orchestrator.NoOrchestrator:
+ # Pass it for vstart
+ log.info("Orchestrator not found")
+ pass
+
@staticmethod
def _check_rados_notify(ioctx, obj):
try:
break
return nid
+ def _read_raw_config(self, rados_namespace):
+ with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+ if rados_namespace:
+ ioctx.set_namespace(rados_namespace)
+
+ for obj in ioctx.list_objects():
+ if obj.key.startswith("export-"):
+ size, _ = obj.stat()
+ raw_config = obj.read(size)
+ raw_config = raw_config.decode("utf-8")
+ log.debug("read export configuration from rados "
+ "object %s/%s/%s:\n%s", self.rados_pool,
+ rados_namespace, obj.key, raw_config)
+ self.export_conf_objs.append(Export.from_export_block(
+ GaneshaConfParser(raw_config).parse()[0], rados_namespace))
+
def _write_raw_config(self, conf_block, obj, append=False):
- raw_config = GaneshaConfParser.write_conf(conf_block)
+ raw_config = GaneshaConfParser.write_block(conf_block)
with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
if self.rados_namespace:
ioctx.set_namespace(self.rados_namespace)
ioctx.remove_object(common_conf)
log.info(f"Deleted object:{common_conf}")
- def available_clusters(self):
- completion = self.mgr.describe_service(service_type='nfs')
- self.mgr._orchestrator_wait([completion])
- orchestrator.raise_if_exception(completion)
- return [cluster.spec.service_id for cluster in completion.result]
-
def _set_cluster_id(self, cluster_id):
self.cluster_id = "ganesha-%s" % cluster_id
self._set_cluster_id(cluster_id)
self.create_empty_rados_obj()
- if self.cluster_id not in self.available_clusters():
+ if self.cluster_id not in available_clusters(self.mgr):
self._call_orch_apply_nfs(placement)
return 0, "NFS Cluster Created Successfully", ""
return 0, "", f"{self.cluster_id} cluster already exists"
try:
self._set_pool_namespace(cluster_id)
self._set_cluster_id(cluster_id)
- if self.cluster_id in self.available_clusters():
+ if self.cluster_id in available_clusters(self.mgr):
self._call_orch_apply_nfs(placement)
return 0, "NFS Cluster Updated Successfully", ""
return -errno.EINVAL, "", "Cluster does not exist"
def delete_nfs_cluster(self, cluster_id):
try:
+ self._set_pool_namespace(cluster_id)
self._set_cluster_id(cluster_id)
- cluster_list = self.available_clusters()
-
- if self.cluster_id in self.available_clusters():
+ cluster_list = available_clusters(self.mgr)
+ if self.cluster_id in cluster_list:
self.mgr.fs_export.delete_all_exports(cluster_id)
completion = self.mgr.remove_service('nfs.' + self.cluster_id)
self.mgr._orchestrator_wait([completion])