From: Josh Durgin Date: Tue, 18 Aug 2020 04:44:07 +0000 (+0000) Subject: test/mon/bench_auth: use processes instead of threads X-Git-Tag: v16.1.0~1150^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0916e950a6043dfe362e8a1310eb52a443f71703;p=ceph.git test/mon/bench_auth: use processes instead of threads Just to verify that python's thread scheduling isn't affecting the benchmark. Signed-off-by: Josh Durgin --- diff --git a/src/test/mon/bench_auth.py b/src/test/mon/bench_auth.py index 143e76c3fb7..5242f6892c7 100755 --- a/src/test/mon/bench_auth.py +++ b/src/test/mon/bench_auth.py @@ -5,7 +5,7 @@ import copy import json import rados import time -import threading +import multiprocessing caps_base = ["mon", "profile rbd", "osd", "profile rbd pool=rbd namespace=test"] @@ -20,7 +20,7 @@ def create_users(conn, num_namespaces, num_users): cmd['entity'] = "client.{}".format(i) conn.mon_command(json.dumps(cmd), b'') -class Worker(threading.Thread): +class Worker(multiprocessing.Process): def __init__(self, conn, num, queue, duration): super().__init__() self.conn = conn @@ -33,23 +33,24 @@ class Worker(threading.Thread): cmd = {'prefix': 'auth caps', 'entity': client} start_time = time.time() num_complete = 0 - while True: - now = time.time() - diff = now - start_time - if diff > self.duration: - self.queue.put((num_complete, diff)) - return - caps = copy.deepcopy(caps_base) - caps[-1] += ", profile rbd pool=rbd namespace=namespace{}".format(self.num * 10000 + num_complete) - cmd['caps'] = caps - cmd_start = time.time() - ret, buf, out = self.conn.mon_command(json.dumps(cmd), b'') - cmd_end = time.time() - if ret != 0: - self.queue.put((Exception("{0}: {1}".format(ret, out)), 0)) - return - num_complete += 1 - print("Process {} finished op {} - latency: {}".format(self.num, num_complete, cmd_end - cmd_start)) + with rados.Rados(conffile='') as conn: + while True: + now = time.time() + diff = now - start_time + if diff > self.duration: + self.queue.put((num_complete, diff)) + return + caps = copy.deepcopy(caps_base) + caps[-1] += ", profile rbd pool=rbd namespace=namespace{}".format(self.num * 10000 + num_complete) + cmd['caps'] = caps + cmd_start = time.time() + ret, buf, out = conn.mon_command(json.dumps(cmd), b'') + cmd_end = time.time() + if ret != 0: + self.queue.put((Exception("{0}: {1}".format(ret, out)), 0)) + return + num_complete += 1 + print("Process {} finished op {} - latency: {}".format(self.num, num_complete, cmd_end - cmd_start)) def main(): parser = argparse.ArgumentParser(description=""" @@ -79,17 +80,17 @@ Benchmark updates to ceph users' capabilities. Run one update at a time in each duration = args.duration workers = [] results = [] - q = queue.Queue() + q = multiprocessing.Queue() with rados.Rados(conffile=rados.Rados.DEFAULT_CONF_FILES) as conn: create_users(conn, num_namespaces, num_threads) - for i in range(num_threads): - workers.append(Worker(conn, i, q, duration)) - workers[-1].start() - for i in range(num_threads): - num_complete, seconds = q.get() - if isinstance(num_complete, Exception): - raise num_complete - results.append((num_complete, seconds)) + for i in range(num_threads): + workers.append(Worker(conn, i, q, duration)) + workers[-1].start() + for i in range(num_threads): + num_complete, seconds = q.get() + if isinstance(num_complete, Exception): + raise num_complete + results.append((num_complete, seconds)) total = 0 total_rate = 0 for num, sec in results: