from __future__ import absolute_import
import collections
+import datetime
import importlib
import inspect
import json
import os
import pkgutil
import sys
+import time
import threading
import six
return self._mgr_module
+# pylint: disable=too-many-instance-attributes
+class ViewCache(object):
+ VALUE_OK = 0
+ VALUE_STALE = 1
+ VALUE_NONE = 2
+ VALUE_EXCEPTION = 3
+
+ class GetterThread(threading.Thread):
+ def __init__(self, view, fn, args, kwargs):
+ super(ViewCache.GetterThread, self).__init__()
+ self._view = view
+ self.event = threading.Event()
+ self.fn = fn
+ self.args = args
+ self.kwargs = kwargs
+
+ # pylint: disable=broad-except
+ def run(self):
+ try:
+ t0 = time.time()
+ val = self.fn(*self.args, **self.kwargs)
+ t1 = time.time()
+ except Exception as ex:
+ logger.exception("Error while calling fn=%s ex=%s", self.fn,
+ str(ex))
+ self._view.value = None
+ self._view.value_when = None
+ self._view.getter_thread = None
+ self._view.exception = ex
+ else:
+ with self._view.lock:
+ self._view.latency = t1 - t0
+ self._view.value = val
+ self._view.value_when = datetime.datetime.now()
+ self._view.getter_thread = None
+ self._view.exception = None
+
+ self.event.set()
+
+ class RemoteViewCache(object):
+ # Return stale data if
+ STALE_PERIOD = 1.0
+
+ def __init__(self, timeout):
+ self.getter_thread = None
+ # Consider data within 1s old to be sufficiently fresh
+ self.timeout = timeout
+ self.event = threading.Event()
+ self.value_when = None
+ self.value = None
+ self.latency = 0
+ self.exception = None
+ self.lock = threading.Lock()
+
+ def run(self, fn, args, kwargs):
+ """
+ If data less than `stale_period` old is available, return it
+ immediately.
+ If an attempt to fetch data does not complete within `timeout`, then
+ return the most recent data available, with a status to indicate that
+ it is stale.
+
+ Initialization does not count towards the timeout, so the first call
+ on one of these objects during the process lifetime may be slower
+ than subsequent calls.
+
+ :return: 2-tuple of value status code, value
+ """
+ with self.lock:
+ now = datetime.datetime.now()
+ if self.value_when and now - self.value_when < datetime.timedelta(
+ seconds=self.STALE_PERIOD):
+ return ViewCache.VALUE_OK, self.value
+
+ if self.getter_thread is None:
+ self.getter_thread = ViewCache.GetterThread(self, fn, args,
+ kwargs)
+ self.getter_thread.start()
+
+ ev = self.getter_thread.event
+
+ success = ev.wait(timeout=self.timeout)
+
+ with self.lock:
+ if success:
+ # We fetched the data within the timeout
+ if self.exception:
+ # execution raised an exception
+ return ViewCache.VALUE_EXCEPTION, self.exception
+ return ViewCache.VALUE_OK, self.value
+ elif self.value_when is not None:
+ # We have some data, but it doesn't meet freshness requirements
+ return ViewCache.VALUE_STALE, self.value
+ # We have no data, not even stale data
+ return ViewCache.VALUE_NONE, None
+
+ def __init__(self, timeout=5):
+ self.timeout = timeout
+ self.cache_by_args = {}
+
+ def __call__(self, fn):
+ def wrapper(*args, **kwargs):
+ rvc = self.cache_by_args.get(args, None)
+ if not rvc:
+ rvc = ViewCache.RemoteViewCache(self.timeout)
+ self.cache_by_args[args] = rvc
+ return rvc.run(fn, args, kwargs)
+ return wrapper
+
+
class RESTController(BaseController):
"""
Base class for providing a RESTful interface to a resource.