Apache Flink - это фреймворк для написания приложений распределенной обработки данных в реальном времени на Java / Scala / Python. Uber, Netflix, Disney и другие крупные компании используют Flink для различных целей. Недавно он был куплен Alibaba за многомиллионную сделку.

Flink вдохновлен моделью потока данных Google. Согласно этой модели, события обрабатываются сразу после их получения, а не группируются в пакет. Это позволяет легко и эффективно масштабировать структуру для миллионов событий в секунду.

Вы можете изучить и прочитать больше о Flink на их официальном сайте. В этой статье мы рассмотрим, как Flink работает поверх пряжи.

Кликните поверх ПРЯЖИ

Приложение Flink состоит из двух основных модулей: одного менеджера заданий и нескольких диспетчеров задач. Если вы знакомы с Apache Spark, Jobmanager и Taskmanager эквивалентны Driver и Executors.

Jobmanager осуществляет координацию между TaskManager. Он назначает им операции и распределяет данные в соответствии с параллелизмом. С другой стороны, диспетчеры задач - это процессы, в которых происходят фактические вычисления, такие как отображение, сокращение, объединение и т. Д.

Ниже приведена типичная команда bash, используемая для запуска задания Flink в YARN:

./bin/flink run -m yarn-cluster -d -yn 4 -ys 3 -ytm 4096m -yjm 2048m WordCount.jar

В приведенной выше команде мы говорим Flink начать работу над кластером пряжи. YARN должен назначить для TaskManager 4 контейнера JVM с памятью 4 ГБ каждый и имеющую 3 слота. Слоты аналогичны потокам JVM, но обеспечивают изоляцию памяти. Например, в приведенном выше сценарии 4 ГБ памяти будут равномерно распределены на каждый слот, и они не смогут получить доступ к памяти друг друга. Однако они не предлагают изоляцию ЦП.

После выполнения команды происходит следующее:

  1. Wordcount.jar и все файлы из каталога установки Flink lib / копируются в HDFS кластера. Это позволяет всем узлам кластера загружать файлы на свой локальный диск.
  2. В YARN выдается запрос на запуск Application Master на одном из узлов. Мастер приложений - это место, где работает менеджер вакансий.
  3. Мастер приложений запрашивает ресурсы у диспетчера ресурсов YARN.
  4. Когда ресурсы становятся доступными, Application Master развертывает JVM TaskManager на доступных узлах кластера.
  5. Когда все диспетчеры задач исправны, JobManager начинает назначать подзадачи каждому слоту.

Что происходит при сбое одного из TaskManager?

Каждый раз, когда Jobmanager обнаруживает неисправный TaskManager, он перезапускает все приложение и возобновляет обработку данных из последнего состояния. Это делается только при включенной контрольной точке. При отключении задание завершается.

Количество перезапусков также ограничено свойством yarn.maximum-failed-container. Его значение по умолчанию равно количеству запрошенных диспетчеров задач. Это означает, что приложение выдержит только такое количество сбоев, после которых оно отключится. Чтобы избежать этого, вы можете установить значение config равным -1.

Что происходит, если сам менеджер вакансий выходит из строя?

Если Jobmanager неработоспособен, Yarn откажет приложению.

Чтобы избежать этого сценария, вы можете запустить Jobmanager в режиме высокой доступности (HA). В этом режиме JobManager сохраняет свое состояние в HDFS и сохраняет указатель на это состояние в zookeeper. В случае сбоя YARN запустит новый мастер приложений, который снова развернет задание и возобновит работу с последнего состояния.

Чтобы включить HA, вам нужно добавить следующие конфигурации в flink-conf.yaml.

Эта конфигурация предписывает Flink использовать zookeeper для координации при перезапуске JobManager. Jobmanager сохранит свое текущее состояние в storageDir, а указатель на это состояние - в корневом пути zookeeper. Yarn может перезапускать JobManager не более yarn.application-попытки-1 раз.

Количество перезапусков Application Master также ограничено свойством YARN yarn.resourcemanager.am.max-plays в yarn-site.xml, которое по умолчанию равно 2. Вы можете установить это значение на -1 для бесконечного попытки, хотя и не рекомендуется.

Как посмотреть логи приложения?

Журналы можно просмотреть, открыв файлы taskmanager.log на отдельных узлах кластера или выполнив команду

yarn logs -applicationId appId123

Проблема с последним заключается в том, что журналы доступны только после завершения задания.
Flink использует log4j для ведения журнала, поэтому вы также можете использовать KafkaAppender для отправки журналов в Kafka и сохранения их в HDFS / S3 для анализа в реальном времени. Чтобы включить это, вы можете просто добавить следующее в файл log4j.properties в папке lib Flink.

Если вы хотите узнать больше по этой теме, вы можете перейти по ссылкам ниже:

  1. Официальная документация
  2. Модель потока данных: практический подход к уравновешиванию правильности, задержки и затрат при крупномасштабной, неограниченной, неупорядоченной обработке данных
  3. Учебное пособие по Hadoop YARN - изучение основ архитектуры YARN

Свяжитесь со мной в LinkedIn или Twitter или напишите письмо на [email protected]