Remove nuke: Rework unlocking
author     Zack Cerza <zack@redhat.com>
           Thu, 1 Feb 2024 00:51:09 +0000 (17:51 -0700)
committer  Zack Cerza <zack@redhat.com>
           Fri, 2 Feb 2024 18:51:15 +0000 (11:51 -0700)
This commit re-implements, as part of the unlock path, functionality that was removed along with the nuke system.

Signed-off-by: Zack Cerza <zack@redhat.com>
teuthology/dispatcher/__init__.py
teuthology/dispatcher/supervisor.py
teuthology/kill.py
teuthology/lock/cli.py
teuthology/lock/ops.py
teuthology/lock/query.py
teuthology/provision/__init__.py
teuthology/provision/test/test_downburst.py
teuthology/task/internal/lock_machines.py

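All unlocking now funnels through a single entry point, lock_ops.unlock_safe(), instead of nuking. A minimal sketch of the new call as the dispatcher and supervisor use it below; the host, owner, and run values are illustrative, not from this commit:

    from teuthology.lock import ops as lock_ops

    # Hypothetical values for illustration.
    names = ["smithi001.front.sepia.ceph.com", "smithi002.front.sepia.ceph.com"]
    ok = lock_ops.unlock_safe(
        names,
        owner="scheduled_user@host",  # lock owner recorded in the job config
        run_name="example-run",
        job_id="1",
    )
    # unlock_safe() returns True only if every node was safely unlocked.
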
index 8a0ee1af1c83cc35425bb527e333e02d2b9cfca8..dee4f55fcadcc4490b4191a7b0247815cc7c2275 100644 (file)
@@ -15,7 +15,6 @@ from teuthology import (
     # modules
     beanstalk,
     exporter,
-    nuke,
     report,
     repo_utils,
     worker,
@@ -191,7 +190,13 @@ def main(args):
             error_message = "Saw error while trying to spawn supervisor."
             log.exception(error_message)
             if 'targets' in job_config:
-                nuke.nuke(supervisor.create_fake_context(job_config), True)
+                node_names = job_config["targets"].keys()
+                lock_ops.unlock_safe(
+                    node_names,
+                    job_config["owner"],
+                    job_config["name"],
+                    job_config["job_id"]
+                )
             report.try_push_job_info(job_config, dict(
                 status='fail',
                 failure_reason=error_message))
index 22304926f88950946fa73d8223d8b6c9cabe5376..5806bb5473718525e43611e3deba39808439066a 100644 (file)
@@ -8,13 +8,12 @@ import requests
 from urllib.parse import urljoin
 from datetime import datetime
 
-from teuthology import exporter, kill, nuke, report, safepath
+from teuthology import exporter, kill, report, safepath
 from teuthology.config import config as teuth_config
 from teuthology.exceptions import SkipJob, MaxWhileTries
 from teuthology import setup_log_file, install_except_hook
 from teuthology.misc import get_user, archive_logs, compress_logs
 from teuthology.config import FakeNamespace
-from teuthology.job_status import get_status
 from teuthology.lock import ops as lock_ops
 from teuthology.task import internal
 from teuthology.misc import decanonicalize_hostname as shortname
@@ -242,7 +241,6 @@ def reimage(job_config):
         ctx.summary = {
             'sentry_event': sentry.report_error(job_config, e, task_name="reimage")
         }
-        nuke.nuke(ctx, True)
         # Machine that fails to reimage after 10 times will be marked down
         check_for_reimage_failures_and_mark_down(targets)
         raise
@@ -255,7 +253,7 @@ def unlock_targets(job_config):
     serializer = report.ResultsSerializer(teuth_config.archive_base)
     job_info = serializer.job_info(job_config['name'], job_config['job_id'])
     machine_statuses = query.get_statuses(job_info['targets'].keys())
-    # only unlock/nuke targets if locked and description matches
+    # only unlock targets if locked and description matches
     locked = []
     for status in machine_statuses:
         name = shortname(status['name'])
@@ -271,16 +269,9 @@ def unlock_targets(job_config):
         locked.append(name)
     if not locked:
         return
-    job_status = get_status(job_info)
-    if job_status == 'pass' or job_config.get('unlock_on_failure', False):
+    if job_config.get("unlock_on_failure", True):
         log.info('Unlocking machines...')
-        fake_ctx = create_fake_context(job_config)
-        for machine in locked:
-            lock_ops.unlock_one(
-                fake_ctx,
-                machine, job_info['owner'],
-                job_info['archive_path']
-            )
+        lock_ops.unlock_safe(locked, job_info["owner"], job_info["name"], job_info["job_id"])
 
 
 def run_with_watchdog(process, job_config):
@@ -305,12 +296,12 @@ def run_with_watchdog(process, job_config):
             log.warning("Job ran longer than {max}s. Killing...".format(
                 max=teuth_config.max_job_time))
             try:
-                # kill processes but do not nuke yet so we can save
+                # kill processes but do not unlock yet so we can save
                 # the logs, coredumps, etc.
                 kill.kill_job(
                     job_info['name'], job_info['job_id'],
                     teuth_config.archive_base, job_config['owner'],
-                    skip_nuke=True
+                    skip_unlock=True
                 )
             except Exception:
                 log.exception('Failed to kill job')
@@ -365,6 +356,7 @@ def create_fake_context(job_config, block=False):
         'os_type': job_config.get('os_type', 'ubuntu'),
         'os_version': os_version,
         'name': job_config['name'],
+        'job_id': job_config['job_id'],
     }
 
     return FakeNamespace(ctx_args)
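
The unlock_targets() change above flips the default: nodes are now returned even when a job fails, unless unlock_on_failure is explicitly set to false. A simplified sketch of the resulting flow, assuming illustrative job_info values (the real code also verifies each node's description against the job before unlocking):

    from teuthology.lock import ops as lock_ops, query
    from teuthology.misc import decanonicalize_hostname as shortname

    # Illustrative job_info; the real values come from the results serializer.
    job_info = {
        "owner": "user@host",
        "name": "example-run",
        "job_id": "1",
        "targets": {"smithi001.front.sepia.ceph.com": "ssh-rsa AAAA..."},
    }
    statuses = query.get_statuses(job_info["targets"].keys())
    locked = [shortname(s["name"]) for s in statuses if s.get("locked")]
    if locked:
        lock_ops.unlock_safe(locked, job_info["owner"], job_info["name"], job_info["job_id"])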
index 8e14109faa6da64a1a73f5e6a8a09e81e49a90a0..2e8560c755ab2dd41889aa92e227fbf00cb9c5eb 100755 (executable)
@@ -4,17 +4,17 @@ import sys
 import yaml
 import psutil
 import subprocess
-import tempfile
 import logging
 import getpass
 
+from typing import Union
 
 import teuthology.exporter
 
 from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config
-from teuthology import misc
+from teuthology.lock import ops as lock_ops
 
 log = logging.getLogger(__name__)
 
@@ -73,12 +73,13 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None,
     if kill_processes(run_name, run_info.get('pids')):
         return
     if owner is not None:
-        targets = find_targets(run_name, owner)
-        nuke_targets(targets, owner)
+        targets = find_targets(run_name)
+        names = [t["name"] for t in targets]
+        lock_ops.unlock_safe(names, owner, run_name)
     report.try_mark_run_dead(run_name)
 
 
-def kill_job(run_name, job_id, archive_base=None, owner=None, skip_nuke=False):
+def kill_job(run_name, job_id, archive_base=None, owner=None, skip_unlock=False):
     serializer = report.ResultsSerializer(archive_base)
     job_info = serializer.job_info(run_name, job_id)
     if not owner:
@@ -89,6 +90,7 @@ def kill_job(run_name, job_id, archive_base=None, owner=None, skip_nuke=False):
         owner = job_info['owner']
     if kill_processes(run_name, [job_info.get('pid')]):
         return
+    report.try_push_job_info(job_info, dict(status="dead"))
     if 'machine_type' in job_info:
         teuthology.exporter.JobResults.record(
             job_info["machine_type"],
@@ -96,12 +98,9 @@ def kill_job(run_name, job_id, archive_base=None, owner=None, skip_nuke=False):
         )
     else:
         log.warn(f"Job {job_id} has no machine_type; cannot report via Prometheus")
-    # Because targets can be missing for some cases, for example, when all
-    # the necessary nodes ain't locked yet, we do not use job_info to get them,
-    # but use find_targets():
-    targets = find_targets(run_name, owner, job_id)
-    if not skip_nuke:
-        nuke_targets(targets, owner)
+    if not skip_unlock:
+        targets = find_targets(run_name, job_id)
+        lock_ops.unlock_safe(list(targets.keys()), owner, run_name, job_id)
 
 
 def find_run_info(serializer, run_name):
@@ -231,58 +230,14 @@ def find_pids(run_name):
             run_pids.append(pid)
     return run_pids
 
-
-def find_targets(run_name, owner, job_id=None):
-    lock_args = [
-        'teuthology-lock',
-        '--list-targets',
-        '--desc-pattern',
-        '/' + run_name + '/' + str(job_id or ''),
-        '--status',
-        'up',
-        '--owner',
-        owner
-    ]
-    proc = subprocess.Popen(lock_args, stdout=subprocess.PIPE)
-    stdout, stderr = proc.communicate()
-    out_obj = yaml.safe_load(stdout)
-    if not out_obj or 'targets' not in out_obj:
-        return {}
-
-    return out_obj
-
-
-def nuke_targets(targets_dict, owner):
-    targets = targets_dict.get('targets')
-    if not targets:
-        log.info("No locked machines. Not nuking anything")
-        return
-
-    to_nuke = []
-    for target in targets:
-        to_nuke.append(misc.decanonicalize_hostname(target))
-
-    target_file = tempfile.NamedTemporaryFile(delete=False, mode='w+t')
-    target_file.write(yaml.safe_dump(targets_dict))
-    target_file.close()
-
-    log.info("Nuking machines: " + str(to_nuke))
-    nuke_args = [
-        'teuthology-nuke',
-        '-t',
-        target_file.name,
-        '--owner',
-        owner
-    ]
-    nuke_args.extend(['--reboot-all', '--unlock'])
-
-    proc = subprocess.Popen(
-        nuke_args,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.STDOUT)
-    for line in proc.stdout:
-        line = line.replace(b'\r', b'').replace(b'\n', b'')
-        log.info(line.decode())
-        sys.stdout.flush()
-
-    os.unlink(target_file.name)
+def find_targets(run_name: str, job_id: Union[str, int, None] = None) -> dict:
+    if job_id is not None:
+        job_info = report.ResultsReporter().get_jobs(run_name, str(job_id))
+        return job_info.get("targets", dict())
+    result = dict()
+    run_info = report.ResultsReporter().get_jobs(run_name)
+    for job_info in run_info:
+        if job_info.get("status") not in ("running", "waiting"):
+            continue
+        result.update(job_info.get("targets", dict()))
+    return result
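
The rewritten find_targets() queries the results server (paddles) through report.ResultsReporter rather than shelling out to teuthology-lock. Usage, roughly, with an illustrative run name:

    from teuthology.kill import find_targets

    # Targets held by one job:
    job_targets = find_targets("example-run", job_id=1)
    # Targets across all of a run's jobs that are still running or waiting:
    run_targets = find_targets("example-run")
    print(sorted(run_targets.keys()))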
index 9cc8210a22d20ccd964671305fee3a41f0568669..c562a3d512e071bcef8d3d77c599c3b30fc7560b 100644 (file)
@@ -200,7 +200,7 @@ def main(ctx):
             res = ops.unlock_many(machines, user)
             return 0 if res else 1
         for machine in machines:
-            if not ops.unlock_one(ctx, machine, user):
+            if not ops.unlock_one(machine, user):
                 ret = 1
                 if not ctx.f:
                     return ret
@@ -221,7 +221,7 @@ def main(ctx):
                 if len(result) < ctx.num_to_lock:
                     log.error("Locking failed.")
                     for machine in result:
-                        ops.unlock_one(ctx, machine, user)
+                        ops.unlock_one(machine, user)
                     ret = 1
                 else:
                     log.info("Successfully Locked:\n%s\n" % shortnames)
index 2d24b68e62876879a4c38ce757fd4acb80fb2360..9ea3e786561a1b623f0f769a34e3a8d7076d4cb6 100644 (file)
@@ -4,14 +4,15 @@ import os
 import random
 import time
 import yaml
-
 import requests
 
+from typing import List, Union
+
 import teuthology.orchestra.remote
 import teuthology.parallel
 import teuthology.provision
 
-from teuthology import misc, report
+from teuthology import misc, report, provision
 from teuthology.config import config
 from teuthology.contextutil import safe_while
 from teuthology.task import console_log
@@ -19,6 +20,7 @@ from teuthology.misc import canonicalize_hostname
 from teuthology.job_status import set_status
 
 from teuthology.lock import util, query
+from teuthology.orchestra import remote
 
 log = logging.getLogger(__name__)
 
@@ -134,7 +136,7 @@ def lock_many(ctx, num, machine_type, user=None, description=None,
                     else:
                         log.error('Unable to create virtual machine: %s',
                                   machine)
-                        unlock_one(ctx, machine, user)
+                        unlock_one(machine, user)
                     ok_machs = do_update_keys(list(ok_machs.keys()))[1]
                 update_nodes(ok_machs)
                 return ok_machs
@@ -172,6 +174,29 @@ def lock_one(name, user=None, description=None):
     return response
 
 
+def unlock_safe(names: List[str], owner: str, run_name: str = "", job_id: str = ""):
+    with teuthology.parallel.parallel() as p:
+        for name in names:
+            p.spawn(unlock_one_safe, name, owner, run_name, job_id)
+        return all(p)
+
+
+def unlock_one_safe(name: str, owner: str, run_name: str = "", job_id: str = "") -> bool:
+    node_status = query.get_status(name)
+    if node_status.get("locked", False) is False:
+        log.warn(f"Refusing to unlock {name} since it is already unlocked")
+        return False
+    maybe_job = query.node_active_job(name, node_status)
+    if not maybe_job:
+        return unlock_one(name, owner, node_status["description"], node_status)
+    if run_name and job_id and maybe_job.endswith(f"{run_name}/{job_id}"):
+            log.error(f"Refusing to unlock {name} since it has an active job: {run_name}/{job_id}")
+            return False
+    log.warning(f"Refusing to unlock {name} since it has an active job: {maybe_job}")
+    return False
+
+
 def unlock_many(names, user):
     fixed_names = [misc.canonicalize_hostname(name, user=None) for name in
                    names]
@@ -196,9 +221,11 @@ def unlock_many(names, user):
     return False
 
 
-def unlock_one(ctx, name, user, description=None):
+def unlock_one(name, user, description=None, status: Union[dict, None] = None) -> bool:
     name = misc.canonicalize_hostname(name, user=None)
-    if not teuthology.provision.destroy_if_vm(ctx, name, user, description):
+    if not description and status:
+        description = status["description"]
+    if not teuthology.provision.destroy_if_vm(name, user, description or ""):
         log.error('destroy failed for %s', name)
         return False
     request = dict(name=name, locked=False, locked_by=user,
@@ -211,6 +238,10 @@ def unlock_one(ctx, name, user, description=None):
                 response = requests.put(uri, json.dumps(request))
                 if response.ok:
                     log.info('unlocked: %s', name)
+                    try:
+                        stop_node(name, status)
+                    except Exception:
+                        log.exception(f"Failed to stop {name}!")
                     return response.ok
                 if response.status_code == 403:
                     break
@@ -402,7 +433,7 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr
         if len(all_locked) == total_requested:
             vmlist = []
             for lmach in all_locked:
-                if teuthology.lock.query.is_vm(lmach):
+                if query.is_vm(lmach):
                     vmlist.append(lmach)
             if vmlist:
                 log.info('Waiting for virtual machines to come up')
@@ -421,13 +452,13 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr
                             if guest not in keys_dict.keys():
                                 log.info('recreating: ' + guest)
                                 full_name = misc.canonicalize_hostname(guest)
-                                teuthology.provision.destroy_if_vm(ctx, full_name)
+                                teuthology.provision.destroy_if_vm(full_name)
                                 teuthology.provision.create_if_vm(ctx, full_name)
-                if teuthology.lock.ops.do_update_keys(keys_dict)[0]:
+                if do_update_keys(keys_dict)[0]:
                     log.info("Error in virtual machine keys")
                 newscandict = {}
                 for dkey in all_locked.keys():
-                    stats = teuthology.lock.query.get_status(dkey)
+                    stats = query.get_status(dkey)
                     newscandict[dkey] = stats['ssh_pub_key']
                 ctx.config['targets'] = newscandict
             else:
@@ -453,3 +484,20 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr
         )
         log.warning('Could not lock enough machines, waiting...')
         time.sleep(10)
+
+
+def stop_node(name: str, status: Union[dict, None]):
+    status = status or query.get_status(name)
+    remote_ = remote.Remote(name)
+    if status['machine_type'] in provision.fog.get_types():
+        remote_.console.power_off()
+        return
+    elif status['machine_type'] in provision.pelagos.get_types():
+        provision.pelagos.park_node(name)
+        return
+    elif remote_.is_container:
+        remote_.run(
+            args=['sudo', '/testnode_stop.sh'],
+            check_status=False,
+        )
+        return
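
stop_node() is what replaces nuke's power handling: after a successful unlock, unlock_one() makes a best-effort attempt to power the node down, choosing the mechanism by machine type (FOG consoles are powered off, Pelagos nodes are parked, containers run their stop script). Calling it directly might look like this, with a hypothetical host:

    from teuthology.lock import ops, query

    name = "smithi001.front.sepia.ceph.com"
    status = query.get_status(name)  # may be passed in to avoid a second query
    try:
        ops.stop_node(name, status)
    except Exception:
        pass  # unlock_one() likewise logs and carries on if stopping fails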
index 9fd09d9abe6efb5da37af1d81e66e12e77c1add8..8e043b4db24d302f56d47dfe97b40e7704ce5538 100644 (file)
@@ -1,8 +1,9 @@
 import logging
 import os
-
 import requests
 
+from typing import Union
+
 from teuthology import misc
 from teuthology.config import config
 from teuthology.contextutil import safe_while
@@ -12,7 +13,7 @@ from teuthology.util.compat import urlencode
 log = logging.getLogger(__name__)
 
 
-def get_status(name):
+def get_status(name) -> dict:
     name = misc.canonicalize_hostname(name, user=None)
     uri = os.path.join(config.lock_server, 'nodes', name, '')
     with safe_while(
@@ -21,6 +22,8 @@ def get_status(name):
             response = requests.get(uri)
             if response.ok:
                 return response.json()
+            elif response.status_code == 404:
+                return dict()
     log.warning(
         "Failed to query lock server for status of {name}".format(name=name))
     return dict()
@@ -88,7 +91,7 @@ def find_stale_locks(owner=None):
     """
     Return a list of node dicts corresponding to nodes that were locked to run
     a job, but the job is no longer running. The purpose of this is to enable
-    us to nuke nodes that were left locked due to e.g. infrastructure failures
+    us to find nodes that were left locked due to e.g. infrastructure failures
     and return them to the pool.
 
     :param owner: If non-None, return nodes locked by owner. Default is None.
@@ -117,41 +120,38 @@ def find_stale_locks(owner=None):
         nodes = [node for node in nodes if node['locked_by'] == owner]
     nodes = filter(might_be_stale, nodes)
 
-    def node_job_is_active(node, cache):
-        """
-        Is this node's job active (e.g. running or waiting)?
-
-        :param node:  The node dict as returned from the lock server
-        :param cache: A set() used for caching results
-        :returns:     True or False
-        """
-        description = node['description']
-        if description in cache:
-            return True
-        (name, job_id) = description.split('/')[-2:]
-        url = os.path.join(config.results_server, 'runs', name, 'jobs', job_id,
-                           '')
-        with safe_while(
-                sleep=1, increment=0.5, action='node_is_active') as proceed:
-            while proceed():
-                resp = requests.get(url)
-                if resp.ok:
-                    break
-        if not resp.ok:
-            return False
-        job_info = resp.json()
-        if job_info['status'] in ('running', 'waiting'):
-            cache.add(description)
-            return True
-        return False
-
-    result = list()
     # Here we build the list of nodes that are locked, for a job (as opposed
     # to being locked manually for random monkeying), where the job is not
     # running
-    active_jobs = set()
+    result = list()
     for node in nodes:
-        if node_job_is_active(node, active_jobs):
+        if node_active_job(node["name"]):
             continue
         result.append(node)
     return result
+
+def node_active_job(name: str, status: Union[dict, None] = None) -> Union[str, None]:
+    """
+    Is this node's job active (e.g. running or waiting)?
+
+    :param name:   The node's name
+    :param status: The node dict from the lock server; queried if not given
+    :returns:      The job description if the node's job is active, else None
+    """
+    status = status or get_status(name)
+    if not status:
+        # This should never happen with a normal node
+        return "node had no status"
+    description = status['description']
+    (run_name, job_id) = description.split('/')[-2:]
+    url = f"{config.results_server}/runs/{run_name}/jobs/{job_id}/"
+    job_status = ""
+    with safe_while(
+            sleep=1, increment=0.5, action='node_is_active') as proceed:
+        while proceed():
+            resp = requests.get(url)
+            if resp.ok:
+                job_status = resp.json()["status"]
+                break
+    if job_status and job_status not in ('pass', 'fail', 'dead'):
+        return description
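
node_active_job() now returns the active job's description rather than a boolean, which lets callers such as unlock_one_safe() log exactly which job still holds a node. For example, against a hypothetical host:

    from teuthology.lock import query

    active = query.node_active_job("smithi001.front.sepia.ceph.com")
    if active:
        print(f"node is busy with: {active}")  # a description ending in run_name/job_id
    else:
        print("no active job; safe to unlock")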
index d60d8acdb43797a3eeee4e09e3062168dad77d9d..5afc5ed7e17dc1a708e3655966ea5456db8c16fc 100644 (file)
@@ -1,4 +1,5 @@
 import logging
+import os
 
 import teuthology.exporter
 import teuthology.lock.query
@@ -9,15 +10,13 @@ from teuthology.provision import downburst
 from teuthology.provision import fog
 from teuthology.provision import openstack
 from teuthology.provision import pelagos
-import os
 
 log = logging.getLogger(__name__)
 
 
-def _logfile(ctx, shortname):
-    if hasattr(ctx, 'config') and ctx.config.get('archive_path'):
-        return os.path.join(ctx.config['archive_path'],
-                            shortname + '.downburst.log')
+def _logfile(shortname: str, archive_path: str = ""):
+    if os.path.isdir(archive_path):
+        return f"{archive_path}/{shortname}.downburst.log"
 
 
 def get_reimage_types():
@@ -95,8 +94,12 @@ def create_if_vm(ctx, machine_name, _downburst=None):
     return dbrst.create()
 
 
-def destroy_if_vm(ctx, machine_name, user=None, description=None,
-                  _downburst=None):
+def destroy_if_vm(
+    machine_name: str,
+    user: str = "",
+    description: str = "",
+    _downburst=None
+):
     """
     Use downburst to destroy a virtual machine
 
@@ -116,7 +119,7 @@ def destroy_if_vm(ctx, machine_name, user=None, description=None,
         log.error(msg.format(node=machine_name, as_user=user,
                              locked_by=status_info['locked_by']))
         return False
-    if (description is not None and description !=
+    if (description and description !=
             status_info['description']):
         msg = "Tried to destroy {node} with description {desc_arg} " + \
             "but it is locked with description {desc_lock}"
@@ -134,5 +137,5 @@ def destroy_if_vm(ctx, machine_name, user=None, description=None,
     dbrst = _downburst or \
         downburst.Downburst(name=machine_name, os_type=None,
                             os_version=None, status=status_info,
-                            logfile=_logfile(ctx, shortname))
+                            logfile=_logfile(shortname, description))
     return dbrst.destroy()
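
destroy_if_vm() likewise drops its ctx parameter; the owner and description checks come from explicit arguments, and the downburst log lands under the description path when it names a real archive directory. A sketch with hypothetical values:

    import teuthology.provision

    # Refuses to destroy if the lock owner or description does not match.
    ok = teuthology.provision.destroy_if_vm(
        "vpm001.front.sepia.ceph.com",
        user="user@host",
        description="/archive/example-run/1",
    )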
index 71ecfaff41b7c716ee5a4696c62921e09295cbca..4ba7d9a8ae2d6e3dc9e2e12ed5b6c526b761f471 100644 (file)
@@ -14,6 +14,8 @@ class TestDownburst(object):
             vm_host=dict(name='host999'),
             is_vm=True,
             machine_type='mtype',
+            locked_by='user@a',
+            description="desc",
         )
 
     def test_create_if_vm_success(self):
@@ -48,7 +50,7 @@ class TestDownburst(object):
         dbrst.destroy = MagicMock(name='destroy')
         dbrst.destroy.return_value = True
 
-        result = provision.destroy_if_vm(ctx, name, _downburst=dbrst)
+        result = provision.destroy_if_vm(name, user="user@a", _downburst=dbrst)
         assert result is True
 
         dbrst.destroy.assert_called_with()
@@ -57,13 +59,12 @@ class TestDownburst(object):
         name = self.name
         ctx = self.ctx
         status = self.status
-        status['locked_by'] = 'user@a'
 
         dbrst = provision.downburst.Downburst(
             name, ctx.os_type, ctx.os_version, status)
         dbrst.destroy = MagicMock(name='destroy', side_effect=RuntimeError)
 
-        result = provision.destroy_if_vm(ctx, name, user='user@b',
+        result = provision.destroy_if_vm(name, user='user@b',
                                          _downburst=dbrst)
         assert result is False
 
@@ -71,14 +72,13 @@ class TestDownburst(object):
         name = self.name
         ctx = self.ctx
         status = self.status
-        status['description'] = 'desc_a'
 
         dbrst = provision.downburst.Downburst(
             name, ctx.os_type, ctx.os_version, status)
         dbrst.destroy = MagicMock(name='destroy')
         dbrst.destroy = MagicMock(name='destroy', side_effect=RuntimeError)
 
-        result = provision.destroy_if_vm(ctx, name, description='desc_b',
+        result = provision.destroy_if_vm(name, description='desc_b',
                                          _downburst=dbrst)
         assert result is False
 
index d7598719ebde71f921605e425667f7e4416ca241..fdbfcc225102632277a4321f75edf513266c4511 100644 (file)
@@ -4,7 +4,6 @@ import logging
 import teuthology.lock.ops
 import teuthology.lock.query
 import teuthology.lock.util
-from teuthology.job_status import get_status
 
 log = logging.getLogger(__name__)
 
@@ -24,13 +23,7 @@ def lock_machines(ctx, config):
     try:
         yield
     finally:
-        # If both unlock_on_failure and nuke-on-error are set, don't unlock now
-        # because we're just going to nuke (and unlock) later.
-        unlock_on_failure = (
-                ctx.config.get('unlock_on_failure', False)
-                and not ctx.config.get('nuke-on-error', False)
-            )
-        if get_status(ctx.summary) == 'pass' or unlock_on_failure:
+        if ctx.config.get("unlock_on_failure", True):
             log.info('Unlocking machines...')
             for machine in ctx.config['targets'].keys():
-                teuthology.lock.ops.unlock_one(ctx, machine, ctx.owner, ctx.archive)
+                teuthology.lock.ops.unlock_one(machine, ctx.owner, ctx.archive)