from . import remotes
from . import utils
-from .nfs import NFSGanesha
-from .osd import RemoveUtil, OSDRemoval
+from .services.nfs import NFSGanesha
+from .services.osd import RemoveUtil, OSDRemoval
from .inventory import Inventory, SpecStore, HostCache
try:
+++ /dev/null
-import logging
-import rados
-
-from typing import Dict, Optional
-
-from ceph.deployment.service_spec import NFSServiceSpec
-
-import cephadm
-from orchestrator import OrchestratorError
-
-from . import utils
-
-logger = logging.getLogger(__name__)
-
-class NFSGanesha(object):
- def __init__(self,
- mgr,
- daemon_id,
- spec):
- # type: (cephadm.CephadmOrchestrator, str, NFSServiceSpec) -> None
- assert spec.service_id and daemon_id.startswith(spec.service_id)
- self.mgr = mgr
- self.daemon_id = daemon_id
- self.spec = spec
-
- def get_daemon_name(self):
- # type: () -> str
- return '%s.%s' % (self.spec.service_type, self.daemon_id)
-
- def get_rados_user(self):
- # type: () -> str
- return '%s.%s' % (self.spec.service_type, self.daemon_id)
-
- def get_keyring_entity(self):
- # type: () -> str
- return utils.name_to_config_section(self.get_rados_user())
-
- def get_or_create_keyring(self, entity=None):
- # type: (Optional[str]) -> str
- if not entity:
- entity = self.get_keyring_entity()
-
- logger.info('Create keyring: %s' % entity)
- ret, keyring, err = self.mgr.mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': entity,
- })
-
- if ret != 0:
- raise OrchestratorError(
- 'Unable to create keyring %s: %s %s' \
- % (entity, ret, err))
- return keyring
-
- def update_keyring_caps(self, entity=None):
- # type: (Optional[str]) -> None
- if not entity:
- entity = self.get_keyring_entity()
-
- osd_caps='allow rw pool=%s' % (self.spec.pool)
- if self.spec.namespace:
- osd_caps='%s namespace=%s' % (osd_caps, self.spec.namespace)
-
- logger.info('Updating keyring caps: %s' % entity)
- ret, out, err = self.mgr.mon_command({
- 'prefix': 'auth caps',
- 'entity': entity,
- 'caps': ['mon', 'allow r',
- 'osd', osd_caps,
- 'mds', 'allow rw'],
- })
-
- if ret != 0:
- raise OrchestratorError(
- 'Unable to update keyring caps %s: %s %s' \
- % (entity, ret, err))
-
- def create_rados_config_obj(self, clobber=False):
- # type: (Optional[bool]) -> None
- obj = self.spec.rados_config_name()
-
- with self.mgr.rados.open_ioctx(self.spec.pool) as ioctx:
- if self.spec.namespace:
- ioctx.set_namespace(self.spec.namespace)
-
- exists = True
- try:
- ioctx.stat(obj)
- except rados.ObjectNotFound as e:
- exists = False
-
- if exists and not clobber:
- # Assume an existing config
- logger.info('Rados config object exists: %s' % obj)
- else:
- # Create an empty config object
- logger.info('Creating rados config object: %s' % obj)
- ioctx.write_full(obj, ''.encode('utf-8'))
-
- def get_ganesha_conf(self):
- # type: () -> str
- return '''# generated by cephadm
-NFS_CORE_PARAM {{
- Enable_NLM = false;
- Enable_RQUOTA = false;
- Protocols = 4;
-}}
-
-CACHEINODE {{
- Dir_Chunk = 0;
- NParts = 1;
- Cache_Size = 1;
-}}
-
-EXPORT_DEFAULTS {{
- Attr_Expiration_Time = 0;
-}}
-
-NFSv4 {{
- Delegations = false;
- RecoveryBackend = 'rados_cluster';
- Minor_Versions = 1, 2;
-}}
-
-RADOS_KV {{
- UserId = "{user}";
- nodeid = "{nodeid}";
- pool = "{pool}";
- namespace = "{namespace}";
-}}
-
-RADOS_URLS {{
- UserId = "{user}";
- watch_url = "{url}";
-}}
-
-%url {url}
-'''.format(user=self.get_rados_user(),
- nodeid=self.get_daemon_name(),
- pool=self.spec.pool,
- namespace=self.spec.namespace if self.spec.namespace else '',
- url=self.spec.rados_config_location())
-
- def get_cephadm_config(self):
- # type: () -> Dict
- config = {'pool' : self.spec.pool} # type: Dict
- if self.spec.namespace:
- config['namespace'] = self.spec.namespace
- config['userid'] = self.get_rados_user()
- config['extra_args'] = ['-N', 'NIV_EVENT']
- config['files'] = {
- 'ganesha.conf' : self.get_ganesha_conf(),
- }
- logger.debug('Generated cephadm config-json: %s' % config)
- return config
+++ /dev/null
-import datetime
-import json
-import logging
-import time
-
-from typing import List, Dict, Any, Set, Union
-
-import orchestrator
-from orchestrator import OrchestratorError
-
-logger = logging.getLogger(__name__)
-
-
-class OSDRemoval(object):
- def __init__(self,
- osd_id: str,
- replace: bool,
- force: bool,
- nodename: str,
- fullname: str,
- start_at: datetime.datetime,
- pg_count: int):
- self.osd_id = osd_id
- self.replace = replace
- self.force = force
- self.nodename = nodename
- self.fullname = fullname
- self.started_at = start_at
- self.pg_count = pg_count
-
- # needed due to changing 'started_at' attr
- def __eq__(self, other):
- return self.osd_id == other.osd_id
-
- def __hash__(self):
- return hash(self.osd_id)
-
- def __repr__(self):
- return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
- ', fullname={}, started_at={}, pg_count={})').format(
- self.osd_id, self.replace, self.force, self.nodename,
- self.fullname, self.started_at, self.pg_count)
-
- @property
- def pg_count_str(self) -> str:
- return 'n/a' if self.pg_count < 0 else str(self.pg_count)
-
-
-class RemoveUtil(object):
- def __init__(self, mgr):
- self.mgr = mgr
- self.to_remove_osds: Set[OSDRemoval] = set()
- self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict()
-
- @property
- def report(self) -> Set[OSDRemoval]:
- return self.to_remove_osds.copy()
-
- def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
- self.to_remove_osds.update(osds)
-
- def _remove_osds_bg(self) -> None:
- """
- Performs actions in the _serve() loop to remove an OSD
- when criteria is met.
- """
- logger.debug(
- f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
- self._update_osd_removal_status()
- remove_osds: set = self.to_remove_osds.copy()
- for osd in remove_osds:
- if not osd.force:
- self.drain_osd(osd.osd_id)
- # skip criteria
- if not self.is_empty(osd.osd_id):
- logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
- continue
-
- if not self.ok_to_destroy([osd.osd_id]):
- logger.info(
- f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
- continue
-
- # abort criteria
- if not self.down_osd([osd.osd_id]):
- # also remove it from the remove_osd list and set a health_check warning?
- raise orchestrator.OrchestratorError(
- f"Could not set OSD <{osd.osd_id}> to 'down'")
-
- if osd.replace:
- if not self.destroy_osd(osd.osd_id):
- # also remove it from the remove_osd list and set a health_check warning?
- raise orchestrator.OrchestratorError(
- f"Could not destroy OSD <{osd.osd_id}>")
- else:
- if not self.purge_osd(osd.osd_id):
- # also remove it from the remove_osd list and set a health_check warning?
- raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
- self.mgr._remove_daemon(osd.fullname, osd.nodename)
- logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
- logger.debug(f"Removing {osd.osd_id} from the queue.")
- self.to_remove_osds.remove(osd)
-
- def _update_osd_removal_status(self):
- """
- Generate a OSD report that can be printed to the CLI
- """
- logger.debug("Update OSD removal status")
- for osd in self.to_remove_osds:
- osd.pg_count = self.get_pg_count(str(osd.osd_id))
- logger.debug(f"OSD removal status: {self.to_remove_osds}")
-
- def drain_osd(self, osd_id: str) -> bool:
- """
- Uses `osd_support` module to schedule a drain operation of an OSD
- """
- cmd_args = {
- 'prefix': 'osd drain',
- 'osd_ids': [int(osd_id)]
- }
- return self._run_mon_cmd(cmd_args)
-
- def get_pg_count(self, osd_id: str) -> int:
- """ Queries for PG count of an OSD """
- self.mgr.log.debug("Querying for drain status")
- ret, out, err = self.mgr.mon_command({
- 'prefix': 'osd drain status',
- })
- if ret != 0:
- self.mgr.log.error(f"Calling osd drain status failed with {err}")
- raise OrchestratorError("Could not query `osd drain status`")
- out = json.loads(out)
- for o in out:
- if str(o.get('osd_id', '')) == str(osd_id):
- return int(o.get('pgs', -1))
- return -1
-
- def is_empty(self, osd_id: str) -> bool:
- """ Checks if an OSD is empty """
- return self.get_pg_count(osd_id) == 0
-
- def ok_to_destroy(self, osd_ids: List[int]) -> bool:
- """ Queries the safe-to-destroy flag for OSDs """
- cmd_args = {'prefix': 'osd safe-to-destroy',
- 'ids': osd_ids}
- return self._run_mon_cmd(cmd_args)
-
- def destroy_osd(self, osd_id: int) -> bool:
- """ Destroys an OSD (forcefully) """
- cmd_args = {'prefix': 'osd destroy-actual',
- 'id': int(osd_id),
- 'yes_i_really_mean_it': True}
- return self._run_mon_cmd(cmd_args)
-
- def down_osd(self, osd_ids: List[int]) -> bool:
- """ Sets `out` flag to OSDs """
- cmd_args = {
- 'prefix': 'osd down',
- 'ids': osd_ids,
- }
- return self._run_mon_cmd(cmd_args)
-
- def purge_osd(self, osd_id: int) -> bool:
- """ Purges an OSD from the cluster (forcefully) """
- cmd_args = {
- 'prefix': 'osd purge-actual',
- 'id': int(osd_id),
- 'yes_i_really_mean_it': True
- }
- return self._run_mon_cmd(cmd_args)
-
- def out_osd(self, osd_ids: List[int]) -> bool:
- """ Sets `down` flag to OSDs """
- cmd_args = {
- 'prefix': 'osd out',
- 'ids': osd_ids,
- }
- return self._run_mon_cmd(cmd_args)
-
- def _run_mon_cmd(self, cmd_args: dict) -> bool:
- """
- Generic command to run mon_command and evaluate/log the results
- """
- ret, out, err = self.mgr.mon_command(cmd_args)
- if ret != 0:
- self.mgr.log.debug(f"ran {cmd_args} with mon_command")
- self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
- return False
- self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
- return True
--- /dev/null
+import logging
+import rados
+
+from typing import Dict, Optional
+
+from ceph.deployment.service_spec import NFSServiceSpec
+
+import cephadm
+from orchestrator import OrchestratorError
+
+from .. import utils
+
+logger = logging.getLogger(__name__)
+
+class NFSGanesha(object):
+ def __init__(self,
+ mgr,
+ daemon_id,
+ spec):
+ # type: (cephadm.CephadmOrchestrator, str, NFSServiceSpec) -> None
+ assert spec.service_id and daemon_id.startswith(spec.service_id)
+ self.mgr = mgr
+ self.daemon_id = daemon_id
+ self.spec = spec
+
+ def get_daemon_name(self):
+ # type: () -> str
+ return '%s.%s' % (self.spec.service_type, self.daemon_id)
+
+ def get_rados_user(self):
+ # type: () -> str
+ return '%s.%s' % (self.spec.service_type, self.daemon_id)
+
+ def get_keyring_entity(self):
+ # type: () -> str
+ return utils.name_to_config_section(self.get_rados_user())
+
+ def get_or_create_keyring(self, entity=None):
+ # type: (Optional[str]) -> str
+ if not entity:
+ entity = self.get_keyring_entity()
+
+ logger.info('Creating keyring: %s' % entity)
+ ret, keyring, err = self.mgr.mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': entity,
+ })
+
+ if ret != 0:
+ raise OrchestratorError(
+ 'Unable to create keyring %s: %s %s' \
+ % (entity, ret, err))
+ return keyring
+
+ def update_keyring_caps(self, entity=None):
+ # type: (Optional[str]) -> None
+ if not entity:
+ entity = self.get_keyring_entity()
+
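+ # Limit the daemon's OSD access to its configured pool (and namespace, if any).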
+ osd_caps='allow rw pool=%s' % (self.spec.pool)
+ if self.spec.namespace:
+ osd_caps='%s namespace=%s' % (osd_caps, self.spec.namespace)
+
+ logger.info('Updating keyring caps: %s' % entity)
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth caps',
+ 'entity': entity,
+ 'caps': ['mon', 'allow r',
+ 'osd', osd_caps,
+ 'mds', 'allow rw'],
+ })
+
+ if ret != 0:
+ raise OrchestratorError(
+ 'Unable to update keyring caps %s: %s %s' \
+ % (entity, ret, err))
+
+ def create_rados_config_obj(self, clobber=False):
+ # type: (Optional[bool]) -> None
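+ # Ensure the per-daemon RADOS config object exists; an empty object is
+ # written unless one already exists and clobber is False.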
+ obj = self.spec.rados_config_name()
+
+ with self.mgr.rados.open_ioctx(self.spec.pool) as ioctx:
+ if self.spec.namespace:
+ ioctx.set_namespace(self.spec.namespace)
+
+ exists = True
+ try:
+ ioctx.stat(obj)
+ except rados.ObjectNotFound:
+ exists = False
+
+ if exists and not clobber:
+ # Assume an existing config
+ logger.info('Rados config object exists: %s' % obj)
+ else:
+ # Create an empty config object
+ logger.info('Creating rados config object: %s' % obj)
+ ioctx.write_full(obj, ''.encode('utf-8'))
+
+ def get_ganesha_conf(self):
+ # type: () -> str
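+ # Doubled braces below are literal Ganesha config braces; only the
+ # {user}, {nodeid}, {pool}, {namespace} and {url} placeholders are
+ # substituted by .format() at the end of the template.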
+ return '''# generated by cephadm
+NFS_CORE_PARAM {{
+ Enable_NLM = false;
+ Enable_RQUOTA = false;
+ Protocols = 4;
+}}
+
+CACHEINODE {{
+ Dir_Chunk = 0;
+ NParts = 1;
+ Cache_Size = 1;
+}}
+
+EXPORT_DEFAULTS {{
+ Attr_Expiration_Time = 0;
+}}
+
+NFSv4 {{
+ Delegations = false;
+ RecoveryBackend = 'rados_cluster';
+ Minor_Versions = 1, 2;
+}}
+
+RADOS_KV {{
+ UserId = "{user}";
+ nodeid = "{nodeid}";
+ pool = "{pool}";
+ namespace = "{namespace}";
+}}
+
+RADOS_URLS {{
+ UserId = "{user}";
+ watch_url = "{url}";
+}}
+
+%url {url}
+'''.format(user=self.get_rados_user(),
+ nodeid=self.get_daemon_name(),
+ pool=self.spec.pool,
+ namespace=self.spec.namespace if self.spec.namespace else '',
+ url=self.spec.rados_config_location())
+
+ def get_cephadm_config(self):
+ # type: () -> Dict
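+ # Assemble the config-json handed to the daemon: pool/namespace/userid for
+ # RADOS access plus the generated ganesha.conf under 'files'.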
+ config = {'pool' : self.spec.pool} # type: Dict
+ if self.spec.namespace:
+ config['namespace'] = self.spec.namespace
+ config['userid'] = self.get_rados_user()
+ config['extra_args'] = ['-N', 'NIV_EVENT']
+ config['files'] = {
+ 'ganesha.conf' : self.get_ganesha_conf(),
+ }
+ logger.debug('Generated cephadm config-json: %s' % config)
+ return config
--- /dev/null
+import datetime
+import json
+import logging
+import time
+
+from typing import List, Dict, Any, Set, Union
+
+import orchestrator
+from orchestrator import OrchestratorError
+
+logger = logging.getLogger(__name__)
+
+
+class OSDRemoval(object):
+ def __init__(self,
+ osd_id: str,
+ replace: bool,
+ force: bool,
+ nodename: str,
+ fullname: str,
+ start_at: datetime.datetime,
+ pg_count: int):
+ self.osd_id = osd_id
+ self.replace = replace
+ self.force = force
+ self.nodename = nodename
+ self.fullname = fullname
+ self.started_at = start_at
+ self.pg_count = pg_count
+
+ # needed due to changing 'started_at' attr
+ def __eq__(self, other):
+ return self.osd_id == other.osd_id
+
+ def __hash__(self):
+ return hash(self.osd_id)
+
+ def __repr__(self):
+ return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
+ ', fullname={}, started_at={}, pg_count={})').format(
+ self.osd_id, self.replace, self.force, self.nodename,
+ self.fullname, self.started_at, self.pg_count)
+
+ @property
+ def pg_count_str(self) -> str:
+ return 'n/a' if self.pg_count < 0 else str(self.pg_count)
+
+
+class RemoveUtil(object):
+ def __init__(self, mgr):
+ self.mgr = mgr
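+ # OSDs queued via queue_osds_for_removal() and processed by _remove_osds_bg()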
+ self.to_remove_osds: Set[OSDRemoval] = set()
+ self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict()
+
+ @property
+ def report(self) -> Set[OSDRemoval]:
+ return self.to_remove_osds.copy()
+
+ def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
+ self.to_remove_osds.update(osds)
+
+ def _remove_osds_bg(self) -> None:
+ """
+ Performs actions in the _serve() loop to remove an OSD
+ when the removal criteria are met.
+ """
+ logger.debug(
+ f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
+ self._update_osd_removal_status()
+ remove_osds: set = self.to_remove_osds.copy()
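+ # iterate over a copy so entries can be removed from the live set below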
+ for osd in remove_osds:
+ if not osd.force:
+ self.drain_osd(osd.osd_id)
+ # skip criteria
+ if not self.is_empty(osd.osd_id):
+ logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+ continue
+
+ if not self.ok_to_destroy([osd.osd_id]):
+ logger.info(
+ f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+ continue
+
+ # abort criteria
+ if not self.down_osd([osd.osd_id]):
+ # also remove it from the remove_osd list and set a health_check warning?
+ raise orchestrator.OrchestratorError(
+ f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+ if osd.replace:
+ if not self.destroy_osd(osd.osd_id):
+ # also remove it from the remove_osd list and set a health_check warning?
+ raise orchestrator.OrchestratorError(
+ f"Could not destroy OSD <{osd.osd_id}>")
+ else:
+ if not self.purge_osd(osd.osd_id):
+ # also remove it from the remove_osd list and set a health_check warning?
+ raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+ self.mgr._remove_daemon(osd.fullname, osd.nodename)
+ logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
+ logger.debug(f"Removing {osd.osd_id} from the queue.")
+ self.to_remove_osds.remove(osd)
+
+ def _update_osd_removal_status(self):
+ """
+ Generate an OSD removal report that can be printed to the CLI
+ """
+ logger.debug("Update OSD removal status")
+ for osd in self.to_remove_osds:
+ osd.pg_count = self.get_pg_count(str(osd.osd_id))
+ logger.debug(f"OSD removal status: {self.to_remove_osds}")
+
+ def drain_osd(self, osd_id: str) -> bool:
+ """
+ Uses the `osd_support` module to schedule a drain operation for an OSD
+ """
+ cmd_args = {
+ 'prefix': 'osd drain',
+ 'osd_ids': [int(osd_id)]
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def get_pg_count(self, osd_id: str) -> int:
+ """ Queries for PG count of an OSD """
+ self.mgr.log.debug("Querying for drain status")
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'osd drain status',
+ })
+ if ret != 0:
+ self.mgr.log.error(f"Calling osd drain status failed with {err}")
+ raise OrchestratorError("Could not query `osd drain status`")
+ out = json.loads(out)
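+ # 'osd drain status' returns a list of {'osd_id': ..., 'pgs': ...} entries;
+ # -1 means unknown and is rendered as 'n/a' via OSDRemoval.pg_count_str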
+ for o in out:
+ if str(o.get('osd_id', '')) == str(osd_id):
+ return int(o.get('pgs', -1))
+ return -1
+
+ def is_empty(self, osd_id: str) -> bool:
+ """ Checks if an OSD is empty """
+ return self.get_pg_count(osd_id) == 0
+
+ def ok_to_destroy(self, osd_ids: List[int]) -> bool:
+ """ Queries the safe-to-destroy flag for OSDs """
+ cmd_args = {'prefix': 'osd safe-to-destroy',
+ 'ids': osd_ids}
+ return self._run_mon_cmd(cmd_args)
+
+ def destroy_osd(self, osd_id: int) -> bool:
+ """ Destroys an OSD (forcefully) """
+ cmd_args = {'prefix': 'osd destroy-actual',
+ 'id': int(osd_id),
+ 'yes_i_really_mean_it': True}
+ return self._run_mon_cmd(cmd_args)
+
+ def down_osd(self, osd_ids: List[int]) -> bool:
+ """ Sets `out` flag to OSDs """
+ cmd_args = {
+ 'prefix': 'osd down',
+ 'ids': osd_ids,
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def purge_osd(self, osd_id: int) -> bool:
+ """ Purges an OSD from the cluster (forcefully) """
+ cmd_args = {
+ 'prefix': 'osd purge-actual',
+ 'id': int(osd_id),
+ 'yes_i_really_mean_it': True
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def out_osd(self, osd_ids: List[int]) -> bool:
+ """ Sets `down` flag to OSDs """
+ cmd_args = {
+ 'prefix': 'osd out',
+ 'ids': osd_ids,
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def _run_mon_cmd(self, cmd_args: dict) -> bool:
+ """
+ Generic helper that runs mon_command and evaluates/logs the result
+ """
+ ret, out, err = self.mgr.mon_command(cmd_args)
+ if ret != 0:
+ self.mgr.log.debug(f"ran {cmd_args} with mon_command")
+ self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+ return False
+ self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
+ return True
import pytest
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
-from cephadm.osd import OSDRemoval
+from cephadm.services.osd import OSDRemoval
try:
from typing import Any, List
)
])
))
- @mock.patch("cephadm.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
+ @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
def test_remove_osds(self, cephadm_module):
with self._with_host(cephadm_module, 'test'):
c = cephadm_module.list_daemons(refresh=True)