import time
import yaml
+from datetime import datetime
+from tarfile import ReadError
+
+import teuthology
from teuthology import report
from teuthology import safepath
from teuthology.config import config as teuth_config
from teuthology.lock.ops import reimage_many
from teuthology.misc import get_user
from teuthology.config import FakeNamespace
-from teuthology.worker import run_with_watchdog
from teuthology.job_status import get_status
-import teuthology
from teuthology.nuke import nuke
+from teuthology.kill import kill_job
+from teuthology.misc import pull_directory
+from teuthology.task.internal import add_remotes
+from teuthology.orchestra import run
log = logging.getLogger(__name__)
if p.returncode != 0:
log.error('Child exited with code %d', p.returncode)
+ return
else:
log.info('Success!')
if 'targets' in job_config:
serializer = report.ResultsSerializer(teuth_config.archive_base)
job_info = serializer.job_info(job_config['name'], job_config['job_id'])
job_status = get_status(job_info)
- if job_status == 'pass' or job_config.get('unlock_on_failure', False):
+ if job_status == 'pass' or \
+ (job_config.get('unlock_on_failure', False) and not job_config.get('nuke-on-error', False)):
log.info('Unlocking machines...')
fake_ctx = create_fake_context(job_config)
for machine in job_info['targets'].keys():
teuthology.lock.ops.unlock_one(fake_ctx, machine, job_info['owner'],
job_info['archive_path'])
if job_status != 'pass' and job_config.get('nuke-on-error', False):
+ log.info('Nuking machines...')
fake_ctx = create_fake_context(job_config)
nuke(fake_ctx, True)
+def run_with_watchdog(process, job_config):
+ job_start_time = datetime.utcnow()
+
+ # Only push the information that's relevant to the watchdog, to save db
+ # load
+ job_info = dict(
+ name=job_config['name'],
+ job_id=job_config['job_id'],
+ )
+
+ # Sleep once outside of the loop to avoid double-posting jobs
+ time.sleep(teuth_config.watchdog_interval)
+ while process.poll() is None:
+ # Kill jobs that have been running longer than the global max
+ run_time = datetime.utcnow() - job_start_time
+ total_seconds = run_time.days * 60 * 60 * 24 + run_time.seconds
+ if total_seconds > teuth_config.max_job_time:
+ log.warning("Job ran longer than {max}s. Killing...".format(
+ max=teuth_config.max_job_time))
+ transfer_archives(job_info['name'], job_info['job_id'],
+ teuth_config.archive_base, job_config)
+ kill_job(job_info['name'], job_info['job_id'],
+ teuth_config.archive_base, job_config['owner'])
+
+ # calling this without a status just updates the jobs updated time
+ report.try_push_job_info(job_info)
+ time.sleep(teuth_config.watchdog_interval)
+
+ # we no longer support testing theses old branches
+ assert(job_config.get('teuthology_branch') not in ('argonaut', 'bobtail',
+ 'cuttlefish', 'dumpling'))
+
+ # Let's make sure that paddles knows the job is finished. We don't know
+ # the status, but if it was a pass or fail it will have already been
+ # reported to paddles. In that case paddles ignores the 'dead' status.
+ # If the job was killed, paddles will use the 'dead' status.
+ report.try_push_job_info(job_info, dict(status='dead'))
+
+
def create_fake_context(job_config, block=False):
if job_config['owner'] is None:
job_config['owner'] = get_user()
fake_ctx = FakeNamespace(ctx_args)
return fake_ctx
+
+
+def transfer_archives(run_name, job_id, archive_base, job_config):
+ serializer = report.ResultsSerializer(archive_base)
+ job_info = serializer.job_info(run_name, job_id, simple=True)
+
+ if 'archive' in job_info:
+ ctx = create_fake_context(job_config)
+ add_remotes(ctx, job_config)
+
+ for log_type, log_path in job_info['archive'].items():
+ if log_type == 'init':
+ log_type = ''
+ compress_logs(ctx, log_path)
+ archive_logs(ctx, log_path, log_type)
+ else:
+ log.info('No archives to transfer.')
+
+
+def archive_logs(ctx, remote_path, log_path):
+ """
+ Archive directories from all nodes in a cliuster. It pulls all files in
+ remote_path dir to job's archive dir under log_path dir.
+ """
+ path = os.path.join(ctx.archive, 'remote')
+ if (not os.path.exists(path)):
+ os.mkdir(path)
+ for remote in ctx.cluster.remotes.keys():
+ sub = os.path.join(path, remote.shortname)
+ if (not os.path.exists(sub)):
+ os.makedirs(sub)
+ try:
+ pull_directory(remote, remote_path, os.path.join(sub, log_path))
+ except ReadError:
+ pass
+
+
+def compress_logs(ctx, remote_dir):
+ """
+ Compress all files in remote_dir from all nodes in a cluster.
+ """
+ log.info('Compressing logs...')
+ run.wait(
+ ctx.cluster.run(
+ args=[
+ 'sudo',
+ 'find',
+ remote_dir,
+ '-name',
+ '*.log',
+ '-print0',
+ run.Raw('|'),
+ 'sudo',
+ 'xargs',
+ '-0',
+ '--no-run-if-empty',
+ '--',
+ 'gzip',
+ '--',
+ ],
+ wait=False,
+ ),
+ )