From f3cf1bf5bc8af4d47a74eb02b59f85ac1dbb5c8e Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 12 Jul 2023 13:05:17 +0200 Subject: [PATCH] os/bluestore: osd scrape read/write operations To run: ```bash ../src/test/objectstore/allocsim/ops_scraper.py --ceph_bin_path bin/ceph ``` Example: ```bash $ ../src/test/objectstore/allocsim/ops_scraper.py --ceph_bin_path bin/ceph [13:06:39] collecting from osd. 0 op_type | offset/extent | name | pgid | initiated_at | who ------------------------------------------------------- write 0~4194304 4:c4b97d2c:::be151868021c187609fb97e4c22ae1f9:head 4.3 2023-07-12T13:06:34.959082+0200 client.4831.0:351 write 0~4194304 4:d392075e:::b068dce954dca8ca56d72e02e87f39c8:head 4.b 2023-07-12T13:06:35.093439+0200 client.4831.0:358 write 0~4194304 4:bc1ac643:::7b815bd25a40695c891dc06bdf2b65be:head 4.1d 2023-07-12T13:06:35.128097+0200 client.4831.0:360 write 0~4194304 4:d0d21bd5:::cc531199dd2707370bb461cce82bdef6:head 4.b 2023-07-12T13:06:36.023082+0200 client.4831.0:367 write 0~4194304 4:a9abc92c:::0703d6f43d14bc33d69f94ecb96c522c:head 4.15 2023-07-12T13:06:36.367886+0200 client.4831.0:369 write 0~4194304 4:9b4684ea:::d5856dc0ecad96f811012e0937d5e0ac:head 4.19 2023-07-12T13:06:36.654584+0200 client.4831.0:370 write 0~4194304 4:d15b8c87:::c3344e5de8c8936876965d9fb056ad89:head 4.b 2023-07-12T13:06:36.835690+0200 client.4831.0:371 write 0~4194304 4:ad41de2a:::8a1e2743b577a67d6bc8a1d514391b53:head 4.15 2023-07-12T13:06:37.102741+0200 client.4831.0:373 write 0~4194304 4:fc35d54f:::39cd961e7e33680f4bfd1ca0fe43dda4:head 4.1f 2023-07-12T13:06:38.047612+0200 client.4831.0:377 collecting from osd. 1 op_type | offset/extent | name | pgid | initiated_at | who ------------------------------------------------------- write 0~4194304 4:4c9ed2e8:::86206ccc35d8aceaf78670a751a6b550:head 4.12 2023-07-12T13:06:34.771423+0200 client.4831.0:340 write 0~4194304 4:106d9524:::fe7534cc604b0eb50498cb23aaca9161:head 4.8 2023-07-12T13:06:35.128105+0200 client.4831.0:359 write 0~4194304 4:a6e3fdb1:::2692491bfc4c584ca5b1efcb896936c2:head 4.5 2023-07-12T13:06:35.567070+0200 client.4831.0:363 write 0~4194304 4:2d9a471a:::ba51255646d00c496c2638bad03e0869:head 4.14 2023-07-12T13:06:35.568290+0200 client.4831.0:364 write 0~4194304 4:2b1381b1:::c58fa31e5d3a4f5550ee143bcb07a49f:head 4.14 2023-07-12T13:06:35.764699+0200 client.4831.0:365 write 0~4194304 4:a22e158f:::b2116e08700da18595b96d58120fe81c:head 4.5 2023-07-12T13:06:36.023101+0200 client.4831.0:366 ``` Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_scraper.py | 302 +++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 src/test/objectstore/allocsim/ops_scraper.py diff --git a/src/test/objectstore/allocsim/ops_scraper.py b/src/test/objectstore/allocsim/ops_scraper.py new file mode 100644 index 0000000000000..7e206c9183465 --- /dev/null +++ b/src/test/objectstore/allocsim/ops_scraper.py @@ -0,0 +1,302 @@ +#!/bin/python3.11 +import argparse +import hashlib +import json +import subprocess +from typing import List, Dict, Any, Optional +import datetime +import sys +import time +import logging + +FORMAT = '%(name)8s::%(levelname)8s:: %(message)s' +logging.basicConfig(format=FORMAT) +logger = logging.getLogger('osd-op-scrapper') + + +class Env: + _args = None + _store = {} + + @classmethod + def args(cls): + return cls._args + + @classmethod + def store(cls): + return cls._store + + @classmethod + def setup_env(cls, args): + if cls._args is not None: + logger.error('double setup') + sys.exit(1) + cls._args = args + + +class OpDescription: + def __init__(self, reqid, pg, object_name, operations: List[List[str]], initiated_at): + self.reqid = reqid + self.pg = pg + self.object_name = object_name + self.operations = operations + self.initiated_at = initiated_at + + def __str__(self): + return self.__repr__() + + def __repr__(self): + res = '' + i = 0 + while i < len(self.operations): + op_type = self.operations[i][0] + if op_type in ['write', 'writefull', 'read', + 'sync_read', 'sparse-read', + 'zero', 'append', 'mapext', + 'cmpext']: + if len(self.operations[i]) < 2: + logger.error('malformed input, expected extents not found') + logger.error(self.operations) + break + extents = self.operations[i][1] + res += f'{self.initiated_at} {self.reqid} {op_type} {extents} {self.object_name} {self.pg}' + elif op_type in ['truncate']: + offset = self.operations[i][1] + if len(self.operations) < 2: + logger.error('malformed input, expected offset not found') + break + res += f'{self.initiated_at} {self.reqid} {op_type} {offset} {self.object_name} {self.pg}' + + i += 1 + + return res + + +def run_ceph_command(command: List[str], no_out=False) -> Any: + command.insert(0, 'ceph') + if Env.args().ceph_bin_path: + command[0] = Env.args().ceph_bin_path + + command.append('--format') + command.append('json') + + res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if res.returncode != 0: + logger.error(f'error executing "{command}": \n{res.stderr}\n') + if no_out: + return None + return json.loads(res.stdout) + +def osd_ls() -> List[str]: + return run_ceph_command(['osd', 'ls']) + +def parse_osd_op(description_literal: str, initiated_at: datetime.datetime): + prev = 0 + cursor = description_literal.find(' ', 0) + reqid = description_literal[prev:cursor] + + prev = cursor + cursor = description_literal.find(' ', cursor+1) + pgid = description_literal[prev+1:cursor] + + prev = cursor + cursor = description_literal.find(' ', cursor+1) + object_name = description_literal[prev+1:cursor] + object_name_split = object_name.split(':') + if not Env.args().nohash : + if len(object_name_split) < 2: + object_name = hashlib.md5(object_name.encode()).hexdigest() + else: + object_name_split[-2] = hashlib.md5(object_name_split[-2].encode()).hexdigest() + object_name = ':'.join(object_name_split) + + prev = cursor + cursor = description_literal.find(']', cursor+1) + operations = description_literal[prev+2:cursor].split(',') + for i in range(len(operations)): + operations[i] = operations[i].split(' ') + + return OpDescription(reqid, pgid, object_name, operations, initiated_at) + +class ProcessInfo: + def __init__(self, process_time: int, command_time: int, processed_info: str, ops_count: int, + new_ops: int, oldest_operation: Optional[OpDescription], + capture_period_start: int, capture_period_end: int): + self.process_time = process_time + self.command_time = command_time + self.processed_info = processed_info + self.ops_count = ops_count + self.new_ops = new_ops + self.oldest_operation = oldest_operation + self.capture_period_start = capture_period_start + self.capture_period_end = capture_period_end + +def _to_timestamp(date: datetime.datetime) -> int: + return date.astimezone().timestamp() + +def process_osd(osd_id) -> ProcessInfo: + if osd_id not in Env.store(): + Env.store()[osd_id] = {} + Env.store()[osd_id]['last_op'] = datetime.datetime.fromtimestamp(0, tz=datetime.timezone.utc) + + + logger.info(f'collecting from osd.{osd_id}') + command_time_start = time.time() + historic: Dict[str, Any] = run_ceph_command(['tell', f'osd.{osd_id}', 'dump_historic_ops']) + command_time = time.time() - command_time_start + + osd_processing_start = time.time() + + operations_str = 'initiated_at | who | op_type | offset/extent | name | pgid\n' + operations_str += '-------------------------------------------------------\n' + new_ops = 0 + oldest_operation = None + capture_period_start = _to_timestamp(Env.store()[osd_id]['last_op']) + for op in historic['ops']: + initiated_at = datetime.datetime.fromisoformat(op['initiated_at']) + if initiated_at < Env.store()[osd_id]['last_op']: + continue + + Env.store()[osd_id]['last_op'] = initiated_at + description = op['description'] + logger.debug(f'{description}') + if description.startswith('osd_op('): + new_ops += 1 + description_data = description[7:-1] + op = parse_osd_op(description_data, initiated_at) + if not oldest_operation: + oldest_operation = op + operations_str += f'{str(op)}\n' + capture_period_end = time.time() + processing_time = time.time() - osd_processing_start + logger.info(f'osd.{osd_id} new_ops {new_ops}') + + return ProcessInfo(processing_time, command_time, operations_str, len(historic['ops']), + new_ops, oldest_operation, capture_period_start, capture_period_end) + +def _set_osd_history_size(name: str, history_size: int): + run_ceph_command(['config', 'set', f'osd.{name}', 'osd_op_history_size', + str(int(history_size))], no_out=True) + +class OsdParameters: + def __init__(self, name: str, ready_time: int, freq: int, history_size: int) -> None: + self.name = name + self.ready_time = ready_time + self.freq = freq + self.history_size = history_size + self.sum_ops = 0 + self.periods = 0 + +def main(): + parser = argparse.ArgumentParser( + prog='OSD operations parser') + parser.add_argument('--nohash', + action='store_true', required=False) + parser.add_argument('--debug_level', type=str, default='1') + parser.add_argument('--ceph_bin_path', required=False) + parser.add_argument('--freq', required=False, type=int, default=1) + parser.add_argument('--min_history_size', required=False, type=int, default=100) + parser.add_argument('--max_history_size', required=False, type=int, default=1000) + parser.add_argument('--osds', required=True, type=str, default='0,1,2', + help='Comma separated list of osd names to parse. Default: "0,1,2"') + parser.add_argument('--out', required=False, help="filename to write output to. If none is provided it will be written to stdout") + args = parser.parse_args() + + Env.setup_env(args) + + log_levels = { + '1': logging.CRITICAL, + '2': logging.ERROR, + '3': logging.WARNING, + '4': logging.INFO, + '5': logging.DEBUG, + '6': logging.NOTSET + } + + logger.setLevel(log_levels[Env.args().debug_level.upper()]) + logger.debug(str(Env.args())) + logger.debug(str(osd_ls())) + + outfile = sys.stdout + + if Env.args().out: + outfile = open(Env.args().out, 'w+') + + pref_freq = int(Env.args().freq) + freq = int(Env.args().freq) + + sleep_time = 0 + update_period = 10 + sum_time_elapsed = 0 + min_history_size = int(Env.args().min_history_size) + max_history_size = int(Env.args().max_history_size) + history_size = max_history_size + # -------------|-------------------| + history_overlap = 1.10 + osds = Env.args().osds.split(',') + osds_info = [] + for osd in osds: + _set_osd_history_size(osd, max_history_size) + osds_info.append(OsdParameters(osd, 0, freq, history_size)) + + logger.debug(f'start freq {freq}') + while True: + logger.debug(f'sleep sec {sleep_time}') + time.sleep(sleep_time) + sleep_time = freq + for osd in osds_info: + + if osd.ready_time >= time.time(): + continue + process_info = process_osd(osd.name) + + if not process_info.new_ops: + new_ops_period = 1 + else: + oldest_initiated_at = process_info.oldest_operation.initiated_at + new_ops_period = process_info.capture_period_end - _to_timestamp(oldest_initiated_at) + capture_period = process_info.capture_period_end - process_info.capture_period_start + + sum_time_elapsed += capture_period + osd.sum_ops += process_info.new_ops + + if process_info.new_ops >= max_history_size: + lost_ops = ((process_info.new_ops * capture_period) / (new_ops_period)) - process_info.new_ops + logger.debug(f'process_info.new_ops: {process_info.new_ops} capture_period: {capture_period} new_ops_period: {new_ops_period} start: {process_info.capture_period_start} end: {process_info.capture_period_end}') + osd.sum_ops += lost_ops + + + + + outfile.write(process_info.processed_info) + outfile.flush() + + sleep_time -= process_info.process_time + process_info.command_time + osd.ready_time = time.time() + (osd.freq - process_info.process_time + process_info.command_time) + sleep_time = max(0, sleep_time) + logger.info(f'osd.{osd.name} parsing dump_historic_ops with {process_info.ops_count} ops took {process_info.process_time}') + logger.info(f'osd.{osd.name} command dump_historic_ops with {process_info.ops_count} ops took {process_info.command_time}') + + osd.periods += 1 + if (osd.periods % update_period) == 0: + ops_per_period = (osd.sum_ops / osd.periods) + new_history_size = int(ops_per_period * history_overlap) + if new_history_size >= min_history_size and new_history_size <= max_history_size: + _set_osd_history_size(osd.name, new_history_size) + logger.info(f'changing osd.{osd.name} history size from {osd.history_size} to {new_history_size}') + osd.history_size = new_history_size + elif new_history_size < min_history_size: + _set_osd_history_size(osd.name, min_history_size) + logger.info(f'changing osd.{osd.name} history size from {osd.history_size} to {min_history_size}') + logger.info(f'changing osd.{osd.name} freq from {freq} to {pref_freq}') + freq = pref_freq + osd.history_size = min_history_size + elif new_history_size >= max_history_size: + new_freq = (new_history_size * freq) / osd.history_size + logger.info(f'increasing freq from {freq} to {new_freq}. new_history_size: {new_history_size} osd.history_size: {osd.history_size}') + freq = new_freq + + close(outfile) + +if __name__ == '__main__': + main() -- 2.39.5