import platform
import pwd
import random
-import re
import select
import shutil
import socket
container_path = ''
cached_stdin = None
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
+DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
# Log and console output config
logging_config = {
return datetime.datetime.fromtimestamp(
mt, tz=datetime.timezone.utc
).strftime(DATEFMT)
- except Exception as e:
+ except Exception:
return None
p = re.compile(r'(\.[\d]{6})[\d]*')
s = p.sub(r'\1', s)
- # replace trailling Z with -0000, since (on python 3.6.8) it won't parse
+ # replace trailing Z with -0000, since (on python 3.6.8) it won't parse
if s and s[-1] == 'Z':
s = s[:-1] + '-0000'
- # cut off the redundnat 'CST' part that strptime can't parse, if
+ # cut off the redundant 'CST' part that strptime can't parse, if
# present.
v = s.split(' ')
s = ' '.join(v[0:3])
import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec
-from cephadm.utils import str_to_datetime, datetime_to_str
+from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
if TYPE_CHECKING:
self.spec_preview[spec.service_name()] = spec
return None
self.specs[spec.service_name()] = spec
- self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
+ self.spec_created[spec.service_name()] = datetime_now()
self.mgr.set_store(
SPEC_STORE_PREFIX + spec.service_name(),
json.dumps({
def update_host_daemons(self, host, dm):
# type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
self.daemons[host] = dm
- self.last_daemon_update[host] = datetime.datetime.utcnow()
+ self.last_daemon_update[host] = datetime_now()
def update_host_facts(self, host, facts):
# type: (str, Dict[str, Dict[str, Any]]) -> None
# type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
self.devices[host] = dls
self.networks[host] = nets
- self.last_device_update[host] = datetime.datetime.utcnow()
+ self.last_device_update[host] = datetime_now()
def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
self.daemon_config_deps[host][name] = {
def update_last_host_check(self, host):
# type: (str) -> None
- self.last_host_check[host] = datetime.datetime.utcnow()
+ self.last_host_check[host] = datetime_now()
def prime_empty_host(self, host):
# type: (str) -> None
if host in self.daemon_refresh_queue:
self.daemon_refresh_queue.remove(host)
return True
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+ cutoff = datetime_now() - datetime.timedelta(
seconds=self.mgr.daemon_cache_timeout)
if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
return True
if host in self.device_refresh_queue:
self.device_refresh_queue.remove(host)
return True
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+ cutoff = datetime_now() - datetime.timedelta(
seconds=self.mgr.device_cache_timeout)
if host not in self.last_device_update or self.last_device_update[host] < cutoff:
return True
def host_needs_check(self, host):
# type: (str) -> bool
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+ cutoff = datetime_now() - datetime.timedelta(
seconds=self.mgr.host_check_interval)
return host not in self.last_host_check or self.last_host_check[host] < cutoff
def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
if not self.mgr.last_monmap:
return
- self.last_etc_ceph_ceph_conf[host] = datetime.datetime.utcnow()
+ self.last_etc_ceph_ceph_conf[host] = datetime_now()
def host_needs_registry_login(self, host: str) -> bool:
if host in self.mgr.offline_hosts:
self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
- e = OrchestratorEvent(datetime.datetime.utcnow(), 'service',
+ e = OrchestratorEvent(datetime_now(), 'service',
spec.service_name(), level, message)
self.add(e)
def from_orch_error(self, e: OrchestratorError) -> None:
if e.event_subject is not None:
self.add(OrchestratorEvent(
- datetime.datetime.utcnow(),
+ datetime_now(),
e.event_subject[0],
e.event_subject[1],
"ERROR",
))
def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
- e = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon', daemon_name, level, message)
+ e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
self.add(e)
def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
from ceph.deployment.service_spec import \
NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
CustomContainerSpec, HostPlacementSpec
+from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
-from .utils import forall_hosts, CephadmNoImage, cephadmNoImage, \
- str_to_datetime, datetime_to_str
+from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
try:
import remoto
ConnectTimeout=30
"""
-CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
-
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
if notify_type == "mon_map":
# get monmap mtime so we can refresh configs when mons change
monmap = self.get('mon_map')
- self.last_monmap = datetime.datetime.strptime(
- monmap['modified'], CEPH_DATEFMT)
- if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
+ self.last_monmap = str_to_datetime(monmap['modified'])
+ if self.last_monmap and self.last_monmap > datetime_now():
self.last_monmap = None # just in case clocks are skewed
if getattr(self, 'manage_etc_ceph_ceph_conf', False):
# getattr, due to notify() being called before config_notify()
self.set_store("extra_ceph_conf", json.dumps({
'conf': inbuf,
- 'last_modified': datetime_to_str(datetime.datetime.utcnow())
+ 'last_modified': datetime_to_str(datetime_now())
}))
self.log.info('Set extra_ceph_conf')
self._kick_serve_loop()
).service_id(), overwrite=True):
image = ''
- start_time = datetime.datetime.utcnow()
+ start_time = datetime_now()
ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
if daemon_spec.daemon_type == 'container':
force=force,
hostname=daemon.hostname,
fullname=daemon.name(),
- process_started_at=datetime.datetime.utcnow(),
+ process_started_at=datetime_now(),
remove_util=self.to_remove_osds.rm_util))
except NotFoundError:
return f"Unable to find OSDs: {osd_ids}"
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec
+from ceph.utils import str_to_datetime, datetime_now
import orchestrator
from cephadm.schedule import HostAssignment
from cephadm.upgrade import CEPH_UPGRADE_ORDER
-from cephadm.utils import forall_hosts, cephadmNoImage, str_to_datetime, is_repo_digest
+from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest
from orchestrator import OrchestratorError
if TYPE_CHECKING:
if '.' not in d['name']:
continue
sd = orchestrator.DaemonDescription()
- sd.last_refresh = datetime.datetime.utcnow()
+ sd.last_refresh = datetime_now()
for k in ['created', 'started', 'last_configured', 'last_deployed']:
v = d.get(k, None)
if v:
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
+from ceph.utils import datetime_to_str, str_to_datetime
from datetime import datetime
import orchestrator
-from cephadm.utils import forall_hosts, datetime_to_str, str_to_datetime
+from cephadm.utils import forall_hosts
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed
-import datetime
import time
import fnmatch
from contextlib import contextmanager
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
-from cephadm.module import CEPH_DATEFMT
+from ceph.utils import datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
try:
store = {}
if '_ceph_get/mon_map' not in store:
m.mock_store_set('_ceph_get', 'mon_map', {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
})
for k, v in store.items():
-import datetime
import json
from contextlib import contextmanager
from unittest.mock import ANY
NFSServiceSpec, IscsiServiceSpec, HostPlacementSpec, CustomContainerSpec
from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.inventory import Devices, Device
+from ceph.utils import datetime_to_str, datetime_now
from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
HostSpec, OrchestratorError
from tests import mock
from .fixtures import cephadm_module, wait, _run_cephadm, match_glob, with_host, \
with_cephadm_module, with_service, assert_rm_service
-from cephadm.module import CephadmOrchestrator, CEPH_DATEFMT
+from cephadm.module import CephadmOrchestrator
"""
TODOs:
# Make sure, _check_daemons does a redeploy due to monmap change:
cephadm_module._store['_ceph_get/mon_map'] = {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
}
cephadm_module.notify('mon_map', None)
# Make sure, _check_daemons does a redeploy due to monmap change:
cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
})
cephadm_module.notify('mon_map', None)
# Make sure, _check_daemons does a redeploy due to monmap change:
cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
})
cephadm_module.notify('mon_map', None)
force=False,
hostname='test',
fullname='osd.0',
- process_started_at=datetime.datetime.utcnow(),
+ process_started_at=datetime_now(),
remove_util=cephadm_module.to_remove_osds.rm_util
))
cephadm_module.to_remove_osds.process_removal_queue()
# Make sure, _check_daemons does a redeploy due to monmap change:
cephadm_module.mock_store_set('_ceph_get', 'mon_map', {
- 'modified': datetime.datetime.utcnow().strftime(CEPH_DATEFMT),
+ 'modified': datetime_to_str(datetime_now()),
'fsid': 'foobar',
})
cephadm_module.notify('mon_map', mock.MagicMock())
import json
-from datetime import datetime
import pytest
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
+from ceph.utils import datetime_to_str, datetime_now
from cephadm import CephadmOrchestrator
from cephadm.inventory import SPEC_STORE_PREFIX
-from cephadm.utils import DATEFMT
from cephadm.tests.fixtures import _run_cephadm, cephadm_module, wait, with_host
from orchestrator import OrchestratorError
from cephadm.serve import CephadmServe
# Sorry, for this hack, but I need to make sure, Migration thinks,
# we have updated all daemons already.
- cephadm_module.cache.last_daemon_update['host1'] = datetime.now()
- cephadm_module.cache.last_daemon_update['host2'] = datetime.now()
+ cephadm_module.cache.last_daemon_update['host1'] = datetime_now()
+ cephadm_module.cache.last_daemon_update['host2'] = datetime_now()
cephadm_module.migration_current = 0
cephadm_module.migration.migrate()
'hosts': ['host1']
}
},
- 'created': datetime.utcnow().strftime(DATEFMT),
+ 'created': datetime_to_str(datetime_now()),
}, sort_keys=True),
)
'count': 5,
}
},
- 'created': datetime.utcnow().strftime(DATEFMT),
+ 'created': datetime_to_str(datetime_now()),
}, sort_keys=True),
)
cephadm_module.set_store(SPEC_STORE_PREFIX + 'mon.wrong', json.dumps({
'hosts': ['host1']
}
},
- 'created': datetime.utcnow().strftime(DATEFMT),
+ 'created': datetime_to_str(datetime_now()),
}, sort_keys=True),
)
'hosts': ['host1']
}
},
- 'created': datetime.utcnow().strftime(DATEFMT),
+ 'created': datetime_to_str(datetime_now()),
}, sort_keys=True),
)
# https://tracker.ceph.com/issues/44934
# Those are real user data from early octopus.
# Please do not modify those JSON values.
- assert dd_json == DaemonDescription.from_json(dd_json).to_json()
+
+ # Convert datetime properties to old style.
+ # 2020-04-03T07:29:16.926292Z -> 2020-04-03T07:29:16.926292
+ def convert_to_old_style_json(j):
+ for k in ['last_refresh', 'created', 'started', 'last_deployed',
+ 'last_configured']:
+ if k in j:
+ j[k] = j[k].rstrip('Z')
+ return j
+
+ assert dd_json == convert_to_old_style_json(
+ DaemonDescription.from_json(dd_json).to_json())
@pytest.mark.parametrize("spec,dd,valid",
import logging
-import re
import json
-import datetime
import socket
from enum import Enum
from functools import wraps
-from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any
+from typing import Callable, TypeVar, List, NewType, TYPE_CHECKING, Any
from orchestrator import OrchestratorError
if TYPE_CHECKING:
ConfEntity = NewType('ConfEntity', str)
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
-
class CephadmNoImage(Enum):
token = 1
return '@' in image_name
-def str_to_datetime(input: str) -> datetime.datetime:
- return datetime.datetime.strptime(input, DATEFMT)
-
-
-def datetime_to_str(dt: datetime.datetime) -> str:
- return dt.strftime(DATEFMT)
-
-
def resolve_ip(hostname: str) -> str:
try:
return socket.getaddrinfo(hostname, None, flags=socket.AI_CANONNAME, type=socket.SOCK_STREAM)[0][4][0]
ServiceSpecValidationError, IscsiServiceSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec
+from ceph.utils import datetime_to_str, str_to_datetime
from mgr_module import MgrModule, CLICommand, HandleCommandResult
logger = logging.getLogger(__name__)
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
-
T = TypeVar('T')
for k in ['last_refresh', 'created', 'started', 'last_deployed',
'last_configured']:
if getattr(self, k):
- out[k] = getattr(self, k).strftime(DATEFMT)
+ out[k] = datetime_to_str(getattr(self, k))
if self.events:
out['events'] = [e.to_json() for e in self.events]
for k in ['last_refresh', 'created', 'started', 'last_deployed',
'last_configured']:
if k in c:
- c[k] = datetime.datetime.strptime(c[k], DATEFMT)
+ c[k] = str_to_datetime(c[k])
events = [OrchestratorEvent.from_json(e) for e in event_strs]
return cls(events=events, **c)
}
for k in ['last_refresh', 'created']:
if getattr(self, k):
- status[k] = getattr(self, k).strftime(DATEFMT)
+ status[k] = datetime_to_str(getattr(self, k))
status = {k: v for (k, v) in status.items() if v is not None}
out['status'] = status
if self.events:
c_status = status.copy()
for k in ['last_refresh', 'created']:
if k in c_status:
- c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
+ c_status[k] = str_to_datetime(c_status[k])
events = [OrchestratorEvent.from_json(e) for e in event_strs]
return cls(spec=spec, events=events, **c_status)
def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
if isinstance(created, str):
- created = datetime.datetime.strptime(created, DATEFMT)
+ created = str_to_datetime(created)
self.created: datetime.datetime = created
assert kind in "service daemon".split()
def to_json(self) -> str:
# Make a long list of events readable.
- created = self.created.strftime(DATEFMT)
+ created = datetime_to_str(self.created)
return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
@classmethod
def from_json(cls, data) -> "OrchestratorEvent":
"""
>>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
- '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
+ '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
:param data:
:return:
-import datetime
import errno
import json
from typing import List, Set, Optional, Iterator, cast, Dict, Any, Union
from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
+from ceph.utils import datetime_now
from mgr_util import format_bytes, to_pretty_timedelta, format_dimless
from mgr_module import MgrModule, HandleCommandResult
else:
return HandleCommandResult(stdout=to_format(services, format, many=True, cls=ServiceDescription))
else:
- now = datetime.datetime.utcnow()
+ now = datetime_now()
table = PrettyTable(
['NAME', 'RUNNING', 'REFRESHED', 'AGE',
'PLACEMENT',
if len(daemons) == 0:
return HandleCommandResult(stdout="No daemons reported")
- now = datetime.datetime.utcnow()
+ now = datetime_now()
table = PrettyTable(
['NAME', 'HOST', 'STATUS', 'REFRESHED', 'AGE',
'VERSION', 'IMAGE NAME', 'IMAGE ID', 'CONTAINER ID'],
from __future__ import absolute_import
-import datetime
import json
import pytest
from ceph.deployment.service_spec import ServiceSpec
from ceph.deployment import inventory
+from ceph.utils import datetime_now
from test_orchestrator import TestOrchestrator as _TestOrchestrator
from tests import mock
status_desc: starting
is_active: false
events:
-- 2020-06-10T10:08:22.933241 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on
+- 2020-06-10T10:08:22.933241Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on
host 'ubuntu'"
---
service_type: crash
status:
container_image_id: 74803e884bea289d2d2d3ebdf6d37cd560499e955595695b1390a89800f4e37a
container_image_name: docker.io/ceph/daemon-base:latest-master-devel
- created: '2020-06-10T10:37:31.051288'
- last_refresh: '2020-06-10T10:57:40.715637'
+ created: '2020-06-10T10:37:31.051288Z'
+ last_refresh: '2020-06-10T10:57:40.715637Z'
running: 1
size: 1
events:
-- 2020-06-10T10:37:31.139159 service:crash [INFO] "service was created"
+- 2020-06-10T10:37:31.139159Z service:crash [INFO] "service was created"
"""
types = (DaemonDescription, ServiceDescription)
def test_event_multiline():
from .._interface import OrchestratorEvent
- e = OrchestratorEvent(datetime.datetime.utcnow(), 'service', 'subject', 'ERROR', 'message')
+ e = OrchestratorEvent(datetime_now(), 'service', 'subject', 'ERROR', 'message')
assert OrchestratorEvent.from_json(e.to_json()) == e
- e = OrchestratorEvent(datetime.datetime.utcnow(), 'service',
+ e = OrchestratorEvent(datetime_now(), 'service',
'subject', 'ERROR', 'multiline\nmessage')
assert OrchestratorEvent.from_json(e.to_json()) == e
--- /dev/null
+import datetime
+
+import pytest
+
+from ceph.utils import datetime_now, datetime_to_str, str_to_datetime
+
+
+def test_datetime_to_str_1():
+ dt = datetime.datetime.now()
+ assert type(datetime_to_str(dt)) is str
+
+
+def test_datetime_to_str_2():
+ dt = datetime.datetime.strptime('2019-04-24T17:06:53.039991',
+ '%Y-%m-%dT%H:%M:%S.%f')
+ assert datetime_to_str(dt) == '2019-04-24T17:06:53.039991Z'
+
+
+def test_datetime_to_str_3():
+ dt = datetime.datetime.strptime('2020-11-02T04:40:12.748172-0800',
+ '%Y-%m-%dT%H:%M:%S.%f%z')
+ assert datetime_to_str(dt) == '2020-11-02T12:40:12.748172Z'
+
+
+def test_str_to_datetime_1():
+ dt = str_to_datetime('2020-03-03T09:21:43.636153304Z')
+ assert type(dt) is datetime.datetime
+ assert dt.tzinfo is not None
+
+
+def test_str_to_datetime_2():
+ dt = str_to_datetime('2020-03-03T15:52:30.136257504-0600')
+ assert type(dt) is datetime.datetime
+ assert dt.tzinfo is not None
+
+
+def test_str_to_datetime_3():
+ dt = str_to_datetime('2020-03-03T15:52:30.136257504')
+ assert type(dt) is datetime.datetime
+ assert dt.tzinfo is not None
+
+
+def test_str_to_datetime_invalid_format_1():
+ with pytest.raises(ValueError):
+ str_to_datetime('2020-03-03 15:52:30.136257504')
+
+
+def test_str_to_datetime_invalid_format_2():
+ with pytest.raises(ValueError):
+ str_to_datetime('2020-03-03')
+
+
+def test_datetime_now_1():
+ dt = str_to_datetime('2020-03-03T09:21:43.636153304Z')
+ dt_now = datetime_now()
+ assert type(dt_now) is datetime.datetime
+ assert dt_now.tzinfo is not None
+ assert dt < dt_now
--- /dev/null
+import datetime
+import re
+
+
+def datetime_now() -> datetime.datetime:
+ """
+ Return the current local date and time.
+ :return: Returns an aware datetime object of the current date
+ and time.
+ """
+ return datetime.datetime.now(tz=datetime.timezone.utc)
+
+
+def datetime_to_str(dt: datetime.datetime) -> str:
+ """
+ Convert a datetime object into a ISO 8601 string, e.g.
+ '2019-04-24T17:06:53.039991Z'.
+ :param dt: The datetime object to process.
+ :return: Return a string representing the date in
+ ISO 8601 (timezone=UTC).
+ """
+ return dt.astimezone(tz=datetime.timezone.utc).strftime(
+ '%Y-%m-%dT%H:%M:%S.%fZ')
+
+
+def str_to_datetime(string: str) -> datetime.datetime:
+ """
+ Convert an ISO 8601 string into a datetime object.
+ The following formats are supported:
+
+ - 2020-03-03T09:21:43.636153304Z
+ - 2020-03-03T15:52:30.136257504-0600
+ - 2020-03-03T15:52:30.136257504
+
+ :param string: The string to parse.
+ :return: Returns an aware datetime object of the given date
+ and time string.
+ :raises: :exc:`~exceptions.ValueError` for an unknown
+ datetime string.
+ """
+ fmts = [
+ '%Y-%m-%dT%H:%M:%S.%f',
+ '%Y-%m-%dT%H:%M:%S.%f%z'
+ ]
+
+ # In *all* cases, the 9 digit second precision is too much for
+ # Python's strptime. Shorten it to 6 digits.
+ p = re.compile(r'(\.[\d]{6})[\d]*')
+ string = p.sub(r'\1', string)
+
+ # Replace trailing Z with -0000, since (on Python 3.6.8) it
+ # won't parse.
+ if string and string[-1] == 'Z':
+ string = string[:-1] + '-0000'
+
+ for fmt in fmts:
+ try:
+ dt = datetime.datetime.strptime(string, fmt)
+ # Make sure the datetime object is aware (timezone is set).
+ # If not, then assume the time is in UTC.
+ if dt.tzinfo is None:
+ dt = dt.replace(tzinfo=datetime.timezone.utc)
+ return dt
+ except ValueError:
+ pass
+
+ raise ValueError("Time data {} does not match one of the formats {}".format(
+ string, str(fmts)))