From 162a57156f8ed654f3f39274961776f8633b722b Mon Sep 17 00:00:00 2001
From: Varsha Rao
Date: Fri, 5 Jun 2020 14:44:59 +0530
Subject: [PATCH] mgr/volumes/nfs: Fetch exports in persistent way if mgr is restarted

Fixes: https://tracker.ceph.com/issues/45740
Signed-off-by: Varsha Rao
---
 src/pybind/mgr/volumes/fs/nfs.py | 192 +++++++++++++++++++++++++++----
 1 file changed, 168 insertions(+), 24 deletions(-)

diff --git a/src/pybind/mgr/volumes/fs/nfs.py b/src/pybind/mgr/volumes/fs/nfs.py
index 761a2e88ea882..e24dd2f04214f 100644
--- a/src/pybind/mgr/volumes/fs/nfs.py
+++ b/src/pybind/mgr/volumes/fs/nfs.py
@@ -1,6 +1,7 @@
 import errno
 import json
 import logging
+from typing import List
 
 from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec
 from rados import TimedOut
@@ -11,10 +12,119 @@ from .fs_util import create_pool
 
 log = logging.getLogger(__name__)
 
+def available_clusters(mgr):
+    completion = mgr.describe_service(service_type='nfs')
+    mgr._orchestrator_wait([completion])
+    orchestrator.raise_if_exception(completion)
+    return [cluster.spec.service_id for cluster in completion.result]
+
 class GaneshaConfParser(object):
     def __init__(self, raw_config):
        self.pos = 0
        self.text = ""
+        self.clean_config(raw_config)
+
+    def clean_config(self, raw_config):
+        for line in raw_config.split("\n"):
+            self.text += line
+            if line.startswith("%"):
+                self.text += "\n"
+
+    def remove_whitespaces_quotes(self):
+        if self.text.startswith("%url"):
+            self.text = self.text.replace('"', "")
+        else:
+            self.text = "".join(self.text.split())
+
+    def stream(self):
+        return self.text[self.pos:]
+
+    def parse_block_name(self):
+        idx = self.stream().find('{')
+        if idx == -1:
+            raise Exception("Cannot find block name")
+        block_name = self.stream()[:idx]
+        self.pos += idx+1
+        return block_name
+
+    def parse_block_or_section(self):
+        if self.stream().startswith("%url "):
+            # section line
+            self.pos += 5
+            idx = self.stream().find('\n')
+            if idx == -1:
+                value = self.stream()
+                self.pos += len(value)
+            else:
+                value = self.stream()[:idx]
+                self.pos += idx+1
+            block_dict = {'block_name': '%url', 'value': value}
+            return block_dict
+
+        block_dict = {'block_name': self.parse_block_name().upper()}
+        self.parse_block_body(block_dict)
+        if self.stream()[0] != '}':
+            raise Exception("No closing bracket '}' found at the end of block")
+        self.pos += 1
+        return block_dict
+
+    def parse_parameter_value(self, raw_value):
+        if raw_value.find(',') != -1:
+            return [self.parse_parameter_value(v.strip())
+                    for v in raw_value.split(',')]
+        try:
+            return int(raw_value)
+        except ValueError:
+            if raw_value == "true":
+                return True
+            if raw_value == "false":
+                return False
+            if raw_value.find('"') == 0:
+                return raw_value[1:-1]
+            return raw_value
+
+    def parse_stanza(self, block_dict):
+        equal_idx = self.stream().find('=')
+        if equal_idx == -1:
+            raise Exception("Malformed stanza: no equal symbol found.")
+        semicolon_idx = self.stream().find(';')
+        parameter_name = self.stream()[:equal_idx].lower()
+        parameter_value = self.stream()[equal_idx+1:semicolon_idx]
+        block_dict[parameter_name] = self.parse_parameter_value(parameter_value)
+        self.pos += semicolon_idx+1
+
+    def parse_block_body(self, block_dict):
+        while True:
+            if self.stream().find('}') == 0:
+                # block end
+                return
+
+            last_pos = self.pos
+            semicolon_idx = self.stream().find(';')
+            lbracket_idx = self.stream().find('{')
+            is_semicolon = (semicolon_idx != -1)
+            is_lbracket = (lbracket_idx != -1)
+            is_semicolon_lt_lbracket = (semicolon_idx < lbracket_idx)
+
+            if is_semicolon and ((is_lbracket and is_semicolon_lt_lbracket)
+                                 or not is_lbracket):
+                self.parse_stanza(block_dict)
+            elif is_lbracket and ((is_semicolon and not is_semicolon_lt_lbracket) or
+                                  (not is_semicolon)):
+                if '_blocks_' not in block_dict:
+                    block_dict['_blocks_'] = []
+                block_dict['_blocks_'].append(self.parse_block_or_section())
+            else:
+                raise Exception("Malformed stanza: no semicolon found.")
+
+            if last_pos == self.pos:
+                raise Exception("Infinite loop while parsing block content")
+
+    def parse(self):
+        self.remove_whitespaces_quotes()
+        blocks = []
+        while self.stream():
+            blocks.append(self.parse_block_or_section())
+        return blocks
 
     @staticmethod
     def _indentation(depth, size=4):
@@ -48,7 +158,7 @@ class GaneshaConfParser(object):
         return conf_str
 
     @staticmethod
-    def write_block(block, depth):
+    def write_block(block, depth=0):
         if block['block_name'] == "%url":
             return '%url "{}"\n\n'.format(block['value'])
 
@@ -58,17 +168,9 @@
         conf_str += " {\n"
         conf_str += GaneshaConfParser.write_block_body(block, depth+1)
         conf_str += GaneshaConfParser._indentation(depth)
-        conf_str += "}\n\n"
+        conf_str += "}\n"
         return conf_str
 
-    @staticmethod
-    def write_conf(blocks):
-        if not isinstance(blocks, list):
-            blocks = [blocks]
-        conf_str = ""
-        for block in blocks:
-            conf_str += GaneshaConfParser.write_block(block, 0)
-        return conf_str
 
 class CephFSFSal():
     def __init__(self, name, user_id=None, fs_name=None, sec_label_xattr=None,
@@ -162,6 +264,25 @@
         self.transports = ["TCP"]
         self.clients = clients
 
+    @classmethod
+    def from_export_block(cls, export_block, cluster_id):
+        log.debug("parsing export block: %s", export_block)
+
+        fsal_block = [b for b in export_block['_blocks_']
+                      if b['block_name'] == "FSAL"]
+
+        client_blocks = [b for b in export_block['_blocks_']
+                         if b['block_name'] == "CLIENT"]
+
+        return cls(export_block['export_id'],
+                   export_block['path'],
+                   CephFSFSal.from_fsal_block(fsal_block[0]),
+                   cluster_id,
+                   export_block['pseudo'],
+                   export_block['access_type'],
+                   [Client.from_client_block(client)
+                    for client in client_blocks])
+
     def to_export_block(self):
         # pylint: disable=too-many-branches
         result = {
@@ -174,7 +295,7 @@
             'attr_expiration_time': self.attr_expiration_time,
             'security_label': self.security_label,
             'protocols': self.protocols,
-            'transports': [self.transports],
+            'transports': self.transports,
         }
         result['_blocks_'] = [self.fsal.to_fsal_block()]
         result['_blocks_'].extend([client.to_client_block()
@@ -196,9 +317,22 @@
         self.mgr = mgr
         self.rados_pool = 'nfs-ganesha'
         self.rados_namespace = namespace #TODO check if cluster exists
-        self.export_conf_blocks = []
         self.exports = {}
 
+        try:
+            log.info("Begin export parsing")
+            for cluster_id in available_clusters(self.mgr):
+                # Removes 'ganesha-' prefixes from cluster ids.
+                cluster_id = cluster_id[cluster_id.index('-')+1:]
+                self.export_conf_objs = []  # type: List[Export]
+                self._read_raw_config(cluster_id)
+                self.exports[cluster_id] = self.export_conf_objs
+            log.info(f"Exports parsed successfully {self.exports.items()}")
+        except orchestrator.NoOrchestrator:
+            # Pass it for vstart
+            log.info("Orchestrator not found")
+            pass
+
     @staticmethod
     def _check_rados_notify(ioctx, obj):
         try:
@@ -254,8 +388,24 @@
                 break
         return nid
 
+    def _read_raw_config(self, rados_namespace):
+        with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
+            if rados_namespace:
+                ioctx.set_namespace(rados_namespace)
+
+            for obj in ioctx.list_objects():
+                if obj.key.startswith("export-"):
+                    size, _ = obj.stat()
+                    raw_config = obj.read(size)
+                    raw_config = raw_config.decode("utf-8")
+                    log.debug("read export configuration from rados "
+                              "object %s/%s/%s:\n%s", self.rados_pool,
+                              rados_namespace, obj.key, raw_config)
+                    self.export_conf_objs.append(Export.from_export_block(
+                        GaneshaConfParser(raw_config).parse()[0], rados_namespace))
+
     def _write_raw_config(self, conf_block, obj, append=False):
-        raw_config = GaneshaConfParser.write_conf(conf_block)
+        raw_config = GaneshaConfParser.write_block(conf_block)
         with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
             if self.rados_namespace:
                 ioctx.set_namespace(self.rados_namespace)
@@ -403,12 +553,6 @@
             ioctx.remove_object(common_conf)
             log.info(f"Deleted object:{common_conf}")
 
-    def available_clusters(self):
-        completion = self.mgr.describe_service(service_type='nfs')
-        self.mgr._orchestrator_wait([completion])
-        orchestrator.raise_if_exception(completion)
-        return [cluster.spec.service_id for cluster in completion.result]
-
     def _set_cluster_id(self, cluster_id):
         self.cluster_id = "ganesha-%s" % cluster_id
 
@@ -445,7 +589,7 @@
             self._set_cluster_id(cluster_id)
             self.create_empty_rados_obj()
 
-            if self.cluster_id not in self.available_clusters():
+            if self.cluster_id not in available_clusters(self.mgr):
                 self._call_orch_apply_nfs(placement)
                 return 0, "NFS Cluster Created Successfully", ""
             return 0, "", f"{self.cluster_id} cluster already exists"
@@ -457,7 +601,7 @@
         try:
             self._set_pool_namespace(cluster_id)
             self._set_cluster_id(cluster_id)
-            if self.cluster_id in self.available_clusters():
+            if self.cluster_id in available_clusters(self.mgr):
                 self._call_orch_apply_nfs(placement)
                 return 0, "NFS Cluster Updated Successfully", ""
             return -errno.EINVAL, "", "Cluster does not exist"
@@ -467,10 +611,10 @@
 
     def delete_nfs_cluster(self, cluster_id):
         try:
+            self._set_pool_namespace(cluster_id)
             self._set_cluster_id(cluster_id)
-            cluster_list = self.available_clusters()
-
-            if self.cluster_id in self.available_clusters():
+            cluster_list = available_clusters(self.mgr)
+            if self.cluster_id in cluster_list:
                 self.mgr.fs_export.delete_all_exports(cluster_id)
                 completion = self.mgr.remove_service('nfs.' + self.cluster_id)
                 self.mgr._orchestrator_wait([completion])
-- 
2.39.5
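
For reference, a small round-trip sketch of what the changed code stores and re-reads; it is not part of the patch. The sample EXPORT block, its field values and the user_id are invented for illustration, and the snippet assumes the patched GaneshaConfParser, Export and FSExport classes from src/pybind/mgr/volumes/fs/nfs.py are importable in the current environment.

# Illustrative input only: the on-disk format that FSExport._read_raw_config()
# reads back from "export-<id>" objects in the nfs-ganesha pool after a mgr
# restart. All concrete values below are made up for the example.
raw_export = '''
EXPORT {
    export_id = 1;
    path = "/";
    pseudo = "/cephfs";
    access_type = "RW";
    protocols = 4;
    transports = "TCP";
    FSAL {
        name = "CEPH";
        user_id = "nfs.ganesha-a.1";
        filesystem = "a";
    }
}
'''

# GaneshaConfParser.parse() returns a list of block dicts: parameter names are
# lower-cased, values are coerced (int, bool, quoted string, list) and nested
# blocks such as FSAL/CLIENT end up under the '_blocks_' key.
export_block = GaneshaConfParser(raw_export).parse()[0]
assert export_block['block_name'] == 'EXPORT'
assert export_block['export_id'] == 1
assert export_block['_blocks_'][0]['block_name'] == 'FSAL'

# This parsed dict is what _read_raw_config() hands to Export.from_export_block()
# to rebuild the in-memory export objects, while write_block() performs the
# reverse step when an export is written back to RADOS.
print(GaneshaConfParser.write_block(export_block))

Re-reading the export-* objects through the same parser that writes them is what lets FSExport repopulate self.exports after a mgr restart, instead of keeping the export list only in memory.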