r[str(osd_id)] = o.get('uuid', '')
return r
- def resolve_hosts_for_osdspecs(self,
- specs: Optional[List[DriveGroupSpec]] = None,
- service_name: Optional[str] = None
- ) -> List[str]:
- osdspecs = []
- if service_name:
- self.log.debug(f"Looking for OSDSpec with service_name: {service_name}")
- osdspecs = self.spec_store.find(service_name=service_name)
- self.log.debug(f"Found OSDSpecs: {osdspecs}")
- if specs:
- osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
- if not service_name and not specs:
- # if neither parameters are fulfilled, search for all available osdspecs
- osdspecs = self.spec_store.find(service_name='osd')
- self.log.debug(f"Found OSDSpecs: {osdspecs}")
- if not osdspecs:
- self.log.debug("No OSDSpecs found")
- return []
- return sum([spec.placement.filter_matching_hosts(self._get_hosts) for spec in osdspecs], [])
-
- def resolve_osdspecs_for_host(self, host):
- matching_specs = []
- self.log.debug(f"Finding OSDSpecs for host: <{host}>")
- for spec in self.spec_store.find('osd'):
- if host in spec.placement.filter_matching_hosts(self._get_hosts):
- self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
- matching_specs.append(spec)
- return matching_specs
-
def _trigger_preview_refresh(self,
specs: Optional[List[DriveGroupSpec]] = None,
- service_name: Optional[str] = None):
- refresh_hosts = self.resolve_hosts_for_osdspecs(specs=specs, service_name=service_name)
+ service_name: Optional[str] = None,
+ ) -> None:
+ # Only trigger a refresh when a spec has changed
+ trigger_specs = []
+ if specs:
+ for spec in specs:
+ preview_spec = self.spec_store.spec_preview.get(spec.service_name())
+                # If the to-be-previewed spec differs from the stored preview spec we
+                # need to trigger a refresh; if the stored spec has been removed
+                # (is None) we need to refresh as well.
+ if not preview_spec or spec != preview_spec:
+ trigger_specs.append(spec)
+ if service_name:
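+            # an explicit service_name always refreshes whatever preview spec is stored for it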
+ trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
+ if not any(trigger_specs):
+ return None
+
+ refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
for host in refresh_hosts:
self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
self.cache.osdspec_previews_refresh_queue.append(host)
- @trivial_completion
- def apply_drivegroups(self, specs: List[DriveGroupSpec]):
- self._trigger_preview_refresh(specs=specs)
- return [self._apply(spec) for spec in specs]
-
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
return self.osd_service.create_from_spec(drive_group)
- @trivial_completion
- def preview_osdspecs(self,
- osdspec_name: Optional[str] = None,
- osdspecs: Optional[List[DriveGroupSpec]] = None
- ):
- matching_hosts = self.resolve_hosts_for_osdspecs(specs=osdspecs, service_name=osdspec_name)
+ def _preview_osdspecs(self,
+ osdspecs: Optional[List[DriveGroupSpec]] = None
+ ):
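+        # Build the per-host OSDSpec preview from the cached reports in
+        # self.cache.osdspec_previews, limited to hosts and reports that match the given specs.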
+ if not osdspecs:
+ return {'n/a': [{'error': True,
+ 'message': 'No OSDSpec or matching hosts found.'}]}
+ matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
if not matching_hosts:
return {'n/a': [{'error': True,
'message': 'No OSDSpec or matching hosts found.'}]}
# Report 'pending' when any of the matching hosts is still loading previews (flag is True)
return {'n/a': [{'error': True,
'message': 'Preview data is being generated.. '
- 'Please try again in a bit.'}]}
- # drop all keys that are not in search_hosts and return preview struct
- return {k: v for (k, v) in self.cache.osdspec_previews.items() if k in matching_hosts}
+ 'Please re-run this command in a bit.'}]}
+        # drop hosts that are not in matching_hosts and only keep reports that match the requested osdspecs
+ previews_for_specs = {}
+ for host, raw_reports in self.cache.osdspec_previews.items():
+ if host not in matching_hosts:
+ continue
+ osd_reports = []
+ for osd_report in raw_reports:
+ if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
+ osd_reports.append(osd_report)
+ previews_for_specs.update({host: osd_reports})
+ return previews_for_specs
def _calc_daemon_deps(self, daemon_type, daemon_id):
need = {
daemon_type = spec.service_type
service_name = spec.service_name()
if spec.unmanaged:
- self.log.debug('Skipping unmanaged service %s spec' % service_name)
+ self.log.debug('Skipping unmanaged service %s' % service_name)
+ return False
+ if spec.preview_only:
+ self.log.debug('Skipping preview_only service %s' % service_name)
return False
self.log.debug('Applying service %s spec' % service_name)
if spec.service_type == 'host':
return self._add_host(cast(HostSpec, spec))
+ if spec.service_type == 'osd':
+            # _trigger_preview_refresh only marks hosts for a refresh
+            # when the spec has actually changed
+ self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
+
return self._apply_service_spec(cast(ServiceSpec, spec))
+ def _plan(self, spec: ServiceSpec):
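+        # Dry-run a single spec: OSD specs are resolved to per-host previews;
+        # all other specs report which hosts daemons would be added to or removed from.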
+ if spec.service_type == 'osd':
+ return {'service_name': spec.service_name(),
+ 'service_type': spec.service_type,
+ 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
+
+ ha = HostAssignment(
+ spec=spec,
+ get_hosts_func=self._get_hosts,
+ get_daemons_func=self.cache.get_daemons_by_service,
+ )
+ ha.validate()
+ hosts = ha.place()
+
+ add_daemon_hosts = ha.add_daemon_hosts(hosts)
+ remove_daemon_hosts = ha.remove_daemon_hosts(hosts)
+
+ return {
+ 'service_name': spec.service_name(),
+ 'service_type': spec.service_type,
+ 'add': [hs.hostname for hs in add_daemon_hosts],
+ 'remove': [d.hostname for d in remove_daemon_hosts]
+ }
+
+ @trivial_completion
+ def plan(self, specs: List[GenericSpec]):
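+        # Generate dry-run previews for a list of specs; HostSpecs are not
+        # supported and short-circuit with an error below.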
+ results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
+                               'to the current inventory setup. If any of these conditions change, the \n'
+ 'preview will be invalid. Please make sure to have a minimal \n'
+ 'timeframe between planning and applying the specs.'}]
+ if any([spec.service_type == 'host' for spec in specs]):
+            return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported yet.'}]
+ for spec in specs:
+ results.append(self._plan(cast(ServiceSpec, spec)))
+ return results
+
def _apply_service_spec(self, spec: ServiceSpec) -> str:
if spec.placement.is_empty():
# fill in default placement
import datetime
import errno
import json
-from typing import List, Set, Optional, Iterator, cast
+from typing import List, Set, Optional, Iterator, cast, Dict, Any, Union
import re
import ast
return yaml.dump(to_yaml(copy), default_flow_style=False)
+def generate_preview_tables(data):
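+    # 'data' is the list returned by plan(): one dict per spec carrying
+    # 'service_name'/'service_type' plus either OSD preview 'data' or
+    # 'add'/'remove' host lists, optionally preceded by 'warning'/'error' entries.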
+ error = [x.get('error') for x in data if x.get('error')]
+ if error:
+ return json.dumps(error)
+ warning = [x.get('warning') for x in data if x.get('warning')]
+ osd_table = preview_table_osd(data)
+ service_table = preview_table_services(data)
+ tables = f"""
+{''.join(warning)}
+
+####################
+SERVICESPEC PREVIEWS
+####################
+{service_table}
+
+################
+OSDSPEC PREVIEWS
+################
+{osd_table}
+"""
+ return tables
+
+
+def preview_table_osd(data):
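+    # Render the OSDSpec previews: each entry's 'data' maps hostname -> list of
+    # preview reports carrying the proposed data/db/wal paths per OSD.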
+ table = PrettyTable(header_style='upper', title='OSDSPEC PREVIEWS', border=True)
+ table.field_names = "service name host data db wal".split()
+ table.align = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for osd_data in data:
+ if osd_data.get('service_type') != 'osd':
+ continue
+ for host, specs in osd_data.get('data').items():
+ for spec in specs:
+ if spec.get('error'):
+ return spec.get('message')
+ dg_name = spec.get('osdspec')
+ for osd in spec.get('data', {}).get('osds', []):
+ db_path = '-'
+ wal_path = '-'
+ block_db = osd.get('block.db', {}).get('path')
+ block_wal = osd.get('block.wal', {}).get('path')
+ block_data = osd.get('data', {}).get('path', '')
+ if not block_data:
+ continue
+ if block_db:
+ db_path = spec.get('data', {}).get('vg', {}).get('devices', [])
+ if block_wal:
+ wal_path = spec.get('data', {}).get('wal_vg', {}).get('devices', [])
+ table.add_row(('osd', dg_name, host, block_data, db_path, wal_path))
+ return table.get_string()
+
+
+def preview_table_services(data):
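+    # Render the add/remove host preview for all non-OSD service specs;
+    # 'warning' entries are skipped.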
+ table = PrettyTable(header_style='upper', title="SERVICESPEC PREVIEW", border=True)
+ table.field_names = 'SERVICE NAME ADD_TO REMOVE_FROM'.split()
+ table.align = 'l'
+ table.left_padding_width = 0
+ table.right_padding_width = 2
+ for item in data:
+ if item.get('warning'):
+ continue
+ if item.get('service_type') != 'osd':
+ table.add_row((item.get('service_type'), item.get('service_name'),
+ " ".join(item.get('add')), " ".join(item.get('remove'))))
+ return table.get_string()
+
+
class OrchestratorCli(OrchestratorClientMixin, MgrModule,
metaclass=CLICommandMeta):
MODULE_OPTIONS = [
table = PrettyTable(
['NAME', 'RUNNING', 'REFRESHED', 'AGE',
'PLACEMENT',
- 'IMAGE NAME', 'IMAGE ID',
+ 'IMAGE NAME', 'IMAGE ID'
],
border=False)
table.align['NAME'] = 'l'
@_cli_write_command(
'orch apply osd',
'name=all_available_devices,type=CephBool,req=false '
+ 'name=dry_run,type=CephBool,req=false '
'name=unmanaged,type=CephBool,req=false '
"name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
'Create OSD daemon(s) using a drive group spec')
def _apply_osd(self,
all_available_devices: bool = False,
- format: Optional[str] = 'plain',
+ format: str = 'plain',
unmanaged=None,
+ dry_run=None,
inbuf: Optional[str] = None) -> HandleCommandResult:
"""Apply DriveGroupSpecs to create OSDs"""
usage = """
usage:
- ceph orch apply osd -i <json_file/yaml_file>
- ceph orch apply osd --all-available-devices
- ceph orch apply osd --all-available-devices --unmanaged=true|false
+ ceph orch apply osd -i <json_file/yaml_file> [--dry-run]
+ ceph orch apply osd --all-available-devices [--dry-run] [--unmanaged]
Restrictions:
return HandleCommandResult(-errno.EINVAL, stderr=usage)
try:
drivegroups = yaml.safe_load_all(inbuf)
- dg_specs = [DriveGroupSpec.from_json(dg) for dg in drivegroups]
- # This acts weird when abstracted to a function
- completion = self.apply_drivegroups(dg_specs)
+
+ dg_specs = []
+ for dg in drivegroups:
+ spec = DriveGroupSpec.from_json(dg)
+ if dry_run:
+ spec.preview_only = True
+ dg_specs.append(spec)
+
+ completion = self.apply(dg_specs)
self._orchestrator_wait([completion])
raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
+ out = completion.result_str()
+ if dry_run:
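+                # dry-run: replace the apply output with the OSDSpec preview,
+                # rendered as a table for 'plain' or serialized via to_format otherwise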
+ completion = self.plan(dg_specs)
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ data = completion.result
+ if format == 'plain':
+ out = preview_table_osd(data)
+ else:
+ out = to_format(data, format, many=True, cls=None)
+ return HandleCommandResult(stdout=out)
+
except ValueError as e:
msg = 'Failed to read JSON/YAML input: {}'.format(str(e)) + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
service_id='all-available-devices',
placement=PlacementSpec(host_pattern='*'),
data_devices=DeviceSelection(all=True),
- unmanaged=unmanaged
+ unmanaged=unmanaged,
+ preview_only=dry_run
)
]
# This acts weird when abstracted to a function
- completion = self.apply_drivegroups(dg_specs)
+ completion = self.apply(dg_specs)
self._orchestrator_wait([completion])
raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
+ out = completion.result_str()
+ if dry_run:
+ completion = self.plan(dg_specs)
+            self._orchestrator_wait([completion])
+            raise_if_exception(completion)
+            data = completion.result
+ if format == 'plain':
+ out = preview_table_osd(data)
+ else:
+ out = to_format(data, format, many=True, cls=None)
+ return HandleCommandResult(stdout=out)
return HandleCommandResult(-errno.EINVAL, stderr=usage)
@_cli_write_command(
'orch apply',
'name=service_type,type=CephChoices,strings=mon|mgr|rbd-mirror|crash|alertmanager|grafana|node-exporter|prometheus,req=false '
+ 'name=dry_run,type=CephBool,req=false '
'name=placement,type=CephString,req=false '
+ 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
'name=unmanaged,type=CephBool,req=false',
'Update the size or placement for a service or apply a large yaml spec')
def _apply_misc(self,
service_type: Optional[str] = None,
placement: Optional[str] = None,
unmanaged: bool = False,
+ dry_run: bool = False,
+ format: str = 'plain',
inbuf: Optional[str] = None) -> HandleCommandResult:
usage = """Usage:
- ceph orch apply -i <yaml spec>
+ ceph orch apply -i <yaml spec> [--dry-run]
ceph orch apply <service_type> <placement> [--unmanaged]
"""
if inbuf:
if service_type or placement or unmanaged:
raise OrchestratorValidationError(usage)
content: Iterator = yaml.safe_load_all(inbuf)
- specs: List[GenericSpec] = [json_to_generic_spec(s) for s in content]
-
+ specs: List[Union[ServiceSpec, HostSpec]] = []
+ for s in content:
+ spec = json_to_generic_spec(s)
+ if dry_run and not isinstance(spec, HostSpec):
+ spec.preview_only = dry_run
+ specs.append(spec)
else:
- placmentspec = PlacementSpec.from_string(placement)
- if not service_type:
- raise OrchestratorValidationError(f'Error: Empty service_type\n{usage}')
+ placementspec = PlacementSpec.from_string(placement)
+ assert service_type
+ specs = [ServiceSpec(service_type, placement=placementspec, unmanaged=unmanaged, preview_only=dry_run)]
- specs = [ServiceSpec(service_type, placement=placmentspec, unmanaged=unmanaged)]
-
completion = self.apply(specs)
self._orchestrator_wait([completion])
raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
+ out = completion.result_str()
+ if dry_run:
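+            # dry-run: show the combined ServiceSpec/OSDSpec preview tables
+            # (or the raw preview data in the requested format) instead of the apply result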
+ completion = self.plan(specs)
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ data = completion.result
+ if format == 'plain':
+ out = generate_preview_tables(data)
+ else:
+ out = to_format(data, format, many=True, cls=None)
+ return HandleCommandResult(stdout=out)
@_cli_write_command(
'orch apply mds',
'name=fs_name,type=CephString '
+ 'name=dry_run,type=CephBool,req=false '
'name=placement,type=CephString,req=false '
- 'name=unmanaged,type=CephBool,req=false',
+ 'name=unmanaged,type=CephBool,req=false '
+ 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false',
'Update the number of MDS instances for the given fs_name')
def _apply_mds(self,
fs_name: str,
placement: Optional[str] = None,
+ dry_run: bool = False,
+ format: str = 'plain',
unmanaged: bool = False,
inbuf: Optional[str] = None) -> HandleCommandResult:
if inbuf:
service_type='mds',
service_id=fs_name,
placement=PlacementSpec.from_string(placement),
- unmanaged=unmanaged)
+ unmanaged=unmanaged,
+ preview_only=dry_run)
completion = self.apply_mds(spec)
self._orchestrator_wait([completion])
raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
+ out = completion.result_str()
+ if dry_run:
+ completion = self.plan([spec])
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ data = completion.result
+ if format == 'plain':
+ out = preview_table_services(data)
+ else:
+ out = to_format(data, format, many=True, cls=None)
+ return HandleCommandResult(stdout=out)
@_cli_write_command(
'orch apply rgw',
'name=port,type=CephInt,req=false '
'name=ssl,type=CephBool,req=false '
'name=placement,type=CephString,req=false '
+ 'name=dry_run,type=CephBool,req=false '
+ 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
'name=unmanaged,type=CephBool,req=false',
'Update the number of RGW instances for the given zone')
def _apply_rgw(self,
port: Optional[int] = None,
ssl: bool = False,
placement: Optional[str] = None,
+ dry_run: bool = False,
+ format: str = 'plain',
unmanaged: bool = False,
inbuf: Optional[str] = None) -> HandleCommandResult:
if inbuf:
ssl=ssl,
placement=PlacementSpec.from_string(placement),
unmanaged=unmanaged,
+ preview_only=dry_run
)
completion = self.apply_rgw(spec)
self._orchestrator_wait([completion])
raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
+ out = completion.result_str()
+ if dry_run:
+ completion = self.plan([spec])
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ data = completion.result
+ if format == 'plain':
+ out = preview_table_services(data)
+ else:
+ out = to_format(data, format, many=True, cls=None)
+ return HandleCommandResult(stdout=out)
@_cli_write_command(
'orch apply nfs',
'name=pool,type=CephString '
'name=namespace,type=CephString,req=false '
'name=placement,type=CephString,req=false '
+ 'name=dry_run,type=CephBool,req=false '
+ 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
'name=unmanaged,type=CephBool,req=false',
'Scale an NFS service')
def _apply_nfs(self,
pool: str,
namespace: Optional[str] = None,
placement: Optional[str] = None,
+ format: str = 'plain',
+ dry_run: bool = False,
unmanaged: bool = False,
inbuf: Optional[str] = None) -> HandleCommandResult:
if inbuf:
namespace=namespace,
placement=PlacementSpec.from_string(placement),
unmanaged=unmanaged,
+ preview_only=dry_run
)
completion = self.apply_nfs(spec)
self._orchestrator_wait([completion])
raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
+ out = completion.result_str()
+ if dry_run:
+ completion = self.plan([spec])
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ data = completion.result
+ if format == 'plain':
+ out = preview_table_services(data)
+ else:
+ out = to_format(data, format, many=True, cls=None)
+ return HandleCommandResult(stdout=out)
@_cli_write_command(
'orch apply iscsi',
'name=api_password,type=CephString '
'name=trusted_ip_list,type=CephString,req=false '
'name=placement,type=CephString,req=false '
+ 'name=dry_run,type=CephBool,req=false '
+ 'name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false '
'name=unmanaged,type=CephBool,req=false',
'Scale an iSCSI service')
def _apply_iscsi(self,
trusted_ip_list: Optional[str] = None,
placement: Optional[str] = None,
unmanaged: bool = False,
+ dry_run: bool = False,
+ format: str = 'plain',
inbuf: Optional[str] = None) -> HandleCommandResult:
if inbuf:
raise OrchestratorValidationError('unrecognized command -i; -h or --help for usage')
trusted_ip_list=trusted_ip_list,
placement=PlacementSpec.from_string(placement),
unmanaged=unmanaged,
+ preview_only=dry_run
)
completion = self.apply_iscsi(spec)
self._orchestrator_wait([completion])
raise_if_exception(completion)
- return HandleCommandResult(stdout=completion.result_str())
+ out = completion.result_str()
+ if dry_run:
+ completion = self.plan([spec])
+ self._orchestrator_wait([completion])
+ raise_if_exception(completion)
+ data = completion.result
+ if format == 'plain':
+ out = preview_table_services(data)
+ else:
+ out = to_format(data, format, many=True, cls=None)
+ return HandleCommandResult(stdout=out)
@_cli_write_command(
'orch set backend',
placement: Optional[PlacementSpec] = None,
count: Optional[int] = None,
unmanaged: bool = False,
+ preview_only: bool = False,
):
self.placement = PlacementSpec() if placement is None else placement # type: PlacementSpec
self.service_type = service_type
self.service_id = service_id
self.unmanaged = unmanaged
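+        #: If set, the spec is only used to generate dry-run previews and is
+        #: skipped when cephadm applies specs.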
+ self.preview_only = preview_only
@classmethod
@handle_type_error
def __repr__(self):
return "{}({!r})".format(self.__class__.__name__, self.__dict__)
+ def __eq__(self, other):
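+        # field-wise equality; used to detect whether a newly submitted spec
+        # differs from the stored preview spec (see _trigger_preview_refresh)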
+ return (self.__class__ == other.__class__
+ and
+ self.__dict__ == other.__dict__)
+
def one_line_str(self):
return '<{} for service_name={}>'.format(self.__class__.__name__, self.service_name())
namespace: Optional[str] = None,
placement: Optional[PlacementSpec] = None,
unmanaged: bool = False,
+ preview_only: bool = False
):
assert service_type == 'nfs'
super(NFSServiceSpec, self).__init__(
'nfs', service_id=service_id,
- placement=placement, unmanaged=unmanaged)
+ placement=placement, unmanaged=unmanaged, preview_only=preview_only)
#: RADOS pool where NFS client recovery data is stored.
self.pool = pool
#: RADOS namespace where NFS client recovery data is stored in the pool.
self.namespace = namespace
+ self.preview_only = preview_only
+
def validate(self):
super(NFSServiceSpec, self).validate()
rgw_frontend_ssl_key: Optional[List[str]] = None,
unmanaged: bool = False,
ssl: bool = False,
+ preview_only: bool = False,
):
assert service_type == 'rgw', service_type
if service_id:
service_id = '%s.%s' % (rgw_realm, rgw_zone)
super(RGWSpec, self).__init__(
'rgw', service_id=service_id,
- placement=placement, unmanaged=unmanaged)
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only)
self.rgw_realm = rgw_realm
self.rgw_zone = rgw_zone
self.rgw_frontend_ssl_certificate = rgw_frontend_ssl_certificate
self.rgw_frontend_ssl_key = rgw_frontend_ssl_key
self.ssl = ssl
+ self.preview_only = preview_only
def get_port(self):
if self.rgw_frontend_port:
ssl_cert: Optional[str] = None,
ssl_key: Optional[str] = None,
placement: Optional[PlacementSpec] = None,
- unmanaged: bool = False
+ unmanaged: bool = False,
+ preview_only: bool = False
):
assert service_type == 'iscsi'
super(IscsiServiceSpec, self).__init__('iscsi', service_id=service_id,
- placement=placement, unmanaged=unmanaged)
+ placement=placement, unmanaged=unmanaged,
+ preview_only=preview_only)
#: RADOS pool where ceph-iscsi config data is stored.
self.pool = pool
self.api_secure = api_secure
self.ssl_cert = ssl_cert
self.ssl_key = ssl_key
+ self.preview_only = preview_only
if not self.api_secure and self.ssl_cert and self.ssl_key:
self.api_secure = True