+import json
import logging
-from typing import TYPE_CHECKING, Iterator
+from typing import TYPE_CHECKING, Iterator, Optional, Dict, Any
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
from cephadm.schedule import HostAssignment
if TYPE_CHECKING:
from .module import CephadmOrchestrator
-LAST_MIGRATION = 2
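+# the highest migration step this version of cephadm knows about; bump this
+# whenever a new migrate_<n>_<n+1> step is added below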
+LAST_MIGRATION = 3
logger = logging.getLogger(__name__)
# upgrade code, while an old upgrade is still in progress), naming of daemons,
# fs-layout of the daemons, etc.
if self.mgr.migration_current is None:
- self.set(0)
+ self.set(LAST_MIGRATION)
+
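+ # nfs_migration_queue is populated by queue_migrate_nfs_spec() (see below)
+ # while the spec store loads legacy NFS specs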
+ v = mgr.get_store('nfs_migration_queue')
+ self.nfs_migration_queue = json.loads(v) if v else []
# for some migrations, we don't need to do anything except for
# setting migration_current = 1.
# let's try to shortcut things here.
- self.migrate()
+ self.migrate(True)
def set(self, val: int) -> None:
self.mgr.set_module_option('migration_current', val)
raise OrchestratorError(
"cephadm migration still ongoing. Please wait, until the migration is complete.")
- def migrate(self) -> None:
+ def migrate(self, startup: bool = False) -> None:
if self.mgr.migration_current == 0:
if self.migrate_0_1():
self.set(1)
if self.migrate_1_2():
self.set(2)
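+ # migrate_2_3 needs to talk to RADOS (it reads the legacy export objects),
+ # so skip it while the module is still starting up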
+ if self.mgr.migration_current == 2 and not startup:
+ if self.migrate_2_3():
+ self.set(3)
+
def migrate_0_1(self) -> bool:
"""
Migration 0 -> 1
self.mgr.spec_store.finally_rm(old)
return True
+
+ def migrate_2_3(self) -> bool:
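+ """
+ Migration 2 -> 3
+
+ Import the exports of all legacy NFS clusters into the nfs module's common
+ pool and drop the per-cluster pool/namespace properties from the specs.
+ """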
+ if self.nfs_migration_queue:
+ from nfs.cluster import create_ganesha_pool
+
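+ # make sure the nfs module's pool exists before we import any exports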
+ create_ganesha_pool(self.mgr)
+ for service_id, pool, ns in self.nfs_migration_queue:
+ if pool != '.nfs':
+ self.migrate_nfs_spec(service_id, pool, ns)
+ self.nfs_migration_queue = []
+ return True
+
+ def migrate_nfs_spec(self, service_id: str, pool: str, ns: Optional[str]) -> None:
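+ """
+ Import a single legacy cluster's exports via 'nfs export apply', redeploy
+ its ganesha daemons and re-save the spec without pool/namespace.
+ """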
+ self.mgr.log.info(
+ f'Migrating nfs.{service_id} from legacy pool {pool} namespace {ns}'
+ )
+
+ # read exports
+ ioctx = self.mgr.rados.open_ioctx(pool)
+ if ns is not None:
+ ioctx.set_namespace(ns)
+ exports = []
+ for obj in ioctx.list_objects():
+ if obj.key.startswith('export-'):
+ self.mgr.log.debug(f'reading {obj.key}')
+ exports.append(obj.read().decode())
+ self.mgr.log.info(f'Found {len(exports)} exports for legacy nfs.{service_id}')
+
+ # import exports
+ for export in exports:
+ ex = ''
+ for line in export.splitlines():
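+ # strip stored credential fields; these are expected to be regenerated
+ # when the export is applied again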
+ if (
+ line.startswith(' secret_access_key =')
+ or line.startswith(' user_id =')
+ ):
+ continue
+ ex += line + '\n'
+ self.mgr.log.debug(f'importing export: {ex}')
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'nfs export apply',
+ 'cluster_id': service_id
+ }, inbuf=ex)
+ if ret:
+ self.mgr.log.warning(f'Failed to migrate export ({ret}): {err}\nExport was:\n{ex}')
+
+ # redeploy all ganesha daemons to ensure that the daemon
+ # cephx credentials are correct AND container configs are set up properly
+ daemons = [d.name() for d in self.mgr.cache.get_daemons_by_service(f'nfs.{service_id}')]
+ self.mgr.log.info(f'Removing old daemons {daemons}')
+ self.mgr.remove_daemons(daemons)
+
+ # re-save service spec (without pool and namespace properties!)
+ spec = self.mgr.spec_store[f'nfs.{service_id}'].spec
+ self.mgr.spec_store.save(spec)
+
+
+def queue_migrate_nfs_spec(mgr: "CephadmOrchestrator", spec_dict: Dict[Any, Any]) -> None:
+ """
+ After 16.2.5 we dropped the NFSServiceSpec pool and namespace properties.
+ Queue up a migration to process later, once we are sure that RADOS is available
+ and so on.
+ """
+ service_id = spec_dict['spec']['service_id']
+ args = spec_dict['spec'].get('spec', {})
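+ # fall back to defaults if the legacy spec did not set pool/namespace explicitly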
+ pool = args.pop('pool', 'nfs-ganesha')
+ ns = args.pop('namespace', service_id)
+ queued = mgr.get_store('nfs_migration_queue') or '[]'
+ ls = json.loads(queued)
+ ls.append([service_id, pool, ns])
+ mgr.set_store('nfs_migration_queue', json.dumps(ls))
+ mgr.log.info(f'Queued nfs.{service_id} for migration')
cephadm_module.migration_current = 0
cephadm_module.migration.migrate()
- assert cephadm_module.migration_current == 2
+ assert cephadm_module.migration_current >= 2
out = [o.spec.placement for o in wait(
cephadm_module, cephadm_module.describe_service())]
cephadm_module.migration_current = 1
cephadm_module.migration.migrate()
- assert cephadm_module.migration_current == 2
+ assert cephadm_module.migration_current >= 2
assert len(cephadm_module.spec_store.all_specs) == 1
assert cephadm_module.spec_store.all_specs['mon'] == ServiceSpec(
cephadm_module.migration_current = 1
cephadm_module.migration.migrate()
- assert cephadm_module.migration_current == 2
+ assert cephadm_module.migration_current >= 2
assert len(cephadm_module.spec_store.all_specs) == 1
assert cephadm_module.spec_store.all_specs['mon'] == ServiceSpec(
# there is nothing to migrate, as the spec is gone now.
assert len(cephadm_module.spec_store.all_specs) == 0
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('[]'))
+def test_migrate_nfs_initial(cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'host1'):
+ cephadm_module.set_store(
+ SPEC_STORE_PREFIX + 'nfs.foo',
+ json.dumps({
+ 'spec': {
+ 'service_type': 'nfs',
+ 'service_id': 'foo',
+ 'placement': {
+ 'hosts': ['host1']
+ },
+ 'spec': {
+ 'pool': 'mypool',
+ 'namespace': 'foons',
+ },
+ },
+ 'created': datetime_to_str(datetime_now()),
+ }, sort_keys=True),
+ )
+ cephadm_module.migration_current = 1
+ cephadm_module.spec_store.load()
+
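+ # loading the legacy spec should have queued nfs.foo for migration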
+ ls = json.loads(cephadm_module.get_store('nfs_migration_queue'))
+ assert ls == [['foo', 'mypool', 'foons']]
+
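+ # the startup pass stops at 2; the RADOS-dependent step only runs on the
+ # next migrate() call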
+ cephadm_module.migration.migrate(True)
+ assert cephadm_module.migration_current == 2
+
+ cephadm_module.migration.migrate()
+ assert cephadm_module.migration_current == 3