MgrModule.__init__(self, *args, **kwargs)
self.set_mgr(self)
- def get_service(self, fs_name: str) -> List[orchestrator.ServiceDescription]:
+ def get_service(self, fs_name: str) -> Optional[orchestrator.ServiceDescription]:
# Ask the orchestrator to describe the service named "mds.<fs_name>"
# (service_type 'mds', refresh=True).
#
# With this change the method returns the first entry of the completion
# result, or None when the result is empty/falsy; the previous version
# returned the raw result list.
#
# Any error recorded on the completion is raised by
# orchestrator.raise_if_exception().
service = f"mds.{fs_name}"
completion = self.describe_service(service_type='mds',
service_name=service,
refresh=True)
# NOTE(review): presumably _orchestrator_wait blocks until the completion
# has finished -- confirm against the orchestrator client mixin.
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
- return completion.result
# Only the first matching description is handed back; additional entries
# in the result, if any, are ignored.
+ if completion.result:
+ return completion.result[0]
+ return None
- def get_daemons(self, fs_name: str) -> List[orchestrator.DaemonDescription]:
- service = f"mds.{fs_name}"
- completion = self.list_daemons(service_name=service)
- self._orchestrator_wait([completion])
- orchestrator.raise_if_exception(completion)
- return completion.result
-
- def update_daemon_count(self, fs_name: str, abscount: int) -> ServiceSpec:
- svclist = self.get_service(fs_name)
-
- assert svclist is not None
- assert len(svclist) > 0
-
- svc = svclist[0]
-
- assert svc.spec.placement.count != abscount
-
- ps = copy.deepcopy(svc.spec.placement)
+ def update_daemon_count(self, spec: ServiceSpec, fs_name: str, abscount: int) -> ServiceSpec:
# Build and return a new ServiceSpec whose placement count is `abscount`.
#
# The new version takes the existing spec from the caller instead of
# looking the service up itself, and no longer asserts that the count
# actually differs. `fs_name` is not read by the new body.
#
# The placement is deep-copied before the count is overwritten, so the
# `spec` passed in is left unmodified.
+ ps = copy.deepcopy(spec.placement)
ps.count = abscount
- newspec = ServiceSpec(service_type=svc.spec.service_type,
- service_id=svc.spec.service_id,
# NOTE(review): only service_type, service_id and placement are carried
# over into the new spec -- confirm no other spec fields need preserving.
+ newspec = ServiceSpec(service_type=spec.service_type,
+ service_id=spec.service_id,
placement=ps)
return newspec
return fs['mdsmap']['standby_count_wanted']
assert False
- def get_current_standby_count(self, fs_map: dict, fs_name: str, daemons: List[orchestrator.DaemonDescription]) -> int:
- # standbys are not grouped by filesystems in fs_map
- # available = standby_replay + standby_active
- assert fs_map is not None
- total = 0
- daemon_names = {
- d.name() for d in daemons
- }
- for sb in fs_map['standbys']:
- full_name = f"mds.{sb['name']}"
- if full_name in daemon_names:
- total += 1
- return total
-
- def get_active_names(self, fs_map: dict, fs_name: str) -> Set[str]:
- active_names = set()
- for fs in fs_map['filesystems']:
- if fs['mdsmap']['fs_name'] == fs_name:
- for active in fs['mdsmap']['up']:
- gid = fs['mdsmap']['up'][active]
- gid_key = f"gid_{gid}"
- active_names.add(f"mds.{fs['mdsmap']['info'][gid_key]['name']}")
- return active_names
-
- def get_current_active_count(self, fs_map: dict, fs_name: str, daemons: List[orchestrator.DaemonDescription]) -> int:
- assert fs_map is not None
- total = 0
- daemon_names = {
- d.name() for d in daemons
- }
- active_names = self.get_active_names(fs_map, fs_name)
- return len(daemon_names.intersection(active_names))
-
def get_required_max_mds(self, fs_map: dict, fs_name: str) -> int:
assert fs_map is not None
for fs in fs_map['filesystems']:
assert fs_map is not None
try:
- daemons = self.get_daemons(fs_name)
- standbys_required = self.get_required_standby_count(fs_map, fs_name)
- standbys_current = self.get_current_standby_count(fs_map, fs_name, daemons)
- active = self.get_current_active_count(fs_map, fs_name, daemons)
- max_mds_required = self.get_required_max_mds(fs_map, fs_name)
-
- self.log.info(f"fs_name:{fs_name} "
- f"standbys_required:{standbys_required}, "
- f"standbys_current:{standbys_current}, "
- f"active:{active}, "
- f"max_mds_required:{max_mds_required}")
-
- total_current = standbys_current + active
- total_required = max_mds_required + standbys_required
- self.log.info(f"fs:{fs_name} total_required:{total_required}, total_current:{total_current}")
-
- if total_required < total_current:
- self.log.info(f"fs:{fs_name}, killing {total_current - total_required} standby mds ...")
- elif total_required > total_current:
- self.log.info(f"fs:{fs_name}, spawning {total_required - total_current} standby mds ...")
- else:
- self.log.info(f"fs:{fs_name} no change to mds count")
+ svc = self.get_service(fs_name)
+ if not svc:
+ self.log.info(f"fs {fs_name}: no service defined; skipping")
+ return
+ if not svc.spec.placement.count:
+ self.log.info(f"fs {fs_name}: service does not specify a count; skipping")
return
- newspec = self.update_daemon_count(fs_name, total_required)
+ standbys_required = self.get_required_standby_count(fs_map, fs_name)
+ max_mds = self.get_required_max_mds(fs_map, fs_name)
+ want = max_mds + standbys_required
+
+ self.log.info(f"fs {fs_name}: "
+ f"max_mds={max_mds} "
+ f"standbys_required={standbys_required}, "
+ f"count={svc.spec.placement.count}")
- self.log.info(f"fs:{fs_name}, new placement count:{newspec.placement.count}")
+ if want == svc.spec.placement.count:
+ return
+ self.log.info(f"fs {fs_name}: adjusting daemon count from {svc.spec.placement.count} to {want}")
+ newspec = self.update_daemon_count(svc.spec, fs_name, want)
completion = self.apply_mds(newspec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
except orchestrator.OrchestratorError as e:
- self.log.exception(f"fs:{fs_name} exception while verifying mds status: {e}")
+ self.log.exception(f"fs {fs_name}: exception while updating service: {e}")
pass
def notify(self, notify_type, notify_id):
# Run verify_and_manage_mds_instance() for every filesystem listed in the
# current fs_map. `notify_type` and `notify_id` are not inspected here, so
# every call performs a full pass over all filesystems.
fs_map = self.get('fs_map')
# Nothing to do when self.get('fs_map') yields an empty/falsy value.
if not fs_map:
return
+
# we don't know for which fs config has been changed
for fs in fs_map['filesystems']:
fs_name = fs['mdsmap']['fs_name']
- self.log.info(f"processing fs:{fs_name}")
self.verify_and_manage_mds_instance(fs_map, fs_name)