Почему Спарк дважды обрабатывал одни и те же данные?

Я застрял SELECT date, field1, field2, ..., field10 FROM table1 WHERE field1 = <some number> AND date BETWEEN date ('2018-05-01') AND date ('2018-05-30') ORDER К 1 странной ситуации, когда простейшее возможное искровое приложение, похоже, выполнило одну и ту же работу дважды.

Что я наделал

Сам запрос выполняет запрос:

table1

и сохраняет результаты в HDFS.

Таблица /root/date=2018-05-01/hour=0/data-1.snappy.parquet /root/date=2018-05-01/hour=0/data-2.snappy.parquet ... /root/date=2018-05-01/hour=1/data-1.snappy.parquet ... /root/date=2018-05-02/hour=0/data-1.snappy.parquet ... etc. представляет собой кучу паркетных файлов, хранящихся на HDFS, и разбивается следующим образом

int

Все паркетные файлы от 700M до 2G и имеют одну и ту же схему: 10 ненулевых полей intили bigintтипов.

Результат приложения крошечный по размеру - всего несколько тысяч строк.

Мое искровое приложение работало в режиме YARN с кластерным режимом. Базовые параметры искры были

spark.driver.memory=2g
spark.executor.memory=4g
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.submit.deployMode=cluster

Во время выполнения было выгружено несколько контейнеров, никаких ошибок и никаких сбоев. Вся заявка завершена в одну попытку.

Странная вещь

Скриншоты из Spark UI:

  • главный экран
  • этап 2
  • этап 4

Как видно, на этапах 2 и 4 обрабатывалось одинаковое количество входных строк, но на этапе 4 также выполнялась некоторая перетасовка (это были строки результатов). Неудачными задачами являются те, которые были выгружены контейнерами.

Таким образом, похоже, что мое приложение обрабатывало одни и те же файлы дважды.

Я понятия не имею, как это возможно и что произошло. Пожалуйста, помогите мне понять, почему Spark делает такую ??странную вещь.

Фактический физический план:

== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://hadoop/root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB, false, CSV, Map(path -> /root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB), Overwrite, [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
+- Coalesce 16
   +- *(2) Sort [date#10 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(date#10 ASC NULLS FIRST, 200)
         +- *(1) Project [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
            +- *(1) Filter (isnotnull(field1#1L) && (field1#1L = 1234567890))
               +- *(1) FileScan parquet default.table1[field0#0L,field1#1L,field2#2L,field3#3L,field4#4,field5#5,field6#6L,field7#7,date#10,hour#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hadoop/table1], PartitionCount: 714, PartitionFilters: [(date#10 >= 17652), (date#10 <= 17682)], PushedFilters: [IsNotNull(field1), EqualTo(field1,1234567890)], ReadSchema: struct<field0:bigint,field1:bigint,field2:bigint,field3:bigint,field4:int,field5:int,field6:bigint,field7:...

Ниже приведены DAG для этапов 2 и 4:

  • этап 2
  • этап 4

apache-spark,

2

Ответов: 1


0

Я все еще не уверен, почему искра ведет себя таким образом, и я все еще копаю, но мне удалось получить то, что происходит.

Примечание: мой SQL заканчивается ORDER. Поскольку ожидается, что работа вернет очень мало строк, я думал, что окончательная сортировка должна быть легкой задачей.

Поэтому, когда я удаляю ORDERпредложение, мой запрос выполняется так, как ожидалось, и читает паркеты только один раз. Это странное поведение воспроизводимо независимо от того, насколько велик набор данных и сколько раз задачи выгружаются во время выполнения: добавление orderпредложения вызывает искру, чтобы дважды сканировать весь набор данных (по крайней мере, это похоже на это).

Я забыл упомянуть: я использую искру 2.3.0 из дистрибутива Hortonworks (HDP-2.6.5).

апаш-искра,
Похожие вопросы
Яндекс.Метрика