from mgr_module import MgrModule
from .controllers.auth import Auth
-from .tools import load_controllers, json_error_page, SessionExpireAtBrowserCloseTool
+from .tools import load_controllers, json_error_page, SessionExpireAtBrowserCloseTool, \
+ NotificationQueue
from . import logger
self.configure_cherrypy()
cherrypy.engine.start()
+ NotificationQueue.start_queue()
logger.info('Waiting for engine...')
+ self.log.info('Waiting for engine...')
cherrypy.engine.block()
logger.info('Engine done')
def shutdown(self):
logger.info('Stopping server...')
+ NotificationQueue.stop()
cherrypy.engine.exit()
logger.info('Stopped server')
return (-errno.EINVAL, '', 'Command not found \'{0}\''
.format(cmd['prefix']))
+ def notify(self, notify_type, notify_id):
+ NotificationQueue.new_notification(notify_type, notify_id)
+
class ApiRoot(object):
def __init__(self, mgrmod):
--- /dev/null
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import random
+import time
+import unittest
+
+
+from ..tools import NotificationQueue
+
+
+class Listener(object):
+ def __init__(self):
+ NotificationQueue.register(self.log_type1, 'type1')
+ NotificationQueue.register(self.log_type2, 'type2')
+ NotificationQueue.register(self.log_type1_3, ['type1', 'type3'])
+ NotificationQueue.register(self.log_all)
+ self.type1 = []
+ self.type2 = []
+ self.type1_3 = []
+ self.all = []
+
+ # these should be ignored by the queue
+ NotificationQueue.register(self.log_type1, 'type1')
+ NotificationQueue.register(self.log_type1_3, ['type1', 'type3'])
+ NotificationQueue.register(self.log_all)
+
+ def log_type1(self, val):
+ self.type1.append(val)
+
+ def log_type2(self, val):
+ self.type2.append(val)
+
+ def log_type1_3(self, val):
+ self.type1_3.append(val)
+
+ def log_all(self, val):
+ self.all.append(val)
+
+ def clear(self):
+ self.type1 = []
+ self.type2 = []
+ self.type1_3 = []
+ self.all = []
+
+
+class NotificationQueueTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.listener = Listener()
+
+ def setUp(self):
+ self.listener.clear()
+
+ def test_invalid_register(self):
+ with self.assertRaises(Exception) as ctx:
+ NotificationQueue.register(None, 1)
+ self.assertEqual(str(ctx.exception),
+ "types param is neither a string nor a list")
+
+ def test_notifications(self):
+ NotificationQueue.start_queue()
+ NotificationQueue.new_notification('type1', 1)
+ NotificationQueue.new_notification('type2', 2)
+ NotificationQueue.new_notification('type3', 3)
+ NotificationQueue.stop()
+ self.assertEqual(self.listener.type1, [1])
+ self.assertEqual(self.listener.type2, [2])
+ self.assertEqual(self.listener.type1_3, [1, 3])
+ self.assertEqual(self.listener.all, [1, 2, 3])
+
+ def test_notifications2(self):
+ NotificationQueue.start_queue()
+ for i in range(0, 600):
+ typ = "type{}".format(i % 3 + 1)
+ if random.random() < 0.5:
+ time.sleep(0.002)
+ NotificationQueue.new_notification(typ, i)
+ NotificationQueue.stop()
+ for i in range(0, 500):
+ typ = i % 3 + 1
+ if typ == 1:
+ self.assertIn(i, self.listener.type1)
+ self.assertIn(i, self.listener.type1_3)
+ elif typ == 2:
+ self.assertIn(i, self.listener.type2)
+ elif typ == 3:
+ self.assertIn(i, self.listener.type1_3)
+ self.assertIn(i, self.listener.all)
+
+ self.assertEqual(len(self.listener.type1), 200)
+ self.assertEqual(len(self.listener.type2), 200)
+ self.assertEqual(len(self.listener.type1_3), 400)
+ self.assertEqual(len(self.listener.all), 600)
# pylint: disable=W0212
from __future__ import absolute_import
+import collections
import importlib
import inspect
import json
import os
import pkgutil
import sys
+import threading
import six
import cherrypy
if name in cookie:
del cookie[name]['expires']
del cookie[name]['max-age']
+
+
+class NotificationQueue(threading.Thread):
+ _ALL_TYPES_ = '__ALL__'
+ _listeners = collections.defaultdict(set)
+ _lock = threading.Lock()
+ _cond = threading.Condition()
+ _queue = collections.deque()
+ _running = False
+ _instance = None
+
+ def __init__(self):
+ super(NotificationQueue, self).__init__()
+
+ @classmethod
+ def start_queue(cls):
+ with cls._lock:
+ if cls._instance:
+ # the queue thread is already running
+ return
+ cls._running = True
+ cls._instance = NotificationQueue()
+ cls._instance.start()
+
+ @classmethod
+ def stop(cls):
+ with cls._lock:
+ if not cls._instance:
+ # the queue thread was not started
+ return
+ instance = cls._instance
+ cls._instance = None
+ cls._running = False
+ with cls._cond:
+ cls._cond.notify()
+ instance.join()
+
+ @classmethod
+ def register(cls, func, types=None):
+ """Registers function to listen for notifications
+
+ If the second parameter `types` is omitted, the function in `func`
+ parameter will be called for any type of notifications.
+
+ Args:
+ func (function): python function ex: def foo(val)
+ types (str|list): the single type to listen, or a list of types
+ """
+ with cls._lock:
+ if not types:
+ cls._listeners[cls._ALL_TYPES_].add(func)
+ return
+ if isinstance(types, str):
+ cls._listeners[types].add(func)
+ elif isinstance(types, list):
+ for typ in types:
+ cls._listeners[typ].add(func)
+ else:
+ raise Exception("types param is neither a string nor a list")
+
+ @classmethod
+ def new_notification(cls, notify_type, notify_value):
+ cls._queue.append((notify_type, notify_value))
+ with cls._cond:
+ cls._cond.notify()
+
+ @classmethod
+ def notify_listeners(cls, events):
+ for ev in events:
+ notify_type, notify_value = ev
+ with cls._lock:
+ listeners = list(cls._listeners[notify_type])
+ listeners.extend(cls._listeners[cls._ALL_TYPES_])
+ for listener in listeners:
+ listener(notify_value)
+
+ def run(self):
+ while self._running:
+ private_buffer = []
+ try:
+ while True:
+ private_buffer.append(self._queue.popleft())
+ except IndexError:
+ pass
+ self.notify_listeners(private_buffer)
+ with self._cond:
+ self._cond.wait(1.0)
+ # flush remaining events
+ self.notify_listeners(self._queue)
+ self._queue.clear()