Cómo crear un DataFrame en PySpark

En PySpark existen distintas maneras de crear u obtener un DataFrame. Las formas más habituales son a partir de la lectura desde una fuente de datos como puede ser un fichero de texto (Texto no delimitado, CSV, TSV, JSON, XML), ficheros en formato Columnar (Parquet, ORC), ficheros serializados en formatos de fila (Avro), bases de datos NoSQL (Cassandra, MongoDB, …), bases de datos relacionales (MySQL, MariaDB, MSSQL, Oracle, …), formatos de tablas abierto (Open Table Format como podrían ser Delta, Iceberg y Hudi), pero no son los únicos, existe una forma no tan común y es la creación de DataFrames a partir de datos estáticos, es decir, datos constantes y la veremos a continuación.

Mostraremos a continuación la creación de un DataFrame a partir de datos en línea mediante el uso de:

  • Lista (diccionario y tupla)
  • RDD (rdd.toDF, spark.createDataFrame con y sin esquema)
  • Row
  • EmptyDataFrame
  • EmptyDataFrame con esquema

Antes de empezar es necesario comentar que los ejemplos se realizaron en una cuenta community de Databricks por lo cual por defecto es inyectada a cada notebook una variable denominada spark que es del tipo SparkSession (https://docs.databricks.com/en/spark/index.html#how-does-apache-spark-work-on-databricks)

Creando un DataFrame a partir de una lista o conjunto de valores

Empezaremos el primer de los ejemplos con una lista de diccionarios, en este caso los nombre de las columnas son obtenidos a partir de las claves de los diccionarios y si no lo indicamos el esquema es inferido a partir de los datos

data1 = [{"name":"Jose", "age":40, "weight":74, "height":1.63}, 
         {"name":"María Isabel", "age":38, "weight":58, "height":1.65}, 
         {"name":"Antonio", "age":27, "weight":60, "height":1.68}, 
         {"name":"Norma", "age":63, "weight":65, "height":1.54}]

#Creando DF a partir de una lista de Dict, por lo tanto tendrá nombre de columna pero el esquema será inferido
df1 = spark.createDataFrame(data1)
df1.printSchema()
df1.show()

Para el siguiente ejemplo, crearemos el DataFrame a partir de una lista pero en esta caso de duplas, por lo cual, la herramienta aunque será capaz de inferir el esquema a partir de los datos le otorgará los nombres por defecto a las columnas.

data2 = [("Paco", 44, 75, 1.83), 
         ("Martina", 28, 59, 1.66), 
         ("Iker", 31, 60, 1.68), 
         ("Idoia", 36, 57, 1.63)]

#Creando DF a partir de una lista de Tuplas, el DF resultante tendrá los nombres de columna por defecto _1, _2, ... y el esquema de igual forma será inferido
df2 = spark.createDataFrame(data2)
df2.printSchema()
df2.show()

Para el último ejemplo crearemos un DataFrame y le pasaremos un esquema

from pyspark.sql.types import *
schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True),
   StructField("weight", IntegerType(), True),
   StructField("height", DoubleType(), True)])

#Se genera un DF a partir de una lista de Dict y se indica el esquema de los datos (NO será inferido)
df3 = spark.createDataFrame(data1, schema)
df3.printSchema()
df3.show()

Creando un DataFrame a partir de un RDD

rdd = spark.sparkContext.parallelize(data2)

#Creando DF a partir de una un RDD, el DF resultante tendrá los nombres de columna por defecto _1, _2, ... y el esquema de igual forma será inferido
df4 = rdd.toDF()
df4.printSchema()
df4.show()

En el ejemplo anterior el DataFrame generado posee como nombres de columnas los valores por defecto, esto debido a que el RDD generado si creó a partir de la lista de tuplas, para solventar esa situación al invocar el método red.toDF le podemos indicar un array de cadenas de caracteres como se muestra a continuación

#Creando DF a partir de un RDD, Los nombres de columna son pasados como parámetro el esquema será inferido
df5 = rdd.toDF(["name", "age", "weight", "height"])
df5.printSchema()
df5.show()

También podemos pasarle el esquema al invocar le método rdd.toDF y de esta forma evitamos que el esquema sea inferido como veremos ahora

#Creando DF a partir de uun RDD y el esquema es pasado como parámetro
df6 = rdd.toDF(schema)
df6.printSchema()
df6.show()

Creando un DataFrame a elementos Row

Podemos utilizar el método createDataFrame está vez acompañado de una lista de elementos Row con y sin esquema como hemos podido hacer en los ejemplos anterior. En nuestro primer ejemplo lo haremos sin esquema y por lo tanto será inferido y luego le especificaremos el esquema

from pyspark.sql import Row

#Creando un DF a partir de elementos Row
Person = Row('name', 'age', 'weight', 'height')
persons = list(map(lambda p: Person(p[0], p[1], p[2], p[3]), data2))
df7 = spark.createDataFrame(persons)
df7.printSchema()
df7.show()
from pyspark.sql import Row

#Creando un DF a partir de elementos Row con esquema
Person = Row('name', 'age', 'weight', 'height')
persons = list(map(lambda p: Person(p[0], p[1], p[2], p[3]), data2))
df8 = spark.createDataFrame(persons, schema)
df8.printSchema()
df8.show()

Creando un DataFrame vacío sin esquema

#Se crea un DF vacío y sin esquema
df9 = spark.createDataFrame([], StructType([]))
df9.printSchema()
df9.show()

Creando un DataFrame vacío con esquema

#Se crea un DF vacío con esquema
df10 = spark.createDataFrame([], schema)
df10.printSchema()
df10.show()