]> git-server-git.apps.pok.os.sepia.ceph.com Git - teuthology.git/commitdiff
archive logs before nuking machines when job times out
authorShraddha Agrawal <shraddha.agrawal000@gmail.com>
Wed, 19 Aug 2020 13:52:58 +0000 (19:22 +0530)
committerJosh Durgin <jdurgin@redhat.com>
Sat, 16 Jan 2021 21:36:26 +0000 (16:36 -0500)
This commit does the following:

1. teuthology/task/internal/__init__.py: Adds the file path of ceph
log directory to the job's info.yaml log file.

2. teuthology/dispatcher/supervisor.py: Compress and transfer the
log dirs listed in info.yaml to teuthology host before nuking test
machines in case the job times out.

The above enables graceful job timeout.

Signed-off-by: Shraddha Agrawal <shraddha.agrawal000@gmail.com>
teuthology/dispatcher/supervisor.py
teuthology/task/internal/__init__.py

index 6c4314f6dfece157607b4d536a630310c784791a..f9be3a532c68e8fbd164d3edb829fb0ef727ae8d 100644 (file)
@@ -4,6 +4,10 @@ import subprocess
 import time
 import yaml
 
+from datetime import datetime
+from tarfile import ReadError
+
+import teuthology
 from teuthology import report
 from teuthology import safepath
 from teuthology.config import config as teuth_config
@@ -12,10 +16,12 @@ from teuthology import setup_log_file, install_except_hook
 from teuthology.lock.ops import reimage_many
 from teuthology.misc import get_user
 from teuthology.config import FakeNamespace
-from teuthology.worker import run_with_watchdog
 from teuthology.job_status import get_status
-import teuthology
 from teuthology.nuke import nuke
+from teuthology.kill import kill_job
+from teuthology.misc import pull_directory
+from teuthology.task.internal import add_remotes
+from teuthology.orchestra import run
 
 log = logging.getLogger(__name__)
 
@@ -146,6 +152,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
 
     if p.returncode != 0:
         log.error('Child exited with code %d', p.returncode)
+        return
     else:
         log.info('Success!')
     if 'targets' in job_config:
@@ -169,17 +176,58 @@ def unlock_targets(job_config):
     serializer = report.ResultsSerializer(teuth_config.archive_base)
     job_info = serializer.job_info(job_config['name'], job_config['job_id'])
     job_status = get_status(job_info)
-    if job_status == 'pass' or job_config.get('unlock_on_failure', False):
+    if job_status == 'pass' or \
+            (job_config.get('unlock_on_failure', False) and not job_config.get('nuke-on-error', False)):
         log.info('Unlocking machines...')
         fake_ctx = create_fake_context(job_config)
         for machine in job_info['targets'].keys():
             teuthology.lock.ops.unlock_one(fake_ctx, machine, job_info['owner'],
                                            job_info['archive_path'])
     if job_status != 'pass' and job_config.get('nuke-on-error', False):
+        log.info('Nuking machines...')
         fake_ctx = create_fake_context(job_config)
         nuke(fake_ctx, True)
 
 
+def run_with_watchdog(process, job_config):
+    job_start_time = datetime.utcnow()
+
+    # Only push the information that's relevant to the watchdog, to save db
+    # load
+    job_info = dict(
+        name=job_config['name'],
+        job_id=job_config['job_id'],
+    )
+
+    # Sleep once outside of the loop to avoid double-posting jobs
+    time.sleep(teuth_config.watchdog_interval)
+    while process.poll() is None:
+        # Kill jobs that have been running longer than the global max
+        run_time = datetime.utcnow() - job_start_time
+        total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
+        if total_seconds > teuth_config.max_job_time:
+            log.warning("Job ran longer than {max}s. Killing...".format(
+                max=teuth_config.max_job_time))
+            transfer_archives(job_info['name'], job_info['job_id'],
+                              teuth_config.archive_base, job_config)
+            kill_job(job_info['name'], job_info['job_id'],
+                     teuth_config.archive_base, job_config['owner'])
+
+        # calling this without a status just updates the jobs updated time
+        report.try_push_job_info(job_info)
+        time.sleep(teuth_config.watchdog_interval)
+
+    # we no longer support testing these old branches
+    assert(job_config.get('teuthology_branch') not in ('argonaut', 'bobtail',
+                                                       'cuttlefish', 'dumpling'))
+
+    # Let's make sure that paddles knows the job is finished. We don't know
+    # the status, but if it was a pass or fail it will have already been
+    # reported to paddles. In that case paddles ignores the 'dead' status.
+    # If the job was killed, paddles will use the 'dead' status.
+    report.try_push_job_info(job_info, dict(status='dead'))
+
+
 def create_fake_context(job_config, block=False):
     if job_config['owner'] is None:
         job_config['owner'] = get_user()
@@ -202,3 +250,66 @@ def create_fake_context(job_config, block=False):
 
     fake_ctx = FakeNamespace(ctx_args)
     return fake_ctx
+
+
+def transfer_archives(run_name, job_id, archive_base, job_config):
+    serializer = report.ResultsSerializer(archive_base)
+    job_info = serializer.job_info(run_name, job_id, simple=True)
+
+    if 'archive' in job_info:
+        ctx = create_fake_context(job_config)
+        add_remotes(ctx, job_config)
+
+        for log_type, log_path in job_info['archive'].items():
+            if log_type == 'init':
+                log_type = ''
+            compress_logs(ctx, log_path)
+            archive_logs(ctx, log_path, log_type)
+    else:
+        log.info('No archives to transfer.')
+
+
+def archive_logs(ctx, remote_path, log_path):
+    """
+    Archive directories from all nodes in a cluster. It pulls all files in
+    remote_path dir to job's archive dir under log_path dir.
+    """
+    path = os.path.join(ctx.archive, 'remote')
+    if (not os.path.exists(path)):
+        os.mkdir(path)
+    for remote in ctx.cluster.remotes.keys():
+        sub = os.path.join(path, remote.shortname)
+        if (not os.path.exists(sub)):
+            os.makedirs(sub)
+        try:
+            pull_directory(remote, remote_path, os.path.join(sub, log_path))
+        except ReadError:
+            pass
+
+
+def compress_logs(ctx, remote_dir):
+    """
+    Compress all .log files in remote_dir from all nodes in a cluster.
+    """
+    log.info('Compressing logs...')
+    run.wait(
+        ctx.cluster.run(
+            args=[
+                'sudo',
+                'find',
+                remote_dir,
+                '-name',
+                '*.log',
+                '-print0',
+                run.Raw('|'),
+                'sudo',
+                'xargs',
+                '-0',
+                '--no-run-if-empty',
+                '--',
+                'gzip',
+                '--',
+            ],
+            wait=False,
+        ),
+    )
index 5e63d91c1cabb11c4fe095926a2ae2a8a388ff91..aa898c9b64fa3af9da5426b77e2254eb1964dc2d 100644 (file)
@@ -361,6 +361,15 @@ def archive(ctx, config):
         )
     )
 
+    with open(os.path.join(ctx.archive, 'info.yaml'), 'r+') as info_file:
+        info_yaml = yaml.safe_load(info_file)
+        info_file.seek(0)
+        if 'archive' not in info_yaml:
+            info_yaml['archive'] = {'init': archive_dir}
+        else:
+            info_yaml['archive']['init'] = archive_dir
+        yaml.safe_dump(info_yaml, info_file, default_flow_style=False)
+
     try:
         yield
     except Exception: