import json
import mgr_util
import threading
-from typing import Optional, Tuple
+from typing import Any, Dict, List, Optional, Set, Tuple, TYPE_CHECKING, Union
import uuid
from prettytable import PrettyTable
-from mgr_module import CLIReadCommand, CLIWriteCommand, MgrModule, Option
+from mgr_module import HealthChecksT, CLIReadCommand, CLIWriteCommand, CRUSHMap, MgrModule, Option, OSDMap
"""
Some terminology is made up for the purposes of this module:
PG_NUM_MIN = 32 # unless specified on a per-pool basis
-def nearest_power_of_two(n):
+if TYPE_CHECKING:
+ import sys
+ if sys.version_info >= (3, 8):
+ from typing import Literal
+ else:
+ from typing_extensions import Literal
+
+ ScaleModeT = Literal['scale-up', 'scale-down']
+
+
+def nearest_power_of_two(n: int) -> int:
v = int(n)
v -= 1
return x if (v - n) > (n - x) else v
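# Illustrative examples, assuming the usual closest-power-of-two rounding:
#   nearest_power_of_two(300) -> 256   (300 - 256 = 44  < 512 - 300 = 212)
#   nearest_power_of_two(400) -> 512   (512 - 400 = 112 < 400 - 256 = 144)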
-def effective_target_ratio(target_ratio, total_target_ratio, total_target_bytes, capacity):
+
+def effective_target_ratio(target_ratio: float,
+ total_target_ratio: float,
+ total_target_bytes: int,
+ capacity: int) -> float:
"""
Returns the target ratio after normalizing for ratios across pools and
adjusting for capacity reserved by pools that have target_size_bytes set.
return target_ratio
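# Illustrative example of the normalization described above (numbers assumed):
# a pool with target_size_ratio 1.0, a cluster-wide total_target_ratio of 4.0,
# and 25% of the subtree capacity already reserved via target_size_bytes gives
#   effective ratio = (1.0 / 4.0) * (1.0 - 0.25) = 0.1875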
+
class PgAdjustmentProgress(object):
"""
Keeps the initial and target pg_num values
"""
- def __init__(self, pool_id, pg_num, pg_num_target):
+ def __init__(self, pool_id: int, pg_num: int, pg_num_target: int) -> None:
self.ev_id = str(uuid.uuid4())
self.pool_id = pool_id
self.reset(pg_num, pg_num_target)
- def reset(self, pg_num, pg_num_target):
+ def reset(self, pg_num: int, pg_num_target: int) -> None:
self.pg_num = pg_num
self.pg_num_target = pg_num_target
- def update(self, module, progress):
+ def update(self, module: MgrModule, progress: float) -> None:
desc = 'increasing' if self.pg_num < self.pg_num_target else 'decreasing'
module.remote('progress', 'update', self.ev_id,
ev_msg="PG autoscaler %s pool %d PGs from %d to %d" %
class CrushSubtreeResourceStatus:
- def __init__(self):
- self.root_ids = []
- self.osds = set()
- self.osd_count = None # Number of OSDs
- self.pg_target = None # Ideal full-capacity PG count?
+ def __init__(self) -> None:
+ self.root_ids: List[int] = []
+ self.osds: Set[int] = set()
+ self.osd_count: Optional[int] = None # Number of OSDs
+ self.pg_target: Optional[int] = None # Ideal full-capacity PG count?
self.pg_current = 0 # How many PGs already?
self.pg_left = 0
- self.capacity = None # Total capacity of OSDs in subtree
- self.pool_ids = []
- self.pool_names = []
- self.pool_count = None
+ self.capacity: Optional[int] = None # Total capacity of OSDs in subtree
+ self.pool_ids: List[int] = []
+ self.pool_names: List[str] = []
+ self.pool_count: Optional[int] = None
self.pool_used = 0
self.total_target_ratio = 0.0
self.total_target_bytes = 0 # including replication / EC overhead
runtime=True),
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(PgAutoscaler, self).__init__(*args, **kwargs)
self._shutdown = threading.Event()
- self._event = {}
+ self._event: Dict[int, PgAdjustmentProgress] = {}
# So much of what we do peeks at the osdmap that it's easiest
# to just keep a copy of the pythonized version.
self._osd_map = None
+ if TYPE_CHECKING:
+ self.autoscale_profile: 'ScaleModeT' = 'scale-up'
+ self.sleep_interval = 60
+ self.mon_target_pg_per_osd = 0
- def config_notify(self):
+ def config_notify(self) -> None:
for opt in self.NATIVE_OPTIONS:
setattr(self,
opt,
self.set_module_option("autoscale_profile", "scale-down")
return 0, "", "autoscale-profile is now scale-down"
- def serve(self):
+ def serve(self) -> None:
self.config_notify()
while not self._shutdown.is_set():
self._maybe_adjust()
self._update_progress_events()
self._shutdown.wait(timeout=self.sleep_interval)
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info('Stopping pg_autoscaler')
self._shutdown.set()
- def get_subtree_resource_status(self, osdmap, crush):
+ def get_subtree_resource_status(self,
+ osdmap: OSDMap,
+ crush: CRUSHMap) -> Tuple[Dict[int, CrushSubtreeResourceStatus],
+ Dict[int, int]]:
"""
For each CRUSH subtree of interest (i.e. the roots under which
we have pools), calculate the current resource usages and targets,
such as how many PGs there are, vs. how many PGs we would
like there to be.
"""
- result = {}
+ result: Dict[int, CrushSubtreeResourceStatus] = {}
pool_root = {}
roots = []
# identify subtrees (note that they may overlap!)
for pool_id, pool in osdmap.get_pools().items():
- cr_name = crush.get_rule_by_id(pool['crush_rule'])['rule_name']
- root_id = int(crush.get_rule_root(cr_name))
+ crush_rule = crush.get_rule_by_id(pool['crush_rule'])
+ assert crush_rule is not None
+ cr_name = crush_rule['rule_name']
+ root_id = crush.get_rule_root(cr_name)
+ assert root_id is not None
pool_root[pool_id] = root_id
osds = set(crush.get_osds_under(root_id))
# finish subtrees
all_stats = self.get('osd_stats')
for s in roots:
+ assert s.osds is not None
s.osd_count = len(s.osds)
s.pg_target = s.osd_count * self.mon_target_pg_per_osd
s.pg_left = s.pg_target
s.pool_count = len(s.pool_ids)
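# e.g. (illustrative) a subtree with 10 OSDs and the default
# mon_target_pg_per_osd of 100 starts out with pg_target = pg_left = 1000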
- capacity = 0.0
+ capacity = 0
for osd_stats in all_stats['osd_stats']:
if osd_stats['osd'] in s.osds:
# Intentionally do not apply the OSD's reweight to
def _calc_final_pg_target(
self,
- p,
- pool_name,
- root_map,
- root_id,
- capacity_ratio,
- even_pools,
- bias,
- is_used,
- profile,
- ):
+ p: Dict[str, Any],
+ pool_name: str,
+ root_map: Dict[int, CrushSubtreeResourceStatus],
+ root_id: int,
+ capacity_ratio: float,
+ even_pools: Dict[str, Dict[str, Any]],
+ bias: float,
+ is_used: bool,
+ profile: 'ScaleModeT',
+ ) -> Union[Tuple[float, int, int], Tuple[None, None, None]]:
"""
`profile` determines behaviour of the autoscaler.
`is_used` flag is used to determine if this is the first
if profile == "scale-up":
final_ratio = capacity_ratio
# So what proportion of pg allowance should we be using?
- pool_pg_target = (final_ratio * root_map[root_id].pg_target) / p['size'] * bias
+ pg_target = root_map[root_id].pg_target
+ assert pg_target is not None
+ pool_pg_target = (final_ratio * pg_target) / p['size'] * bias
final_pg_target = max(p.get('options', {}).get('pg_num_min', PG_NUM_MIN),
nearest_power_of_two(pool_pg_target))
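# Worked example (illustrative numbers): capacity_ratio = 0.25,
# subtree pg_target = 1000, size = 3, bias = 1.0, no per-pool pg_num_min:
#   pool_pg_target = 0.25 * 1000 / 3 * 1.0 ~= 83.3
#   final_pg_target = max(32, nearest_power_of_two(83.3)) = 64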
else:
if is_used:
- even_ratio = 1 / root_map[root_id].pool_count
+ pool_count = root_map[root_id].pool_count
+ assert pool_count is not None
+ even_ratio = 1 / pool_count
used_ratio = capacity_ratio
if used_ratio > even_ratio:
return None, None, None
final_ratio = max(used_ratio, even_ratio)
- used_pg = final_ratio * root_map[root_id].pg_target
- root_map[root_id].pg_left -= used_pg
+ pg_target = root_map[root_id].pg_target
+ assert pg_target is not None
+ used_pg = final_ratio * pg_target
+ root_map[root_id].pg_left -= int(used_pg)
pool_pg_target = used_pg / p['size'] * bias
else:
- final_ratio = 1 / (root_map[root_id].pool_count - root_map[root_id].pool_used)
+ pool_count = root_map[root_id].pool_count
+ assert pool_count is not None
+ final_ratio = 1 / (pool_count - root_map[root_id].pool_used)
pool_pg_target = (final_ratio * root_map[root_id].pg_left) / p['size'] * bias
final_pg_target = max(p.get('options', {}).get('pg_num_min', PG_NUM_MIN),
def _calc_pool_targets(
self,
- osdmap,
- pools,
- crush_map,
- root_map,
- pool_root,
- pool_stats,
- ret,
- threshold,
- is_used,
- profile,
- ):
+ osdmap: OSDMap,
+ pools: Dict[str, Dict[str, Any]],
+ crush_map: CRUSHMap,
+ root_map: Dict[int, CrushSubtreeResourceStatus],
+ pool_root: Dict[int, int],
+ pool_stats: Dict[int, Dict[str, int]],
+ ret: List[Dict[str, Any]],
+ threshold: float,
+ is_used: bool,
+ profile: 'ScaleModeT',
+ ) -> Tuple[List[Dict[str, Any]], Dict[str, Dict[str, Any]]]:
"""
Calculates the final_pg_target of each pool and determines if it needs
scaling; this depends on the profile of the autoscaler. For scale-down,
we start out with a full complement of pgs and only decrease it when other
pools need more pgs due to increased usage. For scale-up, we start out with
the minimal amount of pgs and only scale when there is an increase in usage.
"""
- even_pools = {}
+ even_pools: Dict[str, Dict[str, Any]] = {}
for pool_name, p in pools.items():
pool_id = p['pool']
if pool_id not in pool_stats:
# FIXME: we assume there is only one take per pool, but that
# may not be true.
- cr_name = crush_map.get_rule_by_id(p['crush_rule'])['rule_name']
- root_id = int(crush_map.get_rule_root(cr_name))
+ crush_rule = crush_map.get_rule_by_id(p['crush_rule'])
+ assert crush_rule is not None
+ cr_name = crush_rule['rule_name']
+ root_id = crush_map.get_rule_root(cr_name)
+ assert root_id is not None
capacity = root_map[root_id].capacity
+ assert capacity is not None
if capacity == 0:
self.log.debug('skipping empty subtree %s', cr_name)
continue
profile,
)
- if final_ratio == None:
+ if final_ratio is None:
continue
adjust = False
final_ratio <= 1.0:
adjust = True
+ assert pool_pg_target is not None
ret.append({
'pool_id': pool_id,
'pool_name': p['pool_name'],
def _get_pool_status(
self,
- osdmap,
- pools,
- profile,
- threshold=3.0,
- ):
+ osdmap: OSDMap,
+ pools: Dict[str, Dict[str, Any]],
+ profile: 'ScaleModeT',
+ threshold: float = 3.0,
+ ) -> Tuple[List[Dict[str, Any]],
+ Dict[int, CrushSubtreeResourceStatus],
+ Dict[int, int]]:
assert threshold >= 2.0
crush_map = osdmap.get_crush()
df = self.get('df')
pool_stats = dict([(p['id'], p['stats']) for p in df['pools']])
- ret = []
+ ret: List[Dict[str, Any]] = []
# Iterate over all pools to determine how they should be sized.
# First call of _calc_pool_targets() is to find/adjust pools that use more capacity than
return (ret, root_map, pool_root)
- def _update_progress_events(self):
+ def _update_progress_events(self) -> None:
osdmap = self.get_osdmap()
pools = osdmap.get_pools()
for pool_id in list(self._event):
continue
ev.update(self, (ev.pg_num - pool_data['pg_num']) / (ev.pg_num - ev.pg_num_target))
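# e.g. (illustrative) pg_num started at 32 with a target of 128; once the
# pool reaches pg_num 64 the reported progress is (32 - 64) / (32 - 128) = 1/3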
- def _maybe_adjust(self):
+ def _maybe_adjust(self) -> None:
self.log.info('_maybe_adjust')
osdmap = self.get_osdmap()
if osdmap.get_require_osd_release() < 'nautilus':
too_few = []
too_many = []
bytes_and_ratio = []
- health_checks = {}
+ health_checks: Dict[str, Dict[str, Union[int, str, List[str]]]] = {}
total_bytes = dict([(r, 0) for r in iter(root_map)])
total_target_bytes = dict([(r, 0.0) for r in iter(root_map)])
- target_bytes_pools = dict([(r, []) for r in iter(root_map)])
+ target_bytes_pools: Dict[int, List[int]] = dict([(r, []) for r in iter(root_map)])
for p in ps:
pool_id = p['pool_id']
too_much_target_bytes = []
for root_id, total in total_bytes.items():
- total_target = total_target_bytes[root_id]
- if total_target > 0 and total > root_map[root_id].capacity and root_map[root_id].capacity:
+ total_target = int(total_target_bytes[root_id])
+ capacity = root_map[root_id].capacity
+ assert capacity is not None
+ if total_target > 0 and total > capacity and capacity:
too_much_target_bytes.append(
'Pools %s overcommit available storage by %.03fx due to '
'target_size_bytes %s on pools %s' % (
root_map[root_id].pool_names,
- total / root_map[root_id].capacity,
+ total / capacity,
mgr_util.format_bytes(total_target, 5, colored=False),
target_bytes_pools[root_id]
)
)
- elif total_target > root_map[root_id].capacity and root_map[root_id].capacity:
+ elif total_target > capacity and capacity:
too_much_target_bytes.append(
'Pools %s overcommit available storage by %.03fx due to '
'collective target_size_bytes of %s' % (
root_map[root_id].pool_names,
- total / root_map[root_id].capacity,
+ total / capacity,
mgr_util.format_bytes(total_target, 5, colored=False),
)
)