import json
import logging
import operator
+import re
from oauthlib.oauth1 import SIGNATURE_PLAINTEXT
from requests import Response
self.os_type, self.os_version = os_type, os_version
+ # Normalize CentOS Stream version inputs early (keep internal form as "<N>.stream")
+ if self.os_type == "centos" and self.os_version:
+ v = str(self.os_version).lower().strip()
+ m = re.match(r"^centos(\d+)-stream$", v)
+ if m:
+ self.os_version = f"{m.group(1)}.stream"
+ elif re.match(r"^\d+$", v):
+ self.os_version = f"{v}.stream"
+ # if already like "9.stream", leave it as-is
+
def _get_system_info(self) -> Tuple[Optional[str], Optional[str], str, str]:
"""Get the system info for the deployed machine
if status_name in ["deployed", "allocated"]:
os_type = machine.get("osystem", "").lower()
- os_version = OS._codename_to_version(
- os_type, machine.get("distro_series")
- )
+ distro_series = (machine.get("distro_series") or "").lower()
+
+ # MAAS uses series strings for some distros (e.g. centos9-stream),
+ # but OS._codename_to_version() expects codenames ("stream", "core").
+ if os_type == "centos":
+ m = re.match(r"^centos(\d+)-stream$", distro_series)
+ if m:
+ # normalize to what teuth CLI wants to accept: 9.stream
+ os_version = f"{m.group(1)}.stream"
+ else:
+ # fall back to existing mapping (covers core/stream)
+ os_version = OS._codename_to_version(os_type, distro_series)
+ else:
+ os_version = OS._codename_to_version(os_type, distro_series)
+
return os_type, os_version, arch, machine.get("system_id")
raise RuntimeError(