]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
xxx: checkpoint: sqlite machine pool
author: John Mulligan <phlogistonjohn@asynchrono.us>
Fri, 9 Aug 2024 18:11:21 +0000 (14:11 -0400)
committer: John Mulligan <phlogistonjohn@asynchrono.us>
Fri, 9 Aug 2024 18:11:21 +0000 (14:11 -0400)
Signed-off-by: John Mulligan <phlogistonjohn@asynchrono.us>
teuthology/lock/lock_server_pool.py [new file with mode: 0644]
teuthology/lock/machines.py [new file with mode: 0644]
teuthology/lock/ops.py
teuthology/lock/sqlite_pool.py [new file with mode: 0644]
teuthology/orchestra/remote.py

diff --git a/teuthology/lock/lock_server_pool.py b/teuthology/lock/lock_server_pool.py
new file mode 100644 (file)
index 0000000..1a5c957
--- /dev/null
@@ -0,0 +1,46 @@
+
+class LockServerMachinePool:
+    """Machine pool backed by the teuthology lock server.
+
+    Thin adapter that forwards each pool operation to the existing
+    ``teuthology.lock.query`` / ``teuthology.lock.ops`` functions, so callers
+    can use the same method names regardless of which pool is configured.
+    """
+
+    def list_locks(
+        self,
+        ctx,
+        machine_type,
+        up,
+        locked,
+        count,
+        tries=None,
+    ):
+        """Return the candidate machines reported by ``query.list_locks``.
+
+        ``ctx`` is accepted so the signature matches the other pool
+        implementation; it is not used here.
+        """
+        # This file has no module-level imports, so bring ``query`` into
+        # scope where it is used (same pattern as ``lock_many`` in acquire).
+        from teuthology.lock import query
+
+        machines = query.list_locks(
+            machine_type=machine_type,
+            up=up,
+            locked=locked,
+            count=count,
+            tries=tries,
+        )
+        return machines
+
+    def acquire(
+        self,
+        ctx,
+        num,
+        machine_type,
+        user=None,
+        description=None,
+        os_type=None,
+        os_version=None,
+        arch=None,
+        reimage=True,
+    ):
+        """Lock ``num`` machines of ``machine_type`` through ``lock_many``.
+
+        ``user`` and ``description`` default to ``ctx.owner`` and
+        ``ctx.archive`` when not supplied. Returns whatever ``lock_many``
+        returns.
+        """
+        from teuthology.lock.ops import lock_many
+
+        newly_locked = lock_many(
+            ctx,
+            num,
+            machine_type,
+            ctx.owner if user is None else user,
+            ctx.archive if description is None else description,
+            os_type,
+            os_version,
+            arch,
+            reimage=reimage,
+        )
+        return newly_locked
+
+    def is_vm(self, name):
+        """Return the result of ``query.is_vm`` for the machine ``name``."""
+        from teuthology.lock import query
+
+        return query.is_vm(name)
+
+    def get_status(self, name):
+        """Return the result of ``query.get_status`` for the machine ``name``."""
+        from teuthology.lock import query
+
+        return query.get_status(name)
diff --git a/teuthology/lock/machines.py b/teuthology/lock/machines.py
new file mode 100644 (file)
index 0000000..07479d9
--- /dev/null
@@ -0,0 +1,19 @@
+import logging
+
+from teuthology.config import config
+
+from teuthology.lock.lock_server_pool import LockServerMachinePool
+from teuthology.lock.sqlite_pool import SqliteMachinePool
+
+
+log = logging.getLogger(__name__)
+
+
+def pool():
+    """Build the machine pool selected by ``config.machine_pool``.
+
+    An unset/empty value, or one starting with ``lock_server``, selects the
+    lock server pool; a value starting with ``sqlite:`` selects the sqlite
+    pool and is passed to it unchanged.
+
+    Raises:
+        ValueError: if ``config.machine_pool`` names no known pool type.
+    """
+    spec = config.machine_pool
+    if not spec or spec.startswith('lock_server'):
+        log.info("Using lock server machine pool")
+        return LockServerMachinePool()
+    if spec.startswith('sqlite:'):
+        log.info("Using sqlite machine pool @ %s", spec)
+        return SqliteMachinePool(spec)
+    # Previously this fell through to ``return mpool`` with mpool unbound.
+    raise ValueError("Unsupported machine_pool setting: %r" % (spec,))
index fbb7fb6fbbfbce457cc158be4d18cd529013ad95..05fd22499c7de4474016828957f925b6afd4a4fc 100644 (file)
@@ -5,6 +5,7 @@ import random
 import time
 import yaml
 import requests
+import sqlite3
 
 from typing import List, Union
 
@@ -20,6 +21,7 @@ from teuthology.misc import canonicalize_hostname
 from teuthology.job_status import set_status
 
 from teuthology.lock import util, query
+from teuthology.lock.machines import pool as machine_pool
 from teuthology.orchestra import remote
 
 log = logging.getLogger(__name__)
@@ -377,11 +379,13 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr
     # change the status during the locking process
     report.try_push_job_info(ctx.config, dict(status='waiting'))
 
+    mpool = machine_pool()
     all_locked = dict()
     requested = total_requested
     while True:
         # get a candidate list of machines
-        machines = query.list_locks(
+        machines = mpool.list_locks(
+            ctx,
             machine_type=machine_type,
             up=True,
             locked=False,
@@ -414,9 +418,17 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr
                            (reserved, requested, len(machines)))
 
         try:
-            newly_locked = lock_many(ctx, requested, machine_type,
-                                     ctx.owner, ctx.archive, os_type,
-                                     os_version, arch, reimage=reimage)
+            newly_locked = mpool.acquire(
+                ctx,
+                requested,
+                machine_type,
+                ctx.owner,
+                ctx.archive,
+                os_type,
+                os_version,
+                arch,
+                reimage=reimage,
+            )
         except Exception:
             # Lock failures should map to the 'dead' status instead of 'fail'
             if 'summary' in ctx:
@@ -435,7 +447,7 @@ def block_and_lock_machines(ctx, total_requested, machine_type, reimage=True, tr
         if len(all_locked) == total_requested:
             vmlist = []
             for lmach in all_locked:
-                if query.is_vm(lmach):
+                if mpool.is_vm(lmach):
                     vmlist.append(lmach)
             if vmlist:
                 log.info('Waiting for virtual machines to come up')
diff --git a/teuthology/lock/sqlite_pool.py b/teuthology/lock/sqlite_pool.py
new file mode 100644 (file)
index 0000000..f647dfd
--- /dev/null
@@ -0,0 +1,161 @@
+import logging
+import sqlite3
+
+
+log = logging.getLogger(__name__)
+
+
+class SqliteMachinePool:
+    """Machine pool stored in a SQLite database.
+
+    Machines are rows of a single ``machines`` table with the columns
+    ``name``, ``mtype``, ``up``, ``in_use`` and ``info``. ``in_use`` is the
+    flag this class treats as "locked".
+    """
+
+    def __init__(
+        self, path: str,
+    ) -> None:
+        """Connect to the database named by ``path`` and ensure the table exists.
+
+        ``path`` may carry a leading ``sqlite://`` or ``sqlite:`` prefix.
+        """
+        self._path = path
+        self._connect()
+        self._create_tables()
+
+    def _select_machines(self, ):
+        # NOTE(review): this reads a ``jobs`` table that this class never
+        # creates, and nothing in this file calls it. It looks like a
+        # leftover from another module; left unchanged pending confirmation.
+        with self._conn:
+            cur = self._conn.cursor()
+            cur.execute(
+                "SELECT rowid,jobdesc FROM jobs ORDER BY rowid LIMIT 1"
+            )
+            rows = [(jid, data) for jid, data in cur.fetchall()]
+        if rows:
+            assert len(rows) == 1
+            return rows[0]
+        return None
+
+    def _delete(self, jid: int) -> None:
+        # NOTE(review): also targets the ``jobs`` table (see _select_machines).
+        with self._conn:
+            self._conn.execute("DELETE FROM jobs WHERE rowid=?", (jid,))
+
+    def _create_tables(self) -> None:
+        """Create the ``machines`` table if it does not exist yet."""
+        try:
+            with self._conn:
+                self._conn.execute("""
+                    CREATE TABLE IF NOT EXISTS machines (
+                        name TEXT,
+                        mtype TEXT,
+                        up INTEGER,
+                        in_use INTEGER,
+                        info JSON
+                    )
+                """)
+        except sqlite3.OperationalError:
+            # Still best-effort as before, but leave a trace: if the table
+            # really is missing, every later query on this pool will fail.
+            log.warning("Unable to create machines table", exc_info=True)
+
+    @staticmethod
+    def _filters(machine_type, up, locked):
+        """Build a WHERE clause and its parameters from optional filters.
+
+        A filter whose value is ``None`` is left out. Returns a
+        ``(where_sql, params)`` pair; ``where_sql`` is empty when no filter
+        applies.
+        """
+        clauses = []
+        params = []
+        if machine_type is not None:
+            clauses.append('mtype=?')
+            params.append(machine_type)
+        if up is not None:
+            clauses.append('up=?')
+            params.append(1 if up else 0)
+        if locked is not None:
+            clauses.append('in_use=?')
+            params.append(1 if locked else 0)
+        where = (' WHERE ' + ' AND '.join(clauses)) if clauses else ''
+        return where, params
+
+    def _select(self, machine_type, up, locked, count):
+        """Return matching rows as ``(name, mtype, up, in_use, info)`` tuples.
+
+        ``count`` limits the number of rows; ``None`` means no limit.
+        """
+        where, params = self._filters(machine_type, up, locked)
+        query = "SELECT name, mtype, up, in_use, info FROM machines" + where
+        if count is not None:
+            query += ' LIMIT ?'
+            params.append(int(count))
+
+        with self._conn:
+            rows = self._conn.execute(query, params).fetchall()
+            log.info("Rows: %r", rows)
+        return rows
+
+    def add_machine(self, name, machine_type, info):
+        """Insert a machine that is up and not in use."""
+        with self._conn:
+            self._conn.execute(
+                "INSERT INTO machines (name, mtype, up, in_use, info)"
+                " VALUES (?, ?, 1, 0, ?)",
+                (name, machine_type, info),
+            )
+
+    def remove_machine(self, name):
+        """Delete every row whose name equals ``name``."""
+        with self._conn:
+            self._conn.execute("DELETE FROM machines WHERE name=?", (name,))
+
+    def _take(self, machine_type, count):
+        """Mark up to ``count`` free machines that are up as in use.
+
+        Returns the list of machine names this call actually claimed, so the
+        caller never has to guess which in-use rows are its own.
+        """
+        count = int(count)
+        if count <= 0:
+            # SQLite treats a negative LIMIT as "no limit"; never claim
+            # machines for a non-positive request.
+            return []
+        where, params = self._filters(machine_type, True, False)
+        params.append(count)
+        taken = []
+        with self._conn:
+            candidates = self._conn.execute(
+                "SELECT rowid, name FROM machines" + where + " LIMIT ?",
+                params,
+            ).fetchall()
+            for rowid, name in candidates:
+                # The in_use=0 guard turns the claim into a no-op if the row
+                # was taken by someone else after the SELECT above.
+                cur = self._conn.execute(
+                    "UPDATE machines SET in_use=1 WHERE rowid=? AND in_use=0",
+                    (rowid,),
+                )
+                if cur.rowcount == 1:
+                    taken.append(name)
+        return taken
+
+    def _connect(self) -> None:
+        """Open the sqlite connection, stripping any ``sqlite:`` prefix."""
+        path = self._path
+        if path.startswith('sqlite://'):
+            path = path[9:]
+        if path.startswith('sqlite:'):
+            path = path[7:]
+        log.warning("P:%s", path)
+        self._conn = sqlite3.connect(path)
+
+    def everything(self):
+        """Return every row of the ``machines`` table."""
+        return self._select(None, None, None, None)
+
+    def list_locks(
+        self,
+        ctx,
+        machine_type,
+        up,
+        locked,
+        count,
+        tries=None,
+    ):
+        """Return ``{name: None}`` for each machine matching the filters.
+
+        ``ctx`` and ``tries`` are accepted for signature parity with the
+        lock server pool and are not used.
+        """
+        rows = self._select(machine_type, up, locked, count)
+        return {row[0]: None for row in rows}
+
+    def acquire(
+        self,
+        ctx,
+        num,
+        machine_type,
+        user=None,
+        description=None,
+        os_type=None,
+        os_version=None,
+        arch=None,
+        reimage=True,
+    ):
+        """Claim up to ``num`` free machines and return ``{name: None}``.
+
+        Only the machines claimed by this call are returned; fewer than
+        ``num`` (possibly none) come back when the pool runs short. All
+        arguments other than ``num`` and ``machine_type`` are ignored.
+        """
+        return {name: None for name in self._take(machine_type, num)}
+
+    def get_status(self, name):
+        """Return a status dict for machine ``name``, or ``None`` if unknown.
+
+        The dict has the keys ``name``, ``machine_type``, ``up`` and
+        ``locked``.
+        NOTE(review): key names follow what ``Remote.machine_type`` reads
+        (``machine_type``); confirm no caller needs further fields.
+        """
+        with self._conn:
+            row = self._conn.execute(
+                "SELECT name, mtype, up, in_use FROM machines"
+                " WHERE name=? LIMIT 1",
+                (name,),
+            ).fetchone()
+        if row is None:
+            return None
+        return {
+            "name": row[0],
+            "machine_type": row[1],
+            "up": bool(row[2]),
+            "locked": bool(row[3]),
+        }
+
+    def is_vm(self, name):
+        """Machines in this pool are never reported as virtual machines."""
+        return False
+
+
+def main():
+    """Command-line helper to inspect and edit the configured sqlite pool.
+
+    Operations run in a fixed order regardless of argument order: removals,
+    additions, acquire, then list. The pool location comes from
+    ``config.machine_pool``.
+    """
+    from teuthology.config import config
+    import argparse
+    import sys
+    import yaml
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--list', action='store_true')
+    parser.add_argument('--add', action='append')
+    parser.add_argument('--rm', action='append')
+    parser.add_argument('--acquire', type=int)
+    parser.add_argument('--machine-type')
+    parser.add_argument('--info')
+    cli = parser.parse_args()
+
+    # A machine stored without a type can never be matched by a typed query.
+    if cli.add and not cli.machine_type:
+        parser.error('--add requires --machine-type')
+    # Without this the constructor fails on None with an AttributeError.
+    if not config.machine_pool:
+        parser.error('machine_pool is not set in the teuthology config')
+
+    mpool = SqliteMachinePool(config.machine_pool)
+    for name in cli.rm or []:
+        mpool.remove_machine(name)
+    for name in cli.add or []:
+        mpool.add_machine(name, cli.machine_type, cli.info)
+    if cli.acquire:
+        mpool.acquire(None, cli.acquire, cli.machine_type)
+    if cli.list:
+        yaml.safe_dump(mpool.everything(), sys.stdout)
+
+
+if __name__ == '__main__':
+    main()
index ce77a519cf36189c16a0e15c151b8e3c19467a24..018ad99f477f966cbde93981e0e301e6132af29c 100644 (file)
@@ -2,8 +2,7 @@
 Support for paramiko remote objects.
 """
 
-import teuthology.lock.query
-import teuthology.lock.util
+import teuthology.lock.machines
 from teuthology.contextutil import safe_while
 from teuthology.orchestra import run
 from teuthology.orchestra import connection
@@ -153,7 +152,7 @@ class RemoteShell(object):
         if self.os.package_type != 'rpm' or \
                 self.os.name in ['opensuse', 'sle']:
             return
-        if teuthology.lock.query.is_vm(self.shortname):
+        if teuthology.lock.machines.pool().is_vm(self.shortname):
             return
         self.run(args="sudo chcon {con} {path}".format(
             con=context, path=file_path))
@@ -458,7 +457,7 @@ class Remote(RemoteShell):
     @property
     def machine_type(self):
         if not getattr(self, '_machine_type', None):
-            remote_info = teuthology.lock.query.get_status(self.hostname)
+            remote_info = teuthology.lock.machines.pool().get_status(self.hostname)
             if not remote_info:
                 return None
             self._machine_type = remote_info.get("machine_type", None)
@@ -706,7 +705,7 @@ class Remote(RemoteShell):
     @property
     def is_vm(self):
         if not hasattr(self, '_is_vm'):
-            self._is_vm = teuthology.lock.query.is_vm(self.name)
+            self._is_vm = teuthology.lock.machines.pool().is_vm(self.name)
         return self._is_vm
 
     @property
@@ -745,7 +744,8 @@ def getRemoteConsole(name, ipmiuser=None, ipmipass=None, ipmidomain=None,
     """
     Return either VirtualConsole or PhysicalConsole depending on name.
     """
-    if teuthology.lock.query.is_vm(name):
+    log.info('getRemoteConsole: %s', name)
+    if teuthology.lock.machines.pool().is_vm(name):
         try:
             return console.VirtualConsole(name)
         except Exception: