git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
Add job expiration dates
author Zack Cerza <zack@redhat.com>
Mon, 1 Jul 2024 23:00:46 +0000 (17:00 -0600)
committer Zack Cerza <zack@redhat.com>
Thu, 8 Aug 2024 00:05:15 +0000 (18:05 -0600)
This feature has two parts:
* Specifying expiration dates when scheduling test runs
* A global maximum age

Expiration dates are provided by passing `--expire` to `teuthology-suite` with
a relative value like `1d` (one day), `1w` (one week), or an absolute value like
`1999-12-31_23:59:59`.

A new configuration item, `max_job_age`, is specified in seconds. This defaults
to two weeks.

When the dispatcher checks the queue for the next job to run, it will first
compare the job's `timestamp` value - which reflects the time the job was
scheduled - against the current time. If more than `max_job_age` seconds have
passed, the job is skipped and marked dead. It next checks for an `expire`
value; if that value is in the past, the job is skipped and marked dead.
Otherwise, it will be run as usual.

Signed-off-by: Zack Cerza <zack@redhat.com>
docs/siteconfig.rst
scripts/suite.py
teuthology/config.py
teuthology/dispatcher/__init__.py
teuthology/dispatcher/test/test_dispatcher.py
teuthology/suite/__init__.py
teuthology/suite/placeholder.py
teuthology/suite/run.py
teuthology/suite/test/test_placeholder.py
teuthology/suite/test/test_run_.py

index 83c5f1599c2f5bf954d3ca00abc0cc5f53172443..effb8219c227aa2772bdb709e8447da210d26c12 100644 (file)
@@ -85,6 +85,10 @@ Here is a sample configuration with many of the options set and documented::
     # processes
     watchdog_interval: 120
 
+    # How old a scheduled job can be, in seconds, before the dispatcher
+    # considers it 'expired', skipping it.
+    max_job_age: 1209600
+
     # How long a scheduled job should be allowed to run, in seconds, before 
     # it is killed by the supervisor process.
     max_job_time: 259200
index 77561b7e03c43a114f88d3b50b03a0a50392ec65..c98a5cb89225772dee7123fa7d4554097e8d011a 100644 (file)
@@ -112,6 +112,10 @@ Scheduler arguments:
                               When tests finish or time out, send an email
                               here. May also be specified in ~/.teuthology.yaml
                               as 'results_email'
+  --expire <datetime>         Do not execute jobs in the run if they have not
+                              completed by this time. Valid formats include
+                              ISO 8601, and relative offsets like '90s', '30m',
+                              '1h', '3d', or '1w'
   --rocketchat <rocketchat>   Comma separated list of Rocket.Chat channels where
                               to send a message when tests finished or time out.
                               To be used with --sleep-before-teardown option.
index 55fa966c2352b37535103e3107d5b39034bf68a4..30204aa466a4bea4f99aacbc236f5b8f053a277b 100644 (file)
@@ -158,6 +158,7 @@ class TeuthologyConfig(YamlConfig):
         'job_threshold': 500,
         'lab_domain': 'front.sepia.ceph.com',
         'lock_server': 'http://paddles.front.sepia.ceph.com/',
+        'max_job_age': 1209600,  # 2 weeks
         'max_job_time': 259200,  # 3 days
         'nsupdate_url': 'http://nsupdate.front.sepia.ceph.com/update',
         'results_server': 'http://paddles.front.sepia.ceph.com/',
index 15c4003c990ff9c489126365effaf89ce96fcf00..59f8ae3279d8e27509e2322d6cc32c0ea8bf6815 100644 (file)
@@ -22,6 +22,7 @@ from teuthology.config import config as teuth_config
 from teuthology.dispatcher import supervisor
 from teuthology.exceptions import BranchNotFoundError, CommitNotFoundError, SkipJob, MaxWhileTries
 from teuthology.lock import ops as lock_ops
+from teuthology.util.time import parse_timestamp
 from teuthology import safepath
 
 log = logging.getLogger(__name__)
@@ -234,6 +235,8 @@ def find_dispatcher_processes() -> Dict[str, List[psutil.Process]]:
 
 def prep_job(job_config, log_file_path, archive_dir):
     job_id = job_config['job_id']
+    check_job_expiration(job_config)
+
     safe_archive = safepath.munge(job_config['name'])
     job_config['worker_log'] = log_file_path
     archive_path_full = os.path.join(
@@ -308,6 +311,31 @@ def prep_job(job_config, log_file_path, archive_dir):
     return job_config, teuth_bin_path
 
 
+def check_job_expiration(job_config):
+    """
+    Skip a job that is too old or whose expiration date has passed.
+
+    Two checks are made against the current UTC time, in this order:
+
+    * If the job has a ``timestamp`` and more than
+      ``teuth_config.max_job_age`` seconds have elapsed since it, the job is
+      expired.
+    * Otherwise, if the job has an ``expire`` value that is in the past, the
+      job is expired. An ``expire`` value that cannot be parsed is logged
+      and ignored.
+
+    An expired job is reported with status 'dead' before SkipJob is raised.
+
+    :param job_config: The job's config dict; must contain ``job_id``
+    :raises SkipJob:   If the job is expired
+    """
+    job_id = job_config['job_id']
+    now = datetime.datetime.now(datetime.timezone.utc)
+    # Human-readable explanation of why the job expired; None means "run it"
+    reason = None
+    if timestamp_str := job_config.get('timestamp'):
+        max_age = teuth_config.max_job_age
+        deadline = parse_timestamp(timestamp_str) + \
+            datetime.timedelta(seconds=max_age)
+        if deadline < now:
+            reason = (
+                f"it was scheduled at {timestamp_str}, which is more than "
+                f"max_job_age ({max_age}s) ago"
+            )
+    if reason is None and (expire_str := job_config.get('expire')):
+        try:
+            expire = parse_timestamp(expire_str)
+        except ValueError:
+            # A malformed expiration date must not prevent the job from running
+            log.warning(f"Failed to parse job expiration: {expire_str=}")
+        else:
+            if expire < now:
+                reason = f"{expire_str} is in the past"
+    if reason is None:
+        return
+    log.info(f"Skipping job {job_id} because it is expired: {reason}")
+    report.try_push_job_info(
+        job_config,
+        # TODO: Add a 'canceled' status to paddles, and use that.
+        dict(status='dead'),
+    )
+    raise SkipJob()
+
+
 def lock_machines(job_config):
     report.try_push_job_info(job_config, dict(status='running'))
     fake_ctx = supervisor.create_fake_context(job_config, block=True)
index e7c59d8bd2ab5dfdca7188c69e000437e0575685..58f58cf9cf1f970997744c2cfbdc0cbbcefe12f0 100644 (file)
@@ -7,6 +7,7 @@ from unittest.mock import patch, Mock, MagicMock
 from teuthology import dispatcher
 from teuthology.config import FakeNamespace
 from teuthology.contextutil import MaxWhileTries
+from teuthology.util.time import TIMESTAMP_FMT
 
 
 class TestDispatcher(object):
@@ -172,3 +173,31 @@ class TestDispatcher(object):
         for i in range(len(jobs)):
             push_call = m_try_push_job_info.call_args_list[i]
             assert push_call[0][1]['status'] == 'dead'
+
+    @pytest.mark.parametrize(
+        ["timestamp", "expire", "skip"],
+        [
+            [datetime.timedelta(days=-1), None, False],
+            [datetime.timedelta(days=-30), None, True],
+            [None, datetime.timedelta(days=1), False],
+            [None, datetime.timedelta(days=-1), True],
+            [datetime.timedelta(days=-1), datetime.timedelta(days=1), False],
+            [datetime.timedelta(days=1), datetime.timedelta(days=-1), True],
+        ]
+    )
+    @patch("teuthology.dispatcher.report.try_push_job_info")
+    def test_check_job_expiration(self, _, timestamp, expire, skip):
+        """
+        check_job_expiration() raises SkipJob only when ``skip`` is set.
+
+        ``timestamp`` and ``expire`` are offsets from the current UTC time;
+        each one that is set is formatted with TIMESTAMP_FMT and stored in
+        the job config under the key of the same name.
+        """
+        current = datetime.datetime.now(datetime.timezone.utc)
+        job_config = {"job_id": "1", "name": "job_name"}
+        for key, offset in (("timestamp", timestamp), ("expire", expire)):
+            if offset:
+                job_config[key] = (current + offset).strftime(TIMESTAMP_FMT)
+        if not skip:
+            # Must return normally, without raising
+            dispatcher.check_job_expiration(job_config)
+            return
+        with pytest.raises(dispatcher.SkipJob):
+            dispatcher.check_job_expiration(job_config)
index 6fc167fab6ba3c7f62c43dd63977861cf39ebfcf..8a17cf5f105e8d3e115586e45ae906abc98e8c88 100644 (file)
@@ -63,6 +63,9 @@ def process_args(args):
         elif key == 'subset' and value is not None:
             # take input string '2/3' and turn into (2, 3)
             value = tuple(map(int, value.split('/')))
+        elif key == 'expire' and value is None:
+            # Skip empty 'expire' values
+            continue
         elif key in ('filter_all', 'filter_in', 'filter_out', 'rerun_statuses'):
             if not value:
                 value = []
index 3a177511700b18e0bbd836420e3e309b56ae4bde..f812fccac2d5e6845fe6da6935eb2ffe2291e4a4 100644 (file)
@@ -45,6 +45,7 @@ def substitute_placeholders(input_dict, values_dict):
 # Template for the config that becomes the base for each generated job config
 dict_templ = {
     'branch': Placeholder('ceph_branch'),
+    'expire': Placeholder('expire'),
     'sha1': Placeholder('ceph_hash'),
     'teuthology_branch': Placeholder('teuthology_branch'),
     'teuthology_sha1': Placeholder('teuthology_sha1'),
index 600f1568feab17910f724d958cf0deb03bf859a4..4f425cada659dd0febe15e6acac04438b3136da6 100644 (file)
@@ -1,4 +1,5 @@
 import copy
+import datetime
 import logging
 import os
 import pwd
@@ -8,7 +9,6 @@ import time
 
 from humanfriendly import format_timespan
 
-from datetime import datetime
 from tempfile import NamedTemporaryFile
 from teuthology import repo_utils
 
@@ -24,7 +24,7 @@ from teuthology.suite import util
 from teuthology.suite.merge import config_merge
 from teuthology.suite.build_matrix import build_matrix
 from teuthology.suite.placeholder import substitute_placeholders, dict_templ
-from teuthology.util.time import TIMESTAMP_FMT
+from teuthology.util.time import parse_offset, parse_timestamp, TIMESTAMP_FMT
 
 log = logging.getLogger(__name__)
 
@@ -87,6 +87,15 @@ class Run(object):
 
         :returns: A JobConfig object
         """
+        now = datetime.datetime.now(datetime.timezone.utc)
+        expires = self.get_expiration()
+        if expires:
+            if now > expires:
+                util.schedule_fail(
+                    f"Refusing to schedule because the expiration date is in the past: {self.args.expire}",
+                    dry_run=self.args.dry_run,
+                )
+
         self.os = self.choose_os()
         self.kernel_dict = self.choose_kernel()
         ceph_hash = self.choose_ceph_hash()
@@ -123,9 +132,29 @@ class Run(object):
             suite_repo=config.get_ceph_qa_suite_git_url(),
             suite_relpath=self.args.suite_relpath,
             flavor=self.args.flavor,
+            expire=expires.strftime(TIMESTAMP_FMT) if expires else None,
         )
         return self.build_base_config()
 
+    def get_expiration(self, _base_time: datetime.datetime | None = None) -> datetime.datetime | None:
+        """
+        Compute the expiration date requested via ``self.args.expire``.
+
+        The value is first parsed as an absolute timestamp; if that raises
+        ValueError, it is parsed as a relative offset and added to the base
+        time.
+
+        :param _base_time: For testing, calculate relative offsets from this
+                           base time instead of the current UTC time
+        :returns:          The expiration date, or None if
+                           ``self.args.expire`` is None
+        """
+        log.info(f"Checking for expiration ({self.args.expire})")
+        expires_str = self.args.expire
+        if expires_str is None:
+            return None
+        try:
+            return parse_timestamp(expires_str)
+        except ValueError:
+            # Not an absolute timestamp; treat it as an offset from the base
+            # time, which defaults to the current UTC time.
+            if _base_time is None:
+                _base_time = datetime.datetime.now(datetime.timezone.utc)
+            return _base_time + parse_offset(expires_str)
+
     def choose_os(self):
         os_type = self.args.distro
         os_version = self.args.distro_version
index acf1b6a44fd86e2b9a36c4996da4613242d215d3..31b51755d28e94e8d97709cd1e3de0d591396160 100644 (file)
@@ -22,7 +22,8 @@ class TestPlaceholder(object):
             suite_repo='https://example.com/ceph/suite.git',
             suite_relpath='',
             ceph_repo='https://example.com/ceph/ceph.git',
-            flavor='default'
+            flavor='default',
+            expire='expire',
         )
         output_dict = substitute_placeholders(dict_templ, input_dict)
         assert output_dict['suite'] == 'suite'
@@ -50,6 +51,7 @@ class TestPlaceholder(object):
             suite_relpath='',
             ceph_repo='https://example.com/ceph/ceph.git',
             flavor=None,
+            expire='expire',
         )
         output_dict = substitute_placeholders(dict_templ, input_dict)
         assert 'os_type' not in output_dict
index abe78a38630783fa946e36821e46f973fbe25c29..51ce29a859041d8d8e3e2ec009e5cda6a2db9094 100644 (file)
@@ -4,7 +4,7 @@ import requests
 import contextlib
 import yaml
 
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from mock import patch, call, ANY
 from io import StringIO
 from io import BytesIO
@@ -90,6 +90,45 @@ class TestRun(object):
         with pytest.raises(ScheduleFailError):
             self.klass(self.args)
 
+    # Each row: the value assigned to args.expire, the shift applied to the
+    # base time handed to get_expiration(), and whether the resulting
+    # expiration is expected to be later than the current time.
+    @pytest.mark.parametrize(
+        ["expire", "delta", "result"],
+        [
+            [None, timedelta(), False],
+            ["1m", timedelta(), True],
+            ["1m", timedelta(minutes=-2), False],
+            ["1m", timedelta(minutes=2), True],
+            ["7d", timedelta(days=-14), False],
+        ]
+    )
+    # Stacked @patch decorators are applied bottom-up, so the mock arguments
+    # below appear in the reverse order of the decorators.
+    @patch('teuthology.repo_utils.fetch_repo')
+    @patch('teuthology.suite.run.util.git_branch_exists')
+    @patch('teuthology.suite.run.util.package_version_for_hash')
+    @patch('teuthology.suite.run.util.git_ls_remote')
+    def test_get_expiration(
+        self,
+        m_git_ls_remote,
+        m_package_version_for_hash,
+        m_git_branch_exists,
+        m_fetch_repo,
+        expire,
+        delta,
+        result,
+    ):
+        """
+        Exercise get_expiration() with no value and with relative offsets.
+
+        ``expire`` is assigned to ``self.args.expire`` before the object is
+        built; ``delta`` shifts the ``_base_time`` passed to get_expiration()
+        away from the current time; ``result`` is whether the returned
+        expiration is expected to be later than the current time.
+        """
+        # NOTE(review): a str side_effect makes the mock yield one character
+        # per call ('h', 'a', ...) - confirm return_value was not intended.
+        m_git_ls_remote.side_effect = 'hash'
+        m_package_version_for_hash.return_value = 'a_version'
+        m_git_branch_exists.return_value = True
+        self.args.expire = expire
+        obj = self.klass(self.args)
+        now = datetime.now(timezone.utc)
+        expires_result = obj.get_expiration(_base_time=now + delta)
+        if expire is None:
+            assert expires_result is None
+            assert obj.base_config['expire'] is None
+        else:
+            assert expires_result is not None
+            assert (now < expires_result) is result
+            assert obj.base_config['expire']
+
     @patch('teuthology.suite.run.util.fetch_repos')
     @patch('requests.head')
     @patch('teuthology.suite.run.util.git_branch_exists')