diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from .utils import resolve_ip, SpecialHostLabels
-from .migrations import (
- queue_migrate_nfs_spec,
- queue_migrate_rgw_spec,
- queue_migrate_nvmeof_spec,
-)
+from .migrations import queue_migrate_nfs_spec, queue_migrate_rgw_spec
if TYPE_CHECKING:
from .module import CephadmOrchestrator
if (
    (self.mgr.migration_current or 0) < 6
    and j['spec'].get('service_type') == 'rgw'
):
queue_migrate_rgw_spec(self.mgr, j)
- if (
- (self.mgr.migration_current or 0) < 8
- and j['spec'].get('service_type') == 'nvmeof'
- ):
- queue_migrate_nvmeof_spec(self.mgr, j)
-
spec = ServiceSpec.from_json(j['spec'])
created = str_to_datetime(cast(str, j['created']))
self._specs[service_name] = spec
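
For reference, every queue_migrate_* helper that SpecStore.load() calls here follows the same append-to-KV-store pattern (the rgw variant is partially visible further down, and the removed nvmeof one below mirrored it). A minimal, self-contained sketch of that pattern, with a stubbed mgr and a hypothetical store key rather than the real CephadmOrchestrator:

    import json
    from typing import Any, Dict

    class FakeMgr:
        """Stub standing in for CephadmOrchestrator's key/value store API."""
        def __init__(self) -> None:
            self._store: Dict[str, str] = {}

        def get_store(self, key: str) -> str:
            return self._store.get(key, '')

        def set_store(self, key: str, val: str) -> None:
            self._store[key] = val

    def queue_migrate_example_spec(mgr: FakeMgr, spec_dict: Dict[Any, Any]) -> None:
        # Append the raw spec dict to a JSON list persisted in the mgr
        # store; the Migrations class drains the list on its next
        # migrate() pass.
        queued = mgr.get_store('example_migration_queue') or '[]'
        ls = json.loads(queued)
        ls.append(spec_dict)
        mgr.set_store('example_migration_queue', json.dumps(ls))

    mgr = FakeMgr()
    queue_migrate_example_spec(mgr, {'spec': {'service_id': 'foo'}})
    assert json.loads(mgr.get_store('example_migration_queue'))[0]['spec']['service_id'] == 'foo'
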
diff --git a/src/pybind/mgr/cephadm/migrations.py b/src/pybind/mgr/cephadm/migrations.py
import logging
from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any, List
-from ceph.deployment.service_spec import (
- PlacementSpec,
- ServiceSpec,
- HostPlacementSpec,
- RGWSpec,
- NvmeofServiceSpec,
-)
+from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec, RGWSpec
from cephadm.schedule import HostAssignment
from cephadm.utils import SpecialHostLabels
import rados
if TYPE_CHECKING:
from .module import CephadmOrchestrator
-LAST_MIGRATION = 8
+LAST_MIGRATION = 7
logger = logging.getLogger(__name__)
r = mgr.get_store('rgw_migration_queue')
self.rgw_migration_queue = json.loads(r) if r else []
- n = mgr.get_store('nvmeof_migration_queue')
- self.nvmeof_migration_queue = json.loads(n) if n else []
-
# for some migrations, we don't need to do anything except for
# incrementing migration_current.
# let's try to shortcut things here.
if self.migrate_6_7():
self.set(7)
- if self.mgr.migration_current == 7:
- if self.migrate_7_8():
- self.set(8)
-
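
The removed 7 -> 8 gate followed the same ladder convention as the steps that remain: each migrate_N_M() runs only when migration_current sits at N, and set() bumps the counter only on success, so a failed or interrupted step is simply retried on the next pass. A rough, self-contained sketch of the convention (illustrative names and placeholder bodies; in the real class the counter is a persisted mgr module option):

    class MigrationsSketch:
        """Illustrative ladder only; the real class lives in migrations.py."""

        def __init__(self) -> None:
            self.migration_current = 0

        def set(self, val: int) -> None:
            # The real set() also writes the value through to a persisted
            # module option so progress survives mgr restarts.
            self.migration_current = val

        def migrate(self) -> None:
            # Each step is gated on its starting level and bumps the
            # counter only if the step reports success.
            if self.migration_current == 0 and self.migrate_0_1():
                self.set(1)
            # ... intermediate steps elided ...
            if self.migration_current == 6 and self.migrate_6_7():
                self.set(7)

        def migrate_0_1(self) -> bool:
            return True  # placeholder

        def migrate_6_7(self) -> bool:
            return True  # placeholder: a pure counter bump, per the comment above
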
def migrate_0_1(self) -> bool:
"""
Migration 0 -> 1
# was set to true. Therefore we have nothing to migrate for those daemons
return True
- def migrate_nvmeof_spec(self, spec: Dict[Any, Any], migration_counter: int) -> Optional[NvmeofServiceSpec]:
- """ Add value for group parameter to nvmeof spec """
- new_spec = spec.copy()
- # Note: each spec must have a different group name so we append
- # the value of a counter to the end
- new_spec['spec']['group'] = f'default{str(migration_counter + 1)}'
- return NvmeofServiceSpec.from_json(new_spec)
-
- def nvmeof_spec_needs_migration(self, spec: Dict[Any, Any]) -> bool:
- spec = spec.get('spec', None)
- return (bool(spec) and spec.get('group', None) is None)
-
- def migrate_7_8(self) -> bool:
- """
- Add a default value for the "group" parameter to nvmeof specs that don't have one
- """
- self.mgr.log.debug(f'Starting nvmeof migration (queue length is {len(self.nvmeof_migration_queue)})')
- migrated_spec_counter = 0
- for s in self.nvmeof_migration_queue:
- spec = s['spec']
- if self.nvmeof_spec_needs_migration(spec):
- nvmeof_spec = self.migrate_nvmeof_spec(spec, migrated_spec_counter)
- if nvmeof_spec is not None:
- logger.info(f"Added group 'default{migrated_spec_counter + 1}' to nvmeof spec {spec}")
- self.mgr.spec_store.save(nvmeof_spec)
- migrated_spec_counter += 1
- else:
- logger.info(f"No Migration is needed for nvmeof spec: {spec}")
- self.nvmeof_migration_queue = []
- return True
-
-
-def queue_migrate_nvmeof_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
- """
- group has become a required field for nvmeof specs and has been added
- to spec validation. We need to assign a default group to nvmeof specs
- that don't have one
- """
- service_id = spec_dict['spec']['service_id']
- queued = mgr.get_store('nvmeof_migration_queue') or '[]'
- ls = json.loads(queued)
- ls.append(spec_dict)
- mgr.set_store('nvmeof_migration_queue', json.dumps(ls))
- mgr.log.info(f'Queued nvmeof.{service_id} for migration')
-
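
On the draining side, the removed migrate_7_8() showed the shape every queued migration takes: walk the queue, rewrite only the specs that need it, save them back through the spec store, then clear the queue and return True so migrate() can bump the counter. A condensed, self-contained sketch of that shape; needs_migration and fix_up are stand-ins for whatever predicate and rewrite a given migration performs:

    from typing import Any, Callable, Dict, List

    def drain_migration_queue(
        queue: List[Dict[Any, Any]],
        needs_migration: Callable[[Dict[Any, Any]], bool],
        fix_up: Callable[[Dict[Any, Any], int], Dict[Any, Any]],
        save: Callable[[Dict[Any, Any]], None],
    ) -> bool:
        # Rewrite and re-save only the specs the predicate flags; count the
        # rewrites so fix_up can derive unique values (e.g. 'default<n>').
        migrated = 0
        for entry in queue:
            spec = entry['spec']
            if needs_migration(spec):
                save(fix_up(spec, migrated))
                migrated += 1
        queue.clear()
        return True
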
def queue_migrate_rgw_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
"""
diff --git a/src/pybind/mgr/cephadm/tests/test_migration.py b/src/pybind/mgr/cephadm/tests/test_migration.py
assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host2')
assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1')
assert cephadm_module.cert_key_store.get_key('grafana_key', host='host2')
-
-
-@pytest.mark.parametrize(
- "nvmeof_spec_store_entries, should_migrate_specs, expected_groups",
- [
- (
- [
- {'spec': {
- 'service_type': 'nvmeof',
- 'service_name': 'nvmeof.foo',
- 'service_id': 'foo',
- 'placement': {
- 'hosts': ['host1']
- },
- 'spec': {
- 'pool': 'foo',
- },
- },
- 'created': datetime_to_str(datetime_now())},
- {'spec': {
- 'service_type': 'nvmeof',
- 'service_name': 'nvmeof.bar',
- 'service_id': 'bar',
- 'placement': {
- 'hosts': ['host2']
- },
- 'spec': {
- 'pool': 'bar',
- },
- },
- 'created': datetime_to_str(datetime_now())}
- ],
- [True, True], ['default1', 'default2']),
- (
- [
- {'spec':
- {
- 'service_type': 'nvmeof',
- 'service_name': 'nvmeof.foo',
- 'service_id': 'foo',
- 'placement': {
- 'hosts': ['host1']
- },
- 'spec': {
- 'pool': 'foo',
- },
- },
- 'created': datetime_to_str(datetime_now())},
- {'spec':
- {
- 'service_type': 'nvmeof',
- 'service_name': 'nvmeof.bar',
- 'service_id': 'bar',
- 'placement': {
- 'hosts': ['host2']
- },
- 'spec': {
- 'pool': 'bar',
- 'group': 'bar'
- },
- },
- 'created': datetime_to_str(datetime_now())},
- {'spec':
- {
- 'service_type': 'nvmeof',
- 'service_name': 'nvmeof.testing_testing',
- 'service_id': 'testing_testing',
- 'placement': {
- 'label': 'baz'
- },
- 'spec': {
- 'pool': 'baz',
- },
- },
- 'created': datetime_to_str(datetime_now())}
- ], [True, False, True], ['default1', 'bar', 'default2']
- ),
- ]
-)
-@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
-def test_migrate_nvmeof_spec(
- cephadm_module: CephadmOrchestrator,
- nvmeof_spec_store_entries,
- should_migrate_specs,
- expected_groups
-):
- with with_host(cephadm_module, 'host1'):
- for spec in nvmeof_spec_store_entries:
- cephadm_module.set_store(
- SPEC_STORE_PREFIX + spec['spec']['service_name'],
- json.dumps(spec, sort_keys=True),
- )
-
- # make sure nvmeof_migration_queue is populated accordingly
- cephadm_module.migration_current = 1
- cephadm_module.spec_store.load()
- ls = json.loads(cephadm_module.get_store('nvmeof_migration_queue'))
- assert all([s['spec']['service_type'] == 'nvmeof' for s in ls])
- # shortcut nvmeof_migration_queue loading by directly assigning
- # ls output to nvmeof_migration_queue list
- cephadm_module.migration.nvmeof_migration_queue = ls
-
- # skip other migrations and go directly to 7_8 migration (nvmeof spec)
- cephadm_module.migration_current = 7
- cephadm_module.migration.migrate()
- assert cephadm_module.migration_current == LAST_MIGRATION
-
- print(cephadm_module.spec_store.all_specs)
-
- for i in range(len(nvmeof_spec_store_entries)):
- nvmeof_spec_store_entry = nvmeof_spec_store_entries[i]
- should_migrate = should_migrate_specs[i]
- expected_group = expected_groups[i]
-
- service_name = nvmeof_spec_store_entry['spec']['service_name']
- service_id = nvmeof_spec_store_entry['spec']['service_id']
- placement = nvmeof_spec_store_entry['spec']['placement']
- pool = nvmeof_spec_store_entry['spec']['spec']['pool']
-
- if should_migrate:
- assert service_name in cephadm_module.spec_store.all_specs
- nvmeof_spec = cephadm_module.spec_store.all_specs[service_name]
- nvmeof_spec_json = nvmeof_spec.to_json()
- assert nvmeof_spec_json['service_type'] == 'nvmeof'
- assert nvmeof_spec_json['service_id'] == service_id
- assert nvmeof_spec_json['service_name'] == service_name
- assert nvmeof_spec_json['placement'] == placement
- assert nvmeof_spec_json['spec']['pool'] == pool
- # make sure spec has the "group" parameter set to "default<counter>"
- assert nvmeof_spec_json['spec']['group'] == expected_group
- else:
- nvmeof_spec = cephadm_module.spec_store.all_specs[service_name]
- nvmeof_spec_json = nvmeof_spec.to_json()
- assert nvmeof_spec_json['spec']['pool'] == pool
- # make sure spec has the "group" parameter still set to its old value
- assert nvmeof_spec_json['spec']['group'] == expected_group