En Spark existen distintas maneras de crear u obtener un DataFrame. Las formas más habituales son a partir de la lectura desde una fuente de datos como puede ser un fichero de texto (Texto no delimitado, CSV, TSV, JSON, XML), ficheros en formato Columnar (Parquet, ORC), ficheros serializados en formatos de fila (Avro), bases de datos NoSQL (Cassandra, MongoDB, …), bases de daos relacionales (MySQL, MariaDB, MSSQL, Oracle, …), formatos de tablas abierto (Open Table Format como podrían ser Delta, Iceberg y Hudi), pero no son los únicos, existe una forma no tan común y es la creación de DataFrames a partir de datos estáticos, es decir, datos constantes y la veremos a continuación.
Mostraremos a continuación la creación de un DataFrame a partir de datos en línea mediante el uso de:
- Seq (Con y sin esquema)
- RDD (rdd.toDF, spark.createDataFrame con y sin esquema)
- Row (con y sin esquema)
- GenericRowWithSchema
- GenericRow
- EmptyDataFrame
- EmptyDataFrame con esquema
Antes de empezar es necesario comentar que los ejemplos se realizaron en una cuenta community de Databricks por lo cual por defecto es inyectada a cada notebook una variable denominada spark que es del tipo SparkSession (https://docs.databricks.com/en/spark/index.html#how-does-apache-spark-work-on-databricks)
Creando un DataFrame a partir de una secuencia o conjunto (Sequencial) de valores
Empezaremos creando un DataFrame a partir de un conjunto de datos (Seq) de datos, en este caso nos apoyaremos del implicits -> Seq.toDF, a su vez es importante destacar que el schema es inferido a partir de los mismos datos y los nombres de columnas son asignados por defecto: _1, _2, …
// SparkSession inyectado al notebook por ende no es necesario instanciarlo
// Empezando a partir de un Seq
import spark.implicits._
val columns = Seq("name","age","weight", "height")
val data1 = Seq(("Jose", 40, 74, 1.63), ("María Isabel", 38, 58, 1.65), ("Antonio", 27, 60, 1.68), ("Norma", 63, 65, 1.54))
// Creando DF a partir de un Seq de datos utilizando implicits -> Seq.toDF
// El schema es inferido y los nombres de columnas son asignados por defecto: _1, _2, ...
val dfFromData1 = data1.toDF()
dfFromData1.printSchema()
dfFromData1.show()
Ahora repetiremos el ejemplo anterior pero apoyándonoslas en una case class denominada Person cuyos atributos serán: Name, Age, Weight y Height. De esta manera ya no se inferirá el schema a partir de los datos sino que se hará a partir de la estructura de la case class y para constatarnos de ello indicaremos que los atributos Age y Weight serán de tipo Short (Entero corto)
//A partir de un Seq y una case class como Schema
case class Person(name: String, age: Short, weight: Short, height: Double)
val data2 = Seq(new Person("Jose", 40, 74, 1.63), new Person("María Isabel", 38, 58, 1.65), new Person("Antonio", 27, 60, 1.68), new Person("Norma", 63, 65, 1.54))
// Creando DF a partir de un Seq de datos utilizando implicits -> Seq.toDF
// El schema es inferido y los nombres de columnas son asignados por defecto: _1, _2, ...
val dfFromData2 = data2.toDF()
dfFromData2.printSchema()
dfFromData2.show()
Creando un DataFrame a partir de un RDD
Para ello los siguientes ejemplos serán una continuación de los anteriores y por lo tanto reutilizaremos variables como serán la secuencia data1, así como también ya asumiremos que spark.implicits está importada, etc.
En este ejemplo podremos ver que el schema es inferido y al no poseer nombres de columnas se le otorgarán los valores por defecto que como pudimos apreciar en nuestro primer ejemplo son _1, _2, _3 y _4
// Creando DF a partir de un RDD utilizando implicits -> rdd.toDF
// El schema es inferido y los nombres de columnas son asignados por defecto: _1, _2, ...
val rdd = spark.sparkContext.parallelize(data1)
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
dfFromRDD1.show()
En el próximo ejemplo reutilizaremos el mismo RDD pero ahora le otorgaremos nombre a las columnas utilizando el array columns declarado en el primer ejemplo
// Creando DF a partir de un RDD utilizando implicits -> rdd.toDF
// El schema es inferido y los nombres de columnas pasados como argumentos al método toDF
val dfFromRDD1WithColumnNames = rdd.toDF(columns:_*)
dfFromRDD1WithColumnNames.printSchema()
dfFromRDD1WithColumnNames.show()
Ahora el mismo ejemplo anterior pero utilizando el método createDataFrame
// Creando DF a partir de método createDataFrame (de SparkSession) utilizando un RDD como argumento
// El schema es inferido y los nombres de columnas pasados como argumentos al método toDF
val dfFromRDDCreateDataFrameWithColumnNames = spark.createDataFrame(rdd).toDF(columns:_*)
dfFromRDDCreateDataFrameWithColumnNames.printSchema()
dfFromRDDCreateDataFrameWithColumnNames.show()
Existe una sobrecarga del método createDataFrame pero que en vez de recibir un RDD como argumento, recibe un conjunto de datos (Seq). De igual manera, en este caso el esquema es inferido, pero se toma los nombres de columnas del array columns
// Creando DF a partir de método createDataFrame (de SparkSession) utilizando un Seq como argumento
// El schema es inferido y los nombres de columnas pasados como argumentos al método toDF
val dfFromData1CreateDataFrameWithColumnNames = spark.createDataFrame(data1).toDF(columns:_*)
dfFromData1CreateDataFrameWithColumnNames.printSchema()
dfFromData1CreateDataFrameWithColumnNames.show()
Por ultimo mostraremos otra de las tantas sobrecargas del método createDataFrame pero usaremos la colección de elementos de nombre data2 que no es más que un conjunto de datos de tipo Person, por lo tanto el esquema del DataFrame vendrá dado por la estructura de las case class
val dfFromData2CreateDataFrame = spark.createDataFrame(data2)
dfFromData2CreateDataFrame.printSchema()
dfFromData2CreateDataFrame.show()
Creando un DataFrame a partir de un conjunto de elementos Row usando el método createDataFrame con y sin esquema
import org.apache.spark.sql.types.{StringType, IntegerType, DoubleType, StructField, StructType}
import org.apache.spark.sql.Row
val schema = StructType( Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("weight", IntegerType, true),
StructField("height", DoubleType, true)
))
//From Data (USING createDataFrame and Adding schema using StructType)
val rowData = Seq(Row("Jose", 40, 74, 1.63),
Row("María Isabel", 38, 58, 1.65),
Row("Antonio", 27, 60, 1.68),
Row("Norma", 63, 65, 1.54))
// Otra forma, mas dinamica para generar rowData
//val rowData = data.map(dataElement => Row.fromTuple(dataElement))
// Creando DF a partir de método createDataFrame (de SparkSession) utilizando un RDD (sparkContext.parallelize) como argumento.
// El RDD se creó a partir de un conjunto de instancias Row
// El schema es pasado como parametro al crear el DataFrame
val dfFromRDDCreateDataFrameRowWithSchema = spark.createDataFrame(spark.sparkContext.parallelize(rowData),schema)
dfFromRDDCreateDataFrameRowWithSchema.printSchema()
dfFromRDDCreateDataFrameRowWithSchema.show()
La diferencia del ejemplo anterior con el que veremos a continuación, radica únicamente en que en este último cada fila o instancia de tipo Row ya posee un esquema por si sola y aunque no lo había mencionado aun, en todos los ejemplos donde se pasa un esquema como parámetro, si los datos no llegan a cumplir con dicho esquema se lanzaría entonces una excepción en tiempo de ejecución.
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
val rowDataWithSchema: Seq[Row] = Seq(new GenericRowWithSchema(Array("Jose", 40, 74, 1.63), schema),
new GenericRowWithSchema(Array("María Isabel", 38, 58, 1.65), schema),
new GenericRowWithSchema(Array("Antonio", 27, 60, 1.68), schema),
new GenericRowWithSchema(Array("Norma", 63, 65, 1.54), schema))
// Otra forma, mas dinamica para generar rowDataWithSchema
// val rowDataWithSchema: Seq[Row] = data.map(dataElement => new GenericRowWithSchema(dataElement.productIterator.toArray, schema))
// Creando DF a partir de método createDataFrame (de SparkSession) utilizando un RDD (sparkContext.parallelize) como argumento.
// El RDD se creó a partir de un conjunto (Seq) de instancias GenericRowWithSchema
// El schema es pasado como parametro al crear el DataFrame
val dfFromRDDCreateDataFrameGenericRowWithSchema = spark.createDataFrame(spark.sparkContext.parallelize(rowDataWithSchema), schema)
dfFromRDDCreateDataFrameGenericRowWithSchema.printSchema()
dfFromRDDCreateDataFrameGenericRowWithSchema.show()
Por último haremos un ejemplo pero esta vez usando instancias de la clase GenericRow que a diferencia del ejemplo anterior no posee esquema y en esta ocasión si utilizamos la inicialización dinámica que teníamos comentada en otros ejemplos
import org.apache.spark.sql.catalyst.expressions.GenericRow
val rowDataWithoutSchema: Seq[Row] = data1.map(dataElement => new GenericRow(dataElement.productIterator.toArray))
// Creando DF a partir de método createDataFrame (de SparkSession) utilizando un RDD (sparkContext.parallelize) como argumento.
// El RDD se creó a partir de un conjunto (Seq) de instancias GenericRow
// El schema es pasado como parametro al crear el DataFrame
val dfFromRDDCreateDataFrameGenericRowWithSchema2 = spark.createDataFrame(spark.sparkContext.parallelize(rowDataWithoutSchema), schema)
dfFromRDDCreateDataFrameGenericRowWithSchema2.printSchema()
dfFromRDDCreateDataFrameGenericRowWithSchema2.show()
Creación de un DataFrame vacío
Puede darse el caso por ejemplo si tenemos una función que devuelva un DataFrame, puede darse la circunstancia que para cumplir este criterio sea necesario al menos devolver un DataFrame vacío, para este tipo de casos mostraremos una de las formas de como crear un DataFrame vacío sin esquema y además será un recurso adicional que llegaremos a conocer en nuestro paso a dominar Spark o certificarnos.
//Método devuelve un DF sin filas ni columnas
val dfEmptyDataFrame= spark.emptyDataFrame
dfEmptyDataFrame.printSchema()
dfEmptyDataFrame.show()
Creación de un DataFrame vacío con esquema
//Creación de un DF vacío con esquema
val dfEmptyDataFrameWithSchema = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
dfEmptyDataFrameWithSchema.printSchema()
dfEmptyDataFrameWithSchema.show()
Espero que todo este contenido sea de mucha utilidad en la formación y aprendizaje de un framework tan relevante como es Spark.