import socket
import threading
import time
-from mgr_module import MgrModule, MgrStandbyModule, PG_STATES, Option
+from mgr_module import MgrModule, MgrStandbyModule, PG_STATES, Option, ServiceInfoT
from mgr_util import get_default_addr, profile_method
from rbd import RBD
from collections import namedtuple
try:
- from typing import DefaultDict, Optional, Dict, Any, Set, cast
+ from typing import DefaultDict, Optional, Dict, Any, List, Set, cast
except ImportError:
pass
for server in self.list_servers():
version = server.get('ceph_version', '')
host = server.get('hostname', '')
- for service in server.get('services', []):
+ for service in cast(List[ServiceInfoT], server.get('services', [])):
ret.update({(service['id'], service['type']): (host, version)})
return ret
targets = []
for server in servers:
hostname = server.get('hostname', '')
- for service in server.get('services', []):
+ for service in cast(List[ServiceInfoT], server.get('services', [])):
if service['type'] != 'mgr':
continue
id_ = service['id']