Spark first steps - Introduction [2] - Processing data using RDD vs DataFrame


Processing Data using RDD

    Now, let's see some examples of data manipulation using RDD, DataFrame and Dataset, starting with the most basic, low-level API: RDD.

    The first thing you will notice here is that the RDD API mirrors the map-reduce paradigm: it exposes a set of functional operations that either transform the data or reduce it (merge it, aggregate it, and so on).

    Let's start with a simple example: a simple CSV file representing our data. It could be many files in HDFS, but for now let's keep things simple for these first steps.

Name,Job Role,Salary
Josh,Data Scientist,10000
Josh,Data Engineer,3456
Maria,Data Engineer,5654
Maria,IoT Specialist,2256
Frank,Project Manager,8756
Charles,Data Engineer,3645


    Let's now suppose we want to know each person's total salary. Please notice that a person can have more than one job role. For each job role, he/she will receive a paycheck, so the total amount is the sum of all values.

    Using RDD we need to follow these steps:

  1. RDD doesn't know our data's schema/format, so the first thing we need to do is transform the input into meaningfully structured data.

  2. After that, we need to combine the data into (key, value) pairs, where the key is each person's name and the value is the accumulated total of that person's payments.

  3. Finally, let's order it from lowest to highest, just to make things look cool.

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
import pyspark.sql.functions as sf

# Creating a spark session
spark = (SparkSession.builder.appName("spark_learn_sc").getOrCreate())

# Reading our input data into an RDD
rdd = spark.sparkContext.textFile("../examples/salary.dat")

    At this point, all we have is an array of strings, where each string is a line of our file:

rdd.collect()

['Name,Job Role,Salary', 'Josh,Data Scientist,10000', 'Josh,Data Engineer,3456', 'Maria,Data Engineer,5654', 'Maria,IoT Specialist,2256', 'Frank,Project Manager,8756', 'Charles,Data Engineer,3645']

    Let's implement the first step:

def drop_first(idx, itr):
    return iter(list(itr)[1:]) if idx == 0 else itr

mapped_rdd = (rdd.mapPartitionsWithIndex(drop_first)
              .map(lambda p: p.split(','))
              .map(lambda p: (p[0], (p[1], int(p[2])))))

    For this step, the first transformation, mapPartitionsWithIndex, calls the given function (in this case drop_first) once per partition, passing the partition index and an iterator over that partition's lines.

    Since the header is the first line of the first partition, all we need to do is check whether the partition index is 0. If so, we return an iterator over everything except the first line; otherwise, we just do nothing by returning back what we had.
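    The drop_first function can be exercised directly in plain Python (a sketch; in Spark, it receives a partition index and an iterator over that partition's lines, and the sample partitions below are hypothetical):

```python
def drop_first(idx, itr):
    # Drop the first element (the CSV header) only in partition 0
    return iter(list(itr)[1:]) if idx == 0 else itr

# Partition 0 starts with the header line; partition 1 does not
part0 = ['Name,Job Role,Salary', 'Josh,Data Scientist,10000']
part1 = ['Maria,Data Engineer,5654']

print(list(drop_first(0, iter(part0))))  # ['Josh,Data Scientist,10000']
print(list(drop_first(1, iter(part1))))  # ['Maria,Data Engineer,5654']
```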

    The second map function splits each CSV line into columns.

    Finally, the third map function transforms each line into a (key, value) format, in which the key is the employee's name and the value is a 2-tuple (job role, salary).

    Right now, our data looks like this:

[('Josh', ('Data Scientist', 10000)), ('Josh', ('Data Engineer', 3456)), ('Maria', ('Data Engineer', 5654)), ('Maria', ('IoT Specialist', 2256)), ('Frank', ('Project Manager', 8756)), ('Charles', ('Data Engineer', 3645))]

     After that, let's implement the second and third steps:

def createCombiner(val):
    # val[0] = job role, val[1] = salary
    return val[1]

def mergeValue(accum, val):
    return accum + val[1]

def mergeCombiner(accum1, accum2):
    return accum1 + accum2

res = (mapped_rdd.combineByKey(createCombiner, mergeValue, mergeCombiner)
       .sortBy(lambda e: e[1]))

print(res.collect())

    In order to merge our data correctly, we use the combineByKey method, which expects three functions.

    The first function receives the "value" the first time a key is seen; it is responsible for creating the initial accumulator for that key. In our case, the value is our 2-tuple (job role, salary).

    In this example, it ignores val[0], because the job role isn't needed for the total, and returns val[1], which holds the salary.

    The second function is called whenever an existing key is found again. In our example, the accumulator is a simple number whose value is increased by the salary of each additional job role the task reads.

    Bear in mind that this function is called, instead of createCombiner, because the key is not new: at some earlier point this task read that key and called createCombiner. Now it is reading it again, so it isn't supposed to call createCombiner a second time, but to accumulate the salary value into the "local" accumulator.

    If we were combining more data, this accumulator could be a list, accumulating several things at once for each key.
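    For instance, a hypothetical variant (not the functions used above) where the accumulator is a list collecting every (job role, salary) pair per person could look like this:

```python
def create_combiner(val):
    # First time a key is seen: start a list with its first pair
    return [val]

def merge_value(accum, val):
    # Same key seen again in this task: append the new pair
    return accum + [val]

def merge_combiners(accum1, accum2):
    # Merge two tasks' lists for the same key
    return accum1 + accum2

# Josh's two rows would end up as one list under his key:
josh = create_combiner(('Data Scientist', 10000))
josh = merge_value(josh, ('Data Engineer', 3456))
print(josh)  # [('Data Scientist', 10000), ('Data Engineer', 3456)]
```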

    The third function is called when we are reducing accumulators. Suppose task1 has read lines 1 to 3 and task2 has read lines 4 to 6. Now we need to combine the two local accumulators to get the desired value for the key we are reducing.

    At this point, we already have our data ready to go. The final step is done by sortBy(), which takes a function to decide which piece of the result value should be used to sort it.
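    Putting the three functions together, the combineByKey flow can be simulated in plain Python (a sketch of the semantics, not Spark's actual implementation): each task builds a local accumulator per key, and the local results are then merged per key.

```python
def createCombiner(val):
    return val[1]

def mergeValue(accum, val):
    return accum + val[1]

def mergeCombiner(accum1, accum2):
    return accum1 + accum2

def combine_partition(pairs):
    # One task: build a local {key: accumulator} dict
    local = {}
    for key, val in pairs:
        if key not in local:
            local[key] = createCombiner(val)          # new key in this task
        else:
            local[key] = mergeValue(local[key], val)  # key seen before
    return local

# task1 read lines 1 to 3, task2 read lines 4 to 6
task1 = combine_partition([('Josh', ('Data Scientist', 10000)),
                           ('Josh', ('Data Engineer', 3456)),
                           ('Maria', ('Data Engineer', 5654))])
task2 = combine_partition([('Maria', ('IoT Specialist', 2256)),
                           ('Frank', ('Project Manager', 8756)),
                           ('Charles', ('Data Engineer', 3645))])

# Reduce phase: merge the two local accumulators key by key
merged = dict(task1)
for key, acc in task2.items():
    merged[key] = mergeCombiner(merged[key], acc) if key in merged else acc

print(sorted(merged.items(), key=lambda e: e[1]))
# [('Charles', 3645), ('Maria', 7910), ('Frank', 8756), ('Josh', 13456)]
```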

    Finally, that's our output for this example:

[('Charles', 3645), ('Maria', 7910), ('Frank', 8756), ('Josh', 13456)]    


Processing Data using DataFrame

    Since this data is structured like a table, it's possible to do the same thing using the higher-level DataFrame API, with much less code.

    For that to work, we just need to change the read call to one that returns a DataFrame.

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
import pyspark.sql.functions as sf

# Creating a spark session
spark = (SparkSession.builder.appName("spark_learn_sc").getOrCreate())

# Reading our input data as a DataFrame
df:DataFrame = spark.read.csv("./examples/salary.dat", inferSchema=True, header=True)

df.show()

    Not only is the data already parsed, but the header was also used to name the columns.

+-------+---------------+------+
|   Name|       Job Role|Salary|
+-------+---------------+------+
|   Josh| Data Scientist| 10000|
|   Josh|  Data Engineer|  3456|
|  Maria|  Data Engineer|  5654|
|  Maria| IoT Specialist|  2256|
|  Frank|Project Manager|  8756|
|Charles|  Data Engineer|  3645|
+-------+---------------+------+

    From here, it is easy to manipulate the data with the built-in functions. All we need to do is group the data using Name as the key, sum the "Salary" column for each key, and finally sort the result.

res = (df.groupBy("Name").agg(sf.sum("Salary").alias("salary"))
       .orderBy("salary"))

res.show()

    Finally getting the same result as before:

+-------+------+
|   Name|salary|
+-------+------+
|Charles|  3645|
|  Maria|  7910|
|  Frank|  8756|
|   Josh| 13456|
+-------+------+
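    As a sanity check, the same groupBy/sum/orderBy pipeline can be reproduced with a plain-Python dictionary, no Spark needed (the hard-coded rows below mirror the sample CSV):

```python
rows = [('Josh', 'Data Scientist', 10000),
        ('Josh', 'Data Engineer', 3456),
        ('Maria', 'Data Engineer', 5654),
        ('Maria', 'IoT Specialist', 2256),
        ('Frank', 'Project Manager', 8756),
        ('Charles', 'Data Engineer', 3645)]

totals = {}
for name, _role, salary in rows:
    totals[name] = totals.get(name, 0) + salary   # groupBy("Name") + sum("Salary")

result = sorted(totals.items(), key=lambda e: e[1])  # orderBy("salary")
print(result)
# [('Charles', 3645), ('Maria', 7910), ('Frank', 8756), ('Josh', 13456)]
```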

