Dataframe manipulation

    A DataFrame can be manipulated in two ways: by writing SQL queries (Spark SQL) or by calling transformation functions on the DataFrame object itself.

Data Manipulation using Functions

    This approach tends to suit people who are more used to writing code than to writing queries. The idea is simple: given a DataFrame, apply transformation functions to it, each returning a new DataFrame, until you get the result you need.

    As an example, suppose you have structured data that corresponds to a table of divisions of an imaginary IT company:


from pyspark.sql import Row

# Each division is a Row built with named fields
divisions = [
    Row(id='1', name='Data Science'),
    Row(id='2', name='Big Data'),
    Row(id='3', name='Artificial Inteligence'),
    Row(id='4', name='Devops')
]

    Let's also define a list of employees for that company:


Employee = Row("name", "email")
employees = [
    Employee("John Robbins", "jrobbins@company.com"),
    Employee("Julia Silva", "jsilva@company.com"),
    Employee("Charles Darwin", "cdarwin@company.com"),
    Employee("Isacc Oliveira", "iko@company.com"),
    Employee("Simone Sanchez", "ssanchez@company.com"),
    Employee("Kauan Kuling", "kkuling@company.com"),
    Employee("Mark Spencer", "mspencer@company.com")
]

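    Notice that the employees use a different way of building rows: Row("name", "email") creates a reusable Row class with those field names, and each employee is then instantiated by passing the values positionally.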

    Now, let's create an object holding the relationship between employees and divisions, that is, who works where:


division_employees = [
    Row(division=divisions[0], emplist=[employees[0], employees[1]]),
    Row(division=divisions[1], emplist=[employees[0], employees[2], employees[3]]),
    Row(division=divisions[2], emplist=[employees[4]]),
    Row(division=divisions[3], emplist=[employees[5], employees[6]]),
]

    Now let's suppose we create two DataFrames, one with the first two entries of division_employees and the other with the last two:


# Assumes `spark` is an active SparkSession (e.g. the one predefined in the pyspark shell)
df1 = spark.createDataFrame([division_employees[0], division_employees[1]])
df2 = spark.createDataFrame([division_employees[2], division_employees[3]])

    Why two DataFrames? For no particular reason, other than to demonstrate our first function: union()

    union() appends the rows of one DataFrame to those of another; both must have the same number of columns:


dfUnion = df1.union(df2)
dfUnion.show()


+--------------------+--------------------+
|            division|             emplist|
+--------------------+--------------------+
|   {1, Data Science}|[{John Robbins, j...|
|       {2, Big Data}|[{John Robbins, j...|
|{3, Artificial In...|[{Simone Sanchez,...|
|         {4, Devops}|[{Kauan Kuling, k...|
+--------------------+--------------------+
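
    One detail worth keeping in mind: union() matches columns by position, not by name, and PySpark offers unionByName() for matching by column name instead. The plain-Python sketch below only models that difference with tuples, so it runs without a Spark session. The helper names union_by_position and union_by_name are made up for this illustration and are not part of PySpark.

```python
# Plain-Python model of positional vs. name-based union (no Spark needed).
# A "table" here is just (column_names, list_of_row_tuples); this illustrates
# the semantics only, not how Spark implements them.

def union_by_position(left, right):
    """Model of DataFrame.union(): right-hand values are appended as-is,
    column names come from the left side, duplicates are kept."""
    left_cols, left_rows = left
    _, right_rows = right
    return left_cols, left_rows + right_rows


def union_by_name(left, right):
    """Model of DataFrame.unionByName(): right-hand values are reordered
    so that each one lands under the column with the same name."""
    left_cols, left_rows = left
    right_cols, right_rows = right
    order = [right_cols.index(c) for c in left_cols]
    reordered = [tuple(row[i] for i in order) for row in right_rows]
    return left_cols, left_rows + reordered


t1 = (["id", "name"], [("1", "Data Science")])
t2 = (["name", "id"], [("Big Data", "2")])  # same columns, different order

by_position = union_by_position(t1, t2)
by_name = union_by_name(t1, t2)

print(by_position[1])  # second row has name and id swapped
print(by_name[1])      # second row lines up with the column names
```

    In our example df1 and df2 were built from the same Row layout, so the positional behaviour of union() is harmless.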


    The next function we'll explore is explode(). It unfolds (flattens) an array column: each element of the array becomes its own row, and the values of the other columns are repeated for every element, much like the "one" side of a one-to-many inner join is repeated in a DBMS:


from pyspark.sql.functions import col, explode

# One output row per employee; the division name is repeated for each one
dfExplode = dfUnion.select([col("division.name").alias("dept_name"), explode(col("emplist")).alias("employee")])
dfExplode.show()

+--------------------+--------------------+
|           dept_name|            employee|
+--------------------+--------------------+
|        Data Science|{John Robbins, jr...|
|        Data Science|{Julia Silva, jsi...|
|            Big Data|{John Robbins, jr...|
|            Big Data|{Charles Darwin, ...|
|            Big Data|{Isacc Oliveira, ...|
|Artificial Inteli...|{Simone Sanchez, ...|
|              Devops|{Kauan Kuling, kk...|
|              Devops|{Mark Spencer, ms...|
+--------------------+--------------------+



    Notice that emplist holds a list of objects, each one representing an employee, while employee holds a single employee object. Since every element of emplist became its own row, the value of dept_name is repeated once for each employee in that division.
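
    To make the mechanics concrete, here is a plain-Python sketch of what explode() does to each row. It is only a model that runs without Spark; the function explode_rows and the "Research" division are hypothetical and exist just for this illustration.

```python
# Plain-Python model of explode() (no Spark needed): one output row per
# array element, with the other column repeated for every element.

def explode_rows(rows):
    """rows: list of (dept_name, emplist) pairs -> list of (dept_name, employee)."""
    return [
        (dept_name, employee)
        for dept_name, emplist in rows
        for employee in emplist
    ]


rows = [
    ("Data Science", ["John Robbins", "Julia Silva"]),
    ("Big Data", ["John Robbins", "Charles Darwin", "Isacc Oliveira"]),
    ("Research", []),  # hypothetical division with no employees
]

exploded = explode_rows(rows)
for row in exploded:
    print(row)
```

    The division with an empty list produces no output rows at all, which mirrors explode(). In PySpark, explode_outer() would keep such a row, with a null in the employee column.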

    Finally, we can use selectExpr() to access the fields inside the objects stored in the columns, further flattening our data:


dfFlatten = dfExplode.selectExpr("dept_name", "employee.name", "employee.email")
dfFlatten.show()

+--------------------+--------------+--------------------+
|           dept_name|          name|               email|
+--------------------+--------------+--------------------+
|        Data Science|  John Robbins|jrobbins@company.com|
|        Data Science|   Julia Silva|  jsilva@company.com|
|            Big Data|  John Robbins|jrobbins@company.com|
|            Big Data|Charles Darwin| cdarwin@company.com|
|            Big Data|Isacc Oliveira|     iko@company.com|
|Artificial Inteli...|Simone Sanchez|ssanchez@company.com|
|              Devops|  Kauan Kuling| kkuling@company.com|
|              Devops|  Mark Spencer|mspencer@company.com|
+--------------------+--------------+--------------------+

 

Data Manipulation using Spark-SQL

    Let's reproduce the same steps, this time using Spark SQL. Each DataFrame is first registered as a temporary view so that it can be referenced by name in a query.

Union:


df1.createOrReplaceTempView("v1")
df2.createOrReplaceTempView("v2")

dfUnion = spark.sql("select * from v1 union select * from v2")
dfUnion.show()
dfUnion.createOrReplaceTempView("v_union")

+--------------------+--------------------+
|            division|             emplist|
+--------------------+--------------------+
|   {1, Data Science}|[{John Robbins, j...|
|       {2, Big Data}|[{John Robbins, j...|
|         {4, Devops}|[{Kauan Kuling, k...|
|{3, Artificial In...|[{Simone Sanchez,...|
+--------------------+--------------------+
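
    A caveat: SQL's UNION is not an exact counterpart of DataFrame.union(). UNION removes duplicate rows, while union() keeps them, as UNION ALL does. The data in this post has no duplicate rows, so both approaches return the same four rows. The sketch below models the two behaviours in plain Python on a small overlapping sample; the function names union_all and union_distinct are hypothetical.

```python
# Plain-Python model of UNION ALL vs. UNION (no Spark needed).

def union_all(left, right):
    """Model of SQL UNION ALL / DataFrame.union(): plain concatenation."""
    return left + right


def union_distinct(left, right):
    """Model of SQL UNION: concatenation followed by duplicate removal.
    dict.fromkeys keeps first occurrences in order; Spark itself gives no
    ordering guarantee after deduplication."""
    return list(dict.fromkeys(left + right))


v1 = [("1", "Data Science"), ("2", "Big Data")]
v2 = [("2", "Big Data"), ("3", "Artificial Inteligence")]  # overlaps with v1

all_rows = union_all(v1, v2)
distinct_rows = union_distinct(v1, v2)

print(len(all_rows))       # the duplicate ("2", "Big Data") is kept
print(len(distinct_rows))  # the duplicate is removed
```

    In Spark SQL, writing "union all" instead of "union" in the query gives the behaviour of the function version.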

Explosion


dfExplode = spark.sql("select division.name as dept_name, explode(emplist) as employee from v_union")
dfExplode.show()
dfExplode.createOrReplaceTempView("v_explode")

+--------------------+--------------------+
|           dept_name|            employee|
+--------------------+--------------------+
|        Data Science|{John Robbins, jr...|
|        Data Science|{Julia Silva, jsi...|
|            Big Data|{John Robbins, jr...|
|            Big Data|{Charles Darwin, ...|
|            Big Data|{Isacc Oliveira, ...|
|              Devops|{Kauan Kuling, kk...|
|              Devops|{Mark Spencer, ms...|
|Artificial Inteli...|{Simone Sanchez, ...|
+--------------------+--------------------+

Flatten


dfFlatten = spark.sql("select dept_name, employee.name, employee.email from v_explode")
dfFlatten.show()

+--------------------+--------------+--------------------+
|           dept_name|          name|               email|
+--------------------+--------------+--------------------+
|        Data Science|  John Robbins|jrobbins@company.com|
|        Data Science|   Julia Silva|  jsilva@company.com|
|            Big Data|  John Robbins|jrobbins@company.com|
|            Big Data|Charles Darwin| cdarwin@company.com|
|            Big Data|Isacc Oliveira|     iko@company.com|
|              Devops|  Kauan Kuling| kkuling@company.com|
|              Devops|  Mark Spencer|mspencer@company.com|
|Artificial Inteli...|Simone Sanchez|ssanchez@company.com|
+--------------------+--------------+--------------------+

