from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
from ceph.utils import datetime_now
-from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple
+from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple, TYPE_CHECKING
try:
    from ceph.deployment.drive_group import DriveGroupSpec
            default='local',
            desc='storage class name for LSO-discovered PVs',
        ),
+        Option(
+            'drive_group_interval',
+            type='float',
+            default=300.0,
+            desc='interval in seconds between re-application of applied drive groups',
+        ),
    ]
    @staticmethod
        self._rook_cluster: Optional[RookCluster] = None
        self._rook_env = RookEnv()
        self._k8s_AppsV1_api: Optional[client.AppsV1Api] = None
-        self.storage_class = self.get_module_option('storage_class')
+        self.config_notify()
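+        # mypy cannot see the attributes that config_notify() installs via
+        # setattr(), so give the type checker typed placeholder values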
+        if TYPE_CHECKING:
+            self.storage_class = 'foo'
+            self.drive_group_interval = 10.0
+
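+        # restore any drive groups persisted in the mgr key/value store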
+        self._load_drive_groups()
        self._shutdown = threading.Event()
    def config_notify(self) -> None:
        for opt in self.MODULE_OPTIONS:
            setattr(self, opt['name'], self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        assert isinstance(self.storage_class, str)
-        self.rook_cluster.storage_class = self.storage_class
+        assert isinstance(self.drive_group_interval, float)
+
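+        # the RookCluster handle is only created in serve(), so it may not
+        # exist yet when an option changes early in the module's life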
+        if self._rook_cluster:
+            self._rook_cluster.storage_class = self.storage_class
    def shutdown(self) -> None:
        self._shutdown.set()
            self.storage_class)
        self._initialized.set()
+        self.config_notify()
        while not self._shutdown.is_set():
-            self._shutdown.wait(5)
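+            # re-apply the stored drive groups on every pass so that OSDs are
+            # created on matching devices/hosts that appeared in the meantime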
+            self._apply_drivegroups(list(self._drive_group_map.values()))
+            self._shutdown.wait(self.drive_group_interval)
    @handle_orch_error
    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
        return self.rook_cluster.remove_pods(names)
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
-        result_list = []
-        all_hosts = raise_if_exception(self.get_hosts())
        for drive_group in specs:
-            matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
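+            # remember the spec; it is persisted below and re-applied
+            # periodically by the serve() loop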
+            self._drive_group_map[str(drive_group.service_id)] = drive_group
+        self._save_drive_groups()
+        return OrchResult(self._apply_drivegroups(specs))
+
+    def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
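+        # resolve each drive group's placement against the current host list
+        # and create the matching OSDs; used by apply_drivegroups() and serve()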
+        all_hosts = raise_if_exception(self.get_hosts())
+        result_list: List[str] = []
+        for drive_group in ls:
+            matching_hosts = drive_group.placement.filter_matching_hosts(
+                lambda label=None, as_hostspec=None: all_hosts
+            )
            if not self.rook_cluster.node_exists(matching_hosts[0]):
                raise RuntimeError("Node '{0}' is not in the Kubernetes "
                                   "cluster".format(matching_hosts))
            if not self.rook_cluster.can_create_osd():
                raise RuntimeError("Rook cluster configuration does not "
                                   "support OSD creation.")
            result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
-        return OrchResult(result_list)
+        return result_list
+
+    def _load_drive_groups(self) -> None:
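+        # rebuild the in-memory map from the JSON blob written by
+        # _save_drive_groups(); entries that fail to parse are logged and skipped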
+        stored_drive_group = self.get_store("drive_group_map")
+        self._drive_group_map: Dict[str, DriveGroupSpec] = {}
+        if stored_drive_group:
+            for name, dg in json.loads(stored_drive_group).items():
+                try:
+                    self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
+                except ValueError as e:
+                    self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
+
+    def _save_drive_groups(self) -> None:
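+        # persist the map as JSON in the mgr key/value store so that applied
+        # specs survive a mgr restart or failover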
+        json_drive_group_map = {
+            name: dg.to_json() for name, dg in self._drive_group_map.items()
+        }
+        self.set_store("drive_group_map", json.dumps(json_drive_group_map))
    def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False) -> OrchResult[str]:
        assert self._rook_cluster is not None