See https://blog.miguelgrinberg.com/post/it-s-time-for-a-change-datetime-utcnow-is-now-deprecated
Signed-off-by: Zack Cerza <zack@redhat.com>
+import datetime
import logging
import os
import psutil
import sys
import yaml
-from datetime import datetime
from typing import Dict, List
from teuthology import (
from teuthology import safepath
log = logging.getLogger(__name__)
-start_time = datetime.utcnow()
+start_time = datetime.datetime.now(datetime.timezone.utc)
restart_file_path = '/tmp/teuthology-restart-dispatcher'
stop_file_path = '/tmp/teuthology-stop-dispatcher'
def sentinel(path):
if not os.path.exists(path):
return False
- file_mtime = datetime.utcfromtimestamp(os.path.getmtime(path))
+ file_mtime = datetime.datetime.fromtimestamp(
+ os.path.getmtime(path),
+ datetime.timezone.utc,
+ )
return file_mtime > start_time
+import datetime
import logging
import os
import subprocess
import requests
from urllib.parse import urljoin
-from datetime import datetime
from teuthology import exporter, kill, report, safepath
from teuthology.config import config as teuth_config
def run_with_watchdog(process, job_config):
- job_start_time = datetime.utcnow()
+ job_start_time = datetime.datetime.now(datetime.timezone.utc)
# Only push the information that's relevant to the watchdog, to save db
# load
hit_max_timeout = False
while process.poll() is None:
# Kill jobs that have been running longer than the global max
- run_time = datetime.utcnow() - job_start_time
+ run_time = datetime.datetime.now(datetime.timezone.utc) - job_start_time
total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
if total_seconds > teuth_config.max_job_time:
hit_max_timeout = True
import datetime
-import dateutil.tz
import dateutil.parser
import json
import os
def expired(self):
if self.expires is None:
return True
- utcnow = datetime.datetime.now(dateutil.tz.tzutc())
+ utcnow = datetime.datetime.now(datetime.timezone.utc)
offset = datetime.timedelta(minutes=30)
return self.expires < (utcnow + offset)
+import datetime
import json
import logging
import requests
import socket
import re
-from datetime import datetime
from paramiko import SSHException
from paramiko.ssh_exception import NoValidConnectionsError
for task in host_tasks:
timestamp = task['createdTime']
time_delta = (
- datetime.utcnow() - datetime.strptime(
- timestamp, self.timestamp_format)
+ datetime.datetime.now(datetime.timezone.utc) - datetime.datetime.strptime(
+ timestamp, self.timestamp_format).replace(tzinfo=datetime.timezone.utc)
).total_seconds()
# There should only be one deploy task matching our host. Just in
# case there are multiple, select a very recent one.
+import datetime
+
from copy import deepcopy
-from datetime import datetime
from mock import patch, DEFAULT, PropertyMock
from pytest import raises, mark
tasktype_result = dict(tasktypes=[dict(name='deploy', id=tasktype_id)])
schedule_result = dict()
host_tasks = [dict(
- createdTime=datetime.strftime(
- datetime.utcnow(), self.klass.timestamp_format),
+ createdTime=datetime.datetime.now(
+ datetime.timezone.utc
+ ).strftime(self.klass.timestamp_format),
id=task_id,
)]
self.mocks['m_requests_Session_send']\
Ssh-key key handlers and associated routines
"""
import contextlib
+import datetime
import logging
import paramiko
import re
-from datetime import datetime
from io import StringIO
from teuthology import contextutil
"""
Return a UTC timestamp suitable for use in filenames
"""
- return datetime.utcnow().strftime(format_)
+ return datetime.datetime.now(datetime.timezone.utc).strftime(format_)
def backup_file(remote, path, sudo=False):
+import datetime
import os
from unittest.mock import patch, Mock, MagicMock
-from datetime import datetime, timedelta
from teuthology import worker
@patch("os.path.getmtime")
@patch("os.path.exists")
- @patch("teuthology.worker.datetime")
- def test_needs_restart(self, m_datetime, m_exists, m_getmtime):
+ def test_needs_restart(self, m_exists, m_getmtime):
m_exists.return_value = True
- m_datetime.utcfromtimestamp.return_value = datetime.utcnow() + timedelta(days=1)
- result = worker.sentinel(worker.restart_file_path)
- assert result
+ now = datetime.datetime.now(datetime.timezone.utc)
+ m_getmtime.return_value = (now + datetime.timedelta(days=1)).timestamp()
+ assert worker.sentinel(worker.restart_file_path)
@patch("os.path.getmtime")
@patch("os.path.exists")
- @patch("teuthology.worker.datetime")
- def test_does_not_need_restart(self, m_datetime, m_exists, getmtime):
+ def test_does_not_need_restart(self, m_exists, m_getmtime):
m_exists.return_value = True
- m_datetime.utcfromtimestamp.return_value = datetime.utcnow() - timedelta(days=1)
- result = worker.sentinel(worker.restart_file_path)
- assert not result
+ now = datetime.datetime.now(datetime.timezone.utc)
+ m_getmtime.return_value = (now - datetime.timedelta(days=1)).timestamp()
+ assert not worker.sentinel(worker.restart_file_path)
@patch("os.symlink")
def test_symlink_success(self, m_symlink):
+import datetime
import logging
import os
import subprocess
import time
import yaml
-from datetime import datetime
-
from teuthology import (
# non-modules
setup_log_file,
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
log = logging.getLogger(__name__)
-start_time = datetime.utcnow()
+start_time = datetime.datetime.now(datetime.timezone.utc)
restart_file_path = '/tmp/teuthology-restart-workers'
stop_file_path = '/tmp/teuthology-stop-workers'
def sentinel(path):
if not os.path.exists(path):
return False
- file_mtime = datetime.utcfromtimestamp(os.path.getmtime(path))
+ file_mtime = datetime.datetime.fromtimestamp(
+ os.path.getmtime(path),
+ datetime.timezone.utc,
+ )
if file_mtime > start_time:
return True
else:
def run_with_watchdog(process, job_config):
- job_start_time = datetime.utcnow()
+ job_start_time = datetime.datetime.now(datetime.timezone.utc)
# Only push the information that's relevant to the watchdog, to save db
# load
symlink_worker_log(job_config['worker_log'], job_config['archive_path'])
while process.poll() is None:
# Kill jobs that have been running longer than the global max
- run_time = datetime.utcnow() - job_start_time
+ run_time = datetime.datetime.now(datetime.timezone.utc) - job_start_time
total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
if total_seconds > teuth_config.max_job_time:
log.warning("Job ran longer than {max}s. Killing...".format(