Как получить неудачные задания в сельдерее?

Я использую сельдерей для обработки некоторых задач. Я могу видеть, сколько из них активны или запланированы и т. Д., Но я не могу найти способ увидеть задачи, которые потерпели неудачу. Flower действительно показывает мне статус, но только если он был запущен, когда задание было запущено и провалилось. Есть ли какая-либо команда, чтобы получить все задачи, которые не удалось (STATUS: FAILURE)?

У меня есть идентификатор задачи, когда задача была создана. Но их миллионы. Поэтому я не могу проверить одну за другой, даже если есть способ проверить это по идентификатору задачи. Но если есть такая команда, пожалуйста, дайте мне знать.

2

Решение

Сельдерей не позволяет легко найти неудачную задачу, но Цветок (главное веб-приложение для управления Celery) действительно упрощает это. Он ведет учет идентификаторов задач даже после их завершения и имеет API, позволяющий находить только неудачные задачи.

Довольно простой HTTP API от Flower включает /api/tasks конечная точка — ты можешь использовать /api/tasks?state=FAILURE чтобы показать только невыполненные задачи, затем проанализируйте JSON, чтобы извлечь то, что вам нужно. Содержимое аналогично тому, что вы получаете в веб-API, и его легко создать с curl и форматировать / фильтровать с JQ:

curl -s 'http://localhost:5555/api/tasks?state=FAILURE&limit=5' | jq . | less

Цветок должен быть установлен и работает, конечно.

Поскольку у вас есть миллионы выполненных задач, вам может потребоваться собрать информацию о сбоях в хранилище данных для эффективного доступа — возможно, Flower поможет. Или вы можете попробовать пользовательский обработчик при сбое в Celery, чтобы получить информацию только о сбое задачи — см. этот ответ.

5

Другие решения

task id имеет state а также status свойства. Таким образом, вы можете получить статус задач по идентификатору.

my_task_id = my_task.delay(foo)
my_task_id.state
my_task_id.status

дает статус, находится ли он в состоянии ожидания, запущен, повтор, неудача или успех.

afaik, сельдерей показывает только активные, запланированные, зарезервированные, отозванные, но id не показывает невыполненные задачи

Поскольку у вас есть все идентификаторы задач, вы можете просто просмотреть их статус.

for task_id in task_id_list:
if task_id.state == 'FAILURE'
print(task_id)
1