Уроки, полученные при обработке Википедии с помощью Apache Spark

1656619231 uroki poluchennye pri obrabotke vikipedii s pomoshhyu apache spark

Содержание статьи

автор Сиддхеш Ране

1*KVxCfnROdLEWVwI3NLecFA

Apache Spark – это отказоустойчивая кластерно-вычислительная платформа с открытым исходным кодом, которая также поддерживает аналитику SQL, машинное обучение и обработку графиков.

Он работает, разбивая ваши данные на разделы, а затем обрабатывая эти разделы параллельно всем узлам кластера. Если какой-либо узел выходит из строя, он переназначает задание этого узла другому узлу и, следовательно, обеспечивает устойчивость к отказам.

Будучи в 100 раз быстрее, чем Hadoop, сделал его очень популярным для обработки больших данных. Spark написан на Scala и работает на JVM, но хорошая новость состоит в том, что он также предоставляет API для Python и R, а также C#. Это хорошо документировано с примерами, которые вы должны проверить.

Когда вы будете готовы попробовать, эта статья поможет вам от загрузки и настройки до настройки производительности. Мой крошечный кластер Spark выполнил 100 миллионов совпадений строк для всех статей в Википедии – менее чем за два часа.

Когда вы пройдете курсы и выполняете серьезную работу, вы осознаете все трудности используемого технологического стека. Обучение на ошибках – лучший способ учиться. Но иногда у вас просто не хватает времени, и вам хочется знать все, что может пойти не так.

Здесь я описываю некоторые проблемы, с которыми я столкнулся, начиная с Spark, и как их избежать.

Как начать

Загрузите двоичный файл Spark, поставляемый с упакованными зависимостями Hadoop

Если вы решили загрузить Spark, вы заметите, что для одной версии доступны разные двоичные файлы. Spark объявляет, что ему не нужен Hadoop, поэтому вы можете скачать меньшую по размеру версию Hadoop, предоставляемую пользователем. Не делайте этого.

Хотя Spark не использует фреймворк Hadoop MapReduce, он зависит от других библиотек Hadoop, таких как HDFS и YARN. Версия без Hadoop предназначена для тех, у кого уже есть библиотеки Hadoop, предоставленные в другом месте.

Используйте режим автономного кластера, а не Mesos или YARN

После того как вы протестируете встроенные примеры на local кластера и убедитесь, что все установлено и работает правильно, перейдите к настройке своего кластера.

Spark предоставляет вам три варианта: Mesos, YARN и автономный.

Первые два являются распределителями ресурсов, контролирующих ваши узлы-реплики. Spark должен попросить их выделить собственные экземпляры. Как новичок, не увеличивайте свою сложность, ходя по этому пути.

Автономный кластер проще всего настроить. Он имеет разумные настройки по умолчанию, такие как использование всех ваших ядер для исполнителей. Он является частью самого дистрибутива Spark и имеет a sbin/start-all.sh сценарий, который может вызвать первоначальный, а также все ваши реплики, перечисленные в conf/slaves с помощью ssh.

Mesos/YARN – это отдельные программы, которые используются, когда ваш кластер не просто кластер искры. Кроме того, они не имеют разумного значения по умолчанию: исполнители не используют все ядра в репликах, если это явно не указано.

Вы также можете использовать режим высокой доступности с помощью Zookeeper, который хранит список резервных основных источников на случай сбоя любого основного. Если вы новичок, вы, скорее всего, не обрабатываете кластер с тысячей узлов, где риск сбоя узлов значителен. Вероятно, вы создадите кластер на управляемой облачной платформе, например Amazon или Google, которая уже устраняет сбои узлов.

Вам не нужна высокая доступность с облачной инфраструктурой или небольшим кластером

У меня был создан кластер во враждебной среде, где человеческий фактор был ответственен за сбои питания, а узлы отключались от сети. (В общем, мой компьютерный класс колледжа, где старательные студенты выключают машину, а неосторожные студенты извлекают кабели локальной сети). Я все еще мог бы выполнить без высокой доступности, тщательно выбрав первоначальный узел. Вам бы не пришлось беспокоиться об этом.

Проверьте версию Java, которую вы используете для запуска Spark

Одним очень важным аспектом является версия Java, используемая для запуска Spark. Обычно более поздняя версия Java работает с чем-то скомпилированным для старых выпусков.

Но с Project Jigsaw модульность ввела более жесткую изоляцию и границы в Java 9, что нарушает определенные вещи, использующие отображение. В Spark 2.3.0, работающем на Java 9, я получил незаконный доступ к отображению. Java 8 не имело проблем.

Это определенно изменится в ближайшее время, но помните об этом до тех пор.

Укажите первичный URL-адрес так, как он есть. Не превращайте доменные имена в IP-адреса или наоборот

Автономный кластер очень чувствителен к URL, которые используются для решения первичных и реплик узлов. Предположим, вы запускаете основной узел, как показано ниже:

> sbin/start-master.sh 

и ваш основной закончился localhost:8080

0*_dPwEaOa1Sf6C5sB

По умолчанию имя хоста вашего ПК выбирается в качестве основного URL-адреса. x360 решает к localhost но запуск реплики, как показано ниже не работает.

# does not work > sbin/start-slave.sh spark://localhost:7077 
# works > sbin/start-slave.sh spark://x360:7077

Это работает, и наша реплика была добавлена ​​к кластеру:

0*rPOG-Z-x_sAhSDRB

Наша копия имеет IP-адрес в субдомене 172.17.xx, который на самом деле является доменом, установленным Docker на моей машине.

Основной может общаться с этой репликой, поскольку оба находятся на одной машине. Но реплика не может общаться с другими репликами в сети или первичной на другой машине, поскольку его IP-адрес не является маршрутизацией.

Как и в первом случае выше, реплика на машине без основного будет занимать имя хоста машины. Если у вас есть идентичные машины, все они используют то же имя хоста, что и их адрес. Это создает полный беспорядок, и никто не может общаться с другим.

Следовательно, вышеупомянутые команды будут изменены на:

# start master> sbin/start-master.sh -h $myIP # start slave > sbin/start-slave.sh -h $myIP spark://<masterIP>:7077 # submit a job > SPARK_LOCAL_IP=$myIP bin/spark-submit ...

где myIP – это IP-адрес машины, который можно маршрутизировать между узлами кластера. Скорее всего, все узлы находятся в одной сети, поэтому вы можете написать скрипт, который будет устанавливать myIP на каждой машине.

# assume all nodes in the 10.1.26.x subdomain siddhesh@master:~$ myIP=`hostname -I | tr " " "\n" | grep 10.1.26. | head`

Поток кода

Пока мы создали наш кластер и убедились, что он работает. Теперь пора кодировать. Spark достаточно хорошо задокументирован и поставляется с большим количеством примеров, поэтому начать кодировку очень легко. Менее очевидно то, как все работает, что приводит к некоторым очень сложным для отладки ошибок во время выполнения. Предположим, вы закодировали что-то вроде этого:

class SomeClass {  static SparkSession spark;  static LongAccumulator numSentences; 
 public static void main(String[] args) {    spark = SparkSession.builder()                        .appName("Sparkl")                       .getOrCreate(); (1)    numSentences = spark.sparkContext()                       .longAccumulator("sentences"); (2)    spark.read()        .textFile(args[0])        .foreach(SomeClass::countSentences); (3)  }  static void countSentences(String s) { numSentences.add(1); } (4) }

1 создать сеанс искры

2 создайте длинный счетчик, чтобы отслеживать ход работы

3 обходить файл строку за строкой, вызывая countSentences для каждой строки

4 добавьте 1 к накопителю для каждого предложения

Приведенный выше код работает на a local кластер, но выйдет из строя за исключением нулевого указателя при запуске на многоузловом кластере. Оба spark так хорошо как numSentences будет нулевым на машине-копии.

Чтобы решить эту проблему, инкапсулируйте все инициализированные состояния в нестатические поля объекта. Используйте main создать объект и отложить дальнейшую обработку на него.

Вам нужно понимать, что код, который вы пишете, запускается узлом драйвера точно так, как есть, но то, что выполняют узлы-реплики, это сериализированная задача, которую дает им искра. Ваши классы будут загружены JVM на реплику.

Статические инициализаторы будут работать, как ожидалось, но функционируют как main не будет, поэтому статические значения, инициализированные драйвером, не будут отображаться в реплике. Я не уверен, как все работает, и я делаю выводы из опыта, поэтому воспринимайте мое объяснение с недоверием. Итак, ваш код теперь выглядит так:

class SomeClass {  SparkSession spark; (1)  LongAccumulator numSentences;  String[] args;   SomeClass(String[] args) { this.args = args; }   public static void main(String[] args){    new SomeClass(args).process(); (2)  }   void process() {    spark = SparkSession.builder().appName("Sparkl").getOrCreate();   numSentences = spark.sparkContext().longAccumulator("sentences");   spark.read().textFile(args[0]).foreach(this::countSentences); (3) }  void countSentences(String s) { numSentences.add(1); }}

1 Сделайте поля нестатическими

2 создать экземпляр класса, а затем выполнить задание Spark

3 ссылка на this in the foreach lambda переносит объект в закрытие доступных объектов и, таким образом, сериализируется и посылается всем репликам.

Те из вас, кто программирует в Scala, могут использовать объекты Scala, являющиеся однотонными классами, и, следовательно, никогда не столкнутся с этой проблемой. Тем не менее, это то, что вам нужно знать.

Отправьте приложение и зависимости

Выше о кодировании больше, но перед этим вам нужно подать заявку в кластер. Если ваше приложение не очень тривиально, скорее всего, вы используете внешние библиотеки.

Когда вы отправляете jar приложения, вам также нужно сообщить Spark о зависимых библиотеках, которые вы используете, чтобы он сделал их доступными на всех узлах. Это достаточно просто. Синтаксис таков:

bin/spark-submit --packages groupId:artifactId:version,...

У меня с этой схемой проблем не было. Работает безупречно. Обычно я разрабатываю на своем ноутбуке, а затем отправляю задания из узла в кластере. Мне нужно перенести программу и ее зависимости на любой узел, на который я по ssh.

Spark ищет зависимости в локальном репозитории maven, затем в центральном репо и любых репозиториях, которые вы укажете с помощью --repositories вариант. Это немного громоздко синхронизировать все это с драйвером, а затем ввести все эти зависимости в командной строке. Поэтому я предпочитаю все зависимости, упакованные в одну банку, которая называется uber jar.

Используйте плагин Maven shade, чтобы создать uber jar со всеми зависимостями, чтобы подать задание стало проще

Просто добавьте следующие строки в свой pom.xml

<build> <plugins>  <plugin>   <groupId>org.apache.maven.plugins</groupId>   <artifactId>maven-shade-plugin</artifactId   <version>3.0.0</version>   <configuration>    <artifactSet>     <excludes>      <exclude>org.apache.spark:*</exclude>     </excludes>    </artifactSet>   </configuration>   <executions>    <execution>     <phase>package</phase&gt;     <goals>      <goal>shade</goal>     </goals>    </execution>   </executions>  </plugin> </plugins> </build>

Когда вы создаете и упаковываете свой проект, в стандартный дистрибутивный jar будут включены все зависимости.

Когда вы отправляете вакансии, банки приложений накапливаются в work каталог и заполняться со временем.

Набор spark.worker.cleanup.enabled верно в conf/spark-defaults.conf

Этот параметр по умолчанию false и применим к автономному режиму.

Входящие и исходящие файлы

Это была самая запутанная часть, которую трудно было диагностировать.

Spark поддерживает чтение/запись из разных источников, таких как hdfs, ftp, jdbc или локальные файлы в системе, когда протокол есть file:// или отсутствует. Моя первая попытка была прочитать файл из драйвера. Я предполагал, что драйвер прочтет файл, превратит его в разделы, а затем распределит их по кластеру. Оказывается, это неверно.

Когда ты read файл из локальной файловой системы, убедитесь, что файл присутствует на всех рабочих узлах точно в одном месте. Spark не явно распространяет файлы из драйвера работникам.

Мне пришлось скопировать файл каждому работнику в одном месте. Расположение файла было передано в качестве аргумента моему приложению. Поскольку файл был расположен в папке родителей, я указал его путь как ../wikiArticles.txt. Это не сработало на рабочих узлах.

Всегда передавайте абсолютные пути в файлы для чтения

С моей стороны это могла быть ошибка, но я знаю, что путь к файлу вставил его как есть textFile функцию, и это вызвало ошибку «файл не найден».

Spark поддерживает обычные схемы сжатия, поэтому большинство текстовых файлов в формате gzip или bzip будут распакованы перед использованием. Может показаться, что сжатые файлы будут более эффективными, но не попадайте на эту ловушку.

Особо не читайте из сжатых текстовых файлов gzip. Несжатые файлы обрабатываются быстрее.

Gzip нельзя распаковать параллельно, как bzip2, поэтому узлы тратят основную часть своего времени на распаковку больших файлов.

Сделать входящие файлы доступными для всех рабочих – хлопот. Вместо этого можно использовать механизм передачи файлов Spark. Отправляя задачи, укажите разделенный запятыми список входных файлов с помощью --files вариант. Для доступа к этим файлам требуется SparkFiles.get(filename). Мне не удалось найти достаточно документов по этой функции.

Чтобы прочитать файл, который транслируется с помощью --files вариант, использование SparkFiles.get(<onlyFileNameNotFullPath>) как путь в функциях чтения.

Итак, файл, представленный как --files /opt/data/wikiAbstracts.txt будет доступен как SparkFiles.get("WikiAbstracts.txt"). Это возвращает строку, которую вы можете использовать в любой функции чтения, ожидающей путь. Опять же не забудьте указать абсолютные пути.

Поскольку мой входящий файл имел 5 Гб заархивированного файла, а моя сеть была довольно медленной со скоростью 12 МБ/с, я попытался использовать функцию передачи файлов Spark. Но самая декомпрессия длилась так долго, что я вручную скопировал файл каждому работнику. Если сеть достаточно быстрая, вы можете использовать файлы без сжатия. Или используйте HDFS или FTP-сервер.

Запись файлов также соответствует семантике чтения. Я хранил свой DataFrame в файле CSV в локальной системе. У меня снова было предположение, что результаты будут отправлены обратно на узел драйвера. У меня не сработало.

Когда DataFrame сохраняется в локальном пути к файлу, каждый работник сохраняет свои вычисленные разделы на своем диске. Данные не возвращаются водителю

Я получил лишь часть результатов, которых ожидал. Сначала я неправильно диагностировал эту проблему как ошибку в моем коде. Позже я узнал, что каждый работник сохраняет свои вычисленные результаты на собственном диске.

Перегородки

Количество созданных вами разделов влияет на производительность. По умолчанию Spark создаст столько разделов, сколько ядер в кластере. Это не всегда оптимально.

Следите за тем, сколько работников активно обрабатывают задачи. Если слишком мало, увеличьте количество разделов.

Если вы читаете из файла gzipped, Spark создает только один раздел, который будет обрабатывать только один работник. Это также одна из причин, почему файлы gzip медленно обрабатываются. Я наблюдал более низкую производительность с небольшим количеством больших разделов по сравнению с большим количеством маленьких разделов.

При чтении данных лучше четко указать количество разделов.

Возможно, вам не придется делать это при чтении с HDFS, поскольку файлы Hadoop уже разделены.

Википедия и DBpedia

Нет никаких случиться здесь, но я подумал, что было бы хорошо сообщить вам об альтернативах. Весь дамп xml в Википедии имеет 14 ГБ сжатого и 65 ГБ несжатого. В большинстве случаев вам нужен простой текст статьи, но дамп находится в разметке MediaWiki, поэтому требует предварительной обработки. Для этого существует много инструментов на разных языках. Хотя я лично ими не пользовался, я уверен, что это занимает много времени. Но есть альтернативы.

Если все, что вам нужно, это открытый текст статьи Википедии, предпочтительно для НЛП, загрузите набор данных, предоставленный DBpedia.

Я использовал полный дамп статьи (NIF Context) доступен на DBpedia (прямая загрузка отсюда). Этот набор избавляет от нежелательных элементов, таких как таблицы, информационные блоки и ссылки. Сжатая загрузка составляет 4,3 Гб turtle формат. Вы можете скрыть это tsv как так

Подобные наборы данных доступны другим свойствам, таким как ссылки на страницы, тексты привязки и т.д. Проверьте DBpedia.

Несколько слов о базах данных

Я никогда не понимал, почему существует большое количество баз данных, которые все так похожи, а кроме того, люди покупают лицензии на базы данных. До этого проекта я серьезно не пользовался. Я когда-нибудь использовал только MySQL и Apache Derby.

Для своего проекта я использовал базу данных SPARQL с тройным магазином, Apache Jena TDB, доступ к которой осуществляется через REST API, обслуживаемый Jena Fuseki. Эта база данных даст мне URL-адреса, метки и предикаты RDF для всех ресурсов, упомянутых в данной статье. Каждый узел совершит вызов базы данных и только потом приступит к дальнейшей обработке.

Моя рабочая нагрузка стала привязана к IO, поскольку я видел почти 0% использования ЦБ на рабочих узлах. Каждый раздел данных приведет к двум запросам SPARQL. В самом худшем случае обработка одного из двух запросов занимала 500–1000 секунд. К счастью, база данных TDB опирается на отображение памяти Linux. Я мог бы отразить всю БД в оперативной памяти и значительно улучшить производительность.

Если вы привязаны к IO и ваша база данных может поместиться в оперативную память, запустите ее в памяти.

Я нашел инструмент под названием vmtouch, показывающий, какой процент каталога базы данных был отражен в памяти. Этот инструмент также позволяет вам явно сопоставлять любые файлы/каталоги в оперативную память и по желанию заблокировать ее, чтобы она не удалялась.

Моя база данных объемом 16 ГБ могла бы легко уместиться в мой сервер с 32 ГБ оперативной памяти. Это повысило производительность запроса на порядки до 1–2 секунд по запросу. Используя элементарную форму балансировки нагрузки базы данных на основе номера раздела, я мог бы сократить время выполнения вдвое, используя 2 сервера SPARQL вместо одного.

Вывод

Мне действительно понравились распределенные вычисления на Spark. Без этого я бы не мог завершить свой проект. Было достаточно легко взять мое существующее приложение и запустить его на Spark. Я однозначно рекомендую всем попробовать.

Первоначально опубликовано на siddheshrane.github.io.

Добавить комментарий

Ваш адрес email не будет опубликован.