Есть несколько способов сделать это:
- Используя UDF
- Используя взрыв и собирайте
- Используя инструменты databricks
UDF
UDF - это все. Вы в основном создаете пользовательскую функцию для выполнения работы. В отличие от преобразования в набор данных он не будет создавать весь класс Doc, но вместо этого будет обрабатывать только соответствующие данные:
def f(posts: Seq[Row]): Seq[Post] = {
posts.map(r => Post(r.getAs[Int](0), r.getAs[Long](1))).filter(p => p.createdTime > 3 && p.createdTime < 9))
}
val u = udf(f _)
val filtered = df.withColumn("posts", u($"posts"))
Использование explode и collect_list
df.withColumn("posts", explode($"posts")).filter($"posts.createdTime" > 3 && $"posts.createdTime" < 9).groupBy("test").agg(collect_list("posts").as("posts"))
Это, вероятно, менее эффективно, чем предыдущий, но это один лайнер (и в какой-то момент в будущем он может быть оптимизирован).
Использование инструментов databricks
Если вы работаете над облаком databricks, вы можете использовать функции более высокого порядка. см. здесь для получения дополнительной информации. Так как это не общий общий вариант искры, я не буду его пересматривать.
Надеюсь, в будущем они интегрируют его в стандартную искру (я нашел эту джиру по этому вопросу, но в настоящее время она не поддерживается).