From 643ba66daedd6d50c028d52f26eec760c6ec62fa Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Fri, 11 Aug 2023 14:24:57 +0000 Subject: [PATCH] tools/variable_load: Add generator of variable workload The tool is dedicated to create highly variable workloads. The intended audience is 1) scraper testing - tool to sniff on OSD ops 2) CoDel testing - bandwidth/latency optimization algorithm in BS Initial commit. Signed-off-by: Adam Kupczyk --- .../objectstore/allocsim/variable_load.py | 237 ++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100755 src/test/objectstore/allocsim/variable_load.py diff --git a/src/test/objectstore/allocsim/variable_load.py b/src/test/objectstore/allocsim/variable_load.py new file mode 100755 index 0000000000000..cb6ab0d371ced --- /dev/null +++ b/src/test/objectstore/allocsim/variable_load.py @@ -0,0 +1,237 @@ +#!/bin/python +import rados +import time +import math +import threading +import argparse +import logging +import random + +logger = logging.getLogger() + +""" +Workloads are defined based on sinusiodal waves. + +The base is a definition of workload density + D(x) = 1 + sin(x) +where x defines a point in time. +The period of function D is 2pi. + +The amount of IOs to execute in workload W until time t is: + W(t) = Int(0, t){x + cos(x)} + +The algorithm keeps calculating W(t) and schedules the additional IOs. + +Actual useful form includes amplitiude(A), frequency(F) and shift(S) + D(x) = A * (1 + sin(F * x + S)) +Which translates into: + Int(D) = A * (x + cos(F * x + S)/F) + W(t) = Int(0,t){D} = D(t) - D(0) + +To get workloads that go sometimes completely mute, one can shift +values of workload density by constant value M. + D(x) = A * (1 + sin(F*x+S) - M) + +At M<0 : D(x)>0, can be used to produce constant shift to workload density +At M=0 : D(x)>=0, basic sinusoidal workload that never stops +M in (0,1): D(x) gets both + and - values; for large t, W(t)->+inf, ideal to model sporadic bursts of load +M in <1,2>: W(t) can be possitive for small t, but for large t, W(t)->-inf; it is not useful +At M>=2 : D(x)<=0, produces no workload + + Int(D) = A * (x + cos(F*x+S)/F - M*x) = + = A * ((1-M)*x + cos(F*x+S)/F) +This variant of D(x) can achieve negative values, and as result W(t) decreases. +The algorithm stops scheduling new IOs when W(t) decreases until W(t) rises above +previous maximum value. + +M is replaced with coefficient P that models support for W(t), that is, area where workload W(t) is doing work. +It has to work like examples: +P=0 (M=1) : W(t) no workload, useless +P=0.3 (M=0.7) : W(t) sporadic 10% time workload occurs +P=1 (M=0) : W(t) nicely sinusoidal workload +P=1.2 (M=-0.2): W(t) amplified workload that never reaches less then 0.2*A +The relation is: M=(1-P) P=(1-M) + + Int(D) = A * (P*x + cos(F*x+S)/F) + +Workloads W(t) are sum of set of sinusoidal components + W(t) = Int(0,t){D0 + D1 + .. +Dn} = + = Val(0,t)( A0*(P*x+cos(F0*x+S0)/F0) + ... + An*(P*x+cos(Fn*x+Sn)/Fn) ) = + = Val(0,t)( Sum(Ai*P)*x + Sum(Ai*cos(Fi*x+Si)/Fi) ) +The coefficent P is defined only once per set. +""" + +class workload_sine: + class AFS: + def __init__(self, A: float, F:float, S:float) -> None: + assert(F > 0) + self.A = A + self.F = F + self.S = S + self.AdivF = A / F + + def __init__(self) -> None: + self.P = 0.0 + self.afs = list() + self.sum_Ai_P = 0.0 + + def due(self, x: float) -> float: + s = self.sum_Ai_P * x + for i in self.afs: + s += i.AdivF * math.cos(i.F * x + i.S) + return s - due0 + +class workload_custom: + pass + +class workload: +# def __init__(self, name: str, ready_time: int, freq: int, history_size: int) -> None: + def __init__(self, name: str) -> None: + self.name = name + self.start_time = time.time() + self.object_name = 'test_object_' + self.object_cnt = 1000 + self.object_rr = 0 + self.iodepth = 150 + self.ops_max_backlog = 2000 # how many ops we are allowed to fall behind before dropping some + self.ops_started = 0 # how many ops were scheduled for execution + self.ops_done = 0 # how many ops were finished already + self.ops_skipped = 0 # ops that should have been started, but we decided to skip them + # we do it when the ops backlog is already too large + self.stopped = False + self.reporting_frequency = 1.0 + self.reporting_next = self.start_time + + #returns ops/s + def work_density(self, x: float): + return 1 + math.cos(x*0.5) + def work_needed_until(self, x: float): + return 1000 * ((0.1)*x + 2*math.sin(x*0.5)) + + def next_op(self): + pass + def on_complete(self, object_name): + def action(result): + self.ops_done += 1 + #print(str(object_name) + ":" + str(result.get_return_value()) + \ + # ":" + str(self.ops_started - self.ops_done)) + if not self.stopped: + self.schedule_ops() + return + return action + #returns time.time() when next op should be scheduled + def get_next_op_at(self) -> float: + return next_op_at + + def schedule_op(self): + self.ops_started += 1 + self.object_rr = (self.object_rr + 1 ) % self.object_cnt + object_name = 'test_object_' + str(self.object_rr) + #data = b'Hello, Ceph!' + data = b'c'*4096 + completion_context = self.on_complete(object_name) + self.ioctx.aio_write(object_name, data, oncomplete=completion_context) + + #returns time.time() when next op should be scheduled + def schedule_ops(self) -> float: + now = time.time() + cumulative_todo = self.work_needed_until(now - self.start_time) + if now > self.reporting_next: + print("time="+str(now - self.start_time)+\ + " due="+str(cumulative_todo)+\ + " started="+str(self.ops_started)+\ + " done="+str(self.ops_done)+\ + " skipped="+str(self.ops_skipped) ) + self.reporting_next = now + self.reporting_frequency + ops_to_schedule = int(cumulative_todo) - (self.ops_started + self.ops_skipped) + # check if we have capacity to schedule more ops + # maybe we are lagging behind + #print("sum="+str(cumulative_todo)+" to_schd="+str(ops_to_schedule)) + if ops_to_schedule > self.ops_max_backlog: + ops_to_skip = ops_to_schedule - self.ops_max_backlog + self.ops_skipped += ops_to_skip + #reduce if we would go above iodepth + if ops_to_schedule + (self.ops_started - self.ops_done) > self.iodepth: + ops_to_schedule = self.iodepth - (self.ops_started - self.ops_done) + if ops_to_schedule >= 1: + for i in range(math.floor(ops_to_schedule)): + self.schedule_op() + #print("ops_in_flight="+str(self.ops_started - self.ops_done)) + if self.ops_started - self.ops_done == 0: + #signal main thread we need sleep + self.next_op_at = time.time() + 1 + wup = threading.Timer(0.01, self.schedule_ops) + wup.start() + return 0 + + def start(self, ioctx: rados.Ioctx): + self.ioctx = ioctx + self.schedule_ops() + +#end workload class + + + +def run(workloads): + #wakeup = threading.Semaphore + cluster = rados.Rados(conffile='ceph.conf') + cluster.connect() + pool_name = 'test' + ioctx = cluster.open_ioctx(pool_name) + + for w in workloads: + w.start(ioctx) + + # w = workload(ioctx, "workload-1") + + #w.schedule_ops() + #w1.work_needed_until = lambda x : 1000 *(3*x + 2*math.sin(x*0.8)) + #w1.schedule_ops() + time.sleep(50); + ioctx.close() + + + + +def main(): + parser = argparse.ArgumentParser( + prog='OSD operations parser') + parser.add_argument('--debug_level', type=str, default='1') + parser.add_argument('--runtime', required=False, type=int, default=60) + parser.add_argument('--workload', action='append', required=False, type=str, #nargs='*', + help='Comma separated list of osd names to parse. Default: "0,1,2"') + parser.add_argument('--out', required=False, help="filename to write output to. If none is provided it will be written to stdout") + args = parser.parse_args() + print(args) + #Env.setup_env(args) + + log_levels = { + '1': logging.CRITICAL, + '2': logging.ERROR, + '3': logging.WARNING, + '4': logging.INFO, + '5': logging.DEBUG, + '6': logging.NOTSET + } + + logger.setLevel(log_levels[args.debug_level.upper()]) + #logger.debug(str(Env.args())) + #logger.debug(str(osd_ls())) + print(args.workload) + + for i in args.workload: + wparams = i.split(", ") + wdict = dict() + for j in wparams: + k = j.split('=') + wdict[k[0]] = k[1] + print(wdict) + + workloads = list() + workloads.append(workload("w-1")) + #return + run(workloads) + + +if __name__ == '__main__': + main() -- 2.39.5