
Partitioning in Apache Spark

Before diving into the partitioning concept, I assume that everyone who would like to follow this article is familiar with the following:

  1. Big Data concepts
  2. Basic to intermediate Python
  3. How Apache Spark is installed
  4. The basics of PySpark (the Spark Python API)

 

Apache Spark

Apache Spark is an open-source, distributed cluster computing framework that is used for fast processing, querying and analyzing Big Data.

 

Spark is an engine for parallel processing of data on a cluster. One important way to increase the parallelism of Spark processing is to increase the number of executors on the cluster. However, knowing how the data should be distributed, so that the cluster can process it efficiently, is just as important. The key to achieving this is partitioning in Spark.

 

Apache Spark manages data through RDDs using partitions, which help parallelize distributed data processing with minimal network traffic between executors. By default, Apache Spark reads data into an RDD from the nodes that are close to it.

 

PySpark

PySpark is the Python API for Apache Spark, an open-source, distributed processing system for big data that was originally developed in the Scala programming language.

 

Communication is very expensive in distributed programming, so laying out data to minimize network traffic greatly improves performance. Just as a single-node program should choose the right data structure for a collection of records, a Spark program can control RDD partitioning to reduce communication.

 

Partitioning

Partitioning is an important concept in Apache Spark, as it determines how the cluster's hardware resources are used when executing a job. By default, Spark creates one partition for every HDFS block of the input file (64 MB by default). RDDs are automatically partitioned in Spark without human intervention; however, at times programmers want to change the partitioning scheme by adjusting the size and number of partitions based on the requirements of the application.

 

Partitioning in Spark is not helpful for every application. For instance, if an RDD is scanned only once, partitioning the data within it may not pay off; but if a dataset is reused multiple times in key-oriented operations such as joins, then partitioning the data will help, as sketched below.
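As a rough illustration of that reuse pattern, the sketch below partitions a pair RDD once by key and caches it before joining it repeatedly; the RDDs, their keys and the partition count are made up purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Partition Reuse Sketch").getOrCreate()
sc = spark.sparkContext

# Hypothetical pair RDDs keyed by a user id: (user_id, payload)
events = sc.parallelize([(i % 10, "event-%d" % i) for i in range(100)])
profiles = sc.parallelize([(i, "profile-%d" % i) for i in range(10)])

# Partition the reused RDD once by key and cache it, so that the repeated
# joins below can take advantage of the known layout instead of reshuffling
events_by_user = events.partitionBy(8).cache()

joined_once = events_by_user.join(profiles)
joined_again = events_by_user.join(profiles.mapValues(str.upper))
print(joined_once.count(), joined_again.count())

Because events_by_user already carries a known partitioner, the repeated joins can avoid reshuffling that side of the data.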

 

For custom partitioning, developers have to check the number of slots (cores) in the hardware and how many tasks each executor can handle in order to optimize performance and achieve parallelism.
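There is no single API that reports these "slots", but a running session exposes some useful hints. A minimal sketch, assuming a SparkSession named spark already exists (the executor-cores setting will only be present if it was configured when the application was submitted):

sc = spark.sparkContext

# Roughly the total number of cores available to this application
print("defaultParallelism:", sc.defaultParallelism)

# Cores per executor, if set explicitly (for example via --executor-cores)
print("spark.executor.cores:", sc.getConf().get("spark.executor.cores", "not set"))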

 

Create a virtual environment (Ubuntu 16.04)

$> virtualenv venv

 

Activate the virtual environment (Ubuntu 16.04)

$> source venv/bin/activate

 

venv$>vim partitions1.py

 

from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

spark = SparkSession \
    .builder \
    .appName("Partitioning Example") \
    .getOrCreate()

# Build a small pandas DataFrame of random sample data
length = 100
names = np.random.choice(['Ram', 'Rahul', 'Maha', 'Veena', None], length)
amounts = np.random.randint(0, 1000000, length)
country = np.random.choice(
    ['India', 'United Kingdom', 'Poland', 'USA', 'Germany', None],
    length
)
df = pd.DataFrame({'name': names, 'amount': amounts, 'country': country})

# Optional sanity checks on the pandas DataFrame
# print(df.count())
# print(df.head())

# Convert the pandas DataFrame into a Spark DataFrame
transactions = spark.createDataFrame(df)

# Number of partitions, the partitioner (if any) and the rows in each partition
print('Number of partitions: {}'.format(transactions.rdd.getNumPartitions()))
print('Partitioner: {}'.format(transactions.rdd.partitioner))
print('Partitions structure: {}'.format(transactions.rdd.glom().collect()))

 

venv$> python partitions1.py
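The exact output depends on the machine, but on a local run it generally has the following shape (the partition count shown here assumes, purely for illustration, an 8-core laptop, and the rows themselves are random):

Number of partitions: 8
Partitioner: None
Partitions structure: [[Row(name=..., amount=..., country=...), ...], [Row(...), ...], ...]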

 

How many partitions should a Spark RDD have?

Having too many partitions or too few is not ideal. The number of partitions in Spark should be decided thoughtfully, based on the cluster configuration and the requirements of the application. Increasing the number of partitions will make each partition hold less data, or no data at all. Apache Spark can run one concurrent task for every partition of an RDD, up to the total number of cores in the cluster. If a cluster has 30 cores, programmers will want their RDDs to have at least 30 partitions, or perhaps 2 or 3 times that number.

 

As already mentioned above, one partition is created for each HDFS block of the file, which is 64 MB by default. However, when creating an RDD, a second argument can be passed that defines the number of partitions to be created.

 

A simple demonstration of partitions and cores

val rdd = sc.textFile("file.txt", 4)

 

The above line of code creates an RDD named rdd with 4 partitions by reading file.txt. Suppose we have a cluster with three cores, and assume that each partition takes 5 minutes to process. For the above RDD with 4 partitions, 3 partitions will be processed in parallel, since there are three cores, and the 4th partition will be processed after 5 minutes, when one of the 3 cores becomes free.

 

The entire processing will be completed in 10 minutes, and while the 4th partition is being processed, the remaining 2 cores will sit idle. The best way to decide on the number of partitions in an RDD is to make the number of partitions equal to the number of cores in the cluster, so that all the partitions are processed in parallel and the resources are utilized in an optimal way.
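The arithmetic behind this example is simply the number of scheduling waves multiplied by the per-partition time; a quick back-of-the-envelope check in Python:

import math

cores = 3
partitions = 4
minutes_per_partition = 5

# Each wave runs at most `cores` tasks in parallel,
# so the job needs ceil(partitions / cores) waves.
waves = math.ceil(partitions / cores)
print(waves * minutes_per_partition)   # 2 waves * 5 minutes = 10 minutes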

 

The number of partitions in a Spark RDD can always be found by using the partitions method of the RDD. For the RDD that we created above, the partitions method shows an output of 4 partitions.

 

Ex:

rdd.partitions.size

 

Output = 4
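The two snippets above use Spark shell (Scala) syntax; in PySpark the equivalent check is getNumPartitions(), assuming an existing SparkContext named sc and a readable file.txt:

rdd = sc.textFile("file.txt", 4)
print(rdd.getNumPartitions())   # 4 (or more, since the second argument is only a minimum)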

 

If an RDD has too many partitions, then task scheduling may take more time than the actual execution time. On the contrary, having too few partitions is also not beneficial, as some of the worker nodes could just be sitting idle, resulting in less concurrency. This can lead to improper resource utilization and data skew.

 

The problem with a single partition: the data might be skewed onto that one partition, and one worker node might be doing more work than the other worker nodes. Thus, there is always a trade-off when deciding on the number of partitions.
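When the partition count turns out to be wrong for the workload, it can be adjusted after the fact with repartition() (a full shuffle, which can increase or decrease the count) or coalesce() (which avoids a full shuffle but can only decrease it). A minimal sketch, reusing the transactions DataFrame from partitions1.py:

# Full shuffle into 8 partitions to increase parallelism
transactions_8 = transactions.repartition(8)
print(transactions_8.rdd.getNumPartitions())   # 8

# Collapse back down to 2 partitions without a full shuffle
transactions_2 = transactions_8.coalesce(2)
print(transactions_2.rdd.getNumPartitions())   # 2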

 

Data in the same partition will always be in the same machine. Data in a partition will not span multiple machines.

 

Note: 

The default number of partitions is equal to the number of CPU cores in the machine/cluster.

 

Spark can run one concurrent task for every partition of an RDD. In general, more partitions allow the work to be distributed among more workers and achieve better parallelism, while fewer partitions let the work be done in larger chunks (which can be quicker when task scheduling would otherwise take more time than the actual execution).

How to Choose the Number of Partitions:

  • Lower bound: 2 x the number of cores in the cluster available to the application (see the sketch after this list).
  • Upper bound: each task should take at least 100 ms to execute. If tasks finish faster than that, the partitioned data is probably too small and the application may be spending more time scheduling tasks than executing them.
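A minimal sketch of the lower-bound rule, assuming a SparkSession named spark is already available:

sc = spark.sparkContext

# Rule of thumb from above: start with roughly 2 x the cores available to the app
target_partitions = 2 * sc.defaultParallelism
print("Suggested starting point:", target_partitions)

# Apply it when creating an RDD
rdd = sc.parallelize(range(1_000_000), target_partitions)
print(rdd.getNumPartitions())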

Two types of partitioning:

Hash Partitioning

It spreads the data across partitions based on a hash of the key value:

p = key.hashCode() % noOfPartitions

Hash partitioning can leave the distributed data skewed, for example when a few keys are far more frequent than the rest.
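In PySpark, RDD.partitionBy() does hash partitioning by default (its default partitionFunc is portable_hash). The sketch below, assuming an existing SparkContext sc, shows how a hot key can pile up in one partition:

# Pair RDD keyed by country; "India" occurs far more often than the other keys
pairs = sc.parallelize([("India", 1)] * 6 + [("Poland", 1), ("USA", 1)])

# Each key is hashed and taken modulo the number of partitions (3 here)
hashed = pairs.partitionBy(3)

# Records per partition; the partition holding "India" ends up much larger
print(hashed.glom().map(len).collect())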

Range Partitioning

It partitions data based on a sorted order or on a set of sorted ranges of keys; tuples whose keys fall in the same range will be on the same machine.
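At the DataFrame level, repartitionByRange() (available since Spark 2.3) provides this behavior; at the RDD level, sortByKey() lays the data out by key ranges. A rough sketch, again reusing the transactions DataFrame from partitions1.py:

# Rows with nearby amounts land in the same partition
by_range = transactions.repartitionByRange(4, "amount")
print(by_range.rdd.getNumPartitions())   # 4

# RDD-level equivalent: sort by a key to get a range-based layout
pair_rdd = transactions.rdd.map(lambda row: (row["amount"], row["name"]))
print(pair_rdd.sortByKey(numPartitions=4).glom().map(len).collect())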

 

Conclusion:

We have seen what partitioning is, the types of partitioning, and how data is organized across partitions when partitioning is applied.

 

We will go into more detail, with further examples, in the next post.

 

References:

https://spark.apache.org/
