From: Shraddha Agrawal Date: Wed, 19 Aug 2020 13:52:58 +0000 (+0530) Subject: archive logs before nuking machines when job times out X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=75a3684d8b6a3f02cee03d0ba5eb01fcbe75329a;p=teuthology.git archive logs before nuking machines when job times out This commit does the following: 1. teuthology/task/internal/__init__.py: Adds the file path of ceph log directory to the job's info.yaml log file. 2. teuthology/dispatcher/supervisor.py: Compress and transfer the log dirs listed in info.yaml to teuthology host before nuking test machines incase job times out. The above enables graceful job timeout. Signed-off-by: Shraddha Agrawal --- diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index 6c4314f6d..f9be3a532 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -4,6 +4,10 @@ import subprocess import time import yaml +from datetime import datetime +from tarfile import ReadError + +import teuthology from teuthology import report from teuthology import safepath from teuthology.config import config as teuth_config @@ -12,10 +16,12 @@ from teuthology import setup_log_file, install_except_hook from teuthology.lock.ops import reimage_many from teuthology.misc import get_user from teuthology.config import FakeNamespace -from teuthology.worker import run_with_watchdog from teuthology.job_status import get_status -import teuthology from teuthology.nuke import nuke +from teuthology.kill import kill_job +from teuthology.misc import pull_directory +from teuthology.task.internal import add_remotes +from teuthology.orchestra import run log = logging.getLogger(__name__) @@ -146,6 +152,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): if p.returncode != 0: log.error('Child exited with code %d', p.returncode) + return else: log.info('Success!') if 'targets' in job_config: @@ -169,17 +176,58 @@ def unlock_targets(job_config): serializer = report.ResultsSerializer(teuth_config.archive_base) job_info = serializer.job_info(job_config['name'], job_config['job_id']) job_status = get_status(job_info) - if job_status == 'pass' or job_config.get('unlock_on_failure', False): + if job_status == 'pass' or \ + (job_config.get('unlock_on_failure', False) and not job_config.get('nuke-on-error', False)): log.info('Unlocking machines...') fake_ctx = create_fake_context(job_config) for machine in job_info['targets'].keys(): teuthology.lock.ops.unlock_one(fake_ctx, machine, job_info['owner'], job_info['archive_path']) if job_status != 'pass' and job_config.get('nuke-on-error', False): + log.info('Nuking machines...') fake_ctx = create_fake_context(job_config) nuke(fake_ctx, True) +def run_with_watchdog(process, job_config): + job_start_time = datetime.utcnow() + + # Only push the information that's relevant to the watchdog, to save db + # load + job_info = dict( + name=job_config['name'], + job_id=job_config['job_id'], + ) + + # Sleep once outside of the loop to avoid double-posting jobs + time.sleep(teuth_config.watchdog_interval) + while process.poll() is None: + # Kill jobs that have been running longer than the global max + run_time = datetime.utcnow() - job_start_time + total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds + if total_seconds > teuth_config.max_job_time: + log.warning("Job ran longer than {max}s. Killing...".format( + max=teuth_config.max_job_time)) + transfer_archives(job_info['name'], job_info['job_id'], + teuth_config.archive_base, job_config) + kill_job(job_info['name'], job_info['job_id'], + teuth_config.archive_base, job_config['owner']) + + # calling this without a status just updates the jobs updated time + report.try_push_job_info(job_info) + time.sleep(teuth_config.watchdog_interval) + + # we no longer support testing theses old branches + assert(job_config.get('teuthology_branch') not in ('argonaut', 'bobtail', + 'cuttlefish', 'dumpling')) + + # Let's make sure that paddles knows the job is finished. We don't know + # the status, but if it was a pass or fail it will have already been + # reported to paddles. In that case paddles ignores the 'dead' status. + # If the job was killed, paddles will use the 'dead' status. + report.try_push_job_info(job_info, dict(status='dead')) + + def create_fake_context(job_config, block=False): if job_config['owner'] is None: job_config['owner'] = get_user() @@ -202,3 +250,66 @@ def create_fake_context(job_config, block=False): fake_ctx = FakeNamespace(ctx_args) return fake_ctx + + +def transfer_archives(run_name, job_id, archive_base, job_config): + serializer = report.ResultsSerializer(archive_base) + job_info = serializer.job_info(run_name, job_id, simple=True) + + if 'archive' in job_info: + ctx = create_fake_context(job_config) + add_remotes(ctx, job_config) + + for log_type, log_path in job_info['archive'].items(): + if log_type == 'init': + log_type = '' + compress_logs(ctx, log_path) + archive_logs(ctx, log_path, log_type) + else: + log.info('No archives to transfer.') + + +def archive_logs(ctx, remote_path, log_path): + """ + Archive directories from all nodes in a cliuster. It pulls all files in + remote_path dir to job's archive dir under log_path dir. + """ + path = os.path.join(ctx.archive, 'remote') + if (not os.path.exists(path)): + os.mkdir(path) + for remote in ctx.cluster.remotes.keys(): + sub = os.path.join(path, remote.shortname) + if (not os.path.exists(sub)): + os.makedirs(sub) + try: + pull_directory(remote, remote_path, os.path.join(sub, log_path)) + except ReadError: + pass + + +def compress_logs(ctx, remote_dir): + """ + Compress all files in remote_dir from all nodes in a cluster. + """ + log.info('Compressing logs...') + run.wait( + ctx.cluster.run( + args=[ + 'sudo', + 'find', + remote_dir, + '-name', + '*.log', + '-print0', + run.Raw('|'), + 'sudo', + 'xargs', + '-0', + '--no-run-if-empty', + '--', + 'gzip', + '--', + ], + wait=False, + ), + ) diff --git a/teuthology/task/internal/__init__.py b/teuthology/task/internal/__init__.py index 640008e7e..a73bc02f3 100644 --- a/teuthology/task/internal/__init__.py +++ b/teuthology/task/internal/__init__.py @@ -348,6 +348,15 @@ def archive(ctx, config): ) ) + with open(os.path.join(ctx.archive, 'info.yaml'), 'r+') as info_file: + info_yaml = yaml.safe_load(info_file) + info_file.seek(0) + if 'archive' not in info_yaml: + info_yaml['archive'] = {'init': archive_dir} + else: + info_yaml['archive']['init'] = archive_dir + yaml.safe_dump(info_yaml, info_file, default_flow_style=False) + try: yield except Exception: