From a87013a6f3def23e8069de190dec1a1f31459687 Mon Sep 17 00:00:00 2001 From: John Mulligan Date: Fri, 9 Aug 2024 14:11:21 -0400 Subject: [PATCH] xxx: checkpoint: sqlite machine pool Signed-off-by: John Mulligan --- teuthology/lock/lock_server_pool.py | 46 ++++++++ teuthology/lock/machines.py | 19 ++++ teuthology/lock/ops.py | 22 +++- teuthology/lock/sqlite_pool.py | 161 ++++++++++++++++++++++++++++ teuthology/orchestra/remote.py | 12 +-- 5 files changed, 249 insertions(+), 11 deletions(-) create mode 100644 teuthology/lock/lock_server_pool.py create mode 100644 teuthology/lock/machines.py create mode 100644 teuthology/lock/sqlite_pool.py diff --git a/teuthology/lock/lock_server_pool.py b/teuthology/lock/lock_server_pool.py new file mode 100644 index 0000000000..1a5c957e59 --- /dev/null +++ b/teuthology/lock/lock_server_pool.py @@ -0,0 +1,46 @@ + +class LockServerMachinePool: + def list_locks( + self, + ctx, + machine_type, + up, + locked, + count, + tries=None, + ): + machines = query.list_locks( + machine_type=machine_type, + up=up, + locked=locked, + count=count, + tries=tries, + ) + return machines + + def acquire( + self, + ctx, + num, + machine_type, + user=None, + description=None, + os_type=None, + os_version=None, + arch=None, + reimage=True, + ): + from teuthology.lock.ops import lock_many + + newly_locked = lock_many( + ctx, + requested, + machine_type, + ctx.owner, + ctx.archive, + os_type, + os_version, + arch, + reimage=reimage, + ) + return newly_locked diff --git a/teuthology/lock/machines.py b/teuthology/lock/machines.py new file mode 100644 index 0000000000..07479d9b6a --- /dev/null +++ b/teuthology/lock/machines.py @@ -0,0 +1,19 @@ +import logging + +from teuthology.config import config + +from teuthology.lock.lock_server_pool import LockServerMachinePool +from teuthology.lock.sqlite_pool import SqliteMachinePool + + +log = logging.getLogger(__name__) + + +def pool(): + if not config.machine_pool or config.machine_pool.startswith('lock_server'): + log.info("Using lock server machine pool") + mpool = LockServerMachinePool() + elif config.machine_pool.startswith('sqlite:'): + log.info("Using sqlite machine pool @ %s", config.machine_pool) + mpool = SqliteMachinePool(config.machine_pool) + return mpool diff --git a/teuthology/lock/ops.py b/teuthology/lock/ops.py index fbb7fb6fbb..05fd22499c 100644 --- a/teuthology/lock/ops.py +++ b/teuthology/lock/ops.py @@ -5,6 +5,7 @@ import random import time import yaml import requests +import sqlite3 from typing import List, Union @@ -20,6 +21,7 @@ from teuthology.misc import canonicalize_hostname from teuthology.job_status import set_status from teuthology.lock import util, query +from teuthology.lock.machines import pool as machine_pool from teuthology.orchestra import remote log = logging.getLogger(__name__) @@ -377,11 +379,13 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr # change the status during the locking process report.try_push_job_info(ctx.config, dict(status='waiting')) + mpool = machine_pool() all_locked = dict() requested = total_requested while True: # get a candidate list of machines - machines = query.list_locks( + machines = mpool.list_locks( + ctx, machine_type=machine_type, up=True, locked=False, @@ -414,9 +418,17 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr (reserved, requested, len(machines))) try: - newly_locked = lock_many(ctx, requested, machine_type, - ctx.owner, ctx.archive, os_type, - os_version, arch, reimage=reimage) + newly_locked = mpool.acquire( + ctx, + requested, + machine_type, + ctx.owner, + ctx.archive, + os_type, + os_version, + arch, + reimage=reimage, + ) except Exception: # Lock failures should map to the 'dead' status instead of 'fail' if 'summary' in ctx: @@ -435,7 +447,7 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr if len(all_locked) == total_requested: vmlist = [] for lmach in all_locked: - if query.is_vm(lmach): + if mpool.is_vm(lmach): vmlist.append(lmach) if vmlist: log.info('Waiting for virtual machines to come up') diff --git a/teuthology/lock/sqlite_pool.py b/teuthology/lock/sqlite_pool.py new file mode 100644 index 0000000000..f647dfd3f6 --- /dev/null +++ b/teuthology/lock/sqlite_pool.py @@ -0,0 +1,161 @@ +import logging +import sqlite3 + + +log = logging.getLogger(__name__) + + +class SqliteMachinePool: + def __init__( + self, path: str, + ) -> None: + self._path = path + self._connect() + self._create_tables() + + def _select_machines(self, ): + with self._conn: + cur = self._conn.cursor() + cur.execute( + "SELECT rowid,jobdesc FROM jobs ORDER BY rowid LIMIT 1" + ) + rows = [(jid, data) for jid, data in cur.fetchall()] + if rows: + assert len(rows) == 1 + return rows[0] + return None + + def _delete(self, jid: int) -> None: + with self._conn: + self._conn.execute("DELETE FROM jobs WHERE rowid=?", (jid,)) + + def _create_tables(self) -> None: + try: + with self._conn: + self._conn.execute(""" + CREATE TABLE IF NOT EXISTS machines ( + name TEXT, + mtype TEXT, + up INTEGER, + in_use INTEGER, + info JSON + ) + """) + except sqlite3.OperationalError: + pass + + def _select(self, machine_type, up, locked, count): + query = "SELECT name, mtype, up, in_use, info FROM machines" + where = [] + params = [] + if machine_type is not None: + where.append('mtype=?') + params.append(machine_type) + if up is not None: + where.append('up=?') + params.append(1 if up else 0) + if locked is not None: + where.append('in_use=?') + params.append(1 if locked else 0) + if where: + query += ' WHERE ' + (' AND '.join(where)) + if count is not None: + query += ' LIMIT '+ str(int(count)) + + with self._conn: + cur = self._conn.cursor() + cur.execute(query, tuple(params)) + rows = cur.fetchall() + log.info("Rows: %r", rows) + return rows + + def add_machine(self, name, machine_type, info): + with self._conn: + cur = self._conn.cursor() + cur.execute("INSERT INTO machines VALUES (?,?, 1, 0, ?)", (name, machine_type,info)) + cur.close() + + def remove_machine(self, name): + with self._conn: + cur = self._conn.cursor() + cur.execute("DELETE FROM machines WHERE name=?", (name,)) + cur.close() + + def _take(self, machine_type, count): + count = int(count) + query = "UPDATE machines SET in_use=1 WHERE rowid IN (SELECT rowid FROM machines WHERE in_use=0 AND mtype=? LIMIT ?)" + with self._conn: + cur = self._conn.cursor() + cur.execute(query, (machine_type, count)) + cur.close() + + def _connect(self) -> None: + path = self._path + if path.startswith('sqlite://'): + path = path[9:] + if path.startswith('sqlite:'): + path = path[7:] + log.warning("P:%s", path) + self._conn = sqlite3.connect(path) + + def everything(self): + return self._select(None, None, None, None) + + def list_locks( + self, + ctx, + machine_type, + up, + locked, + count, + tries=None, + ): + return {v[0]: None for v in self._select(machine_type, up, locked, count)} + + def acquire( + self, + ctx, + num, + machine_type, + user=None, + description=None, + os_type=None, + os_version=None, + arch=None, + reimage=True, + ): + self._take(machine_type, num) + return {v[0]: None for v in self._select(machine_type, True, True, num)} + + def is_vm(self, name): + return False + + +def main(): + from teuthology.config import config + import argparse + import sys + import yaml + + parser = argparse.ArgumentParser() + parser.add_argument('--list', action='store_true') + parser.add_argument('--add', action='append') + parser.add_argument('--rm', action='append') + parser.add_argument('--acquire', type=int) + parser.add_argument('--machine-type') + parser.add_argument('--info') + cli = parser.parse_args() + + mpool = SqliteMachinePool(config.machine_pool) + for name in cli.rm or []: + mpool.remove_machine(name) + for name in cli.add or []: + mpool.add_machine(name, cli.machine_type, cli.info) + if cli.acquire: + mpool.acquire(None, cli.acquire, cli.machine_type) + if cli.list: + yaml.safe_dump(mpool.everything(), sys.stdout) + + +if __name__ == '__main__': + main() diff --git a/teuthology/orchestra/remote.py b/teuthology/orchestra/remote.py index ce77a519cf..018ad99f47 100644 --- a/teuthology/orchestra/remote.py +++ b/teuthology/orchestra/remote.py @@ -2,8 +2,7 @@ Support for paramiko remote objects. """ -import teuthology.lock.query -import teuthology.lock.util +import teuthology.lock.machines from teuthology.contextutil import safe_while from teuthology.orchestra import run from teuthology.orchestra import connection @@ -153,7 +152,7 @@ class RemoteShell(object): if self.os.package_type != 'rpm' or \ self.os.name in ['opensuse', 'sle']: return - if teuthology.lock.query.is_vm(self.shortname): + if teuthology.lock.machines.pool().is_vm(self.shortname): return self.run(args="sudo chcon {con} {path}".format( con=context, path=file_path)) @@ -458,7 +457,7 @@ class Remote(RemoteShell): @property def machine_type(self): if not getattr(self, '_machine_type', None): - remote_info = teuthology.lock.query.get_status(self.hostname) + remote_info = teuthology.lock.machines.pool().get_status(self.hostname) if not remote_info: return None self._machine_type = remote_info.get("machine_type", None) @@ -706,7 +705,7 @@ class Remote(RemoteShell): @property def is_vm(self): if not hasattr(self, '_is_vm'): - self._is_vm = teuthology.lock.query.is_vm(self.name) + self._is_vm = teuthology.lock.machines.pool().is_vm(self.name) return self._is_vm @property @@ -745,7 +744,8 @@ def getRemoteConsole(name, ipmiuser=None, ipmipass=None, ipmidomain=None, """ Return either VirtualConsole or PhysicalConsole depending on name. """ - if teuthology.lock.query.is_vm(name): + log.info('getRemoteConsole: %s', name) + if teuthology.lock.machines.pool().is_vm(name): try: return console.VirtualConsole(name) except Exception: -- 2.39.5