-import datetime
import threading
import functools
import os
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
+from ceph.utils import datetime_now
try:
from typing import List, Dict, Optional, Callable, Any
@deferred_read
def describe_service(self, service_type=None, service_name=None,
refresh=False):
- now = datetime.datetime.utcnow()
+ now = datetime_now()
# CephCluster
cl = self.rook_cluster.rook_api_get(
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec
+from ceph.utils import datetime_now
from mgr_util import merge_dicts
try:
return False
return True
- refreshed = datetime.datetime.utcnow()
+ refreshed = datetime_now()
pods = [i for i in self.rook_pods.items if predicate(i)]
pods_summary = []
'created': None,
}
- # note: we want UTC but no tzinfo
+ # note: we want UTC
if d['metadata'].get('creation_timestamp', None):
s['created'] = d['metadata']['creation_timestamp'].astimezone(
- tz=datetime.timezone.utc).replace(tzinfo=None)
+ tz=datetime.timezone.utc)
if d['status'].get('start_time', None):
s['started'] = d['status']['start_time'].astimezone(
- tz=datetime.timezone.utc).replace(tzinfo=None)
+ tz=datetime.timezone.utc)
pods_summary.append(s)