import json
import rados
import time
-import threading
+import multiprocessing
caps_base = ["mon", "profile rbd", "osd", "profile rbd pool=rbd namespace=test"]
cmd['entity'] = "client.{}".format(i)
conn.mon_command(json.dumps(cmd), b'')
-class Worker(threading.Thread):
+class Worker(multiprocessing.Process):
def __init__(self, conn, num, queue, duration):
super().__init__()
self.conn = conn
cmd = {'prefix': 'auth caps', 'entity': client}
start_time = time.time()
num_complete = 0
- while True:
- now = time.time()
- diff = now - start_time
- if diff > self.duration:
- self.queue.put((num_complete, diff))
- return
- caps = copy.deepcopy(caps_base)
- caps[-1] += ", profile rbd pool=rbd namespace=namespace{}".format(self.num * 10000 + num_complete)
- cmd['caps'] = caps
- cmd_start = time.time()
- ret, buf, out = self.conn.mon_command(json.dumps(cmd), b'')
- cmd_end = time.time()
- if ret != 0:
- self.queue.put((Exception("{0}: {1}".format(ret, out)), 0))
- return
- num_complete += 1
- print("Process {} finished op {} - latency: {}".format(self.num, num_complete, cmd_end - cmd_start))
+ with rados.Rados(conffile='') as conn:
+ while True:
+ now = time.time()
+ diff = now - start_time
+ if diff > self.duration:
+ self.queue.put((num_complete, diff))
+ return
+ caps = copy.deepcopy(caps_base)
+ caps[-1] += ", profile rbd pool=rbd namespace=namespace{}".format(self.num * 10000 + num_complete)
+ cmd['caps'] = caps
+ cmd_start = time.time()
+ ret, buf, out = conn.mon_command(json.dumps(cmd), b'')
+ cmd_end = time.time()
+ if ret != 0:
+ self.queue.put((Exception("{0}: {1}".format(ret, out)), 0))
+ return
+ num_complete += 1
+ print("Process {} finished op {} - latency: {}".format(self.num, num_complete, cmd_end - cmd_start))
def main():
parser = argparse.ArgumentParser(description="""
duration = args.duration
workers = []
results = []
- q = queue.Queue()
+ q = multiprocessing.Queue()
with rados.Rados(conffile=rados.Rados.DEFAULT_CONF_FILES) as conn:
create_users(conn, num_namespaces, num_threads)
- for i in range(num_threads):
- workers.append(Worker(conn, i, q, duration))
- workers[-1].start()
- for i in range(num_threads):
- num_complete, seconds = q.get()
- if isinstance(num_complete, Exception):
- raise num_complete
- results.append((num_complete, seconds))
+ for i in range(num_threads):
+ workers.append(Worker(conn, i, q, duration))
+ workers[-1].start()
+ for i in range(num_threads):
+ num_complete, seconds = q.get()
+ if isinstance(num_complete, Exception):
+ raise num_complete
+ results.append((num_complete, seconds))
total = 0
total_rate = 0
for num, sec in results: