Spark объединяет/объединяет массивы в groupBy/aggregate

Следующий код Spark правильно демонстрирует то, что я хочу сделать, и генерирует правильный вывод с крошечным набором демонстрационных данных.

Когда я запускаю тот же общий тип кода на большом объеме производственных данных, у меня возникают проблемы во время выполнения. Задание Spark выполняется в моем кластере примерно 12 часов и завершается сбоем.

Просто взглянув на приведенный ниже код, кажется неэффективным взрывать каждую строку, просто чтобы объединить ее обратно. В заданном наборе тестовых данных четвертая строка с тремя значениями в array_value_1 и тремя значениями в array_value_2 будет расширена до 3*3 или девяти развернутых строк.

Итак, в большом наборе данных строка с пятью такими столбцами массива и десятью значениями в каждом столбце разорвется на 10 ^ 5 развернутых строк?

Глядя на предоставленные функции Spark, нет готовых функций, которые делали бы то, что я хочу. Я мог бы предоставить пользовательскую функцию. Есть ли в этом недостатки скорости?

val sparkSession = SparkSession.builder.
  master("local")
  .appName("merge list test")
  .getOrCreate()

val schema = StructType(
  StructField("category", IntegerType) ::
    StructField("array_value_1", ArrayType(StringType)) ::
    StructField("array_value_2", ArrayType(StringType)) ::
    Nil)

val rows = List(
  Row(1, List("a", "b"), List("u", "v")),
  Row(1, List("b", "c"), List("v", "w")),
  Row(2, List("c", "d"), List("w")),
  Row(2, List("c", "d", "e"), List("x", "y", "z"))
)

val df = sparkSession.createDataFrame(rows.asJava, schema)

val dfExploded = df.
  withColumn("scalar_1", explode(col("array_value_1"))).
  withColumn("scalar_2", explode(col("array_value_2")))

// This will output 19. 2*2 + 2*2 + 2*1 + 3*3 = 19
logger.info(s"dfExploded.count()=${dfExploded.count()}")

val dfOutput = dfExploded.groupBy("category").agg(
  collect_set("scalar_1").alias("combined_values_2"),
  collect_set("scalar_2").alias("combined_values_2"))

dfOutput.show()

person clay    schedule 14.09.2016    source источник


Ответы (1)


Это может быть неэффективно для explode, но по сути операция, которую вы пытаетесь реализовать, просто дорогая. По сути, это просто еще один groupByKey, и здесь мало что можно сделать, чтобы улучшить его. Поскольку вы используете Spark> 2.0, вы можете collect_list напрямую и сгладить:

import org.apache.spark.sql.functions.{collect_list, udf}

val flatten_distinct = udf(
  (xs: Seq[Seq[String]]) => xs.flatten.distinct)

df
  .groupBy("category")
  .agg(
    flatten_distinct(collect_list("array_value_1")), 
    flatten_distinct(collect_list("array_value_2"))
  )

В Spark >= 2.4 вы можете заменить udf композицией встроенных функций:

import org.apache.spark.sql.functions.{array_distinct, flatten}

val flatten_distinct = (array_distinct _) compose (flatten _)

Также можно использовать custom Aggregator, но я сомневаюсь, что какой-либо из них будет иметь огромное значение.

Если наборы относительно велики и вы ожидаете значительного количества дубликатов, вы можете попробовать использовать aggregateByKey с изменяемыми наборами:

import scala.collection.mutable.{Set => MSet}

val rdd = df
  .select($"category", struct($"array_value_1", $"array_value_2"))
  .as[(Int, (Seq[String], Seq[String]))]
  .rdd

val agg = rdd
  .aggregateByKey((MSet[String](), MSet[String]()))( 
    {case ((accX, accY), (xs, ys)) => (accX ++= xs, accY ++ ys)},
    {case ((accX1, accY1), (accX2, accY2)) => (accX1 ++= accX2, accY1 ++ accY2)}
  )
  .mapValues { case (xs, ys) => (xs.toArray, ys.toArray) }
  .toDF
person zero323    schedule 14.09.2016
comment
Первое решение простого плоского udf полностью устранило проблему. Spark прошел путь от ~12 часов до сбоя до успешного завершения всей работы за 30 минут. Наблюдая за графическим интерфейсом монитора Spark, каждая из внутренних задач запускается и завершается за минуту или меньше. Спасибо за помощь в этом. - person clay; 14.09.2016
comment
Я рад это слышать, хотя должен признать, что удивлен. Я ожидал небольшого улучшения, но ничего столь впечатляющего. Насколько велики отдельные списки? - person zero323; 14.09.2016
comment
Вы сэкономили мне часы поиска... Большое спасибо! - person Mike Reiche; 10.05.2020