git-server-git.apps.pok.os.sepia.ceph.com Git - cephmetrics.git/commitdiff
base: add local fork of admin_socket function add-timeout
author Paul Cuzner <pcuzner@redhat.com>
Wed, 6 Sep 2017 20:18:48 +0000 (08:18 +1200)
committer Paul Cuzner <pcuzner@redhat.com>
Wed, 6 Sep 2017 20:18:48 +0000 (08:18 +1200)
The main admin_socket call is blocking, and can therefore hang perf dump
requests. This change uses a non-blocking socket call with a timeout so
that the collector will not stall on a specific OSD interaction.

collectors/base.py

index 6e7e83cfb2ad593f5417ada23f6727b775025486..199859e21c5931bf7083b6a9853c5ec71019dff5 100644 (file)
@@ -5,9 +5,96 @@ import time
 import logging
 import os
 
-from ceph_daemon import admin_socket
+import socket
+import struct
+from ceph_argparse import parse_json_funcsigs, validate_command
+import select
+
+# from ceph_daemon import admin_socket
 from collectors.common import os_cmd, cmd_exists
 
+READ_CHUNK_SIZE = 4096
+
+# The ceph admin_socket function currently uses a blocking socket call,
+# which causes issues when the OSD doesn't respond to the perf dump command.
+#
+# To address this, cephmetrics includes a 'fork' of the admin_socket function
+# until upstream adopts a non-blocking socket approach. Prior to 'forking',
+# other options were considered:
+#   signal.SIGALRM(n)    - not possible since the collectors run outside of
+#                          the main thread
+#   threading.Timer      - leads to zombie threads
+#   multiprocessing.Pool - generates another collectd process
+
+def admin_socket2(asok_path, cmd, format='', timeout=1):
+    """
+    Local non-blocking fork of the ceph_daemon admin_socket function.
+
+    Send a daemon (--admin-daemon) command 'cmd'.  asok_path is the
+    path to the admin socket; cmd is a list of strings; format may be
+    set to one of the formatted forms to get output in that form
+    (daemon commands don't support 'plain' output).
+
+    Two requests are made over separate connections: the first fetches
+    the daemon's command descriptions, which are used to validate 'cmd';
+    the second sends the validated command itself.
+
+    timeout is the number of seconds to wait for the daemon to start
+    answering each request. Once a reply has started, every further
+    chunk of the payload must become readable within 0.2 seconds.
+
+    Returns the raw reply payload as bytes.
+
+    Raises RuntimeError if the socket I/O fails or times out, if the
+    daemon closes the connection before the whole payload has arrived,
+    or if validate_command() rejects 'cmd'.
+    """
+
+    # maximum wait (secs) for each chunk once the reply has started
+    payload_timeout = 0.2
+
+    def do_sockio(path, cmd_bytes):
+        """ helper: do all the actual low-level stream I/O """
+        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            sock.setblocking(0)
+            sock.connect(path)
+            sock.sendall(cmd_bytes + b'\0')
+            ready = select.select([sock], [], [], timeout)
+            if not ready[0]:
+                raise RuntimeError("timeout of {} secs exceeded for "
+                                   "initial response".format(timeout))
+
+            # the reply starts with a 4-byte big-endian payload length
+            len_str = sock.recv(4)
+            if len(len_str) < 4:
+                raise RuntimeError("no data returned from admin socket")
+            payload_len, = struct.unpack(">I", len_str)
+
+            chunks = []
+            got = 0
+            while got < payload_len:
+                # recv() receives signed int, i.e max 2GB
+                # workaround by capping READ_CHUNK_SIZE per call.
+                want = min(payload_len - got, READ_CHUNK_SIZE)
+                ready = select.select([sock], [], [], payload_timeout)
+                if not ready[0]:
+                    raise RuntimeError("'payload' timeout of {} secs "
+                                       "exceeded".format(payload_timeout))
+                bit = sock.recv(want)
+                if not bit:
+                    # an empty read means the peer closed the socket;
+                    # without this check the loop would never terminate
+                    raise RuntimeError("admin socket closed after {} of {} "
+                                       "bytes".format(got, payload_len))
+                chunks.append(bit)
+                got += len(bit)
+
+        except Exception as sock_e:
+            raise RuntimeError('exception: ' + str(sock_e))
+        finally:
+            # always release the descriptor, on success and on error
+            sock.close()
+        return b''.join(chunks)
+
+    try:
+        cmd_json = do_sockio(asok_path,
+                             b'{"prefix": "get_command_descriptions"}')
+    except Exception as e:
+        raise RuntimeError(
+            'exception getting command descriptions: ' + str(e))
+
+    if cmd == 'get_command_descriptions':
+        return cmd_json
+
+    sigdict = parse_json_funcsigs(cmd_json.decode('utf-8'), 'cli')
+    valid_dict = validate_command(sigdict, cmd)
+    if not valid_dict:
+        raise RuntimeError('invalid command')
+
+    if format:
+        valid_dict['format'] = format
+
+    try:
+        ret = do_sockio(asok_path, json.dumps(valid_dict).encode('utf-8'))
+    except Exception as e:
+        raise RuntimeError('exception: ' + str(e))
+
+    return ret
+
 
 class BaseCollector(object):
 
@@ -26,6 +113,7 @@ class BaseCollector(object):
         self.version = self.get_version()
         self.error = False
         self.error_msgs = []
+        self.cmd_timeout = 1
 
         self.logger = logging.getLogger('cephmetrics')
 
@@ -42,9 +130,11 @@ class BaseCollector(object):
         start = time.time()
 
         if os.path.exists(adm_socket):
+
             try:
-                response = admin_socket(adm_socket, cmds,
-                                        format='json')
+                response = admin_socket2(adm_socket, cmds,
+                                         format='json',
+                                         timeout=self.cmd_timeout)
             except RuntimeError as e:
                 self.logger.error("admin_socket error: {}".format(e.message))
                 self.error = True