Apache Storm потрясающий. Вот почему (и как) вы должны его использовать.

1656613971 apache storm potryasayushhij vot pochemu i kak vy dolzhny ego

автор Усама Ашраф

1*mI5DoLNNv5Z84E2-0SR-0g
Источник: https://pxhere.com/ru/photo/77064

Непрерывные потоки данных распространены повсеместно и становятся еще больше с увеличением количества используемых устройств IoT. Конечно, это означает, что огромные объемы данных хранятся, обрабатываются и анализируются для получения прогнозируемых результатов, которые можно действовать.

Но анализ петабайтов данных занимает много времени, даже с помощью таких инструментов как Hadoop (насколько хорошо может быть MapReduce) или Spark (средство от ограничений MapReduce).

Зачастую нам не нужно выводить закономерности в течение длительного периода времени. Из петабайтов входных данных, собранных за месяцы, в любой момент нам может не понадобиться учитывать все, а только снимок в реальном времени. Возможно, нам не нужно знать самый продолжительный хэштег за пять лет, а только тот, который сейчас.

Это то, для чего создан Apache Storm, чтобы принимать тонны поступающих чрезвычайно быстро, возможно, из разных источников, анализировать их и публиковать обновления в интерфейсе пользователя или другого места… без сохранения каких-либо фактических данных.

Эта статья не является окончательным руководством Apache Storm и не предназначена для этого. Storm достаточно огромен, и только одно долгое чтение, пожалуй, не сможет сделать его справедливым. Конечно, любые дополнения, отзывы или конструктивная критика будут очень признательны.

Ладно, теперь, когда это не так, давайте посмотрим, что мы рассмотрим:

  • Необходимость Шторма, «почему» это, что это такое, а что нет
  • Как это работает с высоты птичьего полета.
  • Как примерно выглядит топология Storm в коде (Java)
  • Настройка кластера Storm, достойного производства и воспроизведения на Docker.
  • Несколько слов о надежности обработки сообщений.

Я также предполагаю, что вы хотя бы немного знакомы с Docker и контейнеризацией.

Как это работает

Архитектуру Apache Storm сравнима с сетью дорог, соединяющих набор контрольно-пропускных пунктов. Движение начинается на определенном контрольно-пропускном пункте (так называемом a носик) и проходит через другие контрольно-пропускные пункты (наз болты).

Конечно, трафик – это поток данных, получаемый носик (из источника данных, например, из общедоступного API) и перенаправлен на разные болты где данные фильтруются, очищаются, агрегируются, анализируются и отправляются в пользовательский интерфейс для просмотра (или к любой другой цели).

Сеть носиков и болтов называется а топологияа данные подаются в виде кортежей (список значений, которые могут иметь разные типы).

1*pJ4nzdWQJ5wLqrUsBvkUQQ
Источник: https://dzone.com/articles/apache-storm-architecture

Важно поговорить о направлении трафика данных.

Обычно у нас есть один или несколько носиков, которые считывают данные API, системы очереди и т.д. Данные тогда поступают односторонний к одному или нескольким болтам, которые могут перенаправлять его на другие болты и т.п. Bolts может публиковать проанализированные данные в пользовательском интерфейсе или в другой болт.

Но трафик почти всегда однонаправлен, как ориентированный ациклический граф (DAG). Хотя, безусловно, можно производить циклы, нам вряд ли нужна такая запутанная топология.

Настройка выпуска Storm включает в себя ряд шагов, которые вы можете выполнить на своем компьютере. Но позже я буду использовать контейнеры Docker для развертывания кластера Storm, а изображения настроят все, что нам нужно.

Какой код

Хотя Storm предлагает поддержку других языков, большинство топологий написано на Java, поскольку это самый эффективный вариант, который у нас есть.

Очень простой носик, просто выдающий случайные цифры, может выглядеть так:

И простой болт, принимающий поток случайных цифр и издающий только парные:

Еще один простой болт, получающий отфильтрованный поток EvenDigitBoltи просто умножьте каждую четную цифру на 10 и передайте ее вперед:

Соединив их вместе, чтобы сформировать нашу топологию:

Параллелизм в топологиях Storm

Полное понимание параллелизма в Storm может быть сложным, по крайней мере из моего опыта. Для работы топологии требуется по крайней мере один процесс. В этом процессе мы можем параллельно выполнять выполнение наших носиков и болтов с помощью резьбы.

В нашем примере RandomDigitSpout запустит только один поток, а выбрасываемые из этого потока данные будут распределены между двумя потоками. EvenDigitBolt.

Но способ этого распределения называется группирование потоков, может быть важным. Например, у вас может быть поток записей температуры из двух городов, где кортежи, выпускаемые носиком, выглядят так:

// City name, temperature, time of recording
(“Atlanta”,       94, “2018–05–11 23:14”)(“New York City”, 75, “2018–05–11 23:15”)(“New York City”, 76, “2018–05–11 23:16”)(“Atlanta”,       96, “2018–05–11 23:15”)(“New York City”, 77, “2018–05–11 23:17”)(“Atlanta”,       95, “2018–05–11 23:16”)(“New York City”, 76, “2018–05–11 23:18”)

Допустим, мы прикрепляем только один болт, задачей которого является вычисление изменения средней температуры каждого города.

Если мы можем разумно ожидать, что в любой данный интервал времени мы получим примерно равное количество кортежей из обоих городов, имеет смысл посвятить два потока нашему болту. Мы можем отправить данные для Атланты одному из них, а Нью-Йорку другому.

А группировка полей будет служить нашей цели, разделяющей данные между потоками по значению поля, указанного в группировке:

// The tuples with the same city name will go to the same thread.builder.setBolt(“avg-temp-bolt”, new AvgTempBolt(), 2)       .fieldsGrouping(“temp-spout”, new Fields(“city_name”));

И, конечно, есть и другие типы группировок. Однако в большинстве случаев группировка, вероятно, не будет иметь большого значения. Вы можете просто перетасовать данные и бросить их среди нитей болтов случайным образом (перемешивать группировку).

Теперь в этом еще один важный компонент: количество рабочих процессов, на которых будет работать наша топология.

Общее количество потоков, которое мы указали, будет поровну разделено между рабочими процессами. Итак, в нашем примере топологии со случайными цифрами мы имели одну резьбу носика, две парные резьбовые резьбы болтов и четыре резьбы болтов умножения на десять (всего семь).

Каждый из двух рабочих процессов будет отвечать за выполнение двух нитей болтов умножения на десять, одного болта с четной цифрой, а один из процессов будет выполнять одну нить носика.

1*E9IoN2Mjur31Bn-NCgrwAg

Конечно, два рабочих процесса будут иметь свои основные потоки, которые, в свою очередь, будут запускать резьбу носика и болта. Итак, всего у нас будет девять потоков. Они совместно называются исполнители.

Важно понимать, что если вы установите подсказку параллельности носика больше единицы (несколько исполнителей), вы можете получить одни и те же данные несколько раз.

Скажем, носик читает из общедоступного API потока Twitter и использует два исполнителя. Это означает, что болты, получающие данные из носика, получат один и тот же твит дважды. Это только после носик выпускает кортежи, входящие в игру. Другими словами, кортежи распределяются между болтами согласно указанному группированию потоков.

Запуск нескольких рабочих на одном узле было бы довольно глупо. Однако позже мы используем правильный распределенный кластер со многими узлами и посмотрим, как работники разделены на разные узлы.

Построение нашей топологии

Вот структуру каталогов, которую я предлагаю:

yourproject/            pom.xml             src/                jvm/                    packagename/                          RandomDigitSpout.java                          EvenDigitBolt.java                          MultiplyByTenBolt.java                          OurSimpleTopology.java

Maven обычно используется для построения топологий Storm, и он требует a pom.xml файл (POM), определяющий различные детали конфигурации, зависимости проекта и т.д. Вникать в тонкости POM, по всей вероятности, будет лишним.

  • Сначала мы побежим mvn clean внутри yourproject чтобы очистить все скомпилированные файлы, которые мы можем иметь, убедившись, что компилировали каждый модуль с нуля.
  • И затем mvn package чтобы скомпилировать наш код и упаковать его в исполняемый файл JAR, внутри только что созданного target папку. В первый раз это может занять несколько минут, особенно если у вашей топологии много зависимостей.
  • Чтобы представить нашу топологию: storm jar target/packagename-{version number}.jar packagename.OurSimpleTopology

Надеемся, что разрыв между концепцией и кодом в Storm был несколько преодолен. Однако ни одно серьезное развертывание Storm не будет одним экземпляром топологии, запущенным на одном сервере.

Как выглядит скопление Шторма

Чтобы в полной мере воспользоваться преимуществами масштабируемости и отказоустойчивости Storm, любая топология производственного уровня будет представлена ​​классером машин.

Дистрибутивы Storm устанавливаются на первичном узле (Nimbus) и на всех узлах реплик (Supervisors).

The первичный узел запускает демон Storm Nimbus и пользовательский интерфейс Storm. The реплика узлы запускают демоны Storm Supervisor. Демон Zookeeper на отдельном узле используется для координации между основным узлом и узлами реплики.

Zookeeper, кстати, используется только для управления кластерами и никогда не передает никаких сообщений. Это не то, что носики и болты посылают данные друг другу через него или что-нибудь подобное. Демон Nimbus находит доступных супервизоров через ZooKeeper, в котором демоны Supervisor регистрируются. Он также выполняет другие управленческие задачи, некоторые из которых вскоре станут понятными.

1*vMWbOJP4LE0upnSYHIojSg

Пользовательский интерфейс Storm — это веб-интерфейс, который используется для управления состоянием нашего кластера. К этому мы придем позже.

Наша топология передается демону Nimbus на первичном узле, а затем распределяется между рабочими процессами, запущенными на узлах реплики/супервизора. Благодаря Zookeeper не имеет значения сколько узлов реплики/супервизора вы запускаете сначала, поскольку вы всегда можете легко добавить больше. Storm автоматически интегрирует его в кластер.

Каждый раз, когда мы запускаем Supervisor, он выделяет определенное количество рабочих процессов (которые мы можем настроить). Затем они могут быть использованы по представленной топологии. Следовательно, на изображении выше в целом выделено пять работников.

Запомните эту строчку: conf.setNumWorkers(5)

Это означает, что топология будет пытаться использовать всего пять рабочих. И поскольку наши два узла Supervisor имеют в общей сложности пять выделенных рабочих, каждый из 5 выделенных рабочих процессов будет запускать один экземпляр топологии.

Если бы мы бегали conf.setNumWorkers(4) тогда один рабочий процесс остался бы неактивным/неиспользованным. Если бы количество указанных работников составляло шесть, а общее количество выделенных рабочих составляло пять, то из-за ограничений функционировали бы пять фактических рабочих топологии.

Прежде чем мы настроим все это с помощью Docker, следует помнить о нескольких важных вещах по отказоустойчивости:

  • Если работник на любом узле реплики погибнет, демон Supervisor перезапустит его. Если перезапуск не удается, работник будет переназначен на другую машину.
  • Если целый узел реплики погибнет, его часть работы будет передана другому супервизору/узлу реплики.
  • Если Nimbus рухнет, рабочие не пострадают. Однако пока Nimbus не будет восстановлен, работники не будут переназначены на другие узлы-копии, если, скажем, их узел выйдет из строя.
  • Nimbus и Supervisors сами являются лицами без гражданства. Но у Zookeeper некоторая информация о состоянии сохраняется, чтобы все могло начаться там, где было остановлено, если узел аварийно погибнет или демон неожиданно погибнет.
  • Демоны Nimbus, Supervisor и Zookeeper являются быстрыми. Это означает, что они сами не очень толерантны к неожиданным ошибкам и отключаются, если сталкиваются с таковой. По этой причине их нужно запускать под наблюдением с помощью программы сторожевого таймера, которая постоянно контролирует их и автоматически перезапускает их при сбое. Supervisord, пожалуй, самый популярный вариант для этого (не путать с демоном Storm Supervisor).

Примечание: у большинства кластеров Storm сам Nimbus никогда не разворачивается как один экземпляр, а как кластер. Если эта отказоустойчивость не будет включена и наш единственный Nimbus выйдет из строя, мы потеряем возможность подавать новые топологии, изящно уничтожать запущенные топологии, переназначать работу другим узлам Supervisor в случае сбоя и т.д.

Для простоты наш иллюстративный кластер будет использовать один экземпляр. Аналогично, Zookeeper очень часто разворачивается как кластер, но мы будем использовать только один.

Докеризация кластера

Запуск отдельных контейнеров и всего, что с ними идет, может быть громоздким, поэтому я предпочитаю использовать Docker Compose.

Сначала мы будем использовать один узел Zookeeper, один узел Nimbus и один узел Supervisor. Они будут определены как сервисы Compose, все сначала будут соответствовать одному контейнеру.

Позже я буду использовать зум Compose, чтобы добавить еще один узел Supervisor (контейнер). Вот весь код и структура проекта:

zookeeper/          Dockerfilestorm-nimbus/          Dockerfile          storm.yaml          code/               pom.xml               src/                   jvm/                       coincident_hashtags/                                  ExclamationTopology.java storm-supervisor/          Dockerfile          storm.yamldocker-compose.yml

И наши docker-compose.yml:

Не стесняйтесь изучить файлы Dockerfiles. Они в основном просто устанавливают зависимости (Java 8, Storm, Maven, Zookeeper) на соответствующие контейнеры.

The storm.yaml файлы заменяют определенные конфигурации по умолчанию для установки Storm. Линия ADD storm.yaml /conf внутри файлов Docker Nimbus и Supervisor помещает их в контейнеры, где Storm может их прочесть.

storm-nimbus/storm.yaml:

storm-supervisor/storm.yaml:

Эти параметры достаточно для нашего кластера. Если вам интересно, вы можете проверить все конфигурации по умолчанию здесь.

Беги docker-compose up в корне проекта.

После того, как все образы были созданы и запущена вся служба, откройте новый терминал, введите docker ps и вы увидите нечто подобное:

1*prWdSPqapMJY96SLJfQ-kQ

Начало Nimbus

Давайте введем SSH в контейнер Nimbus, используя его название:

docker exec -it coincidenthashtagswithapachestorm_storm-nimbus_1 bash

А потом запустите демона Nimbus: storm nimbus

1*bzBTGBMRYoJmLHR6xx_SRQ

Запуск интерфейса Storm

Аналогично откройте другой терминал, SSH снова в Nimbus и запустите пользовательский интерфейс с помощью storm ui:

1*v2h_5x3U8v3p7vmTrQ9cAg

Идти в localhost:8080 в вашем браузере, и вы увидите хороший обзор нашего кластера:

1*pY-0-U1VccLEMDz1UaW4hg

«Свободные слоты» в итоге кластера указывают, сколько всего рабочих (на всех узлах супервизора) доступны и ожидают, пока топология их использует.

«Использованные слоты» указывает, сколько из общего количества на данный момент занято топологией. Поскольку мы еще не запустили ни одного Supervisors, они оба нулевые. Мы доберемся Исполнители и Задача позже. Также, как видим, топология еще не представлена.

Запуск узла супервизора

SSH в один контейнер Supervisor и запустите демон Supervisor:

docker exec -it coincidenthashtagswithapachestorm_storm-supervisor_1 bashstorm supervisor 
1*AazN5gMmeSgH6FTSy3zyvw

Теперь давайте обновим наш пользовательский интерфейс:

1*hhfrf7G50tsjnOYCnHdRDg

Примечание. Любые изменения в нашем кластере могут отобразиться в интерфейсе через несколько секунд.

У нас есть новый работающий супервизор, который имеет четыре назначенных работника. Эти четыре рабочих данных являются результатом определения четырех портов в нашем storm.yaml для узла Supervisor. Конечно, все они бесплатны (четыре бесплатных слота).

Давайте представим топологию Nimbus и запустим их.

Представление топологии в Nimbus

SSH в Nimbus на новом терминале. Я написал Dockerfile, чтобы мы попали в наш рабочий (посадочный) каталог /theproject. Внутри это codeгде находится наша топология.

Наша топология достаточно проста. Он использует носик, который генерирует случайные слова, и болт, который просто добавляет три восклицательных знака (!!!) к словам. Два из этих болтов прилагаются друг к другу, и поэтому в конце потока мы получим слова с шестью восклицательными знаками. Он также указывает, что ему нужны три работника (conf.setNumWorkers(3)).

Выполните эти команды:

1. cd code
2. mvn clean
3. mvn package
4. storm jar target/coincident-hashtags-1.2.1.jar coincident_hashtags.ExclamationTopology

После успешной отправки топологии обновите пользовательский интерфейс:

1*DpLhx5dMl7uThJgEjDXACw

Как только мы подали топологию, смотритель зоопарка был сообщен. Смотритель зоопарка, в свою очередь, известил Супервайзера о загрузке кода из Nimbus. Теперь мы видим нашу топологию вместе с тремя занятыми работниками, оставляя всего одного свободным.

И десять резьбовых нитей слов + три нити болта exclaim1 + две нити exclaim болта + три основных нити от рабочих = всего 18 исполнителей.

И, возможно, вы заметили что-нибудь новое: задача.

Что такое задача?

Задача – это еще одно понятие в параллелизме Шторма. Но не переживайте, задача – это только экземпляр носика или болта, которым пользуется исполнитель. Конкретно они практически занимаются обработкой.

По умолчанию количество задач равно количеству исполнителей. В редких случаях вам может понадобиться, чтобы каждый исполнитель создавал больше задач.

// Each of the two executors (threads) of this bolt will instantiate// two objects of this bolt (total 4 bolt objects/tasks).builder.setBolt(“even-digit-bolt”, new EvenDigitBolt(), 2)       .setNumTasks(4)        .shuffleGrouping(“random-digit-spout”);
1*yJchF0mgDnzPTnvz39siew

С моей стороны, это недостаток, но я не могу придумать хороший вариант использования, если бы нам понадобилось несколько задач на одного исполнителя.

Возможно, если бы мы сами добавили некоторый параллелизм, например создание нового потока внутри болта для обработки долговременного задания, тогда основной поток исполнителя не будет блокироваться и сможет продолжить обработку с помощью другого болта.

Однако это может усложнить понимание нашей топологии. Если кто-нибудь знает сценарии, когда повышение производительности от нескольких задач преобладает дополнительную сложность, пожалуйста, опубликуйте комментарий.

В любом случае, возвращаясь с этого небольшого обходного пути, давайте посмотрим на обзор нашей топологии. Щелкните имя в разделе «Сводка топологии» и прокрутите вниз в «Ресурсы рабочих»:

1*2HQTiqg0xBhQITZFH-teYg

Мы хорошо видим распределение наших исполнителей (нитей) между тремя рабочими. И, конечно, все три работника работают на одном и том же узле Supervisor, который мы используем.

Теперь, скажем, масштабировать!

Добавить другого руководителя

Из корня проекта добавим еще один узел/контейнер Supervisor:

docker-compose scale storm-supervisor=2

SSH в новый контейнер:

docker exec -it coincidenthashtagswithapachestorm_storm-supervisor_2 bash

И запали: storm supervisor

1*DzxzUU7HgrKYiaTiSFLQUg

Если вы обновите пользовательский интерфейс, вы увидите, что мы успешно добавили еще одного супервизора и еще четырех работников (всего восемь работников/слотов). Чтобы воспользоваться преимуществами нового Supervisor, давайте увеличим количество работников топологии.

  • Сначала убейте бегающего: storm kill exclamation-topology
  • Измените эту строку на: conf.setNumWorkers(6)
  • Измените номер версии проекта в своем pom.xml. Попробуйте использовать правильную схему, например семантическое управление версиями. Я просто соблюдаю 1.2.1.
  • Перестройте топологию: mvn package
  • Отправьте его повторно: storm jar target/coincident-hashtags-1.2.1.jar coincident_hashtags.ExclamationTopology

Перезагрузите пользовательский интерфейс:

1*j09T9vFkeyRu7ptWuTlClQ

Теперь вы можете увидеть нового руководителя и шестерых занятых из восьми доступных работников.

Также важно отметить, что шесть занятых были поровну распределены между двумя надзирателями. Снова нажмите имя топологии и прокрутите вниз.

1*8nTM2oNM-PwU7QggAAQEyg

Мы видим два уникальных идентификатора супервизора, оба работают на разных узлах, и все наши исполнители достаточно равномерно распределены между ними. Это восхитительно.

Но у Storm есть еще один отличный способ сделать это при работе топологии — перебалансировка.

На Nimbus мы бы бегали:

storm rebalance exclamation-topology -n 6

Или изменить количество исполнителей для определенного компонента:

storm rebalance exclamation-topology -e even-digit-bolt=3

Надежная обработка сообщений

Один из вопросов, которые мы не рассмотрели, касается того, что произойдет, если болт не сможет обработать кортеж.

Шторм предоставляет нам механизм, с помощью которого возникает носик (в частности, Задача) может воспроизвести неудачный кортеж. Эта гарантия обработки не возникает сама по себе. Это осознанный выбор дизайна, добавляющего задержку.

Носицы посылают кортежи к болтам, выпускающим кортежи, полученные от входных кортежей, к другим болтам и т.п. Этот оригинальный кортеж провоцирует целое дерево кортежей.

Если любой дочерний кортеж, так сказать, выходного выходит из строя, то любые шаги по исправлению (откат и т.п.), возможно, придется предпринять несколькими болтами. Это может стать довольно мохнатым, и потому что делает Storm, так это позволяет оригинальному кортежу снова излучаться прямо из источника (носика).

Следовательно, любые операции, выполняемые болтами, являющимися функцией входных кортежей, должны быть идемпотентными.

Кортеж считается полностью обработанным, когда каждый кортеж в его дереве был обработан, и каждый кортеж должен быть явно подтвержден болтами.

Однако, это еще не все. Есть еще одна вещь, которую нужно сделать очевидно: поддерживать связь между исходным кортежем и его дочерними кортежами. Тогда Storm сможет проследить происхождение дочерних кортежей и таким образом сможет воспроизвести оригинальный кортеж. Это называется закрепление. И это было сделано с помощью нашего восклицания:

// ExclamationBolt
// ‘tuple’ is the original one received from the test word spout.// It’s been anchored to/with the tuple going out._collector.emit(tuple, new Values(exclamatedWord.toString()));
// Explicitly acknowledge that the tuple has been processed._collector.ack(tuple);

The ack вызов приведет к ack метод на носик, который вызывается, если он был реализован.

Итак, скажем, что вы читаете данные кортежа из некоторой очереди, и вы можете удалить их из очереди только если кортеж был полностью обработан. The ack метод, где бы вы это сделали.

Вы также можете выпускать кортежи без привязки:

_collector.emit(new Values(exclamatedWord.toString())) 

и отказаться от надежности.

Кортеж может выйти из строя двумя способами:

  1. Болт умирает, а кортеж кончается. Или это заканчивается по какой-то другой причине. Время ожидания по умолчанию составляет 30 секунд, и его можно изменить с помощью config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60)
  2. The fail метод явно вызывается для кортежа в болте: _collector.fail(tuple). Вы можете сделать это в случае исключения.

В обоих этих случаях, fail метод на носик будет вызван, если он реализован. И если мы хотим, чтобы кортеж воспроизводился, это нужно было сделать явно в файле fail метод по вызову emitкак и в nextTuple(). При отслеживании кортежей должен быть каждый ackред или failред. Иначе в топологии в конечном счете закончится память.

Также важно знать, что вы должны делать все это самостоятельно, когда пишете нестандартные носики и болты. Но ядро ​​Storm может помочь. К примеру, болт, реализующий BaseBasicBolt, автоматически выполняет подтверждение. Или встроенные носики для популярных источников данных, таких как Kafka, заботятся о очереди и логике воспроизведения после подтверждения и сбоя.

Выстрелы на прощание

Разработка топологии или кластера Storm всегда заключается в настройке различных ручек, которые у нас есть, и установке, где результат кажется оптимальным.

Есть несколько вещей, которые помогут в этом процессе, например использование файла конфигурации для чтения подсказок относительно параллелизма, количества работников и т.д., чтобы вам не пришлось многократно редактировать и перекомпилировать свой код.

Определите свои болты логически, по одному на неделимую задачу и держите их легкими и эффективными. Аналогично ваши носики nextTuple() методы следует оптимизировать.

Эффективно используйте пользовательский интерфейс Storm. По умолчанию он не показывает нам полную картину, только 5% от общего количества выпущенных кортежей. Чтобы контролировать их все, используйте config.setStatsSampleRate(1.0d).

Следите за Acks и Задержка значение для отдельных болтов и топологий через пользовательский интерфейс. Это то, на что вы хотите смотреть при настройке параметров.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *