image_name = cl['spec'].get('cephVersion', {}).get('image', None)
num_nodes = len(self.rook_cluster.get_node_names())
+ def sum_running_pods(service_type: str, service_name: Optional[str] = None) -> int:
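+ """Count pods in phase 'Running' whose app label is 'rook-ceph-<service_type>',
+ optionally restricted to a single service instance (filesystem, object store
+ or NFS cluster)."""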
+ all_pods = self.rook_cluster.describe_pods(None, None, None)
+ if service_name is None:
+ return sum(pod['phase'] == 'Running' for pod in all_pods if pod['labels']['app'] == f"rook-ceph-{service_type}")
+ else:
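+ # The owning resource name is carried in a type-specific pod label.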
+ if service_type == 'mds':
+ key = 'rook_file_system'
+ elif service_type == 'rgw':
+ key = 'rook_object_store'
+ elif service_type == 'nfs':
+ key = 'ceph_nfs'
+ else:
+ self.log.error(f"Unknow service type {service_type}")
+ return 0
+
+ return sum(pod['phase'] == 'Running'
+            for pod in all_pods
+            if pod['labels']['app'] == f"rook-ceph-{service_type}"
+            and service_name == pod['labels'][key])
+
spec = {}
if service_type == 'mon' or service_type is None:
spec['mon'] = orchestrator.ServiceDescription(
size=cl['spec'].get('mon', {}).get('count', 1),
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mon')
)
if service_type == 'mgr' or service_type is None:
spec['mgr'] = orchestrator.ServiceDescription(
size=1,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mgr')
)
if (
size=num_nodes,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('crashcollector')
)
if service_type == 'mds' or service_type is None:
# CephFilesystems
all_fs = self.rook_cluster.get_resource("cephfilesystems")
for fs in all_fs:
- svc = 'mds.' + fs['metadata']['name']
+ fs_name = fs['metadata']['name']
+ svc = 'mds.' + fs_name
if svc in spec:
continue
# FIXME: we are conflating active (+ standby) with count
size=total_mds,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mds', fs_name)
)
if service_type == 'rgw' or service_type is None:
# CephObjectstores
all_zones = self.rook_cluster.get_resource("cephobjectstores")
for zone in all_zones:
- svc = 'rgw.' + zone['metadata']['name']
+ zone_name = zone['metadata']['name']
+ svc = 'rgw.' + zone_name
if svc in spec:
continue
active = zone['spec']['gateway']['instances']
size=active,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('rgw', zone_name)
)
if service_type == 'nfs' or service_type is None:
),
size=active,
last_refresh=now,
- running=len([1 for pod in nfs_pods if pod['labels']['ceph_nfs'] == nfs_name]),
+ running=sum_running_pods('nfs', nfs_name),
created=creation_timestamp.astimezone(tz=datetime.timezone.utc)
)
if service_type == 'osd' or service_type is None:
),
size=len(all_osds),
last_refresh=now,
- running=sum(osd.status.phase == 'Running' for osd in all_osds)
+ running=sum_running_pods('osd')
)
# drivegroups
size=0,
running=0,
)
-
+
if service_type == 'rbd-mirror' or service_type is None:
# rbd-mirrors
all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
),
size=1,
last_refresh=now,
+ running=sum_running_pods('rbd-mirror', mirror_name)
)
-
+
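+ # Backfill container image id/name on each service from its matching daemons.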
for dd in self._list_daemons():
if dd.service_name() not in spec:
continue
service = spec[dd.service_name()]
- service.running += 1
if not service.container_image_id:
service.container_image_id = dd.container_image_id
if not service.container_image_name:
daemon_id: Optional[str] = None,
host: Optional[str] = None,
refresh: bool = False) -> List[orchestrator.DaemonDescription]:
+
+ def _pod_to_servicename(pod: Dict[str, Any]) -> Optional[str]:
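+ """Derive the orchestrator service name from a pod's labels: '<type>.<resource>'
+ for mds/rgw/nfs/rbd-mirror pods labelled with 'app.kubernetes.io/part-of',
+ otherwise the bare daemon type; None if 'ceph_daemon_type' is missing."""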
+ if 'ceph_daemon_type' not in pod['labels']:
+ return None
+ daemon_type = pod['labels']['ceph_daemon_type']
+ if daemon_type in ['mds', 'rgw', 'nfs', 'rbd-mirror']:
+ if 'app.kubernetes.io/part-of' in pod['labels']:
+ service_name = f"{daemon_type}.{pod['labels']['app.kubernetes.io/part-of']}"
+ else:
+ service_name = daemon_type
+ else:
+ service_name = daemon_type
+ return service_name
+
pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
- self.log.debug('pods %s' % pods)
result = []
for p in pods:
- sd = orchestrator.DaemonDescription()
+ pod_svc_name = _pod_to_servicename(p)
+ sd = orchestrator.DaemonDescription(service_name=pod_svc_name)
sd.hostname = p['hostname']
# In Rook environments, the 'ceph-exporter' daemon is named 'exporter' whereas