# processes
watchdog_interval: 120
+ # How old a scheduled job can be, in seconds, before the dispatcher
+ # considers it 'expired', skipping it.
+ max_job_age: 1209600
+
# How long a scheduled job should be allowed to run, in seconds, before
# it is killed by the supervisor process.
max_job_time: 259200
When tests finish or time out, send an email
here. May also be specified in ~/.teuthology.yaml
as 'results_email'
+ --expire <datetime> Do not execute jobs in the run if they have not
+ completed by this time. Valid formats include
+ ISO 8601, and relative offsets like '90s', '30m',
+ '1h', '3d', or '1w'
--rocketchat <rocketchat> Comma separated list of Rocket.Chat channels where
to send a message when tests finished or time out.
To be used with --sleep-before-teardown option.
'job_threshold': 500,
'lab_domain': 'front.sepia.ceph.com',
'lock_server': 'http://paddles.front.sepia.ceph.com/',
+ 'max_job_age': 1209600, # 2 weeks
'max_job_time': 259200, # 3 days
'nsupdate_url': 'http://nsupdate.front.sepia.ceph.com/update',
'results_server': 'http://paddles.front.sepia.ceph.com/',
from teuthology.dispatcher import supervisor
from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
from teuthology.lock import ops as lock_ops
+from teuthology.util.time import parse_timestamp
from teuthology import safepath
log = logging.getLogger(__name__)
def prep_job(job_config, log_file_path, archive_dir):
job_id = job_config['job_id']
+ check_job_expiration(job_config)
+
safe_archive = safepath.munge(job_config['name'])
job_config['worker_log'] = log_file_path
archive_path_full = os.path.join(
return job_config, teuth_bin_path
+def check_job_expiration(job_config):
+ job_id = job_config['job_id']
+ expired = False
+ now = datetime.datetime.now(datetime.timezone.utc)
+ if expire_str := job_config.get('timestamp'):
+ expire = parse_timestamp(expire_str) + \
+ datetime.timedelta(seconds=teuth_config.max_job_age)
+ expired = expire < now
+ if not expired and (expire_str := job_config.get('expire')):
+ try:
+ expire = parse_timestamp(expire_str)
+ expired = expired or expire < now
+ except ValueError:
+ log.warning(f"Failed to parse job expiration: {expire_str=}")
+ pass
+ if expired:
+ log.info(f"Skipping job {job_id} because it is expired: {expire_str} is in the past")
+ report.try_push_job_info(
+ job_config,
+ # TODO: Add a 'canceled' status to paddles, and use that.
+ dict(status='dead'),
+ )
+ raise SkipJob()
+
+
def lock_machines(job_config):
report.try_push_job_info(job_config, dict(status='running'))
fake_ctx = supervisor.create_fake_context(job_config, block=True)
from teuthology import dispatcher
from teuthology.config import FakeNamespace
from teuthology.contextutil import MaxWhileTries
+from teuthology.util.time import TIMESTAMP_FMT
class TestDispatcher(object):
for i in range(len(jobs)):
push_call = m_try_push_job_info.call_args_list[i]
assert push_call[0][1]['status'] == 'dead'
+
+ @pytest.mark.parametrize(
+ ["timestamp", "expire", "skip"],
+ [
+ [datetime.timedelta(days=-1), None, False],
+ [datetime.timedelta(days=-30), None, True],
+ [None, datetime.timedelta(days=1), False],
+ [None, datetime.timedelta(days=-1), True],
+ [datetime.timedelta(days=-1), datetime.timedelta(days=1), False],
+ [datetime.timedelta(days=1), datetime.timedelta(days=-1), True],
+ ]
+ )
+ @patch("teuthology.dispatcher.report.try_push_job_info")
+ def test_check_job_expiration(self, _, timestamp, expire, skip):
+ now = datetime.datetime.now(datetime.timezone.utc)
+ job_config = dict(
+ job_id="1",
+ name="job_name",
+ )
+ if timestamp:
+ job_config["timestamp"] = (now + timestamp).strftime(TIMESTAMP_FMT)
+ if expire:
+ job_config["expire"] = (now + expire).strftime(TIMESTAMP_FMT)
+ if skip:
+ with pytest.raises(dispatcher.SkipJob):
+ dispatcher.check_job_expiration(job_config)
+ else:
+ dispatcher.check_job_expiration(job_config)
elif key == 'subset' and value is not None:
# take input string '2/3' and turn into (2, 3)
value = tuple(map(int, value.split('/')))
+ elif key == 'expire' and value is None:
+ # Skip empty 'expire' values
+ continue
elif key in ('filter_all', 'filter_in', 'filter_out', 'rerun_statuses'):
if not value:
value = []
# Template for the config that becomes the base for each generated job config
dict_templ = {
'branch': Placeholder('ceph_branch'),
+ 'expire': Placeholder('expire'),
'sha1': Placeholder('ceph_hash'),
'teuthology_branch': Placeholder('teuthology_branch'),
'teuthology_sha1': Placeholder('teuthology_sha1'),
import copy
+import datetime
import logging
import os
import pwd
from humanfriendly import format_timespan
-from datetime import datetime
from tempfile import NamedTemporaryFile
from teuthology import repo_utils
from teuthology.suite.merge import config_merge
from teuthology.suite.build_matrix import build_matrix
from teuthology.suite.placeholder import substitute_placeholders, dict_templ
-from teuthology.util.time import TIMESTAMP_FMT
+from teuthology.util.time import parse_offset, parse_timestamp, TIMESTAMP_FMT
log = logging.getLogger(__name__)
:returns: A JobConfig object
"""
+ now = datetime.datetime.now(datetime.timezone.utc)
+ expires = self.get_expiration()
+ if expires:
+ if now > expires:
+ util.schedule_fail(
+ f"Refusing to schedule because the expiration date is in the past: {self.args.expire}",
+ dry_run=self.args.dry_run,
+ )
+
self.os = self.choose_os()
self.kernel_dict = self.choose_kernel()
ceph_hash = self.choose_ceph_hash()
suite_repo=config.get_ceph_qa_suite_git_url(),
suite_relpath=self.args.suite_relpath,
flavor=self.args.flavor,
+ expire=expires.strftime(TIMESTAMP_FMT) if expires else None,
)
return self.build_base_config()
+ def get_expiration(self, _base_time: datetime.datetime | None = None) -> datetime.datetime | None:
+ """
+ _base_time: For testing, calculate relative offsets from this base time
+
+ :returns: True if the job should run; False if it has expired
+ """
+ log.info(f"Checking for expiration ({self.args.expire})")
+ expires_str = self.args.expire
+ if expires_str is None:
+ return None
+ now = datetime.datetime.now(datetime.timezone.utc)
+ if _base_time is None:
+ _base_time = now
+ try:
+ expires = parse_timestamp(expires_str)
+ except ValueError:
+ expires = _base_time + parse_offset(expires_str)
+ return expires
+
def choose_os(self):
os_type = self.args.distro
os_version = self.args.distro_version
suite_repo='https://example.com/ceph/suite.git',
suite_relpath='',
ceph_repo='https://example.com/ceph/ceph.git',
- flavor='default'
+ flavor='default',
+ expire='expire',
)
output_dict = substitute_placeholders(dict_templ, input_dict)
assert output_dict['suite'] == 'suite'
suite_relpath='',
ceph_repo='https://example.com/ceph/ceph.git',
flavor=None,
+ expire='expire',
)
output_dict = substitute_placeholders(dict_templ, input_dict)
assert 'os_type' not in output_dict
import contextlib
import yaml
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
from mock import patch, call, ANY
from io import StringIO
from io import BytesIO
with pytest.raises(ScheduleFailError):
self.klass(self.args)
+ @pytest.mark.parametrize(
+ ["expire", "delta", "result"],
+ [
+ [None, timedelta(), False],
+ ["1m", timedelta(), True],
+ ["1m", timedelta(minutes=-2), False],
+ ["1m", timedelta(minutes=2), True],
+ ["7d", timedelta(days=-14), False],
+ ]
+ )
+ @patch('teuthology.repo_utils.fetch_repo')
+ @patch('teuthology.suite.run.util.git_branch_exists')
+ @patch('teuthology.suite.run.util.package_version_for_hash')
+ @patch('teuthology.suite.run.util.git_ls_remote')
+ def test_get_expiration(
+ self,
+ m_git_ls_remote,
+ m_package_version_for_hash,
+ m_git_branch_exists,
+ m_fetch_repo,
+ expire,
+ delta,
+ result,
+ ):
+ m_git_ls_remote.side_effect = 'hash'
+ m_package_version_for_hash.return_value = 'a_version'
+ m_git_branch_exists.return_value = True
+ self.args.expire = expire
+ obj = self.klass(self.args)
+ now = datetime.now(timezone.utc)
+ expires_result = obj.get_expiration(_base_time=now + delta)
+ if expire is None:
+ assert expires_result is None
+ assert obj.base_config['expire'] is None
+ else:
+ assert expires_result is not None
+ assert (now < expires_result) is result
+ assert obj.base_config['expire']
+
@patch('teuthology.suite.run.util.fetch_repos')
@patch('requests.head')
@patch('teuthology.suite.run.util.git_branch_exists')