from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from .utils import resolve_ip, SpecialHostLabels
-from .migrations import queue_migrate_nfs_spec, queue_migrate_rgw_spec
+from .migrations import (
+ queue_migrate_nfs_spec,
+ queue_migrate_rgw_spec,
+ queue_migrate_nvmeof_spec,
+)
if TYPE_CHECKING:
from .module import CephadmOrchestrator
):
queue_migrate_rgw_spec(self.mgr, j)
+ if (
+ (self.mgr.migration_current or 0) < 8
+ and j['spec'].get('service_type') == 'nvmeof'
+ ):
+ queue_migrate_nvmeof_spec(self.mgr, j)
+
spec = ServiceSpec.from_json(j['spec'])
created = str_to_datetime(cast(str, j['created']))
self._specs[service_name] = spec
import logging
from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any, List
-from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec, RGWSpec
+from ceph.deployment.service_spec import (
+ PlacementSpec,
+ ServiceSpec,
+ HostPlacementSpec,
+ RGWSpec,
+ NvmeofServiceSpec,
+)
from cephadm.schedule import HostAssignment
from cephadm.utils import SpecialHostLabels
import rados
if TYPE_CHECKING:
from .module import CephadmOrchestrator
-LAST_MIGRATION = 7
+LAST_MIGRATION = 8
logger = logging.getLogger(__name__)
r = mgr.get_store('rgw_migration_queue')
self.rgw_migration_queue = json.loads(r) if r else []
+ n = mgr.get_store('nvmeof_migration_queue')
+ self.nvmeof_migration_queue = json.loads(n) if n else []
+
# for some migrations, we don't need to do anything except for
# incrementing migration_current.
# let's try to shortcut things here.
if self.migrate_6_7():
self.set(7)
+ if self.mgr.migration_current == 7:
+ if self.migrate_7_8():
+ self.set(8)
+
def migrate_0_1(self) -> bool:
"""
Migration 0 -> 1
# was set to true. Therefore we have nothing to migrate for those daemons
return True
+ def migrate_nvmeof_spec(self, spec: Dict[Any, Any], migration_counter: int) -> Optional[NvmeofServiceSpec]:
+ """ Add value for group parameter to nvmeof spec """
+ new_spec = spec.copy()
+ # Note: each spec must have a different group name so we append
+ # the value of a counter to the end
+ new_spec['spec']['group'] = f'default{str(migration_counter + 1)}'
+ return NvmeofServiceSpec.from_json(new_spec)
+
+ def nvmeof_spec_needs_migration(self, spec: Dict[Any, Any]) -> bool:
+ spec = spec.get('spec', None)
+ return (bool(spec) and spec.get('group', None) is None)
+
+ def migrate_7_8(self) -> bool:
+ """
+ Add a default value for the "group" parameter to nvmeof specs that don't have one
+ """
+ self.mgr.log.debug(f'Starting nvmeof migration (queue length is {len(self.nvmeof_migration_queue)})')
+ migrated_spec_counter = 0
+ for s in self.nvmeof_migration_queue:
+ spec = s['spec']
+ if self.nvmeof_spec_needs_migration(spec):
+ nvmeof_spec = self.migrate_nvmeof_spec(spec, migrated_spec_counter)
+ if nvmeof_spec is not None:
+ logger.info(f"Added group 'default{migrated_spec_counter + 1}' to nvmeof spec {spec}")
+ self.mgr.spec_store.save(nvmeof_spec)
+ migrated_spec_counter += 1
+ else:
+ logger.info(f"No Migration is needed for nvmeof spec: {spec}")
+ self.nvmeof_migration_queue = []
+ return True
+
+
+def queue_migrate_nvmeof_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
+ """
+ group has become a required field for nvmeof specs and has been added
+ to spec validation. We need to assign a default group to nvmeof specs
+ that don't have one
+ """
+ service_id = spec_dict['spec']['service_id']
+ queued = mgr.get_store('nvmeof_migration_queue') or '[]'
+ ls = json.loads(queued)
+ ls.append(spec_dict)
+ mgr.set_store('nvmeof_migration_queue', json.dumps(ls))
+ mgr.log.info(f'Queued nvmeof.{service_id} for migration')
+
def queue_migrate_rgw_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
"""
assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host2')
assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1')
assert cephadm_module.cert_key_store.get_key('grafana_key', host='host2')
+
+
+@pytest.mark.parametrize(
+ "nvmeof_spec_store_entries, should_migrate_specs, expected_groups",
+ [
+ (
+ [
+ {'spec': {
+ 'service_type': 'nvmeof',
+ 'service_name': 'nvmeof.foo',
+ 'service_id': 'foo',
+ 'placement': {
+ 'hosts': ['host1']
+ },
+ 'spec': {
+ 'pool': 'foo',
+ },
+ },
+ 'created': datetime_to_str(datetime_now())},
+ {'spec': {
+ 'service_type': 'nvmeof',
+ 'service_name': 'nvmeof.bar',
+ 'service_id': 'bar',
+ 'placement': {
+ 'hosts': ['host2']
+ },
+ 'spec': {
+ 'pool': 'bar',
+ },
+ },
+ 'created': datetime_to_str(datetime_now())}
+ ],
+ [True, True], ['default1', 'default2']),
+ (
+ [
+ {'spec':
+ {
+ 'service_type': 'nvmeof',
+ 'service_name': 'nvmeof.foo',
+ 'service_id': 'foo',
+ 'placement': {
+ 'hosts': ['host1']
+ },
+ 'spec': {
+ 'pool': 'foo',
+ },
+ },
+ 'created': datetime_to_str(datetime_now())},
+ {'spec':
+ {
+ 'service_type': 'nvmeof',
+ 'service_name': 'nvmeof.bar',
+ 'service_id': 'bar',
+ 'placement': {
+ 'hosts': ['host2']
+ },
+ 'spec': {
+ 'pool': 'bar',
+ 'group': 'bar'
+ },
+ },
+ 'created': datetime_to_str(datetime_now())},
+ {'spec':
+ {
+ 'service_type': 'nvmeof',
+ 'service_name': 'nvmeof.testing_testing',
+ 'service_id': 'testing_testing',
+ 'placement': {
+ 'label': 'baz'
+ },
+ 'spec': {
+ 'pool': 'baz',
+ },
+ },
+ 'created': datetime_to_str(datetime_now())}
+ ], [True, False, True], ['default1', 'bar', 'default2']
+ ),
+ ]
+)
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_nvmeof_spec(
+ cephadm_module: CephadmOrchestrator,
+ nvmeof_spec_store_entries,
+ should_migrate_specs,
+ expected_groups
+):
+ with with_host(cephadm_module, 'host1'):
+ for spec in nvmeof_spec_store_entries:
+ cephadm_module.set_store(
+ SPEC_STORE_PREFIX + spec['spec']['service_name'],
+ json.dumps(spec, sort_keys=True),
+ )
+
+ # make sure nvmeof_migration_queue is populated accordingly
+ cephadm_module.migration_current = 1
+ cephadm_module.spec_store.load()
+ ls = json.loads(cephadm_module.get_store('nvmeof_migration_queue'))
+ assert all([s['spec']['service_type'] == 'nvmeof' for s in ls])
+ # shortcut nvmeof_migration_queue loading by directly assigning
+ # ls output to nvmeof_migration_queue list
+ cephadm_module.migration.nvmeof_migration_queue = ls
+
+ # skip other migrations and go directly to 7_8 migration (nvmeof spec)
+ cephadm_module.migration_current = 7
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current == LAST_MIGRATION
+
+ print(cephadm_module.spec_store.all_specs)
+
+ for i in range(len(nvmeof_spec_store_entries)):
+ nvmeof_spec_store_entry = nvmeof_spec_store_entries[i]
+ should_migrate = should_migrate_specs[i]
+ expected_group = expected_groups[i]
+
+ service_name = nvmeof_spec_store_entry['spec']['service_name']
+ service_id = nvmeof_spec_store_entry['spec']['service_id']
+ placement = nvmeof_spec_store_entry['spec']['placement']
+ pool = nvmeof_spec_store_entry['spec']['spec']['pool']
+
+ if should_migrate:
+ assert service_name in cephadm_module.spec_store.all_specs
+ nvmeof_spec = cephadm_module.spec_store.all_specs[service_name]
+ nvmeof_spec_json = nvmeof_spec.to_json()
+ assert nvmeof_spec_json['service_type'] == 'nvmeof'
+ assert nvmeof_spec_json['service_id'] == service_id
+ assert nvmeof_spec_json['service_name'] == service_name
+ assert nvmeof_spec_json['placement'] == placement
+ assert nvmeof_spec_json['spec']['pool'] == pool
+ # make sure spec has the "group" parameter set to "default<counter>"
+ assert nvmeof_spec_json['spec']['group'] == expected_group
+ else:
+ nvmeof_spec = cephadm_module.spec_store.all_specs[service_name]
+ nvmeof_spec_json = nvmeof_spec.to_json()
+ assert nvmeof_spec_json['spec']['pool'] == pool
+ # make sure spec has the "group" parameter still set to its old value
+ assert nvmeof_spec_json['spec']['group'] == expected_group