VALUE_EXECUTING = "executing"
_executing_tasks = set()
- _finished_tasks = set()
+ _finished_tasks = []
_lock = threading.Lock()
_task_local_data = threading.local()
logger.info("TM: finished %s", task)
with cls._lock:
cls._executing_tasks.remove(task)
- cls._finished_tasks.add(task)
+ cls._finished_tasks.append(task)
@classmethod
def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None):
value.
"""
now = datetime.now()
- # list of finished tasks that are older than TTL
- to_remove = [t for t in task_list
- if now - datetime.fromtimestamp(t.end_time) >
- timedelta(seconds=cls.FINISHED_TASK_TTL)]
- to_remove.sort(key=lambda t: t.end_time, reverse=True)
- for task in to_remove[cls.FINISHED_TASK_SIZE:]:
- cls._finished_tasks.remove(task)
+ for idx, t in enumerate(task_list):
+ if idx < cls.FINISHED_TASK_SIZE:
+ continue
+ if now - datetime.fromtimestamp(t[1].end_time) > \
+ timedelta(seconds=cls.FINISHED_TASK_TTL):
+ del cls._finished_tasks[t[0]]
@classmethod
def list(cls, name_glob=None):
for task in cls._executing_tasks:
if not name_glob or fnmatch.fnmatch(task.name, name_glob):
executing_tasks.append(task)
- for task in cls._finished_tasks:
+ for idx, task in enumerate(cls._finished_tasks):
if not name_glob or fnmatch.fnmatch(task.name, name_glob):
- finished_tasks.append(task)
+ finished_tasks.append((idx, task))
+ finished_tasks.sort(key=lambda t: t[1].end_time, reverse=True)
cls._cleanup_old_tasks(finished_tasks)
executing_tasks.sort(key=lambda t: t.begin_time, reverse=True)
- finished_tasks.sort(key=lambda t: t.end_time, reverse=True)
- return executing_tasks, finished_tasks
+ return executing_tasks, [t[1] for t in finished_tasks]
@classmethod
def list_serializable(cls, ns_glob=None):