From: Sebastian Wagner Date: Mon, 4 May 2020 10:47:38 +0000 (+0200) Subject: mgr/cephadm: move nfs and osd to services/ X-Git-Tag: wip-pdonnell-testing-20200918.022351~1333^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=011747c74e9308d9301613395020800f556e85c4;p=ceph-ci.git mgr/cephadm: move nfs and osd to services/ Signed-off-by: Sebastian Wagner --- diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index ca6ad58e208..541737bd5e9 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -39,8 +39,8 @@ from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpe from . import remotes from . import utils -from .nfs import NFSGanesha -from .osd import RemoveUtil, OSDRemoval +from .services.nfs import NFSGanesha +from .services.osd import RemoveUtil, OSDRemoval from .inventory import Inventory, SpecStore, HostCache try: diff --git a/src/pybind/mgr/cephadm/nfs.py b/src/pybind/mgr/cephadm/nfs.py deleted file mode 100644 index 1c0a45200ec..00000000000 --- a/src/pybind/mgr/cephadm/nfs.py +++ /dev/null @@ -1,155 +0,0 @@ -import logging -import rados - -from typing import Dict, Optional - -from ceph.deployment.service_spec import NFSServiceSpec - -import cephadm -from orchestrator import OrchestratorError - -from . import utils - -logger = logging.getLogger(__name__) - -class NFSGanesha(object): - def __init__(self, - mgr, - daemon_id, - spec): - # type: (cephadm.CephadmOrchestrator, str, NFSServiceSpec) -> None - assert spec.service_id and daemon_id.startswith(spec.service_id) - self.mgr = mgr - self.daemon_id = daemon_id - self.spec = spec - - def get_daemon_name(self): - # type: () -> str - return '%s.%s' % (self.spec.service_type, self.daemon_id) - - def get_rados_user(self): - # type: () -> str - return '%s.%s' % (self.spec.service_type, self.daemon_id) - - def get_keyring_entity(self): - # type: () -> str - return utils.name_to_config_section(self.get_rados_user()) - - def get_or_create_keyring(self, entity=None): - # type: (Optional[str]) -> str - if not entity: - entity = self.get_keyring_entity() - - logger.info('Create keyring: %s' % entity) - ret, keyring, err = self.mgr.mon_command({ - 'prefix': 'auth get-or-create', - 'entity': entity, - }) - - if ret != 0: - raise OrchestratorError( - 'Unable to create keyring %s: %s %s' \ - % (entity, ret, err)) - return keyring - - def update_keyring_caps(self, entity=None): - # type: (Optional[str]) -> None - if not entity: - entity = self.get_keyring_entity() - - osd_caps='allow rw pool=%s' % (self.spec.pool) - if self.spec.namespace: - osd_caps='%s namespace=%s' % (osd_caps, self.spec.namespace) - - logger.info('Updating keyring caps: %s' % entity) - ret, out, err = self.mgr.mon_command({ - 'prefix': 'auth caps', - 'entity': entity, - 'caps': ['mon', 'allow r', - 'osd', osd_caps, - 'mds', 'allow rw'], - }) - - if ret != 0: - raise OrchestratorError( - 'Unable to update keyring caps %s: %s %s' \ - % (entity, ret, err)) - - def create_rados_config_obj(self, clobber=False): - # type: (Optional[bool]) -> None - obj = self.spec.rados_config_name() - - with self.mgr.rados.open_ioctx(self.spec.pool) as ioctx: - if self.spec.namespace: - ioctx.set_namespace(self.spec.namespace) - - exists = True - try: - ioctx.stat(obj) - except rados.ObjectNotFound as e: - exists = False - - if exists and not clobber: - # Assume an existing config - logger.info('Rados config object exists: %s' % obj) - else: - # Create an empty config object - 
logger.info('Creating rados config object: %s' % obj) - ioctx.write_full(obj, ''.encode('utf-8')) - - def get_ganesha_conf(self): - # type: () -> str - return '''# generated by cephadm -NFS_CORE_PARAM {{ - Enable_NLM = false; - Enable_RQUOTA = false; - Protocols = 4; -}} - -CACHEINODE {{ - Dir_Chunk = 0; - NParts = 1; - Cache_Size = 1; -}} - -EXPORT_DEFAULTS {{ - Attr_Expiration_Time = 0; -}} - -NFSv4 {{ - Delegations = false; - RecoveryBackend = 'rados_cluster'; - Minor_Versions = 1, 2; -}} - -RADOS_KV {{ - UserId = "{user}"; - nodeid = "{nodeid}"; - pool = "{pool}"; - namespace = "{namespace}"; -}} - -RADOS_URLS {{ - UserId = "{user}"; - watch_url = "{url}"; -}} - -%url {url} -'''.format(user=self.get_rados_user(), - nodeid=self.get_daemon_name(), - pool=self.spec.pool, - namespace=self.spec.namespace if self.spec.namespace else '', - url=self.spec.rados_config_location()) - - def get_cephadm_config(self): - # type: () -> Dict - config = {'pool' : self.spec.pool} # type: Dict - if self.spec.namespace: - config['namespace'] = self.spec.namespace - config['userid'] = self.get_rados_user() - config['extra_args'] = ['-N', 'NIV_EVENT'] - config['files'] = { - 'ganesha.conf' : self.get_ganesha_conf(), - } - logger.debug('Generated cephadm config-json: %s' % config) - return config diff --git a/src/pybind/mgr/cephadm/osd.py b/src/pybind/mgr/cephadm/osd.py deleted file mode 100644 index 2ead0cb8077..00000000000 --- a/src/pybind/mgr/cephadm/osd.py +++ /dev/null @@ -1,191 +0,0 @@ -import datetime -import json -import logging -import time - -from typing import List, Dict, Any, Set, Union - -import orchestrator -from orchestrator import OrchestratorError - -logger = logging.getLogger(__name__) - - -class OSDRemoval(object): - def __init__(self, - osd_id: str, - replace: bool, - force: bool, - nodename: str, - fullname: str, - start_at: datetime.datetime, - pg_count: int): - self.osd_id = osd_id - self.replace = replace - self.force = force - self.nodename = nodename - self.fullname = fullname - self.started_at = start_at - self.pg_count = pg_count - - # needed due to changing 'started_at' attr - def __eq__(self, other): - return self.osd_id == other.osd_id - - def __hash__(self): - return hash(self.osd_id) - - def __repr__(self): - return ('(osd_id={}, replace={}, force={}, nodename={}' - ', fullname={}, started_at={}, pg_count={})').format( - self.osd_id, self.replace, self.force, self.nodename, - self.fullname, self.started_at, self.pg_count) - - @property - def pg_count_str(self) -> str: - return 'n/a' if self.pg_count < 0 else str(self.pg_count) - - -class RemoveUtil(object): - def __init__(self, mgr): - self.mgr = mgr - self.to_remove_osds: Set[OSDRemoval] = set() - self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict() - - @property - def report(self) -> Set[OSDRemoval]: - return self.to_remove_osds.copy() - - def queue_osds_for_removal(self, osds: Set[OSDRemoval]): - self.to_remove_osds.update(osds) - - def _remove_osds_bg(self) -> None: - """ - Performs actions in the _serve() loop to remove an OSD - when criteria is met. - """ - logger.debug( - f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}") - self._update_osd_removal_status() - remove_osds: set = self.to_remove_osds.copy() - for osd in remove_osds: - if not osd.force: - self.drain_osd(osd.osd_id) - # skip criteria - if not self.is_empty(osd.osd_id): - logger.info(f"OSD <{osd.osd_id}> is not empty yet. 
Waiting a bit more") - continue - - if not self.ok_to_destroy([osd.osd_id]): - logger.info( - f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more") - continue - - # abort criteria - if not self.down_osd([osd.osd_id]): - # also remove it from the remove_osd list and set a health_check warning? - raise orchestrator.OrchestratorError( - f"Could not set OSD <{osd.osd_id}> to 'down'") - - if osd.replace: - if not self.destroy_osd(osd.osd_id): - # also remove it from the remove_osd list and set a health_check warning? - raise orchestrator.OrchestratorError( - f"Could not destroy OSD <{osd.osd_id}>") - else: - if not self.purge_osd(osd.osd_id): - # also remove it from the remove_osd list and set a health_check warning? - raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>") - - self.mgr._remove_daemon(osd.fullname, osd.nodename) - logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}") - logger.debug(f"Removing {osd.osd_id} from the queue.") - self.to_remove_osds.remove(osd) - - def _update_osd_removal_status(self): - """ - Generate a OSD report that can be printed to the CLI - """ - logger.debug("Update OSD removal status") - for osd in self.to_remove_osds: - osd.pg_count = self.get_pg_count(str(osd.osd_id)) - logger.debug(f"OSD removal status: {self.to_remove_osds}") - - def drain_osd(self, osd_id: str) -> bool: - """ - Uses `osd_support` module to schedule a drain operation of an OSD - """ - cmd_args = { - 'prefix': 'osd drain', - 'osd_ids': [int(osd_id)] - } - return self._run_mon_cmd(cmd_args) - - def get_pg_count(self, osd_id: str) -> int: - """ Queries for PG count of an OSD """ - self.mgr.log.debug("Querying for drain status") - ret, out, err = self.mgr.mon_command({ - 'prefix': 'osd drain status', - }) - if ret != 0: - self.mgr.log.error(f"Calling osd drain status failed with {err}") - raise OrchestratorError("Could not query `osd drain status`") - out = json.loads(out) - for o in out: - if str(o.get('osd_id', '')) == str(osd_id): - return int(o.get('pgs', -1)) - return -1 - - def is_empty(self, osd_id: str) -> bool: - """ Checks if an OSD is empty """ - return self.get_pg_count(osd_id) == 0 - - def ok_to_destroy(self, osd_ids: List[int]) -> bool: - """ Queries the safe-to-destroy flag for OSDs """ - cmd_args = {'prefix': 'osd safe-to-destroy', - 'ids': osd_ids} - return self._run_mon_cmd(cmd_args) - - def destroy_osd(self, osd_id: int) -> bool: - """ Destroys an OSD (forcefully) """ - cmd_args = {'prefix': 'osd destroy-actual', - 'id': int(osd_id), - 'yes_i_really_mean_it': True} - return self._run_mon_cmd(cmd_args) - - def down_osd(self, osd_ids: List[int]) -> bool: - """ Sets `out` flag to OSDs """ - cmd_args = { - 'prefix': 'osd down', - 'ids': osd_ids, - } - return self._run_mon_cmd(cmd_args) - - def purge_osd(self, osd_id: int) -> bool: - """ Purges an OSD from the cluster (forcefully) """ - cmd_args = { - 'prefix': 'osd purge-actual', - 'id': int(osd_id), - 'yes_i_really_mean_it': True - } - return self._run_mon_cmd(cmd_args) - - def out_osd(self, osd_ids: List[int]) -> bool: - """ Sets `down` flag to OSDs """ - cmd_args = { - 'prefix': 'osd out', - 'ids': osd_ids, - } - return self._run_mon_cmd(cmd_args) - - def _run_mon_cmd(self, cmd_args: dict) -> bool: - """ - Generic command to run mon_command and evaluate/log the results - """ - ret, out, err = self.mgr.mon_command(cmd_args) - if ret != 0: - self.mgr.log.debug(f"ran {cmd_args} with mon_command") - self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. 
(errno:{ret})") - return False - self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") - return True diff --git a/src/pybind/mgr/cephadm/services/__init__.py b/src/pybind/mgr/cephadm/services/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/pybind/mgr/cephadm/services/nfs.py b/src/pybind/mgr/cephadm/services/nfs.py new file mode 100644 index 00000000000..2eaa9344ab6 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/nfs.py @@ -0,0 +1,155 @@ +import logging +import rados + +from typing import Dict, Optional + +from ceph.deployment.service_spec import NFSServiceSpec + +import cephadm +from orchestrator import OrchestratorError + +from .. import utils + +logger = logging.getLogger(__name__) + +class NFSGanesha(object): + def __init__(self, + mgr, + daemon_id, + spec): + # type: (cephadm.CephadmOrchestrator, str, NFSServiceSpec) -> None + assert spec.service_id and daemon_id.startswith(spec.service_id) + self.mgr = mgr + self.daemon_id = daemon_id + self.spec = spec + + def get_daemon_name(self): + # type: () -> str + return '%s.%s' % (self.spec.service_type, self.daemon_id) + + def get_rados_user(self): + # type: () -> str + return '%s.%s' % (self.spec.service_type, self.daemon_id) + + def get_keyring_entity(self): + # type: () -> str + return utils.name_to_config_section(self.get_rados_user()) + + def get_or_create_keyring(self, entity=None): + # type: (Optional[str]) -> str + if not entity: + entity = self.get_keyring_entity() + + logger.info('Create keyring: %s' % entity) + ret, keyring, err = self.mgr.mon_command({ + 'prefix': 'auth get-or-create', + 'entity': entity, + }) + + if ret != 0: + raise OrchestratorError( + 'Unable to create keyring %s: %s %s' \ + % (entity, ret, err)) + return keyring + + def update_keyring_caps(self, entity=None): + # type: (Optional[str]) -> None + if not entity: + entity = self.get_keyring_entity() + + osd_caps='allow rw pool=%s' % (self.spec.pool) + if self.spec.namespace: + osd_caps='%s namespace=%s' % (osd_caps, self.spec.namespace) + + logger.info('Updating keyring caps: %s' % entity) + ret, out, err = self.mgr.mon_command({ + 'prefix': 'auth caps', + 'entity': entity, + 'caps': ['mon', 'allow r', + 'osd', osd_caps, + 'mds', 'allow rw'], + }) + + if ret != 0: + raise OrchestratorError( + 'Unable to update keyring caps %s: %s %s' \ + % (entity, ret, err)) + + def create_rados_config_obj(self, clobber=False): + # type: (Optional[bool]) -> None + obj = self.spec.rados_config_name() + + with self.mgr.rados.open_ioctx(self.spec.pool) as ioctx: + if self.spec.namespace: + ioctx.set_namespace(self.spec.namespace) + + exists = True + try: + ioctx.stat(obj) + except rados.ObjectNotFound as e: + exists = False + + if exists and not clobber: + # Assume an existing config + logger.info('Rados config object exists: %s' % obj) + else: + # Create an empty config object + logger.info('Creating rados config object: %s' % obj) + ioctx.write_full(obj, ''.encode('utf-8')) + + def get_ganesha_conf(self): + # type: () -> str + return '''# generated by cephadm +NFS_CORE_PARAM {{ + Enable_NLM = false; + Enable_RQUOTA = false; + Protocols = 4; +}} + +CACHEINODE {{ + Dir_Chunk = 0; + NParts = 1; + Cache_Size = 1; +}} + +EXPORT_DEFAULTS {{ + Attr_Expiration_Time = 0; +}} + +NFSv4 {{ + Delegations = false; + RecoveryBackend = 'rados_cluster'; + Minor_Versions = 1, 2; +}} + +RADOS_KV {{ + UserId = "{user}"; + nodeid = "{nodeid}"; + pool = "{pool}"; + namespace = "{namespace}"; +}} + +RADOS_URLS {{ + UserId = "{user}"; + watch_url = 
"{url}"; +}} + +%url {url} +'''.format(user=self.get_rados_user(), + nodeid=self.get_daemon_name(), + pool=self.spec.pool, + namespace=self.spec.namespace if self.spec.namespace else '', + url=self.spec.rados_config_location()) + + def get_cephadm_config(self): + # type: () -> Dict + config = {'pool' : self.spec.pool} # type: Dict + if self.spec.namespace: + config['namespace'] = self.spec.namespace + config['userid'] = self.get_rados_user() + config['extra_args'] = ['-N', 'NIV_EVENT'] + config['files'] = { + 'ganesha.conf' : self.get_ganesha_conf(), + } + logger.debug('Generated cephadm config-json: %s' % config) + return config diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py new file mode 100644 index 00000000000..2ead0cb8077 --- /dev/null +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -0,0 +1,191 @@ +import datetime +import json +import logging +import time + +from typing import List, Dict, Any, Set, Union + +import orchestrator +from orchestrator import OrchestratorError + +logger = logging.getLogger(__name__) + + +class OSDRemoval(object): + def __init__(self, + osd_id: str, + replace: bool, + force: bool, + nodename: str, + fullname: str, + start_at: datetime.datetime, + pg_count: int): + self.osd_id = osd_id + self.replace = replace + self.force = force + self.nodename = nodename + self.fullname = fullname + self.started_at = start_at + self.pg_count = pg_count + + # needed due to changing 'started_at' attr + def __eq__(self, other): + return self.osd_id == other.osd_id + + def __hash__(self): + return hash(self.osd_id) + + def __repr__(self): + return ('(osd_id={}, replace={}, force={}, nodename={}' + ', fullname={}, started_at={}, pg_count={})').format( + self.osd_id, self.replace, self.force, self.nodename, + self.fullname, self.started_at, self.pg_count) + + @property + def pg_count_str(self) -> str: + return 'n/a' if self.pg_count < 0 else str(self.pg_count) + + +class RemoveUtil(object): + def __init__(self, mgr): + self.mgr = mgr + self.to_remove_osds: Set[OSDRemoval] = set() + self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict() + + @property + def report(self) -> Set[OSDRemoval]: + return self.to_remove_osds.copy() + + def queue_osds_for_removal(self, osds: Set[OSDRemoval]): + self.to_remove_osds.update(osds) + + def _remove_osds_bg(self) -> None: + """ + Performs actions in the _serve() loop to remove an OSD + when criteria is met. + """ + logger.debug( + f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}") + self._update_osd_removal_status() + remove_osds: set = self.to_remove_osds.copy() + for osd in remove_osds: + if not osd.force: + self.drain_osd(osd.osd_id) + # skip criteria + if not self.is_empty(osd.osd_id): + logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more") + continue + + if not self.ok_to_destroy([osd.osd_id]): + logger.info( + f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more") + continue + + # abort criteria + if not self.down_osd([osd.osd_id]): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError( + f"Could not set OSD <{osd.osd_id}> to 'down'") + + if osd.replace: + if not self.destroy_osd(osd.osd_id): + # also remove it from the remove_osd list and set a health_check warning? 
+ raise orchestrator.OrchestratorError( + f"Could not destroy OSD <{osd.osd_id}>") + else: + if not self.purge_osd(osd.osd_id): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>") + + self.mgr._remove_daemon(osd.fullname, osd.nodename) + logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}") + logger.debug(f"Removing {osd.osd_id} from the queue.") + self.to_remove_osds.remove(osd) + + def _update_osd_removal_status(self): + """ + Generate a OSD report that can be printed to the CLI + """ + logger.debug("Update OSD removal status") + for osd in self.to_remove_osds: + osd.pg_count = self.get_pg_count(str(osd.osd_id)) + logger.debug(f"OSD removal status: {self.to_remove_osds}") + + def drain_osd(self, osd_id: str) -> bool: + """ + Uses `osd_support` module to schedule a drain operation of an OSD + """ + cmd_args = { + 'prefix': 'osd drain', + 'osd_ids': [int(osd_id)] + } + return self._run_mon_cmd(cmd_args) + + def get_pg_count(self, osd_id: str) -> int: + """ Queries for PG count of an OSD """ + self.mgr.log.debug("Querying for drain status") + ret, out, err = self.mgr.mon_command({ + 'prefix': 'osd drain status', + }) + if ret != 0: + self.mgr.log.error(f"Calling osd drain status failed with {err}") + raise OrchestratorError("Could not query `osd drain status`") + out = json.loads(out) + for o in out: + if str(o.get('osd_id', '')) == str(osd_id): + return int(o.get('pgs', -1)) + return -1 + + def is_empty(self, osd_id: str) -> bool: + """ Checks if an OSD is empty """ + return self.get_pg_count(osd_id) == 0 + + def ok_to_destroy(self, osd_ids: List[int]) -> bool: + """ Queries the safe-to-destroy flag for OSDs """ + cmd_args = {'prefix': 'osd safe-to-destroy', + 'ids': osd_ids} + return self._run_mon_cmd(cmd_args) + + def destroy_osd(self, osd_id: int) -> bool: + """ Destroys an OSD (forcefully) """ + cmd_args = {'prefix': 'osd destroy-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True} + return self._run_mon_cmd(cmd_args) + + def down_osd(self, osd_ids: List[int]) -> bool: + """ Sets `out` flag to OSDs """ + cmd_args = { + 'prefix': 'osd down', + 'ids': osd_ids, + } + return self._run_mon_cmd(cmd_args) + + def purge_osd(self, osd_id: int) -> bool: + """ Purges an OSD from the cluster (forcefully) """ + cmd_args = { + 'prefix': 'osd purge-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True + } + return self._run_mon_cmd(cmd_args) + + def out_osd(self, osd_ids: List[int]) -> bool: + """ Sets `down` flag to OSDs """ + cmd_args = { + 'prefix': 'osd out', + 'ids': osd_ids, + } + return self._run_mon_cmd(cmd_args) + + def _run_mon_cmd(self, cmd_args: dict) -> bool: + """ + Generic command to run mon_command and evaluate/log the results + """ + ret, out, err = self.mgr.mon_command(cmd_args) + if ret != 0: + self.mgr.log.debug(f"ran {cmd_args} with mon_command") + self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. 
(errno:{ret})") + return False + self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") + return True diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index 2d9ba86ee1d..8226bdb2ed7 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -5,7 +5,7 @@ from contextlib import contextmanager import pytest from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection -from cephadm.osd import OSDRemoval +from cephadm.services.osd import OSDRemoval try: from typing import Any, List @@ -341,7 +341,7 @@ class TestCephadm(object): ) ]) )) - @mock.patch("cephadm.osd.RemoveUtil.get_pg_count", lambda _, __: 0) + @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0) def test_remove_osds(self, cephadm_module): with self._with_host(cephadm_module, 'test'): c = cephadm_module.list_daemons(refresh=True)