]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
telemetry: misc scripts
authorSage Weil <sage@newdream.net>
Tue, 20 Aug 2019 22:23:39 +0000 (22:23 +0000)
committerSage Weil <sage@newdream.net>
Fri, 4 Oct 2019 20:55:32 +0000 (20:55 +0000)
Signed-off-by: Sage Weil <sage@newdream.net>
src/telemetry/dump_from_es.py [new file with mode: 0644]
src/telemetry/es_dump_to_file.py [new file with mode: 0644]
src/telemetry/file_to_pg.py [new file with mode: 0644]
src/telemetry/proc_reports.py [new file with mode: 0644]

diff --git a/src/telemetry/dump_from_es.py b/src/telemetry/dump_from_es.py
new file mode 100644 (file)
index 0000000..5703480
--- /dev/null
@@ -0,0 +1,11 @@
+import json
+from server.ceph_telemetry.rest import Report
+
+f = open('es_dump.txt', 'r')
+a = f.read()
+
+j = json.loads(a)
+reports = j['hits']['hits']
+print(len(reports))
+
+
diff --git a/src/telemetry/es_dump_to_file.py b/src/telemetry/es_dump_to_file.py
new file mode 100644 (file)
index 0000000..67016b8
--- /dev/null
@@ -0,0 +1,34 @@
+from os import path
+import json
+
+f = open('es_dump.txt', 'r')
+a = f.read()
+
+j = json.loads(a)
+reports = j['hits']['hits']
+print(len(reports))
+
+r = reports[0]
+json.dumps(r)
+
+total = len(reports)
+bad = 0
+exists = 0
+for item in reports:
+    r = item['_source']
+    ts = r.get('report_timestamp')
+    if not ts:
+        bad += 1
+        continue
+    rid = r.get('report_id')
+    assert rid
+    fn = '/opt/telemetry/raw/' + rid + '.' + ts
+    
+    if path.exists(fn):
+        exists += 1
+    else:
+        f = open(fn, 'w')
+        f.write(json.dumps(r))
+        f.close()
+
+print('total %d, bad %d, exists %d' % (total, bad, exists))
diff --git a/src/telemetry/file_to_pg.py b/src/telemetry/file_to_pg.py
new file mode 100644 (file)
index 0000000..8edbec1
--- /dev/null
@@ -0,0 +1,36 @@
+import json
+from os import listdir
+from os.path import isfile, join
+import psycopg2
+
+DIR = '/opt/telemetry/raw'
+
+files = [f for f in listdir(DIR) if isfile(join(DIR, f))]
+
+f = open('/opt/telemetry/pg_pass.txt', 'r')
+password = f.read().strip()
+f.close()
+
+conn = psycopg2.connect(
+    host='localhost',
+    database='telemetry',
+    user='telemetry',
+    password=password
+)
+cur = conn.cursor()
+
+for fn in files:
+    f = open('/opt/telemetry/raw/' + fn, 'r')
+    report = f.read()
+    f.close()
+    j = json.loads(report)
+    ts = j.get('report_timestamp')
+    if not ts:
+        continue
+    cur.execute(
+        'INSERT INTO report (cluster_id, report_stamp, report) VALUES (%s,%s,%s) ON CONFLICT DO NOTHING',
+        (j.get('report_id'),
+         ts,
+         report)
+    )
+conn.commit()
diff --git a/src/telemetry/proc_reports.py b/src/telemetry/proc_reports.py
new file mode 100644 (file)
index 0000000..98fc972
--- /dev/null
@@ -0,0 +1,95 @@
+import json
+import psycopg2
+
+f = open('/opt/telemetry/pg_pass.txt', 'r')
+password = f.read().strip()
+f.close()
+
+conn = psycopg2.connect(
+    host='localhost',
+    database='telemetry',
+    user='telemetry',
+    password=password
+)
+rcur = conn.cursor()
+rcur.execute("SELECT cluster_id, report FROM report")
+for row in rcur.fetchall():
+    cluster_id = row[0]
+    report = json.loads(row[1])
+    ts = report.get('report_timestamp')
+
+    ccur = conn.cursor()
+
+    # cluster
+    ccur.execute("SELECT cluster_id,latest_report_stamp FROM cluster WHERE cluster_id=%s", (cluster_id,))
+    crows = ccur.fetchall()
+    update = True
+    if crows:
+        crow = crows[0]
+        if str(crow[1]) > ts:
+            print('cluster %s already has newer report %s' % (cluster_id, crow[1]))
+            update = False
+
+    if update:
+        print('updating %s' % (cluster_id))
+        num_pgs = 0
+        for pool in report.get('pools', []):
+            num_pgs += pool.get('pg_num', 0)
+        ccur.execute(
+            "INSERT INTO cluster (cluster_id, latest_report_stamp, num_mon, num_osd, num_pools, num_pgs, total_bytes, total_used_bytes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (cluster_id) DO UPDATE SET latest_report_stamp=%s, num_mon=%s, num_osd=%s, num_pools=%s, num_pgs=%s, total_bytes=%s, total_used_bytes=%s",
+            (cluster_id,
+             ts,
+             report.get('mon', {}).get('count', 0),
+             report.get('osd', {}).get('count', 0),
+             report.get('usage', {}).get('pools', 0),
+             num_pgs,
+             report.get('usage', {}).get('total_bytes', 0),
+             report.get('usage', {}).get('total_used_bytes', 0),
+             ts,
+             report.get('mon', {}).get('count', 0),
+             report.get('osd', {}).get('count', 0),
+             report.get('usage', {}).get('pools', 0),
+             num_pgs,
+             report.get('usage', {}).get('total_bytes', 0),
+             report.get('usage', {}).get('total_used_bytes', 0)),)
+
+        # cluster_version
+        ccur.execute(
+            "DELETE FROM cluster_version WHERE cluster_id=%s",
+            (cluster_id,)
+        )
+        for (entity_type, info) in report.get('metadata', {}).items():
+            for (version, num) in info.get('ceph_version', {}).items():
+                ccur.execute(
+                    "INSERT INTO cluster_version (cluster_id, entity_type, version, num_daemons) VALUES (%s, %s, %s, %s)",
+                    (cluster_id,
+                     entity_type,
+                     version,
+                     num,))
+
+    # crash
+    crashes = report.get('crashes', [])
+    if isinstance(crashes, dict):
+        tmp = []
+        for c in crashes.valeus():
+            tmp.append(c)
+        crashes = tmp
+
+    for crash in crashes:
+        crash_id = crash.get('crash_id')
+        if not crash_id:
+            continue
+        stack = str(crash.get('backtrace'))
+        ccur.execute(
+            "INSERT INTO crash (crash_id, cluster_id, raw_report, timestamp, entity_name, version, stack) values (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING",
+            (crash_id,
+             cluster_id,
+             json.dumps(crash, indent=4),
+             crash.get('timestamp'),
+             crash.get('entity_name'),
+             crash.get('ceph_version'),
+             stack,
+            ))
+
+    conn.commit()
+