From d5b1698c9e6cf73188616955698d8c35b1220b4a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 20 Aug 2019 22:23:39 +0000 Subject: [PATCH] telemetry: misc scripts Signed-off-by: Sage Weil --- src/telemetry/dump_from_es.py | 11 ++++ src/telemetry/es_dump_to_file.py | 34 ++++++++++++ src/telemetry/file_to_pg.py | 36 ++++++++++++ src/telemetry/proc_reports.py | 95 ++++++++++++++++++++++++++++++++ 4 files changed, 176 insertions(+) create mode 100644 src/telemetry/dump_from_es.py create mode 100644 src/telemetry/es_dump_to_file.py create mode 100644 src/telemetry/file_to_pg.py create mode 100644 src/telemetry/proc_reports.py diff --git a/src/telemetry/dump_from_es.py b/src/telemetry/dump_from_es.py new file mode 100644 index 00000000000..570348044f6 --- /dev/null +++ b/src/telemetry/dump_from_es.py @@ -0,0 +1,11 @@ +import json +from server.ceph_telemetry.rest import Report + +f = open('es_dump.txt', 'r') +a = f.read() + +j = json.loads(a) +reports = j['hits']['hits'] +print(len(reports)) + + diff --git a/src/telemetry/es_dump_to_file.py b/src/telemetry/es_dump_to_file.py new file mode 100644 index 00000000000..67016b84ffc --- /dev/null +++ b/src/telemetry/es_dump_to_file.py @@ -0,0 +1,34 @@ +from os import path +import json + +f = open('es_dump.txt', 'r') +a = f.read() + +j = json.loads(a) +reports = j['hits']['hits'] +print(len(reports)) + +r = reports[0] +json.dumps(r) + +total = len(reports) +bad = 0 +exists = 0 +for item in reports: + r = item['_source'] + ts = r.get('report_timestamp') + if not ts: + bad += 1 + continue + rid = r.get('report_id') + assert rid + fn = '/opt/telemetry/raw/' + rid + '.' 
+ ts + + if path.exists(fn): + exists += 1 + else: + f = open(fn, 'w') + f.write(json.dumps(r)) + f.close() + +print('total %d, bad %d, exists %d' % (total, bad, exists)) diff --git a/src/telemetry/file_to_pg.py b/src/telemetry/file_to_pg.py new file mode 100644 index 00000000000..8edbec1913b --- /dev/null +++ b/src/telemetry/file_to_pg.py @@ -0,0 +1,36 @@ +import json +from os import listdir +from os.path import isfile, join +import psycopg2 + +DIR = '/opt/telemetry/raw' + +files = [f for f in listdir(DIR) if isfile(join(DIR, f))] + +f = open('/opt/telemetry/pg_pass.txt', 'r') +password = f.read().strip() +f.close() + +conn = psycopg2.connect( + host='localhost', + database='telemetry', + user='telemetry', + password=password +) +cur = conn.cursor() + +for fn in files: + f = open('/opt/telemetry/raw/' + fn, 'r') + report = f.read() + f.close() + j = json.loads(report) + ts = j.get('report_timestamp') + if not ts: + continue + cur.execute( + 'INSERT INTO report (cluster_id, report_stamp, report) VALUES (%s,%s,%s) ON CONFLICT DO NOTHING', + (j.get('report_id'), + ts, + report) + ) +conn.commit() diff --git a/src/telemetry/proc_reports.py b/src/telemetry/proc_reports.py new file mode 100644 index 00000000000..98fc9720af3 --- /dev/null +++ b/src/telemetry/proc_reports.py @@ -0,0 +1,95 @@ +import json +import psycopg2 + +f = open('/opt/telemetry/pg_pass.txt', 'r') +password = f.read().strip() +f.close() + +conn = psycopg2.connect( + host='localhost', + database='telemetry', + user='telemetry', + password=password +) +rcur = conn.cursor() +rcur.execute("SELECT cluster_id, report FROM report") +for row in rcur.fetchall(): + cluster_id = row[0] + report = json.loads(row[1]) + ts = report.get('report_timestamp') + + ccur = conn.cursor() + + # cluster + ccur.execute("SELECT cluster_id,latest_report_stamp FROM cluster WHERE cluster_id=%s", (cluster_id,)) + crows = ccur.fetchall() + update = True + if crows: + crow = crows[0] + if str(crow[1]) > ts: + print('cluster %s 
already has newer report %s' % (cluster_id, crow[1])) + update = False + + if update: + print('updating %s' % (cluster_id)) + num_pgs = 0 + for pool in report.get('pools', []): + num_pgs += pool.get('pg_num', 0) + ccur.execute( + "INSERT INTO cluster (cluster_id, latest_report_stamp, num_mon, num_osd, num_pools, num_pgs, total_bytes, total_used_bytes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (cluster_id) DO UPDATE SET latest_report_stamp=%s, num_mon=%s, num_osd=%s, num_pools=%s, num_pgs=%s, total_bytes=%s, total_used_bytes=%s", + (cluster_id, + ts, + report.get('mon', {}).get('count', 0), + report.get('osd', {}).get('count', 0), + report.get('usage', {}).get('pools', 0), + num_pgs, + report.get('usage', {}).get('total_bytes', 0), + report.get('usage', {}).get('total_used_bytes', 0), + ts, + report.get('mon', {}).get('count', 0), + report.get('osd', {}).get('count', 0), + report.get('usage', {}).get('pools', 0), + num_pgs, + report.get('usage', {}).get('total_bytes', 0), + report.get('usage', {}).get('total_used_bytes', 0)),) + + # cluster_version + ccur.execute( + "DELETE FROM cluster_version WHERE cluster_id=%s", + (cluster_id,) + ) + for (entity_type, info) in report.get('metadata', {}).items(): + for (version, num) in info.get('ceph_version', {}).items(): + ccur.execute( + "INSERT INTO cluster_version (cluster_id, entity_type, version, num_daemons) VALUES (%s, %s, %s, %s)", + (cluster_id, + entity_type, + version, + num,)) + + # crash + crashes = report.get('crashes', []) + if isinstance(crashes, dict): + tmp = [] + for c in crashes.values(): + tmp.append(c) + crashes = tmp + + for crash in crashes: + crash_id = crash.get('crash_id') + if not crash_id: + continue + stack = str(crash.get('backtrace')) + ccur.execute( + "INSERT INTO crash (crash_id, cluster_id, raw_report, timestamp, entity_name, version, stack) values (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING", + (crash_id, + cluster_id, + json.dumps(crash, indent=4), + 
crash.get('timestamp'), + crash.get('entity_name'), + crash.get('ceph_version'), + stack, + )) + + conn.commit() + -- 2.39.5