This commit re-implements functionality that was removed along with the nuke system.
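
Instead of nuking, killed or failed jobs now release their nodes through
lock_ops.unlock_safe(), which refuses to unlock nodes that still have an
active job and powers them off where the machine type supports it. A rough
sketch of the intended call (the argument values below are illustrative
only):

    from teuthology.lock import ops as lock_ops

    lock_ops.unlock_safe(
        ["smithi001"],        # node shortnames (example host)
        "worker@teuthology",  # lock owner (example)
        "example-run-name",   # run name
        "1",                  # job id
    )
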
Signed-off-by: Zack Cerza <zack@redhat.com>
# modules
beanstalk,
exporter,
- nuke,
report,
repo_utils,
worker,
error_message = "Saw error while trying to spawn supervisor."
log.exception(error_message)
if 'targets' in job_config:
- nuke.nuke(supervisor.create_fake_context(job_config), True)
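+ # Release any nodes the job had already locked instead of nuking them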
+ node_names = job_config["targets"].keys()
+ lock_ops.unlock_safe(
+ node_names,
+ job_config["owner"],
+ job_config["name"],
+ job_config["job_id"]
+ )
report.try_push_job_info(job_config, dict(
status='fail',
failure_reason=error_message))
from urllib.parse import urljoin
from datetime import datetime
-from teuthology import exporter, kill, nuke, report, safepath
+from teuthology import exporter, kill, report, safepath
from teuthology.config import config as teuth_config
from teuthology.exceptions import SkipJob, MaxWhileTries
from teuthology import setup_log_file, install_except_hook
from teuthology.misc import get_user, archive_logs, compress_logs
from teuthology.config import FakeNamespace
-from teuthology.job_status import get_status
from teuthology.lock import ops as lock_ops
from teuthology.task import internal
from teuthology.misc import decanonicalize_hostname as shortname
ctx.summary = {
'sentry_event': sentry.report_error(job_config, e, task_name="reimage")
}
- nuke.nuke(ctx, True)
# A machine that fails to reimage 10 times will be marked down
check_for_reimage_failures_and_mark_down(targets)
raise
serializer = report.ResultsSerializer(teuth_config.archive_base)
job_info = serializer.job_info(job_config['name'], job_config['job_id'])
machine_statuses = query.get_statuses(job_info['targets'].keys())
- # only unlock/nuke targets if locked and description matches
+ # only unlock targets if locked and description matches
locked = []
for status in machine_statuses:
name = shortname(status['name'])
locked.append(name)
if not locked:
return
- job_status = get_status(job_info)
- if job_status == 'pass' or job_config.get('unlock_on_failure', False):
+ if job_config.get("unlock_on_failure", True):
log.info('Unlocking machines...')
- fake_ctx = create_fake_context(job_config)
- for machine in locked:
- lock_ops.unlock_one(
- fake_ctx,
- machine, job_info['owner'],
- job_info['archive_path']
- )
+ lock_ops.unlock_safe(locked, job_info["owner"], job_info["name"], job_info["job_id"])
def run_with_watchdog(process, job_config):
log.warning("Job ran longer than {max}s. Killing...".format(
max=teuth_config.max_job_time))
try:
- # kill processes but do not nuke yet so we can save
+ # kill processes but do not unlock yet so we can save
# the logs, coredumps, etc.
kill.kill_job(
job_info['name'], job_info['job_id'],
teuth_config.archive_base, job_config['owner'],
- skip_nuke=True
+ skip_unlock=True
)
except Exception:
log.exception('Failed to kill job')
'os_type': job_config.get('os_type', 'ubuntu'),
'os_version': os_version,
'name': job_config['name'],
+ 'job_id': job_config['job_id'],
}
return FakeNamespace(ctx_args)
import yaml
import psutil
import subprocess
-import tempfile
import logging
import getpass
+from typing import Union
import teuthology.exporter
from teuthology import beanstalk
from teuthology import report
from teuthology.config import config
-from teuthology import misc
+from teuthology.lock import ops as lock_ops
log = logging.getLogger(__name__)
if kill_processes(run_name, run_info.get('pids')):
return
if owner is not None:
- targets = find_targets(run_name, owner)
- nuke_targets(targets, owner)
+ targets = find_targets(run_name)
+ names = [t["name"] for t in targets]
+ lock_ops.unlock_safe(names, owner, run_name)
report.try_mark_run_dead(run_name)
-def kill_job(run_name, job_id, archive_base=None, owner=None, skip_nuke=False):
+def kill_job(run_name, job_id, archive_base=None, owner=None, skip_unlock=False):
serializer = report.ResultsSerializer(archive_base)
job_info = serializer.job_info(run_name, job_id)
if not owner:
owner = job_info['owner']
if kill_processes(run_name, [job_info.get('pid')]):
return
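+ # Mark the job dead first so its nodes are no longer considered to have
+ # an active job when we unlock them below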
+ report.try_push_job_info(job_info, dict(status="dead"))
if 'machine_type' in job_info:
teuthology.exporter.JobResults.record(
job_info["machine_type"],
)
else:
log.warn(f"Job {job_id} has no machine_type; cannot report via Prometheus")
- # Because targets can be missing for some cases, for example, when all
- # the necessary nodes ain't locked yet, we do not use job_info to get them,
- # but use find_targets():
- targets = find_targets(run_name, owner, job_id)
- if not skip_nuke:
- nuke_targets(targets, owner)
+ if not skip_unlock:
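+ # As before, look the targets up via find_targets() rather than job_info,
+ # since job_info may not list them (e.g. if not all nodes were locked yet)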
+ targets = find_targets(run_name, job_id)
+ lock_ops.unlock_safe(list(targets.keys()), owner, run_name, job_id)
def find_run_info(serializer, run_name):
run_pids.append(pid)
return run_pids
-
-def find_targets(run_name, owner, job_id=None):
- lock_args = [
- 'teuthology-lock',
- '--list-targets',
- '--desc-pattern',
- '/' + run_name + '/' + str(job_id or ''),
- '--status',
- 'up',
- '--owner',
- owner
- ]
- proc = subprocess.Popen(lock_args, stdout=subprocess.PIPE)
- stdout, stderr = proc.communicate()
- out_obj = yaml.safe_load(stdout)
- if not out_obj or 'targets' not in out_obj:
- return {}
-
- return out_obj
-
-
-def nuke_targets(targets_dict, owner):
- targets = targets_dict.get('targets')
- if not targets:
- log.info("No locked machines. Not nuking anything")
- return
-
- to_nuke = []
- for target in targets:
- to_nuke.append(misc.decanonicalize_hostname(target))
-
- target_file = tempfile.NamedTemporaryFile(delete=False, mode='w+t')
- target_file.write(yaml.safe_dump(targets_dict))
- target_file.close()
-
- log.info("Nuking machines: " + str(to_nuke))
- nuke_args = [
- 'teuthology-nuke',
- '-t',
- target_file.name,
- '--owner',
- owner
- ]
- nuke_args.extend(['--reboot-all', '--unlock'])
-
- proc = subprocess.Popen(
- nuke_args,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- for line in proc.stdout:
- line = line.replace(b'\r', b'').replace(b'\n', b'')
- log.info(line.decode())
- sys.stdout.flush()
-
- os.unlink(target_file.name)
+def find_targets(run_name: str, job_id: Union[str, int, None] = None) -> dict:
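+ """
+ Query the results server for targets: those of a single job if job_id is
+ given, otherwise those of every running or waiting job in the run.
+ """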
+ if job_id is not None:
+ job_info = report.ResultsReporter().get_jobs(run_name, str(job_id))
+ return job_info.get("targets", dict())
+ result = dict()
+ run_info = report.ResultsReporter().get_jobs(run_name)
+ for job_info in run_info:
+ if job_info.get("status") not in ("running", "waiting"):
+ continue
+ result.update(job_info.get("targets", dict()))
+ return result
res = ops.unlock_many(machines, user)
return 0 if res else 1
for machine in machines:
- if not ops.unlock_one(ctx, machine, user):
+ if not ops.unlock_one(machine, user):
ret = 1
if not ctx.f:
return ret
if len(result) < ctx.num_to_lock:
log.error("Locking failed.")
for machine in result:
- ops.unlock_one(ctx, machine, user)
+ ops.unlock_one(machine, user)
ret = 1
else:
log.info("Successfully Locked:\n%s\n" % shortnames)
import random
import time
import yaml
-
import requests
+from typing import List, Union
+
import teuthology.orchestra.remote
import teuthology.parallel
import teuthology.provision
-from teuthology import misc, report
+from teuthology import misc, report, provision
from teuthology.config import config
from teuthology.contextutil import safe_while
from teuthology.task import console_log
from teuthology.job_status import set_status
from teuthology.lock import util, query
+from teuthology.orchestra import remote
log = logging.getLogger(__name__)
else:
log.error('Unable to create virtual machine: %s',
machine)
- unlock_one(ctx, machine, user)
+ unlock_one(machine, user)
ok_machs = do_update_keys(list(ok_machs.keys()))[1]
update_nodes(ok_machs)
return ok_machs
return response
+def unlock_safe(names: List[str], owner: str, run_name: str = "", job_id: str = ""):
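+ """
+ Unlock the given nodes in parallel via unlock_one_safe(), returning True
+ only if all of them were unlocked.
+ """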
+ with teuthology.parallel.parallel() as p:
+ for name in names:
+ p.spawn(unlock_one_safe, name, owner, run_name, job_id)
+ return all(p)
+
+
+def unlock_one_safe(name: str, owner: str, run_name: str = "", job_id: str = "") -> bool:
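+ """
+ Unlock a single node, refusing if it is already unlocked or if it still
+ appears to have an active job.
+ """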
+ node_status = query.get_status(name)
+ if node_status.get("locked", False) is False:
+ log.warn(f"Refusing to unlock {name} since it is already unlocked")
+ return False
+ maybe_job = query.node_active_job(name, node_status)
+ if not maybe_job:
+ return unlock_one(name, owner, node_status["description"], node_status)
+ if run_name and job_id and maybe_job.endswith(f"{run_name}/{job_id}"):
+ log.error(f"Refusing to unlock {name} since it has an active job: {run_name}/{job_id}")
+ return False
+ log.warning(f"Refusing to unlock {name} since it has an active job: {maybe_job}")
+ return False
+
+
def unlock_many(names, user):
fixed_names = [misc.canonicalize_hostname(name, user=None) for name in
names]
return False
-def unlock_one(ctx, name, user, description=None):
+def unlock_one(name, user, description=None, status: Union[dict, None] = None) -> bool:
name = misc.canonicalize_hostname(name, user=None)
- if not teuthology.provision.destroy_if_vm(ctx, name, user, description):
+ if not description and status:
+ description = status["description"]
+ if not teuthology.provision.destroy_if_vm(name, user, description or ""):
log.error('destroy failed for %s', name)
return False
request = dict(name=name, locked=False, locked_by=user,
response = requests.put(uri, json.dumps(request))
if response.ok:
log.info('unlocked: %s', name)
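+ # Best-effort attempt to stop (power off/park) the freed node; failures
+ # are only logged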
+ try:
+ stop_node(name, status)
+ except Exception:
+ log.exception(f"Failed to stop {name}!")
return response.ok
if response.status_code == 403:
break
if len(all_locked) == total_requested:
vmlist = []
for lmach in all_locked:
- if teuthology.lock.query.is_vm(lmach):
+ if query.is_vm(lmach):
vmlist.append(lmach)
if vmlist:
log.info('Waiting for virtual machines to come up')
if guest not in keys_dict.keys():
log.info('recreating: ' + guest)
full_name = misc.canonicalize_hostname(guest)
- teuthology.provision.destroy_if_vm(ctx, full_name)
+ teuthology.provision.destroy_if_vm(full_name)
teuthology.provision.create_if_vm(ctx, full_name)
- if teuthology.lock.ops.do_update_keys(keys_dict)[0]:
+ if do_update_keys(keys_dict)[0]:
log.info("Error in virtual machine keys")
newscandict = {}
for dkey in all_locked.keys():
- stats = teuthology.lock.query.get_status(dkey)
+ stats = query.get_status(dkey)
newscandict[dkey] = stats['ssh_pub_key']
ctx.config['targets'] = newscandict
else:
)
log.warning('Could not lock enough machines, waiting...')
time.sleep(10)
+
+
+def stop_node(name: str, status: Union[dict, None]):
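+ """
+ Attempt to stop a freed node: power off FOG-managed machines, park
+ Pelagos-managed machines, and run /testnode_stop.sh on containers.
+ """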
+ status = status or query.get_status(name)
+ remote_ = remote.Remote(name)
+ if status['machine_type'] in provision.fog.get_types():
+ remote_.console.power_off()
+ return
+ elif status['machine_type'] in provision.pelagos.get_types():
+ provision.pelagos.park_node(name)
+ return
+ elif remote_.is_container:
+ remote_.run(
+ args=['sudo', '/testnode_stop.sh'],
+ check_status=False,
+ )
+ return
import logging
import os
-
import requests
+from typing import Union
+
from teuthology import misc
from teuthology.config import config
from teuthology.contextutil import safe_while
log = logging.getLogger(__name__)
-def get_status(name):
+def get_status(name) -> dict:
name = misc.canonicalize_hostname(name, user=None)
uri = os.path.join(config.lock_server, 'nodes', name, '')
with safe_while(
response = requests.get(uri)
if response.ok:
return response.json()
+ elif response.status_code == 404:
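+ # An unknown node; return an empty status instead of retrying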
+ return dict()
log.warning(
"Failed to query lock server for status of {name}".format(name=name))
return dict()
"""
Return a list of node dicts corresponding to nodes that were locked to run
a job, but the job is no longer running. The purpose of this is to enable
- us to nuke nodes that were left locked due to e.g. infrastructure failures
+ us to find nodes that were left locked due to e.g. infrastructure failures
and return them to the pool.
:param owner: If non-None, return nodes locked by owner. Default is None.
nodes = [node for node in nodes if node['locked_by'] == owner]
nodes = filter(might_be_stale, nodes)
- def node_job_is_active(node, cache):
- """
- Is this node's job active (e.g. running or waiting)?
-
- :param node: The node dict as returned from the lock server
- :param cache: A set() used for caching results
- :returns: True or False
- """
- description = node['description']
- if description in cache:
- return True
- (name, job_id) = description.split('/')[-2:]
- url = os.path.join(config.results_server, 'runs', name, 'jobs', job_id,
- '')
- with safe_while(
- sleep=1, increment=0.5, action='node_is_active') as proceed:
- while proceed():
- resp = requests.get(url)
- if resp.ok:
- break
- if not resp.ok:
- return False
- job_info = resp.json()
- if job_info['status'] in ('running', 'waiting'):
- cache.add(description)
- return True
- return False
-
- result = list()
# Here we build the list of nodes that are locked, for a job (as opposed
# to being locked manually for random monkeying), where the job is not
# running
- active_jobs = set()
+ result = list()
for node in nodes:
- if node_job_is_active(node, active_jobs):
+ if node_active_job(node["name"]):
continue
result.append(node)
return result
+
+def node_active_job(name: str, status: Union[dict, None] = None) -> Union[str, None]:
+ """
+ Is this node's job active (e.g. running or waiting)?
+
+ :param name: The node's name
+ :param status: The node's status dict, as returned by get_status(); looked up if not provided
+ :returns: The node's lock description (or an explanatory string) if it still appears busy, otherwise None
+ """
+ status = status or get_status(name)
+ if not status:
+ # This should never happen with a normal node
+ return "node had no status"
+ description = status['description']
+ (run_name, job_id) = description.split('/')[-2:]
+ url = f"{config.results_server}/runs/{run_name}/jobs/{job_id}/"
+ job_status = ""
+ with safe_while(
+ sleep=1, increment=0.5, action='node_is_active') as proceed:
+ while proceed():
+ resp = requests.get(url)
+ if resp.ok:
+ job_status = resp.json()["status"]
+ break
+ if job_status and job_status not in ('pass', 'fail', 'dead'):
+ return description
import logging
+import os
import teuthology.exporter
import teuthology.lock.query
from teuthology.provision import fog
from teuthology.provision import openstack
from teuthology.provision import pelagos
-import os
log = logging.getLogger(__name__)
-def _logfile(ctx, shortname):
- if hasattr(ctx, 'config') and ctx.config.get('archive_path'):
- return os.path.join(ctx.config['archive_path'],
- shortname + '.downburst.log')
+def _logfile(shortname: str, archive_path: str = ""):
+ if archive_path and os.path.isdir(archive_path):
+ return f"{archive_path}/{shortname}.downburst.log"
def get_reimage_types():
return dbrst.create()
-def destroy_if_vm(ctx, machine_name, user=None, description=None,
- _downburst=None):
+def destroy_if_vm(
+ machine_name: str,
+ user: str = "",
+ description: str = "",
+ _downburst=None
+):
"""
Use downburst to destroy a virtual machine
log.error(msg.format(node=machine_name, as_user=user,
locked_by=status_info['locked_by']))
return False
- if (description is not None and description !=
+ if (description and description !=
status_info['description']):
msg = "Tried to destroy {node} with description {desc_arg} " + \
"but it is locked with description {desc_lock}"
dbrst = _downburst or \
downburst.Downburst(name=machine_name, os_type=None,
os_version=None, status=status_info,
- logfile=_logfile(ctx, shortname))
+ logfile=_logfile(shortname, description))
return dbrst.destroy()
vm_host=dict(name='host999'),
is_vm=True,
machine_type='mtype',
+ locked_by='user@a',
+ description="desc",
)
def test_create_if_vm_success(self):
dbrst.destroy = MagicMock(name='destroy')
dbrst.destroy.return_value = True
- result = provision.destroy_if_vm(ctx, name, _downburst=dbrst)
+ result = provision.destroy_if_vm(name, user="user@a", _downburst=dbrst)
assert result is True
dbrst.destroy.assert_called_with()
name = self.name
ctx = self.ctx
status = self.status
- status['locked_by'] = 'user@a'
dbrst = provision.downburst.Downburst(
name, ctx.os_type, ctx.os_version, status)
dbrst.destroy = MagicMock(name='destroy', side_effect=RuntimeError)
- result = provision.destroy_if_vm(ctx, name, user='user@b',
+ result = provision.destroy_if_vm(name, user='user@b',
_downburst=dbrst)
assert result is False
name = self.name
ctx = self.ctx
status = self.status
- status['description'] = 'desc_a'
dbrst = provision.downburst.Downburst(
name, ctx.os_type, ctx.os_version, status)
dbrst.destroy = MagicMock(name='destroy')
dbrst.destroy = MagicMock(name='destroy', side_effect=RuntimeError)
- result = provision.destroy_if_vm(ctx, name, description='desc_b',
+ result = provision.destroy_if_vm(name, description='desc_b',
_downburst=dbrst)
assert result is False
import teuthology.lock.ops
import teuthology.lock.query
import teuthology.lock.util
-from teuthology.job_status import get_status
log = logging.getLogger(__name__)
try:
yield
finally:
- # If both unlock_on_failure and nuke-on-error are set, don't unlock now
- # because we're just going to nuke (and unlock) later.
- unlock_on_failure = (
- ctx.config.get('unlock_on_failure', False)
- and not ctx.config.get('nuke-on-error', False)
- )
- if get_status(ctx.summary) == 'pass' or unlock_on_failure:
+ if ctx.config.get("unlock_on_failure", True):
log.info('Unlocking machines...')
for machine in ctx.config['targets'].keys():
- teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive)
+ teuthology.lock.ops.unlock_one(machine, ctx.owner, ctx.archive)