Convertir un DataFrame de PySpark a diccionario

Quizás no sea lo más usual pero eso no significa que nunca nos veamos en la necesidad de crear un dict de Python a partir de los valores de un DataFrame. En esta entrada veremos algunas de las formas que tenemos para construir un dict a partir de un DataFrame. Para ello contaremos con el siguiente supuesto, Tenemos un conjunto de datos de personas y entre la información que poseen estará presente la provincia donde viven y nos piden generar un dict que contabilice el número de personas agrupadas por provincia. Vamos a ello

Vamos a empezar creando un DataFrame que será nuestro conjunto de datos de origen

data = [("Jose", "Rodríguez", "MADRID", 40, 74, 1.63, "Ingeniero de Sistemas"),
        ("María Isabel", "Perez", "MADRID", 38, 58, 1.65, "Médico"),
        ("Antonio José", "Rodríguez", "VALENCIA", 27, 60, 1.68, "Nutricionista"),
        ("Norma", "MARTINEZ", "TOLEDO", 63, 65, 1.54, "Chef"),
        ("Adriana", "Peña Hernandez", "PONTEVEDRA", 59, 59, 1.56, "Economista"),
        ("Jesús", "Marquez", "ALICANTE", 33, 67, 1.78, "Administrador"),
        ("Josefina", "Petón Marquez", "VALENCIA", 25, 54, 1.60, "Administrador"),
        ("María José", "Díaz", "VALENCIA", 33, 59, 1.61, "Administrador"),
        ("Edmundo", "Pérez", "VALENCIA", 54, 75, 1.70, "Contador"),
        ("Francisco", "Sánchez", "MADRID", 33, 59, 1.61, "Administrador"),
        ("Axel", "Rose", "PONTEVEDRA", 61, 83, 1.70, "Cantante"),
        ("Selena", "López", "ALICANTE", 29, 55, 1.59, "Cantante")
]

rdd = spark.sparkContext.parallelize(data)

sourceDF = rdd.toDF(["name", "lastname", "province", "age", "weight", "height", "profession"])
sourceDF.show(truncate = False)

Luego procedemos a crear un DataFrame que será el total de registros agrupados por el campo province (provincia) y ordenado de forma descendente por el total representado por el campo count y luego ordenado de forma ascendente por el campo province. Para conseguirlo invocaremos el método groupBy (que si revisamos el API arroja un objeto del tipo GroupedData, es decir, NO ES un DataFrame) invocamos el método count() (esto si nos devolverá un DataFrame) y procedemos a ordenarlo.

from pyspark.sql.functions import desc, asc

groupedDF = sourceDF.groupBy("province").count().orderBy(desc("count"), asc("province"))
groupedDF.show(truncate = False)
  1. El primer enfoque haremos un map del DataFrame, luego la acción collect que arroja una Lista de Row y ya luego es puro Python iterando la lista para generar un dict
  2. Para el segundo nos apoyaremos en la popular biblioteca Pandas y su integración con Apache Spark, haciendo inicialmente una conversión a un DataFrame pero de Pandas y luego invocar el método to_dict() que nos generara el diccionario.
  3. El Bonus track, aunque aquí hacemos un poco de trampa, aprovechamos la función countByValue de la clase RDD ya que esta acción nos arroja un dict de inmediato peeeeeeeeeero a diferencia de las otras soluciones es del tipo dict[Row, Long]
list_of_rows = groupedDF.collect()
dict1 = { row['province']: row['count'] for row in list_of_rows }

dict2 = groupedDF.toPandas().to_dict()

dict3 = sourceDF.select("province").rdd.countByValue()

print(dict1)
print(dict2)
print(dict3)
Resultado de cada uno de los diccionarios obtenidos