"""
import logging
-from typing import Optional, List, Set
+from typing import Any, Optional
from mgr_module import MgrModule
from ceph.deployment.service_spec import ServiceSpec
import orchestrator
"""
MDS autoscaler.
"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
MgrModule.__init__(self, *args, **kwargs)
self.set_mgr(self)
return fs['mdsmap']['max_mds']
assert False
- def verify_and_manage_mds_instance(self, fs_map: dict, fs_name: str):
+ def verify_and_manage_mds_instance(self, fs_map: dict, fs_name: str) -> None:
assert fs_map is not None
try:
self.log.exception(f"fs {fs_name}: exception while updating service: {e}")
pass
- def notify(self, notify_type, notify_id):
+ def notify(self, notify_type: str, notify_id: str) -> None:
if notify_type != 'fs_map':
return
fs_map = self.get('fs_map')