]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: osd scrape read/write operations 52407/head
authorPere Diaz Bou <pere-altea@hotmail.com>
Wed, 12 Jul 2023 11:05:17 +0000 (13:05 +0200)
committerPere Diaz Bou <pere-altea@hotmail.com>
Tue, 12 Sep 2023 13:51:49 +0000 (15:51 +0200)
To run:
```bash
../src/test/objectstore/allocsim/ops_scraper.py --ceph_bin_path bin/ceph
```

Example:
```bash
$ ../src/test/objectstore/allocsim/ops_scraper.py --ceph_bin_path bin/ceph                                                                           [13:06:39]
collecting from osd. 0
op_type | offset/extent | name | pgid | initiated_at | who
-------------------------------------------------------
write 0~4194304 4:c4b97d2c:::be151868021c187609fb97e4c22ae1f9:head 4.3 2023-07-12T13:06:34.959082+0200 client.4831.0:351
write 0~4194304 4:d392075e:::b068dce954dca8ca56d72e02e87f39c8:head 4.b 2023-07-12T13:06:35.093439+0200 client.4831.0:358
write 0~4194304 4:bc1ac643:::7b815bd25a40695c891dc06bdf2b65be:head 4.1d 2023-07-12T13:06:35.128097+0200 client.4831.0:360
write 0~4194304 4:d0d21bd5:::cc531199dd2707370bb461cce82bdef6:head 4.b 2023-07-12T13:06:36.023082+0200 client.4831.0:367
write 0~4194304 4:a9abc92c:::0703d6f43d14bc33d69f94ecb96c522c:head 4.15 2023-07-12T13:06:36.367886+0200 client.4831.0:369
write 0~4194304 4:9b4684ea:::d5856dc0ecad96f811012e0937d5e0ac:head 4.19 2023-07-12T13:06:36.654584+0200 client.4831.0:370
write 0~4194304 4:d15b8c87:::c3344e5de8c8936876965d9fb056ad89:head 4.b 2023-07-12T13:06:36.835690+0200 client.4831.0:371
write 0~4194304 4:ad41de2a:::8a1e2743b577a67d6bc8a1d514391b53:head 4.15 2023-07-12T13:06:37.102741+0200 client.4831.0:373
write 0~4194304 4:fc35d54f:::39cd961e7e33680f4bfd1ca0fe43dda4:head 4.1f 2023-07-12T13:06:38.047612+0200 client.4831.0:377

collecting from osd. 1
op_type | offset/extent | name | pgid | initiated_at | who
-------------------------------------------------------
write 0~4194304 4:4c9ed2e8:::86206ccc35d8aceaf78670a751a6b550:head 4.12 2023-07-12T13:06:34.771423+0200 client.4831.0:340
write 0~4194304 4:106d9524:::fe7534cc604b0eb50498cb23aaca9161:head 4.8 2023-07-12T13:06:35.128105+0200 client.4831.0:359
write 0~4194304 4:a6e3fdb1:::2692491bfc4c584ca5b1efcb896936c2:head 4.5 2023-07-12T13:06:35.567070+0200 client.4831.0:363
write 0~4194304 4:2d9a471a:::ba51255646d00c496c2638bad03e0869:head 4.14 2023-07-12T13:06:35.568290+0200 client.4831.0:364
write 0~4194304 4:2b1381b1:::c58fa31e5d3a4f5550ee143bcb07a49f:head 4.14 2023-07-12T13:06:35.764699+0200 client.4831.0:365
write 0~4194304 4:a22e158f:::b2116e08700da18595b96d58120fe81c:head 4.5 2023-07-12T13:06:36.023101+0200 client.4831.0:366
```

Signed-off-by: Pere Diaz Bou <pere-altea@hotmail.com>
src/test/objectstore/allocsim/ops_scraper.py [new file with mode: 0644]

diff --git a/src/test/objectstore/allocsim/ops_scraper.py b/src/test/objectstore/allocsim/ops_scraper.py
new file mode 100644 (file)
index 0000000..7e206c9
--- /dev/null
@@ -0,0 +1,302 @@
+#!/bin/python3.11
+import argparse
+import hashlib
+import json
+import subprocess
+from typing import List, Dict, Any, Optional
+import datetime
+import sys
+import time
+import logging
+
+FORMAT = '%(name)8s::%(levelname)8s:: %(message)s'
+logging.basicConfig(format=FORMAT)
+logger = logging.getLogger('osd-op-scrapper')
+
+
class Env:
    """Process-wide singleton holding the parsed CLI arguments and scratch state."""

    _args = None   # argparse namespace, set exactly once by setup_env()
    _store = {}    # mutable per-run state shared across the module

    @classmethod
    def args(cls):
        """Return the argparse namespace registered via setup_env()."""
        return cls._args

    @classmethod
    def store(cls):
        """Return the shared mutable state dictionary."""
        return cls._store

    @classmethod
    def setup_env(cls, args):
        """Register the parsed arguments; a second call is a fatal error."""
        if cls._args is None:
            cls._args = args
            return
        logger.error('double setup')
        sys.exit(1)
+
+
class OpDescription:
    """A single client operation parsed from an OSD's historic-ops dump.

    Rendered (via str()/repr()) as one line per sub-operation:
    "<initiated_at> <reqid> <op_type> <extent-or-offset> <object_name> <pg>".
    """

    # Op types whose second token is an "offset~length" extent.
    _EXTENT_OPS = frozenset([
        'write', 'writefull', 'read',
        'sync_read', 'sparse-read',
        'zero', 'append', 'mapext',
        'cmpext',
    ])

    def __init__(self, reqid, pg, object_name, operations: List[List[str]], initiated_at):
        self.reqid = reqid              # client request id, e.g. "client.4831.0:351"
        self.pg = pg                    # placement group id, e.g. "4.3"
        self.object_name = object_name  # object name (possibly md5-anonymized upstream)
        self.operations = operations    # list of [op_type, payload, ...] token lists
        self.initiated_at = initiated_at  # timestamp the op was initiated at

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        res = ''
        for tokens in self.operations:
            op_type = tokens[0]
            if op_type in self._EXTENT_OPS:
                if len(tokens) < 2:
                    logger.error('malformed input, expected extents not found')
                    logger.error(self.operations)
                    break
                extents = tokens[1]
                res += f'{self.initiated_at} {self.reqid} {op_type} {extents} {self.object_name} {self.pg}'
            elif op_type == 'truncate':
                # BUG FIX: the original indexed tokens[1] BEFORE the length
                # guard, and the guard tested len(self.operations) (the outer
                # list) instead of this operation's own token list — a
                # malformed truncate op raised IndexError instead of logging.
                if len(tokens) < 2:
                    logger.error('malformed input, expected offset not found')
                    break
                offset = tokens[1]
                res += f'{self.initiated_at} {self.reqid} {op_type} {offset} {self.object_name} {self.pg}'

        return res
+
+
def run_ceph_command(command: List[str], no_out=False) -> Any:
    """Run a ceph CLI subcommand and return its JSON-decoded stdout.

    :param command: subcommand tokens, e.g. ['osd', 'ls'] (not mutated).
    :param no_out: when True, discard stdout and return None.
    :returns: parsed JSON output, or None when no_out is True.
    """
    # BUG FIX: the original insert()/append() mutated the caller's list, so
    # '--format json' (and the binary path) leaked back into callers' lists
    # if a list was ever reused. Build a fresh argv instead.
    binary = Env.args().ceph_bin_path or 'ceph'
    argv = [binary, *command, '--format', 'json']

    res = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if res.returncode != 0:
        logger.error(f'error executing "{argv}": \n{res.stderr}\n')
    if no_out:
        return None
    return json.loads(res.stdout)
+
def osd_ls() -> List[str]:
    """Return the OSD id list reported by `ceph osd ls`."""
    subcommand = ['osd', 'ls']
    return run_ceph_command(subcommand)
+
def parse_osd_op(description_literal: str, initiated_at: datetime.datetime):
    """Parse an osd_op description string into an OpDescription.

    Expected layout:
    "<reqid> <pgid> <object_name> [<op> <args>,<op> <args>,...]..."
    Unless --nohash was given, the object name is anonymized with md5.
    """
    # reqid: everything up to the first space.
    end = description_literal.find(' ', 0)
    reqid = description_literal[0:end]

    # pgid: between the first and second spaces.
    begin = end
    end = description_literal.find(' ', end + 1)
    pgid = description_literal[begin + 1:end]

    # object name: between the second and third spaces.
    begin = end
    end = description_literal.find(' ', end + 1)
    object_name = description_literal[begin + 1:end]
    if not Env.args().nohash:
        # Anonymize: hash the name component while keeping the hobject
        # structure ("...:<name>:<snap>") intact when present.
        name_parts = object_name.split(':')
        if len(name_parts) < 2:
            object_name = hashlib.md5(object_name.encode()).hexdigest()
        else:
            name_parts[-2] = hashlib.md5(name_parts[-2].encode()).hexdigest()
            object_name = ':'.join(name_parts)

    # operations: comma-separated entries inside the bracketed section;
    # begin + 2 skips the space and the opening '['.
    begin = end
    end = description_literal.find(']', end + 1)
    raw_ops = description_literal[begin + 2:end].split(',')
    operations = [entry.split(' ') for entry in raw_ops]

    return OpDescription(reqid, pgid, object_name, operations, initiated_at)
+
class ProcessInfo:
    """Summary of one dump_historic_ops collection pass against a single OSD.

    NOTE: the time fields were annotated `int` upstream, but every caller
    passes time.time() values/deltas, which are floats — annotations fixed.
    The forward reference to OpDescription is quoted so this class does not
    depend on definition order.
    """

    def __init__(self, process_time: float, command_time: float, processed_info: str, ops_count: int,
                 new_ops: int, oldest_operation: Optional['OpDescription'],
                 capture_period_start: float, capture_period_end: float):
        self.process_time = process_time      # seconds spent parsing the dump
        self.command_time = command_time      # seconds spent running the ceph command
        self.processed_info = processed_info  # human-readable table of new ops
        self.ops_count = ops_count            # total ops present in the dump
        self.new_ops = new_ops                # ops not seen in a previous pass
        self.oldest_operation = oldest_operation  # earliest new op, or None
        self.capture_period_start = capture_period_start  # epoch seconds, window start
        self.capture_period_end = capture_period_end      # epoch seconds, window end
+
+def _to_timestamp(date: datetime.datetime) -> int:
+    return date.astimezone().timestamp()
+
def process_osd(osd_id) -> ProcessInfo:
    """Collect, filter and format new ops from one OSD's dump_historic_ops.

    Keeps a per-OSD high-water mark (the newest initiated_at seen) in
    Env.store() so each pass only reports operations initiated after the
    previous pass.
    """
    # First time seeing this OSD: start the watermark at the epoch so every
    # op in the first dump counts as new.
    if osd_id not in Env.store():
        Env.store()[osd_id] = {}
        Env.store()[osd_id]['last_op'] = datetime.datetime.fromtimestamp(0, tz=datetime.timezone.utc)


    logger.info(f'collecting from osd.{osd_id}')
    # Time the ceph command separately from our own parsing work; both feed
    # the scheduler in main().
    command_time_start = time.time()
    historic: Dict[str, Any] = run_ceph_command(['tell', f'osd.{osd_id}', 'dump_historic_ops'])
    command_time = time.time() - command_time_start

    osd_processing_start = time.time()

    operations_str = 'initiated_at | who | op_type | offset/extent | name | pgid\n'
    operations_str += '-------------------------------------------------------\n'
    new_ops = 0 
    oldest_operation = None
    # The capture window starts at the previous watermark and ends "now".
    capture_period_start = _to_timestamp(Env.store()[osd_id]['last_op'])
    for op in historic['ops']:
        initiated_at = datetime.datetime.fromisoformat(op['initiated_at'])
        # NOTE(review): ops whose initiated_at EQUALS the stored watermark are
        # not skipped, so the newest op of the previous pass can be re-emitted
        # on the next pass — confirm whether `<=` was intended here.
        if initiated_at < Env.store()[osd_id]['last_op']:
            continue

        # Advance the watermark as we go; assumes 'ops' is ordered oldest to
        # newest — TODO confirm against the dump_historic_ops output order.
        Env.store()[osd_id]['last_op'] = initiated_at
        description = op['description']
        logger.debug(f'{description}')
        # Only client osd_op(...) entries are parsed; other op kinds
        # (e.g. recovery/internal ops) are counted in ops_count but skipped.
        if description.startswith('osd_op('):
            new_ops += 1
            # Strip the 'osd_op(' prefix and trailing ')'.
            description_data = description[7:-1]
            # NOTE(review): this rebinds the loop variable `op` from the raw
            # dict to the parsed OpDescription — works, but easy to misread.
            op = parse_osd_op(description_data, initiated_at)
            if not oldest_operation:
                oldest_operation = op
            operations_str += f'{str(op)}\n'
    capture_period_end = time.time()
    processing_time = time.time() - osd_processing_start
    logger.info(f'osd.{osd_id} new_ops {new_ops}')

    return ProcessInfo(processing_time, command_time, operations_str, len(historic['ops']), 
                       new_ops, oldest_operation, capture_period_start, capture_period_end)
+
def _set_osd_history_size(name: str, history_size: int):
    """Set osd_op_history_size for osd.<name> via `ceph config set`."""
    target = f'osd.{name}'
    subcommand = ['config', 'set', target, 'osd_op_history_size', str(int(history_size))]
    run_ceph_command(subcommand, no_out=True)
+
class OsdParameters:
    """Per-OSD polling state: identity, schedule, and running op counters."""

    def __init__(self, name: str, ready_time: int, freq: int, history_size: int) -> None:
        # Caller-supplied configuration.
        self.name = name
        self.freq = freq
        self.ready_time = ready_time
        self.history_size = history_size
        # Counters accumulated across polling periods by the main loop.
        self.sum_ops = 0
        self.periods = 0
+
def main():
    """Poll dump_historic_ops on a set of OSDs forever, printing new ops.

    The polling frequency and per-OSD op-history size are re-tuned every
    `update_period` periods: the history grows toward the observed ops/period
    (plus a 10% overlap margin); when it would exceed --max_history_size the
    polling period is stretched instead, and when load drops the configured
    --freq is restored.
    """
    parser = argparse.ArgumentParser(
            prog='OSD operations parser')
    parser.add_argument('--nohash',
                        action='store_true', required=False)
    parser.add_argument('--debug_level', type=str, default='1')
    parser.add_argument('--ceph_bin_path', required=False)
    parser.add_argument('--freq', required=False, type=int, default=1)
    parser.add_argument('--min_history_size', required=False, type=int, default=100)
    parser.add_argument('--max_history_size', required=False, type=int, default=1000)
    # BUG FIX: --osds was declared required=True, which made its documented
    # default unreachable; it is now optional so "0,1,2" really is the default.
    parser.add_argument('--osds', required=False, type=str, default='0,1,2',
                        help='Comma separated list of osd names to parse. Default: "0,1,2"')
    parser.add_argument('--out', required=False, help="filename to write output to. If none is provided it will be written to stdout")
    args = parser.parse_args()

    Env.setup_env(args)

    # Map the numeric --debug_level string onto logging levels.
    log_levels = {
            '1': logging.CRITICAL,
            '2': logging.ERROR,
            '3': logging.WARNING,
            '4': logging.INFO,
            '5': logging.DEBUG,
            '6': logging.NOTSET
    }

    logger.setLevel(log_levels[Env.args().debug_level.upper()])
    logger.debug(str(Env.args()))
    logger.debug(str(osd_ls()))

    outfile = sys.stdout

    if Env.args().out:
        outfile = open(Env.args().out, 'w+')

    pref_freq = int(Env.args().freq)  # user-requested period, restored when load drops
    freq = int(Env.args().freq)       # current (possibly stretched) period

    sleep_time = 0
    update_period = 10                # re-tune history size every N periods per OSD
    sum_time_elapsed = 0
    min_history_size = int(Env.args().min_history_size)
    max_history_size = int(Env.args().max_history_size)
    history_size = max_history_size
    # Target slightly more history than one period's worth of ops so
    # consecutive captures overlap instead of dropping ops between polls.
    history_overlap = 1.10
    osds = Env.args().osds.split(',')
    osds_info = []
    for osd in osds:
        _set_osd_history_size(osd, max_history_size)
        osds_info.append(OsdParameters(osd, 0, freq, history_size))

    logger.debug(f'start freq {freq}')
    try:
        while True:
            logger.debug(f'sleep sec {sleep_time}')
            time.sleep(sleep_time)
            sleep_time = freq
            for osd in osds_info:
                # Skip OSDs whose next poll isn't due yet.
                if osd.ready_time >= time.time():
                    continue
                process_info = process_osd(osd.name)

                # new_ops_period: wall time actually covered by the new ops
                # (1s sentinel when nothing new arrived, to avoid div-by-zero).
                if not process_info.new_ops:
                    new_ops_period = 1
                else:
                    oldest_initiated_at = process_info.oldest_operation.initiated_at
                    new_ops_period = process_info.capture_period_end - _to_timestamp(oldest_initiated_at)
                capture_period = process_info.capture_period_end - process_info.capture_period_start

                sum_time_elapsed += capture_period
                osd.sum_ops += process_info.new_ops

                # History filled up: extrapolate how many ops were likely
                # dropped between polls and count them toward the sizing stats.
                if process_info.new_ops >= max_history_size:
                    lost_ops = ((process_info.new_ops * capture_period) / (new_ops_period)) - process_info.new_ops
                    logger.debug(f'process_info.new_ops: {process_info.new_ops} capture_period: {capture_period} new_ops_period: {new_ops_period} start: {process_info.capture_period_start} end: {process_info.capture_period_end}')
                    osd.sum_ops += lost_ops

                outfile.write(process_info.processed_info)
                outfile.flush()

                # Charge the time spent collecting against this period's sleep.
                sleep_time -= process_info.process_time + process_info.command_time
                # NOTE(review): kept as-is, but this likely meant
                # osd.freq - (process_time + command_time); confirm intent
                # before changing the scheduling formula.
                osd.ready_time = time.time() + (osd.freq - process_info.process_time + process_info.command_time)
                sleep_time = max(0, sleep_time)
                logger.info(f'osd.{osd.name} parsing dump_historic_ops with {process_info.ops_count} ops took {process_info.process_time}')
                logger.info(f'osd.{osd.name} command dump_historic_ops with {process_info.ops_count} ops took {process_info.command_time}')

                osd.periods += 1
                if (osd.periods % update_period) == 0:
                    # Re-tune this OSD's history size toward observed load.
                    ops_per_period = (osd.sum_ops / osd.periods)
                    new_history_size = int(ops_per_period * history_overlap)
                    if min_history_size <= new_history_size <= max_history_size:
                        _set_osd_history_size(osd.name, new_history_size)
                        logger.info(f'changing osd.{osd.name} history size from {osd.history_size} to {new_history_size}')
                        osd.history_size = new_history_size
                    elif new_history_size < min_history_size:
                        # Load is low: clamp the history to the minimum and
                        # restore the user-requested polling frequency.
                        _set_osd_history_size(osd.name, min_history_size)
                        logger.info(f'changing osd.{osd.name} history size from {osd.history_size} to {min_history_size}')
                        logger.info(f'changing osd.{osd.name} freq from {freq} to {pref_freq}')
                        freq = pref_freq
                        osd.history_size = min_history_size
                    else:
                        # History is maxed out: stretch the polling period so
                        # one full history still covers a period of traffic.
                        new_freq = (new_history_size * freq) / osd.history_size
                        logger.info(f'increasing freq from {freq} to {new_freq}. new_history_size: {new_history_size} osd.history_size: {osd.history_size}')
                        freq = new_freq
    finally:
        # BUG FIX: the original called the undefined name close(outfile),
        # which would raise NameError; close the file properly (and never
        # close stdout), even when the loop exits via Ctrl-C or an error.
        if outfile is not sys.stdout:
            outfile.close()
+
# Run the scraper only when executed as a script (not on import).
if __name__ == '__main__':
    main()