Где предложение в Spark между элементами для элемента в массиве struct?


1 принят

Есть несколько способов сделать это:

  • Используя UDF
  • Используя взрыв и собирайте
  • Используя инструменты databricks

UDF

UDF - это все. Вы в основном создаете пользовательскую функцию для выполнения работы. В отличие от преобразования в набор данных он не будет создавать весь класс Doc, но вместо этого будет обрабатывать только соответствующие данные:

def f(posts: Seq[Row]): Seq[Post] = {
  posts.map(r => Post(r.getAs[Int](0), r.getAs[Long](1))).filter(p => p.createdTime > 3 && p.createdTime < 9))
}
val u = udf(f _)
val filtered = df.withColumn("posts", u($"posts"))

Использование explode и collect_list

df.withColumn("posts", explode($"posts")).filter($"posts.createdTime" > 3 && $"posts.createdTime" < 9).groupBy("test").agg(collect_list("posts").as("posts"))

Это, вероятно, менее эффективно, чем предыдущий, но это один лайнер (и в какой-то момент в будущем он может быть оптимизирован).

Использование инструментов databricks

Если вы работаете над облаком databricks, вы можете использовать функции более высокого порядка. см. здесь для получения дополнительной информации. Так как это не общий общий вариант искры, я не буду его пересматривать.
Надеюсь, в будущем они интегрируют его в стандартную искру (я нашел эту джиру по этому вопросу, но в настоящее время она не поддерживается).

Scala, апаша-искра, улей, где,

scala,apache-spark,hive,where,

1

Ответов: 1


1 принят

Есть несколько способов сделать это:

  • Используя UDF
  • Используя взрыв и собирайте
  • Используя инструменты databricks

UDF

UDF - это все. Вы в основном создаете пользовательскую функцию для выполнения работы. В отличие от преобразования в набор данных он не будет создавать весь класс Doc, но вместо этого будет обрабатывать только соответствующие данные:

def f(posts: Seq[Row]): Seq[Post] = {
  posts.map(r => Post(r.getAs[Int](0), r.getAs[Long](1))).filter(p => p.createdTime > 3 && p.createdTime < 9))
}
val u = udf(f _)
val filtered = df.withColumn("posts", u($"posts"))

Использование explode и collect_list

df.withColumn("posts", explode($"posts")).filter($"posts.createdTime" > 3 && $"posts.createdTime" < 9).groupBy("test").agg(collect_list("posts").as("posts"))

Это, вероятно, менее эффективно, чем предыдущий, но это один лайнер (и в какой-то момент в будущем он может быть оптимизирован).

Использование инструментов databricks

Если вы работаете над облаком databricks, вы можете использовать функции более высокого порядка. см. здесь для получения дополнительной информации. Так как это не общий общий вариант искры, я не буду его пересматривать.
Надеюсь, в будущем они интегрируют его в стандартную искру (я нашел эту джиру по этому вопросу, но в настоящее время она не поддерживается).

Scala, апаша-искра, улей, где,
Похожие вопросы
Яндекс.Метрика