Convertir un DataFrame de Spark a Map de Scala

Quizás no sea lo más usual pero eso no significa que nunca nos veamos en la necesidad de crear un Map (Collection) de Scala a partir de los valores de un DataFrame. En esta entrada veremos algunas de las formas que tenemos para construir un Map a partir de un DataFrame. Para ello contaremos con el siguiente supuesto, Tenemos un conjunto de datos de personas y entre la información que poseen estará presente la provincia donde viven y nos piden generar un Map que contabilice el número de personas agrupadas por provincia. Vamos a ello

Vamos a empezar creando un DataFrame que será nuestro conjunto de datos de origen

val data = Seq(("Jose", "Rodríguez", "MADRID", 40, 74, 1.63, "Ingeniero de Sistemas"),
        ("María Isabel", "Perez", "MADRID", 38, 58, 1.65, "Médico"),
        ("Antonio José", "Rodríguez", "VALENCIA", 27, 60, 1.68, "Nutricionista"),
        ("Norma", "MARTINEZ", "TOLEDO", 63, 65, 1.54, "Chef"),
        ("Adriana", "Peña Hernandez", "PONTEVEDRA", 59, 59, 1.56, "Economista"),
        ("Jesús", "Marquez", "ALICANTE", 33, 67, 1.78, "Administrador"),
        ("Josefina", "Petón Marquez", "VALENCIA", 25, 54, 1.60, "Administrador"),
        ("María José", "Díaz", "VALENCIA", 33, 59, 1.61, "Administrador"),
        ("Edmundo", "Pérez", "VALENCIA", 54, 75, 1.70, "Contador"),
        ("Francisco", "Sánchez", "MADRID", 33, 59, 1.61, "Administrador"),
        ("Axel", "Rose", "PONTEVEDRA", 61, 83, 1.70, "Cantante"),
        ("Selena", "López", "ALICANTE", 29, 55, 1.59, "Cantante")
)

val rdd = spark.sparkContext.parallelize(data)

val sourceDF = rdd.toDF("name", "lastname", "province", "age", "weight", "height", "profession")
sourceDF.show(false)

Luego procedemos a crear un DataFrame que será el total de registros agrupados por el campo province (provincia) y ordenado de forma descendente por el total representado por el campo count y luego ordenado de forma ascendente por el campo province. Para conseguirlo invocaremos el método groupBy (que si revisamos el API arroja un objeto del tipo RelationalGroupedDataset, es decir, NO ES un DataFrame/Dataset) invocamos el método count() (esto si nos devolverá un DataFrame) y procedemos a ordenarlo.

val groupedDF = sourceDF.groupBy("province").count().orderBy($"count".desc, $"province")
groupedDF.show(false)
  1. El primer enfoque haremos un map del DataFrame, luego la acción collect que arroja un Array[Row] y ya luego es método de Scala para convertir ese array en un Map
  2. Para el segundo haremos también un map del DataFrame y tomaremos ventaja de uno de los métodos de la clase PairRDD que es collectAsMap y esa acción ya nos arroja un Map
  3. Para el tercer enfoque haremos una conversión a un Dataset de tuplas que se traduce en Dataset[(String, Long)] y de nuevo llamamos al collectAsMap a través del RDD, la diferencia de este paso con el anterior es que a nivel de código nos ahorramos el map y esa conversión se lleva a cabo implícitamente.
  4. El cuarto enfoque digamos que es netamente Scala, ya que hacemos un collect del DataFrame groupedDF y a partir de este Array (Array[Row]) al igual que en el primer enfoque lo convertimos en un Map aunque luego hacemos un cast para convertir el Map[Any, Any] a Map[String, Long]
  5. El Bonus track, aunque aquí hacemos un poco de trampa, aprovechamos la función countByValue de la clase RDD ya que esta acción nos arroja un Map de inmediato peeeeeeeeeero a diferencia de las otras soluciones es del tipo Map[Row, Long]
val map1 = groupedDF.map(row => row.getString(0) -> row.getLong(1)).collect().toMap
val map2 = groupedDF.map(row => row.getString(0) -> row.getLong(1)).rdd.collectAsMap
val map3 = groupedDF.as[(String, Long)].rdd.collectAsMap
val map4 = groupedDF.collect.map(row => (row(0) -> row(1))).toMap.asInstanceOf[Map[String,Long]]
val map5 = sourceDF.select("province").rdd.countByValue
Resultado de cada uno de los Map obtenidos