]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
tools/variable_load: Improve load generation
authorAdam Kupczyk <akupczyk@ibm.com>
Tue, 29 Aug 2023 13:50:01 +0000 (13:50 +0000)
committerAdam Kupczyk <akupczyk@ibm.com>
Thu, 31 Aug 2023 10:09:04 +0000 (10:09 +0000)
Improve generation of workloads.
Add options. Add useful help.

Signed-off-by: Adam Kupczyk <akupczyk@ibm.com>
src/test/objectstore/allocsim/variable_load.py

index cb6ab0d371cedf310b965fe28bfdeadd29c1c6c5..1f68d45151d04b4b3b3ca81e2564aaabf2d3917b 100755 (executable)
@@ -7,12 +7,12 @@ import argparse
 import logging
 import random
 
-logger = logging.getLogger()
+logger = logging.getLogger("iots")
 
 """
 Workloads are defined based on sinusiodal waves.
 
-The base is a definition of workload density 
+The base is a definition of workload density
   D(x) = 1 + sin(x)
 where x defines a point in time.
 The period of function D is 2pi.
@@ -38,8 +38,8 @@ M in (0,1): D(x) gets both + and - values; for large t, W(t)->+inf, ideal to mod
 M in <1,2>: W(t) can be possitive for small t, but for large t, W(t)->-inf; it is not useful
 At M>=2   : D(x)<=0, produces no workload
 
-  Int(D) = A * (x + cos(F*x+S)/F - M*x) =
-         = A * ((1-M)*x + cos(F*x+S)/F)
+  Int(D) = A * (x - cos(F*x+S)/F - M*x) =
+         = A * ((1-M)*x - cos(F*x+S)/F)
 This variant of D(x) can achieve negative values, and as result W(t) decreases.
 The algorithm stops scheduling new IOs when W(t) decreases until W(t) rises above
 previous maximum value.
@@ -52,16 +52,30 @@ P=1   (M=0)   : W(t) nicely sinusoidal workload
 P=1.2 (M=-0.2): W(t) amplified workload that never reaches less then 0.2*A
 The relation is: M=(1-P) P=(1-M)
 
-  Int(D) = A * (P*x + cos(F*x+S)/F)
-  
+  Int(D) = A * (P*x - cos(F*x+S)/F)
+
 Workloads W(t) are sum of set of sinusoidal components
   W(t) = Int(0,t){D0 + D1 + .. +Dn} =
-       = Val(0,t)( A0*(P*x+cos(F0*x+S0)/F0) + ... + An*(P*x+cos(Fn*x+Sn)/Fn) ) =
-       = Val(0,t)( Sum(Ai*P)*x + Sum(Ai*cos(Fi*x+Si)/Fi) )
+       = Val(0,t)( A0*(P*x-cos(F0*x+S0)/F0) + ... + An*(P*x-cos(Fn*x+Sn)/Fn) ) =
+       = Val(0,t)( Sum(Ai*P)*x - Sum(Ai*cos(Fi*x+Si)/Fi) )
 The coefficent P is defined only once per set.
 """
 
-class workload_sine:
+
+class Idue:
+  def __init__(self) -> None:
+    pass
+  def due(self, x: float) -> float:
+    return 0
+
+class workload_sine(Idue):
+  #set of default ranges for random to select from
+  #defaut "cnt=3 amp=500-1000 period=90-120 shift=0-1 supp=1"
+  default_amp: str = '500-1000'
+  default_period: str = '30-60'
+  default_shift: str = '0-1'
+  default_supp: str = '1'
+  default_cnt: int = 3
   class AFS:
     def __init__(self, A: float, F:float, S:float) -> None:
       assert(F > 0)
@@ -69,25 +83,82 @@ class workload_sine:
       self.F = F
       self.S = S
       self.AdivF = A / F
+  def __init__(self, params: dict) -> None:
+    def construct(name: str, default: str) -> list:
+      #we need to construct cnt elements from definition from params
+      #input cnt=5 amp=300,10-20,500-1000
+      #result: [300, 16.34, 515.93, 773.68, 604.11]
+      result = list()
+      #select val as either default of from param
+      if name in params:
+        val = params[name]
+      else:
+        val = default
+
+      vals_list = val.split(',')
+      #vals_list contains list of either direct values '123' or range '120-150'
+      for i in range(cnt):
+        #we assume there will be one element in vals_list for each element we want to produce
+        #if we run out of elements in vals_list, we reuse the last one
+        vals = vals_list[0].split('-')
+        if len(vals_list) > 1:
+          vals_list.pop(0)
+        assert(0 < len(vals) and len(vals) <= 2)
+        if len(vals) == 1:
+          low = float(vals[0])
+          high = low
+        else:
+          low = float(vals[0])
+          high = float(vals[1])
+        result.append((high-low)*rnd.random() + low)
+      return result
 
-  def __init__(self) -> None:
     self.P = 0.0
-    self.afs = list()
     self.sum_Ai_P = 0.0
+    self.afs = list()
+    #first decide how many sin components are to be here
+    if 'cnt' in params:
+      cnt = int(params['cnt'])
+    else:
+      cnt = self.default_cnt
+    if 'rand' in params:
+      rnd = random.Random(params['rand'])
+    else:
+      rnd = random.Random(time.time())
+
+    if 'supp' in params:
+      self.P = float(params['supp'])
+    else:
+      self.P = 1
+
+    #don't care if cnt==0, lame workload is still a workload
+    amps =    construct('amp',    self.default_amp)
+    periods = construct('period', self.default_period)
+    shifts =  construct('shift',  self.default_shift)
+    assert(len(amps) == cnt)
+    assert(len(periods) == cnt)
+    assert(len(shifts) == cnt)
+    for i in range(cnt):
+      self.afs.append(self.AFS(amps[i] / 2,
+                               (2 * math.pi) / periods[i],
+                               shifts[i] * (2 * math.pi) ))
+      self.sum_Ai_P += amps[i] / 2 * self.P
+    self.due0 = 0;
+    self.due0 = self.due(0)
 
   def due(self, x: float) -> float:
     s = self.sum_Ai_P * x
     for i in self.afs:
-      s += i.AdivF * math.cos(i.F * x + i.S)
-    return s - due0
+      s -= i.AdivF * math.cos(i.F * x + i.S)
+    return s - self.due0
 
-class workload_custom:
+class workload_custom(Idue):
   pass
 
 class workload:
-#  def __init__(self, name: str, ready_time: int, freq: int, history_size: int) -> None:
-  def __init__(self, name: str) -> None:
+  def __init__(self, name: str, due: Idue) -> None:
     self.name = name
+    self.due = due
     self.start_time = time.time()
     self.object_name = 'test_object_'
     self.object_cnt = 1000
@@ -98,25 +169,24 @@ class workload:
     self.ops_done = 0 # how many ops were finished already
     self.ops_skipped = 0 # ops that should have been started, but we decided to skip them
                          # we do it when the ops backlog is already too large
+    self.ops_reported = 0 # to track newly completed
+    self.stopping = False
     self.stopped = False
     self.reporting_frequency = 1.0
     self.reporting_next = self.start_time
 
-  #returns ops/s 
+  #returns ops/s
   def work_density(self, x: float):
     return 1 + math.cos(x*0.5)
   def work_needed_until(self, x: float):
-    return 1000 * ((0.1)*x + 2*math.sin(x*0.5))
+    return self.due.due(x)
 
   def next_op(self):
     pass
   def on_complete(self, object_name):
     def action(result):
       self.ops_done += 1
-      #print(str(object_name) + ":" + str(result.get_return_value()) + \
-      #      ":" + str(self.ops_started - self.ops_done))
-      if not self.stopped:
-        self.schedule_ops()
+      self.schedule_ops()
       return
     return action
   #returns time.time() when next op should be scheduled
@@ -127,26 +197,22 @@ class workload:
     self.ops_started += 1
     self.object_rr = (self.object_rr + 1 ) % self.object_cnt
     object_name = 'test_object_' + str(self.object_rr)
-    #data = b'Hello, Ceph!'
     data = b'c'*4096
     completion_context = self.on_complete(object_name)
     self.ioctx.aio_write(object_name, data, oncomplete=completion_context)
-    
+
   #returns time.time() when next op should be scheduled
-  def schedule_ops(self) -> float:
+  def schedule_ops(self):
+    if self.stopping:
+      self.stopped = True
+      return
     now = time.time()
     cumulative_todo = self.work_needed_until(now - self.start_time)
     if now > self.reporting_next:
-      print("time="+str(now - self.start_time)+\
-            " due="+str(cumulative_todo)+\
-            " started="+str(self.ops_started)+\
-            " done="+str(self.ops_done)+\
-            " skipped="+str(self.ops_skipped) )
-      self.reporting_next = now + self.reporting_frequency
+      print_report(now)
     ops_to_schedule = int(cumulative_todo) - (self.ops_started + self.ops_skipped)
     # check if we have capacity to schedule more ops
     # maybe we are lagging behind
-    #print("sum="+str(cumulative_todo)+" to_schd="+str(ops_to_schedule))
     if ops_to_schedule > self.ops_max_backlog:
       ops_to_skip = ops_to_schedule - self.ops_max_backlog
       self.ops_skipped += ops_to_skip
@@ -156,24 +222,56 @@ class workload:
     if ops_to_schedule >= 1:
       for i in range(math.floor(ops_to_schedule)):
         self.schedule_op()
-    #print("ops_in_flight="+str(self.ops_started - self.ops_done))
     if self.ops_started - self.ops_done == 0:
       #signal main thread we need sleep
       self.next_op_at = time.time() + 1
       wup = threading.Timer(0.01, self.schedule_ops)
       wup.start()
-    return 0
+    return
+
+  def dry_schedule_ops(self):
+    if self.stopping:
+      self.stopped = True
+      return
+    now = time.time() + (time.time() - self.start_time) * 10
+    cumulative_todo = self.work_needed_until(now - self.start_time)
+    if now > self.reporting_next:
+      print_report(now)
+    ops_to_schedule = int(cumulative_todo) - (self.ops_started + self.ops_skipped)
+    self.ops_started += ops_to_schedule
+    self.ops_done += ops_to_schedule
+    wup = threading.Timer(0.01, self.dry_schedule_ops)
+    wup.start()
+    return
 
   def start(self, ioctx: rados.Ioctx):
     self.ioctx = ioctx
     self.schedule_ops()
 
-#end workload class
+  def dry_start(self):
+    self.dry_schedule_ops()
 
+  def stop(self):
+    self.stopping = True
+    while (not self.stopped or self.ops_done != self.ops_started):
+      logger.debug("stopped=%d ios_in_flight=%d" % \
+                   (self.stopped, self.ops_started - self.ops_done))
+      time.sleep(0.01)
+    logger.debug("workload %s stopped" % self.name)
 
+#end workload class
 
-def run(workloads):
-  #wakeup = threading.Semaphore
+def print_report(now):
+  elapsed = now - workloads[0].start_time
+  iops = 0
+  for w in workloads:
+    iops = iops + (w.ops_done - w.ops_reported)
+    w.reporting_next = w.reporting_next + w.reporting_frequency
+    w.ops_reported = w.ops_done
+  print("time=%7.3f" % elapsed + \
+        " iops=%d" % iops)
+
+def run(workloads, runtime):
   cluster = rados.Rados(conffile='ceph.conf')
   cluster.connect()
   pool_name = 'test'
@@ -182,28 +280,31 @@ def run(workloads):
   for w in workloads:
     w.start(ioctx)
 
-  #  w = workload(ioctx, "workload-1")
+  time.sleep(runtime);
+  for w in workloads:
+    w.stop()
 
-  #w.schedule_ops()
-  #w1.work_needed_until = lambda x : 1000 *(3*x + 2*math.sin(x*0.8))
-  #w1.schedule_ops()
-  time.sleep(50);
   ioctx.close()
 
+def dry_run(workloads, runtime):
+  for w in workloads:
+    w.dry_start()
+  time.sleep(runtime/10);
+  for w in workloads:
+    w.stop()
 
-
+workloads = list()
 
 def main():
   parser = argparse.ArgumentParser(
-      prog='OSD operations parser')
+      prog='Variable OSD load generator')
   parser.add_argument('--debug_level', type=str, default='1')
+  parser.add_argument('--dry-run', action='store_true', default=False)
   parser.add_argument('--runtime', required=False, type=int, default=60)
   parser.add_argument('--workload', action='append', required=False, type=str, #nargs='*', 
-                      help='Comma separated list of osd names to parse. Default: "0,1,2"')
-  parser.add_argument('--out', required=False, help="filename to write output to. If none is provided it will be written to stdout")
+                      help='Space separated list workload specs. '
+                      'Default: "cnt=3 amp=500-1000 period=90-120 shift=0-1 supp=1"')
   args = parser.parse_args()
-  print(args)
-  #Env.setup_env(args)
 
   log_levels = {
       '1': logging.CRITICAL,
@@ -214,23 +315,27 @@ def main():
       '6': logging.NOTSET
   }
 
-  logger.setLevel(log_levels[args.debug_level.upper()])
-  #logger.debug(str(Env.args()))
-  #logger.debug(str(osd_ls()))
-  print(args.workload)
-  
+  logging.basicConfig(level=log_levels[args.debug_level])
+  logger = logging.getLogger("iots")
+  logger.setLevel(log_levels[args.debug_level])
+
+  wcnt = 0
   for i in args.workload:
-    wparams = i.split(", ")
+    wparams = i.split(" ")
     wdict = dict()
     for j in wparams:
-      k = j.split('=')
-      wdict[k[0]] = k[1]
-    print(wdict)
-
-  workloads = list()
-  workloads.append(workload("w-1"))
-  #return
-  run(workloads)
+      if j.find('=') != -1:
+        k = j.split('=')
+        wdict[k[0]] = k[1]
+    wsine = workload_sine(wdict);
+    w = workload("workload_%d" % wcnt, wsine)
+    wcnt = wcnt + 1
+    workloads.append(w)
+
+  if args.dry_run:
+    dry_run(workloads, args.runtime)
+  else:
+    run(workloads, args.runtime)
 
 
 if __name__ == '__main__':