У нас серьезная проблема. Мы хотели, чтобы наши работники добирались до работы во время сна (именно это и есть их работа), но на самом деле мы не можем заставить их работать. Понятия не имею, о чем я? Часть 1 можно прочитать здесь, где я более спокойно описываю ситуацию. Или пропустите, если вас это не беспокоит — мы должны исправить этот беспорядок, прежде чем он выйдет из-под контроля.

Краткий обзор обстоятельств: мы отправляем сообщения в очередь задач RabbitMQ, и эти сообщения подхватываются рабочими процессами, которые затем переходят в спящий режим на количество секунд, указанное . в сообщении — чем больше число . х, тем дольше они спят.

Однако, если мы отправляем им сообщение, содержащее слово «работа», они не слишком радуются этому. К счастью, мы можем просто отправить сообщение, содержащее слово «еда», и они смогут обрабатывать любые новые «рабочие» сообщения, поступающие в их сторону. Но любые старые рабочие сообщения будут потеряны навсегда. Чтобы избежать потери сообщений, мы собираемся создать оболочку вокруг RabbitMQ под названием TortoiseMQ, которая обеспечит хранилище для ошибочных сообщений, а также гибкий способ повторного запуска определенных сбоев как можно скорее.

Мы скоро углубимся в детали, но вот вкратце о TortoiseMQ:

Часть 1. Изменения интерфейса

Прежде чем что-либо реализовать, давайте внесем несколько изменений в способ создания и потребления сообщений. Начнем с замены обычных методов channel.basic_publish и channel.basic_consume.
Теперь наш производитель выглядит так:

И наш потребитель:

Проницательный наблюдатель мог заметить, что мы больше не передаем такие параметры, как exchange и routing_key, и это потому, что наша реализация TortoiseMQ предполагает, что обмен по умолчанию и очередь task_queue являются значениями по умолчанию, и это работает для нас. Мы всегда можем сделать TortoiseMQ более настраиваемым, но сейчас мы не хотим отвлекаться.

Кроме того, нам нужен какой-то способ сообщить TortoiseMQ, удалось ли потребителю. Мы собираемся изменить обычную функцию обратного вызова, которую ожидает RabbitMQ, и вместо того, чтобы отправлять подтверждение на сервер, она вместо этого будет возвращать объект ответа TortoiseMQ после выполнения задачи. Наш обратный вызов worker теперь выглядит так:

Теперь TortoiseMQ несет ответственность за правильную обработку этих возвращаемых значений и отправку подтверждения, чтобы сообщение было удалено из очереди.

Часть 2. Общие сведения о хранилище сообщений

Теперь, вот где это становится сложно. tmq_basic_consume необходимо прослушивать входящие сообщения, запускать обратный вызов, указанный в потребителе, и, если есть ошибка, записывать подробности в хранилище для повторного запуска в будущем. Для нашего хранилища сообщений мы собираемся использовать ElasticSearch, поскольку он обеспечивает очень гибкий способ запроса нужных нам сообщений на основе различных условий.

Мы начнем с запуска ElasticSearch и консоли Kibana, чтобы мы могли опробовать запросы. Docker помогает нам и здесь:

ЭластичныйПоиск:

docker run --name es01-test --net elastic -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” docker.elastic.co/elasticsearch/elasticsearch:7.12.1

Консоль Кибаны:

docker run --name kib01-test --net elastic -p 5601:5601 -e “ELASTICSEARCH_HOSTS=http://es01-test:9200" docker.elastic.co/kibana/kibana:7.12.1

Теперь мы можем открыть инструменты разработки консоли, посетив http://localhost:5601/app/dev_tools#/console, где мы можем опробовать запросы.

В ElasticSearch вы можете вставлять документы в индексы. Индекс — это то, как ElasticSearch классифицирует данные, и его можно рассматривать как очень легкую базу данных, которую вы можете создать несколько раз без каких-либо проблем. Мы можем вставить документ, выполнив следующий запрос:

Это создает документ с указанными полями сообщения и состояния в индексе с именем a-new-index.

Мы можем получить информацию об индексе и написать довольно сложные запросы для поиска документов в индексе:

На самом деле, этот поисковый запрос очень похож на тот, который собирается использовать TortoiseMQ — сопоставление подстроки в сообщении об ошибке. Вот как будет выглядеть наш документ с подробной информацией об ошибке:

{
          "id" : "8zqdOHkBW88sDuOe1MBU",
          "queue_name" : "task_queue",
          "error_count" : 1,
          "status" : "error",
          "message" : "work",
          "errorMessage" : "Received work, crash!"
}

Мы будем использовать поле errorMessage для запроса ошибочных сообщений, а тело ошибочного сообщения будет извлечено из поля сообщения. Поле идентификатора будет использоваться для обновления состояния или количества ошибок при последующих повторных попытках отправки того же сообщения.

Часть 3 — Продюсер

TortoiseMQ ожидает, что все сообщения будут в указанном выше формате, поэтому он может легко анализировать и обновлять материал. tmq_produce просто оборачивает указанное сообщение соответствующими начальными значениями для остальных полей и помещает его в очередь задач. Это так просто.

Часть 4. Оболочка обратного вызова

Помните, чем наш интерфейс обратного вызова отличается от интерфейса RabbitMQ? Ну, так как мы используем RabbitMQ под капотом, он все еще ожидает нормальный интерфейс. Таким образом, вместо непосредственного выполнения обратного вызова, указанного потребителем, TortoiseMQ имеет собственный метод обратного вызова, который выполняется каждый раз при потреблении сообщения.

Эта оболочка выполняет исходный обратный вызов, используя сообщение, указанное в поле сообщения, и решает, что делать с ответом об ошибке. О, и это также отправляет подтверждение на канал.

Вот как выглядит basic_consume. Это может выглядеть пугающе, но все, что он делает, это указывает анонимную функцию (лямбда) для выполнения. Да, и сама лямбда выполняет tmq_callback_wrapper с нашей указанной функцией обратного вызова.

def tmq_basic_consume(self, channel, callback):
channel.basic_consume(queue='task_queue', on_message_callback=lambda ch, method, properties, body: self.__tmq_callback_wrapper(ch, method, properties, body, callback))

Я не собираюсь показывать вам tmq_callback_wrapper здесь, но все, что он делает, это извлекает сообщение из тела, выполняет наш обратный вызов и вызывает функции для входа в ElasticSearch, когда это необходимо.

Часть 5. Регистрация ошибок

Если наш обратный вызов возвращает ответ об ошибке TortoiseMQ, нам нужно обновить наше хранилище. Здесь нужно иметь дело с двумя сценариями. Если мы получаем это сообщение впервые (не как часть повторного запуска), то мы вставляем новый документ, содержащий сведения об ошибке, и с числом ошибок, равным 1:

insert_response = es.index(index='tmq-logs', body=tmq_message)

Мы используем поле _id, сгенерированное ElasticSearch, для нашего поля id, поэтому после вставки документа мы получаем id и обновляем наш документ:

tmq_message['id'] = insert_response['_id']
es.update(index='tmq-logs', id=insert_response['_id'], body={"doc": tmq_message})

С другой стороны, если мы получили ошибку при повторном запуске, нам нужно обновить сведения об ошибке существующего документа. Причина, по которой мы это делаем, заключается в том, что мы не запускаем повторяющиеся сообщения при каждом возникновении ошибки.

es.update(index='tmq-logs', id=tmq_message['id'], body={"doc": tmq_message})

Часть 6. Регистрация успеха

Мы не всегда регистрируем успех, но когда мы это делаем, это происходит потому, что что-то пошло не так, и мы перезапустили его. Итак, теперь нам нужно обновить наш документ, чтобы сообщение больше не появлялось. Это просто вопрос установки поля состояния в tmq_message на «успех» и запуска es.update.

Часть 7. Неудачные повторные попытки

Отличная работа! В этой статье нет всех подробностей реализации, но она должна содержать достаточно информации, чтобы вы могли написать свою собственную! Но если вас это не беспокоит, исходный код TortoiseMQ доступен на GitHub вместе с примерами производителя и потребителя, о которых мы здесь упоминали.

Вы можете использовать эту реализацию, чтобы опробовать некоторые сообщения и просмотреть журналы ошибок в ElasticSearch. Когда вы будете готовы (или нет), мы можем поработать над последним этапом: повторной отправкой ошибочных сообщений.

Наш образец клиента повторной попытки представляет собой базовый скрипт Python, который принимает в качестве аргумента командной строки сообщение об ошибке, которое мы хотим повторить.

python tortoise_mq_retrigger.py work crash!

Запуск этого скрипта вызывает функцию tmq_retrigger, которая запрашивает ElasticSearch, используя запрос, аналогичный приведенному выше примеру поиска. Он получает все ответы и публикует их, используя basic_publish RabbitMQ. Мы используем RabbitMQ, а не производителя TortoiseMQ, поскольку ответы уже содержат нужные нам сведения об ошибках, поэтому нет необходимости их снова оборачивать.

res = es.search(index="tmq-logs", body={"query": {"bool": { "must": [{"match": {"errorMessage": message}}, { "match": { "status": "error!" }}]}}})
[self.__publish_tmq_message(tmq_message['_source'], channel=channel) for tmq_message in res['hits']['hits']]

message определяется аргументами tortoise_mq_retrigger, а __publish_tmq_message просто вызывает basic_publish RabbitMQ. Потребители TortoiseMQ получают упакованное сообщение и выполняют его в обычном режиме.

Вы заметите, что эта реализация не предлагает большой гибкости, но вы всегда можете реализовать свою собственную, которая позволяет выполнять более сложные запросы.

Ух ты, сколько усилий ушло только на то, чтобы не потерять сообщения! Конечно, могут быть случаи, когда потеря сообщений может быть гораздо более катастрофической, чем отсутствие сна у нескольких работников. И если вы столкнетесь с такой ситуацией, помните, TortoiseMQ здесь, чтобы помочь!